diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/monitor/tail.go | 20 | 
1 files changed, 20 insertions, 0 deletions
| diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go index 2ad8135..43a6d56 100644 --- a/internal/monitor/tail.go +++ b/internal/monitor/tail.go @@ -99,6 +99,24 @@ func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Even  		state = nextState  		eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors}  	} +	refreshSTH := func(ctx context.Context) { +		timestamp := time.UnixMilli(int64(state.Timestamp)) +		timestamp = timestamp.Add(t.cfg.ChunkTime) +		timestamp = timestamp.Add(-1 * time.Second) +		if timestamp.After(time.Now()) { +			return +		} + +		// Looks like we haven't send any chunks the past ChunkTime time units. +		// Get a newer tree head so the timestamp can be used for freshness. +		nextState, err := t.nextConsistentState(ctx, state) +		if err != nil { +			errorCh <- err +			return +		} +		state = nextState +		eventCh <- Event{State: state} +	}  	sendTicker := time.NewTicker(t.cfg.ChunkTime)  	defer sendTicker.Stop() @@ -109,9 +127,11 @@ func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Even  			dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime)  			defer cancel()  			sendChunk(dctx, true) +			refreshSTH(dctx)  			return  		case <-sendTicker.C:  			sendChunk(ctx, true) +			refreshSTH(ctx)  		case c := <-chunkCh:  			heap.push(c)  			sendChunk(ctx, false) | 
