From b0a9fe6d718947863058d2a3acb884cabd831381 Mon Sep 17 00:00:00 2001 From: Rasmus Dahlberg Date: Sat, 25 Mar 2023 15:26:33 +0100 Subject: Improve health and metrics --- internal/options/options.go | 4 ++ main.go | 105 +++++++++++++++++++++++++++++++------------- 2 files changed, 78 insertions(+), 31 deletions(-) diff --git a/internal/options/options.go b/internal/options/options.go index c988713..a567b0e 100644 --- a/internal/options/options.go +++ b/internal/options/options.go @@ -17,6 +17,8 @@ type Options struct { MaxResponse int64 // Health and metrics + CheckerDomain string + CheckerInterval time.Duration MetricsInterval time.Duration } @@ -29,6 +31,8 @@ func Parse() (opts Options) { flag.DurationVar(&opts.Timeout, "t", 10*time.Second, "timeout for each website visit") flag.Int64Var(&opts.MaxResponse, "r", 128, "max response body size to accept in MiB") + flag.StringVar(&opts.CheckerDomain, "c", "rgdd.se", "domain with onion location for setup santity-checks") + flag.DurationVar(&opts.CheckerInterval, "C", 10*time.Second, "how often to to run checker") flag.DurationVar(&opts.MetricsInterval, "m", 5*time.Second, "how often to emit metrics") flag.Parse() diff --git a/main.go b/main.go index f04ea17..146a62e 100644 --- a/main.go +++ b/main.go @@ -62,6 +62,15 @@ func main() { await(ctx, cancel) }() + if opts.CheckerDomain != "" { + log.Printf("INFO: starting checker\n") + go func() { + wg.Add(1) + defer wg.Done() + checker(ctx, cancel, &opts, cli) + }() + } + log.Printf("INFO: starting %d workers\n", opts.NumWorkers) for i := 0; i < opts.NumWorkers; i++ { go func() { @@ -101,42 +110,65 @@ func await(ctx context.Context, cancel context.CancelFunc) { cancel() } +func checker(ctx context.Context, cancel context.CancelFunc, opts *options.Options, cli *http.Client) { + question := qna.Question{opts.CheckerDomain} + answerCh := make(chan qna.Answer, 1) + defer close(answerCh) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(opts.CheckerInterval): + work(ctx, opts, cli, question, answerCh) + answer := <-answerCh + if answer.HTTP == "" && answer.HTML == "" { + log.Printf("ERROR: checker expected onion for %+v", answer) + cancel() + return + } + } + } +} + func workHandler(ctx context.Context, opts options.Options, cli *http.Client, questionCh chan qna.Question, answerCh chan qna.Answer) { for { select { case <-ctx.Done(): return case question := <-questionCh: - func() { - cctx, cancel := context.WithTimeout(ctx, opts.Timeout) - defer cancel() + work(ctx, &opts, cli, question, answerCh) + } + } +} - req, err := http.NewRequestWithContext(cctx, http.MethodGet, "https://"+question.Domain, nil) - if err != nil { - return - } +func work(ctx context.Context, opts *options.Options, cli *http.Client, question qna.Question, answerCh chan qna.Answer) { + cctx, cancel := context.WithTimeout(ctx, opts.Timeout) + defer cancel() - answer := qna.Answer{Domain: question.Domain} - rsp, err := cli.Do(req) - if err != nil { - answerCh <- answer - return - } - defer rsp.Body.Close() - answer.OK = true + req, err := http.NewRequestWithContext(cctx, http.MethodGet, "https://"+question.Domain, nil) + if err != nil { + return + } - onion, ok := onionloc.HTTP(rsp) - if ok { - answer.HTTP = onion - } - onion, ok = onionloc.HTML(rsp) - if ok { - answer.HTML = onion - } - answerCh <- answer - }() - } + answer := qna.Answer{Domain: question.Domain} + rsp, err := cli.Do(req) + if err != nil { + answerCh <- answer + return } + defer rsp.Body.Close() + answer.OK = true + + onion, ok := onionloc.HTTP(rsp) + if ok { + answer.HTTP = onion + } + onion, ok = onionloc.HTML(rsp) + if ok { + answer.HTML = onion + } + answerCh <- answer } func workAggregator(ctx context.Context, opts options.Options, answerCh chan qna.Answer) { @@ -144,7 +176,7 @@ func workAggregator(ctx context.Context, opts options.Options, answerCh chan qna numOnions := 0 numAll := 0 output := func() { - log.Printf("INFO: %d/%d connected, %d sites configured Onion-Location\n", numConnect, numAll, numOnions) + log.Printf("SUMMARY: %d/%d connected, %d sites configured Onion-Location\n", numConnect, numAll, numOnions) } handleAnswer := func(a qna.Answer) { @@ -160,8 +192,6 @@ func workAggregator(ctx context.Context, opts options.Options, answerCh chan qna } } - ticker := time.NewTicker(opts.MetricsInterval) - defer ticker.Stop() defer output() for { select { @@ -177,8 +207,6 @@ func workAggregator(ctx context.Context, opts options.Options, answerCh chan qna } case a := <-answerCh: handleAnswer(a) - case <-ticker.C: - output() } } } @@ -203,10 +231,25 @@ func workGenerator(ctx context.Context, opts options.Options, fp *os.File, quest nextLine++ } + ticker := time.NewTicker(opts.MetricsInterval) + defer ticker.Stop() + + startTime := time.Now().Unix() + latestTime := startTime + latestCount := opts.NextLine for scanner.Scan() { select { case <-ctx.Done(): return nextLine, false + case <-ticker.C: + now := time.Now().Unix() + log.Printf("INFO: currently %.1f sites/s, %.1f sites/s since start\n", + float64(nextLine-opts.NextLine)/float64(now-startTime), + float64(nextLine-latestCount)/float64(now-latestTime), + ) + + latestCount = nextLine + latestTime = now case questionCh <- qna.Question{Domain: scanner.Text()}: nextLine++ } -- cgit v1.2.3