From d2a788933d8fee73553b40db7173d6c591391ba5 Mon Sep 17 00:00:00 2001 From: Kharec Date: Wed, 10 Dec 2025 07:29:55 +0100 Subject: [PATCH] fix: track completed items in the main loop instead of using the index --- cmd/goyco/commands/parallel_processor.go | 36 ++++++++++++++++++++---- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/cmd/goyco/commands/parallel_processor.go b/cmd/goyco/commands/parallel_processor.go index 00bca9d..32c8286 100644 --- a/cmd/goyco/commands/parallel_processor.go +++ b/cmd/goyco/commands/parallel_processor.go @@ -261,6 +261,7 @@ func processItemsInParallelNoResult[T any]( ) error { count := len(items) errors := make(chan error, count) + completions := make(chan struct{}, count) semaphore := make(chan struct{}, maxWorkers) var wg sync.WaitGroup @@ -288,20 +289,45 @@ func processItemsInParallelNoResult[T any]( return } - if progress != nil { - progress.Update(index + 1) - } + completions <- struct{}{} }(i, item) } go func() { wg.Wait() close(errors) + close(completions) }() - for err := range errors { - if err != nil { + completed := 0 + firstError := make(chan error, 1) + + go func() { + for err := range errors { + if err != nil { + select { + case firstError <- err: + default: + } + return + } + } + }() + + for completed < count { + select { + case _, ok := <-completions: + if !ok { + return nil + } + completed++ + if progress != nil { + progress.Update(completed) + } + case err := <-firstError: return err + case <-ctx.Done(): + return fmt.Errorf("timeout: %w", ctx.Err()) } }