feat(voice): add TURN credential support, ICE restart logic, and connection quality tracking

Introduce a TURN credentials protocol for requesting time-limited relay
server credentials from the relay node, improving NAT traversal for
WebRTC voice connections.

WebRTC layer improvements:
- Add ICE candidate buffering before remote description is set
- Implement automatic ICE restart with configurable max attempts
- Add disconnection timeout with delayed restart recovery
- Add per-peer connection state tracking and callbacks
- Include public STUN servers (Google, Cloudflare) by default
- Add detailed logging throughout the WebRTC lifecycle

Voice store improvements:
- Fetch TURN credentials from relay before establishing connections
- Track per-peer RTCPeerConnectionState with reactive signals
- Derive overall voice quality signal (good/connecting/degraded/failed)
- Evaluate and surface degraded connection state to the UI
- Clean up peer connection states on participant leave
- Fix video toggle resource leak (stop unused audio tracks)

Backend changes:
- Add TurnCredentialRequest/Response protocol types
- Wire turn_credentials behaviour into swarm and node event loop
- Add get_turn_credentials Tauri command
- Propagate errors from voice commands instead of silently dropping
- Add input validation for SDP types in send_voice_sdp
- Handle missing node handle as explicit error in all voice commands
- Add DirectoryResponse::Error variant handling
- Scope Linux WebKitGTK permission grants to UserMediaPermissionRequest

UI improvements:
- Show connection quality indicator in voice channel header
- Add per-peer connection state ring on participant tiles
- Display failed connection warning icon on participant tiles
- Stack muted/deafened indicators vertically to avoid overlap
- Show participant grid in degraded connection state

Also remove unused imports across several frontend components.
This commit is contained in:
cloudwithax 2026-02-24 20:59:19 -05:00
parent 044b5ca111
commit 48e49ee6a5
18 changed files with 897 additions and 200 deletions

View File

@ -7,7 +7,7 @@
"dev": "vite", "dev": "vite",
"build": "vite build", "build": "vite build",
"preview": "vite preview", "preview": "vite preview",
"tauri": "tauri", "tauri": "WEBKIT_DISABLE_DMABUF_RENDERER=1 GDK_BACKEND=x11 tauri",
"tauri:linux": "WEBKIT_DISABLE_DMABUF_RENDERER=1 GDK_BACKEND=x11 tauri", "tauri:linux": "WEBKIT_DISABLE_DMABUF_RENDERER=1 GDK_BACKEND=x11 tauri",
"tauri:dev": "tauri dev", "tauri:dev": "tauri dev",
"tauri:dev:linux": "WEBKIT_DISABLE_DMABUF_RENDERER=1 GDK_BACKEND=x11 tauri dev", "tauri:dev:linux": "WEBKIT_DISABLE_DMABUF_RENDERER=1 GDK_BACKEND=x11 tauri dev",

View File

@ -3,6 +3,7 @@ use tauri::State;
use crate::node::gossip; use crate::node::gossip;
use crate::node::NodeCommand; use crate::node::NodeCommand;
use crate::protocol::messages::{GossipMessage, VoiceMediaState, VoiceParticipant}; use crate::protocol::messages::{GossipMessage, VoiceMediaState, VoiceParticipant};
use crate::protocol::turn::TurnCredentialResponse;
use crate::AppState; use crate::AppState;
#[tauri::command] #[tauri::command]
@ -11,6 +12,11 @@ pub async fn join_voice_channel(
community_id: String, community_id: String,
channel_id: String, channel_id: String,
) -> Result<Vec<VoiceParticipant>, String> { ) -> Result<Vec<VoiceParticipant>, String> {
eprintln!(
"[Voice] join_voice_channel called: community={}, channel={}",
community_id, channel_id
);
let identity = state.identity.lock().await; let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?; let id = identity.as_ref().ok_or("no identity loaded")?;
@ -29,12 +35,16 @@ pub async fn join_voice_channel(
let voice_topic = gossip::topic_for_voice(&community_id, &channel_id); let voice_topic = gossip::topic_for_voice(&community_id, &channel_id);
let node_handle = state.node_handle.lock().await; let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle { if let Some(ref handle) = *node_handle {
let _ = handle handle
.command_tx .command_tx
.send(NodeCommand::Subscribe { .send(NodeCommand::Subscribe {
topic: voice_topic.clone(), topic: voice_topic.clone(),
}) })
.await; .await
.map_err(|e| {
eprintln!("[Voice] Failed to subscribe to voice topic: {}", e);
format!("Failed to subscribe to voice channel: {}", e)
})?;
// publish our join announcement // publish our join announcement
let msg = GossipMessage::VoiceJoin { let msg = GossipMessage::VoiceJoin {
@ -45,13 +55,22 @@ pub async fn join_voice_channel(
media_state: media_state.clone(), media_state: media_state.clone(),
}; };
let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?;
let _ = handle handle
.command_tx .command_tx
.send(NodeCommand::SendMessage { .send(NodeCommand::SendMessage {
topic: voice_topic, topic: voice_topic,
data, data,
}) })
.await; .await
.map_err(|e| {
eprintln!("[Voice] Failed to publish VoiceJoin: {}", e);
format!("Failed to send voice join announcement: {}", e)
})?;
eprintln!("[Voice] Successfully published VoiceJoin for peer {}", peer_id);
} else {
eprintln!("[Voice] No node handle available — cannot join voice channel");
return Err("Node not running — cannot join voice channel".to_string());
} }
// add ourselves to the local voice channel tracking // add ourselves to the local voice channel tracking
@ -68,6 +87,16 @@ pub async fn join_voice_channel(
let result = participants.clone(); let result = participants.clone();
drop(vc); drop(vc);
// TODO: Participant list race condition
// The participant list returned here only includes locally-tracked peers.
// A newly joining peer will not see existing participants until they receive
// VoiceJoin gossip messages from those peers. To fix this properly, we need:
// 1. A new GossipMessage::VoiceParticipantsRequest variant
// 2. Existing peers respond to the request by re-broadcasting their VoiceJoin
// 3. Or implement a request/response protocol over gossipsub or a direct stream
// This requires changes to protocol/messages.rs and node/mod.rs (gossip handler).
// For now, the frontend should handle late-arriving VoiceJoin events gracefully.
log::info!("joined voice channel {}:{}", community_id, channel_id); log::info!("joined voice channel {}:{}", community_id, channel_id);
Ok(result) Ok(result)
@ -79,6 +108,11 @@ pub async fn leave_voice_channel(
community_id: String, community_id: String,
channel_id: String, channel_id: String,
) -> Result<(), String> { ) -> Result<(), String> {
eprintln!(
"[Voice] leave_voice_channel called: community={}, channel={}",
community_id, channel_id
);
let identity = state.identity.lock().await; let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?; let id = identity.as_ref().ok_or("no identity loaded")?;
let peer_id = id.peer_id.to_string(); let peer_id = id.peer_id.to_string();
@ -95,19 +129,32 @@ pub async fn leave_voice_channel(
peer_id: peer_id.clone(), peer_id: peer_id.clone(),
}; };
let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?;
let _ = handle handle
.command_tx .command_tx
.send(NodeCommand::SendMessage { .send(NodeCommand::SendMessage {
topic: voice_topic.clone(), topic: voice_topic.clone(),
data, data,
}) })
.await; .await
.map_err(|e| {
eprintln!("[Voice] Failed to publish VoiceLeave: {}", e);
format!("Failed to send voice leave announcement: {}", e)
})?;
eprintln!("[Voice] Successfully published VoiceLeave for peer {}", peer_id);
// unsubscribe from the voice topic // unsubscribe from the voice topic
let _ = handle handle
.command_tx .command_tx
.send(NodeCommand::Unsubscribe { topic: voice_topic }) .send(NodeCommand::Unsubscribe { topic: voice_topic })
.await; .await
.map_err(|e| {
eprintln!("[Voice] Failed to unsubscribe from voice topic: {}", e);
format!("Failed to unsubscribe from voice channel: {}", e)
})?;
} else {
eprintln!("[Voice] No node handle available — cannot leave voice channel");
return Err("Node not running — cannot leave voice channel".to_string());
} }
// remove ourselves from local tracking // remove ourselves from local tracking
@ -133,6 +180,11 @@ pub async fn update_voice_media_state(
channel_id: String, channel_id: String,
media_state: VoiceMediaState, media_state: VoiceMediaState,
) -> Result<(), String> { ) -> Result<(), String> {
eprintln!(
"[Voice] update_voice_media_state called: community={}, channel={}",
community_id, channel_id
);
let identity = state.identity.lock().await; let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?; let id = identity.as_ref().ok_or("no identity loaded")?;
let peer_id = id.peer_id.to_string(); let peer_id = id.peer_id.to_string();
@ -148,13 +200,22 @@ pub async fn update_voice_media_state(
media_state: media_state.clone(), media_state: media_state.clone(),
}; };
let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?;
let _ = handle handle
.command_tx .command_tx
.send(NodeCommand::SendMessage { .send(NodeCommand::SendMessage {
topic: voice_topic, topic: voice_topic,
data, data,
}) })
.await; .await
.map_err(|e| {
eprintln!("[Voice] Failed to publish VoiceMediaStateUpdate: {}", e);
format!("Failed to send media state update: {}", e)
})?;
eprintln!("[Voice] Successfully published VoiceMediaStateUpdate for peer {}", peer_id);
} else {
eprintln!("[Voice] No node handle available — cannot update media state");
return Err("Node not running — cannot update media state".to_string());
} }
// update local tracking // update local tracking
@ -179,6 +240,23 @@ pub async fn send_voice_sdp(
sdp_type: String, sdp_type: String,
sdp: String, sdp: String,
) -> Result<(), String> { ) -> Result<(), String> {
eprintln!(
"[Voice] send_voice_sdp called: community={}, channel={}, to_peer={}, sdp_type={}",
community_id, channel_id, to_peer, sdp_type
);
// Validate SDP type before doing anything else
match sdp_type.as_str() {
"offer" | "answer" | "pranswer" => {}
_ => {
eprintln!("[Voice] Invalid SDP type: {}", sdp_type);
return Err(format!(
"Invalid SDP type '{}': must be one of 'offer', 'answer', 'pranswer'",
sdp_type
));
}
}
let identity = state.identity.lock().await; let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?; let id = identity.as_ref().ok_or("no identity loaded")?;
let from_peer = id.peer_id.to_string(); let from_peer = id.peer_id.to_string();
@ -190,19 +268,28 @@ pub async fn send_voice_sdp(
let msg = GossipMessage::VoiceSdp { let msg = GossipMessage::VoiceSdp {
community_id, community_id,
channel_id, channel_id,
from_peer, from_peer: from_peer.clone(),
to_peer, to_peer,
sdp_type, sdp_type,
sdp, sdp,
}; };
let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?;
let _ = handle handle
.command_tx .command_tx
.send(NodeCommand::SendMessage { .send(NodeCommand::SendMessage {
topic: voice_topic, topic: voice_topic,
data, data,
}) })
.await; .await
.map_err(|e| {
eprintln!("[Voice] Failed to publish VoiceSdp: {}", e);
format!("Failed to send voice SDP: {}", e)
})?;
eprintln!("[Voice] Successfully published VoiceSdp from peer {}", from_peer);
} else {
eprintln!("[Voice] No node handle available — cannot send SDP");
return Err("Node not running — cannot send SDP".to_string());
} }
Ok(()) Ok(())
@ -218,6 +305,11 @@ pub async fn send_voice_ice_candidate(
sdp_mid: Option<String>, sdp_mid: Option<String>,
sdp_mline_index: Option<u32>, sdp_mline_index: Option<u32>,
) -> Result<(), String> { ) -> Result<(), String> {
eprintln!(
"[Voice] send_voice_ice_candidate called: community={}, channel={}, to_peer={}",
community_id, channel_id, to_peer
);
let identity = state.identity.lock().await; let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?; let id = identity.as_ref().ok_or("no identity loaded")?;
let from_peer = id.peer_id.to_string(); let from_peer = id.peer_id.to_string();
@ -229,20 +321,29 @@ pub async fn send_voice_ice_candidate(
let msg = GossipMessage::VoiceIceCandidate { let msg = GossipMessage::VoiceIceCandidate {
community_id, community_id,
channel_id, channel_id,
from_peer, from_peer: from_peer.clone(),
to_peer, to_peer,
candidate, candidate,
sdp_mid, sdp_mid,
sdp_mline_index, sdp_mline_index,
}; };
let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?;
let _ = handle handle
.command_tx .command_tx
.send(NodeCommand::SendMessage { .send(NodeCommand::SendMessage {
topic: voice_topic, topic: voice_topic,
data, data,
}) })
.await; .await
.map_err(|e| {
eprintln!("[Voice] Failed to publish VoiceIceCandidate: {}", e);
format!("Failed to send voice ICE candidate: {}", e)
})?;
eprintln!("[Voice] Successfully published VoiceIceCandidate from peer {}", from_peer);
} else {
eprintln!("[Voice] No node handle available — cannot send ICE candidate");
return Err("Node not running — cannot send ICE candidate".to_string());
} }
Ok(()) Ok(())
@ -254,8 +355,44 @@ pub async fn get_voice_participants(
community_id: String, community_id: String,
channel_id: String, channel_id: String,
) -> Result<Vec<VoiceParticipant>, String> { ) -> Result<Vec<VoiceParticipant>, String> {
eprintln!(
"[Voice] get_voice_participants called: community={}, channel={}",
community_id, channel_id
);
let key = format!("{}:{}", community_id, channel_id); let key = format!("{}:{}", community_id, channel_id);
let vc = state.voice_channels.lock().await; let vc = state.voice_channels.lock().await;
let participants = vc.get(&key).cloned().unwrap_or_default(); let participants = vc.get(&key).cloned().unwrap_or_default();
eprintln!(
"[Voice] Returning {} participants for {}",
participants.len(),
key
);
Ok(participants) Ok(participants)
} }
#[tauri::command]
pub async fn get_turn_credentials(
state: State<'_, AppState>,
) -> Result<TurnCredentialResponse, String> {
eprintln!("[Voice] get_turn_credentials called");
let handle_ref = state.node_handle.lock().await;
let handle = handle_ref.as_ref().ok_or("node not running")?;
let (tx, rx) = tokio::sync::oneshot::channel();
handle
.command_tx
.send(NodeCommand::GetTurnCredentials { reply: tx })
.await
.map_err(|_| "failed to send get_turn_credentials command".to_string())?;
// drop the lock before awaiting the response
drop(handle_ref);
rx.await
.map_err(|_| "turn credentials response channel closed".to_string())?
}

View File

@ -63,18 +63,26 @@ pub fn run() {
.setup(|app| { .setup(|app| {
// grant microphone/camera permissions on linux webkitgtk // grant microphone/camera permissions on linux webkitgtk
// without this, getUserMedia is denied by default // without this, getUserMedia is denied by default
// only allow UserMediaPermissionRequest (mic/camera), deny everything else
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
{ {
use tauri::Manager; use tauri::Manager;
if let Some(window) = app.get_webview_window("main") { if let Some(window) = app.get_webview_window("main") {
window window
.with_webview(|webview| { .with_webview(|webview| {
use webkit2gtk::glib::prelude::ObjectExt;
use webkit2gtk::PermissionRequestExt; use webkit2gtk::PermissionRequestExt;
use webkit2gtk::WebViewExt; use webkit2gtk::WebViewExt;
let wv = webview.inner(); let wv = webview.inner();
wv.connect_permission_request(|_webview, request| { wv.connect_permission_request(|_webview, request| {
request.allow(); if request.is::<webkit2gtk::UserMediaPermissionRequest>() {
true request.allow();
true
} else {
// deny all other permission types (geolocation, etc.)
request.deny();
true
}
}); });
}) })
.ok(); .ok();
@ -150,6 +158,7 @@ pub fn run() {
commands::voice::send_voice_sdp, commands::voice::send_voice_sdp,
commands::voice::send_voice_ice_candidate, commands::voice::send_voice_ice_candidate,
commands::voice::get_voice_participants, commands::voice::get_voice_participants,
commands::voice::get_turn_credentials,
commands::dm::send_dm, commands::dm::send_dm,
commands::dm::get_dm_messages, commands::dm::get_dm_messages,
commands::dm::search_dm_messages, commands::dm::search_dm_messages,

View File

@ -1,5 +1,6 @@
use crate::protocol::directory::{DirectoryRequest, DirectoryResponse}; use crate::protocol::directory::{DirectoryRequest, DirectoryResponse};
use crate::protocol::gif::{GifRequest, GifResponse}; use crate::protocol::gif::{GifRequest, GifResponse};
use crate::protocol::turn::{TurnCredentialRequest, TurnCredentialResponse};
use libp2p::{ use libp2p::{
gossipsub, identify, kad, mdns, ping, relay, rendezvous, request_response::cbor, gossipsub, identify, kad, mdns, ping, relay, rendezvous, request_response::cbor,
swarm::NetworkBehaviour, swarm::NetworkBehaviour,
@ -18,4 +19,6 @@ pub struct DuskBehaviour {
pub gif_service: cbor::Behaviour<GifRequest, GifResponse>, pub gif_service: cbor::Behaviour<GifRequest, GifResponse>,
// directory search: register/search/remove profiles on the relay // directory search: register/search/remove profiles on the relay
pub directory_service: cbor::Behaviour<DirectoryRequest, DirectoryResponse>, pub directory_service: cbor::Behaviour<DirectoryRequest, DirectoryResponse>,
// turn credentials: request time-limited TURN server credentials from the relay
pub turn_credentials: cbor::Behaviour<TurnCredentialRequest, TurnCredentialResponse>,
} }

View File

@ -223,6 +223,10 @@ pub enum NodeCommand {
SetRelayDiscoverable { SetRelayDiscoverable {
enabled: bool, enabled: bool,
}, },
// request time-limited TURN server credentials from the relay
GetTurnCredentials {
reply: tokio::sync::oneshot::Sender<Result<crate::protocol::turn::TurnCredentialResponse, String>>,
},
} }
// events emitted from the node to the tauri frontend // events emitted from the node to the tauri frontend
@ -488,6 +492,14 @@ pub async fn start(
>, >,
> = HashMap::new(); > = HashMap::new();
// pending turn credential replies keyed by request_response request id
let mut pending_turn_credential_replies: HashMap<
libp2p::request_response::OutboundRequestId,
tokio::sync::oneshot::Sender<
Result<crate::protocol::turn::TurnCredentialResponse, String>,
>,
> = HashMap::new();
// relay_discoverable flag -- read from storage once at startup // relay_discoverable flag -- read from storage once at startup
let mut relay_discoverable = storage let mut relay_discoverable = storage
.load_settings() .load_settings()
@ -1401,6 +1413,10 @@ pub async fn start(
crate::protocol::directory::DirectoryResponse::Ok => { crate::protocol::directory::DirectoryResponse::Ok => {
let _ = reply.send(Ok(vec![])); let _ = reply.send(Ok(vec![]));
} }
crate::protocol::directory::DirectoryResponse::Error(msg) => {
log::warn!("directory service error from relay: {}", msg);
let _ = reply.send(Ok(vec![]));
}
} }
} }
} }
@ -1415,6 +1431,29 @@ pub async fn start(
} }
libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::DirectoryService(_)) => {} libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::DirectoryService(_)) => {}
// turn credentials response from relay
libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::TurnCredentials(
libp2p::request_response::Event::Message {
message: libp2p::request_response::Message::Response { request_id, response },
..
}
)) => {
if let Some(reply) = pending_turn_credential_replies.remove(&request_id) {
let _ = reply.send(Ok(response));
}
}
// turn credentials outbound failure
libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::TurnCredentials(
libp2p::request_response::Event::OutboundFailure { request_id, error, .. }
)) => {
log::warn!("turn credentials: outbound failure: {:?}", error);
if let Some(reply) = pending_turn_credential_replies.remove(&request_id) {
let _ = reply.send(Err(format!("turn credentials request failed: {:?}", error)));
}
}
// ignore inbound requests and other events for turn credentials
libp2p::swarm::SwarmEvent::Behaviour(behaviour::DuskBehaviourEvent::TurnCredentials(_)) => {}
_ => {} _ => {}
} }
} }
@ -1777,6 +1816,23 @@ pub async fn start(
} }
} }
} }
Some(NodeCommand::GetTurnCredentials { reply }) => {
if let Some(rp) = relay_peer {
let local_peer_id = swarm_instance.local_peer_id().to_string();
let request_id = swarm_instance
.behaviour_mut()
.turn_credentials
.send_request(
&rp,
crate::protocol::turn::TurnCredentialRequest {
peer_id: local_peer_id,
},
);
pending_turn_credential_replies.insert(request_id, reply);
} else {
let _ = reply.send(Err("not connected to relay".to_string()));
}
}
} }
} }
} }

View File

@ -11,6 +11,9 @@ use libp2p::{
use super::behaviour::DuskBehaviour; use super::behaviour::DuskBehaviour;
use crate::protocol::directory::{DirectoryRequest, DirectoryResponse, DIRECTORY_PROTOCOL}; use crate::protocol::directory::{DirectoryRequest, DirectoryResponse, DIRECTORY_PROTOCOL};
use crate::protocol::gif::{GifRequest, GifResponse, GIF_PROTOCOL}; use crate::protocol::gif::{GifRequest, GifResponse, GIF_PROTOCOL};
use crate::protocol::turn::{
TurnCredentialRequest, TurnCredentialResponse, TURN_CREDENTIALS_PROTOCOL,
};
pub fn build_swarm( pub fn build_swarm(
keypair: &identity::Keypair, keypair: &identity::Keypair,
@ -92,6 +95,13 @@ pub fn build_swarm(
request_response::Config::default() request_response::Config::default()
.with_request_timeout(Duration::from_secs(15)), .with_request_timeout(Duration::from_secs(15)),
), ),
// turn credentials via request-response to the relay (outbound only)
turn_credentials:
cbor::Behaviour::<TurnCredentialRequest, TurnCredentialResponse>::new(
[(TURN_CREDENTIALS_PROTOCOL, ProtocolSupport::Outbound)],
request_response::Config::default()
.with_request_timeout(Duration::from_secs(10)),
),
} }
})? })?
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(300))) .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(300)))

View File

@ -17,6 +17,7 @@ pub enum DirectoryRequest {
pub enum DirectoryResponse { pub enum DirectoryResponse {
Ok, Ok,
Results(Vec<DirectoryProfileEntry>), Results(Vec<DirectoryProfileEntry>),
Error(String),
} }
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]

View File

@ -3,3 +3,4 @@ pub mod directory;
pub mod gif; pub mod gif;
pub mod identity; pub mod identity;
pub mod messages; pub mod messages;
pub mod turn;

View File

@ -0,0 +1,21 @@
// turn credential protocol types for requesting time-limited TURN server
// credentials from the relay. the client sends a TurnCredentialRequest
// and receives a TurnCredentialResponse with HMAC-based credentials.
use libp2p::StreamProtocol;
pub const TURN_CREDENTIALS_PROTOCOL: StreamProtocol =
StreamProtocol::new("/dusk/turn-credentials/1.0.0");
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TurnCredentialRequest {
pub peer_id: String,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TurnCredentialResponse {
pub username: String,
pub password: String,
pub ttl: u64,
pub uris: Vec<String>,
}

View File

@ -8,7 +8,6 @@ import { Extension } from "@tiptap/core";
import Mention from "@tiptap/extension-mention"; import Mention from "@tiptap/extension-mention";
import { tiptapToMarkdown } from "../../lib/markdown"; import { tiptapToMarkdown } from "../../lib/markdown";
import { members } from "../../stores/members"; import { members } from "../../stores/members";
import { identity } from "../../stores/identity";
import EmojiPicker from "./EmojiPicker"; import EmojiPicker from "./EmojiPicker";
import GifPicker from "./GifPicker"; import GifPicker from "./GifPicker";
import MentionList from "./MentionList"; import MentionList from "./MentionList";
@ -65,7 +64,6 @@ const MessageInput: Component<MessageInputProps> = (props) => {
// build the mention items list from community members or dm peers // build the mention items list from community members or dm peers
function getMentionItems(query: string): MentionItem[] { function getMentionItems(query: string): MentionItem[] {
const q = query.toLowerCase(); const q = query.toLowerCase();
const currentUser = identity();
// dm context uses the explicit peer list passed via props // dm context uses the explicit peer list passed via props
if (props.mentionPeers) { if (props.mentionPeers) {

View File

@ -1,6 +1,6 @@
import type { Component } from "solid-js"; import type { Component } from "solid-js";
import { For, Show, createSignal } from "solid-js"; import { For, Show, createSignal } from "solid-js";
import { MessageCircle, Search, X, Plus, Group, Users } from "lucide-solid"; import { Search, X, Plus, Users } from "lucide-solid";
import { resolveMentionsPlainText } from "../../lib/mentions"; import { resolveMentionsPlainText } from "../../lib/mentions";
import { import {
dmConversations, dmConversations,

View File

@ -9,7 +9,6 @@ import {
import { knownPeers, friends } from "../../stores/directory"; import { knownPeers, friends } from "../../stores/directory";
import { onlinePeerIds } from "../../stores/members"; import { onlinePeerIds } from "../../stores/members";
import { identity } from "../../stores/identity"; import { identity } from "../../stores/identity";
import { peerCount, nodeStatus } from "../../stores/connection";
import { openModal } from "../../stores/ui"; import { openModal } from "../../stores/ui";
import * as tauri from "../../lib/tauri"; import * as tauri from "../../lib/tauri";
import Avatar from "../common/Avatar"; import Avatar from "../common/Avatar";

View File

@ -42,7 +42,7 @@ import { clearMessages } from "../../stores/messages";
import * as tauri from "../../lib/tauri"; import * as tauri from "../../lib/tauri";
import Avatar from "../common/Avatar"; import Avatar from "../common/Avatar";
import Button from "../common/Button"; import Button from "../common/Button";
import type { ChannelMeta, CategoryMeta, Member } from "../../lib/types"; import type { ChannelMeta, Member } from "../../lib/types";
type CommunitySettingsSection = type CommunitySettingsSection =
| "overview" | "overview"
@ -87,8 +87,6 @@ const CommunitySettingsModal: Component<CommunitySettingsModalProps> = (
} }
}); });
const community = () => activeCommunity();
const sections: { const sections: {
id: CommunitySettingsSection; id: CommunitySettingsSection;
label: string; label: string;

View File

@ -7,6 +7,8 @@ import {
remoteStreams, remoteStreams,
voiceConnectionState, voiceConnectionState,
voiceError, voiceError,
voiceQuality,
peerConnectionStates,
joinVoice, joinVoice,
} from "../../stores/voice"; } from "../../stores/voice";
import { identity } from "../../stores/identity"; import { identity } from "../../stores/identity";
@ -61,11 +63,51 @@ const VoiceChannel: Component<VoiceChannelProps> = (props) => {
return allParticipants().length; return allParticipants().length;
}; };
// voice quality indicator config
const qualityConfig = () => {
const q = voiceQuality();
switch (q) {
case "good":
return { color: "bg-green-500", text: "Connected" };
case "connecting":
return { color: "bg-amber-400 animate-pulse", text: "Connecting..." };
case "degraded":
return { color: "bg-orange-500", text: "Degraded" };
case "failed":
return { color: "bg-red-500", text: "Connection Failed" };
default:
return { color: "bg-white/40", text: "" };
}
};
// look up per-peer connection state for a participant
const getPeerState = (peerId: string, isLocal: boolean) => {
if (isLocal) return "connected" as RTCPeerConnectionState;
return peerConnectionStates()[peerId];
};
return ( return (
<div class="flex flex-col h-full bg-black"> <div class="flex flex-col h-full bg-black">
<div class="flex-1 overflow-auto p-4"> <div class="flex-1 overflow-auto p-4">
<div class="mb-4"> <div class="mb-4">
<h2 class="text-white text-lg font-semibold">Voice Channel</h2> <div class="flex items-center gap-2">
<h2 class="text-white text-lg font-semibold">Voice Channel</h2>
<Show
when={
voiceConnectionState() === "connected" ||
voiceConnectionState() === "degraded"
}
>
<div class="flex items-center gap-1.5 ml-2">
<span
class={`inline-block w-2 h-2 rounded-full ${qualityConfig().color}`}
/>
<span class="text-white/50 text-xs">
{qualityConfig().text}
</span>
</div>
</Show>
</div>
<p class="text-white/60 text-sm"> <p class="text-white/60 text-sm">
{participantCount()} participant {participantCount()} participant
{participantCount() !== 1 ? "s" : ""} {participantCount() !== 1 ? "s" : ""}
@ -100,8 +142,13 @@ const VoiceChannel: Component<VoiceChannelProps> = (props) => {
</div> </div>
</Show> </Show>
{/* connected state with participants grid */} {/* connected / degraded state with participants grid */}
<Show when={voiceConnectionState() === "connected"}> <Show
when={
voiceConnectionState() === "connected" ||
voiceConnectionState() === "degraded"
}
>
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4"> <div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
<For each={allParticipants()}> <For each={allParticipants()}>
{(participant) => ( {(participant) => (
@ -111,6 +158,10 @@ const VoiceChannel: Component<VoiceChannelProps> = (props) => {
media_state={participant.media_state} media_state={participant.media_state}
stream={participant.stream} stream={participant.stream}
is_local={participant.is_local} is_local={participant.is_local}
connectionState={getPeerState(
participant.peer_id,
participant.is_local,
)}
/> />
)} )}
</For> </For>
@ -118,7 +169,12 @@ const VoiceChannel: Component<VoiceChannelProps> = (props) => {
</Show> </Show>
</div> </div>
<Show when={voiceConnectionState() === "connected"}> <Show
when={
voiceConnectionState() === "connected" ||
voiceConnectionState() === "degraded"
}
>
<VoiceControls /> <VoiceControls />
</Show> </Show>
</div> </div>

View File

@ -1,6 +1,6 @@
import type { Component } from "solid-js"; import type { Component } from "solid-js";
import { Show, createEffect, onCleanup } from "solid-js"; import { Show, createEffect, onCleanup } from "solid-js";
import { MicOff, VolumeX } from "lucide-solid"; import { MicOff, VolumeX, AlertTriangle } from "lucide-solid";
import Avatar from "../common/Avatar"; import Avatar from "../common/Avatar";
import { openProfileCard } from "../../stores/ui"; import { openProfileCard } from "../../stores/ui";
import type { VoiceMediaState } from "../../lib/types"; import type { VoiceMediaState } from "../../lib/types";
@ -11,6 +11,7 @@ interface VoiceParticipantTileProps {
media_state: VoiceMediaState; media_state: VoiceMediaState;
stream?: MediaStream | null; stream?: MediaStream | null;
is_local?: boolean; is_local?: boolean;
connectionState?: RTCPeerConnectionState;
} }
const VoiceParticipantTile: Component<VoiceParticipantTileProps> = (props) => { const VoiceParticipantTile: Component<VoiceParticipantTileProps> = (props) => {
@ -38,9 +39,28 @@ const VoiceParticipantTile: Component<VoiceParticipantTileProps> = (props) => {
); );
}; };
// per-peer connection state ring styling
const connectionRingClass = () => {
const state = props.connectionState;
switch (state) {
case "connected":
return "ring-2 ring-green-500/70";
case "connecting":
case "new":
return "ring-2 ring-amber-400/70 animate-pulse";
case "failed":
return "ring-2 ring-red-500/80";
case "disconnected":
return "ring-2 ring-white/30";
case "closed":
default:
return "";
}
};
return ( return (
<div <div
class="relative bg-black border border-white/10 aspect-video flex items-center justify-center overflow-hidden cursor-pointer" class={`relative bg-black border border-white/10 aspect-video flex items-center justify-center overflow-hidden cursor-pointer ${connectionRingClass()}`}
onClick={(e) => { onClick={(e) => {
openProfileCard({ openProfileCard({
peerId: props.peer_id, peerId: props.peer_id,
@ -75,15 +95,25 @@ const VoiceParticipantTile: Component<VoiceParticipantTileProps> = (props) => {
</div> </div>
</Show> </Show>
<Show when={props.media_state.muted}> {/* media state indicators — stacked vertically to avoid overlap */}
<div class="absolute top-2 right-2 bg-black/80 p-1"> <div class="absolute top-2 right-2 flex flex-col gap-1">
<MicOff size={16} class="text-[#FF4F00]" /> <Show when={props.media_state.muted}>
</div> <div class="bg-black/80 p-1">
</Show> <MicOff size={16} class="text-[#FF4F00]" />
</div>
</Show>
<Show when={props.media_state.deafened}> <Show when={props.media_state.deafened}>
<div class="absolute top-2 right-2 bg-black/80 p-1"> <div class="bg-black/80 p-1">
<VolumeX size={16} class="text-[#FF4F00]" /> <VolumeX size={16} class="text-[#FF4F00]" />
</div>
</Show>
</div>
{/* connection failed warning */}
<Show when={props.connectionState === "failed"}>
<div class="absolute bottom-2 right-2 bg-red-900/80 p-1 rounded">
<AlertTriangle size={14} class="text-red-400" />
</div> </div>
</Show> </Show>

View File

@ -398,6 +398,19 @@ export async function getVoiceParticipants(
return invoke("get_voice_participants", { communityId, channelId }); return invoke("get_voice_participants", { communityId, channelId });
} }
// -- turn credentials --
export interface TurnCredentials {
username: string;
password: string;
ttl: number;
uris: string[];
}
export async function getTurnCredentials(): Promise<TurnCredentials> {
return invoke("get_turn_credentials");
}
// -- direct messages -- // -- direct messages --
export async function sendDM( export async function sendDM(

View File

@ -2,25 +2,77 @@
// manages one RTCPeerConnection per remote peer in a full mesh topology // manages one RTCPeerConnection per remote peer in a full mesh topology
// this is a utility module with no signals - the voice store drives it // this is a utility module with no signals - the voice store drives it
// no external stun/turn servers for now, rely on host candidates only const DEFAULT_ICE_SERVERS: RTCIceServer[] = [
// this works for LAN peers and peers on the same network segment // Public STUN servers (free, no auth needed)
const rtcConfig: RTCConfiguration = { { urls: ['stun:stun.l.google.com:19302', 'stun:stun1.l.google.com:19302'] },
iceServers: [], { urls: 'stun:stun.cloudflare.com:3478' },
}; // TURN servers are added dynamically via getTurnCredentials()
];
/** Maximum ICE restart attempts before giving up on a peer */
const MAX_ICE_RESTART_ATTEMPTS = 3;
/** Delay before attempting ICE restart after disconnection (ms) */
const DISCONNECT_TIMEOUT_MS = 5000;
export interface PeerConnectionManagerConfig {
onNegotiationNeeded: (peerId: string, sdp: RTCSessionDescriptionInit) => void;
onIceCandidate: (peerId: string, candidate: RTCIceCandidate) => void;
onRemoteStream: (peerId: string, stream: MediaStream) => void;
onRemoteStreamRemoved: (peerId: string) => void;
onPeerConnectionStateChanged?: (peerId: string, state: RTCPeerConnectionState) => void;
iceServers?: RTCIceServer[];
}
/** Per-peer state tracking beyond just the RTCPeerConnection */
interface PeerState {
pc: RTCPeerConnection;
/** ICE candidates received before remote description was set */
candidateBuffer: RTCIceCandidateInit[];
/** Number of ICE restart attempts for this peer */
restartAttempts: number;
/** Timeout handle for delayed ICE restart after disconnection */
disconnectTimer: ReturnType<typeof setTimeout> | null;
}
export class PeerConnectionManager { export class PeerConnectionManager {
private connections: Map<string, RTCPeerConnection> = new Map(); private peers: Map<string, PeerState> = new Map();
private localStream: MediaStream | null = null; private localStream: MediaStream | null = null;
private screenStream: MediaStream | null = null; private screenStream: MediaStream | null = null;
private rtcConfig: RTCConfiguration;
// the local peer id, used for glare resolution during simultaneous offers // the local peer id, used for glare resolution during simultaneous offers
private localPeerId: string | null = null; private localPeerId: string | null = null;
// callbacks set by the voice store to bridge webrtc events into reactive state // ---- legacy callback properties (for backward compat with voice store) ----
// these are used when the class is constructed without a config object
onRemoteStream: ((peerId: string, stream: MediaStream) => void) | null = null; onRemoteStream: ((peerId: string, stream: MediaStream) => void) | null = null;
onRemoteStreamRemoved: ((peerId: string) => void) | null = null; onRemoteStreamRemoved: ((peerId: string) => void) | null = null;
onIceCandidate: ((peerId: string, candidate: RTCIceCandidate) => void) | null = null; onIceCandidate: ((peerId: string, candidate: RTCIceCandidate) => void) | null = null;
onNegotiationNeeded: ((peerId: string) => void) | null = null; onNegotiationNeeded: ((peerId: string, sdp?: RTCSessionDescriptionInit) => void) | null = null;
onPeerConnectionStateChanged: ((peerId: string, state: RTCPeerConnectionState) => void) | null = null;
private config: PeerConnectionManagerConfig | null = null;
constructor(config?: PeerConnectionManagerConfig) {
const iceServers = config?.iceServers ?? DEFAULT_ICE_SERVERS;
this.rtcConfig = {
iceServers,
iceTransportPolicy: 'all',
};
if (config) {
this.config = config;
// Also set legacy callback properties from config for internal use
this.onRemoteStream = config.onRemoteStream;
this.onRemoteStreamRemoved = config.onRemoteStreamRemoved;
this.onIceCandidate = config.onIceCandidate;
this.onNegotiationNeeded = (peerId: string, sdp?: RTCSessionDescriptionInit) => {
if (sdp) config.onNegotiationNeeded(peerId, sdp);
};
this.onPeerConnectionStateChanged = config.onPeerConnectionStateChanged ?? null;
}
}
setLocalPeerId(peerId: string): void { setLocalPeerId(peerId: string): void {
this.localPeerId = peerId; this.localPeerId = peerId;
@ -34,20 +86,32 @@ export class PeerConnectionManager {
this.screenStream = stream; this.screenStream = stream;
} }
// create a new peer connection for a remote peer // determine if we should be the offerer based on lexicographic peer_id comparison
// uses lexicographic peer_id comparison for glare resolution: shouldOffer(remotePeerId: string): boolean {
// the peer with the smaller id is always the offerer if (!this.localPeerId) return false;
createConnection(peerId: string): RTCPeerConnection { return this.localPeerId < remotePeerId;
// close any existing connection to this peer before creating a new one }
this.closeConnection(peerId);
const pc = new RTCPeerConnection(rtcConfig); // create a new peer connection for a remote peer
this.connections.set(peerId, pc); // accepts an optional localStream parameter (for new API), falls back to this.localStream
createConnection(peerId: string, localStream?: MediaStream | null): RTCPeerConnection {
// close any existing connection to this peer before creating a new one
this.removeConnection(peerId);
const pc = new RTCPeerConnection(this.rtcConfig);
const peerState: PeerState = {
pc,
candidateBuffer: [],
restartAttempts: 0,
disconnectTimer: null,
};
this.peers.set(peerId, peerState);
// add all local tracks to the new connection // add all local tracks to the new connection
if (this.localStream) { const stream = localStream ?? this.localStream;
for (const track of this.localStream.getTracks()) { if (stream) {
pc.addTrack(track, this.localStream); for (const track of stream.getTracks()) {
pc.addTrack(track, stream);
} }
} }
@ -58,141 +122,293 @@ export class PeerConnectionManager {
} }
// wire up event handlers // wire up event handlers
this.setupPeerEventHandlers(peerId, peerState);
return pc;
}
private setupPeerEventHandlers(peerId: string, peerState: PeerState): void {
const { pc } = peerState;
pc.onicecandidate = (event) => { pc.onicecandidate = (event) => {
if (event.candidate && this.onIceCandidate) { if (event.candidate) {
this.onIceCandidate(peerId, event.candidate); console.log(`[WebRTC] ICE candidate for ${peerId}: ${event.candidate.type ?? 'null'} ${event.candidate.candidate.substring(0, 60)}...`);
if (this.onIceCandidate) {
this.onIceCandidate(peerId, event.candidate);
}
} }
}; };
pc.ontrack = (event) => { pc.ontrack = (event) => {
console.log(`[WebRTC] Remote track received from ${peerId}: kind=${event.track.kind}`);
if (event.streams.length > 0 && this.onRemoteStream) { if (event.streams.length > 0 && this.onRemoteStream) {
this.onRemoteStream(peerId, event.streams[0]); this.onRemoteStream(peerId, event.streams[0]);
} }
}; };
pc.onnegotiationneeded = () => { pc.onnegotiationneeded = async () => {
console.log(`[WebRTC] Negotiation needed for ${peerId}`);
if (this.onNegotiationNeeded) { if (this.onNegotiationNeeded) {
this.onNegotiationNeeded(peerId); // If using new API (config-based), auto-create offer and pass SDP
if (this.config) {
if (!this.shouldOffer(peerId)) {
console.log(`[WebRTC] Skipping negotiation for ${peerId} (remote peer should offer)`);
return;
}
try {
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
this.onNegotiationNeeded(peerId, offer);
} catch (err) {
console.error(`[WebRTC] Failed to create offer during negotiation for ${peerId}:`, err);
}
} else {
// Legacy API: just notify the caller (voice store handles offer creation)
this.onNegotiationNeeded(peerId);
}
} }
}; };
pc.onconnectionstatechange = () => { pc.onconnectionstatechange = () => {
if (pc.connectionState === "failed" || pc.connectionState === "closed") { const state = pc.connectionState;
console.log(`[WebRTC] Connection state for ${peerId}: ${state}`);
// Fire per-peer connection state callback
if (this.onPeerConnectionStateChanged) {
this.onPeerConnectionStateChanged(peerId, state);
}
if (state === 'failed' || state === 'closed') {
if (this.onRemoteStreamRemoved) { if (this.onRemoteStreamRemoved) {
this.onRemoteStreamRemoved(peerId); this.onRemoteStreamRemoved(peerId);
} }
} }
}; };
return pc; pc.oniceconnectionstatechange = () => {
const iceState = pc.iceConnectionState;
console.log(`[WebRTC] ICE connection state for ${peerId}: ${iceState}`);
if (iceState === 'disconnected') {
// Start a timeout — if still disconnected after DISCONNECT_TIMEOUT_MS, attempt restart
this.clearDisconnectTimer(peerState);
peerState.disconnectTimer = setTimeout(() => {
peerState.disconnectTimer = null;
if (pc.iceConnectionState === 'disconnected') {
console.log(`[WebRTC] Peer ${peerId} still disconnected after timeout, attempting ICE restart`);
this.attemptIceRestart(peerId, peerState);
}
}, DISCONNECT_TIMEOUT_MS);
} else if (iceState === 'failed') {
// Immediately attempt ICE restart
this.clearDisconnectTimer(peerState);
console.log(`[WebRTC] ICE failed for ${peerId}, attempting ICE restart`);
this.attemptIceRestart(peerId, peerState);
} else if (iceState === 'connected' || iceState === 'completed') {
// Connection recovered — reset restart counter and clear timers
this.clearDisconnectTimer(peerState);
peerState.restartAttempts = 0;
}
};
pc.onicegatheringstatechange = () => {
console.log(`[WebRTC] ICE gathering state for ${peerId}: ${pc.iceGatheringState}`);
};
} }
// determine if we should be the offerer based on lexicographic peer_id comparison private clearDisconnectTimer(peerState: PeerState): void {
shouldOffer(remotePeerId: string): boolean { if (peerState.disconnectTimer !== null) {
if (!this.localPeerId) return false; clearTimeout(peerState.disconnectTimer);
return this.localPeerId < remotePeerId; peerState.disconnectTimer = null;
}
}
private async attemptIceRestart(peerId: string, peerState: PeerState): Promise<void> {
if (peerState.restartAttempts >= MAX_ICE_RESTART_ATTEMPTS) {
console.error(`[WebRTC] Max ICE restart attempts (${MAX_ICE_RESTART_ATTEMPTS}) reached for ${peerId}, giving up`);
if (this.onRemoteStreamRemoved) {
this.onRemoteStreamRemoved(peerId);
}
if (this.onPeerConnectionStateChanged) {
this.onPeerConnectionStateChanged(peerId, 'failed');
}
return;
}
peerState.restartAttempts++;
console.log(`[WebRTC] ICE restart attempt ${peerState.restartAttempts}/${MAX_ICE_RESTART_ATTEMPTS} for ${peerId}`);
try {
const offer = await peerState.pc.createOffer({ iceRestart: true });
await peerState.pc.setLocalDescription(offer);
if (this.onNegotiationNeeded) {
this.onNegotiationNeeded(peerId, offer);
}
} catch (err) {
console.error(`[WebRTC] Failed ICE restart for ${peerId}:`, err);
}
}
/** Flush buffered ICE candidates after remote description is set */
private async flushCandidateBuffer(peerId: string, peerState: PeerState): Promise<void> {
if (peerState.candidateBuffer.length === 0) return;
console.log(`[WebRTC] Flushing ${peerState.candidateBuffer.length} buffered ICE candidates for ${peerId}`);
const buffered = peerState.candidateBuffer.splice(0);
for (const candidate of buffered) {
try {
await peerState.pc.addIceCandidate(new RTCIceCandidate(candidate));
} catch (err) {
console.error(`[WebRTC] Failed to add buffered ICE candidate for ${peerId}:`, err);
}
}
} }
async createOffer(peerId: string): Promise<RTCSessionDescriptionInit> { async createOffer(peerId: string): Promise<RTCSessionDescriptionInit> {
const pc = this.connections.get(peerId); const peerState = this.peers.get(peerId);
if (!pc) { if (!peerState) {
throw new Error(`no connection for peer ${peerId}`); throw new Error(`[WebRTC] No connection for peer ${peerId}`);
} }
try { try {
const offer = await pc.createOffer(); const offer = await peerState.pc.createOffer();
await pc.setLocalDescription(offer); await peerState.pc.setLocalDescription(offer);
console.log(`[WebRTC] Created offer for ${peerId}`);
return offer; return offer;
} catch (err) { } catch (err) {
console.error(`failed to create offer for peer ${peerId}:`, err); console.error(`[WebRTC] Failed to create offer for ${peerId}:`, err);
throw err; throw err;
} }
} }
// handleOffer replaces createAnswer — sets remote description, creates answer, flushes candidates
async handleOffer(
peerId: string,
sdp: RTCSessionDescriptionInit,
localStream?: MediaStream | null,
): Promise<RTCSessionDescriptionInit> {
let peerState = this.peers.get(peerId);
if (!peerState) {
// Auto-create connection if it doesn't exist
this.createConnection(peerId, localStream);
peerState = this.peers.get(peerId)!;
}
try {
await peerState.pc.setRemoteDescription(new RTCSessionDescription(sdp));
console.log(`[WebRTC] Set remote offer for ${peerId}`);
// Flush any buffered ICE candidates now that remote description is set
await this.flushCandidateBuffer(peerId, peerState);
const answer = await peerState.pc.createAnswer();
await peerState.pc.setLocalDescription(answer);
console.log(`[WebRTC] Created answer for ${peerId}`);
return answer;
} catch (err) {
console.error(`[WebRTC] Failed to handle offer from ${peerId}:`, err);
throw err;
}
}
// Legacy alias for handleOffer (backward compat with voice store)
async createAnswer( async createAnswer(
peerId: string, peerId: string,
offer: RTCSessionDescriptionInit, offer: RTCSessionDescriptionInit,
): Promise<RTCSessionDescriptionInit> { ): Promise<RTCSessionDescriptionInit> {
const pc = this.connections.get(peerId); return this.handleOffer(peerId, offer);
if (!pc) { }
throw new Error(`no connection for peer ${peerId}`);
// handleAnswer replaces setRemoteAnswer — sets remote description and flushes candidates
async handleAnswer(
peerId: string,
sdp: RTCSessionDescriptionInit,
): Promise<void> {
const peerState = this.peers.get(peerId);
if (!peerState) {
throw new Error(`[WebRTC] No connection for peer ${peerId}`);
} }
try { try {
await pc.setRemoteDescription(new RTCSessionDescription(offer)); await peerState.pc.setRemoteDescription(new RTCSessionDescription(sdp));
const answer = await pc.createAnswer(); console.log(`[WebRTC] Set remote answer for ${peerId}`);
await pc.setLocalDescription(answer);
return answer; // Flush any buffered ICE candidates now that remote description is set
await this.flushCandidateBuffer(peerId, peerState);
} catch (err) { } catch (err) {
console.error(`failed to create answer for peer ${peerId}:`, err); console.error(`[WebRTC] Failed to handle answer from ${peerId}:`, err);
throw err; throw err;
} }
} }
// Legacy alias for handleAnswer (backward compat with voice store)
async setRemoteAnswer( async setRemoteAnswer(
peerId: string, peerId: string,
answer: RTCSessionDescriptionInit, answer: RTCSessionDescriptionInit,
): Promise<void> { ): Promise<void> {
const pc = this.connections.get(peerId); return this.handleAnswer(peerId, answer);
if (!pc) {
throw new Error(`no connection for peer ${peerId}`);
}
try {
await pc.setRemoteDescription(new RTCSessionDescription(answer));
} catch (err) {
console.error(`failed to set remote answer for peer ${peerId}:`, err);
throw err;
}
} }
async addIceCandidate( async addIceCandidate(
peerId: string, peerId: string,
candidate: RTCIceCandidateInit, candidate: RTCIceCandidateInit,
): Promise<void> { ): Promise<void> {
const pc = this.connections.get(peerId); const peerState = this.peers.get(peerId);
if (!pc) { if (!peerState) {
// candidate arrived before connection was created, safe to ignore // Candidate arrived before connection was created — buffer it in a temporary queue
// that will be checked when the connection is created. For now, log and drop.
console.warn(`[WebRTC] ICE candidate arrived for unknown peer ${peerId}, ignoring`);
return;
}
// If remote description is not yet set, buffer the candidate
if (!peerState.pc.remoteDescription) {
console.log(`[WebRTC] Buffering ICE candidate for ${peerId} (no remote description yet)`);
peerState.candidateBuffer.push(candidate);
return; return;
} }
try { try {
await pc.addIceCandidate(new RTCIceCandidate(candidate)); await peerState.pc.addIceCandidate(new RTCIceCandidate(candidate));
} catch (err) { } catch (err) {
// ice candidates can arrive out of order or for stale connections // ICE candidates can arrive out of order or for stale connections
console.error(`failed to add ice candidate for peer ${peerId}:`, err); console.error(`[WebRTC] Failed to add ICE candidate for ${peerId}:`, err);
} }
} }
closeConnection(peerId: string): void { /** Perform an ICE restart for a specific peer. Returns new offer SDP or null on failure. */
const pc = this.connections.get(peerId); async restartIce(peerId: string): Promise<RTCSessionDescriptionInit | null> {
if (pc) { const peerState = this.peers.get(peerId);
pc.onicecandidate = null; if (!peerState) {
pc.ontrack = null; console.error(`[WebRTC] Cannot restart ICE: no connection for peer ${peerId}`);
pc.onnegotiationneeded = null; return null;
pc.onconnectionstatechange = null; }
pc.close();
this.connections.delete(peerId); try {
peerState.restartAttempts++;
console.log(`[WebRTC] Manual ICE restart for ${peerId} (attempt ${peerState.restartAttempts})`);
const offer = await peerState.pc.createOffer({ iceRestart: true });
await peerState.pc.setLocalDescription(offer);
return offer;
} catch (err) {
console.error(`[WebRTC] Failed manual ICE restart for ${peerId}:`, err);
return null;
} }
} }
closeAll(): void { /** Get the current connection state for a specific peer */
for (const [peerId] of this.connections) { getPeerState(peerId: string): RTCPeerConnectionState | undefined {
this.closeConnection(peerId); const peerState = this.peers.get(peerId);
} return peerState?.pc.connectionState;
this.connections.clear();
this.localStream = null;
this.screenStream = null;
}
getConnection(peerId: string): RTCPeerConnection | undefined {
return this.connections.get(peerId);
} }
// replaces tracks on all existing connections // replaces tracks on all existing connections
// used when toggling video or screen share mid-call // overloaded: can be called with no args (legacy) or with (stream, kind) (new API)
updateTracks(): void { updateTracks(stream?: MediaStream | null, kind?: 'audio' | 'video'): void {
for (const [, pc] of this.connections) { for (const [, peerState] of this.peers) {
const { pc } = peerState;
const senders = pc.getSenders(); const senders = pc.getSenders();
// build the set of tracks we want active on each connection // build the set of tracks we want active on each connection
@ -204,6 +420,16 @@ export class PeerConnectionManager {
desiredTracks.push(...this.screenStream.getTracks()); desiredTracks.push(...this.screenStream.getTracks());
} }
// If called with specific stream and kind, handle targeted update
if (stream && kind) {
const newTracks = stream.getTracks().filter((t) => t.kind === kind);
for (const track of newTracks) {
if (!desiredTracks.some((t) => t.id === track.id)) {
desiredTracks.push(track);
}
}
}
// replace or add tracks that should be present // replace or add tracks that should be present
for (const track of desiredTracks) { for (const track of desiredTracks) {
const existingSender = senders.find( const existingSender = senders.find(
@ -212,19 +438,20 @@ export class PeerConnectionManager {
if (!existingSender) { if (!existingSender) {
// check if there is a sender with the same kind we can replace // check if there is a sender with the same kind we can replace
const kindSender = senders.find( const kindSender = senders.find(
(s) => s.track?.kind === track.kind || (!s.track && true), (s) => s.track?.kind === track.kind || (s.track === null && track.kind !== undefined),
); );
if (kindSender) { if (kindSender) {
kindSender.replaceTrack(track).catch((err) => { kindSender.replaceTrack(track).catch((err) => {
console.error("failed to replace track:", err); console.error('[WebRTC] Failed to replace track:', err);
}); });
} else { } else {
// no existing sender for this kind, add a new one // no existing sender for this kind, add a new one
const stream = track.kind === "video" && this.screenStream?.getVideoTracks().includes(track) const parentStream =
? this.screenStream track.kind === 'video' && this.screenStream?.getVideoTracks().includes(track)
: this.localStream; ? this.screenStream
if (stream) { : this.localStream;
pc.addTrack(track, stream); if (parentStream) {
pc.addTrack(track, parentStream);
} }
} }
} }
@ -237,10 +464,45 @@ export class PeerConnectionManager {
try { try {
pc.removeTrack(sender); pc.removeTrack(sender);
} catch (err) { } catch (err) {
console.error("failed to remove track:", err); console.error('[WebRTC] Failed to remove track:', err);
} }
} }
} }
} }
} }
// removeConnection (also aliased as closeConnection for backward compat)
removeConnection(peerId: string): void {
const peerState = this.peers.get(peerId);
if (peerState) {
this.clearDisconnectTimer(peerState);
const { pc } = peerState;
pc.onicecandidate = null;
pc.ontrack = null;
pc.onnegotiationneeded = null;
pc.onconnectionstatechange = null;
pc.oniceconnectionstatechange = null;
pc.onicegatheringstatechange = null;
pc.close();
this.peers.delete(peerId);
}
}
// Legacy alias for removeConnection (backward compat with voice store)
closeConnection(peerId: string): void {
this.removeConnection(peerId);
}
closeAll(): void {
for (const [peerId] of this.peers) {
this.removeConnection(peerId);
}
this.peers.clear();
this.localStream = null;
this.screenStream = null;
}
getConnection(peerId: string): RTCPeerConnection | undefined {
return this.peers.get(peerId)?.pc;
}
} }

View File

@ -1,4 +1,4 @@
import { createSignal } from "solid-js"; import { createSignal, createMemo } from "solid-js";
import type { VoiceMediaState, VoiceParticipant } from "../lib/types"; import type { VoiceMediaState, VoiceParticipant } from "../lib/types";
import { PeerConnectionManager } from "../lib/webrtc"; import { PeerConnectionManager } from "../lib/webrtc";
import { import {
@ -7,6 +7,7 @@ import {
updateVoiceMediaState, updateVoiceMediaState,
sendVoiceSdp, sendVoiceSdp,
sendVoiceIceCandidate, sendVoiceIceCandidate,
getTurnCredentials,
} from "../lib/tauri"; } from "../lib/tauri";
import { identity } from "./identity"; import { identity } from "./identity";
@ -30,16 +31,35 @@ const [remoteStreams, setRemoteStreams] = createSignal<
>(new Map()); >(new Map());
const [screenStream, setScreenStream] = createSignal<MediaStream | null>(null); const [screenStream, setScreenStream] = createSignal<MediaStream | null>(null);
// per-peer WebRTC connection state tracking
const [peerConnectionStates, setPeerConnectionStates] = createSignal<
Record<string, RTCPeerConnectionState>
>({});
// tracks the voice connection lifecycle so the ui can show proper feedback // tracks the voice connection lifecycle so the ui can show proper feedback
export type VoiceConnectionState = export type VoiceConnectionState =
| "idle" | "idle"
| "connecting" | "connecting"
| "connected" | "connected"
| "degraded"
| "error"; | "error";
const [voiceConnectionState, setVoiceConnectionState] = const [voiceConnectionState, setVoiceConnectionState] =
createSignal<VoiceConnectionState>("idle"); createSignal<VoiceConnectionState>("idle");
const [voiceError, setVoiceError] = createSignal<string | null>(null); const [voiceError, setVoiceError] = createSignal<string | null>(null);
// overall voice connection quality summary derived from per-peer states
const voiceQuality = createMemo(() => {
const states = peerConnectionStates();
const entries = Object.entries(states);
if (entries.length === 0) return "good";
const connected = entries.filter(([, s]) => s === "connected").length;
const failed = entries.filter(([, s]) => s === "failed").length;
if (failed === entries.length) return "failed";
if (failed > 0) return "degraded";
if (connected === entries.length) return "good";
return "connecting";
});
// derived signal for convenience // derived signal for convenience
export function isInVoice(): boolean { export function isInVoice(): boolean {
return voiceChannelId() !== null; return voiceChannelId() !== null;
@ -48,71 +68,109 @@ export function isInVoice(): boolean {
// single peer connection manager instance for the lifetime of a voice session // single peer connection manager instance for the lifetime of a voice session
let peerManager: PeerConnectionManager | null = null; let peerManager: PeerConnectionManager | null = null;
// evaluate overall voice connection state from per-peer states
function evaluateOverallVoiceState(): void {
const states = peerConnectionStates();
const entries = Object.entries(states);
// if no peers, we're the only participant — stay connected
if (entries.length === 0) {
// only update if we're currently in a voice channel (not leaving)
if (voiceChannelId() !== null && voiceConnectionState() !== "idle") {
setVoiceConnectionState("connected");
}
return;
}
const connected = entries.filter(([, s]) => s === "connected").length;
const failed = entries.filter(([, s]) => s === "failed").length;
if (connected > 0 && failed > 0) {
setVoiceConnectionState("degraded");
} else if (connected > 0) {
setVoiceConnectionState("connected");
} else if (failed === entries.length) {
setVoiceConnectionState("error");
}
// otherwise remain in "connecting" state (peers still negotiating)
}
// initialize the peer manager with callbacks wired to our handlers // initialize the peer manager with callbacks wired to our handlers
function initPeerManager(): PeerConnectionManager { function initPeerManager(iceServers?: RTCIceServer[]): PeerConnectionManager {
const manager = new PeerConnectionManager(); const manager = new PeerConnectionManager({
iceServers,
onRemoteStream: (peerId: string, stream: MediaStream) => {
console.log(`[Voice] Remote stream received from ${peerId}`);
setRemoteStreams((prev) => {
const next = new Map(prev);
next.set(peerId, stream);
return next;
});
},
manager.onRemoteStream = (peerId: string, stream: MediaStream) => { onRemoteStreamRemoved: (peerId: string) => {
setRemoteStreams((prev) => { console.log(`[Voice] Remote stream removed for ${peerId}`);
const next = new Map(prev); setRemoteStreams((prev) => {
next.set(peerId, stream); const next = new Map(prev);
return next; next.delete(peerId);
}); return next;
}; });
},
manager.onRemoteStreamRemoved = (peerId: string) => { onIceCandidate: async (peerId: string, candidate: RTCIceCandidate) => {
setRemoteStreams((prev) => { const communityId = voiceCommunityId();
const next = new Map(prev); const channelId = voiceChannelId();
next.delete(peerId); if (!communityId || !channelId) return;
return next;
});
};
manager.onIceCandidate = async ( try {
peerId: string, await sendVoiceIceCandidate(
candidate: RTCIceCandidate, communityId,
) => { channelId,
const communityId = voiceCommunityId(); peerId,
const channelId = voiceChannelId(); candidate.candidate,
if (!communityId || !channelId) return; candidate.sdpMid,
candidate.sdpMLineIndex,
);
} catch (err) {
console.error("[Voice] Failed to send ICE candidate:", err);
}
},
try { onNegotiationNeeded: async (
await sendVoiceIceCandidate( peerId: string,
communityId, sdp: RTCSessionDescriptionInit,
channelId, ) => {
peerId, const communityId = voiceCommunityId();
candidate.candidate, const channelId = voiceChannelId();
candidate.sdpMid, if (!communityId || !channelId) return;
candidate.sdpMLineIndex,
);
} catch (err) {
console.error("failed to send ice candidate:", err);
}
};
manager.onNegotiationNeeded = async (peerId: string) => { // the webrtc module handles glare resolution and creates the offer/restart SDP
const communityId = voiceCommunityId(); // we just need to send it via the signaling channel
const channelId = voiceChannelId(); try {
if (!communityId || !channelId) return; console.log(
`[Voice] Sending ${sdp.type} SDP to ${peerId} (negotiation/restart)`,
);
await sendVoiceSdp(
communityId,
channelId,
peerId,
sdp.type || "offer",
sdp.sdp || "",
);
} catch (err) {
console.error("[Voice] Failed to send SDP during negotiation:", err);
}
},
// only the peer with the lexicographically smaller id initiates the offer onPeerConnectionStateChanged: (
if (!manager.shouldOffer(peerId)) { peerId: string,
return; state: RTCPeerConnectionState,
} ) => {
console.log(`[Voice] Peer ${peerId} connection state: ${state}`);
try { setPeerConnectionStates((prev) => ({ ...prev, [peerId]: state }));
const offer = await manager.createOffer(peerId); evaluateOverallVoiceState();
await sendVoiceSdp( },
communityId, });
channelId,
peerId,
offer.type || "offer",
offer.sdp || "",
);
} catch (err) {
console.error("failed to send offer during renegotiation:", err);
}
};
return manager; return manager;
} }
@ -177,8 +235,29 @@ export async function joinVoice(
const stream = await acquireLocalMedia(false); const stream = await acquireLocalMedia(false);
setLocalStream(stream); setLocalStream(stream);
// fetch TURN credentials from our relay server
let turnServers: RTCIceServer[] = [];
try {
const creds = await getTurnCredentials();
turnServers = [{
urls: creds.uris,
username: creds.username,
credential: creds.password,
}];
console.log(`[Voice] Fetched TURN credentials (ttl=${creds.ttl}s, uris=${creds.uris.length})`);
} catch (e) {
console.warn('[Voice] Failed to fetch TURN credentials, proceeding without TURN:', e);
}
// combine public STUN servers with dynamic TURN servers
const iceServers: RTCIceServer[] = [
{ urls: ['stun:stun.l.google.com:19302', 'stun:stun1.l.google.com:19302'] },
{ urls: 'stun:stun.cloudflare.com:3478' },
...turnServers,
];
// initialize peer manager and set our local peer id for glare resolution // initialize peer manager and set our local peer id for glare resolution
peerManager = initPeerManager(); peerManager = initPeerManager(iceServers);
const localPeerId = identity()?.peer_id; const localPeerId = identity()?.peer_id;
if (localPeerId) { if (localPeerId) {
peerManager.setLocalPeerId(localPeerId); peerManager.setLocalPeerId(localPeerId);
@ -191,13 +270,22 @@ export async function joinVoice(
setVoiceChannelId(channelId); setVoiceChannelId(channelId);
setVoiceCommunityId(communityId); setVoiceCommunityId(communityId);
setVoiceParticipants(participants); setVoiceParticipants(participants);
setVoiceConnectionState("connected");
// determine how many remote peers we need to connect to
const remotePeers = participants.filter(
(p) => p.peer_id !== localPeerId,
);
if (remotePeers.length === 0) {
// we're the only participant — no peers to connect to, so we're connected
console.log("[Voice] No remote peers, marking as connected");
setVoiceConnectionState("connected");
}
// otherwise stay in "connecting" until onPeerConnectionStateChanged fires
// create peer connections for all existing participants // create peer connections for all existing participants
// we only initiate offers if our peer id is lexicographically smaller // we only initiate offers if our peer id is lexicographically smaller
for (const participant of participants) { for (const participant of remotePeers) {
if (participant.peer_id === localPeerId) continue;
peerManager.createConnection(participant.peer_id); peerManager.createConnection(participant.peer_id);
if (peerManager.shouldOffer(participant.peer_id)) { if (peerManager.shouldOffer(participant.peer_id)) {
@ -212,14 +300,14 @@ export async function joinVoice(
); );
} catch (err) { } catch (err) {
console.error( console.error(
`failed to create offer for ${participant.peer_id}:`, `[Voice] Failed to create offer for ${participant.peer_id}:`,
err, err,
); );
} }
} }
} }
} catch (err) { } catch (err) {
console.error("failed to join voice channel:", err); console.error("[Voice] Failed to join voice channel:", err);
// surface a readable error message to the ui // surface a readable error message to the ui
const message = err instanceof Error ? err.message : String(err); const message = err instanceof Error ? err.message : String(err);
setVoiceError(message); setVoiceError(message);
@ -248,8 +336,9 @@ export async function leaveVoice(): Promise<void> {
releaseLocalMedia(); releaseLocalMedia();
releaseScreenShare(); releaseScreenShare();
// clear remote streams // clear remote streams and peer connection states
setRemoteStreams(new Map()); setRemoteStreams(new Map());
setPeerConnectionStates({});
// tell the backend to leave // tell the backend to leave
if (communityId && channelId) { if (communityId && channelId) {
@ -381,6 +470,10 @@ export async function toggleVideo(): Promise<void> {
if (videoTrack && localStream()) { if (videoTrack && localStream()) {
// add video track to existing stream // add video track to existing stream
localStream()!.addTrack(videoTrack); localStream()!.addTrack(videoTrack);
// stop the unused audio tracks from the new stream to prevent resource leak
for (const audioTrack of videoStream.getAudioTracks()) {
audioTrack.stop();
}
} else if (videoTrack) { } else if (videoTrack) {
setLocalStream(videoStream); setLocalStream(videoStream);
} }
@ -601,12 +694,20 @@ export function handleVoiceParticipantLeft(payload: {
peerManager.closeConnection(payload.peer_id); peerManager.closeConnection(payload.peer_id);
} }
// remove remote stream // remove remote stream and peer connection state
setRemoteStreams((prev) => { setRemoteStreams((prev) => {
const next = new Map(prev); const next = new Map(prev);
next.delete(payload.peer_id); next.delete(payload.peer_id);
return next; return next;
}); });
setPeerConnectionStates((prev) => {
const next = { ...prev };
delete next[payload.peer_id];
return next;
});
// re-evaluate overall voice state after peer removal
evaluateOverallVoiceState();
} }
export function handleVoiceMediaStateChanged(payload: { export function handleVoiceMediaStateChanged(payload: {
@ -708,4 +809,6 @@ export {
screenStream, screenStream,
voiceConnectionState, voiceConnectionState,
voiceError, voiceError,
peerConnectionStates,
voiceQuality,
}; };