Struct PqxdhProtocolHandler 

pub struct PqxdhProtocolHandler<T: MessagesManagerTrait> {
    messages_manager: Arc<T>,
    client_keypair: Arc<KeyPair>,
    pub(crate) state: SharedObservable<PqxdhProtocolState, AsyncLock>,
}

A complete PQXDH protocol handler that encapsulates all session management, key observation, subscription handling, and message routing logic.

This provides a high-level abstraction over the entire PQXDH workflow with automatic state management and persistence support. It can operate in two modes:

§Service Provider Mode

  • Publishes a PQXDH inbox for clients to discover
  • Listens for incoming client connections
  • Manages multiple concurrent client sessions
  • Handles initial message decryption and session establishment

§Client Mode

  • Discovers and connects to service provider inboxes
  • Establishes secure sessions with service providers
  • Sends messages over established sessions
  • Manages session state and channel subscriptions

§Key Features

  • State Persistence: All state can be serialized and restored across restarts
  • State Observation: State changes can be observed through a subscriber (see subscribe_to_state()) for reactive programming
  • Automatic Subscriptions: Handles message routing and channel management
  • Session Management: Tracks multiple concurrent sessions by user ID
  • Privacy Preserving: Uses randomized channel IDs and derived tags
  • Type Safety: Generic over message payload types with compile-time safety

Fields§

§messages_manager: Arc<T>

§client_keypair: Arc<KeyPair>

§state: SharedObservable<PqxdhProtocolState, AsyncLock>

Observable state that can be subscribed to for reactive programming
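
The observable-state idea can be illustrated with a small standard-library model. This is a sketch only: DemoObservable, DemoSubscriber and DemoProtocolState are hypothetical names, the model blocks a thread instead of using async, and it is not how SharedObservable is implemented.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical stand-in for the protocol state (the real type has other fields).
#[derive(Clone, Debug, PartialEq)]
pub struct DemoProtocolState {
    pub session_count: usize,
}

/// Shared cell: a version counter plus the latest value.
struct Inner<T> {
    slot: Mutex<(u64, T)>,
    changed: Condvar,
}

/// Writer handle: every `set` bumps the version and wakes waiting subscribers.
#[derive(Clone)]
pub struct DemoObservable<T> {
    inner: Arc<Inner<T>>,
}

/// Reader handle: remembers the last version it has seen.
pub struct DemoSubscriber<T> {
    inner: Arc<Inner<T>>,
    seen: u64,
}

impl<T: Clone> DemoObservable<T> {
    pub fn new(value: T) -> Self {
        let inner = Inner { slot: Mutex::new((0, value)), changed: Condvar::new() };
        Self { inner: Arc::new(inner) }
    }

    /// Stores a new value and notifies all subscribers.
    pub fn set(&self, value: T) {
        let mut slot = self.inner.slot.lock().unwrap();
        slot.0 += 1;
        slot.1 = value;
        self.inner.changed.notify_all();
    }

    /// Creates a subscriber that only reports values set after this call.
    pub fn subscribe(&self) -> DemoSubscriber<T> {
        let seen = self.inner.slot.lock().unwrap().0;
        DemoSubscriber { inner: Arc::clone(&self.inner), seen }
    }
}

impl<T: Clone> DemoSubscriber<T> {
    /// Returns the current value without waiting.
    pub fn get(&self) -> T {
        self.inner.slot.lock().unwrap().1.clone()
    }

    /// Blocks until a value newer than the last one seen is available.
    pub fn next(&mut self) -> T {
        let mut slot = self.inner.slot.lock().unwrap();
        while slot.0 == self.seen {
            slot = self.inner.changed.wait(slot).unwrap();
        }
        self.seen = slot.0;
        slot.1.clone()
    }
}
```

In this model a slow subscriber sees only the latest value, not every intermediate one; whether the real state field behaves the same way is not stated on this page.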

Implementations§

impl<T: MessagesManagerTrait + 'static> PqxdhProtocolHandler<T>

pub fn new( messages_manager: Arc<T>, client_keypair: Arc<KeyPair>, protocol: PqxdhInboxProtocol, ) -> Self

Creates a new protocol handler for a specific PQXDH protocol

This creates a fresh handler with empty state that can be used either as:

  • Service Provider: Call publish_service() then use inbox_stream()
  • Client: Call connect_to_service() then send_message()
§Arguments
  • messages_manager - The messages manager to use for message operations
  • client_keypair - The client’s keypair for signing and encryption
  • protocol - The specific PQXDH protocol variant to use
§Example
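// `new` takes ownership of both `Arc`s, so clone the handles instead of borrowing them
let handler = PqxdhProtocolHandler::new(
    Arc::clone(&messages_manager),
    Arc::clone(&keypair),
    PqxdhInboxProtocol::EchoService,
);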

pub fn from_state( messages_manager: Arc<T>, client_keypair: Arc<KeyPair>, state: PqxdhProtocolState, ) -> Self

Creates a protocol handler from existing serialized state

This allows restoring a handler across application restarts by loading previously serialized state from a database or file. All sessions and cryptographic state will be restored to their previous state.
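
The save-and-restore cycle can be modelled with the standard library alone. The sketch below is an illustration under stated assumptions: DemoSavedState, DemoHandler and the line-based text format are all hypothetical, and the page does not show how PqxdhProtocolState is actually serialized.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified state: session ID -> number of messages sent.
/// Session IDs are assumed to contain neither '=' nor newlines.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct DemoSavedState {
    pub sessions: BTreeMap<String, u64>,
}

impl DemoSavedState {
    /// Serializes the state as one `id=count` pair per line.
    pub fn encode(&self) -> String {
        self.sessions
            .iter()
            .map(|(id, count)| format!("{id}={count}\n"))
            .collect()
    }

    /// Parses the format produced by `encode`; returns `None` on malformed input.
    pub fn decode(text: &str) -> Option<Self> {
        let mut sessions = BTreeMap::new();
        for line in text.lines() {
            let (id, count) = line.split_once('=')?;
            sessions.insert(id.to_string(), count.parse().ok()?);
        }
        Some(Self { sessions })
    }
}

/// Stand-in for the handler: it only owns state.
pub struct DemoHandler {
    state: DemoSavedState,
}

impl DemoHandler {
    /// Fresh handler with empty state.
    pub fn new() -> Self {
        Self { state: DemoSavedState::default() }
    }

    /// Handler that continues from previously saved state.
    pub fn from_state(state: DemoSavedState) -> Self {
        Self { state }
    }

    pub fn state(&self) -> &DemoSavedState {
        &self.state
    }

    /// Records one sent message for the given session.
    pub fn record_message(&mut self, session_id: &str) {
        *self.state.sessions.entry(session_id.to_string()).or_insert(0) += 1;
    }
}
```

The point of the model is the symmetry: whatever new() followed by some activity produces must be reproducible by from_state() on the decoded snapshot.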

§Arguments
  • messages_manager - The messages manager to use for message operations
  • client_keypair - The client’s keypair for signing and encryption
  • state - Previously serialized protocol state
§Example
// Load state from storage
let saved_state: PqxdhProtocolState = load_from_database()?;

// Restore handler with previous state; `from_state` takes the `Arc`s by value
let handler = PqxdhProtocolHandler::from_state(
    Arc::clone(&messages_manager),
    Arc::clone(&keypair),
    saved_state,
);

pub async fn subscribe_to_state( &self, ) -> Subscriber<PqxdhProtocolState, AsyncLock>

Subscribe to state changes for reactive programming

This method returns a Subscriber that can be used to observe changes to the protocol handler’s internal state. This is useful for building reactive UIs or implementing state-dependent logic that needs to respond to changes in session state, inbox status, or other protocol state.

§Returns

Returns a Subscriber<PqxdhProtocolState, AsyncLock> that receives state updates

§Example
// Subscribe to state changes (the method is async, so it must be awaited)
let mut state_subscriber = handler.subscribe_to_state().await;

// Get current state
let current_state = state_subscriber.get().await;
println!("Current sessions: {}", current_state.sessions.len());

// Watch for state changes
while let Some(new_state) = state_subscriber.next().await {
    println!("State updated! Sessions: {}", new_state.sessions.len());
}

pub async fn publish_service(&self, force_overwrite: bool) -> Result<Tag>

Publishes a service inbox for this protocol (SERVICE PROVIDERS ONLY)

This makes the current client discoverable as a service provider for the given protocol. It generates fresh prekey bundles, publishes the inbox to the message store, and sets up the necessary subscriptions for receiving client connections.

Only call this if you want to provide a service that others can connect to. After calling this, use inbox_stream() to listen for incoming client messages.

§Arguments
  • force_overwrite - If true, overwrites any existing published inbox
§Returns

Returns the Tag of the published inbox

§Errors

Returns an error if an inbox is already published and force_overwrite is false, or if the publishing process fails.
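
The force_overwrite rule can be written down as a small state machine. This is a minimal sketch with hypothetical names (DemoTag, DemoInboxRegistry, PublishError); it models only the overwrite check, not prekey generation or the message store.

```rust
/// Hypothetical stand-in for the crate's `Tag` type.
#[derive(Debug, Clone, PartialEq)]
pub struct DemoTag(pub String);

#[derive(Debug, PartialEq)]
pub enum PublishError {
    /// An inbox already exists and `force_overwrite` was false.
    AlreadyPublished(DemoTag),
}

/// Tracks at most one published inbox.
#[derive(Default)]
pub struct DemoInboxRegistry {
    published: Option<DemoTag>,
    generation: u32,
}

impl DemoInboxRegistry {
    /// Publishes a new inbox, refusing to replace an existing one unless forced.
    pub fn publish(&mut self, force_overwrite: bool) -> Result<DemoTag, PublishError> {
        if let Some(existing) = &self.published {
            if !force_overwrite {
                return Err(PublishError::AlreadyPublished(existing.clone()));
            }
        }
        // Either nothing was published yet, or the caller asked to overwrite.
        self.generation += 1;
        let tag = DemoTag(format!("inbox-{}", self.generation));
        self.published = Some(tag.clone());
        Ok(tag)
    }
}
```

Returning the existing tag inside the error is a choice made for this sketch; the page does not say what the real error carries.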

§Example
// Publish service for clients to discover
let inbox_tag = handler.publish_service(false).await?;
println!("Service published with tag: {:?}", inbox_tag);

// Now listen for client connections
let mut inbox_stream = Box::pin(handler.inbox_stream::<String>().await?);

pub async fn connect_to_service<O, I>( &self, target_service_key: &VerifyingKey, initial_message: &O, ) -> Result<(PqxdhSessionId, PqxdhMessageListener<I>)>
where O: Serialize + for<'de> Deserialize<'de> + Clone, I: for<'de> Deserialize<'de> + Clone + 'static,

Connects to a service provider’s inbox (CLIENTS ONLY)

This discovers the target service provider’s inbox and establishes a secure PQXDH session. It performs the full PQXDH key exchange protocol and sets up the necessary subscriptions for receiving responses from the service.

Use this when you want to connect to a service as a client. After calling this, use send_message() to send additional messages to the service.

§Arguments
  • target_service_key - The public key of the service provider to connect to
  • initial_message - The first message to send as part of the connection
§Returns

Returns a tuple of the new session’s PqxdhSessionId and a PqxdhMessageListener<I> that yields messages from the service provider

§Example
// Connect to service and send initial message; keep the session ID for send_message()
let (session_id, listener) = handler.connect_to_service::<String, String>(
    &service_key,
    &"Hello, service!".to_string()
).await?;

// Listen for responses
let mut response_stream = Box::pin(listener);
while let Some(message) = response_stream.next().await {
    // Handle service responses
}

pub async fn listen_for_messages<I>( &self, my_session_id: PqxdhSessionId, catch_up: bool, ) -> Result<PqxdhMessageListener<I>>
where I: for<'de> Deserialize<'de>,

pub async fn tarpc_transport<Req, Resp>( &self, session_id: PqxdhSessionId, ) -> Result<PqxdhTarpcTransport<Req, Resp>>
where Req: for<'de> Deserialize<'de> + Send + 'static, Resp: Serialize + Send + Sync + 'static,

pub async fn send_message<U>( &self, session_id: &PqxdhSessionId, message: &U, ) -> Result<()>
where U: Serialize + for<'de> Deserialize<'de> + Clone,

Sends a message to an established session (CLIENTS ONLY)

Use this to send additional messages after calling connect_to_service(). The message will be encrypted and sent over the established secure PQXDH session using the session’s private channel.

§Arguments
  • session_id - The session ID of the established PQXDH session
  • message - The message payload to encrypt and send
§Errors

Returns an error if no active session exists with the given session ID
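
The error condition amounts to a lookup in a session table before anything is sent. The following sketch shows that shape with hypothetical types (DemoSessions, SendError) and plain u64 session IDs; encryption and the actual channel are left out.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum SendError {
    /// No active session is registered under this ID.
    UnknownSession(u64),
}

/// Hypothetical session table: session ID -> messages queued for that session.
#[derive(Default)]
pub struct DemoSessions {
    outboxes: HashMap<u64, Vec<String>>,
}

impl DemoSessions {
    /// Registers a session, as a successful connection would.
    pub fn open(&mut self, session_id: u64) {
        self.outboxes.entry(session_id).or_default();
    }

    /// Queues a message, failing if the session was never opened.
    pub fn send(&mut self, session_id: u64, message: &str) -> Result<(), SendError> {
        let outbox = self
            .outboxes
            .get_mut(&session_id)
            .ok_or(SendError::UnknownSession(session_id))?;
        outbox.push(message.to_string());
        Ok(())
    }

    /// Number of messages queued for a session (0 if it does not exist).
    pub fn queued(&self, session_id: u64) -> usize {
        self.outboxes.get(&session_id).map_or(0, Vec::len)
    }
}
```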

§Example
// First establish connection and keep the returned session ID
let (session_id, _listener) = handler.connect_to_service::<String, String>(&service_key, &"initial".to_string()).await?;

// Send follow-up messages using session ID
handler.send_message(&session_id, &"follow-up message".to_string()).await?;
handler.send_message(&session_id, &"another message".to_string()).await?;

pub async fn send_ephemeral_message<U>( &self, session_id: &PqxdhSessionId, message: &U, timeout: u32, ) -> Result<()>
where U: Serialize + for<'de> Deserialize<'de> + Clone,

pub async fn inbox_stream<U>( &self, ) -> Result<impl Stream<Item = (PqxdhSessionId, U)>>
where U: for<'de> Deserialize<'de>,

Creates a stream of messages that arrive to our inbox (SERVICE PROVIDERS ONLY)

This method returns a stream of incoming messages from clients who are connecting to or communicating with this service. The stream includes both initial PQXDH messages (new client connections) and session messages (ongoing communication).
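
How one stream can carry both kinds of message is easier to see in a simplified model. In the sketch below, DemoInboxMessage and DemoInbox are hypothetical, session IDs are plain counters, and decryption is omitted; it only shows initial messages creating a session and session messages being matched to an existing one.

```rust
use std::collections::HashMap;

/// The two kinds of message that can arrive at the inbox.
pub enum DemoInboxMessage {
    /// First message from a new client; establishes a session.
    Initial { sender: String, payload: String },
    /// Message on an already established session.
    Session { session_id: u64, payload: String },
}

/// Hypothetical inbox state: session ID -> sender that opened it.
#[derive(Default)]
pub struct DemoInbox {
    next_session_id: u64,
    sessions: HashMap<u64, String>,
}

impl DemoInbox {
    /// Turns an incoming message into a `(session_id, payload)` item,
    /// or `None` if it refers to a session this inbox does not know.
    pub fn handle(&mut self, message: DemoInboxMessage) -> Option<(u64, String)> {
        match message {
            DemoInboxMessage::Initial { sender, payload } => {
                self.next_session_id += 1;
                let session_id = self.next_session_id;
                self.sessions.insert(session_id, sender);
                Some((session_id, payload))
            }
            DemoInboxMessage::Session { session_id, payload } => self
                .sessions
                .contains_key(&session_id)
                .then_some((session_id, payload)),
        }
    }
}
```

Both branches produce the same item shape, which is why a consumer of the stream does not need to distinguish a first message from a follow-up.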

§Returns

Returns a stream of (PqxdhSessionId, U) tuples where:

  • PqxdhSessionId is the session ID for the client connection
  • U is the deserialized message payload from the client
§Errors

Returns an error if publish_service() has not been called first

§Example
// First publish the service
handler.publish_service(false).await?;

// Then listen for client messages
let mut inbox_stream = Box::pin(handler.inbox_stream::<String>().await?);
while let Some((session_id, message)) = inbox_stream.next().await {
    println!("Received message from session {:?}: {}", session_id, message);
     
    // Echo the message back to the client
    handler.send_message(&session_id, &format!("Echo: {}", message)).await?;
}

impl<T: MessagesManagerTrait> PqxdhProtocolHandler<T>

async fn fetch_pqxdh_inbox<U: for<'de> Deserialize<'de>>( &self, provider_key: &VerifyingKey, protocol: PqxdhInboxProtocol, ) -> Result<(U, Tag)>

Fetch a PQXDH inbox using the trait method

async fn initiate_session<U: Serialize>( &self, target_public_key: VerifyingKey, inbox: &PqxdhInbox, target_tags: Vec<Tag>, initial_payload: &U, ) -> Result<PqxdhSession>

Initiates a PQXDH session with a target user using an already loaded inbox

async fn on_incoming_inbox_message<U>( state: &SharedObservable<PqxdhProtocolState, AsyncLock>, my_public_key: &VerifyingKey, message_full: &MessageFull, ) -> Result<(PqxdhSessionId, U)>
where U: for<'de> Deserialize<'de>,

async fn send_message_inner<U>( &self, session_id: &PqxdhSessionId, message: &U, kind: Kind, ) -> Result<()>
where U: Serialize,

Trait Implementations§

impl<T: MessagesManagerTrait> Clone for PqxdhProtocolHandler<T>

fn clone(&self) -> Self

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<Resp, T: MessagesManagerTrait> PqxdhTarpcTransportSender<Resp> for PqxdhProtocolHandler<T>
where Resp: Serialize + Send + Sync,

fn send_response<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, session_id: &'life1 PqxdhSessionId, resp: &'life2 Resp, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<'a, T, E> AsTaggedExplicit<'a, E> for T
where T: 'a,

fn explicit(self, class: Class, tag: u32) -> TaggedParser<'a, Explicit, Self, E>

impl<'a, T, E> AsTaggedImplicit<'a, E> for T
where T: 'a,

fn implicit( self, class: Class, constructed: bool, tag: u32, ) -> TaggedParser<'a, Implicit, Self, E>

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> Classify for T

type Classified = T

fn classify(self) -> T

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> CompatExt for T

fn compat(self) -> Compat<T>

Applies the Compat adapter by value.

fn compat_ref(&self) -> Compat<&T>

Applies the Compat adapter by shared reference.

fn compat_mut(&mut self) -> Compat<&mut T>

Applies the Compat adapter by mutable reference.

impl<T> Declassify for T

type Declassified = T

fn declassify(self) -> T

impl<T> DynClone for T
where T: Clone,

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> DartSafe for T

impl<T> ErasedDestructor for T
where T: 'static,

impl<T> TaskRetFutTrait for T
where T: Send,