aboutsummaryrefslogtreecommitdiff
path: root/internal/monitor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/monitor')
-rw-r--r--internal/monitor/tail.go20
1 files changed, 20 insertions, 0 deletions
diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go
index 2ad8135..43a6d56 100644
--- a/internal/monitor/tail.go
+++ b/internal/monitor/tail.go
@@ -99,6 +99,24 @@ func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Even
state = nextState
eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors}
}
+ refreshSTH := func(ctx context.Context) {
+ timestamp := time.UnixMilli(int64(state.Timestamp))
+ timestamp = timestamp.Add(t.cfg.ChunkTime)
+ timestamp = timestamp.Add(-1 * time.Second)
+ if timestamp.After(time.Now()) {
+ return
+ }
+
+ // Looks like we haven't send any chunks the past ChunkTime time units.
+ // Get a newer tree head so the timestamp can be used for freshness.
+ nextState, err := t.nextConsistentState(ctx, state)
+ if err != nil {
+ errorCh <- err
+ return
+ }
+ state = nextState
+ eventCh <- Event{State: state}
+ }
sendTicker := time.NewTicker(t.cfg.ChunkTime)
defer sendTicker.Stop()
@@ -109,9 +127,11 @@ func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Even
dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime)
defer cancel()
sendChunk(dctx, true)
+ refreshSTH(dctx)
return
case <-sendTicker.C:
sendChunk(ctx, true)
+ refreshSTH(ctx)
case c := <-chunkCh:
heap.push(c)
sendChunk(ctx, false)