Refine transfer progress and cancellation flow
This commit is contained in:
parent
95847ad231
commit
6787a7a363
3 changed files with 179 additions and 24 deletions
|
|
@ -23,6 +23,7 @@ type CopyProgress struct {
|
|||
BytesDone int64
|
||||
BytesTotal int64
|
||||
CurrentPath string
|
||||
Stage string
|
||||
}
|
||||
|
||||
type copyProgressState struct {
|
||||
|
|
@ -108,6 +109,9 @@ func CopyPathWithProgressContext(ctx context.Context, srcPath string, dstDir str
|
|||
if err := copyDir(srcPath, targetPath, &tracker); err != nil {
|
||||
return cleanupOnErr(err)
|
||||
}
|
||||
if err := ctx.Err(); err != nil {
|
||||
return cleanupOnErr(err)
|
||||
}
|
||||
tracker.emit(srcPath, true)
|
||||
return targetPath, nil
|
||||
}
|
||||
|
|
@ -215,6 +219,7 @@ func MovePathWithProgressContext(ctx context.Context, srcPath string, dstDir str
|
|||
BytesDone: stats.BytesTotal,
|
||||
BytesTotal: stats.BytesTotal,
|
||||
CurrentPath: srcPath,
|
||||
Stage: "Move completed",
|
||||
})
|
||||
return targetPath, nil
|
||||
} else if !errors.Is(err, syscall.EXDEV) {
|
||||
|
|
@ -229,6 +234,14 @@ func MovePathWithProgressContext(ctx context.Context, srcPath string, dstDir str
|
|||
_ = os.RemoveAll(targetPath)
|
||||
return "", err
|
||||
}
|
||||
progress(CopyProgress{
|
||||
FilesDone: stats.FilesTotal,
|
||||
FilesTotal: stats.FilesTotal,
|
||||
BytesDone: stats.BytesTotal,
|
||||
BytesTotal: stats.BytesTotal,
|
||||
CurrentPath: srcPath,
|
||||
Stage: "Finalizing move",
|
||||
})
|
||||
if err := DeletePath(srcPath); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
@ -313,6 +326,12 @@ func copyDir(srcDir string, dstDir string, tracker *copyProgressState) error {
|
|||
}
|
||||
}
|
||||
|
||||
if tracker != nil && tracker.ctx != nil {
|
||||
if err := tracker.ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -343,6 +362,13 @@ func copyFile(srcPath string, dstPath string, mode os.FileMode, tracker *copyPro
|
|||
_ = os.Remove(dstPath)
|
||||
return err
|
||||
}
|
||||
if tracker != nil && tracker.ctx != nil {
|
||||
if err := tracker.ctx.Err(); err != nil {
|
||||
_ = dstFile.Close()
|
||||
_ = os.Remove(dstPath)
|
||||
return err
|
||||
}
|
||||
}
|
||||
if tracker != nil {
|
||||
tracker.finishFile(srcPath)
|
||||
}
|
||||
|
|
@ -392,6 +418,7 @@ func (s *copyProgressState) emit(currentPath string, force bool) {
|
|||
BytesDone: s.bytesDone,
|
||||
BytesTotal: s.stats.BytesTotal,
|
||||
CurrentPath: currentPath,
|
||||
Stage: "Transferring data",
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
88
internal/fs/ops_test.go
Normal file
88
internal/fs/ops_test.go
Normal file
|
|
@ -0,0 +1,88 @@
|
|||
package vfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCopyPathWithProgressContextRemovesPartialTargetOnCancel(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
root := t.TempDir()
|
||||
srcDir := filepath.Join(root, "src")
|
||||
dstDir := filepath.Join(root, "dst")
|
||||
if err := os.MkdirAll(srcDir, 0o755); err != nil {
|
||||
t.Fatalf("mkdir src: %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(dstDir, 0o755); err != nil {
|
||||
t.Fatalf("mkdir dst: %v", err)
|
||||
}
|
||||
|
||||
for idx := 0; idx < 64; idx++ {
|
||||
path := filepath.Join(srcDir, "file-"+strconv.Itoa(idx)+".txt")
|
||||
if err := os.WriteFile(path, []byte("payload-"+strconv.Itoa(idx)), 0o644); err != nil {
|
||||
t.Fatalf("write source file %d: %v", idx, err)
|
||||
}
|
||||
}
|
||||
|
||||
stats, err := CopyStats(srcDir)
|
||||
if err != nil {
|
||||
t.Fatalf("copy stats: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
_, err = CopyPathWithProgressContext(ctx, srcDir, dstDir, false, stats, func(progress CopyProgress) {
|
||||
if progress.FilesDone >= 1 {
|
||||
cancel()
|
||||
}
|
||||
})
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
t.Fatalf("expected context cancellation, got %v", err)
|
||||
}
|
||||
|
||||
targetPath := filepath.Join(dstDir, filepath.Base(srcDir))
|
||||
if _, statErr := os.Stat(targetPath); !errors.Is(statErr, os.ErrNotExist) {
|
||||
t.Fatalf("expected partial target to be removed, stat err=%v", statErr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMovePathWithProgressContextCancelledBeforeStartKeepsSource(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
root := t.TempDir()
|
||||
srcFile := filepath.Join(root, "source.txt")
|
||||
dstDir := filepath.Join(root, "dst")
|
||||
if err := os.WriteFile(srcFile, []byte("payload"), 0o644); err != nil {
|
||||
t.Fatalf("write source: %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(dstDir, 0o755); err != nil {
|
||||
t.Fatalf("mkdir dst: %v", err)
|
||||
}
|
||||
|
||||
stats, err := CopyStats(srcFile)
|
||||
if err != nil {
|
||||
t.Fatalf("copy stats: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
_, err = MovePathWithProgressContext(ctx, srcFile, dstDir, false, stats, nil)
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
t.Fatalf("expected context cancellation, got %v", err)
|
||||
}
|
||||
|
||||
if _, statErr := os.Stat(srcFile); statErr != nil {
|
||||
t.Fatalf("expected source to remain in place, stat err=%v", statErr)
|
||||
}
|
||||
targetPath := filepath.Join(dstDir, filepath.Base(srcFile))
|
||||
if _, statErr := os.Stat(targetPath); !errors.Is(statErr, os.ErrNotExist) {
|
||||
t.Fatalf("expected destination file to be absent, stat err=%v", statErr)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue