aboutsummaryrefslogtreecommitdiff
path: root/utils_housekeep.go
blob: eb39a7a542075facf5d2c7aa3d013a256f6a54bd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"gitlab.torproject.org/rgdd/ct/pkg/metadata"
)

type metrics struct {
	Description string // Human-readable log name
	NumEntries  int64  // Number of entries persisted
	Timestamp   int64  // Time that first entry was persisted
	Start       int64  // Next index to start fetching from
	End         int64  // Exclusive end index to reach
	Done        bool   // Worker is done

	avg float64
}

func (m metrics) String() string {
	format := "  %32s  |  %6.1f entries/s  |  Estimated done in %6.2f hours  |  Working on [%d, %d)\n"
	if m.Done {
		return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End)
	}
	return fmt.Sprintf(format, m.Description, m.avg, float64((m.End-m.Start))/m.avg/3600, m.Start, m.End)
}

func (m *metrics) update(other metrics) {
	m.NumEntries += other.NumEntries
	m.Start = other.Start
	m.End = other.End
	m.Done = other.Done
	m.avg = float64(m.NumEntries) / float64((other.Timestamp - m.Timestamp))
}

func handleMetrics(ctx context.Context, opts options, logs []metadata.Log, metricsCh chan metrics) {
	sum := make(map[string]metrics)
	for _, log := range logs {
		sum[*log.Description] = metrics{
			Description: *log.Description,
			Timestamp:   time.Now().Unix(),
		}
	}

	output := func(desc string) {
		str := ""
		for _, log := range logs {
			str += sum[*log.Description].String()
		}
		fmt.Fprintf(os.Stderr, "INFO: %s\n\n%s\n\n", desc, str)
	}
	defer output("status update before shutdown")
	defer time.Sleep(500 * time.Millisecond)

	ticker := time.NewTicker(opts.MetricsInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case m := <-metricsCh:
			s := sum[m.Description]
			s.update(m)
			sum[m.Description] = s
		case <-ticker.C:
			output("periodic status update")
		}
	}
}

func handleSignals(ctx context.Context, cancel context.CancelFunc) {
	sigs := make(chan os.Signal, 1)
	defer close(sigs)

	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-sigs:
		cancel()
	case <-ctx.Done():
	}
}