WIP: announce and DB better implementation
This commit is contained in:
parent
dd04df33f6
commit
53eb629f50
|
@ -2,7 +2,7 @@ use std;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
|
|
||||||
use bincode;
|
use bincode;
|
||||||
use serde::{Serialize, Deserialize, Serializer, Deserializer};
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
use tracker;
|
use tracker;
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ enum Actions {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[repr(u32)]
|
#[repr(u32)]
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize, Clone, Copy)]
|
||||||
pub enum Events {
|
pub enum Events {
|
||||||
None = 0,
|
None = 0,
|
||||||
Complete = 1,
|
Complete = 1,
|
||||||
|
@ -159,9 +159,13 @@ impl<'a> UDPTracker<'a> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let announce_packet: Option<UDPAnnounceRequest> = unpack(payload);
|
let packet: UDPAnnounceRequest = match unpack(payload) {
|
||||||
match announce_packet {
|
Some(v) => v,
|
||||||
Some(packet) => {
|
None => {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let plen = bincode::serialized_size(&packet).unwrap() as usize;
|
let plen = bincode::serialized_size(&packet).unwrap() as usize;
|
||||||
|
|
||||||
println!("payload len={}, announce len={}", payload.len(), plen);
|
println!("payload len={}, announce len={}", payload.len(), plen);
|
||||||
|
@ -178,14 +182,16 @@ impl<'a> UDPTracker<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
let client_addr = SocketAddr::new(remote_addr.ip(), packet.port);
|
let client_addr = SocketAddr::new(remote_addr.ip(), packet.port);
|
||||||
|
match self.tracker.get_torrent(&packet.info_hash, |torrent: &mut tracker::TorrentEntry | -> Result<Vec<u8>, &str> {
|
||||||
|
if torrent.is_flagged() {
|
||||||
|
return Err("Torrent has been flagged.");
|
||||||
|
}
|
||||||
|
torrent.update_peer(&packet.peer_id, &client_addr, packet.uploaded, packet.downloaded, packet.left, packet.event);
|
||||||
|
|
||||||
self.tracker.update_torrent_peer(&packet.info_hash, &packet.peer_id, &client_addr, packet.uploaded, packet.downloaded, packet.left, packet.event);
|
let stats = torrent.get_stats();
|
||||||
|
let peers = torrent.get_peers(&client_addr);
|
||||||
let stats = self.tracker.get_stats(&packet.info_hash).unwrap_or((0, 0, 0));
|
if let Some(mut payload) = pack(&UDPAnnounceResponse {
|
||||||
let results = self.tracker.get_peers(&packet.info_hash, &client_addr);
|
header: UDPResponseHeader {
|
||||||
|
|
||||||
if let Some(mut payload) = pack(&UDPAnnounceResponse{
|
|
||||||
header: UDPResponseHeader{
|
|
||||||
action: Actions::Announce,
|
action: Actions::Announce,
|
||||||
transaction_id: packet.header.transaction_id,
|
transaction_id: packet.header.transaction_id,
|
||||||
},
|
},
|
||||||
|
@ -193,8 +199,8 @@ impl<'a> UDPTracker<'a> {
|
||||||
interval: 20,
|
interval: 20,
|
||||||
leechers: stats.2 as u32,
|
leechers: stats.2 as u32,
|
||||||
}) {
|
}) {
|
||||||
for peer in results {
|
for peer in peers {
|
||||||
match client_addr {
|
match peer {
|
||||||
SocketAddr::V4(ipv4) => {
|
SocketAddr::V4(ipv4) => {
|
||||||
payload.extend(&ipv4.ip().octets());
|
payload.extend(&ipv4.ip().octets());
|
||||||
},
|
},
|
||||||
|
@ -204,17 +210,68 @@ impl<'a> UDPTracker<'a> {
|
||||||
};
|
};
|
||||||
|
|
||||||
let port_hton = client_addr.port().to_be();
|
let port_hton = client_addr.port().to_be();
|
||||||
payload.extend(&[ (port_hton & 0xff) as u8, ((port_hton >> 8) & 0xff) as u8 ]);
|
payload.extend(&[(port_hton & 0xff) as u8, ((port_hton >> 8) & 0xff) as u8]);
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
return Ok(payload);
|
||||||
self.send_error(&remote_addr, &packet.header, "internal error");
|
}
|
||||||
|
}
|
||||||
|
return Err("");
|
||||||
|
}) {
|
||||||
|
Some(error_message) => {
|
||||||
|
match error_message {
|
||||||
|
Err(err) => {
|
||||||
|
if err.len() > 0 {
|
||||||
|
self.send_error(remote_addr, header, err);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Ok(payload) => {
|
||||||
|
// send packet...
|
||||||
|
let _ = self.send_packet(remote_addr, payload.as_slice());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
|
self.send_error(remote_addr, header, "Unregistered torrent");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
/*
|
||||||
|
if torrent.is_flagged() {
|
||||||
|
error = Some("Torrent was flagged");
|
||||||
|
} else {
|
||||||
|
torrent.update_peer(&packet.peer_id, &client_addr, packet.uploaded, packet.downloaded, packet.left, packet.event);
|
||||||
|
|
||||||
|
let stats = torrent.get_stats();
|
||||||
|
let peers = torrent.get_peers(&client_addr);
|
||||||
|
|
||||||
|
if let Some(mut payload) = pack(&UDPAnnounceResponse {
|
||||||
|
header: UDPResponseHeader {
|
||||||
|
action: Actions::Announce,
|
||||||
|
transaction_id: packet.header.transaction_id,
|
||||||
|
},
|
||||||
|
seeders: stats.0 as u32,
|
||||||
|
interval: 20,
|
||||||
|
leechers: stats.2 as u32,
|
||||||
|
}) {
|
||||||
|
for peer in peers {
|
||||||
|
match peer {
|
||||||
|
SocketAddr::V4(ipv4) => {
|
||||||
|
payload.extend(&ipv4.ip().octets());
|
||||||
|
},
|
||||||
|
SocketAddr::V6(ipv6) => {
|
||||||
|
payload.extend(&ipv6.ip().octets());
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let port_hton = client_addr.port().to_be();
|
||||||
|
payload.extend(&[(port_hton & 0xff) as u8, ((port_hton >> 8) & 0xff) as u8]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(error_message) = error {
|
||||||
|
self.send_error(remote_addr, &packet.header, error_message);
|
||||||
|
}*/
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_scrape(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) {
|
fn handle_scrape(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) {
|
||||||
|
|
130
src/tracker.rs
130
src/tracker.rs
|
@ -16,7 +16,6 @@ pub enum TrackerMode {
|
||||||
|
|
||||||
struct TorrentPeer {
|
struct TorrentPeer {
|
||||||
ip: std::net::SocketAddr,
|
ip: std::net::SocketAddr,
|
||||||
last_connection_id: u64,
|
|
||||||
uploaded: u64,
|
uploaded: u64,
|
||||||
downloaded: u64,
|
downloaded: u64,
|
||||||
left: u64,
|
left: u64,
|
||||||
|
@ -27,9 +26,71 @@ struct TorrentPeer {
|
||||||
type PeerId = [u8; 20];
|
type PeerId = [u8; 20];
|
||||||
type InfoHash = [u8; 20];
|
type InfoHash = [u8; 20];
|
||||||
|
|
||||||
struct TorrentEntry {
|
pub struct TorrentEntry {
|
||||||
is_flagged: bool,
|
is_flagged: bool,
|
||||||
peers: std::collections::BTreeMap<PeerId, TorrentPeer>,
|
peers: std::collections::BTreeMap<PeerId, TorrentPeer>,
|
||||||
|
|
||||||
|
completed: u32,
|
||||||
|
seeders: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TorrentEntry {
|
||||||
|
pub fn is_flagged(&self) -> bool {
|
||||||
|
self.is_flagged
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_peer(&mut self, peer_id: &PeerId, remote_address: &std::net::SocketAddr, uploaded: u64, downloaded: u64, left: u64, event: Events) {
|
||||||
|
let is_seeder = left == 0 && uploaded > 0;
|
||||||
|
let mut was_seeder = false;
|
||||||
|
let mut is_completed = left == 0 && (event as u32) == (Events::Complete as u32);
|
||||||
|
if let Some(prev) = self.peers.insert(*peer_id, TorrentPeer{
|
||||||
|
updated: std::time::SystemTime::now(),
|
||||||
|
left,
|
||||||
|
downloaded,
|
||||||
|
uploaded,
|
||||||
|
ip: *remote_address,
|
||||||
|
event,
|
||||||
|
}) {
|
||||||
|
was_seeder = prev.left == 0 && prev.uploaded > 0;
|
||||||
|
|
||||||
|
if is_completed && (prev.event as u32) == (Events::Complete as u32) {
|
||||||
|
// don't update count again. a torrent should only be updated once per peer.
|
||||||
|
is_completed = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if is_seeder && !was_seeder {
|
||||||
|
self.seeders += 1;
|
||||||
|
} else if was_seeder && !is_seeder {
|
||||||
|
self.seeders -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if is_completed {
|
||||||
|
self.completed += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_peers(&self, remote_addr: &std::net::SocketAddr) -> Vec<std::net::SocketAddr> {
|
||||||
|
let mut list = Vec::new();
|
||||||
|
for (_, peer) in self.peers.iter().filter(|e| e.1.ip.is_ipv4() == remote_addr.is_ipv4()) {
|
||||||
|
if peer.ip == *remote_addr {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
list.push(peer.ip);
|
||||||
|
|
||||||
|
if list.len() >= 74 {
|
||||||
|
// 74 is maximum peers supported by the protocol.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
list
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_stats(&self) -> (u32, u32, u32) {
|
||||||
|
let leechers = (self.peers.len() as u32) - self.seeders;
|
||||||
|
(self.seeders, self.completed, leechers)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TorrentDatabase {
|
struct TorrentDatabase {
|
||||||
|
@ -53,10 +114,11 @@ impl TorrentTracker {
|
||||||
|
|
||||||
/// Adding torrents is not relevant to dynamic trackers.
|
/// Adding torrents is not relevant to dynamic trackers.
|
||||||
pub fn add_torrent(&mut self, info_hash: &InfoHash) {
|
pub fn add_torrent(&mut self, info_hash: &InfoHash) {
|
||||||
use std::collections::BTreeMap;
|
|
||||||
self.database.torrent_peers.entry(*info_hash).or_insert(TorrentEntry{
|
self.database.torrent_peers.entry(*info_hash).or_insert(TorrentEntry{
|
||||||
is_flagged: false,
|
is_flagged: false,
|
||||||
peers: std::collections::BTreeMap::new(),
|
peers: std::collections::BTreeMap::new(),
|
||||||
|
seeders: 0,
|
||||||
|
completed: 0,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,61 +149,21 @@ impl TorrentTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_torrent_peer(&mut self, info_hash: &InfoHash, peer_id: &PeerId, remote_address: &std::net::SocketAddr, uploaded: u64, downloaded: u64, left: u64, event: Events) {
|
pub fn get_torrent<F, R>(&mut self, info_hash: &InfoHash, action: F) -> Option<R>
|
||||||
if let Some(mut torrent_entry) = self.database.torrent_peers.get_mut(info_hash) {
|
where F: Fn(&mut TorrentEntry) -> R
|
||||||
torrent_entry.peers.insert(*peer_id, TorrentPeer{
|
{
|
||||||
updated: std::time::SystemTime::now(),
|
if let Some(torrent_entry) = self.database.torrent_peers.get_mut(info_hash) {
|
||||||
left,
|
Some(action(torrent_entry))
|
||||||
downloaded,
|
|
||||||
uploaded,
|
|
||||||
ip: *remote_address,
|
|
||||||
event,
|
|
||||||
last_connection_id: 0,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// returns a list of peers with the same type of address of the remote_addr (IP v4/v6)
|
|
||||||
pub fn get_peers(&self, info_hash: &InfoHash, remote_addr: &std::net::SocketAddr) -> Vec<std::net::SocketAddr> {
|
|
||||||
let mut list = Vec::new();
|
|
||||||
if let Some(entry) = self.database.torrent_peers.get(info_hash) {
|
|
||||||
for (_, peer) in entry.peers.iter().filter(|e| e.1.ip.is_ipv4() == remote_addr.is_ipv4()) {
|
|
||||||
if peer.ip == *remote_addr {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
list.push(peer.ip);
|
|
||||||
|
|
||||||
if list.len() >= 74 {
|
|
||||||
// 74 is maximum peers supported by the protocol.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
list
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_stats(&self, info_hash: &InfoHash) -> Option<(i32, i32, i32)> {
|
|
||||||
if let Some(torrent_entry) = self.database.torrent_peers.get(info_hash) {
|
|
||||||
|
|
||||||
// TODO: store stats in temporary location...
|
|
||||||
let mut seeders = 0;
|
|
||||||
let mut leechers = 0;
|
|
||||||
|
|
||||||
for (_, peer) in torrent_entry.peers.iter() {
|
|
||||||
if peer.left == 0 {
|
|
||||||
seeders += 1;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
leechers += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Some((seeders, -1, leechers))
|
|
||||||
} else {
|
} else {
|
||||||
|
match self.mode {
|
||||||
|
TrackerMode::StaticMode => None,
|
||||||
|
TrackerMode::PrivateMode => None,
|
||||||
|
TrackerMode::DynamicMode => {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn cleanup(&mut self) {
|
pub fn cleanup(&mut self) {
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue