sheave_core/writers/
message_header.rs

1use std::{
2    future::Future,
3    io::Result as IOResult,
4    pin::Pin,
5    task::{
6        Context as FutureContext,
7        Poll
8    },
9    time::Duration
10};
11use futures::ready;
12use tokio::io::AsyncWrite;
13use crate::{
14    U24_MAX,
15    messages::headers::{
16        MessageHeader,
17        New,
18        SameSource,
19        TimerChange,
20        MessageType
21    }
22};
23
24#[doc(hidden)]
25#[derive(Debug)]
26pub struct MessageHeaderWriter<'a, W: AsyncWrite> {
27    writer: Pin<&'a mut W>,
28    message_header: &'a MessageHeader
29}
30
31#[doc(hidden)]
32impl<W: AsyncWrite> MessageHeaderWriter<'_, W> {
33    fn write_timestamp(&mut self, cx: &mut FutureContext<'_>, timestamp: Duration) -> Poll<IOResult<()>> {
34        assert!(timestamp.as_millis() <= U24_MAX as u128);
35        self.writer.as_mut().poll_write(cx, &(timestamp.as_millis() as u32).to_be_bytes()[1..]).map_ok(|_| ())
36    }
37
38    fn write_message_length(&mut self, cx: &mut FutureContext<'_>, message_length: u32) -> Poll<IOResult<()>> {
39        assert!(message_length <= U24_MAX);
40        self.writer.as_mut().poll_write(cx, &message_length.to_be_bytes()[1..]).map_ok(|_| ())
41    }
42
43    fn write_message_type(&mut self, cx: &mut FutureContext<'_>, message_type: MessageType) -> Poll<IOResult<()>> {
44        self.writer.as_mut().poll_write(cx, &u8::from(message_type).to_be_bytes()).map_ok(|_| ())
45    }
46
47    fn write_message_id(&mut self, cx: &mut FutureContext<'_>, message_id: u32) -> Poll<IOResult<()>> {
48        self.writer.as_mut().poll_write(cx, &message_id.to_le_bytes()).map_ok(|_| ())
49    }
50
51    fn write_new(&mut self, cx: &mut FutureContext<'_>, new: &New) -> Poll<IOResult<()>> {
52        let (timestamp, message_length, message_type, message_id) = (*new).into();
53        ready!(self.write_timestamp(cx, timestamp))?;
54        ready!(self.write_message_length(cx, message_length))?;
55        ready!(self.write_message_type(cx, message_type))?;
56        ready!(self.write_message_id(cx, message_id))?;
57        Poll::Ready(Ok(()))
58    }
59
60    fn write_same_source(&mut self, cx: &mut FutureContext<'_>, same_source: &SameSource) -> Poll<IOResult<()>> {
61        let (timestamp, message_length, message_type) = (*same_source).into();
62        ready!(self.write_timestamp(cx, timestamp))?;
63        ready!(self.write_message_length(cx, message_length))?;
64        ready!(self.write_message_type(cx, message_type))?;
65        Poll::Ready(Ok(()))
66    }
67
68    fn write_timer_change(&mut self, cx: &mut FutureContext<'_>, timer_change: &TimerChange) -> Poll<IOResult<()>> {
69        ready!(self.write_timestamp(cx, (*timer_change).into()))?;
70        Poll::Ready(Ok(()))
71    }
72}
73
74#[doc(hidden)]
75impl<W: AsyncWrite> Future for MessageHeaderWriter<'_, W> {
76    type Output = IOResult<()>;
77
78    fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
79        match self.message_header {
80            &MessageHeader::New(ref new) => self.write_new(cx, new),
81            &MessageHeader::SameSource(ref same_source) => self.write_same_source(cx, same_source),
82            &MessageHeader::TimerChange(ref timer_change) => self.write_timer_change(cx, timer_change),
83            _ => Poll::Ready(Ok(()))
84        }
85    }
86}
87
88/// Writes a message header into streams.
89///
90/// # Panics
91///
92/// In the specification, timestamps and message lengths are defined as 3 bytes, therefore any value above `0x00ffffff` is emitted an assertion error.
93///
94/// # Examples
95///
96/// ```rust
97/// use std::{
98///     cmp::min,
99///     io::Result as IOResult,
100///     pin::{
101///         Pin,
102///         pin
103///     },
104///     time::Duration
105/// };
106/// use rand::random;
107/// use sheave_core::{
108///     messages::headers::{
109///         MessageHeader,
110///         MessageType
111///     },
112///     writers::write_message_header
113/// };
114///
115/// #[tokio::main]
116/// async fn main() -> IOResult<()> {
117///     // In case of 11 bytes.
118///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
119///     let timestamp = Duration::from_millis(min(0x00ffffff, random::<u32>()) as u64);
120///     let message_length = min(0x00ffffff, random::<u32>());
121///     let message_type: MessageType = random::<u8>().into();
122///     let message_id = random::<u32>();
123///     let message_header = MessageHeader::New((timestamp, message_length, message_type, message_id).into());
124///     write_message_header(writer.as_mut(), &message_header).await?;
125///     let mut written: [u8; 4] = [0; 4];
126///     written[1..].copy_from_slice(&writer[..3]);
127///     let timestamp = Duration::from_millis(u32::from_be_bytes(written) as u64);
128///     assert_eq!(timestamp, message_header.get_timestamp().unwrap());
129///     let mut written: [u8; 4] = [0; 4];
130///     written[1..].copy_from_slice(&writer[3..6]);
131///     let message_length = u32::from_be_bytes(written);
132///     assert_eq!(message_length, message_header.get_message_length().unwrap());
133///     let message_type: MessageType = writer[6].into();
134///     assert_eq!(message_type, message_header.get_message_type().unwrap());
135///     let mut written: [u8; 4] = [0; 4];
136///     written.copy_from_slice(&writer[7..]);
137///     let message_id = u32::from_le_bytes(written);
138///     assert_eq!(message_id, message_header.get_message_id().unwrap());
139///
140///     // In case of 7 bytes.
141///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
142///     let timestamp = Duration::from_millis(min(0x00ffffff, random::<u32>()) as u64);
143///     let message_length = min(0x00ffffff, random::<u32>());
144///     let message_type: MessageType = random::<u8>().into();
145///     let message_header = MessageHeader::SameSource((timestamp, message_length, message_type).into());
146///     write_message_header(writer.as_mut(), &message_header).await?;
147///     let mut written: [u8; 4] = [0; 4];
148///     written[1..].copy_from_slice(&writer[..3]);
149///     let timestamp = Duration::from_millis(u32::from_be_bytes(written) as u64);
150///     assert_eq!(timestamp, message_header.get_timestamp().unwrap());
151///     let mut written: [u8; 4] = [0; 4];
152///     written[1..].copy_from_slice(&writer[3..6]);
153///     let message_length = u32::from_be_bytes(written);
154///     assert_eq!(message_length, message_header.get_message_length().unwrap());
155///     let message_type: MessageType = writer[6].into();
156///     assert_eq!(message_type, message_header.get_message_type().unwrap());
157///
158///     // In case of 3 bytes.
159///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
160///     let timestamp = Duration::from_millis(min(0x00ffffff, random::<u32>()) as u64);
161///     let message_header = MessageHeader::TimerChange(timestamp.into());
162///     write_message_header(writer.as_mut(), &message_header).await?;
163///     let mut written: [u8; 4] = [0; 4];
164///     written[1..].copy_from_slice(&writer[..3]);
165///     let timestamp = Duration::from_millis(u32::from_be_bytes(written) as u64);
166///     assert_eq!(timestamp, message_header.get_timestamp().unwrap());
167///
168///     // In case of 0 bytes. (Continue)
169///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
170///     let message_header = MessageHeader::Continue;
171///     write_message_header(writer.as_mut(), &message_header).await?;
172///     assert!(writer.is_empty());
173///
174///     Ok(())
175/// }
176/// ```
177pub fn write_message_header<'a, W: AsyncWrite>(writer: Pin<&'a mut W>, message_header: &'a MessageHeader) -> MessageHeaderWriter<'a, W> {
178    MessageHeaderWriter { writer, message_header }
179}
180
#[cfg(test)]
mod tests {
    use std::{
        cmp::min,
        pin::pin
    };
    use rand::random;
    use super::*;

    /// Decodes a 3-byte big-endian unsigned integer, as used for timestamps and message lengths.
    fn read_u24_be(bytes: &[u8]) -> u32 {
        let mut padded: [u8; 4] = [0; 4];
        padded[1..].copy_from_slice(bytes);
        u32::from_be_bytes(padded)
    }

    /// A `New` header is written as timestamp (3) + message length (3) + message type (1) + message id (4).
    #[tokio::test]
    async fn write_new() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let timestamp = Duration::from_millis(min(U24_MAX, random::<u32>()) as u64);
        let message_length = min(U24_MAX, random::<u32>());
        let message_type = random::<u8>();
        let message_id = random::<u32>();
        let message_header = MessageHeader::New((timestamp, message_length, message_type.into(), message_id).into());
        let result = write_message_header(writer.as_mut(), &message_header).await;
        assert!(result.is_ok());
        // Pin the exact size so that missing, trailing or duplicated bytes are detected.
        assert_eq!(11, writer.len());
        let timestamp = Duration::from_millis(read_u24_be(&writer[..3]) as u64);
        assert_eq!(timestamp, message_header.get_timestamp().unwrap());
        let message_length = read_u24_be(&writer[3..6]);
        assert_eq!(message_length, message_header.get_message_length().unwrap());
        let message_type = writer[6];
        assert_eq!(MessageType::from(message_type), message_header.get_message_type().unwrap());
        let mut written: [u8; 4] = [0; 4];
        written.copy_from_slice(&writer[7..]);
        let message_id = u32::from_le_bytes(written);
        assert_eq!(message_id, message_header.get_message_id().unwrap())
    }

    /// A `SameSource` header is written as timestamp (3) + message length (3) + message type (1).
    #[tokio::test]
    async fn write_same_source() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let timestamp = Duration::from_millis(min(U24_MAX, random::<u32>()) as u64);
        let message_length = min(U24_MAX, random::<u32>());
        let message_type = random::<u8>();
        let message_header = MessageHeader::SameSource((timestamp, message_length, message_type.into()).into());
        let result = write_message_header(writer.as_mut(), &message_header).await;
        assert!(result.is_ok());
        // Pin the exact size so that missing, trailing or duplicated bytes are detected.
        assert_eq!(7, writer.len());
        let timestamp = Duration::from_millis(read_u24_be(&writer[..3]) as u64);
        assert_eq!(timestamp, message_header.get_timestamp().unwrap());
        let message_length = read_u24_be(&writer[3..6]);
        assert_eq!(message_length, message_header.get_message_length().unwrap());
        let message_type = writer[6];
        assert_eq!(MessageType::from(message_type), message_header.get_message_type().unwrap())
    }

    /// A `TimerChange` header is written as the 3-byte timestamp only.
    #[tokio::test]
    async fn write_timer_change() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let timestamp = Duration::from_millis(min(U24_MAX, random::<u32>()) as u64);
        let message_header = MessageHeader::TimerChange(timestamp.into());
        let result = write_message_header(writer.as_mut(), &message_header).await;
        assert!(result.is_ok());
        // Pin the exact size so that missing, trailing or duplicated bytes are detected.
        assert_eq!(3, writer.len());
        let timestamp = Duration::from_millis(read_u24_be(&writer[..3]) as u64);
        assert_eq!(timestamp, message_header.get_timestamp().unwrap())
    }

    /// A `Continue` header writes nothing.
    #[tokio::test]
    async fn write_continue() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let message_header = MessageHeader::Continue;
        write_message_header(writer.as_mut(), &message_header).await.unwrap();
        assert!(writer.is_empty())
    }
}