package main

import (
	"bufio"
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"git.cs.kau.se/rasmoste/find-onion/internal/onionloc"
	"git.cs.kau.se/rasmoste/find-onion/internal/options"
	"git.cs.kau.se/rasmoste/find-onion/internal/qna"
)

func main() {
	opts := options.Parse()
	cli := &http.Client{
		Transport: &http.Transport{
			DisableKeepAlives:      true,
			MaxResponseHeaderBytes: opts.MaxResponse,
		},
	}
	questionCh := make(chan qna.Question, 2*opts.NumWorkers)
	defer close(questionCh)
	answerCh := make(chan qna.Answer, 2*opts.NumWorkers)
	defer close(answerCh)

	var wg sync.WaitGroup
	defer wg.Wait()
	ctx, cancel := context.WithCancel(context.Background())

	log.Printf("INFO: starting await handler, ctrl+C to exit\n")
	wg.Add(1) // Add before go: adding inside the goroutine races with wg.Wait()
	go func() {
		defer wg.Done()
		await(ctx, cancel)
	}()

	log.Printf("INFO: starting %d workers\n", opts.NumWorkers)
	for i := 0; i < opts.NumWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			workHandler(ctx, opts, cli, questionCh, answerCh)
		}()
	}

	log.Printf("INFO: starting work aggregator\n")
	wg.Add(1)
	go func() {
		defer wg.Done()
		workAggregator(ctx, cancel, opts, answerCh)
	}()

	log.Printf("INFO: generating work\n")
	workGenerator(ctx, cancel, opts, questionCh)
	time.Sleep(time.Second)

	// Deferred tear-down runs in reverse order: log, give in-flight
	// requests up to 2*Timeout to complete, cancel, and then (from the
	// defers above) wg.Wait() before the channels are closed.
	defer cancel()
	defer time.Sleep(2 * opts.Timeout)
	defer log.Printf("INFO: about to exit in %v", 2*opts.Timeout)
	for {
		select {
		case <-ctx.Done():
			log.Printf("INFO: context cancelled")
			return
		case <-time.After(time.Second):
		}

		numMessages := len(questionCh) + len(answerCh)
		if numMessages == 0 {
			return
		}
		log.Printf("DEBUG: waiting for %d messages to be processed before exit", numMessages)
	}
}

// await blocks until SIGINT/SIGTERM arrives or the context is cancelled,
// cancelling the context on a signal so that all goroutines shut down.
func await(ctx context.Context, cancel context.CancelFunc) {
	sigs := make(chan os.Signal, 1)
	defer close(sigs)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-sigs:
		cancel()
	case <-ctx.Done():
	}
}

// workHandler visits one domain at a time over HTTPS and reports whether
// it advertises an Onion-Location HTTP header or HTML meta tag.
func workHandler(ctx context.Context, opts options.Options, cli *http.Client, questionCh chan qna.Question, answerCh chan qna.Answer) {
	for {
		select {
		case <-ctx.Done():
			return
		case question := <-questionCh:
			func() {
				cctx, cancel := context.WithTimeout(ctx, opts.Timeout)
				defer cancel()

				req, err := http.NewRequestWithContext(cctx, http.MethodGet, "https://"+question.Domain, nil)
				if err != nil {
					return
				}
				rsp, err := cli.Do(req)
				if err != nil {
					answerCh <- qna.Answer{Domain: question.Domain, OK: false}
					return
				}
				defer rsp.Body.Close()

				if v, ok := onionloc.HTTP(rsp); ok {
					answerCh <- qna.Answer{Domain: question.Domain, OK: true, HTTP: true, Onion: v}
					return
				}
				if v, ok := onionloc.HTML(rsp); ok {
					answerCh <- qna.Answer{Domain: question.Domain, OK: true, HTML: true, Onion: v}
					return
				}
				answerCh <- qna.Answer{Domain: question.Domain, OK: true}
			}()
		}
	}
}

// workAggregator tallies answers and logs progress every MetricsInterval;
// matched domains are printed to stdout as they arrive.
func workAggregator(ctx context.Context, _ context.CancelFunc, opts options.Options, answerCh chan qna.Answer) {
	ticker := time.NewTicker(opts.MetricsInterval)
	defer ticker.Stop()

	numConnected := 0
	numOnionLocation := 0
	numVisits := 0
	output := func() {
		log.Printf("INFO: %d/%d connected, %d matched\n", numConnected, numVisits, numOnionLocation)
	}
	defer output()
	for {
		select {
		case <-ctx.Done():
			return
		case a := <-answerCh:
			numVisits++
			if !a.OK {
				continue
			}
			numConnected++
			if a.HTTP || a.HTML {
				numOnionLocation++
				fmt.Printf("http:%v html:%v domain:%s onion:%s\n", a.HTTP, a.HTML, a.Domain, a.Onion)
			}
		case <-ticker.C:
			output()
		}
	}
}

// workGenerator reads domains, one per line, from the input file and
// feeds them to the workers until EOF or the context is cancelled.
func workGenerator(ctx context.Context, cancel context.CancelFunc, opts options.Options, questionCh chan qna.Question) {
	fp, err := os.Open(opts.InputFile)
	if err != nil {
		log.Printf("ERROR: %v", err)
		cancel()
		return
	}
	defer fp.Close()

	scanner := bufio.NewScanner(fp)
	maxBuf := 2 * 256 * opts.NumWorkers // max accepted line length in bytes
	scanner.Buffer(make([]byte, 0, maxBuf), maxBuf)
	// TODO: track which line we would have to start from to be sure that
	// we're not missing any domains on ctrl+C, OK if we go back too much?
	for scanner.Scan() {
		// A blocking send doubles as back-pressure: it proceeds as soon
		// as a worker frees up channel capacity and aborts on shutdown.
		select {
		case <-ctx.Done():
			return
		case questionCh <- qna.Question{Domain: scanner.Text()}:
		}
	}
	if err := scanner.Err(); err != nil {
		log.Printf("ERROR: %v", err)
		cancel()
	}
}