Skip to content

Commit 5f03edd

Browse files
fix(fs): After a file is copied or moved, flush the cache of the directory it was copied or moved to. (#592)
* fix(fs): After the file is copied, the cache of the directory it was copied to is refreshed * fixed randomstring * fixed EOL and Sync branch chore(quark_uc): `webdav_policy` default to native_proxy * fixed uuid and other bugs * fixed comments * fixed EOL * add move refresh * fixed builds * fixed batch * change batch to task.go --------- Co-authored-by: Sumengjing <[email protected]>
1 parent 8b65c91 commit 5f03edd

File tree

3 files changed

+214
-7
lines changed

3 files changed

+214
-7
lines changed

internal/fs/copy.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func (t *CopyTask) Run() error {
4545
t.ClearEndTime()
4646
t.SetStartTime(time.Now())
4747
defer func() { t.SetEndTime(time.Now()) }()
48+
4849
var err error
4950
if t.srcStorage == nil {
5051
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
@@ -55,11 +56,27 @@ func (t *CopyTask) Run() error {
5556
if err != nil {
5657
return errors.WithMessage(err, "failed get storage")
5758
}
58-
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
59+
60+
// Use the task object's memory address as a unique identifier
61+
taskID := fmt.Sprintf("%p", t)
62+
63+
// Register task to batch tracker
64+
copyBatchTracker.RegisterTask(taskID, t.dstStorage, t.DstDirPath)
65+
66+
// Execute copy operation
67+
err = copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
68+
69+
// Mark task completed and automatically refresh cache if needed
70+
copyBatchTracker.MarkTaskCompletedWithRefresh(taskID)
71+
72+
return err
5973
}
6074

6175
var CopyTaskManager *tache.Manager[*CopyTask]
6276

77+
// Batch tracker for copy operations
78+
var copyBatchTracker = NewBatchTracker("copy")
79+
6380
// Copy if in the same storage, call move method
6481
// if not, add copy task
6582
func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool) (task.TaskExtensionInfo, error) {
@@ -75,6 +92,10 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool
7592
if srcStorage.GetStorage() == dstStorage.GetStorage() {
7693
err = op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
7794
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) {
95+
if err == nil {
96+
// Refresh target directory cache after successful same-storage copy
97+
op.ClearCache(dstStorage, dstDirActualPath)
98+
}
7899
return nil, err
79100
}
80101
}
@@ -98,7 +119,12 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool
98119
_ = link.Close()
99120
return nil, errors.WithMessagef(err, "failed get [%s] stream", srcObjPath)
100121
}
101-
return nil, op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, false)
122+
err = op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, false)
123+
if err == nil {
124+
// Refresh target directory cache after successful direct file copy
125+
op.ClearCache(dstStorage, dstDirActualPath)
126+
}
127+
return nil, err
102128
}
103129
}
104130
// not in the same storage
@@ -131,6 +157,7 @@ func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, src
131157
if err != nil {
132158
return errors.WithMessagef(err, "failed list src [%s] objs", srcObjPath)
133159
}
160+
134161
for _, obj := range objs {
135162
if utils.IsCanceled(t.Ctx()) {
136163
return nil

internal/fs/move.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,12 @@ func (t *MoveTask) Run() error {
130130
return errors.WithMessage(err, "failed get storage")
131131
}
132132

133+
// Use the task object's memory address as a unique identifier
134+
taskID := fmt.Sprintf("%p", t)
135+
136+
// Register task to batch tracker
137+
moveBatchTracker.RegisterTask(taskID, t.dstStorage, t.DstDirPath)
138+
133139
// Phase 1: Async validation (all validation happens in background)
134140
t.mu.Lock()
135141
t.Status = "validating source and destination"
@@ -138,13 +144,17 @@ func (t *MoveTask) Run() error {
138144
// Check if source exists
139145
srcObj, err := op.Get(t.Ctx(), t.srcStorage, t.SrcObjPath)
140146
if err != nil {
147+
// Clean up tracker records if task failed
148+
moveBatchTracker.MarkTaskCompletedWithRefresh(taskID)
141149
return errors.WithMessagef(err, "source file [%s] not found", stdpath.Base(t.SrcObjPath))
142150
}
143151

144152
// Check if destination already exists (if validation is required)
145153
if t.ValidateExistence {
146154
dstFilePath := stdpath.Join(t.DstDirPath, srcObj.GetName())
147155
if res, _ := op.Get(t.Ctx(), t.dstStorage, dstFilePath); res != nil {
156+
// Clean up tracker records if task failed
157+
moveBatchTracker.MarkTaskCompletedWithRefresh(taskID)
148158
return errors.Errorf("destination file [%s] already exists", srcObj.GetName())
149159
}
150160
}
@@ -156,11 +166,16 @@ func (t *MoveTask) Run() error {
156166
t.IsRootTask = true
157167
t.RootTaskID = t.GetID()
158168
t.mu.Unlock()
159-
return t.runRootMoveTask()
169+
err = t.runRootMoveTask()
170+
} else {
171+
// Use safe move logic for files
172+
err = t.safeMoveOperation(srcObj)
160173
}
161174

162-
// Use safe move logic for files
163-
return t.safeMoveOperation(srcObj)
175+
// Mark task completed and automatically refresh cache if needed
176+
moveBatchTracker.MarkTaskCompletedWithRefresh(taskID)
177+
178+
return err
164179
}
165180

166181
func (t *MoveTask) runRootMoveTask() error {
@@ -236,11 +251,15 @@ func (t *MoveTask) runRootMoveTask() error {
236251
t.mu.Unlock()
237252
t.updateProgress()
238253

254+
239255
return nil
240256
}
241257

242258
var MoveTaskManager *tache.Manager[*MoveTask]
243259

260+
// Batch tracker for move operations
261+
var moveBatchTracker = NewBatchTracker("move")
262+
244263
// GetMoveProgress returns the progress of a move task by task ID
245264
func GetMoveProgress(taskID string) (*MoveProgress, bool) {
246265
if progress, ok := moveProgressMap.Load(taskID); ok {
@@ -487,7 +506,7 @@ func moveBetween2Storages(t *MoveTask, srcStorage, dstStorage driver.Driver, src
487506
return nil
488507
}
489508
srcSubObjPath := stdpath.Join(srcObjPath, obj.GetName())
490-
MoveTaskManager.Add(&MoveTask{
509+
subTask := &MoveTask{
491510
TaskExtension: task.TaskExtension{
492511
Creator: t.GetCreator(),
493512
ApiUrl: t.ApiUrl,
@@ -498,7 +517,8 @@ func moveBetween2Storages(t *MoveTask, srcStorage, dstStorage driver.Driver, src
498517
DstDirPath: dstObjPath,
499518
SrcStorageMp: srcStorage.GetStorage().MountPath,
500519
DstStorageMp: dstStorage.GetStorage().MountPath,
501-
})
520+
}
521+
MoveTaskManager.Add(subTask)
502522
}
503523

504524
t.Status = "cleaning up source directory"
@@ -508,6 +528,8 @@ func moveBetween2Storages(t *MoveTask, srcStorage, dstStorage driver.Driver, src
508528
} else {
509529
t.Status = "completed"
510530
}
531+
532+
511533
return nil
512534
} else {
513535
return moveFileBetween2Storages(t, srcStorage, dstStorage, srcObjPath, dstDirPath)
@@ -549,6 +571,7 @@ func moveFileBetween2Storages(tsk *MoveTask, srcStorage, dstStorage driver.Drive
549571
return errors.WithMessagef(err, "failed to delete src [%s] file from storage [%s] after successful copy", srcFilePath, srcStorage.GetStorage().MountPath)
550572
}
551573

574+
552575
tsk.SetProgress(100)
553576
tsk.Status = "completed"
554577
return nil
@@ -583,6 +606,10 @@ func _moveWithValidation(ctx context.Context, srcObjPath, dstDirPath string, val
583606
if srcStorage.GetStorage() == dstStorage.GetStorage() {
584607
err = op.Move(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
585608
if !errors.Is(err, errs.NotImplement) && !errors.Is(err, errs.NotSupport) {
609+
if err == nil {
610+
// For same-storage moves, refresh cache immediately since no batch tracking is used
611+
op.ClearCache(dstStorage, dstDirActualPath)
612+
}
586613
return nil, err
587614
}
588615
}

internal/fs/task.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package fs
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/OpenListTeam/OpenList/v4/internal/driver"
8+
"github.com/OpenListTeam/OpenList/v4/internal/op"
9+
)
10+
11+
// BatchTracker manages batch operations for cache refresh optimization
12+
// It aggregates multiple file operations by target directory and only refreshes
13+
// the cache once when all operations in a directory are completed
14+
type BatchTracker struct {
15+
mu sync.Mutex
16+
dirTasks map[string]*dirTaskInfo // dstStoragePath+dstDirPath -> dirTaskInfo
17+
pendingTasks map[string]string // taskID -> dstStoragePath+dstDirPath
18+
lastCleanup time.Time // last cleanup time
19+
name string // tracker name for debugging
20+
}
21+
22+
type dirTaskInfo struct {
23+
dstStorage driver.Driver
24+
dstDirPath string
25+
pendingTasks map[string]bool // taskID -> true
26+
lastActivity time.Time // last activity time (used for detecting abnormal situations)
27+
}
28+
29+
// NewBatchTracker creates a new batch tracker instance
30+
func NewBatchTracker(name string) *BatchTracker {
31+
return &BatchTracker{
32+
dirTasks: make(map[string]*dirTaskInfo),
33+
pendingTasks: make(map[string]string),
34+
lastCleanup: time.Now(),
35+
name: name,
36+
}
37+
}
38+
39+
// getDirKey generates unique key for target directory
40+
func (bt *BatchTracker) getDirKey(dstStorage driver.Driver, dstDirPath string) string {
41+
return dstStorage.GetStorage().MountPath + ":" + dstDirPath
42+
}
43+
44+
// RegisterTask registers a task to target directory for batch tracking
45+
func (bt *BatchTracker) RegisterTask(taskID string, dstStorage driver.Driver, dstDirPath string) {
46+
bt.mu.Lock()
47+
defer bt.mu.Unlock()
48+
49+
// Periodically clean up expired entries
50+
bt.cleanupIfNeeded()
51+
52+
dirKey := bt.getDirKey(dstStorage, dstDirPath)
53+
54+
// Record task to directory mapping
55+
bt.pendingTasks[taskID] = dirKey
56+
57+
// Initialize or update directory task information
58+
if info, exists := bt.dirTasks[dirKey]; exists {
59+
info.pendingTasks[taskID] = true
60+
info.lastActivity = time.Now()
61+
} else {
62+
bt.dirTasks[dirKey] = &dirTaskInfo{
63+
dstStorage: dstStorage,
64+
dstDirPath: dstDirPath,
65+
pendingTasks: map[string]bool{taskID: true},
66+
lastActivity: time.Now(),
67+
}
68+
}
69+
}
70+
71+
// MarkTaskCompleted marks a task as completed and returns whether cache refresh is needed
72+
// Returns (shouldRefresh, dstStorage, dstDirPath)
73+
func (bt *BatchTracker) MarkTaskCompleted(taskID string) (bool, driver.Driver, string) {
74+
bt.mu.Lock()
75+
defer bt.mu.Unlock()
76+
77+
dirKey, exists := bt.pendingTasks[taskID]
78+
if !exists {
79+
return false, nil, ""
80+
}
81+
82+
// Remove from pending tasks
83+
delete(bt.pendingTasks, taskID)
84+
85+
info, exists := bt.dirTasks[dirKey]
86+
if !exists {
87+
return false, nil, ""
88+
}
89+
90+
// Remove from directory tasks
91+
delete(info.pendingTasks, taskID)
92+
93+
// If no pending tasks left in this directory, trigger cache refresh
94+
if len(info.pendingTasks) == 0 {
95+
dstStorage := info.dstStorage
96+
dstDirPath := info.dstDirPath
97+
delete(bt.dirTasks, dirKey) // Delete directly, no need to update lastActivity
98+
return true, dstStorage, dstDirPath
99+
}
100+
101+
// Only update lastActivity when there are other tasks (indicating the directory still has active tasks)
102+
info.lastActivity = time.Now()
103+
return false, nil, ""
104+
}
105+
106+
// MarkTaskCompletedWithRefresh marks a task as completed and automatically refreshes cache if needed
107+
func (bt *BatchTracker) MarkTaskCompletedWithRefresh(taskID string) {
108+
shouldRefresh, dstStorage, dstDirPath := bt.MarkTaskCompleted(taskID)
109+
if shouldRefresh {
110+
op.ClearCache(dstStorage, dstDirPath)
111+
}
112+
}
113+
114+
// cleanupIfNeeded checks if cleanup is needed and executes cleanup if necessary
115+
func (bt *BatchTracker) cleanupIfNeeded() {
116+
now := time.Now()
117+
// Clean up every 10 minutes
118+
if now.Sub(bt.lastCleanup) > 10*time.Minute {
119+
bt.cleanupStaleEntries()
120+
bt.lastCleanup = now
121+
}
122+
}
123+
124+
// cleanupStaleEntries cleans up timed-out tasks to prevent memory leaks
125+
// Mainly used to clean up residual entries caused by abnormal situations (such as task crashes, process restarts, etc.)
126+
func (bt *BatchTracker) cleanupStaleEntries() {
127+
now := time.Now()
128+
for dirKey, info := range bt.dirTasks {
129+
// If no activity for more than 1 hour, it may indicate an abnormal situation, clean up this entry
130+
// Under normal circumstances, MarkTaskCompleted will be called when the task is completed and the entire entry will be deleted
131+
if now.Sub(info.lastActivity) > time.Hour {
132+
// Clean up related pending tasks
133+
for taskID := range info.pendingTasks {
134+
delete(bt.pendingTasks, taskID)
135+
}
136+
delete(bt.dirTasks, dirKey)
137+
}
138+
}
139+
}
140+
141+
// GetPendingTaskCount returns the number of pending tasks for debugging
142+
func (bt *BatchTracker) GetPendingTaskCount() int {
143+
bt.mu.Lock()
144+
defer bt.mu.Unlock()
145+
return len(bt.pendingTasks)
146+
}
147+
148+
// GetDirTaskCount returns the number of directories being tracked for debugging
149+
func (bt *BatchTracker) GetDirTaskCount() int {
150+
bt.mu.Lock()
151+
defer bt.mu.Unlock()
152+
return len(bt.dirTasks)
153+
}

0 commit comments

Comments
 (0)