@@ -646,6 +646,7 @@ void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task)
646
646
}
647
647
}
648
648
649
+ // TODO: FIXME: We could use a coroutine queue, then cook all packet in RTC server timer.
649
650
void SrsAsyncSRTPManager::add_packet (SrsAsyncSRTPPacket* pkt)
650
651
{
651
652
packets_->push_back (pkt);
@@ -661,7 +662,9 @@ srs_error_t SrsAsyncSRTPManager::do_start()
661
662
{
662
663
srs_error_t err = srs_success;
663
664
665
+ // TODO: FIXME: Config it?
664
666
srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;
667
+
665
668
while (true ) {
666
669
vector<SrsAsyncSRTPPacket*> flying;
667
670
packets_->swap (flying);
@@ -719,3 +722,112 @@ srs_error_t SrsAsyncSRTPManager::consume()
719
722
}
720
723
721
724
SrsAsyncSRTPManager* _srs_async_srtp = new SrsAsyncSRTPManager();
725
+
726
+ SrsThreadUdpListener::SrsThreadUdpListener (srs_netfd_t fd)
727
+ {
728
+ skt_ = new SrsUdpMuxSocket (fd);
729
+ }
730
+
731
+ SrsThreadUdpListener::~SrsThreadUdpListener ()
732
+ {
733
+ }
734
+
735
+ SrsAsyncRecvManager::SrsAsyncRecvManager ()
736
+ {
737
+ lock_ = new SrsThreadMutex ();
738
+ packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
739
+ handler_ = NULL ;
740
+ }
741
+
742
+ // TODO: FIXME: We should stop the thread first, then free the manager.
743
+ SrsAsyncRecvManager::~SrsAsyncRecvManager ()
744
+ {
745
+ srs_freep (lock_);
746
+ srs_freep (packets_);
747
+
748
+ vector<SrsThreadUdpListener*>::iterator it;
749
+ for (it = listeners_.begin (); it != listeners_.end (); ++it) {
750
+ SrsThreadUdpListener* listener = *it;
751
+ srs_freep (listener);
752
+ }
753
+ }
754
+
755
+ void SrsAsyncRecvManager::set_handler (ISrsUdpMuxHandler* v)
756
+ {
757
+ handler_ = v;
758
+ }
759
+
760
+ void SrsAsyncRecvManager::add_listener (SrsThreadUdpListener* listener)
761
+ {
762
+ SrsThreadLocker (lock_);
763
+ listeners_.push_back (listener);
764
+ }
765
+
766
+ srs_error_t SrsAsyncRecvManager::start (void * arg)
767
+ {
768
+ SrsAsyncRecvManager* recv = (SrsAsyncRecvManager*)arg;
769
+ return recv->do_start ();
770
+ }
771
+
772
+ srs_error_t SrsAsyncRecvManager::do_start ()
773
+ {
774
+ srs_error_t err = srs_success;
775
+
776
+ // TODO: FIXME: Config it?
777
+ srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;
778
+
779
+ while (true ) {
780
+ vector<SrsThreadUdpListener*> listeners;
781
+ if (true ) {
782
+ SrsThreadLocker (lock_);
783
+ listeners = listeners_;
784
+ }
785
+
786
+ bool got_packet = false ;
787
+ for (int i = 0 ; i < (int )listeners.size (); i++) {
788
+ SrsThreadUdpListener* listener = listeners.at (i);
789
+
790
+ // TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok.
791
+ int nread = listener->skt_ ->raw_recvfrom ();
792
+ if (nread > 0 ) {
793
+ got_packet = true ;
794
+ packets_->push_back (listener->skt_ ->copy ());
795
+ }
796
+ }
797
+
798
+ // If got packets, maybe more packets in queue.
799
+ if (got_packet) {
800
+ continue ;
801
+ }
802
+
803
+ // TODO: FIXME: Maybe we should use cond wait?
804
+ timespec tv = {0 };
805
+ tv.tv_sec = interval / SRS_UTIME_SECONDS;
806
+ tv.tv_nsec = (interval % SRS_UTIME_SECONDS) * 1000 ;
807
+ nanosleep (&tv, NULL );
808
+ }
809
+
810
+ return err;
811
+ }
812
+
813
+ srs_error_t SrsAsyncRecvManager::consume ()
814
+ {
815
+ srs_error_t err = srs_success;
816
+
817
+ vector<SrsUdpMuxSocket*> flying;
818
+ packets_->swap (flying);
819
+
820
+ for (int i = 0 ; i < (int )flying.size (); i++) {
821
+ SrsUdpMuxSocket* pkt = flying.at (i);
822
+
823
+ if (handler_ && (err = handler_->on_udp_packet (pkt)) != srs_success) {
824
+ srs_error_reset (err); // Ignore any error.
825
+ }
826
+
827
+ srs_freep (pkt);
828
+ }
829
+
830
+ return err;
831
+ }
832
+
833
+ SrsAsyncRecvManager* _srs_async_recv = new SrsAsyncRecvManager();
0 commit comments