sheave_core/writers/chunk_data.rs
1use std::{
2 future::Future,
3 io::Result as IOResult,
4 pin::{
5 Pin,
6 pin
7 },
8 task::{
9 Context as FutureContext,
10 Poll
11 }
12};
13use futures::ready;
14use tokio::io::AsyncWrite;
15use crate::{
16 messages::{
17 ChunkSize,
18 headers::{
19 BasicHeader,
20 MessageFormat
21 }
22 },
23 writers::write_basic_header
24};
25
26#[doc(hidden)]
27#[derive(Debug)]
28pub struct ChunkDataWriter<'a, W: AsyncWrite> {
29 writer: Pin<&'a mut W>,
30 chunk_id: u16,
31 chunk_size: ChunkSize,
32 chunk_data: &'a [u8],
33}
34
35#[doc(hidden)]
36impl<W: AsyncWrite> Future for ChunkDataWriter<'_, W> {
37 type Output = IOResult<()>;
38
39 fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
40 let chunk_size = self.chunk_size.get_chunk_size();
41 let mut chunks = self.chunk_data.chunks(chunk_size as usize);
42 while let Some(chunk) = chunks.next() {
43 ready!(self.writer.as_mut().poll_write(cx, chunk))?;
44
45 if chunks.size_hint().0 >= 1 {
46 let basic_header = BasicHeader::new(MessageFormat::Continue, self.chunk_id);
47 ready!(pin!(write_basic_header(self.writer.as_mut(), &basic_header)).poll(cx))?;
48 }
49 }
50
51 Poll::Ready(Ok(()))
52 }
53}
54
55/// Writes a chunk data into streams.
56///
57/// If a chunk data exceeds specified chunk size, continue headers is inserted between chunk data per chunk size.
58/// Note the message length doesn't count their headers.
59///
60/// # Examples
61///
62/// ```rust
63/// use std::{
64/// io::Result as IOResult,
65/// pin::{
66/// Pin,
67/// pin
68/// }
69/// };
70/// use rand::fill;
71/// use sheave_core::{
72/// messages::{
73/// ChunkSize,
74/// headers::MessageFormat
75/// },
76/// writers::write_chunk_data
77/// };
78///
79/// #[tokio::main]
80/// async fn main() -> IOResult<()> {
81/// // When it's just one chunk.
82/// let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
83/// let mut chunk_data: [u8; 128] = [0; 128];
84/// fill(&mut chunk_data);
85/// write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
86/// assert_eq!(128, writer.len());
87///
88/// // When it requires the one byte header.
89/// let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
90/// let mut chunk_data: [u8; 256] = [0; 256];
91/// fill(&mut chunk_data);
92/// write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
93/// assert_eq!(257, writer.len());
94/// let message_format: MessageFormat = (writer[128] >> 6).into();
95/// assert_eq!(MessageFormat::Continue, message_format);
96/// let chunk_id = writer[128] << 2 >> 2;
97/// assert_eq!(2, chunk_id);
98///
99/// // When it requires the two bytes header.
100/// let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
101/// let mut chunk_data: [u8; 256] = [0; 256];
102/// fill(&mut chunk_data);
103/// write_chunk_data(writer.as_mut(), 64, ChunkSize::default(), &chunk_data).await?;
104/// assert_eq!(258, writer.len());
105/// let message_format: MessageFormat = (writer[128] >> 6).into();
106/// assert_eq!(MessageFormat::Continue, message_format);
107/// assert_eq!(0, writer[128] << 2 >> 2);
108/// let chunk_id = writer[129];
109/// assert_eq!(64, chunk_id + 64);
110///
111/// // When it requires the three bytes header.
112/// let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
113/// let mut chunk_data: [u8; 256] = [0; 256];
114/// fill(&mut chunk_data);
115/// write_chunk_data(writer.as_mut(), 320, ChunkSize::default(), &chunk_data).await?;
116/// assert_eq!(259, writer.len());
117/// let message_format: MessageFormat = (writer[128] >> 6).into();
118/// assert_eq!(MessageFormat::Continue, message_format);
119/// assert_eq!(1, writer[128] << 2 >> 2);
120/// let mut chunk_id_bytes: [u8; 2] = [0; 2];
121/// chunk_id_bytes.copy_from_slice(&writer[129..131]);
122/// let chunk_id = u16::from_le_bytes(chunk_id_bytes);
123/// assert_eq!(320, chunk_id + 64);
124///
125/// Ok(())
126/// }
127/// ```
128pub fn write_chunk_data<'a, W: AsyncWrite>(writer: Pin<&'a mut W>, chunk_id: u16, chunk_size: ChunkSize, chunk_data: &'a [u8]) -> ChunkDataWriter<'a, W> {
129 ChunkDataWriter { writer, chunk_id, chunk_size, chunk_data }
130}
131
132#[cfg(test)]
133mod tests {
134 use std::pin::pin;
135 use rand::fill;
136 use super::*;
137
138 #[tokio::test]
139 async fn write_one_chunk() {
140 let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
141 let mut chunk_data: [u8; 128] = [0; 128];
142 fill(&mut chunk_data);
143 let result = write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await;
144 assert!(result.is_ok());
145 assert_eq!(128, writer.len())
146 }
147
148 #[tokio::test]
149 async fn write_with_one_byte_header() {
150 let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
151 let mut chunk_data: [u8; 256] = [0; 256];
152 fill(&mut chunk_data);
153 let result = write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await;
154 assert!(result.is_ok());
155 assert_eq!(257, writer.len())
156 }
157
158 #[tokio::test]
159 async fn write_with_two_bytes_header() {
160 let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
161 let mut chunk_data: [u8; 256] = [0; 256];
162 fill(&mut chunk_data);
163 let result = write_chunk_data(writer.as_mut(), 64, ChunkSize::default(), &chunk_data).await;
164 assert!(result.is_ok());
165 assert_eq!(258, writer.len())
166 }
167
168 #[tokio::test]
169 async fn write_with_three_bytes_header() {
170 let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
171 let mut chunk_data: [u8; 256] = [0; 256];
172 fill(&mut chunk_data);
173 let result = write_chunk_data(writer.as_mut(), 320, ChunkSize::default(), &chunk_data).await;
174 assert!(result.is_ok());
175 assert_eq!(259, writer.len());
176 }
177}