Skip to content

Commit 97f6684

Browse files
committed
Threads-Hybrid: Schedule connection to the sample hybrid by url
1 parent eab4f89 commit 97f6684

9 files changed

+51
-17
lines changed

trunk/src/app/srs_app_hybrid.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -437,9 +437,9 @@ srs_error_t SrsHybridServer::on_thread_message(SrsThreadMessage* msg, SrsThreadP
437437
// TODO: FIXME: Response timeout if error?
438438
// TODO: FIXME: Response a different message? With trace ID?
439439
// We're responder, write response to responder.
440-
srs_error_t r0 = channel->responder()->write(msg, sizeof(SrsThreadMessage), NULL);
441-
if (r0 != srs_success) {
442-
srs_freep(r0); // Ignore any error.
440+
err = channel->responder()->write(msg, sizeof(SrsThreadMessage), NULL);
441+
if (err != srs_success) {
442+
return srs_error_wrap(err, "response");
443443
}
444444
}
445445

trunk/src/app/srs_app_rtc_api.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(SrsRequest* req, const SrsSdp& remote_
412412

413413
uint32_t SrsGoApiRtcPublish::ssrc_num = 0;
414414

415-
SrsGoApiRtcPublish::SrsGoApiRtcPublish(SrsRtcServer* server)
415+
SrsGoApiRtcPublish::SrsGoApiRtcPublish(ISrsRtcServer* server)
416416
{
417417
server_ = server;
418418
}

trunk/src/app/srs_app_rtc_api.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ class SrsGoApiRtcPublish : public ISrsHttpHandler
7070
public:
7171
static uint32_t ssrc_num;
7272
private:
73-
SrsRtcServer* server_;
73+
ISrsRtcServer* server_;
7474
public:
75-
SrsGoApiRtcPublish(SrsRtcServer* server);
75+
SrsGoApiRtcPublish(ISrsRtcServer* server);
7676
virtual ~SrsGoApiRtcPublish();
7777
public:
7878
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);

trunk/src/app/srs_app_rtc_server.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ srs_error_t SrsRtcServer::listen_udp()
365365
return err;
366366
}
367367

368-
int port = _srs_config->get_rtc_server_listen();
368+
int port = _srs_config->get_rtc_server_listen(_srs_hybrid->stream_index());
369369
if (port <= 0) {
370370
return srs_error_new(ERROR_RTC_PORT, "invalid port=%d", port);
371371
}
@@ -571,15 +571,15 @@ srs_error_t SrsRtcServer::do_create_session(
571571
// We allows to mock the eip of server.
572572
if (!mock_eip.empty()) {
573573
string host;
574-
int port = _srs_config->get_rtc_server_listen();
574+
int port = _srs_config->get_rtc_server_listen(_srs_hybrid->stream_index());
575575
srs_parse_hostport(mock_eip, host, port);
576576

577577
local_sdp.add_candidate(host, port, "host");
578578
srs_trace("RTC: Use candidate mock_eip %s as %s:%d", mock_eip.c_str(), host.c_str(), port);
579579
} else {
580580
std::vector<string> candidate_ips = get_candidate_ips();
581581
for (int i = 0; i < (int)candidate_ips.size(); ++i) {
582-
local_sdp.add_candidate(candidate_ips[i], _srs_config->get_rtc_server_listen(), "host");
582+
local_sdp.add_candidate(candidate_ips[i], _srs_config->get_rtc_server_listen(_srs_hybrid->stream_index()), "host");
583583
}
584584
srs_trace("RTC: Use candidates %s", srs_join_vector_string(candidate_ips, ", ").c_str());
585585
}

trunk/src/app/srs_app_server.cpp

+22-5
Original file line numberDiff line numberDiff line change
@@ -1975,14 +1975,13 @@ srs_error_t SrsApiServer::listen_api()
19751975
{
19761976
srs_error_t err = srs_success;
19771977

1978-
// TODO: FIXME: Implements it.
19791978
if ((err = http_api_mux_->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {
19801979
return srs_error_wrap(err, "handle play");
19811980
}
19821981

1983-
//if ((err = http_api_mux_->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
1984-
// return srs_error_wrap(err, "handle publish");
1985-
//}
1982+
if ((err = http_api_mux_->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
1983+
return srs_error_wrap(err, "handle publish");
1984+
}
19861985

19871986
#ifdef SRS_SIMULATOR
19881987
// TODO: FIXME: Implements it.
@@ -2001,9 +2000,22 @@ srs_error_t SrsApiServer::create_session(
20012000
) {
20022001
srs_error_t err = srs_success;
20032002

2003+
// Serve all connections of a stream, which identified by url, by the same hybrid thread.
2004+
string url = req->get_stream_url();
2005+
SrsThreadEntry* hybrid = NULL;
2006+
if (true) {
2007+
map<string, SrsThreadEntry*>::iterator it = hybrids_.find(url);
2008+
if (it == hybrids_.end()) {
2009+
static int index = 0;
2010+
vector<SrsThreadEntry*> hybrids = _srs_thread_pool->hybrids();
2011+
hybrids_[url] = hybrid = hybrids[(index++) % (int)hybrids.size()];
2012+
} else {
2013+
hybrid = it->second;
2014+
}
2015+
}
2016+
20042017
// Allocate slot to communicate with hybrid thread.
20052018
SrsThreadEntry* self = _srs_thread_pool->self();
2006-
SrsThreadEntry* hybrid = _srs_thread_pool->hybrid();
20072019
srs_assert(self && hybrid);
20082020

20092021
SrsThreadPipeChannel* channel = NULL;
@@ -2055,6 +2067,11 @@ srs_error_t SrsApiServer::create_session(
20552067
// TODO: FIMXE: Should never return it, for it's not thread-safe.
20562068
*psession = s.session;
20572069

2070+
// TODO: FIXME: Shoule return detail error by channel.
2071+
if (!s.session) {
2072+
return srs_error_new(ERROR_PIPE_READ, "no session");
2073+
}
2074+
20582075
return err;
20592076
}
20602077

trunk/src/app/srs_app_server.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include <vector>
3030
#include <string>
31+
#include <map>
3132

3233
#include <srs_app_st.hpp>
3334
#include <srs_app_reload.hpp>
@@ -423,6 +424,9 @@ class SrsApiServer : public ISrsTcpMuxHandler, public ISrsResourceManager, publi
423424
SrsBufferListener* https_;
424425
SrsHttpServeMux* http_api_mux_;
425426
SrsResourceManager* conn_manager_;
427+
private:
428+
// Key is stream url, value is hybrid thread entry.
429+
std::map<std::string, SrsThreadEntry*> hybrids_;
426430
public:
427431
SrsApiServer();
428432
virtual ~SrsApiServer();

trunk/src/app/srs_app_threads.cpp

+13-3
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,16 @@ srs_error_t SrsThreadPipeChannel::cycle()
279279
// Here we're responder, read from initiator.
280280
SrsThreadMessage m;
281281
if ((err = initiator_->read(&m, sizeof(m), NULL)) != srs_success) {
282-
return srs_error_wrap(err, "read");
282+
srs_warn("read err %s", srs_error_desc(err).c_str());
283+
srs_freep(err); // Ignore any error.
284+
continue;
283285
}
284286

285287
// Consume the message, the responder can write response to responder.
286288
if (handler_ && (err = handler_->on_thread_message(&m, this)) != srs_success) {
287-
return srs_error_wrap(err, "consume");
289+
srs_warn("consume err %s", srs_error_desc(err).c_str());
290+
srs_freep(err); // Ignore any error.
291+
continue;
288292
}
289293
}
290294

@@ -604,8 +608,9 @@ srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg)
604608
SrsThreadEntry* entry = new SrsThreadEntry();
605609

606610
// Update the hybrid thread entry for circuit breaker.
607-
if (label == "hybrid" && !hybrid_) {
611+
if (label == "hybrid") {
608612
hybrid_ = entry;
613+
hybrids_.push_back(entry);
609614
}
610615

611616
// To protect the threads_ for executing thread-safe.
@@ -808,6 +813,11 @@ SrsThreadEntry* SrsThreadPool::hybrid()
808813
return hybrid_;
809814
}
810815

816+
vector<SrsThreadEntry*> SrsThreadPool::hybrids()
817+
{
818+
return hybrids_;
819+
}
820+
811821
void* SrsThreadPool::start(void* arg)
812822
{
813823
srs_error_t err = srs_success;

trunk/src/app/srs_app_threads.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@ class SrsThreadPool
324324
private:
325325
// The hybrid server entry, the cpu percent used for circuit breaker.
326326
SrsThreadEntry* hybrid_;
327+
std::vector<SrsThreadEntry*> hybrids_;
327328
// Reset the water-level when CPU is low for N times.
328329
// @note To avoid the CPU change rapidly.
329330
int hybrid_high_water_level_;
@@ -358,6 +359,7 @@ class SrsThreadPool
358359
public:
359360
SrsThreadEntry* self();
360361
SrsThreadEntry* hybrid();
362+
std::vector<SrsThreadEntry*> hybrids();
361363
private:
362364
static void* start(void* arg);
363365
};

trunk/src/main/srs_main_server.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,7 @@ srs_error_t run_hybrid_server(void* arg)
550550
// The config index for hybrid/stream server.
551551
int stream_index = (int)(uint64_t)arg;
552552
_srs_hybrid->set_stream_index(stream_index);
553+
srs_assert(_srs_hybrid->stream_index() >= 0);
553554

554555
// Create servers and register them.
555556
_srs_hybrid->register_server(new SrsServerAdapter());

0 commit comments

Comments
 (0)