From c96dabe39386d008985566bf689a3735d0a8f1c8 Mon Sep 17 00:00:00 2001 From: Rasmus Dahlberg Date: Sat, 18 Mar 2023 13:11:17 +0100 Subject: light refactoring --- collect.go | 208 ++++++++++++++++++++++++++++++------------------------------- 1 file changed, 104 insertions(+), 104 deletions(-) (limited to 'collect.go') diff --git a/collect.go b/collect.go index d3eaae3..44130c3 100644 --- a/collect.go +++ b/collect.go @@ -66,16 +66,10 @@ func collect(opts options) error { var wg sync.WaitGroup defer wg.Wait() for _, log := range utils.Logs(md) { - //if *log.Description != "Trust Asia Log2024-2" { - // continue - //} go func(log metadata.Log) { wg.Add(1) defer wg.Done() - chunks := make(chan *chunk.Chunk) - defer close(chunks) - id, _ := log.Key.ID() th, err := readState(opts, id[:]) if err != nil { @@ -105,11 +99,26 @@ func collect(opts options) error { ParallelFetch: int(opts.WorkersPerLog), }) if uint64(th.TreeSize) == sth.TreeSize { - logger.Printf("INFO: %s: up-to-date with tree size %d", *log.Description, th.TreeSize) metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true} return } + // + // Sequencer that waits for sufficiently large chunks + // before verifying inclusion proofs and persisting an + // intermediate tree head (size and root hash) as well + // as the SANs that were observed up until that point. + // + chunksCh := make(chan *chunk.Chunk) + defer close(chunksCh) + cctx, fetchDone := context.WithCancel(ctx) + defer fetchDone() + go func() { + wg.Add(1) + defer wg.Done() + sequence(cctx, cancel, opts, log, th.TreeSize, cli, chunksCh, metricsCh) + }() + // // Callback that puts downloaded certificates into a // chunk that a single sequencer can verify and persist @@ -123,79 +132,22 @@ func collect(opts options) error { for _, err := range errs { logger.Printf("NOTICE: %s: %v", *log.Description, err) } - chunks <- &chunk.Chunk{eb.Start, leafHashes, sans} + chunksCh <- &chunk.Chunk{eb.Start, leafHashes, sans} } - // - // Sequencer that waits for sufficiently large chunks - // before verifying inclusion proofs and persisting an - // intermediate tree head (size and root hash) as well - // as the SANs that were observed up until that point. - // - cctx, fetchDone := context.WithCancel(ctx) - defer fetchDone() - go func() { - wg.Add(1) - defer wg.Done() - - h := &chunk.ChunkHeap{} - heap.Init(h) - curr := th.TreeSize - for { - select { - case <-cctx.Done(): - if h.Sequence(curr) { - c := h.TPop() - if _, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, 0, c); err != nil { - logger.Printf("ERROR: %s: %v\n", *log.Description, err) - } - } - return - case c, ok := <-chunks: - if ok { - h.TPush(c) - } - if !h.Sequence(curr) { - continue - } - - c = h.TPop() - putBack, err := persistChunk(metricsCh, cli, opts, id[:], *log.Description, int64(opts.PersistSize), c) - if err != nil { - cancel() - logger.Printf("ERROR: %s: %v\n", *log.Description, err) - return - } - if putBack { - h.TPush(c) - continue - } - - curr += int64(len(c.LeafHashes)) - } - } - }() - - logger.Printf("INFO: %s: working from tree size %d to %d", *log.Description, th.TreeSize, sth.TreeSize) if err := fetcher.Run(ctx, callback); err != nil { logger.Printf("ERROR: %s: %v\n", *log.Description, err) cancel() return } - if ctx.Err() == nil { - logger.Printf("INFO: %s: completed fetch at tree size %d", *log.Description, sth.TreeSize) - } - - for len(chunks) > 0 { + for len(chunksCh) > 0 { select { case <-ctx.Done(): - return // some Go routine cancelled due to an error - case <-time.After(1 * time.Second): - logger.Printf("DEBUG: %s: waiting for chunks to be consumed\n", *log.Description) + return // some Go routine cancelled due to an error, die + case <-time.After(time.Second): } } }(log) - //break } logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n") @@ -203,39 +155,54 @@ func collect(opts options) error { return nil } -type treeHead struct { - TreeSize int64 `json:"tree_size"` - RootHash [sha256.Size]byte `json:root_hash"` -} +func sequence(ctx context.Context, cancel context.CancelFunc, + opts options, log metadata.Log, nextIndex int64, cli *client.LogClient, + chunksCh chan *chunk.Chunk, metricsCh chan metrics) { + desc := *log.Description -func readState(opts options, logID []byte) (treeHead, error) { - if _, err := os.Stat(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)); err != nil { - return treeHead{0, sha256.Sum256(nil)}, nil - } - b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)) - if err != nil { - return treeHead{}, err - } - var th treeHead - if err := json.Unmarshal(b, &th); err != nil { - return treeHead{}, err - } - return th, nil -} + h := &chunk.ChunkHeap{} + heap.Init(h) + for { + select { + case <-ctx.Done(): + if h.Sequence(nextIndex) { + c := h.TPop() + if _, err := persist(c, opts, log, cli, 0, metricsCh); err != nil { + logger.Printf("ERROR: %s: %v\n", desc, err) + } + } + return + case c, ok := <-chunksCh: + if ok { + h.TPush(c) + } + if !h.Sequence(nextIndex) { + continue + } -func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) { - b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile)) - if err != nil { - return ct.SignedTreeHead{}, err - } - var sth ct.SignedTreeHead - if err := json.Unmarshal(b, &sth); err != nil { - return ct.SignedTreeHead{}, err + c = h.TPop() + putBack, err := persist(c, opts, log, cli, int64(opts.PersistSize), metricsCh) + if err != nil { + cancel() + logger.Printf("ERROR: %s: %v\n", desc, err) + return + } + if putBack { + h.TPush(c) + continue + } + + nextIndex += int64(len(c.LeafHashes)) + } } - return sth, nil } -func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) { +func persist(c *chunk.Chunk, + opts options, log metadata.Log, cli *client.LogClient, minSequence int64, + metricsCh chan metrics) (bool, error) { + logID, _ := log.Key.ID() + desc := *log.Description + chunkSize := int64(len(c.LeafHashes)) if chunkSize == 0 { return false, nil // nothing to persist @@ -245,22 +212,22 @@ func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, l } // Read persisted tree state from disk - oldTH, err := readState(opts, logID) + oldTH, err := readState(opts, logID[:]) if err != nil { return false, err } if oldTH.TreeSize != c.Start { return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", oldTH.TreeSize, c.Start) } + // Read signed tree head from disk - sth, err := readSnapshot(opts, logID) + sth, err := readSnapshot(opts, logID[:]) if err != nil { return false, err } - // Derive next intermediate tree state from a compact range + // - // Santity checks: expected indces/sizes and consistent root hashes. - // This is redundant, but could, e.g., catch bugs with our storage. + // Derive next intermediate tree state instead of verying all inclusions // // Independent context because we need to run inclusion and consistency // queries after the parent context is cancelled to persist on shutdown @@ -278,6 +245,8 @@ func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, l if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), utils.Proof(p.AuditPath)); err != nil { return false, err } + + // Check that new tree state is consistent with what we stored on disk var hashes [][]byte if oldTH.TreeSize > 0 { if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil { @@ -320,14 +289,45 @@ func persistChunk(metricsCh chan metrics, cli *client.LogClient, opts options, l // Output metrics metricsCh <- metrics{ - Description: logDesc, + Description: desc, NumEntries: newTH.TreeSize - oldTH.TreeSize, Timestamp: time.Now().Unix(), Start: newTH.TreeSize, End: int64(sth.TreeSize), Done: uint64(newTH.TreeSize) == sth.TreeSize, } - - logger.Printf("DEBUG: %s: persisted [%d, %d]\n", logDesc, oldTH.TreeSize, newTH.TreeSize) + logger.Printf("DEBUG: %s: persisted [%d, %d]\n", desc, oldTH.TreeSize, newTH.TreeSize) return false, nil } + +type treeHead struct { + TreeSize int64 `json:"tree_size"` + RootHash [sha256.Size]byte `json:root_hash"` +} + +func readState(opts options, logID []byte) (treeHead, error) { + if _, err := os.Stat(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)); err != nil { + return treeHead{0, sha256.Sum256(nil)}, nil + } + b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)) + if err != nil { + return treeHead{}, err + } + var th treeHead + if err := json.Unmarshal(b, &th); err != nil { + return treeHead{}, err + } + return th, nil +} + +func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) { + b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile)) + if err != nil { + return ct.SignedTreeHead{}, err + } + var sth ct.SignedTreeHead + if err := json.Unmarshal(b, &sth); err != nil { + return ct.SignedTreeHead{}, err + } + return sth, nil +} -- cgit v1.2.3