Skip to content

Commit b911ae3

Browse files
committed
Add tcp service for grpc listeners
Signed-off-by: Michael Crosby <[email protected]>
1 parent 475619c commit b911ae3

4 files changed

Lines changed: 48 additions & 3 deletions

File tree

cmd/containerd/command/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,13 @@ func App() *cli.App {
198198
}
199199
serve(ctx, tl, server.ServeTTRPC)
200200

201+
if config.GRPC.TCPAddress != "" {
202+
l, err := net.Listen("tcp", config.GRPC.TCPAddress)
203+
if err != nil {
204+
return errors.Wrapf(err, "failed to get listener for TCP grpc endpoint")
205+
}
206+
serve(ctx, l, server.ServeTCP)
207+
}
201208
// setup the main grpc endpoint
202209
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
203210
if err != nil {

plugin/plugin.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ type TTRPCService interface {
129129
RegisterTTRPC(*ttrpc.Server) error
130130
}
131131

132+
// TCPService allows GRPC services to be registered with the underlying tcp server
133+
type TCPService interface {
134+
RegisterTCP(*grpc.Server) error
135+
}
136+
132137
var register = struct {
133138
sync.RWMutex
134139
r []*Registration

services/server/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ type Config struct {
5454
// GRPCConfig provides GRPC configuration for the socket
5555
type GRPCConfig struct {
5656
Address string `toml:"address"`
57+
TCPAddress string `toml:"tcp_address"`
58+
TCPTLSCert string `toml:"tcp_tls_cert"`
59+
TCPTLSKey string `toml:"tcp_tls_key"`
5760
UID int `toml:"uid"`
5861
GID int `toml:"gid"`
5962
MaxRecvMsgSize int `toml:"max_recv_message_size"`

services/server/server.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/pkg/errors"
5151
bolt "go.etcd.io/bbolt"
5252
"google.golang.org/grpc"
53+
"google.golang.org/grpc/credentials"
5354
)
5455

5556
// CreateTopLevelDirectories creates the top-level root and state directories.
@@ -81,7 +82,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
8182
if err != nil {
8283
return nil, err
8384
}
84-
8585
serverOpts := []grpc.ServerOption{
8686
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
8787
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
@@ -96,12 +96,26 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
9696
if err != nil {
9797
return nil, err
9898
}
99-
grpcServer := grpc.NewServer(serverOpts...)
99+
tcpServerOpts := serverOpts
100+
if config.GRPC.TCPTLSCert != "" {
101+
log.G(ctx).Info("setting up tls on tcp GRPC services...")
102+
creds, err := credentials.NewServerTLSFromFile(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
103+
if err != nil {
104+
return nil, err
105+
}
106+
tcpServerOpts = append(tcpServerOpts, grpc.Creds(creds))
107+
}
100108
var (
109+
grpcServer = grpc.NewServer(serverOpts...)
110+
hrpc = grpc.NewServer(tcpServerOpts...)
111+
101112
grpcServices []plugin.Service
113+
tcpServices []plugin.TCPService
102114
ttrpcServices []plugin.TTRPCService
103-
s = &Server{
115+
116+
s = &Server{
104117
grpcServer: grpcServer,
118+
hrpc: hrpc,
105119
ttrpcServer: ttrpcServer,
106120
events: exchange.NewExchange(),
107121
config: config,
@@ -151,6 +165,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
151165
if src, ok := instance.(plugin.TTRPCService); ok {
152166
ttrpcServices = append(ttrpcServices, src)
153167
}
168+
if service, ok := instance.(plugin.TCPService); ok {
169+
tcpServices = append(tcpServices, service)
170+
}
171+
154172
s.plugins = append(s.plugins, result)
155173
}
156174
// register services after all plugins have been initialized
@@ -164,13 +182,19 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
164182
return nil, err
165183
}
166184
}
185+
for _, service := range tcpServices {
186+
if err := service.RegisterTCP(hrpc); err != nil {
187+
return nil, err
188+
}
189+
}
167190
return s, nil
168191
}
169192

170193
// Server is the containerd main daemon
171194
type Server struct {
172195
grpcServer *grpc.Server
173196
ttrpcServer *ttrpc.Server
197+
hrpc *grpc.Server
174198
events *exchange.Exchange
175199
config *srvconfig.Config
176200
plugins []*plugin.Plugin
@@ -201,6 +225,12 @@ func (s *Server) ServeMetrics(l net.Listener) error {
201225
return trapClosedConnErr(http.Serve(l, m))
202226
}
203227

228+
// ServeTCP allows services to serve over tcp
229+
func (s *Server) ServeTCP(l net.Listener) error {
230+
grpc_prometheus.Register(s.hrpc)
231+
return trapClosedConnErr(s.hrpc.Serve(l))
232+
}
233+
204234
// ServeDebug provides a debug endpoint
205235
func (s *Server) ServeDebug(l net.Listener) error {
206236
// don't use the default http server mux to make sure nothing gets registered

0 commit comments

Comments
 (0)