From b8b8e05c833dfb4c4191c8c1391e02d07e0e744f Mon Sep 17 00:00:00 2001 From: Rasmus Dahlberg Date: Sat, 18 Mar 2023 13:20:27 +0100 Subject: renaming files and moving around --- assemble.go | 7 -- cmd_assemble.go | 7 ++ cmd_collect.go | 300 +++++++++++++++++++++++++++++++++++++++++++++++ cmd_snapshot.go | 163 ++++++++++++++++++++++++++ collect.go | 333 ----------------------------------------------------- house_keeping.go | 80 ------------- snapshot.go | 163 -------------------------- utils_housekeep.go | 80 +++++++++++++ utils_state.go | 42 +++++++ 9 files changed, 592 insertions(+), 583 deletions(-) delete mode 100644 assemble.go create mode 100644 cmd_assemble.go create mode 100644 cmd_collect.go create mode 100644 cmd_snapshot.go delete mode 100644 collect.go delete mode 100644 house_keeping.go delete mode 100644 snapshot.go create mode 100644 utils_housekeep.go create mode 100644 utils_state.go diff --git a/assemble.go b/assemble.go deleted file mode 100644 index 246ba6e..0000000 --- a/assemble.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "fmt" - -func assemble(opts options) error { - return fmt.Errorf("TODO") -} diff --git a/cmd_assemble.go b/cmd_assemble.go new file mode 100644 index 0000000..246ba6e --- /dev/null +++ b/cmd_assemble.go @@ -0,0 +1,7 @@ +package main + +import "fmt" + +func assemble(opts options) error { + return fmt.Errorf("TODO") +} diff --git a/cmd_collect.go b/cmd_collect.go new file mode 100644 index 0000000..0254bb3 --- /dev/null +++ b/cmd_collect.go @@ -0,0 +1,300 @@ +package main + +import ( + "container/heap" + "context" + "crypto/sha256" + "encoding/json" + "fmt" + logger "log" + "net/http" + "os" + "strings" + "sync" + "time" + + "git.cs.kau.se/rasmoste/ct-sans/internal/chunk" + "git.cs.kau.se/rasmoste/ct-sans/internal/merkle" + "git.cs.kau.se/rasmoste/ct-sans/internal/x509" + "github.com/google/certificate-transparency-go/client" + "github.com/google/certificate-transparency-go/jsonclient" + "github.com/google/certificate-transparency-go/scanner" + "gitlab.torproject.org/rgdd/ct/pkg/metadata" +) + +func collect(opts options) error { + b, err := os.ReadFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile)) + if err != nil { + return err + } + var md metadata.Metadata + if err := json.Unmarshal(b, &md); err != nil { + return err + } + + var await sync.WaitGroup + defer await.Wait() + ctx, cancel := context.WithCancel(context.Background()) + go func() { + await.Add(1) + defer await.Done() + handleSignals(ctx, cancel) + + // + // Sometimes some worker in scanner.Fetcher isn't shutdown + // properly despite the parent context (including getRanges) + // being done. The below is an ugly hack to avoid hanging. + // + wait := time.Second * 5 // TODO: 15s + logger.Printf("INFO: about to exit, please wait %v...\n", wait) + select { + case <-time.After(wait): + os.Exit(0) + } + }() + + metricsCh := make(chan metrics) + defer close(metricsCh) + go func() { + await.Add(1) + defer await.Done() + handleMetrics(ctx, metricsCh, logs(md)) + }() + + defer cancel() + var wg sync.WaitGroup + defer wg.Wait() + for _, log := range logs(md) { + go func(log metadata.Log) { + wg.Add(1) + defer wg.Done() + + id, _ := log.Key.ID() + th, err := readState(opts, id[:]) + if err != nil { + logger.Printf("ERROR: %s: %v\n", *log.Description, err) + cancel() + return + } + sth, err := readSnapshot(opts, id[:]) + if err != nil { + logger.Printf("ERROR: %s: %v\n", *log.Description, err) + cancel() + return + } + cli, err := client.New(string(log.URL), + &http.Client{Transport: &http.Transport{IdleConnTimeout: 120 * time.Second}}, + jsonclient.Options{UserAgent: opts.HTTPAgent}, + ) + if err != nil { + logger.Printf("ERROR: %s: %v\n", *log.Description, err) + cancel() + return + } + fetcher := scanner.NewFetcher(cli, &scanner.FetcherOptions{ + BatchSize: int(opts.BatchSize), + StartIndex: th.TreeSize, + EndIndex: int64(sth.TreeSize), + ParallelFetch: int(opts.WorkersPerLog), + }) + if uint64(th.TreeSize) == sth.TreeSize { + metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true} + return + } + + // + // Sequencer that waits for sufficiently large chunks + // before verifying inclusion proofs and persisting an + // intermediate tree head (size and root hash) as well + // as the SANs that were observed up until that point. + // + chunksCh := make(chan *chunk.Chunk) + defer close(chunksCh) + cctx, fetchDone := context.WithCancel(ctx) + defer fetchDone() + go func() { + wg.Add(1) + defer wg.Done() + sequence(cctx, cancel, opts, log, th.TreeSize, cli, chunksCh, metricsCh) + }() + + // + // Callback that puts downloaded certificates into a + // chunk that a single sequencer can verify and persist + // + callback := func(eb scanner.EntryBatch) { + leafHashes := [][sha256.Size]byte{} + for i := 0; i < len(eb.Entries); i++ { + leafHashes = append(leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput)) + } + sans, errs := x509.SANsFromLeafEntries(eb.Start, eb.Entries) + for _, err := range errs { + logger.Printf("NOTICE: %s: %v", *log.Description, err) + } + chunksCh <- &chunk.Chunk{eb.Start, leafHashes, sans} + } + + if err := fetcher.Run(ctx, callback); err != nil { + logger.Printf("ERROR: %s: %v\n", *log.Description, err) + cancel() + return + } + for len(chunksCh) > 0 { + select { + case <-ctx.Done(): + return // some Go routine cancelled due to an error, die + case <-time.After(time.Second): + } + } + }(log) + } + + logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n") + time.Sleep(3 * time.Second) // ensure that Go routines had time to spawn + return nil +} + +func sequence(ctx context.Context, cancel context.CancelFunc, + opts options, log metadata.Log, nextIndex int64, cli *client.LogClient, + chunksCh chan *chunk.Chunk, metricsCh chan metrics) { + desc := *log.Description + + h := &chunk.ChunkHeap{} + heap.Init(h) + for { + select { + case <-ctx.Done(): + if h.Sequence(nextIndex) { + c := h.TPop() + if _, err := persist(c, opts, log, cli, 0, metricsCh); err != nil { + logger.Printf("ERROR: %s: %v\n", desc, err) + } + } + return + case c, ok := <-chunksCh: + if ok { + h.TPush(c) + } + if !h.Sequence(nextIndex) { + continue + } + + c = h.TPop() + putBack, err := persist(c, opts, log, cli, int64(opts.PersistSize), metricsCh) + if err != nil { + cancel() + logger.Printf("ERROR: %s: %v\n", desc, err) + return + } + if putBack { + h.TPush(c) + continue + } + + nextIndex += int64(len(c.LeafHashes)) + } + } +} + +func persist(c *chunk.Chunk, + opts options, log metadata.Log, cli *client.LogClient, minSequence int64, + metricsCh chan metrics) (bool, error) { + logID, _ := log.Key.ID() + desc := *log.Description + + chunkSize := int64(len(c.LeafHashes)) + if chunkSize == 0 { + return false, nil // nothing to persist + } + if chunkSize < minSequence { + return true, nil // wait for more leaves + } + + // Read persisted tree state from disk + oldTH, err := readState(opts, logID[:]) + if err != nil { + return false, err + } + if oldTH.TreeSize != c.Start { + return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", oldTH.TreeSize, c.Start) + } + + // Read signed tree head from disk + sth, err := readSnapshot(opts, logID[:]) + if err != nil { + return false, err + } + + // + // Derive next intermediate tree state instead of verying all inclusions + // + // Independent context because we need to run inclusion and consistency + // queries after the parent context is cancelled to persist on shutdown + // + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + newTH := treeHead{TreeSize: c.Start + chunkSize} + p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(newTH.TreeSize)) + if err != nil { + return true, nil // try again later + } + if p.LeafIndex != c.Start { + return false, fmt.Errorf("log says proof for entry %d is at index %d", c.Start, p.LeafIndex) + } + if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), proof(p.AuditPath)); err != nil { + return false, err + } + + // Check that new tree state is consistent with what we stored on disk + var hashes [][]byte + if oldTH.TreeSize > 0 { + if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil { + return true, nil // try again later + } + } + if err := merkle.VerifyConsistency(uint64(oldTH.TreeSize), uint64(newTH.TreeSize), oldTH.RootHash, newTH.RootHash, proof(hashes)); err != nil { + return false, fmt.Errorf("%d %x is inconsistent with on-disk state: %v", newTH.TreeSize, newTH.RootHash, err) + } + + // Check that new tree state is consistent with the signed tree head + if hashes, err = cli.GetSTHConsistency(ctx, uint64(newTH.TreeSize), sth.TreeSize); err != nil { + return true, nil // try again later + } + if err := merkle.VerifyConsistency(uint64(newTH.TreeSize), sth.TreeSize, newTH.RootHash, sth.SHA256RootHash, proof(hashes)); err != nil { + return false, fmt.Errorf("%d %x is inconsistent with signed tree head: %v", newTH.TreeSize, newTH.RootHash, err) + } + + // Persist SANs to disk + fp, err := os.OpenFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sansFile), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return false, err + } + defer fp.Close() + if _, err := fp.WriteString(strings.Join(c.SANs, "\n") + "\n"); err != nil { + return false, err + } + if err := fp.Sync(); err != nil { + return false, err + } + + // Persist new tree state to disk + b, err := json.Marshal(&newTH) + if err != nil { + return false, err + } + if err := os.WriteFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile), b, 0644); err != nil { + return false, err + } + + // Output metrics + metricsCh <- metrics{ + Description: desc, + NumEntries: newTH.TreeSize - oldTH.TreeSize, + Timestamp: time.Now().Unix(), + Start: newTH.TreeSize, + End: int64(sth.TreeSize), + Done: uint64(newTH.TreeSize) == sth.TreeSize, + } + logger.Printf("DEBUG: %s: persisted [%d, %d]\n", desc, oldTH.TreeSize, newTH.TreeSize) + return false, nil +} diff --git a/cmd_snapshot.go b/cmd_snapshot.go new file mode 100644 index 0000000..5a9c50e --- /dev/null +++ b/cmd_snapshot.go @@ -0,0 +1,163 @@ +package main + +import ( + "bytes" + "context" + "crypto/sha256" + "crypto/x509" + "encoding/json" + "errors" + "fmt" + logger "log" + "net/http" + "os" + "time" + + "git.cs.kau.se/rasmoste/ct-sans/internal/merkle" + ct "github.com/google/certificate-transparency-go" + "github.com/google/certificate-transparency-go/client" + "github.com/google/certificate-transparency-go/jsonclient" + "gitlab.torproject.org/rgdd/ct/pkg/metadata" +) + +func snapshot(opts options) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := os.MkdirAll(opts.Directory, os.ModePerm); err != nil { + return err + } + + logger.Printf("INFO: updating metadata file\n") + source := metadata.NewHTTPSource(metadata.HTTPSourceOptions{Name: "google"}) + msg, sig, md, err := source.Load(ctx) + if err != nil { + return err + } + if err := os.WriteFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile), msg, 0644); err != nil { + return err + } + if err := os.WriteFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataSignatureFile), sig, 0644); err != nil { + return err + } + timestamp := []byte(fmt.Sprintf("%d", time.Now().Unix())) + if err := os.WriteFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataTimestampFile), timestamp, 0644); err != nil { + return err + } + + logger.Printf("INFO: updating signed tree heads\n") + for _, log := range logs(md) { + id, _ := log.Key.ID() + der, _ := x509.MarshalPKIXPublicKey(log.Key) + dir := fmt.Sprintf("%s/%x", opts.logDirectory, id) + sthFile := fmt.Sprintf("%s/%s", dir, opts.sthFile) + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return fmt.Errorf("%s: %v", log.Description, err) + } + + // Fetch next STH + cli, err := client.New(string(log.URL), &http.Client{}, jsonclient.Options{PublicKeyDER: der, UserAgent: opts.HTTPAgent}) + if err != nil { + return fmt.Errorf("%s: %v", *log.Description, err) + } + nextSTH, err := cli.GetSTH(ctx) + if err != nil { + return fmt.Errorf("%s: %v", *log.Description, err) + } + nextSTHBytes, err := json.Marshal(nextSTH) + if err != nil { + return fmt.Errorf("%s: %v", *log.Description, err) + } + // + // It's a bit ugly that ct.SignedTreeHead contains fields that + // are not populated. Doesn't cause any prolems here, however. + // + + // Bootstrap log if we don't have any STH yet + if _, err := os.Stat(sthFile); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("%s: %v", *log.Description, err) + } + if err := os.WriteFile(sthFile, nextSTHBytes, 0644); err != nil { + return fmt.Errorf("%s: %v", *log.Description, err) + } + + logger.Printf("INFO: bootstrapped %s at tree size %d\n", *log.Description, nextSTH.TreeSize) + continue + } + + // Otherwise: update an existing STH + currSTHBytes, err := os.ReadFile(sthFile) + if err != nil { + return fmt.Errorf("%s: %v", *log.Description, err) + } + var currSTH ct.SignedTreeHead + if err := json.Unmarshal(currSTHBytes, &currSTH); err != nil { + return fmt.Errorf("%s: %v", *log.Description, err) + } + if nextSTH.TreeSize < currSTH.TreeSize { + return fmt.Errorf("%s: next tree size is smaller: %s", *log.Description, nextSTHBytes) + } + if nextSTH.TreeSize == currSTH.TreeSize { + if !bytes.Equal(nextSTH.SHA256RootHash[:], currSTH.SHA256RootHash[:]) { + return fmt.Errorf("%s: split-view: %s", *log.Description, nextSTHBytes) + } + + logger.Printf("INFO: %s is already up-to-date at size %d\n", *log.Description, nextSTH.TreeSize) + continue + } + hashes, err := cli.GetSTHConsistency(ctx, currSTH.TreeSize, nextSTH.TreeSize) + if err != nil { + return fmt.Errorf("%s: %v", *log.Description, err) + } + if err := merkle.VerifyConsistency(currSTH.TreeSize, + nextSTH.TreeSize, + [sha256.Size]byte(currSTH.SHA256RootHash), + [sha256.Size]byte(nextSTH.SHA256RootHash), + proof(hashes)); err != nil { + return fmt.Errorf("%s: inconsistent tree: %v", *log.Description, err) + } + if err := os.WriteFile(sthFile, nextSTHBytes, 0644); err != nil { + return fmt.Errorf("%s: %v", *log.Description, err) + } + logger.Printf("INFO: updated %s to tree size %d\n", *log.Description, nextSTH.TreeSize) + } + return nil +} + +// logs select logs that count towards CT-compliance checks. Logs that don't +// have a description are skipped after printing a warning. +func logs(md metadata.Metadata) (logs []metadata.Log) { + for _, operators := range md.Operators { + for _, log := range operators.Logs { + if log.Description == nil { + fmt.Fprintf(os.Stderr, "WARNING: skipping log without description") + continue + } + if log.State == nil { + continue // skip logs with unknown states + } + if log.State.Name == metadata.LogStatePending { + continue // pending logs do not count towards CT-compliance + } + if log.State.Name == metadata.LogStateRetired { + continue // retired logs are not necessarily reachable + } + if log.State.Name == metadata.LogStateRejected { + continue // rejected logs do not count towards CT-compliance + } + + logs = append(logs, log) + } + } + return +} + +// proof formats hashes so that they can be passed to the merkle package +func proof(hashes [][]byte) (p [][sha256.Size]byte) { + for _, hash := range hashes { + var h [sha256.Size]byte + copy(h[:], hash) + p = append(p, h) + } + return +} diff --git a/collect.go b/collect.go deleted file mode 100644 index 3c548db..0000000 --- a/collect.go +++ /dev/null @@ -1,333 +0,0 @@ -package main - -import ( - "container/heap" - "context" - "crypto/sha256" - "encoding/json" - "fmt" - logger "log" - "net/http" - "os" - "strings" - "sync" - "time" - - "git.cs.kau.se/rasmoste/ct-sans/internal/chunk" - "git.cs.kau.se/rasmoste/ct-sans/internal/merkle" - "git.cs.kau.se/rasmoste/ct-sans/internal/x509" - ct "github.com/google/certificate-transparency-go" - "github.com/google/certificate-transparency-go/client" - "github.com/google/certificate-transparency-go/jsonclient" - "github.com/google/certificate-transparency-go/scanner" - "gitlab.torproject.org/rgdd/ct/pkg/metadata" -) - -func collect(opts options) error { - b, err := os.ReadFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile)) - if err != nil { - return err - } - var md metadata.Metadata - if err := json.Unmarshal(b, &md); err != nil { - return err - } - - var await sync.WaitGroup - defer await.Wait() - ctx, cancel := context.WithCancel(context.Background()) - go func() { - await.Add(1) - defer await.Done() - handleSignals(ctx, cancel) - - // - // Sometimes some worker in scanner.Fetcher isn't shutdown - // properly despite the parent context (including getRanges) - // being done. The below is an ugly hack to avoid hanging. - // - wait := time.Second * 5 // TODO: 15s - logger.Printf("INFO: about to exit, please wait %v...\n", wait) - select { - case <-time.After(wait): - os.Exit(0) - } - }() - - metricsCh := make(chan metrics) - defer close(metricsCh) - go func() { - await.Add(1) - defer await.Done() - handleMetrics(ctx, metricsCh, logs(md)) - }() - - defer cancel() - var wg sync.WaitGroup - defer wg.Wait() - for _, log := range logs(md) { - go func(log metadata.Log) { - wg.Add(1) - defer wg.Done() - - id, _ := log.Key.ID() - th, err := readState(opts, id[:]) - if err != nil { - logger.Printf("ERROR: %s: %v\n", *log.Description, err) - cancel() - return - } - sth, err := readSnapshot(opts, id[:]) - if err != nil { - logger.Printf("ERROR: %s: %v\n", *log.Description, err) - cancel() - return - } - cli, err := client.New(string(log.URL), - &http.Client{Transport: &http.Transport{IdleConnTimeout: 120 * time.Second}}, - jsonclient.Options{UserAgent: opts.HTTPAgent}, - ) - if err != nil { - logger.Printf("ERROR: %s: %v\n", *log.Description, err) - cancel() - return - } - fetcher := scanner.NewFetcher(cli, &scanner.FetcherOptions{ - BatchSize: int(opts.BatchSize), - StartIndex: th.TreeSize, - EndIndex: int64(sth.TreeSize), - ParallelFetch: int(opts.WorkersPerLog), - }) - if uint64(th.TreeSize) == sth.TreeSize { - metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true} - return - } - - // - // Sequencer that waits for sufficiently large chunks - // before verifying inclusion proofs and persisting an - // intermediate tree head (size and root hash) as well - // as the SANs that were observed up until that point. - // - chunksCh := make(chan *chunk.Chunk) - defer close(chunksCh) - cctx, fetchDone := context.WithCancel(ctx) - defer fetchDone() - go func() { - wg.Add(1) - defer wg.Done() - sequence(cctx, cancel, opts, log, th.TreeSize, cli, chunksCh, metricsCh) - }() - - // - // Callback that puts downloaded certificates into a - // chunk that a single sequencer can verify and persist - // - callback := func(eb scanner.EntryBatch) { - leafHashes := [][sha256.Size]byte{} - for i := 0; i < len(eb.Entries); i++ { - leafHashes = append(leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput)) - } - sans, errs := x509.SANsFromLeafEntries(eb.Start, eb.Entries) - for _, err := range errs { - logger.Printf("NOTICE: %s: %v", *log.Description, err) - } - chunksCh <- &chunk.Chunk{eb.Start, leafHashes, sans} - } - - if err := fetcher.Run(ctx, callback); err != nil { - logger.Printf("ERROR: %s: %v\n", *log.Description, err) - cancel() - return - } - for len(chunksCh) > 0 { - select { - case <-ctx.Done(): - return // some Go routine cancelled due to an error, die - case <-time.After(time.Second): - } - } - }(log) - } - - logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n") - time.Sleep(3 * time.Second) // ensure that Go routines had time to spawn - return nil -} - -func sequence(ctx context.Context, cancel context.CancelFunc, - opts options, log metadata.Log, nextIndex int64, cli *client.LogClient, - chunksCh chan *chunk.Chunk, metricsCh chan metrics) { - desc := *log.Description - - h := &chunk.ChunkHeap{} - heap.Init(h) - for { - select { - case <-ctx.Done(): - if h.Sequence(nextIndex) { - c := h.TPop() - if _, err := persist(c, opts, log, cli, 0, metricsCh); err != nil { - logger.Printf("ERROR: %s: %v\n", desc, err) - } - } - return - case c, ok := <-chunksCh: - if ok { - h.TPush(c) - } - if !h.Sequence(nextIndex) { - continue - } - - c = h.TPop() - putBack, err := persist(c, opts, log, cli, int64(opts.PersistSize), metricsCh) - if err != nil { - cancel() - logger.Printf("ERROR: %s: %v\n", desc, err) - return - } - if putBack { - h.TPush(c) - continue - } - - nextIndex += int64(len(c.LeafHashes)) - } - } -} - -func persist(c *chunk.Chunk, - opts options, log metadata.Log, cli *client.LogClient, minSequence int64, - metricsCh chan metrics) (bool, error) { - logID, _ := log.Key.ID() - desc := *log.Description - - chunkSize := int64(len(c.LeafHashes)) - if chunkSize == 0 { - return false, nil // nothing to persist - } - if chunkSize < minSequence { - return true, nil // wait for more leaves - } - - // Read persisted tree state from disk - oldTH, err := readState(opts, logID[:]) - if err != nil { - return false, err - } - if oldTH.TreeSize != c.Start { - return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", oldTH.TreeSize, c.Start) - } - - // Read signed tree head from disk - sth, err := readSnapshot(opts, logID[:]) - if err != nil { - return false, err - } - - // - // Derive next intermediate tree state instead of verying all inclusions - // - // Independent context because we need to run inclusion and consistency - // queries after the parent context is cancelled to persist on shutdown - // - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - newTH := treeHead{TreeSize: c.Start + chunkSize} - p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(newTH.TreeSize)) - if err != nil { - return true, nil // try again later - } - if p.LeafIndex != c.Start { - return false, fmt.Errorf("log says proof for entry %d is at index %d", c.Start, p.LeafIndex) - } - if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), proof(p.AuditPath)); err != nil { - return false, err - } - - // Check that new tree state is consistent with what we stored on disk - var hashes [][]byte - if oldTH.TreeSize > 0 { - if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil { - return true, nil // try again later - } - } - if err := merkle.VerifyConsistency(uint64(oldTH.TreeSize), uint64(newTH.TreeSize), oldTH.RootHash, newTH.RootHash, proof(hashes)); err != nil { - return false, fmt.Errorf("%d %x is inconsistent with on-disk state: %v", newTH.TreeSize, newTH.RootHash, err) - } - - // Check that new tree state is consistent with the signed tree head - if hashes, err = cli.GetSTHConsistency(ctx, uint64(newTH.TreeSize), sth.TreeSize); err != nil { - return true, nil // try again later - } - if err := merkle.VerifyConsistency(uint64(newTH.TreeSize), sth.TreeSize, newTH.RootHash, sth.SHA256RootHash, proof(hashes)); err != nil { - return false, fmt.Errorf("%d %x is inconsistent with signed tree head: %v", newTH.TreeSize, newTH.RootHash, err) - } - - // Persist SANs to disk - fp, err := os.OpenFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sansFile), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) - if err != nil { - return false, err - } - defer fp.Close() - if _, err := fp.WriteString(strings.Join(c.SANs, "\n") + "\n"); err != nil { - return false, err - } - if err := fp.Sync(); err != nil { - return false, err - } - - // Persist new tree state to disk - b, err := json.Marshal(&newTH) - if err != nil { - return false, err - } - if err := os.WriteFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile), b, 0644); err != nil { - return false, err - } - - // Output metrics - metricsCh <- metrics{ - Description: desc, - NumEntries: newTH.TreeSize - oldTH.TreeSize, - Timestamp: time.Now().Unix(), - Start: newTH.TreeSize, - End: int64(sth.TreeSize), - Done: uint64(newTH.TreeSize) == sth.TreeSize, - } - logger.Printf("DEBUG: %s: persisted [%d, %d]\n", desc, oldTH.TreeSize, newTH.TreeSize) - return false, nil -} - -type treeHead struct { - TreeSize int64 `json:"tree_size"` - RootHash [sha256.Size]byte `json:root_hash"` -} - -func readState(opts options, logID []byte) (treeHead, error) { - if _, err := os.Stat(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)); err != nil { - return treeHead{0, sha256.Sum256(nil)}, nil - } - b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)) - if err != nil { - return treeHead{}, err - } - var th treeHead - if err := json.Unmarshal(b, &th); err != nil { - return treeHead{}, err - } - return th, nil -} - -func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) { - b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile)) - if err != nil { - return ct.SignedTreeHead{}, err - } - var sth ct.SignedTreeHead - if err := json.Unmarshal(b, &sth); err != nil { - return ct.SignedTreeHead{}, err - } - return sth, nil -} diff --git a/house_keeping.go b/house_keeping.go deleted file mode 100644 index 670a95b..0000000 --- a/house_keeping.go +++ /dev/null @@ -1,80 +0,0 @@ -package main - -import ( - "context" - "fmt" - logger "log" - "os" - "os/signal" - "syscall" - "time" - - "gitlab.torproject.org/rgdd/ct/pkg/metadata" -) - -type metrics struct { - Description string // Human-readable log name - NumEntries int64 // Number of entries persisted - Timestamp int64 // Time that first entry was persisted - Start int64 // Next index to start fetching from - End int64 // Exclusive end index to reach - Done bool // Worker is done - - avg float64 -} - -func (m metrics) String() string { - format := " %32s | %6.1f entries/s | Estimated done in %6.1f hours | Working on [%d, %d)\n" - if m.Done { - return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End) - } - return fmt.Sprintf(format, m.Description, m.avg, float64((m.End-m.Start))/m.avg/3600, m.Start, m.End) -} - -func (m *metrics) update(other metrics) { - m.NumEntries += other.NumEntries - m.Start = other.Start - m.End = other.End - m.Done = other.Done - m.avg = float64(m.NumEntries) / float64((other.Timestamp - m.Timestamp)) -} - -func handleMetrics(ctx context.Context, metricsCh chan metrics, logs []metadata.Log) { - sum := make(map[string]metrics) - for _, log := range logs { - sum[*log.Description] = metrics{ - Description: *log.Description, - Timestamp: time.Now().Unix(), - } - } - - ticker := time.NewTicker(15 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - case m := <-metricsCh: - s := sum[m.Description] - s.update(m) - sum[m.Description] = s - case <-ticker.C: - output := "" - for _, log := range logs { - output += sum[*log.Description].String() - } - logger.Printf("INFO: periodic status update\n\n%s\n\n", output) - } - } -} - -func handleSignals(ctx context.Context, cancel context.CancelFunc) { - sigs := make(chan os.Signal, 1) - defer close(sigs) - - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - select { - case <-sigs: - cancel() - case <-ctx.Done(): - } -} diff --git a/snapshot.go b/snapshot.go deleted file mode 100644 index 5a9c50e..0000000 --- a/snapshot.go +++ /dev/null @@ -1,163 +0,0 @@ -package main - -import ( - "bytes" - "context" - "crypto/sha256" - "crypto/x509" - "encoding/json" - "errors" - "fmt" - logger "log" - "net/http" - "os" - "time" - - "git.cs.kau.se/rasmoste/ct-sans/internal/merkle" - ct "github.com/google/certificate-transparency-go" - "github.com/google/certificate-transparency-go/client" - "github.com/google/certificate-transparency-go/jsonclient" - "gitlab.torproject.org/rgdd/ct/pkg/metadata" -) - -func snapshot(opts options) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - if err := os.MkdirAll(opts.Directory, os.ModePerm); err != nil { - return err - } - - logger.Printf("INFO: updating metadata file\n") - source := metadata.NewHTTPSource(metadata.HTTPSourceOptions{Name: "google"}) - msg, sig, md, err := source.Load(ctx) - if err != nil { - return err - } - if err := os.WriteFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile), msg, 0644); err != nil { - return err - } - if err := os.WriteFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataSignatureFile), sig, 0644); err != nil { - return err - } - timestamp := []byte(fmt.Sprintf("%d", time.Now().Unix())) - if err := os.WriteFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataTimestampFile), timestamp, 0644); err != nil { - return err - } - - logger.Printf("INFO: updating signed tree heads\n") - for _, log := range logs(md) { - id, _ := log.Key.ID() - der, _ := x509.MarshalPKIXPublicKey(log.Key) - dir := fmt.Sprintf("%s/%x", opts.logDirectory, id) - sthFile := fmt.Sprintf("%s/%s", dir, opts.sthFile) - if err := os.MkdirAll(dir, os.ModePerm); err != nil { - return fmt.Errorf("%s: %v", log.Description, err) - } - - // Fetch next STH - cli, err := client.New(string(log.URL), &http.Client{}, jsonclient.Options{PublicKeyDER: der, UserAgent: opts.HTTPAgent}) - if err != nil { - return fmt.Errorf("%s: %v", *log.Description, err) - } - nextSTH, err := cli.GetSTH(ctx) - if err != nil { - return fmt.Errorf("%s: %v", *log.Description, err) - } - nextSTHBytes, err := json.Marshal(nextSTH) - if err != nil { - return fmt.Errorf("%s: %v", *log.Description, err) - } - // - // It's a bit ugly that ct.SignedTreeHead contains fields that - // are not populated. Doesn't cause any prolems here, however. - // - - // Bootstrap log if we don't have any STH yet - if _, err := os.Stat(sthFile); err != nil { - if !errors.Is(err, os.ErrNotExist) { - return fmt.Errorf("%s: %v", *log.Description, err) - } - if err := os.WriteFile(sthFile, nextSTHBytes, 0644); err != nil { - return fmt.Errorf("%s: %v", *log.Description, err) - } - - logger.Printf("INFO: bootstrapped %s at tree size %d\n", *log.Description, nextSTH.TreeSize) - continue - } - - // Otherwise: update an existing STH - currSTHBytes, err := os.ReadFile(sthFile) - if err != nil { - return fmt.Errorf("%s: %v", *log.Description, err) - } - var currSTH ct.SignedTreeHead - if err := json.Unmarshal(currSTHBytes, &currSTH); err != nil { - return fmt.Errorf("%s: %v", *log.Description, err) - } - if nextSTH.TreeSize < currSTH.TreeSize { - return fmt.Errorf("%s: next tree size is smaller: %s", *log.Description, nextSTHBytes) - } - if nextSTH.TreeSize == currSTH.TreeSize { - if !bytes.Equal(nextSTH.SHA256RootHash[:], currSTH.SHA256RootHash[:]) { - return fmt.Errorf("%s: split-view: %s", *log.Description, nextSTHBytes) - } - - logger.Printf("INFO: %s is already up-to-date at size %d\n", *log.Description, nextSTH.TreeSize) - continue - } - hashes, err := cli.GetSTHConsistency(ctx, currSTH.TreeSize, nextSTH.TreeSize) - if err != nil { - return fmt.Errorf("%s: %v", *log.Description, err) - } - if err := merkle.VerifyConsistency(currSTH.TreeSize, - nextSTH.TreeSize, - [sha256.Size]byte(currSTH.SHA256RootHash), - [sha256.Size]byte(nextSTH.SHA256RootHash), - proof(hashes)); err != nil { - return fmt.Errorf("%s: inconsistent tree: %v", *log.Description, err) - } - if err := os.WriteFile(sthFile, nextSTHBytes, 0644); err != nil { - return fmt.Errorf("%s: %v", *log.Description, err) - } - logger.Printf("INFO: updated %s to tree size %d\n", *log.Description, nextSTH.TreeSize) - } - return nil -} - -// logs select logs that count towards CT-compliance checks. Logs that don't -// have a description are skipped after printing a warning. -func logs(md metadata.Metadata) (logs []metadata.Log) { - for _, operators := range md.Operators { - for _, log := range operators.Logs { - if log.Description == nil { - fmt.Fprintf(os.Stderr, "WARNING: skipping log without description") - continue - } - if log.State == nil { - continue // skip logs with unknown states - } - if log.State.Name == metadata.LogStatePending { - continue // pending logs do not count towards CT-compliance - } - if log.State.Name == metadata.LogStateRetired { - continue // retired logs are not necessarily reachable - } - if log.State.Name == metadata.LogStateRejected { - continue // rejected logs do not count towards CT-compliance - } - - logs = append(logs, log) - } - } - return -} - -// proof formats hashes so that they can be passed to the merkle package -func proof(hashes [][]byte) (p [][sha256.Size]byte) { - for _, hash := range hashes { - var h [sha256.Size]byte - copy(h[:], hash) - p = append(p, h) - } - return -} diff --git a/utils_housekeep.go b/utils_housekeep.go new file mode 100644 index 0000000..670a95b --- /dev/null +++ b/utils_housekeep.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "fmt" + logger "log" + "os" + "os/signal" + "syscall" + "time" + + "gitlab.torproject.org/rgdd/ct/pkg/metadata" +) + +type metrics struct { + Description string // Human-readable log name + NumEntries int64 // Number of entries persisted + Timestamp int64 // Time that first entry was persisted + Start int64 // Next index to start fetching from + End int64 // Exclusive end index to reach + Done bool // Worker is done + + avg float64 +} + +func (m metrics) String() string { + format := " %32s | %6.1f entries/s | Estimated done in %6.1f hours | Working on [%d, %d)\n" + if m.Done { + return fmt.Sprintf(format, m.Description, float64(0), float64(0), m.End, m.End) + } + return fmt.Sprintf(format, m.Description, m.avg, float64((m.End-m.Start))/m.avg/3600, m.Start, m.End) +} + +func (m *metrics) update(other metrics) { + m.NumEntries += other.NumEntries + m.Start = other.Start + m.End = other.End + m.Done = other.Done + m.avg = float64(m.NumEntries) / float64((other.Timestamp - m.Timestamp)) +} + +func handleMetrics(ctx context.Context, metricsCh chan metrics, logs []metadata.Log) { + sum := make(map[string]metrics) + for _, log := range logs { + sum[*log.Description] = metrics{ + Description: *log.Description, + Timestamp: time.Now().Unix(), + } + } + + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + case m := <-metricsCh: + s := sum[m.Description] + s.update(m) + sum[m.Description] = s + case <-ticker.C: + output := "" + for _, log := range logs { + output += sum[*log.Description].String() + } + logger.Printf("INFO: periodic status update\n\n%s\n\n", output) + } + } +} + +func handleSignals(ctx context.Context, cancel context.CancelFunc) { + sigs := make(chan os.Signal, 1) + defer close(sigs) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + select { + case <-sigs: + cancel() + case <-ctx.Done(): + } +} diff --git a/utils_state.go b/utils_state.go new file mode 100644 index 0000000..2273c24 --- /dev/null +++ b/utils_state.go @@ -0,0 +1,42 @@ +package main + +import ( + "crypto/sha256" + "encoding/json" + "fmt" + "os" + + ct "github.com/google/certificate-transparency-go" +) + +type treeHead struct { + TreeSize int64 `json:"tree_size"` + RootHash [sha256.Size]byte `json:root_hash"` +} + +func readState(opts options, logID []byte) (treeHead, error) { + if _, err := os.Stat(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)); err != nil { + return treeHead{0, sha256.Sum256(nil)}, nil + } + b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)) + if err != nil { + return treeHead{}, err + } + var th treeHead + if err := json.Unmarshal(b, &th); err != nil { + return treeHead{}, err + } + return th, nil +} + +func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) { + b, err := os.ReadFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile)) + if err != nil { + return ct.SignedTreeHead{}, err + } + var sth ct.SignedTreeHead + if err := json.Unmarshal(b, &sth); err != nil { + return ct.SignedTreeHead{}, err + } + return sth, nil +} -- cgit v1.2.3