WIP: only announce implemented...

This commit is contained in:
Naim A 2018-06-11 22:46:57 +03:00
parent 2269e939ef
commit dd04df33f6
No known key found for this signature in database
GPG key ID: FD7948915D9EF8B9
5 changed files with 498 additions and 0 deletions

View file

@ -4,6 +4,9 @@ version = "0.1.0"
authors = ["Naim A. <227396+naim94a@users.noreply.github.com>"] authors = ["Naim A. <227396+naim94a@users.noreply.github.com>"]
description = "High performance torrent tracker" description = "High performance torrent tracker"
[profile.release]
lto = true
[dependencies] [dependencies]
serde = "1.0.66" serde = "1.0.66"
bincode = "1.0.0" bincode = "1.0.0"

25
README.md Normal file
View file

@ -0,0 +1,25 @@
# UDPT
_UDPT_ is a UDP based torrent tracker which fully implements [BEP-15](http://www.bittorrent.org/beps/bep_0015.html) & [BEP-41](http://www.bittorrent.org/beps/bep_0041.html).
This project was written in Rust, it is a complete rewrite of a previous C/C++ UDPT project (which is still currently available in the `master` branch of the repository).
## Features
* UDP torrent tracking server
* In memory database
* Choice of Dynamic/Static/Private tracker modes
* Ability to block a torrent from being tracked
* HTTP REST API for management
* Logging
* Windows Service or Linux/Unix daemon
## Getting started
This rewrite is currently still under development and shouldn't be used at the moment.
If you'd like to contribute in making everything in the "Features" list come true, please feel free to submit a pull-request.
Since we are using Rust, getting started is fairly easy:
```commandline
git clone https://github.com/naim94a/udpt.git
cd udpt
git checkout udpt-rs
cargo build
```

21
src/main.rs Normal file
View file

@ -0,0 +1,21 @@
extern crate bincode;
extern crate serde;
#[macro_use] extern crate serde_derive;
mod server;
mod tracker;
fn main() {
let mut tracker = tracker::TorrentTracker::new();
let addr = "0.0.0.0:1212";
let mut s = server::UDPTracker::new(addr, &mut tracker).unwrap();
loop {
match s.accept_packet() {
Err(e) => {
println!("error: {}", e);
},
Ok(_) => {},
}
}
}

300
src/server.rs Normal file
View file

@ -0,0 +1,300 @@
use std;
use std::net::{SocketAddr, UdpSocket};
use bincode;
use serde::{Serialize, Deserialize, Serializer, Deserializer};
use tracker;
// 2000 should be enough, MTU is usually 1500
const MAX_PACKET_SIZE: usize = 2000;
// protocol contants
const PROTOCOL_ID: u64 = 0x0000041727101980;
#[repr(u32)]
#[derive(Serialize, Deserialize)]
enum Actions {
Connect = 0,
Announce = 1,
Scrape = 2,
Error = 3,
}
#[repr(u32)]
#[derive(Serialize, Deserialize)]
pub enum Events {
None = 0,
Complete = 1,
Started = 2,
Stopped = 3,
}
fn pack<T: Serialize>(data: &T) -> Option<Vec<u8>> {
let mut bo = bincode::config();
bo.big_endian();
match bo.serialize(data) {
Ok(bytes) => Some(bytes),
Err(_) => None,
}
}
fn unpack<'a, T: Deserialize<'a>>(data: &'a [u8]) -> Option<T> {
let mut bo = bincode::config();
bo.big_endian();
match bo.deserialize(data) {
Ok(obj) => Some(obj),
Err(_) => None,
}
}
#[derive(Serialize, Deserialize)]
struct UDPRequestHeader {
connection_id: u64,
action: Actions,
transaction_id: u32,
}
#[derive(Serialize, Deserialize)]
struct UDPResponseHeader {
action: Actions,
transaction_id: u32,
}
#[derive(Serialize, Deserialize)]
struct UDPConnectionResponse {
header: UDPResponseHeader,
connection_id: u64,
}
#[derive(Serialize, Deserialize)]
struct UDPAnnounceRequest {
header: UDPRequestHeader,
info_hash: [u8; 20],
peer_id: [u8; 20],
downloaded: u64,
left: u64,
uploaded: u64,
event: Events,
ip_address: u32,
key: u32,
num_want: i32,
port: u16,
}
#[derive(Serialize, Deserialize)]
struct UDPAnnounceResponse {
header: UDPResponseHeader,
interval: u32,
leechers: u32,
seeders: u32,
}
pub struct UDPTracker<'a> {
server: std::net::UdpSocket,
tracker: &'a mut tracker::TorrentTracker,
}
impl<'a> UDPTracker<'a> {
pub fn new<T: std::net::ToSocketAddrs>(bind_address: T, tracker: &mut tracker::TorrentTracker) -> Result<UDPTracker, std::io::Error> {
let server = match UdpSocket::bind(bind_address) {
Ok(s) => s,
Err(e) => {
return Err(e);
}
};
Ok(UDPTracker{
server,
tracker,
})
}
fn handle_packet(&mut self, remote_address: &SocketAddr, payload: &[u8]) {
let header : UDPRequestHeader = match unpack(payload) {
Some(val) => val,
None => {
return;
}
};
match header.action {
Actions::Connect => self.handle_connect(remote_address, &header, payload),
Actions::Announce => self.handle_announce(remote_address, &header, payload),
Actions::Scrape => self.handle_scrape(remote_address, &header, payload),
_ => {
// someone is playing around... ignore request.
return;
}
}
}
fn handle_connect(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, _payload: &[u8]) {
if header.connection_id != PROTOCOL_ID {
return;
}
// send response...
let conn_id = self.get_connection_id(remote_addr);
let response = UDPConnectionResponse{
header: UDPResponseHeader{
transaction_id: header.transaction_id,
action: Actions::Connect,
},
connection_id: conn_id,
};
if let Some(payload) = pack(&response) {
let _ = self.send_packet(remote_addr, payload.as_slice());
}
}
fn handle_announce(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) {
if header.connection_id != self.get_connection_id(remote_addr) {
return;
}
let announce_packet: Option<UDPAnnounceRequest> = unpack(payload);
match announce_packet {
Some(packet) => {
let plen = bincode::serialized_size(&packet).unwrap() as usize;
println!("payload len={}, announce len={}", payload.len(), plen);
if payload.len() > plen {
let bep41_payload = &payload[std::mem::size_of::<UDPAnnounceRequest>()..];
println!("bep41: {:?}", bep41_payload);
}
if packet.ip_address != 0 {
// TODO: allow configurability of ip address
// for now, ignore request.
return;
}
let client_addr = SocketAddr::new(remote_addr.ip(), packet.port);
self.tracker.update_torrent_peer(&packet.info_hash, &packet.peer_id, &client_addr, packet.uploaded, packet.downloaded, packet.left, packet.event);
let stats = self.tracker.get_stats(&packet.info_hash).unwrap_or((0, 0, 0));
let results = self.tracker.get_peers(&packet.info_hash, &client_addr);
if let Some(mut payload) = pack(&UDPAnnounceResponse{
header: UDPResponseHeader{
action: Actions::Announce,
transaction_id: packet.header.transaction_id,
},
seeders: stats.0 as u32,
interval: 20,
leechers: stats.2 as u32,
}) {
for peer in results {
match client_addr {
SocketAddr::V4(ipv4) => {
payload.extend(&ipv4.ip().octets());
},
SocketAddr::V6(ipv6) => {
payload.extend(&ipv6.ip().octets());
}
};
let port_hton = client_addr.port().to_be();
payload.extend(&[ (port_hton & 0xff) as u8, ((port_hton >> 8) & 0xff) as u8 ]);
}
} else {
self.send_error(&remote_addr, &packet.header, "internal error");
}
},
None => {
return;
}
}
}
fn handle_scrape(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) {
if header.connection_id != self.get_connection_id(remote_addr) {
return;
}
self.send_error(remote_addr, header, "scrape not yet implemented");
}
fn get_connection_id(&self, remote_address: &SocketAddr) -> u64 {
match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
Ok(duration) => {
(duration.as_secs() / 3600) | ((remote_address.port() as u64) << 36)
},
Err(_) => {
0x8000000000000000
}
}
}
fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) -> Result<usize, std::io::Error> {
self.server.send_to(payload, remote_addr)
}
fn send_error(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, error_msg: &str) {
if let Some(mut payload) = pack(&UDPResponseHeader{
transaction_id: header.transaction_id,
action: Actions::Error,
}) {
let msg_bytes = Vec::from(error_msg.as_bytes());
payload.extend(msg_bytes);
let _ = self.send_packet(remote_addr, payload.as_slice());
}
}
pub fn accept_packet(&mut self) -> Result<(), std::io::Error> {
let mut packet = [0u8; MAX_PACKET_SIZE];
match self.server.recv_from(&mut packet) {
Ok((size, remote_address)) => {
self.handle_packet(&remote_address, &packet[..size]);
Ok(())
},
Err(e) => Err(e),
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn pack() {
let mystruct = super::UDPRequestHeader {
connection_id: 200,
action: super::Actions::Connect,
transaction_id: 77771,
};
match super::pack(&mystruct) {
Some(data) => {
println!("serialized data = [{}, {:?}]", data.len(), data);
},
None => {
assert!(false);
}
};
}
#[test]
fn unpack() {
let buf = [0u8, 0, 0, 0, 0, 0, 0, 200, 0, 0, 0, 1, 0, 1, 47, 203];
match super::unpack(&buf) {
Some(obj) => {
let x : super::UDPResponseHeader = obj;
println!("conn_id={}", x.action as u32);
},
None => {
assert!(false);
}
}
}
}

149
src/tracker.rs Normal file
View file

@ -0,0 +1,149 @@
use std;
use server::Events;
pub enum TrackerMode {
/// In static mode torrents are tracked only if they were added ahead of time.
StaticMode,
/// In dynamic mode, torrents are tracked being added ahead of time.
DynamicMode,
/// Tracker will only serve authenticated peers.
PrivateMode,
}
struct TorrentPeer {
ip: std::net::SocketAddr,
last_connection_id: u64,
uploaded: u64,
downloaded: u64,
left: u64,
event: Events,
updated: std::time::SystemTime,
}
type PeerId = [u8; 20];
type InfoHash = [u8; 20];
struct TorrentEntry {
is_flagged: bool,
peers: std::collections::BTreeMap<PeerId, TorrentPeer>,
}
struct TorrentDatabase {
torrent_peers: std::collections::BTreeMap<InfoHash, TorrentEntry>,
}
pub struct TorrentTracker {
mode: TrackerMode,
database: TorrentDatabase,
}
impl TorrentTracker {
pub fn new() -> TorrentTracker {
TorrentTracker{
mode: TrackerMode::DynamicMode,
database: TorrentDatabase{
torrent_peers: std::collections::BTreeMap::new(),
}
}
}
/// Adding torrents is not relevant to dynamic trackers.
pub fn add_torrent(&mut self, info_hash: &InfoHash) {
use std::collections::BTreeMap;
self.database.torrent_peers.entry(*info_hash).or_insert(TorrentEntry{
is_flagged: false,
peers: std::collections::BTreeMap::new(),
});
}
/// If the torrent is flagged, it will not be removed unless force is set to true.
pub fn remove_torrent(&mut self, info_hash: &InfoHash, force: bool) {
if !force {
if let Some(entry) = self.database.torrent_peers.get(info_hash) {
if entry.is_flagged {
// torrent is flagged, ignore request.
return;
}
} else {
// torrent not found, no point looking for it again...
return;
}
}
self.database.torrent_peers.remove(info_hash);
}
/// flagged torrents will result in a tracking error. This is to allow enforcement against piracy.
pub fn set_torrent_flag(&mut self, info_hash: &InfoHash, is_flagged: bool) {
if let Some(mut entry) = self.database.torrent_peers.get_mut(info_hash) {
if is_flagged && !entry.is_flagged {
// empty peer list.
entry.peers.clear();
}
entry.is_flagged = is_flagged;
}
}
pub fn update_torrent_peer(&mut self, info_hash: &InfoHash, peer_id: &PeerId, remote_address: &std::net::SocketAddr, uploaded: u64, downloaded: u64, left: u64, event: Events) {
if let Some(mut torrent_entry) = self.database.torrent_peers.get_mut(info_hash) {
torrent_entry.peers.insert(*peer_id, TorrentPeer{
updated: std::time::SystemTime::now(),
left,
downloaded,
uploaded,
ip: *remote_address,
event,
last_connection_id: 0,
});
}
}
/// returns a list of peers with the same type of address of the remote_addr (IP v4/v6)
pub fn get_peers(&self, info_hash: &InfoHash, remote_addr: &std::net::SocketAddr) -> Vec<std::net::SocketAddr> {
let mut list = Vec::new();
if let Some(entry) = self.database.torrent_peers.get(info_hash) {
for (_, peer) in entry.peers.iter().filter(|e| e.1.ip.is_ipv4() == remote_addr.is_ipv4()) {
if peer.ip == *remote_addr {
continue;
}
list.push(peer.ip);
if list.len() >= 74 {
// 74 is maximum peers supported by the protocol.
break;
}
}
}
list
}
pub fn get_stats(&self, info_hash: &InfoHash) -> Option<(i32, i32, i32)> {
if let Some(torrent_entry) = self.database.torrent_peers.get(info_hash) {
// TODO: store stats in temporary location...
let mut seeders = 0;
let mut leechers = 0;
for (_, peer) in torrent_entry.peers.iter() {
if peer.left == 0 {
seeders += 1;
}
else {
leechers += 1;
}
}
Some((seeders, -1, leechers))
} else {
None
}
}
pub fn cleanup(&mut self) {
}
}