Skip to content

Commit eab4f89

Browse files
committed
Threads-Hybrid: Change global variables to thread-local or with lock
1. Thread-Safe: Config subscribes, subscribe or unsubscribe. 2. Global-Shared: Async SRTP/RECV/SEND/Log use thread-safe objects. 3. Global-Shared: SRTP and DTLS certificate, without critical data. 4. Thread-Local: Blackhole, ResourceManager, StreamManager, ObjectCache, by design. 5. Global-Shared: Log and context, which use thread-safe objects. 6. Thread-Local: Pithy print for each thread.
1 parent 7dbac15 commit eab4f89

22 files changed

+93
-50
lines changed

trunk/src/app/srs_app_config.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ using namespace std;
5656
#include <srs_app_http_hooks.hpp>
5757
#include <srs_kernel_utility.hpp>
5858
#include <srs_rtmp_stack.hpp>
59+
#include <srs_app_threads.hpp>
5960

6061
using namespace srs_internal;
6162

@@ -1189,15 +1190,20 @@ SrsConfig::SrsConfig()
11891190
root = new SrsConfDirective();
11901191
root->conf_line = 0;
11911192
root->name = "root";
1193+
1194+
lock_ = new SrsThreadMutex();
11921195
}
11931196

11941197
SrsConfig::~SrsConfig()
11951198
{
1199+
srs_freep(lock_);
11961200
srs_freep(root);
11971201
}
11981202

11991203
void SrsConfig::subscribe(ISrsReloadHandler* handler)
12001204
{
1205+
SrsThreadLocker(lock_);
1206+
12011207
std::vector<ISrsReloadHandler*>::iterator it;
12021208

12031209
it = std::find(subscribes.begin(), subscribes.end(), handler);
@@ -1210,6 +1216,8 @@ void SrsConfig::subscribe(ISrsReloadHandler* handler)
12101216

12111217
void SrsConfig::unsubscribe(ISrsReloadHandler* handler)
12121218
{
1219+
SrsThreadLocker(lock_);
1220+
12131221
std::vector<ISrsReloadHandler*>::iterator it;
12141222

12151223
it = std::find(subscribes.begin(), subscribes.end(), handler);

trunk/src/app/srs_app_config.hpp

+3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class SrsConfig;
4646
class SrsRequest;
4747
class SrsJsonArray;
4848
class SrsConfDirective;
49+
class SrsThreadMutex;
4950

5051
/**
5152
* whether the two vector actual equals, for instance,
@@ -140,6 +141,7 @@ extern srs_error_t srs_config_transform_vhost(SrsConfDirective* root);
140141
extern srs_error_t srs_config_transform_vhost2(SrsConfDirective* root);
141142

142143
// TODO: FIXME: It should be thread-local or thread-safe.
144+
// TODO: FIXME: We should use channel to deliver changes of config.
143145
extern SrsConfig* _srs_config;
144146

145147
// The config directive.
@@ -299,6 +301,7 @@ class SrsConfig
299301
private:
300302
// The reload subscribers, when reload, callback all handlers.
301303
std::vector<ISrsReloadHandler*> subscribes;
304+
SrsThreadMutex* lock_;
302305
public:
303306
SrsConfig();
304307
virtual ~SrsConfig();

trunk/src/app/srs_app_pithy_print.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ bool SrsErrorPithyPrint::can_print(int error_code, uint32_t* pnn)
165165
return new_stage || stage->can_print();
166166
}
167167

168-
// The global stage manager for pithy print, multiple stages.
169-
static SrsStageManager* _srs_stages = new SrsStageManager();
168+
// It MUST be thread-local, by design.
169+
__thread SrsStageManager* _srs_stages = NULL;
170170

171171
SrsPithyPrint::SrsPithyPrint(int _stage_id)
172172
{

trunk/src/app/srs_app_rtc_conn.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,7 @@ class ISrsRtcHijacker
582582
};
583583

584584
// TODO: FIXME: It should be thread-local or thread-safe.
585+
// TODO: FIXME: It seems thread-local make sense.
585586
extern ISrsRtcHijacker* _srs_rtc_hijacker;
586587

587588
#endif

trunk/src/app/srs_app_rtc_dtls.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,9 @@ srs_error_t SrsDtlsCertificate::initialize()
246246
// OPENSSL_init_ssl();
247247
#endif
248248

249+
// Initialize SRTP first.
250+
srs_assert(srtp_init() == 0);
251+
249252
// Whether use ECDSA certificate.
250253
ecdsa_mode = _srs_config->get_rtc_server_ecdsa();
251254

trunk/src/app/srs_app_rtc_dtls.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class SrsDtlsCertificate
6363
bool is_ecdsa();
6464
};
6565

66-
// TODO: FIXME: It should be thread-local or thread-safe.
66+
// It's shared global object, MUST be thread-safe.
6767
extern SrsDtlsCertificate* _srs_rtc_dtls_certificate;
6868

6969
// @remark: play the role of DTLS_CLIENT, will send handshake

trunk/src/app/srs_app_rtc_server.cpp

+2-4
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ void SrsRtcBlackhole::sendto(void* data, int len)
139139
srs_sendto(blackhole_stfd, data, len, (sockaddr*)blackhole_addr, sizeof(sockaddr_in), SRS_UTIME_NO_TIMEOUT);
140140
}
141141

142-
SrsRtcBlackhole* _srs_blackhole = new SrsRtcBlackhole();
142+
__thread SrsRtcBlackhole* _srs_blackhole = NULL;
143143

144144
// @global dtls certficate for rtc module.
145145
SrsDtlsCertificate* _srs_rtc_dtls_certificate = new SrsDtlsCertificate();
@@ -749,7 +749,6 @@ srs_error_t RtcServerAdapter::run()
749749
return srs_error_wrap(err, "listen udp");
750750
}
751751

752-
// TODO: FIXME: It should be thread-local or thread-safe.
753752
if ((err = _srs_rtc_manager->start()) != srs_success) {
754753
return srs_error_wrap(err, "start manager");
755754
}
@@ -761,6 +760,5 @@ void RtcServerAdapter::stop()
761760
{
762761
}
763762

764-
// TODO: FIXME: It should be thread-local or thread-safe.
765-
SrsResourceManager* _srs_rtc_manager = new SrsResourceManager("RTC", true);
763+
__thread SrsResourceManager* _srs_rtc_manager = NULL;
766764

trunk/src/app/srs_app_rtc_server.hpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ class SrsRtcBlackhole
6060
void sendto(void* data, int len);
6161
};
6262

63-
// TODO: FIXME: It should be thread-local or thread-safe.
64-
extern SrsRtcBlackhole* _srs_blackhole;
63+
// It MUST be thread-local, because it create ST socket.
64+
extern __thread SrsRtcBlackhole* _srs_blackhole;
6565

6666
// The handler for RTC server to call.
6767
class ISrsRtcServerHandler
@@ -145,8 +145,8 @@ class RtcServerAdapter : public ISrsHybridServer
145145
virtual void stop();
146146
};
147147

148-
// TODO: FIXME: It should be thread-local or thread-safe.
149-
extern SrsResourceManager* _srs_rtc_manager;
148+
// It SHOULD be thread-local, because used to find connection for each UDP packet.
149+
extern __thread SrsResourceManager* _srs_rtc_manager;
150150

151151
#endif
152152

trunk/src/app/srs_app_rtc_source.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -320,8 +320,7 @@ SrsRtcStream* SrsRtcStreamManager::fetch(SrsRequest* r)
320320
return source;
321321
}
322322

323-
// TODO: FIXME: It should be thread-local or thread-safe.
324-
SrsRtcStreamManager* _srs_rtc_sources = new SrsRtcStreamManager();
323+
__thread SrsRtcStreamManager* _srs_rtc_sources = NULL;
325324

326325
ISrsRtcPublishStream::ISrsRtcPublishStream()
327326
{

trunk/src/app/srs_app_rtc_source.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ class SrsRtcStreamManager
138138
virtual SrsRtcStream* fetch(SrsRequest* r);
139139
};
140140

141-
// TODO: FIXME: It should be thread-local or thread-safe.
142-
extern SrsRtcStreamManager* _srs_rtc_sources;
141+
// It SHOULD be thread-local, because stream source is isolated by threads.
142+
extern __thread SrsRtcStreamManager* _srs_rtc_sources;
143143

144144
// A publish stream interface, for source to callback with.
145145
class ISrsRtcPublishStream

trunk/src/app/srs_app_server.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -977,12 +977,14 @@ srs_error_t SrsServer::cycle()
977977
{
978978
srs_error_t err = srs_success;
979979

980+
// TODO: FIXME: It should be thread-local or thread-safe.
980981
// Start the inotify auto reload by watching config file.
981982
SrsInotifyWorker inotify(this);
982983
if ((err = inotify.start()) != srs_success) {
983984
return srs_error_wrap(err, "start inotify");
984985
}
985986

987+
// TODO: FIXME: It should be thread-local or thread-safe.
986988
// Do server main cycle.
987989
err = do_cycle();
988990

trunk/src/app/srs_app_threads.cpp

+29-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@
3131
#include <srs_app_utility.hpp>
3232
#include <srs_app_hybrid.hpp>
3333
#include <srs_app_source.hpp>
34+
#include <srs_app_rtc_server.hpp>
35+
#include <srs_app_conn.hpp>
36+
#include <srs_app_rtc_source.hpp>
37+
#include <srs_kernel_rtc_rtp.hpp>
38+
#include <srs_app_pithy_print.hpp>
3439

3540
#include <unistd.h>
3641

@@ -492,6 +497,7 @@ bool SrsThreadPool::hybrid_dying_water_level()
492497
// Thread local objects.
493498
extern const int LOG_MAX_SIZE;
494499
extern __thread char* _srs_log_data;
500+
extern __thread SrsStageManager* _srs_stages;
495501

496502
// Setup the thread-local variables, MUST call when each thread starting.
497503
srs_error_t SrsThreadPool::setup()
@@ -513,6 +519,25 @@ srs_error_t SrsThreadPool::setup()
513519
// Create the source manager for server.
514520
_srs_sources = new SrsSourceManager();
515521

522+
// The blackhole for RTC server.
523+
_srs_blackhole = new SrsRtcBlackhole();
524+
525+
// The resource manager for RTC server.
526+
_srs_rtc_manager = new SrsResourceManager("RTC", true);
527+
528+
// The source manager for RTC streams.
529+
_srs_rtc_sources = new SrsRtcStreamManager();
530+
531+
// The object cache for RTC server.
532+
_srs_rtp_cache = new SrsRtpObjectCacheManager<SrsRtpPacket2>(sizeof(SrsRtpPacket2));
533+
_srs_rtp_raw_cache = new SrsRtpObjectCacheManager<SrsRtpRawPayload>(sizeof(SrsRtpRawPayload));
534+
_srs_rtp_fua_cache = new SrsRtpObjectCacheManager<SrsRtpFUAPayload2>(sizeof(SrsRtpFUAPayload2));
535+
_srs_rtp_msg_cache_buffers = new SrsRtpObjectCacheManager<SrsSharedPtrMessage>(sizeof(SrsSharedPtrMessage) + kRtpPacketSize);
536+
_srs_rtp_msg_cache_objs = new SrsRtpObjectCacheManager<SrsSharedPtrMessage>(sizeof(SrsSharedPtrMessage));
537+
538+
// The pithy print for each thread.
539+
_srs_stages = new SrsStageManager();
540+
516541
return err;
517542
}
518543

@@ -521,8 +546,6 @@ srs_error_t SrsThreadPool::initialize()
521546
srs_error_t err = srs_success;
522547

523548
// Initialize global shared thread-safe objects once.
524-
srs_assert(srtp_init() == 0);
525-
526549
if ((err = _srs_rtc_dtls_certificate->initialize()) != srs_success) {
527550
return srs_error_wrap(err, "rtc dtls certificate initialize");
528551
}
@@ -581,7 +604,7 @@ srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg)
581604
SrsThreadEntry* entry = new SrsThreadEntry();
582605

583606
// Update the hybrid thread entry for circuit breaker.
584-
if (label == "hybrid") {
607+
if (label == "hybrid" && !hybrid_) {
585608
hybrid_ = entry;
586609
}
587610

@@ -664,6 +687,7 @@ srs_error_t SrsThreadPool::run()
664687
}
665688

666689
// Update the Circuit-Breaker by water-level.
690+
// TODO: FIXME: Should stat all hybrid servers.
667691
if (hybrid_ && hybrid_->stat) {
668692
// Reset the high water-level when CPU is low for N times.
669693
if (hybrid_->stat->percent * 100 > high_threshold_) {
@@ -823,7 +847,7 @@ void* SrsThreadPool::start(void* arg)
823847
return NULL;
824848
}
825849

826-
// TODO: FIXME: It should be thread-local or thread-safe.
850+
// It MUST be thread-safe, global and shared object.
827851
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
828852

829853
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
@@ -1072,7 +1096,7 @@ srs_error_t SrsAsyncLogManager::do_start()
10721096
return err;
10731097
}
10741098

1075-
// TODO: FIXME: It should be thread-local or thread-safe.
1099+
// It MUST be thread-safe, global shared object.
10761100
SrsAsyncLogManager* _srs_async_log = new SrsAsyncLogManager();
10771101

10781102
SrsAsyncSRTP::SrsAsyncSRTP(SrsSecurityTransport* transport)

trunk/src/app/srs_app_threads.hpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ class SrsThreadPool
362362
static void* start(void* arg);
363363
};
364364

365-
// TODO: FIXME: It should be thread-local or thread-safe.
365+
// It MUST be thread-safe, global and shared object.
366366
extern SrsThreadPool* _srs_thread_pool;
367367

368368
// Async file writer, it's thread safe.
@@ -426,7 +426,7 @@ class SrsAsyncLogManager
426426
srs_error_t do_start();
427427
};
428428

429-
// TODO: FIXME: It should be thread-local or thread-safe.
429+
// It MUST be thread-safe, global shared object.
430430
extern SrsAsyncLogManager* _srs_async_log;
431431

432432
// The async SRTP codec.
@@ -527,7 +527,7 @@ class SrsAsyncSRTPManager
527527
virtual srs_error_t consume(SrsThreadEntry* entry, int* nn_consumed);
528528
};
529529

530-
// TODO: FIXME: It should be thread-local or thread-safe.
530+
// It MUST be thread-safe, because it runs in multiple threads by design.
531531
extern SrsAsyncSRTPManager* _srs_async_srtp;
532532

533533
// A thread-safe UDP listener.
@@ -608,7 +608,7 @@ class SrsAsyncRecvManager
608608
bool consume_by_tunnel(SrsUdpMuxSocket* skt);
609609
};
610610

611-
// TODO: FIXME: It should be thread-local or thread-safe.
611+
// It MUST be thread-safe, because it runs in multiple threads by design.
612612
extern SrsAsyncRecvManager* _srs_async_recv;
613613

614614
// The async UDP packet.
@@ -651,7 +651,7 @@ class SrsAsyncSendManager
651651
srs_error_t do_start();
652652
};
653653

654-
// TODO: FIXME: It should be thread-local or thread-safe.
654+
// It MUST be thread-safe, because it runs in multiple threads by design.
655655
extern SrsAsyncSendManager* _srs_async_send;
656656

657657
#endif

trunk/src/app/srs_app_utility.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ srs_error_t srs_kill_forced(int& pid)
221221
return err;
222222
}
223223

224+
// TODO: FIXME: It should be thread-local or thread-safe.
224225
static SrsRusage _srs_system_rusage;
225226

226227
SrsRusage::SrsRusage()
@@ -247,6 +248,7 @@ void srs_update_system_rusage()
247248
_srs_system_rusage.ok = true;
248249
}
249250

251+
// TODO: FIXME: It should be thread-local or thread-safe.
250252
static SrsProcSelfStat _srs_system_cpu_self_stat;
251253
static SrsProcSystemStat _srs_system_cpu_system_stat;
252254

@@ -523,6 +525,7 @@ SrsDiskStat::SrsDiskStat()
523525
wr_ticks = nb_current = ticks = aveq = 0;
524526
}
525527

528+
// TODO: FIXME: It should be thread-local or thread-safe.
526529
static SrsDiskStat _srs_disk_stat;
527530

528531
SrsDiskStat* srs_get_disk_stat()
@@ -717,6 +720,7 @@ SrsMemInfo::SrsMemInfo()
717720
SwapFree = 0;
718721
}
719722

723+
// TODO: FIXME: It should be thread-local or thread-safe.
720724
static SrsMemInfo _srs_system_meminfo;
721725

722726
SrsMemInfo* srs_get_meminfo()
@@ -809,6 +813,7 @@ SrsPlatformInfo::SrsPlatformInfo()
809813
load_fifteen_minutes = 0;
810814
}
811815

816+
// TODO: FIXME: It should be thread-local or thread-safe.
812817
static SrsPlatformInfo _srs_system_platform_info;
813818

814819
SrsPlatformInfo* srs_get_platform_info()
@@ -909,6 +914,7 @@ SrsSnmpUdpStat::~SrsSnmpUdpStat()
909914
{
910915
}
911916

917+
// TODO: FIXME: It should be thread-local or thread-safe.
912918
static SrsSnmpUdpStat _srs_snmp_udp_stat;
913919

914920
void srs_update_udp_snmp_statistic()
@@ -987,6 +993,7 @@ SrsNetworkDevices::SrsNetworkDevices()
987993
scompressed = 0;
988994
}
989995

996+
// TODO: FIXME: It should be thread-local or thread-safe.
990997
#define MAX_NETWORK_DEVICES_COUNT 16
991998
static SrsNetworkDevices _srs_system_network_devices[MAX_NETWORK_DEVICES_COUNT];
992999
static int _nb_srs_system_network_devices = -1;
@@ -1055,6 +1062,7 @@ SrsNetworkRtmpServer::SrsNetworkRtmpServer()
10551062
rkbps_5m = skbps_5m = 0;
10561063
}
10571064

1065+
// TODO: FIXME: It should be thread-local or thread-safe.
10581066
static SrsNetworkRtmpServer _srs_network_rtmp_server;
10591067

10601068
SrsNetworkRtmpServer* srs_get_network_rtmp_server()

trunk/src/kernel/srs_kernel_log.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ class ISrsContext
9797
virtual const SrsContextId& set_id(const SrsContextId& v) = 0;
9898
};
9999

100-
// TODO: FIXME: It should be thread-local or thread-safe.
100+
// It SHOULD be thread-safe, because it use async log and thread-local buffer.
101101
extern ISrsLog* _srs_log;
102102

103-
// TODO: FIXME: It should be thread-local or thread-safe.
103+
// It SHOULD be thread-safe, because it use thread-local thread private data.
104104
extern ISrsContext* _srs_context;
105105

106106
// Log style.

0 commit comments

Comments
 (0)