diff --git a/Cargo.lock b/Cargo.lock index 3e3dbb6..ade2654 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -614,6 +626,7 @@ dependencies = [ "libp2p", "log", "reqwest", + "rusqlite", "serde", "serde_json", "tokio", @@ -701,6 +714,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fiat-crypto" version = "0.2.9" @@ -938,6 +963,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -955,6 +989,15 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.5.0" @@ -1925,6 +1968,17 @@ dependencies = [ "libc", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linked-hash-map" version = "0.5.6" @@ -2332,6 +2386,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "polling" version = "3.11.0" @@ -2744,6 +2804,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags 2.10.0", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-hash" version = "2.1.1" @@ -3392,6 +3466,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index d919631..ec7d1b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,4 +31,5 @@ reqwest = { version = "0.12", default-features = false, features = [ "json", ] } dotenvy = "0.15" +rusqlite = { version = "0.32", features = ["bundled"] } urlencoding = "2" diff --git a/src/main.rs b/src/main.rs index 7ade09a..a916078 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,6 +29,8 @@ use std::collections::{HashMap, VecDeque}; use std::path::PathBuf; use std::time::{Duration, Instant}; +use rusqlite::{params, Connection}; + use futures::StreamExt; use libp2p::{ connection_limits, gossipsub, identify, noise, ping, relay, rendezvous, @@ -59,6 +61,8 @@ struct RelayBehaviour { limits: connection_limits::Behaviour, // gif search service - clients send GifRequest, relay responds with GifResponse gif_service: cbor::Behaviour, + // persistent directory service - clients register/search peer profiles + directory_service: cbor::Behaviour, } // ---- gif protocol ---- @@ -67,6 +71,8 @@ struct RelayBehaviour { // relay so clients never need credentials. const GIF_PROTOCOL: StreamProtocol = StreamProtocol::new("/dusk/gif/1.0.0"); +const DIRECTORY_PROTOCOL: libp2p::StreamProtocol = + libp2p::StreamProtocol::new("/dusk/directory/1.0.0"); const KLIPY_API_BASE: &str = "https://api.klipy.com/v2"; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -202,6 +208,35 @@ impl GifRateLimiter { } // ---- end gif rate limiter ---- +// ---- directory protocol ---- +// clients register their display_name here so others can search +// even when that peer is offline. the relay only stores peer_id + display_name. + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum DirectoryRequest { + // register/update this peer's display_name in the index + // peer_id is taken from the libp2p connection, not trusted from the request + Register { display_name: String }, + // search the index by display_name (LIKE %query%) or exact peer_id + Search { query: String }, + // remove this peer's profile from the index + Remove, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum DirectoryResponse { + Ok, + Results(Vec), +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DirectoryProfileEntry { + pub peer_id: String, + pub display_name: String, + pub last_seen: u64, +} +// ---- end directory protocol ---- + // fetch from klipy and normalize into our GifResult format async fn fetch_klipy( http: &reqwest::Client, @@ -313,6 +348,34 @@ fn load_or_generate_keypair() -> libp2p::identity::Keypair { kp } +// resolve path for the relay's persistent directory database +fn directory_db_path() -> std::path::PathBuf { + if let Some(proj_dirs) = directories::ProjectDirs::from("", "", "dusk-relay") { + let dir = proj_dirs.data_dir().to_path_buf(); + std::fs::create_dir_all(&dir).ok(); + dir.join("directory.sqlite3") + } else { + std::path::PathBuf::from("./relay_directory.sqlite3") + } +} + +fn open_directory_db() -> Result { + let conn = Connection::open(directory_db_path())?; + conn.execute_batch( + "PRAGMA journal_mode = WAL; + PRAGMA synchronous = NORMAL; + CREATE TABLE IF NOT EXISTS peer_profiles ( + peer_id TEXT PRIMARY KEY, + display_name TEXT NOT NULL, + last_seen INTEGER NOT NULL, + registered_at INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_display_name + ON peer_profiles(display_name COLLATE NOCASE);", + )?; + Ok(conn) +} + #[tokio::main] async fn main() -> Result<(), Box> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); @@ -428,6 +491,12 @@ async fn main() -> Result<(), Box> { request_response::Config::default() .with_request_timeout(Duration::from_secs(15)), ), + // persistent directory service - clients register/search/remove profiles + directory_service: cbor::Behaviour::new( + [(DIRECTORY_PROTOCOL, ProtocolSupport::Inbound)], + request_response::Config::default() + .with_request_timeout(Duration::from_secs(15)), + ), } })? .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(300))) @@ -502,6 +571,23 @@ async fn main() -> Result<(), Box> { // track peer relay connections for federation metrics let mut connected_peer_relays: Vec = Vec::new(); + let dir_db = match open_directory_db() { + Ok(db) => { + log::info!("directory db opened at {}", directory_db_path().display()); + db + } + Err(e) => { + log::error!("failed to open directory db: {}", e); + return Err(e.into()); + } + }; + let now_secs = || { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + }; + loop { let event = tokio::select! { // periodic cache eviction @@ -800,6 +886,94 @@ async fn main() -> Result<(), Box> { ); } + // directory service - clients register/search/remove their profiles + SwarmEvent::Behaviour(RelayBehaviourEvent::DirectoryService( + request_response::Event::Message { + peer, + message: request_response::Message::Request { request, channel, .. }, + .. + }, + )) => { + let response = match request { + DirectoryRequest::Register { display_name } => { + let ts = now_secs(); + let result = dir_db.execute( + "INSERT INTO peer_profiles (peer_id, display_name, last_seen, registered_at) + VALUES (?1, ?2, ?3, ?3) + ON CONFLICT(peer_id) DO UPDATE SET + display_name = excluded.display_name, + last_seen = excluded.last_seen", + params![peer.to_string(), display_name, ts as i64], + ); + match result { + Ok(_) => { + log::info!("directory: registered peer {} as '{}'", peer, display_name); + DirectoryResponse::Ok + } + Err(e) => { + log::warn!("directory: failed to register {}: {}", peer, e); + DirectoryResponse::Ok + } + } + } + DirectoryRequest::Search { query } => { + let trimmed = query.trim().to_string(); + let like_pattern = format!("%{}%", trimmed); + + let mut stmt = dir_db.prepare( + "SELECT peer_id, display_name, last_seen FROM peer_profiles + WHERE lower(display_name) LIKE lower(?1) + OR peer_id = ?2 + ORDER BY last_seen DESC + LIMIT 20" + ); + let entries = match stmt { + Ok(ref mut s) => s.query_map( + params![like_pattern, trimmed], + |row| { + let last_seen: i64 = row.get(2)?; + Ok(DirectoryProfileEntry { + peer_id: row.get(0)?, + display_name: row.get(1)?, + last_seen: last_seen.max(0) as u64, + }) + }, + ).map(|rows| rows.filter_map(|r| r.ok()).collect::>()) + .unwrap_or_default(), + Err(e) => { + log::warn!("directory: search query failed: {}", e); + vec![] + } + }; + log::info!( + "directory: search '{}' -> {} results", + trimmed, + entries.len() + ); + DirectoryResponse::Results(entries) + } + DirectoryRequest::Remove => { + let _ = dir_db.execute( + "DELETE FROM peer_profiles WHERE peer_id = ?1", + params![peer.to_string()], + ); + log::info!("directory: removed profile for {}", peer); + DirectoryResponse::Ok + } + }; + + if swarm + .behaviour_mut() + .directory_service + .send_response(channel, response) + .is_err() + { + log::warn!("directory: failed to send response to {}", peer); + } + } + // ignore outbound and other directory service events + SwarmEvent::Behaviour(RelayBehaviourEvent::DirectoryService(_)) => {} + _ => {} } }