pub struct MultiRelayMessageManager<S: MessageStorage> {
relay_connections: Arc<RwLock<BTreeMap<KeyId, RelayConnection>>>,
storage: Arc<S>,
global_events_tx: Sender<MessageEvent>,
global_messages_tx: Sender<StreamMessage>,
global_catchup_tx: Sender<CatchUpResponse>,
catch_up_tasks: Arc<RwLock<BTreeMap<KeyId, JoinHandle<()>>>>,
}
Multi-relay message manager that provides unified messaging across multiple relays with offline support and automatic failover.
This manager:
- Manages connections to multiple relay servers
- Uses persistent storage for offline message queuing (no in-memory queue)
- Implements automatic failover and load balancing
- Aggregates messages from all connected relays
- Deduplicates messages based on message ID
- Maintains the same interface as a single MessagesManager
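The deduplication step in the list above can be illustrated with a minimal sketch. It assumes messages carry a string ID and that the first copy seen wins. `IncomingMessage`, `Deduplicator`, and `aggregate` are hypothetical names used for illustration only; they are not part of this crate, and the manager's actual bookkeeping may differ.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the crate's message type; only the ID matters here.
#[derive(Debug, Clone, PartialEq)]
pub struct IncomingMessage {
    pub id: String,
    pub relay: String,
    pub body: String,
}

/// Tracks message IDs that have already been forwarded.
#[derive(Default)]
pub struct Deduplicator {
    seen: HashSet<String>,
}

impl Deduplicator {
    /// Returns `true` the first time an ID is observed and `false` for repeats.
    pub fn is_new(&mut self, id: &str) -> bool {
        self.seen.insert(id.to_owned())
    }
}

/// Merges per-relay batches in order, keeping only the first copy of each message ID.
pub fn aggregate(batches: Vec<Vec<IncomingMessage>>) -> Vec<IncomingMessage> {
    let mut dedup = Deduplicator::default();
    batches
        .into_iter()
        .flatten()
        .filter(|message| dedup.is_new(&message.id))
        .collect()
}
```

A set that only grows is acceptable for a sketch; a long-running process would likely need to bound or expire the set of seen IDs.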
Fields
relay_connections: Arc<RwLock<BTreeMap<KeyId, RelayConnection>>>
Map of relay ID to relay connection info
storage: Arc<S>
Storage for message persistence and offline queuing
global_events_tx: Sender<MessageEvent>
Global message event broadcaster (aggregates from all relays)
global_messages_tx: Sender<StreamMessage>
Global message broadcaster (aggregates from all relays)
global_catchup_tx: Sender<CatchUpResponse>
Global catch-up response broadcaster
catch_up_tasks: Arc<RwLock<BTreeMap<KeyId, JoinHandle<()>>>>
Map of active catch-up tasks by relay ID
Implementations
impl<S: MessageStorage + 'static> MultiRelayMessageManager<S>
pub async fn add_relay(
    &self,
    relay_id: KeyId,
    manager: Arc<MessagesManager>,
    should_catch_up: bool,
) -> Result<()>
Add a relay connection to the manager
Arguments
- relay_id - The unique identifier for the relay
- manager - The messages manager for this relay
- should_catch_up - Whether to start catching up on historical messages
pub async fn remove_relay(
    &self,
    relay_id: &KeyId,
) -> Option<Arc<MessagesManager>>
Remove a relay connection
pub async fn get_connected_relay_ids(&self) -> Vec<KeyId>
Get list of currently connected relay IDs
pub async fn get_all_relay_ids(&self) -> Vec<KeyId>
Get list of all relay IDs (connected and disconnected)
pub async fn has_connected_relays(&self) -> bool
Check if any relays are currently connected
async fn start_catch_up_task(
    &self,
    relay_id: KeyId,
    manager: Arc<MessagesManager>,
) -> Result<()>
Start a catch-up task for a specific relay
async fn process_all_unsynced_messages_for_relay<M: MessagesManagerTrait>(
    storage: &Arc<S>,
    relay_id: &KeyId,
    manager: &Arc<M>,
    batch_size: usize,
) -> Result<usize>
Process all unsynced messages for a specific relay in batches.
Returns the total number of messages processed.
async fn process_unsynced_messages_batch_for_relay<M: MessagesManagerTrait>(
    storage: &Arc<S>,
    relay_id: &KeyId,
    manager: &Arc<M>,
    batch_size: usize,
) -> Result<Option<usize>>
Process a single batch of unsynced messages for a specific relay.
Returns:
- Ok(Some(count)) if messages were processed (count = number successfully processed)
- Ok(None) if no unsynced messages were found (indicates completion)
- Err(_) if there was an error
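The return contract above suggests how the "process all" method could drive the per-batch method: call it repeatedly, add up the counts, and stop at the first `Ok(None)`. The following is a synchronous sketch of that loop under stated assumptions. `SyncError`, `process_all`, and `drain_batch` are hypothetical names, an in-memory vector stands in for storage, and the real methods are async and may handle retries or partial failures differently.

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub struct SyncError(pub String);

/// Calls `process_batch` until it reports completion with `Ok(None)`,
/// summing the per-batch counts. Any error aborts the loop immediately.
pub fn process_all<F>(mut process_batch: F) -> Result<usize, SyncError>
where
    F: FnMut() -> Result<Option<usize>, SyncError>,
{
    let mut total = 0;
    while let Some(count) = process_batch()? {
        total += count;
    }
    Ok(total)
}

/// Hypothetical batch step over an in-memory list of pending items.
/// Returns `Ok(None)` once nothing is left to process.
pub fn drain_batch(
    pending: &mut Vec<u32>,
    batch_size: usize,
) -> Result<Option<usize>, SyncError> {
    if batch_size == 0 {
        // A zero-sized batch would never make progress, so reject it.
        return Err(SyncError("batch_size must be greater than zero".to_string()));
    }
    if pending.is_empty() {
        return Ok(None);
    }
    let count = batch_size.min(pending.len());
    pending.drain(..count);
    Ok(Some(count))
}
```

One design point worth noting: a loop of this shape only terminates if every `Ok(Some(_))` batch makes progress, so a batch step that can return `Ok(Some(0))` while messages remain unsynced would need an extra guard.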
Trait Implementations
impl<S: Clone + MessageStorage> Clone for MultiRelayMessageManager<S>
fn clone(&self) -> MultiRelayMessageManager<S>
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl<S: MessageStorage> Drop for MultiRelayMessageManager<S>
impl<S: MessageStorage + 'static> MessagesManagerTrait for MultiRelayMessageManager<S>
fn message_events_stream(&self) -> Receiver<MessageEvent>
Get a stream of all message events from all relays
fn get_subscription_state_updates<'life0, 'async_trait>(
    &'life0 self,
) -> Pin<Box<dyn Future<Output = Subscriber<SubscriptionState, AsyncLock>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Subscribe to subscription state changes (aggregated from all relays)
fn subscribe<'life0, 'async_trait>(
    &'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Subscribe to messages on all connected relays
fn publish<'life0, 'async_trait>(
    &'life0 self,
    message: MessageFull,
) -> Pin<Box<dyn Future<Output = Result<PublishResult>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Publish a message to available relays or queue for offline delivery
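The publish-or-queue behaviour described above can be sketched as follows. This is an illustration under assumptions rather than the crate's implementation: `Outcome`, `OfflineQueue`, and `publish_or_queue` are hypothetical names, relays are modelled as indices passed to a `send` callback, and the sketch stops at the first relay that accepts the message, whereas the real manager may publish to several relays and returns its own `PublishResult`.

```rust
/// Hypothetical outcome type; the crate's real `PublishResult` may differ.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// The relay at this index accepted the message.
    Sent(usize),
    /// No relay accepted the message, so it was stored for later delivery.
    Queued,
}

/// Hypothetical stand-in for persistent storage, modelled as a plain vector.
#[derive(Default)]
pub struct OfflineQueue {
    pub pending: Vec<String>,
}

/// Tries each relay in order; if none accepts the message (or there are no
/// relays at all), the message is appended to the offline queue instead.
pub fn publish_or_queue<F>(
    relay_count: usize,
    mut send: F,
    queue: &mut OfflineQueue,
    message: &str,
) -> Outcome
where
    F: FnMut(usize, &str) -> bool,
{
    for relay in 0..relay_count {
        if send(relay, message) {
            return Outcome::Sent(relay);
        }
    }
    queue.pending.push(message.to_owned());
    Outcome::Queued
}
```

Keeping the queue in storage rather than in memory, as the overview states, means queued messages survive a restart and can be picked up by the per-relay catch-up processing.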
fn ensure_contains_filter<'life0, 'async_trait>(
    &'life0 self,
    filter: Filter,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Ensure a filter is included in the subscription on all connected relays
fn messages_stream(&self) -> Receiver<StreamMessage>
Get a stream of incoming messages from all relays
fn catch_up_stream(&self) -> Receiver<CatchUpResponse>
Get a stream of catch-up responses from all relays
fn filtered_messages_stream(
    &self,
    _filter: Filter,
) -> Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>
Get a filtered stream of messages matching the given filter from all relays
fn catch_up_and_subscribe<'life0, 'async_trait>(
    &'life0 self,
    _filter: Filter,
    _since: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Catch up to historical messages and subscribe to new ones for a filter
fn user_data<'life0, 'async_trait>(
    &'life0 self,
    author: KeyId,
    storage_key: StoreKey,
) -> Pin<Box<dyn Future<Output = Result<Option<MessageFull>>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Get user data by author and storage key from any available relay
fn check_messages<'life0, 'async_trait>(
    &'life0 self,
    message_ids: Vec<MessageId>,
) -> Pin<Box<dyn Future<Output = Result<Vec<Option<String>>>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Check which messages exist on any connected relay.
Returns the first successful result from any relay.
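"First successful result from any relay" can be read as a simple fallback loop, sketched below. The sketch assumes relays are queried one at a time in order, which the documentation does not confirm; the real method is async and may query relays differently. `first_success` and its parameters are hypothetical names introduced for illustration.

```rust
/// Queries each relay in order and returns the first `Ok` value.
/// If every relay fails, the last error is returned. If `relays` is empty,
/// the caller-supplied `no_relays` error is returned.
pub fn first_success<R, T, E, F>(relays: &[R], mut query: F, no_relays: E) -> Result<T, E>
where
    F: FnMut(&R) -> Result<T, E>,
{
    let mut last_error = no_relays;
    for relay in relays {
        match query(relay) {
            // Stop at the first relay that answers successfully.
            Ok(value) => return Ok(value),
            // Remember the failure and move on to the next relay.
            Err(error) => last_error = error,
        }
    }
    Err(last_error)
}
```

With this shape, a relay that is unreachable only delays the answer; the call fails as a whole only when no relay can respond.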