app/src-tauri/src/commands/community.rs

898 lines
28 KiB
Rust

use std::time::{SystemTime, UNIX_EPOCH};
use sha2::{Digest, Sha256};
use tauri::State;
use super::ipc_log;
use crate::crdt::sync::{DocumentSnapshot, SyncMessage};
use crate::node::gossip;
use crate::node::NodeCommand;
use crate::protocol::community::{CategoryMeta, ChannelKind, ChannelMeta, CommunityMeta, Member};
use crate::protocol::messages::PeerStatus;
use crate::AppState;
// check if the requester has one of the required roles in the community
fn check_permission(
members: &[Member],
requester_id: &str,
required_roles: &[&str],
) -> Result<(), String> {
let requester = members
.iter()
.find(|m| m.peer_id == requester_id)
.ok_or("requester not found in community")?;
let has_permission = requester
.roles
.iter()
.any(|r| required_roles.contains(&r.as_str()));
if !has_permission {
return Err("insufficient permissions".to_string());
}
Ok(())
}
// helper to broadcast a crdt change to peers via the sync topic
async fn broadcast_sync(state: &State<'_, AppState>, community_id: &str) {
let doc_bytes = {
let mut engine = state.crdt_engine.lock().await;
engine.get_doc_bytes(community_id)
};
let Some(doc_bytes) = doc_bytes else {
return;
};
let sync_msg = SyncMessage::DocumentOffer(DocumentSnapshot {
community_id: community_id.to_string(),
doc_bytes,
});
let data = match serde_json::to_vec(&sync_msg) {
Ok(data) => data,
Err(_) => return,
};
let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle {
let _ = handle
.command_tx
.send(NodeCommand::SendMessage {
topic: gossip::topic_for_sync(),
data,
})
.await;
}
}
// request a full sync from currently connected peers
async fn request_sync(state: &State<'_, AppState>) {
let peer_id = {
let identity = state.identity.lock().await;
let Some(id) = identity.as_ref() else {
return;
};
id.peer_id.to_string()
};
let sync_msg = SyncMessage::RequestSync { peer_id };
let data = match serde_json::to_vec(&sync_msg) {
Ok(data) => data,
Err(_) => return,
};
let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle {
let _ = handle
.command_tx
.send(NodeCommand::SendMessage {
topic: gossip::topic_for_sync(),
data,
})
.await;
}
}
#[tauri::command]
pub async fn create_community(
state: State<'_, AppState>,
name: String,
description: String,
) -> Result<CommunityMeta, String> {
ipc_log!("create_community", {
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
// generate a deterministic community id from name + creator + timestamp
let mut hasher = Sha256::new();
hasher.update(name.as_bytes());
hasher.update(id.peer_id.to_bytes());
hasher.update(now.to_le_bytes());
let hash = hasher.finalize();
let community_id = format!("com_{}", &hex::encode(hash)[..16]);
let peer_id_str = id.peer_id.to_string();
drop(identity);
let mut engine = state.crdt_engine.lock().await;
engine.create_community(&community_id, &name, &description, &peer_id_str)?;
let meta = engine.get_community_meta(&community_id)?;
// save metadata cache
let _ = state.storage.save_community_meta(&meta);
drop(engine);
// subscribe to community topics on the p2p node
let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle {
let presence_topic = gossip::topic_for_presence(&community_id);
let _ = handle
.command_tx
.send(NodeCommand::Subscribe {
topic: presence_topic,
})
.await;
// subscribe to the default general channel
let engine = state.crdt_engine.lock().await;
if let Ok(channels) = engine.get_channels(&community_id) {
for channel in &channels {
let msg_topic = gossip::topic_for_messages(&community_id, &channel.id);
let _ = handle
.command_tx
.send(NodeCommand::Subscribe { topic: msg_topic })
.await;
let typing_topic = gossip::topic_for_typing(&community_id, &channel.id);
let _ = handle
.command_tx
.send(NodeCommand::Subscribe {
topic: typing_topic,
})
.await;
}
}
// register on rendezvous so peers joining via invite can discover us
let namespace = format!("dusk/community/{}", community_id);
let _ = handle
.command_tx
.send(NodeCommand::RegisterRendezvous { namespace })
.await;
}
Ok(meta)
})
}
#[tauri::command]
pub async fn join_community(
state: State<'_, AppState>,
invite_code: String,
) -> Result<CommunityMeta, String> {
ipc_log!("join_community", {
let invite = crate::protocol::community::InviteCode::decode(&invite_code)?;
let local_peer_id = {
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
id.peer_id.to_string()
};
// create a placeholder document that will be backfilled via crdt sync
// once we connect to existing community members through the relay
let mut engine = state.crdt_engine.lock().await;
let had_existing_doc = engine.has_community(&invite.community_id);
if !had_existing_doc {
engine.create_placeholder_community(&invite.community_id, &invite.community_name, "")?;
}
// joining via invite must never keep elevated local roles from stale local docs
if had_existing_doc {
if let Ok(members) = engine.get_members(&invite.community_id) {
let local_has_elevated_role = members.iter().any(|member| {
member.peer_id == local_peer_id
&& member
.roles
.iter()
.any(|role| role == "owner" || role == "admin")
});
if local_has_elevated_role {
let roles = vec!["member".to_string()];
let _ = engine.set_member_role(&invite.community_id, &local_peer_id, &roles);
}
}
}
let meta = engine.get_community_meta(&invite.community_id)?;
let _ = state.storage.save_community_meta(&meta);
// subscribe to gossipsub topics so we receive messages
let channels = engine
.get_channels(&invite.community_id)
.unwrap_or_default();
drop(engine);
// mark this community for one-time role hardening on first sync merge
{
let mut guard = state.pending_join_role_guard.lock().await;
guard.insert(invite.community_id.clone());
}
let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle {
// subscribe to the community presence topic
let presence_topic = gossip::topic_for_presence(&invite.community_id);
let _ = handle
.command_tx
.send(NodeCommand::Subscribe {
topic: presence_topic,
})
.await;
// subscribe to all channel topics
for channel in &channels {
let msg_topic = gossip::topic_for_messages(&invite.community_id, &channel.id);
let _ = handle
.command_tx
.send(NodeCommand::Subscribe { topic: msg_topic })
.await;
let typing_topic = gossip::topic_for_typing(&invite.community_id, &channel.id);
let _ = handle
.command_tx
.send(NodeCommand::Subscribe {
topic: typing_topic,
})
.await;
}
// register on rendezvous so existing members can find us
let namespace = format!("dusk/community/{}", invite.community_id);
let _ = handle
.command_tx
.send(NodeCommand::RegisterRendezvous {
namespace: namespace.clone(),
})
.await;
// discover existing members through rendezvous
let _ = handle
.command_tx
.send(NodeCommand::DiscoverRendezvous { namespace })
.await;
}
// request a snapshot now so joins work even when peers were already connected
request_sync(&state).await;
Ok(meta)
})
}
#[tauri::command]
pub async fn leave_community(
state: State<'_, AppState>,
community_id: String,
) -> Result<(), String> {
ipc_log!("leave_community", {
let local_peer_id = {
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
id.peer_id.to_string()
};
// remove local user from the shared member list before leaving
let mut removed_self = false;
let channels = {
let mut engine = state.crdt_engine.lock().await;
let channels = engine.get_channels(&community_id).unwrap_or_default();
if let Ok(members) = engine.get_members(&community_id) {
if members.iter().any(|member| member.peer_id == local_peer_id) {
if engine.remove_member(&community_id, &local_peer_id).is_ok() {
removed_self = true;
}
}
}
channels
};
if removed_self {
broadcast_sync(&state, &community_id).await;
}
// unsubscribe from all community topics and stop advertising this namespace
let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle {
for channel in &channels {
let msg_topic = gossip::topic_for_messages(&community_id, &channel.id);
let _ = handle
.command_tx
.send(NodeCommand::Unsubscribe { topic: msg_topic })
.await;
let typing_topic = gossip::topic_for_typing(&community_id, &channel.id);
let _ = handle
.command_tx
.send(NodeCommand::Unsubscribe {
topic: typing_topic,
})
.await;
}
let presence_topic = gossip::topic_for_presence(&community_id);
let _ = handle
.command_tx
.send(NodeCommand::Unsubscribe {
topic: presence_topic,
})
.await;
let namespace = format!("dusk/community/{}", community_id);
let _ = handle
.command_tx
.send(NodeCommand::UnregisterRendezvous { namespace })
.await;
}
// remove local cached community state so leave persists across restarts
let mut engine = state.crdt_engine.lock().await;
engine.remove_community(&community_id)?;
drop(engine);
let mut guard = state.pending_join_role_guard.lock().await;
guard.remove(&community_id);
Ok(())
})
}
#[tauri::command]
pub async fn get_communities(state: State<'_, AppState>) -> Result<Vec<CommunityMeta>, String> {
ipc_log!("get_communities", {
let engine = state.crdt_engine.lock().await;
let mut communities = Vec::new();
for id in engine.community_ids() {
if let Ok(meta) = engine.get_community_meta(&id) {
communities.push(meta);
}
}
Ok(communities)
})
}
#[tauri::command]
pub async fn create_channel(
state: State<'_, AppState>,
community_id: String,
name: String,
topic: String,
kind: Option<String>,
category_id: Option<String>,
) -> Result<ChannelMeta, String> {
ipc_log!("create_channel", {
let mut hasher = Sha256::new();
hasher.update(community_id.as_bytes());
hasher.update(name.as_bytes());
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
hasher.update(now.to_le_bytes());
let hash = hasher.finalize();
let channel_id = format!("ch_{}", &hex::encode(hash)[..12]);
let channel_kind = match kind.as_deref() {
Some("voice") | Some("Voice") => ChannelKind::Voice,
_ => ChannelKind::Text,
};
let channel = ChannelMeta {
id: channel_id,
community_id: community_id.clone(),
name,
topic,
kind: channel_kind,
position: 0,
category_id,
};
let mut engine = state.crdt_engine.lock().await;
engine.create_channel(&community_id, &channel)?;
drop(engine);
// subscribe to the new channel's topics
let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle {
let msg_topic = gossip::topic_for_messages(&community_id, &channel.id);
let _ = handle
.command_tx
.send(NodeCommand::Subscribe { topic: msg_topic })
.await;
let typing_topic = gossip::topic_for_typing(&community_id, &channel.id);
let _ = handle
.command_tx
.send(NodeCommand::Subscribe {
topic: typing_topic,
})
.await;
}
broadcast_sync(&state, &community_id).await;
Ok(channel)
})
}
#[tauri::command]
pub async fn get_channels(
state: State<'_, AppState>,
community_id: String,
) -> Result<Vec<ChannelMeta>, String> {
ipc_log!("get_channels", {
let engine = state.crdt_engine.lock().await;
engine.get_channels(&community_id)
})
}
#[tauri::command]
pub async fn create_category(
state: State<'_, AppState>,
community_id: String,
name: String,
) -> Result<CategoryMeta, String> {
ipc_log!("create_category", {
let mut hasher = Sha256::new();
hasher.update(community_id.as_bytes());
hasher.update(name.as_bytes());
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
hasher.update(now.to_le_bytes());
let hash = hasher.finalize();
let category_id = format!("cat_{}", &hex::encode(hash)[..12]);
let category = CategoryMeta {
id: category_id,
community_id: community_id.clone(),
name,
position: 0,
};
let mut engine = state.crdt_engine.lock().await;
engine.create_category(&community_id, &category)?;
drop(engine);
broadcast_sync(&state, &community_id).await;
Ok(category)
})
}
#[tauri::command]
pub async fn get_categories(
state: State<'_, AppState>,
community_id: String,
) -> Result<Vec<CategoryMeta>, String> {
let engine = state.crdt_engine.lock().await;
engine.get_categories(&community_id)
}
#[tauri::command]
pub async fn get_members(
state: State<'_, AppState>,
community_id: String,
) -> Result<Vec<Member>, String> {
let engine = state.crdt_engine.lock().await;
let mut members = engine.get_members(&community_id)?;
drop(engine);
// overlay the local user's identity so their display name stays current
let identity = state.identity.lock().await;
if let Some(ref id) = *identity {
let local_peer = id.peer_id.to_string();
let found = members.iter_mut().find(|m| m.peer_id == local_peer);
if let Some(member) = found {
member.display_name = id.display_name.clone();
member.status = PeerStatus::Online;
} else {
// local user isn't in the doc yet (shouldn't happen, but be safe)
members.push(Member {
peer_id: local_peer,
display_name: id.display_name.clone(),
status: PeerStatus::Online,
roles: vec!["member".to_string()],
trust_level: 1.0,
joined_at: 0,
});
}
}
Ok(members)
}
#[tauri::command]
pub async fn delete_message(
state: State<'_, AppState>,
community_id: String,
message_id: String,
) -> Result<(), String> {
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
let peer_id_str = id.peer_id.to_string();
drop(identity);
// verify the user is the message author or has admin rights
let mut engine = state.crdt_engine.lock().await;
let message = engine
.get_message(&community_id, &message_id)?
.ok_or_else(|| format!("message {} not found", message_id))?;
// only allow deletion by the author
if message.author_id != peer_id_str {
return Err("not authorized to delete this message".to_string());
}
engine.delete_message(&community_id, &message_id)?;
drop(engine);
// broadcast the deletion to peers
let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle {
// find the channel for this message to get the correct topic
let engine = state.crdt_engine.lock().await;
if let Ok(channels) = engine.get_channels(&community_id) {
for channel in &channels {
let topic = gossip::topic_for_messages(&community_id, &channel.id);
let deletion = crate::protocol::messages::GossipMessage::DeleteMessage {
message_id: message_id.clone(),
};
if let Ok(data) = serde_json::to_vec(&deletion) {
let _ = handle
.command_tx
.send(NodeCommand::SendMessage { topic, data })
.await;
}
}
}
}
Ok(())
}
#[tauri::command]
pub async fn kick_member(
state: State<'_, AppState>,
community_id: String,
member_peer_id: String,
) -> Result<(), String> {
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
let requester_id = id.peer_id.to_string();
drop(identity);
// verify the requester has admin rights
let engine = state.crdt_engine.lock().await;
let members = engine.get_members(&community_id)?;
let requester = members
.iter()
.find(|m| m.peer_id == requester_id)
.ok_or("requester not found in community")?;
let is_admin = requester.roles.iter().any(|r| r == "admin" || r == "owner");
if !is_admin {
return Err("not authorized to kick members".to_string());
}
// cannot kick the owner
let target = members
.iter()
.find(|m| m.peer_id == member_peer_id)
.ok_or("member not found")?;
if target.roles.iter().any(|r| r == "owner") {
return Err("cannot kick the community owner".to_string());
}
drop(engine);
// remove the member from the community
let mut engine = state.crdt_engine.lock().await;
engine.remove_member(&community_id, &member_peer_id)?;
drop(engine);
// broadcast the kick to peers
let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle {
let presence_topic = gossip::topic_for_presence(&community_id);
let kick_msg = crate::protocol::messages::GossipMessage::MemberKicked {
peer_id: member_peer_id.clone(),
};
if let Ok(data) = serde_json::to_vec(&kick_msg) {
let _ = handle
.command_tx
.send(NodeCommand::SendMessage {
topic: presence_topic,
data,
})
.await;
}
}
Ok(())
}
#[tauri::command]
pub async fn generate_invite(
state: State<'_, AppState>,
community_id: String,
) -> Result<String, String> {
let engine = state.crdt_engine.lock().await;
let meta = engine.get_community_meta(&community_id)?;
drop(engine);
// invite contains only the community id and name
// no IP addresses or peer addresses are included
// peers discover each other through the relay's rendezvous protocol
let invite = crate::protocol::community::InviteCode {
community_id: meta.id.clone(),
community_name: meta.name.clone(),
};
Ok(invite.encode())
}
#[tauri::command]
pub async fn reorder_channels(
state: State<'_, AppState>,
community_id: String,
channel_ids: Vec<String>,
) -> Result<Vec<ChannelMeta>, String> {
let mut engine = state.crdt_engine.lock().await;
let channels = engine.reorder_channels(&community_id, &channel_ids)?;
drop(engine);
broadcast_sync(&state, &community_id).await;
Ok(channels)
}
#[tauri::command]
pub async fn update_community(
state: State<'_, AppState>,
community_id: String,
name: String,
description: String,
) -> Result<CommunityMeta, String> {
if name.len() > 64 {
return Err("community name must be 64 characters or fewer".to_string());
}
if description.len() > 256 {
return Err("description must be 256 characters or fewer".to_string());
}
if name.trim().is_empty() {
return Err("community name cannot be empty".to_string());
}
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
let requester_id = id.peer_id.to_string();
drop(identity);
let engine = state.crdt_engine.lock().await;
let members = engine.get_members(&community_id)?;
check_permission(&members, &requester_id, &["owner", "admin"])?;
drop(engine);
let mut engine = state.crdt_engine.lock().await;
engine.update_community_meta(&community_id, &name, &description)?;
let meta = engine.get_community_meta(&community_id)?;
let _ = state.storage.save_community_meta(&meta);
drop(engine);
broadcast_sync(&state, &community_id).await;
Ok(meta)
}
#[tauri::command]
pub async fn update_channel(
state: State<'_, AppState>,
community_id: String,
channel_id: String,
name: String,
topic: String,
) -> Result<ChannelMeta, String> {
if name.trim().is_empty() {
return Err("channel name cannot be empty".to_string());
}
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
let requester_id = id.peer_id.to_string();
drop(identity);
let engine = state.crdt_engine.lock().await;
let members = engine.get_members(&community_id)?;
check_permission(&members, &requester_id, &["owner", "admin"])?;
drop(engine);
let mut engine = state.crdt_engine.lock().await;
engine.update_channel(&community_id, &channel_id, &name, &topic)?;
let channels = engine.get_channels(&community_id)?;
drop(engine);
let channel = channels
.into_iter()
.find(|c| c.id == channel_id)
.ok_or("channel not found after update")?;
broadcast_sync(&state, &community_id).await;
Ok(channel)
}
#[tauri::command]
pub async fn delete_channel(
state: State<'_, AppState>,
community_id: String,
channel_id: String,
) -> Result<(), String> {
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
let requester_id = id.peer_id.to_string();
drop(identity);
let engine = state.crdt_engine.lock().await;
let members = engine.get_members(&community_id)?;
check_permission(&members, &requester_id, &["owner", "admin"])?;
drop(engine);
// unsubscribe from channel topics before deletion
let node_handle = state.node_handle.lock().await;
if let Some(ref handle) = *node_handle {
let msg_topic = gossip::topic_for_messages(&community_id, &channel_id);
let _ = handle
.command_tx
.send(NodeCommand::Unsubscribe { topic: msg_topic })
.await;
let typing_topic = gossip::topic_for_typing(&community_id, &channel_id);
let _ = handle
.command_tx
.send(NodeCommand::Unsubscribe {
topic: typing_topic,
})
.await;
}
drop(node_handle);
let mut engine = state.crdt_engine.lock().await;
engine.delete_channel(&community_id, &channel_id)?;
drop(engine);
broadcast_sync(&state, &community_id).await;
Ok(())
}
#[tauri::command]
pub async fn delete_category(
state: State<'_, AppState>,
community_id: String,
category_id: String,
) -> Result<(), String> {
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
let requester_id = id.peer_id.to_string();
drop(identity);
let engine = state.crdt_engine.lock().await;
let members = engine.get_members(&community_id)?;
check_permission(&members, &requester_id, &["owner", "admin"])?;
drop(engine);
let mut engine = state.crdt_engine.lock().await;
engine.delete_category(&community_id, &category_id)?;
drop(engine);
broadcast_sync(&state, &community_id).await;
Ok(())
}
#[tauri::command]
pub async fn set_member_role(
state: State<'_, AppState>,
community_id: String,
member_peer_id: String,
role: String,
) -> Result<(), String> {
if role != "admin" && role != "member" {
return Err("invalid role: must be 'admin' or 'member'".to_string());
}
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
let requester_id = id.peer_id.to_string();
drop(identity);
// only the owner can change roles
let engine = state.crdt_engine.lock().await;
let members = engine.get_members(&community_id)?;
check_permission(&members, &requester_id, &["owner"])?;
// cannot change the owner's own role through this command
let target = members
.iter()
.find(|m| m.peer_id == member_peer_id)
.ok_or("member not found")?;
if target.roles.iter().any(|r| r == "owner") {
return Err("cannot change the owner's role, use transfer_ownership instead".to_string());
}
drop(engine);
let mut engine = state.crdt_engine.lock().await;
engine.set_member_role(&community_id, &member_peer_id, &[role])?;
drop(engine);
broadcast_sync(&state, &community_id).await;
Ok(())
}
#[tauri::command]
pub async fn transfer_ownership(
state: State<'_, AppState>,
community_id: String,
new_owner_peer_id: String,
) -> Result<(), String> {
let identity = state.identity.lock().await;
let id = identity.as_ref().ok_or("no identity loaded")?;
let requester_id = id.peer_id.to_string();
drop(identity);
let engine = state.crdt_engine.lock().await;
let members = engine.get_members(&community_id)?;
check_permission(&members, &requester_id, &["owner"])?;
if requester_id == new_owner_peer_id {
return Err("cannot transfer ownership to yourself".to_string());
}
// verify the target is actually a member
members
.iter()
.find(|m| m.peer_id == new_owner_peer_id)
.ok_or("target member not found in community")?;
drop(engine);
let mut engine = state.crdt_engine.lock().await;
engine.transfer_ownership(&community_id, &requester_id, &new_owner_peer_id)?;
let meta = engine.get_community_meta(&community_id)?;
let _ = state.storage.save_community_meta(&meta);
drop(engine);
broadcast_sync(&state, &community_id).await;
Ok(())
}