sheave_core/readers/
chunk_data.rs

1use std::{
2    cmp::min,
3    future::Future,
4    io::Result as IOResult,
5    pin::{
6        Pin,
7        pin
8    },
9    task::{
10        Context as FutureContext,
11        Poll
12    }
13};
14use futures::ready;
15use tokio::io::{
16    AsyncRead,
17    ReadBuf
18};
19use crate::messages::ChunkSize;
20use super::read_basic_header;
21
22#[doc(hidden)]
23#[derive(Debug)]
24pub struct ChunkDataReader<'a, R: AsyncRead> {
25    reader: Pin<&'a mut R>,
26    chunk_size: ChunkSize,
27    message_length: u32
28}
29
30#[doc(hidden)]
31impl<R: AsyncRead> Future for ChunkDataReader<'_, R> {
32    type Output = IOResult<Vec<u8>>;
33
34    fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
35        unsafe {
36            let mut chunk_data_bytes: Vec<u8> = Vec::new();
37            let mut remained = self.message_length;
38
39            loop {
40                let capacity = min(self.chunk_size.get_chunk_size(), remained);
41                let mut tmp_bytes: Vec<u8> = Vec::with_capacity(capacity as usize);
42                tmp_bytes.set_len(tmp_bytes.capacity());
43                let mut buf = ReadBuf::new(tmp_bytes.as_mut_slice());
44                ready!(self.reader.as_mut().poll_read(cx, &mut buf))?;
45                chunk_data_bytes.extend_from_slice(&tmp_bytes);
46
47                remained -= capacity;
48                if remained > 0 {
49                    ready!(pin!(read_basic_header(self.reader.as_mut())).poll(cx))?;
50                } else {
51                    return Poll::Ready(Ok(chunk_data_bytes))
52                }
53            }
54        }
55    }
56}
57
58/// Reads a chunk data from streams.
59///
60/// If a chunk data exceeds specified chunk size, to insert continue headers between chunk data per chunk size is required.
61/// Note the message length doesn't count their headers.
62///
63/// # Examples
64///
65/// ```rust
66/// use std::{
67///     io::Result as IOResult,
68///     pin::pin
69/// };
70/// use rand::fill;
71/// use sheave_core::{
72///     messages::{
73///         ChunkSize,
74///         headers::MessageFormat
75///     },
76///     readers::read_chunk_data
77/// };
78///
79/// #[tokio::main]
80/// async fn main() -> IOResult<()> {
81///     let chunk_size = ChunkSize::default();
82///
83///     // When it's just one chunk.
84///     let mut reader: [u8; 128] = [0; 128];
85///     fill(&mut reader);
86///     let result = read_chunk_data(pin!(reader.as_slice()), chunk_size, 128).await?;
87///     assert_eq!(128, result.len());
88///
89///     // When it has the one byte header.
90///     let mut reader: [u8; 257] = [0; 257];
91///     let mut part: [u8; 128] = [0; 128];
92///     fill(&mut part);
93///     reader[..128].copy_from_slice(&part);
94///     reader[128] = u8::from(MessageFormat::Continue) << 6 | 2;
95///     reader[129..].copy_from_slice(&part);
96///     let result = read_chunk_data(pin!(reader.as_slice()), chunk_size, 256).await?;
97///     assert_eq!(256, result.len());
98///
99///     // When it has the two bytes header.
100///     let mut reader: [u8; 258] = [0; 258];
101///     let mut part: [u8; 128] = [0; 128];
102///     fill(&mut part);
103///     reader[..128].copy_from_slice(&part);
104///     reader[128] = u8::from(MessageFormat::Continue) << 6;
105///     reader[129] = 2;
106///     reader[130..].copy_from_slice(&part);
107///     let result = read_chunk_data(pin!(reader.as_slice()), chunk_size, 256).await?;
108///     assert_eq!(256, result.len());
109///
110///     // When it has the three bytes header.
111///     let mut reader: [u8; 259] = [0; 259];
112///     let mut part: [u8; 128] = [0; 128];
113///     fill(&mut part);
114///     reader[..128].copy_from_slice(&part);
115///     reader[128] = u8::from(MessageFormat::Continue) << 6 | 1;
116///     reader[129..131].copy_from_slice(&2u16.to_le_bytes());
117///     reader[131..].copy_from_slice(&part);
118///     let result = read_chunk_data(pin!(reader.as_slice()), chunk_size, 256).await?;
119///     assert_eq!(256, result.len());
120///
121///     Ok(())
122/// }
123/// ```
124pub fn read_chunk_data<'a, R: AsyncRead>(reader: Pin<&'a mut R>, chunk_size: ChunkSize, message_length: u32) -> ChunkDataReader<'a, R> {
125    ChunkDataReader { reader, chunk_size, message_length }
126}
127
128#[cfg(test)]
129mod tests {
130    use std::pin::pin;
131    use rand::fill;
132    use crate::messages::headers::MessageFormat;
133    use super::*;
134
135    #[tokio::test]
136    async fn read_one_chunk() {
137        let mut reader: [u8; 128] = [0; 128];
138        fill(&mut reader);
139        let result = read_chunk_data(pin!(reader.as_slice()), ChunkSize::default(), 128).await;
140        assert!(result.is_ok());
141        let bytes = result.unwrap();
142        assert_eq!(128, bytes.len())
143    }
144
145    #[tokio::test]
146    async fn read_with_one_byte_header() {
147        let mut reader: [u8; 257] = [0; 257];
148        let mut part: [u8; 128] = [0; 128];
149        fill(&mut part);
150        reader[..128].copy_from_slice(&part);
151        reader[128] = u8::from(MessageFormat::Continue) << 6 | 2;
152        reader[129..].copy_from_slice(&part);
153        let result = read_chunk_data(pin!(reader.as_slice()), ChunkSize::default(), 256).await;
154        assert!(result.is_ok());
155        let bytes = result.unwrap();
156        assert_eq!(256, bytes.len())
157    }
158
159    #[tokio::test]
160    async fn read_with_two_bytes_header() {
161        let mut reader: [u8; 258] = [0; 258];
162        let mut part: [u8; 128] = [0; 128];
163        fill(&mut part);
164        reader[..128].copy_from_slice(&part);
165        reader[128] = u8::from(MessageFormat::Continue) << 6;
166        reader[129] = 2;
167        reader[130..].copy_from_slice(&part);
168        let result = read_chunk_data(pin!(reader.as_slice()), ChunkSize::default(), 256).await;
169        assert!(result.is_ok());
170        let bytes = result.unwrap();
171        assert_eq!(256, bytes.len())
172    }
173
174    #[tokio::test]
175    async fn read_with_three_bytes_header() {
176        let mut reader: [u8; 259] = [0; 259];
177        let mut part: [u8; 128] = [0; 128];
178        fill(&mut part);
179        reader[..128].copy_from_slice(&part);
180        reader[128] = u8::from(MessageFormat::Continue) << 6 | 1;
181        reader[129..131].copy_from_slice(&2u16.to_le_bytes());
182        reader[131..].copy_from_slice(&part);
183        let result = read_chunk_data(pin!(reader.as_slice()), ChunkSize::default(), 256).await;
184        assert!(result.is_ok());
185        let bytes = result.unwrap();
186        assert_eq!(256, bytes.len())
187    }
188}