Skip to content

Commit 5fa6d54

Browse files
authored
feat: add memory pressure watchdog with runtime monitoring (#216)
Signed-off-by: Christopher Maher <[email protected]> Signed-off-by: Christopher Maher <[email protected]>
1 parent fd4b7ab commit 5fa6d54

File tree

10 files changed

+613
-29
lines changed

10 files changed

+613
-29
lines changed

cmd/metal-agent/main.go

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,17 @@ var (
4545
)
4646

4747
type AgentConfig struct {
48-
Namespace string
49-
ModelStorePath string
50-
LlamaServerBin string
51-
Port int
52-
LogLevel string
53-
HostIP string
54-
MemoryFraction float64
48+
Namespace string
49+
ModelStorePath string
50+
LlamaServerBin string
51+
Port int
52+
LogLevel string
53+
HostIP string
54+
MemoryFraction float64
55+
WatchdogInterval time.Duration
56+
MemoryPressureWarning float64
57+
MemoryPressureCritical float64
58+
EvictionEnabled bool
5559
}
5660

5761
func parseLogLevel(level string) zapcore.Level {
@@ -115,6 +119,14 @@ func main() {
115119
flag.StringVar(&cfg.HostIP, "host-ip", "", "IP address to register in Kubernetes endpoints (auto-detected if empty)")
116120
flag.Float64Var(&cfg.MemoryFraction, "memory-fraction", 0,
117121
"Fraction of system memory to budget for models (0 = auto-detect based on total RAM)")
122+
flag.DurationVar(&cfg.WatchdogInterval, "memory-watchdog-interval", 10*time.Second,
123+
"How often to check memory pressure (0 to disable)")
124+
flag.Float64Var(&cfg.MemoryPressureWarning, "memory-pressure-warning", 0.20,
125+
"Available memory fraction below which a warning is emitted")
126+
flag.Float64Var(&cfg.MemoryPressureCritical, "memory-pressure-critical", 0.10,
127+
"Available memory fraction below which pressure is critical")
128+
flag.BoolVar(&cfg.EvictionEnabled, "eviction-enabled", false,
129+
"Enable automatic process eviction under critical memory pressure")
118130
showVersion := flag.Bool("version", false, "Show version information")
119131
flag.Parse()
120132

@@ -209,7 +221,7 @@ func main() {
209221

210222
// Create agent
211223
logger.Infow("creating Metal agent")
212-
metalAgent := agent.NewMetalAgent(agent.MetalAgentConfig{
224+
agentCfg := agent.MetalAgentConfig{
213225
K8sClient: k8sClient,
214226
Namespace: cfg.Namespace,
215227
ModelStorePath: cfg.ModelStorePath,
@@ -218,7 +230,20 @@ func main() {
218230
HostIP: cfg.HostIP,
219231
Logger: logger,
220232
MemoryFraction: cfg.MemoryFraction,
221-
})
233+
}
234+
if cfg.WatchdogInterval > 0 {
235+
agentCfg.WatchdogConfig = &agent.MemoryWatchdogConfig{
236+
Interval: cfg.WatchdogInterval,
237+
WarningThreshold: cfg.MemoryPressureWarning,
238+
CriticalThreshold: cfg.MemoryPressureCritical,
239+
}
240+
logger.Infow("memory watchdog enabled",
241+
"interval", cfg.WatchdogInterval,
242+
"warningThreshold", cfg.MemoryPressureWarning,
243+
"criticalThreshold", cfg.MemoryPressureCritical,
244+
)
245+
}
246+
metalAgent := agent.NewMetalAgent(agentCfg)
222247

223248
// Setup context with signal handling
224249
ctx, cancel := context.WithCancel(context.Background())

pkg/agent/agent.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ type MetalAgentConfig struct {
4646
MemoryProvider MemoryProvider
4747
// MemoryFraction is the fraction of total memory to budget for models (0 = auto-detect).
4848
MemoryFraction float64
49+
50+
// WatchdogConfig configures the memory pressure watchdog. Nil disables it.
51+
WatchdogConfig *MemoryWatchdogConfig
4952
}
5053

5154
// MetalAgent watches Kubernetes InferenceService resources and manages
@@ -150,6 +153,18 @@ func (a *MetalAgent) Start(ctx context.Context) error {
150153
)
151154
go monitor.Run(ctx)
152155

156+
// Start memory watchdog (if configured)
157+
if a.config.WatchdogConfig != nil {
158+
watchdog := NewMemoryWatchdog(
159+
a.memoryProvider,
160+
a.processMemInfoSnapshot,
161+
nil, // observe-only in PR A; eviction callback added in PR B
162+
*a.config.WatchdogConfig,
163+
a.logger.With("subsystem", "watchdog"),
164+
)
165+
go watchdog.Run(ctx)
166+
}
167+
153168
// Start watcher
154169
eventChan := make(chan InferenceServiceEvent)
155170
go func() {
@@ -416,6 +431,21 @@ func (a *MetalAgent) Shutdown(ctx context.Context) error {
416431
return nil
417432
}
418433

434+
// processMemInfoSnapshot returns a snapshot of process names and PIDs for the watchdog.
435+
func (a *MetalAgent) processMemInfoSnapshot() []processMemInfo {
436+
a.mu.RLock()
437+
defer a.mu.RUnlock()
438+
439+
infos := make([]processMemInfo, 0, len(a.processes))
440+
for _, p := range a.processes {
441+
infos = append(infos, processMemInfo{
442+
Name: p.Name,
443+
PID: p.PID,
444+
})
445+
}
446+
return infos
447+
}
448+
419449
// HealthCheck returns the health status of all managed processes
420450
func (a *MetalAgent) HealthCheck() map[string]bool {
421451
a.mu.RLock()

pkg/agent/agentmetrics.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,42 @@ var (
7373
},
7474
[]string{"name", "namespace"},
7575
)
76+
77+
systemMemoryAvailableBytes = prometheus.NewGauge(
78+
prometheus.GaugeOpts{
79+
Name: "llmkube_metal_agent_system_memory_available_bytes",
80+
Help: "Available system memory in bytes.",
81+
},
82+
)
83+
84+
systemMemoryWiredBytes = prometheus.NewGauge(
85+
prometheus.GaugeOpts{
86+
Name: "llmkube_metal_agent_system_memory_wired_bytes",
87+
Help: "Wired (non-pageable) system memory in bytes.",
88+
},
89+
)
90+
91+
processRSSBytes = prometheus.NewGaugeVec(
92+
prometheus.GaugeOpts{
93+
Name: "llmkube_metal_agent_process_rss_bytes",
94+
Help: "Actual resident set size per managed process in bytes.",
95+
},
96+
[]string{"name"},
97+
)
98+
99+
memoryPressureLevelGauge = prometheus.NewGauge(
100+
prometheus.GaugeOpts{
101+
Name: "llmkube_metal_agent_memory_pressure_level",
102+
Help: "Current memory pressure level: 0=normal, 1=warning, 2=critical.",
103+
},
104+
)
105+
106+
evictionsTotal = prometheus.NewCounter(
107+
prometheus.CounterOpts{
108+
Name: "llmkube_metal_agent_evictions_total",
109+
Help: "Total number of process eviction events triggered by memory pressure.",
110+
},
111+
)
76112
)
77113

78114
func init() {
@@ -85,5 +121,10 @@ func init() {
85121
healthCheckDuration,
86122
memoryBudgetBytes,
87123
memoryEstimatedBytes,
124+
systemMemoryAvailableBytes,
125+
systemMemoryWiredBytes,
126+
processRSSBytes,
127+
memoryPressureLevelGauge,
128+
evictionsTotal,
88129
)
89130
}

pkg/agent/agentmetrics_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ func TestAgentMetricsRegistered(t *testing.T) {
3434
{"llmkube_metal_agent_health_check_duration_seconds", healthCheckDuration},
3535
{"llmkube_metal_agent_memory_budget_bytes", memoryBudgetBytes},
3636
{"llmkube_metal_agent_memory_estimated_bytes", memoryEstimatedBytes},
37+
{"llmkube_metal_agent_system_memory_available_bytes", systemMemoryAvailableBytes},
38+
{"llmkube_metal_agent_system_memory_wired_bytes", systemMemoryWiredBytes},
39+
{"llmkube_metal_agent_process_rss_bytes", processRSSBytes},
40+
{"llmkube_metal_agent_memory_pressure_level", memoryPressureLevelGauge},
41+
{"llmkube_metal_agent_evictions_total", evictionsTotal},
3742
}
3843

3944
for _, c := range collectors {

pkg/agent/memory.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,32 @@ import (
2727
inferencev1alpha1 "github.com/defilantech/llmkube/api/v1alpha1"
2828
)
2929

30+
// MemoryPressureLevel represents the severity of memory pressure.
31+
type MemoryPressureLevel int
32+
33+
const (
34+
MemoryPressureNormal MemoryPressureLevel = 0
35+
MemoryPressureWarning MemoryPressureLevel = 1
36+
MemoryPressureCritical MemoryPressureLevel = 2
37+
)
38+
39+
func (l MemoryPressureLevel) String() string {
40+
switch l {
41+
case MemoryPressureWarning:
42+
return "warning"
43+
case MemoryPressureCritical:
44+
return "critical"
45+
default:
46+
return "normal"
47+
}
48+
}
49+
3050
// MemoryProvider abstracts system memory queries for testability.
3151
type MemoryProvider interface {
3252
TotalMemory() (uint64, error)
3353
AvailableMemory() (uint64, error)
54+
WiredMemory() (uint64, error)
55+
ProcessRSS(pid int) (uint64, error)
3456
}
3557

3658
// MemoryEstimate holds the estimated memory requirements for a model.

pkg/agent/memory_darwin.go

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -50,39 +50,77 @@ func (p *DarwinMemoryProvider) AvailableMemory() (uint64, error) {
5050
return 0, fmt.Errorf("vm_stat: %w", err)
5151
}
5252

53-
lines := strings.Split(string(out), "\n")
53+
pageSize, bodyLines := parseVMStatHeader(string(out))
54+
55+
var freePages, inactivePages uint64
56+
for _, line := range bodyLines {
57+
line = strings.TrimSpace(line)
58+
if strings.HasPrefix(line, "Pages free:") {
59+
freePages = parseVMStatValue(line)
60+
} else if strings.HasPrefix(line, "Pages inactive:") {
61+
inactivePages = parseVMStatValue(line)
62+
}
63+
}
64+
65+
return (freePages + inactivePages) * pageSize, nil
66+
}
67+
68+
// WiredMemory returns the amount of wired (non-pageable) memory by parsing vm_stat.
69+
func (p *DarwinMemoryProvider) WiredMemory() (uint64, error) {
70+
out, err := exec.Command("vm_stat").Output()
71+
if err != nil {
72+
return 0, fmt.Errorf("vm_stat: %w", err)
73+
}
74+
75+
pageSize, lines := parseVMStatHeader(string(out))
76+
for _, line := range lines {
77+
line = strings.TrimSpace(line)
78+
if strings.HasPrefix(line, "Pages wired down:") {
79+
return parseVMStatValue(line) * pageSize, nil
80+
}
81+
}
82+
return 0, fmt.Errorf("vm_stat: 'Pages wired down' not found")
83+
}
84+
85+
// ProcessRSS returns the resident set size of a process in bytes.
86+
func (p *DarwinMemoryProvider) ProcessRSS(pid int) (uint64, error) {
87+
out, err := exec.Command("ps", "-o", "rss=", "-p",
88+
strconv.Itoa(pid)).Output()
89+
if err != nil {
90+
return 0, fmt.Errorf("ps rss for pid %d: %w", pid, err)
91+
}
92+
// ps reports RSS in kilobytes
93+
kb, err := strconv.ParseUint(strings.TrimSpace(string(out)), 10, 64)
94+
if err != nil {
95+
return 0, fmt.Errorf(
96+
"parse rss for pid %d: %w", pid, err)
97+
}
98+
return kb * 1024, nil
99+
}
100+
101+
// parseVMStatHeader extracts the page size and body lines from vm_stat output.
102+
func parseVMStatHeader(output string) (uint64, []string) {
103+
lines := strings.Split(output, "\n")
54104
if len(lines) == 0 {
55-
return 0, fmt.Errorf("vm_stat: empty output")
105+
return 16384, nil
56106
}
57107

58-
// First line contains page size: "Mach Virtual Memory Statistics: (page size of 16384 bytes)"
59108
var pageSize uint64
60109
firstLine := lines[0]
61110
if idx := strings.Index(firstLine, "page size of "); idx >= 0 {
62111
sizeStr := firstLine[idx+len("page size of "):]
63112
if endIdx := strings.Index(sizeStr, " "); endIdx >= 0 {
64113
sizeStr = sizeStr[:endIdx]
65114
}
66-
pageSize, err = strconv.ParseUint(sizeStr, 10, 64)
67-
if err != nil {
68-
return 0, fmt.Errorf("parse page size %q: %w", sizeStr, err)
115+
parsed, err := strconv.ParseUint(sizeStr, 10, 64)
116+
if err == nil {
117+
pageSize = parsed
69118
}
70119
}
71120
if pageSize == 0 {
72121
pageSize = 16384 // default to 16KB for Apple Silicon
73122
}
74-
75-
var freePages, inactivePages uint64
76-
for _, line := range lines[1:] {
77-
line = strings.TrimSpace(line)
78-
if strings.HasPrefix(line, "Pages free:") {
79-
freePages = parseVMStatValue(line)
80-
} else if strings.HasPrefix(line, "Pages inactive:") {
81-
inactivePages = parseVMStatValue(line)
82-
}
83-
}
84-
85-
return (freePages + inactivePages) * pageSize, nil
123+
return pageSize, lines[1:]
86124
}
87125

88126
// parseVMStatValue extracts the numeric value from a vm_stat line like "Pages free: 123456."

pkg/agent/memory_other.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,11 @@ func (p *DarwinMemoryProvider) TotalMemory() (uint64, error) {
3333
func (p *DarwinMemoryProvider) AvailableMemory() (uint64, error) {
3434
return 0, fmt.Errorf("DarwinMemoryProvider not supported on %s", runtime.GOOS)
3535
}
36+
37+
func (p *DarwinMemoryProvider) WiredMemory() (uint64, error) {
38+
return 0, fmt.Errorf("DarwinMemoryProvider not supported on %s", runtime.GOOS)
39+
}
40+
41+
func (p *DarwinMemoryProvider) ProcessRSS(_ int) (uint64, error) {
42+
return 0, fmt.Errorf("DarwinMemoryProvider not supported on %s", runtime.GOOS)
43+
}

pkg/agent/memory_test.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ import (
2525
)
2626

2727
type mockMemoryProvider struct {
28-
totalBytes, availableBytes uint64
29-
totalErr, availableErr error
28+
totalBytes, availableBytes, wiredBytes uint64
29+
totalErr, availableErr, wiredErr error
30+
processRSS map[int]uint64
31+
processRSSErr error
3032
}
3133

3234
func (m *mockMemoryProvider) TotalMemory() (uint64, error) {
@@ -37,6 +39,22 @@ func (m *mockMemoryProvider) AvailableMemory() (uint64, error) {
3739
return m.availableBytes, m.availableErr
3840
}
3941

42+
func (m *mockMemoryProvider) WiredMemory() (uint64, error) {
43+
return m.wiredBytes, m.wiredErr
44+
}
45+
46+
func (m *mockMemoryProvider) ProcessRSS(pid int) (uint64, error) {
47+
if m.processRSSErr != nil {
48+
return 0, m.processRSSErr
49+
}
50+
if m.processRSS != nil {
51+
if v, ok := m.processRSS[pid]; ok {
52+
return v, nil
53+
}
54+
}
55+
return 0, fmt.Errorf("no RSS for pid %d", pid)
56+
}
57+
4058
func TestEstimateModelMemory_WithFullMetadata(t *testing.T) {
4159
// Llama 8B: 32 layers, 4096 embedding, 2048 context, ~4.5 GiB file
4260
fileSize := uint64(4831838208) // ~4.5 GiB

0 commit comments

Comments
 (0)