aboutsummaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go86
1 files changed, 56 insertions, 30 deletions
diff --git a/main.go b/main.go
index dfed7eb..3458e4d 100644
--- a/main.go
+++ b/main.go
@@ -22,6 +22,7 @@ import (
"syscall"
"time"
+ "git.cs.kau.se/rasmoste/onion-grab/internal/line"
"git.cs.kau.se/rasmoste/onion-grab/internal/onionloc"
"git.cs.kau.se/rasmoste/onion-grab/internal/options"
"git.cs.kau.se/rasmoste/onion-grab/internal/qna"
@@ -182,48 +183,55 @@ func workAggregator(ctx context.Context, opts options.Options, answerCh chan qna
}
func workGenerator(ctx context.Context, opts options.Options, fp *os.File, questionCh chan qna.Question) (int64, bool) {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ cctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ var nextLine line.Line
+ go func() {
+ wg.Add(1)
+ defer wg.Done()
+ rateMetrics(cctx, opts, &nextLine)
+
+ //
+ // Would be nice to clean this up so that the Line type with a
+ // mutex can be eliminated; and with all metrics in one place.
+ //
+ }()
+
scanner := bufio.NewScanner(fp)
buf := make([]byte, 0, opts.MaxFileBuffer*1024*1024)
scanner.Buffer(buf, opts.MaxFileBuffer*1024*1024)
-
- // roll-up to the requested start line
- nextLine := int64(0)
- if opts.StartLineInclusive > nextLine {
+ if opts.StartLineInclusive > nextLine.Num() {
for scanner.Scan() {
- nextLine++
+ nextLine.Inc()
+
select {
case <-ctx.Done():
- return nextLine, false
+ return nextLine.Num(), false
default:
}
- if nextLine == opts.StartLineInclusive {
+ if nextLine.Num() == opts.StartLineInclusive {
break
}
}
}
- // initialize metrics
- metrics := time.NewTicker(opts.MetricsInterval)
- defer metrics.Stop()
- startTime := time.Now().Unix()
- latestTime := startTime
- latestCount := opts.StartLineInclusive
-
- // initialize rate-limit
limit := time.NewTicker(time.Second)
defer limit.Stop()
readCount := 0
-
for scanner.Scan() {
- if opts.EndLineExclusive > 0 && nextLine == opts.EndLineExclusive {
+ if opts.EndLineExclusive > 0 && nextLine.Num() == opts.EndLineExclusive {
break
}
if readCount == opts.Limit {
select {
case <-ctx.Done():
- return nextLine, false
+ return nextLine.Num(), false
case <-limit.C:
readCount = 0
}
@@ -231,30 +239,48 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest
select {
case <-ctx.Done():
- return nextLine, false
+ return nextLine.Num(), false
case questionCh <- qna.Question{Domain: trimWildcard(scanner.Text())}:
- nextLine++
+ nextLine.Inc()
readCount++
+ }
+ }
+
+ select {
+ case <-ctx.Done():
+ case <-time.After(opts.Timeout + time.Second):
+ }
+ return nextLine.Num(), true
+}
+
+func rateMetrics(ctx context.Context, opts options.Options, nextLine *line.Line) {
+ startTime := time.Now().Unix()
+ latestTime := startTime
+ latestCount := opts.StartLineInclusive
+
+ metrics := time.NewTicker(opts.MetricsInterval)
+ defer metrics.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
case <-metrics.C:
+ nextLineNum := nextLine.Num()
now := time.Now().Unix()
- currRate := float64(nextLine-latestCount) / float64(now-latestTime)
- avgRate := float64(nextLine-opts.StartLineInclusive) / float64(now-startTime)
+
+ currRate := float64(nextLineNum-latestCount) / float64(now-latestTime)
+ avgRate := float64(nextLineNum-opts.StartLineInclusive) / float64(now-startTime)
str := fmt.Sprintf(" Current rate: %.1f sites/s\n", currRate)
str += fmt.Sprintf(" Average rate: %.1f sites/s\n", avgRate)
- str += fmt.Sprintf(" Next line: %d\n", nextLine)
+ str += fmt.Sprintf(" Next line: %d\n", nextLineNum)
log.Printf("INFO: metrics@generator:\n\n%s\n\n", str)
- latestCount = nextLine
+ latestCount = nextLineNum
latestTime = now
}
}
-
- select {
- case <-ctx.Done():
- case <-time.After(opts.Timeout + time.Second):
- }
- return nextLine, true
}
func trimWildcard(san string) string {