Skip to content

Commit a49cd04

Browse files
authored
feat: Support publishing new log entries to Pub/Sub topics (#1580)
Adds initial support publishing new log entries to Pub/Sub topics. Interested parties can subscribe to the topic in order to receive notifications when new entries are added. Signed-off-by: James Alseth <[email protected]>
1 parent 45bbaf0 commit a49cd04

22 files changed

+1099
-33
lines changed

Dockerfile.pubsub-emulator

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# gcloud sdk for pubsub emulator with netcat added for the startup health check
2+
FROM google/cloud-sdk:443.0.0@sha256:a59335d227a98b41ecdf6ff3d69923b8aef0703694e58992ecb75c7147140d37
3+
RUN apt-get install -y netcat

cmd/rekor-server/app/root.go

+4
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ func init() {
9393
Memory and file-based signers should only be used for testing.`)
9494
rootCmd.PersistentFlags().String("rekor_server.signer-passwd", "", "Password to decrypt signer private key")
9595

96+
rootCmd.PersistentFlags().String("rekor_server.new_entry_publisher", "", "URL for pub/sub queue to send messages to when new entries are added to the log. Ignored if not set. Supported providers: [gcppubsub]")
97+
rootCmd.PersistentFlags().Bool("rekor_server.publish_events_protobuf", false, "Whether to publish events in Protobuf wire format. Applies to all enabled event types.")
98+
rootCmd.PersistentFlags().Bool("rekor_server.publish_events_json", false, "Whether to publish events in CloudEvents JSON format. Applies to all enabled event types.")
99+
96100
rootCmd.PersistentFlags().Uint16("port", 3000, "Port to bind to")
97101

98102
rootCmd.PersistentFlags().Bool("enable_retrieve_api", true, "enables Redis-based index API endpoint")

cmd/rekor-server/app/serve.go

-3
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ var serveCmd = &cobra.Command{
6161
Short: "start http server with configured api",
6262
Long: `Starts a http server and serves the configured api`,
6363
Run: func(cmd *cobra.Command, args []string) {
64-
6564
// Setup the logger to dev/prod
6665
log.ConfigureLogger(viper.GetString("log_type"))
6766

@@ -83,7 +82,6 @@ var serveCmd = &cobra.Command{
8382
log.Logger.Error(err)
8483
}
8584
}()
86-
8785
//TODO: make this a config option for server to load via viper field
8886
//TODO: add command line option to print versions supported in binary
8987

@@ -101,7 +99,6 @@ var serveCmd = &cobra.Command{
10199
hashedrekord.KIND: {hashedrekord_v001.APIVERSION},
102100
dsse.KIND: {dsse_v001.APIVERSION},
103101
}
104-
105102
for k, v := range pluggableTypeMap {
106103
log.Logger.Infof("Loading support for pluggable type '%v'", k)
107104
log.Logger.Infof("Loading version '%v' for pluggable type '%v'", v, k)

docker-compose.test.yml

+24-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ services:
2020
context: .
2121
target: "test"
2222
environment:
23-
- TMPDIR=/var/run/attestations # workaround for https://github.com/google/go-cloud/issues/3294
23+
TMPDIR: /var/run/attestations # workaround for https://github.com/google/go-cloud/issues/3294
24+
PUBSUB_EMULATOR_HOST: gcp-pubsub-emulator:8085
2425
command: [
2526
"rekor-server",
2627
"-test.coverprofile=rekor-server.cov",
@@ -34,7 +35,29 @@ services:
3435
"--enable_attestation_storage",
3536
"--attestation_storage_bucket=file:///var/run/attestations",
3637
"--max_request_body_size=32792576",
38+
"--rekor_server.new_entry_publisher=gcppubsub://projects/test-project/topics/new-entry",
39+
"--rekor_server.publish_events_json=true",
3740
]
3841
ports:
3942
- "3000:3000"
4043
- "2112:2112"
44+
depends_on:
45+
- gcp-pubsub-emulator
46+
gcp-pubsub-emulator:
47+
image: gcp-pubsub-emulator
48+
ports:
49+
- "8085:8085"
50+
command:
51+
- gcloud
52+
- beta
53+
- emulators
54+
- pubsub
55+
- start
56+
- --host-port=0.0.0.0:8085
57+
- --project=test-project
58+
healthcheck:
59+
test: ["CMD", "nc", "-zv", "localhost", "8085"]
60+
interval: 10s
61+
timeout: 3s
62+
retries: 3
63+
start_period: 10s

docker-compose.yml

-1
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,3 @@ services:
114114
timeout: 3s
115115
retries: 3
116116
start_period: 5s
117-

go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ require (
5151
)
5252

5353
require (
54+
cloud.google.com/go/pubsub v1.33.0
5455
github.com/AdamKorcz/go-fuzz-headers-1 v0.0.0-20230618160516-e936619f9f18
5556
github.com/cyberphone/json-canonicalization v0.0.0-20220623050100-57a0ce2678a7
5657
github.com/go-redis/redismock/v9 v9.0.3
@@ -183,7 +184,7 @@ require (
183184
golang.org/x/term v0.11.0 // indirect
184185
golang.org/x/text v0.12.0 // indirect
185186
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
186-
google.golang.org/api v0.135.0 // indirect
187+
google.golang.org/api v0.135.0
187188
google.golang.org/appengine v1.6.7 // indirect
188189
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
189190
gopkg.in/yaml.v2 v2.4.0

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
3939
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
4040
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
4141
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
42+
cloud.google.com/go/pubsub v1.33.0 h1:6SPCPvWav64tj0sVX/+npCBKhUi/UjJehy9op/V3p2g=
43+
cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc=
4244
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
4345
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
4446
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=

pkg/api/api.go

+24
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"google.golang.org/grpc/credentials/insecure"
3232

3333
"github.com/sigstore/rekor/pkg/log"
34+
"github.com/sigstore/rekor/pkg/pubsub"
3435
"github.com/sigstore/rekor/pkg/sharding"
3536
"github.com/sigstore/rekor/pkg/signer"
3637
"github.com/sigstore/rekor/pkg/storage"
@@ -39,6 +40,8 @@ import (
3940
"github.com/sigstore/sigstore/pkg/cryptoutils"
4041
"github.com/sigstore/sigstore/pkg/signature"
4142
"github.com/sigstore/sigstore/pkg/signature/options"
43+
44+
_ "github.com/sigstore/rekor/pkg/pubsub/gcp" // Load GCP pubsub implementation
4245
)
4346

4447
func dial(ctx context.Context, rpcServer string) (*grpc.ClientConn, error) {
@@ -63,6 +66,9 @@ type API struct {
6366
signer signature.Signer
6467
// stops checkpoint publishing
6568
checkpointPublishCancel context.CancelFunc
69+
// Publishes notifications when new entries are added to the log. May be
70+
// nil if no publisher is configured.
71+
newEntryPublisher pubsub.Publisher
6672
}
6773

6874
func NewAPI(treeID uint) (*API, error) {
@@ -112,6 +118,18 @@ func NewAPI(treeID uint) (*API, error) {
112118

113119
pubkey := cryptoutils.PEMEncode(cryptoutils.PublicKeyPEMType, b)
114120

121+
var newEntryPublisher pubsub.Publisher
122+
if p := viper.GetString("rekor_server.new_entry_publisher"); p != "" {
123+
if !viper.GetBool("rekor_server.publish_events_protobuf") && !viper.GetBool("rekor_server.publish_events_json") {
124+
return nil, fmt.Errorf("%q is configured but neither %q or %q are enabled", "new_entry_publisher", "publish_events_protobuf", "publish_events_json")
125+
}
126+
newEntryPublisher, err = pubsub.Get(ctx, p)
127+
if err != nil {
128+
return nil, fmt.Errorf("init event publisher: %w", err)
129+
}
130+
log.ContextLogger(ctx).Infof("Initialized new entry event publisher: %s", p)
131+
}
132+
115133
return &API{
116134
// Transparency Log Stuff
117135
logClient: logClient,
@@ -121,6 +139,8 @@ func NewAPI(treeID uint) (*API, error) {
121139
pubkey: string(pubkey),
122140
pubkeyHash: hex.EncodeToString(pubkeyHashBytes[:]),
123141
signer: rekorSigner,
142+
// Utility functionality not required for operation of the core service
143+
newEntryPublisher: newEntryPublisher,
124144
}, nil
125145
}
126146

@@ -166,4 +186,8 @@ func ConfigureAPI(treeID uint) {
166186

167187
func StopAPI() {
168188
api.checkpointPublishCancel()
189+
190+
if api.newEntryPublisher != nil {
191+
api.newEntryPublisher.Close()
192+
}
169193
}

pkg/api/entries.go

+62-1
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,14 @@ import (
3636
"google.golang.org/genproto/googleapis/rpc/code"
3737
"google.golang.org/grpc/codes"
3838

39+
"github.com/sigstore/rekor/pkg/events"
40+
"github.com/sigstore/rekor/pkg/events/newentry"
3941
"github.com/sigstore/rekor/pkg/generated/models"
4042
"github.com/sigstore/rekor/pkg/generated/restapi/operations/entries"
4143
"github.com/sigstore/rekor/pkg/log"
44+
"github.com/sigstore/rekor/pkg/pubsub"
4245
"github.com/sigstore/rekor/pkg/sharding"
46+
"github.com/sigstore/rekor/pkg/tle"
4347
"github.com/sigstore/rekor/pkg/trillianclient"
4448
"github.com/sigstore/rekor/pkg/types"
4549
"github.com/sigstore/rekor/pkg/util"
@@ -290,7 +294,7 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
290294
RootHash: swag.String(hex.EncodeToString(root.RootHash)),
291295
LogIndex: swag.Int64(queuedLeaf.LeafIndex),
292296
Hashes: hashes,
293-
Checkpoint: stringPointer(string(scBytes)),
297+
Checkpoint: swag.String(string(scBytes)),
294298
}
295299

296300
logEntryAnon.Verification = &models.LogEntryAnonVerification{
@@ -301,9 +305,66 @@ func createLogEntry(params entries.CreateLogEntryParams) (models.LogEntry, middl
301305
logEntry := models.LogEntry{
302306
entryID: logEntryAnon,
303307
}
308+
309+
if api.newEntryPublisher != nil {
310+
// Publishing notifications should not block the API response.
311+
go func() {
312+
verifiers, err := entry.Verifiers()
313+
if err != nil {
314+
incPublishEvent(newentry.Name, "", false)
315+
log.ContextLogger(ctx).Errorf("Could not get verifiers for log entry %s: %v", entryID, err)
316+
return
317+
}
318+
var subjects []string
319+
for _, v := range verifiers {
320+
subjects = append(subjects, v.Subjects()...)
321+
}
322+
323+
pbEntry, err := tle.GenerateTransparencyLogEntry(logEntryAnon)
324+
if err != nil {
325+
incPublishEvent(newentry.Name, "", false)
326+
log.ContextLogger(ctx).Error(err)
327+
return
328+
}
329+
event, err := newentry.New(entryID, pbEntry, subjects)
330+
if err != nil {
331+
incPublishEvent(newentry.Name, "", false)
332+
log.ContextLogger(ctx).Error(err)
333+
return
334+
}
335+
if viper.GetBool("rekor_server.publish_events_protobuf") {
336+
go publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeProtobuf)
337+
}
338+
if viper.GetBool("rekor_server.publish_events_json") {
339+
go publishEvent(ctx, api.newEntryPublisher, event, events.ContentTypeJSON)
340+
}
341+
}()
342+
}
343+
304344
return logEntry, nil
305345
}
306346

347+
func publishEvent(ctx context.Context, publisher pubsub.Publisher, event *events.Event, contentType events.EventContentType) {
348+
err := publisher.Publish(context.WithoutCancel(ctx), event, contentType)
349+
incPublishEvent(event.Type().Name(), contentType, err == nil)
350+
if err != nil {
351+
log.ContextLogger(ctx).Error(err)
352+
}
353+
}
354+
355+
func incPublishEvent(event string, contentType events.EventContentType, ok bool) {
356+
status := "SUCCESS"
357+
if !ok {
358+
status = "ERROR"
359+
}
360+
labels := map[string]string{
361+
"event": event,
362+
"status": status,
363+
"content_type": string(contentType),
364+
}
365+
metricPublishEvents.With(labels).Inc()
366+
}
367+
307368
// CreateLogEntryHandler creates new entry into log
308369
func CreateLogEntryHandler(params entries.CreateLogEntryParams) middleware.Responder {
309370
httpReq := params.HTTPRequest

pkg/api/metrics.go

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ var (
2929
Help: "The total number of new log entries",
3030
})
3131

32+
metricPublishEvents = promauto.NewCounterVec(prometheus.CounterOpts{
33+
Name: "rekor_publish_events",
34+
Help: "The status of publishing events to Pub/Sub",
35+
}, []string{"event", "content_type", "status"})
36+
3237
MetricLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
3338
Name: "rekor_api_latency",
3439
Help: "Api Latency on calls",

pkg/events/doc.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2023 The Sigstore Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package events provides methods for working with CloudEvents.
16+
package events
17+
18+
// The version of the CloudEvents specification the package adheres to.
19+
const CloudEventsSpecVersion = "1.0"

0 commit comments

Comments
 (0)