aboutsummaryrefslogtreecommitdiff
path: root/collect.go
diff options
context:
space:
mode:
authorRasmus Dahlberg <rasmus@rgdd.se>2023-03-18 11:53:50 +0100
committerRasmus Dahlberg <rasmus@rgdd.se>2023-03-18 11:53:50 +0100
commita18ae2877d3bb084df9f505f6c11dae7a66d341e (patch)
tree93f92835e4d0d9b7f13f9e07e018bfad2786684a /collect.go
parent10f7eb048a0cba6104b52027bff3b6f50db2dab9 (diff)
more wip collect, metrics
Diffstat (limited to 'collect.go')
-rw-r--r--collect.go42
1 files changed, 26 insertions, 16 deletions
diff --git a/collect.go b/collect.go
index d60554e..d3eaae3 100644
--- a/collect.go
+++ b/collect.go
@@ -9,10 +9,8 @@ import (
logger "log"
"net/http"
"os"
- "os/signal"
"strings"
"sync"
- "syscall"
"time"
"git.cs.kau.se/rasmoste/ct-sans/internal/chunk"
@@ -34,7 +32,6 @@ func collect(opts options) error {
if err := json.Unmarshal(b, &md); err != nil {
return err
}
- logger.Printf("INFO: found metadata file with %d logs\n", len(utils.Logs(md)))
var await sync.WaitGroup
defer await.Wait()
@@ -42,20 +39,13 @@ func collect(opts options) error {
go func() {
await.Add(1)
defer await.Done()
+ handleSignals(ctx, cancel)
- sigs := make(chan os.Signal, 1)
- defer close(sigs)
-
- signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
- select {
- case <-sigs:
- cancel()
- case <-ctx.Done():
- }
-
+ //
// Sometimes some worker in scanner.Fetcher isn't shutdown
// properly despite the parent context (including getRanges)
// being done. The below is an ugly hack to avoid hanging.
+ //
wait := time.Second * 5 // TODO: 15s
logger.Printf("INFO: about to exit, please wait %v...\n", wait)
select {
@@ -64,6 +54,14 @@ func collect(opts options) error {
}
}()
+ metricsCh := make(chan metrics)
+ defer close(metricsCh)
+ go func() {
+ await.Add(1)
+ defer await.Done()
+ handleMetrics(ctx, metricsCh, utils.Logs(md))
+ }()
+
defer cancel()
var wg sync.WaitGroup
defer wg.Wait()
@@ -108,6 +106,7 @@ func collect(opts options) error {
})
if uint64(th.TreeSize) == sth.TreeSize {
logger.Printf("INFO: %s: up-to-date with tree size %d", *log.Description, th.TreeSize)
+ metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true}
return
}
@@ -147,7 +146,7 @@ func collect(opts options) error {
case <-cctx.Done():
if h.Sequence(curr) {
c := h.TPop()
- if _, err := persistChunk(cli, opts, id[:], *log.Description, 0, c); err != nil {
+ if _, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, 0, c); err != nil {
logger.Printf("ERROR: %s: %v\n", *log.Description, err)
}
}
@@ -161,7 +160,7 @@ func collect(opts options) error {
}
c = h.TPop()
- putBack, err := persistChunk(cli, opts, id[:], *log.Description, int64(opts.PersistSize), c)
+ putBack, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, int64(opts.PersistSize), c)
if err != nil {
cancel()
logger.Printf("ERROR: %s: %v\n", *log.Description, err)
@@ -196,6 +195,7 @@ func collect(opts options) error {
}
}
}(log)
+ //break
}
logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n")
@@ -235,7 +235,7 @@ func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) {
return sth, nil
}
-func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) {
+func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) {
chunkSize := int64(len(c.LeafHashes))
if chunkSize == 0 {
return false, nil // nothing to persist
@@ -318,6 +318,16 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc str
return false, err
}
+ // Output metrics
+ metricsCh <- metrics{
+ Description: logDesc,
+ NumEntries: newTH.TreeSize - oldTH.TreeSize,
+ Timestamp: time.Now().Unix(),
+ Start: newTH.TreeSize,
+ End: int64(sth.TreeSize),
+ Done: uint64(newTH.TreeSize) == sth.TreeSize,
+ }
+
logger.Printf("DEBUG: %s: persisted [%d, %d]\n", logDesc, oldTH.TreeSize, newTH.TreeSize)
return false, nil
}