replace utils.rs with official tokio-util
This commit is contained in:
parent
856bbb8585
commit
db28591026
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -851,6 +851,7 @@ checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499"
|
|||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"log",
|
||||
"pin-project-lite",
|
||||
|
@ -899,6 +900,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"toml",
|
||||
"warp",
|
||||
]
|
||||
|
|
|
@ -13,6 +13,7 @@ serde = {version = "1.0", features = ["derive"]}
|
|||
bincode = "1.2"
|
||||
warp = {version = "0.2", default-features = false}
|
||||
tokio = {version = "0.2", features = ["macros", "net", "rt-threaded", "fs", "sync", "blocking", "signal"]}
|
||||
tokio-util = {version = "0.3", features = ["compat"]}
|
||||
binascii = "0.1"
|
||||
toml = "0.5"
|
||||
clap = "2.33"
|
||||
|
|
|
@ -6,7 +6,6 @@ mod config;
|
|||
mod server;
|
||||
mod stackvec;
|
||||
mod tracker;
|
||||
mod utils;
|
||||
mod webserver;
|
||||
|
||||
use config::Configuration;
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use crate::server::Events;
|
||||
use crate::utils;
|
||||
use log::{error, trace};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::borrow::Cow;
|
||||
|
@ -374,10 +373,10 @@ impl TorrentTracker {
|
|||
pub async fn load_database<R: tokio::io::AsyncRead + Unpin>(
|
||||
mode: TrackerMode, reader: &mut R,
|
||||
) -> Result<TorrentTracker, std::io::Error> {
|
||||
let reader = tokio::io::BufReader::new(reader);
|
||||
let reader = utils::TokioFuturesCompat::from(reader);
|
||||
let reader = async_compression::futures::bufread::BzDecoder::new(reader);
|
||||
let reader = utils::TokioFuturesCompat::from(reader);
|
||||
use tokio_util::compat::{Tokio02AsyncReadCompatExt, FuturesAsyncReadCompatExt};
|
||||
|
||||
let reader = tokio::io::BufReader::new(reader).compat();
|
||||
let reader = async_compression::futures::bufread::BzDecoder::new(reader).compat();
|
||||
let reader = tokio::io::BufReader::new(reader);
|
||||
|
||||
let mut tmp: Vec<u8> = Vec::with_capacity(4096);
|
||||
|
@ -515,10 +514,11 @@ impl TorrentTracker {
|
|||
self.database.torrent_peers.read().await
|
||||
}
|
||||
|
||||
pub async fn save_database<W: tokio::io::AsyncWrite + Unpin>(&self, mut w: W) -> Result<(), std::io::Error> {
|
||||
pub async fn save_database<W: tokio::io::AsyncWrite + Unpin>(&self, w: W) -> Result<(), std::io::Error> {
|
||||
use futures::io::AsyncWriteExt;
|
||||
use tokio_util::compat::Tokio02AsyncWriteCompatExt;
|
||||
|
||||
let mut writer = async_compression::futures::write::BzEncoder::new(utils::TokioFuturesCompat::from(&mut w));
|
||||
let mut writer = async_compression::futures::write::BzEncoder::new(w.compat_write());
|
||||
|
||||
let db_lock = self.database.torrent_peers.read().await;
|
||||
|
||||
|
|
76
src/utils.rs
76
src/utils.rs
|
@ -1,76 +0,0 @@
|
|||
use core::marker::Unpin;
|
||||
/// In order to use `tokio` with `futures` we need a compatability, This is probably temporary.
|
||||
use core::pin::Pin;
|
||||
use core::task::{Context, Poll};
|
||||
|
||||
/// This allows you to convert a `tokio` `AsyncWrite` into a `futures` `AsyncWrite`
|
||||
#[repr(transparent)]
|
||||
pub struct TokioFuturesCompat<T: Unpin>(T);
|
||||
|
||||
impl<T: Unpin> TokioFuturesCompat<T> {
|
||||
fn get_self(self: Pin<&mut Self>) -> Pin<&mut T> {
|
||||
unsafe { self.map_unchecked_mut(|v| &mut v.0) }
|
||||
}
|
||||
}
|
||||
|
||||
/// tokio::AsyncWrite -> futures::AsyncWrite
|
||||
impl<T: tokio::io::AsyncWrite + Unpin> futures::io::AsyncWrite for TokioFuturesCompat<T> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>, cx: &mut futures::task::Context, buf: &[u8],
|
||||
) -> futures::task::Poll<Result<usize, futures::io::Error>> {
|
||||
Self::get_self(self).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>, cx: &mut futures::task::Context,
|
||||
) -> futures::task::Poll<Result<(), futures::io::Error>> {
|
||||
Self::get_self(self).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(
|
||||
self: Pin<&mut Self>, cx: &mut futures::task::Context,
|
||||
) -> futures::task::Poll<Result<(), futures::io::Error>> {
|
||||
Self::get_self(self).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// tokio::AsyncRead -> futures::AsyncRead
|
||||
impl<T: tokio::io::AsyncRead + Unpin> futures::io::AsyncRead for TokioFuturesCompat<T> {
|
||||
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, futures::io::Error>> {
|
||||
Self::get_self(self).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
// tokio::AsyncBufRead -> futures::AsyncBufRead
|
||||
impl<T: tokio::io::AsyncBufRead + Unpin> futures::io::AsyncBufRead for TokioFuturesCompat<T> {
|
||||
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8], futures::io::Error>> {
|
||||
Self::get_self(self).poll_fill_buf(cx)
|
||||
}
|
||||
|
||||
fn consume(self: Pin<&mut Self>, amt: usize) {
|
||||
Self::get_self(self).consume(amt)
|
||||
}
|
||||
}
|
||||
|
||||
/// futures::AsyncRead -> tokio::AsyncRead
|
||||
impl<T: Unpin + futures::io::AsyncRead> tokio::io::AsyncRead for TokioFuturesCompat<T> {
|
||||
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<tokio::io::Result<usize>> {
|
||||
Self::get_self(self).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
/// futures::AsyncReadBuf -> tokio::AsyncReadBuf
|
||||
impl<T: Unpin + futures::io::AsyncBufRead> tokio::io::AsyncBufRead for TokioFuturesCompat<T> {
|
||||
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<tokio::io::Result<&[u8]>> {
|
||||
Self::get_self(self).poll_fill_buf(cx)
|
||||
}
|
||||
fn consume(self: Pin<&mut Self>, amt: usize) {
|
||||
Self::get_self(self).consume(amt)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Unpin> From<T> for TokioFuturesCompat<T> {
|
||||
fn from(v: T) -> TokioFuturesCompat<T> {
|
||||
Self(v)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue