diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/monitor/tail.go | 20 |
1 files changed, 20 insertions, 0 deletions
diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go index 2ad8135..43a6d56 100644 --- a/internal/monitor/tail.go +++ b/internal/monitor/tail.go @@ -99,6 +99,24 @@ func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Even state = nextState eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} } + refreshSTH := func(ctx context.Context) { + timestamp := time.UnixMilli(int64(state.Timestamp)) + timestamp = timestamp.Add(t.cfg.ChunkTime) + timestamp = timestamp.Add(-1 * time.Second) + if timestamp.After(time.Now()) { + return + } + + // Looks like we haven't send any chunks the past ChunkTime time units. + // Get a newer tree head so the timestamp can be used for freshness. + nextState, err := t.nextConsistentState(ctx, state) + if err != nil { + errorCh <- err + return + } + state = nextState + eventCh <- Event{State: state} + } sendTicker := time.NewTicker(t.cfg.ChunkTime) defer sendTicker.Stop() @@ -109,9 +127,11 @@ func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Even dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime) defer cancel() sendChunk(dctx, true) + refreshSTH(dctx) return case <-sendTicker.C: sendChunk(ctx, true) + refreshSTH(ctx) case c := <-chunkCh: heap.push(c) sendChunk(ctx, false) |