MessagesManager

Struct MessagesManager 

pub struct MessagesManager {
    messages_service: Arc<MessagesService>,
    broadcast_tx: Arc<Sender<StreamMessage>>,
    catch_up_tx: Arc<Sender<CatchUpResponse>>,
    message_events_tx: Arc<Sender<MessageEvent>>,
    _broadcast_keeper: InactiveReceiver<StreamMessage>,
    _catch_up_keeper: InactiveReceiver<CatchUpResponse>,
    _message_events_keeper: InactiveReceiver<MessageEvent>,
    state: SharedObservable<SubscriptionState, AsyncLock>,
    _sync_handler: Arc<AbortOnDrop<Result<()>>>,
    catch_up_request_id: Arc<AtomicU32>,
}

High-level messages manager that provides a unified interface for message operations.

The MessagesManager combines message broadcasting, subscription management, stream filtering, and lifecycle management:

  • Message Broadcasting: Distributes incoming messages to multiple subscribers
  • Subscription Management: Manages server-side subscriptions with in-flight updates
  • Stream Filtering: Provides client-side filtering and routing capabilities
  • Lifecycle Management: Automatic subscription creation, updates, and cleanup

This is the primary interface for interacting with the messaging system.
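
The "Message Broadcasting" responsibility can be illustrated with a minimal, std-only sketch. It is not the manager's implementation: `FanOut` and its methods are hypothetical names, and `std::sync::mpsc` stands in for the broadcast channel the manager holds. The sketch only shows the idea of delivering one incoming message to several independent subscribers.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out hub: every subscriber gets its own copy of each message.
struct FanOut<T: Clone> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> FanOut<T> {
    fn new() -> Self {
        FanOut { subscribers: Vec::new() }
    }

    /// Registers a new subscriber and returns its receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a clone of `message` to every subscriber and prunes the ones
    /// whose receiving end has been dropped.
    fn broadcast(&mut self, message: T) {
        self.subscribers.retain(|tx| tx.send(message.clone()).is_ok());
    }
}
```

Each call to `subscribe` yields an independent receiver, so a slow or dropped subscriber does not affect what the others see.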

Fields§

§messages_service: Arc<MessagesService>

The underlying messages service for RPC operations

§broadcast_tx: Arc<Sender<StreamMessage>>

Broadcast sender for distributing messages to subscribers

§catch_up_tx: Arc<Sender<CatchUpResponse>>

Broadcast sender for distributing catch-up responses to subscribers

§message_events_tx: Arc<Sender<MessageEvent>>

Broadcast sender for all message events (for persistence and monitoring)

§_broadcast_keeper: InactiveReceiver<StreamMessage>

Keeper receiver to prevent broadcast channel closure (not actively used)

§_catch_up_keeper: InactiveReceiver<CatchUpResponse>

Keeper receiver to prevent catch-up channel closure (not actively used)

§_message_events_keeper: InactiveReceiver<MessageEvent>

Keeper receiver to prevent message events channel closure (not actively used)
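
The three keeper fields exist because a channel with no remaining receivers is normally treated as closed. The following std-only sketch shows that keep-alive idea under stated assumptions: `Hub` is a hypothetical type and `std::sync::mpsc` is a stand-in for the broadcast channel. Unlike an InactiveReceiver, an mpsc receiver buffers every message it is sent, so the analogy covers only the "channel stays open" aspect.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical hub that owns a sender plus a receiver it never reads from.
struct Hub {
    tx: Sender<u32>,
    // Held only so that the channel always has at least one live receiver.
    keeper: Option<Receiver<u32>>,
}

impl Hub {
    fn new() -> Self {
        let (tx, rx) = channel();
        Hub { tx, keeper: Some(rx) }
    }

    /// Returns true if the send succeeded, i.e. the channel is still open.
    fn send(&self, value: u32) -> bool {
        self.tx.send(value).is_ok()
    }

    /// Drops the keeper, leaving the channel without any receiver.
    fn drop_keeper(&mut self) {
        self.keeper = None;
    }
}
```

While the keeper is held, `send` succeeds even though nobody reads; once it is dropped, `send` reports failure.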

§state: SharedObservable<SubscriptionState, AsyncLock>

Current subscription state (persistent across reconnections)

§_sync_handler: Arc<AbortOnDrop<Result<()>>>

Background task handle for syncing with the server

§catch_up_request_id: Arc<AtomicU32>

Catch-up request ID counter
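
Because the counter is an `Arc<AtomicU32>`, clones of the manager can share one sequence of request IDs. A minimal sketch of that pattern follows; `RequestIds` is a hypothetical type, and the starting value of 0 and the `Relaxed` ordering are assumptions not stated in this page.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the manager's shared request ID counter.
#[derive(Clone)]
struct RequestIds {
    next: Arc<AtomicU32>,
}

impl RequestIds {
    fn new() -> Self {
        // Assumption: IDs start at 0.
        RequestIds { next: Arc::new(AtomicU32::new(0)) }
    }

    /// Returns the current ID and advances the shared counter by one.
    /// `fetch_add` wraps around on overflow instead of panicking.
    fn next_id(&self) -> u32 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }
}
```

Cloning `RequestIds` clones the `Arc`, not the counter, so IDs handed out through different clones never collide.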

Implementations§

Source§

impl MessagesManager

Source

pub fn builder() -> MessagesManagerBuilder

Source

fn safe_broadcast<T: Clone>(sender: &Sender<T>, message: T, context: &str)

Helper function to safely broadcast messages using try_broadcast. Handles TrySendError cases gracefully without panicking.
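
The non-panicking pattern described here can be sketched with the standard library alone. This is an analogy, not the helper's source: it uses `SyncSender::try_send` from `std::sync::mpsc` in place of `try_broadcast`, and `safe_send` and `SendOutcome` are hypothetical names. The real helper returns nothing; the sketch returns an outcome only so the behaviour can be observed.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Outcome of a non-blocking send attempt (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum SendOutcome {
    Sent,
    DroppedFull,
    DroppedClosed,
}

/// Tries to send without blocking and reports failures instead of panicking.
fn safe_send<T>(sender: &SyncSender<T>, message: T, context: &str) -> SendOutcome {
    match sender.try_send(message) {
        Ok(()) => SendOutcome::Sent,
        // The buffer is full: log and drop the message.
        Err(TrySendError::Full(_)) => {
            eprintln!("{context}: channel full, message dropped");
            SendOutcome::DroppedFull
        }
        // No receiver is left: log and drop the message.
        Err(TrySendError::Disconnected(_)) => {
            eprintln!("{context}: channel closed, message dropped");
            SendOutcome::DroppedClosed
        }
    }
}
```

Both error variants are matched explicitly, so a full or closed channel results in a log line rather than an `unwrap` panic.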

Source

fn new_with_state( messages_service: MessagesService, messages_stream: MessagesStream, catch_up_stream: CatchUpStream, state: SubscriptionState, buffer_size: Option<usize>, ) -> Self

Create a new MessagesManager with existing subscription state.

This allows restoring a manager to a previous state after reconnection.

§Arguments
  • messages_service - The underlying messages service for RPC operations
  • messages_stream - The stream of messages from the server
  • catch_up_stream - The stream of catch-up responses from the server
  • state - Previous subscription state to restore
  • buffer_size - Optional buffer size for the broadcast channel (default: 1000)
Source

pub async fn subscribe(&self) -> Result<()>

Source

pub async fn ensure_contains_filter(&self, filter: Filter) -> Result<()>

Source

pub async fn catch_up_and_subscribe( self, filter: Filter, since: Option<String>, ) -> Result<impl Stream<Item = Box<MessageFull>>>

Source

pub fn filtered_messages_stream( self, filter: Filter, ) -> Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>

Source

pub fn filtered_fn<F, T>(self, filter: F) -> impl Stream<Item = T>
where F: Fn(StreamMessage) -> Option<T> + Send + Clone + 'static,

Get a filtered stream of messages.

This creates a client-side filtered stream from the internal broadcast channel. The filter function is applied to all messages received by the manager.

§Arguments
  • filter - A function that returns Some(value) for each message to include and None for each message to skip
§Returns

A stream of the values the filter produces for the included messages
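
The signature takes a filter of type `Fn(StreamMessage) -> Option<T>`, so the filter both selects and transforms messages. The sketch below shows that shape on a plain `Vec` instead of an async stream; `Msg`, `filtered`, and `text_only` are hypothetical names, and `Msg` is only a stand-in for `StreamMessage`.

```rust
/// Hypothetical message type standing in for `StreamMessage`.
#[derive(Clone, Debug, PartialEq)]
enum Msg {
    Text(String),
    Height(u64),
}

/// Applies an `Option`-returning filter to each message and keeps the
/// values wrapped in `Some`, discarding every `None`.
fn filtered<F, T>(messages: Vec<Msg>, filter: F) -> Vec<T>
where
    F: Fn(Msg) -> Option<T>,
{
    messages.into_iter().filter_map(filter).collect()
}

/// Example filter: keep only text payloads and unwrap them to `String`.
fn text_only(msg: Msg) -> Option<String> {
    match msg {
        Msg::Text(body) => Some(body),
        Msg::Height(_) => None,
    }
}
```

Calling `filtered(messages, text_only)` yields only the text bodies, in their original order; the item type of the result is whatever the filter returns.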

Source

pub fn all_messages_stream(&self) -> Receiver<StreamMessage>

Get a stream of all messages.

§Returns

A stream of all messages received by the manager

Source

pub async fn publish(&self, message: MessageFull) -> Result<PublishResult>

Source

pub async fn get_subscription_state(&self) -> SubscriptionState

Get the current subscription state for persistence.

This state can be saved and later restored using MessagesManagerBuilder.

§Returns

A clone of the current subscription state

Source

pub async fn get_latest_stream_height(&self) -> Option<String>

Get the latest stream height received.

This can be used to determine how up-to-date the client is.

Source

pub async fn get_current_filters(&self) -> MessageFilters

Get the current combined filters.

This shows all the filters that are currently active in the subscription.

Source

pub fn message_events_stream(&self) -> Receiver<MessageEvent>

Get a stream of all message events for persistence and monitoring.

This stream captures every message activity including:

  • Messages received from subscriptions
  • Messages sent by this client
  • Historical messages from catch-up requests
  • Stream height updates
  • Catch-up completion notifications

This is primarily intended for persistence services and audit logging.

§Returns

A stream of MessageEvent that captures all message activities
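
A consumer such as an audit logger would typically match on each event kind. The sketch below assumes a hypothetical `Event` enum that mirrors the five activities listed above; the variants and payloads of the real `MessageEvent` are not shown on this page and may differ.

```rust
/// Hypothetical event type mirroring the five listed activities.
/// The real `MessageEvent` variants and fields may differ.
#[derive(Debug)]
enum Event {
    Received { id: String },
    Sent { id: String },
    CatchUp { id: String },
    StreamHeight { height: String },
    CatchUpCompleted { request_id: u32 },
}

/// Formats one event as a single audit-log line.
fn audit_line(event: &Event) -> String {
    match event {
        Event::Received { id } => format!("received {id}"),
        Event::Sent { id } => format!("sent {id}"),
        Event::CatchUp { id } => format!("catch-up {id}"),
        Event::StreamHeight { height } => format!("height {height}"),
        Event::CatchUpCompleted { request_id } => {
            format!("catch-up {request_id} completed")
        }
    }
}
```

An exhaustive `match` makes the compiler flag any event kind the logger forgets to handle.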

Trait Implementations§

Source§

impl Clone for MessagesManager

Source§

fn clone(&self) -> MessagesManager

Returns a duplicate of the value.
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.
Source§

impl MessagesManagerTrait for MessagesManager

Source§

fn message_events_stream(&self) -> Receiver<MessageEvent>

Get a stream of all message events for persistence and monitoring
Source§

fn get_subscription_state_updates<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Subscriber<SubscriptionState, AsyncLock>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Subscribe to subscription state changes for reactive programming
Source§

fn subscribe<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Subscribe to messages with current filters
Source§

fn publish<'life0, 'async_trait>( &'life0 self, message: MessageFull, ) -> Pin<Box<dyn Future<Output = Result<PublishResult>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Publish a message
Source§

fn ensure_contains_filter<'life0, 'async_trait>( &'life0 self, filter: Filter, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Ensure a filter is included in the subscription
Source§

fn messages_stream(&self) -> Receiver<StreamMessage>

Get a stream of incoming messages
Source§

fn catch_up_stream(&self) -> Receiver<CatchUpResponse>

Get a stream of catch-up responses
Source§

fn filtered_messages_stream( &self, filter: Filter, ) -> Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>

Get a filtered stream of messages matching the given filter
Source§

fn catch_up_and_subscribe<'life0, 'async_trait>( &'life0 self, filter: Filter, since: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Box<MessageFull>> + Send>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Catch up to historical messages and subscribe to new ones for a filter
Source§

fn user_data<'life0, 'async_trait>( &'life0 self, author: KeyId, storage_key: StoreKey, ) -> Pin<Box<dyn Future<Output = Result<Option<MessageFull>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Get user data by author and storage key (for PQXDH inbox fetching)
Source§

fn check_messages<'life0, 'async_trait>( &'life0 self, message_ids: Vec<MessageId>, ) -> Pin<Box<dyn Future<Output = Result<Vec<Option<String>>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Check which messages the server already has and return their global stream IDs. Returns a Vec of Option<String> in the same order as the input.
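
Because the result preserves input order, callers can pair each ID with its entry by position. The sketch below uses plain `String` values as a stand-in for `MessageId` and assumes, since the full description is not shown here, that `None` marks a message the server does not have. `missing_ids` is a hypothetical helper.

```rust
/// Returns the IDs whose entry in `results` is `None`.
/// Assumes `results[i]` corresponds to `ids[i]`, and that `None` means
/// the server does not have that message (an assumption of this sketch).
fn missing_ids(ids: &[String], results: &[Option<String>]) -> Vec<String> {
    ids.iter()
        .zip(results.iter())
        .filter(|(_, result)| result.is_none())
        .map(|(id, _)| id.clone())
        .collect()
}
```

A caller could use such a helper to decide which messages still need to be published.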

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
§

impl<'a, T, E> AsTaggedExplicit<'a, E> for T
where T: 'a,

§

fn explicit(self, class: Class, tag: u32) -> TaggedParser<'a, Explicit, Self, E>

§

impl<'a, T, E> AsTaggedImplicit<'a, E> for T
where T: 'a,

§

fn implicit( self, class: Class, constructed: bool, tag: u32, ) -> TaggedParser<'a, Implicit, Self, E>

Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
§

impl<T> Classify for T

§

type Classified = T

§

fn classify(self) -> T

Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.
§

impl<T> CompatExt for T

§

fn compat(self) -> Compat<T>

Applies the Compat adapter by value.
§

fn compat_ref(&self) -> Compat<&T>

Applies the Compat adapter by shared reference.
§

fn compat_mut(&mut self) -> Compat<&mut T>

Applies the Compat adapter by mutable reference.
§

impl<T> Declassify for T

§

type Declassified = T

§

fn declassify(self) -> T

Source§

impl<T> DynClone for T
where T: Clone,

Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> FutureExt for T

§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.
§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.
§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

§

impl<T> Pointable for T

§

const ALIGN: usize

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.
§

impl<T> DartSafe for T

§

impl<T> ErasedDestructor for T
where T: 'static,

§

impl<T> TaskRetFutTrait for T
where T: Send,