aboutsummaryrefslogtreecommitdiff
path: root/pkg/monitor/monitor.go
diff options
context:
space:
mode:
authorRasmus Dahlberg <rasmus@rgdd.se>2023-12-31 09:39:25 +0100
committerRasmus Dahlberg <rasmus@rgdd.se>2024-01-07 20:22:23 +0100
commite18d36ebae30536c77c61cd5da123991e0ca1629 (patch)
treebf4880c0019a6009ab1b671e23ef4a1a4a5e8e08 /pkg/monitor/monitor.go
parent54d980afcbd6f0011d6a162e0003587d26a3e311 (diff)
Add drafty prototype
Diffstat (limited to 'pkg/monitor/monitor.go')
-rw-r--r--pkg/monitor/monitor.go286
1 files changed, 0 insertions, 286 deletions
diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go
deleted file mode 100644
index 5f7a629..0000000
--- a/pkg/monitor/monitor.go
+++ /dev/null
@@ -1,286 +0,0 @@
-// Package monitor provides a Certificate Transparency monitor that tails a list
-// of logs which can be updated dynamically while running. All emitted progress
-// messages have been verified by the monitor to be included in the log's
-// append-only Merkle tree with regard to the initial start-up state. It is up
-// to the user to process the monitor's progress, errors, and persist state.
-//
-// Implement the Matcher interface to customize which certificates should be
-// included in a log's progress messages, or use any of the existing matchers
-// provided by this package (see for example MatchAll and MatchWildcards).
-package monitor
-
-import (
- "context"
- "crypto/sha256"
- "fmt"
- "net/http"
- "sync"
- "time"
-
- ct "github.com/google/certificate-transparency-go"
- "github.com/google/certificate-transparency-go/client"
- "github.com/google/certificate-transparency-go/jsonclient"
- "github.com/google/certificate-transparency-go/scanner"
- "rgdd.se/silent-ct/internal/merkle"
-)
-
-const (
- UserAgentPrefix = "rgdd.se/silent-ct"
- DefaultContact = "unknown-user"
- DefaultChunkSize = 256 // TODO: increase me
- DefaultBatchSize = 128 // TODO: increase me
- DefaultNumWorkers = 2
-)
-
-type Config struct {
- Contact string // Something that help log operators get in touch
- ChunkSize int // Min number of leaves to propagate a chunk without matches
- BatchSize int // Max number of certificates to accept per worker
- NumWorkers int // Number of parallel workers to use for each log
-
- // Callback determines which certificates are interesting to detect
- Callback Matcher
-}
-
-type Monitor struct {
- Config
-}
-
-func New(cfg Config) (Monitor, error) {
- if cfg.Contact == "" {
- cfg.Contact = "unknown-user"
- }
- if cfg.ChunkSize <= 0 {
- cfg.ChunkSize = DefaultChunkSize
- }
- if cfg.BatchSize <= 0 {
- cfg.BatchSize = DefaultBatchSize
- }
- if cfg.NumWorkers <= 0 {
- cfg.NumWorkers = DefaultNumWorkers
- }
- if cfg.Callback == nil {
- cfg.Callback = &MatchAll{}
- }
- return Monitor{Config: cfg}, nil
-}
-
-func (mon *Monitor) Run(ctx context.Context, metadataCh chan []MessageLogConfig, eventCh chan MessageLogProgress, errorCh chan error) {
- var wg sync.WaitGroup
- var sctx context.Context
- var cancel context.CancelFunc
-
- for {
- select {
- case <-ctx.Done():
- return
- case metadata := <-metadataCh:
- fmt.Printf("DEBUG: received new list with %d logs\n", len(metadata))
- if cancel != nil {
- fmt.Printf("DEBUG: stopping all log tailers\n")
- cancel()
- wg.Wait()
- }
-
- sctx, cancel = context.WithCancel(ctx)
- for _, md := range metadata {
- fmt.Printf("DEBUG: starting log tailer %s\n", md.Metadata.URL)
- wg.Add(1)
- go func(lcfg MessageLogConfig) {
- defer wg.Done()
-
- opts := jsonclient.Options{Logger: &noout{}, UserAgent: UserAgentPrefix + ":" + mon.Contact}
- cli, err := client.New(string(lcfg.Metadata.URL), &http.Client{}, opts)
- if err != nil {
- errorCh <- fmt.Errorf("unable to configure %s: %v", lcfg.Metadata.URL, err)
- return
- }
-
- chunkCh := make(chan *chunk)
- defer close(chunkCh)
-
- t := tail{mon.Config, *cli, chunkCh, eventCh, errorCh}
- if err := t.run(sctx, lcfg.State); err != nil {
- errorCh <- fmt.Errorf("unable to continue tailing %s: %v", lcfg.Metadata.URL, err)
- }
- }(md)
- }
- }
- }
-}
-
-type tail struct {
- mcfg Config
- cli client.LogClient
-
- chunkCh chan *chunk
- eventCh chan MessageLogProgress
- errorCh chan error
-}
-
-func (t *tail) run(ctx context.Context, state MonitorState) error {
- var wg sync.WaitGroup
- defer wg.Wait()
-
- mctx, cancel := context.WithCancel(ctx)
- defer cancel()
-
- wg.Add(1)
- go func() {
- defer wg.Done()
- defer cancel()
- t.sequence(mctx, state)
- }()
-
- fetcher := scanner.NewFetcher(&t.cli, &scanner.FetcherOptions{
- BatchSize: t.mcfg.BatchSize,
- StartIndex: int64(state.NextIndex),
- ParallelFetch: t.mcfg.NumWorkers,
- Continuous: true,
- })
- callback := func(eb scanner.EntryBatch) {
- c := chunk{startIndex: uint64(eb.Start)}
- for i := 0; i < len(eb.Entries); i++ {
- c.leafHashes = append(c.leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput))
- match, err := t.mcfg.Callback.Match(eb.Entries[i].LeafInput, eb.Entries[i].ExtraData)
- if err != nil {
- c.errors = append(c.errors, fmt.Errorf("while processing index %d for %s: %v", i, t.cli.BaseURI(), err))
- } else if match {
- c.matches = append(c.matches, LogEntry{
- LeafIndex: uint64(i),
- LeafData: eb.Entries[i].LeafInput,
- ExtraData: eb.Entries[i].ExtraData,
- })
- }
- }
- t.chunkCh <- &c
- }
- return fetcher.Run(mctx, callback)
-}
-
-func (t *tail) sequence(ctx context.Context, state MonitorState) {
- heap := newChunks()
- for {
- select {
- case <-ctx.Done():
- return
- case c := <-t.chunkCh:
- heap.push(c)
- if heap.gap(state.NextIndex) {
- continue
- }
- c = heap.pop()
- if len(c.matches) == 0 && len(c.leafHashes) < t.mcfg.ChunkSize {
- heap.push(c)
- continue // TODO: don't trigger if we havn't run nextState for too long
- }
- nextState, err := t.nextState(ctx, state, c)
- if err != nil {
- t.errorCh <- err
- heap.push(c)
- continue
- }
-
- state = nextState
- t.eventCh <- MessageLogProgress{State: state, Matches: c.matches, Errors: c.errors}
- }
- }
-}
-
-func (t *tail) nextState(ctx context.Context, state MonitorState, c *chunk) (MonitorState, error) {
- newState, err := t.nextConsistentState(ctx, state)
- if err != nil {
- return MonitorState{}, err
- }
- newState, err = t.nextIncludedState(ctx, state, c)
- if err != nil {
- return MonitorState{}, err
- }
- return newState, nil
-}
-
-func (t *tail) nextConsistentState(ctx context.Context, state MonitorState) (MonitorState, error) {
- sth, err := getSignedTreeHead(ctx, &t.cli)
- if err != nil {
- return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-sth", Err: err}
- }
- oldSize := state.TreeSize
- oldRoot := state.SHA256RootHash
- newSize := sth.TreeSize
- newRoot := sth.SHA256RootHash
-
- proof, err := getConsistencyProof(ctx, &t.cli, oldSize, newSize)
- if err != nil {
- return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-consistency", STH: sth, Err: err}
- }
- if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(proof)); err != nil {
- return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "consistency", STH: sth, Err: err}
- }
-
- fmt.Printf("DEBUG: consistently updated STH from size %d to %d\n", oldSize, newSize)
- return MonitorState{LogState: LogState{*sth}, NextIndex: state.NextIndex}, nil
-}
-
-func (t *tail) nextIncludedState(ctx context.Context, state MonitorState, c *chunk) (MonitorState, error) {
- leafHash := c.leafHashes[0]
- oldSize := state.NextIndex + uint64(len(c.leafHashes))
- iproof, err := getInclusionProof(ctx, &t.cli, leafHash, oldSize)
- if err != nil {
- err = fmt.Errorf("leaf hash %x and tree size %d: %v", leafHash[:], oldSize, err)
- return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-inclusion", Err: err}
- }
- if got, want := uint64(iproof.LeafIndex), state.NextIndex; got != want {
- err := fmt.Errorf("leaf hash %x and tree size %d: expected leaf index %d but got %d", leafHash[:], oldSize, got, want)
- return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "proof-index", Err: err}
- }
- oldRoot, err := merkle.TreeHeadFromRangeProof(c.leafHashes, state.NextIndex, unslice(iproof.AuditPath))
- if err != nil {
- return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "inclusion", Err: err}
- }
-
- newSize := state.TreeSize
- newRoot := state.SHA256RootHash
- cproof, err := getConsistencyProof(ctx, &t.cli, oldSize, newSize)
- if err != nil {
- err = fmt.Errorf("from size %d to %d: %v", oldSize, newSize, err)
- return MonitorState{}, ErrorFetch{URL: t.cli.BaseURI(), Msg: "get-consistency", Err: err}
- }
- if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(cproof)); err != nil {
- err = fmt.Errorf("from size %d to %d: %v", oldSize, newSize, err)
- return MonitorState{}, ErrorMerkleTree{URL: t.cli.BaseURI(), Msg: "consistency", Err: err}
- }
-
- state.NextIndex += uint64(len(c.leafHashes))
- return state, nil
-}
-
-func getInclusionProof(ctx context.Context, cli *client.LogClient, leafHash [sha256.Size]byte, size uint64) (*ct.GetProofByHashResponse, error) {
- rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
- defer cancel()
- return cli.GetProofByHash(rctx, leafHash[:], size)
-}
-
-func getConsistencyProof(ctx context.Context, cli *client.LogClient, oldSize, newSize uint64) ([][]byte, error) {
- if oldSize == 0 || oldSize >= newSize {
- return [][]byte{}, nil
- }
- rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
- defer cancel()
- return cli.GetSTHConsistency(rctx, oldSize, newSize)
-}
-
-func getSignedTreeHead(ctx context.Context, cli *client.LogClient) (*ct.SignedTreeHead, error) {
- rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
- defer cancel()
- return cli.GetSTH(rctx)
-}
-
-func unslice(hashes [][]byte) [][sha256.Size]byte {
- var ret [][sha256.Size]byte
- for _, hash := range hashes {
- var h [sha256.Size]byte
- copy(h[:], hash)
- ret = append(ret, h)
- }
- return ret
-}