1use std::{
2 future::Future,
3 io::{
4 Error as IOError,
5 ErrorKind,
6 Result as IOResult
7 },
8 pin::{
9 Pin,
10 pin
11 },
12 sync::Arc,
13 task::{
14 Context as FutureContext,
15 Poll
16 },
17 time::{
18 Duration,
19 Instant
20 }
21};
22use log::{
23 error,
24 info
25};
26use futures::ready;
27use tokio::io::{
28 AsyncRead,
29 AsyncWrite
30};
31use sheave_core::{
32 ByteBuffer,
33 Decoder,
34 Encoder,
35 U24_MAX,
36 flv::tags::*,
37 handlers::{
38 AsyncHandler,
39 AsyncHandlerExt,
40 ClientType,
41 ErrorHandler,
42 HandlerConstructor,
43 LastChunk,
44 PublisherStatus,
45 RtmpContext,
46 StreamWrapper,
47 SubscriberStatus,
48 inconsistent_sha,
49 stream_got_exhausted
50 },
51 handshake::{
52 EncryptionAlgorithm,
53 Handshake,
54 Version
55 },
56 messages::{
57 Channel,
59 ChunkData,
60 Connect,
61 ConnectResult,
62 CreateStream,
63 CreateStreamResult,
64 UserControl,
65 EventType,
66 OnStatus,
67 Audio,
68 Video,
69 SetDataFrame,
70 Acknowledgement,
71 amf::v0::{
72 Number,
73 AmfString,
74 Object
75 },
76 headers::MessageType,
77
78 ReleaseStream,
80 ReleaseStreamResult,
81 FcPublish,
82 OnFcPublish,
83 StreamBegin,
84 Publish,
85 FcUnpublish,
86 DeleteStream,
87
88 WindowAcknowledgementSize,
90 FcSubscribe,
91 SetBufferLength,
92 Play,
93 amf::v0::Boolean
94 },
95 net::RtmpReadExt,
96 object,
97 readers::*,
98 writers::*
99};
100use super::{
101 error_response,
102 middlewares::write_acknowledgement
103};
104
105#[doc(hidden)]
106#[derive(Debug)]
107struct HandshakeHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
108
109#[doc(hidden)]
110impl<RW: AsyncRead + AsyncWrite + Unpin> HandshakeHandler<'_, RW> {
111 async fn handle_first_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
112 let encryption_algorithm = EncryptionAlgorithm::default();
113
114 let version = if rtmp_context.is_signed() {
115 Version::LATEST_CLIENT
116 } else {
117 Version::UNSIGNED
118 };
119 let mut client_request = Handshake::new(Instant::now().elapsed(), version);
120 if rtmp_context.is_signed() {
121 client_request.imprint_digest(encryption_algorithm, Handshake::CLIENT_KEY);
122 }
123
124 write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
125 write_handshake(self.0.as_mut(), &client_request).await?;
126
127 rtmp_context.set_encryption_algorithm(encryption_algorithm);
128 rtmp_context.set_client_handshake(client_request);
129
130 info!("First handshake got handled.");
131 Ok(())
132 }
133
134 async fn handle_second_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
135 let encryption_algorithm = read_encryption_algorithm(pin!(self.0.await_until_receiving())).await?;
136 let mut server_request = read_handshake(pin!(self.0.await_until_receiving())).await?;
137 let server_response = read_handshake(pin!(self.0.await_until_receiving())).await?;
138
139 if !rtmp_context.is_signed() {
140 write_handshake(self.0.as_mut(), &server_request).await?;
141
142 rtmp_context.set_server_handshake(server_request);
143 rtmp_context.set_client_handshake(server_response);
144
145 } else if !server_request.did_digest_match(encryption_algorithm, Handshake::SERVER_KEY) {
146 error!("Invalid SHA digest/signature: {:x?}", server_request.get_digest(encryption_algorithm));
147 return Err(inconsistent_sha(server_response.get_digest(encryption_algorithm).to_vec()))
148 } else {
149 let mut server_response_key: Vec<u8> = Vec::new();
150 server_response_key.extend_from_slice(Handshake::SERVER_KEY);
151 server_response_key.extend_from_slice(Handshake::COMMON_KEY);
152
153 if !server_response.did_signature_match(encryption_algorithm, &server_response_key) {
154 error!("Invalid SHA digest/signature: {:x?}", server_response.get_signature());
155 return Err(inconsistent_sha(server_response.get_signature().to_vec()))
156 } else {
157 let mut client_response_key: Vec<u8> = Vec::new();
158 client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
159 client_response_key.extend_from_slice(Handshake::COMMON_KEY);
160 server_request.imprint_signature(encryption_algorithm, &client_response_key);
161 write_handshake(self.0.as_mut(), &server_request).await?;
162
163 rtmp_context.set_server_handshake(server_request);
164 rtmp_context.set_client_handshake(server_response);
165 }
166 }
167
168 info!("Second handshake got handled.");
169 Ok(())
170 }
171}
172
#[doc(hidden)]
impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandshakeHandler<'_, RW> {
    /// Drives the whole handshake: the first handshake, then the second one.
    ///
    /// Returns `Poll::Pending` as soon as either step is pending, and the first error if a step fails.
    ///
    /// NOTE(review): both futures are created anew on every call and nothing is stored between calls,
    /// so a call following a `Pending` result starts again from `handle_first_handshake`.
    /// Confirm that callers/streams never observe `Pending` in the middle of a step.
    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
        // `ready!` returns early on `Pending`; `?` then propagates an error of the first step.
        ready!(pin!(self.handle_first_handshake(rtmp_context)).poll(cx))?;
        pin!(self.handle_second_handshake(rtmp_context)).poll(cx)
    }
}
180
181#[doc(hidden)]
182fn handle_handshake<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> HandshakeHandler<'a, RW> {
183 HandshakeHandler(stream)
184}
185
186#[doc(hidden)]
187#[derive(Debug)]
188struct MessageHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
189
190#[doc(hidden)]
191impl<RW: AsyncRead + AsyncWrite + Unpin> MessageHandler<'_, RW> {
192 async fn write_connect_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
193 use ClientType::*;
194
195 let client_type = rtmp_context.get_client_type().unwrap();
196
197 rtmp_context.increase_transaction_id();
198
199 let command_object = match client_type {
200 Publisher => object!(
201 "app" => rtmp_context.get_app().unwrap().clone(),
202 "type" => AmfString::from("nonprivate"),
203 "flashVer" => AmfString::from("FMLE/3.0 (compatible; Lavf 60.10.100)"),
204 "tcUrl" => rtmp_context.get_tc_url().unwrap().clone()
205 ),
206 Subscriber => object!(
207 "app" => rtmp_context.get_app().unwrap().clone(),
208 "flashVer" => AmfString::from("FMLE/3.0 (compatible; Lavf 60.10.100)"),
209 "tcUrl" => rtmp_context.get_tc_url().unwrap().clone(),
210 "fpad" => Boolean::new(0),
211 "capabilities" => Number::from(15u8),
212 "audioCodecs" => Number::from(4071u16),
213 "videoCodecs" => Number::from(252u8),
214 "videoFunction" => Number::from(1u8)
215 )
216 };
217 let connect = Connect::new(command_object);
218 let mut buffer = ByteBuffer::default();
219 buffer.encode(&AmfString::from("connect"));
220 buffer.encode(&rtmp_context.get_transaction_id());
221 buffer.encode(&connect);
222 write_chunk(self.0.as_mut(), rtmp_context, Connect::CHANNEL.into(), Duration::default(), Connect::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
223
224 rtmp_context.set_command_object(connect.into());
225
226 info!("connect got sent.");
227 Ok(())
228 }
229
230 async fn write_window_acknowledgement_size(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
231 let mut buffer = ByteBuffer::default();
232 buffer.encode(&rtmp_context.get_window_acknowledgement_size());
233 write_chunk(self.0.as_mut(), rtmp_context, WindowAcknowledgementSize::CHANNEL.into(), Duration::default(), WindowAcknowledgementSize::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
234
235 rtmp_context.set_subscriber_status(SubscriberStatus::WindowAcknowledgementSizeGotSent);
236
237 info!("Window Acknowledgement Size got sent.");
238 Ok(())
239 }
240
241 async fn write_release_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
242 rtmp_context.increase_transaction_id();
243
244 let mut buffer = ByteBuffer::default();
245 buffer.encode(&AmfString::from("releaseStream"));
246 buffer.encode(&rtmp_context.get_transaction_id());
247 buffer.encode(&ReleaseStream::new(rtmp_context.get_topic_id().unwrap().clone()));
248 write_chunk(self.0.as_mut(), rtmp_context, ReleaseStream::CHANNEL.into(), Duration::default(), ReleaseStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
249
250 info!("releaseStream got sent.");
251 Ok(())
252 }
253
254 async fn write_fc_publish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
255 rtmp_context.increase_transaction_id();
256
257 let mut buffer = ByteBuffer::default();
258 buffer.encode(&AmfString::from("FCPublish"));
259 buffer.encode(&rtmp_context.get_transaction_id());
260 buffer.encode(&FcPublish::new(rtmp_context.get_topic_id().unwrap().clone()));
261 write_chunk(self.0.as_mut(), rtmp_context, FcPublish::CHANNEL.into(), Duration::default(), FcPublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
262
263 info!("FCPublish got sent.");
264 Ok(())
265 }
266
267 async fn write_create_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
268 rtmp_context.increase_transaction_id();
269
270 let mut buffer = ByteBuffer::default();
271 buffer.encode(&AmfString::from("createStream"));
272 buffer.encode(&rtmp_context.get_transaction_id());
273 buffer.encode(&CreateStream);
274 write_chunk(self.0.as_mut(), rtmp_context, CreateStream::CHANNEL.into(), Duration::default(), CreateStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
275
276 info!("createStream got sent.");
277 Ok(())
278 }
279
280 async fn write_fc_subscribe_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
281 rtmp_context.increase_transaction_id();
282
283 let topic_id = rtmp_context.get_topic_id().unwrap().clone();
284
285 let mut buffer = ByteBuffer::default();
286 buffer.encode(&FcSubscribe::new(topic_id));
287 write_chunk(self.0.as_mut(), rtmp_context, FcSubscribe::CHANNEL.into(), Duration::default(), FcSubscribe::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
288
289 rtmp_context.set_subscriber_status(SubscriberStatus::FcSubscribed);
290
291 info!("FCSubscribe got sent.");
292 Ok(())
293 }
294
295 async fn write_publish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
296 rtmp_context.increase_transaction_id();
297
298 let publishing_name = rtmp_context.get_topic_id().unwrap().clone();
299 let publishing_type = "live";
300 let mut buffer = ByteBuffer::default();
301 buffer.encode(&AmfString::from("publish"));
302 buffer.encode(&rtmp_context.get_transaction_id());
303 buffer.encode(&Publish::new(publishing_name.clone(), publishing_type.into()));
304 let message_id = rtmp_context.get_message_id().unwrap();
305 write_chunk(self.0.as_mut(), rtmp_context, Publish::CHANNEL.into(), Duration::default(), Publish::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
306
307 info!("publish got sent.");
308 Ok(())
309 }
310
311 async fn write_play_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
312 rtmp_context.increase_transaction_id();
313
314 let stream_name = rtmp_context.get_topic_id().unwrap().clone();
315 let play_mode = rtmp_context.get_play_mode().unwrap();
316 let start_time: Number = if let Some(start_time) = rtmp_context.get_start_time() {
317 Number::new(start_time.as_millis() as u64 as f64)
318 } else {
319 Number::new((1000 * play_mode as i64) as f64)
320 };
321
322 let mut buffer = ByteBuffer::default();
323 buffer.encode(&AmfString::from("play"));
324 buffer.encode(&rtmp_context.get_transaction_id());
325 buffer.encode(&Play::new(stream_name.clone(), start_time));
326 write_chunk(self.0.as_mut(), rtmp_context, Play::CHANNEL.into(), Duration::default(), Play::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
327
328 info!("play got sent.");
329 Ok(())
330 }
331
332 async fn write_buffer_length(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
333 let message_id = rtmp_context.get_message_id().unwrap();
334 let mut buffer = ByteBuffer::default();
335 buffer.put_u16_be(SetBufferLength::EVENT_TYPE.into());
336 buffer.encode(&SetBufferLength::new(message_id, rtmp_context.get_buffer_length()));
337 write_chunk(self.0.as_mut(), rtmp_context, SetBufferLength::CHANNEL.into(), Duration::default(), SetBufferLength::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
338
339 rtmp_context.set_subscriber_status(SubscriberStatus::BufferLengthGotSent);
340
341 info!("Buffer Length got sent.");
342 Ok(())
343 }
344
345 async fn write_flv(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
346 for next in rtmp_context.get_topic_mut().unwrap() {
347 let flv_tag = next?;
348 let message_id = rtmp_context.get_message_id().unwrap();
349
350 let channel;
351 let message_type;
352 match flv_tag.get_tag_type() {
353 TagType::Audio => {
354 channel = Audio::CHANNEL;
355 message_type = Audio::MESSAGE_TYPE;
356 },
357 TagType::Video => {
358 channel = Video::CHANNEL;
359 message_type = Video::MESSAGE_TYPE;
360 },
361 TagType::ScriptData => {
362 channel = SetDataFrame::CHANNEL;
363 message_type = SetDataFrame::MESSAGE_TYPE;
364 },
365 TagType::Other => {
366 channel = Channel::Other;
367 message_type = MessageType::Other;
368 }
369 }
370 let timestamp = flv_tag.get_timestamp();
371 let data: Vec<u8> = if let MessageType::Data = message_type {
372 let mut buffer = ByteBuffer::default();
373 buffer.encode(&AmfString::from("@setDataFrame"));
374 buffer.put_bytes(flv_tag.get_data());
375 buffer.into()
376 } else {
377 flv_tag.get_data().to_vec()
378 };
379 write_chunk(self.0.as_mut(), rtmp_context, channel.into(), timestamp, message_type, message_id, &data).await?;
380
381 info!("FLV chunk got sent.");
382 return Ok(())
383 }
384
385 info!("FLV data became empty.");
387 Err(stream_got_exhausted())
388 }
389
390 async fn handle_acknowledgement(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
391 Decoder::<Acknowledgement>::decode(&mut buffer)?;
392
393 info!("Acknowledgement got handled.");
394 Ok(())
395 }
396
397 async fn handle_stream_begin(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
398 use ClientType::*;
399
400 let client_type = rtmp_context.get_client_type().unwrap();
401
402 Decoder::<StreamBegin>::decode(&mut buffer)?;
403
404 match client_type {
405 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Began),
406 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Began)
407 }
408
409 info!("Stream Begin got handled.");
410 Ok(())
411 }
412
413 async fn handle_user_control(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
414 use EventType::*;
415
416 let event_type: EventType = buffer.get_u16_be()?.into();
417 match event_type {
418 StreamBegin => self.handle_stream_begin(rtmp_context, buffer).await,
419 _ => unreachable!("Publisher gets just a Stream Begin event.")
420 }
421 }
422
423 async fn handle_error_response(&mut self, rtmp_context: &mut RtmpContext, information: Object) -> IOResult<()> {
424 let error = error_response(information.clone());
425 rtmp_context.set_information(information);
426
427 error!("{error}");
428 Err(error)
429 }
430
431 async fn handle_connect_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
432 use ClientType::*;
433
434 let response: ConnectResult = buffer.decode()?;
435 let (properties, information): (Object, Object) = response.into();
436
437 rtmp_context.set_properties(properties);
438 rtmp_context.set_information(information);
439
440 match rtmp_context.get_client_type().unwrap() {
441 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Connected),
442 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Connected)
443 }
444
445 info!("connect result got handled.");
446 Ok(())
447 }
448
449 async fn handle_release_stream_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
450 Decoder::<ReleaseStreamResult>::decode(&mut buffer)?;
451
452 rtmp_context.set_publisher_status(PublisherStatus::Released);
453
454 info!("releaseStream result got handled.");
455 Ok(())
456 }
457
458 async fn handle_fc_publish_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
459 Decoder::<OnFcPublish>::decode(&mut buffer)?;
460
461 rtmp_context.set_publisher_status(PublisherStatus::FcPublished);
462
463 info!("onFCPublish got handled.");
464 Ok(())
465 }
466
467 async fn handle_create_stream_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
468 use ClientType::*;
469
470 let client_type = rtmp_context.get_client_type().unwrap();
471
472 let response: CreateStreamResult = buffer.decode()?;
473 let message_id: u32 = response.into();
474 rtmp_context.set_message_id(message_id);
475
476 match client_type {
477 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Created),
478 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Created)
479 }
480
481 info!("createStream result got handled.");
482 Ok(())
483 }
484
485 async fn handle_publish_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
486 let response: OnStatus = buffer.decode()?;
487 let information: Object = response.into();
488
489 if information.get_properties()["level"] == AmfString::from("error") {
495 return self.handle_error_response(rtmp_context, information).await
496 }
497
498 rtmp_context.set_information(information);
499
500 rtmp_context.set_publisher_status(PublisherStatus::Published);
501
502 info!("onStatus(publish) got handled.");
503 Ok(())
504 }
505
506 async fn handle_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
507 Decoder::<FcUnpublish>::decode(&mut buffer)?;
508 rtmp_context.reset_topic_id();
509
510 info!("FCUnpublish got handled.");
511 Ok(())
512 }
513
514 async fn handle_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
515 Decoder::<DeleteStream>::decode(&mut buffer)?;
516 rtmp_context.reset_message_id();
517
518 info!("deleteStream got handled.");
519 Ok(())
520 }
521
522 async fn handle_publisher_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
523 use PublisherStatus::*;
524
525 let command: AmfString = buffer.decode()?;
526
527 if command != "onFCPublish" {
529 Decoder::<Number>::decode(&mut buffer)?;
531 }
532
533 if command == "FCUnpublish" {
534 return self.handle_fc_unpublish_request(rtmp_context, buffer).await
535 } else if command == "deleteStream" {
536 return self.handle_delete_stream_request(rtmp_context, buffer).await
537 } else if command == "_error" {
538 let information: Object = buffer.decode()?;
539 return self.handle_error_response(rtmp_context, information).await
540 } else {
541 }
543
544 if let Some(publisher_status) = rtmp_context.get_publisher_status() {
545 match publisher_status {
546 Connected => self.handle_release_stream_response(rtmp_context, buffer).await,
547 Released => self.handle_fc_publish_response(rtmp_context, buffer).await,
548 FcPublished => self.handle_create_stream_response(rtmp_context, buffer).await,
549 Began => self.handle_publish_response(rtmp_context, buffer).await,
550 _ => Ok(())
551 }
552 } else {
553 self.handle_connect_response(rtmp_context, buffer).await
554 }
555 }
556
557 async fn handle_play_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
558 let response: OnStatus = buffer.decode()?;
559 let information: Object = response.into();
560
561 if information.get_properties()["level"] == AmfString::from("error") {
567 return self.handle_error_response(rtmp_context, information).await
568 }
569
570 rtmp_context.set_information(information);
571
572 rtmp_context.set_subscriber_status(SubscriberStatus::Played);
573
574 info!("onStatus(play) got handled.");
575 Ok(())
576 }
577
578 async fn handle_subscriber_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
579 use SubscriberStatus::*;
580
581 let command: AmfString = buffer.decode()?;
582 Decoder::<Number>::decode(&mut buffer)?;
583
584 if command == "_error" {
585 let information: Object = buffer.decode()?;
586 return self.handle_error_response(rtmp_context, information).await
587 }
588
589 if let Some(subscriber_status) = rtmp_context.get_subscriber_status() {
590 match subscriber_status {
591 WindowAcknowledgementSizeGotSent => self.handle_create_stream_response(rtmp_context, buffer).await,
592 Began => self.handle_play_response(rtmp_context, buffer).await,
593 _ => return Ok(())
594 }
595 } else {
596 self.handle_connect_response(rtmp_context, buffer).await
597 }
598 }
599
600 async fn handle_command_response(&mut self, rtmp_context: &mut RtmpContext, buffer: ByteBuffer) -> IOResult<()> {
601 use ClientType::*;
602
603 match rtmp_context.get_client_type().unwrap() {
604 Publisher => self.handle_publisher_response(rtmp_context, buffer).await,
605 Subscriber => self.handle_subscriber_response(rtmp_context, buffer).await
606 }
607 }
608
609 async fn handle_flv(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer, message_type: MessageType, timestamp: Duration) -> IOResult<()> {
610 let topic = rtmp_context.get_topic().unwrap();
611
612 let tag_type = match message_type {
613 MessageType::Audio => TagType::Audio,
614 MessageType::Video => TagType::Video,
615 MessageType::Data => TagType::ScriptData,
616 _ => TagType::Other
617 };
618
619 if let TagType::ScriptData = tag_type {
620 Decoder::<AmfString>::decode(&mut buffer)?;
621 }
622
623 let data: Vec<u8> = buffer.into();
624 let flv_tag = FlvTag::new(tag_type, timestamp, data);
625 topic.append_flv_tag(flv_tag)?;
626
627 info!("FLV chunk got handled.");
628 Ok(())
629 }
630}
631
#[doc(hidden)]
impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for MessageHandler<'_, RW> {
    /// Performs one round of the message exchange: sends whatever the current status requires,
    /// then reads one chunk and dispatches it by message type.
    ///
    /// Returns `Poll::Pending` as soon as any inner future is pending, and the first error otherwise.
    ///
    /// NOTE(review): every inner future is created anew within this call and nothing is kept between
    /// calls, so a call following a `Pending` result starts again from the top (including the write
    /// step). Confirm that the streams in use never return `Pending` in the middle of a round.
    ///
    /// # Panics
    ///
    /// Panics via `unimplemented!` on a message type other than those dispatched below, and via `unwrap`
    /// when a value required from `rtmp_context` or the message header is absent.
    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
        use MessageType::*;

        // Step 1: send the request which corresponds to the current status.
        // No status at all means the connection has not sent `connect` yet.
        if let Some(publisher_status) = rtmp_context.get_publisher_status() {
            match publisher_status {
                PublisherStatus::Connected => ready!(pin!(self.write_release_stream_request(rtmp_context)).poll(cx))?,
                PublisherStatus::Released => ready!(pin!(self.write_fc_publish_request(rtmp_context)).poll(cx))?,
                PublisherStatus::FcPublished => ready!(pin!(self.write_create_stream_request(rtmp_context)).poll(cx))?,
                PublisherStatus::Created => ready!(pin!(self.write_publish_request(rtmp_context)).poll(cx))?,
                PublisherStatus::Published => ready!(pin!(self.write_flv(rtmp_context)).poll(cx))?,
                _ => {}
            }
        } else if let Some(subscriber_status) = rtmp_context.get_subscriber_status() {
            match subscriber_status {
                SubscriberStatus::Connected => {
                    ready!(pin!(self.write_window_acknowledgement_size(rtmp_context)).poll(cx))?;
                    ready!(pin!(self.write_create_stream_request(rtmp_context)).poll(cx))?
                },
                SubscriberStatus::Created => {
                    ready!(pin!(self.write_fc_subscribe_request(rtmp_context)).poll(cx))?;
                    // Overrides the status which write_fc_subscribe_request has just set.
                    rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
                },
                SubscriberStatus::AdditionalCommandGotSent => {
                    ready!(pin!(self.write_play_request(rtmp_context)).poll(cx))?;
                    ready!(pin!(self.write_buffer_length(rtmp_context)).poll(cx))?
                },
                _ => {}
            }
        } else {
            ready!(pin!(self.write_connect_request(rtmp_context)).poll(cx))?;
        }

        // Step 2: read the chunk headers.
        // While publishing, the basic header is read through `try_read_after` with the await duration
        // held by the context; in every other state the read goes through `await_until_receiving`.
        let basic_header = if let Some(PublisherStatus::Published) = rtmp_context.get_publisher_status() {
            ready!(pin!(read_basic_header(pin!(self.0.try_read_after(rtmp_context.get_await_duration().unwrap())))).poll(cx))?
        } else {
            ready!(pin!(read_basic_header(pin!(self.0.await_until_receiving()))).poll(cx))?
        };
        let message_header = ready!(pin!(read_message_header(pin!(self.0.await_until_receiving()), basic_header.get_message_format())).poll(cx))?;
        // An extended timestamp follows only when the header's timestamp equals U24_MAX.
        let extended_timestamp = if let Some(timestamp) = message_header.get_timestamp() {
            if timestamp.as_millis() == U24_MAX as u128 {
                let extended_timestamp = ready!(pin!(read_extended_timestamp(pin!(self.0.await_until_receiving()))).poll(cx))?;
                Some(extended_timestamp)
            } else {
                None
            }
        } else {
            None
        };

        // Step 3: merge the header into the last chunk remembered for this chunk ID.
        // Fields absent from the header keep the values of the previous chunk.
        let chunk_id = basic_header.get_chunk_id();
        if let Some(last_received_chunk) = rtmp_context.get_last_received_chunk_mut(&chunk_id) {
            if let Some(extended_timestamp) = extended_timestamp {
                last_received_chunk.set_timestamp(extended_timestamp);
            } else {
                if let Some(timestamp) = message_header.get_timestamp() {
                    last_received_chunk.set_timestamp(timestamp);
                }
            }

            if let Some(message_length) = message_header.get_message_length() {
                last_received_chunk.set_message_length(message_length);
            }

            if let Some(message_type) = message_header.get_message_type() {
                last_received_chunk.set_message_type(message_type);
            }

            if let Some(message_id) = message_header.get_message_id() {
                last_received_chunk.set_message_id(message_id);
            }
        } else {
            // First chunk for this chunk ID: every header field must be present (unwrap panics otherwise).
            // NOTE(review): this branch records the header's timestamp even when an extended timestamp
            // has been read above - confirm whether that is intended.
            rtmp_context.insert_received_chunk(
                chunk_id,
                LastChunk::new(
                    message_header.get_timestamp().unwrap(),
                    message_header.get_message_length().unwrap(),
                    message_header.get_message_type().unwrap(),
                    message_header.get_message_id().unwrap()
                )
            );
        }

        // Step 4: read the chunk data, sized by the remembered message length.
        let message_length = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_length();
        let receiving_chunk_size = rtmp_context.get_receiving_chunk_size();
        let data = ready!(pin!(read_chunk_data(pin!(self.0.await_until_receiving()), receiving_chunk_size, message_length)).poll(cx))?;
        let buffer: ByteBuffer = data.into();

        // Step 5: dispatch by the remembered message type.
        let message_type = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_type();
        match message_type {
            Acknowledgement => pin!(self.handle_acknowledgement(rtmp_context, buffer)).poll(cx),
            UserControl => pin!(self.handle_user_control(rtmp_context, buffer)).poll(cx),
            Command => pin!(self.handle_command_response(rtmp_context, buffer)).poll(cx),
            Audio | Video | Data => {
                let timestamp = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_timestamp();
                pin!(self.handle_flv(rtmp_context, buffer, message_type, timestamp)).poll(cx)
            },
            // NOTE(review): the message type is read from the peer, so this panic is reachable from
            // network input - consider returning an error instead.
            other => unimplemented!("Undefined Message: {other:?}")
        }
    }
}
734
735#[doc(hidden)]
736fn handle_message<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> MessageHandler<'a, RW> {
737 MessageHandler(stream)
738}
739
740#[doc(hidden)]
741#[derive(Debug)]
742struct CloseHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
743
744#[doc(hidden)]
745impl<RW: AsyncRead + AsyncWrite + Unpin> CloseHandler<'_, RW> {
746 async fn write_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
747 rtmp_context.increase_transaction_id();
748
749 let mut buffer = ByteBuffer::default();
750 buffer.encode(&AmfString::from("FCUnpublish"));
751 buffer.encode(&FcUnpublish::new(rtmp_context.get_topic_id().unwrap().clone()));
752 write_chunk(self.0.as_mut(), rtmp_context, FcUnpublish::CHANNEL.into(), Duration::default(), FcUnpublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
753
754 info!("FCUnpublish got sent.");
755 Ok(())
756 }
757
758 async fn write_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
759 let message_id = rtmp_context.get_message_id().unwrap();
760
761 rtmp_context.increase_transaction_id();
762
763 let mut buffer = ByteBuffer::default();
764 buffer.encode(&AmfString::from("deleteStream"));
765 buffer.encode(&DeleteStream::new(message_id.into()));
766 write_chunk(self.0.as_mut(), rtmp_context, DeleteStream::CHANNEL.into(), Duration::default(), DeleteStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
767
768 info!("deleteStream got sent.");
769 Ok(())
770 }
771}
772
#[doc(hidden)]
impl<RW: AsyncRead + AsyncWrite + Unpin> ErrorHandler for CloseHandler<'_, RW> {
    /// Cleans up after `error` and shuts the stream down.
    ///
    /// For errors whose kind is not `ErrorKind::Other`, a publisher which has reached `FcPublished`
    /// sends `FCUnpublish`, and one which has reached `Created` additionally sends `deleteStream`.
    /// The stream is then shut down in every case, and the result of the shutdown is returned.
    ///
    /// NOTE(review): errors of kind `Other` skip the unpublish/delete requests - presumably those are
    /// errors reported by the peer itself; confirm against where such errors are produced.
    fn poll_handle_error(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
        if error.kind() != ErrorKind::Other {
            if let Some(publisher_status) = rtmp_context.get_publisher_status() {
                // The comparisons rely on the ordering of PublisherStatus.
                if publisher_status >= PublisherStatus::FcPublished {
                    ready!(pin!(self.write_fc_unpublish_request(rtmp_context)).poll(cx))?;
                }

                if publisher_status >= PublisherStatus::Created {
                    ready!(pin!(self.write_delete_stream_request(rtmp_context)).poll(cx))?;
                }
            }
        }

        self.0.as_mut().poll_shutdown(cx)
    }
}
791
792#[doc(hidden)]
793fn handle_close<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> CloseHandler<'a, RW> {
794 CloseHandler(stream)
795}
796
797#[derive(Debug)]
859pub struct RtmpHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
860
impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for RtmpHandler<RW> {
    /// Polls the whole client pipeline, composed from the handlers of this module:
    ///
    /// 1. the handshake handler,
    /// 2. chained through `while_ok` with the message handler, which is wrapped by the
    ///    `write_acknowledgement` middleware,
    /// 3. with the close handler attached through `map_err`.
    ///
    /// Every handler receives its own pin made from the shared stream via `make_weak_pin`.
    /// The pipeline is built anew on every call.
    fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
        pin!(
            handle_handshake(self.0.make_weak_pin())
                .while_ok(handle_message(self.0.make_weak_pin()).wrap(write_acknowledgement(self.0.make_weak_pin())))
                .map_err(handle_close(self.0.make_weak_pin()))
        ).poll_handle(cx, rtmp_context)
    }
}
870
871impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for RtmpHandler<RW> {
872 fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
873 Self(stream)
874 }
875}
876
877#[cfg(test)]
878mod tests {
879 use uuid::Uuid;
880 use sheave_core::{
881 handlers::VecStream,
882 messages::PlayMode
883 };
884 use super::*;
885
    /// Checks both halves of the signed handshake against in-memory streams.
    #[tokio::test]
    async fn ok_handshake_got_handled() {
        let mut stream = pin!(VecStream::default());
        let mut rtmp_context = RtmpContext::default();
        rtmp_context.set_signed(true);

        // First half: the client writes the encryption algorithm and a handshake with a digest.
        handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await.unwrap();

        // Read back what has been written and validate the digest with the client key.
        let sent_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
        let mut sent_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
        assert_eq!(EncryptionAlgorithm::NotEncrypted, sent_encryption_algorithm);
        assert!(sent_client_handshake.did_digest_match(EncryptionAlgorithm::NotEncrypted, Handshake::CLIENT_KEY));

        // Second half: play the server's role by writing the algorithm, a server handshake with a
        // digest, and the client handshake signed with the server response key, into a fresh stream.
        let mut stream = pin!(VecStream::default());
        let received_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
        let mut received_server_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_SERVER);
        received_server_handshake.imprint_digest(received_encryption_algorithm, Handshake::SERVER_KEY);
        let mut server_response_key: Vec<u8> = Vec::new();
        server_response_key.extend_from_slice(Handshake::SERVER_KEY);
        server_response_key.extend_from_slice(Handshake::COMMON_KEY);
        sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_response_key);
        write_encryption_algorithm(stream.as_mut(), received_encryption_algorithm).await.unwrap();
        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
        assert!(handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await.is_ok());

        // The handshake sent back by the client must carry a signature made with the client response key.
        let sent_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
        let mut client_response_key: Vec<u8> = Vec::new();
        client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
        client_response_key.extend_from_slice(Handshake::COMMON_KEY);
        assert!(sent_server_handshake.did_signature_match(sent_encryption_algorithm, &client_response_key))
    }
918
919 #[tokio::test]
920 async fn ok_publisher_sequence() {
921 let mut stream = pin!(VecStream::default());
922 let mut rtmp_context = RtmpContext::default();
923 rtmp_context.set_tc_url("");
924 rtmp_context.set_app("");
925 rtmp_context.set_client_type(ClientType::Publisher);
926
927 handle_message(stream.as_mut()).write_connect_request(&mut rtmp_context).await.unwrap();
928 let mut stream = pin!(VecStream::default());
929 let mut buffer = ByteBuffer::default();
930 buffer.encode(
931 &ConnectResult::new(
932 object!(
933 "fmsVer" => AmfString::from("FMS/5,0,17"),
934 "capabilities" => Number::new(31f64)
935 ),
936 object!(
937 "level" => AmfString::from("status"),
938 "code" => AmfString::from("NetConnection.Connect.Success"),
939 "description" => AmfString::from("Connection succeeded."),
940 "objectEncoding" => Number::from(0)
941 )
942 )
943 );
944 assert!(handle_message(stream.as_mut()).handle_connect_response(&mut rtmp_context, buffer).await.is_ok());
945 assert_eq!(PublisherStatus::Connected, rtmp_context.get_publisher_status().unwrap());
946
947 rtmp_context.set_topic_id(AmfString::new(Uuid::now_v7().to_string()));
948 let mut stream = pin!(VecStream::default());
949 handle_message(stream.as_mut()).write_release_stream_request(&mut rtmp_context).await.unwrap();
950 let mut buffer = ByteBuffer::default();
951 buffer.encode(&ReleaseStreamResult);
952 assert!(handle_message(stream.as_mut()).handle_release_stream_response(&mut rtmp_context, buffer).await.is_ok());
953 assert_eq!(PublisherStatus::Released, rtmp_context.get_publisher_status().unwrap());
954
955 handle_message(stream.as_mut()).write_fc_publish_request(&mut rtmp_context).await.unwrap();
956 let mut stream = pin!(VecStream::default());
957 let mut buffer = ByteBuffer::default();
958 buffer.encode(&OnFcPublish);
959 assert!(handle_message(stream.as_mut()).handle_fc_publish_response(&mut rtmp_context, buffer).await.is_ok());
960 assert_eq!(PublisherStatus::FcPublished, rtmp_context.get_publisher_status().unwrap());
961
962 handle_message(stream.as_mut()).write_create_stream_request(&mut rtmp_context).await.unwrap();
963 let mut stream = pin!(VecStream::default());
964 let mut buffer = ByteBuffer::default();
965 buffer.encode(&CreateStreamResult::new(0.into()));
966 assert!(handle_message(stream.as_mut()).handle_create_stream_response(&mut rtmp_context, buffer).await.is_ok());
967 assert_eq!(PublisherStatus::Created, rtmp_context.get_publisher_status().unwrap());
968
969 handle_message(stream.as_mut()).write_publish_request(&mut rtmp_context).await.unwrap();
970 let message_id = rtmp_context.get_message_id().unwrap();
971 let mut stream = pin!(VecStream::default());
972 let mut buffer = ByteBuffer::default();
973 buffer.encode(&StreamBegin::new(message_id));
974 assert!(handle_message(stream.as_mut()).handle_stream_begin(&mut rtmp_context, buffer).await.is_ok());
975 assert_eq!(PublisherStatus::Began, rtmp_context.get_publisher_status().unwrap());
976
977 let topic_id = rtmp_context.get_topic_id().unwrap().clone();
978 let mut buffer = ByteBuffer::default();
979 buffer.encode(
980 &OnStatus::new(
981 object!(
982 "level" => AmfString::from("status"),
983 "code" => AmfString::from("NetStream.Publish.Start"),
984 "description" => AmfString::new(format!("{topic_id} is now published")),
985 "details" => topic_id
986 )
987 )
988 );
989 assert!(handle_message(stream.as_mut()).handle_publish_response(&mut rtmp_context, buffer).await.is_ok());
990 assert_eq!(PublisherStatus::Published, rtmp_context.get_publisher_status().unwrap())
991 }
992
993 #[tokio::test]
994 async fn ok_subscriber_sequence() {
995 let mut stream = pin!(VecStream::default());
996 let mut rtmp_context = RtmpContext::default();
997 rtmp_context.set_tc_url("");
998 rtmp_context.set_app("");
999 rtmp_context.set_client_type(ClientType::Subscriber);
1000
1001 handle_message(stream.as_mut()).write_connect_request(&mut rtmp_context).await.unwrap();
1002 let mut buffer = ByteBuffer::default();
1003 buffer.encode(
1004 &ConnectResult::new(
1005 object!(
1006 "fmsVer" => AmfString::from("FMS/5,0,17"),
1007 "capabilities" => Number::from(31)
1008 ),
1009 object!(
1010 "level" => AmfString::from("status"),
1011 "code" => AmfString::from("NetConnection.Connect.Success"),
1012 "description" => AmfString::from("Connection succeeded."),
1013 "objectEncoding" => Number::from(0)
1014 )
1015 )
1016 );
1017 assert!(handle_message(stream.as_mut()).handle_connect_response(&mut rtmp_context, buffer).await.is_ok());
1018 assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1019
1020 let mut stream = pin!(VecStream::default());
1021 handle_message(stream.as_mut()).write_window_acknowledgement_size(&mut rtmp_context).await.unwrap();
1022 assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1023
1024 let mut stream = pin!(VecStream::default());
1025 handle_message(stream.as_mut()).write_create_stream_request(&mut rtmp_context).await.unwrap();
1026 let mut buffer = ByteBuffer::default();
1027 buffer.encode(&CreateStreamResult::new(0.into()));
1028 assert!(handle_message(stream.as_mut()).handle_create_stream_response(&mut rtmp_context, buffer).await.is_ok());
1029 assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1030
1031 rtmp_context.set_topic_id(AmfString::new(Uuid::now_v7().to_string()));
1032 let mut stream = pin!(VecStream::default());
1033 handle_message(stream.as_mut()).write_fc_subscribe_request(&mut rtmp_context).await.unwrap();
1034 assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1035
1036 rtmp_context.set_start_time(Some(Duration::default()));
1037 rtmp_context.set_play_mode(PlayMode::Both);
1038 let mut stream = pin!(VecStream::default());
1039 handle_message(stream.as_mut()).write_play_request(&mut rtmp_context).await.unwrap();
1040 let mut buffer = ByteBuffer::default();
1041 buffer.encode(&StreamBegin::new(rtmp_context.get_message_id().unwrap()));
1042 assert!(handle_message(stream.as_mut()).handle_stream_begin(&mut rtmp_context, buffer).await.is_ok());
1043 assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1044
1045 let mut buffer = ByteBuffer::default();
1046 buffer.encode(
1047 &OnStatus::new(
1048 object!(
1049 "level" => AmfString::from("status"),
1050 "code" => AmfString::from("NetStream.Play.Start"),
1051 "description" => AmfString::from("Playing stream")
1052 )
1053 )
1054 );
1055 assert!(handle_message(stream.as_mut()).handle_play_response(&mut rtmp_context, buffer).await.is_ok());
1056 assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1057
1058 rtmp_context.set_buffer_length(30000);
1059 let mut stream = pin!(VecStream::default());
1060 handle_message(stream.as_mut()).write_buffer_length(&mut rtmp_context).await.unwrap();
1061 assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1062 }
1063}