From 895d5fea41177e444c18f4fdc820fffa5f67d5bf Mon Sep 17 00:00:00 2001
From: Rasmus Dahlberg
Date: Sat, 9 Dec 2023 17:08:45 +0100
Subject: Add drafty skeleton

---
 pkg/monitor/monitor.go | 286 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 286 insertions(+)
 create mode 100644 pkg/monitor/monitor.go

(limited to 'pkg/monitor/monitor.go')

diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go
new file mode 100644
index 0000000..5f7a629
--- /dev/null
+++ b/pkg/monitor/monitor.go
@@ -0,0 +1,286 @@
+// Package monitor provides a Certificate Transparency monitor that tails a list
+// of logs which can be updated dynamically while running. All emitted progress
+// messages have been verified by the monitor to be included in the log's
+// append-only Merkle tree with regard to the initial start-up state. It is up
+// to the user to process the monitor's progress messages and errors, and to
+// persist the monitor's state.
+//
+// Implement the Matcher interface to customize which certificates should be
+// included in a log's progress messages, or use any of the existing matchers
+// provided by this package (see for example MatchAll and MatchWildcards).
+package monitor
+
+import (
+	"context"
+	"crypto/sha256"
+	"fmt"
+	"net/http"
+	"sync"
+	"time"
+
+	ct "github.com/google/certificate-transparency-go"
+	"github.com/google/certificate-transparency-go/client"
+	"github.com/google/certificate-transparency-go/jsonclient"
+	"github.com/google/certificate-transparency-go/scanner"
+	"rgdd.se/silent-ct/internal/merkle"
+)
+
+const (
+	UserAgentPrefix   = "rgdd.se/silent-ct"
+	DefaultContact    = "unknown-user"
+	DefaultChunkSize  = 256 // TODO: increase me
+	DefaultBatchSize  = 128 // TODO: increase me
+	DefaultNumWorkers = 2
+)
+
+type Config struct {
+	Contact    string // Something that helps log operators get in touch
+	ChunkSize  int    // Min number of leaves to propagate a chunk without matches
+	BatchSize  int    // Max number of certificates to accept per worker
+	NumWorkers int    // Number of parallel workers to use for each log
+
+	// Callback determines which certificates are interesting to detect
+	Callback Matcher
+}
+
+type Monitor struct {
+	Config
+}
+
+func New(cfg Config) (Monitor, error) {
+	if cfg.Contact == "" {
+		cfg.Contact = DefaultContact
+	}
+	if cfg.ChunkSize <= 0 {
+		cfg.ChunkSize = DefaultChunkSize
+	}
+	if cfg.BatchSize <= 0 {
+		cfg.BatchSize = DefaultBatchSize
+	}
+	if cfg.NumWorkers <= 0 {
+		cfg.NumWorkers = DefaultNumWorkers
+	}
+	if cfg.Callback == nil {
+		cfg.Callback = &MatchAll{}
+	}
+	return Monitor{Config: cfg}, nil
+}
+
+func (mon *Monitor) Run(ctx context.Context, metadataCh chan []MessageLogConfig, eventCh chan MessageLogProgress, errorCh chan error) {
+	var wg sync.WaitGroup
+	var sctx context.Context
+	var cancel context.CancelFunc
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case metadata := <-metadataCh:
+			fmt.Printf("DEBUG: received new list with %d logs\n", len(metadata))
+			if cancel != nil {
+				fmt.Printf("DEBUG: stopping all log tailers\n")
+				cancel()
+				wg.Wait()
+			}
+
+			sctx, cancel = context.WithCancel(ctx)
+			for _, md := range metadata {
+				fmt.Printf("DEBUG: starting log tailer %s\n", md.Metadata.URL)
+				wg.Add(1)
+				go func(lcfg MessageLogConfig) {
+					defer wg.Done()
+
+					opts := jsonclient.Options{Logger: &noout{}, UserAgent: UserAgentPrefix + ":" + mon.Contact}
+					cli, err := client.New(string(lcfg.Metadata.URL), &http.Client{}, opts)
+					if err != nil {
+						errorCh <- fmt.Errorf("unable to configure %s: %v", lcfg.Metadata.URL, err)
+						return
+					}
+
+					chunkCh := make(chan *chunk)
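+					// chunkCh hands fetched batches from the fetcher callback
+					// to the sequencer; close it when this tailer exits.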
+					defer close(chunkCh)
+
+					t := tail{mon.Config, *cli, chunkCh, eventCh, errorCh}
+					if err := t.run(sctx, lcfg.State); err != nil {
+						errorCh <- fmt.Errorf("unable to continue tailing %s: %v", lcfg.Metadata.URL, err)
+					}
+				}(md)
+			}
+		}
+	}
+}
+
+type tail struct {
+	mcfg Config
+	cli  client.LogClient
+
+	chunkCh chan *chunk
+	eventCh chan MessageLogProgress
+	errorCh chan error
+}
+
+func (t *tail) run(ctx context.Context, state MonitorState) error {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+
+	mctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer cancel()
+		t.sequence(mctx, state)
+	}()
+
+	fetcher := scanner.NewFetcher(&t.cli, &scanner.FetcherOptions{
+		BatchSize:     t.mcfg.BatchSize,
+		StartIndex:    int64(state.NextIndex),
+		ParallelFetch: t.mcfg.NumWorkers,
+		Continuous:    true,
+	})
+	callback := func(eb scanner.EntryBatch) {
+		c := chunk{startIndex: uint64(eb.Start)}
+		for i := 0; i < len(eb.Entries); i++ {
+			c.leafHashes = append(c.leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput))
+			match, err := t.mcfg.Callback.Match(eb.Entries[i].LeafInput, eb.Entries[i].ExtraData)
+			if err != nil {
+				c.errors = append(c.errors, fmt.Errorf("while processing index %d for %s: %v", c.startIndex+uint64(i), t.cli.BaseURI(), err))
+			} else if match {
+				c.matches = append(c.matches, LogEntry{
+					LeafIndex: c.startIndex + uint64(i),
+					LeafData:  eb.Entries[i].LeafInput,
+					ExtraData: eb.Entries[i].ExtraData,
+				})
+			}
+		}
+		t.chunkCh <- &c
+	}
+	return fetcher.Run(mctx, callback)
+}
+
+func (t *tail) sequence(ctx context.Context, state MonitorState) {
+	heap := newChunks()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case c := <-t.chunkCh:
+			heap.push(c)
+			if heap.gap(state.NextIndex) {
+				continue
+			}
+			c = heap.pop()
+			if len(c.matches) == 0 && len(c.leafHashes) < t.mcfg.ChunkSize {
+				heap.push(c)
+				continue // TODO: don't trigger if we haven't run nextState for too long
+			}
+			nextState, err := t.nextState(ctx, state, c)
+			if err != nil {
+				t.errorCh <- err
+				heap.push(c)
+				continue
+			}
+
+			state = nextState
+			t.eventCh <- MessageLogProgress{State: state, Matches: c.matches, Errors: c.errors}
+		}
+	}
+}
+
+func (t *tail) nextState(ctx context.Context, state MonitorState, c *chunk) (MonitorState, error) {
+	newState, err := t.nextConsistentState(ctx, state)
+	if err != nil {
+		return MonitorState{}, err
+	}
+	newState, err = t.nextIncludedState(ctx, newState, c)
+	if err != nil {
+		return MonitorState{}, err
+	}
+	return newState, nil
+}
+
+func (t *tail) nextConsistentState(ctx context.Context, state MonitorState) (MonitorState, error) {
+	sth, err := getSignedTreeHead(ctx, &t.cli)
+	if err != nil {
+		return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-sth", Err: err}
+	}
+	oldSize := state.TreeSize
+	oldRoot := state.SHA256RootHash
+	newSize := sth.TreeSize
+	newRoot := sth.SHA256RootHash
+
+	proof, err := getConsistencyProof(ctx, &t.cli, oldSize, newSize)
+	if err != nil {
+		return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-consistency", STH: sth, Err: err}
+	}
+	if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(proof)); err != nil {
+		return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "consistency", STH: sth, Err: err}
+	}
+
+	fmt.Printf("DEBUG: consistently updated STH from size %d to %d\n", oldSize, newSize)
+	return MonitorState{LogState: LogState{*sth}, NextIndex: state.NextIndex}, nil
+}
+
+func (t *tail) nextIncludedState(ctx context.Context, state MonitorState, c *chunk) (MonitorState, error) {
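+	// Verify that the chunk's leaves are included in the log: fetch an
+	// inclusion proof for the chunk's first leaf, reconstruct the root of the
+	// tree that ends with this chunk from a range proof, and check that this
+	// root is consistent with the most recently verified tree head.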
+	leafHash := c.leafHashes[0]
+	oldSize := state.NextIndex + uint64(len(c.leafHashes))
+	iproof, err := getInclusionProof(ctx, &t.cli, leafHash, oldSize)
+	if err != nil {
+		err = fmt.Errorf("leaf hash %x and tree size %d: %v", leafHash[:], oldSize, err)
+		return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-inclusion", Err: err}
+	}
+	if got, want := uint64(iproof.LeafIndex), state.NextIndex; got != want {
+		err := fmt.Errorf("leaf hash %x and tree size %d: expected leaf index %d but got %d", leafHash[:], oldSize, want, got)
+		return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "proof-index", Err: err}
+	}
+	oldRoot, err := merkle.TreeHeadFromRangeProof(c.leafHashes, state.NextIndex, unslice(iproof.AuditPath))
+	if err != nil {
+		return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "inclusion", Err: err}
+	}
+
+	newSize := state.TreeSize
+	newRoot := state.SHA256RootHash
+	cproof, err := getConsistencyProof(ctx, &t.cli, oldSize, newSize)
+	if err != nil {
+		err = fmt.Errorf("from size %d to %d: %v", oldSize, newSize, err)
+		return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-consistency", Err: err}
+	}
+	if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(cproof)); err != nil {
+		err = fmt.Errorf("from size %d to %d: %v", oldSize, newSize, err)
+		return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "consistency", Err: err}
+	}
+
+	state.NextIndex += uint64(len(c.leafHashes))
+	return state, nil
+}
+
+func getInclusionProof(ctx context.Context, cli *client.LogClient, leafHash [sha256.Size]byte, size uint64) (*ct.GetProofByHashResponse, error) {
+	rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	return cli.GetProofByHash(rctx, leafHash[:], size)
+}
+
+func getConsistencyProof(ctx context.Context, cli *client.LogClient, oldSize, newSize uint64) ([][]byte, error) {
+	if oldSize == 0 || oldSize >= newSize {
+		return [][]byte{}, nil
+	}
+	rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	return cli.GetSTHConsistency(rctx, oldSize, newSize)
+}
+
+func getSignedTreeHead(ctx context.Context, cli *client.LogClient) (*ct.SignedTreeHead, error) {
+	rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	return cli.GetSTH(rctx)
+}
+
+func unslice(hashes [][]byte) [][sha256.Size]byte {
+	var ret [][sha256.Size]byte
+	for _, hash := range hashes {
+		var h [sha256.Size]byte
+		copy(h[:], hash)
+		ret = append(ret, h)
+	}
+	return ret
+}
--
cgit v1.2.3
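Editor's note: the package comment in this patch describes the intended wiring but the patch itself contains no caller. Below is a minimal caller sketch, not part of the commit. It assumes the exported names used in this file (Config, New, Run, MatchAll, MessageLogConfig, MessageLogProgress), an import path of rgdd.se/silent-ct/pkg/monitor derived from the file path, and it leaves out how a real []MessageLogConfig list is produced.

package main

import (
	"context"
	"log"

	"rgdd.se/silent-ct/pkg/monitor"
)

func main() {
	// Configure the monitor; zero values fall back to the package defaults.
	mon, err := monitor.New(monitor.Config{
		Contact:  "example-operator",  // hypothetical contact string
		Callback: &monitor.MatchAll{}, // emit every certificate as a match
	})
	if err != nil {
		log.Fatal(err)
	}

	metadataCh := make(chan []monitor.MessageLogConfig) // dynamically updated list of logs
	eventCh := make(chan monitor.MessageLogProgress)    // verified progress messages
	errorCh := make(chan error)

	go mon.Run(context.Background(), metadataCh, eventCh, errorCh)

	// Feed an initial (empty) log list; a real caller would construct
	// []monitor.MessageLogConfig from its own configuration and resend
	// the list whenever it changes.
	metadataCh <- nil

	for {
		select {
		case ev := <-eventCh:
			// Persist ev.State and act on ev.Matches / ev.Errors here.
			log.Printf("verified progress with %d matches", len(ev.Matches))
		case err := <-errorCh:
			log.Printf("monitor error: %v", err)
		}
	}
}

The caller owns all three channels: the monitor only verifies and emits, and it is up to the user to persist the per-log MonitorState carried in each progress message.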