More groundwork for gossip
All checks were successful
Fly Deploy / Deploy app (push) Successful in 3m0s

Working UDP -> message handler and cache middleware -> message
handler. Now I just need to figure out the format of the messages
passed, and do testing with other peers.
This commit is contained in:
Gabriel Simmer 2023-10-02 22:35:17 +01:00
parent c5c7363e1a
commit 76424a2100
Signed by: arch
SSH key fingerprint: SHA256:m3OEcdtrnBpMX+2BDGh/byv3hrCekCLzDYMdvGEKPPQ
3 changed files with 137 additions and 30 deletions

1
Cargo.lock generated
View file

@ -1923,6 +1923,7 @@ dependencies = [
"lazy_static 1.4.0", "lazy_static 1.4.0",
"maud", "maud",
"orgize", "orgize",
"rand",
"rss", "rss",
"serde", "serde",
"serde_dhall", "serde_dhall",

View file

@ -28,3 +28,4 @@ rss = "2.0.6"
time = { version = "0.3.28", features = ["parsing", "formatting", "macros"] } time = { version = "0.3.28", features = ["parsing", "formatting", "macros"] }
async-trait = "0.1.73" async-trait = "0.1.73"
crossbeam = "0.8.2" crossbeam = "0.8.2"
rand = "0.8.5"

View file

@ -5,6 +5,7 @@ mod cache;
mod posts; mod posts;
use axum::extract::Path; use axum::extract::Path;
use axum::http::request;
use axum::response::IntoResponse; use axum::response::IntoResponse;
use axum::{ use axum::{
body::Full, body::Full,
@ -20,9 +21,12 @@ use crossbeam::channel::{unbounded, Receiver, Sender};
use file_format::{FileFormat, Kind}; use file_format::{FileFormat, Kind};
use hyper::body::Bytes; use hyper::body::Bytes;
use maud::{html, Markup, PreEscaped, Render, DOCTYPE}; use maud::{html, Markup, PreEscaped, Render, DOCTYPE};
use rand::seq::SliceRandom;
use rss::ChannelBuilder; use rss::ChannelBuilder;
use serde::Deserialize; use serde::Deserialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use std::collections::HashMap;
use std::fmt::Display;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::str::FromStr; use std::str::FromStr;
use std::{env, io, thread}; use std::{env, io, thread};
@ -31,17 +35,57 @@ use tower_http::services::ServeDir;
use crate::cache::{init_cache, Cache}; use crate::cache::{init_cache, Cache};
enum PeerStatus {
Healthy,
Suspect,
Down
}
enum GossipMessages {
Ready,
Cache,
Ack,
}
impl GossipMessages {
fn from_str(request: String) -> Option<Self> {
match request.as_str() {
"rdy" => Some(Self::Ready),
"cache" => Some(Self::Cache),
"ack" => Some(Self::Ack),
_ => None,
}
}
}
impl Display for GossipMessages {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GossipMessages::Ready => write!(f, "rdy"),
GossipMessages::Cache => write!(f, "cache"),
GossipMessages::Ack => write!(f, "ack"),
}
}
}
struct Peer {
counter: i64,
health: PeerStatus
}
#[derive(Parser)] #[derive(Parser)]
struct Cli { struct Cli {
#[arg(short, long, default_value_t=("gs.db").to_string())] #[arg(short, long, default_value_t=("gs.db").to_string())]
database_path: String, database_path: String,
#[arg(short, long, default_value_t=("0.0.0.0:3000").to_string())] #[arg(short, long, default_value_t=("0.0.0.0:3000").to_string())]
bind: String, bind: String,
#[arg(short, long)]
peers: Option<String>
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct AppState { struct AppState {
cache: Cache, cache: Cache,
to_gossip: Sender<GossipMessages>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -90,8 +134,14 @@ async fn main() -> Result<(), sqlx::Error> {
sqlx::migrate!("./migrations").run(&pool).await?; sqlx::migrate!("./migrations").run(&pool).await?;
env::set_var("DATABASE_PATH", &args.database_path); env::set_var("DATABASE_PATH", &args.database_path);
// Create channels for sending messages and receiving results
let (s_gossip, r_gossip) = unbounded::<GossipMessages>();
let (s_main, r_main) = unbounded::<GossipMessages>();
let state = AppState { let state = AppState {
cache: init_cache().await, cache: init_cache().await,
to_gossip: s_main.clone()
}; };
let app = Router::new() let app = Router::new()
@ -105,53 +155,103 @@ async fn main() -> Result<(), sqlx::Error> {
.layer(middleware::from_fn_with_state(state.clone(), cached_page)) .layer(middleware::from_fn_with_state(state.clone(), cached_page))
.with_state(state); .with_state(state);
// Create channels for sending messages and receiving results
let (to_gossip, from_gossip) = unbounded::<String>();
let (to_main, from_main) = unbounded();
// Spawn a worker thread
let gossip_server = thread::spawn(move || {
let _ = gossiper(from_main, to_gossip);
});
println!("Running webserver on {}", args.bind); println!("Running webserver on {}", args.bind);
let webserver = axum::Server::bind(&args.bind.parse().unwrap()).serve(app.into_make_service()); let webserver = axum::Server::bind(&args.bind.parse().unwrap())
tokio::spawn(webserver); .serve(app.into_make_service());
loop { if args.peers.is_some_and(|f| f != "".to_owned()) {
for msg in from_gossip.try_iter() { tokio::spawn(webserver);
println!("{}", msg); println!("starting gossip worker");
// Spawn a worker thread
let gossip_server = thread::scope(|scope| {
let _ = gossiper(s_main.clone(), r_main, s_gossip);
});
let _ = s_main.send(GossipMessages::Ready);
loop {
for msg in r_gossip.try_iter() {
println!("gossip thrd: {}", msg);
}
} }
} else {
let _ = webserver.await;
} }
// Optionally, wait for the worker thread to finish
gossip_server.join().unwrap();
Ok(()) Ok(())
} }
fn gossiper(rx: Receiver<String>, tx: Sender<String>) -> io::Result<()> { fn gossiper(own_sender: Sender<GossipMessages>, rx: Receiver<GossipMessages>, tx: Sender<GossipMessages>) -> io::Result<()> {
let mut peers: HashMap<String, Peer> = HashMap::new();
let mut buf = [0; 1024]; let mut buf = [0; 1024];
let socket = UdpSocket::bind("0.0.0.0:1337")?; let socket = UdpSocket::bind("0.0.0.0:1337")?;
let _ = tx.send("rdy".to_owned()); let r_socket = socket.try_clone().unwrap();
let _ = tx.send(GossipMessages::Ready);
// Handle messages that are passed to us from the main thread. // Listen on our UDP socket and pass messages back up.
for message in rx.try_iter() { thread::spawn(move || loop {
println!("Received: {}", message);
}
// Read from the UDP socket.
loop {
let (size, source) = socket.recv_from(&mut buf).expect("Failed to receive data"); let (size, source) = socket.recv_from(&mut buf).expect("Failed to receive data");
let request = String::from_utf8_lossy(&buf[..size]); let mut request = String::from_utf8_lossy(&buf[..size]).into_owned();
let _ = tx.send(request.to_string()).unwrap(); trim_newline(&mut request);
let response = "ack!"; let msg = GossipMessages::from_str(request).unwrap();
let _ = own_sender.send(msg).unwrap();
let response = "ack";
socket socket
.send_to(response.as_bytes(), source) .send_to(response.as_bytes(), source)
.expect("Failed to send response"); .expect("Failed to send response");
});
// Handle messages that are passed to us from the main thread or the UDP socket.
loop {
let message = rx.recv().unwrap();
println!("got: {}", &message);
match message {
GossipMessages::Cache => {
let selected_peers: Vec<String> = match select_peers(&peers) {
Some(p) => { println!("found peers"); p },
None => { println!("no peers, not gossiping"); vec![] },
};
for peer in selected_peers {
let msg = GossipMessages::Cache.to_string().into_bytes();
let result = match r_socket.send_to(&msg, &peer) {
Ok(_) => "",
Err(_) => "",
};
let p = peers.get_mut(&peer).unwrap();
p.counter = p.counter + 1;
}
},
GossipMessages::Ack => {
println!("healthy ack");
}
_ => {}
}
} }
} }
fn trim_newline(s: &mut String) {
if s.ends_with('\n') {
s.pop();
if s.ends_with('\r') {
s.pop();
}
}
}
/// select a third of the peers we know about to gossip to.
fn select_peers(peers: &HashMap<String, Peer>) -> Option<Vec<String>> {
if peers.len() == 0 {
return None;
}
let healthy: Vec<String> = peers.into_iter()
.filter(|(&ref k, &ref peer)| matches!(peer.health, PeerStatus::Healthy))
.map(|(&ref ip, &ref peer)| ip.to_owned()).collect();
let len = healthy.len() as usize / (3 as usize);
let rng = &mut rand::thread_rng();
// Select a random number of peers to gossip to.
Some(healthy.choose_multiple(rng, len).map(|f| f.to_owned()).collect())
}
async fn raw_blog_post(Path(post): Path<String>) -> Result<impl IntoResponse, StatusCode> { async fn raw_blog_post(Path(post): Path<String>) -> Result<impl IntoResponse, StatusCode> {
let post = posts::blog_post(post); let post = posts::blog_post(post);
if post.is_err() { if post.is_err() {
@ -320,6 +420,11 @@ async fn cached_page<T>(
let content = String::from_utf8(res).unwrap(); let content = String::from_utf8(res).unwrap();
state.cache.set(path, contenttype.to_owned(), content).await; state.cache.set(path, contenttype.to_owned(), content).await;
match state.to_gossip.send(GossipMessages::Cache) {
Ok(_) => { },
Err(_) => { },
};
return Response::builder() return Response::builder()
.header("content-type", contenttype) .header("content-type", contenttype)
.header("cache", "miss") .header("cache", "miss")