// Package monitor provides a Certificate Transparency monitor that tails a
// list of logs which can be updated dynamically while running. All emitted
// progress messages have been verified by the monitor to be included in the
// log's append-only Merkle tree with regard to the initial start-up state. It
// is up to the user to process the monitor's progress and errors, and to
// persist its state.
//
// Implement the Matcher interface to customize which certificates should be
// included in a log's progress messages, or use any of the existing matchers
// provided by this package (see for example MatchAll and MatchWildcards).
package monitor

import (
	"context"
	"crypto/sha256"
	"fmt"
	"net/http"
	"sync"
	"time"

	ct "github.com/google/certificate-transparency-go"
	"github.com/google/certificate-transparency-go/client"
	"github.com/google/certificate-transparency-go/jsonclient"
	"github.com/google/certificate-transparency-go/scanner"

	"rgdd.se/silent-ct/internal/merkle"
)

const (
	UserAgentPrefix = "rgdd.se/silent-ct"
	DefaultContact  = "unknown-user"

	DefaultChunkSize  = 256 // TODO: increase me
	DefaultBatchSize  = 128 // TODO: increase me
	DefaultNumWorkers = 2
)

type Config struct {
	Contact    string // Something that helps log operators get in touch
	ChunkSize  int    // Min number of leaves to propagate a chunk without matches
	BatchSize  int    // Max number of certificates to accept per worker
	NumWorkers int    // Number of parallel workers to use for each log

	// Callback determines which certificates are interesting to detect
	Callback Matcher
}

type Monitor struct {
	Config
}

// New returns a Monitor, replacing any zero-valued Config fields with the
// package defaults.
func New(cfg Config) (Monitor, error) {
	if cfg.Contact == "" {
		cfg.Contact = DefaultContact
	}
	if cfg.ChunkSize <= 0 {
		cfg.ChunkSize = DefaultChunkSize
	}
	if cfg.BatchSize <= 0 {
		cfg.BatchSize = DefaultBatchSize
	}
	if cfg.NumWorkers <= 0 {
		cfg.NumWorkers = DefaultNumWorkers
	}
	if cfg.Callback == nil {
		cfg.Callback = &MatchAll{}
	}
	return Monitor{Config: cfg}, nil
}
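// A minimal sketch of how a caller might drive the monitor (illustrative
// only: the channel handling, the error policy, and the loadLogList helper
// are assumptions, not part of this package):
//
//	mon, err := monitor.New(monitor.Config{Contact: "security@example.org"})
//	if err != nil {
//		// handle configuration error
//	}
//
//	metadataCh := make(chan []monitor.MessageLogConfig)
//	eventCh := make(chan monitor.MessageLogProgress)
//	errorCh := make(chan error)
//	go mon.Run(ctx, metadataCh, eventCh, errorCh)
//	metadataCh <- loadLogList() // hypothetical helper that loads the list of logs
//
//	for {
//		select {
//		case ev := <-eventCh:
//			// persist ev.State; act on ev.Matches and ev.Errors
//		case err := <-errorCh:
//			// log the error, possibly retry
//		}
//	}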
// Run starts tailing the logs received on metadataCh, emitting verified
// progress on eventCh and failures on errorCh.  Whenever a new list of logs
// arrives on metadataCh, all running log tailers are stopped and restarted
// with the new list.  Run blocks until ctx is cancelled.
func (mon *Monitor) Run(ctx context.Context, metadataCh chan []MessageLogConfig, eventCh chan MessageLogProgress, errorCh chan error) {
	var wg sync.WaitGroup
	var sctx context.Context
	var cancel context.CancelFunc

	for {
		select {
		case <-ctx.Done():
			return
		case metadata := <-metadataCh:
			fmt.Printf("DEBUG: received new list with %d logs\n", len(metadata))
			if cancel != nil {
				fmt.Printf("DEBUG: stopping all log tailers\n")
				cancel()
				wg.Wait()
			}

			sctx, cancel = context.WithCancel(ctx)
			for _, md := range metadata {
				fmt.Printf("DEBUG: starting log tailer %s\n", md.Metadata.URL)

				wg.Add(1)
				go func(lcfg MessageLogConfig) {
					defer wg.Done()

					opts := jsonclient.Options{Logger: &noout{}, UserAgent: UserAgentPrefix + ":" + mon.Contact}
					cli, err := client.New(string(lcfg.Metadata.URL), &http.Client{}, opts)
					if err != nil {
						errorCh <- fmt.Errorf("unable to configure %s: %v", lcfg.Metadata.URL, err)
						return
					}

					chunkCh := make(chan *chunk)
					defer close(chunkCh)

					t := tail{mon.Config, *cli, chunkCh, eventCh, errorCh}
					if err := t.run(sctx, lcfg.State); err != nil {
						errorCh <- fmt.Errorf("unable to continue tailing %s: %v", lcfg.Metadata.URL, err)
					}
				}(md)
			}
		}
	}
}

// tail tails a single log: run fetches entries and sequence orders the
// fetched chunks before verifying and emitting them.
type tail struct {
	mcfg    Config
	cli     client.LogClient
	chunkCh chan *chunk

	eventCh chan MessageLogProgress
	errorCh chan error
}

func (t *tail) run(ctx context.Context, state MonitorState) error {
	var wg sync.WaitGroup
	defer wg.Wait()
	mctx, cancel := context.WithCancel(ctx)
	defer cancel()

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer cancel()
		t.sequence(mctx, state)
	}()

	fetcher := scanner.NewFetcher(&t.cli, &scanner.FetcherOptions{
		BatchSize:     t.mcfg.BatchSize,
		StartIndex:    int64(state.NextIndex),
		ParallelFetch: t.mcfg.NumWorkers,
		Continuous:    true,
	})
	callback := func(eb scanner.EntryBatch) {
		c := chunk{startIndex: uint64(eb.Start)}
		for i := 0; i < len(eb.Entries); i++ {
			c.leafHashes = append(c.leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput))
			match, err := t.mcfg.Callback.Match(eb.Entries[i].LeafInput, eb.Entries[i].ExtraData)
			if err != nil {
				c.errors = append(c.errors, fmt.Errorf("while processing index %d for %s: %v", i, t.cli.BaseURI(), err))
			} else if match {
				c.matches = append(c.matches, LogEntry{
					LeafIndex: uint64(eb.Start) + uint64(i), // i is relative to the start of the batch
					LeafData:  eb.Entries[i].LeafInput,
					ExtraData: eb.Entries[i].ExtraData,
				})
			}
		}

		t.chunkCh <- &c
	}
	return fetcher.Run(mctx, callback)
}

func (t *tail) sequence(ctx context.Context, state MonitorState) {
	heap := newChunks()
	for {
		select {
		case <-ctx.Done():
			return
		case c := <-t.chunkCh:
			heap.push(c)
			if heap.gap(state.NextIndex) {
				continue
			}
			c = heap.pop()
			if len(c.matches) == 0 && len(c.leafHashes) < t.mcfg.ChunkSize {
				heap.push(c)
				continue // TODO: don't trigger if we haven't run nextState for too long
			}
			nextState, err := t.nextState(ctx, state, c)
			if err != nil {
				t.errorCh <- err
				heap.push(c)
				continue
			}

			state = nextState
			t.eventCh <- MessageLogProgress{State: state, Matches: c.matches, Errors: c.errors}
		}
	}
}

func (t *tail) nextState(ctx context.Context, state MonitorState, c *chunk) (MonitorState, error) {
	newState, err := t.nextConsistentState(ctx, state)
	if err != nil {
		return MonitorState{}, err
	}
	// Verify the chunk against the newly verified tree head, not the old one.
	newState, err = t.nextIncludedState(ctx, newState, c)
	if err != nil {
		return MonitorState{}, err
	}
	return newState, nil
}

func (t *tail) nextConsistentState(ctx context.Context, state MonitorState) (MonitorState, error) {
	sth, err := getSignedTreeHead(ctx, &t.cli)
	if err != nil {
		return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-sth", Err: err}
	}
	oldSize := state.TreeSize
	oldRoot := state.SHA256RootHash
	newSize := sth.TreeSize
	newRoot := sth.SHA256RootHash

	proof, err := getConsistencyProof(ctx, &t.cli, oldSize, newSize)
	if err != nil {
		return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-consistency", STH: sth, Err: err}
	}
	if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(proof)); err != nil {
		return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "consistency", STH: sth, Err: err}
	}

	fmt.Printf("DEBUG: consistently updated STH from size %d to %d\n", oldSize, newSize)
	return MonitorState{LogState: LogState{*sth}, NextIndex: state.NextIndex}, nil
}
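// As a concrete illustration of the checks in nextIncludedState below (the
// numbers are made up): suppose entries [0, 10) have been verified, so
// state.NextIndex is 10, and a chunk of 6 leaves is popped.  An inclusion
// proof for the chunk's first leaf is requested in a tree of size 10+6 = 16,
// and the proof must place that leaf at index 10.  The 6 leaf hashes plus
// the proof's audit path reconstruct the size-16 root, and a final
// consistency proof from size 16 to the latest verified tree head shows
// that all 6 leaves are committed to by the log's current state.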
func (t *tail) nextIncludedState(ctx context.Context, state MonitorState, c *chunk) (MonitorState, error) {
	leafHash := c.leafHashes[0]
	oldSize := state.NextIndex + uint64(len(c.leafHashes))
	iproof, err := getInclusionProof(ctx, &t.cli, leafHash, oldSize)
	if err != nil {
		err = fmt.Errorf("leaf hash %x and tree size %d: %v", leafHash[:], oldSize, err)
		return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-inclusion", Err: err}
	}
	if got, want := uint64(iproof.LeafIndex), state.NextIndex; got != want {
		err := fmt.Errorf("leaf hash %x and tree size %d: expected leaf index %d but got %d", leafHash[:], oldSize, want, got)
		return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "proof-index", Err: err}
	}

	oldRoot, err := merkle.TreeHeadFromRangeProof(c.leafHashes, state.NextIndex, unslice(iproof.AuditPath))
	if err != nil {
		return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "inclusion", Err: err}
	}

	newSize := state.TreeSize
	newRoot := state.SHA256RootHash
	cproof, err := getConsistencyProof(ctx, &t.cli, oldSize, newSize)
	if err != nil {
		err = fmt.Errorf("from size %d to %d: %v", oldSize, newSize, err)
		return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-consistency", Err: err}
	}
	if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(cproof)); err != nil {
		err = fmt.Errorf("from size %d to %d: %v", oldSize, newSize, err)
		return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "consistency", Err: err}
	}

	state.NextIndex += uint64(len(c.leafHashes))
	return state, nil
}

func getInclusionProof(ctx context.Context, cli *client.LogClient, leafHash [sha256.Size]byte, size uint64) (*ct.GetProofByHashResponse, error) {
	rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	return cli.GetProofByHash(rctx, leafHash[:], size)
}

func getConsistencyProof(ctx context.Context, cli *client.LogClient, oldSize, newSize uint64) ([][]byte, error) {
	if oldSize == 0 || oldSize >= newSize {
		return [][]byte{}, nil
	}
	rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	return cli.GetSTHConsistency(rctx, oldSize, newSize)
}

func getSignedTreeHead(ctx context.Context, cli *client.LogClient) (*ct.SignedTreeHead, error) {
	rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	return cli.GetSTH(rctx)
}

// unslice converts variable-length hashes to fixed-size SHA-256 arrays.
func unslice(hashes [][]byte) [][sha256.Size]byte {
	var ret [][sha256.Size]byte
	for _, hash := range hashes {
		var h [sha256.Size]byte
		copy(h[:], hash)
		ret = append(ret, h)
	}
	return ret
}
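// Callers are expected to persist the emitted state between runs (see the
// package documentation).  A minimal sketch, assuming MonitorState marshals
// cleanly with encoding/json and a hypothetical state file path:
//
//	buf, err := json.Marshal(progress.State)
//	if err != nil {
//		// handle error
//	}
//	if err := os.WriteFile("monitor-state.json", buf, 0644); err != nil {
//		// handle error
//	}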