sheave_core/writers/chunk_data.rs
1use std::{
2 future::Future,
3 io::Result as IOResult,
4 pin::{
5 Pin,
6 pin
7 },
8 task::{
9 Context as FutureContext,
10 Poll
11 }
12};
13use futures::ready;
14use tokio::io::AsyncWrite;
15use crate::{
16 messages::{
17 ChunkSize,
18 headers::{
19 BasicHeader,
20 MessageFormat
21 }
22 },
23 writers::write_basic_header
24};
25
26#[doc(hidden)]
27#[derive(Debug)]
28pub struct ChunkDataWriter<'a, W: AsyncWrite> {
29 writer: Pin<&'a mut W>,
30 chunk_id: u16,
31 chunk_size: ChunkSize,
32 chunk_data: &'a [u8],
33}
34
35#[doc(hidden)]
36impl<W: AsyncWrite> Future for ChunkDataWriter<'_, W> {
37 type Output = IOResult<()>;
38
39 fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
40 let chunk_size = self.chunk_size.get_chunk_size();
41 let mut chunks = self.chunk_data.chunks(chunk_size as usize);
42 while let Some(chunk) = chunks.next() {
43 ready!(self.writer.as_mut().poll_write(cx, chunk))?;
44
45 if chunks.size_hint().0 >= 1 {
46 let basic_header = BasicHeader::new(MessageFormat::Continue, self.chunk_id);
47 ready!(pin!(write_basic_header(self.writer.as_mut(), &basic_header)).poll(cx))?;
48 }
49 }
50
51 Poll::Ready(Ok(()))
52 }
53}
54
55/// Writes a chunk data into streams.
56///
57/// If a chunk data exceeds specified chunk size, continue headers is inserted between chunk data per chunk size.
58/// Note the message length doesn't count their headers.
59///
60/// # Examples
61///
62/// ```rust
63/// use std::{
64/// io::Result as IOResult,
65/// pin::{
66/// Pin,
67/// pin
68/// }
69/// };
70/// use rand::{
71/// Fill,
72/// thread_rng
73/// };
74/// use sheave_core::{
75/// messages::{
76/// ChunkSize,
77/// headers::MessageFormat
78/// },
79/// writers::write_chunk_data
80/// };
81///
82/// #[tokio::main]
83/// async fn main() -> IOResult<()> {
84/// // When it's just one chunk.
85/// let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
86/// let mut chunk_data: [u8; 128] = [0; 128];
87/// chunk_data.try_fill(&mut thread_rng()).unwrap();
88/// write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
89/// assert_eq!(128, writer.len());
90///
91/// // When it requires the one byte header.
92/// let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
93/// let mut chunk_data: [u8; 256] = [0; 256];
94/// chunk_data.try_fill(&mut thread_rng()).unwrap();
95/// write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
96/// assert_eq!(257, writer.len());
97/// let message_format: MessageFormat = (writer[128] >> 6).into();
98/// assert_eq!(MessageFormat::Continue, message_format);
99/// let chunk_id = writer[128] << 2 >> 2;
100/// assert_eq!(2, chunk_id);
101///
102/// // When it requires the two bytes header.
103/// let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
104/// let mut chunk_data: [u8; 256] = [0; 256];
105/// chunk_data.try_fill(&mut thread_rng()).unwrap();
106/// write_chunk_data(writer.as_mut(), 64, ChunkSize::default(), &chunk_data).await?;
107/// assert_eq!(258, writer.len());
108/// let message_format: MessageFormat = (writer[128] >> 6).into();
109/// assert_eq!(MessageFormat::Continue, message_format);
110/// assert_eq!(0, writer[128] << 2 >> 2);
111/// let chunk_id = writer[129];
112/// assert_eq!(64, chunk_id + 64);
113///
114/// // When it requires the three bytes header.
115/// let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
116/// let mut chunk_data: [u8; 256] = [0; 256];
117/// chunk_data.try_fill(&mut thread_rng()).unwrap();
118/// write_chunk_data(writer.as_mut(), 320, ChunkSize::default(), &chunk_data).await?;
119/// assert_eq!(259, writer.len());
120/// let message_format: MessageFormat = (writer[128] >> 6).into();
121/// assert_eq!(MessageFormat::Continue, message_format);
122/// assert_eq!(1, writer[128] << 2 >> 2);
123/// let mut chunk_id_bytes: [u8; 2] = [0; 2];
124/// chunk_id_bytes.copy_from_slice(&writer[129..131]);
125/// let chunk_id = u16::from_le_bytes(chunk_id_bytes);
126/// assert_eq!(320, chunk_id + 64);
127///
128/// Ok(())
129/// }
130/// ```
131pub fn write_chunk_data<'a, W: AsyncWrite>(writer: Pin<&'a mut W>, chunk_id: u16, chunk_size: ChunkSize, chunk_data: &'a [u8]) -> ChunkDataWriter<'a, W> {
132 ChunkDataWriter { writer, chunk_id, chunk_size, chunk_data }
133}
134
#[cfg(test)]
mod tests {
    use std::pin::pin;
    use rand::{
        Fill,
        thread_rng
    };
    use super::*;

    /// Writes `payload` with the default chunk size into a fresh in-memory buffer,
    /// checks the write succeeded, and returns how many bytes ended up in the buffer.
    async fn written_len(chunk_id: u16, payload: &[u8]) -> usize {
        let mut sink: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let outcome = write_chunk_data(sink.as_mut(), chunk_id, ChunkSize::default(), payload).await;
        assert!(outcome.is_ok());
        sink.len()
    }

    // 128 bytes of data: no continue header is expected in the output.
    #[tokio::test]
    async fn write_one_chunk() {
        let mut payload: [u8; 128] = [0; 128];
        payload.try_fill(&mut thread_rng()).unwrap();
        assert_eq!(128, written_len(2, &payload).await);
    }

    // 256 bytes of data with chunk ID 2: one extra byte is expected.
    #[tokio::test]
    async fn write_with_one_byte_header() {
        let mut payload: [u8; 256] = [0; 256];
        payload.try_fill(&mut thread_rng()).unwrap();
        assert_eq!(257, written_len(2, &payload).await);
    }

    // 256 bytes of data with chunk ID 64: two extra bytes are expected.
    #[tokio::test]
    async fn write_with_two_bytes_header() {
        let mut payload: [u8; 256] = [0; 256];
        payload.try_fill(&mut thread_rng()).unwrap();
        assert_eq!(258, written_len(64, &payload).await);
    }

    // 256 bytes of data with chunk ID 320: three extra bytes are expected.
    #[tokio::test]
    async fn write_with_three_bytes_header() {
        let mut payload: [u8; 256] = [0; 256];
        payload.try_fill(&mut thread_rng()).unwrap();
        assert_eq!(259, written_len(320, &payload).await);
    }
}