path: root/cmd_collect.go
author    Rasmus Dahlberg <rasmus@rgdd.se>    2023-03-18 13:20:27 +0100
committer Rasmus Dahlberg <rasmus@rgdd.se>    2023-03-18 13:20:38 +0100
commit    b8b8e05c833dfb4c4191c8c1391e02d07e0e744f (patch)
tree      65dfc946a218e81f97a2a8f867f4ef9661d0e4de /cmd_collect.go
parent    15ffe76847c4c0383c4d0c0a35fb29d5031d093b (diff)

renaming files and moving around
Diffstat (limited to 'cmd_collect.go')
-rw-r--r--  cmd_collect.go  300
1 file changed, 300 insertions, 0 deletions
diff --git a/cmd_collect.go b/cmd_collect.go
new file mode 100644
index 0000000..0254bb3
--- /dev/null
+++ b/cmd_collect.go
@@ -0,0 +1,300 @@
+package main
+
+import (
+ "container/heap"
+ "context"
+ "crypto/sha256"
+ "encoding/json"
+ "fmt"
+ logger "log"
+ "net/http"
+ "os"
+ "strings"
+ "sync"
+ "time"
+
+ "git.cs.kau.se/rasmoste/ct-sans/internal/chunk"
+ "git.cs.kau.se/rasmoste/ct-sans/internal/merkle"
+ "git.cs.kau.se/rasmoste/ct-sans/internal/x509"
+ "github.com/google/certificate-transparency-go/client"
+ "github.com/google/certificate-transparency-go/jsonclient"
+ "github.com/google/certificate-transparency-go/scanner"
+ "gitlab.torproject.org/rgdd/ct/pkg/metadata"
+)
+
+func collect(opts options) error {
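+ // Load the set of CT logs to collect from; the metadata file is expected
+ // to already exist in opts.Directory (presumably written by an earlier step).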
+ b, err := os.ReadFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile))
+ if err != nil {
+ return err
+ }
+ var md metadata.Metadata
+ if err := json.Unmarshal(b, &md); err != nil {
+ return err
+ }
+
+ var await sync.WaitGroup
+ defer await.Wait()
+ ctx, cancel := context.WithCancel(context.Background())
+ await.Add(1)
+ go func() {
+ defer await.Done()
+ handleSignals(ctx, cancel)
+
+ //
+ // Sometimes a worker in scanner.Fetcher isn't shut down
+ // properly despite the parent context (including getRanges)
+ // being done. The below is an ugly hack to avoid hanging.
+ //
+ wait := time.Second * 5 // TODO: 15s
+ logger.Printf("INFO: about to exit, please wait %v...\n", wait)
+ time.Sleep(wait)
+ os.Exit(0)
+ }()
+
+ metricsCh := make(chan metrics)
+ defer close(metricsCh)
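+ // A single goroutine aggregates progress metrics from all per-log workers.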
+ await.Add(1)
+ go func() {
+ defer await.Done()
+ handleMetrics(ctx, metricsCh, logs(md))
+ }()
+
+ defer cancel()
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ for _, log := range logs(md) {
+ wg.Add(1)
+ go func(log metadata.Log) {
+ defer wg.Done()
+
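+ // The log ID (a hash of the log's public key) names this log's state
+ // and snapshot files on disk.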
+ id, _ := log.Key.ID()
+ th, err := readState(opts, id[:])
+ if err != nil {
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
+ cancel()
+ return
+ }
+ sth, err := readSnapshot(opts, id[:])
+ if err != nil {
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
+ cancel()
+ return
+ }
+ cli, err := client.New(string(log.URL),
+ &http.Client{Transport: &http.Transport{IdleConnTimeout: 120 * time.Second}},
+ jsonclient.Options{UserAgent: opts.HTTPAgent},
+ )
+ if err != nil {
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
+ cancel()
+ return
+ }
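+ // Fetch entries from the persisted tree size up to the signed tree
+ // head, in batches, with several parallel workers per log.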
+ fetcher := scanner.NewFetcher(cli, &scanner.FetcherOptions{
+ BatchSize: int(opts.BatchSize),
+ StartIndex: th.TreeSize,
+ EndIndex: int64(sth.TreeSize),
+ ParallelFetch: int(opts.WorkersPerLog),
+ })
+ if uint64(th.TreeSize) == sth.TreeSize {
+ metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true}
+ return
+ }
+
+ //
+ // Sequencer that waits for sufficiently large chunks
+ // before verifying inclusion proofs and persisting an
+ // intermediate tree head (size and root hash) as well
+ // as the SANs that were observed up until that point.
+ //
+ chunksCh := make(chan *chunk.Chunk)
+ defer close(chunksCh)
+ cctx, fetchDone := context.WithCancel(ctx)
+ defer fetchDone()
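+ // The sequencer runs on a per-log child context so it can do a final
+ // flush and exit once fetching for this log is done.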
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ sequence(cctx, cancel, opts, log, th.TreeSize, cli, chunksCh, metricsCh)
+ }()
+
+ //
+ // Callback that puts downloaded certificates into a
+ // chunk that a single sequencer can verify and persist
+ //
+ callback := func(eb scanner.EntryBatch) {
+ leafHashes := [][sha256.Size]byte{}
+ for i := 0; i < len(eb.Entries); i++ {
+ leafHashes = append(leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput))
+ }
+ sans, errs := x509.SANsFromLeafEntries(eb.Start, eb.Entries)
+ for _, err := range errs {
+ logger.Printf("NOTICE: %s: %v", *log.Description, err)
+ }
+ chunksCh <- &chunk.Chunk{Start: eb.Start, LeafHashes: leafHashes, SANs: sans}
+ }
+
+ if err := fetcher.Run(ctx, callback); err != nil {
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
+ cancel()
+ return
+ }
+ for len(chunksCh) > 0 {
+ select {
+ case <-ctx.Done():
+ return // some goroutine cancelled due to an error, die
+ case <-time.After(time.Second):
+ }
+ }
+ }(log)
+ }
+
+ logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n")
+ time.Sleep(3 * time.Second) // give the goroutines time to spawn
+ return nil
+}
+
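+// sequence orders chunks received on chunksCh and persists them once they are
+// contiguous from nextIndex, flushing any remaining ordered chunks on shutdown.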
+func sequence(ctx context.Context, cancel context.CancelFunc,
+ opts options, log metadata.Log, nextIndex int64, cli *client.LogClient,
+ chunksCh chan *chunk.Chunk, metricsCh chan metrics) {
+ desc := *log.Description
+
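+ // Chunks can arrive out of order from the parallel fetch workers; the
+ // heap lets them be popped back in order of their start index.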
+ h := &chunk.ChunkHeap{}
+ heap.Init(h)
+ for {
+ select {
+ case <-ctx.Done():
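+ // Shutting down: persist whatever is already in order, with no minimum
+ // chunk size, so that progress is not lost.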
+ if h.Sequence(nextIndex) {
+ c := h.TPop()
+ if _, err := persist(c, opts, log, cli, 0, metricsCh); err != nil {
+ logger.Printf("ERROR: %s: %v\n", desc, err)
+ }
+ }
+ return
+ case c, ok := <-chunksCh:
+ if ok {
+ h.TPush(c)
+ }
+ if !h.Sequence(nextIndex) {
+ continue
+ }
+
+ c = h.TPop()
+ putBack, err := persist(c, opts, log, cli, int64(opts.PersistSize), metricsCh)
+ if err != nil {
+ cancel()
+ logger.Printf("ERROR: %s: %v\n", desc, err)
+ return
+ }
+ if putBack {
+ h.TPush(c)
+ continue
+ }
+
+ nextIndex += int64(len(c.LeafHashes))
+ }
+ }
+}
+
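+// persist verifies and writes a chunk's SANs and tree state to disk. It
+// returns true if the chunk should be pushed back and retried later, e.g.,
+// because it is smaller than minSequence or a proof could not be fetched yet.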
+func persist(c *chunk.Chunk,
+ opts options, log metadata.Log, cli *client.LogClient, minSequence int64,
+ metricsCh chan metrics) (bool, error) {
+ logID, _ := log.Key.ID()
+ desc := *log.Description
+
+ chunkSize := int64(len(c.LeafHashes))
+ if chunkSize == 0 {
+ return false, nil // nothing to persist
+ }
+ if chunkSize < minSequence {
+ return true, nil // wait for more leaves
+ }
+
+ // Read persisted tree state from disk
+ oldTH, err := readState(opts, logID[:])
+ if err != nil {
+ return false, err
+ }
+ if oldTH.TreeSize != c.Start {
+ return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", oldTH.TreeSize, c.Start)
+ }
+
+ // Read signed tree head from disk
+ sth, err := readSnapshot(opts, logID[:])
+ if err != nil {
+ return false, err
+ }
+
+ //
+ // Derive the next intermediate tree state instead of verifying all inclusions
+ //
+ // Use an independent context because inclusion and consistency queries must
+ // still run after the parent context is cancelled, to persist on shutdown.
+ //
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ newTH := treeHead{TreeSize: c.Start + chunkSize}
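+ // An inclusion proof for the chunk's first leaf, fetched against the new
+ // tree size, acts as a range proof: combined with the chunk's own leaf
+ // hashes it determines the new root hash.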
+ p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(newTH.TreeSize))
+ if err != nil {
+ return true, nil // try again later
+ }
+ if p.LeafIndex != c.Start {
+ return false, fmt.Errorf("log says proof for entry %d is at index %d", c.Start, p.LeafIndex)
+ }
+ if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), proof(p.AuditPath)); err != nil {
+ return false, err
+ }
+
+ // Check that new tree state is consistent with what we stored on disk
+ var hashes [][]byte
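+ // Consistency with an empty tree is trivial, so only query the log when
+ // there was a non-empty tree on disk.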
+ if oldTH.TreeSize > 0 {
+ if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil {
+ return true, nil // try again later
+ }
+ }
+ if err := merkle.VerifyConsistency(uint64(oldTH.TreeSize), uint64(newTH.TreeSize), oldTH.RootHash, newTH.RootHash, proof(hashes)); err != nil {
+ return false, fmt.Errorf("%d %x is inconsistent with on-disk state: %v", newTH.TreeSize, newTH.RootHash, err)
+ }
+
+ // Check that new tree state is consistent with the signed tree head
+ if hashes, err = cli.GetSTHConsistency(ctx, uint64(newTH.TreeSize), sth.TreeSize); err != nil {
+ return true, nil // try again later
+ }
+ if err := merkle.VerifyConsistency(uint64(newTH.TreeSize), sth.TreeSize, newTH.RootHash, sth.SHA256RootHash, proof(hashes)); err != nil {
+ return false, fmt.Errorf("%d %x is inconsistent with signed tree head: %v", newTH.TreeSize, newTH.RootHash, err)
+ }
+
+ // Persist SANs to disk
+ fp, err := os.OpenFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sansFile), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
+ if err != nil {
+ return false, err
+ }
+ defer fp.Close()
+ if _, err := fp.WriteString(strings.Join(c.SANs, "\n") + "\n"); err != nil {
+ return false, err
+ }
+ if err := fp.Sync(); err != nil {
+ return false, err
+ }
+
+ // Persist new tree state to disk
+ b, err := json.Marshal(&newTH)
+ if err != nil {
+ return false, err
+ }
+ if err := os.WriteFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile), b, 0644); err != nil {
+ return false, err
+ }
+
+ // Output metrics
+ metricsCh <- metrics{
+ Description: desc,
+ NumEntries: newTH.TreeSize - oldTH.TreeSize,
+ Timestamp: time.Now().Unix(),
+ Start: newTH.TreeSize,
+ End: int64(sth.TreeSize),
+ Done: uint64(newTH.TreeSize) == sth.TreeSize,
+ }
+ logger.Printf("DEBUG: %s: persisted [%d, %d]\n", desc, oldTH.TreeSize, newTH.TreeSize)
+ return false, nil
+}