Skip to content

Commit 1ae3e6c

Browse files
committed
performance refine, support 3k+ connections(270kbps). 0.9.130
1 parent e9c96af commit 1ae3e6c

18 files changed

+230
-162
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ Supported operating systems and hardware:
241241
* 2013-10-17, Created.<br/>
242242

243243
## History
244+
* v1.0, 2014-06-22, performance refine, support 3k+ connections(270kbps). 0.9.130
244245
* v1.0, 2014-06-21, support edge [token traverse](https://github.com/winlinvip/simple-rtmp-server/wiki/DRM#tokentraverse), fix [#104](https://github.com/winlinvip/simple-rtmp-server/issues/104). 0.9.129
245246
* v1.0, 2014-06-19, add connections count to api summaries. 0.9.127
246247
* v1.0, 2014-06-19, add srs bytes and kbps to api summaries. 0.9.126

trunk/conf/full.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ srs_log_level trace;
3636
srs_log_file ./objs/srs.log;
3737
# the max connections.
3838
# if exceed the max connections, server will drop the new connection.
39-
# default: 2000
39+
# default: 12345
4040
max_connections 1000;
4141
# whether start as deamon
4242
# @remark: donot support reload.

trunk/configure

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ MODULE_ID="RTMP"
456456
MODULE_DEPENDS=("CORE" "KERNEL")
457457
ModuleLibIncs=(${SRS_OBJS} ${LibSSLRoot})
458458
MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_protocol_rtmp_stack" "srs_protocol_rtmp"
459-
"srs_protocol_handshake" "srs_protocol_utility")
459+
"srs_protocol_handshake" "srs_protocol_utility" "srs_protocol_msg_array")
460460
RTMP_INCS="src/rtmp"; MODULE_DIR=${RTMP_INCS} . auto/modules.sh
461461
RTMP_OBJS="${MODULE_OBJS[@]}"
462462
#

trunk/src/app/srs_app_config.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1403,7 +1403,7 @@ int SrsConfig::get_max_connections()
14031403

14041404
SrsConfDirective* conf = root->get("max_connections");
14051405
if (!conf || conf->arg0().empty()) {
1406-
return 2000;
1406+
return SRS_CONF_DEFAULT_MAX_CONNECTIONS;
14071407
}
14081408

14091409
return ::atoi(conf->arg0().c_str());

trunk/src/app/srs_app_config.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3838
#define SRS_CONF_DEFAULT_PID_FILE "./objs/srs.pid"
3939
#define SRS_DEFAULT_CONF "conf/srs.conf"
4040

41+
#define SRS_CONF_DEFAULT_MAX_CONNECTIONS 12345
4142
#define SRS_CONF_DEFAULT_HLS_PATH "./objs/nginx/html"
4243
#define SRS_CONF_DEFAULT_HLS_FRAGMENT 10
4344
#define SRS_CONF_DEFAULT_HLS_WINDOW 60

trunk/src/app/srs_app_edge.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
4444
#include <srs_app_socket.hpp>
4545
#include <srs_app_kbps.hpp>
4646
#include <srs_kernel_utility.hpp>
47+
#include <srs_protocol_msg_array.hpp>
4748

4849
// when error, edge ingester sleep for a while and retry.
4950
#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(1*1000*1000LL)
@@ -431,13 +432,16 @@ void SrsEdgeForwarder::stop()
431432
kbps->set_io(NULL, NULL);
432433
}
433434

435+
#define SYS_MAX_EDGE_SEND_MSGS 128
434436
int SrsEdgeForwarder::cycle()
435437
{
436438
int ret = ERROR_SUCCESS;
437439

438440
client->set_recv_timeout(SRS_PULSE_TIMEOUT_US);
439441

440442
SrsPithyPrint pithy_print(SRS_STAGE_EDGE);
443+
444+
SrsSharedPtrMessageArray msgs(SYS_MAX_EDGE_SEND_MSGS);
441445

442446
while (pthread->can_loop()) {
443447
// switch to other st-threads.
@@ -465,8 +469,7 @@ int SrsEdgeForwarder::cycle()
465469

466470
// forward all messages.
467471
int count = 0;
468-
SrsSharedPtrMessage** msgs = NULL;
469-
if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
472+
if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
470473
srs_error("get message to forward to origin failed. ret=%d", ret);
471474
return ret;
472475
}
@@ -488,16 +491,15 @@ int SrsEdgeForwarder::cycle()
488491
srs_verbose("no packets to forward.");
489492
continue;
490493
}
491-
SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);
492494

493495
// all msgs to forward to origin.
494496
// @remark, becareful, all msgs must be free explicitly,
495497
// free by send_and_free_message or srs_freep.
496498
for (int i = 0; i < count; i++) {
497-
SrsSharedPtrMessage* msg = msgs[i];
499+
SrsSharedPtrMessage* msg = msgs.msgs[i];
498500

499501
srs_assert(msg);
500-
msgs[i] = NULL;
502+
msgs.msgs[i] = NULL;
501503

502504
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
503505
srs_error("edge publish forwarder send message to server failed. ret=%d", ret);

trunk/src/app/srs_app_forward.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
4141
#include <srs_protocol_rtmp.hpp>
4242
#include <srs_app_kbps.hpp>
4343
#include <srs_kernel_utility.hpp>
44+
#include <srs_protocol_msg_array.hpp>
4445

4546
// when error, forwarder sleep for a while and retry.
4647
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
@@ -309,6 +310,7 @@ int SrsForwarder::connect_server()
309310
return ret;
310311
}
311312

313+
#define SYS_MAX_FORWARD_SEND_MSGS 128
312314
int SrsForwarder::forward()
313315
{
314316
int ret = ERROR_SUCCESS;
@@ -317,6 +319,8 @@ int SrsForwarder::forward()
317319

318320
SrsPithyPrint pithy_print(SRS_STAGE_FORWARDER);
319321

322+
SrsSharedPtrMessageArray msgs(SYS_MAX_FORWARD_SEND_MSGS);
323+
320324
while (pthread->can_loop()) {
321325
// switch to other st-threads.
322326
st_usleep(0);
@@ -339,8 +343,7 @@ int SrsForwarder::forward()
339343

340344
// forward all messages.
341345
int count = 0;
342-
SrsSharedPtrMessage** msgs = NULL;
343-
if ((ret = queue->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
346+
if ((ret = queue->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
344347
srs_error("get message to forward failed. ret=%d", ret);
345348
return ret;
346349
}
@@ -360,16 +363,15 @@ int SrsForwarder::forward()
360363
srs_verbose("no packets to forward.");
361364
continue;
362365
}
363-
SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);
364366

365367
// all msgs to forward.
366368
// @remark, becareful, all msgs must be free explicitly,
367369
// free by send_and_free_message or srs_freep.
368370
for (int i = 0; i < count; i++) {
369-
SrsSharedPtrMessage* msg = msgs[i];
371+
SrsSharedPtrMessage* msg = msgs.msgs[i];
370372

371373
srs_assert(msg);
372-
msgs[i] = NULL;
374+
msgs.msgs[i] = NULL;
373375

374376
if ((ret = client->send_and_free_message(msg, stream_id)) != ERROR_SUCCESS) {
375377
srs_error("forwarder send message to server failed. ret=%d", ret);

trunk/src/app/srs_app_rtmp_conn.cpp

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ using namespace std;
5050
#include <srs_app_utility.hpp>
5151
#include <srs_protocol_utility.hpp>
5252
#include <srs_kernel_utility.hpp>
53+
#include <srs_protocol_msg_array.hpp>
5354

5455
// when stream is busy, for example, streaming is already
5556
// publishing, when a new client to request to publish,
@@ -382,7 +383,7 @@ int SrsRtmpConn::stream_service_cycle()
382383
}
383384

384385
srs_info("start to publish stream %s success", req->stream.c_str());
385-
ret = fmle_publish(source);
386+
ret = fmle_publishing(source);
386387

387388
// when edge, notice edge to change state.
388389
// when origin, notice all service to unpublish.
@@ -416,7 +417,7 @@ int SrsRtmpConn::stream_service_cycle()
416417
}
417418

418419
srs_info("flash start to publish stream %s success", req->stream.c_str());
419-
ret = flash_publish(source);
420+
ret = flash_publishing(source);
420421

421422
// when edge, notice edge to change state.
422423
// when origin, notice all service to unpublish.
@@ -476,6 +477,8 @@ int SrsRtmpConn::check_vhost()
476477
return ret;
477478
}
478479

480+
#define SYS_MAX_PLAY_SEND_MSGS 128
481+
479482
int SrsRtmpConn::playing(SrsSource* source)
480483
{
481484
int ret = ERROR_SUCCESS;
@@ -499,38 +502,43 @@ int SrsRtmpConn::playing(SrsSource* source)
499502
rtmp->set_recv_timeout(SRS_PULSE_TIMEOUT_US);
500503

501504
SrsPithyPrint pithy_print(SRS_STAGE_PLAY_USER);
505+
506+
SrsSharedPtrMessageArray msgs(SYS_MAX_PLAY_SEND_MSGS);
502507

508+
bool user_specified_duration_to_stop = (req->duration > 0);
503509
int64_t starttime = -1;
510+
504511
while (true) {
505-
// switch to other st-threads.
506-
st_usleep(0);
507-
512+
// collect elapse for pithy print.
508513
pithy_print.elapse();
509514

510515
// read from client.
511516
if (true) {
512517
SrsMessage* msg = NULL;
513518
ret = rtmp->recv_message(&msg);
514-
515519
srs_verbose("play loop recv message. ret=%d", ret);
516-
if (ret != ERROR_SUCCESS && ret != ERROR_SOCKET_TIMEOUT) {
517-
if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
520+
521+
if (ret == ERROR_SOCKET_TIMEOUT) {
522+
// it's ok, do nothing.
523+
ret = ERROR_SUCCESS;
524+
} else if (ret != ERROR_SUCCESS) {
525+
if (!srs_is_client_gracefully_close(ret)) {
518526
srs_error("recv client control message failed. ret=%d", ret);
519527
}
520528
return ret;
521-
}
522-
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
523-
if (!srs_is_system_control_error(ret)) {
524-
srs_error("process play control message failed. ret=%d", ret);
529+
} else {
530+
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
531+
if (!srs_is_system_control_error(ret)) {
532+
srs_error("process play control message failed. ret=%d", ret);
533+
}
534+
return ret;
525535
}
526-
return ret;
527536
}
528537
}
529538

530539
// get messages from consumer.
531-
SrsSharedPtrMessage** msgs = NULL;
532540
int count = 0;
533-
if ((ret = consumer->get_packets(0, msgs, count)) != ERROR_SUCCESS) {
541+
if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
534542
srs_error("get messages from consumer failed. ret=%d", ret);
535543
return ret;
536544
}
@@ -545,32 +553,29 @@ int SrsRtmpConn::playing(SrsSource* source)
545553
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
546554
}
547555

548-
if (count <= 0) {
549-
srs_verbose("no packets in queue.");
550-
continue;
551-
}
552-
SrsAutoFreeArray(SrsSharedPtrMessage, msgs, count);
553-
554556
// sendout messages
555557
// @remark, becareful, all msgs must be free explicitly,
556558
// free by send_and_free_message or srs_freep.
557559
for (int i = 0; i < count; i++) {
558-
SrsSharedPtrMessage* msg = msgs[i];
560+
SrsSharedPtrMessage* msg = msgs.msgs[i];
559561

560562
// the send_message will free the msg,
561563
// so set the msgs[i] to NULL.
562-
msgs[i] = NULL;
563-
564-
srs_assert(msg);
564+
msgs.msgs[i] = NULL;
565565

566-
// foreach msg, collect the duration.
567-
// @remark: never use msg when sent it, for the protocol sdk will free it.
568-
if (starttime < 0 || starttime > msg->header.timestamp) {
566+
// only when user specifies the duration,
567+
// we start to collect the durations for each message.
568+
if (user_specified_duration_to_stop) {
569+
// foreach msg, collect the duration.
570+
// @remark: never use msg when sent it, for the protocol sdk will free it.
571+
if (starttime < 0 || starttime > msg->header.timestamp) {
572+
starttime = msg->header.timestamp;
573+
}
574+
duration += msg->header.timestamp - starttime;
569575
starttime = msg->header.timestamp;
570576
}
571-
duration += msg->header.timestamp - starttime;
572-
starttime = msg->header.timestamp;
573577

578+
// no need to assert msg, for the rtmp will assert it.
574579
if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
575580
srs_error("send message to client failed. ret=%d", ret);
576581
return ret;
@@ -579,17 +584,22 @@ int SrsRtmpConn::playing(SrsSource* source)
579584

580585
// if duration specified, and exceed it, stop play live.
581586
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/45
582-
if (req->duration > 0 && duration >= (int64_t)req->duration) {
583-
ret = ERROR_RTMP_DURATION_EXCEED;
584-
srs_trace("stop live for duration exceed. ret=%d", ret);
585-
return ret;
587+
if (user_specified_duration_to_stop) {
588+
if (duration >= (int64_t)req->duration) {
589+
ret = ERROR_RTMP_DURATION_EXCEED;
590+
srs_trace("stop live for duration exceed. ret=%d", ret);
591+
return ret;
592+
}
586593
}
594+
595+
// switch to other threads, to anti dead loop.
596+
st_usleep(0);
587597
}
588598

589599
return ret;
590600
}
591601

592-
int SrsRtmpConn::fmle_publish(SrsSource* source)
602+
int SrsRtmpConn::fmle_publishing(SrsSource* source)
593603
{
594604
int ret = ERROR_SUCCESS;
595605

@@ -668,7 +678,7 @@ int SrsRtmpConn::fmle_publish(SrsSource* source)
668678
return ret;
669679
}
670680

671-
int SrsRtmpConn::flash_publish(SrsSource* source)
681+
int SrsRtmpConn::flash_publishing(SrsSource* source)
672682
{
673683
int ret = ERROR_SUCCESS;
674684

trunk/src/app/srs_app_rtmp_conn.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class SrsHttpHooks;
4848
class SrsBandwidth;
4949
class SrsKbps;
5050
class SrsRtmpClient;
51+
class SrsSharedPtrMessage;
5152

5253
/**
5354
* the client provides the main logic control for RTMP clients.
@@ -87,8 +88,8 @@ class SrsRtmpConn : public virtual SrsConnection, public virtual ISrsReloadHandl
8788
virtual int stream_service_cycle();
8889
virtual int check_vhost();
8990
virtual int playing(SrsSource* source);
90-
virtual int fmle_publish(SrsSource* source);
91-
virtual int flash_publish(SrsSource* source);
91+
virtual int fmle_publishing(SrsSource* source);
92+
virtual int flash_publishing(SrsSource* source);
9293
virtual int process_publish_message(SrsSource* source, SrsMessage* msg, bool vhost_is_edge);
9394
virtual int process_play_control_msg(SrsConsumer* consumer, SrsMessage* msg);
9495
private:

0 commit comments

Comments
 (0)