Compare commits
2 Commits
18be3950dc
...
dcf054046f
| Author | SHA1 | Date | |
|---|---|---|---|
| dcf054046f | |||
| d2a788933d |
@@ -261,6 +261,7 @@ func processItemsInParallelNoResult[T any](
|
|||||||
) error {
|
) error {
|
||||||
count := len(items)
|
count := len(items)
|
||||||
errors := make(chan error, count)
|
errors := make(chan error, count)
|
||||||
|
completions := make(chan struct{}, count)
|
||||||
|
|
||||||
semaphore := make(chan struct{}, maxWorkers)
|
semaphore := make(chan struct{}, maxWorkers)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@@ -288,20 +289,45 @@ func processItemsInParallelNoResult[T any](
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if progress != nil {
|
completions <- struct{}{}
|
||||||
progress.Update(index + 1)
|
|
||||||
}
|
|
||||||
}(i, item)
|
}(i, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
close(errors)
|
close(errors)
|
||||||
|
close(completions)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
completed := 0
|
||||||
|
firstError := make(chan error, 1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
for err := range errors {
|
for err := range errors {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
select {
|
||||||
|
case firstError <- err:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for completed < count {
|
||||||
|
select {
|
||||||
|
case _, ok := <-completions:
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
completed++
|
||||||
|
if progress != nil {
|
||||||
|
progress.Update(completed)
|
||||||
|
}
|
||||||
|
case err := <-firstError:
|
||||||
return err
|
return err
|
||||||
|
case <-ctx.Done():
|
||||||
|
return fmt.Errorf("timeout: %w", ctx.Err())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,15 +2,15 @@ package commands_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"golang.org/x/crypto/bcrypt"
|
|
||||||
"goyco/cmd/goyco/commands"
|
"goyco/cmd/goyco/commands"
|
||||||
"goyco/internal/database"
|
"goyco/internal/database"
|
||||||
"goyco/internal/repositories"
|
"goyco/internal/repositories"
|
||||||
"goyco/internal/testutils"
|
"goyco/internal/testutils"
|
||||||
|
|
||||||
|
"golang.org/x/crypto/bcrypt"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestParallelProcessor_CreateUsersInParallel(t *testing.T) {
|
func TestParallelProcessor_CreateUsersInParallel(t *testing.T) {
|
||||||
@@ -25,7 +25,7 @@ func TestParallelProcessor_CreateUsersInParallel(t *testing.T) {
|
|||||||
wantErr bool
|
wantErr bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "creates users with deterministic fields",
|
name: "creates users with required fields",
|
||||||
count: successCount,
|
count: successCount,
|
||||||
repoFactory: func() repositories.UserRepository {
|
repoFactory: func() repositories.UserRepository {
|
||||||
base := testutils.NewMockUserRepository()
|
base := testutils.NewMockUserRepository()
|
||||||
@@ -37,14 +37,24 @@ func TestParallelProcessor_CreateUsersInParallel(t *testing.T) {
|
|||||||
if len(got) != successCount {
|
if len(got) != successCount {
|
||||||
t.Fatalf("expected %d users, got %d", successCount, len(got))
|
t.Fatalf("expected %d users, got %d", successCount, len(got))
|
||||||
}
|
}
|
||||||
|
usernames := make(map[string]bool)
|
||||||
for i, user := range got {
|
for i, user := range got {
|
||||||
expectedUsername := fmt.Sprintf("user_%d", i+1)
|
if user.Username == "" {
|
||||||
expectedEmail := fmt.Sprintf("user_%d@goyco.local", i+1)
|
t.Errorf("user %d expected non-empty username", i)
|
||||||
if user.Username != expectedUsername {
|
|
||||||
t.Errorf("user %d username mismatch: got %q want %q", i, user.Username, expectedUsername)
|
|
||||||
}
|
}
|
||||||
if user.Email != expectedEmail {
|
if len(user.Username) < 6 || user.Username[:5] != "user_" {
|
||||||
t.Errorf("user %d email mismatch: got %q want %q", i, user.Email, expectedEmail)
|
t.Errorf("user %d username should start with 'user_', got %q", i, user.Username)
|
||||||
|
}
|
||||||
|
if usernames[user.Username] {
|
||||||
|
t.Errorf("user %d duplicate username: %q", i, user.Username)
|
||||||
|
}
|
||||||
|
usernames[user.Username] = true
|
||||||
|
|
||||||
|
if user.Email == "" {
|
||||||
|
t.Errorf("user %d expected non-empty email", i)
|
||||||
|
}
|
||||||
|
if len(user.Email) < 20 || user.Email[:5] != "user_" || user.Email[len(user.Email)-12:] != "@goyco.local" {
|
||||||
|
t.Errorf("user %d email should match pattern 'user_*@goyco.local', got %q", i, user.Email)
|
||||||
}
|
}
|
||||||
if !user.EmailVerified {
|
if !user.EmailVerified {
|
||||||
t.Errorf("user %d expected EmailVerified to be true", i)
|
t.Errorf("user %d expected EmailVerified to be true", i)
|
||||||
@@ -83,6 +93,11 @@ func TestParallelProcessor_CreateUsersInParallel(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
repo := tt.repoFactory()
|
repo := tt.repoFactory()
|
||||||
p := commands.NewParallelProcessor()
|
p := commands.NewParallelProcessor()
|
||||||
|
passwordHash, err := bcrypt.GenerateFromPassword([]byte("password123"), bcrypt.DefaultCost)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to generate password hash: %v", err)
|
||||||
|
}
|
||||||
|
p.SetPasswordHash(string(passwordHash))
|
||||||
got, gotErr := p.CreateUsersInParallel(repo, tt.count, tt.progress)
|
got, gotErr := p.CreateUsersInParallel(repo, tt.count, tt.progress)
|
||||||
if gotErr != nil {
|
if gotErr != nil {
|
||||||
if !tt.wantErr {
|
if !tt.wantErr {
|
||||||
|
|||||||
Reference in New Issue
Block a user