1use std::{
2 future::Future,
3 io::{
4 Error as IOError,
5 ErrorKind,
6 Result as IOResult
7 },
8 pin::{
9 Pin,
10 pin
11 },
12 sync::Arc,
13 task::{
14 Context as FutureContext,
15 Poll
16 },
17 time::{
18 Duration,
19 Instant
20 }
21};
22use log::{
23 error,
24 info
25};
26use futures::ready;
27use tokio::io::{
28 AsyncRead,
29 AsyncWrite
30};
31use sheave_core::{
32 ByteBuffer,
33 Decoder,
34 Encoder,
35 U24_MAX,
36 flv::tags::*,
37 handlers::{
38 AsyncHandler,
39 AsyncHandlerExt,
40 ClientType,
41 ErrorHandler,
42 HandlerConstructor,
43 LastChunk,
44 PublisherStatus,
45 RtmpContext,
46 StreamWrapper,
47 SubscriberStatus,
48 inconsistent_sha,
49 stream_got_exhausted
50 },
51 handshake::{
52 EncryptionAlgorithm,
53 Handshake,
54 Version
55 },
56 messages::{
57 Channel,
59 ChunkData,
60 Connect,
61 ConnectResult,
62 CreateStream,
63 CreateStreamResult,
64 UserControl,
65 EventType,
66 OnStatus,
67 Audio,
68 Video,
69 SetDataFrame,
70 Acknowledgement,
71 amf::v0::{
72 Number,
73 AmfString,
74 Object
75 },
76 headers::MessageType,
77
78 ReleaseStream,
80 ReleaseStreamResult,
81 FcPublish,
82 OnFcPublish,
83 StreamBegin,
84 Publish,
85 FcUnpublish,
86 DeleteStream,
87
88 WindowAcknowledgementSize,
90 FcSubscribe,
91 SetBufferLength,
92 Play,
93 amf::v0::Boolean
94 },
95 net::RtmpReadExt,
96 object,
97 readers::*,
98 writers::*
99};
100use super::{
101 error_response,
102 middlewares::write_acknowledgement
103};
104
/// Handler which performs the client side of the RTMP handshake.
///
/// Wraps a pinned mutable reference to the stream that the handshake bytes are
/// written to and read from.
#[doc(hidden)]
#[derive(Debug)]
struct HandshakeHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
108
109#[doc(hidden)]
110impl<RW: AsyncRead + AsyncWrite + Unpin> HandshakeHandler<'_, RW> {
111 async fn handle_first_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
112 let encryption_algorithm = EncryptionAlgorithm::default();
113
114 let version = if rtmp_context.is_signed() {
115 Version::LATEST_CLIENT
116 } else {
117 Version::UNSIGNED
118 };
119 let mut client_request = Handshake::new(Instant::now().elapsed(), version);
120 if rtmp_context.is_signed() {
121 client_request.imprint_digest(encryption_algorithm, Handshake::CLIENT_KEY);
122 }
123
124 write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
125 write_handshake(self.0.as_mut(), &client_request).await?;
126
127 rtmp_context.set_encryption_algorithm(encryption_algorithm);
128 rtmp_context.set_client_handshake(client_request);
129
130 info!("First handshake got handled.");
131 Ok(())
132 }
133
134 async fn handle_second_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
135 let encryption_algorithm = read_encryption_algorithm(pin!(self.0.await_until_receiving())).await?;
136 let mut server_request = read_handshake(pin!(self.0.await_until_receiving())).await?;
137 let server_response = read_handshake(pin!(self.0.await_until_receiving())).await?;
138
139 if !rtmp_context.is_signed() {
140 write_handshake(self.0.as_mut(), &server_request).await?;
141
142 rtmp_context.set_server_handshake(server_request);
143 rtmp_context.set_client_handshake(server_response);
144
145 } else if !server_request.did_digest_match(encryption_algorithm, Handshake::SERVER_KEY) {
146 error!("Invalid SHA digest/signature: {:x?}", server_request.get_digest(encryption_algorithm));
147 return Err(inconsistent_sha(server_response.get_digest(encryption_algorithm).to_vec()))
148 } else {
149 let mut server_response_key: Vec<u8> = Vec::new();
150 server_response_key.extend_from_slice(Handshake::SERVER_KEY);
151 server_response_key.extend_from_slice(Handshake::COMMON_KEY);
152
153 if !server_response.did_signature_match(encryption_algorithm, &server_response_key) {
154 error!("Invalid SHA digest/signature: {:x?}", server_response.get_signature());
155 return Err(inconsistent_sha(server_response.get_signature().to_vec()))
156 } else {
157 let mut client_response_key: Vec<u8> = Vec::new();
158 client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
159 client_response_key.extend_from_slice(Handshake::COMMON_KEY);
160 server_request.imprint_signature(encryption_algorithm, &client_response_key);
161 write_handshake(self.0.as_mut(), &server_request).await?;
162
163 rtmp_context.set_server_handshake(server_request);
164 rtmp_context.set_client_handshake(server_response);
165 }
166 }
167
168 info!("Second handshake got handled.");
169 Ok(())
170 }
171}
172
#[doc(hidden)]
impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandshakeHandler<'_, RW> {
    // Runs the first handshake step and, once it is ready and succeeded, the second one.
    //
    // NOTE(review): both futures are created anew on every call, so a `Pending` result
    // discards whatever progress the dropped future had made — confirm that the stream
    // wrappers used here never return `Pending` midway, or that re-running is intended.
    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
        // `ready!` returns early on `Pending`; `?` returns early on an error.
        ready!(pin!(self.handle_first_handshake(rtmp_context)).poll(cx))?;
        pin!(self.handle_second_handshake(rtmp_context)).poll(cx)
    }
}
180
/// Wraps the pinned stream into a [`HandshakeHandler`].
#[doc(hidden)]
fn handle_handshake<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> HandshakeHandler<'a, RW> {
    HandshakeHandler(stream)
}
185
/// Handler which exchanges RTMP messages (commands, user controls and FLV data) after the handshake.
///
/// Wraps a pinned mutable reference to the stream that chunks are written to and read from.
#[doc(hidden)]
#[derive(Debug)]
struct MessageHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
189
190#[doc(hidden)]
191impl<RW: AsyncRead + AsyncWrite + Unpin> MessageHandler<'_, RW> {
192 async fn write_connect_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
193 use ClientType::*;
194
195 let client_type = rtmp_context.get_client_type().unwrap();
196
197 rtmp_context.increase_transaction_id();
198
199 let command_object = match client_type {
200 Publisher => object!(
201 "app" => rtmp_context.get_app().unwrap().clone(),
202 "type" => AmfString::from("nonprivate"),
203 "flashVer" => AmfString::from("FMLE/3.0 (compatible; Lavf 60.10.100)"),
204 "tcUrl" => rtmp_context.get_tc_url().unwrap().clone()
205 ),
206 Subscriber => object!(
207 "app" => rtmp_context.get_app().unwrap().clone(),
208 "flashVer" => AmfString::from("FMLE/3.0 (compatible; Lavf 60.10.100)"),
209 "tcUrl" => rtmp_context.get_tc_url().unwrap().clone(),
210 "fpad" => Boolean::new(0),
211 "capabilities" => Number::from(15u8),
212 "audioCodecs" => Number::from(4071u16),
213 "videoCodecs" => Number::from(252u8),
214 "videoFunction" => Number::from(1u8)
215 )
216 };
217 let connect = Connect::new(command_object);
218 let mut buffer = ByteBuffer::default();
219 buffer.encode(&AmfString::from("connect"));
220 buffer.encode(&rtmp_context.get_transaction_id());
221 buffer.encode(&connect);
222 write_chunk(self.0.as_mut(), rtmp_context, Connect::CHANNEL.into(), Duration::default(), Connect::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
223
224 rtmp_context.set_command_object(connect.into());
225
226 info!("connect got sent.");
227 Ok(())
228 }
229
230 async fn write_window_acknowledgement_size(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
231 let mut buffer = ByteBuffer::default();
232 buffer.encode(&rtmp_context.get_window_acknowledgement_size());
233 write_chunk(self.0.as_mut(), rtmp_context, WindowAcknowledgementSize::CHANNEL.into(), Duration::default(), WindowAcknowledgementSize::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
234
235 rtmp_context.set_subscriber_status(SubscriberStatus::WindowAcknowledgementSizeGotSent);
236
237 info!("Window Acknowledgement Size got sent.");
238 Ok(())
239 }
240
241 async fn write_release_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
242 rtmp_context.increase_transaction_id();
243
244 let mut buffer = ByteBuffer::default();
245 buffer.encode(&AmfString::from("releaseStream"));
246 buffer.encode(&rtmp_context.get_transaction_id());
247 buffer.encode(&ReleaseStream::new(rtmp_context.get_topic_path().unwrap().clone()));
248 write_chunk(self.0.as_mut(), rtmp_context, ReleaseStream::CHANNEL.into(), Duration::default(), ReleaseStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
249
250 info!("releaseStream got sent.");
251 Ok(())
252 }
253
254 async fn write_fc_publish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
255 rtmp_context.increase_transaction_id();
256
257 let mut buffer = ByteBuffer::default();
258 buffer.encode(&AmfString::from("FCPublish"));
259 buffer.encode(&rtmp_context.get_transaction_id());
260 buffer.encode(&FcPublish::new(rtmp_context.get_topic_path().unwrap().clone()));
261 write_chunk(self.0.as_mut(), rtmp_context, FcPublish::CHANNEL.into(), Duration::default(), FcPublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
262
263 info!("FCPublish got sent.");
264 Ok(())
265 }
266
267 async fn write_create_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
268 rtmp_context.increase_transaction_id();
269
270 let mut buffer = ByteBuffer::default();
271 buffer.encode(&AmfString::from("createStream"));
272 buffer.encode(&rtmp_context.get_transaction_id());
273 buffer.encode(&CreateStream);
274 write_chunk(self.0.as_mut(), rtmp_context, CreateStream::CHANNEL.into(), Duration::default(), CreateStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
275
276 info!("createStream got sent.");
277 Ok(())
278 }
279
280 async fn write_fc_subscribe_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
281 rtmp_context.increase_transaction_id();
282
283 let topic_path = rtmp_context.get_topic_path().unwrap().clone();
284
285 let mut buffer = ByteBuffer::default();
286 buffer.encode(&FcSubscribe::new(topic_path));
287 write_chunk(self.0.as_mut(), rtmp_context, FcSubscribe::CHANNEL.into(), Duration::default(), FcSubscribe::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
288
289 rtmp_context.set_subscriber_status(SubscriberStatus::FcSubscribed);
290
291 info!("FCSubscribe got sent.");
292 Ok(())
293 }
294
295 async fn write_publish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
296 rtmp_context.increase_transaction_id();
297
298 let publishing_name = rtmp_context.get_topic_path().unwrap().clone();
299 let publishing_type = "live";
300 let mut buffer = ByteBuffer::default();
301 buffer.encode(&AmfString::from("publish"));
302 buffer.encode(&rtmp_context.get_transaction_id());
303 buffer.encode(&Publish::new(publishing_name.clone(), publishing_type.into()));
304 let message_id = rtmp_context.get_message_id().unwrap();
305 write_chunk(self.0.as_mut(), rtmp_context, Publish::CHANNEL.into(), Duration::default(), Publish::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
306
307 info!("publish got sent.");
308 Ok(())
309 }
310
311 async fn write_play_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
312 rtmp_context.increase_transaction_id();
313
314 let stream_name = rtmp_context.get_topic_path().unwrap().clone();
315 let start_time = rtmp_context.get_start_time().unwrap();
316 let play_mode = rtmp_context.get_play_mode().unwrap();
317
318 let mut buffer = ByteBuffer::default();
319 buffer.encode(&AmfString::from("play"));
320 buffer.encode(&rtmp_context.get_transaction_id());
321 buffer.encode(&Play::new(stream_name.clone(), start_time, play_mode));
322 write_chunk(self.0.as_mut(), rtmp_context, Play::CHANNEL.into(), Duration::default(), Play::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
323
324 info!("play got sent.");
325 Ok(())
326 }
327
328 async fn write_buffer_length(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
329 let message_id = rtmp_context.get_message_id().unwrap();
330 let mut buffer = ByteBuffer::default();
331 buffer.put_u16_be(SetBufferLength::EVENT_TYPE.into());
332 buffer.encode(&SetBufferLength::new(message_id, rtmp_context.get_buffer_length()));
333 write_chunk(self.0.as_mut(), rtmp_context, SetBufferLength::CHANNEL.into(), Duration::default(), SetBufferLength::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
334
335 rtmp_context.set_subscriber_status(SubscriberStatus::BufferLengthGotSent);
336
337 info!("Buffer Length got sent.");
338 Ok(())
339 }
340
341 async fn write_flv(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
342 for next in rtmp_context.get_topic_mut().unwrap() {
343 let flv_tag = next?;
344 let message_id = rtmp_context.get_message_id().unwrap();
345
346 let channel;
347 let message_type;
348 match flv_tag.get_tag_type() {
349 TagType::Audio => {
350 channel = Audio::CHANNEL;
351 message_type = Audio::MESSAGE_TYPE;
352 },
353 TagType::Video => {
354 channel = Video::CHANNEL;
355 message_type = Video::MESSAGE_TYPE;
356 },
357 TagType::ScriptData => {
358 channel = SetDataFrame::CHANNEL;
359 message_type = SetDataFrame::MESSAGE_TYPE;
360 },
361 TagType::Other => {
362 channel = Channel::Other;
363 message_type = MessageType::Other;
364 }
365 }
366 let timestamp = flv_tag.get_timestamp();
367 let data: Vec<u8> = if let MessageType::Data = message_type {
368 let mut buffer = ByteBuffer::default();
369 buffer.encode(&AmfString::from("@setDataFrame"));
370 buffer.put_bytes(flv_tag.get_data());
371 buffer.into()
372 } else {
373 flv_tag.get_data().to_vec()
374 };
375 write_chunk(self.0.as_mut(), rtmp_context, channel.into(), timestamp, message_type, message_id, &data).await?;
376
377 info!("FLV chunk got sent.");
378 return Ok(())
379 }
380
381 info!("FLV data became empty.");
383 Err(stream_got_exhausted())
384 }
385
386 async fn handle_acknowledgement(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
387 Decoder::<Acknowledgement>::decode(&mut buffer)?;
388
389 info!("Acknowledgement got handled.");
390 Ok(())
391 }
392
393 async fn handle_stream_begin(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
394 use ClientType::*;
395
396 let client_type = rtmp_context.get_client_type().unwrap();
397
398 Decoder::<StreamBegin>::decode(&mut buffer)?;
399
400 match client_type {
401 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Began),
402 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Began)
403 }
404
405 info!("Stream Begin got handled.");
406 Ok(())
407 }
408
409 async fn handle_user_control(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
410 use EventType::*;
411
412 let event_type: EventType = buffer.get_u16_be()?.into();
413 match event_type {
414 StreamBegin => self.handle_stream_begin(rtmp_context, buffer).await,
415 _ => unreachable!("Publisher gets just a Stream Begin event.")
416 }
417 }
418
    /// Turns an error information object into an `Err`.
    ///
    /// The error is built by `error_response` from a copy of `information`; the information
    /// itself is stored in the context, the error is logged, and then returned.
    /// This function never returns `Ok`.
    async fn handle_error_response(&mut self, rtmp_context: &mut RtmpContext, information: Object) -> IOResult<()> {
        let error = error_response(information.clone());
        rtmp_context.set_information(information);

        error!("{error}");
        Err(error)
    }
426
427 async fn handle_connect_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
428 use ClientType::*;
429
430 let response: ConnectResult = buffer.decode()?;
431 let (properties, information): (Object, Object) = response.into();
432
433 rtmp_context.set_properties(properties);
434 rtmp_context.set_information(information);
435
436 match rtmp_context.get_client_type().unwrap() {
437 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Connected),
438 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Connected)
439 }
440
441 info!("connect result got handled.");
442 Ok(())
443 }
444
445 async fn handle_release_stream_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
446 Decoder::<ReleaseStreamResult>::decode(&mut buffer)?;
447
448 rtmp_context.set_publisher_status(PublisherStatus::Released);
449
450 info!("releaseStream result got handled.");
451 Ok(())
452 }
453
454 async fn handle_fc_publish_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
455 Decoder::<OnFcPublish>::decode(&mut buffer)?;
456
457 rtmp_context.set_publisher_status(PublisherStatus::FcPublished);
458
459 info!("onFCPublish got handled.");
460 Ok(())
461 }
462
463 async fn handle_create_stream_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
464 use ClientType::*;
465
466 let client_type = rtmp_context.get_client_type().unwrap();
467
468 let response: CreateStreamResult = buffer.decode()?;
469 let message_id: u32 = response.into();
470 rtmp_context.set_message_id(message_id);
471
472 match client_type {
473 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Created),
474 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Created)
475 }
476
477 info!("createStream result got handled.");
478 Ok(())
479 }
480
481 async fn handle_publish_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
482 let response: OnStatus = buffer.decode()?;
483 let information: Object = response.into();
484
485 if information.get_properties()["level"] == AmfString::from("error") {
491 return self.handle_error_response(rtmp_context, information).await
492 }
493
494 rtmp_context.set_information(information);
495
496 rtmp_context.set_publisher_status(PublisherStatus::Published);
497
498 info!("onStatus(publish) got handled.");
499 Ok(())
500 }
501
502 async fn handle_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
503 Decoder::<FcUnpublish>::decode(&mut buffer)?;
504 rtmp_context.reset_topic_path();
505
506 info!("FCUnpublish got handled.");
507 Ok(())
508 }
509
510 async fn handle_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
511 Decoder::<DeleteStream>::decode(&mut buffer)?;
512 rtmp_context.reset_message_id();
513
514 info!("deleteStream got handled.");
515 Ok(())
516 }
517
518 async fn handle_publisher_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
519 use PublisherStatus::*;
520
521 let command: AmfString = buffer.decode()?;
522
523 if command != "onFCPublish" {
525 Decoder::<Number>::decode(&mut buffer)?;
527 }
528
529 if command == "FCUnpublish" {
530 return self.handle_fc_unpublish_request(rtmp_context, buffer).await
531 } else if command == "deleteStream" {
532 return self.handle_delete_stream_request(rtmp_context, buffer).await
533 } else if command == "_error" {
534 let information: Object = buffer.decode()?;
535 return self.handle_error_response(rtmp_context, information).await
536 } else {
537 }
539
540 if let Some(publisher_status) = rtmp_context.get_publisher_status() {
541 match publisher_status {
542 Connected => self.handle_release_stream_response(rtmp_context, buffer).await,
543 Released => self.handle_fc_publish_response(rtmp_context, buffer).await,
544 FcPublished => self.handle_create_stream_response(rtmp_context, buffer).await,
545 Began => self.handle_publish_response(rtmp_context, buffer).await,
546 _ => Ok(())
547 }
548 } else {
549 self.handle_connect_response(rtmp_context, buffer).await
550 }
551 }
552
553 async fn handle_play_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
554 let response: OnStatus = buffer.decode()?;
555 let information: Object = response.into();
556
557 if information.get_properties()["level"] == AmfString::from("error") {
563 return self.handle_error_response(rtmp_context, information).await
564 }
565
566 rtmp_context.set_information(information);
567
568 rtmp_context.set_subscriber_status(SubscriberStatus::Played);
569
570 info!("onStatus(play) got handled.");
571 Ok(())
572 }
573
574 async fn handle_subscriber_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
575 use SubscriberStatus::*;
576
577 let command: AmfString = buffer.decode()?;
578 Decoder::<Number>::decode(&mut buffer)?;
579
580 if command == "_error" {
581 let information: Object = buffer.decode()?;
582 return self.handle_error_response(rtmp_context, information).await
583 }
584
585 if let Some(subscriber_status) = rtmp_context.get_subscriber_status() {
586 match subscriber_status {
587 WindowAcknowledgementSizeGotSent => self.handle_create_stream_response(rtmp_context, buffer).await,
588 Began => self.handle_play_response(rtmp_context, buffer).await,
589 _ => return Ok(())
590 }
591 } else {
592 self.handle_connect_response(rtmp_context, buffer).await
593 }
594 }
595
596 async fn handle_command_response(&mut self, rtmp_context: &mut RtmpContext, buffer: ByteBuffer) -> IOResult<()> {
597 use ClientType::*;
598
599 match rtmp_context.get_client_type().unwrap() {
600 Publisher => self.handle_publisher_response(rtmp_context, buffer).await,
601 Subscriber => self.handle_subscriber_response(rtmp_context, buffer).await
602 }
603 }
604
605 async fn handle_flv(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer, message_type: MessageType, timestamp: Duration) -> IOResult<()> {
606 let topic = rtmp_context.get_topic().unwrap();
607
608 let tag_type = match message_type {
609 MessageType::Audio => TagType::Audio,
610 MessageType::Video => TagType::Video,
611 MessageType::Data => TagType::ScriptData,
612 _ => TagType::Other
613 };
614
615 if let TagType::ScriptData = tag_type {
616 Decoder::<AmfString>::decode(&mut buffer)?;
617 }
618
619 let data: Vec<u8> = buffer.into();
620 let flv_tag = FlvTag::new(tag_type, timestamp, data);
621 topic.append_flv_tag(flv_tag)?;
622
623 info!("FLV chunk got handled.");
624 Ok(())
625 }
626}
627
628#[doc(hidden)]
629impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for MessageHandler<'_, RW> {
630 fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
631 use MessageType::*;
632
633 if let Some(publisher_status) = rtmp_context.get_publisher_status() {
634 match publisher_status {
635 PublisherStatus::Connected => ready!(pin!(self.write_release_stream_request(rtmp_context)).poll(cx))?,
636 PublisherStatus::Released => ready!(pin!(self.write_fc_publish_request(rtmp_context)).poll(cx))?,
637 PublisherStatus::FcPublished => ready!(pin!(self.write_create_stream_request(rtmp_context)).poll(cx))?,
638 PublisherStatus::Created => ready!(pin!(self.write_publish_request(rtmp_context)).poll(cx))?,
639 PublisherStatus::Published => ready!(pin!(self.write_flv(rtmp_context)).poll(cx))?,
640 _ => {}
641 }
642 } else if let Some(subscriber_status) = rtmp_context.get_subscriber_status() {
643 match subscriber_status {
644 SubscriberStatus::Connected => {
645 ready!(pin!(self.write_window_acknowledgement_size(rtmp_context)).poll(cx))?;
646 ready!(pin!(self.write_create_stream_request(rtmp_context)).poll(cx))?
647 },
648 SubscriberStatus::Created => {
649 ready!(pin!(self.write_fc_subscribe_request(rtmp_context)).poll(cx))?;
650 rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
651 },
652 SubscriberStatus::AdditionalCommandGotSent => {
653 ready!(pin!(self.write_play_request(rtmp_context)).poll(cx))?;
654 ready!(pin!(self.write_buffer_length(rtmp_context)).poll(cx))?
655 },
656 _ => {}
657 }
658 } else {
659 ready!(pin!(self.write_connect_request(rtmp_context)).poll(cx))?;
660 }
661
662 let basic_header = if let Some(PublisherStatus::Published) = rtmp_context.get_publisher_status() {
663 ready!(pin!(read_basic_header(pin!(self.0.try_read_after(rtmp_context.get_await_duration().unwrap())))).poll(cx))?
664 } else {
665 ready!(pin!(read_basic_header(pin!(self.0.await_until_receiving()))).poll(cx))?
666 };
667 let message_header = ready!(pin!(read_message_header(pin!(self.0.await_until_receiving()), basic_header.get_message_format())).poll(cx))?;
668 let extended_timestamp = if let Some(timestamp) = message_header.get_timestamp() {
669 if timestamp.as_millis() == U24_MAX as u128 {
670 let extended_timestamp = ready!(pin!(read_extended_timestamp(pin!(self.0.await_until_receiving()))).poll(cx))?;
671 Some(extended_timestamp)
672 } else {
673 None
674 }
675 } else {
676 None
677 };
678
679 let chunk_id = basic_header.get_chunk_id();
680 if let Some(last_received_chunk) = rtmp_context.get_last_received_chunk_mut(&chunk_id) {
681 if let Some(extended_timestamp) = extended_timestamp {
682 last_received_chunk.set_timestamp(extended_timestamp);
683 } else {
684 if let Some(timestamp) = message_header.get_timestamp() {
685 last_received_chunk.set_timestamp(timestamp);
686 }
687 }
688
689 if let Some(message_length) = message_header.get_message_length() {
690 last_received_chunk.set_message_length(message_length);
691 }
692
693 if let Some(message_type) = message_header.get_message_type() {
694 last_received_chunk.set_message_type(message_type);
695 }
696
697 if let Some(message_id) = message_header.get_message_id() {
698 last_received_chunk.set_message_id(message_id);
699 }
700 } else {
701 rtmp_context.insert_received_chunk(
702 chunk_id,
703 LastChunk::new(
704 message_header.get_timestamp().unwrap(),
705 message_header.get_message_length().unwrap(),
706 message_header.get_message_type().unwrap(),
707 message_header.get_message_id().unwrap()
708 )
709 );
710 }
711
712 let message_length = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_length();
713 let receiving_chunk_size = rtmp_context.get_receiving_chunk_size();
714 let data = ready!(pin!(read_chunk_data(pin!(self.0.await_until_receiving()), receiving_chunk_size, message_length)).poll(cx))?;
715 let buffer: ByteBuffer = data.into();
716
717 let message_type = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_type();
718 match message_type {
719 Acknowledgement => pin!(self.handle_acknowledgement(rtmp_context, buffer)).poll(cx),
720 UserControl => pin!(self.handle_user_control(rtmp_context, buffer)).poll(cx),
721 Command => pin!(self.handle_command_response(rtmp_context, buffer)).poll(cx),
722 Audio | Video | Data => {
723 let timestamp = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_timestamp();
724 pin!(self.handle_flv(rtmp_context, buffer, message_type, timestamp)).poll(cx)
725 },
726 other => unimplemented!("Undefined Message: {other:?}")
727 }
728 }
729}
730
/// Wraps the pinned stream into a [`MessageHandler`].
#[doc(hidden)]
fn handle_message<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> MessageHandler<'a, RW> {
    MessageHandler(stream)
}
735
/// Handler which runs when an earlier handler failed: it tells the server that publishing
/// ends (when applicable) and shuts the stream down.
///
/// Wraps a pinned mutable reference to the stream.
#[doc(hidden)]
#[derive(Debug)]
struct CloseHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
739
740#[doc(hidden)]
741impl<RW: AsyncRead + AsyncWrite + Unpin> CloseHandler<'_, RW> {
742 async fn write_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
743 rtmp_context.increase_transaction_id();
744
745 let mut buffer = ByteBuffer::default();
746 buffer.encode(&AmfString::from("FCUnpublish"));
747 buffer.encode(&FcUnpublish::new(rtmp_context.get_topic_path().unwrap().clone()));
748 write_chunk(self.0.as_mut(), rtmp_context, FcUnpublish::CHANNEL.into(), Duration::default(), FcUnpublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
749
750 info!("FCUnpublish got sent.");
751 Ok(())
752 }
753
754 async fn write_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
755 let message_id = rtmp_context.get_message_id().unwrap();
756
757 rtmp_context.increase_transaction_id();
758
759 let mut buffer = ByteBuffer::default();
760 buffer.encode(&AmfString::from("deleteStream"));
761 buffer.encode(&DeleteStream::new(message_id.into()));
762 write_chunk(self.0.as_mut(), rtmp_context, DeleteStream::CHANNEL.into(), Duration::default(), DeleteStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
763
764 info!("deleteStream got sent.");
765 Ok(())
766 }
767}
768
#[doc(hidden)]
impl<RW: AsyncRead + AsyncWrite + Unpin> ErrorHandler for CloseHandler<'_, RW> {
    // Cleans the publishing state up on the server (when applicable), then shuts the stream down.
    //
    // For an error whose kind is anything but `ErrorKind::Other`, a publisher which reached
    // `FcPublished` sends FCUnpublish, and one which reached `Created` also sends deleteStream.
    // The received error itself is not returned; the result is that of `poll_shutdown`,
    // unless one of the writes fails first.
    //
    // NOTE(review): a failed write returns early through `?`, so the stream is not shut down
    // in that case — confirm this is intended.
    fn poll_handle_error(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
        if error.kind() != ErrorKind::Other {
            if let Some(publisher_status) = rtmp_context.get_publisher_status() {
                // The comparisons rely on the ordering defined for PublisherStatus.
                if publisher_status >= PublisherStatus::FcPublished {
                    ready!(pin!(self.write_fc_unpublish_request(rtmp_context)).poll(cx))?;
                }

                if publisher_status >= PublisherStatus::Created {
                    ready!(pin!(self.write_delete_stream_request(rtmp_context)).poll(cx))?;
                }
            }
        }

        self.0.as_mut().poll_shutdown(cx)
    }
}
787
/// Wraps the pinned stream into a [`CloseHandler`].
#[doc(hidden)]
fn handle_close<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> CloseHandler<'a, RW> {
    CloseHandler(stream)
}
792
/// The client-side RTMP handler.
///
/// Holds the shared stream wrapper and composes the handshake, message and close handlers
/// of this module in its [`AsyncHandler`] implementation.
#[derive(Debug)]
pub struct RtmpHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
856
impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for RtmpHandler<RW> {
    // Builds the handler chain on every poll and polls it once:
    // the handshake handler, followed via `while_ok` by the message handler wrapped with
    // `write_acknowledgement`, with the close handler attached via `map_err`.
    // Each handler gets its own pin of the shared stream from `make_weak_pin`.
    //
    // NOTE(review): presumably `while_ok` repeats the message handler while it succeeds and
    // `map_err` runs the close handler on failure — confirm against `AsyncHandlerExt`.
    fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
        pin!(
            handle_handshake(self.0.make_weak_pin())
                .while_ok(handle_message(self.0.make_weak_pin()).wrap(write_acknowledgement(self.0.make_weak_pin())))
                .map_err(handle_close(self.0.make_weak_pin()))
        ).poll_handle(cx, rtmp_context)
    }
}
866
impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for RtmpHandler<RW> {
    /// Creates the handler from the shared stream wrapper.
    fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
        Self(stream)
    }
}
872
873#[cfg(test)]
874mod tests {
875 use uuid::Uuid;
876 use sheave_core::{
877 handlers::VecStream,
878 messages::PlayMode
879 };
880 use super::*;
881
882 #[tokio::test]
883 async fn ok_handshake_got_handled() {
884 let mut stream = pin!(VecStream::default());
885 let mut rtmp_context = RtmpContext::default();
886 rtmp_context.set_signed(true);
887
888 handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await.unwrap();
889
890 let sent_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
891 let mut sent_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
892 assert_eq!(EncryptionAlgorithm::NotEncrypted, sent_encryption_algorithm);
893 assert!(sent_client_handshake.did_digest_match(EncryptionAlgorithm::NotEncrypted, Handshake::CLIENT_KEY));
894
895 let mut stream = pin!(VecStream::default());
896 let received_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
897 let mut received_server_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_SERVER);
898 received_server_handshake.imprint_digest(received_encryption_algorithm, Handshake::SERVER_KEY);
899 let mut server_response_key: Vec<u8> = Vec::new();
900 server_response_key.extend_from_slice(Handshake::SERVER_KEY);
901 server_response_key.extend_from_slice(Handshake::COMMON_KEY);
902 sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_response_key);
903 write_encryption_algorithm(stream.as_mut(), received_encryption_algorithm).await.unwrap();
904 write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
905 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
906 assert!(handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await.is_ok());
907
908 let sent_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
909 let mut client_response_key: Vec<u8> = Vec::new();
910 client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
911 client_response_key.extend_from_slice(Handshake::COMMON_KEY);
912 assert!(sent_server_handshake.did_signature_match(sent_encryption_algorithm, &client_response_key))
913 }
914
915 #[tokio::test]
916 async fn ok_publisher_sequence() {
917 let mut stream = pin!(VecStream::default());
918 let mut rtmp_context = RtmpContext::default();
919 rtmp_context.set_tc_url("");
920 rtmp_context.set_app("");
921 rtmp_context.set_client_type(ClientType::Publisher);
922
923 handle_message(stream.as_mut()).write_connect_request(&mut rtmp_context).await.unwrap();
924 let mut stream = pin!(VecStream::default());
925 let mut buffer = ByteBuffer::default();
926 buffer.encode(
927 &ConnectResult::new(
928 object!(
929 "fmsVer" => AmfString::from("FMS/5,0,17"),
930 "capabilities" => Number::new(31f64)
931 ),
932 object!(
933 "level" => AmfString::from("status"),
934 "code" => AmfString::from("NetConnection.Connect.Success"),
935 "description" => AmfString::from("Connection succeeded."),
936 "objectEncoding" => Number::from(0)
937 )
938 )
939 );
940 assert!(handle_message(stream.as_mut()).handle_connect_response(&mut rtmp_context, buffer).await.is_ok());
941 assert_eq!(PublisherStatus::Connected, rtmp_context.get_publisher_status().unwrap());
942
943 rtmp_context.set_topic_path(AmfString::new(Uuid::now_v7().to_string()));
944 let mut stream = pin!(VecStream::default());
945 handle_message(stream.as_mut()).write_release_stream_request(&mut rtmp_context).await.unwrap();
946 let mut buffer = ByteBuffer::default();
947 buffer.encode(&ReleaseStreamResult);
948 assert!(handle_message(stream.as_mut()).handle_release_stream_response(&mut rtmp_context, buffer).await.is_ok());
949 assert_eq!(PublisherStatus::Released, rtmp_context.get_publisher_status().unwrap());
950
951 handle_message(stream.as_mut()).write_fc_publish_request(&mut rtmp_context).await.unwrap();
952 let mut stream = pin!(VecStream::default());
953 let mut buffer = ByteBuffer::default();
954 buffer.encode(&OnFcPublish);
955 assert!(handle_message(stream.as_mut()).handle_fc_publish_response(&mut rtmp_context, buffer).await.is_ok());
956 assert_eq!(PublisherStatus::FcPublished, rtmp_context.get_publisher_status().unwrap());
957
958 handle_message(stream.as_mut()).write_create_stream_request(&mut rtmp_context).await.unwrap();
959 let mut stream = pin!(VecStream::default());
960 let mut buffer = ByteBuffer::default();
961 buffer.encode(&CreateStreamResult::new(0.into()));
962 assert!(handle_message(stream.as_mut()).handle_create_stream_response(&mut rtmp_context, buffer).await.is_ok());
963 assert_eq!(PublisherStatus::Created, rtmp_context.get_publisher_status().unwrap());
964
965 handle_message(stream.as_mut()).write_publish_request(&mut rtmp_context).await.unwrap();
966 let message_id = rtmp_context.get_message_id().unwrap();
967 let mut stream = pin!(VecStream::default());
968 let mut buffer = ByteBuffer::default();
969 buffer.encode(&StreamBegin::new(message_id));
970 assert!(handle_message(stream.as_mut()).handle_stream_begin(&mut rtmp_context, buffer).await.is_ok());
971 assert_eq!(PublisherStatus::Began, rtmp_context.get_publisher_status().unwrap());
972
973 let topic_path = rtmp_context.get_topic_path().unwrap().clone();
974 let mut buffer = ByteBuffer::default();
975 buffer.encode(
976 &OnStatus::new(
977 object!(
978 "level" => AmfString::from("status"),
979 "code" => AmfString::from("NetStream.Publish.Start"),
980 "description" => AmfString::new(format!("{topic_path} is now published")),
981 "details" => topic_path
982 )
983 )
984 );
985 assert!(handle_message(stream.as_mut()).handle_publish_response(&mut rtmp_context, buffer).await.is_ok());
986 assert_eq!(PublisherStatus::Published, rtmp_context.get_publisher_status().unwrap())
987 }
988
989 #[tokio::test]
990 async fn ok_subscriber_sequence() {
991 let mut stream = pin!(VecStream::default());
992 let mut rtmp_context = RtmpContext::default();
993 rtmp_context.set_tc_url("");
994 rtmp_context.set_app("");
995 rtmp_context.set_client_type(ClientType::Subscriber);
996
997 handle_message(stream.as_mut()).write_connect_request(&mut rtmp_context).await.unwrap();
998 let mut buffer = ByteBuffer::default();
999 buffer.encode(
1000 &ConnectResult::new(
1001 object!(
1002 "fmsVer" => AmfString::from("FMS/5,0,17"),
1003 "capabilities" => Number::from(31)
1004 ),
1005 object!(
1006 "level" => AmfString::from("status"),
1007 "code" => AmfString::from("NetConnection.Connect.Success"),
1008 "description" => AmfString::from("Connection succeeded."),
1009 "objectEncoding" => Number::from(0)
1010 )
1011 )
1012 );
1013 assert!(handle_message(stream.as_mut()).handle_connect_response(&mut rtmp_context, buffer).await.is_ok());
1014 assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1015
1016 let mut stream = pin!(VecStream::default());
1017 handle_message(stream.as_mut()).write_window_acknowledgement_size(&mut rtmp_context).await.unwrap();
1018 assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1019
1020 let mut stream = pin!(VecStream::default());
1021 handle_message(stream.as_mut()).write_create_stream_request(&mut rtmp_context).await.unwrap();
1022 let mut buffer = ByteBuffer::default();
1023 buffer.encode(&CreateStreamResult::new(0.into()));
1024 assert!(handle_message(stream.as_mut()).handle_create_stream_response(&mut rtmp_context, buffer).await.is_ok());
1025 assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1026
1027 rtmp_context.set_topic_path(AmfString::new(Uuid::now_v7().to_string()));
1028 let mut stream = pin!(VecStream::default());
1029 handle_message(stream.as_mut()).write_fc_subscribe_request(&mut rtmp_context).await.unwrap();
1030 assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1031
1032 rtmp_context.set_start_time(Duration::default());
1033 rtmp_context.set_play_mode(PlayMode::Both);
1034 let mut stream = pin!(VecStream::default());
1035 handle_message(stream.as_mut()).write_play_request(&mut rtmp_context).await.unwrap();
1036 let mut buffer = ByteBuffer::default();
1037 buffer.encode(&StreamBegin::new(rtmp_context.get_message_id().unwrap()));
1038 assert!(handle_message(stream.as_mut()).handle_stream_begin(&mut rtmp_context, buffer).await.is_ok());
1039 assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1040
1041 let mut buffer = ByteBuffer::default();
1042 buffer.encode(
1043 &OnStatus::new(
1044 object!(
1045 "level" => AmfString::from("status"),
1046 "code" => AmfString::from("NetStream.Play.Start"),
1047 "description" => AmfString::from("Playing stream")
1048 )
1049 )
1050 );
1051 assert!(handle_message(stream.as_mut()).handle_play_response(&mut rtmp_context, buffer).await.is_ok());
1052 assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1053
1054 rtmp_context.set_buffer_length(30000);
1055 let mut stream = pin!(VecStream::default());
1056 handle_message(stream.as_mut()).write_buffer_length(&mut rtmp_context).await.unwrap();
1057 assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1058 }
1059}