zoe_relay/services/
blob.rs

1use crate::Service;
2use async_trait::async_trait;
3use futures::StreamExt;
4use tarpc::{
5    serde_transport,
6    server::{BaseChannel, Channel},
7};
8use tokio_util::codec::LengthDelimitedCodec;
9use zoe_blob_store::BlobServiceImpl;
10use zoe_wire_protocol::{BlobError, BlobService as _, PostcardFormat, StreamPair};
11
12#[derive(Debug, thiserror::Error)]
13pub enum BlobServiceError {
14    #[error("Blob error: {0}")]
15    BlobError(BlobError),
16
17    #[error("IO error: {0}")]
18    IoError(std::io::Error),
19
20    #[error("Join error: {0}")]
21    JoinError(tokio::task::JoinError),
22}
23
24pub struct BlobService {
25    streams: StreamPair,
26    service: BlobServiceImpl,
27}
28
29impl BlobService {
30    pub fn new(streams: StreamPair, blob_service: BlobServiceImpl) -> Self {
31        Self {
32            streams,
33            service: blob_service,
34        }
35    }
36}
37
38#[async_trait]
39impl Service for BlobService {
40    type Error = BlobServiceError;
41    async fn run(mut self) -> Result<(), Self::Error> {
42        self.streams
43            .send_ack()
44            .await
45            .map_err(BlobServiceError::IoError)?;
46        let s = self.service.serve();
47
48        let framed = tokio_util::codec::Framed::new(self.streams, LengthDelimitedCodec::new());
49        let transport = serde_transport::new(framed, PostcardFormat);
50        let channel = BaseChannel::with_defaults(transport);
51
52        tokio::spawn(async move {
53            channel
54                .execute(s)
55                .for_each(|response| async move {
56                    tokio::spawn(response);
57                })
58                .await;
59        })
60        .await
61        .map_err(BlobServiceError::JoinError)?;
62        Ok(())
63    }
64}