aboutsummaryrefslogtreecommitdiff
path: root/utils_housekeep.go
diff options
context:
space:
mode:
Diffstat (limited to 'utils_housekeep.go')
-rw-r--r--utils_housekeep.go80
1 files changed, 80 insertions, 0 deletions
diff --git a/utils_housekeep.go b/utils_housekeep.go
new file mode 100644
index 0000000..670a95b
--- /dev/null
+++ b/utils_housekeep.go
@@ -0,0 +1,80 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ logger "log"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "gitlab.torproject.org/rgdd/ct/pkg/metadata"
+)
+
+type metrics struct {
+ Description string // Human-readable log name
+ NumEntries int64 // Number of entries persisted
+ Timestamp int64 // Time that first entry was persisted
+ Start int64 // Next index to start fetching from
+ End int64 // Exclusive end index to reach
+ Done bool // Worker is done
+
+ avg float64
+}
+
+func (m metrics) String() string {
+ format := " %32s | %6.1f entries/s | Estimated done in %6.1f hours | Working on [%d, %d)\n"
+ if m.Done {
+ return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End)
+ }
+ return fmt.Sprintf(format, m.Description, m.avg, float64((m.End-m.Start))/m.avg/3600, m.Start, m.End)
+}
+
+func (m *metrics) update(other metrics) {
+ m.NumEntries += other.NumEntries
+ m.Start = other.Start
+ m.End = other.End
+ m.Done = other.Done
+ m.avg = float64(m.NumEntries) / float64((other.Timestamp - m.Timestamp))
+}
+
+func handleMetrics(ctx context.Context, metricsCh chan metrics, logs []metadata.Log) {
+ sum := make(map[string]metrics)
+ for _, log := range logs {
+ sum[*log.Description] = metrics{
+ Description: *log.Description,
+ Timestamp: time.Now().Unix(),
+ }
+ }
+
+ ticker := time.NewTicker(15 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ case m := <-metricsCh:
+ s := sum[m.Description]
+ s.update(m)
+ sum[m.Description] = s
+ case <-ticker.C:
+ output := ""
+ for _, log := range logs {
+ output += sum[*log.Description].String()
+ }
+ logger.Printf("INFO: periodic status update\n\n%s\n\n", output)
+ }
+ }
+}
+
+func handleSignals(ctx context.Context, cancel context.CancelFunc) {
+ sigs := make(chan os.Signal, 1)
+ defer close(sigs)
+
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+ select {
+ case <-sigs:
+ cancel()
+ case <-ctx.Done():
+ }
+}