1mod encryption_algorithm;
2mod handshake;
3mod basic_header;
4mod message_header;
5mod extended_timestamp;
6mod chunk_data;
7
8use std::{
9 io::Result as IOResult,
10 pin::Pin,
11 time::Duration
12};
13use tokio::io::AsyncWrite;
14use crate::{
15 U24_MAX,
16 handlers::{
17 LastChunk,
18 RtmpContext
19 },
20 messages::headers::{
21 BasicHeader,
22 MessageFormat,
23 MessageHeader,
24 MessageType
25 }
26};
27pub use self::{
28 encryption_algorithm::*,
29 handshake::*,
30 basic_header::*,
31 message_header::*,
32 extended_timestamp::*,
33 chunk_data::*
34};
35
36pub async fn write_chunk<'a, W: AsyncWrite>(mut writer: Pin<&'a mut W>, rtmp_context: &'a mut RtmpContext, chunk_id: u16, mut timestamp: Duration, message_type: MessageType, message_id: u32, data: &'a [u8]) -> IOResult<()> {
92 let message_format = if let Some(last_sent_chunk) = rtmp_context.get_last_sent_chunk(&chunk_id) {
93 if message_id != last_sent_chunk.get_message_id() {
94 MessageFormat::New
95 } else if (message_type != last_sent_chunk.get_message_type()) || (data.len() != last_sent_chunk.get_message_length() as usize) {
96 MessageFormat::SameSource
97 } else if timestamp != last_sent_chunk.get_timestamp() {
98 MessageFormat::TimerChange
99 } else {
100 MessageFormat::Continue
101 }
102 } else {
103 MessageFormat::New
104 };
105 let extended_timestamp = if timestamp.as_millis() >= U24_MAX as u128 {
106 let extended_timestamp = Some(timestamp);
107 timestamp = Duration::from_millis(U24_MAX as u64);
108 extended_timestamp
109 } else {
110 None
111 };
112 let message_header = match message_format {
113 MessageFormat::New => MessageHeader::New((timestamp, data.len() as u32, message_type, message_id).into()),
114 MessageFormat::SameSource => MessageHeader::SameSource((timestamp, data.len() as u32, message_type).into()),
115 MessageFormat::TimerChange => MessageHeader::TimerChange(timestamp.into()),
116 MessageFormat::Continue => MessageHeader::Continue
117 };
118
119 write_basic_header(writer.as_mut(), &BasicHeader::new(message_format, chunk_id)).await?;
120 write_message_header(writer.as_mut(), &message_header).await?;
121 if let Some(extended_timestamp) = extended_timestamp {
122 write_extended_timestamp(writer.as_mut(), extended_timestamp).await?;
123 }
124
125 if let Some(last_sent_chunk) = rtmp_context.get_last_sent_chunk_mut(&chunk_id) {
126 if let Some(extended_timestamp) = extended_timestamp {
127 last_sent_chunk.set_timestamp(extended_timestamp);
128 } else {
129 message_header.get_timestamp().map(
130 |timestamp| last_sent_chunk.set_timestamp(timestamp)
131 );
132 }
133 message_header.get_message_length().map(
134 |message_length| last_sent_chunk.set_message_length(message_length)
135 );
136 message_header.get_message_type().map(
137 |message_type| last_sent_chunk.set_message_type(message_type)
138 );
139 message_header.get_message_id().map(
140 |message_id| last_sent_chunk.set_message_id(message_id)
141 );
142 } else {
143 rtmp_context.insert_sent_chunk(
144 chunk_id,
145 LastChunk::new(
146 message_header.get_timestamp().unwrap(),
147 message_header.get_message_length().unwrap(),
148 message_header.get_message_type().unwrap(),
149 message_header.get_message_id().unwrap()
150 )
151 );
152 }
153
154 write_chunk_data(writer.as_mut(), chunk_id, rtmp_context.get_sending_chunk_size(), data).await
155}