pub struct PqxdhProtocolHandler<T: MessagesManagerTrait> {
messages_manager: Arc<T>,
client_keypair: Arc<KeyPair>,
pub(crate) state: SharedObservable<PqxdhProtocolState, AsyncLock>,
}Expand description
A complete PQXDH protocol handler that encapsulates all session management, key observation, subscription handling, and message routing logic.
This provides a high-level abstraction over the entire PQXDH workflow with automatic state management and persistence support. It can operate in two modes:
§Service Provider Mode
- Publishes a PQXDH inbox for clients to discover
- Listens for incoming client connections
- Manages multiple concurrent client sessions
- Handles initial message decryption and session establishment
§Client Mode
- Discovers and connects to service provider inboxes
- Establishes secure sessions with service providers
- Sends messages over established sessions
- Manages session state and channel subscriptions
§Key Features
- State Persistence: All state can be serialized and restored across restarts
- State Observation: Observable state changes via broadcast channels for reactive programming
- Automatic Subscriptions: Handles message routing and channel management
- Session Management: Tracks multiple concurrent sessions by user ID
- Privacy Preserving: Uses randomized channel IDs and derived tags
- Type Safety: Generic over message payload types with compile-time safety
Fields§
§messages_manager: Arc<T>§client_keypair: Arc<KeyPair>§state: SharedObservable<PqxdhProtocolState, AsyncLock>Observable state that can be subscribed to for reactive programming
Implementations§
Source§impl<T: MessagesManagerTrait + 'static> PqxdhProtocolHandler<T>
impl<T: MessagesManagerTrait + 'static> PqxdhProtocolHandler<T>
Sourcepub fn new(
messages_manager: Arc<T>,
client_keypair: Arc<KeyPair>,
protocol: PqxdhInboxProtocol,
) -> Self
pub fn new( messages_manager: Arc<T>, client_keypair: Arc<KeyPair>, protocol: PqxdhInboxProtocol, ) -> Self
Creates a new protocol handler for a specific PQXDH protocol
This creates a fresh handler with empty state that can be used either as:
- Service Provider: Call
publish_service()then useinbox_stream() - Client: Call
connect_to_service()thensend_message()
§Arguments
messages_manager- The messages manager to use for message operationsclient_keypair- The client’s keypair for signing and encryptionprotocol- The specific PQXDH protocol variant to use
§Example
let handler = PqxdhProtocolHandler::new(
&messages_manager,
&keypair,
PqxdhInboxProtocol::EchoService
);Sourcepub fn from_state(
messages_manager: Arc<T>,
client_keypair: Arc<KeyPair>,
state: PqxdhProtocolState,
) -> Self
pub fn from_state( messages_manager: Arc<T>, client_keypair: Arc<KeyPair>, state: PqxdhProtocolState, ) -> Self
Creates a protocol handler from existing serialized state
This allows restoring a handler across application restarts by loading previously serialized state from a database or file. All sessions and cryptographic state will be restored to their previous state.
§Arguments
messages_manager- The messages manager to use for message operationsclient_keypair- The client’s keypair for signing and encryptionstate- Previously serialized protocol state
§Example
// Load state from storage
let saved_state: PqxdhProtocolState = load_from_database()?;
// Restore handler with previous state
let handler = PqxdhProtocolHandler::from_state(
&messages_manager,
&keypair,
saved_state
);Sourcepub async fn subscribe_to_state(
&self,
) -> Subscriber<PqxdhProtocolState, AsyncLock>
pub async fn subscribe_to_state( &self, ) -> Subscriber<PqxdhProtocolState, AsyncLock>
Subscribe to state changes for reactive programming
This method returns a broadcast receiver that can be used to observe changes to the protocol handler’s internal state. This is useful for building reactive UIs or implementing state-dependent logic that needs to respond to changes in session state, inbox status, or other protocol state.
§Returns
Returns a broadcast::Receiver<PqxdhProtocolState> that receives state updates
§Example
// Subscribe to state changes
let mut state_receiver = handler.subscribe_to_state();
// Get current state
let current_state = handler.current_state();
println!("Current sessions: {}", current_state.sessions.len());
// Watch for state changes
let mut state_stream = state_receiver.subscribe();
while let Some(new_state) = state_stream.next().await {
println!("State updated! Sessions: {}", new_state.sessions.len());
}Sourcepub async fn publish_service(&self, force_overwrite: bool) -> Result<Tag>
pub async fn publish_service(&self, force_overwrite: bool) -> Result<Tag>
Publishes a service inbox for this protocol (SERVICE PROVIDERS ONLY)
This makes the current client discoverable as a service provider for the given protocol. It generates fresh prekey bundles, publishes the inbox to the message store, and sets up the necessary subscriptions for receiving client connections.
Only call this if you want to provide a service that others can connect to.
After calling this, use inbox_stream() to listen for incoming client messages.
§Arguments
force_overwrite- If true, overwrites any existing published inbox
§Returns
Returns the Tag of the published inbox
§Errors
Returns an error if an inbox is already published and force_overwrite is false,
or if the publishing process fails.
§Example
// Publish service for clients to discover
let inbox_tag = handler.publish_service(false).await?;
println!("Service published with tag: {:?}", inbox_tag);
// Now listen for client connections
let mut inbox_stream = Box::pin(handler.inbox_stream::<String>().await?);Sourcepub async fn connect_to_service<O, I>(
&self,
target_service_key: &VerifyingKey,
initial_message: &O,
) -> Result<(PqxdhSessionId, PqxdhMessageListener<I>)>where
O: Serialize + for<'de> Deserialize<'de> + Clone,
I: for<'de> Deserialize<'de> + Clone + 'static,
pub async fn connect_to_service<O, I>(
&self,
target_service_key: &VerifyingKey,
initial_message: &O,
) -> Result<(PqxdhSessionId, PqxdhMessageListener<I>)>where
O: Serialize + for<'de> Deserialize<'de> + Clone,
I: for<'de> Deserialize<'de> + Clone + 'static,
Connects to a service provider’s inbox (CLIENTS ONLY)
This discovers the target service provider’s inbox and establishes a secure PQXDH session. It performs the full PQXDH key exchange protocol and sets up the necessary subscriptions for receiving responses from the service.
Use this when you want to connect to a service as a client.
After calling this, use send_message() to send additional messages to the service.
§Arguments
target_service_key- The public key of the service provider to connect toinitial_message- The first message to send as part of the connection
§Returns
Returns a stream of messages from the service provider
§Example
// Connect to service and send initial message
let mut response_stream = Box::pin(handler.connect_to_service::<String, String>(
&service_key,
&"Hello, service!".to_string()
).await?);
// Listen for responses
while let Some(message) = response_stream.next().await {
// Handle service responses
}pub async fn listen_for_messages<I>(
&self,
my_session_id: PqxdhSessionId,
catch_up: bool,
) -> Result<PqxdhMessageListener<I>>where
I: for<'de> Deserialize<'de>,
pub async fn tarpc_transport<Req, Resp>( &self, session_id: PqxdhSessionId, ) -> Result<PqxdhTarpcTransport<Req, Resp>>
Sourcepub async fn send_message<U>(
&self,
session_id: &PqxdhSessionId,
message: &U,
) -> Result<()>
pub async fn send_message<U>( &self, session_id: &PqxdhSessionId, message: &U, ) -> Result<()>
Sends a message to an established session (CLIENTS ONLY)
Use this to send additional messages after calling connect_to_service().
The message will be encrypted and sent over the established secure PQXDH session
using the session’s private channel.
§Arguments
session_id- The session ID of the established PQXDH sessionmessage- The message payload to encrypt and send
§Errors
Returns an error if no active session exists with the given session ID
§Example
// First establish connection
let _stream = handler.connect_to_service::<String, String>(&service_key, &"initial".to_string()).await?;
// Send follow-up messages using session ID
handler.send_message(&session_id, &"follow-up message".to_string()).await?;
handler.send_message(&session_id, &"another message".to_string()).await?;pub async fn send_ephemeral_message<U>( &self, session_id: &PqxdhSessionId, message: &U, timeout: u32, ) -> Result<()>
Sourcepub async fn inbox_stream<U>(
&self,
) -> Result<impl Stream<Item = (PqxdhSessionId, U)>>where
U: for<'de> Deserialize<'de>,
pub async fn inbox_stream<U>(
&self,
) -> Result<impl Stream<Item = (PqxdhSessionId, U)>>where
U: for<'de> Deserialize<'de>,
Creates a stream of messages that arrive to our inbox (SERVICE PROVIDERS ONLY)
This method returns a stream of incoming messages from clients who are connecting to or communicating with this service. The stream includes both initial PQXDH messages (new client connections) and session messages (ongoing communication).
§Returns
Returns a stream of (PqxdhSessionId, T) tuples where:
PqxdhSessionIdis the session ID for the client connectionTis the deserialized message payload from the client
§Errors
Returns an error if publish_service() has not been called first
§Example
// First publish the service
handler.publish_service(false).await?;
// Then listen for client messages
let mut inbox_stream = Box::pin(handler.inbox_stream::<String>().await?);
while let Some((session_id, message)) = inbox_stream.next().await {
println!("Received message from session {:?}: {}", session_id, message);
// Echo the message back to the client
handler.send_message(&session_id, &format!("Echo: {}", message)).await?;
}Source§impl<T: MessagesManagerTrait> PqxdhProtocolHandler<T>
impl<T: MessagesManagerTrait> PqxdhProtocolHandler<T>
Sourceasync fn fetch_pqxdh_inbox<U: for<'de> Deserialize<'de>>(
&self,
provider_key: &VerifyingKey,
protocol: PqxdhInboxProtocol,
) -> Result<(U, Tag)>
async fn fetch_pqxdh_inbox<U: for<'de> Deserialize<'de>>( &self, provider_key: &VerifyingKey, protocol: PqxdhInboxProtocol, ) -> Result<(U, Tag)>
Fetch a PQXDH inbox using the trait method
Sourceasync fn initiate_session<U: Serialize>(
&self,
target_public_key: VerifyingKey,
inbox: &PqxdhInbox,
target_tags: Vec<Tag>,
initial_payload: &U,
) -> Result<PqxdhSession>
async fn initiate_session<U: Serialize>( &self, target_public_key: VerifyingKey, inbox: &PqxdhInbox, target_tags: Vec<Tag>, initial_payload: &U, ) -> Result<PqxdhSession>
Initiates a PQXDH session with a target user using an already loaded inbox