Skip to content

Commit fd31052

Browse files
committed
Add oom events to shim
Signed-off-by: Michael Crosby <[email protected]>
1 parent 7e49c60 commit fd31052

File tree

2 files changed

+139
-2
lines changed

2 files changed

+139
-2
lines changed

runtime/v2/runc/epoll.go

+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// +build linux
2+
3+
/*
4+
Copyright The containerd Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package runc
20+
21+
import (
22+
"context"
23+
"sync"
24+
25+
"github.com/containerd/cgroups"
26+
eventstypes "github.com/containerd/containerd/api/events"
27+
"github.com/containerd/containerd/events"
28+
"github.com/containerd/containerd/runtime"
29+
"github.com/sirupsen/logrus"
30+
"golang.org/x/sys/unix"
31+
)
32+
33+
func newOOMEpoller(publisher events.Publisher) (*epoller, error) {
34+
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
35+
if err != nil {
36+
return nil, err
37+
}
38+
return &epoller{
39+
fd: fd,
40+
publisher: publisher,
41+
set: make(map[uintptr]*item),
42+
}, nil
43+
}
44+
45+
type epoller struct {
46+
mu sync.Mutex
47+
48+
fd int
49+
publisher events.Publisher
50+
set map[uintptr]*item
51+
}
52+
53+
type item struct {
54+
id string
55+
cg cgroups.Cgroup
56+
}
57+
58+
func (e *epoller) Close() error {
59+
return unix.Close(e.fd)
60+
}
61+
62+
func (e *epoller) run(ctx context.Context) {
63+
var events [128]unix.EpollEvent
64+
for {
65+
n, err := unix.EpollWait(e.fd, events[:], -1)
66+
if err != nil {
67+
if err == unix.EINTR {
68+
continue
69+
}
70+
logrus.WithError(err).Error("cgroups: epoll wait")
71+
}
72+
for i := 0; i < n; i++ {
73+
e.process(ctx, uintptr(events[i].Fd))
74+
}
75+
}
76+
}
77+
78+
func (e *epoller) add(id string, cg cgroups.Cgroup) error {
79+
e.mu.Lock()
80+
defer e.mu.Unlock()
81+
fd, err := cg.OOMEventFD()
82+
if err != nil {
83+
return err
84+
}
85+
e.set[fd] = &item{
86+
id: id,
87+
cg: cg,
88+
}
89+
event := unix.EpollEvent{
90+
Fd: int32(fd),
91+
Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR,
92+
}
93+
return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event)
94+
}
95+
96+
func (e *epoller) process(ctx context.Context, fd uintptr) {
97+
flush(fd)
98+
e.mu.Lock()
99+
i, ok := e.set[fd]
100+
if !ok {
101+
e.mu.Unlock()
102+
return
103+
}
104+
e.mu.Unlock()
105+
if i.cg.State() == cgroups.Deleted {
106+
e.mu.Lock()
107+
delete(e.set, fd)
108+
e.mu.Unlock()
109+
unix.Close(int(fd))
110+
return
111+
}
112+
if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{
113+
ContainerID: i.id,
114+
}); err != nil {
115+
logrus.WithError(err).Error("publish OOM event")
116+
}
117+
}
118+
119+
func flush(fd uintptr) error {
120+
var buf [8]byte
121+
_, err := unix.Read(int(fd), buf[:])
122+
return err
123+
}

runtime/v2/runc/service.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,18 @@ var _ = (taskAPI.TaskService)(&service{})
6464

6565
// New returns a new shim service that can be used via GRPC
6666
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
67+
ep, err := newOOMEpoller(publisher)
68+
if err != nil {
69+
return nil, err
70+
}
71+
go ep.run(ctx)
6772
s := &service{
6873
id: id,
6974
context: ctx,
7075
processes: make(map[string]rproc.Process),
7176
events: make(chan interface{}, 128),
7277
ec: shim.Default.Subscribe(),
78+
ep: ep,
7379
}
7480
go s.processExits()
7581
runcC.Monitor = shim.Default
@@ -90,6 +96,7 @@ type service struct {
9096
events chan interface{}
9197
platform rproc.Platform
9298
ec chan runcC.Exit
99+
ep *epoller
93100

94101
id string
95102
// Filled by Create()
@@ -293,7 +300,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
293300
if err != nil {
294301
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
295302
}
296-
s.cg = cg
303+
s.setCgroup(cg)
297304
}
298305
s.task = process
299306
return &taskAPI.CreateTaskResponse{
@@ -318,7 +325,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
318325
if err != nil {
319326
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
320327
}
321-
s.cg = cg
328+
s.setCgroup(cg)
322329
}
323330
return &taskAPI.StartResponse{
324331
Pid: uint32(p.Pid()),
@@ -708,6 +715,13 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
708715
return p, nil
709716
}
710717

718+
func (s *service) setCgroup(cg cgroups.Cgroup) {
719+
s.cg = cg
720+
if err := s.ep.add(s.id, cg); err != nil {
721+
logrus.WithError(err).Error("add cg to OOM monitor")
722+
}
723+
}
724+
711725
func getTopic(ctx context.Context, e interface{}) string {
712726
switch e.(type) {
713727
case *eventstypes.TaskCreate:

0 commit comments

Comments
 (0)