sheave_core/writers/
message_header.rs1use std::{
2 future::Future,
3 io::Result as IOResult,
4 pin::Pin,
5 task::{
6 Context as FutureContext,
7 Poll
8 },
9 time::Duration
10};
11use futures::ready;
12use tokio::io::AsyncWrite;
13use crate::{
14 U24_MAX,
15 messages::headers::{
16 MessageHeader,
17 New,
18 SameSource,
19 TimerChange,
20 MessageType
21 }
22};
23
24#[doc(hidden)]
25#[derive(Debug)]
26pub struct MessageHeaderWriter<'a, W: AsyncWrite> {
27 writer: Pin<&'a mut W>,
28 message_header: &'a MessageHeader
29}
30
31#[doc(hidden)]
32impl<W: AsyncWrite> MessageHeaderWriter<'_, W> {
33 fn write_timestamp(&mut self, cx: &mut FutureContext<'_>, timestamp: Duration) -> Poll<IOResult<()>> {
34 assert!(timestamp.as_millis() <= U24_MAX as u128);
35 self.writer.as_mut().poll_write(cx, &(timestamp.as_millis() as u32).to_be_bytes()[1..]).map_ok(|_| ())
36 }
37
38 fn write_message_length(&mut self, cx: &mut FutureContext<'_>, message_length: u32) -> Poll<IOResult<()>> {
39 assert!(message_length <= U24_MAX);
40 self.writer.as_mut().poll_write(cx, &message_length.to_be_bytes()[1..]).map_ok(|_| ())
41 }
42
43 fn write_message_type(&mut self, cx: &mut FutureContext<'_>, message_type: MessageType) -> Poll<IOResult<()>> {
44 self.writer.as_mut().poll_write(cx, &u8::from(message_type).to_be_bytes()).map_ok(|_| ())
45 }
46
47 fn write_message_id(&mut self, cx: &mut FutureContext<'_>, message_id: u32) -> Poll<IOResult<()>> {
48 self.writer.as_mut().poll_write(cx, &message_id.to_le_bytes()).map_ok(|_| ())
49 }
50
51 fn write_new(&mut self, cx: &mut FutureContext<'_>, new: &New) -> Poll<IOResult<()>> {
52 let (timestamp, message_length, message_type, message_id) = (*new).into();
53 ready!(self.write_timestamp(cx, timestamp))?;
54 ready!(self.write_message_length(cx, message_length))?;
55 ready!(self.write_message_type(cx, message_type))?;
56 ready!(self.write_message_id(cx, message_id))?;
57 Poll::Ready(Ok(()))
58 }
59
60 fn write_same_source(&mut self, cx: &mut FutureContext<'_>, same_source: &SameSource) -> Poll<IOResult<()>> {
61 let (timestamp, message_length, message_type) = (*same_source).into();
62 ready!(self.write_timestamp(cx, timestamp))?;
63 ready!(self.write_message_length(cx, message_length))?;
64 ready!(self.write_message_type(cx, message_type))?;
65 Poll::Ready(Ok(()))
66 }
67
68 fn write_timer_change(&mut self, cx: &mut FutureContext<'_>, timer_change: &TimerChange) -> Poll<IOResult<()>> {
69 ready!(self.write_timestamp(cx, (*timer_change).into()))?;
70 Poll::Ready(Ok(()))
71 }
72}
73
74#[doc(hidden)]
75impl<W: AsyncWrite> Future for MessageHeaderWriter<'_, W> {
76 type Output = IOResult<()>;
77
78 fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
79 match self.message_header {
80 &MessageHeader::New(ref new) => self.write_new(cx, new),
81 &MessageHeader::SameSource(ref same_source) => self.write_same_source(cx, same_source),
82 &MessageHeader::TimerChange(ref timer_change) => self.write_timer_change(cx, timer_change),
83 _ => Poll::Ready(Ok(()))
84 }
85 }
86}
87
88pub fn write_message_header<'a, W: AsyncWrite>(writer: Pin<&'a mut W>, message_header: &'a MessageHeader) -> MessageHeaderWriter<'a, W> {
178 MessageHeaderWriter { writer, message_header }
179}
180
#[cfg(test)]
mod tests {
    use std::pin::pin;
    use rand::random;
    use super::*;

    /// Draws a value uniformly from `0..=U24_MAX`.
    ///
    /// Clamping with `min(U24_MAX, random())` would yield `U24_MAX` for almost every draw,
    /// which leaves the smaller values practically untested.
    fn random_u24() -> u32 {
        random::<u32>() % (U24_MAX + 1)
    }

    /// Decodes a 3-byte big-endian field.
    fn read_u24(field: &[u8]) -> u32 {
        let mut padded: [u8; 4] = [0; 4];
        padded[1..].copy_from_slice(field);
        u32::from_be_bytes(padded)
    }

    #[tokio::test]
    async fn write_new() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let timestamp = Duration::from_millis(u64::from(random_u24()));
        let message_length = random_u24();
        let message_type = random::<u8>();
        let message_id = random::<u32>();
        let message_header = MessageHeader::New((timestamp, message_length, message_type.into(), message_id).into());

        let result = write_message_header(writer.as_mut(), &message_header).await;
        assert!(result.is_ok());
        assert_eq!(writer.len(), 11);

        let written_timestamp = Duration::from_millis(u64::from(read_u24(&writer[..3])));
        assert_eq!(written_timestamp, message_header.get_timestamp().unwrap());

        let written_length = read_u24(&writer[3..6]);
        assert_eq!(written_length, message_header.get_message_length().unwrap());

        let written_type = writer[6];
        assert_eq!(MessageType::from(written_type), message_header.get_message_type().unwrap());

        // The message ID is the only little-endian field.
        let mut id_bytes: [u8; 4] = [0; 4];
        id_bytes.copy_from_slice(&writer[7..]);
        assert_eq!(u32::from_le_bytes(id_bytes), message_header.get_message_id().unwrap())
    }

    #[tokio::test]
    async fn write_same_source() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let timestamp = Duration::from_millis(u64::from(random_u24()));
        let message_length = random_u24();
        let message_type = random::<u8>();
        let message_header = MessageHeader::SameSource((timestamp, message_length, message_type.into()).into());

        let result = write_message_header(writer.as_mut(), &message_header).await;
        assert!(result.is_ok());
        assert_eq!(writer.len(), 7);

        let written_timestamp = Duration::from_millis(u64::from(read_u24(&writer[..3])));
        assert_eq!(written_timestamp, message_header.get_timestamp().unwrap());

        let written_length = read_u24(&writer[3..6]);
        assert_eq!(written_length, message_header.get_message_length().unwrap());

        let written_type = writer[6];
        assert_eq!(MessageType::from(written_type), message_header.get_message_type().unwrap())
    }

    #[tokio::test]
    async fn write_timer_change() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let timestamp = Duration::from_millis(u64::from(random_u24()));
        let message_header = MessageHeader::TimerChange(timestamp.into());

        let result = write_message_header(writer.as_mut(), &message_header).await;
        assert!(result.is_ok());
        assert_eq!(writer.len(), 3);

        let written_timestamp = Duration::from_millis(u64::from(read_u24(&writer[..3])));
        assert_eq!(written_timestamp, message_header.get_timestamp().unwrap())
    }

    #[tokio::test]
    async fn write_continue() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let message_header = MessageHeader::Continue;
        write_message_header(writer.as_mut(), &message_header).await.unwrap();
        assert!(writer.is_empty())
    }
}