1use std::path::Path;
8
9use tracing::warn;
10
11use crate::blob::BlobUploadObserver;
12use crate::db::SyncBookkeeping;
13use crate::encryption::EncryptionService;
14use crate::storage::cloud::CloudHome;
15
16pub async fn process_uploads(
22 db: &dyn SyncBookkeeping,
23 cloud_home: &dyn CloudHome,
24 encryption: &std::sync::RwLock<EncryptionService>,
25 library_dir: &Path,
26 observer: Option<&dyn BlobUploadObserver>,
27) -> Result<usize, String> {
28 let uploads = db
29 .get_pending_cloud_uploads()
30 .await
31 .map_err(|e| format!("Failed to get pending uploads: {e}"))?;
32
33 let mut count = 0;
34 for entry in uploads {
35 let file_path = match &entry.source_path {
36 Some(p) => std::path::PathBuf::from(p),
37 None => library_dir.join(crate::storage::local::storage_path(&entry.file_id)),
38 };
39
40 let data = match tokio::fs::read(&file_path).await {
41 Ok(d) => d,
42 Err(e) => {
43 warn!(
44 "Upload failed: cannot read local file {}: {e}",
45 file_path.display()
46 );
47 break;
48 }
49 };
50
51 let encrypted = {
52 let enc = encryption.read().unwrap();
53 enc.encrypt(&data)
54 };
55
56 match cloud_home.write(&entry.cloud_key, encrypted).await {
57 Ok(()) => {
58 if let Err(e) = db.remove_cloud_outbox_entry(entry.id).await {
59 warn!("Failed to remove outbox entry {}: {e}", entry.id);
60 }
61 count += 1;
62
63 if let Some(obs) = observer {
64 obs.on_blob_uploaded(&entry.file_id).await;
65 }
66 }
67 Err(e) => {
68 warn!("Upload failed for {}: {e}", entry.cloud_key);
69 break;
70 }
71 }
72 }
73
74 Ok(count)
75}
76
77pub async fn process_deletes(
82 db: &dyn SyncBookkeeping,
83 cloud_home: &dyn CloudHome,
84 device_head_seqs: &[u64],
85) -> Result<usize, String> {
86 let deletes = db
87 .get_pending_cloud_deletes()
88 .await
89 .map_err(|e| format!("Failed to get pending deletes: {e}"))?;
90
91 if deletes.is_empty() {
92 return Ok(0);
93 }
94
95 let min_head = device_head_seqs.iter().copied().min();
96
97 let mut count = 0;
98 for entry in deletes {
99 if let Some(min_seq) = entry.min_seq {
100 if let Some(head) = min_head {
101 if head <= min_seq {
102 continue;
103 }
104 }
105 }
106
107 match cloud_home.delete(&entry.cloud_key).await {
108 Ok(()) => {
109 if let Err(e) = db.remove_cloud_outbox_entry(entry.id).await {
110 warn!("Failed to remove outbox entry {}: {e}", entry.id);
111 }
112 count += 1;
113 }
114 Err(e) => {
115 warn!("Delete failed for {}: {e}", entry.cloud_key);
116 }
118 }
119 }
120
121 Ok(count)
122}