From a18ae2877d3bb084df9f505f6c11dae7a66d341e Mon Sep 17 00:00:00 2001 From: Rasmus Dahlberg Date: Sat, 18 Mar 2023 11:53:50 +0100 Subject: more wip collect, metrics --- collect.go | 42 ++++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 16 deletions(-) (limited to 'collect.go') diff --git a/collect.go b/collect.go index d60554e..d3eaae3 100644 --- a/collect.go +++ b/collect.go @@ -9,10 +9,8 @@ import ( logger "log" "net/http" "os" - "os/signal" "strings" "sync" - "syscall" "time" "git.cs.kau.se/rasmoste/ct-sans/internal/chunk" @@ -34,7 +32,6 @@ func collect(opts options) error { if err := json.Unmarshal(b, &md); err != nil { return err } - logger.Printf("INFO: found metadata file with %d logs\n", len(utils.Logs(md))) var await sync.WaitGroup defer await.Wait() @@ -42,20 +39,13 @@ func collect(opts options) error { go func() { await.Add(1) defer await.Done() + handleSignals(ctx, cancel) - sigs := make(chan os.Signal, 1) - defer close(sigs) - - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - select { - case <-sigs: - cancel() - case <-ctx.Done(): - } - + // // Sometimes some worker in scanner.Fetcher isn't shutdown // properly despite the parent context (including getRanges) // being done. The below is an ugly hack to avoid hanging. + // wait := time.Second * 5 // TODO: 15s logger.Printf("INFO: about to exit, please wait %v...\n", wait) select { @@ -64,6 +54,14 @@ func collect(opts options) error { } }() + metricsCh := make(chan metrics) + defer close(metricsCh) + go func() { + await.Add(1) + defer await.Done() + handleMetrics(ctx, metricsCh, utils.Logs(md)) + }() + defer cancel() var wg sync.WaitGroup defer wg.Wait() @@ -108,6 +106,7 @@ func collect(opts options) error { }) if uint64(th.TreeSize) == sth.TreeSize { logger.Printf("INFO: %s: up-to-date with tree size %d", *log.Description, th.TreeSize) + metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true} return } @@ -147,7 +146,7 @@ func collect(opts options) error { case <-cctx.Done(): if h.Sequence(curr) { c := h.TPop() - if _, err := persistChunk(cli, opts, id[:], *log.Description, 0, c); err != nil { + if _, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, 0, c); err != nil { logger.Printf("ERROR: %s: %v\n", *log.Description, err) } } @@ -161,7 +160,7 @@ func collect(opts options) error { } c = h.TPop() - putBack, err := persistChunk(cli, opts, id[:], *log.Description, int64(opts.PersistSize), c) + putBack, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, int64(opts.PersistSize), c) if err != nil { cancel() logger.Printf("ERROR: %s: %v\n", *log.Description, err) @@ -196,6 +195,7 @@ func collect(opts options) error { } } }(log) + //break } logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n") @@ -235,7 +235,7 @@ func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) { return sth, nil } -func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) { +func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) { chunkSize := int64(len(c.LeafHashes)) if chunkSize == 0 { return false, nil // nothing to persist @@ -318,6 +318,16 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc str return false, err } + // Output metrics + metricsCh <- metrics{ + Description: logDesc, + NumEntries: newTH.TreeSize - oldTH.TreeSize, + Timestamp: time.Now().Unix(), + Start: newTH.TreeSize, + End: int64(sth.TreeSize), + Done: uint64(newTH.TreeSize) == sth.TreeSize, + } + logger.Printf("DEBUG: %s: persisted [%d, %d]\n", logDesc, oldTH.TreeSize, newTH.TreeSize) return false, nil } -- cgit v1.2.3