From 718bd9557ef7b2df70dfc26b6d2b2d4acc481346 Mon Sep 17 00:00:00 2001 From: cloudwithax Date: Sun, 15 Feb 2026 12:37:42 -0500 Subject: [PATCH] feat(dev-server): add development-only HTTP API for programmatic access feat(Cargo.toml): add axum as a dependency with dev-server feature fix(package.json): update tauri script to include dev-server option --- package.json | 3 +- src-tauri/Cargo.lock | 94 ++++ src-tauri/Cargo.toml | 6 + src-tauri/src/dev_server.rs | 1062 +++++++++++++++++++++++++++++++++++ src-tauri/src/lib.rs | 19 + 5 files changed, 1183 insertions(+), 1 deletion(-) create mode 100644 src-tauri/src/dev_server.rs diff --git a/package.json b/package.json index 35f0b16..9a123b3 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,8 @@ "dev": "vite", "build": "vite build", "preview": "vite preview", - "tauri": "WEBKIT_DISABLE_DMABUF_RENDERER=1 GDK_BACKEND=x11 tauri" + "tauri": "WEBKIT_DISABLE_DMABUF_RENDERER=1 GDK_BACKEND=x11 tauri", + "tauri:dev-server": "WEBKIT_DISABLE_DMABUF_RENDERER=1 GDK_BACKEND=x11 tauri dev -- -- --features dev-server" }, "license": "MIT", "dependencies": { diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index a044309..f25e163 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -300,6 +300,61 @@ dependencies = [ "uuid", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "base-x" version = "0.2.11" @@ -1107,6 +1162,7 @@ name = "dusk-chat" version = "0.1.0" dependencies = [ "automerge", + "axum", "bs58", "directories", "dotenvy", @@ -2056,6 +2112,7 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -3193,6 +3250,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.8.0" @@ -4751,6 +4814,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "ryu" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + [[package]] name = "same-file" version = "1.0.6" @@ -4911,6 +4980,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -4940,6 +5020,18 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_with" version = "3.16.1" @@ -5924,6 +6016,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -5962,6 +6055,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 860b919..add6863 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -58,6 +58,12 @@ dotenvy = "0.15" # async utilities futures = "0.3" +# dev-only http api (behind feature flag, never in production) +axum = { version = "0.7", optional = true } + +[features] +dev-server = ["axum"] + # platform-specific: webview media permissions on linux [target.'cfg(target_os = "linux")'.dependencies] webkit2gtk = "2.0" diff --git a/src-tauri/src/dev_server.rs b/src-tauri/src/dev_server.rs new file mode 100644 index 0000000..42732c5 --- /dev/null +++ b/src-tauri/src/dev_server.rs @@ -0,0 +1,1062 @@ +// development-only http api for programmatic access to the app +// +// only compiled when the `dev-server` cargo feature is enabled. +// binds to 127.0.0.1:3333 by default (override with DUSK_DEV_PORT env var). +// all endpoints operate on the same shared state as the tauri commands, +// so changes made here are immediately visible in the running ui. +// +// NEVER enable this in production builds. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Json}; +use axum::routing::{delete, get, post, put}; +use axum::Router; +use serde::Deserialize; +use tokio::sync::Mutex; + +use crate::crdt::CrdtEngine; +use crate::node::gossip; +use crate::node::NodeCommand; +use crate::protocol::community::{ChannelKind, ChannelMeta, CommunityMeta, Member}; +use crate::protocol::identity::{DirectoryEntry, DuskIdentity}; +use crate::protocol::messages::{ + ChatMessage, DMConversationMeta, DirectMessage, GossipMessage, PeerStatus, VoiceParticipant, +}; +use crate::storage::{DiskStorage, UserSettings}; + +// mirrors the fields from AppState but owned so it can be moved into axum +#[derive(Clone)] +pub struct DevState { + pub identity: Arc>>, + pub crdt_engine: Arc>, + pub storage: Arc, + pub node_handle: Arc>>, + pub voice_channels: Arc>>>, + pub app_handle: tauri::AppHandle, +} + +// unified error response so all handlers return consistent json +struct ApiError(StatusCode, String); + +impl IntoResponse for ApiError { + fn into_response(self) -> axum::response::Response { + let body = serde_json::json!({ "error": self.1 }); + (self.0, Json(body)).into_response() + } +} + +impl From for ApiError { + fn from(msg: String) -> Self { + ApiError(StatusCode::INTERNAL_SERVER_ERROR, msg) + } +} + +type ApiResult = Result, ApiError>; + +pub async fn start(state: DevState) { + let port: u16 = std::env::var("DUSK_DEV_PORT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(3333); + + let app = Router::new() + // identity + .route("/api/identity", get(get_identity)) + .route("/api/identity", put(update_profile)) + .route("/api/settings", get(get_settings)) + .route("/api/settings", put(save_settings)) + // directory + .route("/api/directory", get(get_directory)) + .route("/api/directory/search", get(search_directory)) + .route("/api/friends", get(get_friends)) + .route("/api/friends/{peer_id}", post(add_friend)) + .route("/api/friends/{peer_id}", delete(remove_friend)) + // communities + .route("/api/communities", get(get_communities)) + .route("/api/communities", post(create_community)) + .route("/api/communities/join", post(join_community)) + .route( + "/api/communities/{community_id}/invite", + post(generate_invite), + ) + .route("/api/communities/{community_id}/members", get(get_members)) + .route("/api/communities/{community_id}", delete(leave_community)) + // channels + .route( + "/api/communities/{community_id}/channels", + get(get_channels), + ) + .route( + "/api/communities/{community_id}/channels", + post(create_channel), + ) + // messages + .route("/api/channels/{channel_id}/messages", get(get_messages)) + .route("/api/channels/{channel_id}/messages", post(send_message)) + .route( + "/api/communities/{community_id}/messages/{message_id}", + delete(delete_message), + ) + // direct messages + .route("/api/dm", get(get_dm_conversations)) + .route("/api/dm/{peer_id}", get(get_dm_messages)) + .route("/api/dm/{peer_id}", post(send_dm)) + .route("/api/dm/{peer_id}", delete(delete_dm_conversation)) + // node control + .route("/api/node/start", post(start_node)) + .route("/api/node/stop", post(stop_node)) + .route("/api/node/status", get(get_node_status)) + .with_state(state); + + let addr = format!("127.0.0.1:{}", port); + log::info!("dev server listening on http://{}", addr); + + let listener = tokio::net::TcpListener::bind(&addr) + .await + .expect("failed to bind dev server"); + + axum::serve(listener, app) + .await + .expect("dev server crashed"); +} + +// -- helpers -- + +fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64 +} + +// find the community that owns a given channel +fn find_community_for_channel( + engine: &crate::crdt::CrdtEngine, + channel_id: &str, +) -> Result { + for community_id in engine.community_ids() { + if let Ok(channels) = engine.get_channels(&community_id) { + if channels.iter().any(|ch| ch.id == channel_id) { + return Ok(community_id); + } + } + } + Err(ApiError( + StatusCode::NOT_FOUND, + format!("no community found containing channel {}", channel_id), + )) +} + +// -- identity -- + +async fn get_identity(State(state): State) -> ApiResult { + let identity = state.identity.lock().await; + match identity.as_ref() { + Some(id) => Ok(Json(serde_json::to_value(id.public_identity()).unwrap())), + None => Err(ApiError(StatusCode::NOT_FOUND, "no identity loaded".into())), + } +} + +#[derive(Deserialize)] +struct UpdateProfileBody { + display_name: String, + bio: String, +} + +async fn update_profile( + State(state): State, + Json(body): Json, +) -> ApiResult { + let mut identity = state.identity.lock().await; + let id = identity + .as_mut() + .ok_or_else(|| ApiError(StatusCode::NOT_FOUND, "no identity loaded".into()))?; + + id.display_name = body.display_name; + id.bio = body.bio; + id.save(&state.storage) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + + let public = id.public_identity(); + Ok(Json(serde_json::to_value(public).unwrap())) +} + +// -- settings -- + +async fn get_settings(State(state): State) -> ApiResult { + state + .storage + .load_settings() + .map(Json) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e))) +} + +async fn save_settings( + State(state): State, + Json(settings): Json, +) -> ApiResult { + // sync display name to identity if it changed + let mut identity = state.identity.lock().await; + if let Some(id) = identity.as_mut() { + if id.display_name != settings.display_name { + id.display_name = settings.display_name.clone(); + let _ = id.save(&state.storage); + } + } + drop(identity); + + state + .storage + .save_settings(&settings) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + + Ok(Json(serde_json::json!({ "ok": true }))) +} + +// -- directory -- + +async fn get_directory(State(state): State) -> ApiResult> { + let entries = state + .storage + .load_directory() + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + + let mut peers: Vec = entries.into_values().collect(); + peers.sort_by(|a, b| b.last_seen.cmp(&a.last_seen)); + Ok(Json(peers)) +} + +#[derive(Deserialize)] +struct SearchQuery { + q: String, +} + +async fn search_directory( + State(state): State, + Query(params): Query, +) -> ApiResult> { + let entries = state + .storage + .load_directory() + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + + let query_lower = params.q.to_lowercase(); + let mut results: Vec = entries + .into_values() + .filter(|entry| { + entry.display_name.to_lowercase().contains(&query_lower) + || entry.peer_id.to_lowercase().contains(&query_lower) + }) + .collect(); + + results.sort_by(|a, b| b.last_seen.cmp(&a.last_seen)); + Ok(Json(results)) +} + +async fn get_friends(State(state): State) -> ApiResult> { + let entries = state + .storage + .load_directory() + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + + let mut friends: Vec = entries + .into_values() + .filter(|entry| entry.is_friend) + .collect(); + + friends.sort_by(|a, b| { + a.display_name + .to_lowercase() + .cmp(&b.display_name.to_lowercase()) + }); + Ok(Json(friends)) +} + +async fn add_friend( + State(state): State, + Path(peer_id): Path, +) -> ApiResult { + state + .storage + .set_friend_status(&peer_id, true) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + Ok(Json(serde_json::json!({ "ok": true }))) +} + +async fn remove_friend( + State(state): State, + Path(peer_id): Path, +) -> ApiResult { + state + .storage + .set_friend_status(&peer_id, false) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + Ok(Json(serde_json::json!({ "ok": true }))) +} + +// -- communities -- + +async fn get_communities(State(state): State) -> ApiResult> { + let engine = state.crdt_engine.lock().await; + let mut communities = Vec::new(); + for id in engine.community_ids() { + if let Ok(meta) = engine.get_community_meta(&id) { + communities.push(meta); + } + } + Ok(Json(communities)) +} + +#[derive(Deserialize)] +struct CreateCommunityBody { + name: String, + description: String, +} + +async fn create_community( + State(state): State, + Json(body): Json, +) -> ApiResult { + use sha2::Digest; + + let identity = state.identity.lock().await; + let id = identity + .as_ref() + .ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?; + + let now = now_ms(); + + let mut hasher = sha2::Sha256::new(); + hasher.update(body.name.as_bytes()); + hasher.update(id.peer_id.to_bytes()); + hasher.update(now.to_le_bytes()); + let hash = hasher.finalize(); + let community_id = format!("com_{}", &hex::encode(hash)[..16]); + + let peer_id_str = id.peer_id.to_string(); + drop(identity); + + let mut engine = state.crdt_engine.lock().await; + engine + .create_community(&community_id, &body.name, &body.description, &peer_id_str) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + + let meta = engine + .get_community_meta(&community_id) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + let _ = state.storage.save_community_meta(&meta); + drop(engine); + + // subscribe to community topics on the p2p node + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let presence_topic = gossip::topic_for_presence(&community_id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { + topic: presence_topic, + }) + .await; + + let engine = state.crdt_engine.lock().await; + if let Ok(channels) = engine.get_channels(&community_id) { + for channel in &channels { + let msg_topic = gossip::topic_for_messages(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { topic: msg_topic }) + .await; + + let typing_topic = gossip::topic_for_typing(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { + topic: typing_topic, + }) + .await; + } + } + + let namespace = format!("dusk/community/{}", community_id); + let _ = handle + .command_tx + .send(NodeCommand::RegisterRendezvous { namespace }) + .await; + } + + Ok(Json(meta)) +} + +#[derive(Deserialize)] +struct JoinCommunityBody { + invite_code: String, +} + +async fn join_community( + State(state): State, + Json(body): Json, +) -> ApiResult { + let invite = crate::protocol::community::InviteCode::decode(&body.invite_code) + .map_err(|e| ApiError(StatusCode::BAD_REQUEST, e))?; + + let identity = state.identity.lock().await; + let id = identity + .as_ref() + .ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?; + let peer_id_str = id.peer_id.to_string(); + drop(identity); + + let mut engine = state.crdt_engine.lock().await; + if !engine.has_community(&invite.community_id) { + engine + .create_community( + &invite.community_id, + &invite.community_name, + "", + &peer_id_str, + ) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + } + + let meta = engine + .get_community_meta(&invite.community_id) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + let _ = state.storage.save_community_meta(&meta); + + let channels = engine + .get_channels(&invite.community_id) + .unwrap_or_default(); + drop(engine); + + // subscribe and discover via rendezvous + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let presence_topic = gossip::topic_for_presence(&invite.community_id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { + topic: presence_topic, + }) + .await; + + for channel in &channels { + let msg_topic = gossip::topic_for_messages(&invite.community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { topic: msg_topic }) + .await; + + let typing_topic = gossip::topic_for_typing(&invite.community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { + topic: typing_topic, + }) + .await; + } + + let namespace = format!("dusk/community/{}", invite.community_id); + let _ = handle + .command_tx + .send(NodeCommand::RegisterRendezvous { + namespace: namespace.clone(), + }) + .await; + let _ = handle + .command_tx + .send(NodeCommand::DiscoverRendezvous { namespace }) + .await; + } + + Ok(Json(meta)) +} + +async fn leave_community( + State(state): State, + Path(community_id): Path, +) -> ApiResult { + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let engine = state.crdt_engine.lock().await; + if let Ok(channels) = engine.get_channels(&community_id) { + for channel in &channels { + let msg_topic = gossip::topic_for_messages(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Unsubscribe { topic: msg_topic }) + .await; + + let typing_topic = gossip::topic_for_typing(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Unsubscribe { + topic: typing_topic, + }) + .await; + } + } + + let presence_topic = gossip::topic_for_presence(&community_id); + let _ = handle + .command_tx + .send(NodeCommand::Unsubscribe { + topic: presence_topic, + }) + .await; + } + + Ok(Json(serde_json::json!({ "ok": true }))) +} + +async fn generate_invite( + State(state): State, + Path(community_id): Path, +) -> ApiResult { + let engine = state.crdt_engine.lock().await; + let meta = engine + .get_community_meta(&community_id) + .map_err(|e| ApiError(StatusCode::NOT_FOUND, e))?; + drop(engine); + + let invite = crate::protocol::community::InviteCode { + community_id: meta.id, + community_name: meta.name, + }; + + Ok(Json(serde_json::json!({ "invite_code": invite.encode() }))) +} + +async fn get_members( + State(state): State, + Path(community_id): Path, +) -> ApiResult> { + let engine = state.crdt_engine.lock().await; + let mut members = engine + .get_members(&community_id) + .map_err(|e| ApiError(StatusCode::NOT_FOUND, e))?; + drop(engine); + + // overlay local user's current name + let identity = state.identity.lock().await; + if let Some(ref id) = *identity { + let local_peer = id.peer_id.to_string(); + if let Some(member) = members.iter_mut().find(|m| m.peer_id == local_peer) { + member.display_name = id.display_name.clone(); + member.status = PeerStatus::Online; + } + } + + Ok(Json(members)) +} + +// -- channels -- + +async fn get_channels( + State(state): State, + Path(community_id): Path, +) -> ApiResult> { + let engine = state.crdt_engine.lock().await; + let channels = engine + .get_channels(&community_id) + .map_err(|e| ApiError(StatusCode::NOT_FOUND, e))?; + Ok(Json(channels)) +} + +#[derive(Deserialize)] +struct CreateChannelBody { + name: String, + #[serde(default)] + topic: String, + #[serde(default)] + kind: Option, + #[serde(default)] + category_id: Option, +} + +async fn create_channel( + State(state): State, + Path(community_id): Path, + Json(body): Json, +) -> ApiResult { + use sha2::Digest; + + let mut hasher = sha2::Sha256::new(); + hasher.update(community_id.as_bytes()); + hasher.update(body.name.as_bytes()); + hasher.update(now_ms().to_le_bytes()); + let hash = hasher.finalize(); + let channel_id = format!("ch_{}", &hex::encode(hash)[..12]); + + let channel_kind = match body.kind.as_deref() { + Some("voice") | Some("Voice") => ChannelKind::Voice, + _ => ChannelKind::Text, + }; + + let channel = ChannelMeta { + id: channel_id, + community_id: community_id.clone(), + name: body.name, + topic: body.topic, + kind: channel_kind, + position: 0, + category_id: body.category_id, + }; + + let mut engine = state.crdt_engine.lock().await; + engine + .create_channel(&community_id, &channel) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + drop(engine); + + // subscribe to topics + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let msg_topic = gossip::topic_for_messages(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { topic: msg_topic }) + .await; + + let typing_topic = gossip::topic_for_typing(&community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { + topic: typing_topic, + }) + .await; + } + + Ok(Json(channel)) +} + +// -- messages -- + +#[derive(Deserialize)] +struct MessagesQuery { + before: Option, + limit: Option, +} + +async fn get_messages( + State(state): State, + Path(channel_id): Path, + Query(params): Query, +) -> ApiResult> { + let engine = state.crdt_engine.lock().await; + let community_id = find_community_for_channel(&engine, &channel_id)?; + let messages = engine + .get_messages( + &community_id, + &channel_id, + params.before, + params.limit.unwrap_or(50), + ) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + Ok(Json(messages)) +} + +#[derive(Deserialize)] +struct SendMessageBody { + content: String, +} + +async fn send_message( + State(state): State, + Path(channel_id): Path, + Json(body): Json, +) -> ApiResult { + let identity = state.identity.lock().await; + let id = identity + .as_ref() + .ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?; + + let now = now_ms(); + let msg = ChatMessage { + id: format!("msg_{}_{}", id.peer_id, now), + channel_id: channel_id.clone(), + author_id: id.peer_id.to_string(), + author_name: id.display_name.clone(), + content: body.content, + timestamp: now, + edited: false, + }; + drop(identity); + + let mut engine = state.crdt_engine.lock().await; + let community_id = find_community_for_channel(&engine, &channel_id)?; + engine + .append_message(&community_id, &msg) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + drop(engine); + + // publish to gossipsub + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let topic = gossip::topic_for_messages(&community_id, &channel_id); + if let Ok(data) = serde_json::to_vec(&GossipMessage::Chat(msg.clone())) { + let _ = handle + .command_tx + .send(NodeCommand::SendMessage { topic, data }) + .await; + } + } + + Ok(Json(msg)) +} + +async fn delete_message( + State(state): State, + Path((community_id, message_id)): Path<(String, String)>, +) -> ApiResult { + let identity = state.identity.lock().await; + let id = identity + .as_ref() + .ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?; + let peer_id_str = id.peer_id.to_string(); + drop(identity); + + let mut engine = state.crdt_engine.lock().await; + let message = engine + .get_message(&community_id, &message_id) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))? + .ok_or_else(|| { + ApiError( + StatusCode::NOT_FOUND, + format!("message {} not found", message_id), + ) + })?; + + if message.author_id != peer_id_str { + return Err(ApiError( + StatusCode::FORBIDDEN, + "not authorized to delete this message".into(), + )); + } + + engine + .delete_message(&community_id, &message_id) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + drop(engine); + + // broadcast deletion + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let engine = state.crdt_engine.lock().await; + if let Ok(channels) = engine.get_channels(&community_id) { + for channel in &channels { + let topic = gossip::topic_for_messages(&community_id, &channel.id); + let deletion = GossipMessage::DeleteMessage { + message_id: message_id.clone(), + }; + if let Ok(data) = serde_json::to_vec(&deletion) { + let _ = handle + .command_tx + .send(NodeCommand::SendMessage { topic, data }) + .await; + } + } + } + } + + Ok(Json(serde_json::json!({ "ok": true }))) +} + +// -- direct messages -- + +async fn get_dm_conversations(State(state): State) -> ApiResult> { + let conversations = state + .storage + .load_all_dm_conversations() + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + + Ok(Json( + conversations.into_iter().map(|(_, meta)| meta).collect(), + )) +} + +async fn get_dm_messages( + State(state): State, + Path(peer_id): Path, + Query(params): Query, +) -> ApiResult> { + let identity = state.identity.lock().await; + let id = identity + .as_ref() + .ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?; + let local_peer_id = id.peer_id.to_string(); + drop(identity); + + let conversation_id = gossip::dm_conversation_id(&local_peer_id, &peer_id); + let messages = state + .storage + .load_dm_messages(&conversation_id, params.before, params.limit.unwrap_or(50)) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + + Ok(Json(messages)) +} + +#[derive(Deserialize)] +struct SendDmBody { + content: String, +} + +async fn send_dm( + State(state): State, + Path(peer_id): Path, + Json(body): Json, +) -> ApiResult { + let identity = state.identity.lock().await; + let id = identity + .as_ref() + .ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?; + + let now = now_ms(); + let local_peer_id = id.peer_id.to_string(); + let display_name = id.display_name.clone(); + drop(identity); + + let msg = DirectMessage { + id: format!("dm_{}_{}", local_peer_id, now), + from_peer: local_peer_id.clone(), + to_peer: peer_id.clone(), + from_display_name: display_name, + content: body.content.clone(), + timestamp: now, + }; + + let conversation_id = gossip::dm_conversation_id(&local_peer_id, &peer_id); + + state + .storage + .append_dm_message(&conversation_id, &msg) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + + // update conversation metadata + let peer_display_name = state + .storage + .load_dm_conversation(&conversation_id) + .ok() + .map(|m| m.display_name) + .unwrap_or_else(|| { + state + .storage + .load_directory() + .ok() + .and_then(|d| d.get(&peer_id).map(|e| e.display_name.clone())) + .unwrap_or_else(|| peer_id.clone()) + }); + + let meta = DMConversationMeta { + peer_id: peer_id.clone(), + display_name: peer_display_name, + last_message: Some(body.content), + last_message_time: Some(now), + unread_count: 0, + }; + + let _ = state.storage.save_dm_conversation(&conversation_id, &meta); + + // publish via gossipsub + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + if let Ok(data) = serde_json::to_vec(&GossipMessage::DirectMessage(msg.clone())) { + let pair_topic = gossip::topic_for_dm(&local_peer_id, &peer_id); + let _ = handle + .command_tx + .send(NodeCommand::SendMessage { + topic: pair_topic, + data: data.clone(), + }) + .await; + + let inbox_topic = gossip::topic_for_dm_inbox(&peer_id); + let _ = handle + .command_tx + .send(NodeCommand::SendMessage { + topic: inbox_topic, + data, + }) + .await; + } + } + + Ok(Json(msg)) +} + +async fn delete_dm_conversation( + State(state): State, + Path(peer_id): Path, +) -> ApiResult { + let identity = state.identity.lock().await; + let id = identity + .as_ref() + .ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?; + let local_peer_id = id.peer_id.to_string(); + drop(identity); + + let conversation_id = gossip::dm_conversation_id(&local_peer_id, &peer_id); + + // unsubscribe from topic + let node_handle = state.node_handle.lock().await; + if let Some(ref handle) = *node_handle { + let topic = gossip::topic_for_dm(&local_peer_id, &peer_id); + let _ = handle + .command_tx + .send(NodeCommand::Unsubscribe { topic }) + .await; + } + + state + .storage + .remove_dm_conversation(&conversation_id) + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, format!("{}", e)))?; + + Ok(Json(serde_json::json!({ "ok": true }))) +} + +// -- node control -- + +async fn start_node(State(state): State) -> ApiResult { + // check if already running + let node_handle = state.node_handle.lock().await; + if node_handle.is_some() { + return Err(ApiError( + StatusCode::CONFLICT, + "node is already running. if you need to restart, call POST /api/node/stop first" + .into(), + )); + } + drop(node_handle); + + let identity = state.identity.lock().await; + let id = identity.as_ref().ok_or_else(|| { + ApiError( + StatusCode::UNAUTHORIZED, + "no identity loaded, create one first".into(), + ) + })?; + + let custom_relay = state + .storage + .load_settings() + .ok() + .and_then(|s| s.custom_relay_addr); + + let handle = crate::node::start( + id.keypair.clone(), + state.crdt_engine.clone(), + state.storage.clone(), + state.app_handle.clone(), + state.voice_channels.clone(), + custom_relay, + ) + .await + .map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?; + + // build and broadcast profile announcement + let mut announcement = crate::protocol::messages::ProfileAnnouncement { + peer_id: id.peer_id.to_string(), + display_name: id.display_name.clone(), + bio: id.bio.clone(), + public_key: hex::encode(id.keypair.public().encode_protobuf()), + timestamp: now_ms() as u64, + verification_proof: id.verification_proof.clone(), + signature: String::new(), + }; + announcement.signature = crate::verification::sign_announcement(&id.keypair, &announcement); + drop(identity); + + // subscribe to global topics + let sync_topic = gossip::topic_for_sync(); + let directory_topic = gossip::topic_for_directory(); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { topic: sync_topic }) + .await; + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { + topic: directory_topic.clone(), + }) + .await; + + // announce profile + let announce_msg = GossipMessage::ProfileAnnounce(announcement); + if let Ok(data) = serde_json::to_vec(&announce_msg) { + let _ = handle + .command_tx + .send(NodeCommand::SendMessage { + topic: directory_topic, + data, + }) + .await; + } + + // subscribe to all known community topics + let engine = state.crdt_engine.lock().await; + let community_ids = engine.community_ids(); + drop(engine); + + for community_id in &community_ids { + let channels = { + let engine = state.crdt_engine.lock().await; + engine.get_channels(community_id).unwrap_or_default() + }; + + for channel in &channels { + let msg_topic = gossip::topic_for_messages(community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { topic: msg_topic }) + .await; + + let typing_topic = gossip::topic_for_typing(community_id, &channel.id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { + topic: typing_topic, + }) + .await; + } + + let presence_topic = gossip::topic_for_presence(community_id); + let _ = handle + .command_tx + .send(NodeCommand::Subscribe { + topic: presence_topic, + }) + .await; + + let namespace = format!("dusk/community/{}", community_id); + let _ = handle + .command_tx + .send(NodeCommand::RegisterRendezvous { namespace }) + .await; + } + + // store the handle + let mut node_handle = state.node_handle.lock().await; + *node_handle = Some(handle); + + Ok(Json(serde_json::json!({ "ok": true }))) +} + +async fn stop_node(State(state): State) -> ApiResult { + let mut node_handle = state.node_handle.lock().await; + if let Some(handle) = node_handle.take() { + let _ = handle.command_tx.send(NodeCommand::Shutdown).await; + let _ = handle.task.await; + } + Ok(Json(serde_json::json!({ "ok": true }))) +} + +async fn get_node_status(State(state): State) -> ApiResult { + let node_handle = state.node_handle.lock().await; + let running = node_handle.is_some(); + Ok(Json(serde_json::json!({ "running": running }))) +} diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 73d07f1..dc8b7a4 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -1,5 +1,7 @@ mod commands; mod crdt; +#[cfg(feature = "dev-server")] +mod dev_server; mod node; mod protocol; mod storage; @@ -74,6 +76,23 @@ pub fn run() { .ok(); } } + // launch the dev http server when compiled with the dev-server feature + // available at http://127.0.0.1:3333 (or DUSK_DEV_PORT) + #[cfg(feature = "dev-server")] + { + use tauri::Manager; + let state = app.state::(); + let dev_state = dev_server::DevState { + identity: std::sync::Arc::clone(&state.identity), + crdt_engine: std::sync::Arc::clone(&state.crdt_engine), + storage: std::sync::Arc::clone(&state.storage), + node_handle: std::sync::Arc::clone(&state.node_handle), + voice_channels: std::sync::Arc::clone(&state.voice_channels), + app_handle: app.handle().clone(), + }; + tauri::async_runtime::spawn(dev_server::start(dev_state)); + } + Ok(()) }) .invoke_handler(tauri::generate_handler![