authorRasmus Dahlberg <rasmus@rgdd.se>2023-03-18 13:11:17 +0100
committerRasmus Dahlberg <rasmus@rgdd.se>2023-03-18 13:11:17 +0100
commitc96dabe39386d008985566bf689a3735d0a8f1c8 (patch)
tree47c0a9db54c89e2afdb53a8fa12eb581ca774114
parenta18ae2877d3bb084df9f505f6c11dae7a66d341e (diff)
light refactoring
-rw-r--r--	collect.go	208
1 file changed, 104 insertions, 104 deletions
diff --git a/collect.go b/collect.go
index d3eaae3..44130c3 100644
--- a/collect.go
+++ b/collect.go
@@ -66,16 +66,10 @@ func collect(opts options) error {
var wg sync.WaitGroup
defer wg.Wait()
for _, log := range utils.Logs(md) {
- //if *log.Description != "Trust Asia Log2024-2" {
- // continue
- //}
go func(log metadata.Log) {
wg.Add(1)
defer wg.Done()
- chunks := make(chan *chunk.Chunk)
- defer close(chunks)
-
id, _ := log.Key.ID()
th, err := readState(opts, id[:])
if err != nil {
@@ -105,12 +99,27 @@ func collect(opts options) error {
ParallelFetch: int(opts.WorkersPerLog),
})
if uint64(th.TreeSize) == sth.TreeSize {
- logger.Printf("INFO: %s: up-to-date with tree size %d", *log.Description, th.TreeSize)
metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true}
return
}
//
+ // Sequencer that waits for sufficiently large chunks
+ // before verifying inclusion proofs and persisting an
+ // intermediate tree head (size and root hash) as well
+ // as the SANs that were observed up until that point.
+ //
+ chunksCh := make(chan *chunk.Chunk)
+ defer close(chunksCh)
+ cctx, fetchDone := context.WithCancel(ctx)
+ defer fetchDone()
+ go func() {
+ wg.Add(1)
+ defer wg.Done()
+ sequence(cctx, cancel, opts, log, th.TreeSize, cli, chunksCh, metricsCh)
+ }()
+
+ //
// Callback that puts downloaded certificates into a
// chunk that a single sequencer can verify and persist
//
@@ -123,79 +132,22 @@ func collect(opts options) error {
for _, err := range errs {
logger.Printf("NOTICE: %s: %v", *log.Description, err)
}
- chunks <- &chunk.Chunk{eb.Start, leafHashes, sans}
+ chunksCh <- &chunk.Chunk{eb.Start, leafHashes, sans}
}
- //
- // Sequencer that waits for sufficiently large chunks
- // before verifying inclusion proofs and persisting an
- // intermediate tree head (size and root hash) as well
- // as the SANs that were observed up until that point.
- //
- cctx, fetchDone := context.WithCancel(ctx)
- defer fetchDone()
- go func() {
- wg.Add(1)
- defer wg.Done()
-
- h := &chunk.ChunkHeap{}
- heap.Init(h)
- curr := th.TreeSize
- for {
- select {
- case <-cctx.Done():
- if h.Sequence(curr) {
- c := h.TPop()
- if _, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, 0, c); err != nil {
- logger.Printf("ERROR: %s: %v\n", *log.Description, err)
- }
- }
- return
- case c, ok := <-chunks:
- if ok {
- h.TPush(c)
- }
- if !h.Sequence(curr) {
- continue
- }
-
- c = h.TPop()
- putBack, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, int64(opts.PersistSize), c)
- if err != nil {
- cancel()
- logger.Printf("ERROR: %s: %v\n", *log.Description, err)
- return
- }
- if putBack {
- h.TPush(c)
- continue
- }
-
- curr += int64(len(c.LeafHashes))
- }
- }
- }()
-
- logger.Printf("INFO: %s: working from tree size %d to %d", *log.Description, th.TreeSize, sth.TreeSize)
if err := fetcher.Run(ctx, callback); err != nil {
logger.Printf("ERROR: %s: %v\n", *log.Description, err)
cancel()
return
}
- if ctx.Err() == nil {
- logger.Printf("INFO: %s: completed fetch at tree size %d", *log.Description, sth.TreeSize)
- }
-
- for len(chunks) > 0 {
+ for len(chunksCh) > 0 {
select {
case <-ctx.Done():
- return // some Go routine cancelled due to an error
- case <-time.After(1 * time.Second):
- logger.Printf("DEBUG: %s: waiting for chunks to be consumed\n", *log.Description)
+ return // some Go routine cancelled due to an error, die
+ case <-time.After(time.Second):
}
}
}(log)
- //break
}
logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n")
@@ -203,39 +155,54 @@ func collect(opts options) error {
return nil
}
-type treeHead struct {
- TreeSize int64 `json:"tree_size"`
-	RootHash [sha256.Size]byte `json:"root_hash"`
-}
+func sequence(ctx context.Context, cancel context.CancelFunc,
+ opts options, log metadata.Log, nextIndex int64, cli *client.LogClient,
+ chunksCh chan *chunk.Chunk, metricsCh chan metrics) {
+ desc := *log.Description
-func readState(opts options, logID []byte) (treeHead, error) {
- if _, err := os.Stat(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)); err != nil {
- return treeHead{0, sha256.Sum256(nil)}, nil
- }
- b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile))
- if err != nil {
- return treeHead{}, err
- }
- var th treeHead
- if err := json.Unmarshal(b, &th); err != nil {
- return treeHead{}, err
- }
- return th, nil
-}
+ h := &chunk.ChunkHeap{}
+ heap.Init(h)
+ for {
+ select {
+ case <-ctx.Done():
+ if h.Sequence(nextIndex) {
+ c := h.TPop()
+ if _, err := persist(c, opts, log, cli, 0, metricsCh); err != nil {
+ logger.Printf("ERROR: %s: %v\n", desc, err)
+ }
+ }
+ return
+ case c, ok := <-chunksCh:
+ if ok {
+ h.TPush(c)
+ }
+ if !h.Sequence(nextIndex) {
+ continue
+ }
-func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) {
- b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile))
- if err != nil {
- return ct.SignedTreeHead{}, err
- }
- var sth ct.SignedTreeHead
- if err := json.Unmarshal(b, &sth); err != nil {
- return ct.SignedTreeHead{}, err
+ c = h.TPop()
+ putBack, err := persist(c, opts, log, cli, int64(opts.PersistSize), metricsCh)
+ if err != nil {
+ cancel()
+ logger.Printf("ERROR: %s: %v\n", desc, err)
+ return
+ }
+ if putBack {
+ h.TPush(c)
+ continue
+ }
+
+ nextIndex += int64(len(c.LeafHashes))
+ }
}
- return sth, nil
}
-func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) {
+func persist(c *chunk.Chunk,
+ opts options, log metadata.Log, cli *client.LogClient, minSequence int64,
+ metricsCh chan metrics) (bool, error) {
+ logID, _ := log.Key.ID()
+ desc := *log.Description
+
chunkSize := int64(len(c.LeafHashes))
if chunkSize == 0 {
return false, nil // nothing to persist
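The extracted sequence function (previous hunk) keeps out-of-order chunks in a chunk.ChunkHeap and only hands one to persist when it starts exactly at the next expected index. The sketch below reimplements that ordering idea with the standard container/heap package; chunkItem and the printed "persist" step are stand-ins for illustration, not the ChunkHeap API (TPush/TPop/Sequence) used above.

package main

import (
	"container/heap"
	"fmt"
)

// chunkItem stands in for chunk.Chunk: a start index plus a number of entries.
type chunkItem struct {
	start int64
	size  int64
}

// minHeap orders chunks by start index.
type minHeap []*chunkItem

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].start < h[j].start }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(*chunkItem)) }
func (h *minHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	h := &minHeap{}
	heap.Init(h)
	next := int64(0)
	// Chunks arrive out of order from parallel fetchers.
	for _, c := range []*chunkItem{{10, 5}, {0, 10}, {15, 5}} {
		heap.Push(h, c)
		// Only persist a chunk when it starts at the next expected index,
		// i.e. when there is no gap in the sequence so far.
		for h.Len() > 0 && (*h)[0].start == next {
			c := heap.Pop(h).(*chunkItem)
			fmt.Printf("persist [%d, %d)\n", c.start, c.start+c.size)
			next += c.size
		}
	}
}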
@@ -245,22 +212,22 @@ func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, l
}
// Read persisted tree state from disk
- oldTH, err := readState(opts, logID)
+ oldTH, err := readState(opts, logID[:])
if err != nil {
return false, err
}
if oldTH.TreeSize != c.Start {
return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", oldTH.TreeSize, c.Start)
}
+
// Read signed tree head from disk
- sth, err := readSnapshot(opts, logID)
+ sth, err := readSnapshot(opts, logID[:])
if err != nil {
return false, err
}
- // Derive next intermediate tree state from a compact range
+
//
- // Santity checks: expected indces/sizes and consistent root hashes.
- // This is redundant, but could, e.g., catch bugs with our storage.
+	// Derive next intermediate tree state instead of verifying all inclusions
//
// Independent context because we need to run inclusion and consistency
// queries after the parent context is cancelled to persist on shutdown
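The comment above explains why the inclusion and consistency queries below run on their own context rather than the per-log one: the final flush happens after cancellation, so these queries must still be able to run, bounded only by a timeout. The sketch below shows the consistency half of that check in isolation, assuming the transparency-dev/merkle verifier instead of this repository's own merkle and utils helpers, and a 30-second timeout; both are assumptions for illustration, not what persist actually calls.

package sketch

import (
	"context"
	"time"

	"github.com/google/certificate-transparency-go/client"
	"github.com/transparency-dev/merkle/proof"
	"github.com/transparency-dev/merkle/rfc6962"
)

// checkConsistency asks the log for a consistency proof between the tree head
// stored on disk (oldSize, oldRoot) and the newly derived one (newSize, newRoot)
// and verifies it locally. The context is independent of the collector's
// shutdown context so the query can still run while exiting. Callers should
// skip the check when oldSize is zero, as the code above does.
func checkConsistency(cli *client.LogClient, oldSize, newSize uint64, oldRoot, newRoot []byte) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	hashes, err := cli.GetSTHConsistency(ctx, oldSize, newSize)
	if err != nil {
		return err
	}
	return proof.VerifyConsistency(rfc6962.DefaultHasher, oldSize, newSize, hashes, oldRoot, newRoot)
}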
@@ -278,6 +245,8 @@ func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, l
if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), utils.Proof(p.AuditPath)); err != nil {
return false, err
}
+
+ // Check that new tree state is consistent with what we stored on disk
var hashes [][]byte
if oldTH.TreeSize > 0 {
if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil {
@@ -320,14 +289,45 @@ func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, l
// Output metrics
metricsCh <- metrics{
- Description: logDesc,
+ Description: desc,
NumEntries: newTH.TreeSize - oldTH.TreeSize,
Timestamp: time.Now().Unix(),
Start: newTH.TreeSize,
End: int64(sth.TreeSize),
Done: uint64(newTH.TreeSize) == sth.TreeSize,
}
-
- logger.Printf("DEBUG: %s: persisted [%d, %d]\n", logDesc, oldTH.TreeSize, newTH.TreeSize)
+ logger.Printf("DEBUG: %s: persisted [%d, %d]\n", desc, oldTH.TreeSize, newTH.TreeSize)
return false, nil
}
+
+type treeHead struct {
+ TreeSize int64 `json:"tree_size"`
+	RootHash [sha256.Size]byte `json:"root_hash"`
+}
+
+func readState(opts options, logID []byte) (treeHead, error) {
+ if _, err := os.Stat(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)); err != nil {
+ return treeHead{0, sha256.Sum256(nil)}, nil
+ }
+ b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile))
+ if err != nil {
+ return treeHead{}, err
+ }
+ var th treeHead
+ if err := json.Unmarshal(b, &th); err != nil {
+ return treeHead{}, err
+ }
+ return th, nil
+}
+
+func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) {
+ b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile))
+ if err != nil {
+ return ct.SignedTreeHead{}, err
+ }
+ var sth ct.SignedTreeHead
+ if err := json.Unmarshal(b, &sth); err != nil {
+ return ct.SignedTreeHead{}, err
+ }
+ return sth, nil
+}
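readState falls back to an empty tree (size zero, SHA-256 of no input) when no state file exists yet, and otherwise round-trips the treeHead struct through JSON at <logDirectory>/<hex log ID>/<stateFile>. Below is a small sketch of that round trip under a throwaway directory; the directory and the .state file name are made up for illustration and are not this program's defaults.

package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

type treeHead struct {
	TreeSize int64             `json:"tree_size"`
	RootHash [sha256.Size]byte `json:"root_hash"`
}

func main() {
	dir := filepath.Join(os.TempDir(), "collect-example")
	if err := os.MkdirAll(dir, 0o755); err != nil {
		panic(err)
	}
	stateFile := filepath.Join(dir, ".state")

	// A log we have never collected from starts at the empty tree,
	// mirroring readState's fallback when the state file is missing.
	th := treeHead{TreeSize: 0, RootHash: sha256.Sum256(nil)}
	b, err := json.Marshal(th)
	if err != nil {
		panic(err)
	}
	if err := os.WriteFile(stateFile, b, 0o644); err != nil {
		panic(err)
	}

	// Read it back the way readState does.
	b, err = os.ReadFile(stateFile)
	if err != nil {
		panic(err)
	}
	var got treeHead
	if err := json.Unmarshal(b, &got); err != nil {
		panic(err)
	}
	fmt.Printf("tree size %d, root hash %x\n", got.TreeSize, got.RootHash)
}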