diff options
-rw-r--r-- | collect.go | 42 | ||||
-rw-r--r-- | house_keeping.go | 80 |
2 files changed, 106 insertions, 16 deletions
@@ -9,10 +9,8 @@ import ( logger "log" "net/http" "os" - "os/signal" "strings" "sync" - "syscall" "time" "git.cs.kau.se/rasmoste/ct-sans/internal/chunk" @@ -34,7 +32,6 @@ func collect(opts options) error { if err := json.Unmarshal(b, &md); err != nil { return err } - logger.Printf("INFO: found metadata file with %d logs\n", len(utils.Logs(md))) var await sync.WaitGroup defer await.Wait() @@ -42,20 +39,13 @@ func collect(opts options) error { go func() { await.Add(1) defer await.Done() + handleSignals(ctx, cancel) - sigs := make(chan os.Signal, 1) - defer close(sigs) - - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - select { - case <-sigs: - cancel() - case <-ctx.Done(): - } - + // // Sometimes some worker in scanner.Fetcher isn't shutdown // properly despite the parent context (including getRanges) // being done. The below is an ugly hack to avoid hanging. + // wait := time.Second * 5 // TODO: 15s logger.Printf("INFO: about to exit, please wait %v...\n", wait) select { @@ -64,6 +54,14 @@ func collect(opts options) error { } }() + metricsCh := make(chan metrics) + defer close(metricsCh) + go func() { + await.Add(1) + defer await.Done() + handleMetrics(ctx, metricsCh, utils.Logs(md)) + }() + defer cancel() var wg sync.WaitGroup defer wg.Wait() @@ -108,6 +106,7 @@ func collect(opts options) error { }) if uint64(th.TreeSize) == sth.TreeSize { logger.Printf("INFO: %s: up-to-date with tree size %d", *log.Description, th.TreeSize) + metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true} return } @@ -147,7 +146,7 @@ func collect(opts options) error { case <-cctx.Done(): if h.Sequence(curr) { c := h.TPop() - if _, err := persistChunk(cli, opts, id[:], *log.Description, 0, c); err != nil { + if _, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, 0, c); err != nil { logger.Printf("ERROR: %s: %v\n", *log.Description, err) } } @@ -161,7 +160,7 @@ func collect(opts options) error { } c = h.TPop() - putBack, err := persistChunk(cli, opts, id[:], *log.Description, int64(opts.PersistSize), c) + putBack, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, int64(opts.PersistSize), c) if err != nil { cancel() logger.Printf("ERROR: %s: %v\n", *log.Description, err) @@ -196,6 +195,7 @@ func collect(opts options) error { } } }(log) + //break } logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n") @@ -235,7 +235,7 @@ func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) { return sth, nil } -func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) { +func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) { chunkSize := int64(len(c.LeafHashes)) if chunkSize == 0 { return false, nil // nothing to persist @@ -318,6 +318,16 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc str return false, err } + // Output metrics + metricsCh <- metrics{ + Description: logDesc, + NumEntries: newTH.TreeSize - oldTH.TreeSize, + Timestamp: time.Now().Unix(), + Start: newTH.TreeSize, + End: int64(sth.TreeSize), + Done: uint64(newTH.TreeSize) == sth.TreeSize, + } + logger.Printf("DEBUG: %s: persisted [%d, %d]\n", logDesc, oldTH.TreeSize, newTH.TreeSize) return false, nil } diff --git a/house_keeping.go b/house_keeping.go new file mode 100644 index 0000000..670a95b --- /dev/null +++ b/house_keeping.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "fmt" + logger "log" + "os" + "os/signal" + "syscall" + "time" + + "gitlab.torproject.org/rgdd/ct/pkg/metadata" +) + +type metrics struct { + Description string // Human-readable log name + NumEntries int64 // Number of entries persisted + Timestamp int64 // Time that first entry was persisted + Start int64 // Next index to start fetching from + End int64 // Exclusive end index to reach + Done bool // Worker is done + + avg float64 +} + +func (m metrics) String() string { + format := " %32s | %6.1f entries/s | Estimated done in %6.1f hours | Working on [%d, %d)\n" + if m.Done { + return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End) + } + return fmt.Sprintf(format, m.Description, m.avg, float64((m.End-m.Start))/m.avg/3600, m.Start, m.End) +} + +func (m *metrics) update(other metrics) { + m.NumEntries += other.NumEntries + m.Start = other.Start + m.End = other.End + m.Done = other.Done + m.avg = float64(m.NumEntries) / float64((other.Timestamp - m.Timestamp)) +} + +func handleMetrics(ctx context.Context, metricsCh chan metrics, logs []metadata.Log) { + sum := make(map[string]metrics) + for _, log := range logs { + sum[*log.Description] = metrics{ + Description: *log.Description, + Timestamp: time.Now().Unix(), + } + } + + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + case m := <-metricsCh: + s := sum[m.Description] + s.update(m) + sum[m.Description] = s + case <-ticker.C: + output := "" + for _, log := range logs { + output += sum[*log.Description].String() + } + logger.Printf("INFO: periodic status update\n\n%s\n\n", output) + } + } +} + +func handleSignals(ctx context.Context, cancel context.CancelFunc) { + sigs := make(chan os.Signal, 1) + defer close(sigs) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + select { + case <-sigs: + cancel() + case <-ctx.Done(): + } +} |