diff options
author | Rasmus Dahlberg <rasmus@rgdd.se> | 2023-03-17 10:49:48 +0100 |
---|---|---|
committer | Rasmus Dahlberg <rasmus@rgdd.se> | 2023-03-17 10:49:48 +0100 |
commit | b53aa358d64a22e8747bf3a9d117df7135b9edac (patch) | |
tree | 401fb4039922a976e7040953300d8c580696aef7 /collect.go | |
parent | 979e87fcce5482d6da1dd40b3e5ba790dc7af7ea (diff) |
wip collect
Diffstat (limited to 'collect.go')
-rw-r--r-- | collect.go | 270 |
1 file changed, 270 insertions, 0 deletions
diff --git a/collect.go b/collect.go new file mode 100644 index 0000000..c1e2c6d --- /dev/null +++ b/collect.go @@ -0,0 +1,270 @@ +package main + +import ( + "container/heap" + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + "git.cs.kau.se/rasmoste/ct-sans/internal/chunk" + "git.cs.kau.se/rasmoste/ct-sans/internal/utils" + ct "github.com/google/certificate-transparency-go" + "github.com/google/certificate-transparency-go/client" + "github.com/google/certificate-transparency-go/jsonclient" + "github.com/google/certificate-transparency-go/scanner" + "gitlab.torproject.org/rgdd/ct/pkg/merkle" + "gitlab.torproject.org/rgdd/ct/pkg/metadata" +) + +func collect(opts options) error { + b, err := os.ReadFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile)) + if err != nil { + return err + } + var md metadata.Metadata + if err := json.Unmarshal(b, &md); err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + defer wg.Wait() + + go func() { + wg.Add(1) + defer wg.Done() + + sigs := make(chan os.Signal, 1) + defer close(sigs) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + select { + case <-sigs: + fmt.Fprintf(os.Stderr, "INFO: received shutdown signal, please wait...\n") + cancel() + case <-ctx.Done(): + } + }() + + for _, log := range utils.Logs(md) { + go func(log metadata.Log) { + wg.Add(1) + defer wg.Done() + + chunks := make(chan *chunk.Chunk) + defer close(chunks) + + id, _ := log.Key.ID() + th, err := readState(opts, id[:]) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + cancel() + return + } + sth, err := readSnapshot(opts, id[:]) + if err != nil { + fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + cancel() + return + } + + cli, err := client.New(string(log.URL), &http.Client{}, jsonclient.Options{UserAgent: "wip2"}) + if err != 
nil { + fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + cancel() + return + } + fetcher := scanner.NewFetcher(cli, &scanner.FetcherOptions{ + BatchSize: int(opts.batchSize), + StartIndex: th.TreeSize, + EndIndex: int64(sth.TreeSize), + ParallelFetch: int(opts.workersPerLog), + Continuous: false, + }) + + // + // Callback that puts downloaded certificates into a + // chunk that a single sequencer can verify and persist + // + callback := func(eb scanner.EntryBatch) { + leafHashes := [][sha256.Size]byte{} + for i := 0; i < len(eb.Entries); i++ { + leafHashes = append(leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput)) + } + sans := []string{"example.com"} // TODO: fixme + chunks <- &chunk.Chunk{eb.Start, leafHashes, sans} + } + + // + // Sequencer that waits for sufficiently large chunks + // before verifying inclusion proofs and persisting an + // intermediate tree head (size and root hash) as well + // as the SANs that were observed up until that point. + // + go func() { + wg.Add(1) + defer wg.Done() + defer fmt.Fprintf(os.Stderr, "INFO: %s: shutdown sequencer\n", *log.Description) + + h := &chunk.ChunkHeap{} + heap.Init(h) + curr := th.TreeSize + for { + select { + case <-ctx.Done(): + if h.Sequence(curr) { + c := h.TPop() + if _, err := persistChunk(cli, opts, id[:], 0, c); err != nil { + fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", err) + } + } + return + case c, ok := <-chunks: + if ok { + h.TPush(c) + } + if !h.Sequence(curr) { + continue + } + + c = h.TPop() + putBack, err := persistChunk(cli, opts, id[:], int64(opts.persistSize), c) + if err != nil { + cancel() + fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + return + } + if putBack { + h.TPush(c) + continue + } + + curr += int64(len(c.LeafHashes)) + } + } + }() + + if err := fetcher.Run(ctx, callback); err != nil { + fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + cancel() + return + } + + fmt.Fprintf(os.Stderr, "INFO: %s: fetch completed\n", 
*log.Description) + for len(chunks) > 0 { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + fmt.Fprintf(os.Stderr, "DEBUG: %s: waiting for chunks to be consumed\n", *log.Description) + } + } + }(log) + break + } + + time.Sleep(1 * time.Second) + return fmt.Errorf("TODO") +} + +type treeHead struct { + TreeSize int64 `json:"tree_size"` + RootHash [sha256.Size]byte `json:root_hash"` +} + +func readState(opts options, logID []byte) (treeHead, error) { + if _, err := os.Stat(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)); err != nil { + return treeHead{0, sha256.Sum256(nil)}, nil + } + b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)) + if err != nil { + return treeHead{}, err + } + var th treeHead + if err := json.Unmarshal(b, &th); err != nil { + return treeHead{}, err + } + return th, nil +} + +func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) { + b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile)) + if err != nil { + return ct.SignedTreeHead{}, err + } + var sth ct.SignedTreeHead + if err := json.Unmarshal(b, &sth); err != nil { + return ct.SignedTreeHead{}, err + } + return sth, nil +} + +func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence int64, c *chunk.Chunk) (bool, error) { + if len(c.LeafHashes) == 0 { + return false, nil // nothing to persist + } + if int64(len(c.LeafHashes)) < minSequence { + return true, nil // wait for more leaves + } + + // Read persisted tree state from disk + th, err := readState(opts, logID) + if err != nil { + return false, err + } + if th.TreeSize != c.Start { + return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", th.TreeSize, c.Start) + } + + // Derive next intermediate tree state to persist + // + // Independent context because we need to run inclusion and consistency + // queries after the parent context is cancelled to persist 
on shutdown + // + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(c.Start+int64(len(c.LeafHashes)))) + if err != nil { + fmt.Fprintf(os.Stderr, "WARNING: %x: %v\n", logID, err) + return true, nil // try again later + } + if p.LeafIndex != c.Start { + return false, fmt.Errorf("log says proof for entry %d is at index %d", c.Start, p.LeafIndex) + } + // TODO: ranged inclusion verify + consistency proof + + // Persist SANs to disk + fp, err := os.OpenFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sansFile), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return false, err + } + defer fp.Close() + if _, err := fp.WriteString(strings.Join(c.SANs, "\n") + "\n"); err != nil { + return false, err + } + if err := fp.Sync(); err != nil { + return false, err + } + + // Persist intermediate log state to disk + b, err := json.Marshal(&treeHead{c.Start + int64(len(c.LeafHashes)), [sha256.Size]byte{}}) + if err != nil { + return false, err + } + if err := os.WriteFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile), b, 0644); err != nil { + return false, err + } + + fmt.Fprintf(os.Stderr, "DEBUG: %x: persist: start=%d end=%d\n", logID, c.Start, c.Start+int64(len(c.LeafHashes))) + return false, nil +} |