zoe_relay/services/
blob.rs1use crate::Service;
2use async_trait::async_trait;
3use futures::StreamExt;
4use tarpc::{
5 serde_transport,
6 server::{BaseChannel, Channel},
7};
8use tokio_util::codec::LengthDelimitedCodec;
9use zoe_blob_store::BlobServiceImpl;
10use zoe_wire_protocol::{BlobError, BlobService as _, PostcardFormat, StreamPair};
11
12#[derive(Debug, thiserror::Error)]
13pub enum BlobServiceError {
14 #[error("Blob error: {0}")]
15 BlobError(BlobError),
16
17 #[error("IO error: {0}")]
18 IoError(std::io::Error),
19
20 #[error("Join error: {0}")]
21 JoinError(tokio::task::JoinError),
22}
23
24pub struct BlobService {
25 streams: StreamPair,
26 service: BlobServiceImpl,
27}
28
29impl BlobService {
30 pub fn new(streams: StreamPair, blob_service: BlobServiceImpl) -> Self {
31 Self {
32 streams,
33 service: blob_service,
34 }
35 }
36}
37
38#[async_trait]
39impl Service for BlobService {
40 type Error = BlobServiceError;
41 async fn run(mut self) -> Result<(), Self::Error> {
42 self.streams
43 .send_ack()
44 .await
45 .map_err(BlobServiceError::IoError)?;
46 let s = self.service.serve();
47
48 let framed = tokio_util::codec::Framed::new(self.streams, LengthDelimitedCodec::new());
49 let transport = serde_transport::new(framed, PostcardFormat);
50 let channel = BaseChannel::with_defaults(transport);
51
52 tokio::spawn(async move {
53 channel
54 .execute(s)
55 .for_each(|response| async move {
56 tokio::spawn(response);
57 })
58 .await;
59 })
60 .await
61 .map_err(BlobServiceError::JoinError)?;
62 Ok(())
63 }
64}