diff options
author | Rasmus Dahlberg <rasmus@rgdd.se> | 2023-03-17 17:39:25 +0100 |
---|---|---|
committer | Rasmus Dahlberg <rasmus@rgdd.se> | 2023-03-17 17:39:25 +0100 |
commit | 10f7eb048a0cba6104b52027bff3b6f50db2dab9 (patch) | |
tree | 0d7c370a72bc0db3056af765da6fe5f670fb7f32 | |
parent | 14a8a232e78599caf8c037e85e2549b10951c1af (diff) |
more wip collect
-rw-r--r-- | collect.go | 85 | ||||
-rw-r--r-- | main.go | 19 | ||||
-rw-r--r-- | snapshot.go | 14 |
3 files changed, 68 insertions, 50 deletions
@@ -6,6 +6,7 @@ import ( "crypto/sha256" "encoding/json" "fmt" + logger "log" "net/http" "os" "os/signal" @@ -33,38 +34,43 @@ func collect(opts options) error { if err := json.Unmarshal(b, &md); err != nil { return err } + logger.Printf("INFO: found metadata file with %d logs\n", len(utils.Logs(md))) + var await sync.WaitGroup + defer await.Wait() ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var wg sync.WaitGroup - defer wg.Wait() - go func() { - wg.Add(1) - defer wg.Done() + await.Add(1) + defer await.Done() sigs := make(chan os.Signal, 1) defer close(sigs) - // Sometimes some worker in scanner.Fetcher isn't shutdown - // properly despite the parent context (including getRanges) - // being done. The below is an ugly hack to avoid hanging. - wait := time.Second * 5 // TODO: set higher with real runs signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) select { case <-sigs: - fmt.Fprintf(os.Stderr, "INFO: received shutdown signal, please wait %v...\n", wait) cancel() case <-ctx.Done(): } + + // Sometimes some worker in scanner.Fetcher isn't shutdown + // properly despite the parent context (including getRanges) + // being done. The below is an ugly hack to avoid hanging. + wait := time.Second * 5 // TODO: 15s + logger.Printf("INFO: about to exit, please wait %v...\n", wait) select { case <-time.After(wait): os.Exit(0) } }() + defer cancel() + var wg sync.WaitGroup + defer wg.Wait() for _, log := range utils.Logs(md) { + //if *log.Description != "Trust Asia Log2024-2" { + // continue + //} go func(log metadata.Log) { wg.Add(1) defer wg.Done() @@ -75,13 +81,13 @@ func collect(opts options) error { id, _ := log.Key.ID() th, err := readState(opts, id[:]) if err != nil { - fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + logger.Printf("ERROR: %s: %v\n", *log.Description, err) cancel() return } sth, err := readSnapshot(opts, id[:]) if err != nil { - fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + logger.Printf("ERROR: %s: %v\n", *log.Description, err) cancel() return } @@ -90,7 +96,7 @@ func collect(opts options) error { jsonclient.Options{UserAgent: opts.HTTPAgent}, ) if err != nil { - fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + logger.Printf("ERROR: %s: %v\n", *log.Description, err) cancel() return } @@ -100,6 +106,10 @@ func collect(opts options) error { EndIndex: int64(sth.TreeSize), ParallelFetch: int(opts.WorkersPerLog), }) + if uint64(th.TreeSize) == sth.TreeSize { + logger.Printf("INFO: %s: up-to-date with tree size %d", *log.Description, th.TreeSize) + return + } // // Callback that puts downloaded certificates into a @@ -112,7 +122,7 @@ func collect(opts options) error { } sans, errs := utils.SANsFromLeafEntries(eb.Start, eb.Entries) for _, err := range errs { - fmt.Fprintf(os.Stderr, "WARNING: %s: %v", *log.Description, err) + logger.Printf("NOTICE: %s: %v", *log.Description, err) } chunks <- &chunk.Chunk{eb.Start, leafHashes, sans} } @@ -123,21 +133,22 @@ func collect(opts options) error { // intermediate tree head (size and root hash) as well // as the SANs that were observed up until that point. // + cctx, fetchDone := context.WithCancel(ctx) + defer fetchDone() go func() { wg.Add(1) defer wg.Done() - defer fmt.Fprintf(os.Stderr, "INFO: %s: shutdown sequencer\n", *log.Description) h := &chunk.ChunkHeap{} heap.Init(h) curr := th.TreeSize for { select { - case <-ctx.Done(): + case <-cctx.Done(): if h.Sequence(curr) { c := h.TPop() - if _, err := persistChunk(cli, opts, id[:], 0, c); err != nil { - fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", err) + if _, err := persistChunk(cli, opts, id[:], *log.Description, 0, c); err != nil { + logger.Printf("ERROR: %s: %v\n", *log.Description, err) } } return @@ -150,10 +161,10 @@ func collect(opts options) error { } c = h.TPop() - putBack, err := persistChunk(cli, opts, id[:], int64(opts.PersistSize), c) + putBack, err := persistChunk(cli, opts, id[:], *log.Description, int64(opts.PersistSize), c) if err != nil { cancel() - fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + logger.Printf("ERROR: %s: %v\n", *log.Description, err) return } if putBack { @@ -166,27 +177,30 @@ func collect(opts options) error { } }() + logger.Printf("INFO: %s: working from tree size %d to %d", *log.Description, th.TreeSize, sth.TreeSize) if err := fetcher.Run(ctx, callback); err != nil { - fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err) + logger.Printf("ERROR: %s: %v\n", *log.Description, err) cancel() return } + if ctx.Err() == nil { + logger.Printf("INFO: %s: completed fetch at tree size %d", *log.Description, sth.TreeSize) + } - fmt.Fprintf(os.Stderr, "INFO: %s: fetch completed\n", *log.Description) for len(chunks) > 0 { select { case <-ctx.Done(): - return + return // some Go routine cancelled due to an error case <-time.After(1 * time.Second): - fmt.Fprintf(os.Stderr, "DEBUG: %s: waiting for chunks to be consumed\n", *log.Description) + logger.Printf("DEBUG: %s: waiting for chunks to be consumed\n", *log.Description) } } }(log) - break } - time.Sleep(1 * time.Second) - return fmt.Errorf("TODO") + logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n") + time.Sleep(3 * time.Second) // ensure that Go routines had time to spawn + return nil } type treeHead struct { @@ -221,7 +235,7 @@ func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) { return sth, nil } -func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence int64, c *chunk.Chunk) (bool, error) { +func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) { chunkSize := int64(len(c.LeafHashes)) if chunkSize == 0 { return false, nil // nothing to persist @@ -256,7 +270,6 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence newTH := treeHead{TreeSize: c.Start + chunkSize} p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(newTH.TreeSize)) if err != nil { - fmt.Fprintf(os.Stderr, "WARNING: %x: %v\n", logID, err) return true, nil // try again later } if p.LeafIndex != c.Start { @@ -265,9 +278,11 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), utils.Proof(p.AuditPath)); err != nil { return false, err } - hashes, err := cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)) - if err != nil { - return true, nil // try again later + var hashes [][]byte + if oldTH.TreeSize > 0 { + if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil { + return true, nil // try again later + } } if err := merkle.VerifyConsistency(uint64(oldTH.TreeSize), uint64(newTH.TreeSize), oldTH.RootHash, newTH.RootHash, utils.Proof(hashes)); err != nil { return false, fmt.Errorf("%d %x is inconsistent with on-disk state: %v", newTH.TreeSize, newTH.RootHash, err) @@ -303,6 +318,6 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence return false, err } - fmt.Fprintf(os.Stderr, "DEBUG: %x: persist: start=%d next=%d\n", logID, oldTH.TreeSize, newTH.TreeSize) + logger.Printf("DEBUG: %s: persisted [%d, %d]\n", logDesc, oldTH.TreeSize, newTH.TreeSize) return false, nil } @@ -12,6 +12,7 @@ package main import ( "flag" "fmt" + "log" "os" "git.cs.kau.se/rasmoste/ct-sans/internal/ctflag" @@ -62,8 +63,10 @@ type options struct { } func main() { + log.SetOutput(os.Stdout) + log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) if ctflag.WantHelp(os.Args, 1) { - fmt.Fprintf(os.Stderr, usage) + fmt.Fprintf(os.Stdout, usage) os.Exit(1) } @@ -79,12 +82,12 @@ func main() { // Parse command-line options and hardcode additional values if err := ctflag.Parse(fs, os.Args[2:]); err != nil { if err == flag.ErrHelp { - fmt.Fprintf(os.Stderr, usage) + fmt.Fprintf(os.Stdout, usage) os.Exit(1) } - fmt.Fprintf(os.Stderr, "error: %v\n\n", err) - os.Exit(2) + fmt.Fprintf(os.Stdout, "error: %v\n\n", err) + os.Exit(1) } opts.logDirectory = opts.Directory + "/" + "logs" opts.metadataFile = "metadata.json" @@ -104,11 +107,11 @@ func main() { case "assemble": err = assemble(opts) default: - fmt.Fprintf(os.Stderr, "ct-sans: unknown command %q\n\n", cmd) - os.Exit(3) + fmt.Fprintf(os.Stdout, "ct-sans: unknown command %q\n\n", cmd) + os.Exit(1) } if err != nil { - fmt.Fprintf(os.Stderr, "ct-sans %s: error: %v\n", os.Args[1], err) - os.Exit(4) + fmt.Fprintf(os.Stdout, "ct-sans %s: error: %v\n", os.Args[1], err) + os.Exit(1) } } diff --git a/snapshot.go b/snapshot.go index fb17056..63402ea 100644 --- a/snapshot.go +++ b/snapshot.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + logger "log" "net/http" "os" "time" @@ -27,7 +28,7 @@ func snapshot(opts options) error { return err } - fmt.Fprintf(os.Stderr, "INFO: updating metadata file\n") + logger.Printf("INFO: updating metadata file\n") source := metadata.NewHTTPSource(metadata.HTTPSourceOptions{Name: "google"}) msg, sig, md, err := source.Load(ctx) if err != nil { @@ -44,7 +45,7 @@ func snapshot(opts options) error { return err } - fmt.Fprintf(os.Stderr, "INFO: updating signed tree heads\n") + logger.Printf("INFO: updating signed tree heads\n") for _, log := range utils.Logs(md) { id, _ := log.Key.ID() der, _ := x509.MarshalPKIXPublicKey(log.Key) @@ -55,7 +56,7 @@ func snapshot(opts options) error { } // Fetch next STH - cli, err := client.New(string(log.URL), &http.Client{}, jsonclient.Options{PublicKeyDER: der, UserAgent: "wip"}) + cli, err := client.New(string(log.URL), &http.Client{}, jsonclient.Options{PublicKeyDER: der, UserAgent: opts.HTTPAgent}) if err != nil { return fmt.Errorf("%s: %v", *log.Description, err) } @@ -81,7 +82,7 @@ func snapshot(opts options) error { return fmt.Errorf("%s: %v", *log.Description, err) } - fmt.Fprintf(os.Stderr, "INFO: bootstrapped %s at tree size %d\n", *log.Description, nextSTH.TreeSize) + logger.Printf("INFO: bootstrapped %s at tree size %d\n", *log.Description, nextSTH.TreeSize) continue } @@ -102,7 +103,7 @@ func snapshot(opts options) error { return fmt.Errorf("%s: split-view: %s", *log.Description, nextSTHBytes) } - fmt.Fprintf(os.Stderr, "INFO: %s is already up-to-date at size %d\n", *log.Description, nextSTH.TreeSize) + logger.Printf("INFO: %s is already up-to-date at size %d\n", *log.Description, nextSTH.TreeSize) continue } hashes, err := cli.GetSTHConsistency(ctx, currSTH.TreeSize, nextSTH.TreeSize) @@ -119,8 +120,7 @@ func snapshot(opts options) error { if err := os.WriteFile(sthFile, nextSTHBytes, 0644); err != nil { return fmt.Errorf("%s: %v", *log.Description, err) } - fmt.Fprintf(os.Stderr, "INFO: updated %s to tree size %d\n", *log.Description, nextSTH.TreeSize) + logger.Printf("INFO: updated %s to tree size %d\n", *log.Description, nextSTH.TreeSize) } - return nil } |