zoe_client/services/blob_store/
blob_service.rs1use crate::error::{ClientError, Result as ClientResult};
2use async_trait::async_trait;
3use quinn::Connection;
4use tarpc::{context, serde_transport};
5use tokio::io::{AsyncReadExt, AsyncWriteExt};
6use tokio_util::codec::LengthDelimitedCodec;
7use zoe_wire_protocol::{BlobId, BlobServiceClient, PostcardFormat, StreamPair, ZoeServices};
8
9use super::{BlobError, BlobStore};
10
/// Blob store client backed by a [`BlobServiceClient`] RPC stub.
///
/// Cloning copies the wrapped client handle.
#[derive(Clone)]
pub struct BlobService {
    /// RPC client used to issue the `download` and `upload` calls.
    client: BlobServiceClient,
}
15
16impl BlobService {
17 pub async fn connect(connection: &Connection) -> ClientResult<Self> {
18 let (mut send, mut recv) = connection.open_bi().await?;
19 send.write_u8(ZoeServices::Blob as u8).await?;
20 let service_ok = recv.read_u8().await?;
21 if service_ok != 1 {
22 return Err(ClientError::Generic(
23 "Service ID not acknowledged".to_string(),
24 ));
25 }
26
27 let streams = StreamPair::new(recv, send);
28
29 let framed = tokio_util::codec::Framed::new(streams, LengthDelimitedCodec::new());
30 let transport = serde_transport::new(framed, PostcardFormat);
31 let client = BlobServiceClient::new(Default::default(), transport).spawn();
32 Ok(Self { client })
33 }
34}
35
36#[async_trait]
37impl BlobStore for BlobService {
38 async fn get_blob(&self, blob_id: &BlobId) -> Result<Vec<u8>, BlobError> {
39 let Some(blob) = self
40 .client
41 .download(context::current(), *blob_id)
42 .await
43 .map_err(BlobError::RpcError)?
44 .map_err(BlobError::WireError)?
45 else {
46 return Err(BlobError::NotFound { hash: *blob_id });
47 };
48 Ok(blob)
49 }
50
51 async fn upload_blob(&self, blob: &[u8]) -> Result<BlobId, BlobError> {
52 let hash = self
53 .client
54 .upload(context::current(), blob.to_vec())
55 .await
56 .map_err(BlobError::RpcError)?
57 .map_err(BlobError::WireError)?;
58 Ok(hash)
59 }
60}
61
62impl BlobService {
63 pub async fn get_blob(&self, blob_id: &BlobId) -> Result<Vec<u8>, BlobError> {
65 <Self as BlobStore>::get_blob(self, blob_id).await
66 }
67
68 pub async fn upload_blob(&self, blob: &[u8]) -> Result<BlobId, BlobError> {
70 <Self as BlobStore>::upload_blob(self, blob).await
71 }
72}