Compare commits

...

2 Commits

2 changed files with 55 additions and 14 deletions

View File

@@ -261,6 +261,7 @@ func processItemsInParallelNoResult[T any](
) error { ) error {
count := len(items) count := len(items)
errors := make(chan error, count) errors := make(chan error, count)
completions := make(chan struct{}, count)
semaphore := make(chan struct{}, maxWorkers) semaphore := make(chan struct{}, maxWorkers)
var wg sync.WaitGroup var wg sync.WaitGroup
@@ -288,20 +289,45 @@ func processItemsInParallelNoResult[T any](
return return
} }
if progress != nil { completions <- struct{}{}
progress.Update(index + 1)
}
}(i, item) }(i, item)
} }
go func() { go func() {
wg.Wait() wg.Wait()
close(errors) close(errors)
close(completions)
}() }()
for err := range errors { completed := 0
if err != nil { firstError := make(chan error, 1)
go func() {
for err := range errors {
if err != nil {
select {
case firstError <- err:
default:
}
return
}
}
}()
for completed < count {
select {
case _, ok := <-completions:
if !ok {
return nil
}
completed++
if progress != nil {
progress.Update(completed)
}
case err := <-firstError:
return err return err
case <-ctx.Done():
return fmt.Errorf("timeout: %w", ctx.Err())
} }
} }

View File

@@ -2,15 +2,15 @@ package commands_test
import ( import (
"errors" "errors"
"fmt"
"sync" "sync"
"testing" "testing"
"golang.org/x/crypto/bcrypt"
"goyco/cmd/goyco/commands" "goyco/cmd/goyco/commands"
"goyco/internal/database" "goyco/internal/database"
"goyco/internal/repositories" "goyco/internal/repositories"
"goyco/internal/testutils" "goyco/internal/testutils"
"golang.org/x/crypto/bcrypt"
) )
func TestParallelProcessor_CreateUsersInParallel(t *testing.T) { func TestParallelProcessor_CreateUsersInParallel(t *testing.T) {
@@ -25,7 +25,7 @@ func TestParallelProcessor_CreateUsersInParallel(t *testing.T) {
wantErr bool wantErr bool
}{ }{
{ {
name: "creates users with deterministic fields", name: "creates users with required fields",
count: successCount, count: successCount,
repoFactory: func() repositories.UserRepository { repoFactory: func() repositories.UserRepository {
base := testutils.NewMockUserRepository() base := testutils.NewMockUserRepository()
@@ -37,14 +37,24 @@ func TestParallelProcessor_CreateUsersInParallel(t *testing.T) {
if len(got) != successCount { if len(got) != successCount {
t.Fatalf("expected %d users, got %d", successCount, len(got)) t.Fatalf("expected %d users, got %d", successCount, len(got))
} }
usernames := make(map[string]bool)
for i, user := range got { for i, user := range got {
expectedUsername := fmt.Sprintf("user_%d", i+1) if user.Username == "" {
expectedEmail := fmt.Sprintf("user_%d@goyco.local", i+1) t.Errorf("user %d expected non-empty username", i)
if user.Username != expectedUsername {
t.Errorf("user %d username mismatch: got %q want %q", i, user.Username, expectedUsername)
} }
if user.Email != expectedEmail { if len(user.Username) < 6 || user.Username[:5] != "user_" {
t.Errorf("user %d email mismatch: got %q want %q", i, user.Email, expectedEmail) t.Errorf("user %d username should start with 'user_', got %q", i, user.Username)
}
if usernames[user.Username] {
t.Errorf("user %d duplicate username: %q", i, user.Username)
}
usernames[user.Username] = true
if user.Email == "" {
t.Errorf("user %d expected non-empty email", i)
}
if len(user.Email) < 20 || user.Email[:5] != "user_" || user.Email[len(user.Email)-12:] != "@goyco.local" {
t.Errorf("user %d email should match pattern 'user_*@goyco.local', got %q", i, user.Email)
} }
if !user.EmailVerified { if !user.EmailVerified {
t.Errorf("user %d expected EmailVerified to be true", i) t.Errorf("user %d expected EmailVerified to be true", i)
@@ -83,6 +93,11 @@ func TestParallelProcessor_CreateUsersInParallel(t *testing.T) {
t.Parallel() t.Parallel()
repo := tt.repoFactory() repo := tt.repoFactory()
p := commands.NewParallelProcessor() p := commands.NewParallelProcessor()
passwordHash, err := bcrypt.GenerateFromPassword([]byte("password123"), bcrypt.DefaultCost)
if err != nil {
t.Fatalf("failed to generate password hash: %v", err)
}
p.SetPasswordHash(string(passwordHash))
got, gotErr := p.CreateUsersInParallel(repo, tt.count, tt.progress) got, gotErr := p.CreateUsersInParallel(repo, tt.count, tt.progress)
if gotErr != nil { if gotErr != nil {
if !tt.wantErr { if !tt.wantErr {