@@ -1269,6 +1269,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
12691269 if err == nil {
12701270 mrd .mu .Lock ()
12711271 if len (mrd .activeRanges ) == 0 && mrd .numActiveRanges == 0 {
1272+ mrd .mu .Unlock ()
12721273 mrd .closeReceiver <- true
12731274 mrd .closeSender <- true
12741275 return
@@ -1277,34 +1278,37 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
12771278 arr := resp .GetObjectDataRanges ()
12781279 for _ , val := range arr {
12791280 id := val .GetReadRange ().GetReadId ()
1280- mrd .mu .Lock ()
1281- _ , ok := mrd .activeRanges [id ]
1282- if ! ok {
1283- // it's ok to ignore responses for read_id not in map as user would have been notified by callback.
1284- continue
1285- }
1286- _ , err = mrd .activeRanges [id ].writer .Write (val .GetChecksummedData ().GetContent ())
1287- if err != nil {
1288- mrd .activeRanges [id ].callback (mrd .activeRanges [id ].offset , mrd .activeRanges [id ].totalBytesWritten , err )
1289- mrd .numActiveRanges --
1290- delete (mrd .activeRanges , id )
1291- } else {
1292- mrd .activeRanges [id ] = mrdRange {
1293- readID : mrd .activeRanges [id ].readID ,
1294- writer : mrd .activeRanges [id ].writer ,
1295- offset : mrd .activeRanges [id ].offset ,
1296- limit : mrd .activeRanges [id ].limit ,
1297- currentBytesWritten : mrd .activeRanges [id ].currentBytesWritten + int64 (len (val .GetChecksummedData ().GetContent ())),
1298- totalBytesWritten : mrd .activeRanges [id ].totalBytesWritten + int64 (len (val .GetChecksummedData ().GetContent ())),
1299- callback : mrd .activeRanges [id ].callback ,
1281+ func () {
1282+ mrd .mu .Lock ()
1283+ defer mrd .mu .Unlock ()
1284+ currRange , ok := mrd .activeRanges [id ]
1285+ if ! ok {
1286+ // it's ok to ignore responses for read_id not in map as user would have been notified by callback.
1287+ return
13001288 }
1301- }
1302- if val .GetRangeEnd () {
1303- mrd .activeRanges [id ].callback (mrd .activeRanges [id ].offset , mrd .activeRanges [id ].totalBytesWritten , nil )
1304- mrd .numActiveRanges --
1305- delete (mrd .activeRanges , id )
1306- }
1307- mrd .mu .Unlock ()
1289+ _ , err = currRange .writer .Write (val .GetChecksummedData ().GetContent ())
1290+ if err != nil {
1291+ currRange .callback (currRange .offset , currRange .totalBytesWritten , err )
1292+ mrd .numActiveRanges --
1293+ delete (mrd .activeRanges , id )
1294+ } else {
1295+ currRange = mrdRange {
1296+ readID : currRange .readID ,
1297+ writer : currRange .writer ,
1298+ offset : currRange .offset ,
1299+ limit : currRange .limit ,
1300+ currentBytesWritten : currRange .currentBytesWritten + int64 (len (val .GetChecksummedData ().GetContent ())),
1301+ totalBytesWritten : currRange .totalBytesWritten + int64 (len (val .GetChecksummedData ().GetContent ())),
1302+ callback : currRange .callback ,
1303+ }
1304+ mrd .activeRanges [id ] = currRange
1305+ }
1306+ if val .GetRangeEnd () {
1307+ currRange .callback (currRange .offset , currRange .totalBytesWritten , nil )
1308+ mrd .numActiveRanges --
1309+ delete (mrd .activeRanges , id )
1310+ }
1311+ }()
13081312 }
13091313 }
13101314 }
@@ -1459,8 +1463,8 @@ func (mrd *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback f
14591463 spec := mrdRange {readID : id , writer : output , offset : offset , limit : limit , currentBytesWritten : 0 , totalBytesWritten : 0 , callback : callback }
14601464 mrd .mu .Lock ()
14611465 mrd .numActiveRanges ++
1462- mrd .rangesToRead <- []mrdRange {spec }
14631466 mrd .mu .Unlock ()
1467+ mrd .rangesToRead <- []mrdRange {spec }
14641468 } else {
14651469 callback (offset , 0 , errors .New ("storage: cannot add range because the stream is closed" ))
14661470 }
0 commit comments