-
Notifications
You must be signed in to change notification settings - Fork 84
Expand file tree
/
Copy pathclient.py
More file actions
288 lines (237 loc) · 8.58 KB
/
client.py
File metadata and controls
288 lines (237 loc) · 8.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
#!/usr/bin/env python3
"""ACP client module for auth verification.
Spawns agent process and performs raw JSON-RPC handshake to verify auth methods.
No SDK dependency - just raw JSON parsing to preserve _meta fields.
"""
import json
import os
import subprocess
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class AuthMethod:
"""Auth method with type inferred from _meta."""
id: str
name: str
type: str | None = None
description: str | None = None
@dataclass
class AuthCheckResult:
    """Outcome of one auth verification run against an agent process."""

    # True when the agent advertised at least one acceptable auth method.
    success: bool
    # Auth methods parsed from the initialize response (empty if none were parsed).
    auth_methods: list[AuthMethod] = field(default_factory=list)
    # Human-readable failure reason; None when nothing went wrong.
    error: str | None = None
    # Trailing portion of the agent's stderr output, when it could be collected.
    stderr_tail: str | None = None
    # Elapsed time of the check in seconds, or None if it was not recorded.
    duration_seconds: float | None = None
    # Exit code of the agent process, or None if unknown / still running.
    process_exit_code: int | None = None
def parse_auth_methods(auth_methods_raw: list[dict]) -> list[AuthMethod]:
    """Parse authMethods from initialize response.

    Type detection priority:
    1. Direct "type" field on the auth method
    2. "_meta" keys: "terminal-auth" -> "terminal", "agent-auth" -> "agent"
    3. Default to "agent" if not specified (per AUTHENTICATION.md)

    Args:
        auth_methods_raw: The raw ``authMethods`` value. ``None`` (a JSON
            ``null``) is accepted and treated the same as an empty list.

    Returns:
        One ``AuthMethod`` per raw entry, in the original order. Missing
        ``id``/``name`` keys become empty strings.
    """
    parsed: list[AuthMethod] = []
    # `or []` guards against an explicit JSON null, which would otherwise
    # fail with "TypeError: 'NoneType' object is not iterable".
    for method in auth_methods_raw or []:
        # 1. Direct "type" field wins when present and non-empty.
        auth_type = method.get("type")

        # 2. Otherwise look for the marker keys inside "_meta".
        if not auth_type:
            meta = method.get("_meta")
            # "_meta" may be absent, null, or a non-object; only dicts count.
            if isinstance(meta, dict):
                if "terminal-auth" in meta:
                    auth_type = "terminal"
                elif "agent-auth" in meta:
                    auth_type = "agent"

        parsed.append(
            AuthMethod(
                id=method.get("id", ""),
                name=method.get("name", ""),
                # 3. Default to "agent" per AUTHENTICATION.md.
                type=auth_type or "agent",
                description=method.get("description"),
            )
        )
    return parsed
def validate_auth_methods(auth_methods: list[AuthMethod]) -> tuple[bool, str]:
    """Check that some auth method is of type 'agent' or 'terminal'.

    Returns:
        ``(ok, message)``. ``ok`` is True when at least one method has an
        accepted type; ``message`` reports how many were accepted, or why
        the list was rejected.
    """
    if not auth_methods:
        return False, "No authMethods in response"

    accepted = frozenset({"agent", "terminal"})
    accepted_count = 0
    types_found = []
    # Single pass: tally accepted entries and remember every type seen so
    # the rejection message can list them.
    for entry in auth_methods:
        entry_type = entry.type
        types_found.append(entry_type)
        if entry_type in accepted:
            accepted_count += 1

    if accepted_count == 0:
        return False, f"No auth method with type 'agent' or 'terminal'. Found types: {types_found}"
    return True, f"Found {accepted_count} valid auth method(s)"
def send_jsonrpc(proc: subprocess.Popen, method: str, params: dict, msg_id: int = 1) -> None:
    """Write one JSON-RPC 2.0 request to the process's stdin.

    The request is serialized as a single line of raw JSON followed by a
    newline, then the stream is flushed.

    Args:
        proc: Process whose ``stdin`` accepts text writes.
        method: JSON-RPC method name.
        params: JSON-serializable parameters object.
        msg_id: Request id placed in the ``id`` field.
    """
    payload = json.dumps(
        {"jsonrpc": "2.0", "id": msg_id, "method": method, "params": params}
    )
    proc.stdin.write(payload + "\n")
    proc.stdin.flush()
def read_jsonrpc(proc: subprocess.Popen, timeout: float) -> dict | None:
"""Read a JSON-RPC response from the process (raw JSON, newline-delimited)."""
import select
ready, _, _ = select.select([proc.stdout], [], [], timeout)
if not ready:
return None
line = proc.stdout.readline()
if not line:
return None
try:
return json.loads(line)
except json.JSONDecodeError as e:
raise ValueError(
f"ACP spec violation: agent wrote non-JSON to stdout: {line.rstrip()!r}\n"
f"Per the ACP spec, agents MUST NOT write anything to stdout "
f"that is not a valid ACP message. "
f"Diagnostic output should go to stderr."
) from e
def _collect_proc_diagnostics(proc: subprocess.Popen) -> tuple[str | None, int | None]:
"""Collect stderr tail and exit code from a process (non-blocking).
Returns:
(stderr_tail, exit_code) — either may be None if unavailable.
"""
import select
exit_code = proc.poll()
stderr_tail: str | None = None
try:
ready, _, _ = select.select([proc.stderr], [], [], 0.5)
if ready:
data = proc.stderr.read(8192)
if data:
stderr_tail = data[-4000:]
except Exception:
pass
return stderr_tail, exit_code
def run_auth_check(
    cmd: list[str],
    cwd: Path,
    env: dict[str, str] | None = None,
    timeout: float = 60.0,
) -> AuthCheckResult:
    """Verify an agent supports ACP authentication.

    Spawns ``cmd``, sends a single ``initialize`` JSON-RPC request on its
    stdin, reads one message from its stdout, and validates the
    ``authMethods`` advertised in that message. The process is always
    terminated before returning.

    Args:
        cmd: Command to spawn the agent
        cwd: Working directory for the agent process
        env: Environment variables layered on top of the current environment
            (with TERM forced to "dumb"). If HOME is not set here, a
            temporary directory is used as HOME and deleted afterwards.
        timeout: Handshake timeout in seconds

    Returns:
        AuthCheckResult with success status and auth methods. Exceptions
        raised while spawning or talking to the agent are caught and
        reported as a failed result rather than propagated.
    """
    import shutil

    # Build isolated environment
    full_env = os.environ.copy()
    full_env["TERM"] = "dumb"
    if env:
        full_env.update(env)

    # Use a temporary directory as HOME if not specified; remember it so it
    # can be removed once the agent has been shut down.
    sandbox_home = None
    if "HOME" not in (env or {}):
        sandbox_home = tempfile.mkdtemp(prefix="acp-auth-check-")
        full_env["HOME"] = sandbox_home

    proc = None
    t0 = time.monotonic()

    def _failure(error, auth_methods=None):
        # Build a failed result with timing and process diagnostics attached.
        # Duration is taken first so it excludes the diagnostics wait.
        duration = time.monotonic() - t0
        if proc is not None:
            stderr_tail, exit_code = _collect_proc_diagnostics(proc)
        else:
            stderr_tail, exit_code = None, None
        return AuthCheckResult(
            success=False,
            auth_methods=auth_methods if auth_methods is not None else [],
            error=error,
            stderr_tail=stderr_tail,
            duration_seconds=duration,
            process_exit_code=exit_code,
        )

    try:
        # Make binary executable if needed
        exe_path = Path(cmd[0])
        if exe_path.exists() and not os.access(exe_path, os.X_OK):
            exe_path.chmod(exe_path.stat().st_mode | 0o755)

        # Start agent process
        proc = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=full_env,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=0,
        )

        # Send initialize request with capabilities
        send_jsonrpc(
            proc,
            "initialize",
            {
                "protocolVersion": 1,
                "clientInfo": {"name": "ACP Registry Validator", "version": "1.0.0"},
                "clientCapabilities": {
                    "terminal": True,
                    "fs": {
                        "readTextFile": True,
                        "writeTextFile": True,
                    },
                    "_meta": {
                        "terminal_output": True,
                        "terminal-auth": True,
                    },
                },
            },
        )

        # Read response. NOTE: read_jsonrpc also returns None when the agent
        # closes stdout, so that case is reported as a timeout as well.
        response = read_jsonrpc(proc, timeout)
        if response is None:
            return _failure(f"Timeout after {timeout}s waiting for initialize response")
        if "error" in response:
            return _failure(f"Agent error: {response['error']}")

        # `or` fallbacks treat an explicit JSON null like a missing key, so
        # the failure is reported by validation instead of as a TypeError.
        result = response.get("result") or {}
        auth_methods = parse_auth_methods(result.get("authMethods") or [])

        # Validate
        is_valid, message = validate_auth_methods(auth_methods)
        if not is_valid:
            return _failure(message, auth_methods)
        return AuthCheckResult(
            success=True,
            auth_methods=auth_methods,
            duration_seconds=time.monotonic() - t0,
        )
    except Exception as e:
        # Boundary handler: every failure mode becomes a result object.
        return _failure(f"Error during auth check: {type(e).__name__}: {e}")
    finally:
        if proc is not None:
            proc.terminate()
            try:
                proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()  # reap the killed child so it is not left a zombie
            # Release the pipe file descriptors held by the Popen object.
            for pipe in (proc.stdin, proc.stdout, proc.stderr):
                if pipe is not None:
                    try:
                        pipe.close()
                    except OSError:
                        pass
        if sandbox_home is not None:
            shutil.rmtree(sandbox_home, ignore_errors=True)