diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/monitor/monitor.go | 9 | ||||
-rw-r--r-- | internal/monitor/tail.go | 51 |
2 files changed, 42 insertions, 18 deletions
diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index ffe7f75..1f068b2 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -18,6 +18,7 @@ import ( "net/http" "os" "sync" + "time" ct "github.com/google/certificate-transparency-go" "github.com/google/certificate-transparency-go/client" @@ -68,6 +69,8 @@ type Config struct { Logger *logger.Logger // Debug prints only (no output by default) Contact string // Something that help log operators get in touch ChunkSize uint // Min number of leaves to propagate a chunk without matches + ChunkTime time.Duration // But always send chunks (if there are any) with this interval + ExitTime time.Duration // Maximum amount of time to spend on a graceful exit BatchSize uint // Max number of certificates to accept per worker NumWorkers uint // Number of parallel workers to use for each log } @@ -94,6 +97,12 @@ func New(cfg Config, evCh chan Event, cfgCh chan MonitoredLog, errCh chan error) if cfg.ChunkSize == 0 { cfg.ChunkSize = 4096 } + if cfg.ChunkTime == 0 { + cfg.ChunkTime = 10 * time.Minute + } + if cfg.ExitTime == 0 { + cfg.ExitTime = 10 * time.Second + } if cfg.BatchSize == 0 { cfg.BatchSize = 1024 } diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go index d00ebe6..98ce4e0 100644 --- a/internal/monitor/tail.go +++ b/internal/monitor/tail.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/google/certificate-transparency-go/client" "github.com/google/certificate-transparency-go/scanner" @@ -77,29 +78,43 @@ func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, er func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) { state := mon.State heap := newChunks() + sendChunk := func(ctx context.Context, force bool) { + if heap.gap(state.NextIndex) { + return // nothing to send yet + } + + c := heap.pop() + if !force && len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { + heap.push(c) + return // wait for a larger chunk before batch verification + } + + nextState, err := t.nextState(ctx, state, c) + if err != nil { + errorCh <- err + heap.push(c) + return + } + + state = nextState + eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + } + + sendTicker := time.NewTicker(t.cfg.ChunkTime) + defer sendTicker.Stop() + for { select { case <-ctx.Done(): - return // FIXME: check if we can pop something before return + dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime) + defer cancel() + sendChunk(dctx, true) + return + case <-sendTicker.C: + sendChunk(ctx, true) case c := <-chunkCh: heap.push(c) - if heap.gap(state.NextIndex) { - continue - } - c = heap.pop() - if len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { - heap.push(c) - continue // FIXME: don't trigger if we havn't run nextState for too long - } - nextState, err := t.nextState(ctx, state, c) - if err != nil { - errorCh <- err - heap.push(c) - continue - } - - state = nextState - eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + sendChunk(ctx, false) } } } |