zoe_client/
file_storage.rs

1//! High-level file storage that encrypts files and stores them in blob storage
2//!
3//! This module provides a higher-level abstraction for storing files that:
4//! 1. Reads files from disk
5//! 2. Encrypts them using convergent encryption
6//! 3. Stores encrypted data in blob storage  
7//! 4. Returns metadata for later retrieval
8//!
9//! ## Usage
10//!
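//! ```rust,ignore
//! use std::path::Path;
//! use tempfile::tempdir;
//! use zoe_client::FileStorage;
//! use zoe_encrypted_storage::CompressionConfig;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let temp_dir = tempdir()?;
//!
//! // `FileStorage::new` takes the local blob storage path, a shared remote blob
//! // service (`Arc<B>` where `B: BlobStore`), and a compression configuration.
//! // `remote_blob_service` is assumed to have been constructed elsewhere.
//! let storage = FileStorage::new(
//!     temp_dir.path(),
//!     remote_blob_service,
//!     CompressionConfig::default(),
//! )
//! .await?;
//!
//! // Store a file
//! let file_path = Path::new("/path/to/my/file.txt");
//! let stored_info = storage.store_file(file_path).await?;
//!
//! // Later, retrieve the file
//! let retrieved_data = storage.retrieve_file(&stored_info).await?;
//! # Ok(())
//! # }
//! ```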
29
30use crate::error::{ClientError, Result};
31use crate::services::BlobStore;
32#[cfg(any(feature = "mock", test))]
33use crate::services::MockBlobStore;
34use hex;
35use std::path::Path;
36use std::sync::Arc;
37use tokio::fs;
38use tracing::{debug, info, warn};
39use zoe_app_primitives::FileRef;
40use zoe_blob_store::BlobClient;
41use zoe_encrypted_storage::{CompressionConfig, ConvergentEncryption};
42use zoe_wire_protocol::BlobId;
43
44// FileRef is now defined in zoe-app-primitives and imported above
45
46/// High-level file storage client that encrypts files and stores them as blobs
47#[derive(Clone)]
48pub struct FileStorage<B: BlobStore> {
49    blob_client: BlobClient,
50    remote_blob_service: Arc<B>,
51    compression_config: CompressionConfig,
52}
53
54impl<B: BlobStore> FileStorage<B> {
55    /// Create a new file storage client with remote blob service and custom compression
56    pub async fn new(
57        blob_storage_path: &Path,
58        remote_blob_service: Arc<B>,
59        compression_config: CompressionConfig,
60    ) -> Result<Self> {
61        let blob_client = BlobClient::new(blob_storage_path.to_path_buf()).await?;
62
63        Ok(Self {
64            blob_client,
65            remote_blob_service,
66            compression_config,
67        })
68    }
69
70    /// Store a file by reading from disk, encrypting, and storing in blob storage
71    ///
72    /// This method:
73    /// 1. Reads the file from the provided path
74    /// 2. Encrypts the content using convergent encryption
75    /// 3. Stores the encrypted data in the blob store
76    /// 4. Returns metadata needed to retrieve the file later
77    ///
78    /// # Arguments
79    ///
80    /// * `file_path` - Path to the file to store
81    ///
82    /// # Returns
83    ///
84    /// `FileRef` containing the blob hash, encryption info, and metadata
85    ///
86    /// # Example
87    ///
88    /// ```rust,no_run
89    /// # use zoe_client::FileStorage;
90    /// # use std::path::Path;
91    /// # use tempfile::tempdir;
92    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
93    /// let temp_dir = tempdir()?;
94    /// let storage = FileStorage::new(temp_dir.path()).await?;
95    ///
96    /// let file_path = Path::new("/path/to/document.pdf");
97    /// let stored_info = storage.store_file(file_path).await?;
98    ///
99    /// println!("File stored with blob hash: {}", stored_info.blob_hash);
100    /// # Ok(())
101    /// # }
102    /// ```
103    pub async fn store_file(&self, file_path: &Path) -> Result<FileRef> {
104        info!("Storing file: {}", file_path.display());
105
106        // Read file content
107        let file_content = fs::read(file_path).await.map_err(|e| {
108            ClientError::FileStorage(format!(
109                "Failed to read file {}: {}",
110                file_path.display(),
111                e
112            ))
113        })?;
114
115        let original_size = file_content.len() as u64;
116        debug!("File size: {} bytes", original_size);
117
118        // Encrypt the file content using convergent encryption
119        let (encrypted_data, encryption_info) =
120            ConvergentEncryption::encrypt_with_compression_config(
121                &file_content,
122                self.compression_config.clone(),
123            )?;
124
125        debug!(
126            "File encrypted, compressed: {}",
127            encryption_info.was_compressed
128        );
129
130        // Store encrypted data in blob storage
131        let blob_hash = self.blob_client.store_blob(encrypted_data.clone()).await?;
132
133        info!("File stored successfully with blob hash: {}", blob_hash);
134
135        // Push to remote blob service
136        match self.remote_blob_service.upload_blob(&encrypted_data).await {
137            Ok(remote_hash) => {
138                if remote_hash.to_hex() == blob_hash {
139                    info!("File successfully pushed to remote storage: {}", blob_hash);
140                } else {
141                    warn!(
142                        "Remote hash mismatch: local={}, remote={}",
143                        blob_hash, remote_hash
144                    );
145                }
146            }
147            Err(e) => {
148                warn!("Failed to push file to remote storage: {}", e);
149                // Don't fail the operation since local storage succeeded
150            }
151        }
152
153        // Determine content type from file extension
154        let content_type = file_path
155            .extension()
156            .and_then(|ext| ext.to_str())
157            .map(|ext| format!("application/{ext}"));
158
159        // Extract filename from path
160        let filename = file_path
161            .file_name()
162            .and_then(|name| name.to_str())
163            .unwrap_or("unknown")
164            .to_string();
165
166        let mut stored_info = FileRef::new(blob_hash, encryption_info, Some(filename));
167
168        if let Some(content_type) = content_type {
169            stored_info = stored_info.with_content_type(content_type);
170        }
171
172        Ok(stored_info)
173    }
174
175    /// Store raw data (not from a file) with encryption and blob storage
176    ///
177    /// This method allows storing arbitrary data without reading from disk.
178    ///
179    /// # Arguments
180    ///
181    /// * `data` - The raw data to store
182    /// * `reference_name` - A reference name for the data (used in metadata)
183    /// * `content_type` - Optional content type for metadata
184    ///
185    /// # Returns
186    ///
187    /// `FileRef` containing the blob hash, encryption info, and metadata
188    pub async fn store_data(
189        &self,
190        data: &[u8],
191        reference_name: &str,
192        content_type: Option<String>,
193    ) -> Result<FileRef> {
194        info!(
195            "Storing raw data: {} ({} bytes)",
196            reference_name,
197            data.len()
198        );
199
200        let _original_size = data.len() as u64; // Size is tracked in encryption_info
201
202        // Encrypt the data using convergent encryption
203        let (encrypted_data, encryption_info) =
204            ConvergentEncryption::encrypt_with_compression_config(
205                data,
206                self.compression_config.clone(),
207            )?;
208
209        debug!(
210            "Data encrypted, compressed: {}",
211            encryption_info.was_compressed
212        );
213
214        // Store encrypted data in blob storage
215        let blob_hash = self.blob_client.store_blob(encrypted_data.clone()).await?;
216
217        info!("Data stored successfully with blob hash: {}", blob_hash);
218
219        // Push to remote blob service if available
220        match self.remote_blob_service.upload_blob(&encrypted_data).await {
221            Ok(remote_hash) => {
222                if remote_hash.to_hex() == blob_hash {
223                    info!("Data successfully pushed to remote storage: {}", blob_hash);
224                } else {
225                    warn!(
226                        "Remote hash mismatch: local={}, remote={}",
227                        blob_hash, remote_hash
228                    );
229                }
230            }
231            Err(e) => {
232                warn!("Failed to push data to remote storage: {}", e);
233                // Don't fail the operation since local storage succeeded
234            }
235        }
236
237        let mut stored_info =
238            FileRef::new(blob_hash, encryption_info, Some(reference_name.to_string()));
239
240        if let Some(content_type) = content_type {
241            stored_info = stored_info.with_content_type(content_type);
242        }
243
244        Ok(stored_info)
245    }
246
247    /// Retrieve a file from storage and decrypt it
248    ///
249    /// This method:
250    /// 1. Retrieves the encrypted data from blob storage using the hash
251    /// 2. Decrypts the data using the provided encryption info
252    /// 3. Returns the original file content
253    ///
254    /// # Arguments
255    ///
256    /// * `stored_info` - Metadata from when the file was stored
257    ///
258    /// # Returns
259    ///
260    /// The original file content as bytes
261    ///
262    /// # Example
263    ///
264    /// ```rust,no_run
265    /// # use zoe_client::FileStorage;
266    /// # use std::path::Path;
267    /// # use tempfile::tempdir;
268    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
269    /// let temp_dir = tempdir()?;
270    /// let storage = FileStorage::new(temp_dir.path()).await?;
271    ///
272    /// // Assume we have stored_info from a previous store_file call
273    /// # let stored_info = zoe_client::FileRef {
274    /// #     blob_hash: "example".to_string(),
275    /// #     encryption_info: zoe_encrypted_storage::ConvergentEncryptionInfo {
276    /// #         key: [0; 32],
277    /// #         was_compressed: false,
278    /// #         source_size: 100,
279    /// #     },
280    /// #     filename: Some("example.txt".to_string()),
281    /// #     content_type: None,
282    /// #     metadata: vec![],
283    /// # };
284    ///
285    /// let file_content = storage.retrieve_file(&stored_info).await?;
286    /// println!("Retrieved {} bytes", file_content.len());
287    /// # Ok(())
288    /// # }
289    /// ```
290    pub async fn retrieve_file(&self, stored_info: &FileRef) -> Result<Vec<u8>> {
291        info!("Retrieving file with blob hash: {}", stored_info.blob_hash);
292
293        // Get encrypted data from blob storage
294        let encrypted_data = match self.blob_client.get_blob(&stored_info.blob_hash).await? {
295            Some(data) => {
296                debug!("Retrieved blob from local storage");
297                data
298            }
299            None => {
300                // Try to fetch from remote if available
301                // Convert string hash to BlobId
302                let blob_id = match hex::decode(&stored_info.blob_hash) {
303                    Ok(bytes) if bytes.len() == 32 => {
304                        let mut array = [0u8; 32];
305                        array.copy_from_slice(&bytes);
306                        BlobId::from_bytes(array)
307                    }
308                    _ => {
309                        return Err(ClientError::FileStorage(format!(
310                            "Invalid blob hash: {}",
311                            stored_info.blob_hash
312                        )));
313                    }
314                };
315
316                match self.remote_blob_service.get_blob(&blob_id).await {
317                    Ok(remote_data) => {
318                        info!("Successfully retrieved blob from remote storage");
319                        // Store the fetched data locally for future use
320                        if let Err(e) = self.blob_client.store_blob(remote_data.clone()).await {
321                            warn!("Failed to cache remotely fetched blob locally: {}", e);
322                        } else {
323                            debug!("Cached remotely fetched blob to local storage");
324                        }
325                        remote_data
326                    }
327                    Err(e) => {
328                        return Err(ClientError::FileStorage(format!(
329                            "Blob not found locally or remotely with hash {}: {}",
330                            stored_info.blob_hash, e
331                        )));
332                    }
333                }
334            }
335        };
336
337        debug!("Retrieved encrypted data: {} bytes", encrypted_data.len());
338
339        // Decrypt the data
340        let decrypted_data =
341            ConvergentEncryption::decrypt(&encrypted_data, &stored_info.encryption_info)?;
342
343        info!(
344            "File decrypted successfully: {} bytes",
345            decrypted_data.len()
346        );
347
348        // Verify the size matches expectations
349        if decrypted_data.len() != stored_info.original_size() {
350            return Err(ClientError::FileStorage(format!(
351                "Decrypted file size mismatch: expected {}, got {}",
352                stored_info.original_size(),
353                decrypted_data.len()
354            )));
355        }
356
357        Ok(decrypted_data)
358    }
359
360    /// Check if a file exists in storage
361    ///
362    /// # Arguments
363    ///
364    /// * `stored_info` - Metadata from when the file was stored
365    ///
366    /// # Returns
367    ///
368    /// `true` if the file exists in storage, `false` otherwise
369    pub async fn has_file(&self, stored_info: &FileRef) -> Result<bool> {
370        self.blob_client
371            .has_blob(&stored_info.blob_hash)
372            .await
373            .map_err(Into::into)
374    }
375
376    /// Save retrieved file content to disk
377    ///
378    /// This is a convenience method that combines `retrieve_file` with writing to disk.
379    ///
380    /// # Arguments
381    ///
382    /// * `stored_info` - Metadata from when the file was stored
383    /// * `output_path` - Path where to write the retrieved file
384    ///
385    /// # Example
386    ///
387    /// ```rust,no_run
388    /// # use zoe_client::FileStorage;
389    /// # use std::path::Path;
390    /// # use tempfile::tempdir;
391    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
392    /// let temp_dir = tempdir()?;
393    /// let storage = FileStorage::new(temp_dir.path()).await?;
394    ///
395    /// # let stored_info = zoe_client::FileRef {
396    /// #     blob_hash: "example".to_string(),
397    /// #     encryption_info: zoe_encrypted_storage::ConvergentEncryptionInfo {
398    /// #         key: [0; 32],
399    /// #         was_compressed: false,
400    /// #         source_size: 100,
401    /// #     },
402    /// #     filename: Some("example.txt".to_string()),
403    /// #     content_type: None,
404    /// #     metadata: vec![],
405    /// # };
406    ///
407    /// let output_path = Path::new("/tmp/retrieved_file.txt");
408    /// storage.retrieve_file_to_disk(&stored_info, output_path).await?;
409    /// # Ok(())
410    /// # }
411    /// ```
412    pub async fn retrieve_file_to_disk(
413        &self,
414        stored_info: &FileRef,
415        output_path: &Path,
416    ) -> Result<()> {
417        let file_content = self.retrieve_file(stored_info).await?;
418
419        // Ensure parent directory exists
420        if let Some(parent) = output_path.parent() {
421            fs::create_dir_all(parent).await.map_err(|e| {
422                ClientError::FileStorage(format!(
423                    "Failed to create parent directory {}: {}",
424                    parent.display(),
425                    e
426                ))
427            })?;
428        }
429
430        fs::write(output_path, file_content).await.map_err(|e| {
431            ClientError::FileStorage(format!(
432                "Failed to write file {}: {}",
433                output_path.display(),
434                e
435            ))
436        })?;
437
438        info!("File retrieved and saved to: {}", output_path.display());
439        Ok(())
440    }
441
442    /// Get the underlying blob client for advanced operations
443    pub fn blob_client(&self) -> &BlobClient {
444        &self.blob_client
445    }
446}
447
#[cfg(test)]
impl FileStorage<MockBlobStore> {
    /// Build a file storage client for tests, backed by a mocked remote blob service.
    ///
    /// The mock accepts every upload and reports the BLAKE3 hash of the uploaded
    /// bytes; every remote `get_blob` call fails with `BlobError::NotFound`.
    /// Compression uses `CompressionConfig::default()`.
    pub async fn new_for_test(blob_storage_path: &Path) -> Result<Self> {
        let mut remote = MockBlobStore::new();

        // Uploads succeed and return a deterministic hash of the data.
        remote
            .expect_upload_blob()
            .returning(|data| Ok(BlobId::from(blake3::hash(data))));

        // Remote fetches always miss, regardless of the requested id.
        remote.expect_get_blob().returning(|_| {
            let hash = BlobId::from_content(b"test");
            Err(crate::services::BlobError::NotFound { hash })
        });

        Ok(Self {
            blob_client: BlobClient::new(blob_storage_path.to_path_buf()).await?,
            remote_blob_service: Arc::new(remote),
            compression_config: CompressionConfig::default(),
        })
    }
}
475
476#[cfg(test)]
477mod tests {
478    use super::*;
479    use tempfile::tempdir;
480    use tokio::fs::File;
481    use tokio::io::AsyncWriteExt;
482
483    #[tokio::test]
484    async fn test_store_and_retrieve_file() {
485        let temp_dir = tempdir().unwrap();
486        let storage = FileStorage::new_for_test(temp_dir.path()).await.unwrap();
487
488        // Create a temporary file with test content
489        let test_content = b"Hello, this is a test file for storage!";
490        let file_path = temp_dir.path().join("test_file.txt");
491        let mut file = File::create(&file_path).await.unwrap();
492        file.write_all(test_content).await.unwrap();
493        file.flush().await.unwrap();
494
495        // Store the file
496        let stored_info = storage.store_file(&file_path).await.unwrap();
497
498        // Verify metadata
499        assert_eq!(stored_info.original_size(), test_content.len());
500        assert_eq!(stored_info.filename(), Some("test_file.txt"));
501        assert!(stored_info.content_type.is_some());
502
503        // Retrieve the file
504        let retrieved_content = storage.retrieve_file(&stored_info).await.unwrap();
505
506        // Verify content matches
507        assert_eq!(retrieved_content, test_content);
508    }
509
510    #[tokio::test]
511    async fn test_store_and_retrieve_data() {
512        let temp_dir = tempdir().unwrap();
513        let storage = FileStorage::new_for_test(temp_dir.path()).await.unwrap();
514
515        let test_data = b"This is raw data without a file";
516        let reference_name = "test_data";
517        let content_type = Some("text/plain".to_string());
518
519        // Store raw data
520        let stored_info = storage
521            .store_data(test_data, reference_name, content_type.clone())
522            .await
523            .unwrap();
524
525        // Verify metadata
526        assert_eq!(stored_info.original_size(), test_data.len());
527        assert_eq!(stored_info.filename(), Some(reference_name));
528        assert_eq!(stored_info.content_type, content_type);
529
530        // Retrieve the data
531        let retrieved_data = storage.retrieve_file(&stored_info).await.unwrap();
532
533        // Verify content matches
534        assert_eq!(retrieved_data, test_data);
535    }
536
537    #[tokio::test]
538    async fn test_has_file() {
539        let temp_dir = tempdir().unwrap();
540        let storage = FileStorage::new_for_test(temp_dir.path()).await.unwrap();
541
542        let test_data = b"Test data for existence check";
543        let stored_info = storage
544            .store_data(test_data, "existence_test", None)
545            .await
546            .unwrap();
547
548        // Check that file exists
549        assert!(storage.has_file(&stored_info).await.unwrap());
550
551        // Create a fake stored info that doesn't exist in the current storage
552        // Use a different temp directory to generate a hash that won't exist in the main storage
553        let fake_temp_dir = tempdir().unwrap();
554        let fake_storage = FileStorage::new_for_test(fake_temp_dir.path())
555            .await
556            .unwrap();
557        let fake_data = b"different fake data for different hash";
558        let fake_info_temp = fake_storage
559            .store_data(fake_data, "fake_test", None)
560            .await
561            .unwrap();
562
563        let fake_info = FileRef::new(
564            fake_info_temp.blob_hash, // This hash exists in fake_storage, but not in main storage
565            fake_info_temp.encryption_info,
566            Some("fake_file.txt".to_string()),
567        );
568
569        // Check that fake file doesn't exist
570        assert!(!storage.has_file(&fake_info).await.unwrap());
571    }
572
573    #[tokio::test]
574    async fn test_retrieve_file_to_disk() {
575        let temp_dir = tempdir().unwrap();
576        let storage = FileStorage::new_for_test(temp_dir.path()).await.unwrap();
577
578        let test_content = b"Content to save to disk";
579        let stored_info = storage
580            .store_data(test_content, "disk_test", None)
581            .await
582            .unwrap();
583
584        // Retrieve to a new file
585        let output_path = temp_dir.path().join("output_file.txt");
586        storage
587            .retrieve_file_to_disk(&stored_info, &output_path)
588            .await
589            .unwrap();
590
591        // Verify the file was written correctly
592        let saved_content = fs::read(&output_path).await.unwrap();
593        assert_eq!(saved_content, test_content);
594    }
595
596    #[tokio::test]
597    async fn test_convergent_encryption_property() {
598        let temp_dir = tempdir().unwrap();
599        let storage = FileStorage::new_for_test(temp_dir.path()).await.unwrap();
600
601        let test_data = b"Same content for convergent test";
602
603        // Store the same data twice
604        let stored_info1 = storage
605            .store_data(test_data, "convergent1", None)
606            .await
607            .unwrap();
608        let stored_info2 = storage
609            .store_data(test_data, "convergent2", None)
610            .await
611            .unwrap();
612
613        // Should produce the same blob hash (convergent property)
614        assert_eq!(stored_info1.blob_hash, stored_info2.blob_hash);
615        assert_eq!(
616            stored_info1.encryption_info.key,
617            stored_info2.encryption_info.key
618        );
619    }
620
621    // We can't easily test with real BlobService due to connection dependencies,
622    // so these tests focus on the logic flow and error handling
623    #[tokio::test]
624    async fn test_file_storage_without_remote() {
625        let temp_dir = tempdir().unwrap();
626        let storage = FileStorage::new_for_test(temp_dir.path()).await.unwrap();
627
628        // Test that normal operations work without remote service
629        let test_data = b"Test data without remote";
630        let stored_info = storage
631            .store_data(test_data, "no_remote_test", None)
632            .await
633            .unwrap();
634
635        let retrieved_data = storage.retrieve_file(&stored_info).await.unwrap();
636        assert_eq!(retrieved_data, test_data);
637    }
638
639    #[tokio::test]
640    async fn test_remote_push_simulation() {
641        let temp_dir1 = tempdir().unwrap();
642        let temp_dir2 = tempdir().unwrap();
643
644        // Create two separate storages to simulate local and remote
645        let local_storage = FileStorage::new_for_test(temp_dir1.path()).await.unwrap();
646        let remote_storage = FileStorage::new_for_test(temp_dir2.path()).await.unwrap();
647
648        let test_data = b"Test data for remote push simulation";
649
650        // Store in local
651        let stored_info = local_storage
652            .store_data(test_data, "remote_push_test", None)
653            .await
654            .unwrap();
655
656        // Simulate remote push by storing the same encrypted data
657        let encrypted_data = {
658            let (encrypted, _) =
659                zoe_encrypted_storage::ConvergentEncryption::encrypt_with_compression_config(
660                    test_data,
661                    zoe_encrypted_storage::CompressionConfig::default(),
662                )
663                .unwrap();
664            encrypted
665        };
666
667        let remote_hash = remote_storage
668            .blob_client
669            .store_blob(encrypted_data)
670            .await
671            .unwrap();
672
673        // Verify the hashes should match (convergent encryption)
674        assert_eq!(stored_info.blob_hash, remote_hash);
675    }
676
677    #[tokio::test]
678    async fn test_remote_fetch_simulation() {
679        let temp_dir1 = tempdir().unwrap();
680        let temp_dir2 = tempdir().unwrap();
681
682        // Create storages
683        let storage1 = FileStorage::new_for_test(temp_dir1.path()).await.unwrap();
684        let storage2 = FileStorage::new_for_test(temp_dir2.path()).await.unwrap();
685
686        let test_data = b"Test data for remote fetch simulation";
687
688        // Store data in storage2 (simulating remote)
689        let stored_info = storage2
690            .store_data(test_data, "remote_fetch_test", None)
691            .await
692            .unwrap();
693
694        // Verify storage1 doesn't have it
695        assert!(!storage1.has_file(&stored_info).await.unwrap());
696
697        // Manually transfer blob (simulating remote fetch)
698        let encrypted_data = storage2
699            .blob_client
700            .get_blob(&stored_info.blob_hash)
701            .await
702            .unwrap()
703            .unwrap();
704        storage1
705            .blob_client
706            .store_blob(encrypted_data)
707            .await
708            .unwrap();
709
710        // Now storage1 should have it and be able to retrieve
711        assert!(storage1.has_file(&stored_info).await.unwrap());
712        let retrieved_data = storage1.retrieve_file(&stored_info).await.unwrap();
713        assert_eq!(retrieved_data, test_data);
714    }
715}