1use crate::error::{ClientError, Result};
2use async_broadcast::Receiver;
3use async_trait::async_trait;
4use eyeball::AsyncLock;
5use futures::Stream;
6use std::{ops::Deref, sync::Arc};
7use tokio::{select, task::JoinHandle};
8use tracing::{debug, error, warn};
9use zoe_client_storage::{MessageStorage, SubscriptionState};
10use zoe_wire_protocol::{
11 CatchUpResponse, Filter, KeyId, MessageFull, PublishResult, StreamMessage, VerifyingKey,
12};
13
14use super::messages_manager::{
15 MessageEvent, MessagesManager, MessagesManagerBuilder, MessagesManagerTrait,
16};
17
/// Builder for [`GenericMessagePersistenceManager`], generic over the messages-manager type `T`.
///
/// `storage` and `relay_id` must be set before building; the build methods return an error
/// when either is missing.
pub struct GenericMessagePersistenceManagerBuilder<T: MessagesManagerTrait> {
    /// Storage backend used for persistence. Required.
    storage: Option<Arc<dyn MessageStorage>>,
    /// Identifier of the relay whose messages and subscription state are persisted. Required.
    relay_id: Option<KeyId>,
    /// Buffer size forwarded to `MessagesManagerBuilder` when this builder also creates the
    /// messages manager; a default of 1000 is used when unset.
    buffer_size: Option<usize>,
    /// Forwarded to `MessagesManagerBuilder::autosubscribe` when this builder also creates the
    /// messages manager.
    autosubscribe: bool,
    /// Ties the builder to `T` without storing a value of that type.
    _phantom: std::marker::PhantomData<T>,
}
61
/// Builder specialised for the concrete [`MessagesManager`].
pub type MessagePersistenceManagerBuilder =
    GenericMessagePersistenceManagerBuilder<MessagesManager>;
65
66impl<T: MessagesManagerTrait> GenericMessagePersistenceManagerBuilder<T> {
67 pub fn new() -> Self {
69 Self {
70 storage: None,
71 relay_id: None,
72 buffer_size: None,
73 autosubscribe: false,
74 _phantom: std::marker::PhantomData,
75 }
76 }
77
78 pub fn storage(mut self, storage: Arc<dyn MessageStorage>) -> Self {
80 self.storage = Some(storage);
81 self
82 }
83
84 pub fn autosubscribe(mut self, autosubscribe: bool) -> Self {
86 self.autosubscribe = autosubscribe;
87 self
88 }
89
90 pub fn relay_id(mut self, relay_id: KeyId) -> Self {
92 self.relay_id = Some(relay_id);
93 self
94 }
95
96 pub fn relay_pubkey(mut self, relay_pubkey: VerifyingKey) -> Self {
98 self.relay_id = Some(relay_pubkey.id());
99 self
100 }
101
102 pub fn buffer_size(mut self, buffer_size: usize) -> Self {
104 self.buffer_size = Some(buffer_size);
105 self
106 }
107
108 pub async fn build_with_messages_manager(
109 self,
110 messages_manager: Arc<T>,
111 ) -> Result<GenericMessagePersistenceManager<T>> {
112 let storage = self
113 .storage
114 .ok_or_else(|| ClientError::Generic("Storage is required".to_string()))?;
115
116 let relay_id = self
117 .relay_id
118 .ok_or_else(|| ClientError::Generic("Relay ID is required".to_string()))?;
119
120 let manager =
122 GenericMessagePersistenceManager::new(storage, messages_manager, relay_id).await?;
123
124 Ok(manager)
125 }
126}
127
128impl MessagePersistenceManagerBuilder {
130 pub async fn build_with_messages_manager_configuration<F>(
141 self,
142 connection: &quinn::Connection,
143 configure: F,
144 ) -> Result<MessagePersistenceManager>
145 where
146 F: FnOnce(MessagesManagerBuilder) -> MessagesManagerBuilder,
147 {
148 let storage = self
149 .storage
150 .as_ref()
151 .ok_or_else(|| ClientError::Generic("Storage is required".to_string()))?;
152
153 let subscription_state = if let Some(relay_id) = &self.relay_id {
155 MessagePersistenceManager::load_subscription_state(&**storage, relay_id)
156 .await?
157 .unwrap_or_default()
158 } else {
159 SubscriptionState::default()
160 };
161
162 let messages_manager = Arc::new(
164 configure(
165 MessagesManagerBuilder::new()
166 .state(subscription_state)
167 .buffer_size(self.buffer_size.unwrap_or(1000))
168 .autosubscribe(self.autosubscribe),
169 )
170 .build(connection)
171 .await?,
172 );
173
174 self.build_with_messages_manager(messages_manager).await
175 }
176
177 pub async fn build(self, connection: &quinn::Connection) -> Result<MessagePersistenceManager> {
188 self.build_with_messages_manager_configuration(connection, |builder| builder)
189 .await
190 }
191}
192
193impl Default for MessagePersistenceManagerBuilder {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
/// Wrapper around a task's [`JoinHandle`] whose `Drop` implementation aborts the task.
#[derive(Debug)]
struct AbortOnDrop<T>(JoinHandle<T>);
201
202impl<T> Drop for AbortOnDrop<T> {
203 fn drop(&mut self) {
204 self.0.abort();
205 }
206}
207
208#[derive(Debug, Clone)]
219pub struct GenericMessagePersistenceManager<T: MessagesManagerTrait> {
220 messages_manager: Arc<T>,
222 persistence_task: Arc<AbortOnDrop<Result<()>>>,
224}
225
/// Persistence manager specialised for the concrete [`MessagesManager`].
pub type MessagePersistenceManager = GenericMessagePersistenceManager<MessagesManager>;
228
229impl<T: MessagesManagerTrait> GenericMessagePersistenceManager<T> {
230 pub fn messages_manager(&self) -> &Arc<T> {
235 &self.messages_manager
236 }
237
238 pub async fn load_subscription_state(
240 storage: &dyn MessageStorage,
241 relay_id: &KeyId,
242 ) -> Result<Option<SubscriptionState>> {
243 storage
244 .get_subscription_state(relay_id)
245 .await
246 .map_err(|e| ClientError::Generic(format!("Failed to load subscription state: {e}")))
247 }
248
249 pub async fn load_all_subscription_states(
251 storage: &dyn MessageStorage,
252 ) -> Result<std::collections::HashMap<KeyId, SubscriptionState>> {
253 storage.get_all_subscription_states().await.map_err(|e| {
254 ClientError::Generic(format!("Failed to load all subscription states: {e}"))
255 })
256 }
257
258 async fn new(
268 storage: Arc<dyn MessageStorage>,
269 messages_manager: Arc<T>,
270 relay_id: KeyId,
271 ) -> Result<Self> {
272 let mut events_stream = messages_manager.message_events_stream();
274 let mut state_updates = messages_manager.get_subscription_state_updates().await;
275 let storage_clone = storage.clone();
276
277 let persistence_task = tokio::spawn(async move {
279 debug!("MessagePersistenceManager started");
281
282 loop {
284 select! {
285 event_result = events_stream.recv() => {
286 let event = match event_result {
287 Ok(event) => event,
288 Err(e) => {
289 warn!("Message events stream unexpectedly ended: {e}");
290 break;
291 }
292 };
293 if let Err(e) =
294 Self::handle_message_event(&*storage_clone, &event, &relay_id).await
295 {
296 warn!("Failed to handle message event {:?}: {}", event, e);
297 }
299 }
300 state = state_updates.next() => {
301 let Some(state) = state else {
302 continue;
304 };
305 debug!("Subscription state updated: {:?}", state);
306 if let Err(e) = storage_clone.store_subscription_state(&relay_id, &state).await {
307 error!(error=?e, "Failed to store subscription state");
308 }
310 }
311 }
312 }
313 Ok(())
314 });
315
316 Ok(Self {
317 messages_manager,
318 persistence_task: Arc::new(AbortOnDrop(persistence_task)),
319 })
320 }
321
322 async fn handle_message_event(
324 storage: &dyn MessageStorage,
325 event: &MessageEvent,
326 relay_id: &KeyId,
327 ) -> Result<()> {
328 match event {
329 MessageEvent::MessageReceived {
330 message,
331 stream_height,
332 } => {
333 debug!(
334 "Persisting received message: {}",
335 hex::encode(message.id().as_bytes())
336 );
337 storage.store_message(message).await.map_err(|e| {
338 ClientError::Generic(format!("Failed to store received message: {e}"))
339 })?;
340
341 storage
343 .mark_message_synced(message.id(), relay_id, stream_height)
344 .await
345 .map_err(|e| {
346 ClientError::Generic(format!("Failed to mark message as synced: {e}"))
347 })?;
348 }
349 MessageEvent::MessageSent { message, .. } => {
350 debug!(
351 "Persisting sent message: {}",
352 hex::encode(message.id().as_bytes())
353 );
354 storage.store_message(message).await.map_err(|e| {
355 ClientError::Generic(format!("Failed to store sent message: {e}"))
356 })?;
357 }
358 MessageEvent::CatchUpMessage {
359 message,
360 request_id,
361 } => {
362 debug!(
363 "Persisting catch-up message (request {}): {}",
364 request_id,
365 hex::encode(message.id().as_bytes())
366 );
367 storage.store_message(message).await.map_err(|e| {
368 ClientError::Generic(format!("Failed to store catch-up message: {e}"))
369 })?;
370 }
371 MessageEvent::StreamHeightUpdate { .. } => {
372 }
375 MessageEvent::CatchUpCompleted { request_id } => {
376 debug!("Catch-up completed for request {}", request_id);
377 }
379 }
380
381 Ok(())
382 }
383
384 pub fn is_running(&self) -> bool {
386 !self.persistence_task.0.is_finished()
387 }
388}
389
390impl MessagePersistenceManager {
392 pub fn builder() -> MessagePersistenceManagerBuilder {
394 MessagePersistenceManagerBuilder::new()
395 }
396
397 pub fn inner_messages_manager(&self) -> &MessagesManager {
403 &self.messages_manager
404 }
405}
406
407impl Deref for MessagePersistenceManager {
408 type Target = MessagesManager;
409
410 fn deref(&self) -> &Self::Target {
411 &self.messages_manager
412 }
413}
414
415#[async_trait]
418impl<T: MessagesManagerTrait + 'static> MessagesManagerTrait
419 for GenericMessagePersistenceManager<T>
420{
421 fn message_events_stream(&self) -> Receiver<MessageEvent> {
422 self.messages_manager.message_events_stream()
423 }
424
425 async fn get_subscription_state_updates(
426 &self,
427 ) -> eyeball::Subscriber<SubscriptionState, AsyncLock> {
428 self.messages_manager.get_subscription_state_updates().await
429 }
430
431 async fn subscribe(&self) -> Result<()> {
432 self.messages_manager.subscribe().await
433 }
434
435 async fn publish(&self, message: MessageFull) -> Result<PublishResult> {
436 self.messages_manager.publish(message).await
437 }
438
439 async fn ensure_contains_filter(&self, filter: Filter) -> Result<()> {
440 self.messages_manager.ensure_contains_filter(filter).await
441 }
442
443 fn messages_stream(&self) -> Receiver<StreamMessage> {
444 self.messages_manager.messages_stream()
445 }
446
447 fn catch_up_stream(&self) -> Receiver<CatchUpResponse> {
448 self.messages_manager.catch_up_stream()
449 }
450
451 fn filtered_messages_stream(
452 &self,
453 filter: Filter,
454 ) -> std::pin::Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>> {
455 self.messages_manager.filtered_messages_stream(filter)
456 }
457
458 async fn catch_up_and_subscribe(
459 &self,
460 filter: Filter,
461 since: Option<String>,
462 ) -> Result<std::pin::Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>> {
463 self.messages_manager
464 .catch_up_and_subscribe(filter, since)
465 .await
466 }
467
468 async fn user_data(
469 &self,
470 author: zoe_wire_protocol::KeyId,
471 storage_key: zoe_wire_protocol::StoreKey,
472 ) -> Result<Option<MessageFull>> {
473 self.messages_manager.user_data(author, storage_key).await
474 }
475
476 async fn check_messages(
477 &self,
478 message_ids: Vec<zoe_wire_protocol::MessageId>,
479 ) -> Result<Vec<Option<String>>> {
480 self.messages_manager.check_messages(message_ids).await
481 }
482}
483
484#[cfg(test)]
485mod tests {
486 use super::*;
487 use mockall::predicate::*;
488 use rand::rngs::OsRng;
489 use std::collections::HashMap;
490
491 use crate::services::messages_manager::MockMessagesManagerTrait;
492 use zoe_client_storage::{StorageError, storage::MockMessageStorage};
493 use zoe_wire_protocol::{Content, KeyPair, Kind, MessageFilters, MessageFull, PublishResult};
494
495 fn create_test_message(content: &str) -> MessageFull {
496 let keypair = KeyPair::generate(&mut OsRng);
497 let content_obj = Content::Raw(content.as_bytes().to_vec());
498 let message = zoe_wire_protocol::Message::new_v0(
499 content_obj,
500 keypair.public_key(),
501 std::time::SystemTime::now()
502 .duration_since(std::time::UNIX_EPOCH)
503 .unwrap()
504 .as_secs(),
505 Kind::Regular,
506 vec![],
507 );
508 MessageFull::new(message, &keypair).expect("Failed to create MessageFull")
509 }
510
511 #[tokio::test]
512 async fn test_builder_defaults() {
513 let builder = MessagePersistenceManagerBuilder::new();
514 assert!(builder.storage.is_none());
515 assert!(builder.relay_id.is_none());
516 assert!(builder.buffer_size.is_none());
517 assert!(!builder.autosubscribe);
518 }
519
520 #[tokio::test]
521 async fn test_builder_relay_pubkey_conversion() {
522 let keypair = KeyPair::generate(&mut OsRng);
523 let pubkey = keypair.public_key();
524 let expected_key_id = pubkey.id();
525
526 let builder = MessagePersistenceManagerBuilder::new().relay_pubkey(pubkey);
527
528 assert_eq!(builder.relay_id, Some(expected_key_id));
529 }
530
531 #[tokio::test]
532 async fn test_builder_configuration() {
533 let relay_id = KeyId::from_bytes([1u8; 32]);
534 let buffer_size = 2000;
535
536 let builder = MessagePersistenceManagerBuilder::new()
537 .relay_id(relay_id)
538 .buffer_size(buffer_size)
539 .autosubscribe(true);
540
541 assert_eq!(builder.relay_id, Some(relay_id));
542 assert_eq!(builder.buffer_size, Some(buffer_size));
543 assert!(builder.autosubscribe);
544 }
545
546 #[tokio::test]
547 async fn test_message_received_persistence() {
548 let mut mock_storage = MockMessageStorage::new();
549 let message = create_test_message("Test message");
550 let relay_id = KeyId::from_bytes([1u8; 32]);
551 let stream_height = "100".to_string();
552
553 mock_storage
555 .expect_store_message()
556 .with(eq(message.clone()))
557 .times(1)
558 .returning(|_| Ok(()));
559
560 mock_storage
561 .expect_mark_message_synced()
562 .with(eq(*message.id()), eq(relay_id), eq("100"))
563 .times(1)
564 .returning(|_, _, _| Ok(()));
565
566 let event = MessageEvent::MessageReceived {
568 message: message.clone(),
569 stream_height: stream_height.clone(),
570 };
571
572 let result =
574 MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
575
576 assert!(result.is_ok());
577 }
578
579 #[tokio::test]
580 async fn test_message_sent_persistence() {
581 let mut mock_storage = MockMessageStorage::new();
582 let relay_id = KeyId::from_bytes([1u8; 32]);
583 let message = create_test_message("Sent message");
584 let publish_result = PublishResult::StoredNew {
585 global_stream_id: "200".to_string(),
586 };
587
588 mock_storage
590 .expect_store_message()
591 .with(eq(message.clone()))
592 .times(1)
593 .returning(|_| Ok(()));
594
595 let event = MessageEvent::MessageSent {
597 message: message.clone(),
598 publish_result,
599 };
600
601 let result =
603 MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
604
605 assert!(result.is_ok());
606 }
607
608 #[tokio::test]
609 async fn test_catch_up_message_persistence() {
610 let mut mock_storage = MockMessageStorage::new();
611 let relay_id = KeyId::from_bytes([1u8; 32]);
612 let message = create_test_message("Catch-up message");
613 let request_id = 42;
614
615 mock_storage
617 .expect_store_message()
618 .with(eq(message.clone()))
619 .times(1)
620 .returning(|_| Ok(()));
621
622 let event = MessageEvent::CatchUpMessage {
624 message: message.clone(),
625 request_id,
626 };
627
628 let result =
630 MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
631
632 assert!(result.is_ok());
633 }
634
635 #[tokio::test]
636 async fn test_stream_height_update_no_persistence() {
637 let mock_storage = MockMessageStorage::new();
638 let relay_id = KeyId::from_bytes([1u8; 32]);
639
640 let event = MessageEvent::StreamHeightUpdate {
642 height: "300".to_string(),
643 };
644
645 let result =
647 MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
648
649 assert!(result.is_ok());
650 }
651
652 #[tokio::test]
653 async fn test_catch_up_completed_no_persistence() {
654 let mock_storage = MockMessageStorage::new();
655 let relay_id = KeyId::from_bytes([1u8; 32]);
656
657 let event = MessageEvent::CatchUpCompleted { request_id: 123 };
659
660 let result =
662 MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
663
664 assert!(result.is_ok());
665 }
666
667 #[tokio::test]
668 async fn test_storage_error_handling() {
669 let mut mock_storage = MockMessageStorage::new();
670 let relay_id = KeyId::from_bytes([1u8; 32]);
671 let message = create_test_message("Error test message");
672
673 mock_storage
675 .expect_store_message()
676 .with(eq(message.clone()))
677 .times(1)
678 .returning(|_| Err(StorageError::Internal("Test error".to_string())));
679
680 let event = MessageEvent::MessageSent {
682 message: message.clone(),
683 publish_result: PublishResult::StoredNew {
684 global_stream_id: "400".to_string(),
685 },
686 };
687
688 let result =
690 MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
691
692 assert!(result.is_err());
693 assert!(
694 result
695 .unwrap_err()
696 .to_string()
697 .contains("Failed to store sent message")
698 );
699 }
700
701 #[tokio::test]
702 async fn test_load_subscription_state() {
703 let mut mock_storage = MockMessageStorage::new();
704 let relay_id = KeyId::from_bytes([4u8; 32]);
705 let expected_state = SubscriptionState {
706 latest_stream_height: Some("500".to_string()),
707 current_filters: MessageFilters {
708 filters: Some(vec![]),
709 },
710 };
711
712 mock_storage
714 .expect_get_subscription_state()
715 .with(eq(relay_id))
716 .times(1)
717 .returning(move |_| Ok(Some(expected_state.clone())));
718
719 let result =
721 MessagePersistenceManager::load_subscription_state(&mock_storage, &relay_id).await;
722
723 assert!(result.is_ok());
724 let loaded_state = result.unwrap();
725 assert!(loaded_state.is_some());
726 assert_eq!(
727 loaded_state.unwrap().latest_stream_height,
728 Some("500".to_string())
729 );
730 }
731
732 #[tokio::test]
733 async fn test_load_all_subscription_states() {
734 let mut mock_storage = MockMessageStorage::new();
735 let relay_id1 = KeyId::from_bytes([5u8; 32]);
736 let relay_id2 = KeyId::from_bytes([6u8; 32]);
737
738 let mut expected_states = HashMap::new();
739 expected_states.insert(
740 relay_id1,
741 SubscriptionState {
742 latest_stream_height: Some("600".to_string()),
743 current_filters: MessageFilters {
744 filters: Some(vec![]),
745 },
746 },
747 );
748 expected_states.insert(
749 relay_id2,
750 SubscriptionState {
751 latest_stream_height: Some("700".to_string()),
752 current_filters: MessageFilters {
753 filters: Some(vec![]),
754 },
755 },
756 );
757
758 mock_storage
760 .expect_get_all_subscription_states()
761 .times(1)
762 .returning(move || Ok(expected_states.clone()));
763
764 let result = MessagePersistenceManager::load_all_subscription_states(&mock_storage).await;
766
767 assert!(result.is_ok());
768 let loaded_states = result.unwrap();
769 assert_eq!(loaded_states.len(), 2);
770 assert!(loaded_states.contains_key(&relay_id1));
771 assert!(loaded_states.contains_key(&relay_id2));
772 }
773
774 #[tokio::test]
775 async fn test_subscription_state_helpers() {
776 let state = SubscriptionState::default();
778 assert!(state.latest_stream_height.is_none());
779 assert!(state.current_filters.filters.is_none());
780
781 let filters = MessageFilters {
783 filters: Some(vec![]),
784 };
785 let state = SubscriptionState::with_filters(filters.clone());
786 assert_eq!(state.current_filters, filters);
787 assert!(state.latest_stream_height.is_none());
788 }
789
790 #[tokio::test]
791 async fn test_subscription_state_operations() {
792 let mut state = SubscriptionState::default();
793
794 let filters = MessageFilters {
796 filters: Some(vec![]),
797 };
798 state.current_filters = filters.clone();
799 assert_eq!(state.current_filters, filters);
800
801 state.latest_stream_height = Some("500".to_string());
803 assert_eq!(state.latest_stream_height, Some("500".to_string()));
804
805 assert!(!state.has_active_filters()); let state_with_filters = SubscriptionState {
810 latest_stream_height: Some("600".to_string()),
811 current_filters: MessageFilters {
812 filters: Some(vec![]),
813 },
814 };
815 assert!(!state_with_filters.has_active_filters()); }
817
818 #[tokio::test]
819 async fn test_error_handling_scenarios() {
820 let zero_relay_id = KeyId::from_bytes([0u8; 32]);
824 assert_eq!(zero_relay_id, KeyId::from_bytes([0u8; 32]));
825
826 let max_relay_id = KeyId::from_bytes([255u8; 32]);
828 assert_eq!(max_relay_id, KeyId::from_bytes([255u8; 32]));
829
830 let empty_state = SubscriptionState::default();
832 assert!(empty_state.latest_stream_height.is_none());
833 assert!(!empty_state.has_active_filters());
834 }
835
836 #[cfg(test)]
841 mod wiring_tests {
842 use zoe_client_storage::StorageError;
843
844 use super::*;
845
/// Persistence manager instantiated with the mocked messages manager.
type TestPersistenceManager = GenericMessagePersistenceManager<MockMessagesManagerTrait>;
/// Builder instantiated with the mocked messages manager.
type TestPersistenceManagerBuilder =
    GenericMessagePersistenceManagerBuilder<MockMessagesManagerTrait>;
849
850 fn create_mock_message_events_stream() -> Receiver<MessageEvent> {
851 let (sender, receiver) = async_broadcast::broadcast(10);
854 std::mem::forget(sender); receiver
856 }
857
/// Builds a persistence manager around mocked storage and a mocked messages manager and
/// checks that construction succeeds and the wrapped manager is reachable.
#[tokio::test]
async fn test_generic_builder_with_mock_messages_manager() {
    let mut mock_storage = MockMessageStorage::new();
    let mut mock_messages_manager = MockMessagesManagerTrait::new();
    let relay_id = KeyId::from_bytes([1u8; 32]);

    // Permissive storage expectations: any number of calls, always succeeding.
    mock_storage.expect_store_message().returning(|_| Ok(()));
    mock_storage
        .expect_mark_message_synced()
        .returning(|_, _, _| Ok(()));

    // The manager asks for the event stream exactly once, at construction.
    mock_messages_manager
        .expect_message_events_stream()
        .times(1)
        .returning(create_mock_message_events_stream);

    // `shared` is kept alive until the end of the test so the subscriber stays connected
    // to a live observable.
    let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
        SubscriptionState::default(),
    );
    let subscriber = shared.subscribe().await;
    mock_messages_manager
        .expect_get_subscription_state_updates()
        .times(1)
        .returning(move || subscriber.clone());

    let persistence_manager = TestPersistenceManagerBuilder::new()
        .storage(Arc::new(mock_storage))
        .relay_id(relay_id)
        .build_with_messages_manager(Arc::new(mock_messages_manager))
        .await;

    assert!(persistence_manager.is_ok());
    let manager = persistence_manager.unwrap();

    let messages_manager_ref = manager.messages_manager();
    assert!(Arc::strong_count(messages_manager_ref) > 0);

    // Give the spawned background task a chance to start before the test ends.
    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
904
/// Loads a stored subscription state through the static helper and then builds a
/// persistence manager on the same mocked storage.
#[tokio::test]
async fn test_subscription_state_wiring_with_mock() {
    let mut mock_storage = MockMessageStorage::new();
    let mut mock_messages_manager = MockMessagesManagerTrait::new();
    let relay_id = KeyId::from_bytes([2u8; 32]);

    let initial_state = SubscriptionState {
        latest_stream_height: Some("50".to_string()),
        current_filters: MessageFilters {
            filters: Some(vec![]),
        },
    };

    // Exactly one load is expected: the direct `load_subscription_state` call below.
    mock_storage
        .expect_get_subscription_state()
        .with(eq(relay_id))
        .times(1)
        .returning(move |_| Ok(Some(initial_state.clone())));

    // State writes by the background task are allowed but not required.
    mock_storage
        .expect_store_subscription_state()
        .returning(|_, _| Ok(()));

    mock_messages_manager
        .expect_message_events_stream()
        .times(1)
        .returning(create_mock_message_events_stream);

    let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
        SubscriptionState::default(),
    );
    let subscriber = shared.subscribe().await;
    mock_messages_manager
        .expect_get_subscription_state_updates()
        .times(1)
        .returning(move || subscriber.clone());

    let loaded_state =
        TestPersistenceManager::load_subscription_state(&mock_storage, &relay_id).await;

    assert!(loaded_state.is_ok());
    let state = loaded_state.unwrap();
    assert!(state.is_some());
    assert_eq!(state.unwrap().latest_stream_height, Some("50".to_string()));

    let persistence_manager = TestPersistenceManagerBuilder::new()
        .storage(Arc::new(mock_storage))
        .relay_id(relay_id)
        .build_with_messages_manager(Arc::new(mock_messages_manager))
        .await;

    assert!(persistence_manager.is_ok());
}
962
/// Builds a persistence manager whose storage mock expects one message to be stored and
/// marked as synced at height "150".
///
/// NOTE(review): the mock event stream used here never emits an event, so nothing in this
/// test appears to trigger `store_message` or `mark_message_synced`. Confirm whether the
/// `times(1)` expectations are actually verified when the mock is dropped.
#[tokio::test]
async fn test_message_sync_tracking_with_mock() {
    let mut mock_storage = MockMessageStorage::new();
    let mut mock_messages_manager = MockMessagesManagerTrait::new();
    let relay_id = KeyId::from_bytes([3u8; 32]);
    let test_message = create_test_message("sync test message");
    let message_id = *test_message.id();

    mock_storage
        .expect_store_message()
        .with(eq(test_message.clone()))
        .times(1)
        .returning(|_| Ok(()));

    mock_storage
        .expect_mark_message_synced()
        .with(eq(message_id), eq(relay_id), eq("150"))
        .times(1)
        .returning(|_, _, _| Ok(()));

    mock_messages_manager
        .expect_message_events_stream()
        .times(1)
        .returning(create_mock_message_events_stream);

    let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
        SubscriptionState::default(),
    );
    let subscriber = shared.subscribe().await;
    mock_messages_manager
        .expect_get_subscription_state_updates()
        .times(1)
        .returning(move || subscriber.clone());

    let persistence_manager = TestPersistenceManagerBuilder::new()
        .storage(Arc::new(mock_storage))
        .relay_id(relay_id)
        .build_with_messages_manager(Arc::new(mock_messages_manager))
        .await;

    assert!(persistence_manager.is_ok());

    // Let the background task run for a while before the manager is dropped.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
1011
1012 #[tokio::test]
1013 async fn test_multiple_relay_sync_status() {
1014 let mut mock_storage = MockMessageStorage::new();
1015 let relay_id1 = KeyId::from_bytes([4u8; 32]);
1016 let relay_id2 = KeyId::from_bytes([5u8; 32]);
1017
1018 let mut all_states = HashMap::new();
1020 all_states.insert(
1021 relay_id1,
1022 SubscriptionState {
1023 latest_stream_height: Some("100".to_string()),
1024 current_filters: MessageFilters {
1025 filters: Some(vec![]),
1026 },
1027 },
1028 );
1029 all_states.insert(
1030 relay_id2,
1031 SubscriptionState {
1032 latest_stream_height: Some("200".to_string()),
1033 current_filters: MessageFilters {
1034 filters: Some(vec![]),
1035 },
1036 },
1037 );
1038
1039 mock_storage
1040 .expect_get_all_subscription_states()
1041 .times(1)
1042 .returning(move || Ok(all_states.clone()));
1043
1044 let result = TestPersistenceManager::load_all_subscription_states(&mock_storage).await;
1046
1047 assert!(result.is_ok());
1048 let loaded_states = result.unwrap();
1049 assert_eq!(loaded_states.len(), 2);
1050 assert!(loaded_states.contains_key(&relay_id1));
1051 assert!(loaded_states.contains_key(&relay_id2));
1052 assert_eq!(
1053 loaded_states.get(&relay_id1).unwrap().latest_stream_height,
1054 Some("100".to_string())
1055 );
1056 assert_eq!(
1057 loaded_states.get(&relay_id2).unwrap().latest_stream_height,
1058 Some("200".to_string())
1059 );
1060 }
1061
1062 #[tokio::test]
1063 async fn test_builder_error_handling_with_mock() {
1064 let mock_messages_manager = MockMessagesManagerTrait::new();
1065
1066 let result = TestPersistenceManagerBuilder::new()
1068 .relay_id(KeyId::from_bytes([6u8; 32]))
1069 .build_with_messages_manager(Arc::new(mock_messages_manager))
1070 .await;
1071
1072 assert!(result.is_err());
1073 assert!(
1074 result
1075 .unwrap_err()
1076 .to_string()
1077 .contains("Storage is required")
1078 );
1079 }
1080
/// Builds a manager on mocks and checks the wrapped messages manager is reachable through
/// `messages_manager()`.
///
/// NOTE(review): despite the name, the visible code does not go through `Deref`; that impl
/// exists only for the concrete `MessagePersistenceManager`. Confirm the intent.
#[tokio::test]
async fn test_persistence_manager_deref_with_mock() {
    let mock_storage = MockMessageStorage::new();
    let mut mock_messages_manager = MockMessagesManagerTrait::new();

    mock_messages_manager
        .expect_message_events_stream()
        .times(1)
        .returning(create_mock_message_events_stream);

    let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
        SubscriptionState::default(),
    );
    let subscriber = shared.subscribe().await;
    mock_messages_manager
        .expect_get_subscription_state_updates()
        .times(1)
        .returning(move || subscriber.clone());

    let relay_id = KeyId::from_bytes([10u8; 32]);
    let persistence_manager = TestPersistenceManagerBuilder::new()
        .storage(Arc::new(mock_storage))
        .relay_id(relay_id)
        .build_with_messages_manager(Arc::new(mock_messages_manager))
        .await
        .unwrap();

    // Give the spawned background task a chance to start.
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

    let messages_manager_ref = persistence_manager.messages_manager();
    assert!(Arc::strong_count(messages_manager_ref) > 0);
}
1119
/// Builds a manager whose storage fails on the first `store_message` call and succeeds on
/// later ones.
///
/// NOTE(review): the mock event stream never emits an event, so the failing-then-succeeding
/// storage behaviour does not appear to be exercised. As written, the test only checks
/// that the manager can be built and left running for a while — confirm.
#[tokio::test]
async fn test_background_task_error_resilience() {
    let mut mock_storage = MockMessageStorage::new();
    let mut mock_messages_manager = MockMessagesManagerTrait::new();
    let relay_id = KeyId::from_bytes([7u8; 32]);

    // The counter is moved into the closure: first call fails, later calls succeed.
    let mut call_count = 0;
    mock_storage.expect_store_message().returning(move |_| {
        call_count += 1;
        if call_count == 1 {
            Err(StorageError::Internal("Simulated failure".to_string()))
        } else {
            Ok(())
        }
    });

    mock_storage
        .expect_mark_message_synced()
        .returning(|_, _, _| Ok(()));

    mock_messages_manager
        .expect_message_events_stream()
        .times(1)
        .returning(create_mock_message_events_stream);

    let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
        SubscriptionState::default(),
    );
    let subscriber = shared.subscribe().await;
    mock_messages_manager
        .expect_get_subscription_state_updates()
        .times(1)
        .returning(move || subscriber.clone());

    let persistence_manager = TestPersistenceManagerBuilder::new()
        .storage(Arc::new(mock_storage))
        .relay_id(relay_id)
        .build_with_messages_manager(Arc::new(mock_messages_manager))
        .await;

    assert!(persistence_manager.is_ok());

    // Let the background task run for a while before the manager is dropped.
    tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;

}
1171
/// Publishes a new subscription state through the shared observable and expects the
/// background task to store it under the relay id.
#[tokio::test]
async fn test_stream_height_update_persistence() {
    let mut mock_storage = MockMessageStorage::new();
    let mut mock_messages_manager = MockMessagesManagerTrait::new();
    let relay_id = KeyId::from_bytes([9u8; 32]);

    let initial_state = SubscriptionState {
        latest_stream_height: Some("100".to_string()),
        current_filters: MessageFilters {
            filters: Some(vec![]),
        },
    };
    let shared_observable =
        eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(initial_state);
    let subscriber = shared_observable.subscribe().await;

    // Exactly one write is expected, carrying the updated height "150".
    mock_storage
        .expect_store_subscription_state()
        .with(
            eq(relay_id),
            function(|state: &SubscriptionState| {
                state.latest_stream_height == Some("150".to_string())
            }),
        )
        .times(1)
        .returning(|_, _| Ok(()));

    mock_messages_manager
        .expect_message_events_stream()
        .times(1)
        .returning(create_mock_message_events_stream);

    let subscriber_clone = subscriber.clone();
    mock_messages_manager
        .expect_get_subscription_state_updates()
        .times(1)
        .returning(move || subscriber_clone.clone());

    let persistence_manager = TestPersistenceManagerBuilder::new()
        .storage(Arc::new(mock_storage))
        .relay_id(relay_id)
        .build_with_messages_manager(Arc::new(mock_messages_manager))
        .await;

    assert!(persistence_manager.is_ok());

    // Give the background task time to start listening before the state changes.
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

    shared_observable
        .set(SubscriptionState {
            latest_stream_height: Some("150".to_string()),
            current_filters: MessageFilters {
                filters: Some(vec![]),
            },
        })
        .await;

    // Give the background task time to observe and persist the update.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

}
1242
/// Builds a manager whose storage mock expects two `store_message` calls and one
/// `mark_message_synced` call.
///
/// NOTE(review): the mock event stream never emits an event, so nothing in the visible
/// code triggers these calls. Confirm whether the call-count expectations are actually
/// verified, or whether events were meant to be sent.
#[tokio::test]
async fn test_stream_event_filtering_and_processing() {
    let mut mock_storage = MockMessageStorage::new();
    let mut mock_messages_manager = MockMessagesManagerTrait::new();
    let relay_id = KeyId::from_bytes([8u8; 32]);

    mock_storage
        .expect_store_message()
        .times(2)
        .returning(|_| Ok(()));

    mock_storage
        .expect_mark_message_synced()
        .times(1)
        .returning(|_, _, _| Ok(()));

    mock_messages_manager
        .expect_message_events_stream()
        .times(1)
        .returning(create_mock_message_events_stream);

    let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
        SubscriptionState::default(),
    );
    let subscriber = shared.subscribe().await;
    mock_messages_manager
        .expect_get_subscription_state_updates()
        .times(1)
        .returning(move || subscriber.clone());

    let persistence_manager = TestPersistenceManagerBuilder::new()
        .storage(Arc::new(mock_storage))
        .relay_id(relay_id)
        .build_with_messages_manager(Arc::new(mock_messages_manager))
        .await;

    assert!(persistence_manager.is_ok());

    // Let the background task run for a while before the manager is dropped.
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

}
1290
1291 #[tokio::test]
1292 async fn test_subscription_state_persistence_on_updates() {
1293 let mut mock_storage = MockMessageStorage::new();
1294 let mut mock_messages_manager = MockMessagesManagerTrait::new();
1295 let relay_id = KeyId::from_bytes([11u8; 32]);
1296
1297 let initial_state = SubscriptionState {
1299 latest_stream_height: Some("100".to_string()),
1300 current_filters: MessageFilters {
1301 filters: Some(vec![]),
1302 },
1303 };
1304 let shared_observable =
1305 eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(initial_state);
1306 let subscriber = shared_observable.subscribe().await;
1307
1308 mock_storage
1311 .expect_store_subscription_state()
1312 .with(eq(relay_id), always())
1313 .times(0..=2)
1314 .returning(|_, _| Ok(()));
1315
1316 mock_messages_manager
1318 .expect_message_events_stream()
1319 .times(1)
1320 .returning(create_mock_message_events_stream);
1321
1322 let subscriber_clone = subscriber.clone();
1324 mock_messages_manager
1325 .expect_get_subscription_state_updates()
1326 .times(1)
1327 .returning(move || subscriber_clone.clone());
1328
1329 let persistence_manager = TestPersistenceManagerBuilder::new()
1331 .storage(Arc::new(mock_storage))
1332 .relay_id(relay_id)
1333 .build_with_messages_manager(Arc::new(mock_messages_manager))
1334 .await;
1335
1336 assert!(persistence_manager.is_ok());
1337
1338 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1340
1341 shared_observable
1343 .set(SubscriptionState {
1344 latest_stream_height: Some("150".to_string()),
1345 current_filters: MessageFilters {
1346 filters: Some(vec![]),
1347 },
1348 })
1349 .await;
1350
1351 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1352
1353 shared_observable
1354 .set(SubscriptionState {
1355 latest_stream_height: Some("200".to_string()),
1356 current_filters: MessageFilters {
1357 filters: Some(vec![]),
1358 },
1359 })
1360 .await;
1361
1362 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1364
1365 }
1367
1368 #[tokio::test]
1369 async fn test_subscription_state_loading_on_startup() {
1370 let mut mock_storage = MockMessageStorage::new();
1371 let relay_id = KeyId::from_bytes([12u8; 32]);
1372
1373 let stored_state = SubscriptionState {
1375 latest_stream_height: Some("500".to_string()),
1376 current_filters: MessageFilters {
1377 filters: Some(vec![]),
1378 },
1379 };
1380
1381 mock_storage
1382 .expect_get_subscription_state()
1383 .with(eq(relay_id))
1384 .times(1)
1385 .returning(move |_| Ok(Some(stored_state.clone())));
1386
1387 let loaded_state =
1389 TestPersistenceManager::load_subscription_state(&mock_storage, &relay_id).await;
1390
1391 assert!(loaded_state.is_ok());
1392 let state = loaded_state.unwrap();
1393 assert!(state.is_some());
1394 let state = state.unwrap();
1395 assert_eq!(state.latest_stream_height, Some("500".to_string()));
1396 assert!(state.current_filters.filters.is_some());
1397 }
1398
1399 #[tokio::test]
1400 async fn test_subscription_state_error_handling() {
1401 let mut mock_storage = MockMessageStorage::new();
1402 let relay_id = KeyId::from_bytes([13u8; 32]);
1403
1404 mock_storage
1406 .expect_get_subscription_state()
1407 .with(eq(relay_id))
1408 .times(1)
1409 .returning(|_| Err(StorageError::Internal("Database error".to_string())));
1410
1411 let loaded_state =
1413 TestPersistenceManager::load_subscription_state(&mock_storage, &relay_id).await;
1414
1415 assert!(loaded_state.is_err());
1416 assert!(
1417 loaded_state
1418 .unwrap_err()
1419 .to_string()
1420 .contains("Failed to load subscription state")
1421 );
1422 }
1423
1424 #[tokio::test]
1425 async fn test_subscription_state_persistence_error_resilience() {
1426 let _ = tracing_subscriber::fmt()
1427 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1428 .try_init();
1429
1430 let mut mock_storage = MockMessageStorage::new();
1431 let mut mock_messages_manager = MockMessagesManagerTrait::new();
1432 let relay_id = KeyId::from_bytes([14u8; 32]);
1433
1434 let initial_state = SubscriptionState::default();
1436 let shared_observable =
1437 eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(initial_state);
1438 let subscriber = shared_observable.subscribe().await;
1439
1440 mock_storage
1443 .expect_store_subscription_state()
1444 .with(eq(relay_id), always())
1445 .times(0..=1)
1446 .returning(|_, _| Err(StorageError::Internal("Storage failure".to_string())));
1447
1448 mock_messages_manager
1450 .expect_message_events_stream()
1451 .times(1)
1452 .returning(create_mock_message_events_stream);
1453
1454 let subscriber_clone = subscriber.clone();
1456 mock_messages_manager
1457 .expect_get_subscription_state_updates()
1458 .times(1)
1459 .returning(move || subscriber_clone.clone());
1460
1461 let persistence_manager = TestPersistenceManagerBuilder::new()
1463 .storage(Arc::new(mock_storage))
1464 .relay_id(relay_id)
1465 .build_with_messages_manager(Arc::new(mock_messages_manager))
1466 .await;
1467
1468 assert!(persistence_manager.is_ok());
1469 let manager = persistence_manager.unwrap();
1470
1471 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1473
1474 assert!(manager.is_running());
1476
1477 shared_observable
1479 .set(SubscriptionState {
1480 latest_stream_height: Some("300".to_string()),
1481 current_filters: MessageFilters {
1482 filters: Some(vec![]),
1483 },
1484 })
1485 .await;
1486
1487 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1489
1490 }
1493 }
1494}