sheave_core/
writers.rs

1mod encryption_algorithm;
2mod handshake;
3mod basic_header;
4mod message_header;
5mod extended_timestamp;
6mod chunk_data;
7
8use std::{
9    io::Result as IOResult,
10    pin::Pin,
11    time::Duration
12};
13use tokio::io::AsyncWrite;
14use crate::{
15    U24_MAX,
16    handlers::{
17        LastChunk,
18        RtmpContext
19    },
20    messages::headers::{
21        BasicHeader,
22        MessageFormat,
23        MessageHeader,
24        MessageType
25    }
26};
27pub use self::{
28    encryption_algorithm::*,
29    handshake::*,
30    basic_header::*,
31    message_header::*,
32    extended_timestamp::*,
33    chunk_data::*
34};
35
36/// A wrapper for writing a chunk into streams.
37///
38/// The RTMP needs to refer previous states for deciding sending chunk pattern.
39/// But to check them in every step is troublesome and also can make some bug.
40/// This reduces their risks.
41///
42/// # Examples
43///
44/// ```rust
45/// use std::{
46///     io::Result as IOResult,
47///     pin::{
48///         Pin,
49///         pin
50///     },
51///     time::Duration
52/// };
53/// use sheave_core::{
54///     ByteBuffer,
55///     Encoder,
56///     handlers::{
57///         RtmpContext,
58///         VecStream
59///     },
60///     messages::{
61///         ChunkData,
62///         Connect,
63///         amf::v0::Object
64///     },
65///     readers::{
66///         read_basic_header,
67///         read_message_header,
68///         read_chunk_data
69///     },
70///     writers::write_chunk
71/// };
72///
73/// #[tokio::main]
74/// async fn main() -> IOResult<()> {
75///     let mut stream = pin!(VecStream::default());
76///     let mut rtmp_context = RtmpContext::default();
77///     let mut buffer = ByteBuffer::default();
78///     buffer.encode(&Connect::new(Object::default()));
79///     let expected: Vec<u8> = buffer.into();
80///     write_chunk(stream.as_mut(), &mut rtmp_context, Connect::CHANNEL.into(), Duration::default(), Connect::MESSAGE_TYPE, u32::default(), &expected).await?;
81///
82///     let basic_header = read_basic_header(stream.as_mut()).await?;
83///     let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await?;
84///     let receiving_chunk_size = rtmp_context.get_receiving_chunk_size();
85///     let message_length = rtmp_context.get_last_sent_chunk(&basic_header.get_chunk_id()).unwrap().get_message_length();
86///     let actual = read_chunk_data(stream.as_mut(), receiving_chunk_size, message_length).await?;
87///     assert_eq!(expected, actual);
88///
89///     Ok(())
90/// }
91/// ```
92pub async fn write_chunk<'a, W: AsyncWrite>(mut writer: Pin<&'a mut W>, rtmp_context: &'a mut RtmpContext, chunk_id: u16, mut timestamp: Duration, message_type: MessageType, message_id: u32, data: &'a [u8]) -> IOResult<()> {
93    let message_format = if let Some(last_sent_chunk) = rtmp_context.get_last_sent_chunk(&chunk_id) {
94        if message_id != last_sent_chunk.get_message_id() {
95            MessageFormat::New
96        } else if (message_type != last_sent_chunk.get_message_type()) || (data.len() != last_sent_chunk.get_message_length() as usize) {
97            MessageFormat::SameSource
98        } else if timestamp != last_sent_chunk.get_timestamp() {
99            MessageFormat::TimerChange
100        } else {
101            MessageFormat::Continue
102        }
103    } else {
104        MessageFormat::New
105    };
106    let extended_timestamp = if timestamp.as_millis() >= U24_MAX as u128 {
107        let extended_timestamp = Some(timestamp);
108        timestamp = Duration::from_millis(U24_MAX as u64);
109        extended_timestamp
110    } else {
111        None
112    };
113    let message_header: MessageHeader = match message_format {
114        MessageFormat::New => (timestamp, data.len() as u32, message_type, message_id).into(),
115        MessageFormat::SameSource => (timestamp, data.len() as u32, message_type).into(),
116        MessageFormat::TimerChange => timestamp.into(),
117        MessageFormat::Continue => ().into()
118    };
119
120    write_basic_header(writer.as_mut(), &BasicHeader::new(message_format, chunk_id)).await?;
121    write_message_header(writer.as_mut(), &message_header).await?;
122    if let Some(extended_timestamp) = extended_timestamp {
123        write_extended_timestamp(writer.as_mut(), extended_timestamp).await?;
124    }
125
126    if let Some(last_sent_chunk) = rtmp_context.get_last_sent_chunk_mut(&chunk_id) {
127        if let Some(extended_timestamp) = extended_timestamp {
128            last_sent_chunk.set_timestamp(extended_timestamp);
129        } else {
130            message_header.get_timestamp().map(
131                |timestamp| last_sent_chunk.set_timestamp(timestamp)
132            );
133        }
134        message_header.get_message_length().map(
135            |message_length| last_sent_chunk.set_message_length(message_length)
136        );
137        message_header.get_message_type().map(
138            |message_type| last_sent_chunk.set_message_type(message_type)
139        );
140        message_header.get_message_id().map(
141            |message_id| last_sent_chunk.set_message_id(message_id)
142        );
143    } else {
144        rtmp_context.insert_sent_chunk(
145            chunk_id,
146            LastChunk::new(
147                message_header.get_timestamp().unwrap(),
148                message_header.get_message_length().unwrap(),
149                message_header.get_message_type().unwrap(),
150                message_header.get_message_id().unwrap()
151            )
152        );
153    }
154
155    write_chunk_data(writer.as_mut(), chunk_id, rtmp_context.get_sending_chunk_size(), data).await
156}