package main import ( "context" "fmt" "os" "os/signal" "syscall" "time" "gitlab.torproject.org/rgdd/ct/pkg/metadata" ) type metrics struct { Description string // Human-readable log name NumEntries int64 // Number of entries persisted Timestamp int64 // Time that first entry was persisted Start int64 // Next index to start fetching from End int64 // Exclusive end index to reach Done bool // Worker is done avg float64 } func (m metrics) String() string { format := " %32s | %6.1f entries/s | Estimated done in %6.2f hours | Working on [%d, %d)\n" if m.Done { return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End) } return fmt.Sprintf(format, m.Description, m.avg, float64((m.End-m.Start))/m.avg/3600, m.Start, m.End) } func (m *metrics) update(other metrics) { m.NumEntries += other.NumEntries m.Start = other.Start m.End = other.End m.Done = other.Done m.avg = float64(m.NumEntries) / float64((other.Timestamp - m.Timestamp)) } func handleMetrics(ctx context.Context, opts options, logs []metadata.Log, metricsCh chan metrics) { sum := make(map[string]metrics) for _, log := range logs { sum[*log.Description] = metrics{ Description: *log.Description, Timestamp: time.Now().Unix(), } } output := func(desc string) { str := "" for _, log := range logs { str += sum[*log.Description].String() } fmt.Fprintf(os.Stderr, "INFO: %s\n\n%s\n\n", desc, str) } defer output("status update before shutdown") defer time.Sleep(500 * time.Millisecond) ticker := time.NewTicker(opts.MetricsInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case m := <-metricsCh: s := sum[m.Description] s.update(m) sum[m.Description] = s case <-ticker.C: output("periodic status update") } } } func handleSignals(ctx context.Context, cancel context.CancelFunc) { sigs := make(chan os.Signal, 1) defer close(sigs) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) select { case <-sigs: cancel() case <-ctx.Done(): } }