1use std::{
2 future::Future,
3 io::{
4 Error as IOError,
5 ErrorKind,
6 Result as IOResult
7 },
8 pin::{
9 Pin,
10 pin
11 },
12 sync::Arc,
13 task::{
14 Context as FutureContext,
15 Poll
16 },
17 time::{
18 Duration,
19 Instant
20 }
21};
22use log::{
23 debug,
24 error,
25 info
26};
27use futures::ready;
28use tokio::io::{
29 AsyncRead,
30 AsyncWrite
31};
32use sheave_core::{
33 ByteBuffer,
34 Decoder,
35 Encoder,
36 U24_MAX,
37 flv::tags::*,
38 handlers::{
39 AsyncHandler,
40 AsyncHandlerExt,
41 ClientType,
42 ErrorHandler,
43 HandlerConstructor,
44 LastChunk,
45 PublisherStatus,
46 RtmpContext,
47 StreamWrapper,
48 SubscriberStatus,
49 inconsistent_sha,
50 stream_got_exhausted
51 },
52 handshake::{
53 Handshake,
54 Version
55 },
56 messages::{
57 Channel,
59 ChunkData,
60 CommandError,
61 Connect,
62 ConnectResult,
63 CreateStream,
64 CreateStreamResult,
65 EventType,
66 UserControl,
67 OnStatus,
68 Audio,
69 Video,
70 SetDataFrame,
71 Acknowledgement,
72 amf::v0::{
73 AmfString,
74 Number,
75 Object
76 },
77 headers::MessageType,
78
79 ReleaseStream,
81 ReleaseStreamResult,
82 FcPublish,
83 OnFcPublish,
84 StreamBegin,
85 Publish,
86 FcUnpublish,
87 DeleteStream,
88
89 WindowAcknowledgementSize,
91 FcSubscribe,
92 GetStreamLength,
93 GetStreamLengthResult,
94 SetPlaylist,
95 PlaylistReady,
96 Play,
97 SetBufferLength,
98 },
99 net::RtmpReadExt,
100 object,
101 readers::*,
102 writers::*
103};
104use super::{
105 inconsistent_app_path,
107 undistinguishable_client,
108 empty_topic_id,
109 inconsistent_topic_id,
110 middlewares::write_acknowledgement,
111
112 publish_topic,
114 provide_message_id,
115 unpublish_topic,
116 return_message_id,
117
118 subscribe_topic,
120 metadata_not_found,
121};
122
123#[doc(hidden)]
124#[derive(Debug)]
125struct HandshakeHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
126
127#[doc(hidden)]
128impl<RW: AsyncRead + AsyncWrite + Unpin> HandshakeHandler<'_, RW> {
129 async fn handle_first_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
130 let encryption_algorithm = read_encryption_algorithm(pin!(self.0.await_until_receiving())).await?;
131 let mut client_request = read_handshake(pin!(self.0.await_until_receiving())).await?;
132
133 if client_request.get_version() == Version::UNSIGNED {
134 let server_request = Handshake::new(Instant::now().elapsed(), Version::UNSIGNED);
135 write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
136 write_handshake(self.0.as_mut(), &server_request).await?;
137 write_handshake(self.0.as_mut(), &client_request).await?;
138
139 rtmp_context.set_encryption_algorithm(encryption_algorithm);
140 rtmp_context.set_server_handshake(server_request);
141 rtmp_context.set_client_handshake(client_request);
142 } else {
143 if !client_request.did_digest_match(encryption_algorithm, Handshake::CLIENT_KEY) {
144 error!("Invalid SHA digest/signature: {:x?}", client_request.get_digest(encryption_algorithm));
145 return Err(inconsistent_sha(client_request.get_digest(encryption_algorithm).to_vec()))
146 } else {
147 let mut server_request = Handshake::new(Instant::now().elapsed(), Version::LATEST_SERVER);
148 server_request.imprint_digest(encryption_algorithm, Handshake::SERVER_KEY);
149 let mut server_response_key: Vec<u8> = Vec::new();
150 server_response_key.extend_from_slice(Handshake::SERVER_KEY);
151 server_response_key.extend_from_slice(Handshake::COMMON_KEY);
152 client_request.imprint_signature(encryption_algorithm, &server_response_key);
153 write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
154 write_handshake(self.0.as_mut(), &server_request).await?;
155 write_handshake(self.0.as_mut(), &client_request).await?;
156
157 rtmp_context.set_signed(true);
158 rtmp_context.set_encryption_algorithm(encryption_algorithm);
159 rtmp_context.set_server_handshake(server_request);
160 rtmp_context.set_client_handshake(client_request);
161 }
162 }
163
164 info!("First handshake got handled.");
165 Ok(())
166 }
167
168 async fn handle_second_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
169 let client_response = read_handshake(pin!(self.0.await_until_receiving())).await?;
170
171 if !rtmp_context.is_signed() {
172 rtmp_context.set_server_handshake(client_response);
173 } else {
174 let encryption_algorithm = rtmp_context.get_encryption_algorithm().unwrap();
175 let mut client_response_key: Vec<u8> = Vec::new();
176 client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
177 client_response_key.extend_from_slice(Handshake::COMMON_KEY);
178 let server_request = rtmp_context.get_server_handshake().unwrap();
179 if !client_response.did_signature_match(encryption_algorithm, &client_response_key) && server_request.get_signature() != client_response.get_signature() {
181 error!("Invalid SHA digest/signature: {:x?}", client_response.get_signature());
182 return Err(inconsistent_sha(client_response.get_signature().to_vec()))
183 } else {
184 debug!("Handshake version: {:?}", client_response.get_version());
185 debug!("Signature: {:x?}", client_response.get_signature());
186 rtmp_context.set_server_handshake(client_response);
187 }
188 }
189
190 info!("Second handshake got handled.");
191 Ok(())
192 }
193}
194
195#[doc(hidden)]
196impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandshakeHandler<'_, RW> {
197 fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
198 ready!(pin!(self.handle_first_handshake(rtmp_context)).poll(cx))?;
199 pin!(self.handle_second_handshake(rtmp_context)).poll(cx)
200 }
201}
202
203#[doc(hidden)]
204fn handle_handshake<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> HandshakeHandler<'a, RW> {
205 HandshakeHandler(stream)
206}
207
208#[doc(hidden)]
209#[derive(Debug)]
210struct MessageHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
211
212#[doc(hidden)]
213impl<RW: AsyncRead + AsyncWrite + Unpin> MessageHandler<'_, RW> {
214 async fn handle_acknowledgement(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
215 Decoder::<Acknowledgement>::decode(&mut buffer)?;
216
217 info!("Acknowledgement got handled.");
218 Ok(())
219 }
220
221 async fn handle_connect_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
222 let connect_request: Connect = buffer.decode()?;
223 rtmp_context.set_command_object(connect_request.into());
224
225 info!("connect got handled.");
226 Ok(())
227 }
228
229 async fn handle_window_acknowledgement_size(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
230 let window_acknowledgement_size: WindowAcknowledgementSize = buffer.decode()?;
231 rtmp_context.set_window_acknowledgement_size(window_acknowledgement_size);
232
233 rtmp_context.set_subscriber_status(SubscriberStatus::WindowAcknowledgementSizeGotSent);
239
240 info!("Window Acknowledgement Size got handled.");
241 Ok(())
242 }
243
244 async fn handle_release_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
245 let release_stream_request: ReleaseStream = buffer.decode()?;
246 rtmp_context.set_topic_id(release_stream_request.into());
247
248 info!("releaseStream got handled.");
249 Ok(())
250 }
251
252 async fn handle_fc_publish_request(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
253 Decoder::<FcPublish>::decode(&mut buffer)?;
254
255 info!("FCPublish got handled.");
256 Ok(())
257 }
258
259 async fn handle_create_stream_request(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
260 Decoder::<CreateStream>::decode(&mut buffer)?;
261
262 info!("createStream got handled.");
263 Ok(())
264 }
265
266 async fn handle_fc_subscribe_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
267 let database_url = rtmp_context.get_database_url().unwrap().clone();
268 let storage_path = rtmp_context.get_storage_path().unwrap().clone();
269 let client_addr = rtmp_context.get_client_addr().unwrap();
270 let app = rtmp_context.get_app().unwrap().clone();
271
272 let fc_subscribe_request: FcSubscribe = buffer.decode()?;
273 let topic = subscribe_topic(&database_url, &storage_path, &app, fc_subscribe_request.get_topic_id(), client_addr).await?;
279 rtmp_context.set_topic(topic);
280 rtmp_context.set_topic_id(fc_subscribe_request.into());
281
282 rtmp_context.set_subscriber_status(SubscriberStatus::FcSubscribed);
283
284 info!("FCSubscribe got handled.");
285 Ok(())
286 }
287
288 async fn handle_stream_length_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
289 let get_stream_length_request: GetStreamLength = buffer.decode()?;
290 rtmp_context.set_topic_id(get_stream_length_request.into());
291
292 info!("getStreamLength got handled.");
293 Ok(())
294 }
295
296 async fn handle_playlist_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
297 let set_playlist_request: SetPlaylist = buffer.decode()?;
298 rtmp_context.set_playlist(set_playlist_request.into());
299
300 info!("set playlist got handled.");
301 Ok(())
302 }
303
304 async fn handle_publish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
305 let publish_request: Publish = buffer.decode()?;
306 let (publishing_name, publishing_type): (AmfString, AmfString) = publish_request.into();
307 rtmp_context.set_publishing_name(publishing_name);
308 rtmp_context.set_publishing_type(publishing_type);
309
310 info!("publish got handled.");
311 Ok(())
312 }
313
314 async fn handle_play_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
315 let play_request: Play = buffer.decode()?;
316 let (stream_name, start_time, play_mode) = play_request.into();
317 rtmp_context.set_stream_name(stream_name);
318 rtmp_context.set_start_time(start_time);
319 rtmp_context.set_play_mode(play_mode);
320
321 info!("play got handled.");
322 Ok(())
323 }
324
325 async fn handle_buffer_length(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
326 let buffer_length: SetBufferLength = buffer.decode()?;
327 rtmp_context.set_buffer_length(buffer_length.get_buffering_time());
328
329 if let Some(SubscriberStatus::Played) = rtmp_context.get_subscriber_status() {
336 rtmp_context.set_subscriber_status(SubscriberStatus::BufferLengthGotSent);
337 }
338
339 Ok(())
340 }
341
342 async fn handle_user_control(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
343 use EventType::*;
344
345 let event_type: EventType = buffer.get_u16_be()?.into();
346 match event_type {
347 SetBufferLength => self.handle_buffer_length(rtmp_context, buffer).await,
348 _ => unimplemented!("Undefined event type: {event_type:?}")
349 }
350 }
351
352 async fn handle_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
353 let database_url = rtmp_context.get_database_url().unwrap().clone();
354 let storage_path = rtmp_context.get_storage_path().unwrap().clone();
355 let client_addr = rtmp_context.get_client_addr().unwrap();
356 let app = rtmp_context.get_app().unwrap().clone();
357
358 let fc_unpublish_request: FcUnpublish = buffer.decode()?;
359 unpublish_topic(&database_url, &storage_path, &app, fc_unpublish_request.get_topic_id(), client_addr).await?;
360 rtmp_context.reset_topic_id();
361
362 info!("FCUnpublish got handled.");
363 Ok(())
364 }
365
366 async fn handle_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
367 let delete_stream_request: DeleteStream = buffer.decode()?;
368 return_message_id(delete_stream_request.into());
369 rtmp_context.reset_message_id();
370
371 info!("deleteStream got handled.");
372 Ok(())
373 }
374
375 async fn handle_publisher_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
376 use PublisherStatus::*;
377
378 let command: AmfString = buffer.decode()?;
379 let transaction_id: Number = buffer.decode()?;
380 rtmp_context.set_command_name(command.clone());
381 rtmp_context.set_transaction_id(transaction_id);
382
383 if command == "FCUnpublish" {
384 return self.handle_fc_unpublish_request(rtmp_context, buffer).await
385 }
386 if command == "deleteStream" {
387 return self.handle_delete_stream_request(rtmp_context, buffer).await
388 }
389
390 match rtmp_context.get_publisher_status().unwrap() {
391 Connected => self.handle_release_stream_request(rtmp_context, buffer).await,
392 Released => self.handle_fc_publish_request(rtmp_context, buffer).await,
393 FcPublished => self.handle_create_stream_request(rtmp_context, buffer).await,
394 Created => self.handle_publish_request(rtmp_context, buffer).await,
395 _ => Ok(())
396 }
397 }
398
399 async fn handle_subscriber_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
400 use SubscriberStatus::*;
401
402 let subscriber_status = rtmp_context.get_subscriber_status().unwrap();
403
404 let command: AmfString = buffer.decode()?;
405 let transaction_id: Number = buffer.decode()?;
406 rtmp_context.set_command_name(command.clone());
407 rtmp_context.set_transaction_id(transaction_id);
408
409 if subscriber_status == FcSubscribed {
410 if command == "getStreamLength" {
412 return self.handle_stream_length_request(rtmp_context, buffer).await
413 }
414 if command == "set_playlist" {
416 return self.handle_playlist_request(rtmp_context, buffer).await
417 }
418 }
419
420 match subscriber_status {
421 Connected => Ok(()),
422 WindowAcknowledgementSizeGotSent => self.handle_create_stream_request(rtmp_context, buffer).await,
424 Created => self.handle_fc_subscribe_request(rtmp_context, buffer).await,
425 AdditionalCommandGotSent => self.handle_play_request(rtmp_context, buffer).await,
426 _ => Ok(())
427 }
428 }
429
430 async fn handle_command_request(&mut self, rtmp_context: &mut RtmpContext, buffer: ByteBuffer) -> IOResult<()> {
431 use ClientType::*;
432
433 if let Some(client_type) = rtmp_context.get_client_type() {
434 match client_type {
435 Publisher => self.handle_publisher_request(rtmp_context, buffer).await,
436 Subscriber => self.handle_subscriber_request(rtmp_context, buffer).await
437 }
438 } else {
439 self.handle_connect_request(rtmp_context, buffer).await
440 }
441 }
442
443 async fn handle_flv(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer, message_type: MessageType, timestamp: Duration) -> IOResult<()> {
444 let topic = rtmp_context.get_topic().unwrap();
445
446 let tag_type = match message_type {
447 MessageType::Audio => TagType::Audio,
448 MessageType::Video => TagType::Video,
449 MessageType::Data => TagType::ScriptData,
450 _ => TagType::Other
451 };
452
453 if let TagType::ScriptData = tag_type {
454 Decoder::<AmfString>::decode(&mut buffer)?;
456 }
457
458 let data: Vec<u8> = buffer.into();
459 let flv_tag = FlvTag::new(tag_type, timestamp, data);
460 topic.append_flv_tag(flv_tag)?;
461
462 info!("FLV chunk got handled.");
463 Ok(())
464 }
465
466 async fn write_error_response(&mut self, rtmp_context: &mut RtmpContext, information: Object, error: IOError) -> IOResult<()> {
467 let mut buffer = ByteBuffer::default();
468 buffer.encode(&AmfString::from("_error"));
469 buffer.encode(&rtmp_context.get_transaction_id());
470 buffer.encode(&CommandError::new(information.clone()));
471 write_chunk(self.0.as_mut(), rtmp_context, CommandError::CHANNEL.into(), Duration::default(), CommandError::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
472
473 rtmp_context.set_information(information);
474
475 error!("{error}");
476 return Err(error)
477 }
478
479 async fn write_connect_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
480 use ClientType::*;
481
482 let command_object = rtmp_context.get_command_object().unwrap().clone();
483
484 let client_type = if command_object.get_properties().get("type").is_some() {
485 Publisher
486 } else if command_object.get_properties().get("fpad").is_some() {
487 Subscriber
488 } else {
489 let information = object!(
490 "level" => AmfString::from("error"),
491 "code" => AmfString::from("NetConnection.Connect.UndistinguishableClient"),
492 "description" => AmfString::from("Server couldn't distinguish you are either publisher or subscriber.")
493 );
494 return self.write_error_response(rtmp_context, information, undistinguishable_client()).await
495 };
496
497 let app = rtmp_context.get_app().unwrap().clone();
498 let requested_app: &AmfString = (&command_object.get_properties()["app"]).into();
499 if *requested_app != app {
500 let information = object!(
501 "level" => AmfString::from("error"),
502 "code" => AmfString::from("NetConnection.Connect.InconsistentAppPath"),
503 "description" => AmfString::new(format!("Requested app path is inconsistent. expected: {}, actual: {}", app, requested_app))
504 );
505 return self.write_error_response(rtmp_context, information, inconsistent_app_path(app, requested_app.clone())).await
506 }
507
508 let properties = object!(
509 "fmsVer" => AmfString::from("FMS/5,0,17"),
510 "capabilities" => Number::from(31)
511 );
512 let information = object!(
513 "level" => AmfString::from("status"),
514 "code" => AmfString::from("NetConnection.Connect.Success"),
515 "description" => AmfString::from("Connection succeeded."),
516 "objectEncoding" => Number::from(0)
517 );
518 let mut buffer = ByteBuffer::default();
519 buffer.encode(&AmfString::from("_result"));
520 buffer.encode(&rtmp_context.get_transaction_id());
521 buffer.encode(&ConnectResult::new(properties.clone(), information.clone()));
522 write_chunk(self.0.as_mut(), rtmp_context, ConnectResult::CHANNEL.into(), Duration::default(), ConnectResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
523
524 rtmp_context.set_client_type(client_type);
525 rtmp_context.set_properties(properties);
526 rtmp_context.set_information(information);
527
528 match client_type {
529 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Connected),
530 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Connected)
531 }
532
533 info!("connect result got sent.");
534 Ok(())
535 }
536
537 async fn write_release_stream_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
538 let topic_id = rtmp_context.get_topic_id().unwrap().clone();
539
540 if topic_id.is_empty() {
541 let information = object!(
542 "level" => AmfString::from("error"),
543 "code" => AmfString::from("NetConnection.ReleaseStream.EmptyTopicPath"),
544 "description" => AmfString::from("The topic path must not be empty.")
545 );
546 return self.write_error_response(rtmp_context, information, empty_topic_id()).await
547 }
548
549 let database_url = rtmp_context.get_database_url().unwrap().clone();
550 let storage_path = rtmp_context.get_storage_path().unwrap().clone();
551 let client_addr = rtmp_context.get_client_addr().unwrap();
552 let app = rtmp_context.get_app().unwrap().clone();
553
554 let topic = match publish_topic(&database_url, &storage_path, &app, &topic_id, client_addr).await {
555 Ok(topic) => topic,
556 Err(e) => {
557 let information = object!(
558 "level" => AmfString::from("error"),
559 "code" => AmfString::from("NetConnection.ReleaseStream.StreamIsUnpublished"),
560 "description" => AmfString::new(format!("A stream of {topic_id} is unpublished."))
561 );
562 return self.write_error_response(rtmp_context, information, e).await
563 }
564 };
565
566 let mut buffer = ByteBuffer::default();
567 buffer.encode(&AmfString::from("_result"));
568 buffer.encode(&rtmp_context.get_transaction_id());
569 buffer.encode(&ReleaseStreamResult);
570 write_chunk(self.0.as_mut(), rtmp_context, ReleaseStreamResult::CHANNEL.into(), Duration::default(), ReleaseStreamResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
571
572 rtmp_context.set_topic(topic);
573
574 rtmp_context.set_publisher_status(PublisherStatus::Released);
575
576 info!("releaseStream result got sent.");
577 Ok(())
578 }
579
580 async fn write_fc_publish_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
581 let mut buffer = ByteBuffer::default();
582 buffer.encode(&AmfString::from("onFCPublish"));
583 buffer.encode(&OnFcPublish);
584 write_chunk(self.0.as_mut(), rtmp_context, OnFcPublish::CHANNEL.into(), Duration::default(), OnFcPublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
585
586 rtmp_context.set_publisher_status(PublisherStatus::FcPublished);
587
588 info!("onFCPublish got sent.");
589 Ok(())
590 }
591
592 async fn write_create_stream_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
593 use ClientType::*;
594
595 let client_type = rtmp_context.get_client_type().unwrap();
596
597 let message_id = provide_message_id();
598 let mut buffer = ByteBuffer::default();
599 buffer.encode(&AmfString::from("_result"));
600 buffer.encode(&rtmp_context.get_transaction_id());
601 buffer.encode(&CreateStreamResult::new(message_id.into()));
602 write_chunk(self.0.as_mut(), rtmp_context, CreateStreamResult::CHANNEL.into(), Duration::default(), CreateStreamResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
603
604 rtmp_context.set_message_id(message_id);
605
606 match client_type {
607 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Created),
608 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Created)
609 }
610
611 info!("createStream result got sent.");
612 Ok(())
613 }
614
615 async fn write_stream_length_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
616 let transaction_id = rtmp_context.get_transaction_id();
617 let topic = rtmp_context.get_topic_mut().unwrap();
618
619 for result in topic {
620 let flv_tag = result?;
621
622 if flv_tag.get_tag_type() == TagType::ScriptData {
623 let mut buffer = ByteBuffer::default();
624 buffer.put_bytes(flv_tag.get_data());
625 let script_data: ScriptDataTag = buffer.decode()?;
626
627 if *script_data.get_name() == "onMetaData" {
628 let duration: &Number = (&script_data.get_value().get_properties()["duration"]).into();
629 let mut buffer = ByteBuffer::default();
630 buffer.encode(&AmfString::from("_result"));
631 buffer.encode(&transaction_id);
632 buffer.encode(&GetStreamLengthResult::new(*duration));
633 write_chunk(self.0.as_mut(), rtmp_context, GetStreamLengthResult::CHANNEL.into(), Duration::default(), GetStreamLengthResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
634
635 rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
636
637 info!("getStreamLength result got sent.");
638 return Ok(())
639 }
640 }
641 }
642
643 let topic_id = rtmp_context.get_topic_id().unwrap().clone();
644 let information = object!(
645 "level" => AmfString::from("error"),
646 "code" => AmfString::from("NetConnection.GetStreamLength.MetadataNotFound"),
647 "description" => AmfString::new(format!("Metadata didn't find in specified topic ID: {topic_id}"))
648 );
649 self.write_error_response(rtmp_context, information, metadata_not_found(topic_id)).await
650 }
651
652 async fn write_playlist_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
653 let mut buffer = ByteBuffer::default();
654 buffer.encode(&AmfString::from("playlist_ready"));
655 buffer.encode(&PlaylistReady);
656 write_chunk(self.0.as_mut(), rtmp_context, PlaylistReady::CHANNEL.into(), Duration::default(), PlaylistReady::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
657
658 rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
659
660 info!("playlist_ready got sent.");
661 Ok(())
662 }
663
664 async fn write_stream_begin(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
665 use ClientType::*;
666
667 let message_id = rtmp_context.get_message_id().unwrap();
668 let mut buffer = ByteBuffer::default();
669 buffer.put_u16_be(StreamBegin::EVENT_TYPE.into());
670 buffer.encode(&StreamBegin::new(message_id));
671 write_chunk(self.0.as_mut(), rtmp_context, StreamBegin::CHANNEL.into(), Duration::default(), StreamBegin::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
672
673 match rtmp_context.get_client_type().unwrap() {
674 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Began),
675 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Began)
676 }
677
678 info!("Stream Begin got sent.");
679 Ok(())
680 }
681
682 async fn write_error_status(&mut self, rtmp_context: &mut RtmpContext, information: Object, error: IOError) -> IOResult<()> {
683 let message_id = rtmp_context.get_message_id().unwrap();
684
685 let mut buffer = ByteBuffer::default();
686 buffer.encode(&AmfString::from("onStatus"));
687 buffer.encode(&Number::from(0));
688 buffer.encode(&OnStatus::new(information.clone()));
689 write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
690
691 rtmp_context.set_information(information);
692
693 error!("{error}");
694 return Err(error)
695 }
696
697 async fn write_publish_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
698 let message_id = rtmp_context.get_message_id().unwrap();
699 let topic_id = rtmp_context.get_topic_id().unwrap().clone();
700 let publishing_name = rtmp_context.get_publishing_name().unwrap().clone();
701
702 if topic_id != publishing_name {
703 let information = object!(
704 "level" => AmfString::from("error"),
705 "code" => AmfString::from("NetStream.Publish.InconsistentPlaypath"),
706 "description" => AmfString::new(format!("Requested name is inconsistent. expected: {topic_id}, actual: {publishing_name}"))
707 );
708 return self.write_error_status(rtmp_context, information, inconsistent_topic_id(topic_id, publishing_name)).await
709 }
710
711 let information = object!(
712 "level" => AmfString::from("status"),
713 "code" => AmfString::from("NetStream.Publish.Start"),
714 "description" => AmfString::new(format!("{publishing_name} is now published")),
715 "details" => publishing_name.clone()
716 );
717 let mut buffer = ByteBuffer::default();
718 buffer.encode(&AmfString::from("onStatus"));
719 buffer.encode(&Number::from(0));
720 buffer.encode(&OnStatus::new(information.clone()));
721 write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
722
723 rtmp_context.set_information(information);
724
725 rtmp_context.set_publisher_status(PublisherStatus::Published);
726
727 info!("onStatus(publish) got sent.");
728 Ok(())
729 }
730
731 async fn write_play_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
732 let topic_id = rtmp_context.get_topic_id().unwrap().clone();
733 let message_id = rtmp_context.get_message_id().unwrap();
734 let stream_name = rtmp_context.get_stream_name().unwrap().clone();
735
736 if topic_id != stream_name {
737 let information = object!(
738 "level" => AmfString::from("error"),
739 "code" => AmfString::from("NetStream.Play.InconsistentTopicPath"),
740 "description" => AmfString::new(format!("Requested name is inconsistent. expected: {topic_id}, actual: {stream_name}"))
741 );
742 return self.write_error_status(rtmp_context, information, inconsistent_topic_id(topic_id, stream_name)).await
743 }
744
745 let information = object!(
746 "level" => AmfString::from("status"),
747 "code" => AmfString::from("NetStream.Play.Start"),
748 "description" => AmfString::from("Playing stream.")
749 );
750 let mut buffer = ByteBuffer::default();
751 buffer.encode(&AmfString::from("onStatus"));
752 buffer.encode(&Number::from(0));
753 buffer.encode(&OnStatus::new(information.clone()));
754 write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
755
756 rtmp_context.set_information(information);
757
758 rtmp_context.set_subscriber_status(SubscriberStatus::Played);
759
760 info!("onStatus(play) got sent.");
761 Ok(())
762 }
763
764 async fn write_flv(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
765 for next in rtmp_context.get_topic_mut().unwrap() {
766 let flv_tag = next?;
767 let message_id = rtmp_context.get_message_id().unwrap();
768
769 let channel;
770 let message_type;
771 match flv_tag.get_tag_type() {
772 TagType::Audio => {
773 channel = Audio::CHANNEL;
774 message_type = Audio::MESSAGE_TYPE;
775 },
776 TagType::Video => {
777 channel = Video::CHANNEL;
778 message_type = Video::MESSAGE_TYPE;
779 },
780 TagType::ScriptData => {
781 channel = SetDataFrame::CHANNEL;
782 message_type = SetDataFrame::MESSAGE_TYPE;
783 },
784 TagType::Other => {
785 channel = Channel::Other;
786 message_type = MessageType::Other;
787 }
788 }
789 let timestamp = flv_tag.get_timestamp();
790 let data: Vec<u8> = if let MessageType::Data = message_type {
791 let mut buffer = ByteBuffer::default();
792 buffer.encode(&AmfString::from("@setDataFrame"));
793 buffer.put_bytes(flv_tag.get_data());
794 buffer.into()
795 } else {
796 flv_tag.get_data().to_vec()
797 };
798 write_chunk(self.0.as_mut(), rtmp_context, channel.into(), timestamp, message_type, message_id, &data).await?;
799
800 info!("FLV chunk got sent.");
801 return Ok(())
802 }
803
804 info!("FLV data became empty.");
805 Err(stream_got_exhausted())
806 }
807}
808
809#[doc(hidden)]
810impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for MessageHandler<'_, RW> {
811 fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
812 use MessageType::*;
813
814 let basic_header = ready!(pin!(read_basic_header(pin!(self.0.await_until_receiving()))).poll(cx))?;
815 let message_header = ready!(pin!(read_message_header(pin!(self.0.await_until_receiving()), basic_header.get_message_format())).poll(cx))?;
816 let extended_timestamp = if let Some(timestamp) = message_header.get_timestamp() {
817 if timestamp.as_millis() == U24_MAX as u128 {
818 let extended_timestamp = ready!(pin!(read_extended_timestamp(pin!(self.0.await_until_receiving()))).poll(cx))?;
819 Some(extended_timestamp)
820 } else {
821 None
822 }
823 } else {
824 None
825 };
826
827 let chunk_id = basic_header.get_chunk_id();
828 if let Some(last_received_chunk) = rtmp_context.get_last_received_chunk_mut(&chunk_id) {
829 if let Some(extended_timestamp) = extended_timestamp {
830 last_received_chunk.set_timestamp(extended_timestamp);
831 } else {
832 message_header.get_timestamp().map(
833 |timestamp| last_received_chunk.set_timestamp(timestamp)
834 );
835 }
836 message_header.get_message_length().map(
837 |message_length| last_received_chunk.set_message_length(message_length)
838 );
839 message_header.get_message_type().map(
840 |message_type| last_received_chunk.set_message_type(message_type)
841 );
842 message_header.get_message_id().map(
843 |message_id| last_received_chunk.set_message_id(message_id)
844 );
845 } else {
846 rtmp_context.insert_received_chunk(
847 chunk_id,
848 LastChunk::new(
849 message_header.get_timestamp().unwrap(),
850 message_header.get_message_length().unwrap(),
851 message_header.get_message_type().unwrap(),
852 message_header.get_message_id().unwrap()
853 )
854 );
855 }
856 let data = ready!(
857 pin!(
858 read_chunk_data(
859 pin!(self.0.await_until_receiving()),
860 rtmp_context.get_receiving_chunk_size(),
861 rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_length()
862 )
863 ).poll(cx)
864 )?;
865 let buffer: ByteBuffer = data.into();
866
867 let message_type = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_type();
868 match message_type {
869 Acknowledgement => ready!(pin!(self.handle_acknowledgement(rtmp_context, buffer)).poll(cx))?,
870 UserControl => ready!(pin!(self.handle_user_control(rtmp_context, buffer)).poll(cx))?,
871 WindowAcknowledgementSize => ready!(pin!(self.handle_window_acknowledgement_size(rtmp_context, buffer)).poll(cx))?,
872 Audio | Video | Data => {
873 let timestamp = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_timestamp();
874 ready!(pin!(self.handle_flv(rtmp_context, buffer, message_type, timestamp)).poll(cx))?
875 },
876 Command => ready!(pin!(self.handle_command_request(rtmp_context, buffer)).poll(cx))?,
877 other => unimplemented!("Undefined Message: {other:?}")
878 }
879
880 if let Some(publisher_status) = rtmp_context.get_publisher_status() {
881 match publisher_status {
882 PublisherStatus::Connected => pin!(self.write_release_stream_response(rtmp_context)).poll(cx),
883 PublisherStatus::Released => pin!(self.write_fc_publish_response(rtmp_context)).poll(cx),
884 PublisherStatus::FcPublished => pin!(self.write_create_stream_response(rtmp_context)).poll(cx),
885 PublisherStatus::Created => {
886 ready!(pin!(self.write_stream_begin(rtmp_context)).poll(cx))?;
887 pin!(self.write_publish_response(rtmp_context)).poll(cx)
888 },
889 _ => {
890 Poll::Ready(Ok(()))
892 }
893 }
894 } else if let Some(mut subscriber_status) = rtmp_context.get_subscriber_status() {
895 if subscriber_status == SubscriberStatus::FcSubscribed {
896 let command = rtmp_context.get_command_name().unwrap().clone();
897
898 if command == "getStreamLength" {
899 return pin!(self.write_stream_length_response(rtmp_context)).poll(cx)
900 } else if command == "set_playlist" {
901 return pin!(self.write_playlist_response(rtmp_context)).poll(cx)
902 } else {
903 subscriber_status = SubscriberStatus::AdditionalCommandGotSent;
904 }
905 }
906
907 match subscriber_status {
908 SubscriberStatus::WindowAcknowledgementSizeGotSent => pin!(self.write_create_stream_response(rtmp_context)).poll(cx),
909 SubscriberStatus::AdditionalCommandGotSent => {
910 ready!(pin!(self.write_stream_begin(rtmp_context)).poll(cx))?;
911 pin!(self.write_play_response(rtmp_context)).poll(cx)
912 },
913 SubscriberStatus::Played => pin!(self.write_flv(rtmp_context)).poll(cx),
914 _ => {
915 Poll::Ready(Ok(()))
917 }
918 }
919 } else {
920 pin!(self.write_connect_response(rtmp_context)).poll(cx)
921 }
922 }
923}
924
925#[doc(hidden)]
926fn handle_message<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> MessageHandler<'a, RW> {
927 MessageHandler(stream)
928}
929
930#[doc(hidden)]
931#[derive(Debug)]
932struct CloseHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
933
934#[doc(hidden)]
935impl<RW: AsyncRead + AsyncWrite + Unpin> CloseHandler<'_, RW> {
936 async fn write_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
937 let topic_id = rtmp_context.get_topic_id().unwrap().clone();
938 rtmp_context.increase_transaction_id();
939
940 let mut buffer = ByteBuffer::default();
941 buffer.encode(&AmfString::from("FCUnpublish"));
942 buffer.encode(&rtmp_context.get_transaction_id());
943 buffer.encode(&FcUnpublish::new(topic_id));
944 write_chunk(self.0.as_mut(), rtmp_context, FcUnpublish::CHANNEL.into(), Duration::default(), FcUnpublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
945
946 info!("FCUnpublish got sent.");
947 Ok(())
948 }
949
950 async fn write_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
951 let message_id = rtmp_context.get_message_id().unwrap();
952 rtmp_context.increase_transaction_id();
953
954 let mut buffer = ByteBuffer::default();
955 buffer.encode(&AmfString::from("deleteStream"));
956 buffer.encode(&rtmp_context.get_transaction_id());
957 buffer.encode(&DeleteStream::new(message_id.into()));
958 write_chunk(self.0.as_mut(), rtmp_context, DeleteStream::CHANNEL.into(), Duration::default(), DeleteStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
959
960 info!("deleteStream got sent.");
961 Ok(())
962 }
963}
964
965#[doc(hidden)]
966fn handle_close<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> CloseHandler<'a, RW> {
967 CloseHandler(stream)
968}
969
970#[doc(hidden)]
971impl<RW: AsyncRead + AsyncWrite + Unpin> ErrorHandler for CloseHandler<'_, RW> {
972 fn poll_handle_error(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
973 if error.kind() != ErrorKind::Other {
974 if let Some(publisher_status) = rtmp_context.get_publisher_status() {
975 if publisher_status >= PublisherStatus::FcPublished {
976 ready!(pin!(self.write_fc_unpublish_request(rtmp_context)).poll(cx))?;
977 }
978
979 if publisher_status >= PublisherStatus::Created {
980 ready!(pin!(self.write_delete_stream_request(rtmp_context)).poll(cx))?;
981 }
982 }
983 }
984
985 self.0.as_mut().poll_shutdown(cx)
986 }
987}
988
989#[derive(Debug)]
1049pub struct RtmpHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
1050
1051impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for RtmpHandler<RW> {
1052 fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
1053 pin!(
1054 handle_handshake(self.0.make_weak_pin())
1055 .while_ok(handle_message(self.0.make_weak_pin()).wrap(write_acknowledgement(self.0.make_weak_pin())))
1056 .map_err(handle_close(self.0.make_weak_pin()))
1057 ).poll_handle(cx, rtmp_context)
1058 }
1059}
1060
1061impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for RtmpHandler<RW> {
1062 fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
1063 Self(stream)
1064 }
1065}
1066
1067#[cfg(test)]
1068mod tests {
1069 use std::{
1070 env::temp_dir,
1071 fs::{
1072 copy,
1073 create_dir_all,
1074 exists,
1075 },
1076 net::{
1077 IpAddr,
1078 Ipv4Addr,
1079 SocketAddr,
1080 },
1081 path::{
1082 MAIN_SEPARATOR,
1083 PathBuf,
1084 },
1085 str::FromStr,
1086 };
1087 use dotenvy::{
1088 from_filename,
1089 var
1090 };
1091 use log::LevelFilter;
1092 use rand::fill;
1093 use sqlx::{
1094 Connection,
1095 MySqlConnection,
1096 migrate::Migrator,
1097 query
1098 };
1099 use tokio::sync::OnceCell;
1100 use uuid::Uuid;
1101 use sheave_core::{
1102 ecma_array,
1103 flv::Flv,
1104 handlers::VecStream,
1105 handshake::EncryptionAlgorithm,
1106 messages::{
1107 ChunkSize,
1108 SetPlaylist,
1109 amf::v0::Boolean
1110 }
1111 };
1112 use super::*;
1113
/// Client address (`0.0.0.0:1935`) that the tests register topics under.
const CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1935);
/// Statement that inserts a topic ID together with its client address.
const STATEMENT: &str = "INSERT INTO topics (id, client_addr) VALUES (?, ?)";
1116 static MIGRATOR: OnceCell<()> = OnceCell::const_new();
1117
1118 async fn migrate_once() {
1119 let database_url = var("DATABASE_URL")
1120 .unwrap();
1121 let mut connection = MySqlConnection::connect(&database_url)
1122 .await
1123 .unwrap();
1124 let migrator = Migrator::new(format!("{}{MAIN_SEPARATOR}migrations", env!("CARGO_MANIFEST_DIR")).as_ref())
1125 .await
1126 .unwrap();
1127 migrator
1128 .run(&mut connection)
1129 .await
1130 .unwrap()
1131 }
1132
1133 #[tokio::test]
1134 async fn ok_unsigned_handshake_got_handled() {
1135 let mut stream = pin!(VecStream::default());
1136 let mut rtmp_context = RtmpContext::default();
1137
1138 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1139 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1140 let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::UNSIGNED);
1141 sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1142 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1143 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1144 assert!(result.is_ok());
1145
1146 let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1147 let received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1148 let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1149 assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1150 assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1151
1152 write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1153 let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1154 assert!(result.is_ok());
1155 let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1156 assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes())
1157 }
1158
1159 #[tokio::test]
1160 async fn err_digest_did_not_match() {
1161 let mut stream = pin!(VecStream::default());
1162 let mut rtmp_context = RtmpContext::default();
1163
1164 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1165 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1166 let sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1167 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1168 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1169 assert!(result.is_err())
1170 }
1171
1172 #[tokio::test]
1173 async fn err_signature_did_not_match() {
1174 let mut stream = pin!(VecStream::default());
1175 let mut rtmp_context = RtmpContext::default();
1176
1177 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1178 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1179 let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1180 sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1181 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1182 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1183 assert!(result.is_ok());
1184
1185 read_encryption_algorithm(stream.as_mut()).await.unwrap();
1186 let mut received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1187 read_handshake(stream.as_mut()).await.unwrap();
1188 let mut invalid_signature_key: [u8; Handshake::CLIENT_KEY.len() + Handshake::COMMON_KEY.len()] = [0; Handshake::CLIENT_KEY.len() + Handshake::COMMON_KEY.len()];
1189 fill(&mut invalid_signature_key);
1190 received_server_handshake.imprint_signature(sent_encryption_algorithm, &invalid_signature_key);
1191 write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1192 let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1193 assert!(result.is_err())
1194 }
1195
1196 #[tokio::test]
1197 async fn ok_singed_handshake_got_handled() {
1198 let mut stream = pin!(VecStream::default());
1199 let mut rtmp_context = RtmpContext::default();
1200
1201 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1202 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1203 let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1204 sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1205 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1206 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1207 assert!(result.is_ok());
1208
1209 let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1210 let mut received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1211 let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1212 let mut server_signature_key: Vec<u8> = Vec::new();
1213 server_signature_key.extend_from_slice(Handshake::SERVER_KEY);
1214 server_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1215 sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_signature_key);
1216 assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1217 assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1218
1219 let mut client_signature_key: Vec<u8> = Vec::new();
1220 client_signature_key.extend_from_slice(Handshake::CLIENT_KEY);
1221 client_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1222 received_server_handshake.imprint_signature(sent_encryption_algorithm, &client_signature_key);
1223 write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1224 let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1225 assert!(result.is_ok());
1226 let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1227 assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes())
1228 }
1229
1230 #[tokio::test]
1231 async fn ok_signed_handshake_as_ffmpeg_got_handled() {
1232 let mut stream = pin!(VecStream::default());
1233 let mut rtmp_context = RtmpContext::default();
1234
1235 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1236 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1237 let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1238 sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1239 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1240 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1241 assert!(result.is_ok());
1242
1243 let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1244 let received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1245 let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1246 let mut server_signature_key: Vec<u8> = Vec::new();
1247 server_signature_key.extend_from_slice(Handshake::SERVER_KEY);
1248 server_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1249 sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_signature_key);
1250 assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1251 assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1252
1253 write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1254 let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1255 assert!(result.is_ok());
1256 let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1257 assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes());
1258 }
1259
1260 #[tokio::test]
1261 async fn err_undistinguishable_client() {
1262 let mut stream = pin!(VecStream::default());
1263 let mut rtmp_context = RtmpContext::default();
1264
1265 let mut buffer = ByteBuffer::default();
1266 buffer.encode(&Connect::default());
1267 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1268 let result = handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await;
1269 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1270 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1271 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1272 let mut buffer: ByteBuffer = chunk.into();
1273 let command: AmfString = buffer.decode().unwrap();
1274 assert!(result.is_err());
1275 assert_eq!(command, "_error");
1276 assert!(rtmp_context.get_information().is_some())
1277 }
1278
1279 #[tokio::test]
1280 async fn err_inconsistent_app_path() {
1281 let mut stream = pin!(VecStream::default());
1282 let mut rtmp_context = RtmpContext::default();
1283 rtmp_context.set_app("ondemand");
1284
1285 let mut buffer = ByteBuffer::default();
1286 buffer.encode(&Connect::new(object!("app" => AmfString::default())));
1287 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1288 let result = handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await;
1289 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1290 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1291 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1292 let mut buffer: ByteBuffer = chunk.into();
1293 let command: AmfString = buffer.decode().unwrap();
1294 assert!(result.is_err());
1295 assert_eq!(command, "_error");
1296 assert!(rtmp_context.get_information().is_some())
1297 }
1298
1299 #[tokio::test]
1300 async fn err_empty_topic_id() {
1301 let mut stream = pin!(VecStream::default());
1302 let mut rtmp_context = RtmpContext::default();
1303 let mut buffer = ByteBuffer::default();
1304 buffer.encode(&ReleaseStream::new(AmfString::from("")));
1305 handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1306 let result = handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await;
1307 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1308 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1309 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1310 let mut buffer: ByteBuffer = chunk.into();
1311 let command: AmfString = buffer.decode().unwrap();
1312 assert!(result.is_err());
1313 assert_eq!(command, "_error");
1314 assert!(rtmp_context.get_information().is_some())
1315 }
1316
1317 #[tokio::test]
1318 async fn err_unpublished_stream() {
1319 if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1320 from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1321 }
1322
1323 MIGRATOR.get_or_init(migrate_once).await;
1324
1325 let temp_dir = temp_dir();
1326 let storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1327
1328 let database_url = var("DATABASE_URL").unwrap();
1329
1330 let app = "ondemand";
1331
1332 let mut stream = pin!(VecStream::default());
1333 let mut rtmp_context = RtmpContext::default();
1334 rtmp_context.set_storage_path(&storage_path);
1335 rtmp_context.set_database_url(&database_url);
1336 rtmp_context.set_app(app);
1337 rtmp_context.set_client_addr(CLIENT_ADDR);
1338
1339 let topic_id = Uuid::now_v7().to_string();
1340
1341 let mut buffer = ByteBuffer::default();
1342 buffer.encode(&ReleaseStream::new(AmfString::new(topic_id)));
1343 handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1344 let result = handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await;
1345 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1346 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1347 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1348 let mut buffer: ByteBuffer = chunk.into();
1349 let command: AmfString = buffer.decode().unwrap();
1350 assert!(result.is_err());
1351 assert_eq!(command, "_error");
1352 assert!(rtmp_context.get_information().is_some())
1353 }
1354
1355 #[tokio::test]
1356 async fn err_inconsistent_topic_id_in_publish() {
1357 let mut stream = pin!(VecStream::default());
1358 let mut rtmp_context = RtmpContext::default();
1359 rtmp_context.set_message_id(0);
1360 rtmp_context.set_topic_id(AmfString::new(Uuid::now_v7().to_string()));
1361
1362 let mut buffer = ByteBuffer::default();
1363 buffer.encode(&Publish::new(AmfString::default(), "live".into()));
1364 handle_message(stream.as_mut()).handle_publish_request(&mut rtmp_context, buffer).await.unwrap();
1365 let result = handle_message(stream.as_mut()).write_publish_response(&mut rtmp_context).await;
1366 assert!(result.is_err());
1367 assert!(rtmp_context.get_information().is_some())
1368 }
1369
1370 #[tokio::test]
1371 async fn err_metadata_not_found() {
1372 if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1373 from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1374 }
1375
1376 let temp_dir = temp_dir();
1377 let topic_storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1378 let topic_id = Uuid::now_v7().to_string();
1379 let topic = {
1380 create_dir_all(&topic_storage_path).unwrap();
1381 Flv::create(&format!("{topic_storage_path}{MAIN_SEPARATOR}{topic_id}.flv")).unwrap()
1382 };
1383
1384 let mut stream = pin!(VecStream::default());
1385 let mut rtmp_context = RtmpContext::default();
1386 rtmp_context.set_topic(topic);
1387
1388 let mut buffer = ByteBuffer::default();
1389 buffer.encode(&GetStreamLength::new(AmfString::new(topic_id)));
1390 handle_message(stream.as_mut()).handle_stream_length_request(&mut rtmp_context, buffer).await.unwrap();
1391 let result = handle_message(stream.as_mut()).write_stream_length_response(&mut rtmp_context).await;
1392 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1393 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1394 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1395 let mut buffer: ByteBuffer = chunk.into();
1396 let command: AmfString = buffer.decode().unwrap();
1397 assert!(result.is_err());
1398 assert_eq!(command, "_error");
1399 assert!(rtmp_context.get_information().is_some())
1400 }
1401
1402 #[tokio::test]
1403 async fn err_inconsistent_topic_id_in_play() {
1404 let mut stream = pin!(VecStream::default());
1405 let mut rtmp_context = RtmpContext::default();
1406 rtmp_context.set_topic_id(AmfString::new(Uuid::now_v7().to_string()));
1407 rtmp_context.set_message_id(0);
1408
1409 let mut buffer = ByteBuffer::default();
1410 buffer.encode(&Play::new(AmfString::new(Uuid::now_v7().to_string()), Number::from(-2i8)));
1411 handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1412 let result = handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await;
1413 assert!(result.is_err());
1414 assert!(rtmp_context.get_information().is_some())
1415 }
1416
1417 #[tokio::test]
1418 async fn ok_valid_publisher_sequence() {
1419 if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1420 from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1421 }
1422
1423 env_logger::builder()
1424 .is_test(true)
1425 .filter_level(LevelFilter::from_str(&var("LOGLEVEL").unwrap_or("error".into())).unwrap())
1426 .init();
1427
1428 MIGRATOR.get_or_init(migrate_once).await;
1429
1430 let temp_dir = temp_dir();
1431 let storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1432
1433 let database_url = var("DATABASE_URL").unwrap();
1434
1435 let app = "ondemand";
1436
1437 let mut rtmp_context = RtmpContext::default();
1438 rtmp_context.set_storage_path(&storage_path);
1439 rtmp_context.set_database_url(&database_url);
1440 rtmp_context.set_app(app);
1441 rtmp_context.set_client_addr(CLIENT_ADDR);
1442
1443 let topic_id = Uuid::now_v7().to_string();
1444
1445 query(STATEMENT)
1446 .bind(&topic_id)
1447 .bind(&CLIENT_ADDR.to_string())
1448 .execute(&mut MySqlConnection::connect(&database_url).await.unwrap())
1449 .await
1450 .unwrap();
1451
1452 let mut stream = pin!(VecStream::default());
1453 let mut buffer = ByteBuffer::default();
1454 buffer.encode(
1455 &Connect::new(
1456 object!(
1457 "app" => AmfString::from(app),
1458 "type" => AmfString::from("nonprivate")
1459 )
1460 )
1461 );
1462 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1463 assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1464 assert_eq!(PublisherStatus::Connected, rtmp_context.get_publisher_status().unwrap());
1465
1466 let mut buffer = ByteBuffer::default();
1467 buffer.encode(&ReleaseStream::new(AmfString::new(topic_id.clone())));
1468 handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1469 let mut stream = pin!(VecStream::default());
1470 assert!(handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await.is_ok());
1471 assert_eq!(PublisherStatus::Released, rtmp_context.get_publisher_status().unwrap());
1472
1473 let mut buffer = ByteBuffer::default();
1474 buffer.encode(&FcPublish::new(AmfString::new(topic_id.clone())));
1475 handle_message(stream.as_mut()).handle_fc_publish_request(&mut rtmp_context, buffer).await.unwrap();
1476 let mut stream = pin!(VecStream::default());
1477 assert!(handle_message(stream.as_mut()).write_fc_publish_response(&mut rtmp_context).await.is_ok());
1478 assert_eq!(PublisherStatus::FcPublished, rtmp_context.get_publisher_status().unwrap());
1479
1480 let mut buffer = ByteBuffer::default();
1481 buffer.encode(&CreateStream);
1482 handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1483 let mut stream = pin!(VecStream::default());
1484 assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1485 assert_eq!(PublisherStatus::Created, rtmp_context.get_publisher_status().unwrap());
1486
1487 let mut buffer = ByteBuffer::default();
1488 buffer.encode(&Publish::new(AmfString::new(topic_id), "live".into()));
1489 handle_message(stream.as_mut()).handle_publish_request(&mut rtmp_context, buffer).await.unwrap();
1490 let mut stream = pin!(VecStream::default());
1491 assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1492 assert_eq!(PublisherStatus::Began, rtmp_context.get_publisher_status().unwrap());
1493 assert!(handle_message(stream).write_publish_response(&mut rtmp_context).await.is_ok());
1494 assert_eq!(PublisherStatus::Published, rtmp_context.get_publisher_status().unwrap())
1495 }
1496
1497 #[tokio::test]
1498 async fn ok_valid_subscriber_sequence_in_ffmpeg() {
1499 if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1500 from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1501 }
1502
1503 MIGRATOR.get_or_init(migrate_once).await;
1504
1505 let temp_dir = temp_dir();
1506 let storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1507 let app = "ondemand";
1508 let copy_to = format!("{storage_path}{MAIN_SEPARATOR}{app}");
1509 create_dir_all(©_to).unwrap();
1510
1511 let mut resources_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1512 resources_path.pop();
1513 resources_path.push("resources");
1514 resources_path.push("test.flv");
1515 let topic_id = Uuid::now_v7().to_string();
1516 copy(resources_path, format!("{copy_to}{MAIN_SEPARATOR}{topic_id}.flv")).unwrap();
1517
1518 let database_url = var("DATABASE_URL").unwrap();
1519
1520 let mut rtmp_context = RtmpContext::default();
1521 rtmp_context.set_storage_path(&storage_path);
1522 rtmp_context.set_database_url(&database_url);
1523 rtmp_context.set_app(app);
1524 rtmp_context.set_client_addr(CLIENT_ADDR);
1525
1526 query(STATEMENT)
1527 .bind(&topic_id)
1528 .bind(&CLIENT_ADDR.to_string())
1529 .execute(&mut MySqlConnection::connect(&database_url).await.unwrap())
1530 .await
1531 .unwrap();
1532
1533 let mut stream = pin!(VecStream::default());
1534 let mut buffer = ByteBuffer::default();
1535 buffer.encode(
1536 &Connect::new(
1537 object!(
1538 "app" => AmfString::from(app),
1539 "fpad" => Boolean::new(0)
1540 )
1541 )
1542 );
1543 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1544 assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1545 assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1546
1547 let mut buffer = ByteBuffer::default();
1548 buffer.encode(&WindowAcknowledgementSize::default());
1549 handle_message(stream.as_mut()).handle_window_acknowledgement_size(&mut rtmp_context, buffer).await.unwrap();
1550 assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1551
1552 let mut buffer = ByteBuffer::default();
1553 buffer.encode(&CreateStream);
1554 handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1555 let mut stream = pin!(VecStream::default());
1556 assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1557 assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1558
1559 let mut stream = pin!(VecStream::default());
1560 let mut buffer = ByteBuffer::default();
1561 buffer.encode(&FcSubscribe::new(AmfString::new(topic_id.clone())));
1562 handle_message(stream.as_mut()).handle_fc_subscribe_request(&mut rtmp_context, buffer).await.unwrap();
1563 assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1564
1565 let mut buffer = ByteBuffer::default();
1566 buffer.encode(&GetStreamLength::new(AmfString::new(topic_id.clone())));
1567 handle_message(stream.as_mut()).handle_stream_length_request(&mut rtmp_context, buffer).await.unwrap();
1568 let mut stream = pin!(VecStream::default());
1569 assert!(handle_message(stream.as_mut()).write_stream_length_response(&mut rtmp_context).await.is_ok());
1570 assert_eq!(SubscriberStatus::AdditionalCommandGotSent, rtmp_context.get_subscriber_status().unwrap());
1571
1572 let mut buffer = ByteBuffer::default();
1573 buffer.encode(
1574 &Play::new(
1575 AmfString::new(topic_id),
1576 Number::from(-2i8)
1577 )
1578 );
1579 handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1580 let mut stream = pin!(VecStream::default());
1581 assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1582 assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1583 assert!(handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await.is_ok());
1584 assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1585
1586 let mut buffer = ByteBuffer::default();
1587 buffer.encode(&SetBufferLength::new(rtmp_context.get_message_id().unwrap(), 0));
1588 handle_message(stream.as_mut()).handle_buffer_length(&mut rtmp_context, buffer).await.unwrap();
1589 assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1590 }
1591
1592 #[tokio::test]
1593 async fn ok_valid_subscriber_sequence_in_obs() {
1594 if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1595 from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1596 }
1597
1598 MIGRATOR.get_or_init(migrate_once).await;
1599
1600 let temp_dir = temp_dir();
1601 let storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1602
1603 let app = "ondemand";
1604 let copy_to = format!("{storage_path}{MAIN_SEPARATOR}{app}");
1605 create_dir_all(©_to).unwrap();
1606
1607 let mut resources_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1608 resources_path.pop();
1609 resources_path.push("resources");
1610 resources_path.push("test.flv");
1611 let topic_id = Uuid::now_v7().to_string();
1612 copy(resources_path, &format!("{copy_to}{MAIN_SEPARATOR}{topic_id}.flv")).unwrap();
1613
1614 let database_url = var("DATABASE_URL").unwrap();
1615
1616 let mut rtmp_context = RtmpContext::default();
1617 rtmp_context.set_storage_path(&storage_path);
1618 rtmp_context.set_database_url(&database_url);
1619 rtmp_context.set_app(app);
1620 rtmp_context.set_client_addr(CLIENT_ADDR);
1621
1622 query(STATEMENT)
1623 .bind(&topic_id)
1624 .bind(&CLIENT_ADDR.to_string())
1625 .execute(&mut MySqlConnection::connect(&database_url).await.unwrap())
1626 .await
1627 .unwrap();
1628
1629 let mut stream = pin!(VecStream::default());
1630 let mut buffer = ByteBuffer::default();
1631 buffer.encode(
1632 &Connect::new(
1633 object!(
1634 "app" => AmfString::from(app),
1635 "fpad" => Boolean::new(0)
1636 )
1637 )
1638 );
1639 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1640 assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1641 assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1642
1643 let mut buffer = ByteBuffer::default();
1644 buffer.encode(&WindowAcknowledgementSize::default());
1645 handle_message(stream.as_mut()).handle_window_acknowledgement_size(&mut rtmp_context, buffer).await.unwrap();
1646 assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1647
1648 let mut buffer = ByteBuffer::default();
1649 buffer.encode(&CreateStream);
1650 handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1651 let mut stream = pin!(VecStream::default());
1652 assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1653 assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1654
1655 let mut buffer = ByteBuffer::default();
1656 buffer.encode(&FcSubscribe::new(AmfString::new(topic_id.clone())));
1657 handle_message(stream.as_mut()).handle_fc_subscribe_request(&mut rtmp_context, buffer).await.unwrap();
1658 assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1659
1660 let mut buffer = ByteBuffer::default();
1661 buffer.encode(
1662 &SetPlaylist::new(
1663 ecma_array!(
1664 "0" => AmfString::new(topic_id.clone())
1665 )
1666 )
1667 );
1668 handle_message(stream.as_mut()).handle_playlist_request(&mut rtmp_context, buffer).await.unwrap();
1669 let mut stream = pin!(VecStream::default());
1670 assert!(handle_message(stream.as_mut()).write_playlist_response(&mut rtmp_context).await.is_ok());
1671 assert_eq!(SubscriberStatus::AdditionalCommandGotSent, rtmp_context.get_subscriber_status().unwrap());
1672
1673 let mut buffer = ByteBuffer::default();
1674 buffer.encode(
1675 &Play::new(
1676 AmfString::new(topic_id),
1677 Number::from(-2i8)
1678 )
1679 );
1680 handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1681 let mut stream = pin!(VecStream::default());
1682 assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1683 assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1684 assert!(handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await.is_ok());
1685 assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1686
1687 let mut buffer = ByteBuffer::default();
1688 buffer.encode(&SetBufferLength::new(rtmp_context.get_message_id().unwrap(), 0));
1689 handle_message(stream.as_mut()).handle_buffer_length(&mut rtmp_context, buffer).await.unwrap();
1690 assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1691 }
1692}