1use super::BlobStore;
2use crate::services::BlobError;
3use async_trait::async_trait;
4use futures::future::join_all;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use zoe_client_storage::{BlobStorage, BlobUploadStatus};
9use zoe_wire_protocol::{BlobId, KeyId};
10pub struct MultiRelayBlobService<S: BlobStorage> {
13 relay_services: Arc<RwLock<HashMap<KeyId, Arc<dyn BlobStore>>>>,
15 storage: Arc<S>,
17}
18
19impl<S: BlobStorage> MultiRelayBlobService<S> {
20 pub fn new(storage: Arc<S>) -> Self {
22 Self {
23 relay_services: Arc::new(RwLock::new(HashMap::new())),
24 storage,
25 }
26 }
27 pub fn new_with_relays(
28 storage: Arc<S>,
29 relay_services: impl IntoIterator<Item = (KeyId, Arc<dyn BlobStore>)>,
30 ) -> Self {
31 Self {
32 relay_services: Arc::new(RwLock::new(HashMap::from_iter(relay_services))),
33 storage,
34 }
35 }
36
37 pub async fn add_relay(&self, relay_id: KeyId, service: Arc<dyn BlobStore>) {
39 let mut services = self.relay_services.write().await;
40 services.insert(relay_id, service);
41 tracing::info!(
42 "Added blob service for relay: {}",
43 hex::encode(relay_id.as_bytes())
44 );
45 }
46
47 pub async fn remove_relay(&self, relay_id: &KeyId) -> Option<Arc<dyn BlobStore>> {
49 let mut services = self.relay_services.write().await;
50 let removed = services.remove(relay_id);
51 if removed.is_some() {
52 tracing::info!(
53 "Removed blob service for relay: {}",
54 hex::encode(relay_id.as_bytes())
55 );
56 }
57 removed
58 }
59
60 pub async fn get_relay_ids(&self) -> Vec<KeyId> {
62 let services = self.relay_services.read().await;
63 services.keys().cloned().collect()
64 }
65
66 pub async fn is_blob_uploaded_to_relay(
68 &self,
69 blob_hash: &BlobId,
70 relay_id: &KeyId,
71 ) -> Result<bool, BlobError> {
72 self.storage
73 .is_blob_uploaded(blob_hash, relay_id)
74 .await
75 .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))
76 }
77
78 pub async fn get_blob_upload_status(
80 &self,
81 blob_hash: &BlobId,
82 ) -> Result<Vec<BlobUploadStatus>, BlobError> {
83 self.storage
84 .get_blob_upload_status(blob_hash)
85 .await
86 .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))
87 }
88
89 pub async fn upload_blob_to_relay(
91 &self,
92 blob: &[u8],
93 relay_id: &KeyId,
94 ) -> Result<BlobId, BlobError> {
95 let service = {
96 let services = self.relay_services.read().await;
97 services.get(relay_id).cloned().ok_or_else(|| {
98 BlobError::SerializationError(format!(
99 "No service for relay: {}",
100 hex::encode(relay_id.as_bytes())
101 ))
102 })?
103 };
104
105 let blob_hash = service.upload_blob(blob).await?;
107
108 self.storage
110 .mark_blob_uploaded(&blob_hash, relay_id, blob.len() as u64)
111 .await
112 .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))?;
113
114 tracing::info!(
115 "Successfully uploaded blob {} to relay {} (size: {} bytes)",
116 blob_hash,
117 hex::encode(relay_id.as_bytes()),
118 blob.len()
119 );
120
121 Ok(blob_hash)
122 }
123
124 async fn upload_blob_to_single_relay_and_mark_as_uploaded(
125 blob: &[u8],
126 service: Arc<dyn BlobStore>,
127 relay_id: &KeyId,
128 storage: Arc<S>,
129 ) -> Result<BlobId, BlobError> {
130 match service.upload_blob(blob).await {
131 Ok(blob_hash) => {
132 match storage
134 .mark_blob_uploaded(&blob_hash, relay_id, blob.len() as u64)
135 .await
136 {
137 Ok(()) => {
138 tracing::info!(
139 "Successfully uploaded blob {} to relay {} (size: {} bytes)",
140 blob_hash,
141 hex::encode(relay_id.as_bytes()),
142 blob.len()
143 );
144 Ok(blob_hash)
145 }
146 Err(e) => {
147 tracing::error!("Failed to mark blob as uploaded in storage: {}", e);
148 Err(BlobError::SerializationError(format!("Storage error: {e}")))
149 }
150 }
151 }
152 Err(e) => {
153 tracing::warn!(
154 "Failed to upload blob to relay {}: {}",
155 hex::encode(relay_id.as_bytes()),
156 e
157 );
158 Err(e)
159 }
160 }
161 }
162
163 pub async fn upload_blob_to_all_relays(
165 &self,
166 blob: &[u8],
167 ) -> Result<HashMap<KeyId, Result<BlobId, BlobError>>, BlobError> {
168 let services = {
170 let services = self.relay_services.read().await;
171 services.clone()
172 };
173 if services.len() == 1 {
175 let (relay_id, service) = services.into_iter().next().unwrap();
176 let result = Self::upload_blob_to_single_relay_and_mark_as_uploaded(
177 blob,
178 service,
179 &relay_id,
180 Arc::clone(&self.storage),
181 )
182 .await;
183 return Ok(HashMap::from([(relay_id, result)]));
184 }
185
186 let shared_blob = Arc::<[u8]>::from(blob);
188
189 let upload_futures = services.into_iter().map(|(relay_id, service)| {
191 let blob = Arc::clone(&shared_blob);
192 let storage = Arc::clone(&self.storage);
193
194 async move {
195 (
196 relay_id,
197 Self::upload_blob_to_single_relay_and_mark_as_uploaded(
198 &blob, service, &relay_id, storage,
199 )
200 .await,
201 )
202 }
203 });
204
205 let results = join_all(upload_futures).await;
207
208 Ok(results.into_iter().collect())
210 }
211
212 pub async fn upload_blob_to_missing_relays(
214 &self,
215 blob: &[u8],
216 ) -> Result<HashMap<KeyId, Result<BlobId, BlobError>>, BlobError> {
217 let shared_blob = Arc::<[u8]>::from(blob);
219
220 let services = {
222 let services = self.relay_services.read().await;
223 services.clone()
224 };
225
226 let blob_hash = if let Some((first_relay_id, first_service)) = services.iter().next() {
229 match first_service.upload_blob(&shared_blob).await {
230 Ok(hash) => {
231 if let Err(e) = self
233 .storage
234 .mark_blob_uploaded(&hash, first_relay_id, shared_blob.len() as u64)
235 .await
236 {
237 tracing::error!("Failed to mark blob as uploaded in storage: {}", e);
238 }
239 Some(hash)
240 }
241 Err(_) => None,
242 }
243 } else {
244 return Ok(HashMap::new()); };
246
247 let shared_blob = Arc::<[u8]>::from(blob);
249
250 let upload_futures = services.into_iter().map(|(relay_id, service)| {
252 let blob = Arc::clone(&shared_blob);
253 let storage = Arc::clone(&self.storage);
254
255 async move {
256 if let Some(hash) = blob_hash {
258 match storage.is_blob_uploaded(&hash, &relay_id).await {
259 Ok(true) => {
260 tracing::debug!(
261 "Blob {} already uploaded to relay {}",
262 hash,
263 hex::encode(relay_id.as_bytes())
264 );
265 return (relay_id, Ok(hash));
266 }
267 Ok(false) => {
268 }
270 Err(e) => {
271 tracing::error!("Storage error checking upload status: {}", e);
272 return (
273 relay_id,
274 Err(BlobError::SerializationError(format!("Storage error: {e}"))),
275 );
276 }
277 }
278 }
279
280 let result = match service.upload_blob(&blob).await {
282 Ok(blob_hash) => {
283 match storage
285 .mark_blob_uploaded(&blob_hash, &relay_id, blob.len() as u64)
286 .await
287 {
288 Ok(()) => {
289 tracing::info!(
290 "Successfully uploaded blob {} to relay {} (size: {} bytes)",
291 blob_hash,
292 hex::encode(relay_id.as_bytes()),
293 blob.len()
294 );
295 Ok(blob_hash)
296 }
297 Err(e) => {
298 tracing::error!(
299 "Failed to mark blob as uploaded in storage: {}",
300 e
301 );
302 Err(BlobError::SerializationError(format!("Storage error: {e}")))
303 }
304 }
305 }
306 Err(e) => {
307 tracing::warn!(
308 "Failed to upload blob to relay {}: {}",
309 hex::encode(relay_id.as_bytes()),
310 e
311 );
312 Err(e)
313 }
314 };
315
316 (relay_id, result)
317 }
318 });
319
320 let results = join_all(upload_futures).await;
322
323 Ok(results.into_iter().collect())
325 }
326
327 pub async fn download_blob_from_any_relay(
329 &self,
330 blob_hash: &BlobId,
331 ) -> Result<Vec<u8>, BlobError> {
332 let mut last_error = BlobError::NotFound { hash: *blob_hash };
333
334 let services = {
336 let services = self.relay_services.read().await;
337 services.clone()
338 };
339
340 for (relay_id, service) in services {
341 match service.get_blob(blob_hash).await {
342 Ok(blob_data) => {
343 tracing::debug!(
344 "Successfully downloaded blob {} from relay {} (size: {} bytes)",
345 blob_hash,
346 hex::encode(relay_id.as_bytes()),
347 blob_data.len()
348 );
349 return Ok(blob_data);
350 }
351 Err(e) => {
352 tracing::debug!(
353 "Failed to download blob {} from relay {}: {}",
354 blob_hash,
355 hex::encode(relay_id.as_bytes()),
356 e
357 );
358 last_error = e;
359 }
360 }
361 }
362
363 Err(last_error)
364 }
365
366 pub async fn get_relay_blob_stats(&self, relay_id: &KeyId) -> Result<(u64, u64), BlobError> {
368 let count = self
369 .storage
370 .get_uploaded_blob_count_for_relay(relay_id)
371 .await
372 .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))?;
373
374 let size = self
375 .storage
376 .get_uploaded_blob_size_for_relay(relay_id)
377 .await
378 .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))?;
379
380 Ok((count, size))
381 }
382
383 pub async fn remove_blob_records(&self, blob_hash: &BlobId) -> Result<u64, BlobError> {
385 self.storage
386 .remove_blob_upload_record(blob_hash, None)
387 .await
388 .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))
389 }
390}
391
392#[async_trait]
393impl<S: BlobStorage> BlobStore for MultiRelayBlobService<S> {
394 async fn get_blob(&self, blob_id: &BlobId) -> Result<Vec<u8>, BlobError> {
396 self.download_blob_from_any_relay(blob_id).await
397 }
398
399 async fn upload_blob(&self, blob: &[u8]) -> Result<BlobId, BlobError> {
401 let results = self.upload_blob_to_all_relays(blob).await?;
402
403 for (relay_id, result) in results {
405 match result {
406 Ok(hash) => return Ok(hash),
407 Err(e) => {
408 tracing::warn!(
409 "Upload to relay {} failed: {}",
410 hex::encode(relay_id.as_bytes()),
411 e
412 );
413 }
414 }
415 }
416
417 Err(BlobError::SerializationError(
418 "Failed to upload to any relay".to_string(),
419 ))
420 }
421}
422
423#[cfg(test)]
424mod tests {
425 use super::*;
426 use std::collections::HashMap;
427 use std::sync::Arc;
428 use tokio::sync::Mutex;
429 use zoe_client_storage::{StorageError, storage::MockBlobStorage};
430 use zoe_wire_protocol::Hash;
431
432 #[derive(Clone)]
434 struct MockBlobStoreImpl {
435 blobs: Arc<Mutex<HashMap<String, Vec<u8>>>>,
436 should_fail: Arc<Mutex<bool>>,
437 }
438
439 impl MockBlobStoreImpl {
440 fn new() -> Self {
441 Self {
442 blobs: Arc::new(Mutex::new(HashMap::new())),
443 should_fail: Arc::new(Mutex::new(false)),
444 }
445 }
446
447 async fn set_should_fail(&self, should_fail: bool) {
448 *self.should_fail.lock().await = should_fail;
449 }
450
451 #[allow(dead_code)]
452 async fn get_stored_blobs(&self) -> HashMap<String, Vec<u8>> {
453 self.blobs.lock().await.clone()
454 }
455 }
456
457 #[async_trait]
458 impl BlobStore for MockBlobStoreImpl {
459 async fn get_blob(&self, blob_id: &BlobId) -> Result<Vec<u8>, BlobError> {
460 if *self.should_fail.lock().await {
461 return Err(BlobError::NotFound { hash: *blob_id });
462 }
463
464 let blobs = self.blobs.lock().await;
465 blobs
466 .get(&blob_id.to_hex())
467 .cloned()
468 .ok_or_else(|| BlobError::NotFound { hash: *blob_id })
469 }
470
471 async fn upload_blob(&self, blob: &[u8]) -> Result<BlobId, BlobError> {
472 if *self.should_fail.lock().await {
473 return Err(BlobError::IoError(std::io::Error::new(
474 std::io::ErrorKind::ConnectionRefused,
475 "Mock upload failure",
476 )));
477 }
478
479 let blob_id = BlobId::from_content(blob);
481 let hash_string = blob_id.to_hex();
482
483 let mut blobs = self.blobs.lock().await;
484 blobs.insert(hash_string, blob.to_vec());
485
486 Ok(blob_id)
487 }
488 }
489
490 fn create_test_hash(value: u8) -> Hash {
491 let mut bytes = [0u8; 32];
492 bytes[0] = value;
493 Hash::from(bytes)
494 }
495
496 fn create_test_key_id(value: u8) -> KeyId {
497 KeyId(create_test_hash(value))
498 }
499
500 #[tokio::test]
501 async fn test_multi_relay_blob_service_creation() {
502 let mut mock_storage = MockBlobStorage::new();
503 mock_storage
504 .expect_is_blob_uploaded()
505 .returning(|_, _| Ok(false));
506
507 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
508 assert_eq!(service.get_relay_ids().await.len(), 0);
509 }
510
511 #[tokio::test]
512 async fn test_add_remove_relay() {
513 let mock_storage = MockBlobStorage::new();
514 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
515
516 let relay_id = create_test_key_id(1);
517 let mock_blob_store = MockBlobStoreImpl::new();
518
519 service.add_relay(relay_id, Arc::new(mock_blob_store)).await;
521 assert_eq!(service.get_relay_ids().await.len(), 1);
522 assert!(service.get_relay_ids().await.contains(&relay_id));
523
524 let removed = service.remove_relay(&relay_id).await;
526 assert!(removed.is_some());
527 assert_eq!(service.get_relay_ids().await.len(), 0);
528 }
529
530 #[tokio::test]
531 async fn test_upload_blob_to_relay() {
532 let mut mock_storage = MockBlobStorage::new();
533 mock_storage
534 .expect_mark_blob_uploaded()
535 .withf(|_hash, _relay_id, size| *size == 11)
536 .times(1)
537 .returning(|_, _, _| Ok(()));
538
539 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
540 let relay_id = create_test_key_id(1);
541 let mock_blob_store = MockBlobStoreImpl::new();
542
543 service.add_relay(relay_id, Arc::new(mock_blob_store)).await;
544
545 let test_data = b"hello world";
546 let result = service.upload_blob_to_relay(test_data, &relay_id).await;
547
548 assert!(result.is_ok());
549 let blob_hash = result.unwrap();
550 assert!(!blob_hash.as_bytes().is_empty());
552 }
553
554 #[tokio::test]
555 async fn test_upload_blob_to_relay_storage_failure() {
556 let mut mock_storage = MockBlobStorage::new();
557 mock_storage
558 .expect_mark_blob_uploaded()
559 .times(1)
560 .returning(|_, _, _| Err(StorageError::Internal("Storage failure".to_string())));
561
562 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
563 let relay_id = create_test_key_id(1);
564 let mock_blob_store = MockBlobStoreImpl::new();
565
566 service.add_relay(relay_id, Arc::new(mock_blob_store)).await;
567
568 let test_data = b"hello world";
569 let result = service.upload_blob_to_relay(test_data, &relay_id).await;
570
571 assert!(result.is_err());
572 match result.unwrap_err() {
573 BlobError::SerializationError(msg) => assert!(msg.contains("Storage error")),
574 _ => panic!("Expected SerializationError"),
575 }
576 }
577
578 #[tokio::test]
579 async fn test_upload_blob_to_all_relays() {
580 let mut mock_storage = MockBlobStorage::new();
581 mock_storage
582 .expect_mark_blob_uploaded()
583 .times(2)
584 .returning(|_, _, _| Ok(()));
585
586 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
587
588 let relay_id1 = create_test_key_id(1);
589 let relay_id2 = create_test_key_id(2);
590 let mock_blob_store1 = MockBlobStoreImpl::new();
591 let mock_blob_store2 = MockBlobStoreImpl::new();
592
593 service
594 .add_relay(relay_id1, Arc::new(mock_blob_store1))
595 .await;
596 service
597 .add_relay(relay_id2, Arc::new(mock_blob_store2))
598 .await;
599
600 let test_data = b"hello world";
601 let results = service.upload_blob_to_all_relays(test_data).await.unwrap();
602
603 assert_eq!(results.len(), 2);
604 assert!(results.get(&relay_id1).unwrap().is_ok());
605 assert!(results.get(&relay_id2).unwrap().is_ok());
606 }
607
608 #[tokio::test]
609 async fn test_upload_blob_to_all_relays_partial_failure() {
610 let mut mock_storage = MockBlobStorage::new();
611 mock_storage
612 .expect_mark_blob_uploaded()
613 .times(1) .returning(|_, _, _| Ok(()));
615
616 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
617
618 let relay_id1 = create_test_key_id(1);
619 let relay_id2 = create_test_key_id(2);
620 let mock_blob_store1 = MockBlobStoreImpl::new();
621 let mock_blob_store2 = MockBlobStoreImpl::new();
622
623 mock_blob_store2.set_should_fail(true).await;
625
626 service
627 .add_relay(relay_id1, Arc::new(mock_blob_store1))
628 .await;
629 service
630 .add_relay(relay_id2, Arc::new(mock_blob_store2))
631 .await;
632
633 let test_data = b"hello world";
634 let results = service.upload_blob_to_all_relays(test_data).await.unwrap();
635
636 assert_eq!(results.len(), 2);
637 assert!(results.get(&relay_id1).unwrap().is_ok());
638 assert!(results.get(&relay_id2).unwrap().is_err());
639 }
640
641 #[tokio::test]
642 async fn test_upload_blob_to_missing_relays() {
643 let mut mock_storage = MockBlobStorage::new();
644
645 mock_storage
647 .expect_mark_blob_uploaded()
648 .times(1)
649 .returning(|_, _, _| Ok(()));
650
651 mock_storage
653 .expect_is_blob_uploaded()
654 .times(1)
655 .returning(|_, _| Ok(true)); let service = MultiRelayBlobService::new(Arc::new(mock_storage));
658
659 let relay_id = create_test_key_id(1);
660 let mock_blob_store = MockBlobStoreImpl::new();
661
662 service.add_relay(relay_id, Arc::new(mock_blob_store)).await;
663
664 let test_data = b"hello world";
665 let results = service
666 .upload_blob_to_missing_relays(test_data)
667 .await
668 .unwrap();
669
670 assert_eq!(results.len(), 1);
671 assert!(results.get(&relay_id).unwrap().is_ok());
672 }
673
674 #[tokio::test]
675 async fn test_download_blob_from_any_relay() {
676 let mock_storage = MockBlobStorage::new();
677 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
678
679 let relay_id = create_test_key_id(1);
680 let mock_blob_store = MockBlobStoreImpl::new();
681
682 let test_data = b"hello world";
684 let blob_hash = mock_blob_store.upload_blob(test_data).await.unwrap();
685
686 service.add_relay(relay_id, Arc::new(mock_blob_store)).await;
687
688 let result = service.download_blob_from_any_relay(&blob_hash).await;
689
690 assert!(result.is_ok());
691 assert_eq!(result.unwrap(), test_data);
692 }
693
694 #[tokio::test]
695 async fn test_download_blob_not_found() {
696 let mock_storage = MockBlobStorage::new();
697 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
698
699 let relay_id = create_test_key_id(1);
700 let mock_blob_store = MockBlobStoreImpl::new();
701
702 service.add_relay(relay_id, Arc::new(mock_blob_store)).await;
703
704 let nonexistent_hash = BlobId::from_content(b"nonexistent");
705 let result = service
706 .download_blob_from_any_relay(&nonexistent_hash)
707 .await;
708
709 assert!(result.is_err());
710 match result.unwrap_err() {
711 BlobError::NotFound { hash } => assert_eq!(hash, nonexistent_hash),
712 _ => panic!("Expected NotFound error"),
713 }
714 }
715
716 #[tokio::test]
717 async fn test_is_blob_uploaded_to_relay() {
718 let mut mock_storage = MockBlobStorage::new();
719 mock_storage
720 .expect_is_blob_uploaded()
721 .withf(|_hash, _| true)
722 .times(1)
723 .returning(|_, _| Ok(true));
724
725 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
726 let relay_id = create_test_key_id(1);
727
728 let test_hash = create_test_hash(2);
730 let _hash_string = hex::encode(test_hash.as_bytes());
731 let blob_id = BlobId::from(test_hash);
732 let result = service.is_blob_uploaded_to_relay(&blob_id, &relay_id).await;
733
734 assert!(result.is_ok());
735 assert!(result.unwrap());
736 }
737
738 #[tokio::test]
739 async fn test_get_blob_upload_status() {
740 let mut mock_storage = MockBlobStorage::new();
741 let relay_id = create_test_key_id(1);
742 let expected_status = vec![BlobUploadStatus {
743 blob_hash: create_test_hash(2),
744 relay_id,
745 uploaded_at: 1234567890,
746 blob_size: 100,
747 }];
748
749 mock_storage
750 .expect_get_blob_upload_status()
751 .withf(|_hash| true)
752 .times(1)
753 .returning(move |_| Ok(expected_status.clone()));
754
755 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
756
757 let test_hash = create_test_hash(2);
758 let blob_id = BlobId::from(test_hash);
759 let result = service.get_blob_upload_status(&blob_id).await;
760
761 assert!(result.is_ok());
762 let statuses = result.unwrap();
763 assert_eq!(statuses.len(), 1);
764 assert_eq!(statuses[0].blob_hash, test_hash);
765 assert_eq!(statuses[0].relay_id, relay_id);
766 }
767
768 #[tokio::test]
769 async fn test_get_relay_blob_stats() {
770 let mut mock_storage = MockBlobStorage::new();
771 let relay_id = create_test_key_id(1);
772
773 mock_storage
774 .expect_get_uploaded_blob_count_for_relay()
775 .times(1)
776 .returning(|_| Ok(5));
777 mock_storage
778 .expect_get_uploaded_blob_size_for_relay()
779 .times(1)
780 .returning(|_| Ok(1024));
781
782 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
783
784 let result = service.get_relay_blob_stats(&relay_id).await;
785
786 assert!(result.is_ok());
787 let (count, size) = result.unwrap();
788 assert_eq!(count, 5);
789 assert_eq!(size, 1024);
790 }
791
792 #[tokio::test]
793 async fn test_remove_blob_records() {
794 let mut mock_storage = MockBlobStorage::new();
795
796 mock_storage
797 .expect_remove_blob_upload_record()
798 .withf(|_hash, relay_id| relay_id.is_none())
799 .times(1)
800 .returning(|_, _| Ok(2));
801
802 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
803
804 let test_hash = create_test_hash(2);
806 let _hash_string = hex::encode(test_hash.as_bytes());
807 let blob_id = BlobId::from(test_hash);
808 let result = service.remove_blob_records(&blob_id).await;
809
810 assert!(result.is_ok());
811 assert_eq!(result.unwrap(), 2);
812 }
813
814 #[tokio::test]
815 async fn test_blob_store_trait_implementation() {
816 let mut mock_storage = MockBlobStorage::new();
817 mock_storage
818 .expect_mark_blob_uploaded()
819 .times(1)
820 .returning(|_, _, _| Ok(()));
821
822 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
823
824 let relay_id = create_test_key_id(1);
825 let mock_blob_store = MockBlobStoreImpl::new();
826
827 service.add_relay(relay_id, Arc::new(mock_blob_store)).await;
828
829 let test_data = b"hello world";
830
831 let upload_result =
833 <MultiRelayBlobService<_> as BlobStore>::upload_blob(&service, test_data).await;
834 assert!(upload_result.is_ok());
835
836 let blob_hash = upload_result.unwrap();
837
838 let download_result =
840 <MultiRelayBlobService<_> as BlobStore>::get_blob(&service, &blob_hash).await;
841 assert!(download_result.is_ok());
842 assert_eq!(download_result.unwrap(), test_data);
843 }
844
845 #[tokio::test]
846 async fn test_blob_store_trait_upload_failure() {
847 let mock_storage = MockBlobStorage::new();
848 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
849 let test_data = b"hello world";
852
853 let result =
854 <MultiRelayBlobService<_> as BlobStore>::upload_blob(&service, test_data).await;
855 assert!(result.is_err());
856 match result.unwrap_err() {
857 BlobError::SerializationError(msg) => {
858 assert!(msg.contains("Failed to upload to any relay"))
859 }
860 _ => panic!("Expected SerializationError"),
861 }
862 }
863
864 #[tokio::test]
865 async fn test_upload_to_nonexistent_relay() {
866 let mock_storage = MockBlobStorage::new();
867 let service = MultiRelayBlobService::new(Arc::new(mock_storage));
868
869 let relay_id = create_test_key_id(1);
870 let test_data = b"hello world";
871
872 let result = service.upload_blob_to_relay(test_data, &relay_id).await;
873 assert!(result.is_err());
874 match result.unwrap_err() {
875 BlobError::SerializationError(msg) => assert!(msg.contains("No service for relay")),
876 _ => panic!("Expected SerializationError"),
877 }
878 }
879}