From 4a36fbdc9972c5b6101d6f3bc9bd0a77c8c83fd4 Mon Sep 17 00:00:00 2001 From: Rasmus Dahlberg Date: Thu, 30 Mar 2023 16:28:19 +0200 Subject: Add input file read rate-limit --- internal/options/options.go | 2 ++ main.go | 19 ++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/internal/options/options.go b/internal/options/options.go index d01ba83..9e1a653 100644 --- a/internal/options/options.go +++ b/internal/options/options.go @@ -21,6 +21,7 @@ type Options struct { CheckerDomain string CheckerInterval time.Duration MetricsInterval time.Duration + Limit int } func Parse() (opts Options) { @@ -36,6 +37,7 @@ func Parse() (opts Options) { flag.StringVar(&opts.CheckerDomain, "c", "", "domain with onion location for setup santity-checks") flag.DurationVar(&opts.CheckerInterval, "C", 10*time.Second, "how often to to run checker") flag.DurationVar(&opts.MetricsInterval, "m", 5*time.Second, "how often to emit metrics") + flag.IntVar(&opts.Limit, "l", 10, "rate-limit that kicks in before feeding workers in reads/s") flag.Parse() return diff --git a/main.go b/main.go index 55e7989..072924a 100644 --- a/main.go +++ b/main.go @@ -217,6 +217,7 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest buf := make([]byte, 0, opts.MaxFileBuffer*1024*1024) scanner.Buffer(buf, opts.MaxFileBuffer*1024*1024) + // roll-up to the requested start line nextLine := int64(0) if opts.StartLineInclusive > nextLine { for scanner.Scan() { @@ -233,17 +234,32 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest } } + // initialize ticker ticker := time.NewTicker(opts.MetricsInterval) defer ticker.Stop() - startTime := time.Now().Unix() latestTime := startTime latestCount := opts.StartLineInclusive + + // initialize rate-limit + limit := time.NewTicker(time.Second) + defer limit.Stop() + readCount := 0 + for scanner.Scan() { if opts.EndLineExclusive > 0 && nextLine == opts.EndLineExclusive { break } + if readCount == opts.Limit { + select { + case <-ctx.Done(): + return nextLine, false + case <-limit.C: + readCount = 0 + } + } + select { case <-ctx.Done(): return nextLine, false @@ -259,6 +275,7 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest latestTime = now case questionCh <- qna.Question{Domain: scanner.Text()}: nextLine++ + readCount++ } } -- cgit v1.2.3