1use crate::error::{ClientError, Result};
31use crate::services::BlobStore;
32#[cfg(any(feature = "mock", test))]
33use crate::services::MockBlobStore;
34use hex;
35use std::path::Path;
36use std::sync::Arc;
37use tokio::fs;
38use tracing::{debug, info, warn};
39use zoe_app_primitives::FileRef;
40use zoe_blob_store::BlobClient;
41use zoe_encrypted_storage::{CompressionConfig, ConvergentEncryption};
42use zoe_wire_protocol::BlobId;
43
44#[derive(Clone)]
48pub struct FileStorage<B: BlobStore> {
49 blob_client: BlobClient,
50 remote_blob_service: Arc<B>,
51 compression_config: CompressionConfig,
52}
53
54impl<B: BlobStore> FileStorage<B> {
55 pub async fn new(
57 blob_storage_path: &Path,
58 remote_blob_service: Arc<B>,
59 compression_config: CompressionConfig,
60 ) -> Result<Self> {
61 let blob_client = BlobClient::new(blob_storage_path.to_path_buf()).await?;
62
63 Ok(Self {
64 blob_client,
65 remote_blob_service,
66 compression_config,
67 })
68 }
69
70 pub async fn store_file(&self, file_path: &Path) -> Result<FileRef> {
104 info!("Storing file: {}", file_path.display());
105
106 let file_content = fs::read(file_path).await.map_err(|e| {
108 ClientError::FileStorage(format!(
109 "Failed to read file {}: {}",
110 file_path.display(),
111 e
112 ))
113 })?;
114
115 let original_size = file_content.len() as u64;
116 debug!("File size: {} bytes", original_size);
117
118 let (encrypted_data, encryption_info) =
120 ConvergentEncryption::encrypt_with_compression_config(
121 &file_content,
122 self.compression_config.clone(),
123 )?;
124
125 debug!(
126 "File encrypted, compressed: {}",
127 encryption_info.was_compressed
128 );
129
130 let blob_hash = self.blob_client.store_blob(encrypted_data.clone()).await?;
132
133 info!("File stored successfully with blob hash: {}", blob_hash);
134
135 match self.remote_blob_service.upload_blob(&encrypted_data).await {
137 Ok(remote_hash) => {
138 if remote_hash.to_hex() == blob_hash {
139 info!("File successfully pushed to remote storage: {}", blob_hash);
140 } else {
141 warn!(
142 "Remote hash mismatch: local={}, remote={}",
143 blob_hash, remote_hash
144 );
145 }
146 }
147 Err(e) => {
148 warn!("Failed to push file to remote storage: {}", e);
149 }
151 }
152
153 let content_type = file_path
155 .extension()
156 .and_then(|ext| ext.to_str())
157 .map(|ext| format!("application/{ext}"));
158
159 let filename = file_path
161 .file_name()
162 .and_then(|name| name.to_str())
163 .unwrap_or("unknown")
164 .to_string();
165
166 let mut stored_info = FileRef::new(blob_hash, encryption_info, Some(filename));
167
168 if let Some(content_type) = content_type {
169 stored_info = stored_info.with_content_type(content_type);
170 }
171
172 Ok(stored_info)
173 }
174
175 pub async fn store_data(
189 &self,
190 data: &[u8],
191 reference_name: &str,
192 content_type: Option<String>,
193 ) -> Result<FileRef> {
194 info!(
195 "Storing raw data: {} ({} bytes)",
196 reference_name,
197 data.len()
198 );
199
200 let _original_size = data.len() as u64; let (encrypted_data, encryption_info) =
204 ConvergentEncryption::encrypt_with_compression_config(
205 data,
206 self.compression_config.clone(),
207 )?;
208
209 debug!(
210 "Data encrypted, compressed: {}",
211 encryption_info.was_compressed
212 );
213
214 let blob_hash = self.blob_client.store_blob(encrypted_data.clone()).await?;
216
217 info!("Data stored successfully with blob hash: {}", blob_hash);
218
219 match self.remote_blob_service.upload_blob(&encrypted_data).await {
221 Ok(remote_hash) => {
222 if remote_hash.to_hex() == blob_hash {
223 info!("Data successfully pushed to remote storage: {}", blob_hash);
224 } else {
225 warn!(
226 "Remote hash mismatch: local={}, remote={}",
227 blob_hash, remote_hash
228 );
229 }
230 }
231 Err(e) => {
232 warn!("Failed to push data to remote storage: {}", e);
233 }
235 }
236
237 let mut stored_info =
238 FileRef::new(blob_hash, encryption_info, Some(reference_name.to_string()));
239
240 if let Some(content_type) = content_type {
241 stored_info = stored_info.with_content_type(content_type);
242 }
243
244 Ok(stored_info)
245 }
246
247 pub async fn retrieve_file(&self, stored_info: &FileRef) -> Result<Vec<u8>> {
291 info!("Retrieving file with blob hash: {}", stored_info.blob_hash);
292
293 let encrypted_data = match self.blob_client.get_blob(&stored_info.blob_hash).await? {
295 Some(data) => {
296 debug!("Retrieved blob from local storage");
297 data
298 }
299 None => {
300 let blob_id = match hex::decode(&stored_info.blob_hash) {
303 Ok(bytes) if bytes.len() == 32 => {
304 let mut array = [0u8; 32];
305 array.copy_from_slice(&bytes);
306 BlobId::from_bytes(array)
307 }
308 _ => {
309 return Err(ClientError::FileStorage(format!(
310 "Invalid blob hash: {}",
311 stored_info.blob_hash
312 )));
313 }
314 };
315
316 match self.remote_blob_service.get_blob(&blob_id).await {
317 Ok(remote_data) => {
318 info!("Successfully retrieved blob from remote storage");
319 if let Err(e) = self.blob_client.store_blob(remote_data.clone()).await {
321 warn!("Failed to cache remotely fetched blob locally: {}", e);
322 } else {
323 debug!("Cached remotely fetched blob to local storage");
324 }
325 remote_data
326 }
327 Err(e) => {
328 return Err(ClientError::FileStorage(format!(
329 "Blob not found locally or remotely with hash {}: {}",
330 stored_info.blob_hash, e
331 )));
332 }
333 }
334 }
335 };
336
337 debug!("Retrieved encrypted data: {} bytes", encrypted_data.len());
338
339 let decrypted_data =
341 ConvergentEncryption::decrypt(&encrypted_data, &stored_info.encryption_info)?;
342
343 info!(
344 "File decrypted successfully: {} bytes",
345 decrypted_data.len()
346 );
347
348 if decrypted_data.len() != stored_info.original_size() {
350 return Err(ClientError::FileStorage(format!(
351 "Decrypted file size mismatch: expected {}, got {}",
352 stored_info.original_size(),
353 decrypted_data.len()
354 )));
355 }
356
357 Ok(decrypted_data)
358 }
359
360 pub async fn has_file(&self, stored_info: &FileRef) -> Result<bool> {
370 self.blob_client
371 .has_blob(&stored_info.blob_hash)
372 .await
373 .map_err(Into::into)
374 }
375
376 pub async fn retrieve_file_to_disk(
413 &self,
414 stored_info: &FileRef,
415 output_path: &Path,
416 ) -> Result<()> {
417 let file_content = self.retrieve_file(stored_info).await?;
418
419 if let Some(parent) = output_path.parent() {
421 fs::create_dir_all(parent).await.map_err(|e| {
422 ClientError::FileStorage(format!(
423 "Failed to create parent directory {}: {}",
424 parent.display(),
425 e
426 ))
427 })?;
428 }
429
430 fs::write(output_path, file_content).await.map_err(|e| {
431 ClientError::FileStorage(format!(
432 "Failed to write file {}: {}",
433 output_path.display(),
434 e
435 ))
436 })?;
437
438 info!("File retrieved and saved to: {}", output_path.display());
439 Ok(())
440 }
441
442 pub fn blob_client(&self) -> &BlobClient {
444 &self.blob_client
445 }
446}
447
#[cfg(test)]
impl FileStorage<MockBlobStore> {
    /// Builds a storage for tests: a real local blob client rooted at
    /// `blob_storage_path`, a mocked remote service and the default
    /// compression configuration.
    ///
    /// The mock answers every upload with the BLAKE3 hash of the uploaded
    /// bytes and every fetch with a `NotFound` error.
    pub async fn new_for_test(blob_storage_path: &Path) -> Result<Self> {
        let mut remote = MockBlobStore::new();

        remote
            .expect_upload_blob()
            .returning(|data| Ok(BlobId::from(blake3::hash(data))));

        remote.expect_get_blob().returning(|_| {
            Err(crate::services::BlobError::NotFound {
                hash: BlobId::from_content(b"test"),
            })
        });

        Self::new(
            blob_storage_path,
            Arc::new(remote),
            CompressionConfig::default(),
        )
        .await
    }
}
475
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Creates a mock-backed storage rooted in a fresh temporary directory.
    ///
    /// The directory guard is returned alongside the storage so the caller
    /// keeps the directory alive for the duration of the test.
    async fn storage_in_temp_dir() -> (TempDir, FileStorage<MockBlobStore>) {
        let dir = tempdir().unwrap();
        let storage = FileStorage::new_for_test(dir.path()).await.unwrap();
        (dir, storage)
    }

    /// A file stored from disk comes back byte-for-byte with its metadata set.
    #[tokio::test]
    async fn test_store_and_retrieve_file() {
        let (dir, storage) = storage_in_temp_dir().await;

        let payload = b"Hello, this is a test file for storage!";
        let source_path = dir.path().join("test_file.txt");
        fs::write(&source_path, payload).await.unwrap();

        let file_ref = storage.store_file(&source_path).await.unwrap();

        assert_eq!(file_ref.original_size(), payload.len());
        assert_eq!(file_ref.filename(), Some("test_file.txt"));
        assert!(file_ref.content_type.is_some());

        let round_tripped = storage.retrieve_file(&file_ref).await.unwrap();

        assert_eq!(round_tripped, payload);
    }

    /// Raw bytes round-trip and keep the supplied name and content type.
    #[tokio::test]
    async fn test_store_and_retrieve_data() {
        let (_dir, storage) = storage_in_temp_dir().await;

        let payload = b"This is raw data without a file";
        let name = "test_data";
        let mime = Some("text/plain".to_string());

        let file_ref = storage
            .store_data(payload, name, mime.clone())
            .await
            .unwrap();

        assert_eq!(file_ref.original_size(), payload.len());
        assert_eq!(file_ref.filename(), Some(name));
        assert_eq!(file_ref.content_type, mime);

        let round_tripped = storage.retrieve_file(&file_ref).await.unwrap();

        assert_eq!(round_tripped, payload);
    }

    /// `has_file` is true for a stored blob and false for one that only
    /// exists in a different storage.
    #[tokio::test]
    async fn test_has_file() {
        let (_dir, storage) = storage_in_temp_dir().await;

        let present = storage
            .store_data(b"Test data for existence check", "existence_test", None)
            .await
            .unwrap();

        assert!(storage.has_file(&present).await.unwrap());

        // Produce a well-formed reference whose blob lives in another storage.
        let (_other_dir, other_storage) = storage_in_temp_dir().await;
        let foreign = other_storage
            .store_data(b"different fake data for different hash", "fake_test", None)
            .await
            .unwrap();

        let absent = FileRef::new(
            foreign.blob_hash,
            foreign.encryption_info,
            Some("fake_file.txt".to_string()),
        );

        assert!(!storage.has_file(&absent).await.unwrap());
    }

    /// Retrieved content written to disk matches what was stored.
    #[tokio::test]
    async fn test_retrieve_file_to_disk() {
        let (dir, storage) = storage_in_temp_dir().await;

        let payload = b"Content to save to disk";
        let file_ref = storage
            .store_data(payload, "disk_test", None)
            .await
            .unwrap();

        let destination = dir.path().join("output_file.txt");
        storage
            .retrieve_file_to_disk(&file_ref, &destination)
            .await
            .unwrap();

        let on_disk = fs::read(&destination).await.unwrap();
        assert_eq!(on_disk, payload);
    }

    /// Identical content stored twice yields the same blob hash and key.
    #[tokio::test]
    async fn test_convergent_encryption_property() {
        let (_dir, storage) = storage_in_temp_dir().await;

        let payload = b"Same content for convergent test";

        let first = storage
            .store_data(payload, "convergent1", None)
            .await
            .unwrap();
        let second = storage
            .store_data(payload, "convergent2", None)
            .await
            .unwrap();

        assert_eq!(first.blob_hash, second.blob_hash);
        assert_eq!(first.encryption_info.key, second.encryption_info.key);
    }

    /// Storing and retrieving works when the remote never serves blobs.
    #[tokio::test]
    async fn test_file_storage_without_remote() {
        let (_dir, storage) = storage_in_temp_dir().await;

        let payload = b"Test data without remote";
        let file_ref = storage
            .store_data(payload, "no_remote_test", None)
            .await
            .unwrap();

        let round_tripped = storage.retrieve_file(&file_ref).await.unwrap();
        assert_eq!(round_tripped, payload);
    }

    /// Encrypting the same content independently and storing it in a second
    /// storage produces the same blob hash as the first storage reported.
    #[tokio::test]
    async fn test_remote_push_simulation() {
        let (_local_dir, local_storage) = storage_in_temp_dir().await;
        let (_remote_dir, remote_storage) = storage_in_temp_dir().await;

        let payload = b"Test data for remote push simulation";

        let file_ref = local_storage
            .store_data(payload, "remote_push_test", None)
            .await
            .unwrap();

        let (ciphertext, _) = ConvergentEncryption::encrypt_with_compression_config(
            payload,
            CompressionConfig::default(),
        )
        .unwrap();

        let remote_hash = remote_storage
            .blob_client
            .store_blob(ciphertext)
            .await
            .unwrap();

        assert_eq!(file_ref.blob_hash, remote_hash);
    }

    /// A blob copied by hand from one storage to another becomes retrievable
    /// there with the original reference.
    #[tokio::test]
    async fn test_remote_fetch_simulation() {
        let (_receiver_dir, receiver) = storage_in_temp_dir().await;
        let (_origin_dir, origin) = storage_in_temp_dir().await;

        let payload = b"Test data for remote fetch simulation";

        let file_ref = origin
            .store_data(payload, "remote_fetch_test", None)
            .await
            .unwrap();

        assert!(!receiver.has_file(&file_ref).await.unwrap());

        // Move the encrypted blob across manually to stand in for a fetch.
        let ciphertext = origin
            .blob_client
            .get_blob(&file_ref.blob_hash)
            .await
            .unwrap()
            .unwrap();
        receiver.blob_client.store_blob(ciphertext).await.unwrap();

        assert!(receiver.has_file(&file_ref).await.unwrap());
        let round_tripped = receiver.retrieve_file(&file_ref).await.unwrap();
        assert_eq!(round_tripped, payload);
    }
}