From b8b6ce265ef083d21db155af4a50cee1b9b0b934 Mon Sep 17 00:00:00 2001 From: Rasmus Dahlberg Date: Sun, 5 Jan 2025 14:34:04 +0100 Subject: fix: Ensure fresh STHs are propagated --- internal/monitor/tail.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) (limited to 'internal') diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go index 2ad8135..43a6d56 100644 --- a/internal/monitor/tail.go +++ b/internal/monitor/tail.go @@ -99,6 +99,24 @@ func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Even state = nextState eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} } + refreshSTH := func(ctx context.Context) { + timestamp := time.UnixMilli(int64(state.Timestamp)) + timestamp = timestamp.Add(t.cfg.ChunkTime) + timestamp = timestamp.Add(-1 * time.Second) + if timestamp.After(time.Now()) { + return + } + + // Looks like we haven't send any chunks the past ChunkTime time units. + // Get a newer tree head so the timestamp can be used for freshness. + nextState, err := t.nextConsistentState(ctx, state) + if err != nil { + errorCh <- err + return + } + state = nextState + eventCh <- Event{State: state} + } sendTicker := time.NewTicker(t.cfg.ChunkTime) defer sendTicker.Stop() @@ -109,9 +127,11 @@ func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Even dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime) defer cancel() sendChunk(dctx, true) + refreshSTH(dctx) return case <-sendTicker.C: sendChunk(ctx, true) + refreshSTH(ctx) case c := <-chunkCh: heap.push(c) sendChunk(ctx, false) -- cgit v1.2.3