add async bzip compression to database

This commit is contained in:
Naim A 2020-05-07 02:53:25 +03:00
parent e42fec3ece
commit e1cf56aba4
5 changed files with 179 additions and 7 deletions

84
Cargo.lock generated
View file

@ -15,6 +15,19 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b585a98a234c46fc563103e9278c9391fde1f4e6850334da895d27edb9580f62"
[[package]]
name = "async-compression"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d548918a155e5d6f19f414450e184a632ba492bd09f0e8e4622919ccb940664"
dependencies = [
"bzip2",
"futures-core",
"futures-io",
"memchr",
"pin-project-lite",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -99,6 +112,32 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1"
[[package]]
name = "bzip2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42b7c3cbf0fa9c1b82308d57191728ca0256cb821220f4e2fd410a72ade26e3b"
dependencies = [
"bzip2-sys",
"libc",
]
[[package]]
name = "bzip2-sys"
version = "0.1.8+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05305b41c5034ff0e93937ac64133d109b5a2660114ec45e9760bc6816d83038"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "cc"
version = "1.0.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d87b23d6a92cd03af510a5ade527033f6aa6fa92161e2d5863a907d4c5e31d"
[[package]]
name = "cfg-if"
version = "0.1.10"
@ -180,6 +219,7 @@ checksum = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -202,12 +242,35 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a"
[[package]]
name = "futures-executor"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f674f3e1bcb15b37284a90cedf55afdba482ab061c407a9c0ebbd0f3109741ba"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6"
[[package]]
name = "futures-macro"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a5081aa3de1f7542a794a397cde100ed903b0630152d0973479018fd85423a7"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.4"
@ -226,10 +289,17 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
[[package]]
@ -546,6 +616,18 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-hack"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63"
[[package]]
name = "proc-macro-nested"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694"
[[package]]
name = "proc-macro2"
version = "1.0.12"
@ -768,10 +850,12 @@ checksum = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33"
name = "udpt-rs"
version = "3.0.0-alpha"
dependencies = [
"async-compression",
"binascii",
"bincode",
"clap",
"fern",
"futures",
"log",
"serde",
"serde_json",

View file

@ -19,3 +19,5 @@ clap = "2.33"
log = "0.4"
fern = "0.6"
serde_json = "1.0"
futures = "0.3"
async-compression = {version = "0.3", features = ["bzip2", "futures-bufread", "futures-write"]}

View file

@ -1,5 +1,3 @@
#![forbid(unsafe_code)]
use clap;
use fern;
use log::{error, info, trace, warn};
@ -8,6 +6,7 @@ mod config;
mod server;
mod stackvec;
mod tracker;
mod utils;
mod webserver;
use config::Configuration;

View file

@ -1,9 +1,10 @@
use crate::server::Events;
use crate::utils;
use log::{error, trace};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::BTreeMap;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::io::AsyncBufReadExt;
use tokio::stream::StreamExt;
use tokio::sync::RwLock;
@ -256,6 +257,11 @@ impl TorrentTracker {
mode: TrackerMode, reader: &mut R,
) -> Result<TorrentTracker, std::io::Error> {
let reader = tokio::io::BufReader::new(reader);
let reader = utils::TokioFuturesCompat::from(reader);
let reader = async_compression::futures::bufread::BzDecoder::new(reader);
let reader = utils::TokioFuturesCompat::from(reader);
let reader = tokio::io::BufReader::new(reader);
let mut tmp: Vec<u8> = Vec::with_capacity(4096);
tmp.resize(tmp.capacity(), 0);
@ -264,13 +270,13 @@ impl TorrentTracker {
let mut records = reader
.lines()
.filter_map(|res| {
.filter_map(|res: Result<String, _>| {
if let Err(ref err) = res {
error!("failed to read lines! {}", err);
}
res.ok()
})
.map(|line| serde_json::from_str::<DatabaseRow>(&line))
.map(|line: String| serde_json::from_str::<DatabaseRow>(&line))
.filter_map(|jsr| {
if let Err(ref err) = jsr {
error!("failed to parse json: {}", err);
@ -284,6 +290,7 @@ impl TorrentTracker {
let a = a.into_owned();
db.insert(b, a);
}
trace!("loaded {} entries from database", db.len());
drop(db);
@ -390,8 +397,10 @@ impl TorrentTracker {
self.database.torrent_peers.read().await
}
pub async fn save_database<W: tokio::io::AsyncWrite + Unpin>(&self, mut writer: W) -> Result<(), std::io::Error> {
// TODO: find async friendly compressor
pub async fn save_database<W: tokio::io::AsyncWrite + Unpin>(&self, mut w: W) -> Result<(), std::io::Error> {
use futures::io::AsyncWriteExt;
let mut writer = async_compression::futures::write::BzEncoder::new(utils::TokioFuturesCompat::from(&mut w));
let db_lock = self.database.torrent_peers.read().await;
@ -411,6 +420,8 @@ impl TorrentTracker {
tmp.push(b'\n');
writer.write_all(&tmp).await?;
}
writer.close().await?;
Ok(())
}

76
src/utils.rs Normal file
View file

@ -0,0 +1,76 @@
use core::marker::Unpin;
/// In order to use `tokio` with `futures` we need a compatability, This is probably temporary.
use core::pin::Pin;
use core::task::{Context, Poll};
/// This allows you to convert a `tokio` `AsyncWrite` into a `futures` `AsyncWrite`
#[repr(transparent)]
pub struct TokioFuturesCompat<T: Unpin>(T);
impl<T: Unpin> TokioFuturesCompat<T> {
fn get_self(self: Pin<&mut Self>) -> Pin<&mut T> {
unsafe { self.map_unchecked_mut(|v| &mut v.0) }
}
}
/// tokio::AsyncWrite -> futures::AsyncWrite
impl<T: tokio::io::AsyncWrite + Unpin> futures::io::AsyncWrite for TokioFuturesCompat<T> {
fn poll_write(
self: Pin<&mut Self>, cx: &mut futures::task::Context, buf: &[u8],
) -> futures::task::Poll<Result<usize, futures::io::Error>> {
Self::get_self(self).poll_write(cx, buf)
}
fn poll_flush(
self: Pin<&mut Self>, cx: &mut futures::task::Context,
) -> futures::task::Poll<Result<(), futures::io::Error>> {
Self::get_self(self).poll_flush(cx)
}
fn poll_close(
self: Pin<&mut Self>, cx: &mut futures::task::Context,
) -> futures::task::Poll<Result<(), futures::io::Error>> {
Self::get_self(self).poll_shutdown(cx)
}
}
/// tokio::AsyncRead -> futures::AsyncRead
impl<T: tokio::io::AsyncRead + Unpin> futures::io::AsyncRead for TokioFuturesCompat<T> {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, futures::io::Error>> {
Self::get_self(self).poll_read(cx, buf)
}
}
// tokio::AsyncBufRead -> futures::AsyncBufRead
impl<T: tokio::io::AsyncBufRead + Unpin> futures::io::AsyncBufRead for TokioFuturesCompat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8], futures::io::Error>> {
Self::get_self(self).poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
Self::get_self(self).consume(amt)
}
}
/// futures::AsyncRead -> tokio::AsyncRead
impl<T: Unpin + futures::io::AsyncRead> tokio::io::AsyncRead for TokioFuturesCompat<T> {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<tokio::io::Result<usize>> {
Self::get_self(self).poll_read(cx, buf)
}
}
/// futures::AsyncReadBuf -> tokio::AsyncReadBuf
impl<T: Unpin + futures::io::AsyncBufRead> tokio::io::AsyncBufRead for TokioFuturesCompat<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<&[u8]>> {
Self::get_self(self).poll_fill_buf(cx)
}
fn consume(self: Pin<&mut Self>, amt: usize) {
Self::get_self(self).consume(amt)
}
}
impl<T: Unpin> From<T> for TokioFuturesCompat<T> {
fn from(v: T) -> TokioFuturesCompat<T> {
Self(v)
}
}