From f97cde8608fda36385483c5e81a6230be85e53ff Mon Sep 17 00:00:00 2001 From: cloudwithax Date: Wed, 18 Feb 2026 18:23:24 -0500 Subject: [PATCH] feat: enhance relay address validation and configuration management --- src-tauri/src/commands/identity.rs | 14 +- src-tauri/src/node/mod.rs | 468 +++++++++++++++++++++++++---- 2 files changed, 421 insertions(+), 61 deletions(-) diff --git a/src-tauri/src/commands/identity.rs b/src-tauri/src/commands/identity.rs index 4bb87a3..80f9962 100644 --- a/src-tauri/src/commands/identity.rs +++ b/src-tauri/src/commands/identity.rs @@ -337,10 +337,14 @@ pub async fn set_relay_address( state: State<'_, AppState>, relay_addr: String, ) -> Result<(), String> { - // validate the relay address format - let _ = relay_addr - .parse::() - .map_err(|_| "invalid relay address format")?; + // validate relay format and require /p2p/ component + let (validated_multiaddr, validated_peer_id) = + crate::node::validate_relay_multiaddr(&relay_addr)?; + log::info!( + "updating relay address to {} (peer {})", + validated_multiaddr, + validated_peer_id + ); // stop the current node if running { @@ -356,7 +360,7 @@ pub async fn set_relay_address( // update settings with the new relay address let mut settings = state.storage.load_settings().unwrap_or_default(); - settings.custom_relay_addr = Some(relay_addr); + settings.custom_relay_addr = Some(validated_multiaddr.to_string()); state .storage .save_settings(&settings) diff --git a/src-tauri/src/node/mod.rs b/src-tauri/src/node/mod.rs index e8d4324..62f803a 100644 --- a/src-tauri/src/node/mod.rs +++ b/src-tauri/src/node/mod.rs @@ -21,28 +21,20 @@ const DEFAULT_RELAY_ADDR: &str = const RELAY_INITIAL_BACKOFF_SECS: u64 = 2; const RELAY_MAX_BACKOFF_SECS: u64 = 120; const RELAY_BACKOFF_MULTIPLIER: u64 = 2; +const RENDEZVOUS_TICK_SECS: u64 = 120; +const KAD_BOOTSTRAP_TICK_SECS: u64 = 180; +const DUSK_BOOTSTRAP_PEERS_ENV: &str = "DUSK_BOOTSTRAP_PEERS"; // max time to hold pending rendezvous registrations before discarding (10 min) const PENDING_QUEUE_TTL_SECS: u64 = 600; // grace period before warning the frontend about relay being down, // prevents banner flashing on transient disconnections const RELAY_WARN_GRACE_SECS: u64 = 8; -// resolve the relay multiaddr from env var, custom setting, or default -// priority: DUSK_RELAY_ADDR env var > custom setting > DEFAULT_RELAY_ADDR -fn relay_addr(custom_addr: Option<&str>) -> Option { - let addr_str = std::env::var("DUSK_RELAY_ADDR") - .ok() - .filter(|s| !s.is_empty()) - .or_else(|| custom_addr.map(|s| s.to_string())) - .or_else(|| { - if DEFAULT_RELAY_ADDR.is_empty() { - None - } else { - Some(DEFAULT_RELAY_ADDR.to_string()) - } - })?; - - addr_str.parse().ok() +#[derive(Clone)] +struct RelayConfig { + addr: libp2p::Multiaddr, + peer_id: libp2p::PeerId, + source: &'static str, } // extract the peer id from a multiaddr (the /p2p/ component) @@ -54,6 +46,119 @@ fn peer_id_from_multiaddr(addr: &libp2p::Multiaddr) -> Option { }) } +pub fn validate_relay_multiaddr( + relay_addr: &str, +) -> Result<(libp2p::Multiaddr, libp2p::PeerId), String> { + let trimmed = relay_addr.trim(); + if trimmed.is_empty() { + return Err("relay address cannot be empty".to_string()); + } + + let multiaddr = trimmed.parse::().map_err(|e| { + format!( + "invalid relay multiaddr '{}': {}", + trimmed, + e + ) + })?; + + let peer_id = peer_id_from_multiaddr(&multiaddr).ok_or_else(|| { + format!( + "relay multiaddr must include '/p2p/': '{}'", + trimmed + ) + })?; + + Ok((multiaddr, peer_id)) +} + +// resolve the relay multiaddr from env var, custom setting, or default +// priority: DUSK_RELAY_ADDR env var > custom setting > DEFAULT_RELAY_ADDR +fn resolve_relay_config(custom_addr: Option<&str>) -> Option { + let mut candidates: Vec<(&'static str, String)> = Vec::new(); + + if let Ok(env_addr) = std::env::var("DUSK_RELAY_ADDR") { + let trimmed = env_addr.trim(); + if !trimmed.is_empty() { + candidates.push(("DUSK_RELAY_ADDR", trimmed.to_string())); + } + } + + if let Some(custom) = custom_addr.map(str::trim).filter(|s| !s.is_empty()) { + candidates.push(("custom_relay_addr", custom.to_string())); + } + + let default_trimmed = DEFAULT_RELAY_ADDR.trim(); + if !default_trimmed.is_empty() { + candidates.push(("DEFAULT_RELAY_ADDR", default_trimmed.to_string())); + } + + for (source, candidate) in candidates { + match validate_relay_multiaddr(&candidate) { + Ok((addr, peer_id)) => { + return Some(RelayConfig { + addr, + peer_id, + source, + }); + } + Err(e) => { + log::warn!( + "ignoring invalid relay address from {}: {}", + source, + e + ); + } + } + } + + None +} + +fn bootstrap_peers(relay_config: Option<&RelayConfig>) -> Vec<(libp2p::Multiaddr, libp2p::PeerId)> { + let mut peers: Vec<(libp2p::Multiaddr, libp2p::PeerId)> = Vec::new(); + let mut seen = HashSet::new(); + + if let Some(cfg) = relay_config { + let key = format!("{}|{}", cfg.addr, cfg.peer_id); + if seen.insert(key) { + peers.push((cfg.addr.clone(), cfg.peer_id)); + } + } + + if let Ok(raw) = std::env::var(DUSK_BOOTSTRAP_PEERS_ENV) { + for addr in raw + .split(',') + .map(str::trim) + .filter(|s| !s.is_empty()) + { + match validate_relay_multiaddr(addr) { + Ok((multiaddr, peer_id)) => { + let key = format!("{}|{}", multiaddr, peer_id); + if seen.insert(key) { + peers.push((multiaddr, peer_id)); + } + } + Err(e) => { + log::warn!( + "ignoring invalid bootstrap peer in {}: {}", + DUSK_BOOTSTRAP_PEERS_ENV, + e + ); + } + } + } + } + + peers +} + +fn queue_namespace_unique(queue: &mut Vec, namespace: String) { + if !queue.contains(&namespace) { + queue.push(namespace); + } +} + // handle to the running p2p node, used to stop it pub struct NodeHandle { pub task: JoinHandle<()>, @@ -267,24 +372,52 @@ pub async fn start( }, ); - // resolve the relay address for WAN connectivity - let relay_multiaddr = relay_addr(custom_relay_addr.as_deref()); - let relay_peer_id = relay_multiaddr.as_ref().and_then(peer_id_from_multiaddr); + // resolve validated relay and bootstrap peer configuration for WAN connectivity + let relay_config = resolve_relay_config(custom_relay_addr.as_deref()); + if let Some(cfg) = relay_config.as_ref() { + log::info!( + "using relay {} (peer {}) from {}", + cfg.addr, + cfg.peer_id, + cfg.source + ); + } + let relay_multiaddr = relay_config.as_ref().map(|cfg| cfg.addr.clone()); + let relay_peer_id = relay_config.as_ref().map(|cfg| cfg.peer_id); + + let bootstrap_nodes = bootstrap_peers(relay_config.as_ref()); + if !bootstrap_nodes.is_empty() { + log::info!( + "configured {} WAN bootstrap peer(s) (relay + optional {})", + bootstrap_nodes.len(), + DUSK_BOOTSTRAP_PEERS_ENV + ); + } + for (addr, peer) in &bootstrap_nodes { + swarm_instance + .behaviour_mut() + .kademlia + .add_address(peer, addr.clone()); + } // if a relay is configured, dial it immediately // don't emit RelayStatus here -- the store defaults to connected=true so // no warning shows during the initial handshake. the warning only appears // if the dial actually fails (OutgoingConnectionError) or the connection drops. if let Some(ref addr) = relay_multiaddr { - log::info!("dialing relay at {}", addr); + log::info!("relay dial start (startup): {}", addr); if let Err(e) = swarm_instance.dial(addr.clone()) { - log::warn!("failed to dial relay: {}", e); + log::warn!("relay dial failed (startup): {}", e); // emit disconnected status immediately if dial fails let _ = app_handle.emit("dusk-event", DuskEvent::RelayStatus { connected: false }); + } else { + log::info!("relay dial initiated (startup)"); } } else { // if relay address is invalid or not configured, emit disconnected status - log::warn!("no valid relay address configured, running in LAN-only mode"); + log::warn!( + "no valid relay address configured from DUSK_RELAY_ADDR/custom/default, running in LAN-only mode" + ); let _ = app_handle.emit("dusk-event", DuskEvent::RelayStatus { connected: false }); } @@ -315,11 +448,15 @@ pub async fn start( // timestamp when pending items were first queued (for TTL cleanup) let mut pending_queued_at: Option = None; - // rendezvous registration refresh interval (registrations expire) - let mut rendezvous_tick = tokio::time::interval(std::time::Duration::from_secs(120)); + // rendezvous registration/rediscovery refresh interval + let mut rendezvous_tick = + tokio::time::interval(std::time::Duration::from_secs(RENDEZVOUS_TICK_SECS)); + // periodic Kademlia bootstrap fallback for WAN resilience + let mut kad_bootstrap_tick = + tokio::time::interval(std::time::Duration::from_secs(KAD_BOOTSTRAP_TICK_SECS)); - // all community namespaces we're registered under (for refresh) - let mut registered_namespaces: HashSet = HashSet::new(); + // all namespaces we should keep active for rendezvous register + rediscover + let mut active_namespaces: HashSet = HashSet::new(); // pending gif search replies keyed by request_response request id let mut pending_gif_replies: HashMap< @@ -347,6 +484,54 @@ pub async fn start( tokio::select! { event = swarm_instance.select_next_some() => { match event { + // --- kademlia fallback discovery lifecycle --- + libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::Kademlia( + libp2p::kad::Event::OutboundQueryProgressed { + id, + result: libp2p::kad::QueryResult::Bootstrap(Ok(result)), + .. + } + )) => { + log::debug!( + "kademlia bootstrap progress (query {:?}): remaining={}", + id, + result.num_remaining + ); + } + libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::Kademlia( + libp2p::kad::Event::OutboundQueryProgressed { + id, + result: libp2p::kad::QueryResult::Bootstrap(Err(e)), + .. + } + )) => { + log::warn!("kademlia bootstrap query {:?} failed: {:?}", id, e); + } + libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::Kademlia( + libp2p::kad::Event::OutboundQueryProgressed { + id, + result: libp2p::kad::QueryResult::GetClosestPeers(result), + .. + } + )) => { + match result { + Ok(ok) => { + log::debug!( + "kademlia closest-peers query {:?} returned {} peer(s)", + id, + ok.peers.len() + ); + } + Err(e) => { + log::warn!( + "kademlia closest-peers query {:?} failed: {:?}", + id, + e + ); + } + } + } + // --- gossipsub messages --- libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::Gossipsub( libp2p::gossipsub::Event::Message { message, .. } @@ -792,14 +977,20 @@ pub async fn start( if let Some(rp) = relay_peer { match libp2p::rendezvous::Namespace::new(ns.clone()) { Ok(namespace) => { + log::info!( + "rendezvous register start (queued replay) for namespace '{}'", + ns + ); if let Err(e) = swarm_instance.behaviour_mut().rendezvous.register( namespace, rp, None, ) { - log::warn!("failed to register on rendezvous for {}: {:?}", ns, e); - } else { - registered_namespaces.insert(ns); + log::warn!( + "rendezvous register failed (queued replay) for '{}': {:?}", + ns, + e + ); } } Err(e) => { @@ -812,12 +1003,27 @@ pub async fn start( let queued = std::mem::take(&mut pending_discoveries); for ns in queued { if let Some(rp) = relay_peer { - swarm_instance.behaviour_mut().rendezvous.discover( - Some(libp2p::rendezvous::Namespace::new(ns.clone()).unwrap()), - None, - None, - rp, - ); + match libp2p::rendezvous::Namespace::new(ns.clone()) { + Ok(namespace) => { + log::info!( + "rendezvous discover start (queued replay) for namespace '{}'", + ns + ); + swarm_instance.behaviour_mut().rendezvous.discover( + Some(namespace), + None, + None, + rp, + ); + } + Err(e) => { + log::warn!( + "invalid queued rendezvous namespace '{}': {:?}", + ns, + e + ); + } + } } } @@ -830,17 +1036,33 @@ pub async fn start( // peers learn about us once the relay mesh is live publish_profile(&mut swarm_instance, &node_keypair, &storage); } + libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::RelayClient(event)) => { + log::debug!("relay client event: {:?}", event); + } // --- rendezvous client events --- libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::Rendezvous( libp2p::rendezvous::client::Event::Registered { namespace, .. } )) => { - log::info!("registered on rendezvous under namespace '{}'", namespace); - registered_namespaces.insert(namespace.to_string()); + log::info!( + "rendezvous register success for namespace '{}'", + namespace + ); + active_namespaces.insert(namespace.to_string()); } libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::Rendezvous( - libp2p::rendezvous::client::Event::Discovered { registrations, .. } + libp2p::rendezvous::client::Event::Discovered { registrations, cookie, .. } )) => { + let namespace_desc = cookie + .namespace() + .map(|ns| ns.to_string()) + .unwrap_or_else(|| "".to_string()); + log::info!( + "rendezvous discover success: namespace '{}' returned {} peer record(s)", + namespace_desc, + registrations.len() + ); + // discovered peers on rendezvous, connect to them through the relay for registration in registrations { let discovered_peer = registration.record.peer_id(); @@ -897,8 +1119,22 @@ pub async fn start( .with(libp2p::multiaddr::Protocol::P2pCircuit) .with(libp2p::multiaddr::Protocol::P2p(discovered_peer)); + log::info!( + "relay-circuit dial start to discovered peer {} via {}", + discovered_peer, + relay_addr + ); if let Err(e) = swarm_instance.dial(circuit_addr) { - log::warn!("failed to dial peer {} through relay: {}", discovered_peer, e); + log::warn!( + "relay-circuit dial failed for peer {}: {}", + discovered_peer, + e + ); + } else { + log::info!( + "relay-circuit dial initiated for peer {}", + discovered_peer + ); } } } @@ -908,6 +1144,23 @@ pub async fn start( )) => { log::warn!("rendezvous registration failed for '{}': {:?}", namespace, error); } + libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::Rendezvous( + libp2p::rendezvous::client::Event::DiscoverFailed { namespace, error, .. } + )) => { + let ns = namespace + .map(|ns| ns.to_string()) + .unwrap_or_else(|| "".to_string()); + log::warn!( + "rendezvous discover failed for namespace '{}': {:?}", + ns, + error + ); + } + libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::Rendezvous( + libp2p::rendezvous::client::Event::Expired { peer } + )) => { + log::debug!("rendezvous registration expired for peer {}", peer); + } // --- identify events --- libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::Identify( @@ -950,6 +1203,10 @@ pub async fn start( swarm_instance.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); connected_peers.insert(peer_id.to_string()); + if Some(peer_id) == relay_peer { + log::info!("relay dial success: connected to relay peer {}", peer_id); + } + let _ = app_handle.emit("dusk-event", DuskEvent::PeerConnected { peer_id: peer_id.to_string(), }); @@ -973,9 +1230,15 @@ pub async fn start( let relay_circuit_addr = addr.clone() .with(libp2p::multiaddr::Protocol::P2pCircuit); - log::info!("connected to relay, requesting reservation"); + log::info!( + "relay reservation request start via listen_on {}", + relay_circuit_addr + ); if let Err(e) = swarm_instance.listen_on(relay_circuit_addr) { - log::warn!("failed to listen on relay circuit: {}", e); + log::warn!( + "relay reservation request failed (listen_on error): {}", + e + ); } } } @@ -1039,7 +1302,10 @@ pub async fn start( // and schedule a retry with backoff if Some(peer_id) == relay_peer { relay_reservation_active = false; - log::warn!("lost connection to relay, scheduling reconnect in {}s", relay_backoff_secs); + log::warn!( + "relay reservation closed (relay connection dropped), scheduling reconnect in {}s", + relay_backoff_secs + ); // defer the warning so quick reconnections don't flash the banner if relay_warn_at.is_none() { relay_warn_at = Some( @@ -1087,13 +1353,54 @@ pub async fn start( _ = rendezvous_tick.tick() => { if relay_reservation_active { if let Some(rp) = relay_peer { - for ns in registered_namespaces.clone() { - if let Err(e) = swarm_instance.behaviour_mut().rendezvous.register( - libp2p::rendezvous::Namespace::new(ns.clone()).unwrap(), - rp, - None, - ) { - log::warn!("failed to refresh rendezvous registration for {}: {:?}", ns, e); + for ns in active_namespaces.clone() { + match libp2p::rendezvous::Namespace::new(ns.clone()) { + Ok(namespace) => { + log::info!( + "rendezvous register refresh start for namespace '{}'", + ns + ); + if let Err(e) = swarm_instance.behaviour_mut().rendezvous.register( + namespace, + rp, + None, + ) { + log::warn!( + "failed to refresh rendezvous registration for '{}': {:?}", + ns, + e + ); + } + } + Err(e) => { + log::warn!( + "invalid active rendezvous namespace '{}' during refresh: {:?}", + ns, + e + ); + } + } + + match libp2p::rendezvous::Namespace::new(ns.clone()) { + Ok(namespace) => { + log::info!( + "rendezvous rediscovery start for namespace '{}'", + ns + ); + swarm_instance.behaviour_mut().rendezvous.discover( + Some(namespace), + None, + None, + rp, + ); + } + Err(e) => { + log::warn!( + "invalid active rendezvous namespace '{}' during rediscovery: {:?}", + ns, + e + ); + } } } } @@ -1118,6 +1425,42 @@ pub async fn start( } } + // periodic kademlia bootstrap/query as WAN fallback when relay+rendezvous are degraded + _ = kad_bootstrap_tick.tick() => { + for (addr, peer) in &bootstrap_nodes { + swarm_instance + .behaviour_mut() + .kademlia + .add_address(peer, addr.clone()); + } + + match swarm_instance.behaviour_mut().kademlia.bootstrap() { + Ok(query_id) => { + log::info!( + "kademlia bootstrap started (query {:?}, peers: {})", + query_id, + bootstrap_nodes.len() + ); + } + Err(e) => { + log::warn!( + "kademlia bootstrap start failed: {:?}", + e + ); + } + } + + let local_peer_id = *swarm_instance.local_peer_id(); + let query_id = swarm_instance + .behaviour_mut() + .kademlia + .get_closest_peers(local_peer_id); + log::debug!( + "kademlia get_closest_peers started (query {:?})", + query_id + ); + } + // relay reconnection with exponential backoff _ = tokio::time::sleep_until( relay_retry_at.unwrap_or_else(|| tokio::time::Instant::now() + std::time::Duration::from_secs(86400)) @@ -1125,15 +1468,17 @@ pub async fn start( relay_retry_at = None; if !relay_reservation_active { if let Some(ref addr) = relay_multiaddr { - log::info!("attempting relay reconnect to {}", addr); + log::info!("relay dial start (reconnect): {}", addr); if let Err(e) = swarm_instance.dial(addr.clone()) { - log::warn!("failed to dial relay: {}", e); + log::warn!("relay dial failed (reconnect): {}", e); // schedule another retry relay_retry_at = Some( tokio::time::Instant::now() + std::time::Duration::from_secs(relay_backoff_secs), ); relay_backoff_secs = (relay_backoff_secs * RELAY_BACKOFF_MULTIPLIER) .min(RELAY_MAX_BACKOFF_SECS); + } else { + log::info!("relay dial initiated (reconnect)"); } } } @@ -1174,8 +1519,11 @@ pub async fn start( let _ = reply.send(addrs); } Some(NodeCommand::Dial { addr }) => { + log::info!("manual dial start: {}", addr); if let Err(e) = swarm_instance.dial(addr.clone()) { log::warn!("failed to dial {}: {}", addr, e); + } else { + log::info!("manual dial initiated: {}", addr); } } Some(NodeCommand::BroadcastPresence { status }) => { @@ -1209,14 +1557,17 @@ pub async fn start( } } Some(NodeCommand::RegisterRendezvous { namespace }) => { + active_namespaces.insert(namespace.clone()); if relay_reservation_active { if let Some(rp) = relay_peer { match libp2p::rendezvous::Namespace::new(namespace.clone()) { Ok(ns) => { + log::info!( + "rendezvous register start for namespace '{}'", + namespace + ); if let Err(e) = swarm_instance.behaviour_mut().rendezvous.register(ns, rp, None) { log::warn!("failed to register on rendezvous: {:?}", e); - } else { - registered_namespaces.insert(namespace); } } Err(e) => log::warn!("invalid rendezvous namespace '{}': {:?}", namespace, e), @@ -1227,14 +1578,19 @@ pub async fn start( if pending_queued_at.is_none() { pending_queued_at = Some(std::time::Instant::now()); } - pending_registrations.push(namespace); + queue_namespace_unique(&mut pending_registrations, namespace); } } Some(NodeCommand::DiscoverRendezvous { namespace }) => { + active_namespaces.insert(namespace.clone()); if relay_reservation_active { if let Some(rp) = relay_peer { match libp2p::rendezvous::Namespace::new(namespace.clone()) { Ok(ns) => { + log::info!( + "rendezvous discover start for namespace '{}'", + namespace + ); swarm_instance.behaviour_mut().rendezvous.discover( Some(ns), None, @@ -1250,7 +1606,7 @@ pub async fn start( if pending_queued_at.is_none() { pending_queued_at = Some(std::time::Instant::now()); } - pending_discoveries.push(namespace); + queue_namespace_unique(&mut pending_discoveries, namespace); } } Some(NodeCommand::UnregisterRendezvous { namespace }) => { @@ -1259,7 +1615,7 @@ pub async fn start( if pending_registrations.is_empty() && pending_discoveries.is_empty() { pending_queued_at = None; } - registered_namespaces.remove(&namespace); + active_namespaces.remove(&namespace); if relay_reservation_active { if let Some(rp) = relay_peer {