aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRasmus Dahlberg <rasmus@rgdd.se>2023-03-17 17:39:25 +0100
committerRasmus Dahlberg <rasmus@rgdd.se>2023-03-17 17:39:25 +0100
commit10f7eb048a0cba6104b52027bff3b6f50db2dab9 (patch)
tree0d7c370a72bc0db3056af765da6fe5f670fb7f32
parent14a8a232e78599caf8c037e85e2549b10951c1af (diff)
more wip collect
-rw-r--r--collect.go85
-rw-r--r--main.go19
-rw-r--r--snapshot.go14
3 files changed, 68 insertions, 50 deletions
diff --git a/collect.go b/collect.go
index 53b7607..d60554e 100644
--- a/collect.go
+++ b/collect.go
@@ -6,6 +6,7 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"
+ logger "log"
"net/http"
"os"
"os/signal"
@@ -33,38 +34,43 @@ func collect(opts options) error {
if err := json.Unmarshal(b, &md); err != nil {
return err
}
+ logger.Printf("INFO: found metadata file with %d logs\n", len(utils.Logs(md)))
+ var await sync.WaitGroup
+ defer await.Wait()
ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- var wg sync.WaitGroup
- defer wg.Wait()
-
go func() {
- wg.Add(1)
- defer wg.Done()
+ await.Add(1)
+ defer await.Done()
sigs := make(chan os.Signal, 1)
defer close(sigs)
- // Sometimes some worker in scanner.Fetcher isn't shutdown
- // properly despite the parent context (including getRanges)
- // being done. The below is an ugly hack to avoid hanging.
- wait := time.Second * 5 // TODO: set higher with real runs
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigs:
- fmt.Fprintf(os.Stderr, "INFO: received shutdown signal, please wait %v...\n", wait)
cancel()
case <-ctx.Done():
}
+
+ // Sometimes some worker in scanner.Fetcher isn't shutdown
+ // properly despite the parent context (including getRanges)
+ // being done. The below is an ugly hack to avoid hanging.
+ wait := time.Second * 5 // TODO: 15s
+ logger.Printf("INFO: about to exit, please wait %v...\n", wait)
select {
case <-time.After(wait):
os.Exit(0)
}
}()
+ defer cancel()
+ var wg sync.WaitGroup
+ defer wg.Wait()
for _, log := range utils.Logs(md) {
+ //if *log.Description != "Trust Asia Log2024-2" {
+ // continue
+ //}
go func(log metadata.Log) {
wg.Add(1)
defer wg.Done()
@@ -75,13 +81,13 @@ func collect(opts options) error {
id, _ := log.Key.ID()
th, err := readState(opts, id[:])
if err != nil {
- fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
cancel()
return
}
sth, err := readSnapshot(opts, id[:])
if err != nil {
- fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
cancel()
return
}
@@ -90,7 +96,7 @@ func collect(opts options) error {
jsonclient.Options{UserAgent: opts.HTTPAgent},
)
if err != nil {
- fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
cancel()
return
}
@@ -100,6 +106,10 @@ func collect(opts options) error {
EndIndex: int64(sth.TreeSize),
ParallelFetch: int(opts.WorkersPerLog),
})
+ if uint64(th.TreeSize) == sth.TreeSize {
+ logger.Printf("INFO: %s: up-to-date with tree size %d", *log.Description, th.TreeSize)
+ return
+ }
//
// Callback that puts downloaded certificates into a
@@ -112,7 +122,7 @@ func collect(opts options) error {
}
sans, errs := utils.SANsFromLeafEntries(eb.Start, eb.Entries)
for _, err := range errs {
- fmt.Fprintf(os.Stderr, "WARNING: %s: %v", *log.Description, err)
+ logger.Printf("NOTICE: %s: %v", *log.Description, err)
}
chunks <- &chunk.Chunk{eb.Start, leafHashes, sans}
}
@@ -123,21 +133,22 @@ func collect(opts options) error {
// intermediate tree head (size and root hash) as well
// as the SANs that were observed up until that point.
//
+ cctx, fetchDone := context.WithCancel(ctx)
+ defer fetchDone()
go func() {
wg.Add(1)
defer wg.Done()
- defer fmt.Fprintf(os.Stderr, "INFO: %s: shutdown sequencer\n", *log.Description)
h := &chunk.ChunkHeap{}
heap.Init(h)
curr := th.TreeSize
for {
select {
- case <-ctx.Done():
+ case <-cctx.Done():
if h.Sequence(curr) {
c := h.TPop()
- if _, err := persistChunk(cli, opts, id[:], 0, c); err != nil {
- fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", err)
+ if _, err := persistChunk(cli, opts, id[:], *log.Description, 0, c); err != nil {
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
}
}
return
@@ -150,10 +161,10 @@ func collect(opts options) error {
}
c = h.TPop()
- putBack, err := persistChunk(cli, opts, id[:], int64(opts.PersistSize), c)
+ putBack, err := persistChunk(cli, opts, id[:], *log.Description, int64(opts.PersistSize), c)
if err != nil {
cancel()
- fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
return
}
if putBack {
@@ -166,27 +177,30 @@ func collect(opts options) error {
}
}()
+ logger.Printf("INFO: %s: working from tree size %d to %d", *log.Description, th.TreeSize, sth.TreeSize)
if err := fetcher.Run(ctx, callback); err != nil {
- fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+ logger.Printf("ERROR: %s: %v\n", *log.Description, err)
cancel()
return
}
+ if ctx.Err() == nil {
+ logger.Printf("INFO: %s: completed fetch at tree size %d", *log.Description, sth.TreeSize)
+ }
- fmt.Fprintf(os.Stderr, "INFO: %s: fetch completed\n", *log.Description)
for len(chunks) > 0 {
select {
case <-ctx.Done():
- return
+ return // some Go routine cancelled due to an error
case <-time.After(1 * time.Second):
- fmt.Fprintf(os.Stderr, "DEBUG: %s: waiting for chunks to be consumed\n", *log.Description)
+ logger.Printf("DEBUG: %s: waiting for chunks to be consumed\n", *log.Description)
}
}
}(log)
- break
}
- time.Sleep(1 * time.Second)
- return fmt.Errorf("TODO")
+ logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n")
+ time.Sleep(3 * time.Second) // ensure that Go routines had time to spawn
+ return nil
}
type treeHead struct {
@@ -221,7 +235,7 @@ func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) {
return sth, nil
}
-func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence int64, c *chunk.Chunk) (bool, error) {
+func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) {
chunkSize := int64(len(c.LeafHashes))
if chunkSize == 0 {
return false, nil // nothing to persist
@@ -256,7 +270,6 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence
newTH := treeHead{TreeSize: c.Start + chunkSize}
p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(newTH.TreeSize))
if err != nil {
- fmt.Fprintf(os.Stderr, "WARNING: %x: %v\n", logID, err)
return true, nil // try again later
}
if p.LeafIndex != c.Start {
@@ -265,9 +278,11 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence
if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), utils.Proof(p.AuditPath)); err != nil {
return false, err
}
- hashes, err := cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize))
- if err != nil {
- return true, nil // try again later
+ var hashes [][]byte
+ if oldTH.TreeSize > 0 {
+ if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil {
+ return true, nil // try again later
+ }
}
if err := merkle.VerifyConsistency(uint64(oldTH.TreeSize), uint64(newTH.TreeSize), oldTH.RootHash, newTH.RootHash, utils.Proof(hashes)); err != nil {
return false, fmt.Errorf("%d %x is inconsistent with on-disk state: %v", newTH.TreeSize, newTH.RootHash, err)
@@ -303,6 +318,6 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence
return false, err
}
- fmt.Fprintf(os.Stderr, "DEBUG: %x: persist: start=%d next=%d\n", logID, oldTH.TreeSize, newTH.TreeSize)
+ logger.Printf("DEBUG: %s: persisted [%d, %d]\n", logDesc, oldTH.TreeSize, newTH.TreeSize)
return false, nil
}
diff --git a/main.go b/main.go
index a13ba34..3dd8969 100644
--- a/main.go
+++ b/main.go
@@ -12,6 +12,7 @@ package main
import (
"flag"
"fmt"
+ "log"
"os"
"git.cs.kau.se/rasmoste/ct-sans/internal/ctflag"
@@ -62,8 +63,10 @@ type options struct {
}
func main() {
+ log.SetOutput(os.Stdout)
+ log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
if ctflag.WantHelp(os.Args, 1) {
- fmt.Fprintf(os.Stderr, usage)
+ fmt.Fprintf(os.Stdout, usage)
os.Exit(1)
}
@@ -79,12 +82,12 @@ func main() {
// Parse command-line options and hardcode additional values
if err := ctflag.Parse(fs, os.Args[2:]); err != nil {
if err == flag.ErrHelp {
- fmt.Fprintf(os.Stderr, usage)
+ fmt.Fprintf(os.Stdout, usage)
os.Exit(1)
}
- fmt.Fprintf(os.Stderr, "error: %v\n\n", err)
- os.Exit(2)
+ fmt.Fprintf(os.Stdout, "error: %v\n\n", err)
+ os.Exit(1)
}
opts.logDirectory = opts.Directory + "/" + "logs"
opts.metadataFile = "metadata.json"
@@ -104,11 +107,11 @@ func main() {
case "assemble":
err = assemble(opts)
default:
- fmt.Fprintf(os.Stderr, "ct-sans: unknown command %q\n\n", cmd)
- os.Exit(3)
+ fmt.Fprintf(os.Stdout, "ct-sans: unknown command %q\n\n", cmd)
+ os.Exit(1)
}
if err != nil {
- fmt.Fprintf(os.Stderr, "ct-sans %s: error: %v\n", os.Args[1], err)
- os.Exit(4)
+ fmt.Fprintf(os.Stdout, "ct-sans %s: error: %v\n", os.Args[1], err)
+ os.Exit(1)
}
}
diff --git a/snapshot.go b/snapshot.go
index fb17056..63402ea 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ logger "log"
"net/http"
"os"
"time"
@@ -27,7 +28,7 @@ func snapshot(opts options) error {
return err
}
- fmt.Fprintf(os.Stderr, "INFO: updating metadata file\n")
+ logger.Printf("INFO: updating metadata file\n")
source := metadata.NewHTTPSource(metadata.HTTPSourceOptions{Name: "google"})
msg, sig, md, err := source.Load(ctx)
if err != nil {
@@ -44,7 +45,7 @@ func snapshot(opts options) error {
return err
}
- fmt.Fprintf(os.Stderr, "INFO: updating signed tree heads\n")
+ logger.Printf("INFO: updating signed tree heads\n")
for _, log := range utils.Logs(md) {
id, _ := log.Key.ID()
der, _ := x509.MarshalPKIXPublicKey(log.Key)
@@ -55,7 +56,7 @@ func snapshot(opts options) error {
}
// Fetch next STH
- cli, err := client.New(string(log.URL), &http.Client{}, jsonclient.Options{PublicKeyDER: der, UserAgent: "wip"})
+ cli, err := client.New(string(log.URL), &http.Client{}, jsonclient.Options{PublicKeyDER: der, UserAgent: opts.HTTPAgent})
if err != nil {
return fmt.Errorf("%s: %v", *log.Description, err)
}
@@ -81,7 +82,7 @@ func snapshot(opts options) error {
return fmt.Errorf("%s: %v", *log.Description, err)
}
- fmt.Fprintf(os.Stderr, "INFO: bootstrapped %s at tree size %d\n", *log.Description, nextSTH.TreeSize)
+ logger.Printf("INFO: bootstrapped %s at tree size %d\n", *log.Description, nextSTH.TreeSize)
continue
}
@@ -102,7 +103,7 @@ func snapshot(opts options) error {
return fmt.Errorf("%s: split-view: %s", *log.Description, nextSTHBytes)
}
- fmt.Fprintf(os.Stderr, "INFO: %s is already up-to-date at size %d\n", *log.Description, nextSTH.TreeSize)
+ logger.Printf("INFO: %s is already up-to-date at size %d\n", *log.Description, nextSTH.TreeSize)
continue
}
hashes, err := cli.GetSTHConsistency(ctx, currSTH.TreeSize, nextSTH.TreeSize)
@@ -119,8 +120,7 @@ func snapshot(opts options) error {
if err := os.WriteFile(sthFile, nextSTHBytes, 0644); err != nil {
return fmt.Errorf("%s: %v", *log.Description, err)
}
- fmt.Fprintf(os.Stderr, "INFO: updated %s to tree size %d\n", *log.Description, nextSTH.TreeSize)
+ logger.Printf("INFO: updated %s to tree size %d\n", *log.Description, nextSTH.TreeSize)
}
-
return nil
}