zoe_client/
pqxdh.rs

1//! Post-Quantum Extended Diffie-Hellman (PQXDH) Protocol Implementation
2//!
3//! This module provides a complete, high-level implementation of the PQXDH protocol
4//! for secure, post-quantum resistant communication. It includes both client and
5//! service provider functionality with automatic session management.
6//!
7//! ## Core Components
8//!
9//! ### 1. Inbox Management
10//! - **Publishing**: Service providers can publish PQXDH inboxes to advertise their availability
11//! - **Discovery**: Clients can discover and fetch service provider inboxes
12//! - **Privacy**: Uses type-safe protocols and deterministic serialization with postcard
13//!
14//! ### 2. Session Establishment
15//! - **Initiation**: Clients can establish secure sessions with service providers
16//! - **Privacy-Preserving**: Uses randomized channel IDs for unlinkable communication
17//! - **Post-Quantum Security**: Leverages PQXDH for quantum-resistant key exchange
18//!
19//! ### 3. Message Communication
20//! - **Session Messages**: Encrypted communication over established sessions
21//! - **Channel Management**: Automatic subscription handling for session channels
22//! - **Sequence Numbers**: Built-in replay protection with sequence numbering
23//!
24//! ### 4. State Management
25//! - **Persistence**: Serializable state for application restarts
26//! - **Session Tracking**: Automatic management of multiple concurrent sessions
//! - **Key Management**: Secure handling of private keys and prekey bundles
//!
//! ## Usage Patterns
29//!
30//! ### Service Provider Pattern
31//! ```rust,no_run
32//! # use zoe_client::pqxdh::*;
33//! # use zoe_wire_protocol::*;
34//! # use futures::StreamExt;
35//! # async fn example() -> Result<()> {
36//! # let messages_manager = todo!();
37//! # let keypair = todo!();
38//! // 1. Create handler and publish service
39//! let mut handler = PqxdhProtocolHandler::new(
40//!     &messages_manager,
41//!     &keypair,
42//!     PqxdhInboxProtocol::EchoService
43//! );
44//! handler.publish_service(false).await?;
45//!
46//! // 2. Listen for incoming client connections
47//! let mut inbox_stream = Box::pin(handler.inbox_stream::<String>().await?);
48//! while let Some((session_id, message)) = inbox_stream.next().await {
49//!     // Handle client messages
50//!     println!("Received from {:?}: {}", session_id, message);
51//! }
52//! # Ok(())
53//! # }
54//! ```
55//!
56//! ### Client Pattern
57//! ```rust,no_run
58//! # use zoe_client::pqxdh::*;
59//! # use zoe_wire_protocol::*;
60//! # use futures::StreamExt;
61//! # async fn example() -> Result<()> {
62//! # let messages_manager = todo!();
63//! # let keypair = todo!();
64//! # let service_key = todo!();
//! # let initial_message = "hello".to_string();
//! // 1. Create handler and connect to service
//! let mut handler = PqxdhProtocolHandler::new(
//!     &messages_manager,
//!     &keypair,
//!     PqxdhInboxProtocol::EchoService
//! );
//! // `connect_to_service` returns the new session's ID together with the response stream
//! let (session_id, response_stream) = handler.connect_to_service::<String, String>(
//!     &service_key,
//!     &initial_message
//! ).await?;
//! let mut response_stream = Box::pin(response_stream);
77//!
78//! // 2. Send additional messages using session ID
79//! handler.send_message(&session_id, &"follow up message".to_string()).await?;
80//!
81//! // 3. Listen for responses
82//! while let Some(response) = response_stream.next().await {
83//!     println!("Received response: {}", response);
84//! }
85//! # Ok(())
86//! # }
87//! ```
88//!
89//! ## Security Features
90//!
91//! - **Post-Quantum Resistance**: Uses CRYSTALS-Kyber for key encapsulation
92//! - **Forward Secrecy**: Each session uses ephemeral keys
93//! - **Replay Protection**: Sequence numbers prevent message replay attacks
94//! - **Unlinkability**: Randomized channel IDs prevent traffic analysis
95//! - **Authentication**: All messages are cryptographically signed
96//!
97//! ## Error Handling
98//!
99//! This module uses a custom [`PqxdhError`] type that provides structured error handling
100//! for all PQXDH operations. The error type includes specific variants for different
101//! failure modes:
102//!
//! - **Connection Errors**: `InboxNotFound`, `InboxAlreadyPublished`, `ServiceNotPublished`, `NoInboxSubscription`
104//! - **Session Errors**: `SessionNotFound`, `InvalidSender`, `NotInitialMessage`
105//! - **Cryptographic Errors**: `Crypto`, `KeyGeneration`, `PqxdhProtocol`
106//! - **Message Errors**: `InvalidContentType`, `NotPqxdhMessage`, `MessageCreation`
107//! - **Infrastructure Errors**: `Rpc`, `MessagesService`, `Serialization`
108//!
109//! ### Error Handling Example
110//! ```rust,no_run
111//! # use zoe_client::pqxdh::*;
112//! # async fn example() -> Result<()> {
113//! # let mut handler: PqxdhProtocolHandler = todo!();
114//! match handler.publish_service(false).await {
115//!     Ok(tag) => println!("Service published with tag: {:?}", tag),
116//!     Err(PqxdhError::InboxAlreadyPublished) => {
117//!         println!("Service already published, use force_overwrite=true");
118//!     }
119//!     Err(PqxdhError::KeyGeneration(msg)) => {
120//!         eprintln!("Failed to generate keys: {}", msg);
121//!     }
122//!     Err(e) => eprintln!("Unexpected error: {}", e),
123//! }
124//! # Ok(())
125//! # }
126//! ```
127//!
128//! ## Serialization
129//!
130//! All data structures use `postcard` for efficient binary serialization,
131//! providing compact wire formats and deterministic encoding for cryptographic
132//! operations. This ensures compatibility with the project's binary-first
133//! architecture and optimal network efficiency.
134
135mod error;
136mod handler;
137mod message_listener;
138mod session;
139mod state;
140mod transport;
141
142pub use error::*;
143pub use handler::*;
144pub use message_listener::*;
145pub use session::*;
146pub use state::*;
147pub use transport::*;
148
149#[cfg(test)]
150mod tests {
151    use std::sync::Arc;
152
153    use super::*;
154    use crate::services::messages_manager::MockMessagesManagerTrait;
155    use eyeball::{AsyncLock, SharedObservable};
156    use futures::stream;
157    use mockall::predicate::*;
158
159    use zoe_wire_protocol::{
160        Content, KeyId, KeyPair, Kind, Message, MessageFull, PqxdhInboxProtocol, PqxdhSharedSecret,
161        inbox::pqxdh::{InboxType, PqxdhInbox},
162    };
163    use zoe_wire_protocol::{PublishResult, StoreKey, Tag};
164
165    #[test]
166    fn test_pqxdh_session_serialization() {
167        // Create a test session
168        let shared_secret = PqxdhSharedSecret {
169            shared_key: [42u8; 32],
170            consumed_one_time_key_ids: vec!["key1".to_string(), "key2".to_string()],
171        };
172        let session_channel_id = [1u8; 32];
173
174        let keypair = KeyPair::generate(&mut rand::thread_rng());
175        let session = PqxdhSession::from_shared_secret(
176            shared_secret,
177            session_channel_id,
178            session_channel_id, // their_session_channel_id
179            keypair.public_key(),
180        );
181
182        // Test serialization round-trip
183        let serialized = postcard::to_stdvec(&session).expect("Failed to serialize session");
184        let deserialized: PqxdhSession =
185            postcard::from_bytes(&serialized).expect("Failed to deserialize session");
186
187        // Verify the data is preserved
188        assert_eq!(
189            session.shared_secret.shared_key,
190            deserialized.shared_secret.shared_key
191        );
192        assert_eq!(
193            session.shared_secret.consumed_one_time_key_ids,
194            deserialized.shared_secret.consumed_one_time_key_ids
195        );
196        assert_eq!(session.sequence_number, deserialized.sequence_number);
197        assert_eq!(
198            session.my_session_channel_id,
199            deserialized.my_session_channel_id
200        );
201    }
202
203    #[test]
204    fn test_pqxdh_protocol_state_serialization() {
205        let protocol = PqxdhInboxProtocol::EchoService;
206        let mut state = PqxdhProtocolState::new(protocol.clone());
207
208        // Add some test data
209        state.inbox_tag = Some(Tag::Channel {
210            id: vec![1, 2, 3, 4],
211            relays: vec![],
212        });
213
214        let target_id: KeyId = KeyId::from_bytes([1u8; 32]);
215        let shared_secret = PqxdhSharedSecret {
216            shared_key: [99u8; 32],
217            consumed_one_time_key_ids: vec!["consumed_key".to_string()],
218        };
219        let keypair = KeyPair::generate(&mut rand::thread_rng());
220        let session = PqxdhSession::from_shared_secret(
221            shared_secret,
222            [5u8; 32],
223            [6u8; 32], // their_session_channel_id
224            keypair.public_key(),
225        );
226        state.sessions.insert(target_id, session);
227
228        // Test serialization round-trip
229        let serialized = postcard::to_stdvec(&state).expect("Failed to serialize state");
230        let deserialized: PqxdhProtocolState =
231            postcard::from_bytes(&serialized).expect("Failed to deserialize state");
232
233        // Verify the data is preserved
234        assert_eq!(state.protocol, deserialized.protocol);
235        assert_eq!(state.inbox_tag, deserialized.inbox_tag);
236        assert_eq!(state.sessions.len(), deserialized.sessions.len());
237
238        // Verify session data
239        let original_session = &state.sessions[&target_id];
240        let deserialized_session = &deserialized.sessions[&target_id];
241        assert_eq!(
242            original_session.shared_secret.shared_key,
243            deserialized_session.shared_secret.shared_key
244        );
245        assert_eq!(
246            original_session.sequence_number,
247            deserialized_session.sequence_number
248        );
249        assert_eq!(
250            original_session.my_session_channel_id,
251            deserialized_session.my_session_channel_id
252        );
253    }
254
255    #[test]
256    fn test_pqxdh_private_keys_serialization() -> Result<()> {
257        // Generate test keypair with random data
258        let mut rng = rand::thread_rng();
259        let keypair = KeyPair::generate(&mut rng);
260
261        // Generate prekey bundle with private keys (creates random keys)
262        let (_prekey_bundle, private_keys) =
263            create_pqxdh_prekey_bundle_with_private_keys(&keypair, 3)?;
264
265        // Test serialization round-trip
266        let serialized = postcard::to_stdvec(&private_keys)?;
267        let deserialized: zoe_wire_protocol::inbox::pqxdh::PqxdhPrivateKeys =
268            postcard::from_bytes(&serialized)?;
269
270        // Verify the data is preserved by comparing the keys directly (now that serde works)
271        // We can't use PartialEq on StaticSecret, so we compare the bytes
272        assert_eq!(
273            private_keys.signed_prekey_private.to_bytes(),
274            deserialized.signed_prekey_private.to_bytes()
275        );
276        assert_eq!(
277            private_keys.one_time_prekey_privates.len(),
278            deserialized.one_time_prekey_privates.len()
279        );
280        assert_eq!(
281            private_keys.pq_signed_prekey_private,
282            deserialized.pq_signed_prekey_private
283        );
284        assert_eq!(
285            private_keys.pq_one_time_prekey_privates,
286            deserialized.pq_one_time_prekey_privates
287        );
288
289        // Verify one-time keys (should be random and different each time)
290        for (key_id, original_key) in &private_keys.one_time_prekey_privates {
291            let deserialized_key = &deserialized.one_time_prekey_privates[key_id];
292            assert_eq!(original_key.to_bytes(), deserialized_key.to_bytes());
293        }
294
295        // Verify that keys are actually random by generating another set
296        let (_prekey_bundle2, private_keys2) =
297            create_pqxdh_prekey_bundle_with_private_keys(&keypair, 3)?;
298        assert_ne!(
299            private_keys.signed_prekey_private.to_bytes(),
300            private_keys2.signed_prekey_private.to_bytes(),
301            "Keys should be randomly generated and different"
302        );
303
304        Ok(())
305    }
306
307    // Helper functions for tests
308    fn create_test_keypair() -> KeyPair {
309        KeyPair::generate(&mut rand::thread_rng())
310    }
311
312    fn create_test_inbox() -> PqxdhInbox {
313        let keypair = Arc::new(create_test_keypair());
314        let (prekey_bundle, _) = create_pqxdh_prekey_bundle_with_private_keys(&keypair, 3).unwrap();
315        PqxdhInbox::new(InboxType::Public, prekey_bundle, Some(1024), None)
316    }
317
318    fn create_test_message_full(content: Content, author: &KeyPair) -> MessageFull {
319        let timestamp = std::time::SystemTime::now()
320            .duration_since(std::time::UNIX_EPOCH)
321            .unwrap()
322            .as_secs();
323        let message = Message::new_v0(
324            content,
325            author.public_key(),
326            timestamp,
327            Kind::Regular,
328            vec![],
329        );
330        MessageFull::new(message, author).unwrap()
331    }
332
    /// Handler type used by the async tests below: [`PqxdhProtocolHandler`] backed by
    /// [`MockMessagesManagerTrait`], so expectations can be set on the manager calls.
    type TestPqxdhHandler = PqxdhProtocolHandler<MockMessagesManagerTrait>;
334
335    #[tokio::test]
336    async fn test_publish_service_success() {
337        let mut mock_manager = MockMessagesManagerTrait::new();
338        let keypair = Arc::new(create_test_keypair());
339
340        // Mock the publish call
341        mock_manager.expect_publish().times(1).returning(|_| {
342            Ok(PublishResult::StoredNew {
343                global_stream_id: "123".to_string(),
344            })
345        });
346
347        let handler = TestPqxdhHandler::new(
348            Arc::new(mock_manager),
349            keypair.clone(),
350            PqxdhInboxProtocol::EchoService,
351        );
352
353        let result = handler.publish_service(false).await;
354        assert!(result.is_ok());
355
356        // Verify that the state was updated
357        let state = handler.state.read().await;
358        assert!(state.inbox_tag.is_some());
359        assert!(state.private_keys.is_some());
360        assert!(state.inbox.is_some());
361    }
362
363    /// Test service provider publish_service with already published inbox
364    #[tokio::test]
365    async fn test_publish_service_already_published() {
366        let mock_manager = MockMessagesManagerTrait::new();
367        let keypair = Arc::new(create_test_keypair());
368
369        let handler = TestPqxdhHandler::new(
370            Arc::new(mock_manager),
371            keypair.clone(),
372            PqxdhInboxProtocol::EchoService,
373        );
374
375        // Manually set inbox_tag to simulate already published state
376
377        SharedObservable::<_, AsyncLock>::update(
378            &handler.state,
379            |state: &mut PqxdhProtocolState| {
380                state.inbox_tag = Some(Tag::Channel {
381                    id: vec![1, 2, 3],
382                    relays: vec![],
383                });
384            },
385        )
386        .await;
387
388        let result = handler.publish_service(false).await;
389        assert!(matches!(result, Err(PqxdhError::InboxAlreadyPublished)));
390    }
391
392    /// Test service provider publish_service with force overwrite
393    #[tokio::test]
394    async fn test_publish_service_force_overwrite() {
395        let mut mock_manager = MockMessagesManagerTrait::new();
396        let keypair = Arc::new(create_test_keypair());
397
398        // Mock the publish call
399        mock_manager.expect_publish().times(1).returning(|_| {
400            Ok(PublishResult::StoredNew {
401                global_stream_id: "123".to_string(),
402            })
403        });
404
405        let handler = TestPqxdhHandler::new(
406            Arc::new(mock_manager),
407            keypair.clone(),
408            PqxdhInboxProtocol::EchoService,
409        );
410
411        // Manually set inbox_tag to simulate already published state
412
413        SharedObservable::<_, AsyncLock>::update(
414            &handler.state,
415            |state: &mut PqxdhProtocolState| {
416                state.inbox_tag = Some(Tag::Channel {
417                    id: vec![1, 2, 3],
418                    relays: vec![],
419                });
420            },
421        )
422        .await;
423
424        let result = handler.publish_service(true).await;
425        assert!(result.is_ok());
426    }
427
428    /// Test client connect_to_service functionality
429    #[tokio::test]
430    async fn test_connect_to_service_success() {
431        let mut mock_manager = MockMessagesManagerTrait::new();
432        let client_keypair = Arc::new(create_test_keypair());
433        let service_keypair = create_test_keypair();
434        let service_key = service_keypair.public_key();
435
436        let test_inbox = create_test_inbox();
437
438        // Mock user_data call to return the inbox
439        let inbox_message = create_test_message_full(
440            Content::Raw(postcard::to_stdvec(&test_inbox).unwrap()),
441            &service_keypair,
442        );
443        mock_manager
444            .expect_user_data()
445            .with(
446                eq(KeyId::from(*service_key.id())),
447                eq(StoreKey::PqxdhInbox(PqxdhInboxProtocol::EchoService)),
448            )
449            .times(1)
450            .returning(move |_, _| Ok(Some(inbox_message.clone())));
451
452        // Mock publish call for the initial PQXDH message
453        mock_manager.expect_publish().times(1).returning(|_| {
454            Ok(PublishResult::StoredNew {
455                global_stream_id: "456".to_string(),
456            })
457        });
458
459        // Mock catch_up_and_subscribe for listening to responses
460        mock_manager
461            .expect_catch_up_and_subscribe()
462            .times(1)
463            .returning(|_, _| Ok(Box::pin(stream::empty())));
464
465        let handler = TestPqxdhHandler::new(
466            Arc::new(mock_manager),
467            client_keypair.clone(),
468            PqxdhInboxProtocol::EchoService,
469        );
470
471        let initial_message = "Hello, service!".to_string();
472        let result = handler
473            .connect_to_service::<String, String>(&service_key, &initial_message)
474            .await;
475
476        assert!(result.is_ok());
477        let (session_id, _stream) = result.unwrap();
478
479        // Drop the stream to release the borrow on handler
480        drop(_stream);
481
482        // Verify session was created
483        let state = handler.state.read().await;
484        assert!(state.sessions.contains_key(&KeyId::from_bytes(session_id)));
485    }
486
487    /// Test client connect_to_service with inbox not found
488    #[tokio::test]
489    async fn test_connect_to_service_inbox_not_found() {
490        let mut mock_manager = MockMessagesManagerTrait::new();
491        let client_keypair = Arc::new(create_test_keypair());
492        let service_keypair = create_test_keypair();
493        let service_key = service_keypair.public_key();
494
495        // Mock user_data call to return None (inbox not found)
496        mock_manager
497            .expect_user_data()
498            .with(
499                eq(KeyId::from(*service_key.id())),
500                eq(StoreKey::PqxdhInbox(PqxdhInboxProtocol::EchoService)),
501            )
502            .times(1)
503            .returning(|_, _| Ok(None));
504
505        let handler = TestPqxdhHandler::new(
506            Arc::new(mock_manager),
507            client_keypair.clone(),
508            PqxdhInboxProtocol::EchoService,
509        );
510
511        let initial_message = "Hello, service!".to_string();
512        let result = handler
513            .connect_to_service::<String, String>(&service_key, &initial_message)
514            .await;
515
516        assert!(matches!(result, Err(PqxdhError::InboxNotFound)));
517    }
518
519    /// Test send_message functionality
520    #[tokio::test]
521    async fn test_send_message_success() {
522        let mut mock_manager = MockMessagesManagerTrait::new();
523        let keypair = Arc::new(create_test_keypair());
524
525        // Mock publish call
526        mock_manager.expect_publish().times(1).returning(|_| {
527            Ok(PublishResult::StoredNew {
528                global_stream_id: "789".to_string(),
529            })
530        });
531
532        let handler = TestPqxdhHandler::new(
533            Arc::new(mock_manager),
534            keypair.clone(),
535            PqxdhInboxProtocol::EchoService,
536        );
537
538        // Create a test session
539        let session_id: PqxdhSessionId = [42u8; 32];
540        let shared_secret = PqxdhSharedSecret {
541            shared_key: [1u8; 32],
542            consumed_one_time_key_ids: vec![],
543        };
544        let test_session = PqxdhSession::from_shared_secret(
545            shared_secret,
546            session_id,
547            [43u8; 32], // their_session_channel_id
548            keypair.public_key(),
549        );
550
551        // Add session to state
552
553        SharedObservable::<_, AsyncLock>::update(
554            &handler.state,
555            |state: &mut PqxdhProtocolState| {
556                state
557                    .sessions
558                    .insert(KeyId::from_bytes(session_id), test_session);
559            },
560        )
561        .await;
562
563        let message = "Test message".to_string();
564        let result = handler.send_message(&session_id, &message).await;
565
566        assert!(result.is_ok());
567    }
568
569    /// Test send_message with session not found
570    #[tokio::test]
571    async fn test_send_message_session_not_found() {
572        let mock_manager = MockMessagesManagerTrait::new();
573        let keypair = Arc::new(create_test_keypair());
574
575        let handler = TestPqxdhHandler::new(
576            Arc::new(mock_manager),
577            keypair.clone(),
578            PqxdhInboxProtocol::EchoService,
579        );
580
581        let session_id: PqxdhSessionId = [42u8; 32];
582        let message = "Test message".to_string();
583        let result = handler.send_message(&session_id, &message).await;
584
585        assert!(matches!(result, Err(PqxdhError::SessionNotFound)));
586    }
587
588    /// Test inbox_stream functionality for service providers
589    #[tokio::test]
590    async fn test_inbox_stream_success() {
591        let mut mock_manager = MockMessagesManagerTrait::new();
592        let keypair = Arc::new(create_test_keypair());
593
594        // Mock ensure_contains_filter and filtered_messages_stream
595        mock_manager
596            .expect_ensure_contains_filter()
597            .times(1)
598            .returning(|_| Ok(()));
599
600        mock_manager
601            .expect_filtered_messages_stream()
602            .times(1)
603            .returning(|_| Box::pin(stream::empty()));
604
605        let handler = TestPqxdhHandler::new(
606            Arc::new(mock_manager),
607            keypair.clone(),
608            PqxdhInboxProtocol::EchoService,
609        );
610
611        // Set up service provider state
612
613        SharedObservable::<_, AsyncLock>::update(
614            &handler.state,
615            |state: &mut PqxdhProtocolState| {
616                state.inbox_tag = Some(Tag::Channel {
617                    id: vec![1, 2, 3],
618                    relays: vec![],
619                });
620                let (_, private_keys) =
621                    create_pqxdh_prekey_bundle_with_private_keys(&keypair, 3).unwrap();
622                state.private_keys = Some(private_keys);
623                state.inbox = Some(create_test_inbox());
624            },
625        )
626        .await;
627
628        let result = handler.inbox_stream::<String>().await;
629        assert!(result.is_ok());
630    }
631
632    /// Test inbox_stream without published service
633    #[tokio::test]
634    async fn test_inbox_stream_service_not_published() {
635        let mock_manager = MockMessagesManagerTrait::new();
636        let keypair = Arc::new(create_test_keypair());
637
638        let handler = TestPqxdhHandler::new(
639            Arc::new(mock_manager),
640            keypair.clone(),
641            PqxdhInboxProtocol::EchoService,
642        );
643
644        let result = handler.inbox_stream::<String>().await;
645        assert!(matches!(result, Err(PqxdhError::ServiceNotPublished)));
646    }
647
648    /// Test state serialization and restoration
649    #[tokio::test]
650    async fn test_state_persistence() {
651        let mock_manager = Arc::new(MockMessagesManagerTrait::new());
652        let keypair = Arc::new(create_test_keypair());
653
654        // Create handler with initial state
655        let original_handler = TestPqxdhHandler::new(
656            mock_manager.clone(),
657            keypair.clone(),
658            PqxdhInboxProtocol::EchoService,
659        );
660
661        // Add some state
662        let session_id: PqxdhSessionId = [99u8; 32];
663        let shared_secret = PqxdhSharedSecret {
664            shared_key: [2u8; 32],
665            consumed_one_time_key_ids: vec!["test_key".to_string()],
666        };
667        let test_session = PqxdhSession::from_shared_secret(
668            shared_secret,
669            session_id,
670            [100u8; 32], // their_session_channel_id
671            keypair.public_key(),
672        );
673
674        SharedObservable::<_, AsyncLock>::update(
675            &original_handler.state,
676            |state: &mut PqxdhProtocolState| {
677                state
678                    .sessions
679                    .insert(KeyId::from_bytes(session_id), test_session);
680                state.inbox_tag = Some(Tag::Channel {
681                    id: vec![4, 5, 6],
682                    relays: vec![],
683                });
684            },
685        )
686        .await;
687
688        // Serialize state
689        let serialized_state = {
690            let state = original_handler.state.read().await;
691            postcard::to_stdvec(&*state).unwrap()
692        };
693
694        // Deserialize and create new handler
695        let restored_state: PqxdhProtocolState = postcard::from_bytes(&serialized_state).unwrap();
696        let restored_handler =
697            TestPqxdhHandler::from_state(mock_manager.clone(), keypair.clone(), restored_state);
698
699        // Verify state was restored correctly
700        let restored_state = restored_handler.state.read().await;
701        assert!(
702            restored_state
703                .sessions
704                .contains_key(&KeyId::from_bytes(session_id))
705        );
706        assert_eq!(restored_state.protocol, PqxdhInboxProtocol::EchoService);
707        assert_eq!(
708            restored_state.inbox_tag,
709            Some(Tag::Channel {
710                id: vec![4, 5, 6],
711                relays: vec![]
712            })
713        );
714    }
715
716    /// Test error handling for various scenarios
717    #[tokio::test]
718    async fn test_error_handling_scenarios() {
719        let mut mock_manager = MockMessagesManagerTrait::new();
720        let keypair = Arc::new(create_test_keypair());
721
722        // Test RPC error during publish
723        mock_manager
724            .expect_publish()
725            .times(1)
726            .returning(|_| Err(crate::ClientError::Generic("Network error".to_string())));
727
728        let handler = TestPqxdhHandler::new(
729            Arc::new(mock_manager),
730            keypair.clone(),
731            PqxdhInboxProtocol::EchoService,
732        );
733
734        let result = handler.publish_service(false).await;
735        assert!(matches!(result, Err(PqxdhError::MessagesService(_))));
736    }
737
738    /// Test session management with multiple sessions
739    #[tokio::test]
740    async fn test_multiple_session_management() {
741        let mock_manager = MockMessagesManagerTrait::new();
742        let keypair = Arc::new(create_test_keypair());
743
744        let handler = TestPqxdhHandler::new(
745            Arc::new(mock_manager),
746            keypair.clone(),
747            PqxdhInboxProtocol::EchoService,
748        );
749
750        // Create multiple test sessions
751        let session_ids = [[1u8; 32], [2u8; 32], [3u8; 32]];
752
753        for (i, session_id) in session_ids.iter().enumerate() {
754            let shared_secret = PqxdhSharedSecret {
755                shared_key: [i as u8; 32],
756                consumed_one_time_key_ids: vec![format!("key_{}", i)],
757            };
758            let test_session = PqxdhSession::from_shared_secret(
759                shared_secret,
760                *session_id,
761                [(i + 10) as u8; 32], // their_session_channel_id
762                keypair.public_key(),
763            );
764
765            SharedObservable::<_, AsyncLock>::update(
766                &handler.state,
767                |state: &mut PqxdhProtocolState| {
768                    state
769                        .sessions
770                        .insert(KeyId::from_bytes(*session_id), test_session);
771                },
772            )
773            .await;
774        }
775
776        // Verify all sessions are tracked
777        let state = handler.state.read().await;
778        assert_eq!(state.sessions.len(), 3);
779        for session_id in &session_ids {
780            assert!(state.sessions.contains_key(&KeyId::from_bytes(*session_id)));
781        }
782    }
783
784    /// Test ephemeral message functionality
785    #[tokio::test]
786    async fn test_send_ephemeral_message() {
787        let mut mock_manager = MockMessagesManagerTrait::new();
788        let keypair = Arc::new(create_test_keypair());
789
790        // Mock publish call
791        mock_manager.expect_publish().times(1).returning(|_| {
792            Ok(PublishResult::StoredNew {
793                global_stream_id: "emp123".to_string(),
794            })
795        });
796
797        let handler = TestPqxdhHandler::new(
798            Arc::new(mock_manager),
799            keypair.clone(),
800            PqxdhInboxProtocol::EchoService,
801        );
802
803        // Create a test session
804        let session_id: PqxdhSessionId = [55u8; 32];
805        let shared_secret = PqxdhSharedSecret {
806            shared_key: [3u8; 32],
807            consumed_one_time_key_ids: vec![],
808        };
809        let test_session = PqxdhSession::from_shared_secret(
810            shared_secret,
811            session_id,
812            [56u8; 32], // their_session_channel_id
813            keypair.public_key(),
814        );
815
816        // Add session to state
817
818        SharedObservable::<_, AsyncLock>::update(
819            &handler.state,
820            |state: &mut PqxdhProtocolState| {
821                state
822                    .sessions
823                    .insert(KeyId::from_bytes(session_id), test_session);
824            },
825        )
826        .await;
827
828        let message = "Ephemeral message".to_string();
829        let result = handler
830            .send_ephemeral_message(&session_id, &message, 60)
831            .await;
832
833        assert!(result.is_ok());
834    }
835
836    /// Test listen_for_messages functionality
837    #[tokio::test]
838    async fn test_listen_for_messages() {
839        let mut mock_manager = MockMessagesManagerTrait::new();
840        let keypair = Arc::new(create_test_keypair());
841
842        // Mock catch_up_and_subscribe
843        mock_manager
844            .expect_catch_up_and_subscribe()
845            .times(1)
846            .returning(|_, _| Ok(Box::pin(stream::empty())));
847
848        let handler = TestPqxdhHandler::new(
849            Arc::new(mock_manager),
850            keypair.clone(),
851            PqxdhInboxProtocol::EchoService,
852        );
853
854        // Create a test session
855        let session_id: PqxdhSessionId = [77u8; 32];
856        let shared_secret = PqxdhSharedSecret {
857            shared_key: [4u8; 32],
858            consumed_one_time_key_ids: vec![],
859        };
860        let test_session = PqxdhSession::from_shared_secret(
861            shared_secret,
862            session_id,
863            [78u8; 32], // their_session_channel_id
864            keypair.public_key(),
865        );
866
867        // Add session to state
868
869        SharedObservable::<_, AsyncLock>::update(
870            &handler.state,
871            |state: &mut PqxdhProtocolState| {
872                state
873                    .sessions
874                    .insert(KeyId::from_bytes(session_id), test_session);
875                state.inbox_tag = Some(Tag::Channel {
876                    id: vec![1, 2, 3],
877                    relays: vec![],
878                });
879            },
880        )
881        .await;
882
883        let result = handler
884            .listen_for_messages::<String>(session_id, true)
885            .await;
886        assert!(result.is_ok());
887    }
888
889    /// Test listen_for_messages with session not found
890    #[tokio::test]
891    async fn test_listen_for_messages_session_not_found() {
892        let mock_manager = MockMessagesManagerTrait::new();
893        let keypair = Arc::new(create_test_keypair());
894
895        let handler = TestPqxdhHandler::new(
896            Arc::new(mock_manager),
897            keypair.clone(),
898            PqxdhInboxProtocol::EchoService,
899        );
900
901        let session_id: PqxdhSessionId = [88u8; 32];
902        let result = handler
903            .listen_for_messages::<String>(session_id, true)
904            .await;
905
906        assert!(matches!(result, Err(PqxdhError::SessionNotFound)));
907    }
908}