diff --git a/Cargo.lock b/Cargo.lock index fdc4da4..15ab685 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,6 +15,19 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b585a98a234c46fc563103e9278c9391fde1f4e6850334da895d27edb9580f62" +[[package]] +name = "async-compression" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d548918a155e5d6f19f414450e184a632ba492bd09f0e8e4622919ccb940664" +dependencies = [ + "bzip2", + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", +] + [[package]] name = "atty" version = "0.2.14" @@ -99,6 +112,32 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" +[[package]] +name = "bzip2" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b7c3cbf0fa9c1b82308d57191728ca0256cb821220f4e2fd410a72ade26e3b" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.8+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05305b41c5034ff0e93937ac64133d109b5a2660114ec45e9760bc6816d83038" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "cc" +version = "1.0.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d87b23d6a92cd03af510a5ade527033f6aa6fa92161e2d5863a907d4c5e31d" + [[package]] name = "cfg-if" version = "0.1.10" @@ -180,6 +219,7 @@ checksum = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -202,12 +242,35 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" +[[package]] +name = "futures-executor" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f674f3e1bcb15b37284a90cedf55afdba482ab061c407a9c0ebbd0f3109741ba" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6" +[[package]] +name = "futures-macro" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a5081aa3de1f7542a794a397cde100ed903b0630152d0973479018fd85423a7" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.4" @@ -226,10 +289,17 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", ] [[package]] @@ -546,6 +616,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "proc-macro-hack" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63" + +[[package]] +name = "proc-macro-nested" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" + [[package]] name = "proc-macro2" version = "1.0.12" @@ -768,10 +850,12 @@ checksum = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33" name = "udpt-rs" version = "3.0.0-alpha" dependencies = [ + "async-compression", "binascii", "bincode", "clap", "fern", + "futures", "log", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 3d0106f..4c5c2d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,5 @@ clap = "2.33" log = "0.4" fern = "0.6" serde_json = "1.0" +futures = "0.3" +async-compression = {version = "0.3", features = ["bzip2", "futures-bufread", "futures-write"]} diff --git a/src/main.rs b/src/main.rs index 3b84fc1..89c0ba8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,3 @@ -#![forbid(unsafe_code)] - use clap; use fern; use log::{error, info, trace, warn}; @@ -8,6 +6,7 @@ mod config; mod server; mod stackvec; mod tracker; +mod utils; mod webserver; use config::Configuration; diff --git a/src/tracker.rs b/src/tracker.rs index e4337c1..20ac9f2 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -1,9 +1,10 @@ use crate::server::Events; +use crate::utils; use log::{error, trace}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::BTreeMap; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; +use tokio::io::AsyncBufReadExt; use tokio::stream::StreamExt; use tokio::sync::RwLock; @@ -256,6 +257,11 @@ impl TorrentTracker { mode: TrackerMode, reader: &mut R, ) -> Result { let reader = tokio::io::BufReader::new(reader); + let reader = utils::TokioFuturesCompat::from(reader); + let reader = async_compression::futures::bufread::BzDecoder::new(reader); + let reader = utils::TokioFuturesCompat::from(reader); + let reader = tokio::io::BufReader::new(reader); + let mut tmp: Vec = Vec::with_capacity(4096); tmp.resize(tmp.capacity(), 0); @@ -264,13 +270,13 @@ impl TorrentTracker { let mut records = reader .lines() - .filter_map(|res| { + .filter_map(|res: Result| { if let Err(ref err) = res { error!("failed to read lines! {}", err); } res.ok() }) - .map(|line| serde_json::from_str::(&line)) + .map(|line: String| serde_json::from_str::(&line)) .filter_map(|jsr| { if let Err(ref err) = jsr { error!("failed to parse json: {}", err); @@ -284,6 +290,7 @@ impl TorrentTracker { let a = a.into_owned(); db.insert(b, a); } + trace!("loaded {} entries from database", db.len()); drop(db); @@ -390,8 +397,10 @@ impl TorrentTracker { self.database.torrent_peers.read().await } - pub async fn save_database(&self, mut writer: W) -> Result<(), std::io::Error> { - // TODO: find async friendly compressor + pub async fn save_database(&self, mut w: W) -> Result<(), std::io::Error> { + use futures::io::AsyncWriteExt; + + let mut writer = async_compression::futures::write::BzEncoder::new(utils::TokioFuturesCompat::from(&mut w)); let db_lock = self.database.torrent_peers.read().await; @@ -411,6 +420,8 @@ impl TorrentTracker { tmp.push(b'\n'); writer.write_all(&tmp).await?; } + + writer.close().await?; Ok(()) } diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..9a1f27a --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,76 @@ +use core::marker::Unpin; +/// In order to use `tokio` with `futures` we need a compatability, This is probably temporary. +use core::pin::Pin; +use core::task::{Context, Poll}; + +/// This allows you to convert a `tokio` `AsyncWrite` into a `futures` `AsyncWrite` +#[repr(transparent)] +pub struct TokioFuturesCompat(T); + +impl TokioFuturesCompat { + fn get_self(self: Pin<&mut Self>) -> Pin<&mut T> { + unsafe { self.map_unchecked_mut(|v| &mut v.0) } + } +} + +/// tokio::AsyncWrite -> futures::AsyncWrite +impl futures::io::AsyncWrite for TokioFuturesCompat { + fn poll_write( + self: Pin<&mut Self>, cx: &mut futures::task::Context, buf: &[u8], + ) -> futures::task::Poll> { + Self::get_self(self).poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, cx: &mut futures::task::Context, + ) -> futures::task::Poll> { + Self::get_self(self).poll_flush(cx) + } + + fn poll_close( + self: Pin<&mut Self>, cx: &mut futures::task::Context, + ) -> futures::task::Poll> { + Self::get_self(self).poll_shutdown(cx) + } +} + +/// tokio::AsyncRead -> futures::AsyncRead +impl futures::io::AsyncRead for TokioFuturesCompat { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + Self::get_self(self).poll_read(cx, buf) + } +} + +// tokio::AsyncBufRead -> futures::AsyncBufRead +impl futures::io::AsyncBufRead for TokioFuturesCompat { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Self::get_self(self).poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + Self::get_self(self).consume(amt) + } +} + +/// futures::AsyncRead -> tokio::AsyncRead +impl tokio::io::AsyncRead for TokioFuturesCompat { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + Self::get_self(self).poll_read(cx, buf) + } +} + +/// futures::AsyncReadBuf -> tokio::AsyncReadBuf +impl tokio::io::AsyncBufRead for TokioFuturesCompat { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Self::get_self(self).poll_fill_buf(cx) + } + fn consume(self: Pin<&mut Self>, amt: usize) { + Self::get_self(self).consume(amt) + } +} + +impl From for TokioFuturesCompat { + fn from(v: T) -> TokioFuturesCompat { + Self(v) + } +}