add multithreaded udp send with tokio

This commit is contained in:
Naim A 2020-05-13 22:59:29 +03:00
parent ad369d1d22
commit 346aa4bbd0
6 changed files with 149 additions and 96 deletions

139
Cargo.lock generated
View file

@ -47,9 +47,9 @@ checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.12.0" version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d5ca2cd0adc3f48f9e9ea5a6bbdf9ccc0bfade884847e484d452414c7ccffb3" checksum = "53d1ccbaf7d9ec9537465a97bf19edc1a4e158ecb49fc16178202238c569cc42"
[[package]] [[package]]
name = "binascii" name = "binascii"
@ -145,10 +145,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]] [[package]]
name = "clap" name = "chrono"
version = "2.33.0" version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2"
dependencies = [
"num-integer",
"num-traits",
"time",
]
[[package]]
name = "clap"
version = "2.33.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129"
dependencies = [ dependencies = [
"ansi_term", "ansi_term",
"atty", "atty",
@ -213,9 +224,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780" checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -228,9 +239,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8" checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -238,15 +249,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f674f3e1bcb15b37284a90cedf55afdba482ab061c407a9c0ebbd0f3109741ba" checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@ -255,15 +266,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6" checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a5081aa3de1f7542a794a397cde100ed903b0630152d0973479018fd85423a7" checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
dependencies = [ dependencies = [
"proc-macro-hack", "proc-macro-hack",
"proc-macro2", "proc-macro2",
@ -273,21 +284,24 @@ dependencies = [
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3466821b4bc114d95b087b850a724c6f83115e929bc88f1fa98a3304a944c8a6" checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27" checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
dependencies = [
"once_cell",
]
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -296,6 +310,7 @@ dependencies = [
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
"pin-project",
"pin-utils", "pin-utils",
"proc-macro-hack", "proc-macro-hack",
"proc-macro-nested", "proc-macro-nested",
@ -313,9 +328,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.2.4" version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" checksum = "79b7246d7e4b979c03fa093da39cfb3617a96bbeee6310af63991668d7e843ff"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@ -468,9 +483,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.69" version = "0.2.70"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99e85c08494b21a9054e7fe1374a732aeadaff3980b6990b94bfd3a70f690005" checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f"
[[package]] [[package]]
name = "log" name = "log"
@ -562,6 +577,25 @@ dependencies = [
"winapi 0.3.8", "winapi 0.3.8",
] ]
[[package]]
name = "num-integer"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "num_cpus" name = "num_cpus"
version = "1.13.0" version = "1.13.0"
@ -572,6 +606,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "once_cell"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b"
[[package]] [[package]]
name = "opaque-debug" name = "opaque-debug"
version = "0.2.3" version = "0.2.3"
@ -586,18 +626,18 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "0.4.10" version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36e3dcd42688c05a66f841d22c5d8390d9a5d4c9aaf57b9285eae4900a080063" checksum = "81d480cb4e89522ccda96d0eed9af94180b7a5f93fb28f66e1fd7d68431663d1"
dependencies = [ dependencies = [
"pin-project-internal", "pin-project-internal",
] ]
[[package]] [[package]]
name = "pin-project-internal" name = "pin-project-internal"
version = "0.4.10" version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4d7346ac577ff1296e06a418e7618e22655bae834d4970cb6e39d6da8119969" checksum = "a82996f11efccb19b685b14b5df818de31c1edcee3daa256ab5775dd98e72feb"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -606,9 +646,9 @@ dependencies = [
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.1.4" version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" checksum = "f7505eeebd78492e0f6108f7171c4948dbb120ee8119d9d77d0afa5469bef67f"
[[package]] [[package]]
name = "pin-utils" name = "pin-utils"
@ -639,9 +679,9 @@ dependencies = [
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.4" version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c1f4b0efa5fc5e8ceb705136bfee52cfdb6a4e3509f770b478cd6ed434232a7" checksum = "42934bc9c8ab0d3b273a16d8551c8f0fcff46be73276ca083ec2414c15c4ba5e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -660,18 +700,18 @@ checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.106" version = "1.0.110"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399" checksum = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.106" version = "1.0.110"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c" checksum = "818fbf6bfa9a42d3bfcaca148547aa00c7b915bec71d1757aa2d44ca68771984"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -680,9 +720,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.52" version = "1.0.53"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7894c8ed05b7a3a279aeb79025fdec1d3158080b75b98a08faf2806bb799edd" checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",
@ -743,9 +783,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.18" version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "410a7488c0a728c7ceb4ad59b9567eb4053d02e8cc7f5c0e0eeeb39518369213" checksum = "4696caa4048ac7ce2bcd2e484b3cef88c1004e41b8e945a277e2c25dc0b72060"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -773,9 +813,8 @@ dependencies = [
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "0.2.20" version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/naim94a/tokio?branch=tokio-with-trysendto#4fb99d0fe2c3deb8dac774edd11316384ebb6667"
checksum = "05c1d570eb1a36f0345a5ce9c6c6e665b70b73d11236912c0b477616aeec47b1"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@ -797,8 +836,7 @@ dependencies = [
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "0.2.5" version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/naim94a/tokio?branch=tokio-with-trysendto#4fb99d0fe2c3deb8dac774edd11316384ebb6667"
checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -853,6 +891,7 @@ dependencies = [
"async-compression", "async-compression",
"binascii", "binascii",
"bincode", "bincode",
"chrono",
"clap", "clap",
"fern", "fern",
"futures", "futures",
@ -922,9 +961,9 @@ checksum = "3df3561629a8bb4c57e5a2e4c43348d9e29c7c29d9b1c4c1f47166deca8f37ed"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]] [[package]]
name = "version_check" name = "version_check"

View file

@ -16,8 +16,12 @@ tokio = {version = "0.2", features = ["macros", "net", "rt-threaded", "fs", "syn
binascii = "0.1" binascii = "0.1"
toml = "0.5" toml = "0.5"
clap = "2.33" clap = "2.33"
log = "0.4" log = {version = "0.4", features = ["release_max_level_info"]}
fern = "0.6" fern = "0.6"
serde_json = "1.0" serde_json = "1.0"
futures = "0.3" futures = "0.3"
async-compression = {version = "0.3", features = ["bzip2", "futures-bufread", "futures-write"]} async-compression = {version = "0.3", features = ["bzip2", "futures-bufread", "futures-write"]}
chrono = "0.4"
[patch.crates-io]
tokio = {git = "https://github.com/naim94a/tokio", branch = "tokio-with-trysendto", features = ["macros", "net", "rt-threaded", "fs", "sync", "blocking", "signal"]}

View file

@ -34,11 +34,8 @@ fn setup_logging(cfg: &Configuration) {
if let Err(err) = fern::Dispatch::new() if let Err(err) = fern::Dispatch::new()
.format(|out, message, record| { .format(|out, message, record| {
out.finish(format_args!( out.finish(format_args!(
"{}[{}][{}]\t{}", "{} [{}][{}] {}",
std::time::SystemTime::now() chrono::Local::now().format("%+"),
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
record.target(), record.target(),
record.level(), record.level(),
message message
@ -127,17 +124,15 @@ async fn main() {
}); });
} }
let mut udp_server = server::UDPTracker::new(cfg.clone(), tracker.clone()) let udp_server = server::UDPTracker::new(cfg.clone(), tracker.clone())
.await .await
.expect("failed to bind udp socket"); .expect("failed to bind udp socket");
trace!("Waiting for UDP packets"); trace!("Waiting for UDP packets");
let udp_server = tokio::spawn(async move { let udp_server = tokio::spawn(async move {
loop { if let Err(err) = udp_server.accept_packets().await {
if let Err(err) = udp_server.accept_packet().await {
eprintln!("error: {}", err); eprintln!("error: {}", err);
} }
}
}); });
let weak_tracker = std::sync::Arc::downgrade(&tracker); let weak_tracker = std::sync::Arc::downgrade(&tracker);

View file

@ -108,7 +108,8 @@ struct UDPScrapeResponseEntry {
} }
pub struct UDPTracker { pub struct UDPTracker {
server: UdpSocket, srv_send: tokio::net::udp::SendHalf,
srv_recv: Option<tokio::net::udp::RecvHalf>,
tracker: std::sync::Arc<tracker::TorrentTracker>, tracker: std::sync::Arc<tracker::TorrentTracker>,
config: Arc<Configuration>, config: Arc<Configuration>,
} }
@ -120,16 +121,17 @@ impl UDPTracker {
let cfg = config.clone(); let cfg = config.clone();
let server = UdpSocket::bind(cfg.get_udp_config().get_address()).await?; let server = UdpSocket::bind(cfg.get_udp_config().get_address()).await?;
let (srv_recv, srv_send) = server.split();
Ok(UDPTracker { Ok(UDPTracker {
server, srv_send,
srv_recv: Some(srv_recv),
tracker, tracker,
config: cfg, config: cfg,
}) })
} }
// TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved async fn handle_packet(&self, remote_address: &SocketAddr, payload: &[u8]) {
async fn handle_packet(&mut self, remote_address: &SocketAddr, payload: &[u8]) {
let header: UDPRequestHeader = match unpack(payload) { let header: UDPRequestHeader = match unpack(payload) {
Some(val) => val, Some(val) => val,
None => { None => {
@ -150,8 +152,7 @@ impl UDPTracker {
} }
} }
// TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved async fn handle_connect(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, _payload: &[u8]) {
async fn handle_connect(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, _payload: &[u8]) {
if header.connection_id != PROTOCOL_ID { if header.connection_id != PROTOCOL_ID {
trace!("Bad protocol magic from {}", remote_addr); trace!("Bad protocol magic from {}", remote_addr);
return; return;
@ -176,8 +177,7 @@ impl UDPTracker {
} }
} }
// TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved async fn handle_announce(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) {
async fn handle_announce(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) {
if header.connection_id != self.get_connection_id(remote_addr) { if header.connection_id != self.get_connection_id(remote_addr) {
return; return;
} }
@ -282,8 +282,7 @@ impl UDPTracker {
} }
} }
// TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved async fn handle_scrape(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) {
async fn handle_scrape(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) {
if header.connection_id != self.get_connection_id(remote_addr) { if header.connection_id != self.get_connection_id(remote_addr) {
return; return;
} }
@ -361,13 +360,14 @@ impl UDPTracker {
} }
} }
// TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) -> Result<usize, std::io::Error> {
async fn send_packet(&mut self, remote_addr: &SocketAddr, payload: &[u8]) -> Result<usize, std::io::Error> { self.srv_send.try_send_to(payload, remote_addr).await.map_err(|e| {
self.server.send_to(payload, remote_addr).await debug!("failed to send a packet: {}", e);
e
})
} }
// TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved async fn send_error(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, error_msg: &str) {
async fn send_error(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, error_msg: &str) {
let mut payload_buffer = vec![0u8; MAX_PACKET_SIZE]; let mut payload_buffer = vec![0u8; MAX_PACKET_SIZE];
let mut payload = StackVec::from(&mut payload_buffer); let mut payload = StackVec::from(&mut payload_buffer);
@ -382,16 +382,20 @@ impl UDPTracker {
} }
} }
// TODO: remove `mut` for `accept_packet`, and spawn once https://github.com/tokio-rs/tokio/issues/1624 is resolved pub async fn accept_packets(mut self) -> Result<(), std::io::Error> {
pub async fn accept_packet(&mut self) -> Result<(), std::io::Error> { let mut recv = self.srv_recv.take().unwrap();
let mut packet = vec![0u8; MAX_PACKET_SIZE]; let tracker = Arc::new(self);
let (size, remote_address) = self.server.recv_from(packet.as_mut_slice()).await?;
// tokio::spawn(async { loop {
let mut packet = vec![0u8; MAX_PACKET_SIZE];
let (size, remote_address) = recv.recv_from(packet.as_mut_slice()).await?;
let tracker = tracker.clone();
tokio::spawn(async move {
debug!("Received {} bytes from {}", size, remote_address); debug!("Received {} bytes from {}", size, remote_address);
self.handle_packet(&remote_address, &packet[..size]).await; tracker.handle_packet(&remote_address, &packet[..size]).await;
// }); });
Ok(()) }
} }
} }

View file

@ -224,7 +224,8 @@ impl PeerId {
} }
impl Serialize for PeerId { impl Serialize for PeerId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: serde::Serializer { where
S: serde::Serializer, {
let mut tmp = [0u8; 40]; let mut tmp = [0u8; 40];
binascii::bin2hex(&self.0, &mut tmp).unwrap(); binascii::bin2hex(&self.0, &mut tmp).unwrap();
let id = std::str::from_utf8(&tmp).ok(); let id = std::str::from_utf8(&tmp).ok();
@ -235,7 +236,10 @@ impl Serialize for PeerId {
client: Option<&'a str>, client: Option<&'a str>,
} }
let obj = PeerIdInfo { id, client: self.get_client_name() }; let obj = PeerIdInfo {
id,
client: self.get_client_name(),
};
obj.serialize(serializer) obj.serialize(serializer)
} }
} }
@ -318,7 +322,7 @@ impl TorrentEntry {
list list
} }
pub fn get_peers_iter(&self) -> impl Iterator<Item=(&PeerId, &TorrentPeer)> { pub fn get_peers_iter(&self) -> impl Iterator<Item = (&PeerId, &TorrentPeer)> {
self.peers.iter() self.peers.iter()
} }

View file

@ -34,7 +34,7 @@ struct TorrentEntry<'a> {
seeders: u32, seeders: u32,
leechers: u32, leechers: u32,
#[serde(skip_serializing_if="Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
peers: Option<Vec<(crate::tracker::PeerId, crate::tracker::TorrentPeer)>>, peers: Option<Vec<(crate::tracker::PeerId, crate::tracker::TorrentPeer)>>,
} }
@ -108,7 +108,13 @@ pub fn build_server(
.iter() .iter()
.map(|(k, v)| { .map(|(k, v)| {
let (seeders, _, leechers) = v.get_stats(); let (seeders, _, leechers) = v.get_stats();
TorrentEntry { info_hash: k, data: v, seeders, leechers, peers: None } TorrentEntry {
info_hash: k,
data: v,
seeders,
leechers,
peers: None,
}
}) })
.skip(offset as usize) .skip(offset as usize)
.take(limit as usize) .take(limit as usize)
@ -121,7 +127,8 @@ pub fn build_server(
let t2 = tracker.clone(); let t2 = tracker.clone();
// view_torrent_info -> GET /t/:infohash HTTP/* // view_torrent_info -> GET /t/:infohash HTTP/*
let view_torrent_info = filters::method::get() let view_torrent_info = filters::method::get()
.and(filters::path::param()).and(filters::path::end()) .and(filters::path::param())
.and(filters::path::end())
.map(move |info_hash: InfoHash| { .map(move |info_hash: InfoHash| {
let tracker = t2.clone(); let tracker = t2.clone();
(info_hash, tracker) (info_hash, tracker)
@ -138,9 +145,7 @@ pub fn build_server(
let peers: Vec<_> = info let peers: Vec<_> = info
.get_peers_iter() .get_peers_iter()
.take(1000) .take(1000)
.map(|(peer_id, peer_info)| { .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.clone()))
(peer_id.clone(), peer_info.clone())
})
.collect(); .collect();
Ok(reply::json(&TorrentEntry { Ok(reply::json(&TorrentEntry {
@ -156,7 +161,8 @@ pub fn build_server(
// DELETE /t/:info_hash // DELETE /t/:info_hash
let t3 = tracker.clone(); let t3 = tracker.clone();
let delete_torrent = filters::method::delete() let delete_torrent = filters::method::delete()
.and(filters::path::param()).and(filters::path::end()) .and(filters::path::param())
.and(filters::path::end())
.map(move |info_hash: InfoHash| { .map(move |info_hash: InfoHash| {
let tracker = t3.clone(); let tracker = t3.clone();
(info_hash, tracker) (info_hash, tracker)
@ -180,7 +186,8 @@ pub fn build_server(
// add_torrent/alter: POST /t/:info_hash // add_torrent/alter: POST /t/:info_hash
// (optional) BODY: json: {"is_flagged": boolean} // (optional) BODY: json: {"is_flagged": boolean}
let change_torrent = filters::method::post() let change_torrent = filters::method::post()
.and(filters::path::param()).and(filters::path::end()) .and(filters::path::param())
.and(filters::path::end())
.and(filters::body::content_length_limit(4096)) .and(filters::body::content_length_limit(4096))
.and(filters::body::json()) .and(filters::body::json())
.map(move |info_hash: InfoHash, body: Option<TorrentFlag>| { .map(move |info_hash: InfoHash, body: Option<TorrentFlag>| {