1mod error;
136mod handler;
137mod message_listener;
138mod session;
139mod state;
140mod transport;
141
142pub use error::*;
143pub use handler::*;
144pub use message_listener::*;
145pub use session::*;
146pub use state::*;
147pub use transport::*;
148
149#[cfg(test)]
150mod tests {
151 use std::sync::Arc;
152
153 use super::*;
154 use crate::services::messages_manager::MockMessagesManagerTrait;
155 use eyeball::{AsyncLock, SharedObservable};
156 use futures::stream;
157 use mockall::predicate::*;
158
159 use zoe_wire_protocol::{
160 Content, KeyId, KeyPair, Kind, Message, MessageFull, PqxdhInboxProtocol, PqxdhSharedSecret,
161 inbox::pqxdh::{InboxType, PqxdhInbox},
162 };
163 use zoe_wire_protocol::{PublishResult, StoreKey, Tag};
164
165 #[test]
166 fn test_pqxdh_session_serialization() {
167 let shared_secret = PqxdhSharedSecret {
169 shared_key: [42u8; 32],
170 consumed_one_time_key_ids: vec!["key1".to_string(), "key2".to_string()],
171 };
172 let session_channel_id = [1u8; 32];
173
174 let keypair = KeyPair::generate(&mut rand::thread_rng());
175 let session = PqxdhSession::from_shared_secret(
176 shared_secret,
177 session_channel_id,
178 session_channel_id, keypair.public_key(),
180 );
181
182 let serialized = postcard::to_stdvec(&session).expect("Failed to serialize session");
184 let deserialized: PqxdhSession =
185 postcard::from_bytes(&serialized).expect("Failed to deserialize session");
186
187 assert_eq!(
189 session.shared_secret.shared_key,
190 deserialized.shared_secret.shared_key
191 );
192 assert_eq!(
193 session.shared_secret.consumed_one_time_key_ids,
194 deserialized.shared_secret.consumed_one_time_key_ids
195 );
196 assert_eq!(session.sequence_number, deserialized.sequence_number);
197 assert_eq!(
198 session.my_session_channel_id,
199 deserialized.my_session_channel_id
200 );
201 }
202
203 #[test]
204 fn test_pqxdh_protocol_state_serialization() {
205 let protocol = PqxdhInboxProtocol::EchoService;
206 let mut state = PqxdhProtocolState::new(protocol.clone());
207
208 state.inbox_tag = Some(Tag::Channel {
210 id: vec![1, 2, 3, 4],
211 relays: vec![],
212 });
213
214 let target_id: KeyId = KeyId::from_bytes([1u8; 32]);
215 let shared_secret = PqxdhSharedSecret {
216 shared_key: [99u8; 32],
217 consumed_one_time_key_ids: vec!["consumed_key".to_string()],
218 };
219 let keypair = KeyPair::generate(&mut rand::thread_rng());
220 let session = PqxdhSession::from_shared_secret(
221 shared_secret,
222 [5u8; 32],
223 [6u8; 32], keypair.public_key(),
225 );
226 state.sessions.insert(target_id, session);
227
228 let serialized = postcard::to_stdvec(&state).expect("Failed to serialize state");
230 let deserialized: PqxdhProtocolState =
231 postcard::from_bytes(&serialized).expect("Failed to deserialize state");
232
233 assert_eq!(state.protocol, deserialized.protocol);
235 assert_eq!(state.inbox_tag, deserialized.inbox_tag);
236 assert_eq!(state.sessions.len(), deserialized.sessions.len());
237
238 let original_session = &state.sessions[&target_id];
240 let deserialized_session = &deserialized.sessions[&target_id];
241 assert_eq!(
242 original_session.shared_secret.shared_key,
243 deserialized_session.shared_secret.shared_key
244 );
245 assert_eq!(
246 original_session.sequence_number,
247 deserialized_session.sequence_number
248 );
249 assert_eq!(
250 original_session.my_session_channel_id,
251 deserialized_session.my_session_channel_id
252 );
253 }
254
255 #[test]
256 fn test_pqxdh_private_keys_serialization() -> Result<()> {
257 let mut rng = rand::thread_rng();
259 let keypair = KeyPair::generate(&mut rng);
260
261 let (_prekey_bundle, private_keys) =
263 create_pqxdh_prekey_bundle_with_private_keys(&keypair, 3)?;
264
265 let serialized = postcard::to_stdvec(&private_keys)?;
267 let deserialized: zoe_wire_protocol::inbox::pqxdh::PqxdhPrivateKeys =
268 postcard::from_bytes(&serialized)?;
269
270 assert_eq!(
273 private_keys.signed_prekey_private.to_bytes(),
274 deserialized.signed_prekey_private.to_bytes()
275 );
276 assert_eq!(
277 private_keys.one_time_prekey_privates.len(),
278 deserialized.one_time_prekey_privates.len()
279 );
280 assert_eq!(
281 private_keys.pq_signed_prekey_private,
282 deserialized.pq_signed_prekey_private
283 );
284 assert_eq!(
285 private_keys.pq_one_time_prekey_privates,
286 deserialized.pq_one_time_prekey_privates
287 );
288
289 for (key_id, original_key) in &private_keys.one_time_prekey_privates {
291 let deserialized_key = &deserialized.one_time_prekey_privates[key_id];
292 assert_eq!(original_key.to_bytes(), deserialized_key.to_bytes());
293 }
294
295 let (_prekey_bundle2, private_keys2) =
297 create_pqxdh_prekey_bundle_with_private_keys(&keypair, 3)?;
298 assert_ne!(
299 private_keys.signed_prekey_private.to_bytes(),
300 private_keys2.signed_prekey_private.to_bytes(),
301 "Keys should be randomly generated and different"
302 );
303
304 Ok(())
305 }
306
307 fn create_test_keypair() -> KeyPair {
309 KeyPair::generate(&mut rand::thread_rng())
310 }
311
312 fn create_test_inbox() -> PqxdhInbox {
313 let keypair = Arc::new(create_test_keypair());
314 let (prekey_bundle, _) = create_pqxdh_prekey_bundle_with_private_keys(&keypair, 3).unwrap();
315 PqxdhInbox::new(InboxType::Public, prekey_bundle, Some(1024), None)
316 }
317
318 fn create_test_message_full(content: Content, author: &KeyPair) -> MessageFull {
319 let timestamp = std::time::SystemTime::now()
320 .duration_since(std::time::UNIX_EPOCH)
321 .unwrap()
322 .as_secs();
323 let message = Message::new_v0(
324 content,
325 author.public_key(),
326 timestamp,
327 Kind::Regular,
328 vec![],
329 );
330 MessageFull::new(message, author).unwrap()
331 }
332
    /// Protocol handler specialised to the mock messages manager used by the
    /// async tests below.
    type TestPqxdhHandler = PqxdhProtocolHandler<MockMessagesManagerTrait>;
334
335 #[tokio::test]
336 async fn test_publish_service_success() {
337 let mut mock_manager = MockMessagesManagerTrait::new();
338 let keypair = Arc::new(create_test_keypair());
339
340 mock_manager.expect_publish().times(1).returning(|_| {
342 Ok(PublishResult::StoredNew {
343 global_stream_id: "123".to_string(),
344 })
345 });
346
347 let handler = TestPqxdhHandler::new(
348 Arc::new(mock_manager),
349 keypair.clone(),
350 PqxdhInboxProtocol::EchoService,
351 );
352
353 let result = handler.publish_service(false).await;
354 assert!(result.is_ok());
355
356 let state = handler.state.read().await;
358 assert!(state.inbox_tag.is_some());
359 assert!(state.private_keys.is_some());
360 assert!(state.inbox.is_some());
361 }
362
363 #[tokio::test]
365 async fn test_publish_service_already_published() {
366 let mock_manager = MockMessagesManagerTrait::new();
367 let keypair = Arc::new(create_test_keypair());
368
369 let handler = TestPqxdhHandler::new(
370 Arc::new(mock_manager),
371 keypair.clone(),
372 PqxdhInboxProtocol::EchoService,
373 );
374
375 SharedObservable::<_, AsyncLock>::update(
378 &handler.state,
379 |state: &mut PqxdhProtocolState| {
380 state.inbox_tag = Some(Tag::Channel {
381 id: vec![1, 2, 3],
382 relays: vec![],
383 });
384 },
385 )
386 .await;
387
388 let result = handler.publish_service(false).await;
389 assert!(matches!(result, Err(PqxdhError::InboxAlreadyPublished)));
390 }
391
392 #[tokio::test]
394 async fn test_publish_service_force_overwrite() {
395 let mut mock_manager = MockMessagesManagerTrait::new();
396 let keypair = Arc::new(create_test_keypair());
397
398 mock_manager.expect_publish().times(1).returning(|_| {
400 Ok(PublishResult::StoredNew {
401 global_stream_id: "123".to_string(),
402 })
403 });
404
405 let handler = TestPqxdhHandler::new(
406 Arc::new(mock_manager),
407 keypair.clone(),
408 PqxdhInboxProtocol::EchoService,
409 );
410
411 SharedObservable::<_, AsyncLock>::update(
414 &handler.state,
415 |state: &mut PqxdhProtocolState| {
416 state.inbox_tag = Some(Tag::Channel {
417 id: vec![1, 2, 3],
418 relays: vec![],
419 });
420 },
421 )
422 .await;
423
424 let result = handler.publish_service(true).await;
425 assert!(result.is_ok());
426 }
427
428 #[tokio::test]
430 async fn test_connect_to_service_success() {
431 let mut mock_manager = MockMessagesManagerTrait::new();
432 let client_keypair = Arc::new(create_test_keypair());
433 let service_keypair = create_test_keypair();
434 let service_key = service_keypair.public_key();
435
436 let test_inbox = create_test_inbox();
437
438 let inbox_message = create_test_message_full(
440 Content::Raw(postcard::to_stdvec(&test_inbox).unwrap()),
441 &service_keypair,
442 );
443 mock_manager
444 .expect_user_data()
445 .with(
446 eq(KeyId::from(*service_key.id())),
447 eq(StoreKey::PqxdhInbox(PqxdhInboxProtocol::EchoService)),
448 )
449 .times(1)
450 .returning(move |_, _| Ok(Some(inbox_message.clone())));
451
452 mock_manager.expect_publish().times(1).returning(|_| {
454 Ok(PublishResult::StoredNew {
455 global_stream_id: "456".to_string(),
456 })
457 });
458
459 mock_manager
461 .expect_catch_up_and_subscribe()
462 .times(1)
463 .returning(|_, _| Ok(Box::pin(stream::empty())));
464
465 let handler = TestPqxdhHandler::new(
466 Arc::new(mock_manager),
467 client_keypair.clone(),
468 PqxdhInboxProtocol::EchoService,
469 );
470
471 let initial_message = "Hello, service!".to_string();
472 let result = handler
473 .connect_to_service::<String, String>(&service_key, &initial_message)
474 .await;
475
476 assert!(result.is_ok());
477 let (session_id, _stream) = result.unwrap();
478
479 drop(_stream);
481
482 let state = handler.state.read().await;
484 assert!(state.sessions.contains_key(&KeyId::from_bytes(session_id)));
485 }
486
487 #[tokio::test]
489 async fn test_connect_to_service_inbox_not_found() {
490 let mut mock_manager = MockMessagesManagerTrait::new();
491 let client_keypair = Arc::new(create_test_keypair());
492 let service_keypair = create_test_keypair();
493 let service_key = service_keypair.public_key();
494
495 mock_manager
497 .expect_user_data()
498 .with(
499 eq(KeyId::from(*service_key.id())),
500 eq(StoreKey::PqxdhInbox(PqxdhInboxProtocol::EchoService)),
501 )
502 .times(1)
503 .returning(|_, _| Ok(None));
504
505 let handler = TestPqxdhHandler::new(
506 Arc::new(mock_manager),
507 client_keypair.clone(),
508 PqxdhInboxProtocol::EchoService,
509 );
510
511 let initial_message = "Hello, service!".to_string();
512 let result = handler
513 .connect_to_service::<String, String>(&service_key, &initial_message)
514 .await;
515
516 assert!(matches!(result, Err(PqxdhError::InboxNotFound)));
517 }
518
519 #[tokio::test]
521 async fn test_send_message_success() {
522 let mut mock_manager = MockMessagesManagerTrait::new();
523 let keypair = Arc::new(create_test_keypair());
524
525 mock_manager.expect_publish().times(1).returning(|_| {
527 Ok(PublishResult::StoredNew {
528 global_stream_id: "789".to_string(),
529 })
530 });
531
532 let handler = TestPqxdhHandler::new(
533 Arc::new(mock_manager),
534 keypair.clone(),
535 PqxdhInboxProtocol::EchoService,
536 );
537
538 let session_id: PqxdhSessionId = [42u8; 32];
540 let shared_secret = PqxdhSharedSecret {
541 shared_key: [1u8; 32],
542 consumed_one_time_key_ids: vec![],
543 };
544 let test_session = PqxdhSession::from_shared_secret(
545 shared_secret,
546 session_id,
547 [43u8; 32], keypair.public_key(),
549 );
550
551 SharedObservable::<_, AsyncLock>::update(
554 &handler.state,
555 |state: &mut PqxdhProtocolState| {
556 state
557 .sessions
558 .insert(KeyId::from_bytes(session_id), test_session);
559 },
560 )
561 .await;
562
563 let message = "Test message".to_string();
564 let result = handler.send_message(&session_id, &message).await;
565
566 assert!(result.is_ok());
567 }
568
569 #[tokio::test]
571 async fn test_send_message_session_not_found() {
572 let mock_manager = MockMessagesManagerTrait::new();
573 let keypair = Arc::new(create_test_keypair());
574
575 let handler = TestPqxdhHandler::new(
576 Arc::new(mock_manager),
577 keypair.clone(),
578 PqxdhInboxProtocol::EchoService,
579 );
580
581 let session_id: PqxdhSessionId = [42u8; 32];
582 let message = "Test message".to_string();
583 let result = handler.send_message(&session_id, &message).await;
584
585 assert!(matches!(result, Err(PqxdhError::SessionNotFound)));
586 }
587
588 #[tokio::test]
590 async fn test_inbox_stream_success() {
591 let mut mock_manager = MockMessagesManagerTrait::new();
592 let keypair = Arc::new(create_test_keypair());
593
594 mock_manager
596 .expect_ensure_contains_filter()
597 .times(1)
598 .returning(|_| Ok(()));
599
600 mock_manager
601 .expect_filtered_messages_stream()
602 .times(1)
603 .returning(|_| Box::pin(stream::empty()));
604
605 let handler = TestPqxdhHandler::new(
606 Arc::new(mock_manager),
607 keypair.clone(),
608 PqxdhInboxProtocol::EchoService,
609 );
610
611 SharedObservable::<_, AsyncLock>::update(
614 &handler.state,
615 |state: &mut PqxdhProtocolState| {
616 state.inbox_tag = Some(Tag::Channel {
617 id: vec![1, 2, 3],
618 relays: vec![],
619 });
620 let (_, private_keys) =
621 create_pqxdh_prekey_bundle_with_private_keys(&keypair, 3).unwrap();
622 state.private_keys = Some(private_keys);
623 state.inbox = Some(create_test_inbox());
624 },
625 )
626 .await;
627
628 let result = handler.inbox_stream::<String>().await;
629 assert!(result.is_ok());
630 }
631
632 #[tokio::test]
634 async fn test_inbox_stream_service_not_published() {
635 let mock_manager = MockMessagesManagerTrait::new();
636 let keypair = Arc::new(create_test_keypair());
637
638 let handler = TestPqxdhHandler::new(
639 Arc::new(mock_manager),
640 keypair.clone(),
641 PqxdhInboxProtocol::EchoService,
642 );
643
644 let result = handler.inbox_stream::<String>().await;
645 assert!(matches!(result, Err(PqxdhError::ServiceNotPublished)));
646 }
647
648 #[tokio::test]
650 async fn test_state_persistence() {
651 let mock_manager = Arc::new(MockMessagesManagerTrait::new());
652 let keypair = Arc::new(create_test_keypair());
653
654 let original_handler = TestPqxdhHandler::new(
656 mock_manager.clone(),
657 keypair.clone(),
658 PqxdhInboxProtocol::EchoService,
659 );
660
661 let session_id: PqxdhSessionId = [99u8; 32];
663 let shared_secret = PqxdhSharedSecret {
664 shared_key: [2u8; 32],
665 consumed_one_time_key_ids: vec!["test_key".to_string()],
666 };
667 let test_session = PqxdhSession::from_shared_secret(
668 shared_secret,
669 session_id,
670 [100u8; 32], keypair.public_key(),
672 );
673
674 SharedObservable::<_, AsyncLock>::update(
675 &original_handler.state,
676 |state: &mut PqxdhProtocolState| {
677 state
678 .sessions
679 .insert(KeyId::from_bytes(session_id), test_session);
680 state.inbox_tag = Some(Tag::Channel {
681 id: vec![4, 5, 6],
682 relays: vec![],
683 });
684 },
685 )
686 .await;
687
688 let serialized_state = {
690 let state = original_handler.state.read().await;
691 postcard::to_stdvec(&*state).unwrap()
692 };
693
694 let restored_state: PqxdhProtocolState = postcard::from_bytes(&serialized_state).unwrap();
696 let restored_handler =
697 TestPqxdhHandler::from_state(mock_manager.clone(), keypair.clone(), restored_state);
698
699 let restored_state = restored_handler.state.read().await;
701 assert!(
702 restored_state
703 .sessions
704 .contains_key(&KeyId::from_bytes(session_id))
705 );
706 assert_eq!(restored_state.protocol, PqxdhInboxProtocol::EchoService);
707 assert_eq!(
708 restored_state.inbox_tag,
709 Some(Tag::Channel {
710 id: vec![4, 5, 6],
711 relays: vec![]
712 })
713 );
714 }
715
716 #[tokio::test]
718 async fn test_error_handling_scenarios() {
719 let mut mock_manager = MockMessagesManagerTrait::new();
720 let keypair = Arc::new(create_test_keypair());
721
722 mock_manager
724 .expect_publish()
725 .times(1)
726 .returning(|_| Err(crate::ClientError::Generic("Network error".to_string())));
727
728 let handler = TestPqxdhHandler::new(
729 Arc::new(mock_manager),
730 keypair.clone(),
731 PqxdhInboxProtocol::EchoService,
732 );
733
734 let result = handler.publish_service(false).await;
735 assert!(matches!(result, Err(PqxdhError::MessagesService(_))));
736 }
737
738 #[tokio::test]
740 async fn test_multiple_session_management() {
741 let mock_manager = MockMessagesManagerTrait::new();
742 let keypair = Arc::new(create_test_keypair());
743
744 let handler = TestPqxdhHandler::new(
745 Arc::new(mock_manager),
746 keypair.clone(),
747 PqxdhInboxProtocol::EchoService,
748 );
749
750 let session_ids = [[1u8; 32], [2u8; 32], [3u8; 32]];
752
753 for (i, session_id) in session_ids.iter().enumerate() {
754 let shared_secret = PqxdhSharedSecret {
755 shared_key: [i as u8; 32],
756 consumed_one_time_key_ids: vec![format!("key_{}", i)],
757 };
758 let test_session = PqxdhSession::from_shared_secret(
759 shared_secret,
760 *session_id,
761 [(i + 10) as u8; 32], keypair.public_key(),
763 );
764
765 SharedObservable::<_, AsyncLock>::update(
766 &handler.state,
767 |state: &mut PqxdhProtocolState| {
768 state
769 .sessions
770 .insert(KeyId::from_bytes(*session_id), test_session);
771 },
772 )
773 .await;
774 }
775
776 let state = handler.state.read().await;
778 assert_eq!(state.sessions.len(), 3);
779 for session_id in &session_ids {
780 assert!(state.sessions.contains_key(&KeyId::from_bytes(*session_id)));
781 }
782 }
783
784 #[tokio::test]
786 async fn test_send_ephemeral_message() {
787 let mut mock_manager = MockMessagesManagerTrait::new();
788 let keypair = Arc::new(create_test_keypair());
789
790 mock_manager.expect_publish().times(1).returning(|_| {
792 Ok(PublishResult::StoredNew {
793 global_stream_id: "emp123".to_string(),
794 })
795 });
796
797 let handler = TestPqxdhHandler::new(
798 Arc::new(mock_manager),
799 keypair.clone(),
800 PqxdhInboxProtocol::EchoService,
801 );
802
803 let session_id: PqxdhSessionId = [55u8; 32];
805 let shared_secret = PqxdhSharedSecret {
806 shared_key: [3u8; 32],
807 consumed_one_time_key_ids: vec![],
808 };
809 let test_session = PqxdhSession::from_shared_secret(
810 shared_secret,
811 session_id,
812 [56u8; 32], keypair.public_key(),
814 );
815
816 SharedObservable::<_, AsyncLock>::update(
819 &handler.state,
820 |state: &mut PqxdhProtocolState| {
821 state
822 .sessions
823 .insert(KeyId::from_bytes(session_id), test_session);
824 },
825 )
826 .await;
827
828 let message = "Ephemeral message".to_string();
829 let result = handler
830 .send_ephemeral_message(&session_id, &message, 60)
831 .await;
832
833 assert!(result.is_ok());
834 }
835
836 #[tokio::test]
838 async fn test_listen_for_messages() {
839 let mut mock_manager = MockMessagesManagerTrait::new();
840 let keypair = Arc::new(create_test_keypair());
841
842 mock_manager
844 .expect_catch_up_and_subscribe()
845 .times(1)
846 .returning(|_, _| Ok(Box::pin(stream::empty())));
847
848 let handler = TestPqxdhHandler::new(
849 Arc::new(mock_manager),
850 keypair.clone(),
851 PqxdhInboxProtocol::EchoService,
852 );
853
854 let session_id: PqxdhSessionId = [77u8; 32];
856 let shared_secret = PqxdhSharedSecret {
857 shared_key: [4u8; 32],
858 consumed_one_time_key_ids: vec![],
859 };
860 let test_session = PqxdhSession::from_shared_secret(
861 shared_secret,
862 session_id,
863 [78u8; 32], keypair.public_key(),
865 );
866
867 SharedObservable::<_, AsyncLock>::update(
870 &handler.state,
871 |state: &mut PqxdhProtocolState| {
872 state
873 .sessions
874 .insert(KeyId::from_bytes(session_id), test_session);
875 state.inbox_tag = Some(Tag::Channel {
876 id: vec![1, 2, 3],
877 relays: vec![],
878 });
879 },
880 )
881 .await;
882
883 let result = handler
884 .listen_for_messages::<String>(session_id, true)
885 .await;
886 assert!(result.is_ok());
887 }
888
889 #[tokio::test]
891 async fn test_listen_for_messages_session_not_found() {
892 let mock_manager = MockMessagesManagerTrait::new();
893 let keypair = Arc::new(create_test_keypair());
894
895 let handler = TestPqxdhHandler::new(
896 Arc::new(mock_manager),
897 keypair.clone(),
898 PqxdhInboxProtocol::EchoService,
899 );
900
901 let session_id: PqxdhSessionId = [88u8; 32];
902 let result = handler
903 .listen_for_messages::<String>(session_id, true)
904 .await;
905
906 assert!(matches!(result, Err(PqxdhError::SessionNotFound)));
907 }
908}