@@ -40,10 +40,9 @@ using namespace std;
40
40
#include < srs_app_recv_thread.hpp>
41
41
#include < srs_app_http_hooks.hpp>
42
42
43
- SrsBufferCache::SrsBufferCache (SrsLiveSource* s, SrsRequest* r)
43
+ SrsBufferCache::SrsBufferCache (SrsRequest* r)
44
44
{
45
45
req = r->copy ()->as_http ();
46
- source = s;
47
46
queue = new SrsMessageQueue (true );
48
47
trd = new SrsSTCoroutine (" http-stream" , this );
49
48
@@ -59,12 +58,11 @@ SrsBufferCache::~SrsBufferCache()
59
58
srs_freep (req);
60
59
}
61
60
62
- srs_error_t SrsBufferCache::update_auth (SrsLiveSource* s, SrsRequest* r)
61
+ srs_error_t SrsBufferCache::update_auth (SrsRequest* r)
63
62
{
64
63
srs_freep (req);
65
64
req = r->copy ();
66
- source = s;
67
-
65
+
68
66
return srs_success;
69
67
}
70
68
@@ -107,6 +105,11 @@ srs_error_t SrsBufferCache::cycle()
107
105
srs_usleep (SRS_STREAM_CACHE_CYCLE);
108
106
return err;
109
107
}
108
+
109
+ SrsLiveSource* source = _srs_sources->fetch (req);
110
+ if (!source) {
111
+ return srs_error_new (ERROR_NO_SOURCE, " no source for %s" , req->get_stream_url ().c_str ());
112
+ }
110
113
111
114
// the stream cache will create consumer to cache stream,
112
115
// which will trigger to fetch stream from origin for edge.
@@ -553,9 +556,8 @@ srs_error_t SrsBufferWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwri
553
556
return writer->writev (iov, iovcnt, pnwrite);
554
557
}
555
558
556
- SrsLiveStream::SrsLiveStream (SrsLiveSource* s, SrsRequest* r, SrsBufferCache* c)
559
+ SrsLiveStream::SrsLiveStream (SrsRequest* r, SrsBufferCache* c)
557
560
{
558
- source = s;
559
561
cache = c;
560
562
req = r->copy ()->as_http ();
561
563
security_ = new SrsSecurity ();
@@ -567,10 +569,8 @@ SrsLiveStream::~SrsLiveStream()
567
569
srs_freep (security_);
568
570
}
569
571
570
- srs_error_t SrsLiveStream::update_auth (SrsLiveSource* s, SrsRequest* r)
572
+ srs_error_t SrsLiveStream::update_auth (SrsRequest* r)
571
573
{
572
- source = s;
573
-
574
574
srs_freep (req);
575
575
req = r->copy ()->as_http ();
576
576
@@ -660,6 +660,11 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess
660
660
661
661
// Enter chunked mode, because we didn't set the content-length.
662
662
w->write_header (SRS_CONSTS_HTTP_OK);
663
+
664
+ SrsLiveSource* source = _srs_sources->fetch (req);
665
+ if (!source) {
666
+ return srs_error_new (ERROR_NO_SOURCE, " no source for %s" , req->get_stream_url ().c_str ());
667
+ }
663
668
664
669
// create consumer of souce, ignore gop cache, use the audio gop cache.
665
670
SrsLiveConsumer* consumer = NULL ;
@@ -876,7 +881,6 @@ SrsLiveEntry::SrsLiveEntry(std::string m)
876
881
cache = NULL ;
877
882
878
883
req = NULL ;
879
- source = NULL ;
880
884
881
885
std::string ext = srs_path_filext (m);
882
886
_is_flv = (ext == " .flv" );
@@ -954,7 +958,7 @@ srs_error_t SrsHttpStreamServer::initialize()
954
958
}
955
959
956
960
// TODO: FIXME: rename for HTTP FLV mount.
957
- srs_error_t SrsHttpStreamServer::http_mount (SrsLiveSource* s, SrsRequest* r)
961
+ srs_error_t SrsHttpStreamServer::http_mount (SrsRequest* r)
958
962
{
959
963
srs_error_t err = srs_success;
960
964
@@ -982,10 +986,9 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
982
986
983
987
entry = new SrsLiveEntry (mount);
984
988
985
- entry->source = s;
986
989
entry->req = r->copy ()->as_http ();
987
- entry->cache = new SrsBufferCache (s, r);
988
- entry->stream = new SrsLiveStream (s, r, entry->cache );
990
+ entry->cache = new SrsBufferCache (r);
991
+ entry->stream = new SrsLiveStream (r, entry->cache );
989
992
990
993
// TODO: FIXME: maybe refine the logic of http remux service.
991
994
// if user push streams followed:
@@ -994,8 +997,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
994
997
// and they will using the same template, such as: [vhost]/[app]/[stream].flv
995
998
// so, need to free last request object, otherwise, it will cause memory leak.
996
999
srs_freep (tmpl->req );
997
-
998
- tmpl->source = s;
1000
+
999
1001
tmpl->req = r->copy ()->as_http ();
1000
1002
1001
1003
sflvs[sid] = entry;
@@ -1015,8 +1017,8 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
1015
1017
} else {
1016
1018
// The entry exists, we reuse it and update the request of stream and cache.
1017
1019
entry = sflvs[sid];
1018
- entry->stream ->update_auth (s, r);
1019
- entry->cache ->update_auth (s, r);
1020
+ entry->stream ->update_auth (r);
1021
+ entry->cache ->update_auth (r);
1020
1022
}
1021
1023
1022
1024
if (entry->stream ) {
@@ -1027,7 +1029,7 @@ srs_error_t SrsHttpStreamServer::http_mount(SrsLiveSource* s, SrsRequest* r)
1027
1029
return err;
1028
1030
}
1029
1031
1030
- void SrsHttpStreamServer::http_unmount (SrsLiveSource* s, SrsRequest* r)
1032
+ void SrsHttpStreamServer::http_unmount (SrsRequest* r)
1031
1033
{
1032
1034
std::string sid = r->get_stream_url ();
1033
1035
@@ -1133,7 +1135,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
1133
1135
}
1134
1136
}
1135
1137
}
1136
-
1138
+
1137
1139
SrsLiveSource* s = NULL ;
1138
1140
if ((err = _srs_sources->fetch_or_create (r, server, &s)) != srs_success) {
1139
1141
return srs_error_wrap (err, " source create" );
@@ -1146,7 +1148,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
1146
1148
s->set_gop_cache_max_frames (gcmf);
1147
1149
1148
1150
// create http streaming handler.
1149
- if ((err = http_mount (s, r)) != srs_success) {
1151
+ if ((err = http_mount (r)) != srs_success) {
1150
1152
return srs_error_wrap (err, " http mount" );
1151
1153
}
1152
1154
0 commit comments