From 10f7eb048a0cba6104b52027bff3b6f50db2dab9 Mon Sep 17 00:00:00 2001
From: Rasmus Dahlberg
Date: Fri, 17 Mar 2023 17:39:25 +0100
Subject: more wip collect

---
 collect.go | 85 ++++++++++++++++++++++++++++++++++++--------------------------
 1 file changed, 50 insertions(+), 35 deletions(-)

diff --git a/collect.go b/collect.go
index 53b7607..d60554e 100644
--- a/collect.go
+++ b/collect.go
@@ -6,6 +6,7 @@ import (
 	"crypto/sha256"
 	"encoding/json"
 	"fmt"
+	logger "log"
 	"net/http"
 	"os"
 	"os/signal"
@@ -33,38 +34,43 @@ func collect(opts options) error {
 	if err := json.Unmarshal(b, &md); err != nil {
 		return err
 	}
+	logger.Printf("INFO: found metadata file with %d logs\n", len(utils.Logs(md)))
 
+	var await sync.WaitGroup
+	defer await.Wait()
 	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	var wg sync.WaitGroup
-	defer wg.Wait()
 	go func() {
-		wg.Add(1)
-		defer wg.Done()
+		await.Add(1)
+		defer await.Done()
 
 		sigs := make(chan os.Signal, 1)
 		defer close(sigs)
-		// Sometimes some worker in scanner.Fetcher isn't shutdown
-		// properly despite the parent context (including getRanges)
-		// being done. The below is an ugly hack to avoid hanging.
-		wait := time.Second * 5 // TODO: set higher with real runs
 		signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
 		select {
 		case <-sigs:
-			fmt.Fprintf(os.Stderr, "INFO: received shutdown signal, please wait %v...\n", wait)
 			cancel()
 		case <-ctx.Done():
 		}
+
+		// Sometimes some worker in scanner.Fetcher isn't shutdown
+		// properly despite the parent context (including getRanges)
+		// being done. The below is an ugly hack to avoid hanging.
+		wait := time.Second * 5 // TODO: 15s
+		logger.Printf("INFO: about to exit, please wait %v...\n", wait)
 		select {
 		case <-time.After(wait):
 			os.Exit(0)
 		}
 	}()
+	defer cancel()
 
+	var wg sync.WaitGroup
+	defer wg.Wait()
 	for _, log := range utils.Logs(md) {
+		//if *log.Description != "Trust Asia Log2024-2" {
+		//	continue
+		//}
 		go func(log metadata.Log) {
 			wg.Add(1)
 			defer wg.Done()
@@ -75,13 +81,13 @@ func collect(opts options) error {
 			id, _ := log.Key.ID()
 			th, err := readState(opts, id[:])
 			if err != nil {
-				fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+				logger.Printf("ERROR: %s: %v\n", *log.Description, err)
 				cancel()
 				return
 			}
 			sth, err := readSnapshot(opts, id[:])
 			if err != nil {
-				fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+				logger.Printf("ERROR: %s: %v\n", *log.Description, err)
 				cancel()
 				return
 			}
@@ -90,7 +96,7 @@ func collect(opts options) error {
 				jsonclient.Options{UserAgent: opts.HTTPAgent},
 			)
 			if err != nil {
-				fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+				logger.Printf("ERROR: %s: %v\n", *log.Description, err)
 				cancel()
 				return
 			}
@@ -100,6 +106,10 @@ func collect(opts options) error {
 				EndIndex:      int64(sth.TreeSize),
 				ParallelFetch: int(opts.WorkersPerLog),
 			})
+			if uint64(th.TreeSize) == sth.TreeSize {
+				logger.Printf("INFO: %s: up-to-date with tree size %d", *log.Description, th.TreeSize)
+				return
+			}
 
 			//
 			// Callback that puts downloaded certificates into a
@@ -112,7 +122,7 @@ func collect(opts options) error {
 				}
 				sans, errs := utils.SANsFromLeafEntries(eb.Start, eb.Entries)
 				for _, err := range errs {
-					fmt.Fprintf(os.Stderr, "WARNING: %s: %v", *log.Description, err)
+					logger.Printf("NOTICE: %s: %v", *log.Description, err)
 				}
 				chunks <- &chunk.Chunk{eb.Start, leafHashes, sans}
 			}
@@ -123,21 +133,22 @@ func collect(opts options) error {
 			// intermediate tree head (size and root hash) as well
 			// as the SANs that were observed up until that point.
 			//
+			cctx, fetchDone := context.WithCancel(ctx)
+			defer fetchDone()
 			go func() {
 				wg.Add(1)
 				defer wg.Done()
-				defer fmt.Fprintf(os.Stderr, "INFO: %s: shutdown sequencer\n", *log.Description)
 
 				h := &chunk.ChunkHeap{}
 				heap.Init(h)
 				curr := th.TreeSize
 				for {
 					select {
-					case <-ctx.Done():
+					case <-cctx.Done():
 						if h.Sequence(curr) {
 							c := h.TPop()
-							if _, err := persistChunk(cli, opts, id[:], 0, c); err != nil {
-								fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", err)
+							if _, err := persistChunk(cli, opts, id[:], *log.Description, 0, c); err != nil {
+								logger.Printf("ERROR: %s: %v\n", *log.Description, err)
 							}
 						}
 						return
@@ -150,10 +161,10 @@ func collect(opts options) error {
 						}
 
 						c = h.TPop()
-						putBack, err := persistChunk(cli, opts, id[:], int64(opts.PersistSize), c)
+						putBack, err := persistChunk(cli, opts, id[:], *log.Description, int64(opts.PersistSize), c)
 						if err != nil {
 							cancel()
-							fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+							logger.Printf("ERROR: %s: %v\n", *log.Description, err)
 							return
 						}
 						if putBack {
@@ -166,27 +177,30 @@ func collect(opts options) error {
 				}
 			}()
 
+			logger.Printf("INFO: %s: working from tree size %d to %d", *log.Description, th.TreeSize, sth.TreeSize)
 			if err := fetcher.Run(ctx, callback); err != nil {
-				fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
+				logger.Printf("ERROR: %s: %v\n", *log.Description, err)
 				cancel()
 				return
 			}
+			if ctx.Err() == nil {
+				logger.Printf("INFO: %s: completed fetch at tree size %d", *log.Description, sth.TreeSize)
+			}
 
-			fmt.Fprintf(os.Stderr, "INFO: %s: fetch completed\n", *log.Description)
 			for len(chunks) > 0 {
 				select {
 				case <-ctx.Done():
-					return
+					return // some Go routine cancelled due to an error
 				case <-time.After(1 * time.Second):
-					fmt.Fprintf(os.Stderr, "DEBUG: %s: waiting for chunks to be consumed\n", *log.Description)
+					logger.Printf("DEBUG: %s: waiting for chunks to be consumed\n", *log.Description)
 				}
 			}
 		}(log)
-		break
 	}
 
-	time.Sleep(1 * time.Second)
-	return fmt.Errorf("TODO")
+	logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n")
+	time.Sleep(3 * time.Second) // ensure that Go routines had time to spawn
+	return nil
 }
 
 type treeHead struct {
@@ -221,7 +235,7 @@ func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) {
 	return sth, nil
 }
 
-func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence int64, c *chunk.Chunk) (bool, error) {
+func persistChunk(cli *client.LogClient, opts options, logID []byte, logDesc string, minSequence int64, c *chunk.Chunk) (bool, error) {
 	chunkSize := int64(len(c.LeafHashes))
 	if chunkSize == 0 {
 		return false, nil // nothing to persist
@@ -256,7 +270,6 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence
 	newTH := treeHead{TreeSize: c.Start + chunkSize}
 	p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(newTH.TreeSize))
 	if err != nil {
-		fmt.Fprintf(os.Stderr, "WARNING: %x: %v\n", logID, err)
 		return true, nil // try again later
 	}
 	if p.LeafIndex != c.Start {
@@ -265,9 +278,11 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence
 	if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), utils.Proof(p.AuditPath)); err != nil {
 		return false, err
 	}
-	hashes, err := cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize))
-	if err != nil {
-		return true, nil // try again later
+	var hashes [][]byte
+	if oldTH.TreeSize > 0 {
+		if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil {
+			return true, nil // try again later
+		}
 	}
 	if err := merkle.VerifyConsistency(uint64(oldTH.TreeSize), uint64(newTH.TreeSize), oldTH.RootHash, newTH.RootHash, utils.Proof(hashes)); err != nil {
 		return false, fmt.Errorf("%d %x is inconsistent with on-disk state: %v", newTH.TreeSize, newTH.RootHash, err)
@@ -303,6 +318,6 @@ func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence
 		return false, err
 	}
 
-	fmt.Fprintf(os.Stderr, "DEBUG: %x: persist: start=%d next=%d\n", logID, oldTH.TreeSize, newTH.TreeSize)
+	logger.Printf("DEBUG: %s: persisted [%d, %d]\n", logDesc, oldTH.TreeSize, newTH.TreeSize)
 	return false, nil
 }
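
The signal handling that this patch rearranges boils down to a reusable pattern: cancel a shared context on SIGINT/SIGTERM (or when any worker cancels it first on error), then wait out a fixed grace period before forcing the process down, because some scanner.Fetcher worker may never observe the cancelled context. A minimal standalone sketch of that pattern, using only the standard library; the 5-second grace period mirrors the diff, and the stand-in worker at the end is purely illustrative:

package main

import (
	"context"
	logger "log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		sigs := make(chan os.Signal, 1)
		signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

		// Wake up on ctrl+C, or when some other goroutine cancels ctx.
		select {
		case <-sigs:
			cancel()
		case <-ctx.Done():
		}

		// Grace period, then force-exit in case a worker ignores ctx.
		wait := 5 * time.Second
		logger.Printf("INFO: about to exit, please wait %v...\n", wait)
		time.Sleep(wait)
		os.Exit(0)
	}()

	// Stand-in for the real per-log workers, which watch ctx.Done().
	<-ctx.Done()
	logger.Printf("INFO: worker winding down\n")
	select {} // block; the goroutine above terminates the process
}

The unconditional os.Exit after the grace period is the "ugly hack" the in-diff comment refers to: it guarantees termination even when a fetcher worker hangs past context cancellation, at the cost of skipping any remaining deferred cleanup.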