From 65576cc6238679b7673952d28e89702c3aaeec48 Mon Sep 17 00:00:00 2001 From: Kharec Date: Fri, 21 Nov 2025 16:16:35 +0100 Subject: [PATCH] feat: keep seeding fast and predictable even when parallelized --- cmd/goyco/commands/parallel_processor.go | 122 +++++++++++++++++------ cmd/goyco/commands/seed.go | 58 ++++++++--- 2 files changed, 133 insertions(+), 47 deletions(-) diff --git a/cmd/goyco/commands/parallel_processor.go b/cmd/goyco/commands/parallel_processor.go index 0ab6f90..3243afb 100644 --- a/cmd/goyco/commands/parallel_processor.go +++ b/cmd/goyco/commands/parallel_processor.go @@ -2,9 +2,9 @@ package commands import ( "context" - "crypto/rand" + cryptoRand "crypto/rand" "fmt" - "math/big" + "math/rand" "runtime" "sync" "time" @@ -17,14 +17,24 @@ type ParallelProcessor struct { maxWorkers int timeout time.Duration passwordHash string + randSource *rand.Rand + randMu sync.Mutex } func NewParallelProcessor() *ParallelProcessor { maxWorkers := max(min(runtime.NumCPU(), 8), 2) + seed := time.Now().UnixNano() + seedBytes := make([]byte, 8) + if _, err := cryptoRand.Read(seedBytes); err == nil { + seed = int64(seedBytes[0])<<56 | int64(seedBytes[1])<<48 | int64(seedBytes[2])<<40 | int64(seedBytes[3])<<32 | + int64(seedBytes[4])<<24 | int64(seedBytes[5])<<16 | int64(seedBytes[6])<<8 | int64(seedBytes[7]) + } + return &ParallelProcessor{ maxWorkers: maxWorkers, - timeout: 30 * time.Second, + timeout: 60 * time.Second, + randSource: rand.New(rand.NewSource(seed)), } } @@ -73,8 +83,21 @@ func (p *ParallelProcessor) CreateUsersInParallel(userRepo repositories.UserRepo users := make([]database.User, count) completed := 0 + firstError := make(chan error, 1) - for { + go func() { + for err := range errors { + if err != nil { + select { + case firstError <- err: + default: + } + return + } + } + }() + + for completed < count { select { case result, ok := <-results: if !ok { @@ -85,14 +108,14 @@ func (p *ParallelProcessor) CreateUsersInParallel(userRepo repositories.UserRepo if progress != nil { progress.Update(completed) } - case err := <-errors: - if err != nil { - return nil, err - } + case err := <-firstError: + return nil, err case <-ctx.Done(): return nil, fmt.Errorf("timeout creating users: %w", ctx.Err()) } } + + return users, nil } func (p *ParallelProcessor) CreatePostsInParallel(postRepo repositories.PostRepository, authorID uint, count int, progress *ProgressIndicator) ([]database.Post, error) { @@ -136,8 +159,21 @@ func (p *ParallelProcessor) CreatePostsInParallel(postRepo repositories.PostRepo posts := make([]database.Post, count) completed := 0 + firstError := make(chan error, 1) - for { + go func() { + for err := range errors { + if err != nil { + select { + case firstError <- err: + default: + } + return + } + } + }() + + for completed < count { select { case result, ok := <-results: if !ok { @@ -148,14 +184,14 @@ func (p *ParallelProcessor) CreatePostsInParallel(postRepo repositories.PostRepo if progress != nil { progress.Update(completed) } - case err := <-errors: - if err != nil { - return nil, err - } + case err := <-firstError: + return nil, err case <-ctx.Done(): return nil, fmt.Errorf("timeout creating posts: %w", ctx.Err()) } } + + return posts, nil } func (p *ParallelProcessor) CreateVotesInParallel(voteRepo repositories.VoteRepository, users []database.User, posts []database.Post, avgVotesPerPost int, progress *ProgressIndicator) (int, error) { @@ -199,8 +235,21 @@ func (p *ParallelProcessor) CreateVotesInParallel(voteRepo repositories.VoteRepo totalVotes := 0 completed := 0 + firstError := make(chan error, 1) - for { + go func() { + for err := range errors { + if err != nil { + select { + case firstError <- err: + default: + } + return + } + } + }() + + for completed < len(posts) { select { case result, ok := <-results: if !ok { @@ -211,14 +260,14 @@ func (p *ParallelProcessor) CreateVotesInParallel(voteRepo repositories.VoteRepo if progress != nil { progress.Update(completed) } - case err := <-errors: - if err != nil { - return 0, err - } + case err := <-firstError: + return 0, err case <-ctx.Done(): return 0, fmt.Errorf("timeout creating votes: %w", ctx.Err()) } } + + return totalVotes, nil } func (p *ParallelProcessor) UpdatePostScoresInParallel(postRepo repositories.PostRepository, voteRepo repositories.VoteRepository, posts []database.Post, progress *ProgressIndicator) error { @@ -284,19 +333,20 @@ type voteResult struct { index int } -func generateRandomIdentifier() string { +func (p *ParallelProcessor) generateRandomIdentifier() string { const length = 12 const chars = "abcdefghijklmnopqrstuvwxyz0123456789" identifier := make([]byte, length) + p.randMu.Lock() for i := range identifier { - num, _ := rand.Int(rand.Reader, big.NewInt(int64(len(chars)))) - identifier[i] = chars[num.Int64()] + identifier[i] = chars[p.randSource.Intn(len(chars))] } + p.randMu.Unlock() return string(identifier) } func (p *ParallelProcessor) createSingleUser(userRepo repositories.UserRepository, index int) (database.User, error) { - randomID := generateRandomIdentifier() + randomID := p.generateRandomIdentifier() username := fmt.Sprintf("user_%s", randomID) email := fmt.Sprintf("user_%s@goyco.local", randomID) @@ -315,7 +365,7 @@ func (p *ParallelProcessor) createSingleUser(userRepo repositories.UserRepositor } if err := userRepo.Create(user); err != nil { - randomID = generateRandomIdentifier() + randomID = p.generateRandomIdentifier() username = fmt.Sprintf("user_%s", randomID) email = fmt.Sprintf("user_%s@goyco.local", randomID) continue @@ -370,7 +420,7 @@ func (p *ParallelProcessor) createSinglePost(postRepo repositories.PostRepositor } domain := sampleDomains[index%len(sampleDomains)] - randomID := generateRandomIdentifier() + randomID := p.generateRandomIdentifier() path := fmt.Sprintf("/article/%s", randomID) url := fmt.Sprintf("https://%s%s", domain, path) @@ -389,7 +439,7 @@ func (p *ParallelProcessor) createSinglePost(postRepo repositories.PostRepositor } if err := postRepo.Create(post); err != nil { - randomID = generateRandomIdentifier() + randomID = p.generateRandomIdentifier() path = fmt.Sprintf("/article/%s", randomID) url = fmt.Sprintf("https://%s%s", domain, path) continue @@ -402,31 +452,37 @@ func (p *ParallelProcessor) createSinglePost(postRepo repositories.PostRepositor } func (p *ParallelProcessor) createVotesForPost(voteRepo repositories.VoteRepository, users []database.User, post database.Post, avgVotesPerPost int) (int, error) { - voteCount, _ := rand.Int(rand.Reader, big.NewInt(int64(avgVotesPerPost*2)+1)) - numVotes := int(voteCount.Int64()) + p.randMu.Lock() + numVotes := p.randSource.Intn(avgVotesPerPost*2 + 1) + p.randMu.Unlock() if numVotes == 0 && avgVotesPerPost > 0 { - chance, _ := rand.Int(rand.Reader, big.NewInt(5)) - if chance.Int64() > 0 { + p.randMu.Lock() + if p.randSource.Intn(5) > 0 { numVotes = 1 } + p.randMu.Unlock() } totalVotes := 0 usedUsers := make(map[uint]bool) for i := 0; i < numVotes && len(usedUsers) < len(users); i++ { - userIdx, _ := rand.Int(rand.Reader, big.NewInt(int64(len(users)))) - user := users[userIdx.Int64()] + p.randMu.Lock() + userIdx := p.randSource.Intn(len(users)) + p.randMu.Unlock() + user := users[userIdx] if usedUsers[user.ID] { continue } usedUsers[user.ID] = true - voteTypeInt, _ := rand.Int(rand.Reader, big.NewInt(10)) + p.randMu.Lock() + voteTypeInt := p.randSource.Intn(10) + p.randMu.Unlock() var voteType database.VoteType - if voteTypeInt.Int64() < 7 { + if voteTypeInt < 7 { voteType = database.VoteUp } else { voteType = database.VoteDown diff --git a/cmd/goyco/commands/seed.go b/cmd/goyco/commands/seed.go index 60f0277..44df0c4 100644 --- a/cmd/goyco/commands/seed.go +++ b/cmd/goyco/commands/seed.go @@ -1,13 +1,15 @@ package commands import ( - "crypto/rand" + cryptoRand "crypto/rand" "errors" "flag" "fmt" - "math/big" + "math/rand" "os" "strings" + "sync" + "time" "goyco/internal/config" "goyco/internal/database" @@ -17,6 +19,34 @@ import ( "gorm.io/gorm" ) +var ( + seedRandSource *rand.Rand + seedRandOnce sync.Once +) + +func initSeedRand() { + seedRandOnce.Do(func() { + seed := time.Now().UnixNano() + seedBytes := make([]byte, 8) + if _, err := cryptoRand.Read(seedBytes); err == nil { + seed = int64(seedBytes[0])<<56 | int64(seedBytes[1])<<48 | int64(seedBytes[2])<<40 | int64(seedBytes[3])<<32 | + int64(seedBytes[4])<<24 | int64(seedBytes[5])<<16 | int64(seedBytes[6])<<8 | int64(seedBytes[7]) + } + seedRandSource = rand.New(rand.NewSource(seed)) + }) +} + +func generateRandomIdentifier() string { + initSeedRand() + const length = 12 + const chars = "abcdefghijklmnopqrstuvwxyz0123456789" + identifier := make([]byte, length) + for i := range identifier { + identifier[i] = chars[seedRandSource.Intn(len(chars))] + } + return string(identifier) +} + func HandleSeedCommand(cfg *config.Config, name string, args []string) error { fs := newFlagSet(name, printSeedUsage) if err := parseCommand(fs, args, name); err != nil { @@ -360,44 +390,44 @@ func createRandomPosts(postRepo repositories.PostRepository, authorID uint, coun } func generateRandomPath() string { - pathLength, _ := rand.Int(rand.Reader, big.NewInt(20)) + initSeedRand() + pathLength := seedRandSource.Intn(20) path := "/article/" - for i := int64(0); i < pathLength.Int64()+5; i++ { - randomChar, _ := rand.Int(rand.Reader, big.NewInt(26)) - path += string(rune('a' + randomChar.Int64())) + for i := 0; i < pathLength+5; i++ { + randomChar := seedRandSource.Intn(26) + path += string(rune('a' + randomChar)) } return path } func createRandomVotes(voteRepo repositories.VoteRepository, users []database.User, posts []database.Post, avgVotesPerPost int) (int, error) { + initSeedRand() totalVotes := 0 for _, post := range posts { - voteCount, _ := rand.Int(rand.Reader, big.NewInt(int64(avgVotesPerPost*2)+1)) - numVotes := int(voteCount.Int64()) + numVotes := seedRandSource.Intn(avgVotesPerPost*2 + 1) if numVotes == 0 && avgVotesPerPost > 0 { - chance, _ := rand.Int(rand.Reader, big.NewInt(5)) - if chance.Int64() > 0 { + if seedRandSource.Intn(5) > 0 { numVotes = 1 } } usedUsers := make(map[uint]bool) for i := 0; i < numVotes && len(usedUsers) < len(users); i++ { - userIdx, _ := rand.Int(rand.Reader, big.NewInt(int64(len(users)))) - user := users[userIdx.Int64()] + userIdx := seedRandSource.Intn(len(users)) + user := users[userIdx] if usedUsers[user.ID] { continue } usedUsers[user.ID] = true - voteTypeInt, _ := rand.Int(rand.Reader, big.NewInt(10)) + voteTypeInt := seedRandSource.Intn(10) var voteType database.VoteType - if voteTypeInt.Int64() < 7 { + if voteTypeInt < 7 { voteType = database.VoteUp } else { voteType = database.VoteDown