Exemples
Cette page fournit des exemples de code prêts à l'emploi pour les patrons d'intégration ACP courants.
Prérequis
Tous les exemples de cette page ont été vérifiés avec l'environnement suivant :
| Dépendance | Version | Fonction |
|---|---|---|
| Python | 3.10 | Utilise les typages modernes (unions |, instructions match/case) |
agent-client-protocol | 0.9.0 | SDK Python ACP officiel (importé en tant que acp) |
httpx | 0.28.1 | Client HTTP asynchrone utilisé par l'exemple HTTP+SSE |
iac-code | dépôt actuel | Fournit la sous-commande iac-code acp utilisée par spawn_agent_process |
Installez les dépendances côté client avec uv :
# Create a Python 3.10 virtualenv managed by uv
uv venv --python 3.10
source .venv/bin/activate
# Install the pinned client-side dependencies into that venv
uv pip install "agent-client-protocol==0.9.0" "httpx>=0.28.1"
AllowedOutcome.outcome et DeniedOutcome.outcome sont typés respectivement comme Literal['selected'] et Literal['cancelled'] depuis le SDK 0.9.0. L'utilisation de toute autre chaîne lèvera une pydantic.ValidationError lors de la construction.
SDK Python -- Cycle de vie complet d'une session
Un exemple complet utilisant le SDK Python agent-client-protocol :
"""Full iac-code ACP session lifecycle."""
import asyncio
from typing import Any
import acp
import acp.schema
class MyClient(acp.Client):
"""ACP client with streaming output."""
async def session_update(
self,
session_id: str,
update: (
acp.schema.AgentMessageChunk
| acp.schema.AgentThoughtChunk
| acp.schema.ToolCallStart
| acp.schema.ToolCallProgress
| Any
),
**kwargs: Any,
) -> None:
match update:
case acp.schema.AgentThoughtChunk():
print(f"[thought] {update.content.text}", end="", flush=True)
case acp.schema.AgentMessageChunk():
print(f"{update.content.text}", end="", flush=True)
case acp.schema.ToolCallStart():
print(f"\n[tool] {update.title} (kind={update.kind})")
case acp.schema.ToolCallProgress():
status = update.status
print(f"[tool] {update.tool_call_id} → {status}")
async def request_permission(
self, options, session_id, tool_call, **kwargs
) -> acp.RequestPermissionResponse:
# Auto-approve for demonstration (use interactive approval in production)
return acp.RequestPermissionResponse(
outcome=acp.schema.AllowedOutcome(
outcome="selected", option_id="allow_once"
)
)
async def main():
async with acp.spawn_agent_process(MyClient(), "iac-code", "acp") as (conn, _):
# 1. Initialize
resp = await conn.initialize(
protocol_version=1,
client_info=acp.schema.Implementation(name="demo", version="1.0"),
)
print(f"Connected to {resp.agent_info.name} v{resp.agent_info.version}")
# 2. Create session
session = await conn.new_session(cwd="/path/to/project")
sid = session.session_id
# `models` is typed as Optional in the schema — guard against agents that don't report it.
current_model = session.models.current_model_id if session.models else "<unknown>"
print(f"Session: {sid}, model: {current_model}")
# 3. Send prompt
result = await conn.prompt(
session_id=sid,
prompt=[
acp.schema.TextContentBlock(
type="text",
text="Create a VPC with two subnets using a ROS template",
)
],
)
print(f"\nDone — stop_reason={result.stop_reason}")
# 4. Close session
await conn.close_session(session_id=sid)
if __name__ == "__main__":
asyncio.run(main())
SDK Python -- Gestion des permissions
Implémentation de l'approbation interactive des permissions :
import acp
import acp.schema
class InteractiveClient(acp.Client):
async def session_update(self, session_id, update, **kwargs):
if isinstance(update, acp.schema.AgentMessageChunk):
print(update.content.text, end="", flush=True)
async def request_permission(
self, options, session_id, tool_call, **kwargs
) -> acp.RequestPermissionResponse:
print(f"\n⚠️ Permission request: {tool_call.title}")
print(f" Tool kind: {tool_call.kind}")
# Show available options
for opt in options:
print(f" [{opt.option_id}] {opt.name}")
choice = input("Choose (allow_once/reject_once): ").strip()
if choice.startswith("allow"):
return acp.RequestPermissionResponse(
outcome=acp.schema.AllowedOutcome(
outcome="selected",
option_id=choice,
)
)
else:
return acp.RequestPermissionResponse(
outcome=acp.schema.DeniedOutcome(outcome="cancelled")
)
Utilisez InteractiveClient de la même manière que MyClient ci-dessus -- passez une instance à spawn_agent_process, pas la classe :
async with acp.spawn_agent_process(InteractiveClient(), "iac-code", "acp") as (conn, _):
...
Passer la classe directement lève une TypeError: __init__() takes exactly one argument parce que acp.Client est un typing.Protocol dont le __init__ par défaut rejette les arguments positionnels.
Stratégies de permission par environnement :
| Environnement | Stratégie |
|---|---|
| Développement | Auto-approuver tout |
| Production | Approbation interactive pour les outils d'écriture/exécution |
| CI/CD | Autoriser la lecture seule, refuser l'écriture/exécution |
SDK Python -- Événements en streaming
Traitement des différents types d'événements avec une gestion détaillée :
import acp
import acp.schema
class StreamingClient(acp.Client):
def __init__(self):
self.tool_calls: dict[str, str] = {} # tool_call_id → title
async def session_update(self, session_id, update, **kwargs):
match update:
case acp.schema.AgentThoughtChunk():
# Model's internal reasoning (dimmed in UI typically)
print(f" 💭 {update.content.text}", end="", flush=True)
case acp.schema.AgentMessageChunk():
# Final response text shown to user
print(update.content.text, end="", flush=True)
case acp.schema.ToolCallStart():
self.tool_calls[update.tool_call_id] = update.title
print(f"\n 🔧 [{update.kind}] {update.title}")
case acp.schema.ToolCallProgress():
title = self.tool_calls.get(update.tool_call_id, "unknown")
if update.status == "completed":
print(f" ✅ {title} completed")
elif update.status == "failed":
print(f" ❌ {title} failed")
if update.raw_output:
print(f" Error: {str(update.raw_output)[:200]}")
else:
print(f" ⏳ {title} in progress...")
case acp.schema.UsageUpdate():
# UsageUpdate reports context-window usage in tokens.
# Fields: used (current context tokens), size (total window), cost (optional).
print(f"\n 📊 Context: {update.used}/{update.size} tokens")
async def request_permission(self, options, session_id, tool_call, **kwargs):
return acp.RequestPermissionResponse(
outcome=acp.schema.AllowedOutcome(
outcome="selected", option_id="allow_once"
)
)
StreamingClient définit __init__(self) sans arguments pour initialiser l'état interne. Lors du branchement, passez toujours une instance -- spawn_agent_process(StreamingClient(), "iac-code", "acp") -- jamais la classe elle-même.
HTTP+SSE -- Client minimal
Pour les environnements où vous ne pouvez pas utiliser le SDK Python, connectez-vous directement via HTTP+SSE :
"""Minimal HTTP+SSE client using httpx."""
import asyncio
import httpx
BASE_URL = "http://127.0.0.1:8765"
HEADERS = {"Authorization": "Bearer YOUR_TOKEN"}
async def main():
async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as client:
# 1. Initialize — get connection ID
resp = await client.post("/acp", json={
"jsonrpc": "2.0", "id": 1,
"method": "initialize",
"params": {
"protocolVersion": 1,
"clientInfo": {"name": "http-client", "version": "1.0"},
"capabilities": {}
}
}, headers=HEADERS)
resp.raise_for_status()
conn_id = resp.headers["Acp-Connection-Id"]
print(f"Connection ID: {conn_id}")
session_headers = {**HEADERS, "Acp-Connection-Id": conn_id}
# 2. Subscribe to SSE stream (background)
async def listen_sse():
async with client.stream("GET", "/acp", headers=session_headers) as stream:
async for line in stream.aiter_lines():
if line.startswith("data:"):
print(f"[SSE] {line[5:].strip()}")
sse_task = asyncio.create_task(listen_sse())
# 3. Create session (response arrives via SSE)
resp = await client.post("/acp", json={
"jsonrpc": "2.0", "id": 2,
"method": "session/new",
"params": {"cwd": "/workspace"}
}, headers=session_headers)
# Returns 202 Accepted; actual result delivered via SSE
await asyncio.sleep(2) # Wait for session creation
# 4. Send prompt
await client.post("/acp", json={
"jsonrpc": "2.0", "id": 3,
"method": "session/prompt",
"params": {
"sessionId": "<session-id-from-sse>",
"prompt": [{"type": "text", "text": "List files in current directory"}]
}
}, headers=session_headers)
await asyncio.sleep(10) # Wait for streaming response
sse_task.cancel()
# 5. Close connection
await client.request("DELETE", "/acp", headers=session_headers)
print("Connection closed")
if __name__ == "__main__":
asyncio.run(main())
Points clés :
POST /acpavecmethod: "initialize"retourneAcp-Connection-Iddans les en-têtes de réponse- Toutes les requêtes suivantes doivent inclure les en-têtes
AuthorizationetAcp-Connection-Id POST /acpretourne202 Accepted; les réponses réelles sont livrées via le flux SSEGET /acpouvre le flux SSE pour recevoir les événements poussés par le serveurDELETE /acpferme la connexion et libère les ressources du serveur
Patrons de gestion de sessions
Dupliquer pour expérimenter
Créez une branche à partir d'une session existante pour essayer différentes approches sans affecter l'originale :
async def fork_and_experiment(conn, original_session_id: str, cwd: str):
"""Fork a session to experiment without affecting the original."""
# Fork creates a copy with the same history
forked = await conn.fork_session(
session_id=original_session_id,
cwd=cwd,
)
forked_sid = forked.session_id
print(f"Forked session: {forked_sid}")
# Experiment on the fork
result = await conn.prompt(
session_id=forked_sid,
prompt=[acp.schema.TextContentBlock(
type="text",
text="Try an alternative approach: use Terraform instead of ROS",
)],
)
# Close fork when done (original session is unaffected)
await conn.close_session(session_id=forked_sid)
return result
Charger et reprendre des sessions historiques
Restaurez une session précédente pour reprendre là où vous vous étiez arrêté :
async def resume_previous_session(conn, cwd: str):
"""List sessions and resume the most recent one."""
# List available sessions
listing = await conn.list_sessions(cwd=cwd)
if not listing.sessions:
print("No previous sessions found")
return None
# Resume the first session
target = listing.sessions[0]
print(f"Resuming session: {target.session_id}")
session = await conn.resume_session(
session_id=target.session_id,
cwd=cwd,
)
return session.session_id
Multi-sessions parallèles
Exécutez plusieurs tâches indépendantes simultanément :
async def parallel_tasks(conn, cwd: str, prompts: list[str]):
"""Run multiple prompts in parallel sessions."""
sessions = []
# Create sessions
for _ in prompts:
s = await conn.new_session(cwd=cwd)
sessions.append(s.session_id)
# Run prompts concurrently
tasks = [
conn.prompt(
session_id=sid,
prompt=[acp.schema.TextContentBlock(type="text", text=text)],
)
for sid, text in zip(sessions, prompts)
]
results = await asyncio.gather(*tasks)
# Cleanup
for sid in sessions:
await conn.close_session(session_id=sid)
return results
# Usage
# results = await parallel_tasks(conn, "/workspace", [
# "Create a VPC template",
# "Create a security group template",
# "Create an ECS instance template",
# ])
Chaque session maintient une connexion LLM. L'exécution d'un trop grand nombre de sessions parallèles peut déclencher des limites de taux d'API.