diff --git a/cmd/goyco/commands/parallel_processor.go b/cmd/goyco/commands/parallel_processor.go deleted file mode 100644 index 805db57..0000000 --- a/cmd/goyco/commands/parallel_processor.go +++ /dev/null @@ -1,527 +0,0 @@ -package commands - -import ( - "context" - "fmt" - "math/rand" - "runtime" - "sync" - "time" - - "goyco/internal/database" - "goyco/internal/repositories" -) - -type ParallelProcessor struct { - maxWorkers int - timeout time.Duration - passwordHash string - randSource *rand.Rand - randMu sync.Mutex -} - -func NewParallelProcessor() *ParallelProcessor { - maxWorkers := max(min(runtime.NumCPU(), 8), 2) - - seed := time.Now().UnixNano() - - return &ParallelProcessor{ - maxWorkers: maxWorkers, - timeout: 60 * time.Second, - randSource: rand.New(rand.NewSource(seed)), - } -} - -func (p *ParallelProcessor) SetPasswordHash(hash string) { - p.passwordHash = hash -} - -type indexedResult[T any] struct { - value T - index int -} - -func processInParallel[T any]( - ctx context.Context, - maxWorkers int, - count int, - processor func(index int) (T, error), - errorPrefix string, - progress *ProgressIndicator, -) ([]T, error) { - results := make(chan indexedResult[T], count) - errors := make(chan error, count) - - semaphore := make(chan struct{}, maxWorkers) - var wg sync.WaitGroup - - for i := range count { - wg.Add(1) - go func(index int) { - defer wg.Done() - - select { - case semaphore <- struct{}{}: - case <-ctx.Done(): - errors <- ctx.Err() - return - } - defer func() { <-semaphore }() - - value, err := processor(index + 1) - if err != nil { - errors <- fmt.Errorf("%s %d: %w", errorPrefix, index+1, err) - return - } - - results <- indexedResult[T]{value: value, index: index} - }(i) - } - - go func() { - wg.Wait() - close(results) - close(errors) - }() - - items := make([]T, count) - completed := 0 - firstError := make(chan error, 1) - - go func() { - for err := range errors { - if err != nil { - select { - case firstError <- err: - default: - } - return - } - } - }() - - for completed < count { - select { - case result, ok := <-results: - if !ok { - return items, nil - } - items[result.index] = result.value - completed++ - if progress != nil { - progress.Update(completed) - } - case err := <-firstError: - return nil, err - case <-ctx.Done(): - return nil, fmt.Errorf("timeout: %w", ctx.Err()) - } - } - - return items, nil -} - -func (p *ParallelProcessor) CreateUsersInParallel(userRepo repositories.UserRepository, count int, progress *ProgressIndicator) ([]database.User, error) { - ctx, cancel := context.WithTimeout(context.Background(), p.timeout) - defer cancel() - - return processInParallel(ctx, p.maxWorkers, count, - func(index int) (database.User, error) { - return p.createSingleUser(userRepo, index) - }, - "create user", - progress, - ) -} - -func (p *ParallelProcessor) CreatePostsInParallel(postRepo repositories.PostRepository, authorID uint, count int, progress *ProgressIndicator) ([]database.Post, error) { - ctx, cancel := context.WithTimeout(context.Background(), p.timeout) - defer cancel() - - return processInParallel(ctx, p.maxWorkers, count, - func(index int) (database.Post, error) { - return p.createSinglePost(postRepo, authorID, index) - }, - "create post", - progress, - ) -} - -func processItemsInParallel[T any, R any]( - ctx context.Context, - maxWorkers int, - items []T, - processor func(index int, item T) (R, error), - errorPrefix string, - aggregator func(accumulator R, value R) R, - initialValue R, - progress *ProgressIndicator, -) (R, error) { - count := len(items) - results := make(chan indexedResult[R], count) - errors := make(chan error, count) - - semaphore := make(chan struct{}, maxWorkers) - var wg sync.WaitGroup - - for i, item := range items { - wg.Add(1) - go func(index int, item T) { - defer wg.Done() - - select { - case semaphore <- struct{}{}: - case <-ctx.Done(): - errors <- ctx.Err() - return - } - defer func() { <-semaphore }() - - value, err := processor(index, item) - if err != nil { - errors <- fmt.Errorf("%s %d: %w", errorPrefix, index+1, err) - return - } - - results <- indexedResult[R]{value: value, index: index} - }(i, item) - } - - go func() { - wg.Wait() - close(results) - close(errors) - }() - - accumulator := initialValue - completed := 0 - firstError := make(chan error, 1) - - go func() { - for err := range errors { - if err != nil { - select { - case firstError <- err: - default: - } - return - } - } - }() - - for completed < count { - select { - case result, ok := <-results: - if !ok { - return accumulator, nil - } - accumulator = aggregator(accumulator, result.value) - completed++ - if progress != nil { - progress.Update(completed) - } - case err := <-firstError: - return initialValue, err - case <-ctx.Done(): - return initialValue, fmt.Errorf("timeout: %w", ctx.Err()) - } - } - - return accumulator, nil -} - -func (p *ParallelProcessor) CreateVotesInParallel(voteRepo repositories.VoteRepository, users []database.User, posts []database.Post, avgVotesPerPost int, progress *ProgressIndicator) (int, error) { - ctx, cancel := context.WithTimeout(context.Background(), p.timeout) - defer cancel() - - return processItemsInParallel(ctx, p.maxWorkers, posts, - func(index int, post database.Post) (int, error) { - return p.createVotesForPost(voteRepo, users, post, avgVotesPerPost) - }, - "create votes for post", - func(acc, val int) int { return acc + val }, - 0, - progress, - ) -} - -func processItemsInParallelNoResult[T any]( - ctx context.Context, - maxWorkers int, - items []T, - processor func(index int, item T) error, - errorFormatter func(index int, item T, err error) error, - progress *ProgressIndicator, -) error { - count := len(items) - errors := make(chan error, count) - completions := make(chan struct{}, count) - - semaphore := make(chan struct{}, maxWorkers) - var wg sync.WaitGroup - - for i, item := range items { - wg.Add(1) - go func(index int, item T) { - defer wg.Done() - - select { - case semaphore <- struct{}{}: - case <-ctx.Done(): - errors <- ctx.Err() - return - } - defer func() { <-semaphore }() - - err := processor(index, item) - if err != nil { - if errorFormatter != nil { - errors <- errorFormatter(index, item, err) - } else { - errors <- fmt.Errorf("process item %d: %w", index+1, err) - } - return - } - - completions <- struct{}{} - }(i, item) - } - - go func() { - wg.Wait() - close(errors) - close(completions) - }() - - completed := 0 - firstError := make(chan error, 1) - - go func() { - for err := range errors { - if err != nil { - select { - case firstError <- err: - default: - } - return - } - } - }() - - for completed < count { - select { - case _, ok := <-completions: - if !ok { - return nil - } - completed++ - if progress != nil { - progress.Update(completed) - } - case err := <-firstError: - return err - case <-ctx.Done(): - return fmt.Errorf("timeout: %w", ctx.Err()) - } - } - - return nil -} - -func (p *ParallelProcessor) UpdatePostScoresInParallel(postRepo repositories.PostRepository, voteRepo repositories.VoteRepository, posts []database.Post, progress *ProgressIndicator) error { - ctx, cancel := context.WithTimeout(context.Background(), p.timeout) - defer cancel() - - return processItemsInParallelNoResult(ctx, p.maxWorkers, posts, - func(index int, post database.Post) error { - return p.updateSinglePostScore(postRepo, voteRepo, post) - }, - func(index int, post database.Post, err error) error { - return fmt.Errorf("update post %d scores: %w", post.ID, err) - }, - progress, - ) -} - -func (p *ParallelProcessor) generateRandomIdentifier() string { - const length = 12 - const chars = "abcdefghijklmnopqrstuvwxyz0123456789" - identifier := make([]byte, length) - p.randMu.Lock() - for i := range identifier { - identifier[i] = chars[p.randSource.Intn(len(chars))] - } - p.randMu.Unlock() - return string(identifier) -} - -func (p *ParallelProcessor) createSingleUser(userRepo repositories.UserRepository, index int) (database.User, error) { - randomID := p.generateRandomIdentifier() - username := fmt.Sprintf("user_%s", randomID) - email := fmt.Sprintf("user_%s@goyco.local", randomID) - - const maxRetries = 10 - for range maxRetries { - user := &database.User{ - Username: username, - Email: email, - Password: p.passwordHash, - EmailVerified: true, - } - - if err := userRepo.Create(user); err != nil { - randomID = p.generateRandomIdentifier() - username = fmt.Sprintf("user_%s", randomID) - email = fmt.Sprintf("user_%s@goyco.local", randomID) - continue - } - - return *user, nil - } - - return database.User{}, fmt.Errorf("failed to create user after %d attempts", maxRetries) -} - -func (p *ParallelProcessor) createSinglePost(postRepo repositories.PostRepository, authorID uint, index int) (database.Post, error) { - sampleTitles := []string{ - "Amazing JavaScript Framework", - "Python Best Practices", - "Go Performance Tips", - "Database Optimization", - "Web Security Guide", - "Machine Learning Basics", - "Cloud Architecture", - "DevOps Automation", - "API Design Patterns", - "Frontend Optimization", - "Backend Scaling", - "Container Orchestration", - "Microservices Architecture", - "Testing Strategies", - "Code Review Process", - "Version Control Best Practices", - "Continuous Integration", - "Monitoring and Alerting", - "Error Handling Patterns", - "Data Structures Explained", - } - - sampleDomains := []string{ - "example.com", - "techblog.org", - "devguide.net", - "programming.io", - "codeexamples.com", - "tutorialhub.org", - "bestpractices.dev", - "learnprogramming.net", - "codingtips.org", - "softwareengineering.com", - } - - title := sampleTitles[index%len(sampleTitles)] - if index >= len(sampleTitles) { - title = fmt.Sprintf("%s - Part %d", title, (index/len(sampleTitles))+1) - } - - domain := sampleDomains[index%len(sampleDomains)] - randomID := p.generateRandomIdentifier() - path := fmt.Sprintf("/article/%s", randomID) - url := fmt.Sprintf("https://%s%s", domain, path) - - content := fmt.Sprintf("Autogenerated seed post #%d\n\nThis is sample content for testing purposes. The post discusses %s and provides valuable insights.", index, title) - - const maxRetries = 10 - for range maxRetries { - post := &database.Post{ - Title: title, - URL: url, - Content: content, - AuthorID: &authorID, - UpVotes: 0, - DownVotes: 0, - Score: 0, - } - - if err := postRepo.Create(post); err != nil { - randomID = p.generateRandomIdentifier() - path = fmt.Sprintf("/article/%s", randomID) - url = fmt.Sprintf("https://%s%s", domain, path) - continue - } - - return *post, nil - } - - return database.Post{}, fmt.Errorf("failed to create post after %d attempts", maxRetries) -} - -func (p *ParallelProcessor) createVotesForPost(voteRepo repositories.VoteRepository, users []database.User, post database.Post, avgVotesPerPost int) (int, error) { - p.randMu.Lock() - numVotes := p.randSource.Intn(avgVotesPerPost*2 + 1) - p.randMu.Unlock() - - if numVotes == 0 && avgVotesPerPost > 0 { - p.randMu.Lock() - if p.randSource.Intn(5) > 0 { - numVotes = 1 - } - p.randMu.Unlock() - } - - totalVotes := 0 - usedUsers := make(map[uint]bool) - - for i := 0; i < numVotes && len(usedUsers) < len(users); i++ { - p.randMu.Lock() - userIdx := p.randSource.Intn(len(users)) - p.randMu.Unlock() - user := users[userIdx] - - if usedUsers[user.ID] { - continue - } - usedUsers[user.ID] = true - - p.randMu.Lock() - voteTypeInt := p.randSource.Intn(10) - p.randMu.Unlock() - var voteType database.VoteType - if voteTypeInt < 7 { - voteType = database.VoteUp - } else { - voteType = database.VoteDown - } - - vote := &database.Vote{ - UserID: &user.ID, - PostID: post.ID, - Type: voteType, - } - - if err := voteRepo.CreateOrUpdate(vote); err != nil { - return totalVotes, fmt.Errorf("create or update vote: %w", err) - } - - totalVotes++ - } - - return totalVotes, nil -} - -func (p *ParallelProcessor) updateSinglePostScore(postRepo repositories.PostRepository, voteRepo repositories.VoteRepository, post database.Post) error { - upVotes, downVotes, err := getVoteCounts(voteRepo, post.ID) - if err != nil { - return fmt.Errorf("get vote counts: %w", err) - } - - post.UpVotes = upVotes - post.DownVotes = downVotes - post.Score = upVotes - downVotes - - if err := postRepo.Update(&post); err != nil { - return fmt.Errorf("update post: %w", err) - } - - return nil -} diff --git a/cmd/goyco/commands/parallel_processor_test.go b/cmd/goyco/commands/parallel_processor_test.go deleted file mode 100644 index 6458c27..0000000 --- a/cmd/goyco/commands/parallel_processor_test.go +++ /dev/null @@ -1,145 +0,0 @@ -package commands_test - -import ( - "errors" - "sync" - "testing" - - "goyco/cmd/goyco/commands" - "goyco/internal/database" - "goyco/internal/repositories" - "goyco/internal/testutils" - - "golang.org/x/crypto/bcrypt" -) - -func TestParallelProcessor_CreateUsersInParallel(t *testing.T) { - const successCount = 4 - - tests := []struct { - name string - count int - repoFactory func() repositories.UserRepository - progress *commands.ProgressIndicator - validate func(t *testing.T, got []database.User) - wantErr bool - }{ - { - name: "creates users with required fields", - count: successCount, - repoFactory: func() repositories.UserRepository { - base := testutils.NewMockUserRepository() - return newFakeUserRepo(base, 0, nil) - }, - progress: nil, - validate: func(t *testing.T, got []database.User) { - t.Helper() - if len(got) != successCount { - t.Fatalf("expected %d users, got %d", successCount, len(got)) - } - usernames := make(map[string]bool) - for i, user := range got { - if user.Username == "" { - t.Errorf("user %d expected non-empty username", i) - } - if len(user.Username) < 6 || user.Username[:5] != "user_" { - t.Errorf("user %d username should start with 'user_', got %q", i, user.Username) - } - if usernames[user.Username] { - t.Errorf("user %d duplicate username: %q", i, user.Username) - } - usernames[user.Username] = true - - if user.Email == "" { - t.Errorf("user %d expected non-empty email", i) - } - if len(user.Email) < 20 || user.Email[:5] != "user_" || user.Email[len(user.Email)-12:] != "@goyco.local" { - t.Errorf("user %d email should match pattern 'user_*@goyco.local', got %q", i, user.Email) - } - if !user.EmailVerified { - t.Errorf("user %d expected EmailVerified to be true", i) - } - if user.ID == 0 { - t.Errorf("user %d expected non-zero ID", i) - } - if user.Password == "" { - t.Errorf("user %d expected hashed password to be populated", i) - } - if err := bcrypt.CompareHashAndPassword([]byte(user.Password), []byte("password123")); err != nil { - t.Errorf("user %d password not hashed correctly: %v", i, err) - } - if user.CreatedAt.IsZero() { - t.Errorf("user %d expected CreatedAt to be set", i) - } - if user.UpdatedAt.IsZero() { - t.Errorf("user %d expected UpdatedAt to be set", i) - } - } - }, - }, - { - name: "returns error when repository create fails", - count: 3, - repoFactory: func() repositories.UserRepository { - base := testutils.NewMockUserRepository() - return newFakeUserRepo(base, 1, errors.New("create failure")) - }, - progress: nil, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - repo := tt.repoFactory() - p := commands.NewParallelProcessor() - passwordHash, err := bcrypt.GenerateFromPassword([]byte("password123"), bcrypt.DefaultCost) - if err != nil { - t.Fatalf("failed to generate password hash: %v", err) - } - p.SetPasswordHash(string(passwordHash)) - got, gotErr := p.CreateUsersInParallel(repo, tt.count, tt.progress) - if gotErr != nil { - if !tt.wantErr { - t.Errorf("CreateUsersInParallel() failed: %v", gotErr) - } - if got != nil { - t.Error("expected nil result when error occurs") - } - return - } - if tt.wantErr { - t.Fatal("CreateUsersInParallel() succeeded unexpectedly") - } - if tt.validate != nil { - tt.validate(t, got) - } - }) - } -} - -type fakeUserRepo struct { - repositories.UserRepository - mu sync.Mutex - failAt int - err error - calls int -} - -func newFakeUserRepo(base repositories.UserRepository, failAt int, err error) *fakeUserRepo { - return &fakeUserRepo{ - UserRepository: base, - failAt: failAt, - err: err, - } -} - -func (r *fakeUserRepo) Create(user *database.User) error { - r.mu.Lock() - defer r.mu.Unlock() - r.calls++ - if r.failAt > 0 && r.calls >= r.failAt { - return r.err - } - return r.UserRepository.Create(user) -}