aboutsummaryrefslogtreecommitdiff
path: root/collect.go
diff options
context:
space:
mode:
authorRasmus Dahlberg <rasmus@rgdd.se>2023-03-17 10:49:48 +0100
committerRasmus Dahlberg <rasmus@rgdd.se>2023-03-17 10:49:48 +0100
commitb53aa358d64a22e8747bf3a9d117df7135b9edac (patch)
tree401fb4039922a976e7040953300d8c580696aef7 /collect.go
parent979e87fcce5482d6da1dd40b3e5ba790dc7af7ea (diff)
wip collect
Diffstat (limited to 'collect.go')
-rw-r--r--collect.go270
1 files changed, 270 insertions, 0 deletions
diff --git a/collect.go b/collect.go
new file mode 100644
index 0000000..c1e2c6d
--- /dev/null
+++ b/collect.go
@@ -0,0 +1,270 @@
+package main
+
+import (
+ "container/heap"
+ "context"
+ "crypto/sha256"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ "git.cs.kau.se/rasmoste/ct-sans/internal/chunk"
+ "git.cs.kau.se/rasmoste/ct-sans/internal/utils"
+ ct "github.com/google/certificate-transparency-go"
+ "github.com/google/certificate-transparency-go/client"
+ "github.com/google/certificate-transparency-go/jsonclient"
+ "github.com/google/certificate-transparency-go/scanner"
+ "gitlab.torproject.org/rgdd/ct/pkg/merkle"
+ "gitlab.torproject.org/rgdd/ct/pkg/metadata"
+)
+
+func collect(opts options) error {
+ b, err := os.ReadFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile))
+ if err != nil {
+ return err
+ }
+ var md metadata.Metadata
+ if err := json.Unmarshal(b, &md); err != nil {
+ return err
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ go func() {
+ wg.Add(1)
+ defer wg.Done()
+
+ sigs := make(chan os.Signal, 1)
+ defer close(sigs)
+
+ signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+ select {
+ case <-sigs:
+ fmt.Fprintf(os.Stderr, "INFO: received shutdown signal, please wait...\n")
+ cancel()
+ case <-ctx.Done():
+ }
+ }()
+
+ for _, log := range utils.Logs(md) {
+ go func(log metadata.Log) {
+ wg.Add(1)
+ defer wg.Done()
+
+ chunks := make(chan *chunk.Chunk)
+ defer close(chunks)
+
+ id, _ := log.Key.ID()
+ th, err := readState(opts, id[:])
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ cancel()
+ return
+ }
+ sth, err := readSnapshot(opts, id[:])
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ cancel()
+ return
+ }
+
+ cli, err := client.New(string(log.URL), &http.Client{}, jsonclient.Options{UserAgent: "wip2"})
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ cancel()
+ return
+ }
+ fetcher := scanner.NewFetcher(cli, &scanner.FetcherOptions{
+ BatchSize: int(opts.batchSize),
+ StartIndex: th.TreeSize,
+ EndIndex: int64(sth.TreeSize),
+ ParallelFetch: int(opts.workersPerLog),
+ Continuous: false,
+ })
+
+ //
+ // Callback that puts downloaded certificates into a
+ // chunk that a single sequencer can verify and persist
+ //
+ callback := func(eb scanner.EntryBatch) {
+ leafHashes := [][sha256.Size]byte{}
+ for i := 0; i < len(eb.Entries); i++ {
+ leafHashes = append(leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput))
+ }
+ sans := []string{"example.com"} // TODO: fixme
+ chunks <- &chunk.Chunk{eb.Start, leafHashes, sans}
+ }
+
+ //
+ // Sequencer that waits for sufficiently large chunks
+ // before verifying inclusion proofs and persisting an
+ // intermediate tree head (size and root hash) as well
+ // as the SANs that were observed up until that point.
+ //
+ go func() {
+ wg.Add(1)
+ defer wg.Done()
+ defer fmt.Fprintf(os.Stderr, "INFO: %s: shutdown sequencer\n", *log.Description)
+
+ h := &chunk.ChunkHeap{}
+ heap.Init(h)
+ curr := th.TreeSize
+ for {
+ select {
+ case <-ctx.Done():
+ if h.Sequence(curr) {
+ c := h.TPop()
+ if _, err := persistChunk(cli, opts, id[:], 0, c); err != nil {
+ fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", err)
+ }
+ }
+ return
+ case c, ok := <-chunks:
+ if ok {
+ h.TPush(c)
+ }
+ if !h.Sequence(curr) {
+ continue
+ }
+
+ c = h.TPop()
+ putBack, err := persistChunk(cli, opts, id[:], int64(opts.persistSize), c)
+ if err != nil {
+ cancel()
+ fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ return
+ }
+ if putBack {
+ h.TPush(c)
+ continue
+ }
+
+ curr += int64(len(c.LeafHashes))
+ }
+ }
+ }()
+
+ if err := fetcher.Run(ctx, callback); err != nil {
+ fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ cancel()
+ return
+ }
+
+ fmt.Fprintf(os.Stderr, "INFO: %s: fetch completed\n", *log.Description)
+ for len(chunks) > 0 {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(1 * time.Second):
+ fmt.Fprintf(os.Stderr, "DEBUG: %s: waiting for chunks to be consumed\n", *log.Description)
+ }
+ }
+ }(log)
+ break
+ }
+
+ time.Sleep(1 * time.Second)
+ return fmt.Errorf("TODO")
+}
+
+type treeHead struct {
+ TreeSize int64 `json:"tree_size"`
+ RootHash [sha256.Size]byte `json:root_hash"`
+}
+
+func readState(opts options, logID []byte) (treeHead, error) {
+ if _, err := os.Stat(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)); err != nil {
+ return treeHead{0, sha256.Sum256(nil)}, nil
+ }
+ b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile))
+ if err != nil {
+ return treeHead{}, err
+ }
+ var th treeHead
+ if err := json.Unmarshal(b, &th); err != nil {
+ return treeHead{}, err
+ }
+ return th, nil
+}
+
+func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) {
+ b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile))
+ if err != nil {
+ return ct.SignedTreeHead{}, err
+ }
+ var sth ct.SignedTreeHead
+ if err := json.Unmarshal(b, &sth); err != nil {
+ return ct.SignedTreeHead{}, err
+ }
+ return sth, nil
+}
+
+func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence int64, c *chunk.Chunk) (bool, error) {
+ if len(c.LeafHashes) == 0 {
+ return false, nil // nothing to persist
+ }
+ if int64(len(c.LeafHashes)) < minSequence {
+ return true, nil // wait for more leaves
+ }
+
+ // Read persisted tree state from disk
+ th, err := readState(opts, logID)
+ if err != nil {
+ return false, err
+ }
+ if th.TreeSize != c.Start {
+ return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", th.TreeSize, c.Start)
+ }
+
+ // Derive next intermediate tree state to persist
+ //
+ // Independent context because we need to run inclusion and consistency
+ // queries after the parent context is cancelled to persist on shutdown
+ //
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(c.Start+int64(len(c.LeafHashes))))
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "WARNING: %x: %v\n", logID, err)
+ return true, nil // try again later
+ }
+ if p.LeafIndex != c.Start {
+ return false, fmt.Errorf("log says proof for entry %d is at index %d", c.Start, p.LeafIndex)
+ }
+ // TODO: ranged inclusion verify + consistency proof
+
+ // Persist SANs to disk
+ fp, err := os.OpenFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sansFile), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
+ if err != nil {
+ return false, err
+ }
+ defer fp.Close()
+ if _, err := fp.WriteString(strings.Join(c.SANs, "\n") + "\n"); err != nil {
+ return false, err
+ }
+ if err := fp.Sync(); err != nil {
+ return false, err
+ }
+
+ // Persist intermediate log state to disk
+ b, err := json.Marshal(&treeHead{c.Start + int64(len(c.LeafHashes)), [sha256.Size]byte{}})
+ if err != nil {
+ return false, err
+ }
+ if err := os.WriteFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile), b, 0644); err != nil {
+ return false, err
+ }
+
+ fmt.Fprintf(os.Stderr, "DEBUG: %x: persist: start=%d end=%d\n", logID, c.Start, c.Start+int64(len(c.LeafHashes)))
+ return false, nil
+}