From 108932cab926cf9743df7f9eaf383104130e9377 Mon Sep 17 00:00:00 2001 From: Rasmus Dahlberg Date: Sun, 5 Jan 2025 14:34:04 +0100 Subject: fix: Ensure chunks are sent eventually and on exit --- internal/monitor/tail.go | 51 +++++++++++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 18 deletions(-) (limited to 'internal/monitor/tail.go') diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go index d00ebe6..98ce4e0 100644 --- a/internal/monitor/tail.go +++ b/internal/monitor/tail.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/google/certificate-transparency-go/client" "github.com/google/certificate-transparency-go/scanner" @@ -77,29 +78,43 @@ func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, er func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) { state := mon.State heap := newChunks() + sendChunk := func(ctx context.Context, force bool) { + if heap.gap(state.NextIndex) { + return // nothing to send yet + } + + c := heap.pop() + if !force && len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { + heap.push(c) + return // wait for a larger chunk before batch verification + } + + nextState, err := t.nextState(ctx, state, c) + if err != nil { + errorCh <- err + heap.push(c) + return + } + + state = nextState + eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + } + + sendTicker := time.NewTicker(t.cfg.ChunkTime) + defer sendTicker.Stop() + for { select { case <-ctx.Done(): - return // FIXME: check if we can pop something before return + dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime) + defer cancel() + sendChunk(dctx, true) + return + case <-sendTicker.C: + sendChunk(ctx, true) case c := <-chunkCh: heap.push(c) - if heap.gap(state.NextIndex) { - continue - } - c = heap.pop() - if len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { - heap.push(c) - continue // FIXME: don't trigger if we havn't run nextState for too long - } - nextState, err := t.nextState(ctx, state, c) - if err != nil { - errorCh <- err - heap.push(c) - continue - } - - state = nextState - eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + sendChunk(ctx, false) } } } -- cgit v1.2.3