update tokio 1.0 + warp 0.3

This commit is contained in:
Naim A 2021-01-20 16:02:53 +02:00
parent 8c7b2d18e4
commit d2e7c6d5a3
3 changed files with 179 additions and 417 deletions

532
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[package] [package]
name = "udpt-rs" name = "udpt-rs"
version = "3.1.0" version = "3.1.1"
authors = ["Naim A. <naim@abda.nl>"] authors = ["Naim A. <naim@abda.nl>"]
description = "High performance torrent tracker" description = "High performance torrent tracker"
edition = "2018" edition = "2018"
@ -11,15 +11,13 @@ lto = "fat"
[dependencies] [dependencies]
serde = {version = "1.0", features = ["derive"]} serde = {version = "1.0", features = ["derive"]}
bincode = "1.3" bincode = "1.3"
warp = {version = "0.2", default-features = false} warp = {version = "0.3", default-features = false}
tokio = {version = "0.3", features = ["macros", "io-util", "net", "time", "stream", "rt-multi-thread", "fs", "sync", "signal"]} tokio = {version = "1.0", features = ["macros", "io-util", "net", "time", "rt-multi-thread", "fs", "sync", "signal"]}
tokio-util = {version = "0.5", features = ["compat"]}
binascii = "0.1" binascii = "0.1"
toml = "0.5" toml = "0.5"
clap = "2.33" clap = "2.33"
log = {version = "0.4", features = ["release_max_level_info"]} log = {version = "0.4", features = ["release_max_level_info"]}
fern = "0.6" fern = "0.6"
serde_json = "1.0" serde_json = "1.0"
futures = "0.3" async-compression = {version = "^0.3.7", features = ["bzip2", "tokio"]}
async-compression = {version = "0.3", features = ["bzip2", "futures-bufread", "futures-write"]}
chrono = "0.4" chrono = "0.4"

View file

@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
use tokio::stream::StreamExt;
use tokio::sync::RwLock; use tokio::sync::RwLock;
const TWO_HOURS: std::time::Duration = std::time::Duration::from_secs(3600 * 2); const TWO_HOURS: std::time::Duration = std::time::Duration::from_secs(3600 * 2);
@ -373,40 +372,35 @@ impl TorrentTracker {
pub async fn load_database<R: tokio::io::AsyncRead + Unpin>( pub async fn load_database<R: tokio::io::AsyncRead + Unpin>(
mode: TrackerMode, reader: &mut R, mode: TrackerMode, reader: &mut R,
) -> Result<TorrentTracker, std::io::Error> { ) -> Result<TorrentTracker, std::io::Error> {
use tokio_util::compat::{FuturesAsyncReadCompatExt, Tokio02AsyncReadCompatExt};
let reader = tokio::io::BufReader::new(reader).compat();
let reader = async_compression::futures::bufread::BzDecoder::new(reader).compat();
let reader = tokio::io::BufReader::new(reader); let reader = tokio::io::BufReader::new(reader);
let reader = async_compression::tokio::bufread::BzDecoder::new(reader);
let mut tmp: Vec<u8> = Vec::with_capacity(4096); let reader = tokio::io::BufReader::new(reader);
tmp.resize(tmp.capacity(), 0);
let res = TorrentTracker::new(mode); let res = TorrentTracker::new(mode);
let mut db = res.database.torrent_peers.write().await; let mut db = res.database.torrent_peers.write().await;
let mut records = reader let mut records = reader.lines();
.lines() loop {
.filter_map(|res: Result<String, _>| { let line = match records.next_line().await {
if let Err(ref err) = res { Ok(Some(v)) => v,
Ok(None) => break,
Err(ref err) => {
error!("failed to read lines! {}", err); error!("failed to read lines! {}", err);
} continue;
res.ok() },
}) };
.map(|line: String| serde_json::from_str::<DatabaseRow>(&line)) let row: DatabaseRow = match serde_json::from_str(&line) {
.filter_map(|jsr| { Ok(v) => v,
if let Err(ref err) = jsr { Err(err) => {
error!("failed to parse json: {}", err); error!("failed to parse json: {}", err);
continue;
}
};
let entry = row.entry.into_owned();
let infohash = row.info_hash;
db.insert(infohash, entry);
} }
jsr.ok()
});
while let Some(entry) = records.next().await {
let x = || (entry.entry, entry.info_hash);
let (a, b) = x();
let a = a.into_owned();
db.insert(b, a);
}
trace!("loaded {} entries from database", db.len()); trace!("loaded {} entries from database", db.len());
drop(db); drop(db);
@ -515,10 +509,9 @@ impl TorrentTracker {
} }
pub async fn save_database<W: tokio::io::AsyncWrite + Unpin>(&self, w: W) -> Result<(), std::io::Error> { pub async fn save_database<W: tokio::io::AsyncWrite + Unpin>(&self, w: W) -> Result<(), std::io::Error> {
use futures::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio_util::compat::Tokio02AsyncWriteCompatExt;
let mut writer = async_compression::futures::write::BzEncoder::new(w.compat_write()); let mut writer = async_compression::tokio::write::BzEncoder::new(w);
let db_lock = self.database.torrent_peers.read().await; let db_lock = self.database.torrent_peers.read().await;
@ -538,8 +531,7 @@ impl TorrentTracker {
tmp.push(b'\n'); tmp.push(b'\n');
writer.write_all(&tmp).await?; writer.write_all(&tmp).await?;
} }
writer.flush().await?;
writer.close().await?;
Ok(()) Ok(())
} }