package main import ( "container/heap" "context" "crypto/sha256" "encoding/json" "fmt" logger "log" "net/http" "os" "strings" "sync" "time" "git.cs.kau.se/rasmoste/ct-sans/internal/chunk" "git.cs.kau.se/rasmoste/ct-sans/internal/merkle" "git.cs.kau.se/rasmoste/ct-sans/internal/x509" "github.com/google/certificate-transparency-go/client" "github.com/google/certificate-transparency-go/jsonclient" "github.com/google/certificate-transparency-go/scanner" "gitlab.torproject.org/rgdd/ct/pkg/metadata" ) func collect(opts options) error { b, err := os.ReadFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile)) if err != nil { return err } var md metadata.Metadata if err := json.Unmarshal(b, &md); err != nil { return err } var await sync.WaitGroup defer await.Wait() ctx, cancel := context.WithCancel(context.Background()) go func() { await.Add(1) defer await.Done() handleSignals(ctx, cancel) // // Sometimes some worker in scanner.Fetcher isn't shutdown // properly despite the parent context (including getRanges) // being done. The below is an ugly hack to avoid hanging. // wait := time.Second * 5 // TODO: 15s logger.Printf("INFO: about to exit, please wait %v...\n", wait) select { case <-time.After(wait): os.Exit(0) } }() metricsCh := make(chan metrics) defer close(metricsCh) go func() { await.Add(1) defer await.Done() handleMetrics(ctx, opts, logs(md), metricsCh) }() defer cancel() var wg sync.WaitGroup defer wg.Wait() for _, log := range logs(md) { go func(log metadata.Log) { wg.Add(1) defer wg.Done() id, _ := log.Key.ID() th, err := readState(opts, id[:]) if err != nil { logger.Printf("ERROR: %s: %v\n", *log.Description, err) cancel() return } sth, err := readSnapshot(opts, id[:]) if err != nil { logger.Printf("ERROR: %s: %v\n", *log.Description, err) cancel() return } cli, err := client.New(string(log.URL), &http.Client{Transport: &http.Transport{IdleConnTimeout: 120 * time.Second}}, jsonclient.Options{UserAgent: opts.HTTPAgent}, ) if err != nil { logger.Printf("ERROR: %s: %v\n", *log.Description, err) cancel() return } fetcher := scanner.NewFetcher(cli, &scanner.FetcherOptions{ BatchSize: int(opts.BatchSize), StartIndex: th.TreeSize, EndIndex: int64(sth.TreeSize), ParallelFetch: maxWorkers(log, opts.WorkersPerLog), }) if uint64(th.TreeSize) == sth.TreeSize { metricsCh <- metrics{Description: *log.Description, End: th.TreeSize, Done: true} return } // // Sequencer that waits for sufficiently large chunks // before verifying inclusion proofs and persisting an // intermediate tree head (size and root hash) as well // as the SANs that were observed up until that point. // chunksCh := make(chan *chunk.Chunk) defer close(chunksCh) cctx, fetchDone := context.WithCancel(ctx) defer fetchDone() go func() { wg.Add(1) defer wg.Done() sequence(cctx, cancel, opts, log, th.TreeSize, cli, chunksCh, metricsCh) }() // // Callback that puts downloaded certificates into a // chunk that a single sequencer can verify and persist // callback := func(eb scanner.EntryBatch) { leafHashes := [][sha256.Size]byte{} for i := 0; i < len(eb.Entries); i++ { leafHashes = append(leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput)) } sans, errs := x509.SANsFromLeafEntries(eb.Start, eb.Entries) for _, err := range errs { logger.Printf("NOTICE: %s: %v", *log.Description, err) } chunksCh <- &chunk.Chunk{eb.Start, leafHashes, sans} } if err := fetcher.Run(ctx, callback); err != nil { logger.Printf("ERROR: %s: %v\n", *log.Description, err) cancel() return } for len(chunksCh) > 0 { select { case <-ctx.Done(): return // some Go routine cancelled due to an error, die case <-time.After(time.Second): } } }(log) } logger.Printf("INFO: collect is up-and-running, ctrl+C to exit\n") time.Sleep(3 * time.Second) // ensure that Go routines had time to spawn return nil } func sequence(ctx context.Context, cancel context.CancelFunc, opts options, log metadata.Log, nextIndex int64, cli *client.LogClient, chunksCh chan *chunk.Chunk, metricsCh chan metrics) { desc := *log.Description h := &chunk.ChunkHeap{} heap.Init(h) for { select { case <-ctx.Done(): if h.Sequence(nextIndex) { c := h.TPop() if _, err := persist(c, opts, log, cli, 0, metricsCh); err != nil { logger.Printf("ERROR: %s: %v\n", desc, err) } } return case c, ok := <-chunksCh: if ok { h.TPush(c) } if !h.Sequence(nextIndex) { continue } c = h.TPop() putBack, err := persist(c, opts, log, cli, int64(opts.PersistSize), metricsCh) if err != nil { cancel() logger.Printf("ERROR: %s: %v\n", desc, err) return } if putBack { h.TPush(c) continue } nextIndex += int64(len(c.LeafHashes)) } } } func persist(c *chunk.Chunk, opts options, log metadata.Log, cli *client.LogClient, minSequence int64, metricsCh chan metrics) (bool, error) { logID, _ := log.Key.ID() desc := *log.Description chunkSize := int64(len(c.LeafHashes)) if chunkSize == 0 { return false, nil // nothing to persist } if chunkSize < minSequence { return true, nil // wait for more leaves } // Read persisted tree state from disk oldTH, err := readState(opts, logID[:]) if err != nil { return false, err } if oldTH.TreeSize != c.Start { return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", oldTH.TreeSize, c.Start) } // Read signed tree head from disk sth, err := readSnapshot(opts, logID[:]) if err != nil { return false, err } // // Derive next intermediate tree state instead of verying all inclusions // // Independent context because we need to run inclusion and consistency // queries after the parent context is cancelled to persist on shutdown // ctx, cancel := context.WithCancel(context.Background()) defer cancel() newTH := treeHead{TreeSize: c.Start + chunkSize} p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(newTH.TreeSize)) if err != nil { return true, nil // try again later } if p.LeafIndex != c.Start { return false, fmt.Errorf("log says proof for entry %d is at index %d", c.Start, p.LeafIndex) } if newTH.RootHash, err = merkle.TreeHeadFromRangeProof(c.LeafHashes, uint64(c.Start), proof(p.AuditPath)); err != nil { return false, err } // Check that new tree state is consistent with what we stored on disk var hashes [][]byte if oldTH.TreeSize > 0 { if hashes, err = cli.GetSTHConsistency(ctx, uint64(oldTH.TreeSize), uint64(newTH.TreeSize)); err != nil { return true, nil // try again later } } if err := merkle.VerifyConsistency(uint64(oldTH.TreeSize), uint64(newTH.TreeSize), oldTH.RootHash, newTH.RootHash, proof(hashes)); err != nil { return false, fmt.Errorf("%d %x is inconsistent with on-disk state: %v", newTH.TreeSize, newTH.RootHash, err) } // Check that new tree state is consistent with the signed tree head if hashes, err = cli.GetSTHConsistency(ctx, uint64(newTH.TreeSize), sth.TreeSize); err != nil { return true, nil // try again later } if err := merkle.VerifyConsistency(uint64(newTH.TreeSize), sth.TreeSize, newTH.RootHash, sth.SHA256RootHash, proof(hashes)); err != nil { return false, fmt.Errorf("%d %x is inconsistent with signed tree head: %v", newTH.TreeSize, newTH.RootHash, err) } // Persist SANs to disk fp, err := os.OpenFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sansFile), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { return false, err } defer fp.Close() if _, err := fp.WriteString(strings.Join(c.SANs, "\n") + "\n"); err != nil { return false, err } if err := fp.Sync(); err != nil { return false, err } // Persist new tree state to disk b, err := json.Marshal(&newTH) if err != nil { return false, err } if err := os.WriteFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile), b, 0644); err != nil { return false, err } // Output metrics metricsCh <- metrics{ Description: desc, NumEntries: newTH.TreeSize - oldTH.TreeSize, Timestamp: time.Now().Unix(), Start: newTH.TreeSize, End: int64(sth.TreeSize), Done: uint64(newTH.TreeSize) == sth.TreeSize, } //logger.Printf("DEBUG: %s: persisted [%d, %d]\n", desc, oldTH.TreeSize, newTH.TreeSize) return false, nil }