Skip to content

Commit 57b771a

Browse files
committed
Threads: Refine variables and do dispose
1. Rename packets to srtp or received packets. 2. Add task dispose API, cleanup in future. 3. If got packets before init AsyncSRTP, return error. 4. Never free the SRTPTask, dispose it instead.
1 parent 9a05d24 commit 57b771a

File tree

2 files changed

+105
-52
lines changed

2 files changed

+105
-52
lines changed

trunk/src/app/srs_app_threads.cpp

+94-46
Original file line numberDiff line numberDiff line change
@@ -453,15 +453,15 @@ SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
453453
{
454454
filename_ = p;
455455
writer_ = new SrsFileWriter();
456-
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
456+
chunks_ = new SrsThreadQueue<SrsSharedPtrMessage>();
457457
}
458458

459459
// TODO: FIXME: Before free the writer, we must remove it from the manager.
460460
SrsAsyncFileWriter::~SrsAsyncFileWriter()
461461
{
462462
// TODO: FIXME: Should we flush dirty logs?
463463
srs_freep(writer_);
464-
srs_freep(queue_);
464+
srs_freep(chunks_);
465465
}
466466

467467
srs_error_t SrsAsyncFileWriter::open()
@@ -493,7 +493,7 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
493493
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
494494
msg->wrap(cp, count);
495495

496-
queue_->push_back(msg);
496+
chunks_->push_back(msg);
497497

498498
if (pnwrite) {
499499
*pnwrite = count;
@@ -530,9 +530,9 @@ srs_error_t SrsAsyncFileWriter::flush()
530530
// at queue to push_back or swap all messages.
531531
srs_utime_t now = srs_update_system_time();
532532

533-
vector<SrsSharedPtrMessage*> flying;
533+
vector<SrsSharedPtrMessage*> flying_chunks;
534534
if (true) {
535-
queue_->swap(flying);
535+
chunks_->swap(flying_chunks);
536536
}
537537

538538
// Stat the sync wait of locks.
@@ -547,9 +547,9 @@ srs_error_t SrsAsyncFileWriter::flush()
547547
++_srs_thread_sync_plus->sugar;
548548
}
549549

550-
// Flush the flying messages to disk.
551-
for (int i = 0; i < (int)flying.size(); i++) {
552-
SrsSharedPtrMessage* msg = flying.at(i);
550+
// Flush the chunks to disk.
551+
for (int i = 0; i < (int)flying_chunks.size(); i++) {
552+
SrsSharedPtrMessage* msg = flying_chunks.at(i);
553553

554554
srs_error_t r0 = writer_->write(msg->payload, msg->size, NULL);
555555

@@ -639,7 +639,7 @@ std::string SrsAsyncLogManager::description()
639639
for (int i = 0; i < (int)writers_.size(); i++) {
640640
SrsAsyncFileWriter* writer = writers_.at(i);
641641

642-
int nn = (int)writer->queue_->size();
642+
int nn = (int)writer->chunks_->size();
643643
nn_logs += nn;
644644
max_logs = srs_max(max_logs, nn);
645645
}
@@ -706,8 +706,7 @@ SrsAsyncSRTP::SrsAsyncSRTP(SrsSecurityTransport* transport)
706706

707707
SrsAsyncSRTP::~SrsAsyncSRTP()
708708
{
709-
// TODO: FIXME: Check it carefully.
710-
_srs_async_srtp->remove_task(task_);
709+
_srs_async_srtp->on_srtp_codec_destroy(task_);
711710
}
712711

713712
srs_error_t SrsAsyncSRTP::initialize(std::string recv_key, std::string send_key)
@@ -728,18 +727,30 @@ srs_error_t SrsAsyncSRTP::initialize(std::string recv_key, std::string send_key)
728727

729728
srs_error_t SrsAsyncSRTP::protect_rtp(void* packet, int* nb_cipher)
730729
{
730+
if (!task_) {
731+
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
732+
}
733+
731734
// TODO: FIMXE: Remove it.
732735
return SrsSRTP::protect_rtp(packet, nb_cipher);
733736
}
734737

735738
srs_error_t SrsAsyncSRTP::protect_rtcp(void* packet, int* nb_cipher)
736739
{
740+
if (!task_) {
741+
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
742+
}
743+
737744
// TODO: FIMXE: Remove it.
738745
return SrsSRTP::protect_rtcp(packet, nb_cipher);
739746
}
740747

741748
srs_error_t SrsAsyncSRTP::unprotect_rtp(void* packet, int* nb_plaintext)
742749
{
750+
if (!task_) {
751+
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
752+
}
753+
743754
int nb_cipher = *nb_plaintext;
744755
char* buf = new char[nb_cipher];
745756
memcpy(buf, packet, nb_cipher);
@@ -758,6 +769,10 @@ srs_error_t SrsAsyncSRTP::unprotect_rtp(void* packet, int* nb_plaintext)
758769

759770
srs_error_t SrsAsyncSRTP::unprotect_rtcp(void* packet, int* nb_plaintext)
760771
{
772+
if (!task_) {
773+
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
774+
}
775+
761776
// TODO: FIMXE: Remove it.
762777
return SrsSRTP::unprotect_rtcp(packet, nb_plaintext);
763778
}
@@ -766,6 +781,7 @@ SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec)
766781
{
767782
codec_ = codec;
768783
impl_ = new SrsSRTP();
784+
disposing_ = false;
769785
}
770786

771787
SrsAsyncSRTPTask::~SrsAsyncSRTPTask()
@@ -784,10 +800,25 @@ srs_error_t SrsAsyncSRTPTask::initialize(std::string recv_key, std::string send_
784800
return err;
785801
}
786802

803+
void SrsAsyncSRTPTask::dispose()
804+
{
805+
// TODO: FIXME: Do cleanup in future.
806+
// TODO: FIXME: Memory leak here, use lazy free to avoid lock for each packet.
807+
disposing_ = true;
808+
809+
// It's safe to set the codec to NULl, because it has been freed.
810+
codec_ = NULL;
811+
}
812+
787813
srs_error_t SrsAsyncSRTPTask::cook(SrsAsyncSRTPPacket* pkt)
788814
{
789815
srs_error_t err = srs_success;
790816

817+
// It's safe, because here we do not use the codec.
818+
if (disposing_) {
819+
return err;
820+
}
821+
791822
if (pkt->do_decrypt_) {
792823
if (pkt->is_rtp_) {
793824
pkt->nb_consumed_ = pkt->msg_->size;
@@ -801,8 +832,30 @@ srs_error_t SrsAsyncSRTPTask::cook(SrsAsyncSRTPPacket* pkt)
801832
return err;
802833
}
803834

835+
srs_error_t SrsAsyncSRTPTask::consume(SrsAsyncSRTPPacket* pkt)
836+
{
837+
srs_error_t err = srs_success;
838+
839+
// It's safe, because the dispose and consume are in the same thread hybrid.
840+
if (disposing_) {
841+
return err;
842+
}
843+
844+
char* payload = pkt->msg_->payload;
845+
846+
if (pkt->do_decrypt_) {
847+
if (pkt->is_rtp_) {
848+
err = codec_->transport_->on_rtp_plaintext(payload, pkt->nb_consumed_);
849+
}
850+
}
851+
852+
return err;
853+
}
854+
804855
SrsAsyncSRTPPacket::SrsAsyncSRTPPacket(SrsAsyncSRTPTask* task)
805856
{
857+
srs_assert(task);
858+
806859
task_ = task;
807860
msg_ = new SrsSharedPtrMessage();
808861
is_rtp_ = false;
@@ -818,7 +871,7 @@ SrsAsyncSRTPPacket::~SrsAsyncSRTPPacket()
818871
SrsAsyncSRTPManager::SrsAsyncSRTPManager()
819872
{
820873
lock_ = new SrsThreadMutex();
821-
packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
874+
srtp_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
822875
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
823876
trd_ = new SrsFastCoroutine("srtp", this);
824877
}
@@ -829,7 +882,7 @@ SrsAsyncSRTPManager::~SrsAsyncSRTPManager()
829882
srs_freep(trd_);
830883

831884
srs_freep(lock_);
832-
srs_freep(packets_);
885+
srs_freep(srtp_packets_);
833886
srs_freep(cooked_packets_);
834887

835888
vector<SrsAsyncSRTPTask*>::iterator it;
@@ -849,7 +902,7 @@ void SrsAsyncSRTPManager::register_task(SrsAsyncSRTPTask* task)
849902
tasks_.push_back(task);
850903
}
851904

852-
void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task)
905+
void SrsAsyncSRTPManager::on_srtp_codec_destroy(SrsAsyncSRTPTask* task)
853906
{
854907
if (!task) {
855908
return;
@@ -859,19 +912,21 @@ void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task)
859912
vector<SrsAsyncSRTPTask*>::iterator it;
860913
if ((it = std::find(tasks_.begin(), tasks_.end(), task)) != tasks_.end()) {
861914
tasks_.erase(it);
862-
srs_freep(task);
915+
916+
// TODO: FIXME: Do cleanup in future.
917+
task->dispose();
863918
}
864919
}
865920

866921
// TODO: FIXME: We could use a coroutine queue, then cook all packet in RTC server timer.
867922
void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt)
868923
{
869-
packets_->push_back(pkt);
924+
srtp_packets_->push_back(pkt);
870925
}
871926

872927
int SrsAsyncSRTPManager::size()
873928
{
874-
return packets_->size();
929+
return srtp_packets_->size();
875930
}
876931
int SrsAsyncSRTPManager::cooked_size()
877932
{
@@ -892,11 +947,11 @@ srs_error_t SrsAsyncSRTPManager::do_start()
892947
srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;
893948

894949
while (true) {
895-
vector<SrsAsyncSRTPPacket*> flying;
896-
packets_->swap(flying);
950+
vector<SrsAsyncSRTPPacket*> flying_srtp_packets;
951+
srtp_packets_->swap(flying_srtp_packets);
897952

898-
for (int i = 0; i < (int)flying.size(); i++) {
899-
SrsAsyncSRTPPacket* pkt = flying.at(i);
953+
for (int i = 0; i < (int)flying_srtp_packets.size(); i++) {
954+
SrsAsyncSRTPPacket* pkt = flying_srtp_packets.at(i);
900955

901956
if ((err = pkt->task_->cook(pkt)) != srs_success) {
902957
srs_error_reset(err); // Ignore any error.
@@ -906,7 +961,7 @@ srs_error_t SrsAsyncSRTPManager::do_start()
906961
}
907962

908963
// If got packets, maybe more packets in queue.
909-
if (!flying.empty()) {
964+
if (!flying_srtp_packets.empty()) {
910965
continue;
911966
}
912967

@@ -943,26 +998,19 @@ srs_error_t SrsAsyncSRTPManager::cycle()
943998
return srs_error_wrap(err, "pull");
944999
}
9451000

946-
vector<SrsAsyncSRTPPacket*> flying;
947-
cooked_packets_->swap(flying);
1001+
vector<SrsAsyncSRTPPacket*> flying_cooked_packets;
1002+
cooked_packets_->swap(flying_cooked_packets);
9481003

949-
if (flying.empty()) {
1004+
if (flying_cooked_packets.empty()) {
9501005
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
9511006
continue;
9521007
}
9531008

954-
for (int i = 0; i < (int)flying.size(); i++) {
955-
SrsAsyncSRTPPacket* pkt = flying.at(i);
956-
SrsSecurityTransport* transport = pkt->task_->codec_->transport_;
957-
char* payload = pkt->msg_->payload;
1009+
for (int i = 0; i < (int)flying_cooked_packets.size(); i++) {
1010+
SrsAsyncSRTPPacket* pkt = flying_cooked_packets.at(i);
9581011

959-
if (pkt->do_decrypt_) {
960-
if (pkt->is_rtp_) {
961-
err = transport->on_rtp_plaintext(payload, pkt->nb_consumed_);
962-
}
963-
}
964-
if (err != srs_success) {
965-
srs_error_reset(err); // Ignore any error.
1012+
if ((err = pkt->task_->consume(pkt)) != srs_success) {
1013+
srs_error_reset(err);
9661014
}
9671015

9681016
srs_freep(pkt);
@@ -993,7 +1041,7 @@ SrsThreadUdpListener::~SrsThreadUdpListener()
9931041
SrsAsyncRecvManager::SrsAsyncRecvManager()
9941042
{
9951043
lock_ = new SrsThreadMutex();
996-
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
1044+
received_packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
9971045
handler_ = NULL;
9981046
max_recv_queue_ = 0;
9991047
trd_ = new SrsFastCoroutine("recv", this);
@@ -1005,7 +1053,7 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
10051053
srs_freep(trd_);
10061054

10071055
srs_freep(lock_);
1008-
srs_freep(packets_);
1056+
srs_freep(received_packets_);
10091057

10101058
vector<SrsThreadUdpListener*>::iterator it;
10111059
for (it = listeners_.begin(); it != listeners_.end(); ++it) {
@@ -1027,7 +1075,7 @@ void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener)
10271075

10281076
int SrsAsyncRecvManager::size()
10291077
{
1030-
return packets_->size();
1078+
return received_packets_->size();
10311079
}
10321080

10331081
srs_error_t SrsAsyncRecvManager::start(void* arg)
@@ -1062,15 +1110,15 @@ srs_error_t SrsAsyncRecvManager::do_start()
10621110
}
10631111

10641112
// Drop packet if queue is critical full.
1065-
int nb_packets = (int)packets_->size();
1113+
int nb_packets = (int)received_packets_->size();
10661114
if (nb_packets >= max_recv_queue_) {
10671115
++_srs_pps_aloss->sugar;
10681116
continue;
10691117
}
10701118

10711119
// If got packet, copy to the queue.
10721120
got_packets = true;
1073-
packets_->push_back(listener->skt_->copy());
1121+
received_packets_->push_back(listener->skt_->copy());
10741122
}
10751123
}
10761124

@@ -1115,16 +1163,16 @@ srs_error_t SrsAsyncRecvManager::cycle()
11151163
return srs_error_wrap(err, "pull");
11161164
}
11171165

1118-
vector<SrsUdpMuxSocket*> flying;
1119-
packets_->swap(flying);
1166+
vector<SrsUdpMuxSocket*> flying_received_packets;
1167+
received_packets_->swap(flying_received_packets);
11201168

1121-
if (flying.empty()) {
1169+
if (flying_received_packets.empty()) {
11221170
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
11231171
continue;
11241172
}
11251173

1126-
for (int i = 0; i < (int)flying.size(); i++) {
1127-
SrsUdpMuxSocket* pkt = flying.at(i);
1174+
for (int i = 0; i < (int)flying_received_packets.size(); i++) {
1175+
SrsUdpMuxSocket* pkt = flying_received_packets.at(i);
11281176

11291177
if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
11301178
srs_error_reset(err); // Ignore any error.

0 commit comments

Comments
 (0)