summary | refs | log | tree | commit | diff
path: root/collect.go
diff options
context:
space:
mode:
Diffstat (limited to 'collect.go')
-rw-r--r-- collect.go 333
1 files changed, 0 insertions, 333 deletions
diff --git a/collect.go b/collect.go
deleted file mode 100644
index 3c548db..0000000
--- a/collect.go
+++ /dev/null
@@ -1,333 +0,0 @@
-package main
-
-import (
- "container/heap"
- "context"
- "crypto/sha256"
- "encoding/json"
- "fmt"
- logger "log"
- "net/http"
- "os"
- "strings"
- "sync"
- "time"
-
- "git.cs.kau.se/rasmoste/ct-sans/internal/chunk"
- "git.cs.kau.se/rasmoste/ct-sans/internal/merkle"
- "git.cs.kau.se/rasmoste/ct-sans/internal/x509"
- ct "github.com/google/certificate-transparency-go"
- "github.com/google/certificate-transparency-go/client"
- "github.com/google/certificate-transparency-go/jsonclient"
- "github.com/google/certificate-transparency-go/scanner"
- "gitlab.torproject.org/rgdd/ct/pkg/metadata"
-)
-
-func collect(opts options) error {
- b, err := os.ReadFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile))
- if err != nil {
- return err
- }
- var md metadata.Metadata
- if err := json.Unmarshal(b, &md); err != nil {
- return err
- }
-
- var await sync.WaitGroup
- defer await.Wait()
- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- await.Add(1)
- defer await.Done()
- handleSignals(ctx, cancel)
-
- //
- // Sometimes some worker in scanner.Fetcher isn't shutdown
- // properly despite the parent context (including getRanges)
- // being done. The below is an ugly hack to avoid hanging.
- //
- wait := time.Second * 5 // TODO: 15s
- logger.Printf("INFO: about to exit, please wait %v...\n", wait)
- select {
- case <-time.After(wait):
- os.Exit(0)
- }
- }()
-
- metricsCh := make(chan metrics)
- defer close(metricsCh)
- go func() {
- await.Add(1)
- defer await.Done()
- handleMetrics(ctx, metricsCh, logs(md))
- }()
-
- defer cancel()
- var wg sync.WaitGroup
- defer wg.Wait()
- for _, log := range logs(md) {
- go func(log metadata.Log) {
- wg.Add(1)
- defer wg.Done()
-
- id, _ := log.Key.ID()
- th, err := readState(opts, id[:])
- if err != nil {
- logger.Printf("ERROR: %s: %v\n", *log.Description, err)
- cancel()
- return
- }
- sth, err := readSnapshot(opts, id[:])
- if err != nil {
- logger.Printf("ERROR: %s: %v\n", *log.Description, err)
- cancel()
- return
- }
- cli, err := client.New(string(log.URL),
- &http.Client{Transport: &http.Transport{IdleConnTimeout: 120 * time.Second}},
- jsonclient.Options{UserAgent: opts.HTTPAgent},
- )
- if err != nil {
- logger.Printf("ERROR: %s: %v\n", *log.Description, err)
- cancel()
- return
- }
- fetcher := scanner.NewFetcher(cli, &scanner.FetcherOptions{
- BatchSize: int(opts.BatchSize),
- StartIndex: th.TreeSize,
- EndIndex: int64(sth.TreeSize),
- ParallelFetch: int(opts.WorkersPerLog),
- })
- if uint64(th.TreeSize) == sth.TreeSize {
- metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true}
- return
- }
-
- //
- // Sequencer that waits for sufficiently large chunks
- // before verifying inclusion proofs and persisting an
- // intermediate tree head (size and root hash) as well
- // as the SANs that were observed up until that point.
- //
- chunksCh := make(chan *chunk.Chunk)
- defer close(chunksCh)
- cctx, fetchDone := context.WithCancel(ctx)
- defer fetchDone()
- go func() {
- wg.Add(1)
- defer wg.Done()
- sequence(cctx, cancel, opts, log, th.TreeSize, cli, chunksCh, metricsCh)
- }()
-
- //
- // Callback that puts downloaded certificates into a
- // chunk that a single sequencer can verify and persist
- //
- callback := func(eb scanner.EntryBatch) {
- leafHashes := [][sha256.Size]byte{}
- for i := 0; i < len(eb.Entries); i++ {
- leafHashes = append(leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput))
- }
- sans, errs := x509.SANsFromLeafEntries(eb.Start, eb.Entries)
- for _, err := range errs {
- logger.Printf("NOTICE: %s: %v", *log.Description, err)
- }
- chunksCh <- &chunk.Chunk{eb.Start, leafHashes, sans}
- }
-
- if err := fetcher.Run(ctx, callback); err != nil {
- logger.Printf("ERROR: %s: %v\n", *log.Description, err)
- cancel()
- return
- }
- for len(chunksCh) > 0 {
- select {
- case <-ctx.Done():
- return // some Go routine cancelled due to an error, die
- case <-time.After(time.Second):
- }
- }
- }(log)
- }
-
- logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n")
- time.Sleep(3 * time.Second) // ensure that Go routines had time to spawn
- return nil
-}
-
-func sequence(ctx context.Context, cancel context.CancelFunc,
- opts options, log metadata.Log, nextIndex int64, cli *client.LogClient,
- chunksCh chan *chunk.Chunk, metricsCh chan metrics) {
- desc := *log.Description
-
- h := &chunk.ChunkHeap{}
- heap.Init(h)
- for {
- select {
- case <-ctx.Done():
- if h.Sequence(nextIndex) {
- c := h.TPop()
- if _, err := persist(c, opts, log, cli, 0, metricsCh); err != nil {
- logger.Printf("ERROR: %s: %v\n", desc, err)
- }
- }
- return
- case c, ok := <-chunksCh:
- if ok {
- h.TPush(c)
- }
- if !h.Sequence(nextIndex) {
- continue
- }
-
- c = h.TPop()
- putBack, err := persist(c, opts, log, cli, int64(opts.PersistSize), metricsCh)
- if err != nil {
- cancel()
- logger.Printf("ERROR: %s: %v\n", desc, err)
- return
- }
- if putBack {
- h.TPush(c)
- continue
- }
-
- nextIndex += int64(len(c.LeafHashes))
- }
- }
-}
-
-func persist(c *chunk.Chunk,
- opts options, log metadata.Log, cli *client.LogClient, minSequence int64,
- metricsCh chan metrics) (bool, error) {
- logID, _ := log.Key.ID()
- desc := *log.Description
-
- chunkSize := int64(len(c.LeafHashes))
- if chunkSize == 0 {
- return false, nil // nothing to persist
- }
- if chunkSize < minSequence {
- return true, nil // wait for more leaves
- }
-
- // Read persisted tree state from disk
- oldTH, err := readState(opts, logID[:])
- if err != nil {
- return false, err
- }
- if oldTH.TreeSize != c.Start {
- return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", oldTH.TreeSize, c.Start)
- }
-
- // Read signed tree head from disk
- sth, err := readSnapshot(opts, logID[:])
- if err != nil {
- return false, err
- }
-
- //
- // Derive next intermediate tree state instead of verying all inclusions
- //
- // Independent context because we need to run inclusion and consistency
- // queries after the parent context is cancelled to persist on shutdown
- //
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- newTH := treeHead{TreeSize: c.Start + chunkSize}
- p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(newTH.TreeSize))
- if err != nil {
- return true, nil // try again later
- }
- if p.LeafIndex != c.Start {
- return false, fmt.Errorf("log says proof for entry %d is at index %d", c.Start, p.LeafIndex)
- }
- if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), proof(p.AuditPath)); err != nil {
- return false, err
- }
-
- // Check that new tree state is consistent with what we stored on disk
- var hashes [][]byte
- if oldTH.TreeSize > 0 {
- if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil {
- return true, nil // try again later
- }
- }
- if err := merkle.VerifyConsistency(uint64(oldTH.TreeSize), uint64(newTH.TreeSize), oldTH.RootHash, newTH.RootHash, proof(hashes)); err != nil {
- return false, fmt.Errorf("%d %x is inconsistent with on-disk state: %v", newTH.TreeSize, newTH.RootHash, err)
- }
-
- // Check that new tree state is consistent with the signed tree head
- if hashes, err = cli.GetSTHConsistency(ctx, uint64(newTH.TreeSize), sth.TreeSize); err != nil {
- return true, nil // try again later
- }
- if err := merkle.VerifyConsistency(uint64(newTH.TreeSize), sth.TreeSize, newTH.RootHash, sth.SHA256RootHash, proof(hashes)); err != nil {
- return false, fmt.Errorf("%d %x is inconsistent with signed tree head: %v", newTH.TreeSize, newTH.RootHash, err)
- }
-
- // Persist SANs to disk
- fp, err := os.OpenFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sansFile), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
- if err != nil {
- return false, err
- }
- defer fp.Close()
- if _, err := fp.WriteString(strings.Join(c.SANs, "\n") + "\n"); err != nil {
- return false, err
- }
- if err := fp.Sync(); err != nil {
- return false, err
- }
-
- // Persist new tree state to disk
- b, err := json.Marshal(&newTH)
- if err != nil {
- return false, err
- }
- if err := os.WriteFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile), b, 0644); err != nil {
- return false, err
- }
-
- // Output metrics
- metricsCh <- metrics{
- Description: desc,
- NumEntries: newTH.TreeSize - oldTH.TreeSize,
- Timestamp: time.Now().Unix(),
- Start: newTH.TreeSize,
- End: int64(sth.TreeSize),
- Done: uint64(newTH.TreeSize) == sth.TreeSize,
- }
- logger.Printf("DEBUG: %s: persisted [%d, %d]\n", desc, oldTH.TreeSize, newTH.TreeSize)
- return false, nil
-}
-
// treeHead is the intermediate tree state that is stored as JSON in a log's
// state file: the number of leaves processed so far and the root hash of the
// tree of that size.
type treeHead struct {
	TreeSize int64             `json:"tree_size"`
	RootHash [sha256.Size]byte `json:"root_hash"`
}

// UnmarshalJSON decodes a treeHead and accepts the root hash under either the
// "root_hash" key or the legacy "RootHash" key.  The legacy key is what was
// written while the struct tag was malformed and therefore ignored by
// encoding/json, so state files written back then remain readable.  If both
// keys are present, "root_hash" wins; if neither is, RootHash is left as is.
func (th *treeHead) UnmarshalJSON(b []byte) error {
	raw := struct {
		TreeSize int64              `json:"tree_size"`
		RootHash *[sha256.Size]byte `json:"root_hash"`
		Legacy   *[sha256.Size]byte `json:"RootHash"`
	}{TreeSize: th.TreeSize}
	if err := json.Unmarshal(b, &raw); err != nil {
		return err
	}

	th.TreeSize = raw.TreeSize
	switch {
	case raw.RootHash != nil:
		th.RootHash = *raw.RootHash
	case raw.Legacy != nil:
		th.RootHash = *raw.Legacy
	}
	return nil
}
-
-func readState(opts options, logID []byte) (treeHead, error) {
- if _, err := os.Stat(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)); err != nil {
- return treeHead{0, sha256.Sum256(nil)}, nil
- }
- b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile))
- if err != nil {
- return treeHead{}, err
- }
- var th treeHead
- if err := json.Unmarshal(b, &th); err != nil {
- return treeHead{}, err
- }
- return th, nil
-}
-
-func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) {
- b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile))
- if err != nil {
- return ct.SignedTreeHead{}, err
- }
- var sth ct.SignedTreeHead
- if err := json.Unmarshal(b, &sth); err != nil {
- return ct.SignedTreeHead{}, err
- }
- return sth, nil
-}