Skip to content

Commit 5f4c977

Browse files
authored
Merge pull request #3162 from crosbymichael/tcpservice
Add tcp service for grpc listeners
2 parents ff90b03 + b911ae3 commit 5f4c977

4 files changed

Lines changed: 48 additions & 3 deletions

File tree

cmd/containerd/command/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,13 @@ func App() *cli.App {
198198
}
199199
serve(ctx, tl, server.ServeTTRPC)
200200

201+
if config.GRPC.TCPAddress != "" {
202+
l, err := net.Listen("tcp", config.GRPC.TCPAddress)
203+
if err != nil {
204+
return errors.Wrapf(err, "failed to get listener for TCP grpc endpoint")
205+
}
206+
serve(ctx, l, server.ServeTCP)
207+
}
201208
// setup the main grpc endpoint
202209
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
203210
if err != nil {

plugin/plugin.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ type TTRPCService interface {
129129
RegisterTTRPC(*ttrpc.Server) error
130130
}
131131

132+
// TCPService allows GRPC services to be registered with the underlying tcp server
133+
type TCPService interface {
134+
RegisterTCP(*grpc.Server) error
135+
}
136+
132137
var register = struct {
133138
sync.RWMutex
134139
r []*Registration

services/server/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ type Config struct {
5757
// GRPCConfig provides GRPC configuration for the socket
5858
type GRPCConfig struct {
5959
Address string `toml:"address"`
60+
TCPAddress string `toml:"tcp_address"`
61+
TCPTLSCert string `toml:"tcp_tls_cert"`
62+
TCPTLSKey string `toml:"tcp_tls_key"`
6063
UID int `toml:"uid"`
6164
GID int `toml:"gid"`
6265
MaxRecvMsgSize int `toml:"max_recv_message_size"`

services/server/server.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/pkg/errors"
5151
bolt "go.etcd.io/bbolt"
5252
"google.golang.org/grpc"
53+
"google.golang.org/grpc/credentials"
5354
)
5455

5556
// CreateTopLevelDirectories creates the top-level root and state directories.
@@ -81,7 +82,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
8182
if err != nil {
8283
return nil, err
8384
}
84-
8585
serverOpts := []grpc.ServerOption{
8686
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
8787
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
@@ -96,12 +96,26 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
9696
if err != nil {
9797
return nil, err
9898
}
99-
grpcServer := grpc.NewServer(serverOpts...)
99+
tcpServerOpts := serverOpts
100+
if config.GRPC.TCPTLSCert != "" {
101+
log.G(ctx).Info("setting up tls on tcp GRPC services...")
102+
creds, err := credentials.NewServerTLSFromFile(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
103+
if err != nil {
104+
return nil, err
105+
}
106+
tcpServerOpts = append(tcpServerOpts, grpc.Creds(creds))
107+
}
100108
var (
109+
grpcServer = grpc.NewServer(serverOpts...)
110+
hrpc = grpc.NewServer(tcpServerOpts...)
111+
101112
grpcServices []plugin.Service
113+
tcpServices []plugin.TCPService
102114
ttrpcServices []plugin.TTRPCService
103-
s = &Server{
115+
116+
s = &Server{
104117
grpcServer: grpcServer,
118+
hrpc: hrpc,
105119
ttrpcServer: ttrpcServer,
106120
events: exchange.NewExchange(),
107121
config: config,
@@ -159,6 +173,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
159173
if src, ok := instance.(plugin.TTRPCService); ok {
160174
ttrpcServices = append(ttrpcServices, src)
161175
}
176+
if service, ok := instance.(plugin.TCPService); ok {
177+
tcpServices = append(tcpServices, service)
178+
}
179+
162180
s.plugins = append(s.plugins, result)
163181
}
164182
if len(required) != 0 {
@@ -180,13 +198,19 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
180198
return nil, err
181199
}
182200
}
201+
for _, service := range tcpServices {
202+
if err := service.RegisterTCP(hrpc); err != nil {
203+
return nil, err
204+
}
205+
}
183206
return s, nil
184207
}
185208

186209
// Server is the containerd main daemon
187210
type Server struct {
188211
grpcServer *grpc.Server
189212
ttrpcServer *ttrpc.Server
213+
hrpc *grpc.Server
190214
events *exchange.Exchange
191215
config *srvconfig.Config
192216
plugins []*plugin.Plugin
@@ -217,6 +241,12 @@ func (s *Server) ServeMetrics(l net.Listener) error {
217241
return trapClosedConnErr(http.Serve(l, m))
218242
}
219243

244+
// ServeTCP allows services to serve over tcp
245+
func (s *Server) ServeTCP(l net.Listener) error {
246+
grpc_prometheus.Register(s.hrpc)
247+
return trapClosedConnErr(s.hrpc.Serve(l))
248+
}
249+
220250
// ServeDebug provides a debug endpoint
221251
func (s *Server) ServeDebug(l net.Listener) error {
222252
// don't use the default http server mux to make sure nothing gets registered

0 commit comments

Comments
 (0)