From 8a65ef8cb481903617393765cc8d1914a7cd0bc2 Mon Sep 17 00:00:00 2001 From: Gabriel Simmer Date: Sat, 7 Oct 2023 17:15:15 +0100 Subject: [PATCH] Implement basic gossip protocol Disabled unless peers are provided, communicates with other instances (up to 3, and then a random selection of a third of the available hosts). Still to do: - [ ] DNS based peer discovery - [ ] Probing of instances / healthchecks - [ ] Test propogation of membership changes/additions --- Cargo.lock | 158 ++++++++++++++++++- Cargo.toml | 6 +- build.rs | 6 + flake.lock | 18 +-- flake.nix | 144 +++++++++-------- protobuf/.keep | 0 protobuf/items.proto | 51 ++++++ src/cache/memory.rs | 46 ------ src/cache/mod.rs | 147 +++++------------ src/cache/sqlite.rs | 60 ------- src/main.rs | 331 ++++++++++++++++++++++++++------------- src/protobuf/items.proto | 6 - 12 files changed, 552 insertions(+), 421 deletions(-) create mode 100644 build.rs create mode 100644 protobuf/.keep create mode 100644 protobuf/items.proto delete mode 100644 src/cache/memory.rs delete mode 100644 src/cache/sqlite.rs delete mode 100644 src/protobuf/items.proto diff --git a/Cargo.lock b/Cargo.lock index 2d84bc4..7f41e16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,6 +69,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "aho-corasick" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" +dependencies = [ + "memchr", +] + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -157,6 +166,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "anyhow" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" + [[package]] name = "arrayvec" version = "0.5.2" @@ -470,7 +485,7 @@ dependencies = [ "clap 2.34.0", "entities", "lazy_static 0.2.11", - "regex", + "regex 0.2.11", "typed-arena 1.7.0", "unicode_categories", ] @@ -855,6 +870,12 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd991bcfc01ee8f9ed83108da842aeddfca8a9550962cbffc9579050109c2aa9" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.0.26" @@ -1382,9 +1403,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linux-raw-sys" -version = "0.4.5" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" +checksum = "3852614a3bd9ca9804678ba6be5e3b8ce76dfc902cae004e3e0c44051b6e88db" [[package]] name = "lock_api" @@ -1519,6 +1540,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "native-tls" version = "0.2.11" @@ -1796,6 +1823,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap 2.0.0", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -1873,6 +1910,16 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "prettyplease" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c64d9ba0963cdcea2e1b2230fbae2bab30eb25a174be395c41e764bfb65dd62" +dependencies = [ + "proc-macro2", + "syn 2.0.28", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1906,6 +1953,60 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" +dependencies = [ + "bytes", + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex 1.9.4", + "syn 2.0.28", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.28", +] + +[[package]] +name = "prost-types" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" +dependencies = [ + "prost", +] + [[package]] name = "quick-start" version = "0.1.0" @@ -1923,6 +2024,8 @@ dependencies = [ "lazy_static 1.4.0", "maud", "orgize", + "prost", + "prost-build", "rand", "rss", "serde", @@ -1999,13 +2102,36 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9329abc99e39129fcceabd24cf5d85b4671ef7c29c50e972bc5afe32438ec384" dependencies = [ - "aho-corasick", + "aho-corasick 0.6.10", "memchr", - "regex-syntax", + "regex-syntax 0.5.6", "thread_local", "utf8-ranges", ] +[[package]] +name = "regex" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" +dependencies = [ + "aho-corasick 1.1.1", + "memchr", + "regex-automata", + "regex-syntax 0.7.5", +] + +[[package]] +name = "regex-automata" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" +dependencies = [ + "aho-corasick 1.1.1", + "memchr", + "regex-syntax 0.7.5", +] + [[package]] name = "regex-syntax" version = "0.5.6" @@ -2015,6 +2141,12 @@ dependencies = [ "ucd-util", ] +[[package]] +name = "regex-syntax" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" + [[package]] name = "reqwest" version = "0.11.18" @@ -2109,9 +2241,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.8" +version = "0.38.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" +checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662" dependencies = [ "bitflags 2.4.0", "errno", @@ -3135,6 +3267,18 @@ dependencies = [ "rustls-webpki", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "whoami" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index f87af6e..2097297 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,4 +28,8 @@ rss = "2.0.6" time = { version = "0.3.28", features = ["parsing", "formatting", "macros"] } async-trait = "0.1.73" crossbeam = "0.8.2" -rand = "0.8.5" \ No newline at end of file +rand = "0.8.5" +prost = "0.12" + +[build-dependencies] +prost-build = "0.12" \ No newline at end of file diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..5b5fc2f --- /dev/null +++ b/build.rs @@ -0,0 +1,6 @@ +use std::io::Result; + +fn main() -> Result<()> { + prost_build::compile_protos(&["protobuf/items.proto"], &["protobuf/"])?; + Ok(()) +} diff --git a/flake.lock b/flake.lock index cb59d5a..2b0cac0 100644 --- a/flake.lock +++ b/flake.lock @@ -26,11 +26,11 @@ "rust-overlay": "rust-overlay" }, "locked": { - "lastModified": 1696124486, - "narHash": "sha256-a/xfX9CZnNQoW38w7+fpxBJ8aFQVQQg+Va3OcHBBCZs=", + "lastModified": 1696266955, + "narHash": "sha256-GhaBeBWwejBTzBQl803x7iUXQ6GGUZgBxz+qyk1E3v4=", "owner": "ipetkov", "repo": "crane", - "rev": "af842d7319f8c3d9e5215a79306824897fcb667c", + "rev": "581245bf1233d6f621ce3b6cb99224a948c3a37f", "type": "github" }, "original": { @@ -47,11 +47,11 @@ "rust-analyzer-src": [] }, "locked": { - "lastModified": 1696141234, - "narHash": "sha256-0dZpggYjjmWEk+rGixiBHOHuQfLzEzNfrtjSig04s6Q=", + "lastModified": 1696314121, + "narHash": "sha256-Dd0xm92D6cQ0c46uTMzBy7Zq9tyHScArpJtkzT87L4E=", "owner": "nix-community", "repo": "fenix", - "rev": "9ccae1754eec0341b640d5705302ac0923d22875", + "rev": "c543df32930d075f807fc0b00c3101bc6a3b163d", "type": "github" }, "original": { @@ -114,11 +114,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1696009558, - "narHash": "sha256-/1nNL8lCF0gn38XaFyu2ufpWcBFwCDZyYUxdZkM6GxU=", + "lastModified": 1696261572, + "narHash": "sha256-s8TtSYJ1LBpuITXjbPLUPyxzAKw35LhETcajJjCS5f0=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "c182df2e68bd97deb32c7e4765adfbbbcaf75b60", + "rev": "0c7ffbc66e6d78c50c38e717ec91a2a14e0622fb", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 4a364eb..4b265f1 100644 --- a/flake.nix +++ b/flake.nix @@ -33,8 +33,16 @@ inherit (pkgs) lib; craneLib = crane.lib.${system}; - src = craneLib.cleanCargoSource (craneLib.path ./.); - + src = lib.cleanSourceWith { + src = craneLib.path ./.; # The original, unfiltered source + filter = path: type: + let + protoFilter = path: _type: builtins.match ".*proto$" path != null; + protoOrCargo = path: type: + (protoFilter path type) || (craneLib.filterCargoSources path type); + in + protoOrCargo path type; + }; # Common arguments can be set here to avoid repeating them later commonArgs = { inherit src; @@ -42,6 +50,7 @@ pkgs.sqlite pkgs.pkg-config pkgs.openssl + pkgs.protobuf ] ++ lib.optionals pkgs.stdenv.isDarwin [ # Additional darwin specific inputs can be set here pkgs.libiconv @@ -61,83 +70,84 @@ # Build the actual crate itself, reusing the dependency # artifacts from above. + my-crate = craneLib.buildPackage (commonArgs // { inherit cargoArtifacts; }); in - { - checks = { - # Build the crate as part of `nix flake check` for convenience - inherit my-crate; + { + checks = { + # Build the crate as part of `nix flake check` for convenience + inherit my-crate; - # Run clippy (and deny all warnings) on the crate source, - # again, resuing the dependency artifacts from above. - # - # Note that this is done as a separate derivation so that - # we can block the CI if there are issues here, but not - # prevent downstream consumers from building our crate by itself. - my-crate-clippy = craneLib.cargoClippy (commonArgs // { - inherit cargoArtifacts; - cargoClippyExtraArgs = "--all-targets -- --deny warnings"; - }); + # Run clippy (and deny all warnings) on the crate source, + # again, resuing the dependency artifacts from above. + # + # Note that this is done as a separate derivation so that + # we can block the CI if there are issues here, but not + # prevent downstream consumers from building our crate by itself. + my-crate-clippy = craneLib.cargoClippy (commonArgs // { + inherit cargoArtifacts; + cargoClippyExtraArgs = "--all-targets -- --deny warnings"; + }); - my-crate-doc = craneLib.cargoDoc (commonArgs // { - inherit cargoArtifacts; - }); + my-crate-doc = craneLib.cargoDoc (commonArgs // { + inherit cargoArtifacts; + }); - # Check formatting - my-crate-fmt = craneLib.cargoFmt { - inherit src; + # Check formatting + my-crate-fmt = craneLib.cargoFmt { + inherit src; + }; + + # Audit dependencies + my-crate-audit = craneLib.cargoAudit { + inherit src advisory-db; + }; + + # Run tests with cargo-nextest + # Consider setting `doCheck = false` on `my-crate` if you do not want + # the tests to run twice + my-crate-nextest = craneLib.cargoNextest (commonArgs // { + inherit cargoArtifacts; + partitions = 1; + partitionType = "count"; + }); + } // lib.optionalAttrs (system == "x86_64-linux") { + # NB: cargo-tarpaulin only supports x86_64 systems + # Check code coverage (note: this will not upload coverage anywhere) + my-crate-coverage = craneLib.cargoTarpaulin (commonArgs // { + inherit cargoArtifacts; + }); }; - # Audit dependencies - my-crate-audit = craneLib.cargoAudit { - inherit src advisory-db; + packages = { + default = my-crate; + my-crate-llvm-coverage = craneLibLLvmTools.cargoLlvmCov (commonArgs // { + inherit cargoArtifacts; + }); }; - # Run tests with cargo-nextest - # Consider setting `doCheck = false` on `my-crate` if you do not want - # the tests to run twice - my-crate-nextest = craneLib.cargoNextest (commonArgs // { - inherit cargoArtifacts; - partitions = 1; - partitionType = "count"; - }); - } // lib.optionalAttrs (system == "x86_64-linux") { - # NB: cargo-tarpaulin only supports x86_64 systems - # Check code coverage (note: this will not upload coverage anywhere) - my-crate-coverage = craneLib.cargoTarpaulin (commonArgs // { - inherit cargoArtifacts; - }); - }; + apps.default = flake-utils.lib.mkApp { + drv = my-crate; + }; - packages = { - default = my-crate; - my-crate-llvm-coverage = craneLibLLvmTools.cargoLlvmCov (commonArgs // { - inherit cargoArtifacts; - }); - }; + devShells.default = pkgs.mkShell { + inputsFrom = builtins.attrValues self.checks.${system}; - apps.default = flake-utils.lib.mkApp { - drv = my-crate; - }; + # Additional dev-shell environment variables can be set directly + # MY_CUSTOM_DEVELOPMENT_VAR = "something else"; - devShells.default = pkgs.mkShell { - inputsFrom = builtins.attrValues self.checks.${system}; - - # Additional dev-shell environment variables can be set directly - # MY_CUSTOM_DEVELOPMENT_VAR = "something else"; - - # Extra inputs can be added here - nativeBuildInputs = with pkgs; [ - cargo - rustc - rust-analyzer - sqlite - sqlx-cli - flyctl - cargo-flamegraph - ]; - }; - }); + # Extra inputs can be added here + nativeBuildInputs = with pkgs; [ + cargo + rustc + rust-analyzer + sqlite + sqlx-cli + flyctl + cargo-flamegraph + ]; + }; + }); } diff --git a/protobuf/.keep b/protobuf/.keep new file mode 100644 index 0000000..e69de29 diff --git a/protobuf/items.proto b/protobuf/items.proto new file mode 100644 index 0000000..e250999 --- /dev/null +++ b/protobuf/items.proto @@ -0,0 +1,51 @@ +syntax = "proto3"; + +package gabrielsimmerdotcom.gossip; + +message Ping {} + +message PingRequest { + string peer = 1; +} + +enum Status { + PEER_UNKNOWN = 0; + PEER_ALIVE = 1; + PEER_SUSPECT = 2; + PEER_CONFIRM = 3; +} + +// { "127.0.0.1:1337", confirm } +message Membership { + string peer = 1; + Status status = 2; + int64 inc = 3; +} + +message NewCache { + string key = 1; + string content = 2; + string content_type = 3; +} + +message Cache { + string key = 1; + string content = 2; + string content_type = 3; + int64 timestamp = 4; +} + +message Payload { + string peer = 1; + oneof msg { + Cache cache = 2; + Ping ping = 3; + Ack ack = 4; + Membership membership = 5; + NewCache new_cache = 6; + } +} + +message Ack { + +} diff --git a/src/cache/memory.rs b/src/cache/memory.rs deleted file mode 100644 index acc5445..0000000 --- a/src/cache/memory.rs +++ /dev/null @@ -1,46 +0,0 @@ -use std::collections::HashMap; - -use async_trait::async_trait; -use tokio::sync::Mutex; - -use super::{should_use, CacheMechanism, CachedItem, Tier}; - -#[derive(Clone, Debug)] -pub struct Memory {} - -lazy_static! { - static ref CACHE: Mutex> = Mutex::new(HashMap::new()); -} - -pub fn new() -> Memory { - return Memory {}; -} - -#[async_trait] -impl CacheMechanism for Memory { - async fn get(&self, key: &String) -> Option { - let data = CACHE.lock().await; - match data.get(key) { - Some(c) => { - if should_use(&c, Tier::Memory) { - let mut r = c.clone(); - r.tier = Some(Tier::Memory); - Some(r) - } else { - None - } - } - None => None, - } - } - - async fn rm(&mut self, key: String) { - let mut data = CACHE.lock().await; - data.remove(&key); - } - - async fn set(&self, key: String, item: CachedItem) { - let mut data = CACHE.lock().await; - data.insert(key, item); - } -} diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 93d81e6..a1d2248 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -1,6 +1,3 @@ -mod memory; -mod sqlite; - use async_trait::async_trait; use sqlx::FromRow; use std::{ @@ -8,122 +5,18 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; -use self::{memory::Memory, sqlite::Sqlite}; - -#[derive(Clone, Debug)] -pub struct Cache { - memory: Memory, - sqlite: Option, -} - -pub async fn init_cache() -> Cache { - Cache { - memory: memory::new(), - sqlite: sqlite::new().await, - } -} - -/// Tier enums take an i64, which is the amount of time in seconds -/// the tier should consider the contents of the cache valid. -#[derive(Clone, Debug, sqlx::Type)] -pub enum Tier { - Memory, - Sqlite, - External, - None, -} - -impl fmt::Display for Tier { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Tier::Memory => write!(f, "memory"), - Tier::Sqlite => write!(f, "sqlite"), - Tier::External => write!(f, "external"), - Tier::None => write!(f, ""), - } - } -} +use std::collections::HashMap; +use tokio::sync::Mutex; #[derive(Clone, Debug, FromRow)] pub struct CachedItem { pub content_type: String, pub content: String, - cached: i64, - #[sqlx(default)] - tier: Option, -} - -impl CachedItem { - pub fn tier(&self) -> Tier { - self.tier.clone().unwrap_or(Tier::None) - } -} - -#[async_trait] -pub trait CacheMechanism: Sized + Clone + Send + Sync + 'static { - async fn get(&self, key: &String) -> Option; - async fn rm(&mut self, key: String); - async fn set(&self, key: String, item: CachedItem); -} - -impl Cache { - pub async fn get(&self, key: &String) -> Option { - let m = self.memory.get(key).await; - if m.is_some() { - return m; - } - if self.sqlite.is_some() { - let sq = self.sqlite.clone().unwrap(); - let s = sq.get(key).await; - if s.is_some() { - let current_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("SystemTime before UNIX EPOCH!") - .as_secs() - .try_into() - .unwrap(); - let mut refresh_memory = s.clone().unwrap(); - refresh_memory.cached = current_time; - let _ = self.memory.set(key.clone(), refresh_memory).await; - return s; - } - } - - return None; - } - - pub async fn set(&self, key: String, content_type: String, content: String) -> bool { - let current_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("SystemTime before UNIX EPOCH!") - .as_secs() - .try_into() - .unwrap(); - let cached_item = CachedItem { - content_type, - content, - cached: current_time, - tier: None, - }; - self.memory.set(key.clone(), cached_item.clone()).await; - if self.sqlite.is_some() { - let sq = self.sqlite.clone().unwrap(); - sq.set(key.clone(), cached_item.clone()).await; - } - true - } + pub cached: i64, } /// Determine whether we should actually use the cached item or not. -fn should_use(item: &CachedItem, tier: Tier) -> bool { - // TODO: Make configurable. - let cache_time = match tier { - Tier::Memory => 2 * 60, - Tier::Sqlite => 10 * 60, - Tier::External => 0, - Tier::None => 0, - }; - +fn should_use(item: &CachedItem) -> bool { let current_time: i64 = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("SystemTime before UNIX EPOCH!") @@ -131,5 +24,35 @@ fn should_use(item: &CachedItem, tier: Tier) -> bool { .try_into() .unwrap(); - current_time <= (item.cached + cache_time) && item.content != "" + current_time <= (item.cached + (2*60)) && item.content != "" +} + +lazy_static! { + static ref CACHE: Mutex> = Mutex::new(HashMap::new()); +} + +pub async fn get(key: &String) -> Option { + let data = CACHE.lock().await; + dbg!(&key); + match data.get(key) { + Some(c) => { + if should_use(&c) { + Some(c.clone()) + } else { + //let _rm = rm(key.to_string()).await; + None + } + } + None => None, + } +} + +async fn rm(key: String) { + let mut data = CACHE.lock().await; + data.remove(&key); +} + +pub async fn set(key: String, item: CachedItem) { + let mut data = CACHE.lock().await; + data.insert(key, item); } diff --git a/src/cache/sqlite.rs b/src/cache/sqlite.rs deleted file mode 100644 index 3663490..0000000 --- a/src/cache/sqlite.rs +++ /dev/null @@ -1,60 +0,0 @@ -use std::{env, str::FromStr}; - -use async_trait::async_trait; -use sqlx::{ - sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, - Pool, -}; - -use super::{should_use, CacheMechanism, CachedItem, Tier}; - -#[derive(Clone, Debug)] -pub struct Sqlite { - pool: Pool, -} - -pub async fn new() -> Option { - let path = env::var("DATABASE_PATH").unwrap_or("gs.db".to_owned()); - let opts = SqliteConnectOptions::from_str(&path) - .unwrap() - .journal_mode(SqliteJournalMode::Wal) - .create_if_missing(true); - - let pool = SqlitePoolOptions::new().connect_with(opts).await.unwrap(); - return Some(Sqlite { pool }); -} - -#[async_trait] -impl CacheMechanism for Sqlite { - async fn get(&self, key: &String) -> Option { - let res = sqlx::query_as::<_, CachedItem>("SELECT * FROM cached WHERE route = $1") - .bind(&key) - .fetch_one(&self.pool) - .await; - if res.is_ok() { - let c = res.unwrap(); - if should_use(&c, Tier::Sqlite) { - let mut r = c.clone(); - r.tier = Some(Tier::Sqlite); - return Some(r); - } - } - None - } - - async fn rm(&mut self, key: String) { - todo!() - } - - async fn set(&self, key: String, item: CachedItem) { - let cache_sqlite = sqlx::query( - "INSERT OR REPLACE INTO cached (route, cached, content_type, content) VALUES ( $1, $2, $3, $4 )", - ) - .bind(key) - .bind(item.cached) - .bind(item.content_type) - .bind(item.content) - .execute(&self.pool) - .await; - } -} diff --git a/src/main.rs b/src/main.rs index 0769199..19e965b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,15 @@ #[macro_use] extern crate lazy_static; +// Include the `items` module, which is generated from items.proto. +pub mod items { + include!(concat!(env!("OUT_DIR"), "/gabrielsimmerdotcom.gossip.rs")); +} + mod cache; mod posts; use axum::extract::Path; -use axum::http::request; use axum::response::IntoResponse; use axum::{ body::Full, @@ -16,60 +20,29 @@ use axum::{ routing::get, Router, }; +use cache::CachedItem; use clap::Parser; use crossbeam::channel::{unbounded, Receiver, Sender}; use file_format::{FileFormat, Kind}; use hyper::body::Bytes; use maud::{html, Markup, PreEscaped, Render, DOCTYPE}; +use prost::Message; use rand::seq::SliceRandom; use rss::ChannelBuilder; use serde::Deserialize; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; use std::collections::HashMap; -use std::fmt::Display; use std::net::UdpSocket; use std::str::FromStr; +use std::time::{SystemTime, UNIX_EPOCH}; use std::{env, io, thread}; use time::{self, format_description, format_description::well_known::Rfc2822}; use tower_http::services::ServeDir; -use crate::cache::{init_cache, Cache}; - -enum PeerStatus { - Healthy, - Suspect, - Down -} - -enum GossipMessages { - Ready, - Cache, - Ack, -} -impl GossipMessages { - fn from_str(request: String) -> Option { - match request.as_str() { - "rdy" => Some(Self::Ready), - "cache" => Some(Self::Cache), - "ack" => Some(Self::Ack), - _ => None, - } - } -} - -impl Display for GossipMessages { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - GossipMessages::Ready => write!(f, "rdy"), - GossipMessages::Cache => write!(f, "cache"), - GossipMessages::Ack => write!(f, "ack"), - } - } -} - +#[derive(Debug)] struct Peer { counter: i64, - health: PeerStatus + health: items::Status, } #[derive(Parser)] @@ -79,13 +52,14 @@ struct Cli { #[arg(short, long, default_value_t=("0.0.0.0:3000").to_string())] bind: String, #[arg(short, long)] - peers: Option + peers: Option, + #[arg(long, default_value_t=("0.0.0.0:1337").to_string())] + gossip_bind: String, } #[derive(Clone, Debug)] struct AppState { - cache: Cache, - to_gossip: Sender, + to_gossip: Sender, } #[derive(Deserialize)] @@ -126,22 +100,11 @@ impl Render for Project { async fn main() -> Result<(), sqlx::Error> { let args = Cli::parse(); - let opts = SqliteConnectOptions::from_str(&args.database_path)? - .journal_mode(SqliteJournalMode::Wal) - .create_if_missing(true); - - let pool = SqlitePoolOptions::new().connect_with(opts).await?; - sqlx::migrate!("./migrations").run(&pool).await?; - - env::set_var("DATABASE_PATH", &args.database_path); - // Create channels for sending messages and receiving results - let (s_gossip, r_gossip) = unbounded::(); - let (s_main, r_main) = unbounded::(); - + let (s_gossip, r_gossip) = unbounded::(); + let (s_main, r_main) = unbounded::(); let state = AppState { - cache: init_cache().await, - to_gossip: s_main.clone() + to_gossip: s_main.clone(), }; let app = Router::new() @@ -156,84 +119,195 @@ async fn main() -> Result<(), sqlx::Error> { .with_state(state); println!("Running webserver on {}", args.bind); - let webserver = axum::Server::bind(&args.bind.parse().unwrap()) - .serve(app.into_make_service()); + let webserver = axum::Server::bind(&args.bind.parse().unwrap()).serve(app.into_make_service()); - if args.peers.is_some_and(|f| f != "".to_owned()) { + if args.peers.is_some() { tokio::spawn(webserver); println!("starting gossip worker"); // Spawn a worker thread - let gossip_server = thread::scope(|scope| { - let _ = gossiper(s_main.clone(), r_main, s_gossip); + let _gossip_server = thread::spawn(move || { + let binding = args.peers.unwrap(); + let peer_list: Vec<&str> = binding.split(",").collect(); + let _ = gossiper( + &args.gossip_bind, + peer_list, + s_main.clone(), + r_main, + s_gossip, + ); }); - - let _ = s_main.send(GossipMessages::Ready); loop { - for msg in r_gossip.try_iter() { - println!("gossip thrd: {}", msg); + let message = r_gossip.recv().unwrap(); + match &message.msg { + Some(items::payload::Msg::Cache(cache)) => { + cache::set( + cache.key.clone(), + CachedItem{ + content_type: cache.content_type.clone(), + content: cache.content.clone(), + cached: cache.timestamp, + } + ).await; + } + _ => {} } } } else { let _ = webserver.await; } - + Ok(()) } -fn gossiper(own_sender: Sender, rx: Receiver, tx: Sender) -> io::Result<()> { +fn gossiper( + bind: &String, + peer_list: Vec<&str>, + own_sender: Sender, + rx: Receiver, + tx: Sender, +) -> io::Result<()> { let mut peers: HashMap = HashMap::new(); - let mut buf = [0; 1024]; - let socket = UdpSocket::bind("0.0.0.0:1337")?; + for peer in peer_list { + peers.insert( + peer.to_owned(), + Peer { + counter: 0, + health: items::Status::PeerAlive, + }, + ); + } + let mut buf = [0; 10240]; + let socket = UdpSocket::bind(bind.as_str())?; let r_socket = socket.try_clone().unwrap(); - let _ = tx.send(GossipMessages::Ready); - - // Listen on our UDP socket and pass messages back up. + // When we come up, try to communicate we're alive to other peers. + let selected_peers: Vec = match select_peers(&peers) { + Some(p) => { + println!("found peers"); + p + } + None => { + println!("no peers, not gossiping"); + vec![] + } + }; + for peer in selected_peers { + let membership = items::Payload { + msg: Some(items::payload::Msg::Membership(items::Membership { + peer: bind.to_string(), + status: items::Status::PeerAlive as i32, + inc: 0, + })), + peer: bind.to_string(), + }; + let response = Message::encode_to_vec(&membership); + let _result = match r_socket.send_to(&response, &peer) { + Ok(_) => "", + Err(_) => "", + }; + let existing = peers.get(&peer).unwrap(); + peers.insert( + peer, + Peer { + counter: existing.counter + 1, + health: items::Status::PeerAlive, + }, + ); + } + let bound = bind.clone(); thread::spawn(move || loop { let (size, source) = socket.recv_from(&mut buf).expect("Failed to receive data"); - let mut request = String::from_utf8_lossy(&buf[..size]).into_owned(); - trim_newline(&mut request); + let msg: Result = Message::decode(&buf[..size]); - let msg = GossipMessages::from_str(request).unwrap(); - let _ = own_sender.send(msg).unwrap(); - let response = "ack"; - socket - .send_to(response.as_bytes(), source) - .expect("Failed to send response"); + let _ = own_sender.send(msg.clone().unwrap()).unwrap(); + if msg.unwrap().msg != Some(items::payload::Msg::Ack(items::Ack {})) { + let ack = items::Payload { + peer: bound.to_string(), + msg: Some(items::payload::Msg::Ack(items::Ack {})), + }; + let res = Message::encode_to_vec(&ack); + socket + .send_to(&res, source) + .expect("Failed to send response"); + } }); - + // Handle messages that are passed to us from the main thread or the UDP socket. loop { let message = rx.recv().unwrap(); - println!("got: {}", &message); - match message { - GossipMessages::Cache => { - let selected_peers: Vec = match select_peers(&peers) { - Some(p) => { println!("found peers"); p }, - None => { println!("no peers, not gossiping"); vec![] }, + match &message.msg { + Some(items::payload::Msg::Cache(cache)) => { + println!("got new cache: {}", cache.content_type); + let _ = tx.send(message.clone()).unwrap(); + } + Some(items::payload::Msg::Ack(_)) => { + println!("healthy ack"); + let peer = message.peer; + let existing = peers.get(&peer).unwrap(); + peers.insert( + peer, + Peer { + counter: existing.counter + 1, + health: items::Status::PeerAlive, + }, + ); + } + Some(items::payload::Msg::Ping(_)) => todo!(), + Some(items::payload::Msg::Membership(membership)) => { + println!("membership update"); + let peer = message.peer; + let existing = peers.get(&peer).unwrap(); + + peers.insert( + peer, + Peer { + counter: existing.counter + 1, + health: membership.status(), + }, + ); + } + Some(items::payload::Msg::NewCache(new_cache)) => { + println!("instructed to send cache"); + let current_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH!") + .as_secs() + .try_into() + .unwrap(); + let cache = items::Payload { + peer: bind.to_owned(), + msg: Some(items::payload::Msg::Cache(items::Cache { + key: new_cache.key.clone(), + content: new_cache.content.clone(), + content_type: new_cache.content_type.clone(), + timestamp: current_time, + })), }; + let selected_peers: Vec = match select_peers(&peers) { + Some(p) => { + println!("found peers"); + p + } + None => { + println!("no peers, not gossiping"); + vec![] + } + }; + let payload = Message::encode_to_vec(&cache); for peer in selected_peers { - let msg = GossipMessages::Cache.to_string().into_bytes(); - let result = match r_socket.send_to(&msg, &peer) { - Ok(_) => "", - Err(_) => "", + dbg!(&peer); + let _result = match r_socket.send_to(&payload, &peer) { + Ok(_r) => { + // dbg!(r); + } + Err(e) => { + dbg!(e); + } }; let p = peers.get_mut(&peer).unwrap(); p.counter = p.counter + 1; } - }, - GossipMessages::Ack => { - println!("healthy ack"); } - _ => {} - } - } -} - -fn trim_newline(s: &mut String) { - if s.ends_with('\n') { - s.pop(); - if s.ends_with('\r') { - s.pop(); + None => {} } } } @@ -243,13 +317,25 @@ fn select_peers(peers: &HashMap) -> Option> { if peers.len() == 0 { return None; } - let healthy: Vec = peers.into_iter() - .filter(|(&ref k, &ref peer)| matches!(peer.health, PeerStatus::Healthy)) - .map(|(&ref ip, &ref peer)| ip.to_owned()).collect(); - let len = healthy.len() as usize / (3 as usize); + let healthy: Vec = peers + .into_iter() + .filter(|(&ref k, &ref peer)| matches!(peer.health, items::Status::PeerAlive)) + .map(|(&ref ip, &ref peer)| ip.to_owned()) + .collect(); + let mut len_rounded = healthy.len(); let rng = &mut rand::thread_rng(); + + if len_rounded > 3 { + let len: f64 = (healthy.len() / 3) as f64; + len_rounded = len.ceil() as usize; + } // Select a random number of peers to gossip to. - Some(healthy.choose_multiple(rng, len).map(|f| f.to_owned()).collect()) + Some( + healthy + .choose_multiple(rng, len_rounded) + .map(|f| f.to_owned()) + .collect(), + ) } async fn raw_blog_post(Path(post): Path) -> Result { @@ -379,7 +465,7 @@ async fn cached_page( ) -> Response> { let path = request.uri().path().to_string(); - let item = state.cache.get(&path).await; + let item = cache::get(&path).await; if item.is_none() { let res = next.run(request).await; let (res_parts, res_body) = res.into_parts(); @@ -418,11 +504,30 @@ async fn cached_page( .unwrap(); } + let current_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH!") + .as_secs() + .try_into() + .unwrap(); let content = String::from_utf8(res).unwrap(); - state.cache.set(path, contenttype.to_owned(), content).await; - match state.to_gossip.send(GossipMessages::Cache) { - Ok(_) => { }, - Err(_) => { }, + cache::set(path.clone(), CachedItem{ + content_type: contenttype.to_owned(), + content: content.clone(), + cached: current_time, + }) + .await; + let gossip = items::Payload { + peer: "".to_owned(), + msg: Some(items::payload::Msg::NewCache(items::NewCache { + key: path, + content, + content_type: contenttype.to_owned(), + })), + }; + match state.to_gossip.send(gossip) { + Ok(_) => {} + Err(_) => {} }; return Response::builder() @@ -435,7 +540,7 @@ async fn cached_page( let i = item.unwrap(); return Response::builder() .header("content-type", &i.content_type) - .header("cache", format!("hit-{}", &i.tier())) + .header("cache", format!("hit")) .status(StatusCode::OK) .body(Full::from(i.content)) .unwrap(); diff --git a/src/protobuf/items.proto b/src/protobuf/items.proto deleted file mode 100644 index 0efca31..0000000 --- a/src/protobuf/items.proto +++ /dev/null @@ -1,6 +0,0 @@ -syntax = "proto3"; - -package gabrielsimmerdotcom.gossip; - -message GossipMessage { -}