pub struct MessagesManager {
messages_service: Arc<MessagesService>,
broadcast_tx: Arc<Sender<StreamMessage>>,
catch_up_tx: Arc<Sender<CatchUpResponse>>,
message_events_tx: Arc<Sender<MessageEvent>>,
_broadcast_keeper: InactiveReceiver<StreamMessage>,
_catch_up_keeper: InactiveReceiver<CatchUpResponse>,
_message_events_keeper: InactiveReceiver<MessageEvent>,
state: SharedObservable<SubscriptionState, AsyncLock>,
_sync_handler: Arc<AbortOnDrop<Result<()>>>,
catch_up_request_id: Arc<AtomicU32>,
}Expand description
High-level messages manager that provides a unified interface for message operations.
The MessagesManager combines message broadcasting and subscription management:
- Message Broadcasting: Distributes incoming messages to multiple subscribers
- Subscription Management: Manages server-side subscriptions with in-flight updates
- Stream Filtering: Provides client-side filtering and routing capabilities
- Lifecycle Management: Automatic subscription creation, updates, and cleanup
This is the primary interface for interacting with the messaging system.
Fields§
§messages_service: Arc<MessagesService>The underlying messages service for RPC operations
broadcast_tx: Arc<Sender<StreamMessage>>Broadcast sender for distributing messages to subscribers
catch_up_tx: Arc<Sender<CatchUpResponse>>Broadcast sender for distributing catch-up responses to subscribers
message_events_tx: Arc<Sender<MessageEvent>>Broadcast sender for all message events (for persistence and monitoring)
_broadcast_keeper: InactiveReceiver<StreamMessage>Keeper receiver to prevent broadcast channel closure (not actively used)
_catch_up_keeper: InactiveReceiver<CatchUpResponse>Keeper receiver to prevent catch-up channel closure (not actively used)
_message_events_keeper: InactiveReceiver<MessageEvent>Keeper receiver to prevent message events channel closure (not actively used)
state: SharedObservable<SubscriptionState, AsyncLock>Current subscription state (persistent across reconnections)
_sync_handler: Arc<AbortOnDrop<Result<()>>>Background task handle for syncing with the server
catch_up_request_id: Arc<AtomicU32>Catch-up request ID counter
Implementations§
Source§impl MessagesManager
impl MessagesManager
pub fn builder() -> MessagesManagerBuilder
Sourcefn safe_broadcast<T: Clone>(sender: &Sender<T>, message: T, context: &str)
fn safe_broadcast<T: Clone>(sender: &Sender<T>, message: T, context: &str)
Helper function to safely broadcast messages using try_broadcast Handles TrySendError cases gracefully without panicking
Sourcefn new_with_state(
messages_service: MessagesService,
messages_stream: MessagesStream,
catch_up_stream: CatchUpStream,
state: SubscriptionState,
buffer_size: Option<usize>,
) -> Self
fn new_with_state( messages_service: MessagesService, messages_stream: MessagesStream, catch_up_stream: CatchUpStream, state: SubscriptionState, buffer_size: Option<usize>, ) -> Self
Create a new MessagesManager with existing subscription state.
This allows restoring a manager to a previous state after reconnection.
§Arguments
messages_service- The underlying messages service for RPC operationsmessages_stream- The stream of messages from the serverstate- Previous subscription state to restorebuffer_size- Optional buffer size for the broadcast channel (default: 1000)
pub async fn subscribe(&self) -> Result<()>
pub async fn ensure_contains_filter(&self, filter: Filter) -> Result<()>
pub async fn catch_up_and_subscribe( self, filter: Filter, since: Option<String>, ) -> Result<impl Stream<Item = Box<MessageFull>>>
pub fn filtered_messages_stream( self, filter: Filter, ) -> Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>
Sourcepub fn filtered_fn<F, T>(self, filter: F) -> impl Stream<Item = T>
pub fn filtered_fn<F, T>(self, filter: F) -> impl Stream<Item = T>
Get a filtered stream of messages.
This creates a client-side filtered stream from the internal broadcast channel. The filter function is applied to all messages received by the manager.
§Arguments
filter- A function that returns true for messages to include
§Returns
A stream of messages that match the filter
Sourcepub fn all_messages_stream(&self) -> Receiver<StreamMessage>
pub fn all_messages_stream(&self) -> Receiver<StreamMessage>
pub async fn publish(&self, message: MessageFull) -> Result<PublishResult>
Sourcepub async fn get_subscription_state(&self) -> SubscriptionState
pub async fn get_subscription_state(&self) -> SubscriptionState
Get the current subscription state for persistence.
This state can be saved and later restored using MessagesManagerBuilder.
§Returns
A clone of the current subscription state
Sourcepub async fn get_latest_stream_height(&self) -> Option<String>
pub async fn get_latest_stream_height(&self) -> Option<String>
Get the latest stream height received.
This can be used to determine how up-to-date the client is.
Sourcepub async fn get_current_filters(&self) -> MessageFilters
pub async fn get_current_filters(&self) -> MessageFilters
Get the current combined filters.
This shows all the filters that are currently active in the subscription.
Sourcepub fn message_events_stream(&self) -> Receiver<MessageEvent>
pub fn message_events_stream(&self) -> Receiver<MessageEvent>
Get a stream of all message events for persistence and monitoring.
This stream captures every message activity including:
- Messages received from subscriptions
- Messages sent by this client
- Historical messages from catch-up requests
- Stream height updates
- Catch-up completion notifications
This is primarily intended for persistence services and audit logging.
§Returns
A stream of MessageEvent that captures all message activities
Trait Implementations§
Source§impl Clone for MessagesManager
impl Clone for MessagesManager
Source§fn clone(&self) -> MessagesManager
fn clone(&self) -> MessagesManager
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moreSource§impl MessagesManagerTrait for MessagesManager
impl MessagesManagerTrait for MessagesManager
Source§fn message_events_stream(&self) -> Receiver<MessageEvent>
fn message_events_stream(&self) -> Receiver<MessageEvent>
Source§fn get_subscription_state_updates<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Subscriber<SubscriptionState, AsyncLock>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_subscription_state_updates<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Subscriber<SubscriptionState, AsyncLock>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn subscribe<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn subscribe<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn publish<'life0, 'async_trait>(
&'life0 self,
message: MessageFull,
) -> Pin<Box<dyn Future<Output = Result<PublishResult>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn publish<'life0, 'async_trait>(
&'life0 self,
message: MessageFull,
) -> Pin<Box<dyn Future<Output = Result<PublishResult>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn ensure_contains_filter<'life0, 'async_trait>(
&'life0 self,
filter: Filter,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn ensure_contains_filter<'life0, 'async_trait>(
&'life0 self,
filter: Filter,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn messages_stream(&self) -> Receiver<StreamMessage>
fn messages_stream(&self) -> Receiver<StreamMessage>
Source§fn catch_up_stream(&self) -> Receiver<CatchUpResponse>
fn catch_up_stream(&self) -> Receiver<CatchUpResponse>
Source§fn filtered_messages_stream(
&self,
filter: Filter,
) -> Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>
fn filtered_messages_stream( &self, filter: Filter, ) -> Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>
Source§fn catch_up_and_subscribe<'life0, 'async_trait>(
&'life0 self,
filter: Filter,
since: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn catch_up_and_subscribe<'life0, 'async_trait>(
&'life0 self,
filter: Filter,
since: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn user_data<'life0, 'async_trait>(
&'life0 self,
author: KeyId,
storage_key: StoreKey,
) -> Pin<Box<dyn Future<Output = Result<Option<MessageFull>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn user_data<'life0, 'async_trait>(
&'life0 self,
author: KeyId,
storage_key: StoreKey,
) -> Pin<Box<dyn Future<Output = Result<Option<MessageFull>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn check_messages<'life0, 'async_trait>(
&'life0 self,
message_ids: Vec<MessageId>,
) -> Pin<Box<dyn Future<Output = Result<Vec<Option<String>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn check_messages<'life0, 'async_trait>(
&'life0 self,
message_ids: Vec<MessageId>,
) -> Pin<Box<dyn Future<Output = Result<Vec<Option<String>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Option<String> in the same order as the input, where: Read more