aboutsummaryrefslogtreecommitdiff
path: root/internal/monitor/monitor.go
blob: 12d93cf6aefa72cd48f3007128159d5aa7de0980 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Package monitor provides monitoring of Certificate Transparency logs.  If
// running in continuous mode, the list of logs can be updated dynamically.
//
// Implement the Matcher interface to customize which certificates should be
// included in the monitor's emitted events.  See MatchAll for an example.
//
// Note that this package verifies that the monitored logs are locally
// consistent with regard to the initial start-up state.  It is up to the user
// to process the monitor's emitted events and errors, and to persist state.
package monitor

import (
	"context"
	"crypto/x509"
	"encoding/base64"
	"fmt"
	"net/http"
	"os"
	"sync"

	ct "github.com/google/certificate-transparency-go"
	"github.com/google/certificate-transparency-go/client"
	"github.com/google/certificate-transparency-go/jsonclient"
	"gitlab.torproject.org/rgdd/ct/pkg/metadata"
	"rgdd.se/silent-ct/internal/logger"
)

// MonitoredLog provides information about a log the monitor is following.
// Values of this type are received on the monitor's configuration channel.
type MonitoredLog struct {
	// Config is the log's metadata.  The monitor reads its URL (also used as
	// the key that identifies the log), its public key, and its DNS field.
	Config metadata.Log
	// State is the monitor state associated with this log.
	// NOTE(review): presumably the state to resume tailing from -- confirm
	// against the tail implementation.
	State  State
}

// State is the latest append-only state the monitor observed from its local
// vantage point.  The next entry to download is specified by NextIndex.
type State struct {
	// Embedded signed tree head; (de)serialized under the JSON key "latest_sth".
	ct.SignedTreeHead `json:"latest_sth"`
	// Index of the next log entry to download; JSON key "next_index".
	NextIndex         uint64 `json:"next_index"`
}

// Event carries the latest consistent monitor state, found matches, as well as
// errors that occurred while trying to match on the downloaded log entries.
type Event struct {
	// State is the monitor state this event reports (tree head and next index).
	State   State
	// Matches holds the log entries that matched.
	Matches []LogEntry
	// Errors holds the errors encountered while matching.
	Errors  []error
}

// Summary returns a one-line, human-readable description of the event: the
// base64-encoded (standard alphabet) log ID, the tree size, the next index,
// and the number of matches and errors.
func (ev *Event) Summary() string {
	logID := base64.StdEncoding.EncodeToString(ev.State.LogID[:])
	numMatches, numErrors := len(ev.Matches), len(ev.Errors)

	return fmt.Sprintf(
		"log %s: tree size %d at next index %d (%d matches, %d errors)",
		logID, ev.State.TreeSize, ev.State.NextIndex, numMatches, numErrors,
	)
}

// LogEntry is a Merkle tree leaf in a log.
type LogEntry struct {
	// Index of the leaf in the log; JSON key "leaf_index".
	LeafIndex uint64 `json:"leaf_index"`
	// Leaf bytes; JSON key "leaf_data".
	// NOTE(review): presumably the serialized leaf as returned by the log --
	// confirm against the code that fills this in.
	LeafData  []byte `json:"leaf_data"`
	// Extra bytes stored alongside the leaf; JSON key "extra_data".
	ExtraData []byte `json:"extra_data"`
}

// Config holds the monitor's options.  Every field is optional: New replaces
// a nil Matcher with MatchAll, an unconfigured Logger with one writing to
// standard error, an empty Contact with "unknown-user", and zero values of
// ChunkSize, BatchSize, and NumWorkers with 16364, 1024, and 2 respectively.
type Config struct {
	// Optional
	Matcher    Matcher        // Which log entries to match (default is to match all)
	Logger     *logger.Logger // Debug prints only (no output by default)
	Contact    string         // Something that helps log operators get in touch
	ChunkSize  uint           // Min number of leaves to propagate a chunk without matches
	BatchSize  uint           // Max number of certificates to accept per worker
	NumWorkers uint           // Number of parallel workers to use for each log
}

// Monitor follows a dynamically updated set of logs.  Create one with New and
// drive it with RunForever.
type Monitor struct {
	cfg     Config  // Configuration as stored by New, i.e., with defaults filled in
	matcher Matcher // Matcher handed to every tail; New sets it to cfg.Matcher

	eventCh  chan Event        // Passed to each tail's run method for emitting events
	configCh chan MonitoredLog // Read by RunForever to start or stop monitoring a log
	errorCh  chan error        // Passed to each tail's run method for emitting errors
}

// New returns a Monitor configured by cfg.  Unset options are replaced by
// defaults: a nil Matcher becomes MatchAll, an unconfigured Logger becomes a
// notice-level logger on standard error, an empty Contact becomes
// "unknown-user", and zero ChunkSize, BatchSize, and NumWorkers become 16364,
// 1024, and 2.  The monitor emits events on evCh and errors on errCh, and
// receives logs to start or stop following on cfgCh.  The returned error is
// always nil.
//
// NOTE(review): the ChunkSize default 16364 is not a power of two; confirm
// that 16384 was not intended.
func New(cfg Config, evCh chan Event, cfgCh chan MonitoredLog, errCh chan error) (Monitor, error) {
	// fallback assigns def to *opt if the option was left at its zero value.
	fallback := func(opt *uint, def uint) {
		if *opt == 0 {
			*opt = def
		}
	}

	if cfg.Matcher == nil {
		cfg.Matcher = &MatchAll{}
	}
	if configured := cfg.Logger.IsConfigured(); !configured {
		cfg.Logger = logger.New(logger.Config{Level: logger.LevelNotice, File: os.Stderr})
	}
	if len(cfg.Contact) == 0 {
		cfg.Contact = "unknown-user"
	}
	fallback(&cfg.ChunkSize, 16364)
	fallback(&cfg.BatchSize, 1024)
	fallback(&cfg.NumWorkers, 2)

	mon := Monitor{
		cfg:      cfg,
		matcher:  cfg.Matcher,
		eventCh:  evCh,
		configCh: cfgCh,
		errorCh:  errCh,
	}
	return mon, nil
}

// RunOnce is not implemented yet.  It ignores all of its arguments and always
// returns an error with the message "TODO".
func (mon *Monitor) RunOnce(ctx context.Context, cfg []MonitoredLog, evCh chan Event, errCh chan error) error {
	errNotImplemented := fmt.Errorf("TODO")
	return errNotImplemented
}

// RunForever follows the logs received on the monitor's configuration channel
// until ctx is done, in which case nil is returned.  One goroutine is started
// per log; it emits on the monitor's event and error channels.
//
// Receiving a log whose URL is already being followed stops following that
// log instead of starting a second tail.  If setting up a tail fails, the
// error is returned.  Before returning, all running tails are cancelled and
// waited for.
func (mon *Monitor) RunForever(ctx context.Context) error {
	// Deferred calls run in reverse order: first cancel every tail, then
	// wait for their goroutines to exit.
	var tails sync.WaitGroup
	defer tails.Wait()

	parent, stopAll := context.WithCancel(ctx)
	defer stopAll()

	// active maps the URL of each followed log to the function that stops it.
	active := map[metadata.LogURL]context.CancelFunc{}
	for {
		select {
		case <-ctx.Done():
			return nil
		case entry := <-mon.configCh:
			url := entry.Config.URL
			if stop, found := active[url]; found {
				delete(active, url)
				stop()
				continue
			}

			var (
				t   tail
				err error
			)
			// FIXME: get a real knob for tile-based logs
			if entry.Config.DNS != nil {
				t, err = mon.newTailTile(entry)
			} else {
				t, err = mon.newTailRFC6962(entry)
			}
			if err != nil {
				return err
			}

			tailCtx, stop := context.WithCancel(parent)
			active[url] = stop

			tails.Add(1)
			go func() {
				defer tails.Done()
				defer stop()
				t.run(tailCtx, entry, mon.eventCh, mon.errorCh)
			}()
		}
	}
}

// userAgentPrefix is the first part of the User-Agent sent to RFC 6962 logs;
// the configured contact is appended after a "/" separator.
const userAgentPrefix = "rgdd.se/silent-ct"

// newTailRFC6962 sets up a tail for log that talks to the log's URL using the
// certificate-transparency-go client.  The client is given the log's public
// key in PKIX DER form, a logger that discards all output, and a User-Agent
// built from userAgentPrefix and the configured contact.  The same client
// serves as both the tail's scanner and checker.  An error is returned if the
// public key cannot be marshaled or the client cannot be created.
func (mon *Monitor) newTailRFC6962(log MonitoredLog) (tail, error) {
	der, err := x509.MarshalPKIXPublicKey(log.Config.Key.Public)
	if err != nil {
		return tail{}, err
	}

	opts := jsonclient.Options{
		Logger:       &discard{},
		UserAgent:    userAgentPrefix + "/" + mon.cfg.Contact,
		PublicKeyDER: der,
	}
	logClient, err := client.New(string(log.Config.URL), &http.Client{}, opts)
	if err != nil {
		return tail{}, err
	}

	return tail{
		cfg:     mon.cfg,
		scanner: logClient,
		checker: logClient,
		matcher: mon.matcher,
	}, nil
}

// newTailTile is not implemented yet.  It ignores cfg and always returns an
// empty tail together with an error with the message "TODO".
func (mon *Monitor) newTailTile(cfg MonitoredLog) (tail, error) {
	var none tail
	return none, fmt.Errorf("TODO")
}

// discard is a logger that produces no output.  It is handed to the log
// client so that the client's own log messages are dropped.
type discard struct{}

// Printf accepts a format string and arguments and does nothing with them.
func (*discard) Printf(_ string, _ ...interface{}) {}