1use async_trait::async_trait;
7
8use super::oauth_session::OAuthSession;
9use super::{CloudHome, CloudHomeError, CloudHomeJoinInfo};
10use crate::clock::ClockRef;
11use crate::keys::KeyService;
12use crate::oauth::{OAuthConfig, OAuthTokens};
13
14const DRIVE_API: &str = "https://www.googleapis.com/drive/v3";
15const UPLOAD_API: &str = "https://www.googleapis.com/upload/drive/v3";
16
17pub struct GoogleDriveCloudHome {
19 client: reqwest::Client,
20 folder_id: String,
21 session: OAuthSession,
22}
23
24impl GoogleDriveCloudHome {
25 pub fn new(
26 folder_id: String,
27 tokens: OAuthTokens,
28 key_service: KeyService,
29 clock: ClockRef,
30 ) -> Self {
31 Self {
32 client: reqwest::Client::new(),
33 folder_id,
34 session: OAuthSession::new(
35 tokens,
36 key_service,
37 clock,
38 Self::oauth_config(),
39 "Google Drive",
40 ),
41 }
42 }
43
44 pub fn oauth_config() -> OAuthConfig {
45 let creds = crate::oauth::oauth_client_creds("google_drive");
46 OAuthConfig {
47 client_id: creds.client_id,
48 client_secret: creds.client_secret,
49 auth_url: "https://accounts.google.com/o/oauth2/v2/auth".to_string(),
50 token_url: "https://oauth2.googleapis.com/token".to_string(),
51 scopes: vec!["https://www.googleapis.com/auth/drive.file".to_string()],
52 redirect_port: 19284,
53 extra_auth_params: vec![("access_type".to_string(), "offline".to_string())],
54 }
55 }
56
57 fn encode_key(key: &str) -> String {
60 key.replace('/', "__")
61 }
62
63 fn decode_key(filename: &str) -> String {
66 filename.replace("__", "/")
67 }
68
69 fn encode_prefix(prefix: &str) -> String {
72 prefix.replace('/', "__")
73 }
74
75 async fn api_call(
77 &self,
78 build_request: impl Fn(&str) -> reqwest::RequestBuilder,
79 ) -> Result<reqwest::Response, CloudHomeError> {
80 self.session.api_call(build_request).await
81 }
82
83 async fn find_file_id(&self, encoded_name: &str) -> Result<Option<String>, CloudHomeError> {
85 let query = format!(
86 "'{}' in parents and name = '{}' and trashed = false",
87 self.folder_id, encoded_name
88 );
89
90 let resp = self
91 .api_call(|token| {
92 self.client
93 .get(format!("{}/files", DRIVE_API))
94 .bearer_auth(token)
95 .query(&[
96 ("q", query.as_str()),
97 ("fields", "files(id)"),
98 ("pageSize", "1"),
99 ])
100 })
101 .await?;
102
103 let status = resp.status();
104 let body = resp
105 .text()
106 .await
107 .map_err(|e| CloudHomeError::Storage(format!("read body: {e}")))?;
108
109 if !status.is_success() {
110 return Err(CloudHomeError::Storage(format!(
111 "list files (HTTP {status}): {body}"
112 )));
113 }
114
115 let json: serde_json::Value = serde_json::from_str(&body)
116 .map_err(|e| CloudHomeError::Storage(format!("parse list response: {e}")))?;
117
118 if let Some(files) = json["files"].as_array() {
119 if let Some(first) = files.first() {
120 if let Some(id) = first["id"].as_str() {
121 return Ok(Some(id.to_string()));
122 }
123 }
124 }
125
126 Ok(None)
127 }
128}
129
130#[async_trait]
131impl CloudHome for GoogleDriveCloudHome {
132 async fn write(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError> {
133 let encoded = Self::encode_key(key);
134
135 if let Some(file_id) = self.find_file_id(&encoded).await? {
137 let resp = self
139 .api_call(|token| {
140 self.client
141 .patch(format!("{}/files/{}?uploadType=media", UPLOAD_API, file_id))
142 .bearer_auth(token)
143 .header("Content-Type", "application/octet-stream")
144 .body(data.clone())
145 })
146 .await?;
147
148 let status = resp.status();
149 if !status.is_success() {
150 let body = resp
151 .text()
152 .await
153 .unwrap_or_else(|e| format!("<body read failed: {e}>"));
154 return Err(CloudHomeError::Storage(format!(
155 "update {key} (HTTP {status}): {body}"
156 )));
157 }
158 } else {
159 let metadata = serde_json::json!({
161 "name": encoded,
162 "parents": [self.folder_id],
163 });
164
165 let boundary = "bae_multipart_boundary";
166 let mut body = Vec::new();
167
168 body.extend_from_slice(format!("--{boundary}\r\n").as_bytes());
170 body.extend_from_slice(b"Content-Type: application/json; charset=UTF-8\r\n\r\n");
171 body.extend_from_slice(metadata.to_string().as_bytes());
172 body.extend_from_slice(b"\r\n");
173
174 body.extend_from_slice(format!("--{boundary}\r\n").as_bytes());
176 body.extend_from_slice(b"Content-Type: application/octet-stream\r\n\r\n");
177 body.extend_from_slice(&data);
178 body.extend_from_slice(b"\r\n");
179
180 body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes());
182
183 let resp = self
184 .api_call(|token| {
185 self.client
186 .post(format!("{}/files?uploadType=multipart", UPLOAD_API))
187 .bearer_auth(token)
188 .header(
189 "Content-Type",
190 format!("multipart/related; boundary={boundary}"),
191 )
192 .body(body.clone())
193 })
194 .await?;
195
196 let status = resp.status();
197 if !status.is_success() {
198 let body = resp
199 .text()
200 .await
201 .unwrap_or_else(|e| format!("<body read failed: {e}>"));
202 return Err(CloudHomeError::Storage(format!(
203 "create {key} (HTTP {status}): {body}"
204 )));
205 }
206 }
207
208 Ok(())
209 }
210
211 async fn read(&self, key: &str) -> Result<Vec<u8>, CloudHomeError> {
212 let encoded = Self::encode_key(key);
213 let file_id = self
214 .find_file_id(&encoded)
215 .await?
216 .ok_or_else(|| CloudHomeError::NotFound(key.to_string()))?;
217
218 let resp = self
219 .api_call(|token| {
220 self.client
221 .get(format!("{}/files/{}", DRIVE_API, file_id))
222 .bearer_auth(token)
223 .query(&[("alt", "media")])
224 })
225 .await?;
226
227 let status = resp.status();
228 if status == reqwest::StatusCode::NOT_FOUND {
229 return Err(CloudHomeError::NotFound(key.to_string()));
230 }
231 if !status.is_success() {
232 let body = resp
233 .text()
234 .await
235 .unwrap_or_else(|e| format!("<body read failed: {e}>"));
236 return Err(CloudHomeError::Storage(format!(
237 "read {key} (HTTP {status}): {body}"
238 )));
239 }
240
241 let bytes = resp
242 .bytes()
243 .await
244 .map_err(|e| CloudHomeError::Storage(format!("read body for {key}: {e}")))?;
245
246 Ok(bytes.to_vec())
247 }
248
249 async fn read_range(&self, key: &str, start: u64, end: u64) -> Result<Vec<u8>, CloudHomeError> {
250 let encoded = Self::encode_key(key);
251 let file_id = self
252 .find_file_id(&encoded)
253 .await?
254 .ok_or_else(|| CloudHomeError::NotFound(key.to_string()))?;
255
256 let range = format!("bytes={}-{}", start, end.saturating_sub(1));
257
258 let resp = self
259 .api_call(|token| {
260 self.client
261 .get(format!("{}/files/{}", DRIVE_API, file_id))
262 .bearer_auth(token)
263 .query(&[("alt", "media")])
264 .header("Range", &range)
265 })
266 .await?;
267
268 let status = resp.status();
269 if status == reqwest::StatusCode::NOT_FOUND {
270 return Err(CloudHomeError::NotFound(key.to_string()));
271 }
272 if !status.is_success() && status != reqwest::StatusCode::PARTIAL_CONTENT {
273 let body = resp
274 .text()
275 .await
276 .unwrap_or_else(|e| format!("<body read failed: {e}>"));
277 return Err(CloudHomeError::Storage(format!(
278 "read range {key} (HTTP {status}): {body}"
279 )));
280 }
281
282 let bytes = resp
283 .bytes()
284 .await
285 .map_err(|e| CloudHomeError::Storage(format!("read range body for {key}: {e}")))?;
286
287 Ok(bytes.to_vec())
288 }
289
290 async fn list(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError> {
291 let encoded_prefix = Self::encode_prefix(prefix);
292
293 let query = format!(
295 "'{}' in parents and name contains '{}' and trashed = false",
296 self.folder_id, encoded_prefix
297 );
298
299 let mut all_keys = Vec::new();
300 let mut page_token: Option<String> = None;
301
302 loop {
303 let query_ref = query.clone();
304 let page_ref = page_token.clone();
305
306 let resp = self
307 .api_call(|token| {
308 let mut req = self
309 .client
310 .get(format!("{}/files", DRIVE_API))
311 .bearer_auth(token)
312 .query(&[
313 ("q", query_ref.as_str()),
314 ("fields", "nextPageToken,files(name)"),
315 ("pageSize", "1000"),
316 ]);
317 if let Some(ref pt) = page_ref {
318 req = req.query(&[("pageToken", pt.as_str())]);
319 }
320 req
321 })
322 .await?;
323
324 let status = resp.status();
325 let body = resp
326 .text()
327 .await
328 .map_err(|e| CloudHomeError::Storage(format!("read body: {e}")))?;
329
330 if !status.is_success() {
331 return Err(CloudHomeError::Storage(format!(
332 "list {prefix} (HTTP {status}): {body}"
333 )));
334 }
335
336 let json: serde_json::Value = serde_json::from_str(&body)
337 .map_err(|e| CloudHomeError::Storage(format!("parse list: {e}")))?;
338
339 if let Some(files) = json["files"].as_array() {
340 for file in files {
341 if let Some(name) = file["name"].as_str() {
342 let decoded = Self::decode_key(name);
343 if decoded.starts_with(prefix) {
345 all_keys.push(decoded);
346 }
347 }
348 }
349 }
350
351 if let Some(next) = json["nextPageToken"].as_str() {
352 page_token = Some(next.to_string());
353 } else {
354 break;
355 }
356 }
357
358 Ok(all_keys)
359 }
360
361 async fn delete(&self, key: &str) -> Result<(), CloudHomeError> {
362 let encoded = Self::encode_key(key);
363
364 if let Some(file_id) = self.find_file_id(&encoded).await? {
365 let resp = self
366 .api_call(|token| {
367 self.client
368 .delete(format!("{}/files/{}", DRIVE_API, file_id))
369 .bearer_auth(token)
370 })
371 .await?;
372
373 let status = resp.status();
374 if !status.is_success() && status != reqwest::StatusCode::NOT_FOUND {
376 let body = resp
377 .text()
378 .await
379 .unwrap_or_else(|e| format!("<body read failed: {e}>"));
380 return Err(CloudHomeError::Storage(format!(
381 "delete {key} (HTTP {status}): {body}"
382 )));
383 }
384 }
385
386 Ok(())
387 }
388
389 async fn exists(&self, key: &str) -> Result<bool, CloudHomeError> {
390 let encoded = Self::encode_key(key);
391 Ok(self.find_file_id(&encoded).await?.is_some())
392 }
393
394 async fn grant_access(&self, member_id: &str) -> Result<CloudHomeJoinInfo, CloudHomeError> {
395 let permission = serde_json::json!({
397 "type": "user",
398 "role": "writer",
399 "emailAddress": member_id,
400 });
401
402 let resp = self
403 .api_call(|token| {
404 self.client
405 .post(format!(
406 "{}/files/{}/permissions",
407 DRIVE_API, self.folder_id
408 ))
409 .bearer_auth(token)
410 .json(&permission)
411 })
412 .await?;
413
414 let status = resp.status();
415 if !status.is_success() {
416 let body = resp
417 .text()
418 .await
419 .unwrap_or_else(|e| format!("<body read failed: {e}>"));
420 return Err(CloudHomeError::Storage(format!(
421 "grant access to {member_id} (HTTP {status}): {body}"
422 )));
423 }
424
425 Ok(CloudHomeJoinInfo::GoogleDrive {
426 folder_id: self.folder_id.clone(),
427 })
428 }
429
430 async fn revoke_access(&self, member_id: &str) -> Result<(), CloudHomeError> {
431 let resp = self
433 .api_call(|token| {
434 self.client
435 .get(format!(
436 "{}/files/{}/permissions",
437 DRIVE_API, self.folder_id
438 ))
439 .bearer_auth(token)
440 .query(&[("fields", "permissions(id,emailAddress)")])
441 })
442 .await?;
443
444 let status = resp.status();
445 let body = resp
446 .text()
447 .await
448 .map_err(|e| CloudHomeError::Storage(format!("read body: {e}")))?;
449
450 if !status.is_success() {
451 return Err(CloudHomeError::Storage(format!(
452 "list permissions (HTTP {status}): {body}"
453 )));
454 }
455
456 let json: serde_json::Value = serde_json::from_str(&body)
457 .map_err(|e| CloudHomeError::Storage(format!("parse permissions: {e}")))?;
458
459 let permission_id = json["permissions"]
460 .as_array()
461 .and_then(|perms| {
462 perms.iter().find_map(|p| {
463 if p["emailAddress"].as_str() == Some(member_id) {
464 p["id"].as_str().map(|s| s.to_string())
465 } else {
466 None
467 }
468 })
469 })
470 .ok_or_else(|| {
471 CloudHomeError::Storage(format!("no permission found for {member_id}"))
472 })?;
473
474 let resp = self
476 .api_call(|token| {
477 self.client
478 .delete(format!(
479 "{}/files/{}/permissions/{}",
480 DRIVE_API, self.folder_id, permission_id
481 ))
482 .bearer_auth(token)
483 })
484 .await?;
485
486 let status = resp.status();
487 if !status.is_success() && status != reqwest::StatusCode::NOT_FOUND {
488 let body = resp
489 .text()
490 .await
491 .unwrap_or_else(|e| format!("<body read failed: {e}>"));
492 return Err(CloudHomeError::Storage(format!(
493 "revoke access for {member_id} (HTTP {status}): {body}"
494 )));
495 }
496
497 Ok(())
498 }
499}
500
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the type whose key-mapping helpers are under test.
    type Home = GoogleDriveCloudHome;

    /// Keys that contain no `__` and therefore must survive a round trip.
    const ROUNDTRIP_KEYS: [&str; 4] = [
        "snapshot.db.enc",
        "changes/device-abc/1.enc",
        "heads/device-abc.json.enc",
        "images/cover.jpg",
    ];

    /// Every `/` in a key becomes `__` in the file name.
    #[test]
    fn encode_key_replaces_slashes() {
        let encoded = Home::encode_key("changes/dev1/42.enc");
        assert_eq!(encoded, "changes__dev1__42.enc");
    }

    /// Every `__` in a file name becomes `/` in the key.
    #[test]
    fn decode_key_restores_slashes() {
        let decoded = Home::decode_key("changes__dev1__42.enc");
        assert_eq!(decoded, "changes/dev1/42.enc");
    }

    /// Decoding an encoded key gives back the original key.
    #[test]
    fn encode_decode_roundtrip() {
        for key in ROUNDTRIP_KEYS.iter() {
            let round_tripped = Home::decode_key(&Home::encode_key(key));
            assert_eq!(round_tripped, *key);
        }
    }

    /// Prefixes are encoded with the same mapping as full keys.
    #[test]
    fn encode_prefix_for_query() {
        let encoded = Home::encode_prefix("changes/dev1/");
        assert_eq!(encoded, "changes__dev1__");
    }
}