zoe_wire_protocol/relay/
postcard.rs

1use anyhow::Result;
2use tokio_util::bytes;
3
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
7
8use tarpc::tokio_serde::{Deserializer, Serializer};
9
10use super::StreamPair;
11use serde::{Deserialize, Serialize};
12
/// Codec marker that selects postcard as the serialization format for tarpc.
///
/// This is a field-less unit struct: it carries no state, so a value can be
/// obtained through `Default` and duplicated through `Clone` at no cost.
#[derive(Clone, Debug, Default)]
pub struct PostcardFormat;
16
17impl<Item> Serializer<Item> for PostcardFormat
18where
19    Item: Serialize,
20{
21    type Error = std::io::Error;
22
23    fn serialize(self: Pin<&mut Self>, item: &Item) -> Result<bytes::Bytes, Self::Error> {
24        let serialized = postcard::to_allocvec(item)
25            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
26        Ok(bytes::Bytes::from(serialized))
27    }
28}
29
30impl<Item> Deserializer<Item> for PostcardFormat
31where
32    Item: for<'de> Deserialize<'de>,
33{
34    type Error = std::io::Error;
35
36    fn deserialize(self: Pin<&mut Self>, src: &bytes::BytesMut) -> Result<Item, Self::Error> {
37        postcard::from_bytes(src)
38            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
39    }
40}
41
42impl AsyncRead for StreamPair {
43    fn poll_read(
44        mut self: Pin<&mut Self>,
45        cx: &mut Context<'_>,
46        buf: &mut ReadBuf<'_>,
47    ) -> Poll<std::io::Result<()>> {
48        Pin::new(&mut self.recv).poll_read(cx, buf)
49    }
50}
51
52impl AsyncWrite for StreamPair {
53    fn poll_write(
54        mut self: Pin<&mut Self>,
55        cx: &mut Context<'_>,
56        buf: &[u8],
57    ) -> Poll<Result<usize, std::io::Error>> {
58        match Pin::new(&mut self.send).poll_write(cx, buf) {
59            Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)),
60            Poll::Ready(Err(e)) => Poll::Ready(Err(std::io::Error::other(e))),
61            Poll::Pending => Poll::Pending,
62        }
63    }
64
65    fn poll_flush(
66        mut self: Pin<&mut Self>,
67        cx: &mut Context<'_>,
68    ) -> Poll<Result<(), std::io::Error>> {
69        match Pin::new(&mut self.send).poll_flush(cx) {
70            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
71            Poll::Ready(Err(e)) => Poll::Ready(Err(std::io::Error::other(e))),
72            Poll::Pending => Poll::Pending,
73        }
74    }
75
76    fn poll_shutdown(
77        mut self: Pin<&mut Self>,
78        cx: &mut Context<'_>,
79    ) -> Poll<Result<(), std::io::Error>> {
80        match Pin::new(&mut self.send).poll_shutdown(cx) {
81            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
82            Poll::Ready(Err(e)) => Poll::Ready(Err(std::io::Error::other(e))),
83            Poll::Pending => Poll::Pending,
84        }
85    }
86}