sheave_core/readers/
chunk_data.rs1use std::{
2 cmp::min,
3 future::Future,
4 io::Result as IOResult,
5 pin::{
6 Pin,
7 pin
8 },
9 task::{
10 Context as FutureContext,
11 Poll
12 }
13};
14use futures::ready;
15use tokio::io::{
16 AsyncRead,
17 ReadBuf
18};
19use crate::messages::ChunkSize;
20use super::read_basic_header;
21
22#[doc(hidden)]
23#[derive(Debug)]
24pub struct ChunkDataReader<'a, R: AsyncRead> {
25 reader: Pin<&'a mut R>,
26 chunk_size: ChunkSize,
27 message_length: u32
28}
29
30#[doc(hidden)]
31impl<R: AsyncRead> Future for ChunkDataReader<'_, R> {
32 type Output = IOResult<Vec<u8>>;
33
34 fn poll(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
35 unsafe {
36 let mut chunk_data_bytes: Vec<u8> = Vec::new();
37 let mut remained = self.message_length;
38
39 loop {
40 let capacity = min(self.chunk_size.get_chunk_size(), remained);
41 let mut tmp_bytes: Vec<u8> = Vec::with_capacity(capacity as usize);
42 tmp_bytes.set_len(tmp_bytes.capacity());
43 let mut buf = ReadBuf::new(tmp_bytes.as_mut_slice());
44 ready!(self.reader.as_mut().poll_read(cx, &mut buf))?;
45 chunk_data_bytes.extend_from_slice(&tmp_bytes);
46
47 remained -= capacity;
48 if remained > 0 {
49 ready!(pin!(read_basic_header(self.reader.as_mut())).poll(cx))?;
50 } else {
51 return Poll::Ready(Ok(chunk_data_bytes))
52 }
53 }
54 }
55 }
56}
57
58pub fn read_chunk_data<'a, R: AsyncRead>(reader: Pin<&'a mut R>, chunk_size: ChunkSize, message_length: u32) -> ChunkDataReader<'a, R> {
125 ChunkDataReader { reader, chunk_size, message_length }
126}
127
128#[cfg(test)]
129mod tests {
130 use std::pin::pin;
131 use rand::fill;
132 use crate::messages::headers::MessageFormat;
133 use super::*;
134
135 #[tokio::test]
136 async fn read_one_chunk() {
137 let mut reader: [u8; 128] = [0; 128];
138 fill(&mut reader);
139 let result = read_chunk_data(pin!(reader.as_slice()), ChunkSize::default(), 128).await;
140 assert!(result.is_ok());
141 let bytes = result.unwrap();
142 assert_eq!(128, bytes.len())
143 }
144
145 #[tokio::test]
146 async fn read_with_one_byte_header() {
147 let mut reader: [u8; 257] = [0; 257];
148 let mut part: [u8; 128] = [0; 128];
149 fill(&mut part);
150 reader[..128].copy_from_slice(&part);
151 reader[128] = u8::from(MessageFormat::Continue) << 6 | 2;
152 reader[129..].copy_from_slice(&part);
153 let result = read_chunk_data(pin!(reader.as_slice()), ChunkSize::default(), 256).await;
154 assert!(result.is_ok());
155 let bytes = result.unwrap();
156 assert_eq!(256, bytes.len())
157 }
158
159 #[tokio::test]
160 async fn read_with_two_bytes_header() {
161 let mut reader: [u8; 258] = [0; 258];
162 let mut part: [u8; 128] = [0; 128];
163 fill(&mut part);
164 reader[..128].copy_from_slice(&part);
165 reader[128] = u8::from(MessageFormat::Continue) << 6;
166 reader[129] = 2;
167 reader[130..].copy_from_slice(&part);
168 let result = read_chunk_data(pin!(reader.as_slice()), ChunkSize::default(), 256).await;
169 assert!(result.is_ok());
170 let bytes = result.unwrap();
171 assert_eq!(256, bytes.len())
172 }
173
174 #[tokio::test]
175 async fn read_with_three_bytes_header() {
176 let mut reader: [u8; 259] = [0; 259];
177 let mut part: [u8; 128] = [0; 128];
178 fill(&mut part);
179 reader[..128].copy_from_slice(&part);
180 reader[128] = u8::from(MessageFormat::Continue) << 6 | 1;
181 reader[129..131].copy_from_slice(&2u16.to_le_bytes());
182 reader[131..].copy_from_slice(&part);
183 let result = read_chunk_data(pin!(reader.as_slice()), ChunkSize::default(), 256).await;
184 assert!(result.is_ok());
185 let bytes = result.unwrap();
186 assert_eq!(256, bytes.len())
187 }
188}