zoe_wire_protocol/relay/
stream_pair.rs

1use quinn::{RecvStream, SendStream};
2use tarpc::tokio_serde;
3use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
4
5use tokio::io::AsyncWriteExt;
6
7use super::postcard::PostcardFormat;
8
/// A pair of streams for bi-directional communication
///
/// Bundles one quinn [`RecvStream`] with one quinn [`SendStream`] so that both
/// directions of a connection can be passed around as a single value. Both
/// fields are public; the pair can be taken apart again either by moving the
/// fields out or through [`StreamPair::unpack_transports`].
#[derive(Debug)]
pub struct StreamPair {
    /// Stream for receiving data from the client
    pub recv: RecvStream,
    /// Stream for sending data to the client
    pub send: SendStream,
}
17
/// Receive stream wrapped in length-delimited frame decoding.
type WrappedStream = FramedRead<RecvStream, LengthDelimitedCodec>;
/// Send stream wrapped in length-delimited frame encoding.
type WrappedSink = FramedWrite<SendStream, LengthDelimitedCodec>;

// only dealing with one half of the IO
//
// Each alias below wraps a single direction, so the item type and the sink
// item type of `tokio_serde::Framed` are both set to the same `I`.
// NOTE(review): despite its name, `SerStream` is the *receiving* half (built
// on `WrappedStream`), and `DeSink` is the *sending* half (built on
// `WrappedSink`).
/// Typed receiving half: frames read from the receive stream, handled by `PostcardFormat`.
type SerStream<I> = tokio_serde::Framed<WrappedStream, I, I, PostcardFormat>;
/// Typed sending half: values handled by `PostcardFormat` and written as frames to the send stream.
type DeSink<I> = tokio_serde::Framed<WrappedSink, I, I, PostcardFormat>;
24
25impl StreamPair {
26    pub fn new(recv: RecvStream, send: SendStream) -> Self {
27        Self { recv, send }
28    }
29
30    pub async fn send_ack(&mut self) -> std::io::Result<()> {
31        self.send.write_u8(1).await
32    }
33
34    pub fn unpack_transports<S, D>(self) -> (SerStream<S>, DeSink<D>) {
35        let StreamPair { recv, send } = self;
36        create_postcard_streams(recv, send)
37    }
38}
39
40pub fn create_postcard_streams<S, D>(
41    recv: RecvStream,
42    send: SendStream,
43) -> (SerStream<S>, DeSink<D>) {
44    let wrapped_recv = FramedRead::new(recv, LengthDelimitedCodec::new());
45    let wrapped_send = FramedWrite::new(send, LengthDelimitedCodec::new());
46    let ser_stream = tokio_serde::Framed::new(wrapped_recv, PostcardFormat);
47    let de_sink = tokio_serde::Framed::new(wrapped_send, PostcardFormat);
48    (ser_stream, de_sink)
49}