aboutsummaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go84
1 files changed, 28 insertions, 56 deletions
diff --git a/main.go b/main.go
index 3458e4d..79dde96 100644
--- a/main.go
+++ b/main.go
@@ -86,13 +86,9 @@ func main() {
}()
log.Printf("INFO: generating work\n")
- nextLine, readAll := workGenerator(ctx, opts, fp, questionCh)
- if !readAll {
- notice := fmt.Sprintf("only read up until line %d", nextLine)
- if opts.StartLineInclusive != 0 {
- notice += fmt.Sprintf(" (line %d relative to start)", nextLine-opts.StartLineInclusive)
- }
- log.Printf("NOTICE: %s\n", notice)
+ workGenerator(ctx, opts, fp, questionCh)
+ if err := ctx.Err(); err != nil {
+ log.Printf("WARNING: premature shutdown (context cancelled)\n")
}
}
@@ -105,6 +101,7 @@ func awaitHandler(ctx context.Context, cancel context.CancelFunc) {
case <-sigs:
case <-ctx.Done():
}
+
cancel()
}
@@ -182,7 +179,7 @@ func workAggregator(ctx context.Context, opts options.Options, answerCh chan qna
}
}
-func workGenerator(ctx context.Context, opts options.Options, fp *os.File, questionCh chan qna.Question) (int64, bool) {
+func workGenerator(ctx context.Context, opts options.Options, fp *os.File, questionCh chan qna.Question) {
var wg sync.WaitGroup
defer wg.Wait()
@@ -193,7 +190,7 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest
go func() {
wg.Add(1)
defer wg.Done()
- rateMetrics(cctx, opts, &nextLine)
+ generatorMetrics(cctx, opts, &nextLine)
//
// Would be nice to clean this up so that the Line type with a
@@ -201,48 +198,29 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest
//
}()
- scanner := bufio.NewScanner(fp)
- buf := make([]byte, 0, opts.MaxFileBuffer*1024*1024)
- scanner.Buffer(buf, opts.MaxFileBuffer*1024*1024)
- if opts.StartLineInclusive > nextLine.Num() {
- for scanner.Scan() {
- nextLine.Inc()
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
+ numRequests := 0
- select {
- case <-ctx.Done():
- return nextLine.Num(), false
- default:
- }
-
- if nextLine.Num() == opts.StartLineInclusive {
- break
- }
- }
- }
-
- limit := time.NewTicker(time.Second)
- defer limit.Stop()
- readCount := 0
+ scanner := bufio.NewScanner(fp)
+ buf := make([]byte, 0, 128*1024*1024)
+ scanner.Buffer(buf, 128*1024*1024)
for scanner.Scan() {
- if opts.EndLineExclusive > 0 && nextLine.Num() == opts.EndLineExclusive {
- break
- }
-
- if readCount == opts.Limit {
+ if numRequests == opts.Limit {
select {
case <-ctx.Done():
- return nextLine.Num(), false
- case <-limit.C:
- readCount = 0
+ return
+ case <-ticker.C:
+ numRequests = 0
}
}
select {
case <-ctx.Done():
- return nextLine.Num(), false
+ return
case questionCh <- qna.Question{Domain: trimWildcard(scanner.Text())}:
nextLine.Inc()
- readCount++
+ numRequests++
}
}
@@ -250,35 +228,29 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest
case <-ctx.Done():
case <-time.After(opts.Timeout + time.Second):
}
- return nextLine.Num(), true
}
-func rateMetrics(ctx context.Context, opts options.Options, nextLine *line.Line) {
- startTime := time.Now().Unix()
- latestTime := startTime
- latestCount := opts.StartLineInclusive
-
+func generatorMetrics(ctx context.Context, opts options.Options, nextLine *line.Line) {
metrics := time.NewTicker(opts.MetricsInterval)
defer metrics.Stop()
+ startTime := time.Now().Unix()
+ prevTime := startTime
+ prevLine := int64(0)
for {
select {
case <-ctx.Done():
return
case <-metrics.C:
- nextLineNum := nextLine.Num()
+ currLine := nextLine.Num()
now := time.Now().Unix()
-
- currRate := float64(nextLineNum-latestCount) / float64(now-latestTime)
- avgRate := float64(nextLineNum-opts.StartLineInclusive) / float64(now-startTime)
-
- str := fmt.Sprintf(" Current rate: %.1f sites/s\n", currRate)
- str += fmt.Sprintf(" Average rate: %.1f sites/s\n", avgRate)
- str += fmt.Sprintf(" Next line: %d\n", nextLineNum)
+ str := fmt.Sprintf(" Current rate: %.1f sites/s\n", float64(currLine-prevLine)/float64(now-prevTime))
+ str += fmt.Sprintf(" Average rate: %.1f sites/s\n", float64(currLine)/float64(now-startTime))
+ str += fmt.Sprintf(" Next line: %d", currLine)
log.Printf("INFO: metrics@generator:\n\n%s\n\n", str)
- latestCount = nextLineNum
- latestTime = now
+ prevTime = now
+ prevLine = currLine
}
}
}