1use std::{
2 future::Future,
3 io::{
4 Error as IOError,
5 ErrorKind,
6 Result as IOResult
7 },
8 pin::{
9 Pin,
10 pin
11 },
12 sync::Arc,
13 task::{
14 Context as FutureContext,
15 Poll
16 },
17 time::{
18 Duration,
19 Instant
20 }
21};
22use log::{
23 debug,
24 error,
25 info
26};
27use futures::ready;
28use tokio::io::{
29 AsyncRead,
30 AsyncWrite
31};
32use sheave_core::{
33 ByteBuffer,
34 Decoder,
35 Encoder,
36 U24_MAX,
37 flv::tags::*,
38 handlers::{
39 AsyncHandler,
40 AsyncHandlerExt,
41 ClientType,
42 ErrorHandler,
43 HandlerConstructor,
44 LastChunk,
45 PublisherStatus,
46 RtmpContext,
47 StreamWrapper,
48 SubscriberStatus,
49 inconsistent_sha,
50 stream_got_exhausted
51 },
52 handshake::{
53 Handshake,
54 Version
55 },
56 messages::{
57 Channel,
59 ChunkData,
60 CommandError,
61 Connect,
62 ConnectResult,
63 CreateStream,
64 CreateStreamResult,
65 EventType,
66 UserControl,
67 OnStatus,
68 Audio,
69 Video,
70 SetDataFrame,
71 Acknowledgement,
72 amf::v0::{
73 AmfString,
74 Number,
75 Object
76 },
77 headers::MessageType,
78
79 ReleaseStream,
81 ReleaseStreamResult,
82 FcPublish,
83 OnFcPublish,
84 StreamBegin,
85 Publish,
86 FcUnpublish,
87 DeleteStream,
88
89 WindowAcknowledgementSize,
91 FcSubscribe,
92 GetStreamLength,
93 GetStreamLengthResult,
94 SetPlaylist,
95 PlaylistReady,
96 Play,
97 SetBufferLength,
98 },
99 net::RtmpReadExt,
100 object,
101 readers::*,
102 writers::*
103};
104use super::{
105 inconsistent_app_path,
107 undistinguishable_client,
108 empty_topic_path,
109 inconsistent_topic_path,
110 middlewares::write_acknowledgement,
111
112 publish_topic,
114 provide_message_id,
115 unpublish_topic,
116 return_message_id,
117
118 subscribe_topic,
120 metadata_not_found,
121};
122
123#[doc(hidden)]
124#[derive(Debug)]
125struct HandshakeHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
126
127#[doc(hidden)]
128impl<RW: AsyncRead + AsyncWrite + Unpin> HandshakeHandler<'_, RW> {
129 async fn handle_first_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
130 let encryption_algorithm = read_encryption_algorithm(pin!(self.0.await_until_receiving())).await?;
131 let mut client_request = read_handshake(pin!(self.0.await_until_receiving())).await?;
132
133 if client_request.get_version() == Version::UNSIGNED {
134 let server_request = Handshake::new(Instant::now().elapsed(), Version::UNSIGNED);
135 write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
136 write_handshake(self.0.as_mut(), &server_request).await?;
137 write_handshake(self.0.as_mut(), &client_request).await?;
138
139 rtmp_context.set_encryption_algorithm(encryption_algorithm);
140 rtmp_context.set_server_handshake(server_request);
141 rtmp_context.set_client_handshake(client_request);
142 } else {
143 if !client_request.did_digest_match(encryption_algorithm, Handshake::CLIENT_KEY) {
144 error!("Invalid SHA digest/signature: {:x?}", client_request.get_digest(encryption_algorithm));
145 return Err(inconsistent_sha(client_request.get_digest(encryption_algorithm).to_vec()))
146 } else {
147 let mut server_request = Handshake::new(Instant::now().elapsed(), Version::LATEST_SERVER);
148 server_request.imprint_digest(encryption_algorithm, Handshake::SERVER_KEY);
149 let mut server_response_key: Vec<u8> = Vec::new();
150 server_response_key.extend_from_slice(Handshake::SERVER_KEY);
151 server_response_key.extend_from_slice(Handshake::COMMON_KEY);
152 client_request.imprint_signature(encryption_algorithm, &server_response_key);
153 write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
154 write_handshake(self.0.as_mut(), &server_request).await?;
155 write_handshake(self.0.as_mut(), &client_request).await?;
156
157 rtmp_context.set_signed(true);
158 rtmp_context.set_encryption_algorithm(encryption_algorithm);
159 rtmp_context.set_server_handshake(server_request);
160 rtmp_context.set_client_handshake(client_request);
161 }
162 }
163
164 info!("First handshake got handled.");
165 Ok(())
166 }
167
168 async fn handle_second_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
169 let client_response = read_handshake(pin!(self.0.await_until_receiving())).await?;
170
171 if !rtmp_context.is_signed() {
172 rtmp_context.set_server_handshake(client_response);
173 } else {
174 let encryption_algorithm = rtmp_context.get_encryption_algorithm().unwrap();
175 let mut client_response_key: Vec<u8> = Vec::new();
176 client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
177 client_response_key.extend_from_slice(Handshake::COMMON_KEY);
178 let server_request = rtmp_context.get_server_handshake().unwrap();
179 if !client_response.did_signature_match(encryption_algorithm, &client_response_key) && server_request.get_signature() != client_response.get_signature() {
181 error!("Invalid SHA digest/signature: {:x?}", client_response.get_signature());
182 return Err(inconsistent_sha(client_response.get_signature().to_vec()))
183 } else {
184 debug!("Handshake version: {:?}", client_response.get_version());
185 debug!("Signature: {:x?}", client_response.get_signature());
186 rtmp_context.set_server_handshake(client_response);
187 }
188 }
189
190 info!("Second handshake got handled.");
191 Ok(())
192 }
193}
194
195#[doc(hidden)]
196impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandshakeHandler<'_, RW> {
197 fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
198 ready!(pin!(self.handle_first_handshake(rtmp_context)).poll(cx))?;
199 pin!(self.handle_second_handshake(rtmp_context)).poll(cx)
200 }
201}
202
203#[doc(hidden)]
204fn handle_handshake<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> HandshakeHandler<'a, RW> {
205 HandshakeHandler(stream)
206}
207
208#[doc(hidden)]
209#[derive(Debug)]
210struct MessageHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
211
212#[doc(hidden)]
213impl<RW: AsyncRead + AsyncWrite + Unpin> MessageHandler<'_, RW> {
214 async fn handle_acknowledgement(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
215 Decoder::<Acknowledgement>::decode(&mut buffer)?;
216
217 info!("Acknowledgement got handled.");
218 Ok(())
219 }
220
221 async fn handle_connect_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
222 let connect_request: Connect = buffer.decode()?;
223 rtmp_context.set_command_object(connect_request.into());
224
225 info!("connect got handled.");
226 Ok(())
227 }
228
229 async fn handle_window_acknowledgement_size(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
230 let window_acknowledgement_size: WindowAcknowledgementSize = buffer.decode()?;
231 rtmp_context.set_window_acknowledgement_size(window_acknowledgement_size);
232
233 rtmp_context.set_subscriber_status(SubscriberStatus::WindowAcknowledgementSizeGotSent);
239
240 info!("Window Acknowledgement Size got handled.");
241 Ok(())
242 }
243
244 async fn handle_release_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
245 let release_stream_request: ReleaseStream = buffer.decode()?;
246 rtmp_context.set_topic_path(release_stream_request.into());
247
248 info!("releaseStream got handled.");
249 Ok(())
250 }
251
252 async fn handle_fc_publish_request(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
253 Decoder::<FcPublish>::decode(&mut buffer)?;
254
255 info!("FCPublish got handled.");
256 Ok(())
257 }
258
259 async fn handle_create_stream_request(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
260 Decoder::<CreateStream>::decode(&mut buffer)?;
261
262 info!("createStream got handled.");
263 Ok(())
264 }
265
266 async fn handle_fc_subscribe_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
267 let topic_database_url = rtmp_context.get_topic_database_url().unwrap().clone();
268 let topic_storage_path = rtmp_context.get_topic_storage_path().unwrap().clone();
269 let client_addr = rtmp_context.get_client_addr().unwrap();
270 let app = rtmp_context.get_app().unwrap().clone();
271
272 let fc_subscribe_request: FcSubscribe = buffer.decode()?;
273 let topic = subscribe_topic(&topic_database_url, &topic_storage_path, &app, fc_subscribe_request.get_topic_path(), RtmpContext::DIRECTORY_SEPARATOR, client_addr).await?;
279 rtmp_context.set_topic(topic);
280 rtmp_context.set_topic_path(fc_subscribe_request.into());
281
282 rtmp_context.set_subscriber_status(SubscriberStatus::FcSubscribed);
283
284 info!("FCSubscribe got handled.");
285 Ok(())
286 }
287
288 async fn handle_stream_length_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
289 let get_stream_length_request: GetStreamLength = buffer.decode()?;
290 rtmp_context.set_topic_path(get_stream_length_request.into());
291
292 info!("getStreamLength got handled.");
293 Ok(())
294 }
295
296 async fn handle_playlist_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
297 let set_playlist_request: SetPlaylist = buffer.decode()?;
298 rtmp_context.set_playlist(set_playlist_request.into());
299
300 info!("set playlist got handled.");
301 Ok(())
302 }
303
304 async fn handle_publish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
305 let publish_request: Publish = buffer.decode()?;
306 let (publishing_name, publishing_type): (AmfString, AmfString) = publish_request.into();
307 rtmp_context.set_publishing_name(publishing_name);
308 rtmp_context.set_publishing_type(publishing_type);
309
310 info!("publish got handled.");
311 Ok(())
312 }
313
314 async fn handle_play_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
315 let play_request: Play = buffer.decode()?;
316 let (stream_name, start_time, play_mode) = play_request.into();
317 rtmp_context.set_stream_name(stream_name);
318 rtmp_context.set_start_time(start_time);
319 rtmp_context.set_play_mode(play_mode);
320
321 info!("play got handled.");
322 Ok(())
323 }
324
325 async fn handle_buffer_length(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
326 let buffer_length: SetBufferLength = buffer.decode()?;
327 rtmp_context.set_buffer_length(buffer_length.get_buffering_time());
328
329 if let Some(SubscriberStatus::Played) = rtmp_context.get_subscriber_status() {
336 rtmp_context.set_subscriber_status(SubscriberStatus::BufferLengthGotSent);
337 }
338
339 Ok(())
340 }
341
342 async fn handle_user_control(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
343 use EventType::*;
344
345 let event_type: EventType = buffer.get_u16_be()?.into();
346 match event_type {
347 SetBufferLength => self.handle_buffer_length(rtmp_context, buffer).await,
348 _ => unimplemented!("Undefined event type: {event_type:?}")
349 }
350 }
351
352 async fn handle_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
353 let topic_database_url = rtmp_context.get_topic_database_url().unwrap().clone();
354 let topic_storage_path = rtmp_context.get_topic_storage_path().unwrap().clone();
355 let client_addr = rtmp_context.get_client_addr().unwrap();
356 let app = rtmp_context.get_app().unwrap().clone();
357
358 let fc_unpublish_request: FcUnpublish = buffer.decode()?;
359 unpublish_topic(&topic_database_url, &topic_storage_path, &app, fc_unpublish_request.get_topic_path(), RtmpContext::DIRECTORY_SEPARATOR, client_addr).await?;
360 rtmp_context.reset_topic_path();
361
362 info!("FCUnpublish got handled.");
363 Ok(())
364 }
365
366 async fn handle_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
367 let delete_stream_request: DeleteStream = buffer.decode()?;
368 return_message_id(delete_stream_request.into());
369 rtmp_context.reset_message_id();
370
371 info!("deleteStream got handled.");
372 Ok(())
373 }
374
375 async fn handle_publisher_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
376 use PublisherStatus::*;
377
378 let command: AmfString = buffer.decode()?;
379 let transaction_id: Number = buffer.decode()?;
380 rtmp_context.set_command_name(command.clone());
381 rtmp_context.set_transaction_id(transaction_id);
382
383 if command == "FCUnpublish" {
384 return self.handle_fc_unpublish_request(rtmp_context, buffer).await
385 }
386 if command == "deleteStream" {
387 return self.handle_delete_stream_request(rtmp_context, buffer).await
388 }
389
390 match rtmp_context.get_publisher_status().unwrap() {
391 Connected => self.handle_release_stream_request(rtmp_context, buffer).await,
392 Released => self.handle_fc_publish_request(rtmp_context, buffer).await,
393 FcPublished => self.handle_create_stream_request(rtmp_context, buffer).await,
394 Created => self.handle_publish_request(rtmp_context, buffer).await,
395 _ => Ok(())
396 }
397 }
398
399 async fn handle_subscriber_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
400 use SubscriberStatus::*;
401
402 let subscriber_status = rtmp_context.get_subscriber_status().unwrap();
403
404 let command: AmfString = buffer.decode()?;
405 let transaction_id: Number = buffer.decode()?;
406 rtmp_context.set_command_name(command.clone());
407 rtmp_context.set_transaction_id(transaction_id);
408
409 if subscriber_status == FcSubscribed {
410 if command == "getStreamLength" {
412 return self.handle_stream_length_request(rtmp_context, buffer).await
413 }
414 if command == "set_playlist" {
416 return self.handle_playlist_request(rtmp_context, buffer).await
417 }
418 }
419
420 match subscriber_status {
421 Connected => Ok(()),
422 WindowAcknowledgementSizeGotSent => self.handle_create_stream_request(rtmp_context, buffer).await,
424 Created => self.handle_fc_subscribe_request(rtmp_context, buffer).await,
425 AdditionalCommandGotSent => self.handle_play_request(rtmp_context, buffer).await,
426 _ => Ok(())
427 }
428 }
429
430 async fn handle_command_request(&mut self, rtmp_context: &mut RtmpContext, buffer: ByteBuffer) -> IOResult<()> {
431 use ClientType::*;
432
433 if let Some(client_type) = rtmp_context.get_client_type() {
434 match client_type {
435 Publisher => self.handle_publisher_request(rtmp_context, buffer).await,
436 Subscriber => self.handle_subscriber_request(rtmp_context, buffer).await
437 }
438 } else {
439 self.handle_connect_request(rtmp_context, buffer).await
440 }
441 }
442
443 async fn handle_flv(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer, message_type: MessageType, timestamp: Duration) -> IOResult<()> {
444 let topic = rtmp_context.get_topic().unwrap();
445
446 let tag_type = match message_type {
447 MessageType::Audio => TagType::Audio,
448 MessageType::Video => TagType::Video,
449 MessageType::Data => TagType::ScriptData,
450 _ => TagType::Other
451 };
452
453 if let TagType::ScriptData = tag_type {
454 Decoder::<AmfString>::decode(&mut buffer)?;
456 }
457
458 let data: Vec<u8> = buffer.into();
459 let flv_tag = FlvTag::new(tag_type, timestamp, data);
460 topic.append_flv_tag(flv_tag)?;
461
462 info!("FLV chunk got handled.");
463 Ok(())
464 }
465
466 async fn write_error_response(&mut self, rtmp_context: &mut RtmpContext, information: Object, error: IOError) -> IOResult<()> {
467 let mut buffer = ByteBuffer::default();
468 buffer.encode(&AmfString::from("_error"));
469 buffer.encode(&rtmp_context.get_transaction_id());
470 buffer.encode(&CommandError::new(information.clone()));
471 write_chunk(self.0.as_mut(), rtmp_context, CommandError::CHANNEL.into(), Duration::default(), CommandError::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
472
473 rtmp_context.set_information(information);
474
475 error!("{error}");
476 return Err(error)
477 }
478
479 async fn write_connect_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
480 use ClientType::*;
481
482 let command_object = rtmp_context.get_command_object().unwrap().clone();
483
484 let client_type = if command_object.get_properties().get("type").is_some() {
485 Publisher
486 } else if command_object.get_properties().get("fpad").is_some() {
487 Subscriber
488 } else {
489 let information = object!(
490 "level" => AmfString::from("error"),
491 "code" => AmfString::from("NetConnection.Connect.UndistinguishableClient"),
492 "description" => AmfString::from("Server couldn't distinguish you are either publisher or subscriber.")
493 );
494 return self.write_error_response(rtmp_context, information, undistinguishable_client()).await
495 };
496
497 let app = rtmp_context.get_app().unwrap().clone();
498 let requested_app: &AmfString = (&command_object.get_properties()["app"]).into();
499 if *requested_app != app {
500 let information = object!(
501 "level" => AmfString::from("error"),
502 "code" => AmfString::from("NetConnection.Connect.InconsistentAppPath"),
503 "description" => AmfString::new(format!("Requested app path is inconsistent. expected: {}, actual: {}", app, requested_app))
504 );
505 return self.write_error_response(rtmp_context, information, inconsistent_app_path(app, requested_app.clone())).await
506 }
507
508 let properties = object!(
509 "fmsVer" => AmfString::from("FMS/5,0,17"),
510 "capabilities" => Number::from(31)
511 );
512 let information = object!(
513 "level" => AmfString::from("status"),
514 "code" => AmfString::from("NetConnection.Connect.Success"),
515 "description" => AmfString::from("Connection succeeded."),
516 "objectEncoding" => Number::from(0)
517 );
518 let mut buffer = ByteBuffer::default();
519 buffer.encode(&AmfString::from("_result"));
520 buffer.encode(&rtmp_context.get_transaction_id());
521 buffer.encode(&ConnectResult::new(properties.clone(), information.clone()));
522 write_chunk(self.0.as_mut(), rtmp_context, ConnectResult::CHANNEL.into(), Duration::default(), ConnectResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
523
524 rtmp_context.set_client_type(client_type);
525 rtmp_context.set_properties(properties);
526 rtmp_context.set_information(information);
527
528 match client_type {
529 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Connected),
530 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Connected)
531 }
532
533 info!("connect result got sent.");
534 Ok(())
535 }
536
537 async fn write_release_stream_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
538 let topic_path = rtmp_context.get_topic_path().unwrap().clone();
539
540 if topic_path.is_empty() {
541 let information = object!(
542 "level" => AmfString::from("error"),
543 "code" => AmfString::from("NetConnection.ReleaseStream.EmptyTopicPath"),
544 "description" => AmfString::from("The topic path must not be empty.")
545 );
546 return self.write_error_response(rtmp_context, information, empty_topic_path()).await
547 }
548
549 let topic_database_url = rtmp_context.get_topic_database_url().unwrap().clone();
550 let topic_storage_path = rtmp_context.get_topic_storage_path().unwrap().clone();
551 let client_addr = rtmp_context.get_client_addr().unwrap();
552 let app = rtmp_context.get_app().unwrap().clone();
553
554 let topic = match publish_topic(&topic_database_url, &topic_storage_path, &app, &topic_path, RtmpContext::DIRECTORY_SEPARATOR, client_addr).await {
555 Ok(topic) => topic,
556 Err(e) => {
557 let information = object!(
558 "level" => AmfString::from("error"),
559 "code" => AmfString::from("NetConnection.ReleaseStream.StreamIsUnpublished"),
560 "description" => AmfString::new(format!("A stream of {topic_path} is unpublished."))
561 );
562 return self.write_error_response(rtmp_context, information, e).await
563 }
564 };
565
566 let mut buffer = ByteBuffer::default();
567 buffer.encode(&AmfString::from("_result"));
568 buffer.encode(&rtmp_context.get_transaction_id());
569 buffer.encode(&ReleaseStreamResult);
570 write_chunk(self.0.as_mut(), rtmp_context, ReleaseStreamResult::CHANNEL.into(), Duration::default(), ReleaseStreamResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
571
572 rtmp_context.set_topic(topic);
573
574 rtmp_context.set_publisher_status(PublisherStatus::Released);
575
576 info!("releaseStream result got sent.");
577 Ok(())
578 }
579
580 async fn write_fc_publish_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
581 let mut buffer = ByteBuffer::default();
582 buffer.encode(&AmfString::from("onFCPublish"));
583 buffer.encode(&OnFcPublish);
584 write_chunk(self.0.as_mut(), rtmp_context, OnFcPublish::CHANNEL.into(), Duration::default(), OnFcPublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
585
586 rtmp_context.set_publisher_status(PublisherStatus::FcPublished);
587
588 info!("onFCPublish got sent.");
589 Ok(())
590 }
591
592 async fn write_create_stream_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
593 use ClientType::*;
594
595 let client_type = rtmp_context.get_client_type().unwrap();
596
597 let message_id = provide_message_id();
598 let mut buffer = ByteBuffer::default();
599 buffer.encode(&AmfString::from("_result"));
600 buffer.encode(&rtmp_context.get_transaction_id());
601 buffer.encode(&CreateStreamResult::new(message_id.into()));
602 write_chunk(self.0.as_mut(), rtmp_context, CreateStreamResult::CHANNEL.into(), Duration::default(), CreateStreamResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
603
604 rtmp_context.set_message_id(message_id);
605
606 match client_type {
607 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Created),
608 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Created)
609 }
610
611 info!("createStream result got sent.");
612 Ok(())
613 }
614
615 async fn write_stream_length_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
616 let transaction_id = rtmp_context.get_transaction_id();
617 let topic = rtmp_context.get_topic_mut().unwrap();
618
619 for result in topic {
620 let flv_tag = result?;
621
622 if flv_tag.get_tag_type() == TagType::ScriptData {
623 let mut buffer = ByteBuffer::default();
624 buffer.put_bytes(flv_tag.get_data());
625 let script_data: ScriptDataTag = buffer.decode()?;
626
627 if *script_data.get_name() == "onMetaData" {
628 let duration: &Number = (&script_data.get_value().get_properties()["duration"]).into();
629 let mut buffer = ByteBuffer::default();
630 buffer.encode(&AmfString::from("_result"));
631 buffer.encode(&transaction_id);
632 buffer.encode(&GetStreamLengthResult::new(*duration));
633 write_chunk(self.0.as_mut(), rtmp_context, GetStreamLengthResult::CHANNEL.into(), Duration::default(), GetStreamLengthResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
634
635 rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
636
637 info!("getStreamLength result got sent.");
638 return Ok(())
639 }
640 }
641 }
642
643 let topic_path = rtmp_context.get_topic_path().unwrap().clone();
644 let information = object!(
645 "level" => AmfString::from("error"),
646 "code" => AmfString::from("NetConnection.GetStreamLength.MetadataNotFound"),
647 "description" => AmfString::new(format!("Metadata didn't find in specified topic path: {topic_path}"))
648 );
649 self.write_error_response(rtmp_context, information, metadata_not_found(topic_path)).await
650 }
651
652 async fn write_playlist_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
653 let mut buffer = ByteBuffer::default();
654 buffer.encode(&AmfString::from("playlist_ready"));
655 buffer.encode(&PlaylistReady);
656 write_chunk(self.0.as_mut(), rtmp_context, PlaylistReady::CHANNEL.into(), Duration::default(), PlaylistReady::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
657
658 rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
659
660 info!("playlist_ready got sent.");
661 Ok(())
662 }
663
664 async fn write_stream_begin(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
665 use ClientType::*;
666
667 let message_id = rtmp_context.get_message_id().unwrap();
668 let mut buffer = ByteBuffer::default();
669 buffer.put_u16_be(StreamBegin::EVENT_TYPE.into());
670 buffer.encode(&StreamBegin::new(message_id));
671 write_chunk(self.0.as_mut(), rtmp_context, StreamBegin::CHANNEL.into(), Duration::default(), StreamBegin::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
672
673 match rtmp_context.get_client_type().unwrap() {
674 Publisher => rtmp_context.set_publisher_status(PublisherStatus::Began),
675 Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Began)
676 }
677
678 info!("Stream Begin got sent.");
679 Ok(())
680 }
681
682 async fn write_error_status(&mut self, rtmp_context: &mut RtmpContext, information: Object, error: IOError) -> IOResult<()> {
683 let message_id = rtmp_context.get_message_id().unwrap();
684
685 let mut buffer = ByteBuffer::default();
686 buffer.encode(&AmfString::from("onStatus"));
687 buffer.encode(&Number::from(0));
688 buffer.encode(&OnStatus::new(information.clone()));
689 write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
690
691 rtmp_context.set_information(information);
692
693 error!("{error}");
694 return Err(error)
695 }
696
697 async fn write_publish_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
698 let message_id = rtmp_context.get_message_id().unwrap();
699 let topic_path = rtmp_context.get_topic_path().unwrap().clone();
700 let publishing_name = rtmp_context.get_publishing_name().unwrap().clone();
701
702 if topic_path != publishing_name {
703 let information = object!(
704 "level" => AmfString::from("error"),
705 "code" => AmfString::from("NetStream.Publish.InconsistentPlaypath"),
706 "description" => AmfString::new(format!("Requested name is inconsistent. expected: {topic_path}, actual: {publishing_name}"))
707 );
708 return self.write_error_status(rtmp_context, information, inconsistent_topic_path(topic_path, publishing_name)).await
709 }
710
711 let information = object!(
712 "level" => AmfString::from("status"),
713 "code" => AmfString::from("NetStream.Publish.Start"),
714 "description" => AmfString::new(format!("{publishing_name} is now published")),
715 "details" => publishing_name.clone()
716 );
717 let mut buffer = ByteBuffer::default();
718 buffer.encode(&AmfString::from("onStatus"));
719 buffer.encode(&Number::from(0));
720 buffer.encode(&OnStatus::new(information.clone()));
721 write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
722
723 rtmp_context.set_information(information);
724
725 rtmp_context.set_publisher_status(PublisherStatus::Published);
726
727 info!("onStatus(publish) got sent.");
728 Ok(())
729 }
730
731 async fn write_play_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
732 let topic_path = rtmp_context.get_topic_path().unwrap().clone();
733 let message_id = rtmp_context.get_message_id().unwrap();
734 let stream_name = rtmp_context.get_stream_name().unwrap().clone();
735
736 if topic_path != stream_name {
737 let information = object!(
738 "level" => AmfString::from("error"),
739 "code" => AmfString::from("NetStream.Play.InconsistentTopicPath"),
740 "description" => AmfString::new(format!("Requested name is inconsistent. expected: {topic_path}, actual: {stream_name}"))
741 );
742 return self.write_error_status(rtmp_context, information, inconsistent_topic_path(topic_path, stream_name)).await
743 }
744
745 let information = object!(
746 "level" => AmfString::from("status"),
747 "code" => AmfString::from("NetStream.Play.Start"),
748 "description" => AmfString::from("Playing stream.")
749 );
750 let mut buffer = ByteBuffer::default();
751 buffer.encode(&AmfString::from("onStatus"));
752 buffer.encode(&Number::from(0));
753 buffer.encode(&OnStatus::new(information.clone()));
754 write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
755
756 rtmp_context.set_information(information);
757
758 rtmp_context.set_subscriber_status(SubscriberStatus::Played);
759
760 info!("onStatus(play) got sent.");
761 Ok(())
762 }
763
764 async fn write_flv(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
765 for next in rtmp_context.get_topic_mut().unwrap() {
766 let flv_tag = next?;
767 let message_id = rtmp_context.get_message_id().unwrap();
768
769 let channel;
770 let message_type;
771 match flv_tag.get_tag_type() {
772 TagType::Audio => {
773 channel = Audio::CHANNEL;
774 message_type = Audio::MESSAGE_TYPE;
775 },
776 TagType::Video => {
777 channel = Video::CHANNEL;
778 message_type = Video::MESSAGE_TYPE;
779 },
780 TagType::ScriptData => {
781 channel = SetDataFrame::CHANNEL;
782 message_type = SetDataFrame::MESSAGE_TYPE;
783 },
784 TagType::Other => {
785 channel = Channel::Other;
786 message_type = MessageType::Other;
787 }
788 }
789 let timestamp = flv_tag.get_timestamp();
790 let data: Vec<u8> = if let MessageType::Data = message_type {
791 let mut buffer = ByteBuffer::default();
792 buffer.encode(&AmfString::from("@setDataFrame"));
793 buffer.put_bytes(flv_tag.get_data());
794 buffer.into()
795 } else {
796 flv_tag.get_data().to_vec()
797 };
798 write_chunk(self.0.as_mut(), rtmp_context, channel.into(), timestamp, message_type, message_id, &data).await?;
799
800 info!("FLV chunk got sent.");
801 return Ok(())
802 }
803
804 info!("FLV data became empty.");
805 Err(stream_got_exhausted())
806 }
807}
808
809#[doc(hidden)]
810impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for MessageHandler<'_, RW> {
811 fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
812 use MessageType::*;
813
814 let basic_header = ready!(pin!(read_basic_header(pin!(self.0.await_until_receiving()))).poll(cx))?;
815 let message_header = ready!(pin!(read_message_header(pin!(self.0.await_until_receiving()), basic_header.get_message_format())).poll(cx))?;
816 let extended_timestamp = if let Some(timestamp) = message_header.get_timestamp() {
817 if timestamp.as_millis() == U24_MAX as u128 {
818 let extended_timestamp = ready!(pin!(read_extended_timestamp(pin!(self.0.await_until_receiving()))).poll(cx))?;
819 Some(extended_timestamp)
820 } else {
821 None
822 }
823 } else {
824 None
825 };
826
827 let chunk_id = basic_header.get_chunk_id();
828 if let Some(last_received_chunk) = rtmp_context.get_last_received_chunk_mut(&chunk_id) {
829 if let Some(extended_timestamp) = extended_timestamp {
830 last_received_chunk.set_timestamp(extended_timestamp);
831 } else {
832 message_header.get_timestamp().map(
833 |timestamp| last_received_chunk.set_timestamp(timestamp)
834 );
835 }
836 message_header.get_message_length().map(
837 |message_length| last_received_chunk.set_message_length(message_length)
838 );
839 message_header.get_message_type().map(
840 |message_type| last_received_chunk.set_message_type(message_type)
841 );
842 message_header.get_message_id().map(
843 |message_id| last_received_chunk.set_message_id(message_id)
844 );
845 } else {
846 rtmp_context.insert_received_chunk(
847 chunk_id,
848 LastChunk::new(
849 message_header.get_timestamp().unwrap(),
850 message_header.get_message_length().unwrap(),
851 message_header.get_message_type().unwrap(),
852 message_header.get_message_id().unwrap()
853 )
854 );
855 }
856 let data = ready!(
857 pin!(
858 read_chunk_data(
859 pin!(self.0.await_until_receiving()),
860 rtmp_context.get_receiving_chunk_size(),
861 rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_length()
862 )
863 ).poll(cx)
864 )?;
865 let buffer: ByteBuffer = data.into();
866
867 let message_type = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_type();
868 match message_type {
869 Acknowledgement => ready!(pin!(self.handle_acknowledgement(rtmp_context, buffer)).poll(cx))?,
870 UserControl => ready!(pin!(self.handle_user_control(rtmp_context, buffer)).poll(cx))?,
871 WindowAcknowledgementSize => ready!(pin!(self.handle_window_acknowledgement_size(rtmp_context, buffer)).poll(cx))?,
872 Audio | Video | Data => {
873 let timestamp = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_timestamp();
874 ready!(pin!(self.handle_flv(rtmp_context, buffer, message_type, timestamp)).poll(cx))?
875 },
876 Command => ready!(pin!(self.handle_command_request(rtmp_context, buffer)).poll(cx))?,
877 other => unimplemented!("Undefined Message: {other:?}")
878 }
879
880 if let Some(publisher_status) = rtmp_context.get_publisher_status() {
881 match publisher_status {
882 PublisherStatus::Connected => pin!(self.write_release_stream_response(rtmp_context)).poll(cx),
883 PublisherStatus::Released => pin!(self.write_fc_publish_response(rtmp_context)).poll(cx),
884 PublisherStatus::FcPublished => pin!(self.write_create_stream_response(rtmp_context)).poll(cx),
885 PublisherStatus::Created => {
886 ready!(pin!(self.write_stream_begin(rtmp_context)).poll(cx))?;
887 pin!(self.write_publish_response(rtmp_context)).poll(cx)
888 },
889 _ => {
890 Poll::Ready(Ok(()))
892 }
893 }
894 } else if let Some(mut subscriber_status) = rtmp_context.get_subscriber_status() {
895 if subscriber_status == SubscriberStatus::FcSubscribed {
896 let command = rtmp_context.get_command_name().unwrap().clone();
897
898 if command == "getStreamLength" {
899 return pin!(self.write_stream_length_response(rtmp_context)).poll(cx)
900 } else if command == "set_playlist" {
901 return pin!(self.write_playlist_response(rtmp_context)).poll(cx)
902 } else {
903 subscriber_status = SubscriberStatus::AdditionalCommandGotSent;
904 }
905 }
906
907 match subscriber_status {
908 SubscriberStatus::WindowAcknowledgementSizeGotSent => pin!(self.write_create_stream_response(rtmp_context)).poll(cx),
909 SubscriberStatus::AdditionalCommandGotSent => {
910 ready!(pin!(self.write_stream_begin(rtmp_context)).poll(cx))?;
911 pin!(self.write_play_response(rtmp_context)).poll(cx)
912 },
913 SubscriberStatus::Played => pin!(self.write_flv(rtmp_context)).poll(cx),
914 _ => {
915 Poll::Ready(Ok(()))
917 }
918 }
919 } else {
920 pin!(self.write_connect_response(rtmp_context)).poll(cx)
921 }
922 }
923}
924
925#[doc(hidden)]
926fn handle_message<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> MessageHandler<'a, RW> {
927 MessageHandler(stream)
928}
929
930#[doc(hidden)]
931#[derive(Debug)]
932struct CloseHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
933
934#[doc(hidden)]
935impl<RW: AsyncRead + AsyncWrite + Unpin> CloseHandler<'_, RW> {
936 async fn write_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
937 let topic_path = rtmp_context.get_topic_path().unwrap().clone();
938 rtmp_context.increase_transaction_id();
939
940 let mut buffer = ByteBuffer::default();
941 buffer.encode(&AmfString::from("FCUnpublish"));
942 buffer.encode(&rtmp_context.get_transaction_id());
943 buffer.encode(&FcUnpublish::new(topic_path));
944 write_chunk(self.0.as_mut(), rtmp_context, FcUnpublish::CHANNEL.into(), Duration::default(), FcUnpublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
945
946 info!("FCUnpublish got sent.");
947 Ok(())
948 }
949
950 async fn write_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
951 let message_id = rtmp_context.get_message_id().unwrap();
952 rtmp_context.increase_transaction_id();
953
954 let mut buffer = ByteBuffer::default();
955 buffer.encode(&AmfString::from("deleteStream"));
956 buffer.encode(&rtmp_context.get_transaction_id());
957 buffer.encode(&DeleteStream::new(message_id.into()));
958 write_chunk(self.0.as_mut(), rtmp_context, DeleteStream::CHANNEL.into(), Duration::default(), DeleteStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
959
960 info!("deleteStream got sent.");
961 Ok(())
962 }
963}
964
965#[doc(hidden)]
966fn handle_close<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> CloseHandler<'a, RW> {
967 CloseHandler(stream)
968}
969
970#[doc(hidden)]
971impl<RW: AsyncRead + AsyncWrite + Unpin> ErrorHandler for CloseHandler<'_, RW> {
972 fn poll_handle_error(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
973 if error.kind() != ErrorKind::Other {
974 if let Some(publisher_status) = rtmp_context.get_publisher_status() {
975 if publisher_status >= PublisherStatus::FcPublished {
976 ready!(pin!(self.write_fc_unpublish_request(rtmp_context)).poll(cx))?;
977 }
978
979 if publisher_status >= PublisherStatus::Created {
980 ready!(pin!(self.write_delete_stream_request(rtmp_context)).poll(cx))?;
981 }
982 }
983 }
984
985 self.0.as_mut().poll_shutdown(cx)
986 }
987}
988
989#[derive(Debug)]
1049pub struct RtmpHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
1050
1051impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for RtmpHandler<RW> {
1052 fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
1053 pin!(
1054 handle_handshake(self.0.make_weak_pin())
1055 .while_ok(handle_message(self.0.make_weak_pin()).wrap(write_acknowledgement(self.0.make_weak_pin())))
1056 .map_err(handle_close(self.0.make_weak_pin()))
1057 ).poll_handle(cx, rtmp_context)
1058 }
1059}
1060
1061impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for RtmpHandler<RW> {
1062 fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
1063 Self(stream)
1064 }
1065}
1066
1067#[cfg(test)]
1068mod tests {
1069 use std::{
1070 fs::{
1071 copy,
1072 create_dir_all
1073 },
1074 net::{
1075 IpAddr,
1076 Ipv4Addr,
1077 SocketAddr
1078 },
1079 path::Path
1080 };
1081 use rand::fill;
1082 use uuid::Uuid;
1083 use sqlx::{
1084 Connection,
1085 SqliteConnection,
1086 migrate::Migrator,
1087 query
1088 };
1089 use sheave_core::{
1090 ecma_array,
1091 flv::Flv,
1092 handlers::VecStream,
1093 handshake::EncryptionAlgorithm,
1094 messages::{
1095 ChunkSize,
1096 PlayMode,
1097 SetPlaylist,
1098 amf::v0::Boolean
1099 }
1100 };
1101 use super::*;
1102
1103
1104 const CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1935);
1105 const STATEMENT: &str = "INSERT INTO topics (path, client_addr) VALUES (?1, ?2)";
1106
1107 #[tokio::test]
1108 async fn ok_unsigned_handshake_got_handled() {
1109 let mut stream = pin!(VecStream::default());
1110 let mut rtmp_context = RtmpContext::default();
1111
1112 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1113 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1114 let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::UNSIGNED);
1115 sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1116 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1117 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1118 assert!(result.is_ok());
1119
1120 let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1121 let received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1122 let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1123 assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1124 assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1125
1126 write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1127 let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1128 assert!(result.is_ok());
1129 let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1130 assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes())
1131 }
1132
1133 #[tokio::test]
1134 async fn err_digest_did_not_match() {
1135 let mut stream = pin!(VecStream::default());
1136 let mut rtmp_context = RtmpContext::default();
1137
1138 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1139 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1140 let sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1141 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1142 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1143 assert!(result.is_err())
1144 }
1145
1146 #[tokio::test]
1147 async fn err_signature_did_not_match() {
1148 let mut stream = pin!(VecStream::default());
1149 let mut rtmp_context = RtmpContext::default();
1150
1151 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1152 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1153 let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1154 sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1155 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1156 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1157 assert!(result.is_ok());
1158
1159 read_encryption_algorithm(stream.as_mut()).await.unwrap();
1160 let mut received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1161 read_handshake(stream.as_mut()).await.unwrap();
1162 let mut invalid_signature_key: [u8; Handshake::CLIENT_KEY.len() + Handshake::COMMON_KEY.len()] = [0; Handshake::CLIENT_KEY.len() + Handshake::COMMON_KEY.len()];
1163 fill(&mut invalid_signature_key);
1164 received_server_handshake.imprint_signature(sent_encryption_algorithm, &invalid_signature_key);
1165 write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1166 let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1167 assert!(result.is_err())
1168 }
1169
1170 #[tokio::test]
1171 async fn ok_singed_handshake_got_handled() {
1172 let mut stream = pin!(VecStream::default());
1173 let mut rtmp_context = RtmpContext::default();
1174
1175 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1176 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1177 let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1178 sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1179 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1180 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1181 assert!(result.is_ok());
1182
1183 let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1184 let mut received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1185 let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1186 let mut server_signature_key: Vec<u8> = Vec::new();
1187 server_signature_key.extend_from_slice(Handshake::SERVER_KEY);
1188 server_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1189 sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_signature_key);
1190 assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1191 assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1192
1193 let mut client_signature_key: Vec<u8> = Vec::new();
1194 client_signature_key.extend_from_slice(Handshake::CLIENT_KEY);
1195 client_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1196 received_server_handshake.imprint_signature(sent_encryption_algorithm, &client_signature_key);
1197 write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1198 let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1199 assert!(result.is_ok());
1200 let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1201 assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes())
1202 }
1203
1204 #[tokio::test]
1205 async fn ok_signed_handshake_as_ffmpeg_got_handled() {
1206 let mut stream = pin!(VecStream::default());
1207 let mut rtmp_context = RtmpContext::default();
1208
1209 let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1210 write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1211 let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1212 sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1213 write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1214 let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1215 assert!(result.is_ok());
1216
1217 let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1218 let received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1219 let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1220 let mut server_signature_key: Vec<u8> = Vec::new();
1221 server_signature_key.extend_from_slice(Handshake::SERVER_KEY);
1222 server_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1223 sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_signature_key);
1224 assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1225 assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1226
1227 write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1228 let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1229 assert!(result.is_ok());
1230 let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1231 assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes());
1232 }
1233
1234 #[tokio::test]
1235 async fn err_undistinguishable_client() {
1236 let mut stream = pin!(VecStream::default());
1237 let mut rtmp_context = RtmpContext::default();
1238
1239 let mut buffer = ByteBuffer::default();
1240 buffer.encode(&Connect::default());
1241 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1242 let result = handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await;
1243 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1244 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1245 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1246 let mut buffer: ByteBuffer = chunk.into();
1247 let command: AmfString = buffer.decode().unwrap();
1248 assert!(result.is_err());
1249 assert_eq!(command, "_error");
1250 assert!(rtmp_context.get_information().is_some())
1251 }
1252
1253 #[tokio::test]
1254 async fn err_inconsistent_app_path() {
1255 let mut stream = pin!(VecStream::default());
1256 let mut rtmp_context = RtmpContext::default();
1257 rtmp_context.set_app("ondemand");
1258
1259 let mut buffer = ByteBuffer::default();
1260 buffer.encode(&Connect::new(object!("app" => AmfString::default())));
1261 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1262 let result = handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await;
1263 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1264 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1265 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1266 let mut buffer: ByteBuffer = chunk.into();
1267 let command: AmfString = buffer.decode().unwrap();
1268 assert!(result.is_err());
1269 assert_eq!(command, "_error");
1270 assert!(rtmp_context.get_information().is_some())
1271 }
1272
1273 #[tokio::test]
1274 async fn err_empty_topic_path() {
1275 let mut stream = pin!(VecStream::default());
1276 let mut rtmp_context = RtmpContext::default();
1277 let mut buffer = ByteBuffer::default();
1278 buffer.encode(&ReleaseStream::new(AmfString::from("")));
1279 handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1280 let result = handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await;
1281 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1282 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1283 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1284 let mut buffer: ByteBuffer = chunk.into();
1285 let command: AmfString = buffer.decode().unwrap();
1286 assert!(result.is_err());
1287 assert_eq!(command, "_error");
1288 assert!(rtmp_context.get_information().is_some())
1289 }
1290
1291 #[tokio::test]
1292 async fn err_unpublished_stream() {
1293 let app = "ondemand";
1294
1295 #[cfg(windows)]
1296 let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1297 #[cfg(windows)]
1298 let topic_database_url = format!("sqlite:{topic_storage_path}\\sheave.db?mode=rwc");
1299 #[cfg(windows)]
1300 let migrations = format!("{}\\migrations", env!("CARGO_MANIFEST_DIR"));
1301 #[cfg(unix)]
1302 let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1303 #[cfg(unix)]
1304 let topic_database_url = format!("sqlite:{topic_storage_path}/sheave.db?mode=rwc");
1305 #[cfg(unix)]
1306 let migrations = format!("{}/migrations", env!("CARGO_MANIFEST_DIR"));
1307
1308 let mut stream = pin!(VecStream::default());
1309 let mut rtmp_context = RtmpContext::default();
1310 rtmp_context.set_topic_storage_path(&topic_storage_path);
1311 rtmp_context.set_topic_database_url(&topic_database_url);
1312 rtmp_context.set_app(app);
1313 rtmp_context.set_client_addr(CLIENT_ADDR);
1314
1315 let topic_path = Uuid::now_v7().to_string();
1316
1317 let mut connection = SqliteConnection::connect(&topic_database_url).await.unwrap();
1318 let migrator = Migrator::new(Path::new(&migrations)).await.unwrap();
1319 migrator.run(&mut connection).await.unwrap();
1320
1321 let mut buffer = ByteBuffer::default();
1322 buffer.encode(&ReleaseStream::new(AmfString::new(topic_path)));
1323 handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1324 let result = handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await;
1325 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1326 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1327 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1328 let mut buffer: ByteBuffer = chunk.into();
1329 let command: AmfString = buffer.decode().unwrap();
1330 assert!(result.is_err());
1331 assert_eq!(command, "_error");
1332 assert!(rtmp_context.get_information().is_some())
1333 }
1334
1335 #[tokio::test]
1336 async fn err_inconsistent_topic_path_in_publish() {
1337 let mut stream = pin!(VecStream::default());
1338 let mut rtmp_context = RtmpContext::default();
1339 rtmp_context.set_message_id(0);
1340 rtmp_context.set_topic_path(AmfString::new(Uuid::now_v7().to_string()));
1341
1342 let mut buffer = ByteBuffer::default();
1343 buffer.encode(&Publish::new(AmfString::default(), "live".into()));
1344 handle_message(stream.as_mut()).handle_publish_request(&mut rtmp_context, buffer).await.unwrap();
1345 let result = handle_message(stream.as_mut()).write_publish_response(&mut rtmp_context).await;
1346 assert!(result.is_err());
1347 assert!(rtmp_context.get_information().is_some())
1348 }
1349
1350 #[tokio::test]
1351 async fn err_metadata_not_found() {
1352 let topic_path = Uuid::now_v7().to_string();
1353
1354 #[cfg(windows)]
1355 let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1356 #[cfg(windows)]
1357 let topic = {
1358 create_dir_all(&topic_storage_path).unwrap();
1359 Flv::create(&format!("{}\\{}.flv", &topic_storage_path, &topic_path)).unwrap()
1360 };
1361 #[cfg(unix)]
1362 let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1363 #[cfg(unix)]
1364 let topic = {
1365 create_dir_all(&topic_storage_path).unwrap();
1366 Flv::create(&format!("{}/{}.flv", &topic_storage_path, &topic_path)).unwrap()
1367 };
1368
1369 let mut stream = pin!(VecStream::default());
1370 let mut rtmp_context = RtmpContext::default();
1371 rtmp_context.set_topic(topic);
1372
1373 let mut buffer = ByteBuffer::default();
1374 buffer.encode(&GetStreamLength::new(AmfString::new(topic_path)));
1375 handle_message(stream.as_mut()).handle_stream_length_request(&mut rtmp_context, buffer).await.unwrap();
1376 let result = handle_message(stream.as_mut()).write_stream_length_response(&mut rtmp_context).await;
1377 let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1378 let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1379 let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1380 let mut buffer: ByteBuffer = chunk.into();
1381 let command: AmfString = buffer.decode().unwrap();
1382 assert!(result.is_err());
1383 assert_eq!(command, "_error");
1384 assert!(rtmp_context.get_information().is_some())
1385 }
1386
1387 #[tokio::test]
1388 async fn err_inconsistent_topic_path_in_play() {
1389 let mut stream = pin!(VecStream::default());
1390 let mut rtmp_context = RtmpContext::default();
1391 rtmp_context.set_topic_path(AmfString::new(Uuid::now_v7().to_string()));
1392 rtmp_context.set_message_id(0);
1393
1394 let mut buffer = ByteBuffer::default();
1395 buffer.encode(&Play::new(AmfString::new(Uuid::now_v7().to_string()), Duration::default(), PlayMode::default()));
1396 handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1397 let result = handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await;
1398 assert!(result.is_err());
1399 assert!(rtmp_context.get_information().is_some())
1400 }
1401
1402 #[tokio::test]
1403 async fn ok_valid_publisher_sequence() {
1404 let app = "ondemand";
1405
1406 #[cfg(windows)]
1407 let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1408 #[cfg(windows)]
1409 let topic_database_url = format!("sqlite:{topic_storage_path}\\sheave.db?mode=rwc");
1410 #[cfg(windows)]
1411 let migrations = format!("{}\\migrations", env!("CARGO_MANIFEST_DIR"));
1412 #[cfg(unix)]
1413 let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1414 let topic_database_url = format!("sqlite:{topic_storage_path}/sheave.db?mode=rwc");
1415 #[cfg(unix)]
1416 let migrations = format!("{}/migrations", env!("CARGO_MANIFEST_DIR"));
1417
1418 let mut rtmp_context = RtmpContext::default();
1419 rtmp_context.set_topic_storage_path(&topic_storage_path);
1420 rtmp_context.set_topic_database_url(&topic_database_url);
1421 rtmp_context.set_client_addr(CLIENT_ADDR);
1422 rtmp_context.set_app(app);
1423
1424 let topic_path = Uuid::now_v7().to_string();
1425
1426 let mut connection = SqliteConnection::connect(&topic_database_url).await.unwrap();
1427 let migrator = Migrator::new(Path::new(&migrations)).await.unwrap();
1428 migrator.run(&mut connection).await.unwrap();
1429 query(STATEMENT)
1430 .bind(&topic_path)
1431 .bind(&CLIENT_ADDR.to_string())
1432 .execute(&mut connection)
1433 .await
1434 .unwrap();
1435
1436 let mut stream = pin!(VecStream::default());
1437 let mut buffer = ByteBuffer::default();
1438 buffer.encode(
1439 &Connect::new(
1440 object!(
1441 "app" => AmfString::from(app),
1442 "type" => AmfString::from("nonprivate")
1443 )
1444 )
1445 );
1446 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1447 assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1448 assert_eq!(PublisherStatus::Connected, rtmp_context.get_publisher_status().unwrap());
1449
1450 let mut buffer = ByteBuffer::default();
1451 buffer.encode(&ReleaseStream::new(AmfString::new(topic_path.clone())));
1452 handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1453 let mut stream = pin!(VecStream::default());
1454 assert!(handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await.is_ok());
1455 assert_eq!(PublisherStatus::Released, rtmp_context.get_publisher_status().unwrap());
1456
1457 let mut buffer = ByteBuffer::default();
1458 buffer.encode(&FcPublish::new(AmfString::new(topic_path.clone())));
1459 handle_message(stream.as_mut()).handle_fc_publish_request(&mut rtmp_context, buffer).await.unwrap();
1460 let mut stream = pin!(VecStream::default());
1461 assert!(handle_message(stream.as_mut()).write_fc_publish_response(&mut rtmp_context).await.is_ok());
1462 assert_eq!(PublisherStatus::FcPublished, rtmp_context.get_publisher_status().unwrap());
1463
1464 let mut buffer = ByteBuffer::default();
1465 buffer.encode(&CreateStream);
1466 handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1467 let mut stream = pin!(VecStream::default());
1468 assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1469 assert_eq!(PublisherStatus::Created, rtmp_context.get_publisher_status().unwrap());
1470
1471 let mut buffer = ByteBuffer::default();
1472 buffer.encode(&Publish::new(AmfString::new(topic_path), "live".into()));
1473 handle_message(stream.as_mut()).handle_publish_request(&mut rtmp_context, buffer).await.unwrap();
1474 let mut stream = pin!(VecStream::default());
1475 assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1476 assert_eq!(PublisherStatus::Began, rtmp_context.get_publisher_status().unwrap());
1477 assert!(handle_message(stream).write_publish_response(&mut rtmp_context).await.is_ok());
1478 assert_eq!(PublisherStatus::Published, rtmp_context.get_publisher_status().unwrap())
1479 }
1480
1481 #[tokio::test]
1482 async fn ok_valid_subscriber_sequence_in_ffmpeg() {
1483 let app = "ondemand";
1484 let topic_path = Uuid::now_v7().to_string();
1485
1486 #[cfg(windows)]
1487 let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1488 #[cfg(windows)]
1489 let topic_database_url = format!("sqlite:{topic_storage_path}\\sheave.db?mode=rwc");
1490 #[cfg(windows)]
1491 let migrations = format!("{}\\migrations", env!("CARGO_MANIFEST_DIR"));
1492 #[cfg(windows)]
1493 {
1494 let copy_to = format!("{topic_storage_path}\\{app}");
1495 create_dir_all(©_to).unwrap();
1496 copy(&format!("{}\\..\\resources\\test.flv", env!("CARGO_MANIFEST_DIR")), &format!("{}\\{}.flv", copy_to, topic_path)).unwrap();
1497 }
1498 #[cfg(unix)]
1499 let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1500 #[cfg(unix)]
1501 let topic_database_url = format!("sqlite:{topic_storage_path}/sheave.db?mode=rwc");
1502 #[cfg(unix)]
1503 let migrations = format!("{}/migrations", env!("CARGO_MANIFEST_DIR"));
1504 #[cfg(unix)]
1505 {
1506 let copy_to = format!("{topic_storage_path}/{app}");
1507 create_dir_all(©_to).unwrap();
1508 copy(&format!("{}/../resources/test.flv", env!("CARGO_MANIFEST_DIR")), &format!("{}/{}.flv", copy_to, topic_path)).unwrap();
1509 }
1510
1511 let mut rtmp_context = RtmpContext::default();
1512 rtmp_context.set_topic_storage_path(&topic_storage_path);
1513 rtmp_context.set_topic_database_url(&topic_database_url);
1514 rtmp_context.set_client_addr(CLIENT_ADDR);
1515 rtmp_context.set_app(app);
1516
1517 let mut connection = SqliteConnection::connect(&topic_database_url).await.unwrap();
1518 let migrator = Migrator::new(Path::new(&migrations)).await.unwrap();
1519 migrator.run(&mut connection).await.unwrap();
1520 query(STATEMENT)
1521 .bind(&topic_path)
1522 .bind(&CLIENT_ADDR.to_string())
1523 .execute(&mut connection)
1524 .await
1525 .unwrap();
1526
1527 let mut stream = pin!(VecStream::default());
1528 let mut buffer = ByteBuffer::default();
1529 buffer.encode(
1530 &Connect::new(
1531 object!(
1532 "app" => AmfString::from(app),
1533 "fpad" => Boolean::new(0)
1534 )
1535 )
1536 );
1537 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1538 assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1539 assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1540
1541 let mut buffer = ByteBuffer::default();
1542 buffer.encode(&WindowAcknowledgementSize::default());
1543 handle_message(stream.as_mut()).handle_window_acknowledgement_size(&mut rtmp_context, buffer).await.unwrap();
1544 assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1545
1546 let mut buffer = ByteBuffer::default();
1547 buffer.encode(&CreateStream);
1548 handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1549 let mut stream = pin!(VecStream::default());
1550 assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1551 assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1552
1553 let mut stream = pin!(VecStream::default());
1554 let mut buffer = ByteBuffer::default();
1555 buffer.encode(&FcSubscribe::new(AmfString::new(topic_path.clone())));
1556 handle_message(stream.as_mut()).handle_fc_subscribe_request(&mut rtmp_context, buffer).await.unwrap();
1557 assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1558
1559 let mut buffer = ByteBuffer::default();
1560 buffer.encode(&GetStreamLength::new(AmfString::new(topic_path.clone())));
1561 handle_message(stream.as_mut()).handle_stream_length_request(&mut rtmp_context, buffer).await.unwrap();
1562 let mut stream = pin!(VecStream::default());
1563 assert!(handle_message(stream.as_mut()).write_stream_length_response(&mut rtmp_context).await.is_ok());
1564 assert_eq!(SubscriberStatus::AdditionalCommandGotSent, rtmp_context.get_subscriber_status().unwrap());
1565
1566 let mut buffer = ByteBuffer::default();
1567 buffer.encode(
1568 &Play::new(
1569 AmfString::new(topic_path),
1570 Duration::default(),
1571 PlayMode::default()
1572 )
1573 );
1574 handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1575 let mut stream = pin!(VecStream::default());
1576 assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1577 assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1578 assert!(handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await.is_ok());
1579 assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1580
1581 let mut buffer = ByteBuffer::default();
1582 buffer.encode(&SetBufferLength::new(rtmp_context.get_message_id().unwrap(), 0));
1583 handle_message(stream.as_mut()).handle_buffer_length(&mut rtmp_context, buffer).await.unwrap();
1584 assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1585 }
1586
1587 #[tokio::test]
1588 async fn ok_valid_subscriber_sequence_in_obs() {
1589 let app = "ondemand";
1590 let topic_path = Uuid::now_v7().to_string();
1591
1592 #[cfg(windows)]
1593 let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1594 #[cfg(windows)]
1595 let topic_database_url = format!("sqlite:{topic_storage_path}\\sheave.db?mode=rwc");
1596 #[cfg(windows)]
1597 let migrations = format!("{}\\migrations", env!("CARGO_MANIFEST_DIR"));
1598 #[cfg(windows)]
1599 {
1600 let copy_to = format!("{topic_storage_path}\\{app}");
1601 create_dir_all(©_to).unwrap();
1602 copy(&format!("{}\\..\\resources\\test.flv", env!("CARGO_MANIFEST_DIR")), &format!("{}\\{}.flv", copy_to, topic_path)).unwrap();
1603 }
1604
1605 #[cfg(unix)]
1606 let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1607 #[cfg(unix)]
1608 let topic_database_url = format!("sqlite:{topic_storage_path}/sheave.db?mode=rwc");
1609 #[cfg(unix)]
1610 let migrations = format!("{}/migrations", env!("CARGO_MANIFEST_DIR"));
1611 #[cfg(unix)]
1612 {
1613 let copy_to = format!("{topic_storage_path}/{app}");
1614 create_dir_all(©_to).unwrap();
1615 copy(&format!("{}/../resources/test.flv", env!("CARGO_MANIFEST_DIR")), &format!("{}/{}.flv", copy_to, topic_path)).unwrap();
1616 }
1617
1618 let mut rtmp_context = RtmpContext::default();
1619 rtmp_context.set_topic_storage_path(&topic_storage_path);
1620 rtmp_context.set_topic_database_url(&topic_database_url);
1621 rtmp_context.set_client_addr(CLIENT_ADDR);
1622 rtmp_context.set_app(app);
1623
1624 let mut connection = SqliteConnection::connect(&topic_database_url).await.unwrap();
1625 let migrator = Migrator::new(Path::new(&migrations)).await.unwrap();
1626 migrator.run(&mut connection).await.unwrap();
1627 query(STATEMENT)
1628 .bind(&topic_path)
1629 .bind(&CLIENT_ADDR.to_string())
1630 .execute(&mut connection)
1631 .await
1632 .unwrap();
1633
1634 let mut stream = pin!(VecStream::default());
1635 let mut buffer = ByteBuffer::default();
1636 buffer.encode(
1637 &Connect::new(
1638 object!(
1639 "app" => AmfString::from(app),
1640 "fpad" => Boolean::new(0)
1641 )
1642 )
1643 );
1644 handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1645 assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1646 assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1647
1648 let mut buffer = ByteBuffer::default();
1649 buffer.encode(&WindowAcknowledgementSize::default());
1650 handle_message(stream.as_mut()).handle_window_acknowledgement_size(&mut rtmp_context, buffer).await.unwrap();
1651 assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1652
1653 let mut buffer = ByteBuffer::default();
1654 buffer.encode(&CreateStream);
1655 handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1656 let mut stream = pin!(VecStream::default());
1657 assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1658 assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1659
1660 let mut buffer = ByteBuffer::default();
1661 buffer.encode(&FcSubscribe::new(AmfString::new(topic_path.clone())));
1662 handle_message(stream.as_mut()).handle_fc_subscribe_request(&mut rtmp_context, buffer).await.unwrap();
1663 assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1664
1665 let mut buffer = ByteBuffer::default();
1666 buffer.encode(
1667 &SetPlaylist::new(
1668 ecma_array!(
1669 "0" => AmfString::new(topic_path.clone())
1670 )
1671 )
1672 );
1673 handle_message(stream.as_mut()).handle_playlist_request(&mut rtmp_context, buffer).await.unwrap();
1674 let mut stream = pin!(VecStream::default());
1675 assert!(handle_message(stream.as_mut()).write_playlist_response(&mut rtmp_context).await.is_ok());
1676 assert_eq!(SubscriberStatus::AdditionalCommandGotSent, rtmp_context.get_subscriber_status().unwrap());
1677
1678 let mut buffer = ByteBuffer::default();
1679 buffer.encode(
1680 &Play::new(
1681 AmfString::new(topic_path),
1682 Duration::default(),
1683 PlayMode::default()
1684 )
1685 );
1686 handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1687 let mut stream = pin!(VecStream::default());
1688 assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1689 assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1690 assert!(handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await.is_ok());
1691 assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1692
1693 let mut buffer = ByteBuffer::default();
1694 buffer.encode(&SetBufferLength::new(rtmp_context.get_message_id().unwrap(), 0));
1695 handle_message(stream.as_mut()).handle_buffer_length(&mut rtmp_context, buffer).await.unwrap();
1696 assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1697 }
1698}