From 76424a2100cb7a79e3cd3d403080954cd13a3fdb Mon Sep 17 00:00:00 2001 From: Gabriel Simmer Date: Mon, 2 Oct 2023 22:35:17 +0100 Subject: [PATCH] More groundwork for gossip Working UDP -> message handler and cache middleware -> message handler. Now I just need to figure out the format of the messages passed, and do testing with other peers. --- Cargo.lock | 1 + Cargo.toml | 3 +- src/main.rs | 163 ++++++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 137 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 675791e..2d84bc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1923,6 +1923,7 @@ dependencies = [ "lazy_static 1.4.0", "maud", "orgize", + "rand", "rss", "serde", "serde_dhall", diff --git a/Cargo.toml b/Cargo.toml index a56daf8..f87af6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,4 +27,5 @@ file-format = "0.18.0" rss = "2.0.6" time = { version = "0.3.28", features = ["parsing", "formatting", "macros"] } async-trait = "0.1.73" -crossbeam = "0.8.2" \ No newline at end of file +crossbeam = "0.8.2" +rand = "0.8.5" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index e9fc90b..0769199 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod cache; mod posts; use axum::extract::Path; +use axum::http::request; use axum::response::IntoResponse; use axum::{ body::Full, @@ -20,9 +21,12 @@ use crossbeam::channel::{unbounded, Receiver, Sender}; use file_format::{FileFormat, Kind}; use hyper::body::Bytes; use maud::{html, Markup, PreEscaped, Render, DOCTYPE}; +use rand::seq::SliceRandom; use rss::ChannelBuilder; use serde::Deserialize; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; +use std::collections::HashMap; +use std::fmt::Display; use std::net::UdpSocket; use std::str::FromStr; use std::{env, io, thread}; @@ -31,17 +35,57 @@ use tower_http::services::ServeDir; use crate::cache::{init_cache, Cache}; +enum PeerStatus { + Healthy, + Suspect, + Down +} + +enum GossipMessages { + Ready, + Cache, + Ack, +} +impl GossipMessages { + fn from_str(request: String) -> Option { + match request.as_str() { + "rdy" => Some(Self::Ready), + "cache" => Some(Self::Cache), + "ack" => Some(Self::Ack), + _ => None, + } + } +} + +impl Display for GossipMessages { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GossipMessages::Ready => write!(f, "rdy"), + GossipMessages::Cache => write!(f, "cache"), + GossipMessages::Ack => write!(f, "ack"), + } + } +} + +struct Peer { + counter: i64, + health: PeerStatus +} + #[derive(Parser)] struct Cli { #[arg(short, long, default_value_t=("gs.db").to_string())] database_path: String, #[arg(short, long, default_value_t=("0.0.0.0:3000").to_string())] bind: String, + #[arg(short, long)] + peers: Option } #[derive(Clone, Debug)] struct AppState { cache: Cache, + to_gossip: Sender, } #[derive(Deserialize)] @@ -90,8 +134,14 @@ async fn main() -> Result<(), sqlx::Error> { sqlx::migrate!("./migrations").run(&pool).await?; env::set_var("DATABASE_PATH", &args.database_path); + + // Create channels for sending messages and receiving results + let (s_gossip, r_gossip) = unbounded::(); + let (s_main, r_main) = unbounded::(); + let state = AppState { cache: init_cache().await, + to_gossip: s_main.clone() }; let app = Router::new() @@ -105,53 +155,103 @@ async fn main() -> Result<(), sqlx::Error> { .layer(middleware::from_fn_with_state(state.clone(), cached_page)) .with_state(state); - // Create channels for sending messages and receiving results - let (to_gossip, from_gossip) = unbounded::(); - let (to_main, from_main) = unbounded(); - - // Spawn a worker thread - let gossip_server = thread::spawn(move || { - let _ = gossiper(from_main, to_gossip); - }); - println!("Running webserver on {}", args.bind); - let webserver = axum::Server::bind(&args.bind.parse().unwrap()).serve(app.into_make_service()); - tokio::spawn(webserver); + let webserver = axum::Server::bind(&args.bind.parse().unwrap()) + .serve(app.into_make_service()); - loop { - for msg in from_gossip.try_iter() { - println!("{}", msg); + if args.peers.is_some_and(|f| f != "".to_owned()) { + tokio::spawn(webserver); + println!("starting gossip worker"); + // Spawn a worker thread + let gossip_server = thread::scope(|scope| { + let _ = gossiper(s_main.clone(), r_main, s_gossip); + }); + + let _ = s_main.send(GossipMessages::Ready); + loop { + for msg in r_gossip.try_iter() { + println!("gossip thrd: {}", msg); + } } + } else { + let _ = webserver.await; } - - // Optionally, wait for the worker thread to finish - gossip_server.join().unwrap(); - + Ok(()) } -fn gossiper(rx: Receiver, tx: Sender) -> io::Result<()> { +fn gossiper(own_sender: Sender, rx: Receiver, tx: Sender) -> io::Result<()> { + let mut peers: HashMap = HashMap::new(); let mut buf = [0; 1024]; let socket = UdpSocket::bind("0.0.0.0:1337")?; - let _ = tx.send("rdy".to_owned()); + let r_socket = socket.try_clone().unwrap(); + let _ = tx.send(GossipMessages::Ready); - // Handle messages that are passed to us from the main thread. - for message in rx.try_iter() { - println!("Received: {}", message); - } - // Read from the UDP socket. - loop { + // Listen on our UDP socket and pass messages back up. + thread::spawn(move || loop { let (size, source) = socket.recv_from(&mut buf).expect("Failed to receive data"); - let request = String::from_utf8_lossy(&buf[..size]); - let _ = tx.send(request.to_string()).unwrap(); + let mut request = String::from_utf8_lossy(&buf[..size]).into_owned(); + trim_newline(&mut request); - let response = "ack!"; + let msg = GossipMessages::from_str(request).unwrap(); + let _ = own_sender.send(msg).unwrap(); + let response = "ack"; socket .send_to(response.as_bytes(), source) .expect("Failed to send response"); + }); + + // Handle messages that are passed to us from the main thread or the UDP socket. + loop { + let message = rx.recv().unwrap(); + println!("got: {}", &message); + match message { + GossipMessages::Cache => { + let selected_peers: Vec = match select_peers(&peers) { + Some(p) => { println!("found peers"); p }, + None => { println!("no peers, not gossiping"); vec![] }, + }; + for peer in selected_peers { + let msg = GossipMessages::Cache.to_string().into_bytes(); + let result = match r_socket.send_to(&msg, &peer) { + Ok(_) => "", + Err(_) => "", + }; + let p = peers.get_mut(&peer).unwrap(); + p.counter = p.counter + 1; + } + }, + GossipMessages::Ack => { + println!("healthy ack"); + } + _ => {} + } } } +fn trim_newline(s: &mut String) { + if s.ends_with('\n') { + s.pop(); + if s.ends_with('\r') { + s.pop(); + } + } +} + +/// select a third of the peers we know about to gossip to. +fn select_peers(peers: &HashMap) -> Option> { + if peers.len() == 0 { + return None; + } + let healthy: Vec = peers.into_iter() + .filter(|(&ref k, &ref peer)| matches!(peer.health, PeerStatus::Healthy)) + .map(|(&ref ip, &ref peer)| ip.to_owned()).collect(); + let len = healthy.len() as usize / (3 as usize); + let rng = &mut rand::thread_rng(); + // Select a random number of peers to gossip to. + Some(healthy.choose_multiple(rng, len).map(|f| f.to_owned()).collect()) +} + async fn raw_blog_post(Path(post): Path) -> Result { let post = posts::blog_post(post); if post.is_err() { @@ -320,6 +420,11 @@ async fn cached_page( let content = String::from_utf8(res).unwrap(); state.cache.set(path, contenttype.to_owned(), content).await; + match state.to_gossip.send(GossipMessages::Cache) { + Ok(_) => { }, + Err(_) => { }, + }; + return Response::builder() .header("content-type", contenttype) .header("cache", "miss")