1use std::sync::Mutex;
9use std::time::{SystemTime, UNIX_EPOCH};
10
/// Largest lead, in milliseconds, that a remote timestamp may have over the
/// local wall clock before `Hlc::update` stops trusting its time component.
/// Works out to 24 hours.
const MAX_CLOCK_DRIFT_MS: u64 = 1000 * 60 * 60 * 24;
15
/// A point in time as issued by a clock: wall-clock milliseconds, a logical
/// counter, and the id of the device that produced it.
///
/// Comparison is derived, so two timestamps are ordered field by field in
/// declaration order: `millis` first, then `counter`, then `device_id`.
/// Do not reorder the fields without meaning to change that ordering.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Timestamp {
    /// Millisecond component (most significant for ordering).
    pub millis: u64,
    /// Logical counter distinguishing timestamps that share `millis`.
    pub counter: u16,
    /// Identifier of the issuing device; the final tie-breaker.
    pub device_id: String,
}
23
24impl Timestamp {
25 pub fn new(millis: u64, counter: u16, device_id: String) -> Self {
26 Self {
27 millis,
28 counter,
29 device_id,
30 }
31 }
32
33 pub fn parse(s: &str) -> Option<Self> {
35 let mut parts = s.splitn(3, '-');
36 let millis = parts.next()?.parse::<u64>().ok()?;
37 let counter = parts.next()?.parse::<u16>().ok()?;
38 let device_id = parts.next()?;
39 if device_id.is_empty() {
40 return None;
41 }
42 Some(Self {
43 millis,
44 counter,
45 device_id: device_id.to_string(),
46 })
47 }
48}
49
50impl std::fmt::Display for Timestamp {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 write!(
53 f,
54 "{:013}-{:04}-{}",
55 self.millis, self.counter, self.device_id
56 )
57 }
58}
59
/// Mutable part of an `Hlc`: the components of the most recent timestamp the
/// clock has produced. `Hlc` keeps it behind a mutex.
struct HlcState {
    /// Millisecond component of the last issued timestamp (starts at 0).
    millis: u64,
    /// Counter component of the last issued timestamp (starts at 0).
    counter: u16,
}
64
/// Clock that issues `Timestamp`s combining wall-clock milliseconds with a
/// logical counter (the name suggests a hybrid logical clock).
pub struct Hlc {
    /// Identifier copied into every timestamp this clock issues.
    device_id: String,
    /// Last issued millis/counter, behind a mutex so `now` and `update` can
    /// take `&self`.
    state: Mutex<HlcState>,
    /// Source of the current time in milliseconds. `new` installs
    /// `wall_clock_ms`; tests substitute their own closure.
    wall_clock: Box<dyn Fn() -> u64 + Send + Sync>,
}
75
76impl Hlc {
77 pub fn new(device_id: String) -> Self {
79 Self {
80 device_id,
81 state: Mutex::new(HlcState {
82 millis: 0,
83 counter: 0,
84 }),
85 wall_clock: Box::new(wall_clock_ms),
86 }
87 }
88
89 pub fn now(&self) -> Timestamp {
92 let wall = (self.wall_clock)();
93 let mut state = self.state.lock().unwrap();
94
95 if wall > state.millis {
96 state.millis = wall;
97 state.counter = 0;
98 } else {
99 state.counter += 1;
100 }
101
102 Timestamp::new(state.millis, state.counter, self.device_id.clone())
103 }
104
105 pub fn update(&self, remote: &Timestamp) -> Timestamp {
112 let wall = (self.wall_clock)();
113 let mut state = self.state.lock().unwrap();
114
115 let remote_millis = if remote.millis > wall + MAX_CLOCK_DRIFT_MS {
116 wall
119 } else {
120 remote.millis
121 };
122
123 if wall > state.millis && wall > remote_millis {
124 state.millis = wall;
126 state.counter = 0;
127 } else if remote_millis > state.millis {
128 state.millis = remote_millis;
130 state.counter = remote.counter + 1;
131 } else if state.millis > remote_millis {
132 state.counter += 1;
134 } else {
135 state.counter = state.counter.max(remote.counter) + 1;
137 }
138
139 Timestamp::new(state.millis, state.counter, self.device_id.clone())
140 }
141
142 #[cfg(test)]
143 fn with_wall_clock(device_id: String, clock: impl Fn() -> u64 + Send + Sync + 'static) -> Self {
144 Self {
145 device_id,
146 state: Mutex::new(HlcState {
147 millis: 0,
148 counter: 0,
149 }),
150 wall_clock: Box::new(clock),
151 }
152 }
153}
154
/// Reads the system clock as whole milliseconds since the UNIX epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
fn wall_clock_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch");
    // `as_millis` yields u128; the narrowing cast matches the `u64` used by
    // `Timestamp::millis`.
    since_epoch.as_millis() as u64
}
161
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Wall clock frozen at `ms`.
    fn fixed_clock(ms: u64) -> impl Fn() -> u64 + Send + Sync + 'static {
        move || ms
    }

    /// Wall clock backed by a shared atomic: storing into the returned handle
    /// changes what the clock closure reports.
    fn advancing_clock(start: u64) -> (Arc<AtomicU64>, impl Fn() -> u64 + Send + Sync + 'static) {
        let handle = Arc::new(AtomicU64::new(start));
        let reader = Arc::clone(&handle);
        (handle, move || reader.load(Ordering::SeqCst))
    }

    #[test]
    fn basic_monotonicity() {
        // Uses the real system clock; only relative order is checked.
        let hlc = Hlc::new("dev-1".into());
        let (t1, t2, t3) = (hlc.now(), hlc.now(), hlc.now());

        assert!(t2 > t1, "t2={t2} should be > t1={t1}");
        assert!(t3 > t2, "t3={t3} should be > t2={t2}");
    }

    #[test]
    fn counter_increments_when_clock_stalls() {
        let hlc = Hlc::with_wall_clock("dev-1".into(), fixed_clock(1000));

        // Three ticks against a frozen wall clock: counters 0, 1, 2.
        let issued: Vec<Timestamp> = (0..3).map(|_| hlc.now()).collect();
        for (expected_counter, ts) in issued.iter().enumerate() {
            assert_eq!(ts.millis, 1000);
            assert_eq!(usize::from(ts.counter), expected_counter);
        }

        assert!(issued[2] > issued[1]);
        assert!(issued[1] > issued[0]);
    }

    #[test]
    fn wall_clock_advance_resets_counter() {
        let (time, clock) = advancing_clock(1000);
        let hlc = Hlc::with_wall_clock("dev-1".into(), clock);

        let first = hlc.now();
        assert_eq!((first.millis, first.counter), (1000, 0));

        let stalled = hlc.now();
        assert_eq!(stalled.counter, 1);

        // Moving the wall clock forward must restart the counter.
        time.store(2000, Ordering::SeqCst);
        let advanced = hlc.now();
        assert_eq!((advanced.millis, advanced.counter), (2000, 0));

        assert!(advanced > stalled);
    }

    #[test]
    fn merge_with_remote_ahead() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(1000));
        let incoming = Timestamp::new(5000, 3, "dev-remote".into());

        // The remote is ahead, so the local clock continues right after it.
        let t = hlc.update(&incoming);
        assert_eq!(t, Timestamp::new(5000, 4, "dev-local".into()));

        let t2 = hlc.now();
        assert!(t2 > t, "t2={t2} should be > t={t}");
    }

    #[test]
    fn merge_with_remote_behind() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(5000));
        // Prime the clock so its state sits at the wall-clock reading.
        let _ = hlc.now();

        let stale = Timestamp::new(1000, 10, "dev-remote".into());
        let merged = hlc.update(&stale);

        assert_eq!(merged, Timestamp::new(5000, 1, "dev-local".into()));
    }

    #[test]
    fn merge_with_same_millis() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(3000));
        let _ = hlc.now();

        let peer = Timestamp::new(3000, 5, "dev-remote".into());
        let merged = hlc.update(&peer);

        // Tie on millis: the larger counter wins, plus one.
        assert_eq!((merged.millis, merged.counter), (3000, 6));
    }

    #[test]
    fn clock_skew_guard_rejects_far_future() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(1000));

        let beyond_limit = Timestamp::new(1000 + MAX_CLOCK_DRIFT_MS + 1, 0, "dev-remote".into());
        let merged = hlc.update(&beyond_limit);

        assert_eq!(merged.millis, 1000);
    }

    #[test]
    fn clock_skew_guard_accepts_near_future() {
        let hlc = Hlc::with_wall_clock("dev-local".into(), fixed_clock(1000));

        let near_future = 1000 + 60 * 60 * 1000;
        let merged = hlc.update(&Timestamp::new(near_future, 0, "dev-remote".into()));

        assert_eq!(merged.millis, near_future);
    }

    #[test]
    fn string_roundtrip() {
        let ts = Timestamp::new(1707580800000, 42, "dev-abc123".into());
        let encoded = ts.to_string();
        let decoded = Timestamp::parse(&encoded).expect("parse should succeed");

        assert_eq!(decoded, ts);
        assert_eq!(encoded, "1707580800000-0042-dev-abc123");
    }

    #[test]
    fn string_format_is_zero_padded() {
        let cases = [
            ((1000, 0), "0000000001000-0000-d"),
            ((9999999999999, 9999), "9999999999999-9999-d"),
        ];

        for &((millis, counter), expected) in cases.iter() {
            let ts = Timestamp::new(millis, counter, "d".into());
            assert_eq!(ts.to_string(), expected);
        }
    }

    #[test]
    fn lexicographic_ordering_matches_causal_ordering() {
        // Listed in ascending order; the rendered strings must sort the same way.
        let timestamps = [
            Timestamp::new(1000, 0, "dev-a".into()),
            Timestamp::new(1000, 1, "dev-a".into()),
            Timestamp::new(1000, 1, "dev-b".into()),
            Timestamp::new(2000, 0, "dev-a".into()),
            Timestamp::new(2000, 0, "dev-b".into()),
        ];

        let strings: Vec<String> = timestamps.iter().map(ToString::to_string).collect();

        for pair in strings.windows(2) {
            assert!(pair[1] > pair[0], "Expected {:?} > {:?}", pair[1], pair[0]);
        }
    }

    #[test]
    fn device_id_breaks_ties() {
        let ts_a = Timestamp::new(5000, 3, "aaa".into());
        let ts_b = Timestamp::new(5000, 3, "bbb".into());

        // Holds for the struct ordering and for the rendered strings alike.
        assert!(ts_b > ts_a);
        assert!(ts_b.to_string() > ts_a.to_string());
    }

    #[test]
    fn parse_rejects_invalid_input() {
        let malformed = [
            "",
            "not-a-timestamp",
            "1000-0000",
            "1000-0000-",
            "abc-0000-dev",
            "1000-xyz-dev",
        ];

        for input in malformed.iter() {
            assert!(Timestamp::parse(input).is_none());
        }
    }

    #[test]
    fn parse_handles_device_id_with_dashes() {
        let uuid = "550e8400-e29b-41d4-a716-446655440000";
        let ts = Timestamp::new(1000, 0, uuid.into());

        let parsed = Timestamp::parse(&ts.to_string()).expect("parse should handle UUID device_id");

        assert_eq!(parsed.device_id, "550e8400-e29b-41d4-a716-446655440000");
        assert_eq!(parsed, ts);
    }
}