1mod encryption_algorithm;
2mod handshake;
3mod basic_header;
4mod message_header;
5mod extended_timestamp;
6mod chunk_data;
7
8use std::{
9 io::Result as IOResult,
10 pin::Pin,
11 time::Duration
12};
13use tokio::io::AsyncWrite;
14use crate::{
15 U24_MAX,
16 handlers::{
17 LastChunk,
18 RtmpContext
19 },
20 messages::headers::{
21 BasicHeader,
22 MessageFormat,
23 MessageHeader,
24 MessageType
25 }
26};
27pub use self::{
28 encryption_algorithm::*,
29 handshake::*,
30 basic_header::*,
31 message_header::*,
32 extended_timestamp::*,
33 chunk_data::*
34};
35
36pub async fn write_chunk<'a, W: AsyncWrite>(mut writer: Pin<&'a mut W>, rtmp_context: &'a mut RtmpContext, chunk_id: u16, mut timestamp: Duration, message_type: MessageType, message_id: u32, data: &'a [u8]) -> IOResult<()> {
93 let message_format = if let Some(last_sent_chunk) = rtmp_context.get_last_sent_chunk(&chunk_id) {
94 if message_id != last_sent_chunk.get_message_id() {
95 MessageFormat::New
96 } else if (message_type != last_sent_chunk.get_message_type()) || (data.len() != last_sent_chunk.get_message_length() as usize) {
97 MessageFormat::SameSource
98 } else if timestamp != last_sent_chunk.get_timestamp() {
99 MessageFormat::TimerChange
100 } else {
101 MessageFormat::Continue
102 }
103 } else {
104 MessageFormat::New
105 };
106 let extended_timestamp = if timestamp.as_millis() >= U24_MAX as u128 {
107 let extended_timestamp = Some(timestamp);
108 timestamp = Duration::from_millis(U24_MAX as u64);
109 extended_timestamp
110 } else {
111 None
112 };
113 let message_header: MessageHeader = match message_format {
114 MessageFormat::New => (timestamp, data.len() as u32, message_type, message_id).into(),
115 MessageFormat::SameSource => (timestamp, data.len() as u32, message_type).into(),
116 MessageFormat::TimerChange => timestamp.into(),
117 MessageFormat::Continue => ().into()
118 };
119
120 write_basic_header(writer.as_mut(), &BasicHeader::new(message_format, chunk_id)).await?;
121 write_message_header(writer.as_mut(), &message_header).await?;
122 if let Some(extended_timestamp) = extended_timestamp {
123 write_extended_timestamp(writer.as_mut(), extended_timestamp).await?;
124 }
125
126 if let Some(last_sent_chunk) = rtmp_context.get_last_sent_chunk_mut(&chunk_id) {
127 if let Some(extended_timestamp) = extended_timestamp {
128 last_sent_chunk.set_timestamp(extended_timestamp);
129 } else {
130 message_header.get_timestamp().map(
131 |timestamp| last_sent_chunk.set_timestamp(timestamp)
132 );
133 }
134 message_header.get_message_length().map(
135 |message_length| last_sent_chunk.set_message_length(message_length)
136 );
137 message_header.get_message_type().map(
138 |message_type| last_sent_chunk.set_message_type(message_type)
139 );
140 message_header.get_message_id().map(
141 |message_id| last_sent_chunk.set_message_id(message_id)
142 );
143 } else {
144 rtmp_context.insert_sent_chunk(
145 chunk_id,
146 LastChunk::new(
147 message_header.get_timestamp().unwrap(),
148 message_header.get_message_length().unwrap(),
149 message_header.get_message_type().unwrap(),
150 message_header.get_message_id().unwrap()
151 )
152 );
153 }
154
155 write_chunk_data(writer.as_mut(), chunk_id, rtmp_context.get_sending_chunk_size(), data).await
156}