From 89e2b5f0bbbf9b2e3e6bc7f30a12e28e7109d82b Mon Sep 17 00:00:00 2001 From: Rasmus Dahlberg Date: Sun, 2 Apr 2023 13:47:57 +0200 Subject: Fix bug where a few input lines are skipped --- internal/line/line.go | 22 +++++++++++++ main.go | 86 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 78 insertions(+), 30 deletions(-) create mode 100644 internal/line/line.go diff --git a/internal/line/line.go b/internal/line/line.go new file mode 100644 index 0000000..02dbbcc --- /dev/null +++ b/internal/line/line.go @@ -0,0 +1,22 @@ +package line + +import "sync" + +type Line struct { + sync.Mutex + num int64 +} + +func (l *Line) Inc() { + l.Lock() + defer l.Unlock() + + l.num++ +} + +func (l *Line) Num() int64 { + l.Lock() + defer l.Unlock() + + return l.num +} diff --git a/main.go b/main.go index dfed7eb..3458e4d 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( "syscall" "time" + "git.cs.kau.se/rasmoste/onion-grab/internal/line" "git.cs.kau.se/rasmoste/onion-grab/internal/onionloc" "git.cs.kau.se/rasmoste/onion-grab/internal/options" "git.cs.kau.se/rasmoste/onion-grab/internal/qna" @@ -182,48 +183,55 @@ func workAggregator(ctx context.Context, opts options.Options, answerCh chan qna } func workGenerator(ctx context.Context, opts options.Options, fp *os.File, questionCh chan qna.Question) (int64, bool) { + var wg sync.WaitGroup + defer wg.Wait() + + cctx, cancel := context.WithCancel(ctx) + defer cancel() + + var nextLine line.Line + go func() { + wg.Add(1) + defer wg.Done() + rateMetrics(cctx, opts, &nextLine) + + // + // Would be nice to clean this up so that the Line type with a + // mutex can be eliminated; and with all metrics in one place. + // + }() + scanner := bufio.NewScanner(fp) buf := make([]byte, 0, opts.MaxFileBuffer*1024*1024) scanner.Buffer(buf, opts.MaxFileBuffer*1024*1024) - - // roll-up to the requested start line - nextLine := int64(0) - if opts.StartLineInclusive > nextLine { + if opts.StartLineInclusive > nextLine.Num() { for scanner.Scan() { - nextLine++ + nextLine.Inc() + select { case <-ctx.Done(): - return nextLine, false + return nextLine.Num(), false default: } - if nextLine == opts.StartLineInclusive { + if nextLine.Num() == opts.StartLineInclusive { break } } } - // initialize metrics - metrics := time.NewTicker(opts.MetricsInterval) - defer metrics.Stop() - startTime := time.Now().Unix() - latestTime := startTime - latestCount := opts.StartLineInclusive - - // initialize rate-limit limit := time.NewTicker(time.Second) defer limit.Stop() readCount := 0 - for scanner.Scan() { - if opts.EndLineExclusive > 0 && nextLine == opts.EndLineExclusive { + if opts.EndLineExclusive > 0 && nextLine.Num() == opts.EndLineExclusive { break } if readCount == opts.Limit { select { case <-ctx.Done(): - return nextLine, false + return nextLine.Num(), false case <-limit.C: readCount = 0 } @@ -231,30 +239,48 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest select { case <-ctx.Done(): - return nextLine, false + return nextLine.Num(), false case questionCh <- qna.Question{Domain: trimWildcard(scanner.Text())}: - nextLine++ + nextLine.Inc() readCount++ + } + } + + select { + case <-ctx.Done(): + case <-time.After(opts.Timeout + time.Second): + } + return nextLine.Num(), true +} + +func rateMetrics(ctx context.Context, opts options.Options, nextLine *line.Line) { + startTime := time.Now().Unix() + latestTime := startTime + latestCount := opts.StartLineInclusive + + metrics := time.NewTicker(opts.MetricsInterval) + defer metrics.Stop() + + for { + select { + case <-ctx.Done(): + return case <-metrics.C: + nextLineNum := nextLine.Num() now := time.Now().Unix() - currRate := float64(nextLine-latestCount) / float64(now-latestTime) - avgRate := float64(nextLine-opts.StartLineInclusive) / float64(now-startTime) + + currRate := float64(nextLineNum-latestCount) / float64(now-latestTime) + avgRate := float64(nextLineNum-opts.StartLineInclusive) / float64(now-startTime) str := fmt.Sprintf(" Current rate: %.1f sites/s\n", currRate) str += fmt.Sprintf(" Average rate: %.1f sites/s\n", avgRate) - str += fmt.Sprintf(" Next line: %d\n", nextLine) + str += fmt.Sprintf(" Next line: %d\n", nextLineNum) log.Printf("INFO: metrics@generator:\n\n%s\n\n", str) - latestCount = nextLine + latestCount = nextLineNum latestTime = now } } - - select { - case <-ctx.Done(): - case <-time.After(opts.Timeout + time.Second): - } - return nextLine, true } func trimWildcard(san string) string { -- cgit v1.2.3