1use serde::Serialize;
3use zoe_wire_protocol::{KeyPair, MessageId, VerifyingKey};
4use zoe_app_primitives::{GroupInfo, IdentityRef};
8use std::{collections::HashMap, sync::Arc};
10
11use zoe_wire_protocol::{Kind, Message, MessageFull, Tag};
12
13use crate::{
14 error::{GroupError, GroupResult},
15 state::GroupSession,
16 state::encrypt_group_event_content,
17};
18use zoe_app_primitives::{GroupActivityEvent, GroupKeyInfo, events::roles::GroupRole};
19
20use async_broadcast::{Receiver, Sender};
21use tokio::sync::RwLock;
22
23use zoe_app_primitives::{GroupState, GroupStateError};
25use zoe_wire_protocol::{Content, EncryptionKey, MnemonicPhrase};
26
27#[cfg(feature = "frb-api")]
28use flutter_rust_bridge::frb;
29
30#[cfg_attr(feature = "frb-api", frb(ignore))]
31#[derive(Debug, Clone)]
32pub enum GroupDataUpdate {
33 GroupAdded(GroupSession),
34 GroupUpdated(GroupSession),
35 GroupRemoved(GroupSession),
36}
37
38#[cfg_attr(feature = "frb-api", frb(opaque))]
40#[derive(Debug, Clone)]
41pub struct GroupManager {
42 pub(crate) groups: Arc<RwLock<HashMap<MessageId, GroupSession>>>,
45
46 broadcast_channel: Arc<Sender<GroupDataUpdate>>,
47 _broadcast_keeper: Arc<async_broadcast::InactiveReceiver<GroupDataUpdate>>,
50}
51
52#[cfg_attr(feature = "frb-api", frb(ignore))]
54#[derive(Debug, Clone)]
55pub struct CreateGroupResult {
56 pub group_id: MessageId,
59 pub message: MessageFull,
61}
62pub struct GroupManagerBuilder {
63 sessions: Vec<GroupSession>,
64}
65
66impl GroupManagerBuilder {
67 pub fn with_sessions(mut self, sessions: Vec<GroupSession>) -> Self {
68 self.sessions = sessions;
69 self
70 }
71
72 pub fn build(self) -> GroupManager {
73 let GroupManagerBuilder { sessions } = self;
74 let (tx, rx) = async_broadcast::broadcast(1000);
75 let broadcast_keeper = rx.deactivate();
76 GroupManager {
77 groups: Arc::new(RwLock::new(HashMap::from_iter(
78 sessions
79 .into_iter()
80 .map(|session| (session.state.group_id, session)),
81 ))),
82 broadcast_channel: Arc::new(tx),
83 _broadcast_keeper: Arc::new(broadcast_keeper),
84 }
85 }
86}
87
88#[cfg_attr(feature = "frb-api", frb)]
89impl GroupManager {
90 pub fn generate_group_key(timestamp: u64) -> EncryptionKey {
92 EncryptionKey::generate(timestamp)
93 }
94 pub async fn group_state(&self, group_id: &MessageId) -> Option<GroupState> {
96 let groups = self.groups.read().await;
97 groups.get(group_id).map(|session| session.state.clone())
98 }
99 pub async fn group_session(&self, group_id: &MessageId) -> Option<GroupSession> {
101 let groups = self.groups.read().await;
102 groups.get(group_id).cloned()
103 }
104
105 pub async fn all_group_sessions(&self) -> HashMap<MessageId, GroupSession> {
107 let groups = self.groups.read().await;
108 groups.clone()
109 }
110
111 pub async fn all_groups(&self) -> HashMap<MessageId, GroupState> {
113 let groups = self.groups.read().await;
114 groups
115 .iter()
116 .map(|(id, session)| (*id, session.state.clone()))
117 .collect()
118 }
119
120 pub async fn is_member(&self, group_id: &MessageId, user: &VerifyingKey) -> bool {
122 let groups = self.groups.read().await;
123 groups
124 .get(group_id)
125 .map(|session| session.state.is_member(user))
126 .unwrap_or(false)
127 }
128
129 pub async fn member_role(
131 &self,
132 group_id: &MessageId,
133 user: &VerifyingKey,
134 ) -> Option<GroupRole> {
135 let groups = self.groups.read().await;
136 groups
137 .get(group_id)
138 .and_then(|session| session.state.member_role(user).cloned())
139 }
140}
141
142#[cfg_attr(feature = "frb-api", frb(ignore))]
143impl GroupManager {
144 pub fn builder() -> GroupManagerBuilder {
146 GroupManagerBuilder { sessions: vec![] }
147 }
148
149 pub fn create_key_from_mnemonic(
151 mnemonic: &MnemonicPhrase,
152 passphrase: &str,
153 group_name: &str,
154 timestamp: u64,
155 ) -> GroupResult<EncryptionKey> {
156 let context = format!("dga-group-{group_name}");
157
158 EncryptionKey::from_mnemonic(mnemonic, passphrase, &context, timestamp)
159 .map_err(|e| GroupError::CryptoError(format!("Key derivation failed: {e}")))
160 }
161
162 pub fn recover_key_from_mnemonic(
164 mnemonic: &MnemonicPhrase,
165 passphrase: &str,
166 group_name: &str,
167 salt: &[u8; 32],
168 timestamp: u64,
169 ) -> GroupResult<EncryptionKey> {
170 let context = format!("dga-group-{group_name}");
171
172 EncryptionKey::from_mnemonic_with_salt(mnemonic, passphrase, &context, salt, timestamp)
173 .map_err(|e| GroupError::CryptoError(format!("Key recovery failed: {e}")))
174 }
175
176 pub async fn create_group(
178 &self,
179 create_group: zoe_app_primitives::CreateGroup,
180 encryption_key: Option<EncryptionKey>,
181 creator: &KeyPair,
182 timestamp: u64,
183 ) -> GroupResult<CreateGroupResult> {
184 let encryption_key = encryption_key.unwrap_or_else(|| Self::generate_group_key(timestamp));
186
187 let mut group_info = create_group.into_group_info();
189
190 group_info.key_info = GroupKeyInfo::ChaCha20Poly1305 {
192 key_id: encryption_key.key_id.clone(),
193 derivation_info: encryption_key.derivation_info.clone().unwrap_or_else(|| {
194 zoe_wire_protocol::crypto::KeyDerivationInfo {
196 method: zoe_wire_protocol::crypto::KeyDerivationMethod::ChaCha20Poly1305Keygen,
197 salt: vec![],
198 argon2_params: zoe_wire_protocol::crypto::Argon2Params::default(),
199 context: "dga-group-key".to_string(),
200 }
201 }),
202 };
203
204 let event: GroupActivityEvent<()> = GroupActivityEvent::UpdateGroup(group_info.clone());
205
206 let encrypted_payload = encrypt_group_event_content(&encryption_key, &event)?;
208
209 let message = Message::new_v0_encrypted(
211 encrypted_payload,
212 creator.public_key(),
213 timestamp,
214 Kind::Regular, vec![], );
217
218 let message_full = MessageFull::new(message, creator)?;
220 let group_id = message_full.id(); let group_state = GroupState::new(
224 *group_id,
225 group_info.name.clone(),
226 group_info.settings.clone(),
227 group_info.metadata.clone(),
228 creator.public_key(),
229 timestamp,
230 );
231
232 let group_session = GroupSession::new(group_state, encryption_key);
234
235 self.groups
237 .write()
238 .await
239 .insert(*group_id, group_session.clone());
240
241 if let Err(e) = self
243 .broadcast_channel
244 .try_broadcast(GroupDataUpdate::GroupAdded(group_session))
245 {
246 tracing::error!(error=?e, "Failed to broadcast group addition");
247 }
248
249 Ok(CreateGroupResult {
250 group_id: *group_id,
251 message: message_full,
252 })
253 }
254
255 pub async fn create_group_event_message<T>(
258 &self,
259 group_id: MessageId,
260 event: GroupActivityEvent<T>,
261 sender: &KeyPair,
262 timestamp: u64,
263 ) -> GroupResult<MessageFull>
264 where
265 T: Serialize,
266 {
267 let groups = self.groups.read().await;
269 let group_session = groups
270 .get(&group_id)
271 .ok_or_else(|| GroupError::GroupNotFound(format!("{group_id:?}")))?;
272
273 let encrypted_payload = group_session.encrypt_group_event_content(&event)?;
275
276 let message = Message::new_v0_encrypted(
278 encrypted_payload,
279 sender.public_key(),
280 timestamp,
281 Kind::Regular,
282 vec![Tag::Event {
283 id: group_id, relays: vec![], }],
286 );
287
288 Ok(MessageFull::new(message, sender)?)
290 }
291
292 pub async fn process_group_event(&self, message_full: &MessageFull) -> GroupResult<()> {
294 let Message::MessageV0(message) = message_full.message();
296
297 let Content::ChaCha20Poly1305(encrypted_payload) = &message.content else {
299 return Err(GroupError::InvalidEvent(
300 "Message is not a ChaCha20Poly1305 encrypted message".to_string(),
301 ));
302 };
303
304 let sender = &message.sender;
305 let timestamp = message.when;
306
307 let (group_id, event) = if message.tags.is_empty() {
309 let group_id = message_full.id();
311
312 let groups = self.groups.read().await;
314 let group_session = groups.get(group_id).ok_or_else(|| {
315 GroupError::InvalidEvent(format!(
316 "No group session available for group {group_id:?}"
317 ))
318 })?;
319
320 let event = group_session.decrypt_group_event::<()>(encrypted_payload)?;
321 (*group_id, event)
322 } else {
323 let group_id = self.find_group_by_event_tag(&message.tags).await?;
325
326 let groups = self.groups.read().await;
328 let group_session = groups.get(&group_id).ok_or_else(|| {
329 GroupError::InvalidEvent(format!(
330 "No group session available for group {group_id:?}"
331 ))
332 })?;
333
334 let event = group_session.decrypt_group_event(encrypted_payload)?;
335 (group_id, event)
336 };
337
338 if let GroupActivityEvent::UpdateGroup(group_info) = &event {
340 let mut groups = self.groups.write().await;
343 if let Some(group_session) = groups.get_mut(&group_id) {
344 group_session.state = GroupState::new(
346 group_id,
347 group_info.name.clone(),
348 group_info.settings.clone(),
349 group_info.metadata.clone(),
350 sender.clone(),
351 timestamp,
352 );
353
354 let _ = self
356 .broadcast_channel
357 .try_broadcast(GroupDataUpdate::GroupUpdated(group_session.clone()));
358 }
359 return Ok(());
360 }
361
362 let mut groups = self.groups.write().await;
364 let group_session = groups
365 .get_mut(&group_id)
366 .ok_or_else(|| GroupError::GroupNotFound(format!("{group_id:?}")))?;
367
368 group_session
370 .state
371 .apply_event(&event, *message_full.id(), sender.clone(), timestamp)
372 .map_err(|e| match e {
373 GroupStateError::PermissionDenied(msg) => GroupError::PermissionDenied(msg),
374 GroupStateError::MemberNotFound { member, group } => {
375 GroupError::MemberNotFound { member, group }
376 }
377 GroupStateError::StateTransition(msg) => GroupError::StateTransition(msg),
378 GroupStateError::InvalidOperation(msg) => GroupError::InvalidOperation(msg),
379 })?;
380
381 let _ = self
383 .broadcast_channel
384 .try_broadcast(GroupDataUpdate::GroupUpdated(group_session.clone()));
385
386 Ok(())
387 }
388
389 async fn find_group_by_event_tag(&self, tags: &[Tag]) -> GroupResult<MessageId> {
391 let groups = self.groups.read().await;
392 for tag in tags {
393 if let Tag::Event { id, .. } = tag
394 && groups.contains_key(id)
395 {
396 return Ok(*id);
397 }
398 }
399 Err(GroupError::InvalidEvent(
400 "No valid group channel tag found".to_string(),
401 ))
402 }
403
404 pub async fn user_groups(&self, user: &VerifyingKey) -> Vec<GroupState> {
406 let groups = self.groups.read().await;
407 groups
408 .values()
409 .filter(|session| session.state.is_member(user))
410 .map(|session| session.state.clone())
411 .collect()
412 }
413
414 pub async fn add_group_session(&self, group_id: MessageId, session: GroupSession) {
416 self.groups.write().await.insert(group_id, session.clone());
417 let _ = self
418 .broadcast_channel
419 .try_broadcast(GroupDataUpdate::GroupAdded(session));
420 }
421
422 pub async fn remove_group_session(&self, group_id: &MessageId) -> Option<GroupSession> {
424 if let Some(session) = self.groups.write().await.remove(group_id) {
425 let _ = self
426 .broadcast_channel
427 .try_broadcast(GroupDataUpdate::GroupRemoved(session.clone()));
428 Some(session)
429 } else {
430 None
431 }
432 }
433
434 pub async fn rotate_group_key(
436 &self,
437 group_id: &MessageId,
438 new_key: EncryptionKey,
439 ) -> GroupResult<()> {
440 let mut groups = self.groups.write().await;
441 let session = groups
442 .get_mut(group_id)
443 .ok_or_else(|| GroupError::GroupNotFound(format!("{group_id:?}")))?;
444
445 session.rotate_key(new_key);
446 let _ = self
447 .broadcast_channel
448 .try_broadcast(GroupDataUpdate::GroupUpdated(session.clone()));
449 Ok(())
450 }
451
452 pub fn subscribe_to_updates(&self) -> Receiver<GroupDataUpdate> {
454 self.broadcast_channel.new_receiver()
455 }
456
457 pub async fn create_group_subscription_filter(&self, group_id: &MessageId) -> GroupResult<Tag> {
460 let groups = self.groups.read().await;
462 if !groups.contains_key(group_id) {
463 return Err(GroupError::GroupNotFound(format!("{group_id:?}")));
464 }
465
466 Ok(Tag::Event {
467 id: *group_id, relays: vec![], })
470 }
471}
472
473impl Default for GroupManager {
474 fn default() -> Self {
475 Self::builder().build()
476 }
477}
478
479pub fn create_leave_group_event<T>(message: Option<String>) -> GroupActivityEvent<T> {
483 GroupActivityEvent::LeaveGroup { message }
484}
485
486pub fn create_role_update_event<T>(member: VerifyingKey, role: GroupRole) -> GroupActivityEvent<T> {
491 GroupActivityEvent::AssignRole {
493 target: IdentityRef::Key(member),
494 role,
495 }
496}
497
498pub fn create_group_activity_event<T>(activity_data: T) -> GroupActivityEvent<T> {
500 GroupActivityEvent::Activity(activity_data)
501}
502
503pub fn create_group_update_event<T>(group_info: GroupInfo) -> GroupActivityEvent<T> {
505 GroupActivityEvent::UpdateGroup(group_info)
506}