add directory service for peer profile management with SQLite integration

This commit is contained in:
cloudwithax 2026-02-19 17:48:07 -05:00
parent 5c8f57e07c
commit b29039557a
3 changed files with 255 additions and 0 deletions

80
Cargo.lock generated
View File

@ -37,6 +37,18 @@ dependencies = [
"subtle",
]
[[package]]
name = "ahash"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@ -614,6 +626,7 @@ dependencies = [
"libp2p",
"log",
"reqwest",
"rusqlite",
"serde",
"serde_json",
"tokio",
@ -701,6 +714,18 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "fallible-iterator"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fiat-crypto"
version = "0.2.9"
@ -938,6 +963,15 @@ dependencies = [
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
version = "0.15.5"
@ -955,6 +989,15 @@ version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
[[package]]
name = "hashlink"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "heck"
version = "0.5.0"
@ -1925,6 +1968,17 @@ dependencies = [
"libc",
]
[[package]]
name = "libsqlite3-sys"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "linked-hash-map"
version = "0.5.6"
@ -2332,6 +2386,12 @@ dependencies = [
"spki",
]
[[package]]
name = "pkg-config"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "polling"
version = "3.11.0"
@ -2744,6 +2804,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "rusqlite"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e"
dependencies = [
"bitflags 2.10.0",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"libsqlite3-sys",
"smallvec",
]
[[package]]
name = "rustc-hash"
version = "2.1.1"
@ -3392,6 +3466,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.5"

View File

@ -31,4 +31,5 @@ reqwest = { version = "0.12", default-features = false, features = [
"json",
] }
dotenvy = "0.15"
rusqlite = { version = "0.32", features = ["bundled"] }
urlencoding = "2"

View File

@ -29,6 +29,8 @@ use std::collections::{HashMap, VecDeque};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use rusqlite::{params, Connection};
use futures::StreamExt;
use libp2p::{
connection_limits, gossipsub, identify, noise, ping, relay, rendezvous,
@ -59,6 +61,8 @@ struct RelayBehaviour {
limits: connection_limits::Behaviour,
// gif search service - clients send GifRequest, relay responds with GifResponse
gif_service: cbor::Behaviour<GifRequest, GifResponse>,
// persistent directory service - clients register/search peer profiles
directory_service: cbor::Behaviour<DirectoryRequest, DirectoryResponse>,
}
// ---- gif protocol ----
@ -67,6 +71,8 @@ struct RelayBehaviour {
// relay so clients never need credentials.
const GIF_PROTOCOL: StreamProtocol = StreamProtocol::new("/dusk/gif/1.0.0");
const DIRECTORY_PROTOCOL: libp2p::StreamProtocol =
libp2p::StreamProtocol::new("/dusk/directory/1.0.0");
const KLIPY_API_BASE: &str = "https://api.klipy.com/v2";
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@ -202,6 +208,35 @@ impl GifRateLimiter {
}
// ---- end gif rate limiter ----
// ---- directory protocol ----
// clients register their display_name here so others can search
// even when that peer is offline. the relay only stores peer_id + display_name.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum DirectoryRequest {
// register/update this peer's display_name in the index
// peer_id is taken from the libp2p connection, not trusted from the request
Register { display_name: String },
// search the index by display_name (LIKE %query%) or exact peer_id
Search { query: String },
// remove this peer's profile from the index
Remove,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum DirectoryResponse {
Ok,
Results(Vec<DirectoryProfileEntry>),
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DirectoryProfileEntry {
pub peer_id: String,
pub display_name: String,
pub last_seen: u64,
}
// ---- end directory protocol ----
// fetch from klipy and normalize into our GifResult format
async fn fetch_klipy(
http: &reqwest::Client,
@ -313,6 +348,34 @@ fn load_or_generate_keypair() -> libp2p::identity::Keypair {
kp
}
// resolve path for the relay's persistent directory database
fn directory_db_path() -> std::path::PathBuf {
if let Some(proj_dirs) = directories::ProjectDirs::from("", "", "dusk-relay") {
let dir = proj_dirs.data_dir().to_path_buf();
std::fs::create_dir_all(&dir).ok();
dir.join("directory.sqlite3")
} else {
std::path::PathBuf::from("./relay_directory.sqlite3")
}
}
fn open_directory_db() -> Result<Connection, rusqlite::Error> {
let conn = Connection::open(directory_db_path())?;
conn.execute_batch(
"PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
CREATE TABLE IF NOT EXISTS peer_profiles (
peer_id TEXT PRIMARY KEY,
display_name TEXT NOT NULL,
last_seen INTEGER NOT NULL,
registered_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_display_name
ON peer_profiles(display_name COLLATE NOCASE);",
)?;
Ok(conn)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
@ -428,6 +491,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
request_response::Config::default()
.with_request_timeout(Duration::from_secs(15)),
),
// persistent directory service - clients register/search/remove profiles
directory_service: cbor::Behaviour::new(
[(DIRECTORY_PROTOCOL, ProtocolSupport::Inbound)],
request_response::Config::default()
.with_request_timeout(Duration::from_secs(15)),
),
}
})?
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(300)))
@ -502,6 +571,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// track peer relay connections for federation metrics
let mut connected_peer_relays: Vec<PeerId> = Vec::new();
let dir_db = match open_directory_db() {
Ok(db) => {
log::info!("directory db opened at {}", directory_db_path().display());
db
}
Err(e) => {
log::error!("failed to open directory db: {}", e);
return Err(e.into());
}
};
let now_secs = || {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
};
loop {
let event = tokio::select! {
// periodic cache eviction
@ -800,6 +886,94 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
}
// directory service - clients register/search/remove their profiles
SwarmEvent::Behaviour(RelayBehaviourEvent::DirectoryService(
request_response::Event::Message {
peer,
message: request_response::Message::Request { request, channel, .. },
..
},
)) => {
let response = match request {
DirectoryRequest::Register { display_name } => {
let ts = now_secs();
let result = dir_db.execute(
"INSERT INTO peer_profiles (peer_id, display_name, last_seen, registered_at)
VALUES (?1, ?2, ?3, ?3)
ON CONFLICT(peer_id) DO UPDATE SET
display_name = excluded.display_name,
last_seen = excluded.last_seen",
params![peer.to_string(), display_name, ts as i64],
);
match result {
Ok(_) => {
log::info!("directory: registered peer {} as '{}'", peer, display_name);
DirectoryResponse::Ok
}
Err(e) => {
log::warn!("directory: failed to register {}: {}", peer, e);
DirectoryResponse::Ok
}
}
}
DirectoryRequest::Search { query } => {
let trimmed = query.trim().to_string();
let like_pattern = format!("%{}%", trimmed);
let mut stmt = dir_db.prepare(
"SELECT peer_id, display_name, last_seen FROM peer_profiles
WHERE lower(display_name) LIKE lower(?1)
OR peer_id = ?2
ORDER BY last_seen DESC
LIMIT 20"
);
let entries = match stmt {
Ok(ref mut s) => s.query_map(
params![like_pattern, trimmed],
|row| {
let last_seen: i64 = row.get(2)?;
Ok(DirectoryProfileEntry {
peer_id: row.get(0)?,
display_name: row.get(1)?,
last_seen: last_seen.max(0) as u64,
})
},
).map(|rows| rows.filter_map(|r| r.ok()).collect::<Vec<_>>())
.unwrap_or_default(),
Err(e) => {
log::warn!("directory: search query failed: {}", e);
vec![]
}
};
log::info!(
"directory: search '{}' -> {} results",
trimmed,
entries.len()
);
DirectoryResponse::Results(entries)
}
DirectoryRequest::Remove => {
let _ = dir_db.execute(
"DELETE FROM peer_profiles WHERE peer_id = ?1",
params![peer.to_string()],
);
log::info!("directory: removed profile for {}", peer);
DirectoryResponse::Ok
}
};
if swarm
.behaviour_mut()
.directory_service
.send_response(channel, response)
.is_err()
{
log::warn!("directory: failed to send response to {}", peer);
}
}
// ignore outbound and other directory service events
SwarmEvent::Behaviour(RelayBehaviourEvent::DirectoryService(_)) => {}
_ => {}
}
}