aboutsummaryrefslogtreecommitdiff
path: root/internal/monitor
diff options
context:
space:
mode:
authorRasmus Dahlberg <rasmus@rgdd.se>2023-12-31 09:39:25 +0100
committerRasmus Dahlberg <rasmus@rgdd.se>2024-01-07 20:22:23 +0100
commite18d36ebae30536c77c61cd5da123991e0ca1629 (patch)
treebf4880c0019a6009ab1b671e23ef4a1a4a5e8e08 /internal/monitor
parent54d980afcbd6f0011d6a162e0003587d26a3e311 (diff)
Add drafty prototype
Diffstat (limited to 'internal/monitor')
-rw-r--r--internal/monitor/chunks.go89
-rw-r--r--internal/monitor/matcher.go13
-rw-r--r--internal/monitor/monitor.go173
-rw-r--r--internal/monitor/tail.go200
4 files changed, 475 insertions, 0 deletions
diff --git a/internal/monitor/chunks.go b/internal/monitor/chunks.go
new file mode 100644
index 0000000..02b3802
--- /dev/null
+++ b/internal/monitor/chunks.go
@@ -0,0 +1,89 @@
+package monitor
+
+//
+// A min heap of chunks, ordered on each chunk's start index. This makes it
+// easy to order the downloaded leaves when using multiple parallell fetchers.
+//
+// Credit: inspiration to use a heap from Aaron Gable, see
+// https://github.com/aarongable/ctaudit
+//
+
+import (
+ "container/heap"
+ "crypto/sha256"
+)
+
+type chunk struct {
+ startIndex uint64 // Index of the first leaf
+ leafHashes [][sha256.Size]byte // List of consecutive leaf hashes
+ matches []LogEntry // Leaves that match some criteria
+ errors []error // Errors that ocurred while matching on the leaves
+}
+
+type chunks []*chunk
+
+func newChunks() *chunks {
+ var h chunks
+ heap.Init((*internal)(&h))
+ return &h
+}
+
+func (h *chunks) push(c *chunk) {
+ heap.Push((*internal)(h), c)
+}
+
+func (h *chunks) pop() *chunk {
+ x := heap.Pop((*internal)(h))
+ return x.(*chunk)
+}
+
+// gap returns true if there's a gap between the provided start index and the
+// top most chunk. If the top most chunk is in sequence, it is merged with
+// any following chunks that are also in sequence to form one larger chunk.
+func (h *chunks) gap(start uint64) bool {
+ if len(*h) == 0 {
+ return true
+ }
+
+ top := h.pop()
+ if start != top.startIndex {
+ h.push(top)
+ return true
+ }
+
+ for len(*h) > 0 {
+ c := h.pop()
+ if c.startIndex != top.startIndex+uint64(len(top.leafHashes)) {
+ h.push(c)
+ break
+ }
+
+ top.leafHashes = append(top.leafHashes, c.leafHashes...)
+ top.matches = append(top.matches, c.matches...)
+ top.errors = append(top.errors, c.errors...)
+ }
+
+ h.push(top)
+ return false
+}
+
+// internal implements the heap interface, see example:
+// https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/container/heap/example_intheap_test.go
+type internal chunks
+
+func (h internal) Len() int { return len(h) }
+func (h internal) Less(i, j int) bool { return h[i].startIndex < h[j].startIndex }
+func (h internal) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h *internal) Push(x any) {
+ *h = append(*h, x.(*chunk))
+}
+
+func (h *internal) Pop() any {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ old[n-1] = nil // avoid memory leak
+ *h = old[:n-1]
+ return x
+}
diff --git a/internal/monitor/matcher.go b/internal/monitor/matcher.go
new file mode 100644
index 0000000..912e595
--- /dev/null
+++ b/internal/monitor/matcher.go
@@ -0,0 +1,13 @@
+package monitor
+
+type Matcher interface {
+ // Match determines if a log entry is considered to be a "match" based on
+ // some criteria. An error is returned if any certificate parsing fails.
+ Match(leafInput, extraData []byte) (bool, error)
+}
+
+type MatchAll struct{}
+
+func (m *MatchAll) Match(leafInput, extraData []byte) (bool, error) {
+ return true, nil
+}
diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go
new file mode 100644
index 0000000..6accd97
--- /dev/null
+++ b/internal/monitor/monitor.go
@@ -0,0 +1,173 @@
+// Package monitor provides monitoring of Certificate Transparency logs. If
+// running in continuous mode, the list of logs can be updated dynamically.
+//
+// Implement the Matcher interface to customize which certificates should be
+// included in the monitor's emitted events. See MatchAll for an example.
+//
+// Note that this package verifies that the monitored logs are locally
+// consistent with regard to the initial start-up state. It is up to the user
+// to process the monitor's emitted events and errors, and to persist state.
+package monitor
+
+import (
+ "context"
+ "crypto/x509"
+ "encoding/base64"
+ "fmt"
+ "net/http"
+ "os"
+ "sync"
+
+ ct "github.com/google/certificate-transparency-go"
+ "github.com/google/certificate-transparency-go/client"
+ "github.com/google/certificate-transparency-go/jsonclient"
+ "gitlab.torproject.org/rgdd/ct/pkg/metadata"
+ "rgdd.se/silent-ct/internal/logger"
+)
+
+// MonitoredLog provides information about a log the monitor is following
+type MonitoredLog struct {
+ Config metadata.Log
+ State State
+}
+
+// State is the latest append-only state the monitor observed from its local
+// vantage point. The next entry to download is specified by NextIndex.
+type State struct {
+ ct.SignedTreeHead `json:"latest_sth"`
+ NextIndex uint64 `json:"next_index"`
+}
+
+// Event carries the latest consistent monitor state, found matches, as well as
+// errors that occurred while trying to match on the downloaded log entries.
+type Event struct {
+ State State
+ Matches []LogEntry
+ Errors []error
+}
+
+func (ev *Event) Summary() string {
+ return fmt.Sprintf("log %s: tree size %d at next index %d (%d matches, %d errors)",
+ base64.StdEncoding.EncodeToString(ev.State.LogID[:]),
+ ev.State.TreeSize, ev.State.NextIndex, len(ev.Matches), len(ev.Errors))
+}
+
+// LogEntry is a Merkle tree leaf in a log
+type LogEntry struct {
+ LeafIndex uint64 `json:"leaf_index"`
+ LeafData []byte `json:"leaf_data"`
+ ExtraData []byte `json:"extra_data"`
+}
+
+type Config struct {
+ // Optional
+ Matcher Matcher // Which log entries to match (default is to match all)
+ Logger logger.Logger // Debug prints only (no output by default)
+ Contact string // Something that help log operators get in touch
+ ChunkSize uint // Min number of leaves to propagate a chunk without matches
+ BatchSize uint // Max number of certificates to accept per worker
+ NumWorkers uint // Number of parallel workers to use for each log
+}
+
+type Monitor struct {
+ cfg Config
+ matcher Matcher
+
+ eventCh chan Event
+ configCh chan MonitoredLog
+ errorCh chan error
+}
+
+func New(cfg Config, evCh chan Event, cfgCh chan MonitoredLog, errCh chan error) (Monitor, error) {
+ if cfg.Matcher == nil {
+ cfg.Matcher = &MatchAll{}
+ }
+ if !cfg.Logger.IsConfigured() {
+ cfg.Logger = logger.New(logger.Config{Level: logger.LevelNotice, File: os.Stderr})
+ }
+ if cfg.Contact == "" {
+ cfg.Contact = "unknown-user"
+ }
+ if cfg.ChunkSize == 0 {
+ cfg.ChunkSize = 256 // FIXME: 16364
+ }
+ if cfg.BatchSize == 0 {
+ cfg.BatchSize = 1024
+ }
+ if cfg.NumWorkers == 0 {
+ cfg.NumWorkers = 2
+ }
+ return Monitor{cfg: cfg, matcher: cfg.Matcher, eventCh: evCh, configCh: cfgCh, errorCh: errCh}, nil
+}
+
+func (mon *Monitor) RunOnce(ctx context.Context, cfg []MonitoredLog, evCh chan Event, errCh chan error) error {
+ return fmt.Errorf("TODO")
+}
+
+func (mon *Monitor) RunForever(ctx context.Context) error {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ mctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ monitoring := make(map[metadata.LogURL]context.CancelFunc)
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case log := <-mon.configCh:
+ if tcancel, ok := monitoring[log.Config.URL]; ok {
+ delete(monitoring, log.Config.URL)
+ tcancel()
+ continue
+ }
+
+ newTail := mon.newTailRFC6962
+ if log.Config.DNS != nil { // FIXME: get a real nob for tile-based logs
+ newTail = mon.newTailTile
+ }
+ t, err := newTail(log)
+ if err != nil {
+ return err
+ }
+
+ tctx, tcancel := context.WithCancel(mctx)
+ monitoring[log.Config.URL] = tcancel
+
+ wg.Add(1)
+ go func(log MonitoredLog, t tail) {
+ defer wg.Done()
+ defer tcancel()
+ t.run(tctx, log, mon.eventCh, mon.errorCh)
+ }(log, t)
+ }
+ }
+}
+
+const userAgentPrefix = "rgdd.se/silent-ct"
+
+func (mon *Monitor) newTailRFC6962(log MonitoredLog) (tail, error) {
+ key, err := x509.MarshalPKIXPublicKey(log.Config.Key.Public)
+ if err != nil {
+ return tail{}, err
+ }
+ cli, err := client.New(string(log.Config.URL), &http.Client{}, jsonclient.Options{
+ Logger: &discard{},
+ UserAgent: userAgentPrefix + "/" + mon.cfg.Contact,
+ PublicKeyDER: key,
+ })
+ if err != nil {
+ return tail{}, err
+ }
+
+ return tail{cfg: mon.cfg, scanner: cli, checker: cli, matcher: mon.matcher}, nil
+}
+
+func (mon *Monitor) newTailTile(cfg MonitoredLog) (tail, error) {
+ return tail{}, fmt.Errorf("TODO")
+}
+
+type discard struct{}
+
+func (n *discard) Printf(string, ...interface{}) {}
diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go
new file mode 100644
index 0000000..0e16476
--- /dev/null
+++ b/internal/monitor/tail.go
@@ -0,0 +1,200 @@
+package monitor
+
+import (
+ "context"
+ "crypto/sha256"
+ "fmt"
+ "sync"
+ "time"
+
+ ct "github.com/google/certificate-transparency-go"
+ "github.com/google/certificate-transparency-go/client"
+ "github.com/google/certificate-transparency-go/scanner"
+ "gitlab.torproject.org/rgdd/ct/pkg/merkle"
+)
+
+type tail struct {
+ cfg Config
+ matcher Matcher
+ scanner scanner.LogClient
+ checker client.CheckLogClient
+}
+
+func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error) {
+ chunkCh := make(chan *chunk)
+ defer close(chunkCh)
+
+ mctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ callback := func(eb scanner.EntryBatch) {
+ c := chunk{startIndex: uint64(eb.Start)}
+ for i := 0; i < len(eb.Entries); i++ {
+ c.leafHashes = append(c.leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput))
+ match, err := t.matcher.Match(eb.Entries[i].LeafInput, eb.Entries[i].ExtraData)
+ if err != nil {
+ c.errors = append(c.errors, fmt.Errorf("while processing index %d for %s: %v", i, mon.Config.URL, err))
+ continue
+ }
+ if !match {
+ continue
+ }
+
+ c.matches = append(c.matches, LogEntry{
+ LeafIndex: c.startIndex + uint64(i),
+ LeafData: eb.Entries[i].LeafInput,
+ ExtraData: eb.Entries[i].ExtraData,
+ })
+ }
+
+ chunkCh <- &c
+ }
+
+ fetcher := scanner.NewFetcher(t.scanner, &scanner.FetcherOptions{
+ BatchSize: int(t.cfg.BatchSize),
+ StartIndex: int64(mon.State.NextIndex),
+ ParallelFetch: int(t.cfg.NumWorkers),
+ Continuous: true, // FIXME: don't set this for read-only log
+ })
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer cancel()
+ fetcher.Run(mctx, callback)
+ }()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer cancel()
+ t.sequence(mctx, mon, eventCh, errorCh, chunkCh)
+ }()
+}
+
+func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) {
+ state := mon.State
+ heap := newChunks()
+ for {
+ select {
+ case <-ctx.Done():
+ return // FIXME: check if we can pop something before return
+ case c := <-chunkCh:
+ heap.push(c)
+ if heap.gap(state.NextIndex) {
+ continue
+ }
+ c = heap.pop()
+ if len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) {
+ heap.push(c)
+ continue // FIXME: don't trigger if we havn't run nextState for too long
+ }
+ nextState, err := t.nextState(ctx, state, c)
+ if err != nil {
+ errorCh <- err
+ heap.push(c)
+ continue
+ }
+
+ state = nextState
+ eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors}
+ }
+ }
+}
+
+func (t *tail) nextState(ctx context.Context, state State, c *chunk) (State, error) {
+ newState, err := t.nextConsistentState(ctx, state)
+ if err != nil {
+ return State{}, err
+ }
+ newState, err = t.nextIncludedState(ctx, newState, c)
+ if err != nil {
+ return State{}, err
+ }
+ return newState, nil
+}
+
+func (t *tail) nextConsistentState(ctx context.Context, state State) (State, error) {
+ sth, err := getSignedTreeHead(ctx, t.checker)
+ if err != nil {
+ return State{}, fmt.Errorf("%s: get-sth: %v", t.checker.BaseURI(), err)
+ }
+ sth.LogID = state.SignedTreeHead.LogID
+ oldSize := state.TreeSize
+ oldRoot := state.SHA256RootHash
+ newSize := sth.TreeSize
+ newRoot := sth.SHA256RootHash
+
+ proof, err := getConsistencyProof(ctx, t.checker, oldSize, newSize)
+ if err != nil {
+ return State{}, fmt.Errorf("%s: get-consistency: %v", t.checker.BaseURI(), err)
+ }
+ if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(proof)); err != nil {
+ return State{}, fmt.Errorf("%s: verify consistency: %v", t.checker.BaseURI(), err)
+ }
+
+ return State{SignedTreeHead: *sth, NextIndex: state.NextIndex}, nil
+}
+
+func (t *tail) nextIncludedState(ctx context.Context, state State, c *chunk) (State, error) {
+ leafHash := c.leafHashes[0]
+ oldSize := state.NextIndex + uint64(len(c.leafHashes))
+ iproof, err := getInclusionProof(ctx, t.checker, leafHash, oldSize) // FIXME: set leaf index in ctx to hack into tile API
+ if err != nil {
+ return State{}, fmt.Errorf("%s: get-inclusion: %v", t.checker.BaseURI(), err)
+ }
+ if got, want := uint64(iproof.LeafIndex), state.NextIndex; got != want {
+ return State{}, fmt.Errorf("%s: wrong index for get-inclusion proof query %x:%d", t.checker.BaseURI(), leafHash[:], oldSize)
+ }
+ oldRoot, err := merkle.TreeHeadFromRangeProof(c.leafHashes, state.NextIndex, unslice(iproof.AuditPath))
+ if err != nil {
+ return State{}, fmt.Errorf("%s: range proof: %v", t.checker.BaseURI(), err)
+ }
+
+ newSize := state.TreeSize
+ newRoot := state.SHA256RootHash
+ cproof, err := getConsistencyProof(ctx, t.checker, oldSize, newSize)
+ if err != nil {
+ return State{}, fmt.Errorf("%s: get-consistency: %v", t.checker.BaseURI(), err)
+ }
+ if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(cproof)); err != nil {
+ return State{}, fmt.Errorf("%s: verify consistency: %v", t.checker.BaseURI(), err)
+ }
+
+ state.NextIndex += uint64(len(c.leafHashes))
+ return state, nil
+}
+
+func getInclusionProof(ctx context.Context, cli client.CheckLogClient, leafHash [sha256.Size]byte, size uint64) (*ct.GetProofByHashResponse, error) {
+ rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+ return cli.GetProofByHash(rctx, leafHash[:], size)
+}
+
+func getConsistencyProof(ctx context.Context, cli client.CheckLogClient, oldSize, newSize uint64) ([][]byte, error) {
+ if oldSize == 0 || oldSize >= newSize {
+ return [][]byte{}, nil
+ }
+ rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+ return cli.GetSTHConsistency(rctx, oldSize, newSize)
+}
+
+func getSignedTreeHead(ctx context.Context, cli client.CheckLogClient) (*ct.SignedTreeHead, error) {
+ rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+ return cli.GetSTH(rctx)
+}
+
+func unslice(hashes [][]byte) [][sha256.Size]byte {
+ var ret [][sha256.Size]byte
+ for _, hash := range hashes {
+ var h [sha256.Size]byte
+ copy(h[:], hash)
+ ret = append(ret, h)
+ }
+ return ret
+}