From 346aa4bbd0c55867a5234cca8114ce839f084518 Mon Sep 17 00:00:00 2001 From: Naim A <227396+naim94a@users.noreply.github.com> Date: Wed, 13 May 2020 22:59:29 +0300 Subject: [PATCH] add multithreaded udp send with tokio --- Cargo.lock | 139 ++++++++++++++++++++++++++++++----------------- Cargo.toml | 6 +- src/main.rs | 15 ++--- src/server.rs | 52 ++++++++++-------- src/tracker.rs | 10 +++- src/webserver.rs | 23 +++++--- 6 files changed, 149 insertions(+), 96 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c39e872..3273e03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,9 +47,9 @@ checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" [[package]] name = "base64" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d5ca2cd0adc3f48f9e9ea5a6bbdf9ccc0bfade884847e484d452414c7ccffb3" +checksum = "53d1ccbaf7d9ec9537465a97bf19edc1a4e158ecb49fc16178202238c569cc42" [[package]] name = "binascii" @@ -145,10 +145,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" [[package]] -name = "clap" -version = "2.33.0" +name = "chrono" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" +checksum = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2" +dependencies = [ + "num-integer", + "num-traits", + "time", +] + +[[package]] +name = "clap" +version = "2.33.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" dependencies = [ "ansi_term", "atty", @@ -213,9 +224,9 @@ checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" [[package]] name = "futures" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" dependencies = [ "futures-channel", "futures-core", @@ -228,9 +239,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8" +checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" dependencies = [ "futures-core", "futures-sink", @@ -238,15 +249,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" +checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" [[package]] name = "futures-executor" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f674f3e1bcb15b37284a90cedf55afdba482ab061c407a9c0ebbd0f3109741ba" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" dependencies = [ "futures-core", "futures-task", @@ -255,15 +266,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6" +checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" [[package]] name = "futures-macro" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a5081aa3de1f7542a794a397cde100ed903b0630152d0973479018fd85423a7" +checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -273,21 +284,24 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3466821b4bc114d95b087b850a724c6f83115e929bc88f1fa98a3304a944c8a6" +checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" [[package]] name = "futures-task" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27" +checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" +dependencies = [ + "once_cell", +] [[package]] name = "futures-util" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" +checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" dependencies = [ "futures-channel", "futures-core", @@ -296,6 +310,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", + "pin-project", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -313,9 +328,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" +checksum = "79b7246d7e4b979c03fa093da39cfb3617a96bbeee6310af63991668d7e843ff" dependencies = [ "bytes", "fnv", @@ -468,9 +483,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.69" +version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e85c08494b21a9054e7fe1374a732aeadaff3980b6990b94bfd3a70f690005" +checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f" [[package]] name = "log" @@ -562,6 +577,25 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "num-integer" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.0" @@ -572,6 +606,12 @@ dependencies = [ "libc", ] +[[package]] +name = "once_cell" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b" + [[package]] name = "opaque-debug" version = "0.2.3" @@ -586,18 +626,18 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pin-project" -version = "0.4.10" +version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36e3dcd42688c05a66f841d22c5d8390d9a5d4c9aaf57b9285eae4900a080063" +checksum = "81d480cb4e89522ccda96d0eed9af94180b7a5f93fb28f66e1fd7d68431663d1" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.10" +version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4d7346ac577ff1296e06a418e7618e22655bae834d4970cb6e39d6da8119969" +checksum = "a82996f11efccb19b685b14b5df818de31c1edcee3daa256ab5775dd98e72feb" dependencies = [ "proc-macro2", "quote", @@ -606,9 +646,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" +checksum = "f7505eeebd78492e0f6108f7171c4948dbb120ee8119d9d77d0afa5469bef67f" [[package]] name = "pin-utils" @@ -639,9 +679,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c1f4b0efa5fc5e8ceb705136bfee52cfdb6a4e3509f770b478cd6ed434232a7" +checksum = "42934bc9c8ab0d3b273a16d8551c8f0fcff46be73276ca083ec2414c15c4ba5e" dependencies = [ "proc-macro2", ] @@ -660,18 +700,18 @@ checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" [[package]] name = "serde" -version = "1.0.106" +version = "1.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399" +checksum = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.106" +version = "1.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c" +checksum = "818fbf6bfa9a42d3bfcaca148547aa00c7b915bec71d1757aa2d44ca68771984" dependencies = [ "proc-macro2", "quote", @@ -680,9 +720,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.52" +version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7894c8ed05b7a3a279aeb79025fdec1d3158080b75b98a08faf2806bb799edd" +checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" dependencies = [ "itoa", "ryu", @@ -743,9 +783,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" [[package]] name = "syn" -version = "1.0.18" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "410a7488c0a728c7ceb4ad59b9567eb4053d02e8cc7f5c0e0eeeb39518369213" +checksum = "4696caa4048ac7ce2bcd2e484b3cef88c1004e41b8e945a277e2c25dc0b72060" dependencies = [ "proc-macro2", "quote", @@ -773,9 +813,8 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05c1d570eb1a36f0345a5ce9c6c6e665b70b73d11236912c0b477616aeec47b1" +version = "0.2.21" +source = "git+https://github.com/naim94a/tokio?branch=tokio-with-trysendto#4fb99d0fe2c3deb8dac774edd11316384ebb6667" dependencies = [ "bytes", "fnv", @@ -797,8 +836,7 @@ dependencies = [ [[package]] name = "tokio-macros" version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" +source = "git+https://github.com/naim94a/tokio?branch=tokio-with-trysendto#4fb99d0fe2c3deb8dac774edd11316384ebb6667" dependencies = [ "proc-macro2", "quote", @@ -853,6 +891,7 @@ dependencies = [ "async-compression", "binascii", "bincode", + "chrono", "clap", "fern", "futures", @@ -922,9 +961,9 @@ checksum = "3df3561629a8bb4c57e5a2e4c43348d9e29c7c29d9b1c4c1f47166deca8f37ed" [[package]] name = "vec_map" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" [[package]] name = "version_check" diff --git a/Cargo.toml b/Cargo.toml index a184a66..2d13f79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,12 @@ tokio = {version = "0.2", features = ["macros", "net", "rt-threaded", "fs", "syn binascii = "0.1" toml = "0.5" clap = "2.33" -log = "0.4" +log = {version = "0.4", features = ["release_max_level_info"]} fern = "0.6" serde_json = "1.0" futures = "0.3" async-compression = {version = "0.3", features = ["bzip2", "futures-bufread", "futures-write"]} +chrono = "0.4" + +[patch.crates-io] +tokio = {git = "https://github.com/naim94a/tokio", branch = "tokio-with-trysendto", features = ["macros", "net", "rt-threaded", "fs", "sync", "blocking", "signal"]} diff --git a/src/main.rs b/src/main.rs index 89c0ba8..20976b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,11 +34,8 @@ fn setup_logging(cfg: &Configuration) { if let Err(err) = fern::Dispatch::new() .format(|out, message, record| { out.finish(format_args!( - "{}[{}][{}]\t{}", - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(), + "{} [{}][{}] {}", + chrono::Local::now().format("%+"), record.target(), record.level(), message @@ -127,16 +124,14 @@ async fn main() { }); } - let mut udp_server = server::UDPTracker::new(cfg.clone(), tracker.clone()) + let udp_server = server::UDPTracker::new(cfg.clone(), tracker.clone()) .await .expect("failed to bind udp socket"); trace!("Waiting for UDP packets"); let udp_server = tokio::spawn(async move { - loop { - if let Err(err) = udp_server.accept_packet().await { - eprintln!("error: {}", err); - } + if let Err(err) = udp_server.accept_packets().await { + eprintln!("error: {}", err); } }); diff --git a/src/server.rs b/src/server.rs index f93bd52..6b379cf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -108,7 +108,8 @@ struct UDPScrapeResponseEntry { } pub struct UDPTracker { - server: UdpSocket, + srv_send: tokio::net::udp::SendHalf, + srv_recv: Option, tracker: std::sync::Arc, config: Arc, } @@ -120,16 +121,17 @@ impl UDPTracker { let cfg = config.clone(); let server = UdpSocket::bind(cfg.get_udp_config().get_address()).await?; + let (srv_recv, srv_send) = server.split(); Ok(UDPTracker { - server, + srv_send, + srv_recv: Some(srv_recv), tracker, config: cfg, }) } - // TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved - async fn handle_packet(&mut self, remote_address: &SocketAddr, payload: &[u8]) { + async fn handle_packet(&self, remote_address: &SocketAddr, payload: &[u8]) { let header: UDPRequestHeader = match unpack(payload) { Some(val) => val, None => { @@ -150,8 +152,7 @@ impl UDPTracker { } } - // TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved - async fn handle_connect(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, _payload: &[u8]) { + async fn handle_connect(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, _payload: &[u8]) { if header.connection_id != PROTOCOL_ID { trace!("Bad protocol magic from {}", remote_addr); return; @@ -176,8 +177,7 @@ impl UDPTracker { } } - // TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved - async fn handle_announce(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) { + async fn handle_announce(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) { if header.connection_id != self.get_connection_id(remote_addr) { return; } @@ -282,8 +282,7 @@ impl UDPTracker { } } - // TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved - async fn handle_scrape(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) { + async fn handle_scrape(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, payload: &[u8]) { if header.connection_id != self.get_connection_id(remote_addr) { return; } @@ -361,13 +360,14 @@ impl UDPTracker { } } - // TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved - async fn send_packet(&mut self, remote_addr: &SocketAddr, payload: &[u8]) -> Result { - self.server.send_to(payload, remote_addr).await + async fn send_packet(&self, remote_addr: &SocketAddr, payload: &[u8]) -> Result { + self.srv_send.try_send_to(payload, remote_addr).await.map_err(|e| { + debug!("failed to send a packet: {}", e); + e + }) } - // TODO: remove `mut` once https://github.com/tokio-rs/tokio/issues/1624 is resolved - async fn send_error(&mut self, remote_addr: &SocketAddr, header: &UDPRequestHeader, error_msg: &str) { + async fn send_error(&self, remote_addr: &SocketAddr, header: &UDPRequestHeader, error_msg: &str) { let mut payload_buffer = vec![0u8; MAX_PACKET_SIZE]; let mut payload = StackVec::from(&mut payload_buffer); @@ -382,16 +382,20 @@ impl UDPTracker { } } - // TODO: remove `mut` for `accept_packet`, and spawn once https://github.com/tokio-rs/tokio/issues/1624 is resolved - pub async fn accept_packet(&mut self) -> Result<(), std::io::Error> { - let mut packet = vec![0u8; MAX_PACKET_SIZE]; - let (size, remote_address) = self.server.recv_from(packet.as_mut_slice()).await?; + pub async fn accept_packets(mut self) -> Result<(), std::io::Error> { + let mut recv = self.srv_recv.take().unwrap(); + let tracker = Arc::new(self); - // tokio::spawn(async { - debug!("Received {} bytes from {}", size, remote_address); - self.handle_packet(&remote_address, &packet[..size]).await; - // }); - Ok(()) + loop { + let mut packet = vec![0u8; MAX_PACKET_SIZE]; + let (size, remote_address) = recv.recv_from(packet.as_mut_slice()).await?; + + let tracker = tracker.clone(); + tokio::spawn(async move { + debug!("Received {} bytes from {}", size, remote_address); + tracker.handle_packet(&remote_address, &packet[..size]).await; + }); + } } } diff --git a/src/tracker.rs b/src/tracker.rs index 896b23a..0bb623e 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -224,7 +224,8 @@ impl PeerId { } impl Serialize for PeerId { fn serialize(&self, serializer: S) -> Result - where S: serde::Serializer { + where + S: serde::Serializer, { let mut tmp = [0u8; 40]; binascii::bin2hex(&self.0, &mut tmp).unwrap(); let id = std::str::from_utf8(&tmp).ok(); @@ -235,7 +236,10 @@ impl Serialize for PeerId { client: Option<&'a str>, } - let obj = PeerIdInfo { id, client: self.get_client_name() }; + let obj = PeerIdInfo { + id, + client: self.get_client_name(), + }; obj.serialize(serializer) } } @@ -318,7 +322,7 @@ impl TorrentEntry { list } - pub fn get_peers_iter(&self) -> impl Iterator { + pub fn get_peers_iter(&self) -> impl Iterator { self.peers.iter() } diff --git a/src/webserver.rs b/src/webserver.rs index 208eea0..093feeb 100644 --- a/src/webserver.rs +++ b/src/webserver.rs @@ -34,7 +34,7 @@ struct TorrentEntry<'a> { seeders: u32, leechers: u32, - #[serde(skip_serializing_if="Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none")] peers: Option>, } @@ -108,7 +108,13 @@ pub fn build_server( .iter() .map(|(k, v)| { let (seeders, _, leechers) = v.get_stats(); - TorrentEntry { info_hash: k, data: v, seeders, leechers, peers: None } + TorrentEntry { + info_hash: k, + data: v, + seeders, + leechers, + peers: None, + } }) .skip(offset as usize) .take(limit as usize) @@ -121,7 +127,8 @@ pub fn build_server( let t2 = tracker.clone(); // view_torrent_info -> GET /t/:infohash HTTP/* let view_torrent_info = filters::method::get() - .and(filters::path::param()).and(filters::path::end()) + .and(filters::path::param()) + .and(filters::path::end()) .map(move |info_hash: InfoHash| { let tracker = t2.clone(); (info_hash, tracker) @@ -138,9 +145,7 @@ pub fn build_server( let peers: Vec<_> = info .get_peers_iter() .take(1000) - .map(|(peer_id, peer_info)| { - (peer_id.clone(), peer_info.clone()) - }) + .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.clone())) .collect(); Ok(reply::json(&TorrentEntry { @@ -156,7 +161,8 @@ pub fn build_server( // DELETE /t/:info_hash let t3 = tracker.clone(); let delete_torrent = filters::method::delete() - .and(filters::path::param()).and(filters::path::end()) + .and(filters::path::param()) + .and(filters::path::end()) .map(move |info_hash: InfoHash| { let tracker = t3.clone(); (info_hash, tracker) @@ -180,7 +186,8 @@ pub fn build_server( // add_torrent/alter: POST /t/:info_hash // (optional) BODY: json: {"is_flagged": boolean} let change_torrent = filters::method::post() - .and(filters::path::param()).and(filters::path::end()) + .and(filters::path::param()) + .and(filters::path::end()) .and(filters::body::content_length_limit(4096)) .and(filters::body::json()) .map(move |info_hash: InfoHash, body: Option| {