From 11a987e0de493e418173fe2b74ccb5da4095f5b5 Mon Sep 17 00:00:00 2001 From: cloudwithax Date: Sun, 15 Feb 2026 22:46:59 -0500 Subject: [PATCH] feat: enhance DM search functionality and improve message loading - Refactor DMSearchPanel to utilize a debounced search mechanism with improved filtering options. - Implement asynchronous message searching with new searchDMMessages API. - Introduce VirtualMessageList for optimized rendering of messages in DMChatArea. - Add unread message count indicator in ServerList for better user awareness. - Enhance message loading logic to support pagination and lazy loading of older messages. - Update types and stores to accommodate new search filters and message handling. --- src-tauri/Cargo.lock | 74 ++ src-tauri/Cargo.toml | 1 + src-tauri/src/commands/chat.rs | 8 +- src-tauri/src/commands/community.rs | 151 ++- src-tauri/src/commands/dm.rs | 43 + src-tauri/src/commands/identity.rs | 10 +- src-tauri/src/commands/voice.rs | 19 +- src-tauri/src/crdt/document.rs | 26 + src-tauri/src/crdt/mod.rs | 34 +- src-tauri/src/dev_server.rs | 159 ++- src-tauri/src/lib.rs | 1 + src-tauri/src/node/mod.rs | 69 +- src-tauri/src/storage/disk.rs | 1275 +++++++++++++++++--- src-tauri/src/storage/mod.rs | 1 + src-tauri/src/verification/mod.rs | 17 +- src/App.tsx | 110 +- src/components/chat/DMSearchPanel.tsx | 326 ++--- src/components/chat/VirtualMessageList.tsx | 451 +++++++ src/components/layout/DMChatArea.tsx | 138 ++- src/components/layout/ServerList.tsx | 40 +- src/lib/tauri.ts | 17 + src/lib/types.ts | 13 + src/stores/communities.ts | 11 +- src/stores/dms.ts | 20 +- 24 files changed, 2510 insertions(+), 504 deletions(-) create mode 100644 src/components/chat/VirtualMessageList.tsx diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 4fb9b00..887da1f 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -43,6 +43,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -1287,6 +1299,7 @@ dependencies = [ "libp2p", "log", "rand 0.8.5", + "rusqlite", "serde", "serde_json", "sha2", @@ -1473,6 +1486,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.3.0" @@ -2058,6 +2083,15 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -2075,6 +2109,15 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.4.1" @@ -3318,6 +3361,17 @@ dependencies = [ "libc", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linked-hash-map" version = "0.5.6" @@ -4944,6 +4998,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags 2.10.0", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-hash" version = "2.1.1" @@ -6559,6 +6627,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version-compare" version = "0.2.1" diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 290c477..49a1265 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -52,6 +52,7 @@ hex = "0.4" # data storage directories = "5" +rusqlite = { version = "0.32", features = ["bundled"] } # env file support dotenvy = "0.15" diff --git a/src-tauri/src/commands/chat.rs b/src-tauri/src/commands/chat.rs index ffa4bf2..afd5ae9 100644 --- a/src-tauri/src/commands/chat.rs +++ b/src-tauri/src/commands/chat.rs @@ -129,7 +129,13 @@ pub async fn start_node(app: tauri::AppHandle, state: State<'_, AppState>) -> Re let namespace = format!("dusk/community/{}", community_id); let _ = handle .command_tx - .send(NodeCommand::RegisterRendezvous { namespace }) + .send(NodeCommand::RegisterRendezvous { + namespace: namespace.clone(), + }) + .await; + let _ = handle + .command_tx + .send(NodeCommand::DiscoverRendezvous { namespace }) .await; } diff --git a/src-tauri/src/commands/community.rs b/src-tauri/src/commands/community.rs index a432b66..84bf0cd 100644 --- a/src-tauri/src/commands/community.rs +++ b/src-tauri/src/commands/community.rs @@ -3,6 +3,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use sha2::{Digest, Sha256}; use tauri::State; +use crate::crdt::sync::{DocumentSnapshot, SyncMessage}; use crate::node::gossip; use crate::node::NodeCommand; use crate::protocol::community::{CategoryMeta, ChannelKind, ChannelMeta, CommunityMeta, Member}; @@ -34,14 +35,60 @@ fn check_permission( // helper to broadcast a crdt change to peers via the sync topic async fn broadcast_sync(state: &State<'_, AppState>, community_id: &str) { + let doc_bytes = { + let mut engine = state.crdt_engine.lock().await; + engine.get_doc_bytes(community_id) + }; + + let Some(doc_bytes) = doc_bytes else { + return; + }; + + let sync_msg = SyncMessage::DocumentOffer(DocumentSnapshot { + community_id: community_id.to_string(), + doc_bytes, + }); + + let data = match serde_json::to_vec(&sync_msg) { + Ok(data) => data, + Err(_) => return, + }; + let node_handle = state.node_handle.lock().await; if let Some(ref handle) = *node_handle { - let sync_topic = "dusk/sync".to_string(); let _ = handle .command_tx .send(NodeCommand::SendMessage { - topic: sync_topic, - data: community_id.as_bytes().to_vec(), + topic: gossip::topic_for_sync(), + data, + }) + .await; + } +} + +// request a full sync from currently connected peers +async fn request_sync(state: &State<'_, AppState>) { + let peer_id = { + let identity = state.identity.lock().await; + let Some(id) = identity.as_ref() else { + return; + }; + id.peer_id.to_string() + }; + + let sync_msg = SyncMessage::RequestSync { peer_id }; + let data = match serde_json::to_vec(&sync_msg) { + Ok(data) => data, + Err(_) => return, + }; + + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let _ = handle + .command_tx + .send(NodeCommand::SendMessage { + topic: gossip::topic_for_sync(), + data, }) .await; } @@ -131,20 +178,16 @@ pub async fn join_community( let invite = crate::protocol::community::InviteCode::decode(&invite_code)?; let identity = state.identity.lock().await; - let id = identity.as_ref().ok_or("no identity loaded")?; - let peer_id_str = id.peer_id.to_string(); + if identity.is_none() { + return Err("no identity loaded".to_string()); + } drop(identity); // create a placeholder document that will be backfilled via crdt sync // once we connect to existing community members through the relay let mut engine = state.crdt_engine.lock().await; if !engine.has_community(&invite.community_id) { - engine.create_community( - &invite.community_id, - &invite.community_name, - "", - &peer_id_str, - )?; + engine.create_placeholder_community(&invite.community_id, &invite.community_name, "")?; } let meta = engine.get_community_meta(&invite.community_id)?; @@ -200,6 +243,9 @@ pub async fn join_community( .await; } + // request a snapshot now so joins work even when peers were already connected + request_sync(&state).await; + Ok(meta) } @@ -208,26 +254,50 @@ pub async fn leave_community( state: State<'_, AppState>, community_id: String, ) -> Result<(), String> { - // unsubscribe from all community topics + let local_peer_id = { + let identity = state.identity.lock().await; + let id = identity.as_ref().ok_or("no identity loaded")?; + id.peer_id.to_string() + }; + + // remove local user from the shared member list before leaving + let mut removed_self = false; + let channels = { + let mut engine = state.crdt_engine.lock().await; + let channels = engine.get_channels(&community_id).unwrap_or_default(); + + if let Ok(members) = engine.get_members(&community_id) { + if members.iter().any(|member| member.peer_id == local_peer_id) { + if engine.remove_member(&community_id, &local_peer_id).is_ok() { + removed_self = true; + } + } + } + + channels + }; + + if removed_self { + broadcast_sync(&state, &community_id).await; + } + + // unsubscribe from all community topics and stop advertising this namespace let node_handle = state.node_handle.lock().await; if let Some(ref handle) = *node_handle { - let engine = state.crdt_engine.lock().await; - if let Ok(channels) = engine.get_channels(&community_id) { - for channel in &channels { - let msg_topic = gossip::topic_for_messages(&community_id, &channel.id); - let _ = handle - .command_tx - .send(NodeCommand::Unsubscribe { topic: msg_topic }) - .await; + for channel in &channels { + let msg_topic = gossip::topic_for_messages(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Unsubscribe { topic: msg_topic }) + .await; - let typing_topic = gossip::topic_for_typing(&community_id, &channel.id); - let _ = handle - .command_tx - .send(NodeCommand::Unsubscribe { - topic: typing_topic, - }) - .await; - } + let typing_topic = gossip::topic_for_typing(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Unsubscribe { + topic: typing_topic, + }) + .await; } let presence_topic = gossip::topic_for_presence(&community_id); @@ -237,8 +307,18 @@ pub async fn leave_community( topic: presence_topic, }) .await; + + let namespace = format!("dusk/community/{}", community_id); + let _ = handle + .command_tx + .send(NodeCommand::UnregisterRendezvous { namespace }) + .await; } + // remove local cached community state so leave persists across restarts + let mut engine = state.crdt_engine.lock().await; + engine.remove_community(&community_id)?; + Ok(()) } @@ -313,6 +393,8 @@ pub async fn create_channel( .await; } + broadcast_sync(&state, &community_id).await; + Ok(channel) } @@ -353,18 +435,7 @@ pub async fn create_category( engine.create_category(&community_id, &category)?; drop(engine); - // broadcast the change via document sync - let node_handle = state.node_handle.lock().await; - if let Some(ref handle) = *node_handle { - let sync_topic = "dusk/sync".to_string(); - let _ = handle - .command_tx - .send(NodeCommand::SendMessage { - topic: sync_topic, - data: community_id.as_bytes().to_vec(), - }) - .await; - } + broadcast_sync(&state, &community_id).await; Ok(category) } diff --git a/src-tauri/src/commands/dm.rs b/src-tauri/src/commands/dm.rs index 180f621..b1e4b74 100644 --- a/src-tauri/src/commands/dm.rs +++ b/src-tauri/src/commands/dm.rs @@ -7,6 +7,7 @@ use crate::node::NodeCommand; use crate::protocol::messages::{ DMConversationMeta, DMTypingIndicator, DirectMessage, GossipMessage, }; +use crate::storage::DmSearchParams; use crate::AppState; // send a direct message to a peer @@ -138,6 +139,48 @@ pub async fn get_dm_messages( .map_err(|e| format!("failed to load dm messages: {}", e)) } +// search dm messages on the backend using sqlite indexes +#[tauri::command] +pub async fn search_dm_messages( + state: State<'_, AppState>, + peer_id: String, + query: Option, + from_filter: Option, + media_filter: Option, + mentions_only: Option, + date_after: Option, + date_before: Option, + limit: Option, +) -> Result, String> { + let identity = state.identity.lock().await; + let id = identity.as_ref().ok_or("no identity loaded")?; + let local_peer_id = id.peer_id.to_string(); + drop(identity); + + let conversation_id = gossip::dm_conversation_id(&local_peer_id, &peer_id); + + let from_peer = match from_filter.as_deref() { + Some("me") => Some(local_peer_id), + Some("them") => Some(peer_id.clone()), + _ => None, + }; + + let params = DmSearchParams { + query, + from_peer, + media_filter, + mentions_only: mentions_only.unwrap_or(false), + date_after, + date_before, + limit: limit.unwrap_or(200), + }; + + state + .storage + .search_dm_messages(&conversation_id, ¶ms) + .map_err(|e| format!("failed to search dm messages: {}", e)) +} + // load all dm conversations for the sidebar #[tauri::command] pub async fn get_dm_conversations( diff --git a/src-tauri/src/commands/identity.rs b/src-tauri/src/commands/identity.rs index 46babfa..f129e3c 100644 --- a/src-tauri/src/commands/identity.rs +++ b/src-tauri/src/commands/identity.rs @@ -323,7 +323,10 @@ pub async fn set_relay_address( { let mut node_handle = state.node_handle.lock().await; if let Some(handle) = node_handle.take() { - let _ = handle.command_tx.send(crate::node::NodeCommand::Shutdown).await; + let _ = handle + .command_tx + .send(crate::node::NodeCommand::Shutdown) + .await; let _ = handle.task.await; } } @@ -409,10 +412,7 @@ pub async fn reset_identity(state: State<'_, AppState>) -> Result<(), String> { // write an svg string to a cache directory and return the absolute path // used for notification icons so the os can display the user's avatar #[tauri::command] -pub async fn cache_avatar_icon( - cache_key: String, - svg_content: String, -) -> Result { +pub async fn cache_avatar_icon(cache_key: String, svg_content: String) -> Result { let cache_dir = std::env::temp_dir().join("dusk-avatars"); std::fs::create_dir_all(&cache_dir) .map_err(|e| format!("failed to create avatar cache dir: {}", e))?; diff --git a/src-tauri/src/commands/voice.rs b/src-tauri/src/commands/voice.rs index 34a9946..b871177 100644 --- a/src-tauri/src/commands/voice.rs +++ b/src-tauri/src/commands/voice.rs @@ -44,8 +44,7 @@ pub async fn join_voice_channel( display_name: display_name.clone(), media_state: media_state.clone(), }; - let data = - serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; + let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let _ = handle .command_tx .send(NodeCommand::SendMessage { @@ -95,8 +94,7 @@ pub async fn leave_voice_channel( channel_id: channel_id.clone(), peer_id: peer_id.clone(), }; - let data = - serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; + let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let _ = handle .command_tx .send(NodeCommand::SendMessage { @@ -108,9 +106,7 @@ pub async fn leave_voice_channel( // unsubscribe from the voice topic let _ = handle .command_tx - .send(NodeCommand::Unsubscribe { - topic: voice_topic, - }) + .send(NodeCommand::Unsubscribe { topic: voice_topic }) .await; } @@ -151,8 +147,7 @@ pub async fn update_voice_media_state( peer_id: peer_id.clone(), media_state: media_state.clone(), }; - let data = - serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; + let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let _ = handle .command_tx .send(NodeCommand::SendMessage { @@ -200,8 +195,7 @@ pub async fn send_voice_sdp( sdp_type, sdp, }; - let data = - serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; + let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let _ = handle .command_tx .send(NodeCommand::SendMessage { @@ -241,8 +235,7 @@ pub async fn send_voice_ice_candidate( sdp_mid, sdp_mline_index, }; - let data = - serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; + let data = serde_json::to_vec(&msg).map_err(|e| format!("serialize error: {}", e))?; let _ = handle .command_tx .send(NodeCommand::SendMessage { diff --git a/src-tauri/src/crdt/document.rs b/src-tauri/src/crdt/document.rs index b499632..4b1ce4d 100644 --- a/src-tauri/src/crdt/document.rs +++ b/src-tauri/src/crdt/document.rs @@ -50,6 +50,32 @@ pub fn init_community_doc( Ok(()) } +// initialize a placeholder community document used while waiting for remote sync +// this avoids granting local owner role or creating channels that may conflict +pub fn init_placeholder_community_doc( + doc: &mut AutoCommit, + name: &str, + description: &str, +) -> Result<(), automerge::AutomergeError> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + let meta = doc.put_object(ROOT, "meta", ObjType::Map)?; + doc.put(&meta, "name", name)?; + doc.put(&meta, "description", description)?; + doc.put(&meta, "created_by", "")?; + doc.put(&meta, "created_at", now as i64)?; + + let _channels = doc.put_object(ROOT, "channels", ObjType::Map)?; + let _categories = doc.put_object(ROOT, "categories", ObjType::Map)?; + let _members = doc.put_object(ROOT, "members", ObjType::Map)?; + let _roles = doc.put_object(ROOT, "roles", ObjType::Map)?; + + Ok(()) +} + // add a new channel to the community document pub fn add_channel( doc: &mut AutoCommit, diff --git a/src-tauri/src/crdt/mod.rs b/src-tauri/src/crdt/mod.rs index 7b90531..5e4d53e 100644 --- a/src-tauri/src/crdt/mod.rs +++ b/src-tauri/src/crdt/mod.rs @@ -64,6 +64,22 @@ impl CrdtEngine { Ok(()) } + // create a minimal community document while waiting for remote sync + pub fn create_placeholder_community( + &mut self, + community_id: &str, + name: &str, + description: &str, + ) -> Result<(), String> { + let mut doc = AutoCommit::new(); + document::init_placeholder_community_doc(&mut doc, name, description) + .map_err(|e| format!("failed to init placeholder community doc: {}", e))?; + + self.documents.insert(community_id.to_string(), doc); + self.persist(community_id)?; + Ok(()) + } + // add a channel to an existing community pub fn create_channel( &mut self, @@ -189,6 +205,18 @@ impl CrdtEngine { self.documents.contains_key(community_id) } + // fully remove a community from memory and disk + pub fn remove_community(&mut self, community_id: &str) -> Result<(), String> { + self.documents.remove(community_id); + self.storage + .delete_document(community_id) + .map_err(|e| format!("failed to delete community document: {}", e))?; + self.storage + .delete_community_meta(community_id) + .map_err(|e| format!("failed to delete community meta: {}", e))?; + Ok(()) + } + // save a document to disk pub fn persist(&mut self, community_id: &str) -> Result<(), String> { let doc = self @@ -350,11 +378,7 @@ impl CrdtEngine { } // remove a category and ungroup its channels - pub fn delete_category( - &mut self, - community_id: &str, - category_id: &str, - ) -> Result<(), String> { + pub fn delete_category(&mut self, community_id: &str, category_id: &str) -> Result<(), String> { let doc = self .documents .get_mut(community_id) diff --git a/src-tauri/src/dev_server.rs b/src-tauri/src/dev_server.rs index 42732c5..27b83ad 100644 --- a/src-tauri/src/dev_server.rs +++ b/src-tauri/src/dev_server.rs @@ -19,6 +19,7 @@ use axum::Router; use serde::Deserialize; use tokio::sync::Mutex; +use crate::crdt::sync::{DocumentSnapshot, SyncMessage}; use crate::crdt::CrdtEngine; use crate::node::gossip; use crate::node::NodeCommand; @@ -134,6 +135,67 @@ fn now_ms() -> u64 { .as_millis() as u64 } +// publish the latest document snapshot for a community to connected peers +async fn broadcast_sync(state: &DevState, community_id: &str) { + let doc_bytes = { + let mut engine = state.crdt_engine.lock().await; + engine.get_doc_bytes(community_id) + }; + + let Some(doc_bytes) = doc_bytes else { + return; + }; + + let message = SyncMessage::DocumentOffer(DocumentSnapshot { + community_id: community_id.to_string(), + doc_bytes, + }); + + let data = match serde_json::to_vec(&message) { + Ok(data) => data, + Err(_) => return, + }; + + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let _ = handle + .command_tx + .send(NodeCommand::SendMessage { + topic: gossip::topic_for_sync(), + data, + }) + .await; + } +} + +// request snapshots from connected peers +async fn request_sync(state: &DevState) { + let peer_id = { + let identity = state.identity.lock().await; + let Some(id) = identity.as_ref() else { + return; + }; + id.peer_id.to_string() + }; + + let message = SyncMessage::RequestSync { peer_id }; + let data = match serde_json::to_vec(&message) { + Ok(data) => data, + Err(_) => return, + }; + + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let _ = handle + .command_tx + .send(NodeCommand::SendMessage { + topic: gossip::topic_for_sync(), + data, + }) + .await; + } +} + // find the community that owns a given channel fn find_community_for_channel( engine: &crate::crdt::CrdtEngine, @@ -405,21 +467,18 @@ async fn join_community( .map_err(|e| ApiError(StatusCode::BAD_REQUEST, e))?; let identity = state.identity.lock().await; - let id = identity - .as_ref() - .ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?; - let peer_id_str = id.peer_id.to_string(); + if identity.is_none() { + return Err(ApiError( + StatusCode::UNAUTHORIZED, + "no identity loaded".into(), + )); + } drop(identity); let mut engine = state.crdt_engine.lock().await; if !engine.has_community(&invite.community_id) { engine - .create_community( - &invite.community_id, - &invite.community_name, - "", - &peer_id_str, - ) + .create_placeholder_community(&invite.community_id, &invite.community_name, "") .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; } @@ -473,6 +532,8 @@ async fn join_community( .await; } + request_sync(&state).await; + Ok(Json(meta)) } @@ -480,25 +541,50 @@ async fn leave_community( State(state): State, Path(community_id): Path, ) -> ApiResult { + let local_peer_id = { + let identity = state.identity.lock().await; + let id = identity + .as_ref() + .ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?; + id.peer_id.to_string() + }; + + let mut removed_self = false; + let channels = { + let mut engine = state.crdt_engine.lock().await; + let channels = engine.get_channels(&community_id).unwrap_or_default(); + + if let Ok(members) = engine.get_members(&community_id) { + if members.iter().any(|member| member.peer_id == local_peer_id) { + if engine.remove_member(&community_id, &local_peer_id).is_ok() { + removed_self = true; + } + } + } + + channels + }; + + if removed_self { + broadcast_sync(&state, &community_id).await; + } + let node_handle = state.node_handle.lock().await; if let Some(ref handle) = *node_handle { - let engine = state.crdt_engine.lock().await; - if let Ok(channels) = engine.get_channels(&community_id) { - for channel in &channels { - let msg_topic = gossip::topic_for_messages(&community_id, &channel.id); - let _ = handle - .command_tx - .send(NodeCommand::Unsubscribe { topic: msg_topic }) - .await; + for channel in &channels { + let msg_topic = gossip::topic_for_messages(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Unsubscribe { topic: msg_topic }) + .await; - let typing_topic = gossip::topic_for_typing(&community_id, &channel.id); - let _ = handle - .command_tx - .send(NodeCommand::Unsubscribe { - topic: typing_topic, - }) - .await; - } + let typing_topic = gossip::topic_for_typing(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Unsubscribe { + topic: typing_topic, + }) + .await; } let presence_topic = gossip::topic_for_presence(&community_id); @@ -508,8 +594,19 @@ async fn leave_community( topic: presence_topic, }) .await; + + let namespace = format!("dusk/community/{}", community_id); + let _ = handle + .command_tx + .send(NodeCommand::UnregisterRendezvous { namespace }) + .await; } + let mut engine = state.crdt_engine.lock().await; + engine + .remove_community(&community_id) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + Ok(Json(serde_json::json!({ "ok": true }))) } @@ -631,6 +728,8 @@ async fn create_channel( .await; } + broadcast_sync(&state, &community_id).await; + Ok(Json(channel)) } @@ -1035,7 +1134,13 @@ async fn start_node(State(state): State) -> ApiResult { + // keep topic subscriptions aligned with merged channels + let presence_topic = libp2p::gossipsub::IdentTopic::new( + gossip::topic_for_presence(&community_id), + ); + let _ = swarm_instance + .behaviour_mut() + .gossipsub + .subscribe(&presence_topic); + + for channel in &channels_after_merge { + let messages_topic = libp2p::gossipsub::IdentTopic::new( + gossip::topic_for_messages(&community_id, &channel.id), + ); + let _ = swarm_instance + .behaviour_mut() + .gossipsub + .subscribe(&messages_topic); + + let typing_topic = libp2p::gossipsub::IdentTopic::new( + gossip::topic_for_typing(&community_id, &channel.id), + ); + let _ = swarm_instance + .behaviour_mut() + .gossipsub + .subscribe(&typing_topic); + } + let _ = app_handle.emit("dusk-event", DuskEvent::SyncComplete { - community_id: snapshot.community_id, + community_id, }); } Err(e) => { - log::warn!("failed to merge remote doc for {}: {}", snapshot.community_id, e); + log::warn!("failed to merge remote doc for {}: {}", community_id, e); } } } @@ -1161,6 +1201,29 @@ pub async fn start( pending_discoveries.push(namespace); } } + Some(NodeCommand::UnregisterRendezvous { namespace }) => { + pending_registrations.retain(|ns| ns != &namespace); + pending_discoveries.retain(|ns| ns != &namespace); + if pending_registrations.is_empty() && pending_discoveries.is_empty() { + pending_queued_at = None; + } + registered_namespaces.remove(&namespace); + + if relay_reservation_active { + if let Some(rp) = relay_peer { + match libp2p::rendezvous::Namespace::new(namespace.clone()) { + Ok(ns) => { + swarm_instance.behaviour_mut().rendezvous.unregister(ns, rp); + } + Err(e) => log::warn!( + "invalid rendezvous namespace '{}': {:?}", + namespace, + e + ), + } + } + } + } Some(NodeCommand::GifSearch { request, reply }) => { if let Some(rp) = relay_peer { let request_id = swarm_instance diff --git a/src-tauri/src/storage/disk.rs b/src-tauri/src/storage/disk.rs index 541c1a7..3e74e53 100644 --- a/src-tauri/src/storage/disk.rs +++ b/src-tauri/src/storage/disk.rs @@ -1,9 +1,12 @@ use directories::ProjectDirs; +use rusqlite::types::Value as SqlValue; +use rusqlite::{params, params_from_iter, Connection, OptionalExtension}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; use std::io; use std::path::PathBuf; +use std::time::Duration; use crate::protocol::community::CommunityMeta; use crate::protocol::identity::{DirectoryEntry, ProfileData, VerificationProof}; @@ -44,9 +47,36 @@ impl Default for UserSettings { } } -// file-based persistence for identity, documents, and community metadata +#[derive(Debug, Clone)] +pub struct DmSearchParams { + pub query: Option, + pub from_peer: Option, + pub media_filter: Option, + pub mentions_only: bool, + pub date_after: Option, + pub date_before: Option, + pub limit: usize, +} + +impl Default for DmSearchParams { + fn default() -> Self { + Self { + query: None, + from_peer: None, + media_filter: None, + mentions_only: false, + date_after: None, + date_before: None, + limit: 200, + } + } +} + +// sqlite-based persistence for identity, documents, and direct messages pub struct DiskStorage { base_dir: PathBuf, + db_path: PathBuf, + fts_enabled: bool, } impl DiskStorage { @@ -56,197 +86,729 @@ impl DiskStorage { let base_dir = project_dirs.data_dir().to_path_buf(); - // ensure the directory tree exists + // keep legacy directories so we can migrate existing installs safely fs::create_dir_all(base_dir.join("identity"))?; fs::create_dir_all(base_dir.join("communities"))?; fs::create_dir_all(base_dir.join("directory"))?; fs::create_dir_all(base_dir.join("dms"))?; - Ok(Self { base_dir }) + let db_path = base_dir.join("storage.sqlite3"); + let conn = Self::open_conn_at(&db_path)?; + conn.execute_batch( + r#" + PRAGMA journal_mode = WAL; + PRAGMA synchronous = NORMAL; + PRAGMA foreign_keys = ON; + + CREATE TABLE IF NOT EXISTS app_meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS key_value ( + key TEXT PRIMARY KEY, + value BLOB NOT NULL + ); + + CREATE TABLE IF NOT EXISTS profile ( + id INTEGER PRIMARY KEY CHECK (id = 1), + display_name TEXT NOT NULL, + bio TEXT NOT NULL, + created_at INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS settings ( + id INTEGER PRIMARY KEY CHECK (id = 1), + json TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS verification_proof ( + id INTEGER PRIMARY KEY CHECK (id = 1), + json TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS community_documents ( + community_id TEXT PRIMARY KEY, + document BLOB NOT NULL + ); + + CREATE TABLE IF NOT EXISTS community_meta ( + community_id TEXT PRIMARY KEY, + meta_json TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS directory_entries ( + peer_id TEXT PRIMARY KEY, + display_name TEXT NOT NULL, + bio TEXT NOT NULL, + public_key TEXT NOT NULL, + last_seen INTEGER NOT NULL, + is_friend INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS dm_conversations ( + conversation_id TEXT PRIMARY KEY, + peer_id TEXT NOT NULL, + display_name TEXT NOT NULL, + last_message TEXT, + last_message_time INTEGER, + unread_count INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS dm_messages ( + id TEXT PRIMARY KEY, + conversation_id TEXT NOT NULL, + from_peer TEXT NOT NULL, + to_peer TEXT NOT NULL, + from_display_name TEXT NOT NULL, + content TEXT NOT NULL, + timestamp INTEGER NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_community_documents_id + ON community_documents (community_id); + + CREATE INDEX IF NOT EXISTS idx_directory_last_seen + ON directory_entries (last_seen DESC); + + CREATE INDEX IF NOT EXISTS idx_dm_conversations_last_message_time + ON dm_conversations (last_message_time DESC); + + CREATE INDEX IF NOT EXISTS idx_dm_messages_conversation_timestamp + ON dm_messages (conversation_id, timestamp DESC); + + CREATE INDEX IF NOT EXISTS idx_dm_messages_conversation_sender + ON dm_messages (conversation_id, from_peer, timestamp DESC); + "#, + ) + .map_err(sqlite_to_io_error)?; + + let fts_enabled = conn + .execute_batch( + r#" + CREATE VIRTUAL TABLE IF NOT EXISTS dm_message_fts USING fts5( + message_id UNINDEXED, + conversation_id UNINDEXED, + content + ); + "#, + ) + .is_ok(); + + drop(conn); + + let storage = Self { + base_dir, + db_path, + fts_enabled, + }; + + storage.migrate_legacy_if_needed()?; + + Ok(storage) + } + + fn open_conn(&self) -> Result { + Self::open_conn_at(&self.db_path) + } + + fn open_conn_at(db_path: &PathBuf) -> Result { + let conn = Connection::open(db_path).map_err(sqlite_to_io_error)?; + let _ = conn.busy_timeout(Duration::from_secs(5)); + let _ = conn.pragma_update(None, "foreign_keys", "ON"); + Ok(conn) + } + + fn migrate_legacy_if_needed(&self) -> Result<(), io::Error> { + let conn = self.open_conn()?; + let migrated = conn + .query_row( + "SELECT value FROM app_meta WHERE key = 'legacy_migrated'", + [], + |row| row.get::<_, String>(0), + ) + .optional() + .map_err(sqlite_to_io_error)?; + drop(conn); + + if migrated.as_deref() == Some("1") { + return Ok(()); + } + + self.migrate_legacy_files()?; + + let conn = self.open_conn()?; + conn.execute( + "INSERT INTO app_meta (key, value) VALUES ('legacy_migrated', '1') + ON CONFLICT(key) DO UPDATE SET value = excluded.value", + [], + ) + .map_err(sqlite_to_io_error)?; + + // best-effort cleanup so migrated installs no longer carry stale json files + let _ = self.cleanup_legacy_files(); + + Ok(()) + } + + fn migrate_legacy_files(&self) -> Result<(), io::Error> { + self.migrate_legacy_identity()?; + self.migrate_legacy_communities()?; + self.migrate_legacy_directory()?; + self.migrate_legacy_dms()?; + + if self.fts_enabled { + self.rebuild_dm_fts_index()?; + } + + Ok(()) + } + + fn migrate_legacy_identity(&self) -> Result<(), io::Error> { + let keypair_path = self.base_dir.join("identity/keypair.bin"); + if keypair_path.exists() { + let keypair = fs::read(keypair_path)?; + let _ = self.save_keypair(&keypair); + } + + let profile_path = self.base_dir.join("identity/profile.json"); + if profile_path.exists() { + if let Ok(data) = fs::read_to_string(&profile_path) { + if let Ok(profile) = serde_json::from_str::(&data) { + let _ = self.save_profile(&profile); + } else if let Ok(value) = serde_json::from_str::(&data) { + if let Some(display_name) = value["display_name"].as_str() { + let mut profile = ProfileData::default(); + profile.display_name = display_name.to_string(); + if let Some(bio) = value["bio"].as_str() { + profile.bio = bio.to_string(); + } + if let Some(created_at) = value["created_at"].as_u64() { + profile.created_at = created_at; + } + let _ = self.save_profile(&profile); + } + } + } + } + + let settings_path = self.base_dir.join("identity/settings.json"); + if settings_path.exists() { + if let Ok(data) = fs::read_to_string(settings_path) { + if let Ok(settings) = serde_json::from_str::(&data) { + let _ = self.save_settings(&settings); + } + } + } + + let proof_path = self.base_dir.join("identity/verification.json"); + if proof_path.exists() { + if let Ok(data) = fs::read_to_string(proof_path) { + if let Ok(proof) = serde_json::from_str::(&data) { + let _ = self.save_verification_proof(&proof); + } + } + } + + Ok(()) + } + + fn migrate_legacy_communities(&self) -> Result<(), io::Error> { + let communities_dir = self.base_dir.join("communities"); + if !communities_dir.exists() { + return Ok(()); + } + + for entry in fs::read_dir(communities_dir)? { + let entry = entry?; + if !entry.file_type()?.is_dir() { + continue; + } + + let community_id = match entry.file_name().into_string() { + Ok(id) => id, + Err(_) => continue, + }; + + let doc_path = entry.path().join("document.bin"); + if doc_path.exists() { + if let Ok(bytes) = fs::read(&doc_path) { + let _ = self.save_document(&community_id, &bytes); + } + } + + let meta_path = entry.path().join("meta.json"); + if meta_path.exists() { + if let Ok(data) = fs::read_to_string(&meta_path) { + if let Ok(meta) = serde_json::from_str::(&data) { + let _ = self.save_community_meta(&meta); + } + } + } + } + + Ok(()) + } + + fn migrate_legacy_directory(&self) -> Result<(), io::Error> { + let directory_path = self.base_dir.join("directory/peers.json"); + if !directory_path.exists() { + return Ok(()); + } + + let data = fs::read_to_string(directory_path)?; + let entries = + serde_json::from_str::>(&data).unwrap_or_default(); + + for entry in entries.values() { + let _ = self.save_directory_entry(entry); + } + + Ok(()) + } + + fn migrate_legacy_dms(&self) -> Result<(), io::Error> { + let dms_dir = self.base_dir.join("dms"); + if !dms_dir.exists() { + return Ok(()); + } + + for entry in fs::read_dir(dms_dir)? { + let entry = entry?; + if !entry.file_type()?.is_dir() { + continue; + } + + let conversation_id = match entry.file_name().into_string() { + Ok(id) => id, + Err(_) => continue, + }; + + let meta_path = entry.path().join("meta.json"); + if meta_path.exists() { + if let Ok(data) = fs::read_to_string(&meta_path) { + if let Ok(meta) = serde_json::from_str::(&data) { + let _ = self.save_dm_conversation(&conversation_id, &meta); + } + } + } + + let messages_path = entry.path().join("messages.json"); + if messages_path.exists() { + if let Ok(data) = fs::read_to_string(&messages_path) { + if let Ok(messages) = serde_json::from_str::>(&data) { + for message in &messages { + let _ = self.append_dm_message(&conversation_id, message); + } + } + } + } + } + + Ok(()) + } + + fn rebuild_dm_fts_index(&self) -> Result<(), io::Error> { + if !self.fts_enabled { + return Ok(()); + } + + let conn = self.open_conn()?; + conn.execute("DELETE FROM dm_message_fts", []) + .map_err(sqlite_to_io_error)?; + + let mut stmt = conn + .prepare( + "SELECT id, conversation_id, content + FROM dm_messages", + ) + .map_err(sqlite_to_io_error)?; + + let rows = stmt + .query_map([], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + )) + }) + .map_err(sqlite_to_io_error)?; + + for row in rows { + let (id, conversation_id, content) = row.map_err(sqlite_to_io_error)?; + conn.execute( + "INSERT INTO dm_message_fts (message_id, conversation_id, content) + VALUES (?1, ?2, ?3)", + params![id, conversation_id, content], + ) + .map_err(sqlite_to_io_error)?; + } + + Ok(()) + } + + fn cleanup_legacy_files(&self) -> Result<(), io::Error> { + remove_if_exists(self.base_dir.join("identity/keypair.bin"))?; + remove_if_exists(self.base_dir.join("identity/profile.json"))?; + remove_if_exists(self.base_dir.join("identity/settings.json"))?; + remove_if_exists(self.base_dir.join("identity/verification.json"))?; + remove_if_exists(self.base_dir.join("directory/peers.json"))?; + + clear_dir(self.base_dir.join("communities"))?; + clear_dir(self.base_dir.join("dms"))?; + + Ok(()) } // -- identity -- pub fn save_keypair(&self, keypair_bytes: &[u8]) -> Result<(), io::Error> { - fs::write(self.base_dir.join("identity/keypair.bin"), keypair_bytes) + let conn = self.open_conn()?; + conn.execute( + "INSERT INTO key_value (key, value) VALUES ('identity_keypair', ?1) + ON CONFLICT(key) DO UPDATE SET value = excluded.value", + params![keypair_bytes], + ) + .map_err(sqlite_to_io_error)?; + Ok(()) } pub fn load_keypair(&self) -> Result, io::Error> { - fs::read(self.base_dir.join("identity/keypair.bin")) + let conn = self.open_conn()?; + let value = conn + .query_row( + "SELECT value FROM key_value WHERE key = 'identity_keypair'", + [], + |row| row.get::<_, Vec>(0), + ) + .optional() + .map_err(sqlite_to_io_error)?; + + value.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "keypair not found")) } pub fn save_display_name(&self, name: &str) -> Result<(), io::Error> { - let profile = serde_json::json!({ "display_name": name }); - fs::write( - self.base_dir.join("identity/profile.json"), - serde_json::to_string_pretty(&profile).unwrap(), - ) + let mut profile = self.load_profile().unwrap_or_default(); + profile.display_name = name.to_string(); + self.save_profile(&profile) } pub fn load_display_name(&self) -> Result { - let data = fs::read_to_string(self.base_dir.join("identity/profile.json"))?; - let profile: serde_json::Value = serde_json::from_str(&data) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - - profile["display_name"] - .as_str() - .map(|s| s.to_string()) - .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing display_name")) + self.load_profile().map(|p| p.display_name) } // full profile data with bio and created_at pub fn save_profile(&self, profile: &ProfileData) -> Result<(), io::Error> { - let json = serde_json::to_string_pretty(profile) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - fs::write(self.base_dir.join("identity/profile.json"), json) + let conn = self.open_conn()?; + conn.execute( + "INSERT INTO profile (id, display_name, bio, created_at) + VALUES (1, ?1, ?2, ?3) + ON CONFLICT(id) DO UPDATE SET + display_name = excluded.display_name, + bio = excluded.bio, + created_at = excluded.created_at", + params![profile.display_name, profile.bio, profile.created_at as i64], + ) + .map_err(sqlite_to_io_error)?; + Ok(()) } pub fn load_profile(&self) -> Result { - let path = self.base_dir.join("identity/profile.json"); - if !path.exists() { - return Ok(ProfileData::default()); - } - let data = fs::read_to_string(path)?; - serde_json::from_str(&data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + let conn = self.open_conn()?; + let profile = conn + .query_row( + "SELECT display_name, bio, created_at FROM profile WHERE id = 1", + [], + |row| { + let created_at: i64 = row.get(2)?; + Ok(ProfileData { + display_name: row.get(0)?, + bio: row.get(1)?, + created_at: created_at.max(0) as u64, + }) + }, + ) + .optional() + .map_err(sqlite_to_io_error)?; + + Ok(profile.unwrap_or_default()) } // check if identity exists without loading it pub fn has_identity(&self) -> bool { - self.base_dir.join("identity/keypair.bin").exists() + self.load_keypair().is_ok() } // -- verification proof -- pub fn save_verification_proof(&self, proof: &VerificationProof) -> Result<(), io::Error> { - let json = serde_json::to_string_pretty(proof) + let json = serde_json::to_string(proof) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - fs::write(self.base_dir.join("identity/verification.json"), json) + + let conn = self.open_conn()?; + conn.execute( + "INSERT INTO verification_proof (id, json) + VALUES (1, ?1) + ON CONFLICT(id) DO UPDATE SET json = excluded.json", + params![json], + ) + .map_err(sqlite_to_io_error)?; + + Ok(()) } pub fn load_verification_proof(&self) -> Result, io::Error> { - let path = self.base_dir.join("identity/verification.json"); - if !path.exists() { - return Ok(None); + let conn = self.open_conn()?; + let json = conn + .query_row( + "SELECT json FROM verification_proof WHERE id = 1", + [], + |row| row.get::<_, String>(0), + ) + .optional() + .map_err(sqlite_to_io_error)?; + + match json { + Some(data) => { + let proof = serde_json::from_str::(&data) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + Ok(Some(proof)) + } + None => Ok(None), } - let data = fs::read_to_string(path)?; - let proof = serde_json::from_str(&data) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Ok(Some(proof)) } // -- automerge documents -- pub fn save_document(&self, community_id: &str, doc_bytes: &[u8]) -> Result<(), io::Error> { - let dir = self.base_dir.join(format!("communities/{}", community_id)); - fs::create_dir_all(&dir)?; - fs::write(dir.join("document.bin"), doc_bytes) + let conn = self.open_conn()?; + conn.execute( + "INSERT INTO community_documents (community_id, document) + VALUES (?1, ?2) + ON CONFLICT(community_id) DO UPDATE SET document = excluded.document", + params![community_id, doc_bytes], + ) + .map_err(sqlite_to_io_error)?; + Ok(()) } pub fn load_document(&self, community_id: &str) -> Result, io::Error> { - fs::read( - self.base_dir - .join(format!("communities/{}/document.bin", community_id)), + let conn = self.open_conn()?; + let bytes = conn + .query_row( + "SELECT document FROM community_documents WHERE community_id = ?1", + params![community_id], + |row| row.get::<_, Vec>(0), + ) + .optional() + .map_err(sqlite_to_io_error)?; + + bytes.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "community document not found")) + } + + pub fn delete_document(&self, community_id: &str) -> Result<(), io::Error> { + let conn = self.open_conn()?; + conn.execute( + "DELETE FROM community_documents WHERE community_id = ?1", + params![community_id], ) + .map_err(sqlite_to_io_error)?; + Ok(()) } pub fn list_communities(&self) -> Result, io::Error> { - let communities_dir = self.base_dir.join("communities"); - if !communities_dir.exists() { - return Ok(Vec::new()); - } + let conn = self.open_conn()?; + let mut stmt = conn + .prepare("SELECT community_id FROM community_documents ORDER BY community_id") + .map_err(sqlite_to_io_error)?; + + let ids = stmt + .query_map([], |row| row.get::<_, String>(0)) + .map_err(sqlite_to_io_error)? + .collect::, _>>() + .map_err(sqlite_to_io_error)?; - let mut ids = Vec::new(); - for entry in fs::read_dir(communities_dir)? { - let entry = entry?; - if entry.file_type()?.is_dir() { - if let Some(name) = entry.file_name().to_str() { - ids.push(name.to_string()); - } - } - } Ok(ids) } // -- community metadata cache -- pub fn save_community_meta(&self, meta: &CommunityMeta) -> Result<(), io::Error> { - let dir = self.base_dir.join(format!("communities/{}", meta.id)); - fs::create_dir_all(&dir)?; - let json = serde_json::to_string_pretty(meta) + let json = serde_json::to_string(meta) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - fs::write(dir.join("meta.json"), json) + + let conn = self.open_conn()?; + conn.execute( + "INSERT INTO community_meta (community_id, meta_json) + VALUES (?1, ?2) + ON CONFLICT(community_id) DO UPDATE SET meta_json = excluded.meta_json", + params![meta.id, json], + ) + .map_err(sqlite_to_io_error)?; + + Ok(()) } pub fn load_community_meta(&self, community_id: &str) -> Result { - let data = fs::read_to_string( - self.base_dir - .join(format!("communities/{}/meta.json", community_id)), - )?; + let conn = self.open_conn()?; + let json = conn + .query_row( + "SELECT meta_json FROM community_meta WHERE community_id = ?1", + params![community_id], + |row| row.get::<_, String>(0), + ) + .optional() + .map_err(sqlite_to_io_error)?; + + let data = json + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "community meta not found"))?; serde_json::from_str(&data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) } + pub fn delete_community_meta(&self, community_id: &str) -> Result<(), io::Error> { + let conn = self.open_conn()?; + conn.execute( + "DELETE FROM community_meta WHERE community_id = ?1", + params![community_id], + ) + .map_err(sqlite_to_io_error)?; + Ok(()) + } + // -- user settings -- pub fn save_settings(&self, settings: &UserSettings) -> Result<(), io::Error> { - let json = serde_json::to_string_pretty(settings) + let json = serde_json::to_string(settings) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - fs::write(self.base_dir.join("identity/settings.json"), json) + + let conn = self.open_conn()?; + conn.execute( + "INSERT INTO settings (id, json) + VALUES (1, ?1) + ON CONFLICT(id) DO UPDATE SET json = excluded.json", + params![json], + ) + .map_err(sqlite_to_io_error)?; + + Ok(()) } pub fn load_settings(&self) -> Result { - let path = self.base_dir.join("identity/settings.json"); - if !path.exists() { - return Ok(UserSettings::default()); - } + let conn = self.open_conn()?; + let json = conn + .query_row("SELECT json FROM settings WHERE id = 1", [], |row| { + row.get::<_, String>(0) + }) + .optional() + .map_err(sqlite_to_io_error)?; - let data = fs::read_to_string(path)?; - serde_json::from_str(&data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + match json { + Some(data) => serde_json::from_str(&data) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)), + None => Ok(UserSettings::default()), + } } // -- peer directory -- // save a discovered peer to the local directory pub fn save_directory_entry(&self, entry: &DirectoryEntry) -> Result<(), io::Error> { - let mut entries = self.load_directory().unwrap_or_default(); - entries.insert(entry.peer_id.clone(), entry.clone()); - let json = serde_json::to_string_pretty(&entries) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - fs::write(self.base_dir.join("directory/peers.json"), json) + let conn = self.open_conn()?; + conn.execute( + "INSERT INTO directory_entries ( + peer_id, display_name, bio, public_key, last_seen, is_friend + ) + VALUES (?1, ?2, ?3, ?4, ?5, ?6) + ON CONFLICT(peer_id) DO UPDATE SET + display_name = excluded.display_name, + bio = excluded.bio, + public_key = excluded.public_key, + last_seen = excluded.last_seen, + is_friend = excluded.is_friend", + params![ + entry.peer_id, + entry.display_name, + entry.bio, + entry.public_key, + entry.last_seen as i64, + if entry.is_friend { 1_i64 } else { 0_i64 } + ], + ) + .map_err(sqlite_to_io_error)?; + + Ok(()) } // load the entire peer directory pub fn load_directory(&self) -> Result, io::Error> { - let path = self.base_dir.join("directory/peers.json"); - if !path.exists() { - return Ok(HashMap::new()); + let conn = self.open_conn()?; + let mut stmt = conn + .prepare( + "SELECT peer_id, display_name, bio, public_key, last_seen, is_friend + FROM directory_entries", + ) + .map_err(sqlite_to_io_error)?; + + let rows = stmt + .query_map([], |row| { + let peer_id: String = row.get(0)?; + let last_seen: i64 = row.get(4)?; + let is_friend: i64 = row.get(5)?; + + Ok(( + peer_id.clone(), + DirectoryEntry { + peer_id, + display_name: row.get(1)?, + bio: row.get(2)?, + public_key: row.get(3)?, + last_seen: last_seen.max(0) as u64, + is_friend: is_friend != 0, + }, + )) + }) + .map_err(sqlite_to_io_error)?; + + let mut entries = HashMap::new(); + for row in rows { + let (peer_id, entry) = row.map_err(sqlite_to_io_error)?; + entries.insert(peer_id, entry); } - let data = fs::read_to_string(path)?; - serde_json::from_str(&data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + + Ok(entries) } // remove a peer from the directory pub fn remove_directory_entry(&self, peer_id: &str) -> Result<(), io::Error> { - let mut entries = self.load_directory().unwrap_or_default(); - entries.remove(peer_id); - let json = serde_json::to_string_pretty(&entries) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - fs::write(self.base_dir.join("directory/peers.json"), json) + let conn = self.open_conn()?; + conn.execute( + "DELETE FROM directory_entries WHERE peer_id = ?1", + params![peer_id], + ) + .map_err(sqlite_to_io_error)?; + Ok(()) } // toggle friend status for a peer pub fn set_friend_status(&self, peer_id: &str, is_friend: bool) -> Result<(), io::Error> { - let mut entries = self.load_directory().unwrap_or_default(); - if let Some(entry) = entries.get_mut(peer_id) { - entry.is_friend = is_friend; - let json = serde_json::to_string_pretty(&entries) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - fs::write(self.base_dir.join("directory/peers.json"), json) - } else { - Err(io::Error::new( + let conn = self.open_conn()?; + let changed = conn + .execute( + "UPDATE directory_entries + SET is_friend = ?2 + WHERE peer_id = ?1", + params![peer_id, if is_friend { 1_i64 } else { 0_i64 }], + ) + .map_err(sqlite_to_io_error)?; + + if changed == 0 { + return Err(io::Error::new( io::ErrorKind::NotFound, "peer not found in directory", - )) + )); } + + Ok(()) } // -- direct messages -- @@ -257,11 +819,30 @@ impl DiskStorage { conversation_id: &str, meta: &DMConversationMeta, ) -> Result<(), io::Error> { - let dir = self.base_dir.join(format!("dms/{}", conversation_id)); - fs::create_dir_all(&dir)?; - let json = serde_json::to_string_pretty(meta) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - fs::write(dir.join("meta.json"), json) + let conn = self.open_conn()?; + conn.execute( + "INSERT INTO dm_conversations ( + conversation_id, peer_id, display_name, last_message, last_message_time, unread_count + ) + VALUES (?1, ?2, ?3, ?4, ?5, ?6) + ON CONFLICT(conversation_id) DO UPDATE SET + peer_id = excluded.peer_id, + display_name = excluded.display_name, + last_message = excluded.last_message, + last_message_time = excluded.last_message_time, + unread_count = excluded.unread_count", + params![ + conversation_id, + meta.peer_id, + meta.display_name, + meta.last_message, + meta.last_message_time.map(|ts| ts as i64), + meta.unread_count as i64 + ], + ) + .map_err(sqlite_to_io_error)?; + + Ok(()) } // load a single dm conversation's metadata @@ -269,47 +850,104 @@ impl DiskStorage { &self, conversation_id: &str, ) -> Result { - let path = self - .base_dir - .join(format!("dms/{}/meta.json", conversation_id)); - let data = fs::read_to_string(path)?; - serde_json::from_str(&data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) + let conn = self.open_conn()?; + + let meta = conn + .query_row( + "SELECT peer_id, display_name, last_message, last_message_time, unread_count + FROM dm_conversations + WHERE conversation_id = ?1", + params![conversation_id], + |row| { + let last_message_time: Option = row.get(3)?; + let unread_count: i64 = row.get(4)?; + + Ok(DMConversationMeta { + peer_id: row.get(0)?, + display_name: row.get(1)?, + last_message: row.get(2)?, + last_message_time: last_message_time.map(|ts| ts.max(0) as u64), + unread_count: unread_count.max(0) as u32, + }) + }, + ) + .optional() + .map_err(sqlite_to_io_error)?; + + meta.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "dm conversation not found")) } // load all dm conversations pub fn load_all_dm_conversations( &self, ) -> Result, io::Error> { - let dms_dir = self.base_dir.join("dms"); - if !dms_dir.exists() { - return Ok(Vec::new()); - } + let conn = self.open_conn()?; + let mut stmt = conn + .prepare( + "SELECT + conversation_id, + peer_id, + display_name, + last_message, + last_message_time, + unread_count + FROM dm_conversations + ORDER BY COALESCE(last_message_time, 0) DESC, display_name ASC", + ) + .map_err(sqlite_to_io_error)?; + + let rows = stmt + .query_map([], |row| { + let last_message_time: Option = row.get(4)?; + let unread_count: i64 = row.get(5)?; + + Ok(( + row.get::<_, String>(0)?, + DMConversationMeta { + peer_id: row.get(1)?, + display_name: row.get(2)?, + last_message: row.get(3)?, + last_message_time: last_message_time.map(|ts| ts.max(0) as u64), + unread_count: unread_count.max(0) as u32, + }, + )) + }) + .map_err(sqlite_to_io_error)?; let mut conversations = Vec::new(); - for entry in fs::read_dir(dms_dir)? { - let entry = entry?; - if entry.file_type()?.is_dir() { - if let Some(conv_id) = entry.file_name().to_str() { - let meta_path = entry.path().join("meta.json"); - if meta_path.exists() { - if let Ok(data) = fs::read_to_string(&meta_path) { - if let Ok(meta) = serde_json::from_str::(&data) { - conversations.push((conv_id.to_string(), meta)); - } - } - } - } - } + for row in rows { + conversations.push(row.map_err(sqlite_to_io_error)?); } + Ok(conversations) } // remove a dm conversation and all its messages pub fn remove_dm_conversation(&self, conversation_id: &str) -> Result<(), io::Error> { - let dir = self.base_dir.join(format!("dms/{}", conversation_id)); - if dir.exists() { - fs::remove_dir_all(&dir)?; + let conn = self.open_conn()?; + let tx = conn.unchecked_transaction().map_err(sqlite_to_io_error)?; + + tx.execute( + "DELETE FROM dm_messages WHERE conversation_id = ?1", + params![conversation_id], + ) + .map_err(sqlite_to_io_error)?; + + tx.execute( + "DELETE FROM dm_conversations WHERE conversation_id = ?1", + params![conversation_id], + ) + .map_err(sqlite_to_io_error)?; + + if self.fts_enabled { + tx.execute( + "DELETE FROM dm_message_fts WHERE conversation_id = ?1", + params![conversation_id], + ) + .map_err(sqlite_to_io_error)?; } + + tx.commit().map_err(sqlite_to_io_error)?; Ok(()) } @@ -319,22 +957,47 @@ impl DiskStorage { conversation_id: &str, message: &DirectMessage, ) -> Result<(), io::Error> { - let dir = self.base_dir.join(format!("dms/{}", conversation_id)); - fs::create_dir_all(&dir)?; + let conn = self.open_conn()?; + let tx = conn.unchecked_transaction().map_err(sqlite_to_io_error)?; - let messages_path = dir.join("messages.json"); - let mut messages: Vec = if messages_path.exists() { - let data = fs::read_to_string(&messages_path)?; - serde_json::from_str(&data).unwrap_or_default() - } else { - Vec::new() - }; + // ensure a placeholder conversation exists so writes never fail on first contact + tx.execute( + "INSERT INTO dm_conversations ( + conversation_id, peer_id, display_name, last_message, last_message_time, unread_count + ) VALUES (?1, ?2, ?3, NULL, NULL, 0) + ON CONFLICT(conversation_id) DO NOTHING", + params![conversation_id, message.from_peer, message.from_display_name], + ) + .map_err(sqlite_to_io_error)?; - messages.push(message.clone()); + let inserted = tx + .execute( + "INSERT OR IGNORE INTO dm_messages ( + id, conversation_id, from_peer, to_peer, from_display_name, content, timestamp + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + message.id, + conversation_id, + message.from_peer, + message.to_peer, + message.from_display_name, + message.content, + message.timestamp as i64 + ], + ) + .map_err(sqlite_to_io_error)?; - let json = serde_json::to_string_pretty(&messages) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - fs::write(&messages_path, json) + if inserted > 0 && self.fts_enabled { + tx.execute( + "INSERT INTO dm_message_fts (message_id, conversation_id, content) + VALUES (?1, ?2, ?3)", + params![message.id, conversation_id, message.content], + ) + .map_err(sqlite_to_io_error)?; + } + + tx.commit().map_err(sqlite_to_io_error)?; + Ok(()) } // load dm messages with optional pagination @@ -344,64 +1007,302 @@ impl DiskStorage { before: Option, limit: usize, ) -> Result, io::Error> { - let messages_path = self - .base_dir - .join(format!("dms/{}/messages.json", conversation_id)); - if !messages_path.exists() { + if limit == 0 { return Ok(Vec::new()); } - let data = fs::read_to_string(&messages_path)?; - let messages: Vec = serde_json::from_str(&data) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let conn = self.open_conn()?; + let mut sql = String::from( + "SELECT id, from_peer, to_peer, from_display_name, content, timestamp + FROM dm_messages + WHERE conversation_id = ?1", + ); - let filtered: Vec = if let Some(before_ts) = before { - messages - .into_iter() - .filter(|m| m.timestamp < before_ts) - .collect() - } else { - messages - }; + let mut values: Vec = vec![SqlValue::Text(conversation_id.to_string())]; - // return the last `limit` messages (most recent) - let start = if filtered.len() > limit { - filtered.len() - limit - } else { - 0 - }; - Ok(filtered[start..].to_vec()) + if let Some(before_ts) = before { + sql.push_str(" AND timestamp < ?2"); + values.push(SqlValue::Integer(before_ts as i64)); + } + + sql.push_str(" ORDER BY timestamp DESC LIMIT ?"); + values.push(SqlValue::Integer(limit as i64)); + + let mut stmt = conn.prepare(&sql).map_err(sqlite_to_io_error)?; + let rows = stmt + .query_map(params_from_iter(values.iter()), direct_message_from_row) + .map_err(sqlite_to_io_error)?; + + let mut messages = Vec::new(); + for row in rows { + messages.push(row.map_err(sqlite_to_io_error)?); + } + + // keep frontend contract stable with ascending timestamps + messages.reverse(); + Ok(messages) } - // wipe all user data - identity, communities, directory, dms, settings + // search dm messages with filters and indexed query execution + pub fn search_dm_messages( + &self, + conversation_id: &str, + params: &DmSearchParams, + ) -> Result, io::Error> { + let limit = params.limit.clamp(1, 1000); + let query = params + .query + .as_deref() + .map(str::trim) + .filter(|q| !q.is_empty()); + + let fts_query = query.and_then(build_fts_query); + + let conn = self.open_conn()?; + let mut sql; + let mut values: Vec = Vec::new(); + + if self.fts_enabled && fts_query.is_some() { + sql = String::from( + "SELECT + m.id, + m.from_peer, + m.to_peer, + m.from_display_name, + m.content, + m.timestamp + FROM dm_messages m + JOIN dm_message_fts f ON f.message_id = m.id + WHERE m.conversation_id = ?1 + AND f.conversation_id = ?2 + AND f.content MATCH ?3", + ); + values.push(SqlValue::Text(conversation_id.to_string())); + values.push(SqlValue::Text(conversation_id.to_string())); + values.push(SqlValue::Text(fts_query.unwrap_or_default())); + } else { + sql = String::from( + "SELECT + m.id, + m.from_peer, + m.to_peer, + m.from_display_name, + m.content, + m.timestamp + FROM dm_messages m + WHERE m.conversation_id = ?1", + ); + values.push(SqlValue::Text(conversation_id.to_string())); + + if let Some(text_query) = query { + sql.push_str(" AND lower(m.content) LIKE lower(?)"); + values.push(SqlValue::Text(format!("%{}%", text_query))); + } + } + + if let Some(from_peer) = params + .from_peer + .as_deref() + .map(str::trim) + .filter(|id| !id.is_empty()) + { + sql.push_str(" AND m.from_peer = ?"); + values.push(SqlValue::Text(from_peer.to_string())); + } + + if params.mentions_only { + sql.push_str(" AND m.content LIKE '%<@%'"); + } + + if let Some(after) = params.date_after { + sql.push_str(" AND m.timestamp >= ?"); + values.push(SqlValue::Integer(after as i64)); + } + + if let Some(before) = params.date_before { + sql.push_str(" AND m.timestamp <= ?"); + values.push(SqlValue::Integer(before as i64)); + } + + if let Some(media_filter) = params.media_filter.as_deref() { + append_media_filter(&mut sql, &mut values, media_filter); + } + + sql.push_str(" ORDER BY m.timestamp DESC LIMIT ?"); + values.push(SqlValue::Integer(limit as i64)); + + let mut stmt = conn.prepare(&sql).map_err(sqlite_to_io_error)?; + let rows = stmt + .query_map(params_from_iter(values.iter()), direct_message_from_row) + .map_err(sqlite_to_io_error)?; + + let mut messages = Vec::new(); + for row in rows { + messages.push(row.map_err(sqlite_to_io_error)?); + } + + messages.reverse(); + Ok(messages) + } + + // wipe all user data // used when resetting identity to leave no traces on this client pub fn wipe_all_data(&self) -> Result<(), io::Error> { - let identity_dir = self.base_dir.join("identity"); - if identity_dir.exists() { - fs::remove_dir_all(&identity_dir)?; + let conn = self.open_conn()?; + + conn.execute("DELETE FROM key_value", []) + .map_err(sqlite_to_io_error)?; + conn.execute("DELETE FROM profile", []) + .map_err(sqlite_to_io_error)?; + conn.execute("DELETE FROM settings", []) + .map_err(sqlite_to_io_error)?; + conn.execute("DELETE FROM verification_proof", []) + .map_err(sqlite_to_io_error)?; + conn.execute("DELETE FROM community_documents", []) + .map_err(sqlite_to_io_error)?; + conn.execute("DELETE FROM community_meta", []) + .map_err(sqlite_to_io_error)?; + conn.execute("DELETE FROM directory_entries", []) + .map_err(sqlite_to_io_error)?; + conn.execute("DELETE FROM dm_messages", []) + .map_err(sqlite_to_io_error)?; + conn.execute("DELETE FROM dm_conversations", []) + .map_err(sqlite_to_io_error)?; + + if self.fts_enabled { + conn.execute("DELETE FROM dm_message_fts", []) + .map_err(sqlite_to_io_error)?; } - let communities_dir = self.base_dir.join("communities"); - if communities_dir.exists() { - fs::remove_dir_all(&communities_dir)?; - } + // keep migration marker enabled so wiped clients do not re-import old json files + conn.execute( + "INSERT INTO app_meta (key, value) VALUES ('legacy_migrated', '1') + ON CONFLICT(key) DO UPDATE SET value = excluded.value", + [], + ) + .map_err(sqlite_to_io_error)?; - let directory_dir = self.base_dir.join("directory"); - if directory_dir.exists() { - fs::remove_dir_all(&directory_dir)?; - } - - let dms_dir = self.base_dir.join("dms"); - if dms_dir.exists() { - fs::remove_dir_all(&dms_dir)?; - } - - // recreate the directory tree so the app can still function - fs::create_dir_all(self.base_dir.join("identity"))?; - fs::create_dir_all(self.base_dir.join("communities"))?; - fs::create_dir_all(self.base_dir.join("directory"))?; - fs::create_dir_all(self.base_dir.join("dms"))?; - - Ok(()) + self.cleanup_legacy_files() } } + +fn clear_dir(path: PathBuf) -> Result<(), io::Error> { + if !path.exists() { + return Ok(()); + } + + for entry in fs::read_dir(path)? { + let entry = entry?; + if entry.file_type()?.is_dir() { + fs::remove_dir_all(entry.path())?; + } else { + fs::remove_file(entry.path())?; + } + } + + Ok(()) +} + +fn remove_if_exists(path: PathBuf) -> Result<(), io::Error> { + if !path.exists() { + return Ok(()); + } + + if path.is_dir() { + fs::remove_dir_all(path)?; + } else { + fs::remove_file(path)?; + } + + Ok(()) +} + +fn direct_message_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let timestamp: i64 = row.get(5)?; + Ok(DirectMessage { + id: row.get(0)?, + from_peer: row.get(1)?, + to_peer: row.get(2)?, + from_display_name: row.get(3)?, + content: row.get(4)?, + timestamp: timestamp.max(0) as u64, + }) +} + +fn append_media_filter(sql: &mut String, values: &mut Vec, media_filter: &str) { + let normalized = media_filter.to_lowercase(); + + match normalized.as_str() { + "images" => { + append_extension_filter( + sql, + values, + &[ + "png", "jpg", "jpeg", "gif", "webp", "svg", "bmp", "ico", "avif", + ], + ); + } + "videos" => { + append_extension_filter(sql, values, &["mp4", "webm", "mov", "avi", "mkv"]); + } + "links" => { + sql.push_str(" AND (lower(m.content) LIKE ? OR lower(m.content) LIKE ?)"); + values.push(SqlValue::Text("%http://%".to_string())); + values.push(SqlValue::Text("%https://%".to_string())); + } + "files" => { + append_extension_filter( + sql, + values, + &[ + "pdf", "doc", "docx", "xls", "xlsx", "zip", "rar", "7z", "tar", "gz", + ], + ); + } + _ => {} + } +} + +fn append_extension_filter(sql: &mut String, values: &mut Vec, exts: &[&str]) { + if exts.is_empty() { + return; + } + + sql.push_str(" AND ("); + + let mut first = true; + for ext in exts { + if !first { + sql.push_str(" OR "); + } + first = false; + sql.push_str("lower(m.content) LIKE ? OR lower(m.content) LIKE ?"); + values.push(SqlValue::Text(format!("%.{}", ext))); + values.push(SqlValue::Text(format!("%.{}?%", ext))); + } + + sql.push(')'); +} + +fn build_fts_query(query: &str) -> Option { + let terms: Vec = query + .split_whitespace() + .map(|raw| { + raw.chars() + .filter(|c| c.is_ascii_alphanumeric() || *c == '_' || *c == '-') + .collect::() + }) + .filter(|token| !token.is_empty()) + .map(|token| format!("{}*", token)) + .collect(); + + if terms.is_empty() { + return None; + } + + Some(terms.join(" AND ")) +} + +fn sqlite_to_io_error(err: rusqlite::Error) -> io::Error { + io::Error::new(io::ErrorKind::Other, err) +} diff --git a/src-tauri/src/storage/mod.rs b/src-tauri/src/storage/mod.rs index 04f5d3b..1a0b8d3 100644 --- a/src-tauri/src/storage/mod.rs +++ b/src-tauri/src/storage/mod.rs @@ -1,4 +1,5 @@ mod disk; pub use disk::DiskStorage; +pub use disk::DmSearchParams; pub use disk::UserSettings; diff --git a/src-tauri/src/verification/mod.rs b/src-tauri/src/verification/mod.rs index 3041637..e874b86 100644 --- a/src-tauri/src/verification/mod.rs +++ b/src-tauri/src/verification/mod.rs @@ -60,13 +60,17 @@ fn score_timing_variance(segments: &[SegmentData]) -> f64 { return 0.0; } - let intervals: Vec = segments.iter().map(|s| s.click_time - s.start_time).collect(); + let intervals: Vec = segments + .iter() + .map(|s| s.click_time - s.start_time) + .collect(); let mean = intervals.iter().sum::() / intervals.len() as f64; if mean == 0.0 { return 0.0; } - let variance = intervals.iter().map(|v| (v - mean).powi(2)).sum::() / intervals.len() as f64; + let variance = + intervals.iter().map(|v| (v - mean).powi(2)).sum::() / intervals.len() as f64; let cv = variance.sqrt() / mean; // humans have natural variance in click timing @@ -286,8 +290,8 @@ pub fn generate_proof( } // hash the raw challenge data to create a fingerprint - let challenge_bytes = - serde_json::to_vec(challenge).map_err(|e| format!("failed to serialize challenge: {}", e))?; + let challenge_bytes = serde_json::to_vec(challenge) + .map_err(|e| format!("failed to serialize challenge: {}", e))?; let mut hasher = Sha256::new(); hasher.update(&challenge_bytes); let metrics_hash = hex::encode(hasher.finalize()); @@ -329,7 +333,10 @@ fn announcement_sign_payload( .into_bytes() } -pub fn sign_announcement(keypair: &identity::Keypair, announcement: &ProfileAnnouncement) -> String { +pub fn sign_announcement( + keypair: &identity::Keypair, + announcement: &ProfileAnnouncement, +) -> String { let metrics_hash = announcement .verification_proof .as_ref() diff --git a/src/App.tsx b/src/App.tsx index e6f04b6..b340fc8 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -141,6 +141,73 @@ const App: Component = () => { const [inviteCode, setInviteCode] = createSignal(""); const [inviteLoading, setInviteLoading] = createSignal(false); const [inviteCopied, setInviteCopied] = createSignal(false); + let communityLoadSeq = 0; + + async function hydrateCommunityState( + communityId: string, + preserveActiveChannel: boolean, + reloadMessagesOnStableChannel = false, + ) { + if (!tauriAvailable()) return; + + const loadSeq = ++communityLoadSeq; + + try { + const [chs, cats, mems] = await Promise.all([ + tauri.getChannels(communityId), + tauri.getCategories(communityId), + tauri.getMembers(communityId), + ]); + + if (loadSeq !== communityLoadSeq || activeCommunityId() !== communityId) { + return; + } + + setChannels(chs); + setCategories(cats); + setMembers(mems); + + const currentChannelId = activeChannelId(); + const activeStillExists = + !!currentChannelId && chs.some((channel) => channel.id === currentChannelId); + + let nextChannelId: string | null = null; + if (preserveActiveChannel && activeStillExists) { + nextChannelId = currentChannelId; + } else if (chs.length > 0) { + const last = getLastChannel(communityId); + const restored = !!last && chs.some((channel) => channel.id === last); + nextChannelId = restored ? last : chs[0].id; + } + + if (nextChannelId !== currentChannelId) { + setActiveChannel(nextChannelId); + if (!nextChannelId) { + clearMessages(); + } + return; + } + + if (!nextChannelId) { + clearMessages(); + return; + } + + if (reloadMessagesOnStableChannel) { + const msgs = await tauri.getMessages(nextChannelId); + if ( + loadSeq !== communityLoadSeq || + activeCommunityId() !== communityId || + activeChannelId() !== nextChannelId + ) { + return; + } + setMessages(msgs); + } + } catch (e) { + console.error("failed to hydrate community state:", e); + } + } // react to community switches by loading channels, members, and selecting first channel createEffect( @@ -156,28 +223,7 @@ const App: Component = () => { } if (tauriAvailable()) { - try { - const [chs, cats] = await Promise.all([ - tauri.getChannels(communityId), - tauri.getCategories(communityId), - ]); - setChannels(chs); - setCategories(cats); - - if (chs.length > 0) { - const last = getLastChannel(communityId); - const restored = last && chs.some((c) => c.id === last); - setActiveChannel(restored ? last : chs[0].id); - } else { - setActiveChannel(null); - clearMessages(); - } - - const mems = await tauri.getMembers(communityId); - setMembers(mems); - } catch (e) { - console.error("failed to load community data:", e); - } + await hydrateCommunityState(communityId, false); } }), ); @@ -406,9 +452,7 @@ const App: Component = () => { // if the node itself has shut down (handled by stop_node command) break; case "sync_complete": - if (event.payload.community_id === activeCommunityId()) { - reloadCurrentChannel(); - } + void handleSyncComplete(event.payload.community_id); break; case "profile_received": // update our local directory cache when a peer announces their profile @@ -477,14 +521,18 @@ const App: Component = () => { } } - async function reloadCurrentChannel() { - const channelId = activeChannelId(); - if (!channelId || !tauriAvailable()) return; + async function handleSyncComplete(communityId: string) { + if (!tauriAvailable()) return; + try { - const msgs = await tauri.getMessages(channelId); - setMessages(msgs); + const allCommunities = await tauri.getCommunities(); + setCommunities(allCommunities); } catch (e) { - console.error("failed to reload messages:", e); + console.error("failed to refresh communities after sync:", e); + } + + if (communityId === activeCommunityId()) { + await hydrateCommunityState(communityId, true, true); } } diff --git a/src/components/chat/DMSearchPanel.tsx b/src/components/chat/DMSearchPanel.tsx index 67fb502..cc0523d 100644 --- a/src/components/chat/DMSearchPanel.tsx +++ b/src/components/chat/DMSearchPanel.tsx @@ -1,5 +1,13 @@ import type { Component } from "solid-js"; -import { createSignal, createMemo, onMount, Show, For } from "solid-js"; +import { + createSignal, + createMemo, + createEffect, + onCleanup, + onMount, + Show, + For, +} from "solid-js"; import { Search, X, @@ -13,76 +21,54 @@ import { ChevronUp, Loader2, } from "lucide-solid"; -import type { ChatMessage, DirectMessage } from "../../lib/types"; +import type { + DMSearchFrom, + DMSearchMedia, + DirectMessage, +} from "../../lib/types"; import { formatTime, formatDaySeparator } from "../../lib/utils"; -import { extractMentions } from "../../lib/mentions"; import * as tauri from "../../lib/tauri"; -// regex patterns for detecting media in message content -const IMAGE_REGEX = /\.(png|jpe?g|gif|webp|svg|bmp|ico|avif)(\?[^\s]*)?$/i; -const VIDEO_REGEX = /\.(mp4|webm|mov|avi|mkv)(\?[^\s]*)?$/i; -const LINK_REGEX = /https?:\/\/[^\s]+/i; -const FILE_REGEX = /\.(pdf|doc|docx|xls|xlsx|zip|rar|7z|tar|gz)(\?[^\s]*)?$/i; - -// upper bound so we pull the entire conversation from disk -const ALL_MESSAGES_LIMIT = 1_000_000; - -type MediaFilter = "images" | "videos" | "links" | "files"; -type FilterFrom = "anyone" | "me" | "them"; +const SEARCH_LIMIT = 300; +const RESULT_ROW_HEIGHT = 56; +const RESULT_OVERSCAN = 6; interface DMSearchPanelProps { peerId: string; - myPeerId: string; peerName: string; onClose: () => void; - onJumpToMessage: (messageId: string, allMessages: DirectMessage[]) => void; + onJumpToMessage: (messageId: string, timestamp: number) => void; } const DMSearchPanel: Component = (props) => { const [query, setQuery] = createSignal(""); - const [fromFilter, setFromFilter] = createSignal("anyone"); - const [mediaFilter, setMediaFilter] = createSignal(null); + const [fromFilter, setFromFilter] = createSignal("anyone"); + const [mediaFilter, setMediaFilter] = createSignal(null); const [mentionsOnly, setMentionsOnly] = createSignal(false); const [dateAfter, setDateAfter] = createSignal(""); const [dateBefore, setDateBefore] = createSignal(""); const [showFilters, setShowFilters] = createSignal(false); - - // full conversation loaded from disk for searching - const [allMessages, setAllMessages] = createSignal([]); - const [loading, setLoading] = createSignal(true); + const [results, setResults] = createSignal([]); + const [loading, setLoading] = createSignal(false); + const [resultScrollTop, setResultScrollTop] = createSignal(0); + const [resultViewportHeight, setResultViewportHeight] = createSignal(320); let inputRef: HTMLInputElement | undefined; + let resultsRef: HTMLDivElement | undefined; + let searchDebounceTimer: ReturnType | undefined; + let activeSearchId = 0; - // load entire conversation history from disk on mount - onMount(async () => { - try { - const msgs = await tauri.getDMMessages( - props.peerId, - undefined, - ALL_MESSAGES_LIMIT, - ); - setAllMessages(msgs); - } catch (e) { - console.error("failed to load all dm messages for search:", e); - } finally { - setLoading(false); - // focus after loading completes - inputRef?.focus(); - } + // focus the search field when the panel opens + onMount(() => { + inputRef?.focus(); }); - // adapt DirectMessage[] to a searchable shape - const searchableMessages = createMemo((): ChatMessage[] => - allMessages().map((m) => ({ - id: m.id, - channel_id: `dm_${props.peerId}`, - author_id: m.from_peer, - author_name: m.from_display_name, - content: m.content, - timestamp: m.timestamp, - edited: false, - })), - ); + onCleanup(() => { + if (searchDebounceTimer) { + clearTimeout(searchDebounceTimer); + } + activeSearchId += 1; + }); const hasActiveFilters = createMemo(() => { return ( @@ -94,50 +80,97 @@ const DMSearchPanel: Component = (props) => { ); }); - const filteredMessages = createMemo(() => { - const q = query().toLowerCase().trim(); + createEffect(() => { + const textQuery = query().trim(); const from = fromFilter(); const media = mediaFilter(); const mentions = mentionsOnly(); const after = dateAfter(); const before = dateBefore(); + const hasFilters = hasActiveFilters(); - // no search or filters active, return nothing - if (!q && !hasActiveFilters()) return []; + if (searchDebounceTimer) { + clearTimeout(searchDebounceTimer); + searchDebounceTimer = undefined; + } - const afterTs = after ? new Date(after).getTime() : null; - const beforeTs = before - ? new Date(before).getTime() + 86_400_000 - : null; + if (!textQuery && !hasFilters) { + setLoading(false); + setResults([]); + return; + } - return searchableMessages().filter((msg) => { - // text query - if (q && !msg.content.toLowerCase().includes(q)) return false; + const searchId = ++activeSearchId; + setLoading(true); - // from filter - if (from === "me" && msg.author_id !== props.myPeerId) return false; - if (from === "them" && msg.author_id === props.myPeerId) return false; + searchDebounceTimer = setTimeout(async () => { + const dateAfterTs = after ? new Date(after).getTime() : null; + const dateBeforeTs = before + ? new Date(before).getTime() + 86_399_999 + : null; - // date range - if (afterTs && msg.timestamp < afterTs) return false; - if (beforeTs && msg.timestamp > beforeTs) return false; + try { + const nextResults = await tauri.searchDMMessages(props.peerId, { + query: textQuery || undefined, + from_filter: from, + media_filter: media, + mentions_only: mentions, + date_after: dateAfterTs, + date_before: dateBeforeTs, + limit: SEARCH_LIMIT, + }); - // media type - if (media) { - const content = msg.content.trim(); - if (media === "images" && !IMAGE_REGEX.test(content)) return false; - if (media === "videos" && !VIDEO_REGEX.test(content)) return false; - if (media === "links" && !LINK_REGEX.test(content)) return false; - if (media === "files" && !FILE_REGEX.test(content)) return false; + if (searchId !== activeSearchId) return; + + setResultScrollTop(0); + if (resultsRef) { + resultsRef.scrollTop = 0; + setResultViewportHeight(resultsRef.clientHeight || 320); + } + setResults(nextResults); + } catch (error) { + if (searchId !== activeSearchId) return; + console.error("failed to search dm messages:", error); + setResults([]); + } finally { + if (searchId === activeSearchId) { + setLoading(false); + } } - - // mentions only - if (mentions && extractMentions(msg.content).length === 0) return false; - - return true; - }); + }, 120); }); + const totalResultHeight = createMemo( + () => results().length * RESULT_ROW_HEIGHT, + ); + + const visibleResults = createMemo(() => { + const rows = results(); + const viewport = resultViewportHeight(); + const scrollTop = resultScrollTop(); + + const startIndex = Math.max( + 0, + Math.floor(scrollTop / RESULT_ROW_HEIGHT) - RESULT_OVERSCAN, + ); + const endIndex = Math.min( + rows.length, + Math.ceil((scrollTop + viewport) / RESULT_ROW_HEIGHT) + RESULT_OVERSCAN, + ); + + const slice = rows.slice(startIndex, endIndex); + return slice.map((message, index) => ({ + message, + index: startIndex + index, + })); + }); + + function handleResultScroll() { + if (!resultsRef) return; + setResultScrollTop(resultsRef.scrollTop); + setResultViewportHeight(resultsRef.clientHeight || 320); + } + function clearAllFilters() { setQuery(""); setFromFilter("anyone"); @@ -147,24 +180,20 @@ const DMSearchPanel: Component = (props) => { setDateBefore(""); } - function handleJump(messageId: string) { - props.onJumpToMessage(messageId, allMessages()); + function handleJump(message: DirectMessage) { + props.onJumpToMessage(message.id, message.timestamp); } // highlight matching text in a result snippet function highlightMatch(text: string): string { - const q = query().trim(); - if (!q) return escapeHtml(truncate(text, 120)); + const textQuery = query().trim(); + if (!textQuery) return escapeHtml(truncate(text, 120)); const escaped = escapeHtml(truncate(text, 120)); - const regex = new RegExp( - `(${escapeRegex(escapeHtml(q))})`, - "gi", - ); - return escaped.replace( - regex, - '$1', - ); + const escapedQuery = escapeRegex(escapeHtml(textQuery)); + const regex = new RegExp(`(${escapedQuery})`, "gi"); + + return escaped.replace(regex, '$1'); } return ( @@ -179,30 +208,33 @@ const DMSearchPanel: Component = (props) => { > + setQuery(e.currentTarget.value)} - disabled={loading()} - class="flex-1 bg-transparent text-[14px] text-white placeholder:text-white/30 outline-none disabled:opacity-50" + onInput={(event) => setQuery(event.currentTarget.value)} + class="flex-1 bg-transparent text-[14px] text-white placeholder:text-white/30 outline-none" /> + - {filteredMessages().length} result{filteredMessages().length !== 1 ? "s" : ""} + {results().length} result{results().length !== 1 ? "s" : ""} + + - )} - + + )} + + @@ -374,7 +430,6 @@ const DMSearchPanel: Component = (props) => { ); }; -// reusable filter chip interface FilterChipProps { active: boolean; onClick: () => void; @@ -397,13 +452,12 @@ const FilterChip: Component = (props) => ( ); -// utilities function escapeHtml(str: string): string { return str .replace(/&/g, "&") .replace(//g, ">") - .replace(/"/g, """); + .replace(/\"/g, """); } function escapeRegex(str: string): string { diff --git a/src/components/chat/VirtualMessageList.tsx b/src/components/chat/VirtualMessageList.tsx new file mode 100644 index 0000000..0dda05c --- /dev/null +++ b/src/components/chat/VirtualMessageList.tsx @@ -0,0 +1,451 @@ +import type { Component } from "solid-js"; +import { + For, + Show, + createEffect, + createMemo, + createSignal, + on, + onCleanup, + onMount, + untrack, +} from "solid-js"; +import type { ChatMessage } from "../../lib/types"; +import { + isWithinGroupWindow, + isDifferentDay, + formatDaySeparator, +} from "../../lib/utils"; +import Message from "./Message"; +import { ArrowDown } from "lucide-solid"; + +interface VirtualMessageListProps { + messages: ChatMessage[]; + conversationKey: string; + focusMessageId?: string | null; + onLoadMore?: () => void; +} + +interface MessageRenderMeta { + message: ChatMessage; + isGrouped: boolean; + isFirstInGroup: boolean; + isLastInGroup: boolean; + showDaySeparator: boolean; + isLastMessage: boolean; +} + +interface VirtualRow { + key: string; + type: "separator" | "message"; + top: number; + height: number; + separatorLabel?: string; + meta?: MessageRenderMeta; +} + +const OVERSCAN_PX = 600; +const DAY_SEPARATOR_ESTIMATE = 42; + +const VirtualMessageList: Component = (props) => { + let containerRef: HTMLDivElement | undefined; + let rowResizeObserver: ResizeObserver | undefined; + let clearHighlightTimer: ReturnType | undefined; + + const [scrollTop, setScrollTop] = createSignal(0); + const [viewportHeight, setViewportHeight] = createSignal(0); + const [showScrollButton, setShowScrollButton] = createSignal(false); + const [isAtBottom, setIsAtBottom] = createSignal(true); + const [prevMessageCount, setPrevMessageCount] = createSignal(0); + const [shouldAnimateLast, setShouldAnimateLast] = createSignal(false); + const [measuredHeights, setMeasuredHeights] = createSignal< + Record + >({}); + const [highlightedMessageId, setHighlightedMessageId] = createSignal< + string | null + >(null); + + let lastLoadRequestAt = 0; + let pendingPrependCompensation: + | { + totalHeight: number; + scrollTop: number; + oldestMessageId: string | null; + } + | null = null; + + const messageMeta = createMemo((): MessageRenderMeta[] => { + const messages = props.messages; + + return messages.map((message, index) => { + const prev = index > 0 ? messages[index - 1] : undefined; + const next = + index < messages.length - 1 ? messages[index + 1] : undefined; + + const isFirstInGroup = + !prev || + prev.author_id !== message.author_id || + !isWithinGroupWindow(prev.timestamp, message.timestamp); + + const isLastInGroup = + !next || + next.author_id !== message.author_id || + !isWithinGroupWindow(message.timestamp, next.timestamp); + + const showDaySeparator = + !prev || isDifferentDay(prev.timestamp, message.timestamp); + + return { + message, + isGrouped: !isFirstInGroup, + isFirstInGroup, + isLastInGroup, + showDaySeparator, + isLastMessage: index === messages.length - 1, + }; + }); + }); + + const rows = createMemo(() => { + const heights = measuredHeights(); + const rendered = messageMeta(); + + const virtualRows: VirtualRow[] = []; + let cursorTop = 0; + + for (const meta of rendered) { + if (meta.showDaySeparator) { + const rowKey = `sep:${meta.message.id}`; + const height = heights[rowKey] ?? DAY_SEPARATOR_ESTIMATE; + virtualRows.push({ + key: rowKey, + type: "separator", + top: cursorTop, + height, + separatorLabel: formatDaySeparator(meta.message.timestamp), + }); + cursorTop += height; + } + + const rowKey = `msg:${meta.message.id}`; + const estimatedHeight = estimateMessageHeight( + meta.message.content, + meta.isFirstInGroup, + ); + const height = heights[rowKey] ?? estimatedHeight; + + virtualRows.push({ + key: rowKey, + type: "message", + top: cursorTop, + height, + meta, + }); + cursorTop += height; + } + + return { + items: virtualRows, + totalHeight: cursorTop, + }; + }); + + const visibleRows = createMemo(() => { + const allRows = rows().items; + if (allRows.length === 0) return []; + + const startY = Math.max(0, scrollTop() - OVERSCAN_PX); + const endY = scrollTop() + viewportHeight() + OVERSCAN_PX; + + const startIndex = Math.max(0, findFirstVisibleRowIndex(allRows, startY) - 2); + + let endIndex = startIndex; + while (endIndex < allRows.length && allRows[endIndex].top < endY) { + endIndex += 1; + } + + return allRows.slice(startIndex, Math.min(allRows.length, endIndex + 2)); + }); + + function setMeasuredHeight(rowKey: string, nextHeight: number) { + const roundedHeight = Math.ceil(nextHeight); + if (roundedHeight <= 0) return; + + setMeasuredHeights((prev) => { + if (prev[rowKey] === roundedHeight) return prev; + return { ...prev, [rowKey]: roundedHeight }; + }); + } + + function observeRow(el: HTMLDivElement, rowKey: string) { + el.dataset.virtualKey = rowKey; + rowResizeObserver?.observe(el); + } + + function syncViewportMetrics() { + if (!containerRef) return; + setViewportHeight(containerRef.clientHeight); + } + + function scrollToBottom(smooth = true) { + if (!containerRef) return; + + containerRef.scrollTo({ + top: rows().totalHeight, + behavior: smooth ? "smooth" : "auto", + }); + } + + function scrollToMessage(messageId: string) { + if (!containerRef) return; + + const messageRow = rows().items.find( + (row) => row.type === "message" && row.meta?.message.id === messageId, + ); + + if (!messageRow) return; + + const targetTop = Math.max( + 0, + messageRow.top - Math.floor(containerRef.clientHeight * 0.35), + ); + + containerRef.scrollTo({ top: targetTop, behavior: "smooth" }); + setHighlightedMessageId(messageId); + + if (clearHighlightTimer) { + clearTimeout(clearHighlightTimer); + } + clearHighlightTimer = setTimeout(() => { + setHighlightedMessageId(null); + }, 2000); + } + + function maybeLoadOlderMessages() { + if (!props.onLoadMore || !containerRef) return; + if (containerRef.scrollTop > 120) return; + + const now = Date.now(); + if (now - lastLoadRequestAt < 400) return; + lastLoadRequestAt = now; + + pendingPrependCompensation = { + totalHeight: rows().totalHeight, + scrollTop: containerRef.scrollTop, + oldestMessageId: props.messages[0]?.id ?? null, + }; + + props.onLoadMore(); + } + + function handleScroll() { + if (!containerRef) return; + + const currentScrollTop = containerRef.scrollTop; + setScrollTop(currentScrollTop); + + const distanceFromBottom = + rows().totalHeight - currentScrollTop - containerRef.clientHeight; + const atBottom = distanceFromBottom < 64; + + setIsAtBottom(atBottom); + setShowScrollButton(!atBottom); + + maybeLoadOlderMessages(); + } + + createEffect(() => { + const currentCount = props.messages.length; + const prevCount = untrack(() => prevMessageCount()); + + if (currentCount > prevCount && prevCount > 0) { + setShouldAnimateLast(true); + } else { + setShouldAnimateLast(false); + } + + setPrevMessageCount(currentCount); + }); + + createEffect(() => { + const messageCount = props.messages.length; + if (messageCount === 0) return; + + if (isAtBottom()) { + requestAnimationFrame(() => scrollToBottom(true)); + } + }); + + createEffect(() => { + const totalHeight = rows().totalHeight; + if (!containerRef || !pendingPrependCompensation) return; + + const currentOldestMessageId = props.messages[0]?.id ?? null; + if (currentOldestMessageId === pendingPrependCompensation.oldestMessageId) { + pendingPrependCompensation = null; + return; + } + + if (totalHeight <= pendingPrependCompensation.totalHeight) return; + + const delta = totalHeight - pendingPrependCompensation.totalHeight; + containerRef.scrollTop = pendingPrependCompensation.scrollTop + delta; + pendingPrependCompensation = null; + }); + + createEffect( + on( + () => props.focusMessageId, + (messageId) => { + if (!messageId) return; + requestAnimationFrame(() => scrollToMessage(messageId)); + }, + ), + ); + + createEffect( + on( + () => props.conversationKey, + () => { + setMeasuredHeights({}); + setHighlightedMessageId(null); + setShowScrollButton(false); + setIsAtBottom(true); + pendingPrependCompensation = null; + requestAnimationFrame(() => { + scrollToBottom(false); + handleScroll(); + }); + }, + ), + ); + + onMount(() => { + rowResizeObserver = new ResizeObserver((entries) => { + for (const entry of entries) { + const el = entry.target as HTMLElement; + const rowKey = el.dataset.virtualKey; + if (!rowKey) continue; + setMeasuredHeight(rowKey, entry.contentRect.height); + } + }); + + syncViewportMetrics(); + requestAnimationFrame(() => { + scrollToBottom(false); + handleScroll(); + }); + + window.addEventListener("resize", syncViewportMetrics); + }); + + onCleanup(() => { + window.removeEventListener("resize", syncViewportMetrics); + rowResizeObserver?.disconnect(); + if (clearHighlightTimer) { + clearTimeout(clearHighlightTimer); + } + }); + + return ( +
+
+
+ + {(row) => ( +
observeRow(el, row.key)} + class="absolute left-0 right-0" + style={{ transform: `translateY(${row.top}px)` }} + > + +
+
+ + {row.separatorLabel} + +
+
+ + + +
+
+ +
+
+
+
+ )} + +
+
+ + + + +
+ ); +}; + +function estimateMessageHeight(content: string, isFirstInGroup: boolean): number { + const baseHeight = isFirstInGroup ? 82 : 46; + const charLines = Math.max(0, Math.ceil(content.length / 90) - 1); + const newlineLines = Math.max(0, content.split("\n").length - 1); + const extraLines = Math.min(8, charLines + newlineLines); + + return baseHeight + extraLines * 18; +} + +function findFirstVisibleRowIndex(rows: VirtualRow[], offset: number): number { + let low = 0; + let high = rows.length - 1; + let best = 0; + + while (low <= high) { + const mid = Math.floor((low + high) / 2); + const row = rows[mid]; + + if (row.top + row.height < offset) { + low = mid + 1; + } else { + best = mid; + high = mid - 1; + } + } + + return best; +} + +export default VirtualMessageList; diff --git a/src/components/layout/DMChatArea.tsx b/src/components/layout/DMChatArea.tsx index babc27b..6260715 100644 --- a/src/components/layout/DMChatArea.tsx +++ b/src/components/layout/DMChatArea.tsx @@ -1,40 +1,60 @@ import type { Component } from "solid-js"; -import { Show, createMemo, createSignal } from "solid-js"; +import { Show, createMemo, createSignal, createEffect, on } from "solid-js"; import { Phone, Pin, Search } from "lucide-solid"; import { activeDMConversation, dmMessages, dmTypingPeers, + prependDMMessages, setDMMessages, } from "../../stores/dms"; import { onlinePeerIds } from "../../stores/members"; import { identity } from "../../stores/identity"; -import MessageList from "../chat/MessageList"; +import VirtualMessageList from "../chat/VirtualMessageList"; import MessageInput from "../chat/MessageInput"; import TypingIndicator from "../chat/TypingIndicator"; import DMSearchPanel from "../chat/DMSearchPanel"; import Avatar from "../common/Avatar"; import IconButton from "../common/IconButton"; -import type { ChatMessage, DirectMessage } from "../../lib/types"; +import type { ChatMessage } from "../../lib/types"; +import * as tauri from "../../lib/tauri"; interface DMChatAreaProps { onSendDM: (content: string) => void; onTyping: () => void; } +const HISTORY_PAGE_SIZE = 80; +const JUMP_WINDOW_SIZE = 500; + const DMChatArea: Component = (props) => { const [searchOpen, setSearchOpen] = createSignal(false); + const [focusMessageId, setFocusMessageId] = createSignal(null); + const [loadingHistory, setLoadingHistory] = createSignal(false); + const [hasMoreHistory, setHasMoreHistory] = createSignal(true); + const dm = () => activeDMConversation(); - // adapt DirectMessage[] to ChatMessage[] so the existing MessageList works + createEffect( + on( + () => dm()?.peer_id, + () => { + setFocusMessageId(null); + setLoadingHistory(false); + setHasMoreHistory(true); + }, + ), + ); + + // adapt direct messages to chat message shape so we can share rendering logic const adaptedMessages = createMemo((): ChatMessage[] => - dmMessages().map((m) => ({ - id: m.id, - channel_id: `dm_${m.from_peer === dm()?.peer_id ? m.from_peer : m.to_peer}`, - author_id: m.from_peer, - author_name: m.from_display_name, - content: m.content, - timestamp: m.timestamp, + dmMessages().map((message) => ({ + id: message.id, + channel_id: `dm_${message.from_peer === dm()?.peer_id ? message.from_peer : message.to_peer}`, + author_id: message.from_peer, + author_name: message.from_display_name, + content: message.content, + timestamp: message.timestamp, edited: false, })), ); @@ -47,29 +67,75 @@ const DMChatArea: Component = (props) => { return "offline"; }); - // scroll to a message by id, loading full history into the store if needed - function handleJumpToMessage( - messageId: string, - allMessages: DirectMessage[], - ) { - const alreadyLoaded = dmMessages().some((m) => m.id === messageId); + function focusMessage(messageId: string) { + setFocusMessageId(null); + requestAnimationFrame(() => { + setFocusMessageId(messageId); + }); + } - if (!alreadyLoaded) { - // replace the store with the full history so the target is in the dom - setDMMessages(allMessages); + async function loadOlderMessages() { + const peerId = dm()?.peer_id; + if (!peerId) return; + if (loadingHistory() || !hasMoreHistory()) return; + + const currentMessages = dmMessages(); + if (currentMessages.length === 0) return; + + const oldestTimestamp = currentMessages[0].timestamp; + + setLoadingHistory(true); + try { + const olderMessages = await tauri.getDMMessages( + peerId, + oldestTimestamp, + HISTORY_PAGE_SIZE, + ); + + if (olderMessages.length === 0) { + setHasMoreHistory(false); + return; + } + + prependDMMessages(olderMessages); + + if (olderMessages.length < HISTORY_PAGE_SIZE) { + setHasMoreHistory(false); + } + } catch (error) { + console.error("failed to load older dm messages:", error); + } finally { + setLoadingHistory(false); + } + } + + // scroll to a message by id and lazy-load a focused history window if needed + async function handleJumpToMessage(messageId: string, timestamp: number) { + const peerId = dm()?.peer_id; + if (!peerId) return; + + const alreadyLoaded = dmMessages().some((message) => message.id === messageId); + if (alreadyLoaded) { + focusMessage(messageId); + return; } - // wait for the dom to update then scroll and highlight - requestAnimationFrame(() => { - const el = document.querySelector( - `[data-message-id="${messageId}"]`, - ) as HTMLElement | null; - if (!el) return; + try { + const aroundTarget = await tauri.getDMMessages( + peerId, + timestamp + 1, + JUMP_WINDOW_SIZE, + ); - el.scrollIntoView({ behavior: "smooth", block: "center" }); - el.classList.add("dusk-msg-search-highlight"); - setTimeout(() => el.classList.remove("dusk-msg-search-highlight"), 2000); - }); + if (aroundTarget.length > 0) { + setDMMessages(aroundTarget); + setHasMoreHistory(aroundTarget.length >= JUMP_WINDOW_SIZE); + } + + focusMessage(messageId); + } catch (error) { + console.error("failed to jump to dm search result:", error); + } } // typing indicator names @@ -78,7 +144,7 @@ const DMChatArea: Component = (props) => { if (typing.length === 0) return []; const peer = dm(); if (!peer) return []; - // for dms there's only ever one person who can be typing + // for dms theres only ever one person who can be typing return typing.includes(peer.peer_id) ? [peer.display_name] : []; }); @@ -105,7 +171,7 @@ const DMChatArea: Component = (props) => { setSearchOpen((v) => !v)} + onClick={() => setSearchOpen((value) => !value)} > @@ -123,7 +189,6 @@ const DMChatArea: Component = (props) => { setSearchOpen(false)} onJumpToMessage={handleJumpToMessage} @@ -148,7 +213,12 @@ const DMChatArea: Component = (props) => {
} > - + {/* typing indicator */} diff --git a/src/components/layout/ServerList.tsx b/src/components/layout/ServerList.tsx index 518ea2a..f4553bb 100644 --- a/src/components/layout/ServerList.tsx +++ b/src/components/layout/ServerList.tsx @@ -6,28 +6,38 @@ import { activeCommunityId, setActiveCommunity, } from "../../stores/communities"; -import { setActiveDM } from "../../stores/dms"; +import { dmConversations, setActiveDM } from "../../stores/dms"; import { getInitials, hashColor } from "../../lib/utils"; import { openModal } from "../../stores/ui"; const ServerList: Component = () => { + const unreadDMCount = () => + dmConversations().reduce((total, dm) => total + dm.unread_count, 0); + return (
{/* home button */} - +
+ + 0}> +
+ {unreadDMCount() > 99 ? "99+" : unreadDMCount()} +
+
+
diff --git a/src/lib/tauri.ts b/src/lib/tauri.ts index 19e29c0..ca342cb 100644 --- a/src/lib/tauri.ts +++ b/src/lib/tauri.ts @@ -15,6 +15,7 @@ import type { VoiceMediaState, DirectMessage, DMConversationMeta, + DMSearchFilters, GifResponse, } from "./types"; @@ -374,6 +375,22 @@ export async function getDMMessages( return invoke("get_dm_messages", { peerId, before, limit }); } +export async function searchDMMessages( + peerId: string, + filters: DMSearchFilters, +): Promise { + return invoke("search_dm_messages", { + peerId, + query: filters.query, + fromFilter: filters.from_filter, + mediaFilter: filters.media_filter, + mentionsOnly: filters.mentions_only, + dateAfter: filters.date_after, + dateBefore: filters.date_before, + limit: filters.limit, + }); +} + export async function getDMConversations(): Promise { return invoke("get_dm_conversations"); } diff --git a/src/lib/types.ts b/src/lib/types.ts index adea561..692e445 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -125,6 +125,19 @@ export interface DMConversationMeta { unread_count: number; } +export type DMSearchFrom = "anyone" | "me" | "them"; +export type DMSearchMedia = "images" | "videos" | "links" | "files"; + +export interface DMSearchFilters { + query?: string; + from_filter?: DMSearchFrom; + media_filter?: DMSearchMedia | null; + mentions_only?: boolean; + date_after?: number | null; + date_before?: number | null; + limit?: number; +} + export interface Member { peer_id: string; display_name: string; diff --git a/src/stores/communities.ts b/src/stores/communities.ts index 37e33aa..1e61b1c 100644 --- a/src/stores/communities.ts +++ b/src/stores/communities.ts @@ -8,7 +8,16 @@ const [activeCommunityId, setActiveCommunityId] = createSignal( ); export function addCommunity(community: CommunityMeta) { - setCommunities((prev) => [...prev, community]); + setCommunities((prev) => { + const existingIndex = prev.findIndex((item) => item.id === community.id); + if (existingIndex === -1) { + return [...prev, community]; + } + + const next = [...prev]; + next[existingIndex] = community; + return next; + }); } export function removeCommunity(id: string) { diff --git a/src/stores/dms.ts b/src/stores/dms.ts index 8f5faaa..37a2bfa 100644 --- a/src/stores/dms.ts +++ b/src/stores/dms.ts @@ -61,7 +61,12 @@ export function clearDMUnread(peerId: string) { } export function addDMMessage(message: DirectMessage) { - setDMMessages((prev) => [...prev, message]); + setDMMessages((prev) => mergeUniqueMessages([...prev, message])); +} + +export function prependDMMessages(messages: DirectMessage[]) { + if (messages.length === 0) return; + setDMMessages((prev) => mergeUniqueMessages([...messages, ...prev])); } export function clearDMMessages() { @@ -143,3 +148,16 @@ export { setDMConversations, setDMMessages, }; + +function mergeUniqueMessages(messages: DirectMessage[]): DirectMessage[] { + const seen = new Set(); + const merged: DirectMessage[] = []; + + for (const message of messages) { + if (seen.has(message.id)) continue; + seen.add(message.id); + merged.push(message); + } + + return merged; +}