@@ -453,15 +453,15 @@ SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
453
453
{
454
454
filename_ = p;
455
455
writer_ = new SrsFileWriter ();
456
- queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
456
+ chunks_ = new SrsThreadQueue<SrsSharedPtrMessage>();
457
457
}
458
458
459
459
// TODO: FIXME: Before free the writer, we must remove it from the manager.
460
460
SrsAsyncFileWriter::~SrsAsyncFileWriter ()
461
461
{
462
462
// TODO: FIXME: Should we flush dirty logs?
463
463
srs_freep (writer_);
464
- srs_freep (queue_ );
464
+ srs_freep (chunks_ );
465
465
}
466
466
467
467
srs_error_t SrsAsyncFileWriter::open ()
@@ -493,7 +493,7 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
493
493
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage ();
494
494
msg->wrap (cp, count);
495
495
496
- queue_ ->push_back (msg);
496
+ chunks_ ->push_back (msg);
497
497
498
498
if (pnwrite) {
499
499
*pnwrite = count;
@@ -530,9 +530,9 @@ srs_error_t SrsAsyncFileWriter::flush()
530
530
// at queue to push_back or swap all messages.
531
531
srs_utime_t now = srs_update_system_time ();
532
532
533
- vector<SrsSharedPtrMessage*> flying ;
533
+ vector<SrsSharedPtrMessage*> flying_chunks ;
534
534
if (true ) {
535
- queue_ ->swap (flying );
535
+ chunks_ ->swap (flying_chunks );
536
536
}
537
537
538
538
// Stat the sync wait of locks.
@@ -547,9 +547,9 @@ srs_error_t SrsAsyncFileWriter::flush()
547
547
++_srs_thread_sync_plus->sugar ;
548
548
}
549
549
550
- // Flush the flying messages to disk.
551
- for (int i = 0 ; i < (int )flying .size (); i++) {
552
- SrsSharedPtrMessage* msg = flying .at (i);
550
+ // Flush the chunks to disk.
551
+ for (int i = 0 ; i < (int )flying_chunks .size (); i++) {
552
+ SrsSharedPtrMessage* msg = flying_chunks .at (i);
553
553
554
554
srs_error_t r0 = writer_->write (msg->payload , msg->size , NULL );
555
555
@@ -639,7 +639,7 @@ std::string SrsAsyncLogManager::description()
639
639
for (int i = 0 ; i < (int )writers_.size (); i++) {
640
640
SrsAsyncFileWriter* writer = writers_.at (i);
641
641
642
- int nn = (int )writer->queue_ ->size ();
642
+ int nn = (int )writer->chunks_ ->size ();
643
643
nn_logs += nn;
644
644
max_logs = srs_max (max_logs, nn);
645
645
}
@@ -706,8 +706,7 @@ SrsAsyncSRTP::SrsAsyncSRTP(SrsSecurityTransport* transport)
706
706
707
707
SrsAsyncSRTP::~SrsAsyncSRTP ()
708
708
{
709
- // TODO: FIXME: Check it carefully.
710
- _srs_async_srtp->remove_task (task_);
709
+ _srs_async_srtp->on_srtp_codec_destroy (task_);
711
710
}
712
711
713
712
srs_error_t SrsAsyncSRTP::initialize (std::string recv_key, std::string send_key)
@@ -728,18 +727,30 @@ srs_error_t SrsAsyncSRTP::initialize(std::string recv_key, std::string send_key)
728
727
729
728
srs_error_t SrsAsyncSRTP::protect_rtp (void * packet, int * nb_cipher)
730
729
{
730
+ if (!task_) {
731
+ return srs_error_new (ERROR_RTC_SRTP_UNPROTECT, " not ready" );
732
+ }
733
+
731
734
// TODO: FIMXE: Remove it.
732
735
return SrsSRTP::protect_rtp (packet, nb_cipher);
733
736
}
734
737
735
738
srs_error_t SrsAsyncSRTP::protect_rtcp (void * packet, int * nb_cipher)
736
739
{
740
+ if (!task_) {
741
+ return srs_error_new (ERROR_RTC_SRTP_UNPROTECT, " not ready" );
742
+ }
743
+
737
744
// TODO: FIMXE: Remove it.
738
745
return SrsSRTP::protect_rtcp (packet, nb_cipher);
739
746
}
740
747
741
748
srs_error_t SrsAsyncSRTP::unprotect_rtp (void * packet, int * nb_plaintext)
742
749
{
750
+ if (!task_) {
751
+ return srs_error_new (ERROR_RTC_SRTP_UNPROTECT, " not ready" );
752
+ }
753
+
743
754
int nb_cipher = *nb_plaintext;
744
755
char * buf = new char [nb_cipher];
745
756
memcpy (buf, packet, nb_cipher);
@@ -758,6 +769,10 @@ srs_error_t SrsAsyncSRTP::unprotect_rtp(void* packet, int* nb_plaintext)
758
769
759
770
srs_error_t SrsAsyncSRTP::unprotect_rtcp (void * packet, int * nb_plaintext)
760
771
{
772
+ if (!task_) {
773
+ return srs_error_new (ERROR_RTC_SRTP_UNPROTECT, " not ready" );
774
+ }
775
+
761
776
// TODO: FIMXE: Remove it.
762
777
return SrsSRTP::unprotect_rtcp (packet, nb_plaintext);
763
778
}
@@ -766,6 +781,7 @@ SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec)
766
781
{
767
782
codec_ = codec;
768
783
impl_ = new SrsSRTP ();
784
+ disposing_ = false ;
769
785
}
770
786
771
787
SrsAsyncSRTPTask::~SrsAsyncSRTPTask ()
@@ -784,10 +800,25 @@ srs_error_t SrsAsyncSRTPTask::initialize(std::string recv_key, std::string send_
784
800
return err;
785
801
}
786
802
803
+ void SrsAsyncSRTPTask::dispose ()
804
+ {
805
+ // TODO: FIXME: Do cleanup in future.
806
+ // TODO: FIXME: Memory leak here, use lazy free to avoid lock for each packet.
807
+ disposing_ = true ;
808
+
809
+ // It's safe to set the codec to NULl, because it has been freed.
810
+ codec_ = NULL ;
811
+ }
812
+
787
813
srs_error_t SrsAsyncSRTPTask::cook (SrsAsyncSRTPPacket* pkt)
788
814
{
789
815
srs_error_t err = srs_success;
790
816
817
+ // It's safe, because here we do not use the codec.
818
+ if (disposing_) {
819
+ return err;
820
+ }
821
+
791
822
if (pkt->do_decrypt_ ) {
792
823
if (pkt->is_rtp_ ) {
793
824
pkt->nb_consumed_ = pkt->msg_ ->size ;
@@ -801,8 +832,30 @@ srs_error_t SrsAsyncSRTPTask::cook(SrsAsyncSRTPPacket* pkt)
801
832
return err;
802
833
}
803
834
835
+ srs_error_t SrsAsyncSRTPTask::consume (SrsAsyncSRTPPacket* pkt)
836
+ {
837
+ srs_error_t err = srs_success;
838
+
839
+ // It's safe, because the dispose and consume are in the same thread hybrid.
840
+ if (disposing_) {
841
+ return err;
842
+ }
843
+
844
+ char * payload = pkt->msg_ ->payload ;
845
+
846
+ if (pkt->do_decrypt_ ) {
847
+ if (pkt->is_rtp_ ) {
848
+ err = codec_->transport_ ->on_rtp_plaintext (payload, pkt->nb_consumed_ );
849
+ }
850
+ }
851
+
852
+ return err;
853
+ }
854
+
804
855
SrsAsyncSRTPPacket::SrsAsyncSRTPPacket (SrsAsyncSRTPTask* task)
805
856
{
857
+ srs_assert (task);
858
+
806
859
task_ = task;
807
860
msg_ = new SrsSharedPtrMessage ();
808
861
is_rtp_ = false ;
@@ -818,7 +871,7 @@ SrsAsyncSRTPPacket::~SrsAsyncSRTPPacket()
818
871
SrsAsyncSRTPManager::SrsAsyncSRTPManager ()
819
872
{
820
873
lock_ = new SrsThreadMutex ();
821
- packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
874
+ srtp_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
822
875
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
823
876
trd_ = new SrsFastCoroutine (" srtp" , this );
824
877
}
@@ -829,7 +882,7 @@ SrsAsyncSRTPManager::~SrsAsyncSRTPManager()
829
882
srs_freep (trd_);
830
883
831
884
srs_freep (lock_);
832
- srs_freep (packets_ );
885
+ srs_freep (srtp_packets_ );
833
886
srs_freep (cooked_packets_);
834
887
835
888
vector<SrsAsyncSRTPTask*>::iterator it;
@@ -849,7 +902,7 @@ void SrsAsyncSRTPManager::register_task(SrsAsyncSRTPTask* task)
849
902
tasks_.push_back (task);
850
903
}
851
904
852
- void SrsAsyncSRTPManager::remove_task (SrsAsyncSRTPTask* task)
905
+ void SrsAsyncSRTPManager::on_srtp_codec_destroy (SrsAsyncSRTPTask* task)
853
906
{
854
907
if (!task) {
855
908
return ;
@@ -859,19 +912,21 @@ void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task)
859
912
vector<SrsAsyncSRTPTask*>::iterator it;
860
913
if ((it = std::find (tasks_.begin (), tasks_.end (), task)) != tasks_.end ()) {
861
914
tasks_.erase (it);
862
- srs_freep (task);
915
+
916
+ // TODO: FIXME: Do cleanup in future.
917
+ task->dispose ();
863
918
}
864
919
}
865
920
866
921
// TODO: FIXME: We could use a coroutine queue, then cook all packet in RTC server timer.
867
922
void SrsAsyncSRTPManager::add_packet (SrsAsyncSRTPPacket* pkt)
868
923
{
869
- packets_ ->push_back (pkt);
924
+ srtp_packets_ ->push_back (pkt);
870
925
}
871
926
872
927
int SrsAsyncSRTPManager::size ()
873
928
{
874
- return packets_ ->size ();
929
+ return srtp_packets_ ->size ();
875
930
}
876
931
int SrsAsyncSRTPManager::cooked_size ()
877
932
{
@@ -892,11 +947,11 @@ srs_error_t SrsAsyncSRTPManager::do_start()
892
947
srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;
893
948
894
949
while (true ) {
895
- vector<SrsAsyncSRTPPacket*> flying ;
896
- packets_ ->swap (flying );
950
+ vector<SrsAsyncSRTPPacket*> flying_srtp_packets ;
951
+ srtp_packets_ ->swap (flying_srtp_packets );
897
952
898
- for (int i = 0 ; i < (int )flying .size (); i++) {
899
- SrsAsyncSRTPPacket* pkt = flying .at (i);
953
+ for (int i = 0 ; i < (int )flying_srtp_packets .size (); i++) {
954
+ SrsAsyncSRTPPacket* pkt = flying_srtp_packets .at (i);
900
955
901
956
if ((err = pkt->task_ ->cook (pkt)) != srs_success) {
902
957
srs_error_reset (err); // Ignore any error.
@@ -906,7 +961,7 @@ srs_error_t SrsAsyncSRTPManager::do_start()
906
961
}
907
962
908
963
// If got packets, maybe more packets in queue.
909
- if (!flying .empty ()) {
964
+ if (!flying_srtp_packets .empty ()) {
910
965
continue ;
911
966
}
912
967
@@ -943,26 +998,19 @@ srs_error_t SrsAsyncSRTPManager::cycle()
943
998
return srs_error_wrap (err, " pull" );
944
999
}
945
1000
946
- vector<SrsAsyncSRTPPacket*> flying ;
947
- cooked_packets_->swap (flying );
1001
+ vector<SrsAsyncSRTPPacket*> flying_cooked_packets ;
1002
+ cooked_packets_->swap (flying_cooked_packets );
948
1003
949
- if (flying .empty ()) {
1004
+ if (flying_cooked_packets .empty ()) {
950
1005
srs_usleep (20 * SRS_UTIME_MILLISECONDS);
951
1006
continue ;
952
1007
}
953
1008
954
- for (int i = 0 ; i < (int )flying.size (); i++) {
955
- SrsAsyncSRTPPacket* pkt = flying.at (i);
956
- SrsSecurityTransport* transport = pkt->task_ ->codec_ ->transport_ ;
957
- char * payload = pkt->msg_ ->payload ;
1009
+ for (int i = 0 ; i < (int )flying_cooked_packets.size (); i++) {
1010
+ SrsAsyncSRTPPacket* pkt = flying_cooked_packets.at (i);
958
1011
959
- if (pkt->do_decrypt_ ) {
960
- if (pkt->is_rtp_ ) {
961
- err = transport->on_rtp_plaintext (payload, pkt->nb_consumed_ );
962
- }
963
- }
964
- if (err != srs_success) {
965
- srs_error_reset (err); // Ignore any error.
1012
+ if ((err = pkt->task_ ->consume (pkt)) != srs_success) {
1013
+ srs_error_reset (err);
966
1014
}
967
1015
968
1016
srs_freep (pkt);
@@ -993,7 +1041,7 @@ SrsThreadUdpListener::~SrsThreadUdpListener()
993
1041
SrsAsyncRecvManager::SrsAsyncRecvManager ()
994
1042
{
995
1043
lock_ = new SrsThreadMutex ();
996
- packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
1044
+ received_packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
997
1045
handler_ = NULL ;
998
1046
max_recv_queue_ = 0 ;
999
1047
trd_ = new SrsFastCoroutine (" recv" , this );
@@ -1005,7 +1053,7 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
1005
1053
srs_freep (trd_);
1006
1054
1007
1055
srs_freep (lock_);
1008
- srs_freep (packets_ );
1056
+ srs_freep (received_packets_ );
1009
1057
1010
1058
vector<SrsThreadUdpListener*>::iterator it;
1011
1059
for (it = listeners_.begin (); it != listeners_.end (); ++it) {
@@ -1027,7 +1075,7 @@ void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener)
1027
1075
1028
1076
int SrsAsyncRecvManager::size ()
1029
1077
{
1030
- return packets_ ->size ();
1078
+ return received_packets_ ->size ();
1031
1079
}
1032
1080
1033
1081
srs_error_t SrsAsyncRecvManager::start (void * arg)
@@ -1062,15 +1110,15 @@ srs_error_t SrsAsyncRecvManager::do_start()
1062
1110
}
1063
1111
1064
1112
// Drop packet if queue is critical full.
1065
- int nb_packets = (int )packets_ ->size ();
1113
+ int nb_packets = (int )received_packets_ ->size ();
1066
1114
if (nb_packets >= max_recv_queue_) {
1067
1115
++_srs_pps_aloss->sugar ;
1068
1116
continue ;
1069
1117
}
1070
1118
1071
1119
// If got packet, copy to the queue.
1072
1120
got_packets = true ;
1073
- packets_ ->push_back (listener->skt_ ->copy ());
1121
+ received_packets_ ->push_back (listener->skt_ ->copy ());
1074
1122
}
1075
1123
}
1076
1124
@@ -1115,16 +1163,16 @@ srs_error_t SrsAsyncRecvManager::cycle()
1115
1163
return srs_error_wrap (err, " pull" );
1116
1164
}
1117
1165
1118
- vector<SrsUdpMuxSocket*> flying ;
1119
- packets_ ->swap (flying );
1166
+ vector<SrsUdpMuxSocket*> flying_received_packets ;
1167
+ received_packets_ ->swap (flying_received_packets );
1120
1168
1121
- if (flying .empty ()) {
1169
+ if (flying_received_packets .empty ()) {
1122
1170
srs_usleep (20 * SRS_UTIME_MILLISECONDS);
1123
1171
continue ;
1124
1172
}
1125
1173
1126
- for (int i = 0 ; i < (int )flying .size (); i++) {
1127
- SrsUdpMuxSocket* pkt = flying .at (i);
1174
+ for (int i = 0 ; i < (int )flying_received_packets .size (); i++) {
1175
+ SrsUdpMuxSocket* pkt = flying_received_packets .at (i);
1128
1176
1129
1177
if (handler_ && (err = handler_->on_udp_packet (pkt)) != srs_success) {
1130
1178
srs_error_reset (err); // Ignore any error.
0 commit comments