aboutsummaryrefslogtreecommitdiff
path: root/internal/monitor/tail.go
diff options
context:
space:
mode:
authorRasmus Dahlberg <rgdd@glasklarteknik.se>2025-01-05 14:34:04 +0100
committerRasmus Dahlberg <rgdd@glasklarteknik.se>2025-01-05 14:34:04 +0100
commit108932cab926cf9743df7f9eaf383104130e9377 (patch)
tree42f773ebe08dff70eb9f9aeb3b0474d9fb98ce54 /internal/monitor/tail.go
parentcfc5247820c381310763faa718cdbe19aa4390a3 (diff)
fix: Ensure chunks are sent eventually and on exit
Diffstat (limited to 'internal/monitor/tail.go')
-rw-r--r--internal/monitor/tail.go51
1 files changed, 33 insertions, 18 deletions
diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go
index d00ebe6..98ce4e0 100644
--- a/internal/monitor/tail.go
+++ b/internal/monitor/tail.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
+ "time"
"github.com/google/certificate-transparency-go/client"
"github.com/google/certificate-transparency-go/scanner"
@@ -77,29 +78,43 @@ func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, er
func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) {
state := mon.State
heap := newChunks()
+ sendChunk := func(ctx context.Context, force bool) {
+ if heap.gap(state.NextIndex) {
+ return // nothing to send yet
+ }
+
+ c := heap.pop()
+ if !force && len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) {
+ heap.push(c)
+ return // wait for a larger chunk before batch verification
+ }
+
+ nextState, err := t.nextState(ctx, state, c)
+ if err != nil {
+ errorCh <- err
+ heap.push(c)
+ return
+ }
+
+ state = nextState
+ eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors}
+ }
+
+ sendTicker := time.NewTicker(t.cfg.ChunkTime)
+ defer sendTicker.Stop()
+
for {
select {
case <-ctx.Done():
- return // FIXME: check if we can pop something before return
+ dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime)
+ defer cancel()
+ sendChunk(dctx, true)
+ return
+ case <-sendTicker.C:
+ sendChunk(ctx, true)
case c := <-chunkCh:
heap.push(c)
- if heap.gap(state.NextIndex) {
- continue
- }
- c = heap.pop()
- if len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) {
- heap.push(c)
- continue // FIXME: don't trigger if we havn't run nextState for too long
- }
- nextState, err := t.nextState(ctx, state, c)
- if err != nil {
- errorCh <- err
- heap.push(c)
- continue
- }
-
- state = nextState
- eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors}
+ sendChunk(ctx, false)
}
}
}