sheave_core/writers/
chunk_data.rs

1use std::{
2    future::Future,
3    io::Result as IOResult,
4    pin::{
5        Pin,
6        pin
7    },
8    task::{
9        Context as FutureContext,
10        Poll
11    }
12};
13use futures::ready;
14use tokio::io::AsyncWrite;
15use crate::{
16    messages::{
17        ChunkSize,
18        headers::{
19            BasicHeader,
20            MessageFormat
21        }
22    },
23    writers::write_basic_header
24};
25
26#[doc(hidden)]
27#[derive(Debug)]
28pub struct ChunkDataWriter<'a, W: AsyncWrite> {
29    writer: Pin<&'a mut W>,
30    chunk_id: u16,
31    chunk_size: ChunkSize,
32    chunk_data: &'a [u8],
33}
34
35#[doc(hidden)]
36impl<W: AsyncWrite> Future for ChunkDataWriter<'_, W> {
37    type Output = IOResult<()>;
38
39    fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
40        let chunk_size = self.chunk_size.get_chunk_size();
41        let mut chunks = self.chunk_data.chunks(chunk_size as usize);
42        while let Some(chunk) = chunks.next() {
43            ready!(self.writer.as_mut().poll_write(cx, chunk))?;
44
45            if chunks.size_hint().0 >= 1 {
46                let basic_header = BasicHeader::new(MessageFormat::Continue, self.chunk_id);
47                ready!(pin!(write_basic_header(self.writer.as_mut(), &basic_header)).poll(cx))?;
48            }
49        }
50
51        Poll::Ready(Ok(()))
52    }
53}
54
55/// Writes a chunk data into streams.
56///
57/// If a chunk data exceeds specified chunk size, continue headers is inserted between chunk data per chunk size.
58/// Note the message length doesn't count their headers.
59///
60/// # Examples
61///
62/// ```rust
63/// use std::{
64///     io::Result as IOResult,
65///     pin::{
66///         Pin,
67///         pin
68///     }
69/// };
70/// use rand::{
71///     Fill,
72///     thread_rng
73/// };
74/// use sheave_core::{
75///     messages::{
76///         ChunkSize,
77///         headers::MessageFormat
78///     },
79///     writers::write_chunk_data
80/// };
81///
82/// #[tokio::main]
83/// async fn main() -> IOResult<()> {
84///     // When it's just one chunk.
85///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
86///     let mut chunk_data: [u8; 128] = [0; 128];
87///     chunk_data.try_fill(&mut thread_rng()).unwrap();
88///     write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
89///     assert_eq!(128, writer.len());
90///
91///     // When it requires the one byte header.
92///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
93///     let mut chunk_data: [u8; 256] = [0; 256];
94///     chunk_data.try_fill(&mut thread_rng()).unwrap();
95///     write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
96///     assert_eq!(257, writer.len());
97///     let message_format: MessageFormat = (writer[128] >> 6).into();
98///     assert_eq!(MessageFormat::Continue, message_format);
99///     let chunk_id = writer[128] << 2 >> 2;
100///     assert_eq!(2, chunk_id);
101///
102///     // When it requires the two bytes header.
103///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
104///     let mut chunk_data: [u8; 256] = [0; 256];
105///     chunk_data.try_fill(&mut thread_rng()).unwrap();
106///     write_chunk_data(writer.as_mut(), 64, ChunkSize::default(), &chunk_data).await?;
107///     assert_eq!(258, writer.len());
108///     let message_format: MessageFormat = (writer[128] >> 6).into();
109///     assert_eq!(MessageFormat::Continue, message_format);
110///     assert_eq!(0, writer[128] << 2 >> 2);
111///     let chunk_id = writer[129];
112///     assert_eq!(64, chunk_id + 64);
113///
114///     // When it requires the three bytes header.
115///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
116///     let mut chunk_data: [u8; 256] = [0; 256];
117///     chunk_data.try_fill(&mut thread_rng()).unwrap();
118///     write_chunk_data(writer.as_mut(), 320, ChunkSize::default(), &chunk_data).await?;
119///     assert_eq!(259, writer.len());
120///     let message_format: MessageFormat = (writer[128] >> 6).into();
121///     assert_eq!(MessageFormat::Continue, message_format);
122///     assert_eq!(1, writer[128] << 2 >> 2);
123///     let mut chunk_id_bytes: [u8; 2] = [0; 2];
124///     chunk_id_bytes.copy_from_slice(&writer[129..131]);
125///     let chunk_id = u16::from_le_bytes(chunk_id_bytes);
126///     assert_eq!(320, chunk_id + 64);
127///
128///     Ok(())
129/// }
130/// ```
131pub fn write_chunk_data<'a, W: AsyncWrite>(writer: Pin<&'a mut W>, chunk_id: u16, chunk_size: ChunkSize, chunk_data: &'a [u8]) -> ChunkDataWriter<'a, W> {
132    ChunkDataWriter { writer, chunk_id, chunk_size, chunk_data }
133}
134
135#[cfg(test)]
136mod tests {
137    use std::pin::pin;
138    use rand::{
139        Fill,
140        thread_rng
141    };
142    use super::*;
143
144    #[tokio::test]
145    async fn write_one_chunk() {
146        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
147        let mut chunk_data: [u8; 128] = [0; 128];
148        chunk_data.try_fill(&mut thread_rng()).unwrap();
149        let result = write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await;
150        assert!(result.is_ok());
151        assert_eq!(128, writer.len())
152    }
153
154    #[tokio::test]
155    async fn write_with_one_byte_header() {
156        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
157        let mut chunk_data: [u8; 256] = [0; 256];
158        chunk_data.try_fill(&mut thread_rng()).unwrap();
159        let result = write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await;
160        assert!(result.is_ok());
161        assert_eq!(257, writer.len())
162    }
163
164    #[tokio::test]
165    async fn write_with_two_bytes_header() {
166        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
167        let mut chunk_data: [u8; 256] = [0; 256];
168        chunk_data.try_fill(&mut thread_rng()).unwrap();
169        let result = write_chunk_data(writer.as_mut(), 64, ChunkSize::default(), &chunk_data).await;
170        assert!(result.is_ok());
171        assert_eq!(258, writer.len())
172    }
173
174    #[tokio::test]
175    async fn write_with_three_bytes_header() {
176        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
177        let mut chunk_data: [u8; 256] = [0; 256];
178        chunk_data.try_fill(&mut thread_rng()).unwrap();
179        let result = write_chunk_data(writer.as_mut(), 320, ChunkSize::default(), &chunk_data).await;
180        assert!(result.is_ok());
181        assert_eq!(259, writer.len());
182    }
183}