aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Dahlberg <rasmus@rgdd.se>2023-03-18 11:53:50 +0100
committerRasmus Dahlberg <rasmus@rgdd.se>2023-03-18 11:53:50 +0100
commita18ae2877d3bb084df9f505f6c11dae7a66d341e (patch)
tree93f92835e4d0d9b7f13f9e07e018bfad2786684a
parent10f7eb048a0cba6104b52027bff3b6f50db2dab9 (diff)
more wip collect, metrics
-rw-r--r--collect.go42
-rw-r--r--house_keeping.go80
2 files changed, 106 insertions, 16 deletions
diff --git a/collect.go b/collect.go
index d60554e..d3eaae3 100644
--- a/collect.go
+++ b/collect.go
@@ -9,10 +9,8 @@ import (
logger "log"
"net/http"
"os"
- "os/signal"
"strings"
"sync"
- "syscall"
"time"
"git.cs.kau.se/rasmoste/ct-sans/internal/chunk"
@@ -34,7 +32,6 @@ func collect(opts options) error {
if err := json.Unmarshal(b, &md); err != nil {
return err
}
- logger.Printf("INFO: found metadata file with %d logs\n", len(utils.Logs(md)))
var await sync.WaitGroup
defer await.Wait()
@@ -42,20 +39,13 @@ func collect(opts options) error {
go func() {
await.Add(1)
defer await.Done()
+ handleSignals(ctx, cancel)
- sigs := make(chan os.Signal, 1)
- defer close(sigs)
-
- signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
- select {
- case <-sigs:
- cancel()
- case <-ctx.Done():
- }
-
+ //
// Sometimes some worker in scanner.Fetcher isn't shutdown
// properly despite the parent context (including getRanges)
// being done. The below is an ugly hack to avoid hanging.
+ //
wait := time.Second * 5 // TODO: 15s
logger.Printf("INFO: about to exit, please wait %v...\n", wait)
select {
@@ -64,6 +54,14 @@ func collect(opts options) error {
}
}()
+ metricsCh := make(chan metrics)
+ defer close(metricsCh)
+ go func() {
+ await.Add(1)
+ defer await.Done()
+ handleMetrics(ctx, metricsCh, utils.Logs(md))
+ }()
+
defer cancel()
var wg sync.WaitGroup
defer wg.Wait()
@@ -108,6 +106,7 @@ func collect(opts options) error {
})
if uint64(th.TreeSize) == sth.TreeSize {
logger.Printf("INFO: %s: up-to-date with tree size %d", *log.Description, th.TreeSize)
+ metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true}
return
}
@@ -147,7 +146,7 @@ func collect(opts options) error {
case <-cctx.Done():
if h.Sequence(curr) {
c := h.TPop()
- if _, err := persistChunk(cli, opts, id[:], *log.Description, 0, c); err != nil {
+ if _, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, 0, c); err != nil {
logger.Printf("ERROR: %s: %v\n", *log.Description, err)
}
}
@@ -161,7 +160,7 @@ func collect(opts options) error {
}
c = h.TPop()
- putBack, err := persistChunk(cli, opts, id[:], *log.Description, int64(opts.PersistSize), c)
+ putBack, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, int64(opts.PersistSize), c)
if err != nil {
cancel()
logger.Printf("ERROR: %s: %v\n", *log.Description, err)
@@ -196,6 +195,7 @@ func collect(opts options) error {
}
}
}(log)
+ //break
}
logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n")
@@ -235,7 +235,7 @@ func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) {
return sth, nil
}
-func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) {
+func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) {
chunkSize := int64(len(c.LeafHashes))
if chunkSize == 0 {
return false, nil // nothing to persist
@@ -318,6 +318,16 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc str
return false, err
}
+ // Output metrics
+ metricsCh <- metrics{
+ Description: logDesc,
+ NumEntries: newTH.TreeSize - oldTH.TreeSize,
+ Timestamp: time.Now().Unix(),
+ Start: newTH.TreeSize,
+ End: int64(sth.TreeSize),
+ Done: uint64(newTH.TreeSize) == sth.TreeSize,
+ }
+
logger.Printf("DEBUG: %s: persisted [%d, %d]\n", logDesc, oldTH.TreeSize, newTH.TreeSize)
return false, nil
}
diff --git a/house_keeping.go b/house_keeping.go
new file mode 100644
index 0000000..670a95b
--- /dev/null
+++ b/house_keeping.go
@@ -0,0 +1,80 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ logger "log"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "gitlab.torproject.org/rgdd/ct/pkg/metadata"
+)
+
+type metrics struct {
+ Description string // Human-readable log name
+ NumEntries int64 // Number of entries persisted
+ Timestamp int64 // Time that first entry was persisted
+ Start int64 // Next index to start fetching from
+ End int64 // Exclusive end index to reach
+ Done bool // Worker is done
+
+ avg float64
+}
+
+func (m metrics) String() string {
+ format := " %32s | %6.1f entries/s | Estimated done in %6.1f hours | Working on [%d, %d)\n"
+ if m.Done {
+ return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End)
+ }
+ return fmt.Sprintf(format, m.Description, m.avg, float64((m.End-m.Start))/m.avg/3600, m.Start, m.End)
+}
+
+func (m *metrics) update(other metrics) {
+ m.NumEntries += other.NumEntries
+ m.Start = other.Start
+ m.End = other.End
+ m.Done = other.Done
+ m.avg = float64(m.NumEntries) / float64((other.Timestamp - m.Timestamp))
+}
+
+func handleMetrics(ctx context.Context, metricsCh chan metrics, logs []metadata.Log) {
+ sum := make(map[string]metrics)
+ for _, log := range logs {
+ sum[*log.Description] = metrics{
+ Description: *log.Description,
+ Timestamp: time.Now().Unix(),
+ }
+ }
+
+ ticker := time.NewTicker(15 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ case m := <-metricsCh:
+ s := sum[m.Description]
+ s.update(m)
+ sum[m.Description] = s
+ case <-ticker.C:
+ output := ""
+ for _, log := range logs {
+ output += sum[*log.Description].String()
+ }
+ logger.Printf("INFO: periodic status update\n\n%s\n\n", output)
+ }
+ }
+}
+
+func handleSignals(ctx context.Context, cancel context.CancelFunc) {
+ sigs := make(chan os.Signal, 1)
+ defer close(sigs)
+
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+ select {
+ case <-sigs:
+ cancel()
+ case <-ctx.Done():
+ }
+}