package main

import (
	"container/heap"
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"git.cs.kau.se/rasmoste/ct-sans/internal/chunk"
	"git.cs.kau.se/rasmoste/ct-sans/internal/utils"
	ct "github.com/google/certificate-transparency-go"
	"github.com/google/certificate-transparency-go/client"
	"github.com/google/certificate-transparency-go/jsonclient"
	"github.com/google/certificate-transparency-go/scanner"
	"gitlab.torproject.org/rgdd/ct/pkg/merkle"
	"gitlab.torproject.org/rgdd/ct/pkg/metadata"
)

// collect reads the log metadata file from opts.Directory and starts a
// downloader for the logs returned by utils.Logs.  Each downloader fetches
// entries between the persisted tree size and the snapshot tree size, and
// hands them to a per-log sequencer that persists progress via persistChunk.
//
// A SIGINT or SIGTERM cancels the shared context, which asks every worker to
// wind down.  collect waits for all workers before returning.
//
// NOTE(review): this is work in progress.  Only the first log is processed
// (see the break below), and the function always returns a "TODO" error.
func collect(opts options) error {
	b, err := os.ReadFile(fmt.Sprintf("%s/%s", opts.Directory, opts.metadataFile))
	if err != nil {
		return err
	}
	var md metadata.Metadata
	if err := json.Unmarshal(b, &md); err != nil {
		return err
	}

	// Deferred calls run in reverse order: first wait for the workers, then
	// cancel the context.  The cancel is what releases the signal handler
	// below, so the handler must not be counted in wg.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	defer wg.Wait()

	// Signal handler: translate SIGINT/SIGTERM into a context cancellation.
	go func() {
		sigs := make(chan os.Signal, 1)
		signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
		// Unregister rather than close the channel: a signal delivered
		// to a closed channel would panic.
		defer signal.Stop(sigs)

		select {
		case <-sigs:
			fmt.Fprintf(os.Stderr, "INFO: received shutdown signal, please wait...\n")
			cancel()
		case <-ctx.Done():
		}
	}()

	for _, log := range utils.Logs(md) {
		// Add before starting the goroutine so that wg.Wait cannot
		// observe a zero counter while a worker is about to start.
		wg.Add(1)
		go func(log metadata.Log) {
			defer wg.Done()

			chunks := make(chan *chunk.Chunk)
			defer close(chunks) // tells the sequencer that no more chunks follow

			id, err := log.Key.ID()
			if err != nil {
				fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
				cancel()
				return
			}
			th, err := readState(opts, id[:])
			if err != nil {
				fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
				cancel()
				return
			}
			sth, err := readSnapshot(opts, id[:])
			if err != nil {
				fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
				cancel()
				return
			}
			cli, err := client.New(string(log.URL), &http.Client{}, jsonclient.Options{UserAgent: "wip2"})
			if err != nil {
				fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
				cancel()
				return
			}
			fetcher := scanner.NewFetcher(cli, &scanner.FetcherOptions{
				BatchSize:     int(opts.batchSize),
				StartIndex:    th.TreeSize,
				EndIndex:      int64(sth.TreeSize),
				ParallelFetch: int(opts.workersPerLog),
				Continuous:    false,
			})

			//
			// Callback that puts downloaded certificates into a
			// chunk that a single sequencer can verify and persist
			//
			callback := func(eb scanner.EntryBatch) {
				leafHashes := make([][sha256.Size]byte, 0, len(eb.Entries))
				for i := range eb.Entries {
					leafHashes = append(leafHashes, merkle.HashLeafNode(eb.Entries[i].LeafInput))
				}
				sans := []string{"example.com"} // TODO: fixme

				// Give up on the send if the context is cancelled: the
				// sequencer may already have exited, in which case a
				// plain send would block this fetch worker forever.
				select {
				case chunks <- &chunk.Chunk{Start: eb.Start, LeafHashes: leafHashes, SANs: sans}:
				case <-ctx.Done():
				}
			}

			//
			// Sequencer that waits for sufficiently large chunks
			// before verifying inclusion proofs and persisting an
			// intermediate tree head (size and root hash) as well
			// as the SANs that were observed up until that point.
			//
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer fmt.Fprintf(os.Stderr, "INFO: %s: shutdown sequencer\n", *log.Description)

				h := &chunk.ChunkHeap{}
				heap.Init(h)
				curr := th.TreeSize // next leaf index that has not been persisted

				// flush makes one last attempt to persist whatever is
				// in sequence, without a minimum size requirement.
				flush := func() {
					if !h.Sequence(curr) {
						return
					}
					c := h.TPop()
					if _, err := persistChunk(cli, opts, id[:], 0, c); err != nil {
						fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
					}
				}

				for {
					select {
					case <-ctx.Done():
						flush()
						return
					case c, ok := <-chunks:
						if !ok {
							// Channel closed: the downloader is done.
							// Persist the tail and stop, rather than
							// spinning on the closed channel.
							flush()
							return
						}

						h.TPush(c)
						if !h.Sequence(curr) {
							continue // the chunk starting at curr is not available yet
						}

						c = h.TPop()
						putBack, err := persistChunk(cli, opts, id[:], int64(opts.persistSize), c)
						if err != nil {
							cancel()
							fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
							return
						}
						if putBack {
							h.TPush(c) // too small or transient failure, retry later
							continue
						}
						curr += int64(len(c.LeafHashes))
					}
				}
			}()

			if err := fetcher.Run(ctx, callback); err != nil {
				fmt.Fprintf(os.Stderr, "ERROR: %s: %v\n", *log.Description, err)
				cancel()
				return
			}
			fmt.Fprintf(os.Stderr, "INFO: %s: fetch completed\n", *log.Description)

			// Returning closes chunks (deferred above), which makes the
			// sequencer persist any remaining leaves and exit.
		}(log)
		break // NOTE(review): only the first log is processed for now
	}

	// NOTE(review): fixed delay kept from the original; it looks like a
	// placeholder and is not needed for synchronization.
	time.Sleep(1 * time.Second)
	return fmt.Errorf("TODO")
}

// treeHead is the intermediate log state that is persisted to disk: the
// number of leaves processed so far and a root hash for that tree size.
type treeHead struct {
	TreeSize int64             `json:"tree_size"`
	RootHash [sha256.Size]byte `json:"root_hash"`
}

// readState reads the persisted tree head for the log identified by logID.
// If no state file exists yet, it returns a tree head of size zero whose root
// hash is the SHA-256 hash of the empty input.  Any other read or parse
// failure is returned as an error.
func readState(opts options, logID []byte) (treeHead, error) {
	path := fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile)

	b, err := os.ReadFile(path)
	if os.IsNotExist(err) {
		return treeHead{TreeSize: 0, RootHash: sha256.Sum256(nil)}, nil
	}
	if err != nil {
		return treeHead{}, err
	}

	var th treeHead
	if err := json.Unmarshal(b, &th); err != nil {
		return treeHead{}, err
	}
	return th, nil
}

// readSnapshot reads and parses the signed tree head file that is stored for
// the log identified by logID.
func readSnapshot(opts options, logID []byte) (ct.SignedTreeHead, error) {
	path := fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sthFile)

	b, err := os.ReadFile(path)
	if err != nil {
		return ct.SignedTreeHead{}, err
	}

	var sth ct.SignedTreeHead
	if err := json.Unmarshal(b, &sth); err != nil {
		return ct.SignedTreeHead{}, err
	}
	return sth, nil
}

// persistChunk appends the SANs in c to the log's SANs file and then advances
// the persisted tree head past the leaves in c.
//
// The boolean result asks the caller to put the chunk back and retry later.
// It is true when c holds fewer than minSequence leaves, or when the
// inclusion proof could not be fetched from the log.  It is false when the
// chunk was empty, was persisted, or an error is returned.
//
// An error is returned if the persisted state does not match c.Start, if the
// log reports a different index for the first leaf, or if writing to disk
// fails.
func persistChunk(cli *client.LogClient, opts options, logID []byte, minSequence int64, c *chunk.Chunk) (bool, error) {
	numLeaves := int64(len(c.LeafHashes))
	if numLeaves == 0 {
		return false, nil // nothing to persist
	}
	if numLeaves < minSequence {
		return true, nil // wait for more leaves
	}
	end := c.Start + numLeaves

	// Read persisted tree state from disk
	th, err := readState(opts, logID)
	if err != nil {
		return false, err
	}
	if th.TreeSize != c.Start {
		return false, fmt.Errorf("disk state says next index is %d, in-memory says %d", th.TreeSize, c.Start)
	}

	// Derive next intermediate tree state to persist
	//
	// Independent context because we need to run inclusion and consistency
	// queries after the parent context is cancelled to persist on shutdown
	//
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	p, err := cli.GetProofByHash(ctx, c.LeafHashes[0][:], uint64(end))
	if err != nil {
		fmt.Fprintf(os.Stderr, "WARNING: %x: %v\n", logID, err)
		return true, nil // try again later
	}
	if p.LeafIndex != c.Start {
		return false, fmt.Errorf("log says proof for entry %d is at index %d", c.Start, p.LeafIndex)
	}

	// TODO: ranged inclusion verify + consistency proof

	// Persist SANs to disk
	//
	// NOTE(review): the SANs are appended before the state file is updated,
	// so a failure in between leaves SANs on disk that the state does not
	// account for.
	fp, err := os.OpenFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.sansFile), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return false, err
	}
	defer fp.Close()

	if _, err := fp.WriteString(strings.Join(c.SANs, "\n") + "\n"); err != nil {
		return false, err
	}
	if err := fp.Sync(); err != nil {
		return false, err
	}

	// Persist intermediate log state to disk.  The root hash is left as
	// all zeros (see the TODO above).
	b, err := json.Marshal(&treeHead{TreeSize: end, RootHash: [sha256.Size]byte{}})
	if err != nil {
		return false, err
	}
	if err := os.WriteFile(fmt.Sprintf("%s/%x/%s", opts.logDirectory, logID, opts.stateFile), b, 0644); err != nil {
		return false, err
	}

	fmt.Fprintf(os.Stderr, "DEBUG: %x: persist: start=%d end=%d\n", logID, c.Start, end)
	return false, nil
}