forked from slingdata-io/sling-cli
folder2db: avoid parallel merge file #48
Closed
Labels
enhancement: New feature or request
Description
Currently, when a folder of files is loaded into a single target object, the order in which the files are written is nondeterministic. We expect a strict, deterministic order.
Source stream: folder/*
Target object: simple_stream
1. Ensure Ordered Processing
- Question:
Should we remove the goroutine, or use an ordered channel pattern to maintain strict order?
go func() {
    defer close(readerChn)
    for _, path := range nodes.URIs() {
        if strings.HasSuffix(path, "/") {
            g.DebugLow("Skipping %s because it is not a file", path)
            continue
        }
        ds.Context.Wg.Read.Add()
        // One goroutine per file: readers reach readerChn in the order
        // the goroutines finish, not in the order of nodes.URIs().
        go func(path string) {
            defer ds.Context.Wg.Read.Done()
            g.Debug("Processing reader from %s", path)
            reader, err := fs.Self().GetReader(path)
            if err != nil {
                setError(g.Error(err, "Error getting reader"))
                return
            }
            r := &iop.ReaderReady{Reader: reader, URI: path}
            readerChn <- r
        }(path)
    }
    ds.Context.Wg.Read.Wait()
}()

2. Add nodes.Sort() Before Processing
To ensure strict ordering, sort the list of file nodes before spawning readers.
var nodes FileNodes
if Cfg.ShouldUseDuckDB() {
    nodes = FileNodes{FileNode{URI: url}}
} else {
    g.Trace("Listing path: %s", url)
    nodes, err = fs.Self().ListRecursive(url)
    if err != nil {
        err = g.Error(err, "Error retrieving paths")
        return
    }
    g.Info("Nodes before sorting: %+v", nodes)
    nodes.Sort()
    g.Info("Nodes after sorting: %+v", nodes)
}
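
Note that sorting alone does not fix the write order while each reader is still opened in its own goroutine, because the goroutines send to readerChn in whatever order they finish. Below is a minimal, standalone sketch of the "ordered channel pattern" from the question in section 1: files are still opened concurrently, but each one gets its own one-slot channel and a single emitter drains those slots in sorted order. All names here (readerReady, openFunc, orderedReaders, collectURIs, fakeOpen) are hypothetical stand-ins, not sling's API, and sort.Strings is used in place of nodes.Sort().

```go
package main

import (
	"fmt"
	"io"
	"sort"
	"strings"
)

// readerReady is a hypothetical stand-in for iop.ReaderReady.
type readerReady struct {
	Reader io.Reader
	URI    string
}

// openFunc is a hypothetical stand-in for fs.Self().GetReader.
type openFunc func(path string) (io.Reader, error)

// orderedReaders opens all files concurrently but emits the readers
// strictly in sorted path order. It stops at the first failed open.
func orderedReaders(paths []string, open openFunc) (<-chan readerReady, <-chan error) {
	files := make([]string, 0, len(paths))
	for _, p := range paths {
		if strings.HasSuffix(p, "/") {
			continue // skip directories, as the original loop does
		}
		files = append(files, p)
	}
	sort.Strings(files)

	type result struct {
		r   io.Reader
		err error
	}
	// One buffered slot per file, so an opener never blocks on send.
	slots := make([]chan result, len(files))
	for i, p := range files {
		slots[i] = make(chan result, 1)
		go func(i int, p string) {
			r, err := open(p)
			slots[i] <- result{r, err}
		}(i, p)
	}

	out := make(chan readerReady)
	errc := make(chan error, 1)
	go func() {
		defer close(out)
		defer close(errc)
		// Drain the slots in index order: this is what fixes the order.
		for i, p := range files {
			res := <-slots[i]
			if res.err != nil {
				errc <- fmt.Errorf("error getting reader for %s: %w", p, res.err)
				return
			}
			out <- readerReady{Reader: res.r, URI: p}
		}
	}()
	return out, errc
}

// collectURIs drains orderedReaders and returns the URIs in emit order.
func collectURIs(paths []string, open openFunc) ([]string, error) {
	out, errc := orderedReaders(paths, open)
	var uris []string
	for r := range out {
		uris = append(uris, r.URI)
	}
	return uris, <-errc // nil when errc was closed without an error
}

// fakeOpen is a hypothetical opener that returns the path as content.
func fakeOpen(path string) (io.Reader, error) {
	return strings.NewReader(path), nil
}

func main() {
	uris, err := collectURIs([]string{"folder/b.csv", "folder/", "folder/a.csv"}, fakeOpen)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, u := range uris {
		fmt.Println(u)
	}
}
```

The sketch starts one goroutine per file with no upper bound and does not clean up readers that were already opened when an error occurs; a real change would need to handle both. Removing the inner goroutine entirely and opening files sequentially in sorted order is the simpler alternative, at the cost of losing the concurrent opens.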