Diffstat (limited to 'pkg/monitor/monitor.go')
 pkg/monitor/monitor.go | 286 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 286 insertions(+), 0 deletions(-)
diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go
new file mode 100644
index 0000000..5f7a629
--- /dev/null
+++ b/pkg/monitor/monitor.go
@@ -0,0 +1,286 @@
+// Package monitor provides a Certificate Transparency monitor that tails a
+// list of logs, a list which can be updated dynamically while the monitor is
+// running. Every emitted progress message has been verified by the monitor:
+// matched entries are included in the log's Merkle tree, and the tree is
+// append-only with regard to the state the monitor was started with. It is up
+// to the user to process the monitor's progress messages and errors, and to
+// persist the monitor's state.
+//
+// Implement the Matcher interface to customize which certificates should be
+// included in a log's progress messages, or use any of the existing matchers
+// provided by this package (see for example MatchAll and MatchWildcards).
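+//
+// A minimal usage sketch, assuming a context ctx and an initial list of logs
+// (e.g., parsed from a log list file) are available as logs:
+//
+//  mon, err := monitor.New(monitor.Config{Contact: "security@example.org"})
+//  if err != nil {
+//    // handle configuration error
+//  }
+//  metadataCh := make(chan []monitor.MessageLogConfig)
+//  eventCh := make(chan monitor.MessageLogProgress)
+//  errorCh := make(chan error)
+//  go mon.Run(ctx, metadataCh, eventCh, errorCh)
+//  metadataCh <- logs // start (or restart) tailing the listed logs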
+package monitor
+
+import (
+ "context"
+ "crypto/sha256"
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+
+ ct "github.com/google/certificate-transparency-go"
+ "github.com/google/certificate-transparency-go/client"
+ "github.com/google/certificate-transparency-go/jsonclient"
+ "github.com/google/certificate-transparency-go/scanner"
+ "rgdd.se/silent-ct/internal/merkle"
+)
+
+const (
+ UserAgentPrefix = "rgdd.se/silent-ct"
+ DefaultContact = "unknown-user"
+ DefaultChunkSize = 256 // TODO: increase me
+ DefaultBatchSize = 128 // TODO: increase me
+ DefaultNumWorkers = 2
+)
+
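+// Config is the monitor's configuration. New replaces any unset field with a
+// default value.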
+type Config struct {
+ Contact string // Something that helps log operators get in touch
+ ChunkSize int // Min number of pending leaves before a chunk without matches is propagated
+ BatchSize int // Max number of certificates to accept per worker
+ NumWorkers int // Number of parallel workers to use for each log
+
+ // Callback determines which certificates are interesting to detect
+ Callback Matcher
+}
+
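+// Monitor tails a dynamically updated list of Certificate Transparency logs.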
+type Monitor struct {
+ Config
+}
+
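+// New returns a Monitor, substituting defaults for any Config fields that are
+// unset.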
+func New(cfg Config) (Monitor, error) {
+ if cfg.Contact == "" {
+ cfg.Contact = DefaultContact
+ }
+ if cfg.ChunkSize <= 0 {
+ cfg.ChunkSize = DefaultChunkSize
+ }
+ if cfg.BatchSize <= 0 {
+ cfg.BatchSize = DefaultBatchSize
+ }
+ if cfg.NumWorkers <= 0 {
+ cfg.NumWorkers = DefaultNumWorkers
+ }
+ if cfg.Callback == nil {
+ cfg.Callback = &MatchAll{}
+ }
+ return Monitor{Config: cfg}, nil
+}
+
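+// Run starts monitoring. Whenever a new list of logs arrives on metadataCh,
+// all running log tailers are stopped and restarted with the new list.
+// Verified progress is emitted on eventCh and errors on errorCh. Run blocks
+// until ctx is cancelled.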
+func (mon *Monitor) Run(ctx context.Context, metadataCh chan []MessageLogConfig, eventCh chan MessageLogProgress, errorCh chan error) {
+ var wg sync.WaitGroup
+ var sctx context.Context
+ var cancel context.CancelFunc
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case metadata := <-metadataCh:
+ fmt.Printf("DEBUG: received new list with %d logs\n", len(metadata))
+ if cancel != nil {
+ fmt.Printf("DEBUG: stopping all log tailers\n")
+ cancel()
+ wg.Wait()
+ }
+
+ sctx, cancel = context.WithCancel(ctx)
+ for _, md := range metadata {
+ fmt.Printf("DEBUG: starting log tailer %s\n", md.Metadata.URL)
+ wg.Add(1)
+ go func(lcfg MessageLogConfig) {
+ defer wg.Done()
+
+ opts := jsonclient.Options{Logger: &noout{}, UserAgent: UserAgentPrefix + ":" + mon.Contact}
+ cli, err := client.New(string(lcfg.Metadata.URL), &http.Client{}, opts)
+ if err != nil {
+ errorCh <- fmt.Errorf("unable to configure %s: %v", lcfg.Metadata.URL, err)
+ return
+ }
+
+ chunkCh := make(chan *chunk)
+ defer close(chunkCh)
+
+ t := tail{mon.Config, *cli, chunkCh, eventCh, errorCh}
+ if err := t.run(sctx, lcfg.State); err != nil {
+ errorCh <- fmt.Errorf("unable to continue tailing %s: %v", lcfg.Metadata.URL, err)
+ }
+ }(md)
+ }
+ }
+ }
+}
+
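+// tail tails a single log: it fetches entries in batches, matches them
+// against the configured Callback, and verifies inclusion and consistency
+// before emitting progress.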
+type tail struct {
+ mcfg Config
+ cli client.LogClient
+
+ chunkCh chan *chunk
+ eventCh chan MessageLogProgress
+ errorCh chan error
+}
+
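+// run starts a sequencer goroutine that orders and verifies fetched chunks,
+// then fetches entries continuously starting at state.NextIndex. It returns
+// when ctx is cancelled or the fetcher fails.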
+func (t *tail) run(ctx context.Context, state MonitorState) error {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ mctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer cancel()
+ t.sequence(mctx, state)
+ }()
+
+ fetcher := scanner.NewFetcher(&t.cli, &scanner.FetcherOptions{
+ BatchSize: t.mcfg.BatchSize,
+ StartIndex: int64(state.NextIndex),
+ ParallelFetch: t.mcfg.NumWorkers,
+ Continuous: true,
+ })
+ callback := func(eb scanner.EntryBatch) {
+ c := chunk{startIndex: uint64(eb.Start)}
+ for i := 0; i < len(eb.Entries); i++ {
+ c.leafHashes = append(c.leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput))
+ match, err := t.mcfg.Callback.Match(eb.Entries[i].LeafInput, eb.Entries[i].ExtraData)
+ if err != nil {
+ c.errors = append(c.errors, fmt.Errorf("while processing index %d for %s: %v", eb.Start+int64(i), t.cli.BaseURI(), err))
+ } else if match {
+ c.matches = append(c.matches, LogEntry{
+ LeafIndex: uint64(eb.Start) + uint64(i),
+ LeafData: eb.Entries[i].LeafInput,
+ ExtraData: eb.Entries[i].ExtraData,
+ })
+ }
+ }
+ // Deliver the chunk, but don't block forever if the sequencer has exited.
+ select {
+ case t.chunkCh <- &c:
+ case <-mctx.Done():
+ }
+ }
+ return fetcher.Run(mctx, callback)
+}
+
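+// sequence receives chunks of downloaded leaves and processes them in
+// leaf-index order. Chunks without matches are buffered until at least
+// ChunkSize leaves are pending; chunks with matches trigger verification and
+// a progress message immediately. On verification failure the chunk is
+// retried later.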
+func (t *tail) sequence(ctx context.Context, state MonitorState) {
+ heap := newChunks()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case c := <-t.chunkCh:
+ heap.push(c)
+ if heap.gap(state.NextIndex) {
+ continue
+ }
+ c = heap.pop()
+ if len(c.matches) == 0 && len(c.leafHashes) < t.mcfg.ChunkSize {
+ heap.push(c)
+ continue // TODO: don't trigger if we haven't run nextState for too long
+ }
+ nextState, err := t.nextState(ctx, state, c)
+ if err != nil {
+ t.errorCh <- err
+ heap.push(c)
+ continue
+ }
+
+ state = nextState
+ t.eventCh <- MessageLogProgress{State: state, Matches: c.matches, Errors: c.errors}
+ }
+ }
+}
+
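+// nextState verifies that the log's latest signed tree head is consistent
+// with the current state, and that the chunk's leaves are included in the
+// log, returning the new verified state.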
+func (t *tail) nextState(ctx context.Context, state MonitorState, c *chunk) (MonitorState, error) {
+ newState, err := t.nextConsistentState(ctx, state)
+ if err != nil {
+ return MonitorState{}, err
+ }
+ newState, err = t.nextIncludedState(ctx, newState, c)
+ if err != nil {
+ return MonitorState{}, err
+ }
+ return newState, nil
+}
+
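+// nextConsistentState fetches the log's latest signed tree head and verifies
+// that it is consistent with the previously verified tree head in state.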
+func (t *tail) nextConsistentState(ctx context.Context, state MonitorState) (MonitorState, error) {
+ sth, err := getSignedTreeHead(ctx, &t.cli)
+ if err != nil {
+ return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-sth", Err: err}
+ }
+ oldSize := state.TreeSize
+ oldRoot := state.SHA256RootHash
+ newSize := sth.TreeSize
+ newRoot := sth.SHA256RootHash
+
+ proof, err := getConsistencyProof(ctx, &t.cli, oldSize, newSize)
+ if err != nil {
+ return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-consistency", STH: sth, Err: err}
+ }
+ if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(proof)); err != nil {
+ return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "consistency", STH: sth, Err: err}
+ }
+
+ fmt.Printf("DEBUG: consistently updated STH from size %d to %d\n", oldSize, newSize)
+ return MonitorState{LogState: LogState{*sth}, NextIndex: state.NextIndex}, nil
+}
+
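+// nextIncludedState verifies that the chunk's leaves are included in the log:
+// an inclusion proof for the chunk's first leaf is used to reconstruct a root
+// covering all fetched leaves, which is then proven consistent with the tree
+// head in state.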
+func (t *tail) nextIncludedState(ctx context.Context, state MonitorState, c *chunk) (MonitorState, error) {
+ leafHash := c.leafHashes[0]
+ oldSize := state.NextIndex + uint64(len(c.leafHashes))
+ iproof, err := getInclusionProof(ctx, &t.cli, leafHash, oldSize)
+ if err != nil {
+ err = fmt.Errorf("leaf hash %x and tree size %d: %v", leafHash[:], oldSize, err)
+ return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-inclusion", Err: err}
+ }
+ if got, want := uint64(iproof.LeafIndex), state.NextIndex; got != want {
+ err := fmt.Errorf("leaf hash %x and tree size %d: expected leaf index %d but got %d", leafHash[:], oldSize, got, want)
+ return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "proof-index", Err: err}
+ }
+ oldRoot, err := merkle.TreeHeadFromRangeProof(c.leafHashes, state.NextIndex, unslice(iproof.AuditPath))
+ if err != nil {
+ return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "inclusion", Err: err}
+ }
+
+ newSize := state.TreeSize
+ newRoot := state.SHA256RootHash
+ cproof, err := getConsistencyProof(ctx, &t.cli, oldSize, newSize)
+ if err != nil {
+ err = fmt.Errorf("from size %d to %d: %v", oldSize, newSize, err)
+ return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-consistency", Err: err}
+ }
+ if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(cproof)); err != nil {
+ err = fmt.Errorf("from size %d to %d: %v", oldSize, newSize, err)
+ return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "consistency", Err: err}
+ }
+
+ state.NextIndex += uint64(len(c.leafHashes))
+ return state, nil
+}
+
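+// getInclusionProof fetches an inclusion proof for leafHash in a tree of the
+// given size.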
+func getInclusionProof(ctx context.Context, cli *client.LogClient, leafHash [sha256.Size]byte, size uint64) (*ct.GetProofByHashResponse, error) {
+ rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+ return cli.GetProofByHash(rctx, leafHash[:], size)
+}
+
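+// getConsistencyProof fetches a consistency proof between oldSize and
+// newSize. An empty proof is returned if oldSize is zero or not smaller than
+// newSize.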
+func getConsistencyProof(ctx context.Context, cli *client.LogClient, oldSize, newSize uint64) ([][]byte, error) {
+ if oldSize == 0 || oldSize >= newSize {
+ return [][]byte{}, nil
+ }
+ rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+ return cli.GetSTHConsistency(rctx, oldSize, newSize)
+}
+
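+// getSignedTreeHead fetches the log's most recent signed tree head.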
+func getSignedTreeHead(ctx context.Context, cli *client.LogClient) (*ct.SignedTreeHead, error) {
+ rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+ return cli.GetSTH(rctx)
+}
+
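+// unslice converts a list of variable-length hashes into fixed-size SHA-256
+// arrays.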
+func unslice(hashes [][]byte) [][sha256.Size]byte {
+ var ret [][sha256.Size]byte
+ for _, hash := range hashes {
+ var h [sha256.Size]byte
+ copy(h[:], hash)
+ ret = append(ret, h)
+ }
+ return ret
+}