Skip to content

Commit 06e3ca1

Browse files
committed
fix: ignore event inbox temp files
1 parent 40d29b9 commit 06e3ca1

2 files changed

Lines changed: 30 additions & 0 deletions

File tree

internal/persis/fileeventstore/collector.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"os"
1414
"path/filepath"
1515
"sort"
16+
"strings"
1617
"time"
1718

1819
"github.com/dagucloud/dagu/internal/cmn/fileutil"
@@ -140,6 +141,9 @@ func (c *Collector) DrainOnce(_ context.Context) error {
140141
if entry.IsDir() {
141142
continue
142143
}
144+
if !strings.HasSuffix(entry.Name(), inboxSuffix) {
145+
continue
146+
}
143147
processed++
144148
path := filepath.Join(c.store.inboxDir, entry.Name())
145149
pending, err := c.readPendingEvent(path)

internal/persis/fileeventstore/collector_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,32 @@ func TestCollectorDrainOnceQuarantinesMalformedInbox(t *testing.T) {
7070
require.Len(t, entries, 1)
7171
}
7272

73+
func TestCollectorDrainOnceIgnoresAtomicWriteTempFiles(t *testing.T) {
74+
t.Parallel()
75+
76+
baseDir := t.TempDir()
77+
store, err := New(baseDir)
78+
require.NoError(t, err)
79+
80+
event := testEvent("evt-final", time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC))
81+
require.NoError(t, store.Emit(context.Background(), event))
82+
83+
tmpFile := filepath.Join(store.inboxDir, "pending.json.tmp.123")
84+
require.NoError(t, os.WriteFile(tmpFile, []byte("{partial"), filePermissions))
85+
86+
collector, err := NewCollector(baseDir, 10)
87+
require.NoError(t, err)
88+
require.NoError(t, collector.DrainOnce(context.Background()))
89+
90+
assertFileExists(t, tmpFile, true)
91+
assertInboxCount(t, store.inboxDir, 1)
92+
assertLogLineCount(t, filepath.Join(baseDir, "_2026032912.jsonl"), 1)
93+
94+
entries, err := os.ReadDir(store.quarantineDir)
95+
require.NoError(t, err)
96+
require.Empty(t, entries)
97+
}
98+
7399
func TestCollectorDrainOnceDropsDuplicateInboxEventsWithinSinglePass(t *testing.T) {
74100
t.Parallel()
75101

0 commit comments

Comments
 (0)