diff options
Diffstat (limited to 'internal/monitor/tail.go')
-rw-r--r-- | internal/monitor/tail.go | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go new file mode 100644 index 0000000..0e16476 --- /dev/null +++ b/internal/monitor/tail.go @@ -0,0 +1,200 @@ +package monitor + +import ( + "context" + "crypto/sha256" + "fmt" + "sync" + "time" + + ct "github.com/google/certificate-transparency-go" + "github.com/google/certificate-transparency-go/client" + "github.com/google/certificate-transparency-go/scanner" + "gitlab.torproject.org/rgdd/ct/pkg/merkle" +) + +type tail struct { + cfg Config + matcher Matcher + scanner scanner.LogClient + checker client.CheckLogClient +} + +func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error) { + chunkCh := make(chan *chunk) + defer close(chunkCh) + + mctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + defer wg.Wait() + + callback := func(eb scanner.EntryBatch) { + c := chunk{startIndex: uint64(eb.Start)} + for i := 0; i < len(eb.Entries); i++ { + c.leafHashes = append(c.leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput)) + match, err := t.matcher.Match(eb.Entries[i].LeafInput, eb.Entries[i].ExtraData) + if err != nil { + c.errors = append(c.errors, fmt.Errorf("while processing index %d for %s: %v", i, mon.Config.URL, err)) + continue + } + if !match { + continue + } + + c.matches = append(c.matches, LogEntry{ + LeafIndex: c.startIndex + uint64(i), + LeafData: eb.Entries[i].LeafInput, + ExtraData: eb.Entries[i].ExtraData, + }) + } + + chunkCh <- &c + } + + fetcher := scanner.NewFetcher(t.scanner, &scanner.FetcherOptions{ + BatchSize: int(t.cfg.BatchSize), + StartIndex: int64(mon.State.NextIndex), + ParallelFetch: int(t.cfg.NumWorkers), + Continuous: true, // FIXME: don't set this for read-only log + }) + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + fetcher.Run(mctx, callback) + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + t.sequence(mctx, mon, eventCh, errorCh, chunkCh) + }() +} + +func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) { + state := mon.State + heap := newChunks() + for { + select { + case <-ctx.Done(): + return // FIXME: check if we can pop something before return + case c := <-chunkCh: + heap.push(c) + if heap.gap(state.NextIndex) { + continue + } + c = heap.pop() + if len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { + heap.push(c) + continue // FIXME: don't trigger if we havn't run nextState for too long + } + nextState, err := t.nextState(ctx, state, c) + if err != nil { + errorCh <- err + heap.push(c) + continue + } + + state = nextState + eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + } + } +} + +func (t *tail) nextState(ctx context.Context, state State, c *chunk) (State, error) { + newState, err := t.nextConsistentState(ctx, state) + if err != nil { + return State{}, err + } + newState, err = t.nextIncludedState(ctx, newState, c) + if err != nil { + return State{}, err + } + return newState, nil +} + +func (t *tail) nextConsistentState(ctx context.Context, state State) (State, error) { + sth, err := getSignedTreeHead(ctx, t.checker) + if err != nil { + return State{}, fmt.Errorf("%s: get-sth: %v", t.checker.BaseURI(), err) + } + sth.LogID = state.SignedTreeHead.LogID + oldSize := state.TreeSize + oldRoot := state.SHA256RootHash + newSize := sth.TreeSize + newRoot := sth.SHA256RootHash + + proof, err := getConsistencyProof(ctx, t.checker, oldSize, newSize) + if err != nil { + return State{}, fmt.Errorf("%s: get-consistency: %v", t.checker.BaseURI(), err) + } + if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(proof)); err != nil { + return State{}, fmt.Errorf("%s: verify consistency: %v", t.checker.BaseURI(), err) + } + + return State{SignedTreeHead: *sth, NextIndex: state.NextIndex}, nil +} + +func (t *tail) nextIncludedState(ctx context.Context, state State, c *chunk) (State, error) { + leafHash := c.leafHashes[0] + oldSize := state.NextIndex + uint64(len(c.leafHashes)) + iproof, err := getInclusionProof(ctx, t.checker, leafHash, oldSize) // FIXME: set leaf index in ctx to hack into tile API + if err != nil { + return State{}, fmt.Errorf("%s: get-inclusion: %v", t.checker.BaseURI(), err) + } + if got, want := uint64(iproof.LeafIndex), state.NextIndex; got != want { + return State{}, fmt.Errorf("%s: wrong index for get-inclusion proof query %x:%d", t.checker.BaseURI(), leafHash[:], oldSize) + } + oldRoot, err := merkle.TreeHeadFromRangeProof(c.leafHashes, state.NextIndex, unslice(iproof.AuditPath)) + if err != nil { + return State{}, fmt.Errorf("%s: range proof: %v", t.checker.BaseURI(), err) + } + + newSize := state.TreeSize + newRoot := state.SHA256RootHash + cproof, err := getConsistencyProof(ctx, t.checker, oldSize, newSize) + if err != nil { + return State{}, fmt.Errorf("%s: get-consistency: %v", t.checker.BaseURI(), err) + } + if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, unslice(cproof)); err != nil { + return State{}, fmt.Errorf("%s: verify consistency: %v", t.checker.BaseURI(), err) + } + + state.NextIndex += uint64(len(c.leafHashes)) + return state, nil +} + +func getInclusionProof(ctx context.Context, cli client.CheckLogClient, leafHash [sha256.Size]byte, size uint64) (*ct.GetProofByHashResponse, error) { + rctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + return cli.GetProofByHash(rctx, leafHash[:], size) +} + +func getConsistencyProof(ctx context.Context, cli client.CheckLogClient, oldSize, newSize uint64) ([][]byte, error) { + if oldSize == 0 || oldSize >= newSize { + return [][]byte{}, nil + } + rctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + return cli.GetSTHConsistency(rctx, oldSize, newSize) +} + +func getSignedTreeHead(ctx context.Context, cli client.CheckLogClient) (*ct.SignedTreeHead, error) { + rctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + return cli.GetSTH(rctx) +} + +func unslice(hashes [][]byte) [][sha256.Size]byte { + var ret [][sha256.Size]byte + for _, hash := range hashes { + var h [sha256.Size]byte + copy(h[:], hash) + ret = append(ret, h) + } + return ret +} |