coven/storage/cloud/
s3.rs

//! S3-backed `CloudHome` implementation.
//!
//! Wraps `aws-sdk-s3` to provide raw storage operations against any
//! S3-compatible endpoint.
5
6use async_trait::async_trait;
7use aws_config::{BehaviorVersion, Region};
8use aws_credential_types::Credentials;
9use aws_sdk_s3::Client;
10
11use super::{CloudHome, CloudHomeError, CloudHomeJoinInfo};
12
13/// S3-backed cloud home.
14pub struct S3CloudHome {
15    client: Client,
16    bucket: String,
17    region: String,
18    endpoint: Option<String>,
19    access_key: String,
20    secret_key: String,
21    key_prefix: Option<String>,
22}
23
24impl S3CloudHome {
25    pub async fn new(
26        bucket: String,
27        region: String,
28        endpoint: Option<String>,
29        access_key: String,
30        secret_key: String,
31        key_prefix: Option<String>,
32    ) -> Result<Self, CloudHomeError> {
33        let credentials = Credentials::new(&access_key, &secret_key, None, None, "bae-cloud-home");
34
35        // aws-config has default-features disabled, so the SDK won't auto-bundle
36        // an HTTP client. Plug in the rustls-ring smithy client explicitly.
37        let http_client = aws_smithy_http_client::Builder::new()
38            .tls_provider(aws_smithy_http_client::tls::Provider::Rustls(
39                aws_smithy_http_client::tls::rustls_provider::CryptoMode::Ring,
40            ))
41            .build_https();
42
43        let mut builder = aws_config::defaults(BehaviorVersion::latest())
44            .region(Region::new(region.clone()))
45            .credentials_provider(credentials)
46            .http_client(http_client);
47
48        if let Some(ref ep) = endpoint {
49            builder = builder.endpoint_url(ep.trim_end_matches('/'));
50        }
51
52        let aws_config = builder.load().await;
53        let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
54            .force_path_style(true)
55            .build();
56        let client = Client::from_conf(s3_config);
57
58        Ok(S3CloudHome {
59            client,
60            bucket,
61            region,
62            endpoint,
63            access_key,
64            secret_key,
65            key_prefix,
66        })
67    }
68
69    /// Prepend the key prefix (if configured) to produce the full S3 object key.
70    fn full_key(&self, key: &str) -> String {
71        apply_prefix(self.key_prefix.as_deref(), key)
72    }
73}
74
/// Prepend an optional prefix to a key.
///
/// Trailing slashes on the prefix are trimmed so that exactly one `/` separates
/// prefix and key. A prefix that is empty after trimming (`""`, `"/"`, ...) is
/// treated like `None`: the key is returned unchanged instead of gaining a
/// leading `/`, which would address a different object than the bare key.
fn apply_prefix(prefix: Option<&str>, key: &str) -> String {
    match prefix.map(|p| p.trim_end_matches('/')) {
        Some(p) if !p.is_empty() => format!("{p}/{key}"),
        _ => key.to_string(),
    }
}
82
83#[async_trait]
84impl CloudHome for S3CloudHome {
85    /// HeadBucket — cheap auth + existence check, no listing cost.
86    async fn probe(&self) -> Result<(), CloudHomeError> {
87        use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
88        use aws_sdk_s3::operation::head_bucket::HeadBucketError;
89
90        match self.client.head_bucket().bucket(&self.bucket).send().await {
91            Ok(_) => Ok(()),
92            Err(SdkError::ServiceError(svc)) => {
93                let status = svc.raw().status().as_u16();
94                let code: Option<String> = svc.err().code().map(str::to_string);
95                let bucket = self.bucket.clone();
96                match svc.into_err() {
97                    HeadBucketError::NotFound(_) => Err(CloudHomeError::Storage(format!(
98                        "bucket {bucket:?} does not exist"
99                    ))),
100                    other => {
101                        let is_auth = status == 403
102                            || matches!(
103                                code.as_deref(),
104                                Some("SignatureDoesNotMatch") | Some("InvalidAccessKeyId")
105                            );
106                        if is_auth {
107                            Err(CloudHomeError::Storage(format!(
108                                "S3 credentials rejected (status {status}, code {code:?})"
109                            )))
110                        } else {
111                            Err(CloudHomeError::Storage(format!(
112                                "S3 probe failed (status {status}, code {code:?}): {other}"
113                            )))
114                        }
115                    }
116                }
117            }
118            Err(e) => Err(CloudHomeError::Storage(format!("S3 probe failed: {e}"))),
119        }
120    }
121
122    async fn write(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError> {
123        let full = self.full_key(key);
124        self.client
125            .put_object()
126            .bucket(&self.bucket)
127            .key(&full)
128            .body(data.into())
129            .send()
130            .await
131            .map_err(|e| CloudHomeError::Storage(format!("put {key}: {e}")))?;
132        Ok(())
133    }
134
135    async fn read(&self, key: &str) -> Result<Vec<u8>, CloudHomeError> {
136        let full = self.full_key(key);
137        let resp = self
138            .client
139            .get_object()
140            .bucket(&self.bucket)
141            .key(&full)
142            .send()
143            .await
144            .map_err(|e| {
145                let msg = format!("{e}");
146                if msg.contains("NoSuchKey") || msg.contains("not found") || msg.contains("404") {
147                    CloudHomeError::NotFound(key.to_string())
148                } else {
149                    CloudHomeError::Storage(format!("get {key}: {e}"))
150                }
151            })?;
152
153        let bytes = resp
154            .body
155            .collect()
156            .await
157            .map_err(|e| CloudHomeError::Storage(format!("read body for {key}: {e}")))?
158            .into_bytes()
159            .to_vec();
160
161        Ok(bytes)
162    }
163
164    async fn read_range(&self, key: &str, start: u64, end: u64) -> Result<Vec<u8>, CloudHomeError> {
165        let full = self.full_key(key);
166        let range = format!("bytes={start}-{}", end.saturating_sub(1));
167        let resp = self
168            .client
169            .get_object()
170            .bucket(&self.bucket)
171            .key(&full)
172            .range(range)
173            .send()
174            .await
175            .map_err(|e| {
176                let msg = format!("{e}");
177                if msg.contains("NoSuchKey") || msg.contains("not found") || msg.contains("404") {
178                    CloudHomeError::NotFound(key.to_string())
179                } else {
180                    CloudHomeError::Storage(format!("get range {key}: {e}"))
181                }
182            })?;
183
184        let bytes = resp
185            .body
186            .collect()
187            .await
188            .map_err(|e| CloudHomeError::Storage(format!("read range body for {key}: {e}")))?
189            .into_bytes()
190            .to_vec();
191
192        Ok(bytes)
193    }
194
195    async fn list(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError> {
196        let full_prefix = self.full_key(prefix);
197        let strip_prefix = self
198            .key_prefix
199            .as_ref()
200            .map(|p| format!("{}/", p.trim_end_matches('/')));
201
202        let mut keys = Vec::new();
203        let mut continuation_token: Option<String> = None;
204
205        loop {
206            let mut req = self
207                .client
208                .list_objects_v2()
209                .bucket(&self.bucket)
210                .prefix(&full_prefix);
211
212            if let Some(token) = continuation_token.take() {
213                req = req.continuation_token(token);
214            }
215
216            let resp = req
217                .send()
218                .await
219                .map_err(|e| CloudHomeError::Storage(format!("list {prefix}: {e}")))?;
220
221            for obj in resp.contents() {
222                if let Some(key) = obj.key() {
223                    let stripped = match &strip_prefix {
224                        Some(p) => key.strip_prefix(p.as_str()).unwrap_or(key),
225                        None => key,
226                    };
227                    keys.push(stripped.to_string());
228                }
229            }
230
231            if resp.is_truncated() == Some(true) {
232                continuation_token = resp.next_continuation_token().map(|s| s.to_string());
233            } else {
234                break;
235            }
236        }
237
238        Ok(keys)
239    }
240
241    async fn delete(&self, key: &str) -> Result<(), CloudHomeError> {
242        let full = self.full_key(key);
243        self.client
244            .delete_object()
245            .bucket(&self.bucket)
246            .key(&full)
247            .send()
248            .await
249            .map_err(|e| CloudHomeError::Storage(format!("delete {key}: {e}")))?;
250        Ok(())
251    }
252
253    async fn exists(&self, key: &str) -> Result<bool, CloudHomeError> {
254        let full = self.full_key(key);
255        match self
256            .client
257            .head_object()
258            .bucket(&self.bucket)
259            .key(&full)
260            .send()
261            .await
262        {
263            Ok(_) => Ok(true),
264            Err(e) => {
265                let msg = format!("{e}");
266                if msg.contains("NotFound")
267                    || msg.contains("not found")
268                    || msg.contains("404")
269                    || msg.contains("NoSuchKey")
270                {
271                    Ok(false)
272                } else {
273                    Err(CloudHomeError::Storage(format!("head {key}: {e}")))
274                }
275            }
276        }
277    }
278
279    async fn grant_access(&self, _member_id: &str) -> Result<CloudHomeJoinInfo, CloudHomeError> {
280        // S3 access is managed externally (IAM/pre-shared credentials).
281        // Return the owner's credentials so they can be embedded in the invite code.
282        Ok(CloudHomeJoinInfo::S3 {
283            bucket: self.bucket.clone(),
284            region: self.region.clone(),
285            endpoint: self.endpoint.clone(),
286            access_key: self.access_key.clone(),
287            secret_key: self.secret_key.clone(),
288            key_prefix: self.key_prefix.clone(),
289        })
290    }
291
292    async fn revoke_access(&self, _member_id: &str) -> Result<(), CloudHomeError> {
293        // S3 access is managed externally (IAM/pre-shared credentials).
294        Ok(())
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    // ── key prefix handling ─────────────────────────────────────────────

    #[test]
    fn full_key_prepends_prefix() {
        assert_eq!(
            apply_prefix(Some("libs/abc"), "heads/dev1.json"),
            "libs/abc/heads/dev1.json"
        );
    }

    #[test]
    fn full_key_no_prefix() {
        assert_eq!(apply_prefix(None, "heads/dev1.json"), "heads/dev1.json");
    }

    #[test]
    fn full_key_strips_trailing_slash() {
        assert_eq!(
            apply_prefix(Some("libs/abc/"), "heads/dev1.json"),
            "libs/abc/heads/dev1.json"
        );
    }

    // ── probe() against a real S3 endpoint ──────────────────────────────
    //
    // The tests below need a minio (or any other S3-compatible server) reachable
    // at `BAE_TEST_S3_URL` (default http://localhost:19000) that accepts the
    // credentials in `BAE_TEST_S3_KEY` / `BAE_TEST_S3_SECRET`
    // (default baetest / baetestpass).
    // They are `#[ignore]`d so a plain `cargo test` skips them; run them with
    // `cargo test -- --ignored` once an endpoint is available.

    /// Region name used for every test client.
    const TEST_REGION: &str = "us-east-1";

    /// Look up a test env var, falling back to `default` when it is unset.
    ///
    /// An unset variable is the expected case and silently yields the default.
    /// A variable that is set but not valid UTF-8 panics, so a misconfigured
    /// environment fails loudly instead of quietly using the default.
    fn test_env(name: &str, default: &str) -> String {
        use std::env::VarError;

        std::env::var(name).unwrap_or_else(|err| match err {
            VarError::NotPresent => default.to_string(),
            VarError::NotUnicode(raw) => panic!("test env var {name} is non-utf8: {raw:?}"),
        })
    }

    /// Endpoint and credentials of the S3 server under test.
    struct TestCreds {
        endpoint: String,
        access_key: String,
        secret_key: String,
    }

    /// Resolve the test endpoint and credentials from the environment.
    fn test_creds() -> TestCreds {
        let endpoint = test_env("BAE_TEST_S3_URL", "http://localhost:19000");
        let access_key = test_env("BAE_TEST_S3_KEY", "baetest");
        let secret_key = test_env("BAE_TEST_S3_SECRET", "baetestpass");
        TestCreds {
            endpoint,
            access_key,
            secret_key,
        }
    }

    /// Construct an `S3CloudHome` for `bucket` on the test endpoint, using the
    /// access key from `creds` and the given `secret_key`, with no key prefix.
    async fn open_home(
        bucket: &str,
        creds: &TestCreds,
        secret_key: &str,
    ) -> Result<S3CloudHome, CloudHomeError> {
        S3CloudHome::new(
            bucket.to_string(),
            TEST_REGION.to_string(),
            Some(creds.endpoint.clone()),
            creds.access_key.clone(),
            secret_key.to_string(),
            None,
        )
        .await
    }

    /// Create the bucket that `home` is configured for.
    async fn provision_test_bucket(home: &S3CloudHome) {
        let request = home.client.create_bucket().bucket(&home.bucket);
        request.send().await.expect("create test bucket");
    }

    #[tokio::test]
    #[ignore]
    async fn probe_succeeds_against_existing_bucket() {
        let creds = test_creds();
        let bucket = format!("bae-probe-ok-{}", uuid::Uuid::new_v4());

        let home = open_home(&bucket, &creds, &creds.secret_key)
            .await
            .expect("construct S3CloudHome");
        provision_test_bucket(&home).await;

        home.probe().await.expect("probe should succeed");
    }

    #[tokio::test]
    #[ignore]
    async fn probe_fails_for_missing_bucket() {
        let creds = test_creds();
        let bucket = format!("bae-probe-missing-{}", uuid::Uuid::new_v4());

        // The bucket is intentionally never created.
        let home = open_home(&bucket, &creds, &creds.secret_key)
            .await
            .expect("construct S3CloudHome");

        let err = home
            .probe()
            .await
            .expect_err("probe should fail for a missing bucket");
        let msg = err.to_string();
        let recognised = ["does not exist", "NoSuchBucket", "404"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(recognised, "expected missing-bucket error, got: {msg}");
    }

    #[tokio::test]
    #[ignore]
    async fn probe_fails_for_bad_secret_key() {
        let creds = test_creds();
        let bucket = format!("bae-probe-badkey-{}", uuid::Uuid::new_v4());

        // Create the bucket with valid credentials first, so the wrong secret is
        // the only thing that differs for the second client.
        let good = open_home(&bucket, &creds, &creds.secret_key)
            .await
            .expect("construct good S3CloudHome");
        provision_test_bucket(&good).await;

        let bad = open_home(&bucket, &creds, "wrong-secret")
            .await
            .expect("construct bad S3CloudHome");

        let err = bad
            .probe()
            .await
            .expect_err("probe should fail for bad credentials");
        let msg = err.to_string();
        let recognised = ["rejected", "403", "SignatureDoesNotMatch"]
            .iter()
            .any(|needle| msg.contains(*needle));
        assert!(recognised, "expected credentials error, got: {msg}");
    }
}