sheave_core/readers/
extended_timestamp.rs

1use std::{
2    future::Future,
3    io::Result as IOResult,
4    pin::Pin,
5    task::{
6        Context as FutureContext,
7        Poll
8    },
9    time::Duration
10};
11use futures::ready;
12use tokio::io::{
13    AsyncRead,
14    ReadBuf
15};
16
17#[doc(hidden)]
18#[derive(Debug)]
19pub struct ExtendedTimestampReader<'a, R: AsyncRead> {
20    reader: Pin<&'a mut R>
21}
22
23#[doc(hidden)]
24impl<R: AsyncRead> Future for ExtendedTimestampReader<'_, R> {
25    type Output = IOResult<Duration>;
26
27    fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
28        let mut extended_timestamp_bytes: [u8; 4] = [0; 4];
29        let mut buf = ReadBuf::new(&mut extended_timestamp_bytes);
30        ready!(self.reader.as_mut().poll_read(cx, &mut buf))?;
31        Poll::Ready(Ok(Duration::from_millis(u32::from_be_bytes(extended_timestamp_bytes) as u64)))
32    }
33}
34
35/// Reads an extended timestamp from streams.
36///
37/// # Examples
38///
39/// ```rust
40/// use std::{
41///     io::Result as IOResult,
42///     pin::pin,
43///     time::Duration
44/// };
45/// use rand::random;
46/// use sheave_core::readers::read_extended_timestamp;
47///
48/// #[tokio::main]
49/// async fn main() -> IOResult<()> {
50///     let extended_timestamp = Duration::from_millis(random::<u32>() as u64);
51///     let reader: [u8; 4] = (extended_timestamp.as_millis() as u32).to_be_bytes();
52///     let result = read_extended_timestamp(pin!(reader.as_slice())).await?;
53///     assert_eq!(extended_timestamp, result);
54///     Ok(())
55/// }
56/// ```
57pub fn read_extended_timestamp<R: AsyncRead>(reader: Pin<&mut R>) -> ExtendedTimestampReader<'_, R> {
58    ExtendedTimestampReader { reader }
59}
60
61#[cfg(test)]
62mod tests {
63    use std::pin::pin;
64    use rand::random;
65    use super::*;
66
67    #[tokio::test]
68    async fn read_extended_ts() {
69        let mut reader: [u8; 4] = [0; 4];
70        let extended_timestamp = random::<u32>();
71        reader.copy_from_slice(&extended_timestamp.to_be_bytes());
72        let result = read_extended_timestamp(pin!(reader.as_slice())).await;
73        assert!(result.is_ok());
74        assert_eq!(Duration::from_millis(extended_timestamp as u64), result.unwrap())
75    }
76}