Implement basic gossip protocol
Some checks failed
Fly Deploy / Deploy app (push) Failing after 3m6s

Disabled unless peers are provided, communicates with other
instances (up to 3, and then a random selection of a third of the
available hosts). Still to do:

 - [ ] DNS based peer discovery
 - [ ] Probing of instances / healthchecks
 - [ ] Test propogation of membership changes/additions
This commit is contained in:
Gabriel Simmer 2023-10-07 17:15:15 +01:00
parent 7e2015c124
commit 8a65ef8cb4
Signed by: arch
SSH key fingerprint: SHA256:m3OEcdtrnBpMX+2BDGh/byv3hrCekCLzDYMdvGEKPPQ
12 changed files with 552 additions and 421 deletions

158
Cargo.lock generated
View file

@ -69,6 +69,15 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "aho-corasick"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "alloc-no-stdlib" name = "alloc-no-stdlib"
version = "2.0.4" version = "2.0.4"
@ -157,6 +166,12 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "anyhow"
version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.5.2" version = "0.5.2"
@ -470,7 +485,7 @@ dependencies = [
"clap 2.34.0", "clap 2.34.0",
"entities", "entities",
"lazy_static 0.2.11", "lazy_static 0.2.11",
"regex", "regex 0.2.11",
"typed-arena 1.7.0", "typed-arena 1.7.0",
"unicode_categories", "unicode_categories",
] ]
@ -855,6 +870,12 @@ version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd991bcfc01ee8f9ed83108da842aeddfca8a9550962cbffc9579050109c2aa9" checksum = "fd991bcfc01ee8f9ed83108da842aeddfca8a9550962cbffc9579050109c2aa9"
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.0.26" version = "1.0.26"
@ -1382,9 +1403,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
version = "0.4.5" version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" checksum = "3852614a3bd9ca9804678ba6be5e3b8ce76dfc902cae004e3e0c44051b6e88db"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
@ -1519,6 +1540,12 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]] [[package]]
name = "native-tls" name = "native-tls"
version = "0.2.11" version = "0.2.11"
@ -1796,6 +1823,16 @@ dependencies = [
"sha2", "sha2",
] ]
[[package]]
name = "petgraph"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9"
dependencies = [
"fixedbitset",
"indexmap 2.0.0",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.1.3" version = "1.1.3"
@ -1873,6 +1910,16 @@ dependencies = [
"unicode-segmentation", "unicode-segmentation",
] ]
[[package]]
name = "prettyplease"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c64d9ba0963cdcea2e1b2230fbae2bab30eb25a174be395c41e764bfb65dd62"
dependencies = [
"proc-macro2",
"syn 2.0.28",
]
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.4" version = "1.0.4"
@ -1906,6 +1953,60 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "prost"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac"
dependencies = [
"bytes",
"heck",
"itertools",
"log",
"multimap",
"once_cell",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex 1.9.4",
"syn 2.0.28",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 2.0.28",
]
[[package]]
name = "prost-types"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf"
dependencies = [
"prost",
]
[[package]] [[package]]
name = "quick-start" name = "quick-start"
version = "0.1.0" version = "0.1.0"
@ -1923,6 +2024,8 @@ dependencies = [
"lazy_static 1.4.0", "lazy_static 1.4.0",
"maud", "maud",
"orgize", "orgize",
"prost",
"prost-build",
"rand", "rand",
"rss", "rss",
"serde", "serde",
@ -1999,13 +2102,36 @@ version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384" checksum = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick 0.6.10",
"memchr", "memchr",
"regex-syntax", "regex-syntax 0.5.6",
"thread_local", "thread_local",
"utf8-ranges", "utf8-ranges",
] ]
[[package]]
name = "regex"
version = "1.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29"
dependencies = [
"aho-corasick 1.1.1",
"memchr",
"regex-automata",
"regex-syntax 0.7.5",
]
[[package]]
name = "regex-automata"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629"
dependencies = [
"aho-corasick 1.1.1",
"memchr",
"regex-syntax 0.7.5",
]
[[package]] [[package]]
name = "regex-syntax" name = "regex-syntax"
version = "0.5.6" version = "0.5.6"
@ -2015,6 +2141,12 @@ dependencies = [
"ucd-util", "ucd-util",
] ]
[[package]]
name = "regex-syntax"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da"
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.11.18" version = "0.11.18"
@ -2109,9 +2241,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.38.8" version = "0.38.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662"
dependencies = [ dependencies = [
"bitflags 2.4.0", "bitflags 2.4.0",
"errno", "errno",
@ -3135,6 +3267,18 @@ dependencies = [
"rustls-webpki", "rustls-webpki",
] ]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix",
]
[[package]] [[package]]
name = "whoami" name = "whoami"
version = "1.4.1" version = "1.4.1"

View file

@ -29,3 +29,7 @@ time = { version = "0.3.28", features = ["parsing", "formatting", "macros"] }
async-trait = "0.1.73" async-trait = "0.1.73"
crossbeam = "0.8.2" crossbeam = "0.8.2"
rand = "0.8.5" rand = "0.8.5"
prost = "0.12"
[build-dependencies]
prost-build = "0.12"

6
build.rs Normal file
View file

@ -0,0 +1,6 @@
use std::io::Result;
fn main() -> Result<()> {
prost_build::compile_protos(&["protobuf/items.proto"], &["protobuf/"])?;
Ok(())
}

View file

@ -26,11 +26,11 @@
"rust-overlay": "rust-overlay" "rust-overlay": "rust-overlay"
}, },
"locked": { "locked": {
"lastModified": 1696124486, "lastModified": 1696266955,
"narHash": "sha256-a/xfX9CZnNQoW38w7+fpxBJ8aFQVQQg+Va3OcHBBCZs=", "narHash": "sha256-GhaBeBWwejBTzBQl803x7iUXQ6GGUZgBxz+qyk1E3v4=",
"owner": "ipetkov", "owner": "ipetkov",
"repo": "crane", "repo": "crane",
"rev": "af842d7319f8c3d9e5215a79306824897fcb667c", "rev": "581245bf1233d6f621ce3b6cb99224a948c3a37f",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -47,11 +47,11 @@
"rust-analyzer-src": [] "rust-analyzer-src": []
}, },
"locked": { "locked": {
"lastModified": 1696141234, "lastModified": 1696314121,
"narHash": "sha256-0dZpggYjjmWEk+rGixiBHOHuQfLzEzNfrtjSig04s6Q=", "narHash": "sha256-Dd0xm92D6cQ0c46uTMzBy7Zq9tyHScArpJtkzT87L4E=",
"owner": "nix-community", "owner": "nix-community",
"repo": "fenix", "repo": "fenix",
"rev": "9ccae1754eec0341b640d5705302ac0923d22875", "rev": "c543df32930d075f807fc0b00c3101bc6a3b163d",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -114,11 +114,11 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1696009558, "lastModified": 1696261572,
"narHash": "sha256-/1nNL8lCF0gn38XaFyu2ufpWcBFwCDZyYUxdZkM6GxU=", "narHash": "sha256-s8TtSYJ1LBpuITXjbPLUPyxzAKw35LhETcajJjCS5f0=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "c182df2e68bd97deb32c7e4765adfbbbcaf75b60", "rev": "0c7ffbc66e6d78c50c38e717ec91a2a14e0622fb",
"type": "github" "type": "github"
}, },
"original": { "original": {

View file

@ -33,8 +33,16 @@
inherit (pkgs) lib; inherit (pkgs) lib;
craneLib = crane.lib.${system}; craneLib = crane.lib.${system};
src = craneLib.cleanCargoSource (craneLib.path ./.); src = lib.cleanSourceWith {
src = craneLib.path ./.; # The original, unfiltered source
filter = path: type:
let
protoFilter = path: _type: builtins.match ".*proto$" path != null;
protoOrCargo = path: type:
(protoFilter path type) || (craneLib.filterCargoSources path type);
in
protoOrCargo path type;
};
# Common arguments can be set here to avoid repeating them later # Common arguments can be set here to avoid repeating them later
commonArgs = { commonArgs = {
inherit src; inherit src;
@ -42,6 +50,7 @@
pkgs.sqlite pkgs.sqlite
pkgs.pkg-config pkgs.pkg-config
pkgs.openssl pkgs.openssl
pkgs.protobuf
] ++ lib.optionals pkgs.stdenv.isDarwin [ ] ++ lib.optionals pkgs.stdenv.isDarwin [
# Additional darwin specific inputs can be set here # Additional darwin specific inputs can be set here
pkgs.libiconv pkgs.libiconv
@ -61,6 +70,7 @@
# Build the actual crate itself, reusing the dependency # Build the actual crate itself, reusing the dependency
# artifacts from above. # artifacts from above.
my-crate = craneLib.buildPackage (commonArgs // { my-crate = craneLib.buildPackage (commonArgs // {
inherit cargoArtifacts; inherit cargoArtifacts;
}); });

0
protobuf/.keep Normal file
View file

51
protobuf/items.proto Normal file
View file

@ -0,0 +1,51 @@
syntax = "proto3";
package gabrielsimmerdotcom.gossip;
message Ping {}
message PingRequest {
string peer = 1;
}
enum Status {
PEER_UNKNOWN = 0;
PEER_ALIVE = 1;
PEER_SUSPECT = 2;
PEER_CONFIRM = 3;
}
// { "127.0.0.1:1337", confirm }
message Membership {
string peer = 1;
Status status = 2;
int64 inc = 3;
}
message NewCache {
string key = 1;
string content = 2;
string content_type = 3;
}
message Cache {
string key = 1;
string content = 2;
string content_type = 3;
int64 timestamp = 4;
}
message Payload {
string peer = 1;
oneof msg {
Cache cache = 2;
Ping ping = 3;
Ack ack = 4;
Membership membership = 5;
NewCache new_cache = 6;
}
}
message Ack {
}

46
src/cache/memory.rs vendored
View file

@ -1,46 +0,0 @@
use std::collections::HashMap;
use async_trait::async_trait;
use tokio::sync::Mutex;
use super::{should_use, CacheMechanism, CachedItem, Tier};
#[derive(Clone, Debug)]
pub struct Memory {}
lazy_static! {
static ref CACHE: Mutex<HashMap<String, CachedItem>> = Mutex::new(HashMap::new());
}
pub fn new() -> Memory {
return Memory {};
}
#[async_trait]
impl CacheMechanism for Memory {
async fn get(&self, key: &String) -> Option<CachedItem> {
let data = CACHE.lock().await;
match data.get(key) {
Some(c) => {
if should_use(&c, Tier::Memory) {
let mut r = c.clone();
r.tier = Some(Tier::Memory);
Some(r)
} else {
None
}
}
None => None,
}
}
async fn rm(&mut self, key: String) {
let mut data = CACHE.lock().await;
data.remove(&key);
}
async fn set(&self, key: String, item: CachedItem) {
let mut data = CACHE.lock().await;
data.insert(key, item);
}
}

147
src/cache/mod.rs vendored
View file

@ -1,6 +1,3 @@
mod memory;
mod sqlite;
use async_trait::async_trait; use async_trait::async_trait;
use sqlx::FromRow; use sqlx::FromRow;
use std::{ use std::{
@ -8,122 +5,18 @@ use std::{
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
}; };
use self::{memory::Memory, sqlite::Sqlite}; use std::collections::HashMap;
use tokio::sync::Mutex;
#[derive(Clone, Debug)]
pub struct Cache {
memory: Memory,
sqlite: Option<Sqlite>,
}
pub async fn init_cache() -> Cache {
Cache {
memory: memory::new(),
sqlite: sqlite::new().await,
}
}
/// Tier enums take an i64, which is the amount of time in seconds
/// the tier should consider the contents of the cache valid.
#[derive(Clone, Debug, sqlx::Type)]
pub enum Tier {
Memory,
Sqlite,
External,
None,
}
impl fmt::Display for Tier {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Tier::Memory => write!(f, "memory"),
Tier::Sqlite => write!(f, "sqlite"),
Tier::External => write!(f, "external"),
Tier::None => write!(f, ""),
}
}
}
#[derive(Clone, Debug, FromRow)] #[derive(Clone, Debug, FromRow)]
pub struct CachedItem { pub struct CachedItem {
pub content_type: String, pub content_type: String,
pub content: String, pub content: String,
cached: i64, pub cached: i64,
#[sqlx(default)]
tier: Option<Tier>,
}
impl CachedItem {
pub fn tier(&self) -> Tier {
self.tier.clone().unwrap_or(Tier::None)
}
}
#[async_trait]
pub trait CacheMechanism: Sized + Clone + Send + Sync + 'static {
async fn get(&self, key: &String) -> Option<CachedItem>;
async fn rm(&mut self, key: String);
async fn set(&self, key: String, item: CachedItem);
}
impl Cache {
pub async fn get(&self, key: &String) -> Option<CachedItem> {
let m = self.memory.get(key).await;
if m.is_some() {
return m;
}
if self.sqlite.is_some() {
let sq = self.sqlite.clone().unwrap();
let s = sq.get(key).await;
if s.is_some() {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_secs()
.try_into()
.unwrap();
let mut refresh_memory = s.clone().unwrap();
refresh_memory.cached = current_time;
let _ = self.memory.set(key.clone(), refresh_memory).await;
return s;
}
}
return None;
}
pub async fn set(&self, key: String, content_type: String, content: String) -> bool {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_secs()
.try_into()
.unwrap();
let cached_item = CachedItem {
content_type,
content,
cached: current_time,
tier: None,
};
self.memory.set(key.clone(), cached_item.clone()).await;
if self.sqlite.is_some() {
let sq = self.sqlite.clone().unwrap();
sq.set(key.clone(), cached_item.clone()).await;
}
true
}
} }
/// Determine whether we should actually use the cached item or not. /// Determine whether we should actually use the cached item or not.
fn should_use(item: &CachedItem, tier: Tier) -> bool { fn should_use(item: &CachedItem) -> bool {
// TODO: Make configurable.
let cache_time = match tier {
Tier::Memory => 2 * 60,
Tier::Sqlite => 10 * 60,
Tier::External => 0,
Tier::None => 0,
};
let current_time: i64 = SystemTime::now() let current_time: i64 = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!") .expect("SystemTime before UNIX EPOCH!")
@ -131,5 +24,35 @@ fn should_use(item: &CachedItem, tier: Tier) -> bool {
.try_into() .try_into()
.unwrap(); .unwrap();
current_time <= (item.cached + cache_time) && item.content != "" current_time <= (item.cached + (2*60)) && item.content != ""
}
lazy_static! {
static ref CACHE: Mutex<HashMap<String, CachedItem>> = Mutex::new(HashMap::new());
}
pub async fn get(key: &String) -> Option<CachedItem> {
let data = CACHE.lock().await;
dbg!(&key);
match data.get(key) {
Some(c) => {
if should_use(&c) {
Some(c.clone())
} else {
//let _rm = rm(key.to_string()).await;
None
}
}
None => None,
}
}
async fn rm(key: String) {
let mut data = CACHE.lock().await;
data.remove(&key);
}
pub async fn set(key: String, item: CachedItem) {
let mut data = CACHE.lock().await;
data.insert(key, item);
} }

60
src/cache/sqlite.rs vendored
View file

@ -1,60 +0,0 @@
use std::{env, str::FromStr};
use async_trait::async_trait;
use sqlx::{
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
Pool,
};
use super::{should_use, CacheMechanism, CachedItem, Tier};
#[derive(Clone, Debug)]
pub struct Sqlite {
pool: Pool<sqlx::Sqlite>,
}
pub async fn new() -> Option<Sqlite> {
let path = env::var("DATABASE_PATH").unwrap_or("gs.db".to_owned());
let opts = SqliteConnectOptions::from_str(&path)
.unwrap()
.journal_mode(SqliteJournalMode::Wal)
.create_if_missing(true);
let pool = SqlitePoolOptions::new().connect_with(opts).await.unwrap();
return Some(Sqlite { pool });
}
#[async_trait]
impl CacheMechanism for Sqlite {
async fn get(&self, key: &String) -> Option<CachedItem> {
let res = sqlx::query_as::<_, CachedItem>("SELECT * FROM cached WHERE route = $1")
.bind(&key)
.fetch_one(&self.pool)
.await;
if res.is_ok() {
let c = res.unwrap();
if should_use(&c, Tier::Sqlite) {
let mut r = c.clone();
r.tier = Some(Tier::Sqlite);
return Some(r);
}
}
None
}
async fn rm(&mut self, key: String) {
todo!()
}
async fn set(&self, key: String, item: CachedItem) {
let cache_sqlite = sqlx::query(
"INSERT OR REPLACE INTO cached (route, cached, content_type, content) VALUES ( $1, $2, $3, $4 )",
)
.bind(key)
.bind(item.cached)
.bind(item.content_type)
.bind(item.content)
.execute(&self.pool)
.await;
}
}

View file

@ -1,11 +1,15 @@
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;
// Include the `items` module, which is generated from items.proto.
pub mod items {
include!(concat!(env!("OUT_DIR"), "/gabrielsimmerdotcom.gossip.rs"));
}
mod cache; mod cache;
mod posts; mod posts;
use axum::extract::Path; use axum::extract::Path;
use axum::http::request;
use axum::response::IntoResponse; use axum::response::IntoResponse;
use axum::{ use axum::{
body::Full, body::Full,
@ -16,60 +20,29 @@ use axum::{
routing::get, routing::get,
Router, Router,
}; };
use cache::CachedItem;
use clap::Parser; use clap::Parser;
use crossbeam::channel::{unbounded, Receiver, Sender}; use crossbeam::channel::{unbounded, Receiver, Sender};
use file_format::{FileFormat, Kind}; use file_format::{FileFormat, Kind};
use hyper::body::Bytes; use hyper::body::Bytes;
use maud::{html, Markup, PreEscaped, Render, DOCTYPE}; use maud::{html, Markup, PreEscaped, Render, DOCTYPE};
use prost::Message;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rss::ChannelBuilder; use rss::ChannelBuilder;
use serde::Deserialize; use serde::Deserialize;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Display;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::str::FromStr; use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{env, io, thread}; use std::{env, io, thread};
use time::{self, format_description, format_description::well_known::Rfc2822}; use time::{self, format_description, format_description::well_known::Rfc2822};
use tower_http::services::ServeDir; use tower_http::services::ServeDir;
use crate::cache::{init_cache, Cache}; #[derive(Debug)]
enum PeerStatus {
Healthy,
Suspect,
Down
}
enum GossipMessages {
Ready,
Cache,
Ack,
}
impl GossipMessages {
fn from_str(request: String) -> Option<Self> {
match request.as_str() {
"rdy" => Some(Self::Ready),
"cache" => Some(Self::Cache),
"ack" => Some(Self::Ack),
_ => None,
}
}
}
impl Display for GossipMessages {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GossipMessages::Ready => write!(f, "rdy"),
GossipMessages::Cache => write!(f, "cache"),
GossipMessages::Ack => write!(f, "ack"),
}
}
}
struct Peer { struct Peer {
counter: i64, counter: i64,
health: PeerStatus health: items::Status,
} }
#[derive(Parser)] #[derive(Parser)]
@ -79,13 +52,14 @@ struct Cli {
#[arg(short, long, default_value_t=("0.0.0.0:3000").to_string())] #[arg(short, long, default_value_t=("0.0.0.0:3000").to_string())]
bind: String, bind: String,
#[arg(short, long)] #[arg(short, long)]
peers: Option<String> peers: Option<String>,
#[arg(long, default_value_t=("0.0.0.0:1337").to_string())]
gossip_bind: String,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct AppState { struct AppState {
cache: Cache, to_gossip: Sender<items::Payload>,
to_gossip: Sender<GossipMessages>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -126,22 +100,11 @@ impl Render for Project {
async fn main() -> Result<(), sqlx::Error> { async fn main() -> Result<(), sqlx::Error> {
let args = Cli::parse(); let args = Cli::parse();
let opts = SqliteConnectOptions::from_str(&args.database_path)?
.journal_mode(SqliteJournalMode::Wal)
.create_if_missing(true);
let pool = SqlitePoolOptions::new().connect_with(opts).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
env::set_var("DATABASE_PATH", &args.database_path);
// Create channels for sending messages and receiving results // Create channels for sending messages and receiving results
let (s_gossip, r_gossip) = unbounded::<GossipMessages>(); let (s_gossip, r_gossip) = unbounded::<items::Payload>();
let (s_main, r_main) = unbounded::<GossipMessages>(); let (s_main, r_main) = unbounded::<items::Payload>();
let state = AppState { let state = AppState {
cache: init_cache().await, to_gossip: s_main.clone(),
to_gossip: s_main.clone()
}; };
let app = Router::new() let app = Router::new()
@ -156,21 +119,37 @@ async fn main() -> Result<(), sqlx::Error> {
.with_state(state); .with_state(state);
println!("Running webserver on {}", args.bind); println!("Running webserver on {}", args.bind);
let webserver = axum::Server::bind(&args.bind.parse().unwrap()) let webserver = axum::Server::bind(&args.bind.parse().unwrap()).serve(app.into_make_service());
.serve(app.into_make_service());
if args.peers.is_some_and(|f| f != "".to_owned()) { if args.peers.is_some() {
tokio::spawn(webserver); tokio::spawn(webserver);
println!("starting gossip worker"); println!("starting gossip worker");
// Spawn a worker thread // Spawn a worker thread
let gossip_server = thread::scope(|scope| { let _gossip_server = thread::spawn(move || {
let _ = gossiper(s_main.clone(), r_main, s_gossip); let binding = args.peers.unwrap();
let peer_list: Vec<&str> = binding.split(",").collect();
let _ = gossiper(
&args.gossip_bind,
peer_list,
s_main.clone(),
r_main,
s_gossip,
);
}); });
let _ = s_main.send(GossipMessages::Ready);
loop { loop {
for msg in r_gossip.try_iter() { let message = r_gossip.recv().unwrap();
println!("gossip thrd: {}", msg); match &message.msg {
Some(items::payload::Msg::Cache(cache)) => {
cache::set(
cache.key.clone(),
CachedItem{
content_type: cache.content_type.clone(),
content: cache.content.clone(),
cached: cache.timestamp,
}
).await;
}
_ => {}
} }
} }
} else { } else {
@ -180,60 +159,155 @@ async fn main() -> Result<(), sqlx::Error> {
Ok(()) Ok(())
} }
fn gossiper(own_sender: Sender<GossipMessages>, rx: Receiver<GossipMessages>, tx: Sender<GossipMessages>) -> io::Result<()> { fn gossiper(
bind: &String,
peer_list: Vec<&str>,
own_sender: Sender<items::Payload>,
rx: Receiver<items::Payload>,
tx: Sender<items::Payload>,
) -> io::Result<()> {
let mut peers: HashMap<String, Peer> = HashMap::new(); let mut peers: HashMap<String, Peer> = HashMap::new();
let mut buf = [0; 1024]; for peer in peer_list {
let socket = UdpSocket::bind("0.0.0.0:1337")?; peers.insert(
peer.to_owned(),
Peer {
counter: 0,
health: items::Status::PeerAlive,
},
);
}
let mut buf = [0; 10240];
let socket = UdpSocket::bind(bind.as_str())?;
let r_socket = socket.try_clone().unwrap(); let r_socket = socket.try_clone().unwrap();
let _ = tx.send(GossipMessages::Ready); // When we come up, try to communicate we're alive to other peers.
let selected_peers: Vec<String> = match select_peers(&peers) {
// Listen on our UDP socket and pass messages back up. Some(p) => {
println!("found peers");
p
}
None => {
println!("no peers, not gossiping");
vec![]
}
};
for peer in selected_peers {
let membership = items::Payload {
msg: Some(items::payload::Msg::Membership(items::Membership {
peer: bind.to_string(),
status: items::Status::PeerAlive as i32,
inc: 0,
})),
peer: bind.to_string(),
};
let response = Message::encode_to_vec(&membership);
let _result = match r_socket.send_to(&response, &peer) {
Ok(_) => "",
Err(_) => "",
};
let existing = peers.get(&peer).unwrap();
peers.insert(
peer,
Peer {
counter: existing.counter + 1,
health: items::Status::PeerAlive,
},
);
}
let bound = bind.clone();
thread::spawn(move || loop { thread::spawn(move || loop {
let (size, source) = socket.recv_from(&mut buf).expect("Failed to receive data"); let (size, source) = socket.recv_from(&mut buf).expect("Failed to receive data");
let mut request = String::from_utf8_lossy(&buf[..size]).into_owned(); let msg: Result<items::Payload, _> = Message::decode(&buf[..size]);
trim_newline(&mut request);
let msg = GossipMessages::from_str(request).unwrap(); let _ = own_sender.send(msg.clone().unwrap()).unwrap();
let _ = own_sender.send(msg).unwrap(); if msg.unwrap().msg != Some(items::payload::Msg::Ack(items::Ack {})) {
let response = "ack"; let ack = items::Payload {
peer: bound.to_string(),
msg: Some(items::payload::Msg::Ack(items::Ack {})),
};
let res = Message::encode_to_vec(&ack);
socket socket
.send_to(response.as_bytes(), source) .send_to(&res, source)
.expect("Failed to send response"); .expect("Failed to send response");
}
}); });
// Handle messages that are passed to us from the main thread or the UDP socket. // Handle messages that are passed to us from the main thread or the UDP socket.
loop { loop {
let message = rx.recv().unwrap(); let message = rx.recv().unwrap();
println!("got: {}", &message); match &message.msg {
match message { Some(items::payload::Msg::Cache(cache)) => {
GossipMessages::Cache => { println!("got new cache: {}", cache.content_type);
let selected_peers: Vec<String> = match select_peers(&peers) { let _ = tx.send(message.clone()).unwrap();
Some(p) => { println!("found peers"); p }, }
None => { println!("no peers, not gossiping"); vec![] }, Some(items::payload::Msg::Ack(_)) => {
println!("healthy ack");
let peer = message.peer;
let existing = peers.get(&peer).unwrap();
peers.insert(
peer,
Peer {
counter: existing.counter + 1,
health: items::Status::PeerAlive,
},
);
}
Some(items::payload::Msg::Ping(_)) => todo!(),
Some(items::payload::Msg::Membership(membership)) => {
println!("membership update");
let peer = message.peer;
let existing = peers.get(&peer).unwrap();
peers.insert(
peer,
Peer {
counter: existing.counter + 1,
health: membership.status(),
},
);
}
Some(items::payload::Msg::NewCache(new_cache)) => {
println!("instructed to send cache");
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_secs()
.try_into()
.unwrap();
let cache = items::Payload {
peer: bind.to_owned(),
msg: Some(items::payload::Msg::Cache(items::Cache {
key: new_cache.key.clone(),
content: new_cache.content.clone(),
content_type: new_cache.content_type.clone(),
timestamp: current_time,
})),
}; };
let selected_peers: Vec<String> = match select_peers(&peers) {
Some(p) => {
println!("found peers");
p
}
None => {
println!("no peers, not gossiping");
vec![]
}
};
let payload = Message::encode_to_vec(&cache);
for peer in selected_peers { for peer in selected_peers {
let msg = GossipMessages::Cache.to_string().into_bytes(); dbg!(&peer);
let result = match r_socket.send_to(&msg, &peer) { let _result = match r_socket.send_to(&payload, &peer) {
Ok(_) => "", Ok(_r) => {
Err(_) => "", // dbg!(r);
}
Err(e) => {
dbg!(e);
}
}; };
let p = peers.get_mut(&peer).unwrap(); let p = peers.get_mut(&peer).unwrap();
p.counter = p.counter + 1; p.counter = p.counter + 1;
} }
},
GossipMessages::Ack => {
println!("healthy ack");
} }
_ => {} None => {}
}
}
}
fn trim_newline(s: &mut String) {
if s.ends_with('\n') {
s.pop();
if s.ends_with('\r') {
s.pop();
} }
} }
} }
@ -243,13 +317,25 @@ fn select_peers(peers: &HashMap<String, Peer>) -> Option<Vec<String>> {
if peers.len() == 0 { if peers.len() == 0 {
return None; return None;
} }
let healthy: Vec<String> = peers.into_iter() let healthy: Vec<String> = peers
.filter(|(&ref k, &ref peer)| matches!(peer.health, PeerStatus::Healthy)) .into_iter()
.map(|(&ref ip, &ref peer)| ip.to_owned()).collect(); .filter(|(&ref k, &ref peer)| matches!(peer.health, items::Status::PeerAlive))
let len = healthy.len() as usize / (3 as usize); .map(|(&ref ip, &ref peer)| ip.to_owned())
.collect();
let mut len_rounded = healthy.len();
let rng = &mut rand::thread_rng(); let rng = &mut rand::thread_rng();
if len_rounded > 3 {
let len: f64 = (healthy.len() / 3) as f64;
len_rounded = len.ceil() as usize;
}
// Select a random number of peers to gossip to. // Select a random number of peers to gossip to.
Some(healthy.choose_multiple(rng, len).map(|f| f.to_owned()).collect()) Some(
healthy
.choose_multiple(rng, len_rounded)
.map(|f| f.to_owned())
.collect(),
)
} }
async fn raw_blog_post(Path(post): Path<String>) -> Result<impl IntoResponse, StatusCode> { async fn raw_blog_post(Path(post): Path<String>) -> Result<impl IntoResponse, StatusCode> {
@ -379,7 +465,7 @@ async fn cached_page<T>(
) -> Response<Full<Bytes>> { ) -> Response<Full<Bytes>> {
let path = request.uri().path().to_string(); let path = request.uri().path().to_string();
let item = state.cache.get(&path).await; let item = cache::get(&path).await;
if item.is_none() { if item.is_none() {
let res = next.run(request).await; let res = next.run(request).await;
let (res_parts, res_body) = res.into_parts(); let (res_parts, res_body) = res.into_parts();
@ -418,11 +504,30 @@ async fn cached_page<T>(
.unwrap(); .unwrap();
} }
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_secs()
.try_into()
.unwrap();
let content = String::from_utf8(res).unwrap(); let content = String::from_utf8(res).unwrap();
state.cache.set(path, contenttype.to_owned(), content).await; cache::set(path.clone(), CachedItem{
match state.to_gossip.send(GossipMessages::Cache) { content_type: contenttype.to_owned(),
Ok(_) => { }, content: content.clone(),
Err(_) => { }, cached: current_time,
})
.await;
let gossip = items::Payload {
peer: "".to_owned(),
msg: Some(items::payload::Msg::NewCache(items::NewCache {
key: path,
content,
content_type: contenttype.to_owned(),
})),
};
match state.to_gossip.send(gossip) {
Ok(_) => {}
Err(_) => {}
}; };
return Response::builder() return Response::builder()
@ -435,7 +540,7 @@ async fn cached_page<T>(
let i = item.unwrap(); let i = item.unwrap();
return Response::builder() return Response::builder()
.header("content-type", &i.content_type) .header("content-type", &i.content_type)
.header("cache", format!("hit-{}", &i.tier())) .header("cache", format!("hit"))
.status(StatusCode::OK) .status(StatusCode::OK)
.body(Full::from(i.content)) .body(Full::from(i.content))
.unwrap(); .unwrap();

View file

@ -1,6 +0,0 @@
syntax = "proto3";
package gabrielsimmerdotcom.gossip;
message GossipMessage {
}