sheave_core/writers/
chunk_data.rs

1use std::{
2    future::Future,
3    io::Result as IOResult,
4    pin::{
5        Pin,
6        pin
7    },
8    task::{
9        Context as FutureContext,
10        Poll
11    }
12};
13use futures::ready;
14use tokio::io::AsyncWrite;
15use crate::{
16    messages::{
17        ChunkSize,
18        headers::{
19            BasicHeader,
20            MessageFormat
21        }
22    },
23    writers::write_basic_header
24};
25
26#[doc(hidden)]
27#[derive(Debug)]
28pub struct ChunkDataWriter<'a, W: AsyncWrite> {
29    writer: Pin<&'a mut W>,
30    chunk_id: u16,
31    chunk_size: ChunkSize,
32    chunk_data: &'a [u8],
33}
34
35#[doc(hidden)]
36impl<W: AsyncWrite> Future for ChunkDataWriter<'_, W> {
37    type Output = IOResult<()>;
38
39    fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
40        let chunk_size = self.chunk_size.get_chunk_size();
41        let mut chunks = self.chunk_data.chunks(chunk_size as usize);
42        while let Some(chunk) = chunks.next() {
43            ready!(self.writer.as_mut().poll_write(cx, chunk))?;
44
45            if chunks.size_hint().0 >= 1 {
46                let basic_header = BasicHeader::new(MessageFormat::Continue, self.chunk_id);
47                ready!(pin!(write_basic_header(self.writer.as_mut(), &basic_header)).poll(cx))?;
48            }
49        }
50
51        Poll::Ready(Ok(()))
52    }
53}
54
55/// Writes a chunk data into streams.
56///
57/// If a chunk data exceeds specified chunk size, continue headers is inserted between chunk data per chunk size.
58/// Note the message length doesn't count their headers.
59///
60/// # Examples
61///
62/// ```rust
63/// use std::{
64///     io::Result as IOResult,
65///     pin::{
66///         Pin,
67///         pin
68///     }
69/// };
70/// use rand::fill;
71/// use sheave_core::{
72///     messages::{
73///         ChunkSize,
74///         headers::MessageFormat
75///     },
76///     writers::write_chunk_data
77/// };
78///
79/// #[tokio::main]
80/// async fn main() -> IOResult<()> {
81///     // When it's just one chunk.
82///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
83///     let mut chunk_data: [u8; 128] = [0; 128];
84///     fill(&mut chunk_data);
85///     write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
86///     assert_eq!(128, writer.len());
87///
88///     // When it requires the one byte header.
89///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
90///     let mut chunk_data: [u8; 256] = [0; 256];
91///     fill(&mut chunk_data);
92///     write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
93///     assert_eq!(257, writer.len());
94///     let message_format: MessageFormat = (writer[128] >> 6).into();
95///     assert_eq!(MessageFormat::Continue, message_format);
96///     let chunk_id = writer[128] << 2 >> 2;
97///     assert_eq!(2, chunk_id);
98///
99///     // When it requires the two bytes header.
100///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
101///     let mut chunk_data: [u8; 256] = [0; 256];
102///     fill(&mut chunk_data);
103///     write_chunk_data(writer.as_mut(), 64, ChunkSize::default(), &chunk_data).await?;
104///     assert_eq!(258, writer.len());
105///     let message_format: MessageFormat = (writer[128] >> 6).into();
106///     assert_eq!(MessageFormat::Continue, message_format);
107///     assert_eq!(0, writer[128] << 2 >> 2);
108///     let chunk_id = writer[129];
109///     assert_eq!(64, chunk_id + 64);
110///
111///     // When it requires the three bytes header.
112///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
113///     let mut chunk_data: [u8; 256] = [0; 256];
114///     fill(&mut chunk_data);
115///     write_chunk_data(writer.as_mut(), 320, ChunkSize::default(), &chunk_data).await?;
116///     assert_eq!(259, writer.len());
117///     let message_format: MessageFormat = (writer[128] >> 6).into();
118///     assert_eq!(MessageFormat::Continue, message_format);
119///     assert_eq!(1, writer[128] << 2 >> 2);
120///     let mut chunk_id_bytes: [u8; 2] = [0; 2];
121///     chunk_id_bytes.copy_from_slice(&writer[129..131]);
122///     let chunk_id = u16::from_le_bytes(chunk_id_bytes);
123///     assert_eq!(320, chunk_id + 64);
124///
125///     Ok(())
126/// }
127/// ```
128pub fn write_chunk_data<'a, W: AsyncWrite>(writer: Pin<&'a mut W>, chunk_id: u16, chunk_size: ChunkSize, chunk_data: &'a [u8]) -> ChunkDataWriter<'a, W> {
129    ChunkDataWriter { writer, chunk_id, chunk_size, chunk_data }
130}
131
#[cfg(test)]
mod tests {
    use std::pin::pin;
    use rand::fill;
    use super::*;

    /// Writes `payload` into a fresh in-memory buffer with the default chunk size.
    ///
    /// Returns the outcome of the write together with the total number of bytes the buffer holds afterwards.
    async fn write_into_vec(chunk_id: u16, payload: &[u8]) -> (IOResult<()>, usize) {
        let mut sink: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let outcome = write_chunk_data(sink.as_mut(), chunk_id, ChunkSize::default(), payload).await;
        (outcome, sink.len())
    }

    // 128 bytes are expected to be written without any continue header.
    #[tokio::test]
    async fn write_one_chunk() {
        let mut payload = [0u8; 128];
        fill(&mut payload);
        let (outcome, total) = write_into_vec(2, &payload).await;
        assert!(outcome.is_ok());
        assert_eq!(128, total);
    }

    // 256 bytes with the chunk ID 2 are expected to grow by 1 byte of continue header.
    #[tokio::test]
    async fn write_with_one_byte_header() {
        let mut payload = [0u8; 256];
        fill(&mut payload);
        let (outcome, total) = write_into_vec(2, &payload).await;
        assert!(outcome.is_ok());
        assert_eq!(257, total);
    }

    // 256 bytes with the chunk ID 64 are expected to grow by 2 bytes of continue header.
    #[tokio::test]
    async fn write_with_two_bytes_header() {
        let mut payload = [0u8; 256];
        fill(&mut payload);
        let (outcome, total) = write_into_vec(64, &payload).await;
        assert!(outcome.is_ok());
        assert_eq!(258, total);
    }

    // 256 bytes with the chunk ID 320 are expected to grow by 3 bytes of continue header.
    #[tokio::test]
    async fn write_with_three_bytes_header() {
        let mut payload = [0u8; 256];
        fill(&mut payload);
        let (outcome, total) = write_into_vec(320, &payload).await;
        assert!(outcome.is_ok());
        assert_eq!(259, total);
    }
}
177}