diff options
Diffstat (limited to 'internal/manager')
-rw-r--r-- | internal/manager/helpers.go | 52 | ||||
-rw-r--r-- | internal/manager/manager.go | 219 |
2 files changed, 167 insertions, 104 deletions
diff --git a/internal/manager/helpers.go b/internal/manager/helpers.go deleted file mode 100644 index a9a2158..0000000 --- a/internal/manager/helpers.go +++ /dev/null @@ -1,52 +0,0 @@ -package manager - -import ( - "crypto/sha256" - "encoding/base64" - "fmt" - - ct "github.com/google/certificate-transparency-go" - "gitlab.torproject.org/rgdd/ct/pkg/metadata" - "rgdd.se/silent-ct/pkg/monitor" -) - -func selectLogs(m metadata.Metadata) []monitor.MessageLogConfig { - var logs []monitor.MessageLogConfig - for _, operator := range m.Operators { - for _, log := range operator.Logs { - if log.State == nil { - continue // ignore logs without a state (should not happen) - } - if log.State.Name == metadata.LogStatePending { - continue // log is not yet relevant - } - if log.State.Name == metadata.LogStateRetired { - continue // log is not expected to be reachable - } - if log.State.Name == metadata.LogStateRejected { - continue // log is not expected to be reachable - } - - // FIXME: remove me instead of hard coding Argon 2024 - id, _ := log.Key.ID() - got := fmt.Sprintf("%s", base64.StdEncoding.EncodeToString(id[:])) - want := "7s3QZNXbGs7FXLedtM0TojKHRny87N7DUUhZRnEftZs=" - if got != want { - continue - } - - logs = append(logs, monitor.MessageLogConfig{ - Metadata: log, - State: monitor.MonitorState{ - LogState: monitor.LogState{ct.SignedTreeHead{ - SHA256RootHash: [sha256.Size]byte{47, 66, 110, 15, 246, 154, 8, 100, 150, 140, 206, 208, 17, 57, 112, 116, 210, 3, 19, 55, 46, 63, 209, 12, 234, 130, 225, 124, 237, 2, 64, 228}, - TreeSize: 610650601, - Timestamp: 1702108968538, - }}, - NextIndex: 388452203, - }, - }) - } - } - return logs -} diff --git a/internal/manager/manager.go b/internal/manager/manager.go index 33207e9..bc216c1 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -2,93 +2,208 @@ package manager import ( "context" - "encoding/json" + "errors" "fmt" "os" "time" "gitlab.torproject.org/rgdd/ct/pkg/metadata" - "rgdd.se/silent-ct/pkg/monitor" - "rgdd.se/silent-ct/pkg/server" -) - -const ( - DefaultStateDir = "/home/rgdd/.local/share/silent-ct" // FIXME - DefaultMetadataRefreshInterval = 1 * time.Hour + "rgdd.se/silent-ct/internal/feedback" + "rgdd.se/silent-ct/internal/logger" + "rgdd.se/silent-ct/internal/monitor" + "rgdd.se/silent-ct/pkg/policy" + "rgdd.se/silent-ct/pkg/storage" ) type Config struct { - StateDir string - Nodes server.Nodes + Policy policy.Policy + Bootstrap bool // Whether a new storage should be initialized from scratch + Directory string // Path to a directory where everything will be stored - MetadataRefreshInterval time.Duration + // Optional + Logger logger.Logger // Where to output messages and with what verbosity + AlertDelay time.Duration // Time before alerting on certificates that are unaccounted for + MetadataRefreshInterval time.Duration // How often to update the list of monitored logs + ShutdownTimeout time.Duration // Force shutdown after this timeout (FIXME: should not be needed) } type Manager struct { Config + storage.Storage + + feventCh chan []feedback.Event + meventCh chan monitor.Event + mconfigCh chan monitor.MonitoredLog + errorCh chan error } -func New(cfg Config) (Manager, error) { - if cfg.StateDir == "" { - cfg.StateDir = DefaultStateDir +func New(cfg Config, fch chan []feedback.Event, mch chan monitor.Event, cch chan monitor.MonitoredLog, ech chan error) (Manager, error) { + if !cfg.Logger.IsConfigured() { + cfg.Logger = logger.New(logger.Config{Level: logger.LevelNotice, File: os.Stdout}) } if cfg.MetadataRefreshInterval == 0 { - cfg.MetadataRefreshInterval = DefaultMetadataRefreshInterval + cfg.MetadataRefreshInterval = 1 * time.Hour + } + if cfg.ShutdownTimeout == 0 { + cfg.ShutdownTimeout = 1 * time.Second // FIXME: increase } - return Manager{Config: cfg}, nil -} - -func (mgr *Manager) Run(ctx context.Context, - serverCh chan server.MessageNodeSubmission, - monitorCh chan monitor.MessageLogProgress, - configCh chan []monitor.MessageLogConfig, - errorCh chan error) error { - md, err := mgr.metadataRead() + s, err := storage.New(storage.Config{ + Bootstrap: cfg.Bootstrap, + Directory: cfg.Directory, + AlertDelay: cfg.AlertDelay, + StaticLogs: cfg.Policy.StaticLogs, + RemoveLogs: cfg.Policy.RemoveLogs, + }) if err != nil { - return fmt.Errorf("read metadata: %v\n", err) + return Manager{}, err + } + + for _, log := range s.LogList.Generate() { + state, err := s.BootstrapLog(context.Background(), log, cfg.Bootstrap) + if errors.Is(err, storage.ErrorMonitorStateExists) { + continue + } + if err != nil { + return Manager{}, err + } + cfg.Logger.Infof("bootstrapping log %s at next index %d\n", log.URL, state.NextIndex) + } + + return Manager{Config: cfg, Storage: s, feventCh: fch, meventCh: mch, mconfigCh: cch, errorCh: ech}, nil +} + +func (mgr *Manager) Run(ctx context.Context) error { + if err := mgr.startupConfig(); err != nil { + return fmt.Errorf("unable to do startup config: %v", err) } - configCh <- selectLogs(md) - ticker := time.NewTicker(mgr.MetadataRefreshInterval) - defer ticker.Stop() + metadataTicker := time.NewTicker(mgr.MetadataRefreshInterval) + defer metadataTicker.Stop() + shutdown := false for { select { - case <-ctx.Done(): - return nil - case <-ticker.C: - mu, err := mgr.metadataUpdate(ctx, md) - if err != nil { - continue + case <-metadataTicker.C: + if err := mgr.metadataJob(ctx); err != nil { + return fmt.Errorf("unable to run metadata job: %v", err) + } + case ev := <-mgr.meventCh: + if err := mgr.monitorJob(ev); err != nil { + return fmt.Errorf("unable to run monitor job: %v", err) + } + if err := mgr.alertJob(); err != nil { + return fmt.Errorf("unable to run alert job: %v\n", err) } - if mu.Version.Major <= md.Version.Major { + case ev := <-mgr.feventCh: + if err := mgr.feedbackJob(ev); err != nil { + return fmt.Errorf("unable to run server job: %v", err) + } + if err := mgr.alertJob(); err != nil { + return fmt.Errorf("unable to run alert job: %v\n", err) + } + case err := <-mgr.errorCh: + if err := mgr.errorJob(err); err != nil { + return fmt.Errorf("unable to run error job: %v", err) + } + case <-ctx.Done(): + if !shutdown { + shutdown = true + mgr.Logger.Noticef("shutdown scheduled in %v\n", mgr.ShutdownTimeout) + + // defer shutdown so that all channels can be drained + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), mgr.ShutdownTimeout) + defer cancel() continue } - md = mu - configCh <- selectLogs(md) - case ev := <-monitorCh: - fmt.Printf("DEBUG: received event from monitor with %d matches\n", len(ev.Matches)) - case ev := <-serverCh: - fmt.Printf("DEBUG: received event from server: %v\n", ev) - case err := <-errorCh: - fmt.Printf("DEBUG: received error: %v\n", err) + + mgr.Logger.Debugf("manager shutdown\n") + os.Exit(0) // FIXME: return nil without hanging, unpredictable gh.com/google/ct-go fetcher shutdown? } } } -func (mgr *Manager) metadataRead() (metadata.Metadata, error) { - b, err := os.ReadFile(mgr.StateDir + "/metadata.json") +func (mgr *Manager) startupConfig() error { + mgr.Logger.Debugf("startup configuring contains %d logs\n", len(mgr.Storage.LogList.Generate())) + for _, log := range mgr.Storage.LogList.Generate() { + state, err := mgr.GetMonitorState(log) + if err != nil { + return err + } + mgr.mconfigCh <- monitor.MonitoredLog{Config: log, State: state} + } + return nil +} + +func (mgr *Manager) metadataJob(ctx context.Context) error { + mgr.Logger.Debugf("running metadata job\n") + added, removed, err := mgr.LogList.Update(ctx) + if err != nil { + if mgr.LogList.IsStale() { + return fmt.Errorf("unable to update log list which is now stale: %v", err) + } + } + + mgr.removeLogs(removed) + mgr.addLogs(ctx, added) + return nil +} + +func (mgr *Manager) removeLogs(logs []metadata.Log) { + mgr.Logger.Debugf("removing %d logs\n", len(logs)) + for _, log := range logs { + state, _ := mgr.GetMonitorState(log) + mgr.Logger.Infof("removing log %s with %d entries in its backlog\n", log.URL, state.TreeSize-state.NextIndex) + mgr.mconfigCh <- monitor.MonitoredLog{Config: log} + } +} + +func (mgr *Manager) addLogs(ctx context.Context, logs []metadata.Log) { + mgr.Logger.Debugf("adding %d logs\n", len(logs)) + for _, log := range logs { + state, err := mgr.BootstrapLog(ctx, log, false) + if errors.Is(err, storage.ErrorMonitorStateExists) { + mgr.Logger.Infof("adding log %s with existing state on disk\n", log.URL) + } else if err != nil { + mgr.Logger.Noticef("restart required: failed to bootstrap new log %s: %v\n", log.URL, err) + } else { + mgr.Logger.Infof("bootstrapping log %s at next index 0\n", log.URL) + } + mgr.mconfigCh <- monitor.MonitoredLog{Config: log, State: state} + } +} + +func (mgr *Manager) feedbackJob(events []feedback.Event) error { + mgr.Logger.Debugf("received feedback with %d events", len(events)) + for _, ev := range events { + if err := mgr.AddChain(ev.NodeName, ev.PEMChain); err != nil { + return err + } + } + return nil +} + +func (mgr *Manager) monitorJob(msg monitor.Event) error { + mgr.Logger.Debugf("new state for %s\n", msg.Summary()) + if err := mgr.AddEntries(msg.State.LogID, msg.Matches); err != nil { + return err + } + return mgr.SetMonitorState(msg.State.LogID, msg.State) +} + +func (mgr *Manager) alertJob() error { + alerts, err := mgr.Index.TriggerAlerts() if err != nil { - return metadata.Metadata{}, err + return err } - var md metadata.Metadata - if err := json.Unmarshal(b, &md); err != nil { - return metadata.Metadata{}, err + for _, alert := range alerts { + mgr.Logger.Noticef("certificate mis-issuance? No node submitted certificate %s\n", alert.StoredAt) } - return md, nil + return nil } -func (mgr *Manager) metadataUpdate(ctx context.Context, old metadata.Metadata) (metadata.Metadata, error) { - return metadata.Metadata{}, fmt.Errorf("TODO: update metadata") +func (mgr *Manager) errorJob(err error) error { + mgr.Logger.Debugf("received error: %v\n", err) + return nil } |