aboutsummaryrefslogtreecommitdiff
path: root/utils_housekeep.go
blob: 670a95b4ed5b56461318a922dfd0cc404baf81cf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package main

import (
	"context"
	"fmt"
	logger "log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"gitlab.torproject.org/rgdd/ct/pkg/metadata"
)

type metrics struct {
	Description string // Human-readable log name
	NumEntries  int64  // Number of entries persisted
	Timestamp   int64  // Time that first entry was persisted
	Start       int64  // Next index to start fetching from
	End         int64  // Exclusive end index to reach
	Done        bool   // Worker is done

	avg float64
}

func (m metrics) String() string {
	format := "  %32s  |  %6.1f entries/s  |  Estimated done in %6.1f hours  |  Working on [%d, %d)\n"
	if m.Done {
		return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End)
	}
	return fmt.Sprintf(format, m.Description, m.avg, float64((m.End-m.Start))/m.avg/3600, m.Start, m.End)
}

func (m *metrics) update(other metrics) {
	m.NumEntries += other.NumEntries
	m.Start = other.Start
	m.End = other.End
	m.Done = other.Done
	m.avg = float64(m.NumEntries) / float64((other.Timestamp - m.Timestamp))
}

func handleMetrics(ctx context.Context, metricsCh chan metrics, logs []metadata.Log) {
	sum := make(map[string]metrics)
	for _, log := range logs {
		sum[*log.Description] = metrics{
			Description: *log.Description,
			Timestamp:   time.Now().Unix(),
		}
	}

	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
		case m := <-metricsCh:
			s := sum[m.Description]
			s.update(m)
			sum[m.Description] = s
		case <-ticker.C:
			output := ""
			for _, log := range logs {
				output += sum[*log.Description].String()
			}
			logger.Printf("INFO: periodic status update\n\n%s\n\n", output)
		}
	}
}

func handleSignals(ctx context.Context, cancel context.CancelFunc) {
	sigs := make(chan os.Signal, 1)
	defer close(sigs)

	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-sigs:
		cancel()
	case <-ctx.Done():
	}
}