zoe_state_machine/
group.rs

1// ChaCha20-Poly1305 and AES-GCM functionality moved to crypto module
2use serde::Serialize;
3use zoe_wire_protocol::{KeyPair, MessageId, VerifyingKey};
4// Random number generation imports removed - no longer needed
5// Temporary import for Ed25519 workaround in create_role_update_event
6
7use zoe_app_primitives::{GroupInfo, IdentityRef};
8// Random number generation moved to wire-protocol crypto module
9use std::{collections::HashMap, sync::Arc};
10
11use zoe_wire_protocol::{Kind, Message, MessageFull, Tag};
12
13use crate::{
14    error::{GroupError, GroupResult},
15    state::GroupSession,
16    state::encrypt_group_event_content,
17};
18use zoe_app_primitives::{GroupActivityEvent, GroupKeyInfo, events::roles::GroupRole};
19
20use async_broadcast::{Receiver, Sender};
21use tokio::sync::RwLock;
22
23// Import the unified GroupState from app-primitives
24use zoe_app_primitives::{GroupState, GroupStateError};
25use zoe_wire_protocol::{Content, EncryptionKey, MnemonicPhrase};
26
27#[cfg(feature = "frb-api")]
28use flutter_rust_bridge::frb;
29
/// Change notification published on the manager's broadcast channel.
///
/// Every variant carries a clone of the affected [`GroupSession`] as it was
/// at the moment the notification was sent.
#[cfg_attr(feature = "frb-api", frb(ignore))]
#[derive(Debug, Clone)]
pub enum GroupDataUpdate {
    /// A session was inserted into the manager (sent by `create_group` and
    /// `add_group_session`).
    GroupAdded(GroupSession),
    /// An existing session changed (sent after an event was applied to its
    /// state or after its key was rotated).
    GroupUpdated(GroupSession),
    /// A session was removed from the manager; carries the removed session.
    GroupRemoved(GroupSession),
}
37
/// Digital Group Assistant - manages encrypted groups using the wire protocol
///
/// All fields are `Arc`-wrapped, so clones of a `GroupManager` share the same
/// session map and the same broadcast channel.
#[cfg_attr(feature = "frb-api", frb(opaque))]
#[derive(Debug, Clone)]
pub struct GroupManager {
    /// All group states managed by this DGA instance
    /// Key is the Blake3 hash of the CreateGroup message (which serves as both group ID and root event ID)
    pub(crate) groups: Arc<RwLock<HashMap<MessageId, GroupSession>>>,

    /// Sending half of the channel on which [`GroupDataUpdate`] notifications
    /// are published; receivers are handed out by `subscribe_to_updates`.
    broadcast_channel: Arc<Sender<GroupDataUpdate>>,
    /// Keeper receiver to prevent broadcast channel closure (not actively used)
    /// Arc-wrapped to ensure channel stays open even when GroupManager instances are cloned and dropped
    _broadcast_keeper: Arc<async_broadcast::InactiveReceiver<GroupDataUpdate>>,
}
51
/// Result of creating a new group
///
/// Returned by `GroupManager::create_group`; the caller is expected to send
/// `message` over the wire, since the manager only stores the session locally.
#[cfg_attr(feature = "frb-api", frb(ignore))]
#[derive(Debug, Clone)]
pub struct CreateGroupResult {
    /// The created group's unique identifier (Blake3 hash of the CreateGroup message)
    /// This is also the root event ID used as channel tag for subsequent events
    pub group_id: MessageId,
    /// The full (signed) message that was created
    pub message: MessageFull,
}
62pub struct GroupManagerBuilder {
63    sessions: Vec<GroupSession>,
64}
65
66impl GroupManagerBuilder {
67    pub fn with_sessions(mut self, sessions: Vec<GroupSession>) -> Self {
68        self.sessions = sessions;
69        self
70    }
71
72    pub fn build(self) -> GroupManager {
73        let GroupManagerBuilder { sessions } = self;
74        let (tx, rx) = async_broadcast::broadcast(1000);
75        let broadcast_keeper = rx.deactivate();
76        GroupManager {
77            groups: Arc::new(RwLock::new(HashMap::from_iter(
78                sessions
79                    .into_iter()
80                    .map(|session| (session.state.group_id, session)),
81            ))),
82            broadcast_channel: Arc::new(tx),
83            _broadcast_keeper: Arc::new(broadcast_keeper),
84        }
85    }
86}
87
88#[cfg_attr(feature = "frb-api", frb)]
89impl GroupManager {
90    /// Generate a new encryption key for a group (ChaCha20-Poly1305)
91    pub fn generate_group_key(timestamp: u64) -> EncryptionKey {
92        EncryptionKey::generate(timestamp)
93    }
94    /// Get a group's current state
95    pub async fn group_state(&self, group_id: &MessageId) -> Option<GroupState> {
96        let groups = self.groups.read().await;
97        groups.get(group_id).map(|session| session.state.clone())
98    }
99    /// Get a group session (state + encryption)
100    pub async fn group_session(&self, group_id: &MessageId) -> Option<GroupSession> {
101        let groups = self.groups.read().await;
102        groups.get(group_id).cloned()
103    }
104
105    /// Get all managed group sessions
106    pub async fn all_group_sessions(&self) -> HashMap<MessageId, GroupSession> {
107        let groups = self.groups.read().await;
108        groups.clone()
109    }
110
111    /// Get all managed groups (state only, for backward compatibility)
112    pub async fn all_groups(&self) -> HashMap<MessageId, GroupState> {
113        let groups = self.groups.read().await;
114        groups
115            .iter()
116            .map(|(id, session)| (*id, session.state.clone()))
117            .collect()
118    }
119
120    /// Check if a user is a member of a specific group
121    pub async fn is_member(&self, group_id: &MessageId, user: &VerifyingKey) -> bool {
122        let groups = self.groups.read().await;
123        groups
124            .get(group_id)
125            .map(|session| session.state.is_member(user))
126            .unwrap_or(false)
127    }
128
129    /// Get a user's role in a specific group
130    pub async fn member_role(
131        &self,
132        group_id: &MessageId,
133        user: &VerifyingKey,
134    ) -> Option<GroupRole> {
135        let groups = self.groups.read().await;
136        groups
137            .get(group_id)
138            .and_then(|session| session.state.member_role(user).cloned())
139    }
140}
141
142#[cfg_attr(feature = "frb-api", frb(ignore))]
143impl GroupManager {
144    /// Create a new DGA instance builder
145    pub fn builder() -> GroupManagerBuilder {
146        GroupManagerBuilder { sessions: vec![] }
147    }
148
149    /// Create a group key from a mnemonic phrase
150    pub fn create_key_from_mnemonic(
151        mnemonic: &MnemonicPhrase,
152        passphrase: &str,
153        group_name: &str,
154        timestamp: u64,
155    ) -> GroupResult<EncryptionKey> {
156        let context = format!("dga-group-{group_name}");
157
158        EncryptionKey::from_mnemonic(mnemonic, passphrase, &context, timestamp)
159            .map_err(|e| GroupError::CryptoError(format!("Key derivation failed: {e}")))
160    }
161
162    /// Recover a group key from a mnemonic phrase with specific salt
163    pub fn recover_key_from_mnemonic(
164        mnemonic: &MnemonicPhrase,
165        passphrase: &str,
166        group_name: &str,
167        salt: &[u8; 32],
168        timestamp: u64,
169    ) -> GroupResult<EncryptionKey> {
170        let context = format!("dga-group-{group_name}");
171
172        EncryptionKey::from_mnemonic_with_salt(mnemonic, passphrase, &context, salt, timestamp)
173            .map_err(|e| GroupError::CryptoError(format!("Key recovery failed: {e}")))
174    }
175
176    /// Create a new encrypted group, returning the root event message to be sent
177    pub async fn create_group(
178        &self,
179        create_group: zoe_app_primitives::CreateGroup,
180        encryption_key: Option<EncryptionKey>,
181        creator: &KeyPair,
182        timestamp: u64,
183    ) -> GroupResult<CreateGroupResult> {
184        // Generate or use provided encryption key
185        let encryption_key = encryption_key.unwrap_or_else(|| Self::generate_group_key(timestamp));
186
187        // Get the group info from the CreateGroup object and update its key_info
188        let mut group_info = create_group.into_group_info();
189
190        // Update the key info with the actual encryption key metadata
191        group_info.key_info = GroupKeyInfo::ChaCha20Poly1305 {
192            key_id: encryption_key.key_id.clone(),
193            derivation_info: encryption_key.derivation_info.clone().unwrap_or_else(|| {
194                // Default derivation info if none provided
195                zoe_wire_protocol::crypto::KeyDerivationInfo {
196                    method: zoe_wire_protocol::crypto::KeyDerivationMethod::ChaCha20Poly1305Keygen,
197                    salt: vec![],
198                    argon2_params: zoe_wire_protocol::crypto::Argon2Params::default(),
199                    context: "dga-group-key".to_string(),
200                }
201            }),
202        };
203
204        let event: GroupActivityEvent<()> = GroupActivityEvent::UpdateGroup(group_info.clone());
205
206        // Encrypt the event before creating the wire protocol message
207        let encrypted_payload = encrypt_group_event_content(&encryption_key, &event)?;
208
209        // Create the wire protocol message with encrypted payload
210        let message = Message::new_v0_encrypted(
211            encrypted_payload,
212            creator.public_key(),
213            timestamp,
214            Kind::Regular, // Group creation events should be permanently stored
215            vec![],        // No tags needed for the root event
216        );
217
218        // Sign the message and create MessageFull
219        let message_full = MessageFull::new(message, creator)?;
220        let group_id = message_full.id(); // The group ID is the Blake3 hash of this message
221
222        // Create the initial group state using the unified constructor
223        let group_state = GroupState::new(
224            *group_id,
225            group_info.name.clone(),
226            group_info.settings.clone(),
227            group_info.metadata.clone(),
228            creator.public_key(),
229            timestamp,
230        );
231
232        // Create the unified group session
233        let group_session = GroupSession::new(group_state, encryption_key);
234
235        // Store the group session
236        self.groups
237            .write()
238            .await
239            .insert(*group_id, group_session.clone());
240
241        // Broadcast the group addition
242        if let Err(e) = self
243            .broadcast_channel
244            .try_broadcast(GroupDataUpdate::GroupAdded(group_session))
245        {
246            tracing::error!(error=?e, "Failed to broadcast group addition");
247        }
248
249        Ok(CreateGroupResult {
250            group_id: *group_id,
251            message: message_full,
252        })
253    }
254
255    /// Create an encrypted message for a group activity event
256    /// The group_id parameter should be the Blake3 hash of the CreateGroup message
257    pub async fn create_group_event_message<T>(
258        &self,
259        group_id: MessageId,
260        event: GroupActivityEvent<T>,
261        sender: &KeyPair,
262        timestamp: u64,
263    ) -> GroupResult<MessageFull>
264    where
265        T: Serialize,
266    {
267        // Find the group session to verify it exists and get encryption key
268        let groups = self.groups.read().await;
269        let group_session = groups
270            .get(&group_id)
271            .ok_or_else(|| GroupError::GroupNotFound(format!("{group_id:?}")))?;
272
273        // Encrypt the event using the session's current key
274        let encrypted_payload = group_session.encrypt_group_event_content(&event)?;
275
276        // Create the message with the group ID (root event ID) as a channel tag
277        let message = Message::new_v0_encrypted(
278            encrypted_payload,
279            sender.public_key(),
280            timestamp,
281            Kind::Regular,
282            vec![Tag::Event {
283                id: group_id,   // The group ID is the root event ID
284                relays: vec![], // Could be populated with known relays
285            }],
286        );
287
288        // Sign and return the message
289        Ok(MessageFull::new(message, sender)?)
290    }
291
292    /// Process an incoming group event message
293    pub async fn process_group_event(&self, message_full: &MessageFull) -> GroupResult<()> {
294        // Extract the encrypted payload from the message
295        let Message::MessageV0(message) = message_full.message();
296
297        // Get the encrypted payload from the message content
298        let Content::ChaCha20Poly1305(encrypted_payload) = &message.content else {
299            return Err(GroupError::InvalidEvent(
300                "Message is not a ChaCha20Poly1305 encrypted message".to_string(),
301            ));
302        };
303
304        let sender = &message.sender;
305        let timestamp = message.when;
306
307        // Determine the group ID and decrypt the event
308        let (group_id, event) = if message.tags.is_empty() {
309            // This is a root event (CreateGroup) - the group ID is the message ID itself
310            let group_id = message_full.id();
311
312            // Get the group session for this group (must have been added via inbox system)
313            let groups = self.groups.read().await;
314            let group_session = groups.get(group_id).ok_or_else(|| {
315                GroupError::InvalidEvent(format!(
316                    "No group session available for group {group_id:?}"
317                ))
318            })?;
319
320            let event = group_session.decrypt_group_event::<()>(encrypted_payload)?;
321            (*group_id, event)
322        } else {
323            // This is a subsequent event - find the group by channel tag
324            let group_id = self.find_group_by_event_tag(&message.tags).await?;
325
326            // Get the group session for this group
327            let groups = self.groups.read().await;
328            let group_session = groups.get(&group_id).ok_or_else(|| {
329                GroupError::InvalidEvent(format!(
330                    "No group session available for group {group_id:?}"
331                ))
332            })?;
333
334            let event = group_session.decrypt_group_event(encrypted_payload)?;
335            (group_id, event)
336        };
337
338        // Handle the root event (group creation) specially
339        if let GroupActivityEvent::UpdateGroup(group_info) = &event {
340            // This is a root event - the group session should already exist from create_group
341            // Just verify it exists and update if needed
342            let mut groups = self.groups.write().await;
343            if let Some(group_session) = groups.get_mut(&group_id) {
344                // Update the group state within the session
345                group_session.state = GroupState::new(
346                    group_id,
347                    group_info.name.clone(),
348                    group_info.settings.clone(),
349                    group_info.metadata.clone(),
350                    sender.clone(),
351                    timestamp,
352                );
353
354                // Broadcast the group update
355                let _ = self
356                    .broadcast_channel
357                    .try_broadcast(GroupDataUpdate::GroupUpdated(group_session.clone()));
358            }
359            return Ok(());
360        }
361
362        // This is a subsequent event - apply to existing group state
363        let mut groups = self.groups.write().await;
364        let group_session = groups
365            .get_mut(&group_id)
366            .ok_or_else(|| GroupError::GroupNotFound(format!("{group_id:?}")))?;
367
368        // Apply the event to the group state (convert GroupStateError to GroupError)
369        group_session
370            .state
371            .apply_event(&event, *message_full.id(), sender.clone(), timestamp)
372            .map_err(|e| match e {
373                GroupStateError::PermissionDenied(msg) => GroupError::PermissionDenied(msg),
374                GroupStateError::MemberNotFound { member, group } => {
375                    GroupError::MemberNotFound { member, group }
376                }
377                GroupStateError::StateTransition(msg) => GroupError::StateTransition(msg),
378                GroupStateError::InvalidOperation(msg) => GroupError::InvalidOperation(msg),
379            })?;
380
381        // Broadcast the group update
382        let _ = self
383            .broadcast_channel
384            .try_broadcast(GroupDataUpdate::GroupUpdated(group_session.clone()));
385
386        Ok(())
387    }
388
389    /// Find a group by looking for Event tags in the message
390    async fn find_group_by_event_tag(&self, tags: &[Tag]) -> GroupResult<MessageId> {
391        let groups = self.groups.read().await;
392        for tag in tags {
393            if let Tag::Event { id, .. } = tag
394                && groups.contains_key(id)
395            {
396                return Ok(*id);
397            }
398        }
399        Err(GroupError::InvalidEvent(
400            "No valid group channel tag found".to_string(),
401        ))
402    }
403
404    /// List all groups a user is a member of
405    pub async fn user_groups(&self, user: &VerifyingKey) -> Vec<GroupState> {
406        let groups = self.groups.read().await;
407        groups
408            .values()
409            .filter(|session| session.state.is_member(user))
410            .map(|session| session.state.clone())
411            .collect()
412    }
413
414    /// Add a complete group session
415    pub async fn add_group_session(&self, group_id: MessageId, session: GroupSession) {
416        self.groups.write().await.insert(group_id, session.clone());
417        let _ = self
418            .broadcast_channel
419            .try_broadcast(GroupDataUpdate::GroupAdded(session));
420    }
421
422    /// Remove a group session
423    pub async fn remove_group_session(&self, group_id: &MessageId) -> Option<GroupSession> {
424        if let Some(session) = self.groups.write().await.remove(group_id) {
425            let _ = self
426                .broadcast_channel
427                .try_broadcast(GroupDataUpdate::GroupRemoved(session.clone()));
428            Some(session)
429        } else {
430            None
431        }
432    }
433
434    /// Update a group session's encryption key (for key rotation)
435    pub async fn rotate_group_key(
436        &self,
437        group_id: &MessageId,
438        new_key: EncryptionKey,
439    ) -> GroupResult<()> {
440        let mut groups = self.groups.write().await;
441        let session = groups
442            .get_mut(group_id)
443            .ok_or_else(|| GroupError::GroupNotFound(format!("{group_id:?}")))?;
444
445        session.rotate_key(new_key);
446        let _ = self
447            .broadcast_channel
448            .try_broadcast(GroupDataUpdate::GroupUpdated(session.clone()));
449        Ok(())
450    }
451
452    /// Subscribe to group updates
453    pub fn subscribe_to_updates(&self) -> Receiver<GroupDataUpdate> {
454        self.broadcast_channel.new_receiver()
455    }
456
457    /// Create a subscription filter for a specific group
458    /// This returns the Event tag that should be used to subscribe to group events
459    pub async fn create_group_subscription_filter(&self, group_id: &MessageId) -> GroupResult<Tag> {
460        // Verify the group exists
461        let groups = self.groups.read().await;
462        if !groups.contains_key(group_id) {
463            return Err(GroupError::GroupNotFound(format!("{group_id:?}")));
464        }
465
466        Ok(Tag::Event {
467            id: *group_id,  // The group ID is the root event ID
468            relays: vec![], // Could be populated with known relays
469        })
470    }
471}
472
473impl Default for GroupManager {
474    fn default() -> Self {
475        Self::builder().build()
476    }
477}
478
479// Helper functions for common encrypted group operations
480
/// Create a leave group event
///
/// `message` is an optional text stored in the event's `message` field;
/// pass `None` to leave without one.
pub fn create_leave_group_event<T>(message: Option<String>) -> GroupActivityEvent<T> {
    GroupActivityEvent::LeaveGroup { message }
}
485
/// Create a role update event
///
/// Builds an `AssignRole` event that targets `member` (wrapped as
/// `IdentityRef::Key`) and assigns it `role`.
///
/// NOTE(review): an earlier TODO here said this function was temporarily
/// disabled until `IdentityRef` moved from Ed25519 to ML-DSA keys. The
/// function is live and passes the key straight through, so that note looks
/// stale — confirm `IdentityRef::Key` is meant to take this key type.
pub fn create_role_update_event<T>(member: VerifyingKey, role: GroupRole) -> GroupActivityEvent<T> {
    // Use the provided ML-DSA member key directly
    GroupActivityEvent::AssignRole {
        target: IdentityRef::Key(member),
        role,
    }
}
497
/// Create a custom group activity event
///
/// Wraps the application-defined `activity_data` in the `Activity` variant
/// without inspecting it.
pub fn create_group_activity_event<T>(activity_data: T) -> GroupActivityEvent<T> {
    GroupActivityEvent::Activity(activity_data)
}
502
/// Create a group update event
///
/// Wraps `group_info` in the `UpdateGroup` variant, the same variant
/// `GroupManager::create_group` uses for a group's root event.
pub fn create_group_update_event<T>(group_info: GroupInfo) -> GroupActivityEvent<T> {
    GroupActivityEvent::UpdateGroup(group_info)
}