Skip to content

Commit b470d74

Browse files
authored
Feature/issue 48 sequential handle glob match folder (#50)
1 parent 994bf3a commit b470d74

File tree

2 files changed

+14
-36
lines changed

2 files changed

+14
-36
lines changed

core/dbio/filesys/fs.go

Lines changed: 12 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -573,6 +573,8 @@ func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...iop.FileStreamConfi
573573
err = g.Error(err, "Error getting paths")
574574
return
575575
}
576+
577+
nodes.Sort()
576578
}
577579

578580
df, err = GetDataflow(fs.Self(), nodes, Cfg)
@@ -1180,26 +1182,8 @@ func MergeReaders(fs FileSysClient, fileType dbio.FileType, nodes FileNodes, lim
11801182
g.LogError(err)
11811183
}
11821184

1183-
concurrency := runtime.NumCPU()
1184-
switch {
1185-
case fs.GetProp("CONCURRENCY") != "":
1186-
concurrency = cast.ToInt(fs.GetProp("CONCURRENCY"))
1187-
case g.In(fs.FsType(), dbio.TypeFileS3, dbio.TypeFileGoogle, dbio.TypeFileAzure):
1188-
switch {
1189-
// lots of small files (less than 50kb)
1190-
case len(nodes) > 100 && nodes.AvgSize() < uint64(50*1024):
1191-
concurrency = 20
1192-
default:
1193-
concurrency = 10
1194-
}
1195-
case fs.FsType() == dbio.TypeFileLocal:
1196-
concurrency = 3
1197-
case concurrency == 1:
1198-
concurrency = 3
1199-
}
1200-
1201-
g.DebugLow("merging %s readers of %d files [concurrency=%d] from %s", fileType, len(nodes), concurrency, url)
1202-
readerChn := make(chan *iop.ReaderReady, concurrency)
1185+
g.DebugLow("merging %s readers of %d files (sequential processing) from %s", fileType, len(nodes), url)
1186+
readerChn := make(chan *iop.ReaderReady)
12031187
go func() {
12041188
defer close(readerChn)
12051189

@@ -1209,24 +1193,18 @@ func MergeReaders(fs FileSysClient, fileType dbio.FileType, nodes FileNodes, lim
12091193
continue
12101194
}
12111195

1212-
ds.Context.Wg.Read.Add()
1213-
go func(path string) {
1214-
defer ds.Context.Wg.Read.Done()
1215-
g.Debug("processing reader from %s", path)
1196+
g.Debug("processing reader from %s", path)
12161197

1217-
reader, err := fs.Self().GetReader(path)
1218-
if err != nil {
1219-
setError(g.Error(err, "Error getting reader"))
1220-
return
1221-
}
1198+
reader, err := fs.Self().GetReader(path)
1199+
if err != nil {
1200+
setError(g.Error(err, "Error getting reader"))
1201+
return
1202+
}
12221203

1223-
r := &iop.ReaderReady{Reader: reader, URI: path}
1224-
readerChn <- r
1225-
}(path)
1204+
r := &iop.ReaderReady{Reader: reader, URI: path}
1205+
readerChn <- r
12261206
}
12271207

1228-
ds.Context.Wg.Read.Wait()
1229-
12301208
}()
12311209

12321210
if g.In(fileType, dbio.FileTypeCsv, dbio.FileTypeJson, dbio.FileTypeJsonLines, dbio.FileTypeXml) {

go.mod

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,8 @@
11
module github.com/slingdata-io/sling-cli
22

3-
go 1.22
3+
go 1.24
44

5-
toolchain go1.22.5
5+
toolchain go1.24.5
66

77
require (
88
cloud.google.com/go v0.115.0

0 commit comments

Comments
 (0)