diff options
-rw-r--r-- | internal/options/options.go | 28 | ||||
-rw-r--r-- | main.go | 84 |
2 files changed, 35 insertions, 77 deletions
diff --git a/internal/options/options.go b/internal/options/options.go index 9a9a38c..e98d6eb 100644 --- a/internal/options/options.go +++ b/internal/options/options.go @@ -6,35 +6,21 @@ import ( ) type Options struct { - // Input file - InputFile string - MaxFileBuffer int - StartLineInclusive int64 - EndLineExclusive int64 - - // Website visits - NumWorkers int - Timeout time.Duration - MaxResponse int64 - - // Health and metrics - MetricsInterval time.Duration + InputFile string + NumWorkers int Limit int + MaxResponse int64 + Timeout time.Duration + MetricsInterval time.Duration } func Parse() (opts Options) { flag.StringVar(&opts.InputFile, "i", "", "input file, one domain name per line") - flag.IntVar(&opts.MaxFileBuffer, "b", 512, "max bytes to read from input file at once in MiB") - flag.Int64Var(&opts.StartLineInclusive, "s", 0, "first line to read in input file, inclusive and zero-based index") - flag.Int64Var(&opts.EndLineExclusive, "e", 0, "last line to read in input file, exclusive and zero-based; 0 to disable") - flag.IntVar(&opts.NumWorkers, "w", 32, "number of parallel workers") - flag.DurationVar(&opts.Timeout, "t", 10*time.Second, "timeout for each website visit") + flag.IntVar(&opts.Limit, "l", 16, "rate-limit that kicks in before handing out work in requests/s") flag.Int64Var(&opts.MaxResponse, "r", 128, "max response body size to accept in MiB") - + flag.DurationVar(&opts.Timeout, "t", 10*time.Second, "timeout for each website visit") flag.DurationVar(&opts.MetricsInterval, "m", 5*time.Second, "how often to emit metrics") - flag.IntVar(&opts.Limit, "l", 16, "rate-limit that kicks in before handing out work in requests/s") - flag.Parse() return } @@ -86,13 +86,9 @@ func main() { }() log.Printf("INFO: generating work\n") - nextLine, readAll := workGenerator(ctx, opts, fp, questionCh) - if !readAll { - notice := fmt.Sprintf("only read up until line %d", nextLine) - if opts.StartLineInclusive != 0 { - notice += fmt.Sprintf(" (line %d relative to start)", nextLine-opts.StartLineInclusive) - } - log.Printf("NOTICE: %s\n", notice) + workGenerator(ctx, opts, fp, questionCh) + if err := ctx.Err(); err != nil { + log.Printf("WARNING: premature shutdown (context cancelled)\n") } } @@ -105,6 +101,7 @@ func awaitHandler(ctx context.Context, cancel context.CancelFunc) { case <-sigs: case <-ctx.Done(): } + cancel() } @@ -182,7 +179,7 @@ func workAggregator(ctx context.Context, opts options.Options, answerCh chan qna } } -func workGenerator(ctx context.Context, opts options.Options, fp *os.File, questionCh chan qna.Question) (int64, bool) { +func workGenerator(ctx context.Context, opts options.Options, fp *os.File, questionCh chan qna.Question) { var wg sync.WaitGroup defer wg.Wait() @@ -193,7 +190,7 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest go func() { wg.Add(1) defer wg.Done() - rateMetrics(cctx, opts, &nextLine) + generatorMetrics(cctx, opts, &nextLine) // // Would be nice to clean this up so that the Line type with a @@ -201,48 +198,29 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest // }() - scanner := bufio.NewScanner(fp) - buf := make([]byte, 0, opts.MaxFileBuffer*1024*1024) - scanner.Buffer(buf, opts.MaxFileBuffer*1024*1024) - if opts.StartLineInclusive > nextLine.Num() { - for scanner.Scan() { - nextLine.Inc() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + numRequests := 0 - select { - case <-ctx.Done(): - return nextLine.Num(), false - default: - } - - if nextLine.Num() == opts.StartLineInclusive { - break - } - } - } - - limit := time.NewTicker(time.Second) - defer limit.Stop() - readCount := 0 + scanner := bufio.NewScanner(fp) + buf := make([]byte, 0, 128*1024*1024) + scanner.Buffer(buf, 128*1024*1024) for scanner.Scan() { - if opts.EndLineExclusive > 0 && nextLine.Num() == opts.EndLineExclusive { - break - } - - if readCount == opts.Limit { + if numRequests == opts.Limit { select { case <-ctx.Done(): - return nextLine.Num(), false - case <-limit.C: - readCount = 0 + return + case <-ticker.C: + numRequests = 0 } } select { case <-ctx.Done(): - return nextLine.Num(), false + return case questionCh <- qna.Question{Domain: trimWildcard(scanner.Text())}: nextLine.Inc() - readCount++ + numRequests++ } } @@ -250,35 +228,29 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest case <-ctx.Done(): case <-time.After(opts.Timeout + time.Second): } - return nextLine.Num(), true } -func rateMetrics(ctx context.Context, opts options.Options, nextLine *line.Line) { - startTime := time.Now().Unix() - latestTime := startTime - latestCount := opts.StartLineInclusive - +func generatorMetrics(ctx context.Context, opts options.Options, nextLine *line.Line) { metrics := time.NewTicker(opts.MetricsInterval) defer metrics.Stop() + startTime := time.Now().Unix() + prevTime := startTime + prevLine := int64(0) for { select { case <-ctx.Done(): return case <-metrics.C: - nextLineNum := nextLine.Num() + currLine := nextLine.Num() now := time.Now().Unix() - - currRate := float64(nextLineNum-latestCount) / float64(now-latestTime) - avgRate := float64(nextLineNum-opts.StartLineInclusive) / float64(now-startTime) - - str := fmt.Sprintf(" Current rate: %.1f sites/s\n", currRate) - str += fmt.Sprintf(" Average rate: %.1f sites/s\n", avgRate) - str += fmt.Sprintf(" Next line: %d\n", nextLineNum) + str := fmt.Sprintf(" Current rate: %.1f sites/s\n", float64(currLine-prevLine)/float64(now-prevTime)) + str += fmt.Sprintf(" Average rate: %.1f sites/s\n", float64(currLine)/float64(now-startTime)) + str += fmt.Sprintf(" Next line: %d", currLine) log.Printf("INFO: metrics@generator:\n\n%s\n\n", str) - latestCount = nextLineNum - latestTime = now + prevTime = now + prevLine = currLine } } } |