From b8b8e05c833dfb4c4191c8c1391e02d07e0e744f Mon Sep 17 00:00:00 2001 From: Rasmus Dahlberg Date: Sat, 18 Mar 2023 13:20:27 +0100 Subject: renaming files and moving around --- utils_housekeep.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 utils_housekeep.go (limited to 'utils_housekeep.go') diff --git a/utils_housekeep.go b/utils_housekeep.go new file mode 100644 index 0000000..670a95b --- /dev/null +++ b/utils_housekeep.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "fmt" + logger "log" + "os" + "os/signal" + "syscall" + "time" + + "gitlab.torproject.org/rgdd/ct/pkg/metadata" +) + +type metrics struct { + Description string // Human-readable log name + NumEntries int64 // Number of entries persisted + Timestamp int64 // Time that first entry was persisted + Start int64 // Next index to start fetching from + End int64 // Exclusive end index to reach + Done bool // Worker is done + + avg float64 +} + +func (m metrics) String() string { + format := " %32s | %6.1f entries/s | Estimated done in %6.1f hours | Working on [%d, %d)\n" + if m.Done { + return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End) + } + return fmt.Sprintf(format, m.Description, m.avg, float64((m.End-m.Start))/m.avg/3600, m.Start, m.End) +} + +func (m *metrics) update(other metrics) { + m.NumEntries += other.NumEntries + m.Start = other.Start + m.End = other.End + m.Done = other.Done + m.avg = float64(m.NumEntries) / float64((other.Timestamp - m.Timestamp)) +} + +func handleMetrics(ctx context.Context, metricsCh chan metrics, logs []metadata.Log) { + sum := make(map[string]metrics) + for _, log := range logs { + sum[*log.Description] = metrics{ + Description: *log.Description, + Timestamp: time.Now().Unix(), + } + } + + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + case m := <-metricsCh: + s := sum[m.Description] + s.update(m) + sum[m.Description] = s + case <-ticker.C: + output := "" + for _, log := range logs { + output += sum[*log.Description].String() + } + logger.Printf("INFO: periodic status update\n\n%s\n\n", output) + } + } +} + +func handleSignals(ctx context.Context, cancel context.CancelFunc) { + sigs := make(chan os.Signal, 1) + defer close(sigs) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + select { + case <-sigs: + cancel() + case <-ctx.Done(): + } +} -- cgit v1.2.3