1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
package main
import (
"context"
"fmt"
logger "log"
"os"
"os/signal"
"syscall"
"time"
"gitlab.torproject.org/rgdd/ct/pkg/metadata"
)
// metrics is a progress report for a single log.  handleMetrics receives
// these reports over a channel and folds each one into a running summary
// with update.
type metrics struct {
	Description string // Human-readable log name
	NumEntries  int64  // Number of entries persisted
	Timestamp   int64  // Time that first entry was persisted (Unix seconds)
	Start       int64  // Next index to start fetching from
	End         int64  // Exclusive end index to reach
	Done        bool   // Worker is done

	avg float64 // Average entries per second; maintained by update
}

// String renders a one-line status terminated by a newline: the log name,
// the average rate in entries per second, the estimated number of hours
// until End is reached at that rate, and the index range [Start, End) that
// is still being worked on.  A finished worker is shown with a zero rate,
// zero remaining time and the empty range [End, End).
//
// If entries remain but no rate has been measured yet (avg is zero), the
// estimate is printed as +Inf.
func (m metrics) String() string {
	const format = " %32s | %6.1f entries/s | Estimated done in %6.1f hours | Working on [%d, %d)\n"
	if m.Done {
		return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End)
	}

	// With nothing left to fetch the estimate is zero regardless of the
	// rate.  Skipping the division also avoids 0/0 = NaN for a log that
	// has not reported any progress yet.
	var hours float64
	if remaining := m.End - m.Start; remaining != 0 {
		hours = float64(remaining) / m.avg / 3600
	}
	return fmt.Sprintf(format, m.Description, m.avg, hours, m.Start, m.End)
}

// update folds the newer report other into the running summary m.
// NumEntries accumulates, while Start, End and Done are overwritten with
// the values from other.  The average rate is the total number of entries
// seen so far divided by the seconds between m.Timestamp and
// other.Timestamp.
func (m *metrics) update(other metrics) {
	m.NumEntries += other.NumEntries
	m.Start = other.Start
	m.End = other.End
	m.Done = other.Done

	// Timestamps are whole seconds, so a report can arrive with no
	// measurable time elapsed.  Dividing by that interval would yield
	// +Inf or NaN; keep the previous average until the interval is
	// positive.
	if elapsed := other.Timestamp - m.Timestamp; elapsed > 0 {
		m.avg = float64(m.NumEntries) / float64(elapsed)
	}
}
// handleMetrics aggregates the progress reports received on metricsCh and
// prints a status summary with one line per entry in logs every 15 seconds.
// It blocks until ctx is cancelled and then returns.
//
// Each log starts out with an empty summary whose Timestamp is the time at
// which handleMetrics was set up; incoming reports are matched to their
// summary by Description and folded in with metrics.update.
//
// Every entry in logs is dereferenced via its Description pointer, so the
// caller must pass logs whose Description is non-nil.
func handleMetrics(ctx context.Context, metricsCh chan metrics, logs []metadata.Log) {
	sum := make(map[string]metrics, len(logs))
	for _, l := range logs {
		sum[*l.Description] = metrics{
			Description: *l.Description,
			Timestamp:   time.Now().Unix(),
		}
	}

	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// A cancelled context's Done channel is closed and therefore
			// always ready; without returning here the loop would spin
			// forever instead of shutting down.
			return
		case m := <-metricsCh:
			// Map values are not addressable: copy out, update, store back.
			s := sum[m.Description]
			s.update(m)
			sum[m.Description] = s
		case <-ticker.C:
			// Iterate over logs rather than the map so that the output
			// order is stable between status updates.
			output := ""
			for _, l := range logs {
				output += sum[*l.Description].String()
			}
			logger.Printf("INFO: periodic status update\n\n%s\n\n", output)
		}
	}
}
// handleSignals waits for SIGINT or SIGTERM and calls cancel when one of
// them arrives.  It also returns, without calling cancel, as soon as ctx is
// done for any other reason.
//
// The signal registration is removed before returning.  The channel is
// deliberately not closed: the os/signal package is the sender, and closing
// a channel that is still registered with signal.Notify makes the next
// incoming signal panic with a send on a closed channel.
func handleSignals(ctx context.Context, cancel context.CancelFunc) {
	// Buffered so that a signal arriving before the select is not dropped;
	// the signal package never blocks when sending.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)

	select {
	case <-sigs:
		cancel()
	case <-ctx.Done():
	}
}
|