From 6ed6a1524e7d877d5ea21e7a3db290df11066561 Mon Sep 17 00:00:00 2001 From: cloudwithax Date: Sat, 14 Feb 2026 21:08:10 -0500 Subject: [PATCH] add gossipsub support for relay federation and update dependencies --- Cargo.lock | 80 ++++++++++++++++++++++++ Cargo.toml | 3 + src/main.rs | 174 +++++++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 242 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f9b7b6a..0213396 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -591,6 +591,8 @@ dependencies = [ "futures", "libp2p", "log", + "serde", + "serde_json", "tokio", ] @@ -811,6 +813,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-ticker" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9763058047f713632a52e916cc7f6a4b3fc6e9fc1ff8c5b1dc49e5a89041682e" +dependencies = [ + "futures", + "futures-timer", + "instant", +] + [[package]] name = "futures-timer" version = "3.0.3" @@ -930,6 +943,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" +[[package]] +name = "hex_fmt" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" + [[package]] name = "hickory-proto" version = "0.24.4" @@ -1225,6 +1244,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipconfig" version = "0.3.2" @@ -1316,6 +1344,7 @@ dependencies = [ "libp2p-connection-limits", "libp2p-core", "libp2p-dns", + "libp2p-gossipsub", "libp2p-identify", "libp2p-identity", "libp2p-mdns", @@ -1403,6 +1432,37 @@ dependencies = [ "tracing", ] +[[package]] +name = "libp2p-gossipsub" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4e830fdf24ac8c444c12415903174d506e1e077fbe3875c404a78c5935a8543" +dependencies = [ + "asynchronous-codec", + "base64", + "byteorder", + "bytes", + "either", + "fnv", + "futures", + "futures-ticker", + "getrandom 0.2.17", + "hex_fmt", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "prometheus-client", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "regex", + "sha2", + "smallvec", + "tracing", + "void", + "web-time", +] + [[package]] name = "libp2p-identify" version = "0.45.0" @@ -1473,6 +1533,7 @@ checksum = "77ebafa94a717c8442d8db8d3ae5d1c6a15e30f2d347e0cd31d057ca72e42566" dependencies = [ "futures", "libp2p-core", + "libp2p-gossipsub", "libp2p-identify", "libp2p-identity", "libp2p-ping", @@ -2655,6 +2716,19 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + [[package]] name = "sha2" version = "0.10.9" @@ -3675,3 +3749,9 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index 226be89..698e7c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" libp2p = { version = "0.54", features = [ "relay", "rendezvous", + "gossipsub", "identify", "ping", "noise", @@ -21,3 +22,5 @@ futures = "0.3" log = "0.4" env_logger = "0.11" directories = "5" +serde = { version = "1", features = ["derive"] } +serde_json = "1" diff --git a/src/main.rs b/src/main.rs index 9b4d0cc..8567ce3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,22 +6,43 @@ // responsibilities: // - circuit relay v2: peers connect through this node, never seeing each other's IPs // - rendezvous: peers register under community namespaces, discover each other by peer ID +// - relay federation: gossips peer registrations to other relays for global discovery // - no data storage, no message routing, just connection brokering // // usage: // RUST_LOG=info cargo run // DUSK_RELAY_PORT=4001 cargo run (custom port) +// DUSK_PEER_RELAYS="addr1,addr2" cargo run (federation) +// +// canonical public relay (default in dusk chat clients): +// /dns4/relay.duskchat.app/tcp/4001/p2p/12D3KooWGQkCkACcibJPKzus7Q6U1aYngfTuS4gwYwmJkJJtrSaw use std::path::PathBuf; use std::time::Duration; use futures::StreamExt; -use libp2p::{identify, noise, ping, relay, rendezvous, swarm::SwarmEvent, tcp, yamux, Multiaddr}; +use libp2p::{ + gossipsub, identify, noise, ping, relay, rendezvous, swarm::SwarmEvent, tcp, yamux, Multiaddr, + PeerId, +}; + +// gossip message for relay-to-relay federation +// relays broadcast peer registrations to each other so discovery works across relay nodes +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct RelayRegistrationGossip { + peer_id: String, + namespace: String, + // unix timestamp in seconds when this registration expires + ttl: u64, + // relay peer id that originated this registration (to prevent loops) + source_relay: String, +} #[derive(libp2p::swarm::NetworkBehaviour)] struct RelayBehaviour { relay: relay::Behaviour, rendezvous: rendezvous::server::Behaviour, + gossipsub: gossipsub::Behaviour, identify: identify::Behaviour, ping: ping::Behaviour, } @@ -84,11 +105,31 @@ async fn main() -> Result<(), Box> { .with_behaviour(|key| { let peer_id = key.public().to_peer_id(); + // configure gossipsub for relay-to-relay federation + let gossipsub_config = gossipsub::ConfigBuilder::default() + .heartbeat_interval(Duration::from_secs(10)) + .validation_mode(gossipsub::ValidationMode::Strict) + .message_id_fn(|msg| { + use std::hash::{Hash, Hasher}; + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + msg.data.hash(&mut hasher); + gossipsub::MessageId::from(hasher.finish().to_string()) + }) + .build() + .expect("valid gossipsub config"); + + let gossipsub = gossipsub::Behaviour::new( + gossipsub::MessageAuthenticity::Signed(key.clone()), + gossipsub_config, + ) + .expect("valid gossipsub behaviour"); + RelayBehaviour { relay: relay::Behaviour::new(peer_id, relay::Config::default()), rendezvous: rendezvous::server::Behaviour::new( rendezvous::server::Config::default(), ), + gossipsub, identify: identify::Behaviour::new(identify::Config::new( "/dusk/relay/1.0.0".to_string(), key.public(), @@ -110,16 +151,55 @@ async fn main() -> Result<(), Box> { let canonical_addr = format!("/ip4/0.0.0.0/tcp/{}/p2p/{}", port, local_peer_id); println!("\n relay address: {}\n", canonical_addr); + // subscribe to the relay federation gossip topic + let federation_topic = gossipsub::IdentTopic::new("dusk/relay/federation"); + swarm.behaviour_mut().gossipsub.subscribe(&federation_topic)?; + log::info!("subscribed to relay federation topic"); + + // connect to peer relays for federation (from env var, comma-separated multiaddrs) + let peer_relays: Vec = std::env::var("DUSK_PEER_RELAYS") + .ok() + .map(|s| { + s.split(',') + .filter_map(|addr| addr.trim().parse().ok()) + .collect() + }) + .unwrap_or_default(); + + // extract peer IDs from peer relay multiaddrs for tracking + let expected_peer_relay_ids: Vec = peer_relays + .iter() + .filter_map(|addr| { + use libp2p::multiaddr::Protocol; + addr.iter().find_map(|p| match p { + Protocol::P2p(peer_id) => Some(peer_id), + _ => None, + }) + }) + .collect(); + + if !peer_relays.is_empty() { + log::info!("connecting to {} peer relays for federation", peer_relays.len()); + for addr in &peer_relays { + log::info!(" dialing peer relay: {}", addr); + if let Err(e) = swarm.dial(addr.clone()) { + log::warn!("failed to dial peer relay {}: {}", addr, e); + } + } + } else { + log::warn!("no peer relays configured (DUSK_PEER_RELAYS env var not set)"); + log::warn!("this relay will operate in standalone mode"); + } + // track active reservations for logging let mut reservation_count: usize = 0; let mut connection_count: usize = 0; + // track peer relay connections for federation metrics + let mut connected_peer_relays: Vec = Vec::new(); + loop { match swarm.select_next_some().await { - SwarmEvent::NewListenAddr { address, .. } => { - log::debug!("listening on: {}/p2p/{}", address, local_peer_id); - } - // relay events SwarmEvent::Behaviour(RelayBehaviourEvent::Relay( relay::Event::ReservationReqAccepted { src_peer_id, .. }, @@ -169,6 +249,29 @@ async fn main() -> Result<(), Box> { peer, registration.namespace ); + + // gossip this registration to peer relays for federation + // other relays can cache this and have their clients query them directly + let gossip = RelayRegistrationGossip { + peer_id: peer.to_string(), + namespace: registration.namespace.to_string(), + // registration.ttl is already a u64 timestamp + ttl: registration.ttl, + source_relay: local_peer_id.to_string(), + }; + + if let Ok(data) = serde_json::to_vec(&gossip) { + if let Err(e) = swarm.behaviour_mut().gossipsub.publish(federation_topic.clone(), data) { + log::warn!("failed to gossip registration to peer relays: {}", e); + } else { + log::debug!( + "gossiped registration {}:{} to {} peer relays", + gossip.namespace, + gossip.peer_id, + connected_peer_relays.len() + ); + } + } } SwarmEvent::Behaviour(RelayBehaviourEvent::Rendezvous( rendezvous::server::Event::DiscoverServed { enquirer, registrations, .. }, @@ -179,6 +282,23 @@ async fn main() -> Result<(), Box> { enquirer ); } + + // gossipsub messages - receive peer registrations from other relays + SwarmEvent::Behaviour(RelayBehaviourEvent::Gossipsub( + gossipsub::Event::Message { message, .. } + )) => { + if let Ok(gossip) = serde_json::from_slice::(&message.data) { + // skip our own messages (loop prevention) + if gossip.source_relay != local_peer_id.to_string() { + log::info!( + "received remote registration {}:{} from relay {} (clients should query that relay directly)", + gossip.namespace, + gossip.peer_id, + gossip.source_relay + ); + } + } + } SwarmEvent::Behaviour(RelayBehaviourEvent::Rendezvous( rendezvous::server::Event::PeerNotRegistered { peer, namespace, .. }, )) => { @@ -200,19 +320,43 @@ async fn main() -> Result<(), Box> { // connection tracking SwarmEvent::ConnectionEstablished { peer_id, .. } => { connection_count += 1; - log::info!( - "peer connected: {} (total connections: {})", - peer_id, - connection_count - ); + + // check if this is a peer relay connection + if expected_peer_relay_ids.contains(&peer_id) && !connected_peer_relays.contains(&peer_id) { + connected_peer_relays.push(peer_id); + log::info!( + "peer relay connected: {} ({}/{} peer relays online)", + peer_id, + connected_peer_relays.len(), + expected_peer_relay_ids.len() + ); + } else { + log::info!( + "peer connected: {} (total connections: {})", + peer_id, + connection_count + ); + } } SwarmEvent::ConnectionClosed { peer_id, .. } => { connection_count = connection_count.saturating_sub(1); - log::debug!( - "peer disconnected: {} (total connections: {})", - peer_id, - connection_count - ); + + // check if this was a peer relay disconnection + if let Some(pos) = connected_peer_relays.iter().position(|id| id == &peer_id) { + connected_peer_relays.remove(pos); + log::warn!( + "peer relay disconnected: {} ({}/{} peer relays online)", + peer_id, + connected_peer_relays.len(), + expected_peer_relay_ids.len() + ); + } else { + log::debug!( + "peer disconnected: {} (total connections: {})", + peer_id, + connection_count + ); + } } // identify events - log protocol info from connecting peers