zoe_client/services/
message_persistence_manager.rs

1use crate::error::{ClientError, Result};
2use async_broadcast::Receiver;
3use async_trait::async_trait;
4use eyeball::AsyncLock;
5use futures::Stream;
6use std::{ops::Deref, sync::Arc};
7use tokio::{select, task::JoinHandle};
8use tracing::{debug, error, warn};
9use zoe_client_storage::{MessageStorage, SubscriptionState};
10use zoe_wire_protocol::{
11    CatchUpResponse, Filter, KeyId, MessageFull, PublishResult, StreamMessage, VerifyingKey,
12};
13
14use super::messages_manager::{
15    MessageEvent, MessagesManager, MessagesManagerBuilder, MessagesManagerTrait,
16};
17
/// Builder for creating MessagePersistenceManager instances.
///
/// The builder collects the storage backend to persist to, the relay whose
/// sync/subscription state is tracked, and the settings that are forwarded to
/// the embedded `MessagesManagerBuilder` (buffer size and autosubscribe).
///
/// Storage and a relay ID (set via `relay_id` or `relay_pubkey`) are both
/// required; building without either one returns an error.
///
/// # Example
///
/// ```rust,no_run
/// # use zoe_client::services::MessagePersistenceManagerBuilder;
/// # use zoe_client_storage::MessageStorage;
/// # use zoe_wire_protocol::VerifyingKey;
/// # use std::sync::Arc;
/// # use zoe_client_storage::StorageError;
/// # async fn example(
/// #     storage: Arc<dyn MessageStorage>,
/// #     connection: &quinn::Connection,
/// #     relay_key: VerifyingKey,
/// # ) -> zoe_client::error::Result<()> {
/// // Create persistence manager with embedded MessagesManager
/// let persistence_manager = MessagePersistenceManagerBuilder::new()
///     .storage(storage)
///     .autosubscribe(true)
///     .relay_pubkey(relay_key)
///     .buffer_size(1000)
///     .build(connection)
///     .await?;
///
/// // Access MessagesManager via Deref
/// let _stream = persistence_manager.subscribe_to_messages().await?;
///
/// // Or access explicitly for better discoverability
/// let messages_manager = persistence_manager.messages_manager();
/// let _stream = messages_manager.subscribe_to_messages().await?;
/// # Ok(())
/// # }
/// ```
pub struct GenericMessagePersistenceManagerBuilder<T: MessagesManagerTrait> {
    /// Storage backend that messages and subscription state are written to (required).
    storage: Option<Arc<dyn MessageStorage>>,
    /// ID of the relay used as the key for sync marks and subscription state (required).
    relay_id: Option<KeyId>,
    /// Buffer size handed to `MessagesManagerBuilder::buffer_size`; 1000 is used when unset.
    buffer_size: Option<usize>,
    /// Forwarded to `MessagesManagerBuilder::autosubscribe`; off by default.
    autosubscribe: bool,
    /// Ties the builder to the messages-manager type it produces a wrapper for.
    _phantom: std::marker::PhantomData<T>,
}
61
/// Builder specialised for the concrete [`MessagesManager`].
///
/// This is the alias most callers want; the generic builder exists so a
/// different [`MessagesManagerTrait`] implementation (for example a mock) can
/// be wrapped instead.
pub type MessagePersistenceManagerBuilder =
    GenericMessagePersistenceManagerBuilder<MessagesManager>;
65
66impl<T: MessagesManagerTrait> GenericMessagePersistenceManagerBuilder<T> {
67    /// Create a new builder with default settings
68    pub fn new() -> Self {
69        Self {
70            storage: None,
71            relay_id: None,
72            buffer_size: None,
73            autosubscribe: false,
74            _phantom: std::marker::PhantomData,
75        }
76    }
77
78    /// Set the storage implementation to use for persistence
79    pub fn storage(mut self, storage: Arc<dyn MessageStorage>) -> Self {
80        self.storage = Some(storage);
81        self
82    }
83
84    /// Set whether to automatically subscribe after creating the messages manager
85    pub fn autosubscribe(mut self, autosubscribe: bool) -> Self {
86        self.autosubscribe = autosubscribe;
87        self
88    }
89
90    /// Set the relay ID (hash of public key) for sync tracking
91    pub fn relay_id(mut self, relay_id: KeyId) -> Self {
92        self.relay_id = Some(relay_id);
93        self
94    }
95
96    /// Set the relay public key for sync tracking (convenience method that computes the ID)
97    pub fn relay_pubkey(mut self, relay_pubkey: VerifyingKey) -> Self {
98        self.relay_id = Some(relay_pubkey.id());
99        self
100    }
101
102    /// Set the buffer size for the persistence task queue
103    pub fn buffer_size(mut self, buffer_size: usize) -> Self {
104        self.buffer_size = Some(buffer_size);
105        self
106    }
107
108    pub async fn build_with_messages_manager(
109        self,
110        messages_manager: Arc<T>,
111    ) -> Result<GenericMessagePersistenceManager<T>> {
112        let storage = self
113            .storage
114            .ok_or_else(|| ClientError::Generic("Storage is required".to_string()))?;
115
116        let relay_id = self
117            .relay_id
118            .ok_or_else(|| ClientError::Generic("Relay ID is required".to_string()))?;
119
120        // Create the persistence manager
121        let manager =
122            GenericMessagePersistenceManager::new(storage, messages_manager, relay_id).await?;
123
124        Ok(manager)
125    }
126}
127
128// Concrete implementation for the type alias
129impl MessagePersistenceManagerBuilder {
130    /// Build the MessagePersistenceManager and MessagesManager
131    ///
132    /// This will:
133    /// 1. Create the MessagesManager from the connection and configuration
134    /// 2. Create the MessagePersistenceManager with the MessagesManager
135    /// 3. Start the background persistence task
136    /// 4. Return a fully configured MessagePersistenceManager
137    ///
138    /// # Errors
139    /// Returns an error if storage is not provided or connection fails
140    pub async fn build_with_messages_manager_configuration<F>(
141        self,
142        connection: &quinn::Connection,
143        configure: F,
144    ) -> Result<MessagePersistenceManager>
145    where
146        F: FnOnce(MessagesManagerBuilder) -> MessagesManagerBuilder,
147    {
148        let storage = self
149            .storage
150            .as_ref()
151            .ok_or_else(|| ClientError::Generic("Storage is required".to_string()))?;
152
153        // Load subscription state from storage if we have a relay ID
154        let subscription_state = if let Some(relay_id) = &self.relay_id {
155            MessagePersistenceManager::load_subscription_state(&**storage, relay_id)
156                .await?
157                .unwrap_or_default()
158        } else {
159            SubscriptionState::default()
160        };
161
162        // Create the MessagesManager with loaded state
163        let messages_manager = Arc::new(
164            configure(
165                MessagesManagerBuilder::new()
166                    .state(subscription_state)
167                    .buffer_size(self.buffer_size.unwrap_or(1000))
168                    .autosubscribe(self.autosubscribe),
169            )
170            .build(connection)
171            .await?,
172        );
173
174        self.build_with_messages_manager(messages_manager).await
175    }
176
177    /// Build the MessagePersistenceManager and MessagesManager
178    ///
179    /// This will:
180    /// 1. Create the MessagesManager from the connection and configuration
181    /// 2. Create the MessagePersistenceManager with the MessagesManager
182    /// 3. Start the background persistence task
183    /// 4. Return a fully configured MessagePersistenceManager
184    ///
185    /// # Errors
186    /// Returns an error if storage is not provided or connection fails
187    pub async fn build(self, connection: &quinn::Connection) -> Result<MessagePersistenceManager> {
188        self.build_with_messages_manager_configuration(connection, |builder| builder)
189            .await
190    }
191}
192
193impl Default for MessagePersistenceManagerBuilder {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
199#[derive(Debug)]
200struct AbortOnDrop<T>(JoinHandle<T>);
201
202impl<T> Drop for AbortOnDrop<T> {
203    fn drop(&mut self) {
204        self.0.abort();
205    }
206}
207
/// High-level message persistence manager that automatically stores messages.
///
/// The `MessagePersistenceManager` bridges the gap between real-time messaging
/// and persistent storage by:
/// - **Automatic Persistence**: Stores received, sent and catch-up messages as their events occur
/// - **Relay Sync Tracking**: Marks received messages as synced for the configured relay
/// - **Subscription State**: Stores every subscription state update for the configured relay
/// - **Non-blocking Operation**: Runs persistence in a spawned background task
///
/// This manager operates by subscribing to the message events stream from MessagesManager
/// and automatically persisting all events to the configured storage backend.
///
/// Cloning is cheap: clones share the same messages manager and the same
/// background task, which is aborted when the last clone is dropped.
#[derive(Debug, Clone)]
pub struct GenericMessagePersistenceManager<T: MessagesManagerTrait> {
    /// The messages manager that this persistence manager wraps
    messages_manager: Arc<T>,
    /// Handle to the background persistence task (aborted on drop of the last reference)
    persistence_task: Arc<AbortOnDrop<Result<()>>>,
}
225
/// Persistence manager specialised for the concrete [`MessagesManager`].
///
/// This alias additionally gets `builder()`, `inner_messages_manager()` and a
/// `Deref` implementation to the wrapped manager.
pub type MessagePersistenceManager = GenericMessagePersistenceManager<MessagesManager>;
228
229impl<T: MessagesManagerTrait> GenericMessagePersistenceManager<T> {
    /// Get a reference to the underlying MessagesManager
    ///
    /// This provides explicit access to the MessagesManager for cases where
    /// method discovery is important or when you need to pass it to other components.
    ///
    /// The returned value is the shared `Arc`, so callers can clone it to keep
    /// the manager beyond the lifetime of this borrow.
    pub fn messages_manager(&self) -> &Arc<T> {
        &self.messages_manager
    }
237
238    /// Load subscription state from storage for a specific relay
239    pub async fn load_subscription_state(
240        storage: &dyn MessageStorage,
241        relay_id: &KeyId,
242    ) -> Result<Option<SubscriptionState>> {
243        storage
244            .get_subscription_state(relay_id)
245            .await
246            .map_err(|e| ClientError::Generic(format!("Failed to load subscription state: {e}")))
247    }
248
249    /// Load all subscription states from storage
250    pub async fn load_all_subscription_states(
251        storage: &dyn MessageStorage,
252    ) -> Result<std::collections::HashMap<KeyId, SubscriptionState>> {
253        storage.get_all_subscription_states().await.map_err(|e| {
254            ClientError::Generic(format!("Failed to load all subscription states: {e}"))
255        })
256    }
257
258    /// Create a new MessagePersistenceManager with the given components.
259    ///
260    /// This starts the background persistence task immediately.
261    ///
262    /// # Arguments
263    /// * `storage` - The storage implementation to persist messages to
264    /// * `messages_manager` - The messages manager to monitor for events
265    /// * `relay_pubkey` - Optional relay public key for sync tracking
266    /// * `buffer_size` - Optional buffer size for the task queue
267    async fn new(
268        storage: Arc<dyn MessageStorage>,
269        messages_manager: Arc<T>,
270        relay_id: KeyId,
271    ) -> Result<Self> {
272        // Get the message events stream before spawning the task to avoid lifetime issues
273        let mut events_stream = messages_manager.message_events_stream();
274        let mut state_updates = messages_manager.get_subscription_state_updates().await;
275        let storage_clone = storage.clone();
276
277        // Start the background persistence task
278        let persistence_task = tokio::spawn(async move {
279            // Get the message events stream inside the async block
280            debug!("MessagePersistenceManager started");
281
282            // let mut state_updates = Box::pin(state_updates);
283            loop {
284                select! {
285                    event_result = events_stream.recv() => {
286                        let event = match event_result {
287                            Ok(event) => event,
288                            Err(e) => {
289                                warn!("Message events stream unexpectedly ended: {e}");
290                                break;
291                            }
292                        };
293                        if let Err(e) =
294                            Self::handle_message_event(&*storage_clone, &event, &relay_id).await
295                        {
296                            warn!("Failed to handle message event {:?}: {}", event, e);
297                            // Continue processing other events even if one fails
298                        }
299                    }
300                    state = state_updates.next() => {
301                        let Some(state) = state else {
302                            // non updates are not of interest to us
303                            continue;
304                        };
305                        debug!("Subscription state updated: {:?}", state);
306                        if let Err(e) = storage_clone.store_subscription_state(&relay_id, &state).await {
307                            error!(error=?e, "Failed to store subscription state");
308                            // Continue processing other events even if state storage fails
309                        }
310                    }
311                }
312            }
313            Ok(())
314        });
315
316        Ok(Self {
317            messages_manager,
318            persistence_task: Arc::new(AbortOnDrop(persistence_task)),
319        })
320    }
321
322    /// Handle a single message event by persisting it
323    async fn handle_message_event(
324        storage: &dyn MessageStorage,
325        event: &MessageEvent,
326        relay_id: &KeyId,
327    ) -> Result<()> {
328        match event {
329            MessageEvent::MessageReceived {
330                message,
331                stream_height,
332            } => {
333                debug!(
334                    "Persisting received message: {}",
335                    hex::encode(message.id().as_bytes())
336                );
337                storage.store_message(message).await.map_err(|e| {
338                    ClientError::Generic(format!("Failed to store received message: {e}"))
339                })?;
340
341                // Mark as synced if we have relay info
342                storage
343                    .mark_message_synced(message.id(), relay_id, stream_height)
344                    .await
345                    .map_err(|e| {
346                        ClientError::Generic(format!("Failed to mark message as synced: {e}"))
347                    })?;
348            }
349            MessageEvent::MessageSent { message, .. } => {
350                debug!(
351                    "Persisting sent message: {}",
352                    hex::encode(message.id().as_bytes())
353                );
354                storage.store_message(message).await.map_err(|e| {
355                    ClientError::Generic(format!("Failed to store sent message: {e}"))
356                })?;
357            }
358            MessageEvent::CatchUpMessage {
359                message,
360                request_id,
361            } => {
362                debug!(
363                    "Persisting catch-up message (request {}): {}",
364                    request_id,
365                    hex::encode(message.id().as_bytes())
366                );
367                storage.store_message(message).await.map_err(|e| {
368                    ClientError::Generic(format!("Failed to store catch-up message: {e}"))
369                })?;
370            }
371            MessageEvent::StreamHeightUpdate { .. } => {
372                // Stream height updates don't need persistence by themselves
373                // They're already handled as part of MessageReceived events
374            }
375            MessageEvent::CatchUpCompleted { request_id } => {
376                debug!("Catch-up completed for request {}", request_id);
377                // Could be used for metrics or completion tracking
378            }
379        }
380
381        Ok(())
382    }
383
384    /// Check if the persistence task is still running
385    pub fn is_running(&self) -> bool {
386        !self.persistence_task.0.is_finished()
387    }
388}
389
390// Concrete implementation for the type alias
391impl MessagePersistenceManager {
392    /// Create a new MessagePersistenceManager builder
393    pub fn builder() -> MessagePersistenceManagerBuilder {
394        MessagePersistenceManagerBuilder::new()
395    }
396
397    /// Get a reference to the inner MessagesManager for use with components
398    /// that require the concrete type (like PqxdhProtocolHandler)
399    ///
400    /// This method is only available on the concrete MessagePersistenceManager
401    /// type alias, not the generic version.
402    pub fn inner_messages_manager(&self) -> &MessagesManager {
403        &self.messages_manager
404    }
405}
406
407impl Deref for MessagePersistenceManager {
408    type Target = MessagesManager;
409
410    fn deref(&self) -> &Self::Target {
411        &self.messages_manager
412    }
413}
414
// Implement MessagesManagerTrait for GenericMessagePersistenceManager
// This allows it to act as a transparent proxy to the underlying MessagesManager:
// every method below forwards its arguments to the wrapped manager and returns
// that manager's result unchanged.
#[async_trait]
impl<T: MessagesManagerTrait + 'static> MessagesManagerTrait
    for GenericMessagePersistenceManager<T>
{
    /// Forwards to the wrapped manager's `message_events_stream`.
    fn message_events_stream(&self) -> Receiver<MessageEvent> {
        self.messages_manager.message_events_stream()
    }

    /// Forwards to the wrapped manager's `get_subscription_state_updates`.
    async fn get_subscription_state_updates(
        &self,
    ) -> eyeball::Subscriber<SubscriptionState, AsyncLock> {
        self.messages_manager.get_subscription_state_updates().await
    }

    /// Forwards to the wrapped manager's `subscribe`.
    async fn subscribe(&self) -> Result<()> {
        self.messages_manager.subscribe().await
    }

    /// Forwards to the wrapped manager's `publish`.
    async fn publish(&self, message: MessageFull) -> Result<PublishResult> {
        self.messages_manager.publish(message).await
    }

    /// Forwards to the wrapped manager's `ensure_contains_filter`.
    async fn ensure_contains_filter(&self, filter: Filter) -> Result<()> {
        self.messages_manager.ensure_contains_filter(filter).await
    }

    /// Forwards to the wrapped manager's `messages_stream`.
    fn messages_stream(&self) -> Receiver<StreamMessage> {
        self.messages_manager.messages_stream()
    }

    /// Forwards to the wrapped manager's `catch_up_stream`.
    fn catch_up_stream(&self) -> Receiver<CatchUpResponse> {
        self.messages_manager.catch_up_stream()
    }

    /// Forwards to the wrapped manager's `filtered_messages_stream`.
    fn filtered_messages_stream(
        &self,
        filter: Filter,
    ) -> std::pin::Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>> {
        self.messages_manager.filtered_messages_stream(filter)
    }

    /// Forwards to the wrapped manager's `catch_up_and_subscribe`.
    async fn catch_up_and_subscribe(
        &self,
        filter: Filter,
        since: Option<String>,
    ) -> Result<std::pin::Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>> {
        self.messages_manager
            .catch_up_and_subscribe(filter, since)
            .await
    }

    /// Forwards to the wrapped manager's `user_data`.
    async fn user_data(
        &self,
        author: zoe_wire_protocol::KeyId,
        storage_key: zoe_wire_protocol::StoreKey,
    ) -> Result<Option<MessageFull>> {
        self.messages_manager.user_data(author, storage_key).await
    }

    /// Forwards to the wrapped manager's `check_messages`.
    async fn check_messages(
        &self,
        message_ids: Vec<zoe_wire_protocol::MessageId>,
    ) -> Result<Vec<Option<String>>> {
        self.messages_manager.check_messages(message_ids).await
    }
}
483
484#[cfg(test)]
485mod tests {
486    use super::*;
487    use mockall::predicate::*;
488    use rand::rngs::OsRng;
489    use std::collections::HashMap;
490
491    use crate::services::messages_manager::MockMessagesManagerTrait;
492    use zoe_client_storage::{StorageError, storage::MockMessageStorage};
493    use zoe_wire_protocol::{Content, KeyPair, Kind, MessageFilters, MessageFull, PublishResult};
494
495    fn create_test_message(content: &str) -> MessageFull {
496        let keypair = KeyPair::generate(&mut OsRng);
497        let content_obj = Content::Raw(content.as_bytes().to_vec());
498        let message = zoe_wire_protocol::Message::new_v0(
499            content_obj,
500            keypair.public_key(),
501            std::time::SystemTime::now()
502                .duration_since(std::time::UNIX_EPOCH)
503                .unwrap()
504                .as_secs(),
505            Kind::Regular,
506            vec![],
507        );
508        MessageFull::new(message, &keypair).expect("Failed to create MessageFull")
509    }
510
511    #[tokio::test]
512    async fn test_builder_defaults() {
513        let builder = MessagePersistenceManagerBuilder::new();
514        assert!(builder.storage.is_none());
515        assert!(builder.relay_id.is_none());
516        assert!(builder.buffer_size.is_none());
517        assert!(!builder.autosubscribe);
518    }
519
520    #[tokio::test]
521    async fn test_builder_relay_pubkey_conversion() {
522        let keypair = KeyPair::generate(&mut OsRng);
523        let pubkey = keypair.public_key();
524        let expected_key_id = pubkey.id();
525
526        let builder = MessagePersistenceManagerBuilder::new().relay_pubkey(pubkey);
527
528        assert_eq!(builder.relay_id, Some(expected_key_id));
529    }
530
531    #[tokio::test]
532    async fn test_builder_configuration() {
533        let relay_id = KeyId::from_bytes([1u8; 32]);
534        let buffer_size = 2000;
535
536        let builder = MessagePersistenceManagerBuilder::new()
537            .relay_id(relay_id)
538            .buffer_size(buffer_size)
539            .autosubscribe(true);
540
541        assert_eq!(builder.relay_id, Some(relay_id));
542        assert_eq!(builder.buffer_size, Some(buffer_size));
543        assert!(builder.autosubscribe);
544    }
545
546    #[tokio::test]
547    async fn test_message_received_persistence() {
548        let mut mock_storage = MockMessageStorage::new();
549        let message = create_test_message("Test message");
550        let relay_id = KeyId::from_bytes([1u8; 32]);
551        let stream_height = "100".to_string();
552
553        // Set up expectations
554        mock_storage
555            .expect_store_message()
556            .with(eq(message.clone()))
557            .times(1)
558            .returning(|_| Ok(()));
559
560        mock_storage
561            .expect_mark_message_synced()
562            .with(eq(*message.id()), eq(relay_id), eq("100"))
563            .times(1)
564            .returning(|_, _, _| Ok(()));
565
566        // Create message event
567        let event = MessageEvent::MessageReceived {
568            message: message.clone(),
569            stream_height: stream_height.clone(),
570        };
571
572        // Test the handler
573        let result =
574            MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
575
576        assert!(result.is_ok());
577    }
578
579    #[tokio::test]
580    async fn test_message_sent_persistence() {
581        let mut mock_storage = MockMessageStorage::new();
582        let relay_id = KeyId::from_bytes([1u8; 32]);
583        let message = create_test_message("Sent message");
584        let publish_result = PublishResult::StoredNew {
585            global_stream_id: "200".to_string(),
586        };
587
588        // Set up expectations
589        mock_storage
590            .expect_store_message()
591            .with(eq(message.clone()))
592            .times(1)
593            .returning(|_| Ok(()));
594
595        // Create message event
596        let event = MessageEvent::MessageSent {
597            message: message.clone(),
598            publish_result,
599        };
600
601        // Test the handler
602        let result =
603            MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
604
605        assert!(result.is_ok());
606    }
607
608    #[tokio::test]
609    async fn test_catch_up_message_persistence() {
610        let mut mock_storage = MockMessageStorage::new();
611        let relay_id = KeyId::from_bytes([1u8; 32]);
612        let message = create_test_message("Catch-up message");
613        let request_id = 42;
614
615        // Set up expectations
616        mock_storage
617            .expect_store_message()
618            .with(eq(message.clone()))
619            .times(1)
620            .returning(|_| Ok(()));
621
622        // Create message event
623        let event = MessageEvent::CatchUpMessage {
624            message: message.clone(),
625            request_id,
626        };
627
628        // Test the handler
629        let result =
630            MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
631
632        assert!(result.is_ok());
633    }
634
635    #[tokio::test]
636    async fn test_stream_height_update_no_persistence() {
637        let mock_storage = MockMessageStorage::new();
638        let relay_id = KeyId::from_bytes([1u8; 32]);
639
640        // Stream height updates should not trigger any storage calls
641        let event = MessageEvent::StreamHeightUpdate {
642            height: "300".to_string(),
643        };
644
645        // Test the handler - should succeed without any storage calls
646        let result =
647            MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
648
649        assert!(result.is_ok());
650    }
651
652    #[tokio::test]
653    async fn test_catch_up_completed_no_persistence() {
654        let mock_storage = MockMessageStorage::new();
655        let relay_id = KeyId::from_bytes([1u8; 32]);
656
657        // Catch-up completed events should not trigger any storage calls
658        let event = MessageEvent::CatchUpCompleted { request_id: 123 };
659
660        // Test the handler - should succeed without any storage calls
661        let result =
662            MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
663
664        assert!(result.is_ok());
665    }
666
667    #[tokio::test]
668    async fn test_storage_error_handling() {
669        let mut mock_storage = MockMessageStorage::new();
670        let relay_id = KeyId::from_bytes([1u8; 32]);
671        let message = create_test_message("Error test message");
672
673        // Set up storage to return an error
674        mock_storage
675            .expect_store_message()
676            .with(eq(message.clone()))
677            .times(1)
678            .returning(|_| Err(StorageError::Internal("Test error".to_string())));
679
680        // Create message event
681        let event = MessageEvent::MessageSent {
682            message: message.clone(),
683            publish_result: PublishResult::StoredNew {
684                global_stream_id: "400".to_string(),
685            },
686        };
687
688        // Test the handler - should return an error
689        let result =
690            MessagePersistenceManager::handle_message_event(&mock_storage, &event, &relay_id).await;
691
692        assert!(result.is_err());
693        assert!(
694            result
695                .unwrap_err()
696                .to_string()
697                .contains("Failed to store sent message")
698        );
699    }
700
701    #[tokio::test]
702    async fn test_load_subscription_state() {
703        let mut mock_storage = MockMessageStorage::new();
704        let relay_id = KeyId::from_bytes([4u8; 32]);
705        let expected_state = SubscriptionState {
706            latest_stream_height: Some("500".to_string()),
707            current_filters: MessageFilters {
708                filters: Some(vec![]),
709            },
710        };
711
712        // Set up expectations
713        mock_storage
714            .expect_get_subscription_state()
715            .with(eq(relay_id))
716            .times(1)
717            .returning(move |_| Ok(Some(expected_state.clone())));
718
719        // Test loading subscription state
720        let result =
721            MessagePersistenceManager::load_subscription_state(&mock_storage, &relay_id).await;
722
723        assert!(result.is_ok());
724        let loaded_state = result.unwrap();
725        assert!(loaded_state.is_some());
726        assert_eq!(
727            loaded_state.unwrap().latest_stream_height,
728            Some("500".to_string())
729        );
730    }
731
732    #[tokio::test]
733    async fn test_load_all_subscription_states() {
734        let mut mock_storage = MockMessageStorage::new();
735        let relay_id1 = KeyId::from_bytes([5u8; 32]);
736        let relay_id2 = KeyId::from_bytes([6u8; 32]);
737
738        let mut expected_states = HashMap::new();
739        expected_states.insert(
740            relay_id1,
741            SubscriptionState {
742                latest_stream_height: Some("600".to_string()),
743                current_filters: MessageFilters {
744                    filters: Some(vec![]),
745                },
746            },
747        );
748        expected_states.insert(
749            relay_id2,
750            SubscriptionState {
751                latest_stream_height: Some("700".to_string()),
752                current_filters: MessageFilters {
753                    filters: Some(vec![]),
754                },
755            },
756        );
757
758        // Set up expectations
759        mock_storage
760            .expect_get_all_subscription_states()
761            .times(1)
762            .returning(move || Ok(expected_states.clone()));
763
764        // Test loading all subscription states
765        let result = MessagePersistenceManager::load_all_subscription_states(&mock_storage).await;
766
767        assert!(result.is_ok());
768        let loaded_states = result.unwrap();
769        assert_eq!(loaded_states.len(), 2);
770        assert!(loaded_states.contains_key(&relay_id1));
771        assert!(loaded_states.contains_key(&relay_id2));
772    }
773
774    #[tokio::test]
775    async fn test_subscription_state_helpers() {
776        // Test default subscription state
777        let state = SubscriptionState::default();
778        assert!(state.latest_stream_height.is_none());
779        assert!(state.current_filters.filters.is_none());
780
781        // Test subscription state with filters
782        let filters = MessageFilters {
783            filters: Some(vec![]),
784        };
785        let state = SubscriptionState::with_filters(filters.clone());
786        assert_eq!(state.current_filters, filters);
787        assert!(state.latest_stream_height.is_none());
788    }
789
790    #[tokio::test]
791    async fn test_subscription_state_operations() {
792        let mut state = SubscriptionState::default();
793
794        // Test setting filters directly
795        let filters = MessageFilters {
796            filters: Some(vec![]),
797        };
798        state.current_filters = filters.clone();
799        assert_eq!(state.current_filters, filters);
800
801        // Test setting stream height
802        state.latest_stream_height = Some("500".to_string());
803        assert_eq!(state.latest_stream_height, Some("500".to_string()));
804
805        // Test has_active_filters
806        assert!(!state.has_active_filters()); // Empty filters
807
808        // Test with actual filters (would need real Filter objects for a complete test)
809        let state_with_filters = SubscriptionState {
810            latest_stream_height: Some("600".to_string()),
811            current_filters: MessageFilters {
812                filters: Some(vec![]),
813            },
814        };
815        assert!(!state_with_filters.has_active_filters()); // Still empty vec
816    }
817
818    #[tokio::test]
819    async fn test_error_handling_scenarios() {
820        // Test various error scenarios that might occur
821
822        // Test with invalid relay ID (all zeros)
823        let zero_relay_id = KeyId::from_bytes([0u8; 32]);
824        assert_eq!(zero_relay_id, KeyId::from_bytes([0u8; 32]));
825
826        // Test with maximum relay ID (all 255s)
827        let max_relay_id = KeyId::from_bytes([255u8; 32]);
828        assert_eq!(max_relay_id, KeyId::from_bytes([255u8; 32]));
829
830        // Test subscription state edge cases
831        let empty_state = SubscriptionState::default();
832        assert!(empty_state.latest_stream_height.is_none());
833        assert!(!empty_state.has_active_filters());
834    }
835
836    // ============================================================================
837    // COMPREHENSIVE WIRING AND SYNC STATUS TESTS WITH MOCKED MESSAGES MANAGER
838    // ============================================================================
839
840    #[cfg(test)]
841    mod wiring_tests {
842        use zoe_client_storage::StorageError;
843
844        use super::*;
845
846        type TestPersistenceManager = GenericMessagePersistenceManager<MockMessagesManagerTrait>;
847        type TestPersistenceManagerBuilder =
848            GenericMessagePersistenceManagerBuilder<MockMessagesManagerTrait>;
849
850        fn create_mock_message_events_stream() -> Receiver<MessageEvent> {
851            // Create a broadcast channel and return the receiver
852            // Keep the sender alive by leaking it (for test purposes)
853            let (sender, receiver) = async_broadcast::broadcast(10);
854            std::mem::forget(sender); // Prevent the sender from being dropped
855            receiver
856        }
857
858        #[tokio::test]
859        async fn test_generic_builder_with_mock_messages_manager() {
860            let mut mock_storage = MockMessageStorage::new();
861            let mut mock_messages_manager = MockMessagesManagerTrait::new();
862            let relay_id = KeyId::from_bytes([1u8; 32]);
863
864            // Set up storage expectations
865            mock_storage.expect_store_message().returning(|_| Ok(()));
866            mock_storage
867                .expect_mark_message_synced()
868                .returning(|_, _, _| Ok(()));
869
870            // Set up messages manager expectations
871            mock_messages_manager
872                .expect_message_events_stream()
873                .times(1)
874                .returning(create_mock_message_events_stream);
875
876            // Add missing subscription state updates expectation
877            let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
878                SubscriptionState::default(),
879            );
880            let subscriber = shared.subscribe().await;
881            mock_messages_manager
882                .expect_get_subscription_state_updates()
883                .times(1)
884                .returning(move || subscriber.clone());
885
886            // Build the persistence manager
887            let persistence_manager = TestPersistenceManagerBuilder::new()
888                .storage(Arc::new(mock_storage))
889                .relay_id(relay_id)
890                .build_with_messages_manager(Arc::new(mock_messages_manager))
891                .await;
892
893            assert!(persistence_manager.is_ok());
894            let manager = persistence_manager.unwrap();
895
896            // Verify we can access the messages manager
897            let messages_manager_ref = manager.messages_manager();
898            // Just verify we got a reference (Arc is never null)
899            assert!(Arc::strong_count(messages_manager_ref) > 0);
900
901            // Give some time for background task to process events
902            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
903        }
904
905        #[tokio::test]
906        async fn test_subscription_state_wiring_with_mock() {
907            let mut mock_storage = MockMessageStorage::new();
908            let mut mock_messages_manager = MockMessagesManagerTrait::new();
909            let relay_id = KeyId::from_bytes([2u8; 32]);
910
911            // Set up initial subscription state in storage
912            let initial_state = SubscriptionState {
913                latest_stream_height: Some("50".to_string()),
914                current_filters: MessageFilters {
915                    filters: Some(vec![]),
916                },
917            };
918
919            mock_storage
920                .expect_get_subscription_state()
921                .with(eq(relay_id))
922                .times(1)
923                .returning(move |_| Ok(Some(initial_state.clone())));
924
925            mock_storage
926                .expect_store_subscription_state()
927                .returning(|_, _| Ok(()));
928
929            mock_messages_manager
930                .expect_message_events_stream()
931                .times(1)
932                .returning(create_mock_message_events_stream);
933
934            // Add missing subscription state updates expectation
935            let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
936                SubscriptionState::default(),
937            );
938            let subscriber = shared.subscribe().await;
939            mock_messages_manager
940                .expect_get_subscription_state_updates()
941                .times(1)
942                .returning(move || subscriber.clone());
943
944            // Test loading subscription state
945            let loaded_state =
946                TestPersistenceManager::load_subscription_state(&mock_storage, &relay_id).await;
947
948            assert!(loaded_state.is_ok());
949            let state = loaded_state.unwrap();
950            assert!(state.is_some());
951            assert_eq!(state.unwrap().latest_stream_height, Some("50".to_string()));
952
953            // Test building with the state
954            let persistence_manager = TestPersistenceManagerBuilder::new()
955                .storage(Arc::new(mock_storage))
956                .relay_id(relay_id)
957                .build_with_messages_manager(Arc::new(mock_messages_manager))
958                .await;
959
960            assert!(persistence_manager.is_ok());
961        }
962
963        #[tokio::test]
964        async fn test_message_sync_tracking_with_mock() {
965            let mut mock_storage = MockMessageStorage::new();
966            let mut mock_messages_manager = MockMessagesManagerTrait::new();
967            let relay_id = KeyId::from_bytes([3u8; 32]);
968            let test_message = create_test_message("sync test message");
969            let message_id = *test_message.id();
970
971            // Set up expectations for message sync tracking
972            mock_storage
973                .expect_store_message()
974                .with(eq(test_message.clone()))
975                .times(1)
976                .returning(|_| Ok(()));
977
978            mock_storage
979                .expect_mark_message_synced()
980                .with(eq(message_id), eq(relay_id), eq("150"))
981                .times(1)
982                .returning(|_, _, _| Ok(()));
983
984            mock_messages_manager
985                .expect_message_events_stream()
986                .times(1)
987                .returning(create_mock_message_events_stream);
988
989            // Add missing subscription state updates expectation
990            let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
991                SubscriptionState::default(),
992            );
993            let subscriber = shared.subscribe().await;
994            mock_messages_manager
995                .expect_get_subscription_state_updates()
996                .times(1)
997                .returning(move || subscriber.clone());
998
999            // Build and test the persistence manager
1000            let persistence_manager = TestPersistenceManagerBuilder::new()
1001                .storage(Arc::new(mock_storage))
1002                .relay_id(relay_id)
1003                .build_with_messages_manager(Arc::new(mock_messages_manager))
1004                .await;
1005
1006            assert!(persistence_manager.is_ok());
1007
1008            // Give time for the background task to process the message
1009            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1010        }
1011
1012        #[tokio::test]
1013        async fn test_multiple_relay_sync_status() {
1014            let mut mock_storage = MockMessageStorage::new();
1015            let relay_id1 = KeyId::from_bytes([4u8; 32]);
1016            let relay_id2 = KeyId::from_bytes([5u8; 32]);
1017
1018            // Set up multiple subscription states
1019            let mut all_states = HashMap::new();
1020            all_states.insert(
1021                relay_id1,
1022                SubscriptionState {
1023                    latest_stream_height: Some("100".to_string()),
1024                    current_filters: MessageFilters {
1025                        filters: Some(vec![]),
1026                    },
1027                },
1028            );
1029            all_states.insert(
1030                relay_id2,
1031                SubscriptionState {
1032                    latest_stream_height: Some("200".to_string()),
1033                    current_filters: MessageFilters {
1034                        filters: Some(vec![]),
1035                    },
1036                },
1037            );
1038
1039            mock_storage
1040                .expect_get_all_subscription_states()
1041                .times(1)
1042                .returning(move || Ok(all_states.clone()));
1043
1044            // Test loading all subscription states
1045            let result = TestPersistenceManager::load_all_subscription_states(&mock_storage).await;
1046
1047            assert!(result.is_ok());
1048            let loaded_states = result.unwrap();
1049            assert_eq!(loaded_states.len(), 2);
1050            assert!(loaded_states.contains_key(&relay_id1));
1051            assert!(loaded_states.contains_key(&relay_id2));
1052            assert_eq!(
1053                loaded_states.get(&relay_id1).unwrap().latest_stream_height,
1054                Some("100".to_string())
1055            );
1056            assert_eq!(
1057                loaded_states.get(&relay_id2).unwrap().latest_stream_height,
1058                Some("200".to_string())
1059            );
1060        }
1061
1062        #[tokio::test]
1063        async fn test_builder_error_handling_with_mock() {
1064            let mock_messages_manager = MockMessagesManagerTrait::new();
1065
1066            // Test building without storage should fail
1067            let result = TestPersistenceManagerBuilder::new()
1068                .relay_id(KeyId::from_bytes([6u8; 32]))
1069                .build_with_messages_manager(Arc::new(mock_messages_manager))
1070                .await;
1071
1072            assert!(result.is_err());
1073            assert!(
1074                result
1075                    .unwrap_err()
1076                    .to_string()
1077                    .contains("Storage is required")
1078            );
1079        }
1080
1081        #[tokio::test]
1082        async fn test_persistence_manager_deref_with_mock() {
1083            let mock_storage = MockMessageStorage::new();
1084            let mut mock_messages_manager = MockMessagesManagerTrait::new();
1085
1086            // Set up minimal expectations
1087            mock_messages_manager
1088                .expect_message_events_stream()
1089                .times(1)
1090                .returning(create_mock_message_events_stream);
1091
1092            // Add missing subscription state updates expectation
1093            let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
1094                SubscriptionState::default(),
1095            );
1096            let subscriber = shared.subscribe().await;
1097            mock_messages_manager
1098                .expect_get_subscription_state_updates()
1099                .times(1)
1100                .returning(move || subscriber.clone());
1101
1102            // Build the persistence manager
1103            let relay_id = KeyId::from_bytes([10u8; 32]);
1104            let persistence_manager = TestPersistenceManagerBuilder::new()
1105                .storage(Arc::new(mock_storage))
1106                .relay_id(relay_id)
1107                .build_with_messages_manager(Arc::new(mock_messages_manager))
1108                .await
1109                .unwrap();
1110
1111            // Give time for the background task to start and call message_events_stream
1112            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1113
1114            // Test that we can access the messages manager through the persistence manager
1115            let messages_manager_ref = persistence_manager.messages_manager();
1116            // Just verify we got a reference (Arc is never null)
1117            assert!(Arc::strong_count(messages_manager_ref) > 0);
1118        }
1119
1120        #[tokio::test]
1121        async fn test_background_task_error_resilience() {
1122            let mut mock_storage = MockMessageStorage::new();
1123            let mut mock_messages_manager = MockMessagesManagerTrait::new();
1124            let relay_id = KeyId::from_bytes([7u8; 32]);
1125
1126            // Set up storage to fail on first message, succeed on second
1127            let mut call_count = 0;
1128            mock_storage.expect_store_message().returning(move |_| {
1129                call_count += 1;
1130                if call_count == 1 {
1131                    Err(StorageError::Internal("Simulated failure".to_string()))
1132                } else {
1133                    Ok(())
1134                }
1135            });
1136
1137            mock_storage
1138                .expect_mark_message_synced()
1139                .returning(|_, _, _| Ok(()));
1140
1141            mock_messages_manager
1142                .expect_message_events_stream()
1143                .times(1)
1144                .returning(create_mock_message_events_stream);
1145
1146            // Add missing subscription state updates expectation
1147            let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
1148                SubscriptionState::default(),
1149            );
1150            let subscriber = shared.subscribe().await;
1151            mock_messages_manager
1152                .expect_get_subscription_state_updates()
1153                .times(1)
1154                .returning(move || subscriber.clone());
1155
1156            // Build the persistence manager
1157            let persistence_manager = TestPersistenceManagerBuilder::new()
1158                .storage(Arc::new(mock_storage))
1159                .relay_id(relay_id)
1160                .build_with_messages_manager(Arc::new(mock_messages_manager))
1161                .await;
1162
1163            assert!(persistence_manager.is_ok());
1164
1165            // Give time for both messages to be processed
1166            tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
1167
1168            // The persistence manager should still be running despite the first failure
1169            // This test mainly verifies that the background task continues after errors
1170        }
1171
1172        #[tokio::test]
1173        async fn test_stream_height_update_persistence() {
1174            let mut mock_storage = MockMessageStorage::new();
1175            let mut mock_messages_manager = MockMessagesManagerTrait::new();
1176            let relay_id = KeyId::from_bytes([9u8; 32]);
1177
1178            // Create a real SharedObservable for subscription state with AsyncLock
1179            let initial_state = SubscriptionState {
1180                latest_stream_height: Some("100".to_string()),
1181                current_filters: MessageFilters {
1182                    filters: Some(vec![]),
1183                },
1184            };
1185            let shared_observable =
1186                eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(initial_state);
1187            let subscriber = shared_observable.subscribe().await;
1188
1189            // Set up storage expectations - should be called when state updates
1190            mock_storage
1191                .expect_store_subscription_state()
1192                .with(
1193                    eq(relay_id),
1194                    function(|state: &SubscriptionState| {
1195                        state.latest_stream_height == Some("150".to_string())
1196                    }),
1197                )
1198                .times(1)
1199                .returning(|_, _| Ok(()));
1200
1201            // Set up mock messages manager to return empty message events stream
1202            mock_messages_manager
1203                .expect_message_events_stream()
1204                .times(1)
1205                .returning(create_mock_message_events_stream);
1206
1207            // Set up mock to return the cloned subscriber
1208            let subscriber_clone = subscriber.clone();
1209            mock_messages_manager
1210                .expect_get_subscription_state_updates()
1211                .times(1)
1212                .returning(move || subscriber_clone.clone());
1213
1214            // Build the persistence manager
1215            let persistence_manager = TestPersistenceManagerBuilder::new()
1216                .storage(Arc::new(mock_storage))
1217                .relay_id(relay_id)
1218                .build_with_messages_manager(Arc::new(mock_messages_manager))
1219                .await;
1220
1221            assert!(persistence_manager.is_ok());
1222
1223            // Give a moment for the persistence manager to start up
1224            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1225
1226            // Now trigger a subscription state update by updating the SharedObservable
1227            shared_observable
1228                .set(SubscriptionState {
1229                    latest_stream_height: Some("150".to_string()),
1230                    current_filters: MessageFilters {
1231                        filters: Some(vec![]),
1232                    },
1233                })
1234                .await;
1235
1236            // Give time for the background task to process the state update
1237            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1238
1239            // The mock expectation will verify that store_subscription_state was called
1240            // with the updated height of "150"
1241        }
1242
1243        #[tokio::test]
1244        async fn test_stream_event_filtering_and_processing() {
1245            let mut mock_storage = MockMessageStorage::new();
1246            let mut mock_messages_manager = MockMessagesManagerTrait::new();
1247            let relay_id = KeyId::from_bytes([8u8; 32]);
1248
1249            // Set up expectations for different event types
1250            mock_storage
1251                .expect_store_message()
1252                .times(2) // Only for MessageReceived and MessageSent
1253                .returning(|_| Ok(()));
1254
1255            mock_storage
1256                .expect_mark_message_synced()
1257                .times(1) // Only for MessageReceived
1258                .returning(|_, _, _| Ok(()));
1259
1260            mock_messages_manager
1261                .expect_message_events_stream()
1262                .times(1)
1263                .returning(create_mock_message_events_stream);
1264
1265            // Add missing subscription state updates expectation
1266            let shared = eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(
1267                SubscriptionState::default(),
1268            );
1269            let subscriber = shared.subscribe().await;
1270            mock_messages_manager
1271                .expect_get_subscription_state_updates()
1272                .times(1)
1273                .returning(move || subscriber.clone());
1274
1275            // Build the persistence manager
1276            let persistence_manager = TestPersistenceManagerBuilder::new()
1277                .storage(Arc::new(mock_storage))
1278                .relay_id(relay_id)
1279                .build_with_messages_manager(Arc::new(mock_messages_manager))
1280                .await;
1281
1282            assert!(persistence_manager.is_ok());
1283
1284            // Give time for all events to be processed
1285            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1286
1287            // The mock expectations will verify that the right methods were called
1288            // for the right event types
1289        }
1290
1291        #[tokio::test]
1292        async fn test_subscription_state_persistence_on_updates() {
1293            let mut mock_storage = MockMessageStorage::new();
1294            let mut mock_messages_manager = MockMessagesManagerTrait::new();
1295            let relay_id = KeyId::from_bytes([11u8; 32]);
1296
1297            // Create a real SharedObservable for subscription state with AsyncLock
1298            let initial_state = SubscriptionState {
1299                latest_stream_height: Some("100".to_string()),
1300                current_filters: MessageFilters {
1301                    filters: Some(vec![]),
1302                },
1303            };
1304            let shared_observable =
1305                eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(initial_state);
1306            let subscriber = shared_observable.subscribe().await;
1307
1308            // Set up storage expectations - should be called when state updates
1309            // Allow 0-2 calls since the timing of observable updates is not deterministic
1310            mock_storage
1311                .expect_store_subscription_state()
1312                .with(eq(relay_id), always())
1313                .times(0..=2)
1314                .returning(|_, _| Ok(()));
1315
1316            // Set up mock messages manager to return empty message events stream
1317            mock_messages_manager
1318                .expect_message_events_stream()
1319                .times(1)
1320                .returning(create_mock_message_events_stream);
1321
1322            // Set up mock to return the cloned subscriber
1323            let subscriber_clone = subscriber.clone();
1324            mock_messages_manager
1325                .expect_get_subscription_state_updates()
1326                .times(1)
1327                .returning(move || subscriber_clone.clone());
1328
1329            // Build the persistence manager
1330            let persistence_manager = TestPersistenceManagerBuilder::new()
1331                .storage(Arc::new(mock_storage))
1332                .relay_id(relay_id)
1333                .build_with_messages_manager(Arc::new(mock_messages_manager))
1334                .await;
1335
1336            assert!(persistence_manager.is_ok());
1337
1338            // Give a moment for the persistence manager to start up
1339            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1340
1341            // Trigger subscription state updates
1342            shared_observable
1343                .set(SubscriptionState {
1344                    latest_stream_height: Some("150".to_string()),
1345                    current_filters: MessageFilters {
1346                        filters: Some(vec![]),
1347                    },
1348                })
1349                .await;
1350
1351            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1352
1353            shared_observable
1354                .set(SubscriptionState {
1355                    latest_stream_height: Some("200".to_string()),
1356                    current_filters: MessageFilters {
1357                        filters: Some(vec![]),
1358                    },
1359                })
1360                .await;
1361
1362            // Give time for the background task to process all state updates
1363            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1364
1365            // The mock expectations will verify that store_subscription_state was called
1366        }
1367
1368        #[tokio::test]
1369        async fn test_subscription_state_loading_on_startup() {
1370            let mut mock_storage = MockMessageStorage::new();
1371            let relay_id = KeyId::from_bytes([12u8; 32]);
1372
1373            // Set up initial subscription state in storage
1374            let stored_state = SubscriptionState {
1375                latest_stream_height: Some("500".to_string()),
1376                current_filters: MessageFilters {
1377                    filters: Some(vec![]),
1378                },
1379            };
1380
1381            mock_storage
1382                .expect_get_subscription_state()
1383                .with(eq(relay_id))
1384                .times(1)
1385                .returning(move |_| Ok(Some(stored_state.clone())));
1386
1387            // Test loading subscription state
1388            let loaded_state =
1389                TestPersistenceManager::load_subscription_state(&mock_storage, &relay_id).await;
1390
1391            assert!(loaded_state.is_ok());
1392            let state = loaded_state.unwrap();
1393            assert!(state.is_some());
1394            let state = state.unwrap();
1395            assert_eq!(state.latest_stream_height, Some("500".to_string()));
1396            assert!(state.current_filters.filters.is_some());
1397        }
1398
1399        #[tokio::test]
1400        async fn test_subscription_state_error_handling() {
1401            let mut mock_storage = MockMessageStorage::new();
1402            let relay_id = KeyId::from_bytes([13u8; 32]);
1403
1404            // Set up storage to return an error
1405            mock_storage
1406                .expect_get_subscription_state()
1407                .with(eq(relay_id))
1408                .times(1)
1409                .returning(|_| Err(StorageError::Internal("Database error".to_string())));
1410
1411            // Test loading subscription state with storage error
1412            let loaded_state =
1413                TestPersistenceManager::load_subscription_state(&mock_storage, &relay_id).await;
1414
1415            assert!(loaded_state.is_err());
1416            assert!(
1417                loaded_state
1418                    .unwrap_err()
1419                    .to_string()
1420                    .contains("Failed to load subscription state")
1421            );
1422        }
1423
1424        #[tokio::test]
1425        async fn test_subscription_state_persistence_error_resilience() {
1426            let _ = tracing_subscriber::fmt()
1427                .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1428                .try_init();
1429
1430            let mut mock_storage = MockMessageStorage::new();
1431            let mut mock_messages_manager = MockMessagesManagerTrait::new();
1432            let relay_id = KeyId::from_bytes([14u8; 32]);
1433
1434            // Create a real SharedObservable for subscription state with AsyncLock
1435            let initial_state = SubscriptionState::default();
1436            let shared_observable =
1437                eyeball::SharedObservable::<SubscriptionState, AsyncLock>::new_async(initial_state);
1438            let subscriber = shared_observable.subscribe().await;
1439
1440            // Set up storage to fail on state persistence - allow 0 or 1 calls
1441            // since the timing of when the observable triggers is not deterministic
1442            mock_storage
1443                .expect_store_subscription_state()
1444                .with(eq(relay_id), always())
1445                .times(0..=1)
1446                .returning(|_, _| Err(StorageError::Internal("Storage failure".to_string())));
1447
1448            // Set up mock messages manager to return a message events stream that stays open
1449            mock_messages_manager
1450                .expect_message_events_stream()
1451                .times(1)
1452                .returning(create_mock_message_events_stream);
1453
1454            // Set up mock to return the cloned subscriber
1455            let subscriber_clone = subscriber.clone();
1456            mock_messages_manager
1457                .expect_get_subscription_state_updates()
1458                .times(1)
1459                .returning(move || subscriber_clone.clone());
1460
1461            // Build the persistence manager
1462            let persistence_manager = TestPersistenceManagerBuilder::new()
1463                .storage(Arc::new(mock_storage))
1464                .relay_id(relay_id)
1465                .build_with_messages_manager(Arc::new(mock_messages_manager))
1466                .await;
1467
1468            assert!(persistence_manager.is_ok());
1469            let manager = persistence_manager.unwrap();
1470
1471            // Give a moment for the persistence manager to start up
1472            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1473
1474            // Verify the manager starts running initially
1475            assert!(manager.is_running());
1476
1477            // Trigger a subscription state update that may fail to persist
1478            shared_observable
1479                .set(SubscriptionState {
1480                    latest_stream_height: Some("300".to_string()),
1481                    current_filters: MessageFilters {
1482                        filters: Some(vec![]),
1483                    },
1484                })
1485                .await;
1486
1487            // Give time for the background task to process the state update
1488            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1489
1490            // The main goal is to verify that the background task handles errors gracefully
1491            // and continues running. The exact timing of state updates is not deterministic.
1492        }
1493    }
1494}