feat: implement pending join role management for community invites
This commit is contained in:
parent
11a987e0de
commit
70377f13b8
|
|
@ -32,6 +32,7 @@ pub async fn start_node(app: tauri::AppHandle, state: State<'_, AppState>) -> Re
|
||||||
state.storage.clone(),
|
state.storage.clone(),
|
||||||
app,
|
app,
|
||||||
state.voice_channels.clone(),
|
state.voice_channels.clone(),
|
||||||
|
state.pending_join_role_guard.clone(),
|
||||||
custom_relay,
|
custom_relay,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
|
||||||
|
|
@ -177,19 +177,38 @@ pub async fn join_community(
|
||||||
) -> Result<CommunityMeta, String> {
|
) -> Result<CommunityMeta, String> {
|
||||||
let invite = crate::protocol::community::InviteCode::decode(&invite_code)?;
|
let invite = crate::protocol::community::InviteCode::decode(&invite_code)?;
|
||||||
|
|
||||||
let identity = state.identity.lock().await;
|
let local_peer_id = {
|
||||||
if identity.is_none() {
|
let identity = state.identity.lock().await;
|
||||||
return Err("no identity loaded".to_string());
|
let id = identity.as_ref().ok_or("no identity loaded")?;
|
||||||
}
|
id.peer_id.to_string()
|
||||||
drop(identity);
|
};
|
||||||
|
|
||||||
// create a placeholder document that will be backfilled via crdt sync
|
// create a placeholder document that will be backfilled via crdt sync
|
||||||
// once we connect to existing community members through the relay
|
// once we connect to existing community members through the relay
|
||||||
let mut engine = state.crdt_engine.lock().await;
|
let mut engine = state.crdt_engine.lock().await;
|
||||||
if !engine.has_community(&invite.community_id) {
|
let had_existing_doc = engine.has_community(&invite.community_id);
|
||||||
|
if !had_existing_doc {
|
||||||
engine.create_placeholder_community(&invite.community_id, &invite.community_name, "")?;
|
engine.create_placeholder_community(&invite.community_id, &invite.community_name, "")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// joining via invite must never keep elevated local roles from stale local docs
|
||||||
|
if had_existing_doc {
|
||||||
|
if let Ok(members) = engine.get_members(&invite.community_id) {
|
||||||
|
let local_has_elevated_role = members.iter().any(|member| {
|
||||||
|
member.peer_id == local_peer_id
|
||||||
|
&& member
|
||||||
|
.roles
|
||||||
|
.iter()
|
||||||
|
.any(|role| role == "owner" || role == "admin")
|
||||||
|
});
|
||||||
|
|
||||||
|
if local_has_elevated_role {
|
||||||
|
let roles = vec!["member".to_string()];
|
||||||
|
let _ = engine.set_member_role(&invite.community_id, &local_peer_id, &roles);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let meta = engine.get_community_meta(&invite.community_id)?;
|
let meta = engine.get_community_meta(&invite.community_id)?;
|
||||||
let _ = state.storage.save_community_meta(&meta);
|
let _ = state.storage.save_community_meta(&meta);
|
||||||
|
|
||||||
|
|
@ -199,6 +218,12 @@ pub async fn join_community(
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
drop(engine);
|
drop(engine);
|
||||||
|
|
||||||
|
// mark this community for one-time role hardening on first sync merge
|
||||||
|
{
|
||||||
|
let mut guard = state.pending_join_role_guard.lock().await;
|
||||||
|
guard.insert(invite.community_id.clone());
|
||||||
|
}
|
||||||
|
|
||||||
let node_handle = state.node_handle.lock().await;
|
let node_handle = state.node_handle.lock().await;
|
||||||
if let Some(ref handle) = *node_handle {
|
if let Some(ref handle) = *node_handle {
|
||||||
// subscribe to the community presence topic
|
// subscribe to the community presence topic
|
||||||
|
|
@ -318,6 +343,10 @@ pub async fn leave_community(
|
||||||
// remove local cached community state so leave persists across restarts
|
// remove local cached community state so leave persists across restarts
|
||||||
let mut engine = state.crdt_engine.lock().await;
|
let mut engine = state.crdt_engine.lock().await;
|
||||||
engine.remove_community(&community_id)?;
|
engine.remove_community(&community_id)?;
|
||||||
|
drop(engine);
|
||||||
|
|
||||||
|
let mut guard = state.pending_join_role_guard.lock().await;
|
||||||
|
guard.remove(&community_id);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -397,6 +397,11 @@ pub async fn reset_identity(state: State<'_, AppState>) -> Result<(), String> {
|
||||||
engine.clear();
|
engine.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut guard = state.pending_join_role_guard.lock().await;
|
||||||
|
guard.clear();
|
||||||
|
}
|
||||||
|
|
||||||
// clear in-memory identity
|
// clear in-memory identity
|
||||||
*identity = None;
|
*identity = None;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@
|
||||||
//
|
//
|
||||||
// NEVER enable this in production builds.
|
// NEVER enable this in production builds.
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
|
@ -38,6 +38,7 @@ pub struct DevState {
|
||||||
pub storage: Arc<DiskStorage>,
|
pub storage: Arc<DiskStorage>,
|
||||||
pub node_handle: Arc<Mutex<Option<crate::node::NodeHandle>>>,
|
pub node_handle: Arc<Mutex<Option<crate::node::NodeHandle>>>,
|
||||||
pub voice_channels: Arc<Mutex<HashMap<String, Vec<VoiceParticipant>>>>,
|
pub voice_channels: Arc<Mutex<HashMap<String, Vec<VoiceParticipant>>>>,
|
||||||
|
pub pending_join_role_guard: Arc<Mutex<HashSet<String>>>,
|
||||||
pub app_handle: tauri::AppHandle,
|
pub app_handle: tauri::AppHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -466,22 +467,40 @@ async fn join_community(
|
||||||
let invite = crate::protocol::community::InviteCode::decode(&body.invite_code)
|
let invite = crate::protocol::community::InviteCode::decode(&body.invite_code)
|
||||||
.map_err(|e| ApiError(StatusCode::BAD_REQUEST, e))?;
|
.map_err(|e| ApiError(StatusCode::BAD_REQUEST, e))?;
|
||||||
|
|
||||||
let identity = state.identity.lock().await;
|
let local_peer_id = {
|
||||||
if identity.is_none() {
|
let identity = state.identity.lock().await;
|
||||||
return Err(ApiError(
|
let id = identity
|
||||||
StatusCode::UNAUTHORIZED,
|
.as_ref()
|
||||||
"no identity loaded".into(),
|
.ok_or_else(|| ApiError(StatusCode::UNAUTHORIZED, "no identity loaded".into()))?;
|
||||||
));
|
id.peer_id.to_string()
|
||||||
}
|
};
|
||||||
drop(identity);
|
|
||||||
|
|
||||||
let mut engine = state.crdt_engine.lock().await;
|
let mut engine = state.crdt_engine.lock().await;
|
||||||
if !engine.has_community(&invite.community_id) {
|
let had_existing_doc = engine.has_community(&invite.community_id);
|
||||||
|
if !had_existing_doc {
|
||||||
engine
|
engine
|
||||||
.create_placeholder_community(&invite.community_id, &invite.community_name, "")
|
.create_placeholder_community(&invite.community_id, &invite.community_name, "")
|
||||||
.map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
.map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// joining via invite must never keep elevated local roles from stale local docs
|
||||||
|
if had_existing_doc {
|
||||||
|
if let Ok(members) = engine.get_members(&invite.community_id) {
|
||||||
|
let local_has_elevated_role = members.iter().any(|member| {
|
||||||
|
member.peer_id == local_peer_id
|
||||||
|
&& member
|
||||||
|
.roles
|
||||||
|
.iter()
|
||||||
|
.any(|role| role == "owner" || role == "admin")
|
||||||
|
});
|
||||||
|
|
||||||
|
if local_has_elevated_role {
|
||||||
|
let roles = vec!["member".to_string()];
|
||||||
|
let _ = engine.set_member_role(&invite.community_id, &local_peer_id, &roles);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let meta = engine
|
let meta = engine
|
||||||
.get_community_meta(&invite.community_id)
|
.get_community_meta(&invite.community_id)
|
||||||
.map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
.map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
||||||
|
|
@ -492,6 +511,12 @@ async fn join_community(
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
drop(engine);
|
drop(engine);
|
||||||
|
|
||||||
|
// mark this community for one-time role hardening on first sync merge
|
||||||
|
{
|
||||||
|
let mut guard = state.pending_join_role_guard.lock().await;
|
||||||
|
guard.insert(invite.community_id.clone());
|
||||||
|
}
|
||||||
|
|
||||||
// subscribe and discover via rendezvous
|
// subscribe and discover via rendezvous
|
||||||
let node_handle = state.node_handle.lock().await;
|
let node_handle = state.node_handle.lock().await;
|
||||||
if let Some(ref handle) = *node_handle {
|
if let Some(ref handle) = *node_handle {
|
||||||
|
|
@ -606,6 +631,10 @@ async fn leave_community(
|
||||||
engine
|
engine
|
||||||
.remove_community(&community_id)
|
.remove_community(&community_id)
|
||||||
.map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
.map_err(|e| ApiError(StatusCode::INTERNAL_SERVER_ERROR, e))?;
|
||||||
|
drop(engine);
|
||||||
|
|
||||||
|
let mut guard = state.pending_join_role_guard.lock().await;
|
||||||
|
guard.remove(&community_id);
|
||||||
|
|
||||||
Ok(Json(serde_json::json!({ "ok": true })))
|
Ok(Json(serde_json::json!({ "ok": true })))
|
||||||
}
|
}
|
||||||
|
|
@ -1052,6 +1081,7 @@ async fn start_node(State(state): State<DevState>) -> ApiResult<serde_json::Valu
|
||||||
state.storage.clone(),
|
state.storage.clone(),
|
||||||
state.app_handle.clone(),
|
state.app_handle.clone(),
|
||||||
state.voice_channels.clone(),
|
state.voice_channels.clone(),
|
||||||
|
state.pending_join_role_guard.clone(),
|
||||||
custom_relay,
|
custom_relay,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ mod protocol;
|
||||||
mod storage;
|
mod storage;
|
||||||
mod verification;
|
mod verification;
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
|
@ -24,6 +24,8 @@ pub struct AppState {
|
||||||
pub node_handle: Arc<Mutex<Option<node::NodeHandle>>>,
|
pub node_handle: Arc<Mutex<Option<node::NodeHandle>>>,
|
||||||
// tracks which peers are in which voice channels, keyed by "community_id:channel_id"
|
// tracks which peers are in which voice channels, keyed by "community_id:channel_id"
|
||||||
pub voice_channels: Arc<Mutex<HashMap<String, Vec<VoiceParticipant>>>>,
|
pub voice_channels: Arc<Mutex<HashMap<String, Vec<VoiceParticipant>>>>,
|
||||||
|
// communities joined via invite that require initial role hardening
|
||||||
|
pub pending_join_role_guard: Arc<Mutex<HashSet<String>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AppState {
|
impl AppState {
|
||||||
|
|
@ -44,6 +46,7 @@ impl AppState {
|
||||||
storage,
|
storage,
|
||||||
node_handle: Arc::new(Mutex::new(None)),
|
node_handle: Arc::new(Mutex::new(None)),
|
||||||
voice_channels: Arc::new(Mutex::new(HashMap::new())),
|
voice_channels: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
pending_join_role_guard: Arc::new(Mutex::new(HashSet::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -89,6 +92,7 @@ pub fn run() {
|
||||||
storage: std::sync::Arc::clone(&state.storage),
|
storage: std::sync::Arc::clone(&state.storage),
|
||||||
node_handle: std::sync::Arc::clone(&state.node_handle),
|
node_handle: std::sync::Arc::clone(&state.node_handle),
|
||||||
voice_channels: std::sync::Arc::clone(&state.voice_channels),
|
voice_channels: std::sync::Arc::clone(&state.voice_channels),
|
||||||
|
pending_join_role_guard: std::sync::Arc::clone(&state.pending_join_role_guard),
|
||||||
app_handle: app.handle().clone(),
|
app_handle: app.handle().clone(),
|
||||||
};
|
};
|
||||||
tauri::async_runtime::spawn(dev_server::start(dev_state));
|
tauri::async_runtime::spawn(dev_server::start(dev_state));
|
||||||
|
|
|
||||||
|
|
@ -245,6 +245,7 @@ pub async fn start(
|
||||||
storage: Arc<crate::storage::DiskStorage>,
|
storage: Arc<crate::storage::DiskStorage>,
|
||||||
app_handle: tauri::AppHandle,
|
app_handle: tauri::AppHandle,
|
||||||
voice_channels: VoiceChannelMap,
|
voice_channels: VoiceChannelMap,
|
||||||
|
pending_join_role_guard: Arc<Mutex<HashSet<String>>>,
|
||||||
custom_relay_addr: Option<String>,
|
custom_relay_addr: Option<String>,
|
||||||
) -> Result<NodeHandle, String> {
|
) -> Result<NodeHandle, String> {
|
||||||
let mut swarm_instance =
|
let mut swarm_instance =
|
||||||
|
|
@ -390,10 +391,61 @@ pub async fn start(
|
||||||
} else {
|
} else {
|
||||||
Vec::new()
|
Vec::new()
|
||||||
};
|
};
|
||||||
|
let mut corrected_local_role = false;
|
||||||
|
let mut corrected_doc_bytes: Option<Vec<u8>> = None;
|
||||||
|
if merge_result.is_ok() {
|
||||||
|
let should_harden_join_role = {
|
||||||
|
let guard = pending_join_role_guard.lock().await;
|
||||||
|
guard.contains(&community_id)
|
||||||
|
};
|
||||||
|
|
||||||
|
if should_harden_join_role {
|
||||||
|
let local_peer_id = swarm_instance.local_peer_id().to_string();
|
||||||
|
let local_has_elevated_role = engine
|
||||||
|
.get_members(&community_id)
|
||||||
|
.map(|members| {
|
||||||
|
members.iter().any(|member| {
|
||||||
|
member.peer_id == local_peer_id
|
||||||
|
&& member.roles.iter().any(|role| role == "owner" || role == "admin")
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.unwrap_or(false);
|
||||||
|
|
||||||
|
if local_has_elevated_role {
|
||||||
|
let roles = vec!["member".to_string()];
|
||||||
|
if engine.set_member_role(&community_id, &local_peer_id, &roles).is_ok() {
|
||||||
|
corrected_local_role = true;
|
||||||
|
corrected_doc_bytes = engine.get_doc_bytes(&community_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut guard = pending_join_role_guard.lock().await;
|
||||||
|
guard.remove(&community_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
drop(engine);
|
drop(engine);
|
||||||
|
|
||||||
match merge_result {
|
match merge_result {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
|
if let Some(doc_bytes) = corrected_doc_bytes {
|
||||||
|
let corrected_snapshot = crate::crdt::sync::DocumentSnapshot {
|
||||||
|
community_id: community_id.clone(),
|
||||||
|
doc_bytes,
|
||||||
|
};
|
||||||
|
let corrected_offer = crate::crdt::sync::SyncMessage::DocumentOffer(corrected_snapshot);
|
||||||
|
if let Ok(data) = serde_json::to_vec(&corrected_offer) {
|
||||||
|
let sync_topic = libp2p::gossipsub::IdentTopic::new(gossip::topic_for_sync());
|
||||||
|
let _ = swarm_instance.behaviour_mut().gossipsub.publish(sync_topic, data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if corrected_local_role {
|
||||||
|
log::warn!(
|
||||||
|
"downgraded local elevated role to member during invite join sync for {}",
|
||||||
|
community_id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// keep topic subscriptions aligned with merged channels
|
// keep topic subscriptions aligned with merged channels
|
||||||
let presence_topic = libp2p::gossipsub::IdentTopic::new(
|
let presence_topic = libp2p::gossipsub::IdentTopic::new(
|
||||||
gossip::topic_for_presence(&community_id),
|
gossip::topic_for_presence(&community_id),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue