aboutsummaryrefslogtreecommitdiff
path: root/internal/monitor
diff options
context:
space:
mode:
Diffstat (limited to 'internal/monitor')
-rw-r--r--internal/monitor/backoff.go56
-rw-r--r--internal/monitor/monitor.go12
-rw-r--r--internal/monitor/tail.go79
3 files changed, 128 insertions, 19 deletions
diff --git a/internal/monitor/backoff.go b/internal/monitor/backoff.go
new file mode 100644
index 0000000..63c5f55
--- /dev/null
+++ b/internal/monitor/backoff.go
@@ -0,0 +1,56 @@
+package monitor
+
+import (
+ "context"
+
+ ct "github.com/google/certificate-transparency-go"
+ "github.com/google/certificate-transparency-go/client"
+ "github.com/google/certificate-transparency-go/jsonclient"
+ "github.com/google/trillian/client/backoff"
+)
+
+// backoffClient wraps client.LogClient so that we always backoff on get-entries
+// 4XX and 5XX. Backoff is on by default for get-sth already, and our silentct
+// usage is guaranteed to not do any hammering on any of the proof endpoints.
+//
+// For reference on this issue, see:
+// https://github.com/google/certificate-transparency-go/issues/898
+type backoffClient struct {
+ cli *client.LogClient
+}
+
+func (bc *backoffClient) BaseURI() string {
+ return bc.cli.BaseURI()
+}
+
+func (bc *backoffClient) GetSTH(ctx context.Context) (*ct.SignedTreeHead, error) {
+ return bc.cli.GetSTH(ctx)
+}
+
+func (bc *backoffClient) GetSTHConsistency(ctx context.Context, first, second uint64) ([][]byte, error) {
+ return bc.cli.GetSTHConsistency(ctx, first, second)
+}
+
+func (bc *backoffClient) GetProofByHash(ctx context.Context, hash []byte, treeSize uint64) (*ct.GetProofByHashResponse, error) {
+ return bc.cli.GetProofByHash(ctx, hash, treeSize)
+}
+
+func (bc *backoffClient) GetRawEntries(ctx context.Context, start, end int64) (*ct.GetEntriesResponse, error) {
+ rsp, err := bc.cli.GetRawEntries(ctx, start, end)
+ if err != nil {
+ jcErr, ok := err.(jsonclient.RspError)
+ if !ok {
+ return rsp, err
+ }
+ if jcErr.StatusCode < 400 || jcErr.StatusCode >= 600 {
+ return rsp, err
+ }
+ // This ensures we never start hammering when the status code is 4XX or
+ // 5XX. Probably not the right thing to do in all cases, but since the
+ // download library we're using starts hammering if the log suddenly
+ // serves something unexpected this seems like a good safety precaution.
+ // Users of the silentct monitor eventually notice they get no entries.
+ return rsp, backoff.RetriableErrorf("get-entries: %v", err)
+ }
+ return rsp, err
+}
diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go
index ffe7f75..2575977 100644
--- a/internal/monitor/monitor.go
+++ b/internal/monitor/monitor.go
@@ -18,6 +18,7 @@ import (
"net/http"
"os"
"sync"
+ "time"
ct "github.com/google/certificate-transparency-go"
"github.com/google/certificate-transparency-go/client"
@@ -68,6 +69,8 @@ type Config struct {
Logger *logger.Logger // Debug prints only (no output by default)
Contact string // Something that help log operators get in touch
ChunkSize uint // Min number of leaves to propagate a chunk without matches
+ ChunkTime time.Duration // But always send chunks (if there are any) with this interval
+ ExitTime time.Duration // Maximum amount of time to spend on a graceful exit
BatchSize uint // Max number of certificates to accept per worker
NumWorkers uint // Number of parallel workers to use for each log
}
@@ -94,6 +97,12 @@ func New(cfg Config, evCh chan Event, cfgCh chan MonitoredLog, errCh chan error)
if cfg.ChunkSize == 0 {
cfg.ChunkSize = 4096
}
+ if cfg.ChunkTime == 0 {
+ cfg.ChunkTime = 10 * time.Minute
+ }
+ if cfg.ExitTime == 0 {
+ cfg.ExitTime = 10 * time.Second
+ }
if cfg.BatchSize == 0 {
cfg.BatchSize = 1024
}
@@ -164,7 +173,8 @@ func (mon *Monitor) newTailRFC6962(log MonitoredLog) (tail, error) {
return tail{}, err
}
- return tail{cfg: mon.cfg, scanner: cli, checker: cli, matcher: mon.matcher}, nil
+ bc := &backoffClient{cli: cli}
+ return tail{cfg: mon.cfg, scanner: bc, checker: bc, matcher: mon.matcher}, nil
}
func (mon *Monitor) newTailTile(cfg MonitoredLog) (tail, error) {
diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go
index d00ebe6..2603e81 100644
--- a/internal/monitor/tail.go
+++ b/internal/monitor/tail.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
+ "time"
"github.com/google/certificate-transparency-go/client"
"github.com/google/certificate-transparency-go/scanner"
@@ -75,31 +76,70 @@ func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, er
}
func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) {
+ var failedAt time.Time
state := mon.State
heap := newChunks()
+ sendChunk := func(ctx context.Context, force bool) {
+ if !failedAt.IsZero() && failedAt.Add(30*time.Second).After(time.Now()) {
+ return // ensures we don't spam get-sth and proof endpoints
+ }
+
+ if heap.gap(state.NextIndex) {
+ return // nothing to send yet
+ }
+ c := heap.pop()
+ if !force && len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) {
+ heap.push(c)
+ return // wait for a larger chunk before batch verification
+ }
+
+ nextState, err := t.nextState(ctx, state, c)
+ if err != nil {
+ failedAt = time.Now()
+ errorCh <- err
+ heap.push(c)
+ return
+ }
+
+ state = nextState
+ eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors}
+ }
+ refreshSTH := func(ctx context.Context) {
+ timestamp := time.UnixMilli(int64(state.Timestamp))
+ timestamp = timestamp.Add(t.cfg.ChunkTime)
+ timestamp = timestamp.Add(-1 * time.Second)
+ if timestamp.After(time.Now()) {
+ return
+ }
+
+ // Looks like we haven't send any chunks the past ChunkTime time units.
+ // Get a newer tree head so the timestamp can be used for freshness.
+ nextState, err := t.nextConsistentState(ctx, state)
+ if err != nil {
+ errorCh <- err
+ return
+ }
+ state = nextState
+ eventCh <- Event{State: state}
+ }
+
+ sendTicker := time.NewTicker(t.cfg.ChunkTime)
+ defer sendTicker.Stop()
+
for {
select {
case <-ctx.Done():
- return // FIXME: check if we can pop something before return
+ dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime)
+ defer cancel()
+ sendChunk(dctx, true)
+ refreshSTH(dctx)
+ return
+ case <-sendTicker.C:
+ sendChunk(ctx, true)
+ refreshSTH(ctx)
case c := <-chunkCh:
heap.push(c)
- if heap.gap(state.NextIndex) {
- continue
- }
- c = heap.pop()
- if len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) {
- heap.push(c)
- continue // FIXME: don't trigger if we havn't run nextState for too long
- }
- nextState, err := t.nextState(ctx, state, c)
- if err != nil {
- errorCh <- err
- heap.push(c)
- continue
- }
-
- state = nextState
- eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors}
+ sendChunk(ctx, false)
}
}
}
@@ -121,6 +161,9 @@ func (t *tail) nextConsistentState(ctx context.Context, state State) (State, err
if err != nil {
return State{}, fmt.Errorf("%s: get-sth: %v", t.checker.BaseURI(), err)
}
+ if sth.Timestamp < state.Timestamp {
+ return State{}, fmt.Errorf("%s: get-sth: timestamp is shrinking", t.checker.BaseURI())
+ }
sth.LogID = state.SignedTreeHead.LogID
oldSize := state.TreeSize
oldRoot := state.SHA256RootHash