feat(node): wire directory service into event loop

- Registers profile on relay reservation accepted (if discoverable)
- DirectorySearch, DirectoryRegister, DirectoryRemove, SetRelayDiscoverable commands
- Pending reply map for async search results

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
This commit is contained in:
cloudwithax 2026-02-19 12:25:15 -05:00
parent c01dc94338
commit cb45e2e463
1 changed files with 128 additions and 0 deletions

View File

@ -208,6 +208,21 @@ pub enum NodeCommand {
request: crate::protocol::gif::GifRequest,
reply: tokio::sync::oneshot::Sender<Result<crate::protocol::gif::GifResponse, String>>,
},
// register this peer's profile in the relay's persistent directory
DirectoryRegister,
// remove this peer's profile from the relay's directory
DirectoryRemove,
// search the relay's directory by display_name or peer_id
DirectorySearch {
query: String,
reply: tokio::sync::oneshot::Sender<
Result<Vec<crate::protocol::directory::DirectoryProfileEntry>, String>,
>,
},
// update the relay_discoverable flag at runtime (from settings toggle)
SetRelayDiscoverable {
enabled: bool,
},
}
// events emitted from the node to the tauri frontend
@ -464,6 +479,20 @@ pub async fn start(
tokio::sync::oneshot::Sender<Result<crate::protocol::gif::GifResponse, String>>,
> = HashMap::new();
// pending directory search replies keyed by request_response request id
let mut pending_directory_replies: HashMap<
libp2p::request_response::OutboundRequestId,
tokio::sync::oneshot::Sender<
Result<Vec<crate::protocol::directory::DirectoryProfileEntry>, String>,
>,
> = HashMap::new();
// relay_discoverable flag -- read from storage once at startup
let mut relay_discoverable = storage
.load_settings()
.map(|s| s.relay_discoverable)
.unwrap_or(true);
// relay reconnection state with exponential backoff
let mut relay_backoff_secs = RELAY_INITIAL_BACKOFF_SECS;
// deferred warning timer -- only notify the frontend after the grace
@ -1035,6 +1064,18 @@ pub async fn start(
// any WAN peers are reachable, so this ensures remote
// peers learn about us once the relay mesh is live
publish_profile(&mut swarm_instance, &node_keypair, &storage);
// register profile in relay's persistent directory if discoverable
if relay_discoverable {
let profile = storage.load_profile().unwrap_or_default();
swarm_instance.behaviour_mut().directory_service.send_request(
&relay_peer_id,
crate::protocol::directory::DirectoryRequest::Register {
display_name: profile.display_name,
},
);
log::info!("directory: sent Register to relay");
}
}
libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::RelayClient(event)) => {
log::debug!("relay client event: {:?}", event);
@ -1345,6 +1386,35 @@ pub async fn start(
// ignore inbound requests (we only send outbound) and other events
libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::GifService(_)) => {}
// directory service response from relay
libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::DirectoryService(
libp2p::request_response::Event::Message {
message: libp2p::request_response::Message::Response { request_id, response },
..
}
)) => {
if let Some(reply) = pending_directory_replies.remove(&request_id) {
match response {
crate::protocol::directory::DirectoryResponse::Results(entries) => {
let _ = reply.send(Ok(entries));
}
crate::protocol::directory::DirectoryResponse::Ok => {
let _ = reply.send(Ok(vec![]));
}
}
}
}
// directory service outbound failure
libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::DirectoryService(
libp2p::request_response::Event::OutboundFailure { request_id, error, .. }
)) => {
if let Some(reply) = pending_directory_replies.remove(&request_id) {
let _ = reply.send(Err(format!("directory request failed: {:?}", error)));
}
log::warn!("directory: outbound failure: {:?}", error);
}
libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::DirectoryService(_)) => {}
_ => {}
}
}
@ -1643,6 +1713,64 @@ pub async fn start(
let _ = reply.send(Err("not connected to relay".to_string()));
}
}
Some(NodeCommand::DirectoryRegister) => {
if let Some(rp) = relay_peer {
let profile = storage.load_profile().unwrap_or_default();
swarm_instance.behaviour_mut().directory_service.send_request(
&rp,
crate::protocol::directory::DirectoryRequest::Register {
display_name: profile.display_name,
},
);
log::info!("directory: sent Register (command)");
}
}
Some(NodeCommand::DirectoryRemove) => {
if let Some(rp) = relay_peer {
swarm_instance.behaviour_mut().directory_service.send_request(
&rp,
crate::protocol::directory::DirectoryRequest::Remove,
);
log::info!("directory: sent Remove (command)");
}
}
Some(NodeCommand::DirectorySearch { query, reply }) => {
if let Some(rp) = relay_peer {
let request_id = swarm_instance
.behaviour_mut()
.directory_service
.send_request(
&rp,
crate::protocol::directory::DirectoryRequest::Search { query },
);
pending_directory_replies.insert(request_id, reply);
} else {
let _ = reply.send(Err("relay not connected".to_string()));
}
}
Some(NodeCommand::SetRelayDiscoverable { enabled }) => {
relay_discoverable = enabled;
if enabled {
if let Some(rp) = relay_peer {
let profile = storage.load_profile().unwrap_or_default();
swarm_instance.behaviour_mut().directory_service.send_request(
&rp,
crate::protocol::directory::DirectoryRequest::Register {
display_name: profile.display_name,
},
);
log::info!("directory: registered after opt-in");
}
} else {
if let Some(rp) = relay_peer {
swarm_instance.behaviour_mut().directory_service.send_request(
&rp,
crate::protocol::directory::DirectoryRequest::Remove,
);
log::info!("directory: removed after opt-out");
}
}
}
}
}
}