From dd04df33f6bc7741914f26e44c80264f3e1e0031 Mon Sep 17 00:00:00 2001 From: Naim A <227396+naim94a@users.noreply.github.com> Date: Mon, 11 Jun 2018 22:46:57 +0300 Subject: [PATCH] WIP: only announce implemented... --- Cargo.toml | 3 + README.md | 25 +++++ src/main.rs | 21 ++++ src/server.rs | 300 +++++++++++++++++++++++++++++++++++++++++++++++++ src/tracker.rs | 149 ++++++++++++++++++++++++ 5 files changed, 498 insertions(+) create mode 100644 README.md create mode 100644 src/main.rs create mode 100644 src/server.rs create mode 100644 src/tracker.rs diff --git a/Cargo.toml b/Cargo.toml index d7cbb05..5da04ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,9 @@ version = "0.1.0" authors = ["Naim A. <227396+naim94a@users.noreply.github.com>"] description = "High performance torrent tracker" +[profile.release] +lto = true + [dependencies] serde = "1.0.66" bincode = "1.0.0" diff --git a/README.md b/README.md new file mode 100644 index 0000000..2464c7e --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +# UDPT +_UDPT_ is a UDP based torrent tracker which fully implements [BEP-15](http://www.bittorrent.org/beps/bep_0015.html) & [BEP-41](http://www.bittorrent.org/beps/bep_0041.html). + +This project was written in Rust, it is a complete rewrite of a previous C/C++ UDPT project (which is still currently available in the `master` branch of the repository). + +## Features +* UDP torrent tracking server +* In memory database +* Choice of Dynamic/Static/Private tracker modes +* Ability to block a torrent from being tracked +* HTTP REST API for management +* Logging +* Windows Service or Linux/Unix daemon + +## Getting started +This rewrite is currently still under development and shouldn't be used at the moment. +If you'd like to contribute in making everything in the "Features" list come true, please feel free to submit a pull-request. + +Since we are using Rust, getting started is fairly easy: +```commandline +git clone https://github.com/naim94a/udpt.git +cd udpt +git checkout udpt-rs +cargo build +``` diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..ef289a9 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,21 @@ +extern crate bincode; +extern crate serde; +#[macro_use] extern crate serde_derive; + +mod server; +mod tracker; + +fn main() { + let mut tracker = tracker::TorrentTracker::new(); + + let addr = "0.0.0.0:1212"; + let mut s = server::UDPTracker::new(addr, &mut tracker).unwrap(); + loop { + match s.accept_packet() { + Err(e) => { + println!("error: {}", e); + }, + Ok(_) => {}, + } + } +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..f8daa67 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,300 @@ +use std; +use std::net::{SocketAddr, UdpSocket}; + +use bincode; +use serde::{Serialize, Deserialize, Serializer, Deserializer}; + +use tracker; + +// 2000 should be enough, MTU is usually 1500 +const MAX_PACKET_SIZE: usize = 2000; + +// protocol contants +const PROTOCOL_ID: u64 = 0x0000041727101980; + +#[repr(u32)] +#[derive(Serialize, Deserialize)] +enum Actions { + Connect = 0, + Announce = 1, + Scrape = 2, + Error = 3, +} + +#[repr(u32)] +#[derive(Serialize, Deserialize)] +pub enum Events { + None = 0, + Complete = 1, + Started = 2, + Stopped = 3, +} + +fn pack(data: &T) -> Option> { + let mut bo = bincode::config(); + bo.big_endian(); + + match bo.serialize(data) { + Ok(bytes) => Some(bytes), + Err(_) => None, + } +} + +fn unpack<'a, T: Deserialize<'a>>(data: &'a [u8]) -> Option { + let mut bo = bincode::config(); + bo.big_endian(); + + match bo.deserialize(data) { + Ok(obj) => Some(obj), + Err(_) => None, + } +} + +#[derive(Serialize, Deserialize)] +struct UDPRequestHeader { + connection_id: u64, + action: Actions, + transaction_id: u32, +} + +#[derive(Serialize, Deserialize)] +struct UDPResponseHeader { + action: Actions, + transaction_id: u32, +} + +#[derive(Serialize, Deserialize)] +struct UDPConnectionResponse { + header: UDPResponseHeader, + connection_id: u64, +} + +#[derive(Serialize, Deserialize)] +struct UDPAnnounceRequest { + header: UDPRequestHeader, + + info_hash: [u8; 20], + peer_id: [u8; 20], + downloaded: u64, + left: u64, + uploaded: u64, + event: Events, + ip_address: u32, + key: u32, + num_want: i32, + port: u16, +} + +#[derive(Serialize, Deserialize)] +struct UDPAnnounceResponse { + header: UDPResponseHeader, + + interval: u32, + leechers: u32, + seeders: u32, +} + +pub struct UDPTracker<'a> { + server: std::net::UdpSocket, + tracker: &'a mut tracker::TorrentTracker, +} + +impl<'a> UDPTracker<'a> { + pub fn new(bind_address: T, tracker: &mut tracker::TorrentTracker) -> Result { + let server = match UdpSocket::bind(bind_address) { + Ok(s) => s, + Err(e) => { + return Err(e); + } + }; + + Ok(UDPTracker{ + server, + tracker, + }) + } + + fn handle_packet(&mut self, remote_address: &SocketAddr, payload: &[u8]) { + let header : UDPRequestHeader = match unpack(payload) { + Some(val) => val, + None => { + return; + } + }; + + match header.action { + Actions::Connect => self.handle_connect(remote_address, &header, payload), + Actions::Announce => self.handle_announce(remote_address, &header, payload), + Actions::Scrape => self.handle_scrape(remote_address, &header, payload), + _ => { + // someone is playing around... ignore request. + return; + } + } + } + + fn handle_connect(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, _payload: &[u8]) { + if header.connection_id != PROTOCOL_ID { + return; + } + + // send response... + let conn_id = self.get_connection_id(remote_addr); + + let response = UDPConnectionResponse{ + header: UDPResponseHeader{ + transaction_id: header.transaction_id, + action: Actions::Connect, + }, + connection_id: conn_id, + }; + + if let Some(payload) = pack(&response) { + let _ = self.send_packet(remote_addr, payload.as_slice()); + } + } + + fn handle_announce(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) { + if header.connection_id != self.get_connection_id(remote_addr) { + return; + } + + let announce_packet: Option = unpack(payload); + match announce_packet { + Some(packet) => { + let plen = bincode::serialized_size(&packet).unwrap() as usize; + + println!("payload len={}, announce len={}", payload.len(), plen); + + if payload.len() > plen { + let bep41_payload = &payload[std::mem::size_of::()..]; + println!("bep41: {:?}", bep41_payload); + } + + if packet.ip_address != 0 { + // TODO: allow configurability of ip address + // for now, ignore request. + return; + } + + let client_addr = SocketAddr::new(remote_addr.ip(), packet.port); + + self.tracker.update_torrent_peer(&packet.info_hash, &packet.peer_id, &client_addr, packet.uploaded, packet.downloaded, packet.left, packet.event); + + let stats = self.tracker.get_stats(&packet.info_hash).unwrap_or((0, 0, 0)); + let results = self.tracker.get_peers(&packet.info_hash, &client_addr); + + if let Some(mut payload) = pack(&UDPAnnounceResponse{ + header: UDPResponseHeader{ + action: Actions::Announce, + transaction_id: packet.header.transaction_id, + }, + seeders: stats.0 as u32, + interval: 20, + leechers: stats.2 as u32, + }) { + for peer in results { + match client_addr { + SocketAddr::V4(ipv4) => { + payload.extend(&ipv4.ip().octets()); + }, + SocketAddr::V6(ipv6) => { + payload.extend(&ipv6.ip().octets()); + } + }; + + let port_hton = client_addr.port().to_be(); + payload.extend(&[ (port_hton & 0xff) as u8, ((port_hton >> 8) & 0xff) as u8 ]); + } + + } else { + self.send_error(&remote_addr, &packet.header, "internal error"); + } + }, + None => { + return; + } + } + } + + fn handle_scrape(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) { + if header.connection_id != self.get_connection_id(remote_addr) { + return; + } + + self.send_error(remote_addr, header, "scrape not yet implemented"); + } + + fn get_connection_id(&self, remote_address: &SocketAddr) -> u64 { + match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { + Ok(duration) => { + (duration.as_secs() / 3600) | ((remote_address.port() as u64) << 36) + }, + Err(_) => { + 0x8000000000000000 + } + } + } + + fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) -> Result { + self.server.send_to(payload, remote_addr) + } + + fn send_error(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, error_msg: &str) { + if let Some(mut payload) = pack(&UDPResponseHeader{ + transaction_id: header.transaction_id, + action: Actions::Error, + }) { + let msg_bytes = Vec::from(error_msg.as_bytes()); + payload.extend(msg_bytes); + + let _ = self.send_packet(remote_addr, payload.as_slice()); + } + } + + pub fn accept_packet(&mut self) -> Result<(), std::io::Error> { + let mut packet = [0u8; MAX_PACKET_SIZE]; + match self.server.recv_from(&mut packet) { + Ok((size, remote_address)) => { + self.handle_packet(&remote_address, &packet[..size]); + + Ok(()) + }, + Err(e) => Err(e), + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn pack() { + let mystruct = super::UDPRequestHeader { + connection_id: 200, + action: super::Actions::Connect, + transaction_id: 77771, + }; + match super::pack(&mystruct) { + Some(data) => { + println!("serialized data = [{}, {:?}]", data.len(), data); + }, + None => { + assert!(false); + } + }; + } + + #[test] + fn unpack() { + let buf = [0u8, 0, 0, 0, 0, 0, 0, 200, 0, 0, 0, 1, 0, 1, 47, 203]; + match super::unpack(&buf) { + Some(obj) => { + let x : super::UDPResponseHeader = obj; + println!("conn_id={}", x.action as u32); + }, + None => { + assert!(false); + } + } + } +} diff --git a/src/tracker.rs b/src/tracker.rs new file mode 100644 index 0000000..0090d83 --- /dev/null +++ b/src/tracker.rs @@ -0,0 +1,149 @@ +use std; + +use server::Events; + +pub enum TrackerMode { + + /// In static mode torrents are tracked only if they were added ahead of time. + StaticMode, + + /// In dynamic mode, torrents are tracked being added ahead of time. + DynamicMode, + + /// Tracker will only serve authenticated peers. + PrivateMode, +} + +struct TorrentPeer { + ip: std::net::SocketAddr, + last_connection_id: u64, + uploaded: u64, + downloaded: u64, + left: u64, + event: Events, + updated: std::time::SystemTime, +} + +type PeerId = [u8; 20]; +type InfoHash = [u8; 20]; + +struct TorrentEntry { + is_flagged: bool, + peers: std::collections::BTreeMap, +} + +struct TorrentDatabase { + torrent_peers: std::collections::BTreeMap, +} + +pub struct TorrentTracker { + mode: TrackerMode, + database: TorrentDatabase, +} + +impl TorrentTracker { + pub fn new() -> TorrentTracker { + TorrentTracker{ + mode: TrackerMode::DynamicMode, + database: TorrentDatabase{ + torrent_peers: std::collections::BTreeMap::new(), + } + } + } + + /// Adding torrents is not relevant to dynamic trackers. + pub fn add_torrent(&mut self, info_hash: &InfoHash) { + use std::collections::BTreeMap; + self.database.torrent_peers.entry(*info_hash).or_insert(TorrentEntry{ + is_flagged: false, + peers: std::collections::BTreeMap::new(), + }); + } + + /// If the torrent is flagged, it will not be removed unless force is set to true. + pub fn remove_torrent(&mut self, info_hash: &InfoHash, force: bool) { + if !force { + if let Some(entry) = self.database.torrent_peers.get(info_hash) { + if entry.is_flagged { + // torrent is flagged, ignore request. + return; + } + } else { + // torrent not found, no point looking for it again... + return; + } + } + self.database.torrent_peers.remove(info_hash); + } + + /// flagged torrents will result in a tracking error. This is to allow enforcement against piracy. + pub fn set_torrent_flag(&mut self, info_hash: &InfoHash, is_flagged: bool) { + if let Some(mut entry) = self.database.torrent_peers.get_mut(info_hash) { + if is_flagged && !entry.is_flagged { + // empty peer list. + entry.peers.clear(); + } + entry.is_flagged = is_flagged; + } + } + + pub fn update_torrent_peer(&mut self, info_hash: &InfoHash, peer_id: &PeerId, remote_address: &std::net::SocketAddr, uploaded: u64, downloaded: u64, left: u64, event: Events) { + if let Some(mut torrent_entry) = self.database.torrent_peers.get_mut(info_hash) { + torrent_entry.peers.insert(*peer_id, TorrentPeer{ + updated: std::time::SystemTime::now(), + left, + downloaded, + uploaded, + ip: *remote_address, + event, + last_connection_id: 0, + }); + } + } + + /// returns a list of peers with the same type of address of the remote_addr (IP v4/v6) + pub fn get_peers(&self, info_hash: &InfoHash, remote_addr: &std::net::SocketAddr) -> Vec { + let mut list = Vec::new(); + if let Some(entry) = self.database.torrent_peers.get(info_hash) { + for (_, peer) in entry.peers.iter().filter(|e| e.1.ip.is_ipv4() == remote_addr.is_ipv4()) { + if peer.ip == *remote_addr { + continue; + } + + list.push(peer.ip); + + if list.len() >= 74 { + // 74 is maximum peers supported by the protocol. + break; + } + } + } + list + } + + pub fn get_stats(&self, info_hash: &InfoHash) -> Option<(i32, i32, i32)> { + if let Some(torrent_entry) = self.database.torrent_peers.get(info_hash) { + + // TODO: store stats in temporary location... + let mut seeders = 0; + let mut leechers = 0; + + for (_, peer) in torrent_entry.peers.iter() { + if peer.left == 0 { + seeders += 1; + } + else { + leechers += 1; + } + } + + Some((seeders, -1, leechers)) + } else { + None + } + } + + pub fn cleanup(&mut self) { + + } +}