@@ -573,6 +573,8 @@ func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...iop.FileStreamConfi
573573 err = g .Error (err , "Error getting paths" )
574574 return
575575 }
576+
577+ nodes .Sort ()
576578 }
577579
578580 df , err = GetDataflow (fs .Self (), nodes , Cfg )
@@ -1180,26 +1182,8 @@ func MergeReaders(fs FileSysClient, fileType dbio.FileType, nodes FileNodes, lim
11801182 g .LogError (err )
11811183 }
11821184
1183- concurrency := runtime .NumCPU ()
1184- switch {
1185- case fs .GetProp ("CONCURRENCY" ) != "" :
1186- concurrency = cast .ToInt (fs .GetProp ("CONCURRENCY" ))
1187- case g .In (fs .FsType (), dbio .TypeFileS3 , dbio .TypeFileGoogle , dbio .TypeFileAzure ):
1188- switch {
1189- // lots of small files (less than 50kb)
1190- case len (nodes ) > 100 && nodes .AvgSize () < uint64 (50 * 1024 ):
1191- concurrency = 20
1192- default :
1193- concurrency = 10
1194- }
1195- case fs .FsType () == dbio .TypeFileLocal :
1196- concurrency = 3
1197- case concurrency == 1 :
1198- concurrency = 3
1199- }
1200-
1201- g .DebugLow ("merging %s readers of %d files [concurrency=%d] from %s" , fileType , len (nodes ), concurrency , url )
1202- readerChn := make (chan * iop.ReaderReady , concurrency )
1185+ g .DebugLow ("merging %s readers of %d files (sequential processing) from %s" , fileType , len (nodes ), url )
1186+ readerChn := make (chan * iop.ReaderReady )
12031187 go func () {
12041188 defer close (readerChn )
12051189
@@ -1209,24 +1193,18 @@ func MergeReaders(fs FileSysClient, fileType dbio.FileType, nodes FileNodes, lim
12091193 continue
12101194 }
12111195
1212- ds .Context .Wg .Read .Add ()
1213- go func (path string ) {
1214- defer ds .Context .Wg .Read .Done ()
1215- g .Debug ("processing reader from %s" , path )
1196+ g .Debug ("processing reader from %s" , path )
12161197
1217- reader , err := fs .Self ().GetReader (path )
1218- if err != nil {
1219- setError (g .Error (err , "Error getting reader" ))
1220- return
1221- }
1198+ reader , err := fs .Self ().GetReader (path )
1199+ if err != nil {
1200+ setError (g .Error (err , "Error getting reader" ))
1201+ return
1202+ }
12221203
1223- r := & iop.ReaderReady {Reader : reader , URI : path }
1224- readerChn <- r
1225- }(path )
1204+ r := & iop.ReaderReady {Reader : reader , URI : path }
1205+ readerChn <- r
12261206 }
12271207
1228- ds .Context .Wg .Read .Wait ()
1229-
12301208 }()
12311209
12321210 if g .In (fileType , dbio .FileTypeCsv , dbio .FileTypeJson , dbio .FileTypeJsonLines , dbio .FileTypeXml ) {
0 commit comments