diff options
Diffstat (limited to 'internal/monitor')
-rw-r--r-- | internal/monitor/chunks.go | 89 | ||||
-rw-r--r-- | internal/monitor/matcher.go | 13 | ||||
-rw-r--r-- | internal/monitor/monitor.go | 173 | ||||
-rw-r--r-- | internal/monitor/tail.go | 200 |
4 files changed, 475 insertions, 0 deletions
diff --git a/internal/monitor/chunks.go b/internal/monitor/chunks.go new file mode 100644 index 0000000..02b3802 --- /dev/null +++ b/internal/monitor/chunks.go @@ -0,0 +1,89 @@ +package monitor + +// +// A min heap of chunks, ordered on each chunk's start index. This makes it +// easy to order the downloaded leaves when using multiple parallell fetchers. +// +// Credit: inspiration to use a heap from Aaron Gable, see +// https://github.com/aarongable/ctaudit +// + +import ( + "container/heap" + "crypto/sha256" +) + +type chunk struct { + startIndex uint64 // Index of the first leaf + leafHashes [][sha256.Size]byte // List of consecutive leaf hashes + matches []LogEntry // Leaves that match some criteria + errors []error // Errors that ocurred while matching on the leaves +} + +type chunks []*chunk + +func newChunks() *chunks { + var h chunks + heap.Init((*internal)(&h)) + return &h +} + +func (h *chunks) push(c *chunk) { + heap.Push((*internal)(h), c) +} + +func (h *chunks) pop() *chunk { + x := heap.Pop((*internal)(h)) + return x.(*chunk) +} + +// gap returns true if there's a gap between the provided start index and the +// top most chunk. If the top most chunk is in sequence, it is merged with +// any following chunks that are also in sequence to form one larger chunk. +func (h *chunks) gap(start uint64) bool { + if len(*h) == 0 { + return true + } + + top := h.pop() + if start != top.startIndex { + h.push(top) + return true + } + + for len(*h) > 0 { + c := h.pop() + if c.startIndex != top.startIndex+uint64(len(top.leafHashes)) { + h.push(c) + break + } + + top.leafHashes = append(top.leafHashes, c.leafHashes...) + top.matches = append(top.matches, c.matches...) + top.errors = append(top.errors, c.errors...) + } + + h.push(top) + return false +} + +// internal implements the heap interface, see example: +// https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/container/heap/example_intheap_test.go +type internal chunks + +func (h internal) Len() int { return len(h) } +func (h internal) Less(i, j int) bool { return h[i].startIndex < h[j].startIndex } +func (h internal) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *internal) Push(x any) { + *h = append(*h, x.(*chunk)) +} + +func (h *internal) Pop() any { + old := *h + n := len(old) + x := old[n-1] + old[n-1] = nil // avoid memory leak + *h = old[:n-1] + return x +} diff --git a/internal/monitor/matcher.go b/internal/monitor/matcher.go new file mode 100644 index 0000000..912e595 --- /dev/null +++ b/internal/monitor/matcher.go @@ -0,0 +1,13 @@ +package monitor + +type Matcher interface { + // Match determines if a log entry is considered to be a "match" based on + // some criteria. An error is returned if any certificate parsing fails. + Match(leafInput, extraData []byte) (bool, error) +} + +type MatchAll struct{} + +func (m *MatchAll) Match(leafInput, extraData []byte) (bool, error) { + return true, nil +} diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go new file mode 100644 index 0000000..6accd97 --- /dev/null +++ b/internal/monitor/monitor.go @@ -0,0 +1,173 @@ +// Package monitor provides monitoring of Certificate Transparency logs. If +// running in continuous mode, the list of logs can be updated dynamically. +// +// Implement the Matcher interface to customize which certificates should be +// included in the monitor's emitted events. See MatchAll for an example. +// +// Note that this package verifies that the monitored logs are locally +// consistent with regard to the initial start-up state. It is up to the user +// to process the monitor's emitted events and errors, and to persist state. +package monitor + +import ( + "context" + "crypto/x509" + "encoding/base64" + "fmt" + "net/http" + "os" + "sync" + + ct "github.com/google/certificate-transparency-go" + "github.com/google/certificate-transparency-go/client" + "github.com/google/certificate-transparency-go/jsonclient" + "gitlab.torproject.org/rgdd/ct/pkg/metadata" + "rgdd.se/silent-ct/internal/logger" +) + +// MonitoredLog provides information about a log the monitor is following +type MonitoredLog struct { + Config metadata.Log + State State +} + +// State is the latest append-only state the monitor observed from its local +// vantage point. The next entry to download is specified by NextIndex. +type State struct { + ct.SignedTreeHead `json:"latest_sth"` + NextIndex uint64 `json:"next_index"` +} + +// Event carries the latest consistent monitor state, found matches, as well as +// errors that occurred while trying to match on the downloaded log entries. +type Event struct { + State State + Matches []LogEntry + Errors []error +} + +func (ev *Event) Summary() string { + return fmt.Sprintf("log %s: tree size %d at next index %d (%d matches, %d errors)", + base64.StdEncoding.EncodeToString(ev.State.LogID[:]), + ev.State.TreeSize, ev.State.NextIndex, len(ev.Matches), len(ev.Errors)) +} + +// LogEntry is a Merkle tree leaf in a log +type LogEntry struct { + LeafIndex uint64 `json:"leaf_index"` + LeafData []byte `json:"leaf_data"` + ExtraData []byte `json:"extra_data"` +} + +type Config struct { + // Optional + Matcher Matcher // Which log entries to match (default is to match all) + Logger logger.Logger // Debug prints only (no output by default) + Contact string // Something that help log operators get in touch + ChunkSize uint // Min number of leaves to propagate a chunk without matches + BatchSize uint // Max number of certificates to accept per worker + NumWorkers uint // Number of parallel workers to use for each log +} + +type Monitor struct { + cfg Config + matcher Matcher + + eventCh chan Event + configCh chan MonitoredLog + errorCh chan error +} + +func New(cfg Config, evCh chan Event, cfgCh chan MonitoredLog, errCh chan error) (Monitor, error) { + if cfg.Matcher == nil { + cfg.Matcher = &MatchAll{} + } + if !cfg.Logger.IsConfigured() { + cfg.Logger = logger.New(logger.Config{Level: logger.LevelNotice, File: os.Stderr}) + } + if cfg.Contact == "" { + cfg.Contact = "unknown-user" + } + if cfg.ChunkSize == 0 { + cfg.ChunkSize = 256 // FIXME: 16364 + } + if cfg.BatchSize == 0 { + cfg.BatchSize = 1024 + } + if cfg.NumWorkers == 0 { + cfg.NumWorkers = 2 + } + return Monitor{cfg: cfg, matcher: cfg.Matcher, eventCh: evCh, configCh: cfgCh, errorCh: errCh}, nil +} + +func (mon *Monitor) RunOnce(ctx context.Context, cfg []MonitoredLog, evCh chan Event, errCh chan error) error { + return fmt.Errorf("TODO") +} + +func (mon *Monitor) RunForever(ctx context.Context) error { + var wg sync.WaitGroup + defer wg.Wait() + + mctx, cancel := context.WithCancel(ctx) + defer cancel() + + monitoring := make(map[metadata.LogURL]context.CancelFunc) + for { + select { + case <-ctx.Done(): + return nil + case log := <-mon.configCh: + if tcancel, ok := monitoring[log.Config.URL]; ok { + delete(monitoring, log.Config.URL) + tcancel() + continue + } + + newTail := mon.newTailRFC6962 + if log.Config.DNS != nil { // FIXME: get a real nob for tile-based logs + newTail = mon.newTailTile + } + t, err := newTail(log) + if err != nil { + return err + } + + tctx, tcancel := context.WithCancel(mctx) + monitoring[log.Config.URL] = tcancel + + wg.Add(1) + go func(log MonitoredLog, t tail) { + defer wg.Done() + defer tcancel() + t.run(tctx, log, mon.eventCh, mon.errorCh) + }(log, t) + } + } +} + +const userAgentPrefix = "rgdd.se/silent-ct" + +func (mon *Monitor) newTailRFC6962(log MonitoredLog) (tail, error) { + key, err := x509.MarshalPKIXPublicKey(log.Config.Key.Public) + if err != nil { + return tail{}, err + } + cli, err := client.New(string(log.Config.URL), &http.Client{}, jsonclient.Options{ + Logger: &discard{}, + UserAgent: userAgentPrefix + "/" + mon.cfg.Contact, + PublicKeyDER: key, + }) + if err != nil { + return tail{}, err + } + + return tail{cfg: mon.cfg, scanner: cli, checker: cli, matcher: mon.matcher}, nil +} + +func (mon *Monitor) newTailTile(cfg MonitoredLog) (tail, error) { + return tail{}, fmt.Errorf("TODO") +} + +type discard struct{} + +func (n *discard) Printf(string, ...interface{}) {} diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go new file mode 100644 index 0000000..0e16476 --- /dev/null +++ b/internal/monitor/tail.go @@ -0,0 +1,200 @@ +package monitor + +import ( + "context" + "crypto/sha256" + "fmt" + "sync" + "time" + + ct "github.com/google/certificate-transparency-go" + "github.com/google/certificate-transparency-go/client" + "github.com/google/certificate-transparency-go/scanner" + "gitlab.torproject.org/rgdd/ct/pkg/merkle" +) + +type tail struct { + cfg Config + matcher Matcher + scanner scanner.LogClient + checker client.CheckLogClient +} + +func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error) { + chunkCh := make(chan *chunk) + defer close(chunkCh) + + mctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + defer wg.Wait() + + callback := func(eb scanner.EntryBatch) { + c := chunk{startIndex: uint64(eb.Start)} + for i := 0; i < len(eb.Entries); i++ { + c.leafHashes = append(c.leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput)) + match, err := t.matcher.Match(eb.Entries[i].LeafInput, eb.Entries[i].ExtraData) + if err != nil { + c.errors = append(c.errors, fmt.Errorf("while processing index %d for %s: %v", i, mon.Config.URL, err)) + continue + } + if !match { + continue + } + + c.matches = append(c.matches, LogEntry{ + LeafIndex: c.startIndex + uint64(i), + LeafData: eb.Entries[i].LeafInput, + ExtraData: eb.Entries[i].ExtraData, + }) + } + + chunkCh <- &c + } + + fetcher := scanner.NewFetcher(t.scanner, &scanner.FetcherOptions{ + BatchSize: int(t.cfg.BatchSize), + StartIndex: int64(mon.State.NextIndex), + ParallelFetch: int(t.cfg.NumWorkers), + Continuous: true, // FIXME: don't set this for read-only log + }) + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + fetcher.Run(mctx, callback) + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + t.sequence(mctx, mon, eventCh, errorCh, chunkCh) + }() +} + +func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) { + state := mon.State + heap := newChunks() + for { + select { + case <-ctx.Done(): + return // FIXME: check if we can pop something before return + case c := <-chunkCh: + heap.push(c) + if heap.gap(state.NextIndex) { + continue + } + c = heap.pop() + if len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { + heap.push(c) + continue // FIXME: don't trigger if we havn't run nextState for too long + } + nextState, err := t.nextState(ctx, state, c) + if err != nil { + errorCh <- err + heap.push(c) + continue + } + + state = nextState + eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + } + } +} + +func (t *tail) nextState(ctx context.Context, state State, c *chunk) (State, error) { + newState, err := t.nextConsistentState(ctx, state) + if err != nil { + return State{}, err + } + newState, err = t.nextIncludedState(ctx, newState, c) + if err != nil { + return State{}, err + } + return newState, nil +} + +func (t *tail) nextConsistentState(ctx context.Context, state State) (State, error) { + sth, err := getSignedTreeHead(ctx, t.checker) + if err != nil { + return State{}, fmt.Errorf("%s: get-sth: %v", t.checker.BaseURI(), err) + } + sth.LogID = state.SignedTreeHead.LogID + oldSize := state.TreeSize + oldRoot := state.SHA256RootHash + newSize := sth.TreeSize + newRoot := sth.SHA256RootHash + + proof, err := getConsistencyProof(ctx, t.checker, oldSize, newSize) + if err != nil { + return State{}, fmt.Errorf("%s: get-consistency: %v", t.checker.BaseURI(), err) + } + if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(proof)); err != nil { + return State{}, fmt.Errorf("%s: verify consistency: %v", t.checker.BaseURI(), err) + } + + return State{SignedTreeHead: *sth, NextIndex: state.NextIndex}, nil +} + +func (t *tail) nextIncludedState(ctx context.Context, state State, c *chunk) (State, error) { + leafHash := c.leafHashes[0] + oldSize := state.NextIndex + uint64(len(c.leafHashes)) + iproof, err := getInclusionProof(ctx, t.checker, leafHash, oldSize) // FIXME: set leaf index in ctx to hack into tile API + if err != nil { + return State{}, fmt.Errorf("%s: get-inclusion: %v", t.checker.BaseURI(), err) + } + if got, want := uint64(iproof.LeafIndex), state.NextIndex; got != want { + return State{}, fmt.Errorf("%s: wrong index for get-inclusion proof query %x:%d", t.checker.BaseURI(), leafHash[:], oldSize) + } + oldRoot, err := merkle.TreeHeadFromRangeProof(c.leafHashes, state.NextIndex, unslice(iproof.AuditPath)) + if err != nil { + return State{}, fmt.Errorf("%s: range proof: %v", t.checker.BaseURI(), err) + } + + newSize := state.TreeSize + newRoot := state.SHA256RootHash + cproof, err := getConsistencyProof(ctx, t.checker, oldSize, newSize) + if err != nil { + return State{}, fmt.Errorf("%s: get-consistency: %v", t.checker.BaseURI(), err) + } + if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(cproof)); err != nil { + return State{}, fmt.Errorf("%s: verify consistency: %v", t.checker.BaseURI(), err) + } + + state.NextIndex += uint64(len(c.leafHashes)) + return state, nil +} + +func getInclusionProof(ctx context.Context, cli client.CheckLogClient, leafHash [sha256.Size]byte, size uint64) (*ct.GetProofByHashResponse, error) { + rctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + return cli.GetProofByHash(rctx, leafHash[:], size) +} + +func getConsistencyProof(ctx context.Context, cli client.CheckLogClient, oldSize, newSize uint64) ([][]byte, error) { + if oldSize == 0 || oldSize >= newSize { + return [][]byte{}, nil + } + rctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + return cli.GetSTHConsistency(rctx, oldSize, newSize) +} + +func getSignedTreeHead(ctx context.Context, cli client.CheckLogClient) (*ct.SignedTreeHead, error) { + rctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + return cli.GetSTH(rctx) +} + +func unslice(hashes [][]byte) [][sha256.Size]byte { + var ret [][sha256.Size]byte + for _, hash := range hashes { + var h [sha256.Size]byte + copy(h[:], hash) + ret = append(ret, h) + } + return ret +} |