Learn Claude Code
Back to Agent Teams

Background TasksAgent Teams

s08 (198 LOC) → s09 (348 LOC)

LOC Delta

+150lines

New Tools

5

send_messageread_inboxspawn_teammatelist_teammatesbroadcast
New Classes

2

MessageBusTeammateManager
New Functions

5

_safe_path_run_bash_run_read_run_write_run_edit

Background Tasks

Background Threads + Notifications

198 LOC

6 tools: bash, read_file, write_file, edit_file, background_run, check_background

concurrency

Agent Teams

Teammates + Mailboxes

348 LOC

9 tools: bash, read_file, write_file, edit_file, send_message, read_inbox, spawn_teammate, list_teammates, broadcast

collaboration

Source Code Diff

s08 (s08_background_tasks.py) -> s09 (s09_agent_teams.py)
11#!/usr/bin/env python3
2-# Harness: background execution -- the model thinks while the harness waits.
2+# Harness: team mailboxes -- multiple models, coordinated through files.
33"""
4-s08_background_tasks.py - Background Tasks
4+s09_agent_teams.py - Agent Teams
55
6-Run commands in background threads. A notification queue is drained
7-before each LLM call to deliver results.
6+Persistent named agents with file-based JSONL inboxes. Each teammate runs
7+its own agent loop in a separate thread. Communication via append-only inboxes.
88
9- Main thread Background thread
10- +-----------------+ +-----------------+
11- | agent loop | | task executes |
12- | ... | | ... |
13- | [LLM call] <---+------- | enqueue(result) |
14- | ^drain queue | +-----------------+
15- +-----------------+
9+ Subagent (s04): spawn -> execute -> return summary -> destroyed
10+ Teammate (s09): spawn -> work -> idle -> work -> ... -> shutdown
1611
17- Timeline:
18- Agent ----[spawn A]----[spawn B]----[other work]----
19- | |
20- v v
21- [A runs] [B runs] (parallel)
22- | |
23- +-- notification queue --> [results injected]
12+ .team/config.json .team/inbox/
13+ +----------------------------+ +------------------+
14+ | {"team_name": "default", | | alice.jsonl |
15+ | "members": [ | | bob.jsonl |
16+ | {"name":"alice", | | lead.jsonl |
17+ | "role":"coder", | +------------------+
18+ | "status":"idle"} |
19+ | ]} | send_message("alice", "fix bug"):
20+ +----------------------------+ open("alice.jsonl", "a").write(msg)
2421
25-Key insight: "Fire and forget -- the agent doesn't block while the command runs."
22+ read_inbox("alice"):
23+ spawn_teammate("alice","coder",...) msgs = [json.loads(l) for l in ...]
24+ | open("alice.jsonl", "w").close()
25+ v return msgs # drain
26+ Thread: alice Thread: bob
27+ +------------------+ +------------------+
28+ | agent_loop | | agent_loop |
29+ | status: working | | status: idle |
30+ | ... runs tools | | ... waits ... |
31+ | status -> idle | | |
32+ +------------------+ +------------------+
33+
34+ 5 message types (all declared, not all handled here):
35+ +-------------------------+-----------------------------------+
36+ | message | Normal text message |
37+ | broadcast | Sent to all teammates |
38+ | shutdown_request | Request graceful shutdown (s10) |
39+ | shutdown_response | Approve/reject shutdown (s10) |
40+ | plan_approval_response | Approve/reject plan (s10) |
41+ +-------------------------+-----------------------------------+
42+
43+Key insight: "Teammates that can talk to each other."
2644"""
2745
46+import json
2847import os
2948import subprocess
3049import threading
31-import uuid
50+import time
3251from pathlib import Path
3352
3453from anthropic import Anthropic
3554from dotenv import load_dotenv
3655
3756load_dotenv(override=True)
38-
3957if os.getenv("ANTHROPIC_BASE_URL"):
4058 os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
4159
4260WORKDIR = Path.cwd()
4361client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
4462MODEL = os.environ["MODEL_ID"]
63+TEAM_DIR = WORKDIR / ".team"
64+INBOX_DIR = TEAM_DIR / "inbox"
4565
46-SYSTEM = f"You are a coding agent at {WORKDIR}. Use background_run for long-running commands."
66+SYSTEM = f"You are a team lead at {WORKDIR}. Spawn teammates and communicate via inboxes."
4767
68+VALID_MSG_TYPES = {
69+ "message",
70+ "broadcast",
71+ "shutdown_request",
72+ "shutdown_response",
73+ "plan_approval_response",
74+}
4875
49-# -- BackgroundManager: threaded execution + notification queue --
50-class BackgroundManager:
51- def __init__(self):
52- self.tasks = {} # task_id -> {status, result, command}
53- self._notification_queue = [] # completed task results
54- self._lock = threading.Lock()
5576
56- def run(self, command: str) -> str:
57- """Start a background thread, return task_id immediately."""
58- task_id = str(uuid.uuid4())[:8]
59- self.tasks[task_id] = {"status": "running", "result": None, "command": command}
77+# -- MessageBus: JSONL inbox per teammate --
78+class MessageBus:
79+ def __init__(self, inbox_dir: Path):
80+ self.dir = inbox_dir
81+ self.dir.mkdir(parents=True, exist_ok=True)
82+
83+ def send(self, sender: str, to: str, content: str,
84+ msg_type: str = "message", extra: dict = None) -> str:
85+ if msg_type not in VALID_MSG_TYPES:
86+ return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
87+ msg = {
88+ "type": msg_type,
89+ "from": sender,
90+ "content": content,
91+ "timestamp": time.time(),
92+ }
93+ if extra:
94+ msg.update(extra)
95+ inbox_path = self.dir / f"{to}.jsonl"
96+ with open(inbox_path, "a") as f:
97+ f.write(json.dumps(msg) + "\n")
98+ return f"Sent {msg_type} to {to}"
99+
100+ def read_inbox(self, name: str) -> list:
101+ inbox_path = self.dir / f"{name}.jsonl"
102+ if not inbox_path.exists():
103+ return []
104+ messages = []
105+ for line in inbox_path.read_text().strip().splitlines():
106+ if line:
107+ messages.append(json.loads(line))
108+ inbox_path.write_text("")
109+ return messages
110+
111+ def broadcast(self, sender: str, content: str, teammates: list) -> str:
112+ count = 0
113+ for name in teammates:
114+ if name != sender:
115+ self.send(sender, name, content, "broadcast")
116+ count += 1
117+ return f"Broadcast to {count} teammates"
118+
119+
120+BUS = MessageBus(INBOX_DIR)
121+
122+
123+# -- TeammateManager: persistent named agents with config.json --
124+class TeammateManager:
125+ def __init__(self, team_dir: Path):
126+ self.dir = team_dir
127+ self.dir.mkdir(exist_ok=True)
128+ self.config_path = self.dir / "config.json"
129+ self.config = self._load_config()
130+ self.threads = {}
131+
132+ def _load_config(self) -> dict:
133+ if self.config_path.exists():
134+ return json.loads(self.config_path.read_text())
135+ return {"team_name": "default", "members": []}
136+
137+ def _save_config(self):
138+ self.config_path.write_text(json.dumps(self.config, indent=2))
139+
140+ def _find_member(self, name: str) -> dict:
141+ for m in self.config["members"]:
142+ if m["name"] == name:
143+ return m
144+ return None
145+
146+ def spawn(self, name: str, role: str, prompt: str) -> str:
147+ member = self._find_member(name)
148+ if member:
149+ if member["status"] not in ("idle", "shutdown"):
150+ return f"Error: '{name}' is currently {member['status']}"
151+ member["status"] = "working"
152+ member["role"] = role
153+ else:
154+ member = {"name": name, "role": role, "status": "working"}
155+ self.config["members"].append(member)
156+ self._save_config()
60157 thread = threading.Thread(
61- target=self._execute, args=(task_id, command), daemon=True
158+ target=self._teammate_loop,
159+ args=(name, role, prompt),
160+ daemon=True,
62161 )
162+ self.threads[name] = thread
63163 thread.start()
64- return f"Background task {task_id} started: {command[:80]}"
164+ return f"Spawned '{name}' (role: {role})"
65165
66- def _execute(self, task_id: str, command: str):
67- """Thread target: run subprocess, capture output, push to queue."""
68- try:
69- r = subprocess.run(
70- command, shell=True, cwd=WORKDIR,
71- capture_output=True, text=True, timeout=300
72- )
73- output = (r.stdout + r.stderr).strip()[:50000]
74- status = "completed"
75- except subprocess.TimeoutExpired:
76- output = "Error: Timeout (300s)"
77- status = "timeout"
78- except Exception as e:
79- output = f"Error: {e}"
80- status = "error"
81- self.tasks[task_id]["status"] = status
82- self.tasks[task_id]["result"] = output or "(no output)"
83- with self._lock:
84- self._notification_queue.append({
85- "task_id": task_id,
86- "status": status,
87- "command": command[:80],
88- "result": (output or "(no output)")[:500],
89- })
166+ def _teammate_loop(self, name: str, role: str, prompt: str):
167+ sys_prompt = (
168+ f"You are '{name}', role: {role}, at {WORKDIR}. "
169+ f"Use send_message to communicate. Complete your task."
170+ )
171+ messages = [{"role": "user", "content": prompt}]
172+ tools = self._teammate_tools()
173+ for _ in range(50):
174+ inbox = BUS.read_inbox(name)
175+ for msg in inbox:
176+ messages.append({"role": "user", "content": json.dumps(msg)})
177+ try:
178+ response = client.messages.create(
179+ model=MODEL,
180+ system=sys_prompt,
181+ messages=messages,
182+ tools=tools,
183+ max_tokens=8000,
184+ )
185+ except Exception:
186+ break
187+ messages.append({"role": "assistant", "content": response.content})
188+ if response.stop_reason != "tool_use":
189+ break
190+ results = []
191+ for block in response.content:
192+ if block.type == "tool_use":
193+ output = self._exec(name, block.name, block.input)
194+ print(f" [{name}] {block.name}: {str(output)[:120]}")
195+ results.append({
196+ "type": "tool_result",
197+ "tool_use_id": block.id,
198+ "content": str(output),
199+ })
200+ messages.append({"role": "user", "content": results})
201+ member = self._find_member(name)
202+ if member and member["status"] != "shutdown":
203+ member["status"] = "idle"
204+ self._save_config()
90205
91- def check(self, task_id: str = None) -> str:
92- """Check status of one task or list all."""
93- if task_id:
94- t = self.tasks.get(task_id)
95- if not t:
96- return f"Error: Unknown task {task_id}"
97- return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
98- lines = []
99- for tid, t in self.tasks.items():
100- lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
101- return "\n".join(lines) if lines else "No background tasks."
206+ def _exec(self, sender: str, tool_name: str, args: dict) -> str:
207+ # these base tools are unchanged from s02
208+ if tool_name == "bash":
209+ return _run_bash(args["command"])
210+ if tool_name == "read_file":
211+ return _run_read(args["path"])
212+ if tool_name == "write_file":
213+ return _run_write(args["path"], args["content"])
214+ if tool_name == "edit_file":
215+ return _run_edit(args["path"], args["old_text"], args["new_text"])
216+ if tool_name == "send_message":
217+ return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
218+ if tool_name == "read_inbox":
219+ return json.dumps(BUS.read_inbox(sender), indent=2)
220+ return f"Unknown tool: {tool_name}"
102221
103- def drain_notifications(self) -> list:
104- """Return and clear all pending completion notifications."""
105- with self._lock:
106- notifs = list(self._notification_queue)
107- self._notification_queue.clear()
108- return notifs
222+ def _teammate_tools(self) -> list:
223+ # these base tools are unchanged from s02
224+ return [
225+ {"name": "bash", "description": "Run a shell command.",
226+ "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
227+ {"name": "read_file", "description": "Read file contents.",
228+ "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
229+ {"name": "write_file", "description": "Write content to file.",
230+ "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
231+ {"name": "edit_file", "description": "Replace exact text in file.",
232+ "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
233+ {"name": "send_message", "description": "Send message to a teammate.",
234+ "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
235+ {"name": "read_inbox", "description": "Read and drain your inbox.",
236+ "input_schema": {"type": "object", "properties": {}}},
237+ ]
109238
239+ def list_all(self) -> str:
240+ if not self.config["members"]:
241+ return "No teammates."
242+ lines = [f"Team: {self.config['team_name']}"]
243+ for m in self.config["members"]:
244+ lines.append(f" {m['name']} ({m['role']}): {m['status']}")
245+ return "\n".join(lines)
110246
111-BG = BackgroundManager()
247+ def member_names(self) -> list:
248+ return [m["name"] for m in self.config["members"]]
112249
113250
114-# -- Tool implementations --
115-def safe_path(p: str) -> Path:
251+TEAM = TeammateManager(TEAM_DIR)
252+
253+
254+# -- Base tool implementations (these base tools are unchanged from s02) --
255+def _safe_path(p: str) -> Path:
116256 path = (WORKDIR / p).resolve()
117257 if not path.is_relative_to(WORKDIR):
118258 raise ValueError(f"Path escapes workspace: {p}")
119259 return path
120260
121-def run_bash(command: str) -> str:
122- dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
261+
262+def _run_bash(command: str) -> str:
263+ dangerous = ["rm -rf /", "sudo", "shutdown", "reboot"]
123264 if any(d in command for d in dangerous):
124265 return "Error: Dangerous command blocked"
125266 try:
126- r = subprocess.run(command, shell=True, cwd=WORKDIR,
127- capture_output=True, text=True, timeout=120)
267+ r = subprocess.run(
268+ command, shell=True, cwd=WORKDIR,
269+ capture_output=True, text=True, timeout=120,
270+ )
128271 out = (r.stdout + r.stderr).strip()
129272 return out[:50000] if out else "(no output)"
130273 except subprocess.TimeoutExpired:
131274 return "Error: Timeout (120s)"
132275
133-def run_read(path: str, limit: int = None) -> str:
276+
277+def _run_read(path: str, limit: int = None) -> str:
134278 try:
135- lines = safe_path(path).read_text().splitlines()
279+ lines = _safe_path(path).read_text().splitlines()
136280 if limit and limit < len(lines):
137281 lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
138282 return "\n".join(lines)[:50000]
139283 except Exception as e:
140284 return f"Error: {e}"
141285
142-def run_write(path: str, content: str) -> str:
286+
287+def _run_write(path: str, content: str) -> str:
143288 try:
144- fp = safe_path(path)
289+ fp = _safe_path(path)
145290 fp.parent.mkdir(parents=True, exist_ok=True)
146291 fp.write_text(content)
147292 return f"Wrote {len(content)} bytes"
148293 except Exception as e:
149294 return f"Error: {e}"
150295
151-def run_edit(path: str, old_text: str, new_text: str) -> str:
296+
297+def _run_edit(path: str, old_text: str, new_text: str) -> str:
152298 try:
153- fp = safe_path(path)
299+ fp = _safe_path(path)
154300 c = fp.read_text()
155301 if old_text not in c:
156302 return f"Error: Text not found in {path}"
157303 fp.write_text(c.replace(old_text, new_text, 1))
158304 return f"Edited {path}"
159305 except Exception as e:
160306 return f"Error: {e}"
161307
162308
309+# -- Lead tool dispatch (9 tools) --
163310TOOL_HANDLERS = {
164- "bash": lambda **kw: run_bash(kw["command"]),
165- "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
166- "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
167- "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
168- "background_run": lambda **kw: BG.run(kw["command"]),
169- "check_background": lambda **kw: BG.check(kw.get("task_id")),
311+ "bash": lambda **kw: _run_bash(kw["command"]),
312+ "read_file": lambda **kw: _run_read(kw["path"], kw.get("limit")),
313+ "write_file": lambda **kw: _run_write(kw["path"], kw["content"]),
314+ "edit_file": lambda **kw: _run_edit(kw["path"], kw["old_text"], kw["new_text"]),
315+ "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
316+ "list_teammates": lambda **kw: TEAM.list_all(),
317+ "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")),
318+ "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
319+ "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
170320}
171321
322+# these base tools are unchanged from s02
172323TOOLS = [
173- {"name": "bash", "description": "Run a shell command (blocking).",
324+ {"name": "bash", "description": "Run a shell command.",
174325 "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
175326 {"name": "read_file", "description": "Read file contents.",
176327 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
177328 {"name": "write_file", "description": "Write content to file.",
178329 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
179330 {"name": "edit_file", "description": "Replace exact text in file.",
180331 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
181- {"name": "background_run", "description": "Run command in background thread. Returns task_id immediately.",
182- "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
183- {"name": "check_background", "description": "Check background task status. Omit task_id to list all.",
184- "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}},
332+ {"name": "spawn_teammate", "description": "Spawn a persistent teammate that runs in its own thread.",
333+ "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}},
334+ {"name": "list_teammates", "description": "List all teammates with name, role, status.",
335+ "input_schema": {"type": "object", "properties": {}}},
336+ {"name": "send_message", "description": "Send a message to a teammate's inbox.",
337+ "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
338+ {"name": "read_inbox", "description": "Read and drain the lead's inbox.",
339+ "input_schema": {"type": "object", "properties": {}}},
340+ {"name": "broadcast", "description": "Send a message to all teammates.",
341+ "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}},
185342]
186343
187344
188345def agent_loop(messages: list):
189346 while True:
190- # Drain background notifications and inject as system message before LLM call
191- notifs = BG.drain_notifications()
192- if notifs and messages:
193- notif_text = "\n".join(
194- f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
195- )
196- messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
197- messages.append({"role": "assistant", "content": "Noted background results."})
347+ inbox = BUS.read_inbox("lead")
348+ if inbox:
349+ messages.append({
350+ "role": "user",
351+ "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
352+ })
353+ messages.append({
354+ "role": "assistant",
355+ "content": "Noted inbox messages.",
356+ })
198357 response = client.messages.create(
199- model=MODEL, system=SYSTEM, messages=messages,
200- tools=TOOLS, max_tokens=8000,
358+ model=MODEL,
359+ system=SYSTEM,
360+ messages=messages,
361+ tools=TOOLS,
362+ max_tokens=8000,
201363 )
202364 messages.append({"role": "assistant", "content": response.content})
203365 if response.stop_reason != "tool_use":
204366 return
205367 results = []
206368 for block in response.content:
207369 if block.type == "tool_use":
208370 handler = TOOL_HANDLERS.get(block.name)
209371 try:
210372 output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
211373 except Exception as e:
212374 output = f"Error: {e}"
213375 print(f"> {block.name}: {str(output)[:200]}")
214- results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
376+ results.append({
377+ "type": "tool_result",
378+ "tool_use_id": block.id,
379+ "content": str(output),
380+ })
215381 messages.append({"role": "user", "content": results})
216382
217383
218384if __name__ == "__main__":
219385 history = []
220386 while True:
221387 try:
222- query = input("\033[36ms08 >> \033[0m")
388+ query = input("\033[36ms09 >> \033[0m")
223389 except (EOFError, KeyboardInterrupt):
224390 break
225391 if query.strip().lower() in ("q", "exit", ""):
226392 break
393+ if query.strip() == "/team":
394+ print(TEAM.list_all())
395+ continue
396+ if query.strip() == "/inbox":
397+ print(json.dumps(BUS.read_inbox("lead"), indent=2))
398+ continue
227399 history.append({"role": "user", "content": query})
228400 agent_loop(history)
229401 response_content = history[-1]["content"]
230402 if isinstance(response_content, list):
231403 for block in response_content:
232404 if hasattr(block, "text"):
233405 print(block.text)
234406 print()