zoe_client/services/blob_store/
blob_service.rs

1use crate::error::{ClientError, Result as ClientResult};
2use async_trait::async_trait;
3use quinn::Connection;
4use tarpc::{context, serde_transport};
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio_util::codec::LengthDelimitedCodec;
7use zoe_wire_protocol::{BlobId, BlobServiceClient, PostcardFormat, StreamPair, ZoeServices};
8
9use super::{BlobError, BlobStore};
10
11#[derive(Clone)]
12pub struct BlobService {
13    client: BlobServiceClient,
14}
15
16impl BlobService {
17    pub async fn connect(connection: &Connection) -> ClientResult<Self> {
18        let (mut send, mut recv) = connection.open_bi().await?;
19        send.write_u8(ZoeServices::Blob as u8).await?;
20        let service_ok = recv.read_u8().await?;
21        if service_ok != 1 {
22            return Err(ClientError::Generic(
23                "Service ID not acknowledged".to_string(),
24            ));
25        }
26
27        let streams = StreamPair::new(recv, send);
28
29        let framed = tokio_util::codec::Framed::new(streams, LengthDelimitedCodec::new());
30        let transport = serde_transport::new(framed, PostcardFormat);
31        let client = BlobServiceClient::new(Default::default(), transport).spawn();
32        Ok(Self { client })
33    }
34}
35
36#[async_trait]
37impl BlobStore for BlobService {
38    async fn get_blob(&self, blob_id: &BlobId) -> Result<Vec<u8>, BlobError> {
39        let Some(blob) = self
40            .client
41            .download(context::current(), *blob_id)
42            .await
43            .map_err(BlobError::RpcError)?
44            .map_err(BlobError::WireError)?
45        else {
46            return Err(BlobError::NotFound { hash: *blob_id });
47        };
48        Ok(blob)
49    }
50
51    async fn upload_blob(&self, blob: &[u8]) -> Result<BlobId, BlobError> {
52        let hash = self
53            .client
54            .upload(context::current(), blob.to_vec())
55            .await
56            .map_err(BlobError::RpcError)?
57            .map_err(BlobError::WireError)?;
58        Ok(hash)
59    }
60}
61
62impl BlobService {
63    /// Get a blob by its ID (convenience method)
64    pub async fn get_blob(&self, blob_id: &BlobId) -> Result<Vec<u8>, BlobError> {
65        <Self as BlobStore>::get_blob(self, blob_id).await
66    }
67
68    /// Upload a blob and return its hash (convenience method)
69    pub async fn upload_blob(&self, blob: &[u8]) -> Result<BlobId, BlobError> {
70        <Self as BlobStore>::upload_blob(self, blob).await
71    }
72}