//! zoe_client/services/blob_store/multi_relay_blob_service.rs

1use super::BlobStore;
2use crate::services::BlobError;
3use async_trait::async_trait;
4use futures::future::join_all;
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use zoe_client_storage::{BlobStorage, BlobUploadStatus};
9use zoe_wire_protocol::{BlobId, KeyId};
/// Multi-relay blob service that manages blob uploads across multiple relays
/// and tracks upload status using persistent storage.
///
/// The relay map is snapshotted (cloned) by the async operations so the lock
/// is never held across an `.await` on network or storage calls.
pub struct MultiRelayBlobService<S: BlobStorage> {
    /// Map of relay ID to blob service for that relay (thread-safe)
    relay_services: Arc<RwLock<HashMap<KeyId, Arc<dyn BlobStore>>>>,
    /// Storage for tracking blob upload status
    storage: Arc<S>,
}
18
impl<S: BlobStorage> MultiRelayBlobService<S> {
    /// Create a new multi-relay blob service with no relays configured.
    pub fn new(storage: Arc<S>) -> Self {
        let relay_services = Arc::new(RwLock::new(HashMap::new()));
        Self {
            relay_services,
            storage,
        }
    }

    /// Create a new multi-relay blob service pre-populated with the given
    /// relay-to-service pairs.
    pub fn new_with_relays(
        storage: Arc<S>,
        relay_services: impl IntoIterator<Item = (KeyId, Arc<dyn BlobStore>)>,
    ) -> Self {
        let map: HashMap<_, _> = relay_services.into_iter().collect();
        Self {
            relay_services: Arc::new(RwLock::new(map)),
            storage,
        }
    }
36
37    /// Add a relay blob service
38    pub async fn add_relay(&self, relay_id: KeyId, service: Arc<dyn BlobStore>) {
39        let mut services = self.relay_services.write().await;
40        services.insert(relay_id, service);
41        tracing::info!(
42            "Added blob service for relay: {}",
43            hex::encode(relay_id.as_bytes())
44        );
45    }
46
47    /// Remove a relay blob service
48    pub async fn remove_relay(&self, relay_id: &KeyId) -> Option<Arc<dyn BlobStore>> {
49        let mut services = self.relay_services.write().await;
50        let removed = services.remove(relay_id);
51        if removed.is_some() {
52            tracing::info!(
53                "Removed blob service for relay: {}",
54                hex::encode(relay_id.as_bytes())
55            );
56        }
57        removed
58    }
59
60    /// Get all configured relay IDs
61    pub async fn get_relay_ids(&self) -> Vec<KeyId> {
62        let services = self.relay_services.read().await;
63        services.keys().cloned().collect()
64    }
65
66    /// Check if a blob has been uploaded to a specific relay
67    pub async fn is_blob_uploaded_to_relay(
68        &self,
69        blob_hash: &BlobId,
70        relay_id: &KeyId,
71    ) -> Result<bool, BlobError> {
72        self.storage
73            .is_blob_uploaded(blob_hash, relay_id)
74            .await
75            .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))
76    }
77
78    /// Get upload status for a blob across all relays
79    pub async fn get_blob_upload_status(
80        &self,
81        blob_hash: &BlobId,
82    ) -> Result<Vec<BlobUploadStatus>, BlobError> {
83        self.storage
84            .get_blob_upload_status(blob_hash)
85            .await
86            .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))
87    }
88
89    /// Upload a blob to a specific relay and track the upload
90    pub async fn upload_blob_to_relay(
91        &self,
92        blob: &[u8],
93        relay_id: &KeyId,
94    ) -> Result<BlobId, BlobError> {
95        let service = {
96            let services = self.relay_services.read().await;
97            services.get(relay_id).cloned().ok_or_else(|| {
98                BlobError::SerializationError(format!(
99                    "No service for relay: {}",
100                    hex::encode(relay_id.as_bytes())
101                ))
102            })?
103        };
104
105        // Upload the blob
106        let blob_hash = service.upload_blob(blob).await?;
107
108        // Mark as uploaded in storage
109        self.storage
110            .mark_blob_uploaded(&blob_hash, relay_id, blob.len() as u64)
111            .await
112            .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))?;
113
114        tracing::info!(
115            "Successfully uploaded blob {} to relay {} (size: {} bytes)",
116            blob_hash,
117            hex::encode(relay_id.as_bytes()),
118            blob.len()
119        );
120
121        Ok(blob_hash)
122    }
123
124    async fn upload_blob_to_single_relay_and_mark_as_uploaded(
125        blob: &[u8],
126        service: Arc<dyn BlobStore>,
127        relay_id: &KeyId,
128        storage: Arc<S>,
129    ) -> Result<BlobId, BlobError> {
130        match service.upload_blob(blob).await {
131            Ok(blob_hash) => {
132                // Mark as uploaded in storage
133                match storage
134                    .mark_blob_uploaded(&blob_hash, relay_id, blob.len() as u64)
135                    .await
136                {
137                    Ok(()) => {
138                        tracing::info!(
139                            "Successfully uploaded blob {} to relay {} (size: {} bytes)",
140                            blob_hash,
141                            hex::encode(relay_id.as_bytes()),
142                            blob.len()
143                        );
144                        Ok(blob_hash)
145                    }
146                    Err(e) => {
147                        tracing::error!("Failed to mark blob as uploaded in storage: {}", e);
148                        Err(BlobError::SerializationError(format!("Storage error: {e}")))
149                    }
150                }
151            }
152            Err(e) => {
153                tracing::warn!(
154                    "Failed to upload blob to relay {}: {}",
155                    hex::encode(relay_id.as_bytes()),
156                    e
157                );
158                Err(e)
159            }
160        }
161    }
162
163    /// Upload a blob to all configured relays (in parallel)
164    pub async fn upload_blob_to_all_relays(
165        &self,
166        blob: &[u8],
167    ) -> Result<HashMap<KeyId, Result<BlobId, BlobError>>, BlobError> {
168        // Clone the services to avoid holding the lock during async operations
169        let services = {
170            let services = self.relay_services.read().await;
171            services.clone()
172        };
173        // only one relay, so just upload to it, don't do the memory overhead of parallel uploads
174        if services.len() == 1 {
175            let (relay_id, service) = services.into_iter().next().unwrap();
176            let result = Self::upload_blob_to_single_relay_and_mark_as_uploaded(
177                blob,
178                service,
179                &relay_id,
180                Arc::clone(&self.storage),
181            )
182            .await;
183            return Ok(HashMap::from([(relay_id, result)]));
184        }
185
186        // Share blob data across all parallel uploads to avoid memory duplication
187        let shared_blob = Arc::<[u8]>::from(blob);
188
189        // Create futures for parallel execution
190        let upload_futures = services.into_iter().map(|(relay_id, service)| {
191            let blob = Arc::clone(&shared_blob);
192            let storage = Arc::clone(&self.storage);
193
194            async move {
195                (
196                    relay_id,
197                    Self::upload_blob_to_single_relay_and_mark_as_uploaded(
198                        &blob, service, &relay_id, storage,
199                    )
200                    .await,
201                )
202            }
203        });
204
205        // Execute all uploads in parallel
206        let results = join_all(upload_futures).await;
207
208        // Convert results back to HashMap
209        Ok(results.into_iter().collect())
210    }
211
212    /// Upload a blob to relays where it hasn't been uploaded yet (in parallel)
213    pub async fn upload_blob_to_missing_relays(
214        &self,
215        blob: &[u8],
216    ) -> Result<HashMap<KeyId, Result<BlobId, BlobError>>, BlobError> {
217        // Share blob data across all operations to avoid memory duplication
218        let shared_blob = Arc::<[u8]>::from(blob);
219
220        // Clone the services to avoid holding the lock during async operations
221        let services = {
222            let services = self.relay_services.read().await;
223            services.clone()
224        };
225
226        // First, try to get the blob hash by uploading to the first relay
227        // This is needed to check upload status on other relays
228        let blob_hash = if let Some((first_relay_id, first_service)) = services.iter().next() {
229            match first_service.upload_blob(&shared_blob).await {
230                Ok(hash) => {
231                    // Mark as uploaded in storage
232                    if let Err(e) = self
233                        .storage
234                        .mark_blob_uploaded(&hash, first_relay_id, shared_blob.len() as u64)
235                        .await
236                    {
237                        tracing::error!("Failed to mark blob as uploaded in storage: {}", e);
238                    }
239                    Some(hash)
240                }
241                Err(_) => None,
242            }
243        } else {
244            return Ok(HashMap::new()); // No services configured
245        };
246
247        // Share blob data across all parallel uploads to avoid memory duplication
248        let shared_blob = Arc::<[u8]>::from(blob);
249
250        // Now check all relays in parallel and upload where needed
251        let upload_futures = services.into_iter().map(|(relay_id, service)| {
252            let blob = Arc::clone(&shared_blob);
253            let storage = Arc::clone(&self.storage);
254
255            async move {
256                // If we have a blob hash, check if already uploaded
257                if let Some(hash) = blob_hash {
258                    match storage.is_blob_uploaded(&hash, &relay_id).await {
259                        Ok(true) => {
260                            tracing::debug!(
261                                "Blob {} already uploaded to relay {}",
262                                hash,
263                                hex::encode(relay_id.as_bytes())
264                            );
265                            return (relay_id, Ok(hash));
266                        }
267                        Ok(false) => {
268                            // Need to upload - continue below
269                        }
270                        Err(e) => {
271                            tracing::error!("Storage error checking upload status: {}", e);
272                            return (
273                                relay_id,
274                                Err(BlobError::SerializationError(format!("Storage error: {e}"))),
275                            );
276                        }
277                    }
278                }
279
280                // Upload the blob
281                let result = match service.upload_blob(&blob).await {
282                    Ok(blob_hash) => {
283                        // Mark as uploaded in storage
284                        match storage
285                            .mark_blob_uploaded(&blob_hash, &relay_id, blob.len() as u64)
286                            .await
287                        {
288                            Ok(()) => {
289                                tracing::info!(
290                                    "Successfully uploaded blob {} to relay {} (size: {} bytes)",
291                                    blob_hash,
292                                    hex::encode(relay_id.as_bytes()),
293                                    blob.len()
294                                );
295                                Ok(blob_hash)
296                            }
297                            Err(e) => {
298                                tracing::error!(
299                                    "Failed to mark blob as uploaded in storage: {}",
300                                    e
301                                );
302                                Err(BlobError::SerializationError(format!("Storage error: {e}")))
303                            }
304                        }
305                    }
306                    Err(e) => {
307                        tracing::warn!(
308                            "Failed to upload blob to relay {}: {}",
309                            hex::encode(relay_id.as_bytes()),
310                            e
311                        );
312                        Err(e)
313                    }
314                };
315
316                (relay_id, result)
317            }
318        });
319
320        // Execute all uploads in parallel
321        let results = join_all(upload_futures).await;
322
323        // Convert results back to HashMap
324        Ok(results.into_iter().collect())
325    }
326
327    /// Download a blob from any available relay (tries relays in order)
328    pub async fn download_blob_from_any_relay(
329        &self,
330        blob_hash: &BlobId,
331    ) -> Result<Vec<u8>, BlobError> {
332        let mut last_error = BlobError::NotFound { hash: *blob_hash };
333
334        // Clone the services to avoid holding the lock during async operations
335        let services = {
336            let services = self.relay_services.read().await;
337            services.clone()
338        };
339
340        for (relay_id, service) in services {
341            match service.get_blob(blob_hash).await {
342                Ok(blob_data) => {
343                    tracing::debug!(
344                        "Successfully downloaded blob {} from relay {} (size: {} bytes)",
345                        blob_hash,
346                        hex::encode(relay_id.as_bytes()),
347                        blob_data.len()
348                    );
349                    return Ok(blob_data);
350                }
351                Err(e) => {
352                    tracing::debug!(
353                        "Failed to download blob {} from relay {}: {}",
354                        blob_hash,
355                        hex::encode(relay_id.as_bytes()),
356                        e
357                    );
358                    last_error = e;
359                }
360            }
361        }
362
363        Err(last_error)
364    }
365
366    /// Get statistics about blob uploads for a specific relay
367    pub async fn get_relay_blob_stats(&self, relay_id: &KeyId) -> Result<(u64, u64), BlobError> {
368        let count = self
369            .storage
370            .get_uploaded_blob_count_for_relay(relay_id)
371            .await
372            .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))?;
373
374        let size = self
375            .storage
376            .get_uploaded_blob_size_for_relay(relay_id)
377            .await
378            .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))?;
379
380        Ok((count, size))
381    }
382
383    /// Remove blob upload records (when blob is deleted)
384    pub async fn remove_blob_records(&self, blob_hash: &BlobId) -> Result<u64, BlobError> {
385        self.storage
386            .remove_blob_upload_record(blob_hash, None)
387            .await
388            .map_err(|e| BlobError::SerializationError(format!("Storage error: {e}")))
389    }
390}
391
392#[async_trait]
393impl<S: BlobStorage> BlobStore for MultiRelayBlobService<S> {
394    /// Download a blob from any available relay
395    async fn get_blob(&self, blob_id: &BlobId) -> Result<Vec<u8>, BlobError> {
396        self.download_blob_from_any_relay(blob_id).await
397    }
398
399    /// Upload a blob to all configured relays
400    async fn upload_blob(&self, blob: &[u8]) -> Result<BlobId, BlobError> {
401        let results = self.upload_blob_to_all_relays(blob).await?;
402
403        // Return the hash from the first successful upload
404        for (relay_id, result) in results {
405            match result {
406                Ok(hash) => return Ok(hash),
407                Err(e) => {
408                    tracing::warn!(
409                        "Upload to relay {} failed: {}",
410                        hex::encode(relay_id.as_bytes()),
411                        e
412                    );
413                }
414            }
415        }
416
417        Err(BlobError::SerializationError(
418            "Failed to upload to any relay".to_string(),
419        ))
420    }
421}
422
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use zoe_client_storage::{StorageError, storage::MockBlobStorage};
    use zoe_wire_protocol::Hash;

    /// Mock blob store implementation for testing
    #[derive(Clone)]
    struct MockBlobStoreImpl {
        // In-memory blobs keyed by hex-encoded BlobId (see upload_blob below)
        blobs: Arc<Mutex<HashMap<String, Vec<u8>>>>,
        // When true, get_blob/upload_blob return errors to simulate failures
        should_fail: Arc<Mutex<bool>>,
    }

    impl MockBlobStoreImpl {
        // Create an empty, non-failing mock store
        fn new() -> Self {
            Self {
                blobs: Arc::new(Mutex::new(HashMap::new())),
                should_fail: Arc::new(Mutex::new(false)),
            }
        }

        // Toggle failure mode for subsequent operations
        async fn set_should_fail(&self, should_fail: bool) {
            *self.should_fail.lock().await = should_fail;
        }

        #[allow(dead_code)]
        async fn get_stored_blobs(&self) -> HashMap<String, Vec<u8>> {
            self.blobs.lock().await.clone()
        }
    }

    #[async_trait]
    impl BlobStore for MockBlobStoreImpl {
        async fn get_blob(&self, blob_id: &BlobId) -> Result<Vec<u8>, BlobError> {
            if *self.should_fail.lock().await {
                return Err(BlobError::NotFound { hash: *blob_id });
            }

            let blobs = self.blobs.lock().await;
            blobs
                .get(&blob_id.to_hex())
                .cloned()
                .ok_or_else(|| BlobError::NotFound { hash: *blob_id })
        }

        async fn upload_blob(&self, blob: &[u8]) -> Result<BlobId, BlobError> {
            if *self.should_fail.lock().await {
                return Err(BlobError::IoError(std::io::Error::new(
                    std::io::ErrorKind::ConnectionRefused,
                    "Mock upload failure",
                )));
            }

            // Simple hash calculation for testing (not cryptographically secure)
            let blob_id = BlobId::from_content(blob);
            let hash_string = blob_id.to_hex();

            let mut blobs = self.blobs.lock().await;
            blobs.insert(hash_string, blob.to_vec());

            Ok(blob_id)
        }
    }

    // Deterministic 32-byte hash whose first byte is `value`
    fn create_test_hash(value: u8) -> Hash {
        let mut bytes = [0u8; 32];
        bytes[0] = value;
        Hash::from(bytes)
    }

    // Deterministic relay KeyId built from create_test_hash
    fn create_test_key_id(value: u8) -> KeyId {
        KeyId(create_test_hash(value))
    }

    #[tokio::test]
    async fn test_multi_relay_blob_service_creation() {
        let mut mock_storage = MockBlobStorage::new();
        mock_storage
            .expect_is_blob_uploaded()
            .returning(|_, _| Ok(false));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));
        assert_eq!(service.get_relay_ids().await.len(), 0);
    }

    #[tokio::test]
    async fn test_add_remove_relay() {
        let mock_storage = MockBlobStorage::new();
        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let relay_id = create_test_key_id(1);
        let mock_blob_store = MockBlobStoreImpl::new();

        // Add relay
        service.add_relay(relay_id, Arc::new(mock_blob_store)).await;
        assert_eq!(service.get_relay_ids().await.len(), 1);
        assert!(service.get_relay_ids().await.contains(&relay_id));

        // Remove relay
        let removed = service.remove_relay(&relay_id).await;
        assert!(removed.is_some());
        assert_eq!(service.get_relay_ids().await.len(), 0);
    }

    #[tokio::test]
    async fn test_upload_blob_to_relay() {
        let mut mock_storage = MockBlobStorage::new();
        // Expect exactly one upload record, sized to "hello world" (11 bytes)
        mock_storage
            .expect_mark_blob_uploaded()
            .withf(|_hash, _relay_id, size| *size == 11)
            .times(1)
            .returning(|_, _, _| Ok(()));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));
        let relay_id = create_test_key_id(1);
        let mock_blob_store = MockBlobStoreImpl::new();

        service.add_relay(relay_id, Arc::new(mock_blob_store)).await;

        let test_data = b"hello world";
        let result = service.upload_blob_to_relay(test_data, &relay_id).await;

        assert!(result.is_ok());
        let blob_hash = result.unwrap();
        // BlobId doesn't have starts_with, just verify it's not empty
        assert!(!blob_hash.as_bytes().is_empty());
    }

    #[tokio::test]
    async fn test_upload_blob_to_relay_storage_failure() {
        let mut mock_storage = MockBlobStorage::new();
        // Storage rejects the record; the upload call must surface this error
        mock_storage
            .expect_mark_blob_uploaded()
            .times(1)
            .returning(|_, _, _| Err(StorageError::Internal("Storage failure".to_string())));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));
        let relay_id = create_test_key_id(1);
        let mock_blob_store = MockBlobStoreImpl::new();

        service.add_relay(relay_id, Arc::new(mock_blob_store)).await;

        let test_data = b"hello world";
        let result = service.upload_blob_to_relay(test_data, &relay_id).await;

        assert!(result.is_err());
        match result.unwrap_err() {
            BlobError::SerializationError(msg) => assert!(msg.contains("Storage error")),
            _ => panic!("Expected SerializationError"),
        }
    }

    #[tokio::test]
    async fn test_upload_blob_to_all_relays() {
        let mut mock_storage = MockBlobStorage::new();
        // One record per relay: two relays -> two mark calls
        mock_storage
            .expect_mark_blob_uploaded()
            .times(2)
            .returning(|_, _, _| Ok(()));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let relay_id1 = create_test_key_id(1);
        let relay_id2 = create_test_key_id(2);
        let mock_blob_store1 = MockBlobStoreImpl::new();
        let mock_blob_store2 = MockBlobStoreImpl::new();

        service
            .add_relay(relay_id1, Arc::new(mock_blob_store1))
            .await;
        service
            .add_relay(relay_id2, Arc::new(mock_blob_store2))
            .await;

        let test_data = b"hello world";
        let results = service.upload_blob_to_all_relays(test_data).await.unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.get(&relay_id1).unwrap().is_ok());
        assert!(results.get(&relay_id2).unwrap().is_ok());
    }

    #[tokio::test]
    async fn test_upload_blob_to_all_relays_partial_failure() {
        let mut mock_storage = MockBlobStorage::new();
        mock_storage
            .expect_mark_blob_uploaded()
            .times(1) // Only one successful upload
            .returning(|_, _, _| Ok(()));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let relay_id1 = create_test_key_id(1);
        let relay_id2 = create_test_key_id(2);
        let mock_blob_store1 = MockBlobStoreImpl::new();
        let mock_blob_store2 = MockBlobStoreImpl::new();

        // Make the second store fail
        mock_blob_store2.set_should_fail(true).await;

        service
            .add_relay(relay_id1, Arc::new(mock_blob_store1))
            .await;
        service
            .add_relay(relay_id2, Arc::new(mock_blob_store2))
            .await;

        let test_data = b"hello world";
        let results = service.upload_blob_to_all_relays(test_data).await.unwrap();

        assert_eq!(results.len(), 2);
        assert!(results.get(&relay_id1).unwrap().is_ok());
        assert!(results.get(&relay_id2).unwrap().is_err());
    }

    #[tokio::test]
    async fn test_upload_blob_to_missing_relays() {
        let mut mock_storage = MockBlobStorage::new();

        // Expect mark_blob_uploaded for the initial upload
        mock_storage
            .expect_mark_blob_uploaded()
            .times(1)
            .returning(|_, _, _| Ok(()));

        // Expect is_blob_uploaded to be called for the relay to check if already uploaded
        mock_storage
            .expect_is_blob_uploaded()
            .times(1)
            .returning(|_, _| Ok(true)); // Return true to indicate already uploaded

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let relay_id = create_test_key_id(1);
        let mock_blob_store = MockBlobStoreImpl::new();

        service.add_relay(relay_id, Arc::new(mock_blob_store)).await;

        let test_data = b"hello world";
        let results = service
            .upload_blob_to_missing_relays(test_data)
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        assert!(results.get(&relay_id).unwrap().is_ok());
    }

    #[tokio::test]
    async fn test_download_blob_from_any_relay() {
        let mock_storage = MockBlobStorage::new();
        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let relay_id = create_test_key_id(1);
        let mock_blob_store = MockBlobStoreImpl::new();

        // Pre-populate the mock store with test data
        let test_data = b"hello world";
        let blob_hash = mock_blob_store.upload_blob(test_data).await.unwrap();

        service.add_relay(relay_id, Arc::new(mock_blob_store)).await;

        let result = service.download_blob_from_any_relay(&blob_hash).await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), test_data);
    }

    #[tokio::test]
    async fn test_download_blob_not_found() {
        let mock_storage = MockBlobStorage::new();
        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let relay_id = create_test_key_id(1);
        let mock_blob_store = MockBlobStoreImpl::new();

        service.add_relay(relay_id, Arc::new(mock_blob_store)).await;

        let nonexistent_hash = BlobId::from_content(b"nonexistent");
        let result = service
            .download_blob_from_any_relay(&nonexistent_hash)
            .await;

        assert!(result.is_err());
        match result.unwrap_err() {
            BlobError::NotFound { hash } => assert_eq!(hash, nonexistent_hash),
            _ => panic!("Expected NotFound error"),
        }
    }

    #[tokio::test]
    async fn test_is_blob_uploaded_to_relay() {
        let mut mock_storage = MockBlobStorage::new();
        mock_storage
            .expect_is_blob_uploaded()
            .withf(|_hash, _| true)
            .times(1)
            .returning(|_, _| Ok(true));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));
        let relay_id = create_test_key_id(1);

        // Create a proper hex-encoded hash string
        let test_hash = create_test_hash(2);
        let _hash_string = hex::encode(test_hash.as_bytes());
        let blob_id = BlobId::from(test_hash);
        let result = service.is_blob_uploaded_to_relay(&blob_id, &relay_id).await;

        assert!(result.is_ok());
        assert!(result.unwrap());
    }

    #[tokio::test]
    async fn test_get_blob_upload_status() {
        let mut mock_storage = MockBlobStorage::new();
        let relay_id = create_test_key_id(1);
        let expected_status = vec![BlobUploadStatus {
            blob_hash: create_test_hash(2),
            relay_id,
            uploaded_at: 1234567890,
            blob_size: 100,
        }];

        mock_storage
            .expect_get_blob_upload_status()
            .withf(|_hash| true)
            .times(1)
            .returning(move |_| Ok(expected_status.clone()));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let test_hash = create_test_hash(2);
        let blob_id = BlobId::from(test_hash);
        let result = service.get_blob_upload_status(&blob_id).await;

        assert!(result.is_ok());
        let statuses = result.unwrap();
        assert_eq!(statuses.len(), 1);
        assert_eq!(statuses[0].blob_hash, test_hash);
        assert_eq!(statuses[0].relay_id, relay_id);
    }

    #[tokio::test]
    async fn test_get_relay_blob_stats() {
        let mut mock_storage = MockBlobStorage::new();
        let relay_id = create_test_key_id(1);

        mock_storage
            .expect_get_uploaded_blob_count_for_relay()
            .times(1)
            .returning(|_| Ok(5));
        mock_storage
            .expect_get_uploaded_blob_size_for_relay()
            .times(1)
            .returning(|_| Ok(1024));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let result = service.get_relay_blob_stats(&relay_id).await;

        assert!(result.is_ok());
        let (count, size) = result.unwrap();
        assert_eq!(count, 5);
        assert_eq!(size, 1024);
    }

    #[tokio::test]
    async fn test_remove_blob_records() {
        let mut mock_storage = MockBlobStorage::new();

        // None relay filter means records for all relays are removed
        mock_storage
            .expect_remove_blob_upload_record()
            .withf(|_hash, relay_id| relay_id.is_none())
            .times(1)
            .returning(|_, _| Ok(2));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        // Create a proper hex-encoded hash string
        let test_hash = create_test_hash(2);
        let _hash_string = hex::encode(test_hash.as_bytes());
        let blob_id = BlobId::from(test_hash);
        let result = service.remove_blob_records(&blob_id).await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 2);
    }

    #[tokio::test]
    async fn test_blob_store_trait_implementation() {
        let mut mock_storage = MockBlobStorage::new();
        mock_storage
            .expect_mark_blob_uploaded()
            .times(1)
            .returning(|_, _, _| Ok(()));

        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let relay_id = create_test_key_id(1);
        let mock_blob_store = MockBlobStoreImpl::new();

        service.add_relay(relay_id, Arc::new(mock_blob_store)).await;

        let test_data = b"hello world";

        // Test upload through BlobStore trait
        let upload_result =
            <MultiRelayBlobService<_> as BlobStore>::upload_blob(&service, test_data).await;
        assert!(upload_result.is_ok());

        let blob_hash = upload_result.unwrap();

        // Test download through BlobStore trait
        let download_result =
            <MultiRelayBlobService<_> as BlobStore>::get_blob(&service, &blob_hash).await;
        assert!(download_result.is_ok());
        assert_eq!(download_result.unwrap(), test_data);
    }

    #[tokio::test]
    async fn test_blob_store_trait_upload_failure() {
        let mock_storage = MockBlobStorage::new();
        let service = MultiRelayBlobService::new(Arc::new(mock_storage));
        // No relays added, so upload should fail

        let test_data = b"hello world";

        let result =
            <MultiRelayBlobService<_> as BlobStore>::upload_blob(&service, test_data).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            BlobError::SerializationError(msg) => {
                assert!(msg.contains("Failed to upload to any relay"))
            }
            _ => panic!("Expected SerializationError"),
        }
    }

    #[tokio::test]
    async fn test_upload_to_nonexistent_relay() {
        let mock_storage = MockBlobStorage::new();
        let service = MultiRelayBlobService::new(Arc::new(mock_storage));

        let relay_id = create_test_key_id(1);
        let test_data = b"hello world";

        let result = service.upload_blob_to_relay(test_data, &relay_id).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            BlobError::SerializationError(msg) => assert!(msg.contains("No service for relay")),
            _ => panic!("Expected SerializationError"),
        }
    }
}