package monitor import ( "context" "fmt" "sync" "time" "github.com/google/certificate-transparency-go/client" "github.com/google/certificate-transparency-go/scanner" "gitlab.torproject.org/rgdd/ct/pkg/merkle" "rgdd.se/silentct/internal/ioutil" "rgdd.se/silentct/internal/logutil" ) type tail struct { cfg Config matcher Matcher scanner scanner.LogClient checker client.CheckLogClient } func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error) { chunkCh := make(chan *chunk) defer close(chunkCh) mctx, cancel := context.WithCancel(ctx) defer cancel() var wg sync.WaitGroup defer wg.Wait() callback := func(eb scanner.EntryBatch) { c := chunk{startIndex: uint64(eb.Start)} for i := 0; i < len(eb.Entries); i++ { c.leafHashes = append(c.leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput)) match, err := t.matcher.Match(eb.Entries[i].LeafInput, eb.Entries[i].ExtraData) if err != nil { c.errors = append(c.errors, fmt.Errorf("while processing index %d for %s: %v", i, mon.Config.URL, err)) continue } if !match { continue } c.matches = append(c.matches, LogEntry{ LeafIndex: c.startIndex + uint64(i), LeafData: eb.Entries[i].LeafInput, ExtraData: eb.Entries[i].ExtraData, }) } chunkCh <- &c } fetcher := scanner.NewFetcher(t.scanner, &scanner.FetcherOptions{ BatchSize: int(t.cfg.BatchSize), StartIndex: int64(mon.State.NextIndex), ParallelFetch: int(t.cfg.NumWorkers), Continuous: true, // FIXME: don't set this for read-only log }) wg.Add(1) go func() { defer wg.Done() defer cancel() fetcher.Run(mctx, callback) }() wg.Add(1) go func() { defer wg.Done() defer cancel() t.sequence(mctx, mon, eventCh, errorCh, chunkCh) }() } func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) { state := mon.State heap := newChunks() sendChunk := func(ctx context.Context, force bool) { if heap.gap(state.NextIndex) { return // nothing to send yet } c := heap.pop() if !force && len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { heap.push(c) return // wait for a larger chunk before batch verification } nextState, err := t.nextState(ctx, state, c) if err != nil { errorCh <- err heap.push(c) return } state = nextState eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} } sendTicker := time.NewTicker(t.cfg.ChunkTime) defer sendTicker.Stop() for { select { case <-ctx.Done(): dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime) defer cancel() sendChunk(dctx, true) return case <-sendTicker.C: sendChunk(ctx, true) case c := <-chunkCh: heap.push(c) sendChunk(ctx, false) } } } func (t *tail) nextState(ctx context.Context, state State, c *chunk) (State, error) { newState, err := t.nextConsistentState(ctx, state) if err != nil { return State{}, err } newState, err = t.nextIncludedState(ctx, newState, c) if err != nil { return State{}, err } return newState, nil } func (t *tail) nextConsistentState(ctx context.Context, state State) (State, error) { sth, err := logutil.GetSignedTreeHead(ctx, t.checker) if err != nil { return State{}, fmt.Errorf("%s: get-sth: %v", t.checker.BaseURI(), err) } sth.LogID = state.SignedTreeHead.LogID oldSize := state.TreeSize oldRoot := state.SHA256RootHash newSize := sth.TreeSize newRoot := sth.SHA256RootHash proof, err := logutil.GetConsistencyProof(ctx, t.checker, oldSize, newSize) if err != nil { return State{}, fmt.Errorf("%s: get-consistency: %v", t.checker.BaseURI(), err) } if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, proof); err != nil { return State{}, fmt.Errorf("%s: verify consistency: %v", t.checker.BaseURI(), err) } return State{SignedTreeHead: *sth, CompactRange: ioutil.CopyHashes(state.CompactRange), NextIndex: state.NextIndex}, nil } func (t *tail) nextIncludedState(ctx context.Context, state State, c *chunk) (State, error) { cr, err := logutil.AppendCompactRange(state.CompactRange, state.NextIndex, c.leafHashes) if err != nil { panic(fmt.Sprintf("bug: %v", err)) } oldRoot := logutil.RootHash(cr) oldSize := state.NextIndex + uint64(len(c.leafHashes)) newRoot := state.SHA256RootHash newSize := state.TreeSize proof, err := logutil.GetConsistencyProof(ctx, t.checker, oldSize, newSize) if err != nil { return State{}, fmt.Errorf("%s: tree: get-consistency: %v", t.checker.BaseURI(), err) } if err := merkle.VerifyConsistency(oldSize, newSize, oldRoot, newRoot, proof); err != nil { return State{}, fmt.Errorf("%s: tree: verify consistency: %v", t.checker.BaseURI(), err) } state.NextIndex += uint64(len(c.leafHashes)) state.CompactRange = ioutil.UnsliceHashes(cr.Hashes()) return state, nil }