add gossipsub support for relay federation and update dependencies

This commit is contained in:
cloudwithax 2026-02-14 21:08:10 -05:00
parent 10900e7740
commit 6ed6a1524e
3 changed files with 242 additions and 15 deletions

80
Cargo.lock generated
View File

@ -591,6 +591,8 @@ dependencies = [
"futures",
"libp2p",
"log",
"serde",
"serde_json",
"tokio",
]
@ -811,6 +813,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-ticker"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9763058047f713632a52e916cc7f6a4b3fc6e9fc1ff8c5b1dc49e5a89041682e"
dependencies = [
"futures",
"futures-timer",
"instant",
]
[[package]]
name = "futures-timer"
version = "3.0.3"
@ -930,6 +943,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
[[package]]
name = "hex_fmt"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f"
[[package]]
name = "hickory-proto"
version = "0.24.4"
@ -1225,6 +1244,15 @@ dependencies = [
"generic-array",
]
[[package]]
name = "instant"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
dependencies = [
"cfg-if",
]
[[package]]
name = "ipconfig"
version = "0.3.2"
@ -1316,6 +1344,7 @@ dependencies = [
"libp2p-connection-limits",
"libp2p-core",
"libp2p-dns",
"libp2p-gossipsub",
"libp2p-identify",
"libp2p-identity",
"libp2p-mdns",
@ -1403,6 +1432,37 @@ dependencies = [
"tracing",
]
[[package]]
name = "libp2p-gossipsub"
version = "0.47.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4e830fdf24ac8c444c12415903174d506e1e077fbe3875c404a78c5935a8543"
dependencies = [
"asynchronous-codec",
"base64",
"byteorder",
"bytes",
"either",
"fnv",
"futures",
"futures-ticker",
"getrandom 0.2.17",
"hex_fmt",
"libp2p-core",
"libp2p-identity",
"libp2p-swarm",
"prometheus-client",
"quick-protobuf",
"quick-protobuf-codec",
"rand 0.8.5",
"regex",
"sha2",
"smallvec",
"tracing",
"void",
"web-time",
]
[[package]]
name = "libp2p-identify"
version = "0.45.0"
@ -1473,6 +1533,7 @@ checksum = "77ebafa94a717c8442d8db8d3ae5d1c6a15e30f2d347e0cd31d057ca72e42566"
dependencies = [
"futures",
"libp2p-core",
"libp2p-gossipsub",
"libp2p-identify",
"libp2p-identity",
"libp2p-ping",
@ -2655,6 +2716,19 @@ dependencies = [
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.149"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86"
dependencies = [
"itoa",
"memchr",
"serde",
"serde_core",
"zmij",
]
[[package]]
name = "sha2"
version = "0.10.9"
@ -3675,3 +3749,9 @@ dependencies = [
"quote",
"syn",
]
[[package]]
name = "zmij"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa"

View File

@ -8,6 +8,7 @@ edition = "2021"
libp2p = { version = "0.54", features = [
"relay",
"rendezvous",
"gossipsub",
"identify",
"ping",
"noise",
@ -21,3 +22,5 @@ futures = "0.3"
log = "0.4"
env_logger = "0.11"
directories = "5"
serde = { version = "1", features = ["derive"] }
serde_json = "1"

View File

@ -6,22 +6,43 @@
// responsibilities:
// - circuit relay v2: peers connect through this node, never seeing each other's IPs
// - rendezvous: peers register under community namespaces, discover each other by peer ID
// - relay federation: gossips peer registrations to other relays for global discovery
// - no data storage, no message routing, just connection brokering
//
// usage:
// RUST_LOG=info cargo run
// DUSK_RELAY_PORT=4001 cargo run (custom port)
// DUSK_PEER_RELAYS="addr1,addr2" cargo run (federation)
//
// canonical public relay (default in dusk chat clients):
// /dns4/relay.duskchat.app/tcp/4001/p2p/12D3KooWGQkCkACcibJPKzus7Q6U1aYngfTuS4gwYwmJkJJtrSaw
use std::path::PathBuf;
use std::time::Duration;
use futures::StreamExt;
use libp2p::{identify, noise, ping, relay, rendezvous, swarm::SwarmEvent, tcp, yamux, Multiaddr};
use libp2p::{
gossipsub, identify, noise, ping, relay, rendezvous, swarm::SwarmEvent, tcp, yamux, Multiaddr,
PeerId,
};
// gossip message for relay-to-relay federation
// relays broadcast peer registrations to each other so discovery works across relay nodes
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct RelayRegistrationGossip {
peer_id: String,
namespace: String,
// unix timestamp in seconds when this registration expires
ttl: u64,
// relay peer id that originated this registration (to prevent loops)
source_relay: String,
}
#[derive(libp2p::swarm::NetworkBehaviour)]
struct RelayBehaviour {
relay: relay::Behaviour,
rendezvous: rendezvous::server::Behaviour,
gossipsub: gossipsub::Behaviour,
identify: identify::Behaviour,
ping: ping::Behaviour,
}
@ -84,11 +105,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_behaviour(|key| {
let peer_id = key.public().to_peer_id();
// configure gossipsub for relay-to-relay federation
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10))
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(|msg| {
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
msg.data.hash(&mut hasher);
gossipsub::MessageId::from(hasher.finish().to_string())
})
.build()
.expect("valid gossipsub config");
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(key.clone()),
gossipsub_config,
)
.expect("valid gossipsub behaviour");
RelayBehaviour {
relay: relay::Behaviour::new(peer_id, relay::Config::default()),
rendezvous: rendezvous::server::Behaviour::new(
rendezvous::server::Config::default(),
),
gossipsub,
identify: identify::Behaviour::new(identify::Config::new(
"/dusk/relay/1.0.0".to_string(),
key.public(),
@ -110,16 +151,55 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let canonical_addr = format!("/ip4/0.0.0.0/tcp/{}/p2p/{}", port, local_peer_id);
println!("\n relay address: {}\n", canonical_addr);
// subscribe to the relay federation gossip topic
let federation_topic = gossipsub::IdentTopic::new("dusk/relay/federation");
swarm.behaviour_mut().gossipsub.subscribe(&federation_topic)?;
log::info!("subscribed to relay federation topic");
// connect to peer relays for federation (from env var, comma-separated multiaddrs)
let peer_relays: Vec<Multiaddr> = std::env::var("DUSK_PEER_RELAYS")
.ok()
.map(|s| {
s.split(',')
.filter_map(|addr| addr.trim().parse().ok())
.collect()
})
.unwrap_or_default();
// extract peer IDs from peer relay multiaddrs for tracking
let expected_peer_relay_ids: Vec<PeerId> = peer_relays
.iter()
.filter_map(|addr| {
use libp2p::multiaddr::Protocol;
addr.iter().find_map(|p| match p {
Protocol::P2p(peer_id) => Some(peer_id),
_ => None,
})
})
.collect();
if !peer_relays.is_empty() {
log::info!("connecting to {} peer relays for federation", peer_relays.len());
for addr in &peer_relays {
log::info!(" dialing peer relay: {}", addr);
if let Err(e) = swarm.dial(addr.clone()) {
log::warn!("failed to dial peer relay {}: {}", addr, e);
}
}
} else {
log::warn!("no peer relays configured (DUSK_PEER_RELAYS env var not set)");
log::warn!("this relay will operate in standalone mode");
}
// track active reservations for logging
let mut reservation_count: usize = 0;
let mut connection_count: usize = 0;
// track peer relay connections for federation metrics
let mut connected_peer_relays: Vec<PeerId> = Vec::new();
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => {
log::debug!("listening on: {}/p2p/{}", address, local_peer_id);
}
// relay events
SwarmEvent::Behaviour(RelayBehaviourEvent::Relay(
relay::Event::ReservationReqAccepted { src_peer_id, .. },
@ -169,6 +249,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
peer,
registration.namespace
);
// gossip this registration to peer relays for federation
// other relays can cache this and have their clients query them directly
let gossip = RelayRegistrationGossip {
peer_id: peer.to_string(),
namespace: registration.namespace.to_string(),
// registration.ttl is already a u64 timestamp
ttl: registration.ttl,
source_relay: local_peer_id.to_string(),
};
if let Ok(data) = serde_json::to_vec(&gossip) {
if let Err(e) = swarm.behaviour_mut().gossipsub.publish(federation_topic.clone(), data) {
log::warn!("failed to gossip registration to peer relays: {}", e);
} else {
log::debug!(
"gossiped registration {}:{} to {} peer relays",
gossip.namespace,
gossip.peer_id,
connected_peer_relays.len()
);
}
}
}
SwarmEvent::Behaviour(RelayBehaviourEvent::Rendezvous(
rendezvous::server::Event::DiscoverServed { enquirer, registrations, .. },
@ -179,6 +282,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
enquirer
);
}
// gossipsub messages - receive peer registrations from other relays
SwarmEvent::Behaviour(RelayBehaviourEvent::Gossipsub(
gossipsub::Event::Message { message, .. }
)) => {
if let Ok(gossip) = serde_json::from_slice::<RelayRegistrationGossip>(&message.data) {
// skip our own messages (loop prevention)
if gossip.source_relay != local_peer_id.to_string() {
log::info!(
"received remote registration {}:{} from relay {} (clients should query that relay directly)",
gossip.namespace,
gossip.peer_id,
gossip.source_relay
);
}
}
}
SwarmEvent::Behaviour(RelayBehaviourEvent::Rendezvous(
rendezvous::server::Event::PeerNotRegistered { peer, namespace, .. },
)) => {
@ -200,20 +320,44 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// connection tracking
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
connection_count += 1;
// check if this is a peer relay connection
if expected_peer_relay_ids.contains(&peer_id) && !connected_peer_relays.contains(&peer_id) {
connected_peer_relays.push(peer_id);
log::info!(
"peer relay connected: {} ({}/{} peer relays online)",
peer_id,
connected_peer_relays.len(),
expected_peer_relay_ids.len()
);
} else {
log::info!(
"peer connected: {} (total connections: {})",
peer_id,
connection_count
);
}
}
SwarmEvent::ConnectionClosed { peer_id, .. } => {
connection_count = connection_count.saturating_sub(1);
// check if this was a peer relay disconnection
if let Some(pos) = connected_peer_relays.iter().position(|id| id == &peer_id) {
connected_peer_relays.remove(pos);
log::warn!(
"peer relay disconnected: {} ({}/{} peer relays online)",
peer_id,
connected_peer_relays.len(),
expected_peer_relay_ids.len()
);
} else {
log::debug!(
"peer disconnected: {} (total connections: {})",
peer_id,
connection_count
);
}
}
// identify events - log protocol info from connecting peers
SwarmEvent::Behaviour(RelayBehaviourEvent::Identify(