pub struct RedisMessageStorage {
pub conn: Arc<Mutex<ConnectionManager>>,
pub client: Client,
}Expand description
Redis-backed storage for the relay service
Fields§
§conn: Arc<Mutex<ConnectionManager>>§client: ClientImplementations§
Source§impl RedisMessageStorage
impl RedisMessageStorage
async fn get_inner<R: FromRedisValue>(&self, id: &str) -> Result<Option<R>>
Sourceasync fn get_inner_raw(&self, id: &str) -> Result<Option<Vec<u8>>>
async fn get_inner_raw(&self, id: &str) -> Result<Option<Vec<u8>>>
Retrieve a specific message by ID as its raw data
Sourceasync fn get_inner_full(&self, id: &str) -> Result<Option<MessageFull>>
async fn get_inner_full(&self, id: &str) -> Result<Option<MessageFull>>
Retrieve a specific string
async fn get_message_full( conn: &mut ConnectionManager, id: &[u8], ) -> Result<Option<MessageFull>>
async fn get_full( conn: &mut ConnectionManager, id: &str, ) -> Result<Option<MessageFull>>
async fn add_to_index_stream( conn: &mut ConnectionManager, stream_name: &str, message_id: &[u8], stream_height: &str, expiration_time: Option<u64>, ) -> Result<String>
Source§impl RedisMessageStorage
impl RedisMessageStorage
Sourcefn is_expired_from_timestamp(expiration_time: u64, current_time: u64) -> bool
fn is_expired_from_timestamp(expiration_time: u64, current_time: u64) -> bool
Check if a message is expired based on raw expiration time This is used when we only have the expiration timestamp from Redis metadata and matches the logic in MessageFull::is_expired()
Sourcepub async fn get_message_raw(&self, id: &[u8]) -> Result<Option<Vec<u8>>>
pub async fn get_message_raw(&self, id: &[u8]) -> Result<Option<Vec<u8>>>
Retrieve a specific message by ID as its raw data
Sourcepub async fn store_message(
&self,
message: &MessageFull,
) -> Result<PublishResult>
pub async fn store_message( &self, message: &MessageFull, ) -> Result<PublishResult>
Store a message in Redis and publish to stream for real-time delivery Returns PublishResult indicating if message was newly stored or already existed with stream ID
This method uses a Lua script to ensure atomicity of core operations:
- Message storage (SET NX)
- Global stream addition (XADD)
- Stream ID mapping (SET)
Sourcepub async fn check_messages(
&self,
message_ids: &[MessageId],
) -> Result<Vec<Option<String>>>
pub async fn check_messages( &self, message_ids: &[MessageId], ) -> Result<Vec<Option<String>>>
Check which messages the server already has and return their global stream IDs.
Returns a vec of Option<String> in the same order as the input, where:
Some(stream_id)means the server has the message with that global stream IDNonemeans the server doesn’t have this message yet
Sourcepub async fn get_message(&self, id: &[u8]) -> Result<Option<MessageFull>>
pub async fn get_message(&self, id: &[u8]) -> Result<Option<MessageFull>>
Retrieve a specific message by ID
Sourcepub async fn catch_up<'a>(
&'a self,
filter: &Filter,
since: Option<String>,
) -> Result<impl Stream<Item = Result<CatchUpItem>> + 'a>
pub async fn catch_up<'a>( &'a self, filter: &Filter, since: Option<String>, ) -> Result<impl Stream<Item = Result<CatchUpItem>> + 'a>
Catch up on a specific filter stream
Sourcepub async fn listen_for_messages<'a>(
&'a self,
filters: &'a MessageFilters,
since: Option<String>,
limit: Option<usize>,
) -> Result<impl Stream<Item = Result<GlobalStreamItem>> + 'a>
pub async fn listen_for_messages<'a>( &'a self, filters: &'a MessageFilters, since: Option<String>, limit: Option<usize>, ) -> Result<impl Stream<Item = Result<GlobalStreamItem>> + 'a>
Listen for messages matching filters (streaming)
pub async fn get_user_data( &self, user_id: KeyId, key: StoreKey, ) -> Result<Option<MessageFull>>
Trait Implementations§
Source§impl Clone for RedisMessageStorage
impl Clone for RedisMessageStorage
Source§fn clone(&self) -> RedisMessageStorage
fn clone(&self) -> RedisMessageStorage
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more