sheave_core/readers/
extended_timestamp.rs1use std::{
2 future::Future,
3 io::Result as IOResult,
4 pin::Pin,
5 task::{
6 Context as FutureContext,
7 Poll
8 },
9 time::Duration
10};
11use futures::ready;
12use tokio::io::{
13 AsyncRead,
14 ReadBuf
15};
16
17#[doc(hidden)]
18#[derive(Debug)]
19pub struct ExtendedTimestampReader<'a, R: AsyncRead> {
20 reader: Pin<&'a mut R>
21}
22
23#[doc(hidden)]
24impl<R: AsyncRead> Future for ExtendedTimestampReader<'_, R> {
25 type Output = IOResult<Duration>;
26
27 fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
28 let mut extended_timestamp_bytes: [u8; 4] = [0; 4];
29 let mut buf = ReadBuf::new(&mut extended_timestamp_bytes);
30 ready!(self.reader.as_mut().poll_read(cx, &mut buf))?;
31 Poll::Ready(Ok(Duration::from_millis(u32::from_be_bytes(extended_timestamp_bytes) as u64)))
32 }
33}
34
35pub fn read_extended_timestamp<R: AsyncRead>(reader: Pin<&mut R>) -> ExtendedTimestampReader<'_, R> {
58 ExtendedTimestampReader { reader }
59}
60
61#[cfg(test)]
62mod tests {
63 use std::pin::pin;
64 use rand::random;
65 use super::*;
66
67 #[tokio::test]
68 async fn read_extended_ts() {
69 let mut reader: [u8; 4] = [0; 4];
70 let extended_timestamp = random::<u32>();
71 reader.copy_from_slice(&extended_timestamp.to_be_bytes());
72 let result = read_extended_timestamp(pin!(reader.as_slice())).await;
73 assert!(result.is_ok());
74 assert_eq!(Duration::from_millis(extended_timestamp as u64), result.unwrap())
75 }
76}