sheave_core/
writers.rs

1mod encryption_algorithm;
2mod handshake;
3mod basic_header;
4mod message_header;
5mod extended_timestamp;
6mod chunk_data;
7
8use std::{
9    io::Result as IOResult,
10    pin::Pin,
11    time::Duration
12};
13use tokio::io::AsyncWrite;
14use crate::{
15    U24_MAX,
16    handlers::{
17        LastChunk,
18        RtmpContext
19    },
20    messages::headers::{
21        BasicHeader,
22        MessageFormat,
23        MessageHeader,
24        MessageType
25    }
26};
27pub use self::{
28    encryption_algorithm::*,
29    handshake::*,
30    basic_header::*,
31    message_header::*,
32    extended_timestamp::*,
33    chunk_data::*
34};
35
36/// A wrapper for writing a chunk into streams.
37/// The RTMP needs to refer previous states for deciding sending chunk pattern.
38/// But to check them in every step is troublesome and also can make some bug.
39/// This reduces their risks.
40///
41/// # Examples
42///
43/// ```rust
44/// use std::{
45///     io::Result as IOResult,
46///     pin::{
47///         Pin,
48///         pin
49///     },
50///     time::Duration
51/// };
52/// use sheave_core::{
53///     ByteBuffer,
54///     Encoder,
55///     handlers::{
56///         RtmpContext,
57///         VecStream
58///     },
59///     messages::{
60///         ChunkData,
61///         Connect,
62///         amf::v0::Object
63///     },
64///     readers::{
65///         read_basic_header,
66///         read_message_header,
67///         read_chunk_data
68///     },
69///     writers::write_chunk
70/// };
71///
72/// #[tokio::main]
73/// async fn main() -> IOResult<()> {
74///     let mut stream = pin!(VecStream::default());
75///     let mut rtmp_context = RtmpContext::default();
76///     let mut buffer = ByteBuffer::default();
77///     buffer.encode(&Connect::new(Object::default()));
78///     let expected: Vec<u8> = buffer.into();
79///     write_chunk(stream.as_mut(), &mut rtmp_context, Connect::CHANNEL.into(), Duration::default(), Connect::MESSAGE_TYPE, u32::default(), &expected).await?;
80///
81///     let basic_header = read_basic_header(stream.as_mut()).await?;
82///     let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await?;
83///     let receiving_chunk_size = rtmp_context.get_receiving_chunk_size();
84///     let message_length = rtmp_context.get_last_sent_chunk(&basic_header.get_chunk_id()).unwrap().get_message_length();
85///     let actual = read_chunk_data(stream.as_mut(), receiving_chunk_size, message_length).await?;
86///     assert_eq!(expected, actual);
87///
88///     Ok(())
89/// }
90/// ```
91pub async fn write_chunk<'a, W: AsyncWrite>(mut writer: Pin<&'a mut W>, rtmp_context: &'a mut RtmpContext, chunk_id: u16, mut timestamp: Duration, message_type: MessageType, message_id: u32, data: &'a [u8]) -> IOResult<()> {
92    let message_format = if let Some(last_sent_chunk) = rtmp_context.get_last_sent_chunk(&chunk_id) {
93        if message_id != last_sent_chunk.get_message_id() {
94            MessageFormat::New
95        } else if (message_type != last_sent_chunk.get_message_type()) || (data.len() != last_sent_chunk.get_message_length() as usize) {
96            MessageFormat::SameSource
97        } else if timestamp != last_sent_chunk.get_timestamp() {
98            MessageFormat::TimerChange
99        } else {
100            MessageFormat::Continue
101        }
102    } else {
103        MessageFormat::New
104    };
105    let extended_timestamp = if timestamp.as_millis() >= U24_MAX as u128 {
106        let extended_timestamp = Some(timestamp);
107        timestamp = Duration::from_millis(U24_MAX as u64);
108        extended_timestamp
109    } else {
110        None
111    };
112    let message_header = match message_format {
113        MessageFormat::New => MessageHeader::New((timestamp, data.len() as u32, message_type, message_id).into()),
114        MessageFormat::SameSource => MessageHeader::SameSource((timestamp, data.len() as u32, message_type).into()),
115        MessageFormat::TimerChange => MessageHeader::TimerChange(timestamp.into()),
116        MessageFormat::Continue => MessageHeader::Continue
117    };
118
119    write_basic_header(writer.as_mut(), &BasicHeader::new(message_format, chunk_id)).await?;
120    write_message_header(writer.as_mut(), &message_header).await?;
121    if let Some(extended_timestamp) = extended_timestamp {
122        write_extended_timestamp(writer.as_mut(), extended_timestamp).await?;
123    }
124
125    if let Some(last_sent_chunk) = rtmp_context.get_last_sent_chunk_mut(&chunk_id) {
126        if let Some(extended_timestamp) = extended_timestamp {
127            last_sent_chunk.set_timestamp(extended_timestamp);
128        } else {
129            message_header.get_timestamp().map(
130                |timestamp| last_sent_chunk.set_timestamp(timestamp)
131            );
132        }
133        message_header.get_message_length().map(
134            |message_length| last_sent_chunk.set_message_length(message_length)
135        );
136        message_header.get_message_type().map(
137            |message_type| last_sent_chunk.set_message_type(message_type)
138        );
139        message_header.get_message_id().map(
140            |message_id| last_sent_chunk.set_message_id(message_id)
141        );
142    } else {
143        rtmp_context.insert_sent_chunk(
144            chunk_id,
145            LastChunk::new(
146                message_header.get_timestamp().unwrap(),
147                message_header.get_message_length().unwrap(),
148                message_header.get_message_type().unwrap(),
149                message_header.get_message_id().unwrap()
150            )
151        );
152    }
153
154    write_chunk_data(writer.as_mut(), chunk_id, rtmp_context.get_sending_chunk_size(), data).await
155}