aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/options/options.go28
-rw-r--r--main.go84
2 files changed, 35 insertions, 77 deletions
diff --git a/internal/options/options.go b/internal/options/options.go
index 9a9a38c..e98d6eb 100644
--- a/internal/options/options.go
+++ b/internal/options/options.go
@@ -6,35 +6,21 @@ import (
)
type Options struct {
- // Input file
- InputFile string
- MaxFileBuffer int
- StartLineInclusive int64
- EndLineExclusive int64
-
- // Website visits
- NumWorkers int
- Timeout time.Duration
- MaxResponse int64
-
- // Health and metrics
- MetricsInterval time.Duration
+ InputFile string
+ NumWorkers int
Limit int
+ MaxResponse int64
+ Timeout time.Duration
+ MetricsInterval time.Duration
}
func Parse() (opts Options) {
flag.StringVar(&opts.InputFile, "i", "", "input file, one domain name per line")
- flag.IntVar(&opts.MaxFileBuffer, "b", 512, "max bytes to read from input file at once in MiB")
- flag.Int64Var(&opts.StartLineInclusive, "s", 0, "first line to read in input file, inclusive and zero-based index")
- flag.Int64Var(&opts.EndLineExclusive, "e", 0, "last line to read in input file, exclusive and zero-based; 0 to disable")
-
flag.IntVar(&opts.NumWorkers, "w", 32, "number of parallel workers")
- flag.DurationVar(&opts.Timeout, "t", 10*time.Second, "timeout for each website visit")
+ flag.IntVar(&opts.Limit, "l", 16, "rate-limit that kicks in before handing out work in requests/s")
flag.Int64Var(&opts.MaxResponse, "r", 128, "max response body size to accept in MiB")
-
+ flag.DurationVar(&opts.Timeout, "t", 10*time.Second, "timeout for each website visit")
flag.DurationVar(&opts.MetricsInterval, "m", 5*time.Second, "how often to emit metrics")
- flag.IntVar(&opts.Limit, "l", 16, "rate-limit that kicks in before handing out work in requests/s")
-
flag.Parse()
return
}
diff --git a/main.go b/main.go
index 3458e4d..79dde96 100644
--- a/main.go
+++ b/main.go
@@ -86,13 +86,9 @@ func main() {
}()
log.Printf("INFO: generating work\n")
- nextLine, readAll := workGenerator(ctx, opts, fp, questionCh)
- if !readAll {
- notice := fmt.Sprintf("only read up until line %d", nextLine)
- if opts.StartLineInclusive != 0 {
- notice += fmt.Sprintf(" (line %d relative to start)", nextLine-opts.StartLineInclusive)
- }
- log.Printf("NOTICE: %s\n", notice)
+ workGenerator(ctx, opts, fp, questionCh)
+ if err := ctx.Err(); err != nil {
+ log.Printf("WARNING: premature shutdown (context cancelled)\n")
}
}
@@ -105,6 +101,7 @@ func awaitHandler(ctx context.Context, cancel context.CancelFunc) {
case <-sigs:
case <-ctx.Done():
}
+
cancel()
}
@@ -182,7 +179,7 @@ func workAggregator(ctx context.Context, opts options.Options, answerCh chan qna
}
}
-func workGenerator(ctx context.Context, opts options.Options, fp *os.File, questionCh chan qna.Question) (int64, bool) {
+func workGenerator(ctx context.Context, opts options.Options, fp *os.File, questionCh chan qna.Question) {
var wg sync.WaitGroup
defer wg.Wait()
@@ -193,7 +190,7 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest
go func() {
wg.Add(1)
defer wg.Done()
- rateMetrics(cctx, opts, &nextLine)
+ generatorMetrics(cctx, opts, &nextLine)
//
// Would be nice to clean this up so that the Line type with a
@@ -201,48 +198,29 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest
//
}()
- scanner := bufio.NewScanner(fp)
- buf := make([]byte, 0, opts.MaxFileBuffer*1024*1024)
- scanner.Buffer(buf, opts.MaxFileBuffer*1024*1024)
- if opts.StartLineInclusive > nextLine.Num() {
- for scanner.Scan() {
- nextLine.Inc()
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
+ numRequests := 0
- select {
- case <-ctx.Done():
- return nextLine.Num(), false
- default:
- }
-
- if nextLine.Num() == opts.StartLineInclusive {
- break
- }
- }
- }
-
- limit := time.NewTicker(time.Second)
- defer limit.Stop()
- readCount := 0
+ scanner := bufio.NewScanner(fp)
+ buf := make([]byte, 0, 128*1024*1024)
+ scanner.Buffer(buf, 128*1024*1024)
for scanner.Scan() {
- if opts.EndLineExclusive > 0 && nextLine.Num() == opts.EndLineExclusive {
- break
- }
-
- if readCount == opts.Limit {
+ if numRequests == opts.Limit {
select {
case <-ctx.Done():
- return nextLine.Num(), false
- case <-limit.C:
- readCount = 0
+ return
+ case <-ticker.C:
+ numRequests = 0
}
}
select {
case <-ctx.Done():
- return nextLine.Num(), false
+ return
case questionCh <- qna.Question{Domain: trimWildcard(scanner.Text())}:
nextLine.Inc()
- readCount++
+ numRequests++
}
}
@@ -250,35 +228,29 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest
case <-ctx.Done():
case <-time.After(opts.Timeout + time.Second):
}
- return nextLine.Num(), true
}
-func rateMetrics(ctx context.Context, opts options.Options, nextLine *line.Line) {
- startTime := time.Now().Unix()
- latestTime := startTime
- latestCount := opts.StartLineInclusive
-
+func generatorMetrics(ctx context.Context, opts options.Options, nextLine *line.Line) {
metrics := time.NewTicker(opts.MetricsInterval)
defer metrics.Stop()
+ startTime := time.Now().Unix()
+ prevTime := startTime
+ prevLine := int64(0)
for {
select {
case <-ctx.Done():
return
case <-metrics.C:
- nextLineNum := nextLine.Num()
+ currLine := nextLine.Num()
now := time.Now().Unix()
-
- currRate := float64(nextLineNum-latestCount) / float64(now-latestTime)
- avgRate := float64(nextLineNum-opts.StartLineInclusive) / float64(now-startTime)
-
- str := fmt.Sprintf(" Current rate: %.1f sites/s\n", currRate)
- str += fmt.Sprintf(" Average rate: %.1f sites/s\n", avgRate)
- str += fmt.Sprintf(" Next line: %d\n", nextLineNum)
+ str := fmt.Sprintf(" Current rate: %.1f sites/s\n", float64(currLine-prevLine)/float64(now-prevTime))
+ str += fmt.Sprintf(" Average rate: %.1f sites/s\n", float64(currLine)/float64(now-startTime))
+ str += fmt.Sprintf(" Next line: %d", currLine)
log.Printf("INFO: metrics@generator:\n\n%s\n\n", str)
- latestCount = nextLineNum
- latestTime = now
+ prevTime = now
+ prevLine = currLine
}
}
}