sheave_core/writers/
chunk_data.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
use std::{
    future::Future,
    io::Result as IOResult,
    pin::{
        Pin,
        pin
    },
    task::{
        Context as FutureContext,
        Poll
    }
};
use futures::ready;
use tokio::io::AsyncWrite;
use crate::{
    messages::{
        ChunkSize,
        headers::{
            BasicHeader,
            MessageFormat
        }
    },
    writers::write_basic_header
};

/// A future which writes a chunk data into a stream, inserting continue headers between chunks.
///
/// Progress is kept inside this struct, so polling again after `Poll::Pending`
/// resumes from the first unwritten byte instead of starting over.
#[doc(hidden)]
#[derive(Debug)]
pub struct ChunkDataWriter<'a, W: AsyncWrite> {
    writer: Pin<&'a mut W>,
    chunk_id: u16,
    chunk_size: ChunkSize,
    /// The part of the chunk data which hasn't been written yet.
    chunk_data: &'a [u8],
    /// How many bytes of the current chunk have already been written.
    written_in_chunk: usize,
    /// Whether a continue header must be written before the next data byte.
    header_pending: bool,
}

#[doc(hidden)]
impl<W: AsyncWrite> Future for ChunkDataWriter<'_, W> {
    type Output = IOResult<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        let chunk_size = this.chunk_size.get_chunk_size() as usize;
        if chunk_size == 0 {
            // Data can never be split into chunks of zero bytes; report it instead of panicking.
            return Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "chunk size must be greater than 0"
            )));
        }

        loop {
            if this.header_pending {
                let basic_header = BasicHeader::new(MessageFormat::Continue, this.chunk_id);
                // NOTE(review): the header future is rebuilt on every poll, which assumes it writes
                // nothing when it returns `Poll::Pending` - confirm against `write_basic_header`.
                ready!(pin!(write_basic_header(this.writer.as_mut(), &basic_header)).poll(cx))?;
                this.header_pending = false;
            }

            // Copies the reference out, so the remaining slice keeps the original lifetime.
            let remaining = this.chunk_data;
            if remaining.is_empty() {
                return Poll::Ready(Ok(()));
            }

            // Never hands more than the rest of the current chunk to the writer,
            // otherwise a continue header could end up in the wrong position.
            let end = remaining.len().min(chunk_size - this.written_in_chunk);
            let written = ready!(this.writer.as_mut().poll_write(cx, &remaining[..end]))?;
            if written == 0 {
                // The writer can't accept any more bytes; looping here would never terminate.
                return Poll::Ready(Err(std::io::Error::new(
                    std::io::ErrorKind::WriteZero,
                    "failed to write whole chunk data"
                )));
            }

            // A write may be short, therefore advances by the reported count only.
            this.chunk_data = &remaining[written..];
            this.written_in_chunk += written;

            if this.written_in_chunk == chunk_size {
                this.written_in_chunk = 0;
                // The header is inserted only *between* chunks, never after the last one.
                this.header_pending = !this.chunk_data.is_empty();
            }
        }
    }
}

/// Writes a chunk data into streams.
///
/// If a chunk data exceeds the specified chunk size, continue headers are inserted between chunk data per chunk size.
/// Note that these headers aren't counted in the message length.
///
/// Short writes and `Poll::Pending` from the stream are handled: every byte of the chunk data is written exactly once.
///
/// # Errors
///
/// * Any error the stream returns while writing.
/// * `InvalidInput` when the chunk size is 0.
/// * `WriteZero` when the stream accepts no more bytes before the whole chunk data is written.
///
/// # Examples
///
/// ```rust
/// use std::{
///     io::Result as IOResult,
///     pin::{
///         Pin,
///         pin
///     }
/// };
/// use rand::{
///     Fill,
///     thread_rng
/// };
/// use sheave_core::{
///     messages::{
///         ChunkSize,
///         headers::MessageFormat
///     },
///     writers::write_chunk_data
/// };
///
/// #[tokio::main]
/// async fn main() -> IOResult<()> {
///     // When it's just one chunk.
///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
///     let mut chunk_data: [u8; 128] = [0; 128];
///     chunk_data.try_fill(&mut thread_rng()).unwrap();
///     write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
///     assert_eq!(128, writer.len());
///
///     // When it requires the one byte header.
///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
///     let mut chunk_data: [u8; 256] = [0; 256];
///     chunk_data.try_fill(&mut thread_rng()).unwrap();
///     write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await?;
///     assert_eq!(257, writer.len());
///     let message_format: MessageFormat = (writer[128] >> 6).into();
///     assert_eq!(MessageFormat::Continue, message_format);
///     let chunk_id = writer[128] << 2 >> 2;
///     assert_eq!(2, chunk_id);
///
///     // When it requires the two bytes header.
///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
///     let mut chunk_data: [u8; 256] = [0; 256];
///     chunk_data.try_fill(&mut thread_rng()).unwrap();
///     write_chunk_data(writer.as_mut(), 64, ChunkSize::default(), &chunk_data).await?;
///     assert_eq!(258, writer.len());
///     let message_format: MessageFormat = (writer[128] >> 6).into();
///     assert_eq!(MessageFormat::Continue, message_format);
///     assert_eq!(0, writer[128] << 2 >> 2);
///     let chunk_id = writer[129];
///     assert_eq!(64, chunk_id + 64);
///
///     // When it requires the three bytes header.
///     let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
///     let mut chunk_data: [u8; 256] = [0; 256];
///     chunk_data.try_fill(&mut thread_rng()).unwrap();
///     write_chunk_data(writer.as_mut(), 320, ChunkSize::default(), &chunk_data).await?;
///     assert_eq!(259, writer.len());
///     let message_format: MessageFormat = (writer[128] >> 6).into();
///     assert_eq!(MessageFormat::Continue, message_format);
///     assert_eq!(1, writer[128] << 2 >> 2);
///     let mut chunk_id_bytes: [u8; 2] = [0; 2];
///     chunk_id_bytes.copy_from_slice(&writer[129..131]);
///     let chunk_id = u16::from_le_bytes(chunk_id_bytes);
///     assert_eq!(320, chunk_id + 64);
///
///     Ok(())
/// }
/// ```
pub fn write_chunk_data<'a, W: AsyncWrite>(writer: Pin<&'a mut W>, chunk_id: u16, chunk_size: ChunkSize, chunk_data: &'a [u8]) -> ChunkDataWriter<'a, W> {
    ChunkDataWriter {
        writer,
        chunk_id,
        chunk_size,
        chunk_data,
        // Nothing has been written yet: the first chunk starts at offset 0 without a continue header.
        written_in_chunk: 0,
        header_pending: false
    }
}

#[cfg(test)]
mod tests {
    use std::pin::pin;
    use rand::{
        Fill,
        thread_rng
    };
    use super::*;

    /// Empty chunk data completes successfully and writes nothing at all.
    #[tokio::test]
    async fn write_empty_chunk() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let result = write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &[]).await;
        assert!(result.is_ok());
        assert!(writer.is_empty());
    }

    /// Chunk data which fits in the default chunk size is written verbatim, without any header.
    #[tokio::test]
    async fn write_one_chunk() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let mut chunk_data: [u8; 128] = [0; 128];
        chunk_data.try_fill(&mut thread_rng()).unwrap();
        let result = write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await;
        assert!(result.is_ok());
        assert_eq!(128, writer.len());
        assert_eq!(&chunk_data[..], &writer[..]);
    }

    /// Chunk IDs below 64 are carried by the one byte continue header.
    #[tokio::test]
    async fn write_with_one_byte_header() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let mut chunk_data: [u8; 256] = [0; 256];
        chunk_data.try_fill(&mut thread_rng()).unwrap();
        let result = write_chunk_data(writer.as_mut(), 2, ChunkSize::default(), &chunk_data).await;
        assert!(result.is_ok());
        assert_eq!(257, writer.len());

        // The header sits exactly between the two chunks.
        let message_format: MessageFormat = (writer[128] >> 6).into();
        assert_eq!(MessageFormat::Continue, message_format);
        assert_eq!(2, writer[128] << 2 >> 2);

        // The payload itself must be untouched on both sides of the header.
        assert_eq!(&chunk_data[..128], &writer[..128]);
        assert_eq!(&chunk_data[128..], &writer[129..]);
    }

    /// Chunk ID 64 needs the two bytes continue header (ID is stored minus 64).
    #[tokio::test]
    async fn write_with_two_bytes_header() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let mut chunk_data: [u8; 256] = [0; 256];
        chunk_data.try_fill(&mut thread_rng()).unwrap();
        let result = write_chunk_data(writer.as_mut(), 64, ChunkSize::default(), &chunk_data).await;
        assert!(result.is_ok());
        assert_eq!(258, writer.len());

        let message_format: MessageFormat = (writer[128] >> 6).into();
        assert_eq!(MessageFormat::Continue, message_format);
        assert_eq!(0, writer[128] << 2 >> 2);
        assert_eq!(64, writer[129] + 64);

        assert_eq!(&chunk_data[..128], &writer[..128]);
        assert_eq!(&chunk_data[128..], &writer[130..]);
    }

    /// Chunk ID 320 needs the three bytes continue header (ID is stored minus 64, little endian).
    #[tokio::test]
    async fn write_with_three_bytes_header() {
        let mut writer: Pin<&mut Vec<u8>> = pin!(Vec::new());
        let mut chunk_data: [u8; 256] = [0; 256];
        chunk_data.try_fill(&mut thread_rng()).unwrap();
        let result = write_chunk_data(writer.as_mut(), 320, ChunkSize::default(), &chunk_data).await;
        assert!(result.is_ok());
        assert_eq!(259, writer.len());

        let message_format: MessageFormat = (writer[128] >> 6).into();
        assert_eq!(MessageFormat::Continue, message_format);
        assert_eq!(1, writer[128] << 2 >> 2);
        let mut chunk_id_bytes: [u8; 2] = [0; 2];
        chunk_id_bytes.copy_from_slice(&writer[129..131]);
        assert_eq!(320, u16::from_le_bytes(chunk_id_bytes) + 64);

        assert_eq!(&chunk_data[..128], &writer[..128]);
        assert_eq!(&chunk_data[128..], &writer[131..]);
    }
}