Skip to content

Commit 2d8a65b

Browse files
author
Kathryn Baldauf
committed
Export shim publisher functions
- Our out of tree shim would like to publish events with ttrpc. These functions should be exposed so our shim doesn't need to reimplement publisher logic. Signed-off-by: Kathryn Baldauf <[email protected]>
1 parent 3096178 commit 2d8a65b

2 files changed

Lines changed: 10 additions & 10 deletions

File tree

runtime/v2/shim/publisher.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ type item struct {
4141
count int
4242
}
4343

44-
func newPublisher(address string) (*remoteEventsPublisher, error) {
44+
func NewPublisher(address string) (*RemoteEventsPublisher, error) {
4545
client, err := ttrpcutil.NewClient(address)
4646
if err != nil {
4747
return nil, err
4848
}
4949

50-
l := &remoteEventsPublisher{
50+
l := &RemoteEventsPublisher{
5151
client: client,
5252
closed: make(chan struct{}),
5353
requeue: make(chan *item, queueSize),
@@ -57,26 +57,26 @@ func newPublisher(address string) (*remoteEventsPublisher, error) {
5757
return l, nil
5858
}
5959

60-
type remoteEventsPublisher struct {
60+
type RemoteEventsPublisher struct {
6161
client *ttrpcutil.Client
6262
closed chan struct{}
6363
closer sync.Once
6464
requeue chan *item
6565
}
6666

67-
func (l *remoteEventsPublisher) Done() <-chan struct{} {
67+
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
6868
return l.closed
6969
}
7070

71-
func (l *remoteEventsPublisher) Close() (err error) {
71+
func (l *RemoteEventsPublisher) Close() (err error) {
7272
err = l.client.Close()
7373
l.closer.Do(func() {
7474
close(l.closed)
7575
})
7676
return err
7777
}
7878

79-
func (l *remoteEventsPublisher) processQueue() {
79+
func (l *RemoteEventsPublisher) processQueue() {
8080
for i := range l.requeue {
8181
if i.count > maxRequeue {
8282
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
@@ -91,7 +91,7 @@ func (l *remoteEventsPublisher) processQueue() {
9191
}
9292
}
9393

94-
func (l *remoteEventsPublisher) queue(i *item) {
94+
func (l *RemoteEventsPublisher) queue(i *item) {
9595
go func() {
9696
i.count++
9797
// re-queue after a short delay
@@ -100,7 +100,7 @@ func (l *remoteEventsPublisher) queue(i *item) {
100100
}()
101101
}
102102

103-
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
103+
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
104104
ns, err := namespaces.NamespaceRequired(ctx)
105105
if err != nil {
106106
return err
@@ -127,7 +127,7 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
127127
return nil
128128
}
129129

130-
func (l *remoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
130+
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
131131
_, err := l.client.EventsService().Forward(ctx, req)
132132
if err == nil {
133133
return nil

runtime/v2/shim/shim.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func run(id string, initFunc Init, config Config) error {
169169

170170
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
171171

172-
publisher, err := newPublisher(ttrpcAddress)
172+
publisher, err := NewPublisher(ttrpcAddress)
173173
if err != nil {
174174
return err
175175
}

0 commit comments

Comments
 (0)