zoe_wire_protocol/relay/
stream_pair.rs1use quinn::{RecvStream, SendStream};
2use tarpc::tokio_serde;
3use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
4
5use tokio::io::AsyncWriteExt;
6
7use super::postcard::PostcardFormat;
8
9#[derive(Debug)]
11pub struct StreamPair {
12 pub recv: RecvStream,
14 pub send: SendStream,
16}
17
18type WrappedStream = FramedRead<RecvStream, LengthDelimitedCodec>;
19type WrappedSink = FramedWrite<SendStream, LengthDelimitedCodec>;
20
21type SerStream<I> = tokio_serde::Framed<WrappedStream, I, I, PostcardFormat>;
23type DeSink<I> = tokio_serde::Framed<WrappedSink, I, I, PostcardFormat>;
24
25impl StreamPair {
26 pub fn new(recv: RecvStream, send: SendStream) -> Self {
27 Self { recv, send }
28 }
29
30 pub async fn send_ack(&mut self) -> std::io::Result<()> {
31 self.send.write_u8(1).await
32 }
33
34 pub fn unpack_transports<S, D>(self) -> (SerStream<S>, DeSink<D>) {
35 let StreamPair { recv, send } = self;
36 create_postcard_streams(recv, send)
37 }
38}
39
40pub fn create_postcard_streams<S, D>(
41 recv: RecvStream,
42 send: SendStream,
43) -> (SerStream<S>, DeSink<D>) {
44 let wrapped_recv = FramedRead::new(recv, LengthDelimitedCodec::new());
45 let wrapped_send = FramedWrite::new(send, LengthDelimitedCodec::new());
46 let ser_stream = tokio_serde::Framed::new(wrapped_recv, PostcardFormat);
47 let de_sink = tokio_serde::Framed::new(wrapped_send, PostcardFormat);
48 (ser_stream, de_sink)
49}