diff options
Diffstat (limited to 'internal/monitor/tail.go')
-rw-r--r-- | internal/monitor/tail.go | 79 |
1 files changed, 61 insertions, 18 deletions
diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go index d00ebe6..2603e81 100644 --- a/internal/monitor/tail.go +++ b/internal/monitor/tail.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/google/certificate-transparency-go/client" "github.com/google/certificate-transparency-go/scanner" @@ -75,31 +76,70 @@ func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, er } func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) { + var failedAt time.Time state := mon.State heap := newChunks() + sendChunk := func(ctx context.Context, force bool) { + if !failedAt.IsZero() && failedAt.Add(30*time.Second).After(time.Now()) { + return // ensures we don't spam get-sth and proof endpoints + } + + if heap.gap(state.NextIndex) { + return // nothing to send yet + } + c := heap.pop() + if !force && len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { + heap.push(c) + return // wait for a larger chunk before batch verification + } + + nextState, err := t.nextState(ctx, state, c) + if err != nil { + failedAt = time.Now() + errorCh <- err + heap.push(c) + return + } + + state = nextState + eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + } + refreshSTH := func(ctx context.Context) { + timestamp := time.UnixMilli(int64(state.Timestamp)) + timestamp = timestamp.Add(t.cfg.ChunkTime) + timestamp = timestamp.Add(-1 * time.Second) + if timestamp.After(time.Now()) { + return + } + + // Looks like we haven't send any chunks the past ChunkTime time units. + // Get a newer tree head so the timestamp can be used for freshness. + nextState, err := t.nextConsistentState(ctx, state) + if err != nil { + errorCh <- err + return + } + state = nextState + eventCh <- Event{State: state} + } + + sendTicker := time.NewTicker(t.cfg.ChunkTime) + defer sendTicker.Stop() + for { select { case <-ctx.Done(): - return // FIXME: check if we can pop something before return + dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime) + defer cancel() + sendChunk(dctx, true) + refreshSTH(dctx) + return + case <-sendTicker.C: + sendChunk(ctx, true) + refreshSTH(ctx) case c := <-chunkCh: heap.push(c) - if heap.gap(state.NextIndex) { - continue - } - c = heap.pop() - if len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { - heap.push(c) - continue // FIXME: don't trigger if we havn't run nextState for too long - } - nextState, err := t.nextState(ctx, state, c) - if err != nil { - errorCh <- err - heap.push(c) - continue - } - - state = nextState - eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + sendChunk(ctx, false) } } } @@ -121,6 +161,9 @@ func (t *tail) nextConsistentState(ctx context.Context, state State) (State, err if err != nil { return State{}, fmt.Errorf("%s: get-sth: %v", t.checker.BaseURI(), err) } + if sth.Timestamp < state.Timestamp { + return State{}, fmt.Errorf("%s: get-sth: timestamp is shrinking", t.checker.BaseURI()) + } sth.LogID = state.SignedTreeHead.LogID oldSize := state.TreeSize oldRoot := state.SHA256RootHash |