diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/manager/manager.go | 48 | ||||
-rw-r--r-- | internal/metrics/metrics.go | 114 | ||||
-rw-r--r-- | internal/monitor/backoff.go | 56 | ||||
-rw-r--r-- | internal/monitor/monitor.go | 12 | ||||
-rw-r--r-- | internal/monitor/tail.go | 79 |
5 files changed, 288 insertions, 21 deletions
diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 6781d57..b839502 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -5,20 +5,24 @@ import ( "errors" "fmt" "os" + "strings" "time" "gitlab.torproject.org/rgdd/ct/pkg/metadata" "rgdd.se/silentct/internal/feedback" "rgdd.se/silentct/internal/logger" + "rgdd.se/silentct/internal/metrics" "rgdd.se/silentct/internal/monitor" "rgdd.se/silentct/pkg/policy" "rgdd.se/silentct/pkg/storage" + "rgdd.se/silentct/pkg/storage/loglist" ) type Config struct { Policy policy.Policy Bootstrap bool // Whether a new storage should be initialized from scratch Directory string // Path to a directory where everything will be stored + Metrics *metrics.Metrics // Optional Logger *logger.Logger // Where to output messages and with what verbosity @@ -82,6 +86,12 @@ func (mgr *Manager) Run(ctx context.Context) error { metadataTicker := time.NewTicker(mgr.MetadataRefreshInterval) defer metadataTicker.Stop() + alertTicker := time.NewTicker(mgr.AlertDelay) + defer alertTicker.Stop() + if err := mgr.alertJob(); err != nil { + return fmt.Errorf("unable to run alert job: %v\n", err) + } + shutdown := false for { select { @@ -89,6 +99,10 @@ func (mgr *Manager) Run(ctx context.Context) error { if err := mgr.metadataJob(ctx); err != nil { return fmt.Errorf("unable to run metadata job: %v", err) } + case <-alertTicker.C: + if err := mgr.alertJob(); err != nil { + return fmt.Errorf("unable to run alert job: %v\n", err) + } case ev := <-mgr.meventCh: if err := mgr.monitorJob(ev); err != nil { return fmt.Errorf("unable to run monitor job: %v", err) @@ -133,6 +147,7 @@ func (mgr *Manager) startupConfig() error { return err } mgr.mconfigCh <- monitor.MonitoredLog{Config: log, State: state} + mgr.Metrics.LogState(loglist.FormatLogName(log), state) } return nil } @@ -157,6 +172,7 @@ func (mgr *Manager) removeLogs(logs []metadata.Log) { state, _ := mgr.GetMonitorState(log) mgr.Logger.Infof("removing log %s 
with %d entries in its backlog\n", log.URL, state.TreeSize-state.NextIndex) mgr.mconfigCh <- monitor.MonitoredLog{Config: log} + mgr.Metrics.RemoveLogState(loglist.FormatLogName(log), state) } } @@ -168,10 +184,12 @@ func (mgr *Manager) addLogs(ctx context.Context, logs []metadata.Log) { mgr.Logger.Infof("adding log %s with existing state on disk\n", log.URL) } else if err != nil { mgr.Logger.Noticef("restart required: failed to bootstrap new log %s: %v\n", log.URL, err) + mgr.Metrics.NeedRestart() } else { mgr.Logger.Infof("bootstrapping log %s at next index 0\n", log.URL) } mgr.mconfigCh <- monitor.MonitoredLog{Config: log, State: state} + mgr.Metrics.LogState(loglist.FormatLogName(log), state) } } @@ -190,21 +208,47 @@ func (mgr *Manager) monitorJob(msg monitor.Event) error { if err := mgr.AddEntries(msg.State.LogID, msg.Matches); err != nil { return err } - return mgr.SetMonitorState(msg.State.LogID, msg.State) + if err := mgr.SetMonitorState(msg.State.LogID, msg.State); err != nil { + return err + } + for _, err := range msg.Errors { + mgr.errorJob(err) + } + + // no metrics update if the log has just been removed (final event) + name, err := mgr.Storage.LogList.LogName(msg.State.SignedTreeHead.LogID) + if err == nil { + mgr.Metrics.LogState(name, msg.State) + } + return nil } func (mgr *Manager) alertJob() error { + // See if there are any new unexpected certificates alerts, err := mgr.Index.TriggerAlerts() if err != nil { return err } for _, alert := range alerts { - mgr.Logger.Noticef("certificate mis-issuance? 
No node submitted certificate %s\n", alert.StoredAt) + mgr.Logger.Noticef("unexpected certificate: no allowlisting for crt_sans=\"%s\", see log_id=\"%x\" log_index=\"%d\"\n", strings.Join(alert.SANs, " "), alert.LogID, alert.LogIndex) + } + + // Update metrics for the current unexpected certificates + alerting := mgr.Storage.Index.Alerting() + var names []string + for _, alert := range alerting { + name, err := mgr.Storage.LogList.LogName(alert.LogID) + if err != nil { + name = "historic log" + } + names = append(names, name) } + mgr.Metrics.UnexpectedCertificateCount(names, mgr.Storage.Index.Alerting()) return nil } func (mgr *Manager) errorJob(err error) error { mgr.Logger.Debugf("received error: %v\n", err) + mgr.Metrics.CountError() return nil } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 0000000..aae46cd --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,114 @@ +package metrics + +import ( + "fmt" + "strings" + + "github.com/prometheus/client_golang/prometheus" + "rgdd.se/silentct/internal/monitor" + "rgdd.se/silentct/pkg/storage/index" +) + +type Metrics struct { + errorCounter prometheus.Counter + logIndex *prometheus.GaugeVec + logSize *prometheus.GaugeVec + logTimestamp *prometheus.GaugeVec + needRestart prometheus.Gauge + unexpectedCertificateCount *prometheus.GaugeVec +} + +func NewMetrics(registry *prometheus.Registry) *Metrics { + m := &Metrics{ + errorCounter: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "silentct_error_counter", + Help: "The number of errors propagated to the main loop.", + }, + ), + logIndex: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "silentct_log_index", + Help: "The next log entry to be downloaded.", + }, + []string{"log_id", "log_name"}, + ), + logSize: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "silentct_log_size", + Help: "The number of entries in the log.", + }, + []string{"log_id", "log_name"}, + ), + logTimestamp: 
prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "silentct_log_timestamp", + Help: "The log's UNIX timestamp in ms.", + }, + []string{"log_id", "log_name"}, + ), + needRestart: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "silentct_need_restart", + Help: "A non-zero value if the monitor needs restarting.", + }, + ), + unexpectedCertificateCount: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "silentct_unexpected_certificate_count", + Help: "Number of certificates without any allowlisting", + }, + []string{"log_id", "log_name", "log_index", "crt_sans"}, + ), + } + registry.MustRegister( + m.errorCounter, + m.logIndex, + m.logSize, + m.logTimestamp, + m.needRestart, + m.unexpectedCertificateCount, + ) + return m +} + +func (m *Metrics) LogState(logName string, state monitor.State) { + labels := prometheus.Labels{ + "log_id": fmt.Sprintf("%x", state.LogID[:]), + "log_name": logName, + } + m.logIndex.With(labels).Set(float64(state.NextIndex)) + m.logSize.With(labels).Set(float64(state.TreeSize)) + m.logTimestamp.With(labels).Set(float64(state.Timestamp)) +} + +func (m *Metrics) RemoveLogState(logName string, state monitor.State) { + labels := prometheus.Labels{ + "log_id": fmt.Sprintf("%x", state.LogID[:]), + "log_name": logName, + } + m.logIndex.Delete(labels) + m.logSize.Delete(labels) + m.logTimestamp.Delete(labels) +} + +func (m *Metrics) UnexpectedCertificateCount(logNames []string, alerts []index.CertificateInfo) { + m.unexpectedCertificateCount.Reset() + for i, alert := range alerts { + labels := prometheus.Labels{ + "crt_sans": strings.Join(alert.SANs, " "), + "log_id": fmt.Sprintf("%x", alert.LogID), + "log_name": logNames[i], + "log_index": fmt.Sprintf("%d", alert.LogIndex), + } + m.unexpectedCertificateCount.With(labels).Set(1) + } +} + +func (m *Metrics) CountError() { + m.errorCounter.Inc() +} + +func (m *Metrics) NeedRestart() { + m.needRestart.Set(float64(1)) +} diff --git a/internal/monitor/backoff.go 
b/internal/monitor/backoff.go new file mode 100644 index 0000000..63c5f55 --- /dev/null +++ b/internal/monitor/backoff.go @@ -0,0 +1,56 @@ +package monitor + +import ( + "context" + + ct "github.com/google/certificate-transparency-go" + "github.com/google/certificate-transparency-go/client" + "github.com/google/certificate-transparency-go/jsonclient" + "github.com/google/trillian/client/backoff" +) + +// backoffClient wraps client.LogClient so that we always backoff on get-entries +// 4XX and 5XX. Backoff is on by default for get-sth already, and our silentct +// usage is guaranteed to not do any hammering on any of the proof endpoints. +// +// For reference on this issue, see: +// https://github.com/google/certificate-transparency-go/issues/898 +type backoffClient struct { + cli *client.LogClient +} + +func (bc *backoffClient) BaseURI() string { + return bc.cli.BaseURI() +} + +func (bc *backoffClient) GetSTH(ctx context.Context) (*ct.SignedTreeHead, error) { + return bc.cli.GetSTH(ctx) +} + +func (bc *backoffClient) GetSTHConsistency(ctx context.Context, first, second uint64) ([][]byte, error) { + return bc.cli.GetSTHConsistency(ctx, first, second) +} + +func (bc *backoffClient) GetProofByHash(ctx context.Context, hash []byte, treeSize uint64) (*ct.GetProofByHashResponse, error) { + return bc.cli.GetProofByHash(ctx, hash, treeSize) +} + +func (bc *backoffClient) GetRawEntries(ctx context.Context, start, end int64) (*ct.GetEntriesResponse, error) { + rsp, err := bc.cli.GetRawEntries(ctx, start, end) + if err != nil { + jcErr, ok := err.(jsonclient.RspError) + if !ok { + return rsp, err + } + if jcErr.StatusCode < 400 || jcErr.StatusCode >= 600 { + return rsp, err + } + // This ensures we never start hammering when the status code is 4XX or + // 5XX. Probably not the right thing to do in all cases, but since the + // download library we're using starts hammering if the log suddenly + // serves something unexpected this seems like a good safety precaution. 
+ // Users of the silentct monitor eventually notice they get no entries. + return rsp, backoff.RetriableErrorf("get-entries: %v", err) + } + return rsp, err +} diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index ffe7f75..2575977 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -18,6 +18,7 @@ import ( "net/http" "os" "sync" + "time" ct "github.com/google/certificate-transparency-go" "github.com/google/certificate-transparency-go/client" @@ -68,6 +69,8 @@ type Config struct { Logger *logger.Logger // Debug prints only (no output by default) Contact string // Something that help log operators get in touch ChunkSize uint // Min number of leaves to propagate a chunk without matches + ChunkTime time.Duration // But always send chunks (if there are any) with this interval + ExitTime time.Duration // Maximum amount of time to spend on a graceful exit BatchSize uint // Max number of certificates to accept per worker NumWorkers uint // Number of parallel workers to use for each log } @@ -94,6 +97,12 @@ func New(cfg Config, evCh chan Event, cfgCh chan MonitoredLog, errCh chan error) if cfg.ChunkSize == 0 { cfg.ChunkSize = 4096 } + if cfg.ChunkTime == 0 { + cfg.ChunkTime = 10 * time.Minute + } + if cfg.ExitTime == 0 { + cfg.ExitTime = 10 * time.Second + } if cfg.BatchSize == 0 { cfg.BatchSize = 1024 } @@ -164,7 +173,8 @@ func (mon *Monitor) newTailRFC6962(log MonitoredLog) (tail, error) { return tail{}, err } - return tail{cfg: mon.cfg, scanner: cli, checker: cli, matcher: mon.matcher}, nil + bc := &backoffClient{cli: cli} + return tail{cfg: mon.cfg, scanner: bc, checker: bc, matcher: mon.matcher}, nil } func (mon *Monitor) newTailTile(cfg MonitoredLog) (tail, error) { diff --git a/internal/monitor/tail.go b/internal/monitor/tail.go index d00ebe6..2603e81 100644 --- a/internal/monitor/tail.go +++ b/internal/monitor/tail.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" 
"github.com/google/certificate-transparency-go/client" "github.com/google/certificate-transparency-go/scanner" @@ -75,31 +76,70 @@ func (t *tail) run(ctx context.Context, mon MonitoredLog, eventCh chan Event, er } func (t *tail) sequence(ctx context.Context, mon MonitoredLog, eventCh chan Event, errorCh chan error, chunkCh chan *chunk) { + var failedAt time.Time state := mon.State heap := newChunks() + sendChunk := func(ctx context.Context, force bool) { + if !failedAt.IsZero() && failedAt.Add(30*time.Second).After(time.Now()) { + return // ensures we don't spam get-sth and proof endpoints + } + + if heap.gap(state.NextIndex) { + return // nothing to send yet + } + c := heap.pop() + if !force && len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { + heap.push(c) + return // wait for a larger chunk before batch verification + } + + nextState, err := t.nextState(ctx, state, c) + if err != nil { + failedAt = time.Now() + errorCh <- err + heap.push(c) + return + } + + state = nextState + eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + } + refreshSTH := func(ctx context.Context) { + timestamp := time.UnixMilli(int64(state.Timestamp)) + timestamp = timestamp.Add(t.cfg.ChunkTime) + timestamp = timestamp.Add(-1 * time.Second) + if timestamp.After(time.Now()) { + return + } + + // Looks like we haven't sent any chunks the past ChunkTime time units. + // Get a newer tree head so the timestamp can be used for freshness. 
+ nextState, err := t.nextConsistentState(ctx, state) + if err != nil { + errorCh <- err + return + } + state = nextState + eventCh <- Event{State: state} + } + + sendTicker := time.NewTicker(t.cfg.ChunkTime) + defer sendTicker.Stop() + for { select { case <-ctx.Done(): - return // FIXME: check if we can pop something before return + dctx, cancel := context.WithTimeout(context.Background(), t.cfg.ExitTime) + defer cancel() + sendChunk(dctx, true) + refreshSTH(dctx) + return + case <-sendTicker.C: + sendChunk(ctx, true) + refreshSTH(ctx) case c := <-chunkCh: heap.push(c) - if heap.gap(state.NextIndex) { - continue - } - c = heap.pop() - if len(c.matches) == 0 && len(c.leafHashes) < int(t.cfg.ChunkSize) { - heap.push(c) - continue // FIXME: don't trigger if we havn't run nextState for too long - } - nextState, err := t.nextState(ctx, state, c) - if err != nil { - errorCh <- err - heap.push(c) - continue - } - - state = nextState - eventCh <- Event{State: state, Matches: c.matches, Errors: c.errors} + sendChunk(ctx, false) } } } @@ -121,6 +161,9 @@ func (t *tail) nextConsistentState(ctx context.Context, state State) (State, err if err != nil { return State{}, fmt.Errorf("%s: get-sth: %v", t.checker.BaseURI(), err) } + if sth.Timestamp < state.Timestamp { + return State{}, fmt.Errorf("%s: get-sth: timestamp is shrinking", t.checker.BaseURI()) + } sth.LogID = state.SignedTreeHead.LogID oldSize := state.TreeSize oldRoot := state.SHA256RootHash |