Skip to content

Commit e34b3d3

Browse files
committed
SmartPtr: Support shared ptr for live source.
1 parent 908c2f2 commit e34b3d3

12 files changed

+80
-78
lines changed

trunk/src/app/srs_app_coworkers.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ SrsRequest* SrsCoWorkers::find_stream_info(string vhost, string app, string stre
122122
return it->second;
123123
}
124124

125-
srs_error_t SrsCoWorkers::on_publish(SrsLiveSource* s, SrsRequest* r)
125+
srs_error_t SrsCoWorkers::on_publish(SrsRequest* r)
126126
{
127127
srs_error_t err = srs_success;
128128

@@ -140,7 +140,7 @@ srs_error_t SrsCoWorkers::on_publish(SrsLiveSource* s, SrsRequest* r)
140140
return err;
141141
}
142142

143-
void SrsCoWorkers::on_unpublish(SrsLiveSource* s, SrsRequest* r)
143+
void SrsCoWorkers::on_unpublish(SrsRequest* r)
144144
{
145145
string url = r->get_stream_url();
146146

trunk/src/app/srs_app_coworkers.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ class SrsCoWorkers
3333
private:
3434
virtual SrsRequest* find_stream_info(std::string vhost, std::string app, std::string stream);
3535
public:
36-
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r);
37-
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r);
36+
virtual srs_error_t on_publish(SrsRequest* r);
37+
virtual void on_unpublish(SrsRequest* r);
3838
};
3939

4040
#endif

trunk/src/app/srs_app_http_conn.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -547,13 +547,13 @@ srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage
547547
return http_static->mux.serve_http(w, r);
548548
}
549549

550-
srs_error_t SrsHttpServer::http_mount(SrsLiveSource* s, SrsRequest* r)
550+
srs_error_t SrsHttpServer::http_mount(SrsRequest* r)
551551
{
552-
return http_stream->http_mount(s, r);
552+
return http_stream->http_mount(r);
553553
}
554554

555-
void SrsHttpServer::http_unmount(SrsLiveSource* s, SrsRequest* r)
555+
void SrsHttpServer::http_unmount(SrsRequest* r)
556556
{
557-
http_stream->http_unmount(s, r);
557+
http_stream->http_unmount(r);
558558
}
559559

trunk/src/app/srs_app_http_conn.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ class SrsHttpServer : public ISrsHttpServeMux
187187
public:
188188
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
189189
public:
190-
virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r);
191-
virtual void http_unmount(SrsLiveSource* s, SrsRequest* r);
190+
virtual srs_error_t http_mount(SrsRequest* r);
191+
virtual void http_unmount(SrsRequest* r);
192192
};
193193

194194
#endif

trunk/src/app/srs_app_http_stream.cpp

+24-22
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,9 @@ using namespace std;
4040
#include <srs_app_recv_thread.hpp>
4141
#include <srs_app_http_hooks.hpp>
4242

43-
SrsBufferCache::SrsBufferCache(SrsLiveSource* s, SrsRequest* r)
43+
SrsBufferCache::SrsBufferCache(SrsRequest* r)
4444
{
4545
req = r->copy()->as_http();
46-
source = s;
4746
queue = new SrsMessageQueue(true);
4847
trd = new SrsSTCoroutine("http-stream", this);
4948

@@ -59,12 +58,11 @@ SrsBufferCache::~SrsBufferCache()
5958
srs_freep(req);
6059
}
6160

62-
srs_error_t SrsBufferCache::update_auth(SrsLiveSource* s, SrsRequest* r)
61+
srs_error_t SrsBufferCache::update_auth(SrsRequest* r)
6362
{
6463
srs_freep(req);
6564
req = r->copy();
66-
source = s;
67-
65+
6866
return srs_success;
6967
}
7068

@@ -107,6 +105,11 @@ srs_error_t SrsBufferCache::cycle()
107105
srs_usleep(SRS_STREAM_CACHE_CYCLE);
108106
return err;
109107
}
108+
109+
SrsLiveSource* source = _srs_sources->fetch(req);
110+
if (!source) {
111+
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
112+
}
110113

111114
// the stream cache will create consumer to cache stream,
112115
// which will trigger to fetch stream from origin for edge.
@@ -553,9 +556,8 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri
553556
return writer->writev(iov, iovcnt, pnwrite);
554557
}
555558

556-
SrsLiveStream::SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c)
559+
SrsLiveStream::SrsLiveStream(SrsRequest* r, SrsBufferCache* c)
557560
{
558-
source = s;
559561
cache = c;
560562
req = r->copy()->as_http();
561563
security_ = new SrsSecurity();
@@ -567,10 +569,8 @@ SrsLiveStream::~SrsLiveStream()
567569
srs_freep(security_);
568570
}
569571

570-
srs_error_t SrsLiveStream::update_auth(SrsLiveSource* s, SrsRequest* r)
572+
srs_error_t SrsLiveStream::update_auth(SrsRequest* r)
571573
{
572-
source = s;
573-
574574
srs_freep(req);
575575
req = r->copy()->as_http();
576576

@@ -660,6 +660,11 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
660660

661661
// Enter chunked mode, because we didn't set the content-length.
662662
w->write_header(SRS_CONSTS_HTTP_OK);
663+
664+
SrsLiveSource* source = _srs_sources->fetch(req);
665+
if (!source) {
666+
return srs_error_new(ERROR_NO_SOURCE, "no source for %s", req->get_stream_url().c_str());
667+
}
663668

664669
// create consumer of souce, ignore gop cache, use the audio gop cache.
665670
SrsLiveConsumer* consumer = NULL;
@@ -876,7 +881,6 @@ SrsLiveEntry::SrsLiveEntry(std::string m)
876881
cache = NULL;
877882

878883
req = NULL;
879-
source = NULL;
880884

881885
std::string ext = srs_path_filext(m);
882886
_is_flv = (ext == ".flv");
@@ -954,7 +958,7 @@ srs_error_t SrsHttpStreamServer::initialize()
954958
}
955959

956960
// TODO: FIXME: rename for HTTP FLV mount.
957-
srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
961+
srs_error_t SrsHttpStreamServer::http_mount(SrsRequest* r)
958962
{
959963
srs_error_t err = srs_success;
960964

@@ -982,10 +986,9 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
982986

983987
entry = new SrsLiveEntry(mount);
984988

985-
entry->source = s;
986989
entry->req = r->copy()->as_http();
987-
entry->cache = new SrsBufferCache(s, r);
988-
entry->stream = new SrsLiveStream(s, r, entry->cache);
990+
entry->cache = new SrsBufferCache(r);
991+
entry->stream = new SrsLiveStream(r, entry->cache);
989992

990993
// TODO: FIXME: maybe refine the logic of http remux service.
991994
// if user push streams followed:
@@ -994,8 +997,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
994997
// and they will using the same template, such as: [vhost]/[app]/[stream].flv
995998
// so, need to free last request object, otherwise, it will cause memory leak.
996999
srs_freep(tmpl->req);
997-
998-
tmpl->source = s;
1000+
9991001
tmpl->req = r->copy()->as_http();
10001002

10011003
sflvs[sid] = entry;
@@ -1015,8 +1017,8 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
10151017
} else {
10161018
// The entry exists, we reuse it and update the request of stream and cache.
10171019
entry = sflvs[sid];
1018-
entry->stream->update_auth(s, r);
1019-
entry->cache->update_auth(s, r);
1020+
entry->stream->update_auth(r);
1021+
entry->cache->update_auth(r);
10201022
}
10211023

10221024
if (entry->stream) {
@@ -1027,7 +1029,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
10271029
return err;
10281030
}
10291031

1030-
void SrsHttpStreamServer::http_unmount(SrsLiveSource* s, SrsRequest* r)
1032+
void SrsHttpStreamServer::http_unmount(SrsRequest* r)
10311033
{
10321034
std::string sid = r->get_stream_url();
10331035

@@ -1133,7 +1135,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
11331135
}
11341136
}
11351137
}
1136-
1138+
11371139
SrsLiveSource* s = NULL;
11381140
if ((err = _srs_sources->fetch_or_create(r, server, &s)) != srs_success) {
11391141
return srs_error_wrap(err, "source create");
@@ -1146,7 +1148,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
11461148
s->set_gop_cache_max_frames(gcmf);
11471149

11481150
// create http streaming handler.
1149-
if ((err = http_mount(s, r)) != srs_success) {
1151+
if ((err = http_mount(r)) != srs_success) {
11501152
return srs_error_wrap(err, "http mount");
11511153
}
11521154

trunk/src/app/srs_app_http_stream.hpp

+6-10
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@ class SrsBufferCache : public ISrsCoroutineHandler
2323
srs_utime_t fast_cache;
2424
private:
2525
SrsMessageQueue* queue;
26-
SrsLiveSource* source;
2726
SrsRequest* req;
2827
SrsCoroutine* trd;
2928
public:
30-
SrsBufferCache(SrsLiveSource* s, SrsRequest* r);
29+
SrsBufferCache(SrsRequest* r);
3130
virtual ~SrsBufferCache();
32-
virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r);
31+
virtual srs_error_t update_auth(SrsRequest* r);
3332
public:
3433
virtual srs_error_t start();
3534
virtual srs_error_t dump_cache(SrsLiveConsumer* consumer, SrsRtmpJitterAlgorithm jitter);
@@ -178,13 +177,12 @@ class SrsLiveStream : public ISrsHttpHandler
178177
{
179178
private:
180179
SrsRequest* req;
181-
SrsLiveSource* source;
182180
SrsBufferCache* cache;
183181
SrsSecurity* security_;
184182
public:
185-
SrsLiveStream(SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c);
183+
SrsLiveStream(SrsRequest* r, SrsBufferCache* c);
186184
virtual ~SrsLiveStream();
187-
virtual srs_error_t update_auth(SrsLiveSource* s, SrsRequest* r);
185+
virtual srs_error_t update_auth(SrsRequest* r);
188186
public:
189187
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
190188
private:
@@ -205,8 +203,6 @@ struct SrsLiveEntry
205203
public:
206204
// We will free the request.
207205
SrsRequest* req;
208-
// Shared source.
209-
SrsLiveSource* source;
210206
public:
211207
// For template, the mount contains variables.
212208
// For concrete stream, the mount is url to access.
@@ -244,8 +240,8 @@ class SrsHttpStreamServer : public ISrsReloadHandler
244240
virtual srs_error_t initialize();
245241
public:
246242
// HTTP flv/ts/mp3/aac stream
247-
virtual srs_error_t http_mount(SrsLiveSource* s, SrsRequest* r);
248-
virtual void http_unmount(SrsLiveSource* s, SrsRequest* r);
243+
virtual srs_error_t http_mount(SrsRequest* r);
244+
virtual void http_unmount(SrsRequest* r);
249245
// Interface ISrsHttpMatchHijacker
250246
public:
251247
virtual srs_error_t hijack(ISrsHttpMessage* request, ISrsHttpHandler** ph);

trunk/src/app/srs_app_server.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -1302,28 +1302,28 @@ srs_error_t SrsServer::on_reload_listen()
13021302
return err;
13031303
}
13041304

1305-
srs_error_t SrsServer::on_publish(SrsLiveSource* s, SrsRequest* r)
1305+
srs_error_t SrsServer::on_publish(SrsRequest* r)
13061306
{
13071307
srs_error_t err = srs_success;
13081308

1309-
if ((err = http_server->http_mount(s, r)) != srs_success) {
1309+
if ((err = http_server->http_mount(r)) != srs_success) {
13101310
return srs_error_wrap(err, "http mount");
13111311
}
13121312

13131313
SrsCoWorkers* coworkers = SrsCoWorkers::instance();
1314-
if ((err = coworkers->on_publish(s, r)) != srs_success) {
1314+
if ((err = coworkers->on_publish(r)) != srs_success) {
13151315
return srs_error_wrap(err, "coworkers");
13161316
}
13171317

13181318
return err;
13191319
}
13201320

1321-
void SrsServer::on_unpublish(SrsLiveSource* s, SrsRequest* r)
1321+
void SrsServer::on_unpublish(SrsRequest* r)
13221322
{
1323-
http_server->http_unmount(s, r);
1323+
http_server->http_unmount(r);
13241324

13251325
SrsCoWorkers* coworkers = SrsCoWorkers::instance();
1326-
coworkers->on_unpublish(s, r);
1326+
coworkers->on_unpublish(r);
13271327
}
13281328

13291329
SrsServerAdapter::SrsServerAdapter()

trunk/src/app/srs_app_server.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,8 @@ class SrsServer : public ISrsReloadHandler, public ISrsLiveSourceHandler, public
234234
virtual srs_error_t on_reload_listen();
235235
// Interface ISrsLiveSourceHandler
236236
public:
237-
virtual srs_error_t on_publish(SrsLiveSource* s, SrsRequest* r);
238-
virtual void on_unpublish(SrsLiveSource* s, SrsRequest* r);
237+
virtual srs_error_t on_publish(SrsRequest* r);
238+
virtual void on_unpublish(SrsRequest* r);
239239
};
240240

241241
// The SRS server adapter, the master server.

trunk/src/app/srs_app_source.cpp

+22-24
Original file line numberDiff line numberDiff line change
@@ -1765,11 +1765,15 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH
17651765

17661766
// Use lock to protect coroutine switch.
17671767
// @bug https://github.com/ossrs/srs/issues/1230
1768-
// TODO: FIXME: Use smaller lock.
1768+
// TODO: FIXME: Use smaller scope lock.
17691769
SrsLocker(lock);
1770-
1771-
SrsLiveSource* source = NULL;
1772-
if ((source = fetch(r)) != NULL) {
1770+
1771+
string stream_url = r->get_stream_url();
1772+
std::map<std::string, SrsLiveSource*>::iterator it = pool.find(stream_url);
1773+
1774+
if (it != pool.end()) {
1775+
SrsLiveSource* source = it->second;
1776+
17731777
// we always update the request of resource,
17741778
// for origin auth is on, the token in request maybe invalid,
17751779
// and we only need to update the token of request, it's simple.
@@ -1778,40 +1782,34 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH
17781782
return err;
17791783
}
17801784

1781-
string stream_url = r->get_stream_url();
1782-
string vhost = r->vhost;
1783-
1784-
// should always not exists for create a source.
1785-
srs_assert (pool.find(stream_url) == pool.end());
1786-
1785+
SrsLiveSource* source = new SrsLiveSource();
17871786
srs_trace("new live source, stream_url=%s", stream_url.c_str());
17881787

1789-
source = new SrsLiveSource();
17901788
if ((err = source->initialize(r, h)) != srs_success) {
1791-
err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
1792-
goto failed;
1789+
srs_freep(source);
1790+
return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
17931791
}
17941792

17951793
pool[stream_url] = source;
17961794
*pps = source;
17971795
return err;
1798-
1799-
failed:
1800-
srs_freep(source);
1801-
return err;
18021796
}
18031797

18041798
SrsLiveSource* SrsLiveSourceManager::fetch(SrsRequest* r)
18051799
{
1806-
SrsLiveSource* source = NULL;
1800+
// Use lock to protect coroutine switch.
1801+
// @bug https://github.com/ossrs/srs/issues/1230
1802+
// TODO: FIXME: Use smaller scope lock.
1803+
SrsLocker(lock);
18071804

18081805
string stream_url = r->get_stream_url();
1809-
if (pool.find(stream_url) == pool.end()) {
1806+
std::map<std::string, SrsLiveSource*>::iterator it = pool.find(stream_url);
1807+
1808+
if (it == pool.end()) {
18101809
return NULL;
18111810
}
1812-
1813-
source = pool[stream_url];
1814-
1811+
1812+
SrsLiveSource* source = it->second;
18151813
return source;
18161814
}
18171815

@@ -2600,7 +2598,7 @@ srs_error_t SrsLiveSource::on_publish()
26002598

26012599
// notify the handler.
26022600
srs_assert(handler);
2603-
if ((err = handler->on_publish(this, req)) != srs_success) {
2601+
if ((err = handler->on_publish(req)) != srs_success) {
26042602
return srs_error_wrap(err, "handle publish");
26052603
}
26062604

@@ -2652,7 +2650,7 @@ void SrsLiveSource::on_unpublish()
26522650
SrsStatistic* stat = SrsStatistic::instance();
26532651
stat->on_stream_close(req);
26542652

2655-
handler->on_unpublish(this, req);
2653+
handler->on_unpublish(req);
26562654

26572655
if (bridge_) {
26582656
bridge_->on_unpublish();

0 commit comments

Comments
 (0)