Skip to content

Commit 88f9224

Browse files
committed
SmartPtr: Use shared ptr for live source.
1 parent d38af02 commit 88f9224

15 files changed

+101
-95
lines changed

trunk/src/app/srs_app_edge.cpp

+10-6
Original file line numberDiff line numberDiff line change
@@ -415,9 +415,11 @@ SrsEdgeIngester::~SrsEdgeIngester()
415415
srs_freep(trd);
416416
}
417417

418-
srs_error_t SrsEdgeIngester::initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r)
418+
srs_error_t SrsEdgeIngester::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPlayEdge* e, SrsRequest* r)
419419
{
420-
source_ = s;
420+
// Because source references to this object, so we should directly use the source ptr.
421+
source_ = s.get();
422+
421423
edge = e;
422424
req = r;
423425

@@ -747,9 +749,11 @@ void SrsEdgeForwarder::set_queue_size(srs_utime_t queue_size)
747749
return queue->set_queue_size(queue_size);
748750
}
749751

750-
srs_error_t SrsEdgeForwarder::initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r)
752+
srs_error_t SrsEdgeForwarder::initialize(SrsSharedPtr<SrsLiveSource> s, SrsPublishEdge* e, SrsRequest* r)
751753
{
752-
source_ = s;
754+
// Because source references to this object, so we should directly use the source ptr.
755+
source_ = s.get();
756+
753757
edge = e;
754758
req = r;
755759

@@ -956,7 +960,7 @@ SrsPlayEdge::~SrsPlayEdge()
956960
srs_freep(ingester);
957961
}
958962

959-
srs_error_t SrsPlayEdge::initialize(SrsLiveSource* source, SrsRequest* req)
963+
srs_error_t SrsPlayEdge::initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req)
960964
{
961965
srs_error_t err = srs_success;
962966

@@ -1048,7 +1052,7 @@ void SrsPublishEdge::set_queue_size(srs_utime_t queue_size)
10481052
return forwarder->set_queue_size(queue_size);
10491053
}
10501054

1051-
srs_error_t SrsPublishEdge::initialize(SrsLiveSource* source, SrsRequest* req)
1055+
srs_error_t SrsPublishEdge::initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req)
10521056
{
10531057
srs_error_t err = srs_success;
10541058

trunk/src/app/srs_app_edge.hpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <srs_core.hpp>
1111

1212
#include <srs_app_st.hpp>
13+
#include <srs_core_autofree.hpp>
1314

1415
#include <string>
1516

@@ -152,7 +153,7 @@ class SrsEdgeIngester : public ISrsCoroutineHandler
152153
SrsEdgeIngester();
153154
virtual ~SrsEdgeIngester();
154155
public:
155-
virtual srs_error_t initialize(SrsLiveSource* s, SrsPlayEdge* e, SrsRequest* r);
156+
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, SrsPlayEdge* e, SrsRequest* r);
156157
virtual srs_error_t start();
157158
virtual void stop();
158159
virtual std::string get_curr_origin();
@@ -195,7 +196,7 @@ class SrsEdgeForwarder : public ISrsCoroutineHandler
195196
public:
196197
virtual void set_queue_size(srs_utime_t queue_size);
197198
public:
198-
virtual srs_error_t initialize(SrsLiveSource* s, SrsPublishEdge* e, SrsRequest* r);
199+
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> s, SrsPublishEdge* e, SrsRequest* r);
199200
virtual srs_error_t start();
200201
virtual void stop();
201202
// Interface ISrsReusableThread2Handler
@@ -220,7 +221,7 @@ class SrsPlayEdge
220221
// Always use the req of source,
221222
// For we assume all client to edge is invalid,
222223
// if auth open, edge must valid it from origin, then service it.
223-
virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req);
224+
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req);
224225
// When client play stream on edge.
225226
virtual srs_error_t on_client_play();
226227
// When all client stopped play, disconnect to origin.
@@ -243,7 +244,7 @@ class SrsPublishEdge
243244
public:
244245
virtual void set_queue_size(srs_utime_t queue_size);
245246
public:
246-
virtual srs_error_t initialize(SrsLiveSource* source, SrsRequest* req);
247+
virtual srs_error_t initialize(SrsSharedPtr<SrsLiveSource> source, SrsRequest* req);
247248
virtual bool can_publish();
248249
// When client publish stream on edge.
249250
virtual srs_error_t on_client_publish();

trunk/src/app/srs_app_http_stream.cpp

+7-7
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ srs_error_t SrsBufferCache::cycle()
106106
return err;
107107
}
108108

109-
SrsLiveSource* live_source = _srs_sources->fetch(req);
110-
if (!live_source) {
109+
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
110+
if (!live_source.get()) {
111111
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
112112
}
113113

@@ -661,8 +661,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
661661
// Enter chunked mode, because we didn't set the content-length.
662662
w->write_header(SRS_CONSTS_HTTP_OK);
663663

664-
SrsLiveSource* live_source = _srs_sources->fetch(req);
665-
if (!live_source) {
664+
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(req);
665+
if (!live_source.get()) {
666666
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
667667
}
668668

@@ -1136,11 +1136,11 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
11361136
}
11371137
}
11381138

1139-
SrsLiveSource* live_source = NULL;
1140-
if ((err = _srs_sources->fetch_or_create(r, server, &live_source)) != srs_success) {
1139+
SrsSharedPtr<SrsLiveSource> live_source;
1140+
if ((err = _srs_sources->fetch_or_create(r, server, live_source)) != srs_success) {
11411141
return srs_error_wrap(err, "source create");
11421142
}
1143-
srs_assert(live_source != NULL);
1143+
srs_assert(live_source.get() != NULL);
11441144

11451145
bool enabled_cache = _srs_config->get_gop_cache(r->vhost);
11461146
int gcmf = _srs_config->get_gop_cache_max_frames(r->vhost);

trunk/src/app/srs_app_recv_thread.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ void SrsQueueRecvThread::on_stop()
259259
}
260260

261261
SrsPublishRecvThread::SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
262-
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid)
262+
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr<SrsLiveSource> source, SrsContextId parent_cid)
263263
: trd(this, rtmp_sdk, tm, parent_cid)
264264
{
265265
rtmp = rtmp_sdk;

trunk/src/app/srs_app_recv_thread.hpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <srs_protocol_stream.hpp>
1717
#include <srs_core_performance.hpp>
1818
#include <srs_app_reload.hpp>
19+
#include <srs_core_autofree.hpp>
1920

2021
class SrsRtmpServer;
2122
class SrsCommonMessage;
@@ -146,15 +147,15 @@ class SrsPublishRecvThread : public ISrsMessagePumper, public ISrsReloadHandler
146147
srs_error_t recv_error;
147148
SrsRtmpConn* _conn;
148149
// The params for conn callback.
149-
SrsLiveSource* source_;
150+
SrsSharedPtr<SrsLiveSource> source_;
150151
// The error timeout cond
151152
srs_cond_t error;
152153
// The merged context id.
153154
SrsContextId cid;
154155
SrsContextId ncid;
155156
public:
156157
SrsPublishRecvThread(SrsRtmpServer* rtmp_sdk, SrsRequest* _req,
157-
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsLiveSource* source, SrsContextId parent_cid);
158+
int mr_sock_fd, srs_utime_t tm, SrsRtmpConn* conn, SrsSharedPtr<SrsLiveSource> source, SrsContextId parent_cid);
158159
virtual ~SrsPublishRecvThread();
159160
public:
160161
// Wait for error for some timeout.

trunk/src/app/srs_app_rtc_api.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa
224224

225225
// For RTMP to RTC, fail if disabled and RTMP is active, see https://github.com/ossrs/srs/issues/2728
226226
if (!is_rtc_stream_active && !_srs_config->get_rtc_from_rtmp(ruc->req_->vhost)) {
227-
SrsLiveSource* live_source = _srs_sources->fetch(ruc->req_);
228-
if (live_source && !live_source->inactive()) {
227+
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(ruc->req_);
228+
if (live_source.get() && !live_source->inactive()) {
229229
return srs_error_new(ERROR_RTC_DISABLED, "Disabled rtmp_to_rtc of %s, see #2728", ruc->req_->vhost.c_str());
230230
}
231231
}

trunk/src/app/srs_app_rtc_conn.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -1202,8 +1202,8 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
12021202
source_->set_publish_stream(this);
12031203

12041204
// TODO: FIMXE: Check it in SrsRtcConnection::add_publisher?
1205-
SrsLiveSource* live_source = _srs_sources->fetch(r);
1206-
if (live_source && !live_source->can_publish(false)) {
1205+
SrsSharedPtr<SrsLiveSource> live_source = _srs_sources->fetch(r);
1206+
if (live_source.get() && !live_source->can_publish(false)) {
12071207
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp stream %s busy", r->get_stream_url().c_str());
12081208
}
12091209

@@ -1227,7 +1227,7 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
12271227
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)
12281228
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost);
12291229
if (rtc_to_rtmp) {
1230-
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &live_source)) != srs_success) {
1230+
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), live_source)) != srs_success) {
12311231
return srs_error_wrap(err, "create source");
12321232
}
12331233

trunk/src/app/srs_app_rtmp_conn.cpp

+11-11
Original file line numberDiff line numberDiff line change
@@ -571,11 +571,11 @@ srs_error_t SrsRtmpConn::stream_service_cycle()
571571
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);
572572

573573
// find a source to serve.
574-
SrsLiveSource* live_source = NULL;
575-
if ((err = _srs_sources->fetch_or_create(req, server, &live_source)) != srs_success) {
574+
SrsSharedPtr<SrsLiveSource> live_source;
575+
if ((err = _srs_sources->fetch_or_create(req, server, live_source)) != srs_success) {
576576
return srs_error_wrap(err, "rtmp: fetch source");
577577
}
578-
srs_assert(live_source != NULL);
578+
srs_assert(live_source.get() != NULL);
579579

580580
bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
581581
int gcmf = _srs_config->get_gop_cache_max_frames(req->vhost);
@@ -699,7 +699,7 @@ srs_error_t SrsRtmpConn::check_vhost(bool try_default_vhost)
699699
return err;
700700
}
701701

702-
srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
702+
srs_error_t SrsRtmpConn::playing(SrsSharedPtr<SrsLiveSource> source)
703703
{
704704
srs_error_t err = srs_success;
705705

@@ -786,7 +786,7 @@ srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
786786
return err;
787787
}
788788

789-
srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
789+
srs_error_t SrsRtmpConn::do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
790790
{
791791
srs_error_t err = srs_success;
792792

@@ -923,7 +923,7 @@ srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* cons
923923
return err;
924924
}
925925

926-
srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
926+
srs_error_t SrsRtmpConn::publishing(SrsSharedPtr<SrsLiveSource> source)
927927
{
928928
srs_error_t err = srs_success;
929929

@@ -969,7 +969,7 @@ srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
969969
return err;
970970
}
971971

972-
srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
972+
srs_error_t SrsRtmpConn::do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPublishRecvThread* rtrd)
973973
{
974974
srs_error_t err = srs_success;
975975

@@ -1073,7 +1073,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
10731073
return err;
10741074
}
10751075

1076-
srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
1076+
srs_error_t SrsRtmpConn::acquire_publish(SrsSharedPtr<SrsLiveSource> source)
10771077
{
10781078
srs_error_t err = srs_success;
10791079

@@ -1141,7 +1141,7 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source)
11411141
return err;
11421142
}
11431143

1144-
void SrsRtmpConn::release_publish(SrsLiveSource* source)
1144+
void SrsRtmpConn::release_publish(SrsSharedPtr<SrsLiveSource> source)
11451145
{
11461146
// when edge, notice edge to change state.
11471147
// when origin, notice all service to unpublish.
@@ -1152,7 +1152,7 @@ void SrsRtmpConn::release_publish(SrsLiveSource* source)
11521152
}
11531153
}
11541154

1155-
srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
1155+
srs_error_t SrsRtmpConn::handle_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg)
11561156
{
11571157
srs_error_t err = srs_success;
11581158

@@ -1193,7 +1193,7 @@ srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommon
11931193
return err;
11941194
}
11951195

1196-
srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
1196+
srs_error_t SrsRtmpConn::process_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg)
11971197
{
11981198
srs_error_t err = srs_success;
11991199

trunk/src/app/srs_app_rtmp_conn.hpp

+9-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <srs_app_reload.hpp>
1717
#include <srs_protocol_rtmp_stack.hpp>
1818
#include <srs_protocol_rtmp_conn.hpp>
19+
#include <srs_core_autofree.hpp>
1920

2021
class SrsServer;
2122
class SrsRtmpServer;
@@ -145,14 +146,14 @@ class SrsRtmpConn : public ISrsConnection, public ISrsStartable, public ISrsRelo
145146
// The stream(play/publish) service cycle, identify client first.
146147
virtual srs_error_t stream_service_cycle();
147148
virtual srs_error_t check_vhost(bool try_default_vhost);
148-
virtual srs_error_t playing(SrsLiveSource* source);
149-
virtual srs_error_t do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd);
150-
virtual srs_error_t publishing(SrsLiveSource* source);
151-
virtual srs_error_t do_publishing(SrsLiveSource* source, SrsPublishRecvThread* trd);
152-
virtual srs_error_t acquire_publish(SrsLiveSource* source);
153-
virtual void release_publish(SrsLiveSource* source);
154-
virtual srs_error_t handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
155-
virtual srs_error_t process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg);
149+
virtual srs_error_t playing(SrsSharedPtr<SrsLiveSource> source);
150+
virtual srs_error_t do_playing(SrsSharedPtr<SrsLiveSource> source, SrsLiveConsumer* consumer, SrsQueueRecvThread* trd);
151+
virtual srs_error_t publishing(SrsSharedPtr<SrsLiveSource> source);
152+
virtual srs_error_t do_publishing(SrsSharedPtr<SrsLiveSource> source, SrsPublishRecvThread* trd);
153+
virtual srs_error_t acquire_publish(SrsSharedPtr<SrsLiveSource> source);
154+
virtual void release_publish(SrsSharedPtr<SrsLiveSource> source);
155+
virtual srs_error_t handle_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg);
156+
virtual srs_error_t process_publish_message(SrsSharedPtr<SrsLiveSource>& source, SrsCommonMessage* msg);
156157
virtual srs_error_t process_play_control_msg(SrsLiveConsumer* consumer, SrsCommonMessage* msg);
157158
virtual void set_sock_options();
158159
private:

0 commit comments

Comments
 (0)