Learn Claude Code
Back to Background Tasks

TasksBackground Tasks

s07 (207 LOC) → s08 (198 LOC)

LOC Delta

-9lines

New Tools

2

background_runcheck_background
New Classes

1

BackgroundManager
New Functions

0

Tasks

Task Graph + Dependencies

207 LOC

8 tools: bash, read_file, write_file, edit_file, task_create, task_update, task_list, task_get

planning

Background Tasks

Background Threads + Notifications

198 LOC

6 tools: bash, read_file, write_file, edit_file, background_run, check_background

concurrency

Source Code Diff

s07 (s07_task_system.py) -> s08 (s08_background_tasks.py)
11#!/usr/bin/env python3
2-# Harness: persistent tasks -- goals that outlive any single conversation.
2+# Harness: background execution -- the model thinks while the harness waits.
33"""
4-s07_task_system.py - Tasks
4+s08_background_tasks.py - Background Tasks
55
6-Tasks persist as JSON files in .tasks/ so they survive context compression.
7-Each task has a dependency graph (blockedBy/blocks).
6+Run commands in background threads. A notification queue is drained
7+before each LLM call to deliver results.
88
9- .tasks/
10- task_1.json {"id":1, "subject":"...", "status":"completed", ...}
11- task_2.json {"id":2, "blockedBy":[1], "status":"pending", ...}
12- task_3.json {"id":3, "blockedBy":[2], "blocks":[], ...}
9+ Main thread Background thread
10+ +-----------------+ +-----------------+
11+ | agent loop | | task executes |
12+ | ... | | ... |
13+ | [LLM call] <---+------- | enqueue(result) |
14+ | ^drain queue | +-----------------+
15+ +-----------------+
1316
14- Dependency resolution:
15- +----------+ +----------+ +----------+
16- | task 1 | --> | task 2 | --> | task 3 |
17- | complete | | blocked | | blocked |
18- +----------+ +----------+ +----------+
19- | ^
20- +--- completing task 1 removes it from task 2's blockedBy
17+ Timeline:
18+ Agent ----[spawn A]----[spawn B]----[other work]----
19+ | |
20+ v v
21+ [A runs] [B runs] (parallel)
22+ | |
23+ +-- notification queue --> [results injected]
2124
22-Key insight: "State that survives compression -- because it's outside the conversation."
25+Key insight: "Fire and forget -- the agent doesn't block while the command runs."
2326"""
2427
25-import json
2628import os
2729import subprocess
30+import threading
31+import uuid
2832from pathlib import Path
2933
3034from anthropic import Anthropic
3135from dotenv import load_dotenv
3236
3337load_dotenv(override=True)
3438
3539if os.getenv("ANTHROPIC_BASE_URL"):
3640 os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
3741
3842WORKDIR = Path.cwd()
3943client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
4044MODEL = os.environ["MODEL_ID"]
41-TASKS_DIR = WORKDIR / ".tasks"
4245
43-SYSTEM = f"You are a coding agent at {WORKDIR}. Use task tools to plan and track work."
46+SYSTEM = f"You are a coding agent at {WORKDIR}. Use background_run for long-running commands."
4447
4548
46-# -- TaskManager: CRUD with dependency graph, persisted as JSON files --
47-class TaskManager:
48- def __init__(self, tasks_dir: Path):
49- self.dir = tasks_dir
50- self.dir.mkdir(exist_ok=True)
51- self._next_id = self._max_id() + 1
49+# -- BackgroundManager: threaded execution + notification queue --
50+class BackgroundManager:
51+ def __init__(self):
52+ self.tasks = {} # task_id -> {status, result, command}
53+ self._notification_queue = [] # completed task results
54+ self._lock = threading.Lock()
5255
53- def _max_id(self) -> int:
54- ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
55- return max(ids) if ids else 0
56+ def run(self, command: str) -> str:
57+ """Start a background thread, return task_id immediately."""
58+ task_id = str(uuid.uuid4())[:8]
59+ self.tasks[task_id] = {"status": "running", "result": None, "command": command}
60+ thread = threading.Thread(
61+ target=self._execute, args=(task_id, command), daemon=True
62+ )
63+ thread.start()
64+ return f"Background task {task_id} started: {command[:80]}"
5665
57- def _load(self, task_id: int) -> dict:
58- path = self.dir / f"task_{task_id}.json"
59- if not path.exists():
60- raise ValueError(f"Task {task_id} not found")
61- return json.loads(path.read_text())
66+ def _execute(self, task_id: str, command: str):
67+ """Thread target: run subprocess, capture output, push to queue."""
68+ try:
69+ r = subprocess.run(
70+ command, shell=True, cwd=WORKDIR,
71+ capture_output=True, text=True, timeout=300
72+ )
73+ output = (r.stdout + r.stderr).strip()[:50000]
74+ status = "completed"
75+ except subprocess.TimeoutExpired:
76+ output = "Error: Timeout (300s)"
77+ status = "timeout"
78+ except Exception as e:
79+ output = f"Error: {e}"
80+ status = "error"
81+ self.tasks[task_id]["status"] = status
82+ self.tasks[task_id]["result"] = output or "(no output)"
83+ with self._lock:
84+ self._notification_queue.append({
85+ "task_id": task_id,
86+ "status": status,
87+ "command": command[:80],
88+ "result": (output or "(no output)")[:500],
89+ })
6290
63- def _save(self, task: dict):
64- path = self.dir / f"task_{task['id']}.json"
65- path.write_text(json.dumps(task, indent=2))
66-
67- def create(self, subject: str, description: str = "") -> str:
68- task = {
69- "id": self._next_id, "subject": subject, "description": description,
70- "status": "pending", "blockedBy": [], "blocks": [], "owner": "",
71- }
72- self._save(task)
73- self._next_id += 1
74- return json.dumps(task, indent=2)
75-
76- def get(self, task_id: int) -> str:
77- return json.dumps(self._load(task_id), indent=2)
78-
79- def update(self, task_id: int, status: str = None,
80- add_blocked_by: list = None, add_blocks: list = None) -> str:
81- task = self._load(task_id)
82- if status:
83- if status not in ("pending", "in_progress", "completed"):
84- raise ValueError(f"Invalid status: {status}")
85- task["status"] = status
86- # When a task is completed, remove it from all other tasks' blockedBy
87- if status == "completed":
88- self._clear_dependency(task_id)
89- if add_blocked_by:
90- task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
91- if add_blocks:
92- task["blocks"] = list(set(task["blocks"] + add_blocks))
93- # Bidirectional: also update the blocked tasks' blockedBy lists
94- for blocked_id in add_blocks:
95- try:
96- blocked = self._load(blocked_id)
97- if task_id not in blocked["blockedBy"]:
98- blocked["blockedBy"].append(task_id)
99- self._save(blocked)
100- except ValueError:
101- pass
102- self._save(task)
103- return json.dumps(task, indent=2)
104-
105- def _clear_dependency(self, completed_id: int):
106- """Remove completed_id from all other tasks' blockedBy lists."""
107- for f in self.dir.glob("task_*.json"):
108- task = json.loads(f.read_text())
109- if completed_id in task.get("blockedBy", []):
110- task["blockedBy"].remove(completed_id)
111- self._save(task)
112-
113- def list_all(self) -> str:
114- tasks = []
115- for f in sorted(self.dir.glob("task_*.json")):
116- tasks.append(json.loads(f.read_text()))
117- if not tasks:
118- return "No tasks."
91+ def check(self, task_id: str = None) -> str:
92+ """Check status of one task or list all."""
93+ if task_id:
94+ t = self.tasks.get(task_id)
95+ if not t:
96+ return f"Error: Unknown task {task_id}"
97+ return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
11998 lines = []
120- for t in tasks:
121- marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
122- blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
123- lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
124- return "\n".join(lines)
99+ for tid, t in self.tasks.items():
100+ lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
101+ return "\n".join(lines) if lines else "No background tasks."
125102
103+ def drain_notifications(self) -> list:
104+ """Return and clear all pending completion notifications."""
105+ with self._lock:
106+ notifs = list(self._notification_queue)
107+ self._notification_queue.clear()
108+ return notifs
126109
127-TASKS = TaskManager(TASKS_DIR)
128110
111+BG = BackgroundManager()
129112
130-# -- Base tool implementations --
113+
114+# -- Tool implementations --
131115def safe_path(p: str) -> Path:
132116 path = (WORKDIR / p).resolve()
133117 if not path.is_relative_to(WORKDIR):
134118 raise ValueError(f"Path escapes workspace: {p}")
135119 return path
136120
137121def run_bash(command: str) -> str:
138122 dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
139123 if any(d in command for d in dangerous):
140124 return "Error: Dangerous command blocked"
141125 try:
142126 r = subprocess.run(command, shell=True, cwd=WORKDIR,
143127 capture_output=True, text=True, timeout=120)
144128 out = (r.stdout + r.stderr).strip()
145129 return out[:50000] if out else "(no output)"
146130 except subprocess.TimeoutExpired:
147131 return "Error: Timeout (120s)"
148132
149133def run_read(path: str, limit: int = None) -> str:
150134 try:
151135 lines = safe_path(path).read_text().splitlines()
152136 if limit and limit < len(lines):
153137 lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
154138 return "\n".join(lines)[:50000]
155139 except Exception as e:
156140 return f"Error: {e}"
157141
158142def run_write(path: str, content: str) -> str:
159143 try:
160144 fp = safe_path(path)
161145 fp.parent.mkdir(parents=True, exist_ok=True)
162146 fp.write_text(content)
163147 return f"Wrote {len(content)} bytes"
164148 except Exception as e:
165149 return f"Error: {e}"
166150
167151def run_edit(path: str, old_text: str, new_text: str) -> str:
168152 try:
169153 fp = safe_path(path)
170154 c = fp.read_text()
171155 if old_text not in c:
172156 return f"Error: Text not found in {path}"
173157 fp.write_text(c.replace(old_text, new_text, 1))
174158 return f"Edited {path}"
175159 except Exception as e:
176160 return f"Error: {e}"
177161
178162
179163TOOL_HANDLERS = {
180- "bash": lambda **kw: run_bash(kw["command"]),
181- "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
182- "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
183- "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
184- "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
185- "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("addBlockedBy"), kw.get("addBlocks")),
186- "task_list": lambda **kw: TASKS.list_all(),
187- "task_get": lambda **kw: TASKS.get(kw["task_id"]),
164+ "bash": lambda **kw: run_bash(kw["command"]),
165+ "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
166+ "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
167+ "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
168+ "background_run": lambda **kw: BG.run(kw["command"]),
169+ "check_background": lambda **kw: BG.check(kw.get("task_id")),
188170}
189171
190172TOOLS = [
191- {"name": "bash", "description": "Run a shell command.",
173+ {"name": "bash", "description": "Run a shell command (blocking).",
192174 "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
193175 {"name": "read_file", "description": "Read file contents.",
194176 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
195177 {"name": "write_file", "description": "Write content to file.",
196178 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
197179 {"name": "edit_file", "description": "Replace exact text in file.",
198180 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
199- {"name": "task_create", "description": "Create a new task.",
200- "input_schema": {"type": "object", "properties": {"subject": {"type": "string"}, "description": {"type": "string"}}, "required": ["subject"]}},
201- {"name": "task_update", "description": "Update a task's status or dependencies.",
202- "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}, "addBlockedBy": {"type": "array", "items": {"type": "integer"}}, "addBlocks": {"type": "array", "items": {"type": "integer"}}}, "required": ["task_id"]}},
203- {"name": "task_list", "description": "List all tasks with status summary.",
204- "input_schema": {"type": "object", "properties": {}}},
205- {"name": "task_get", "description": "Get full details of a task by ID.",
206- "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
181+ {"name": "background_run", "description": "Run command in background thread. Returns task_id immediately.",
182+ "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
183+ {"name": "check_background", "description": "Check background task status. Omit task_id to list all.",
184+ "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}},
207185]
208186
209187
210188def agent_loop(messages: list):
211189 while True:
190+ # Drain background notifications and inject as system message before LLM call
191+ notifs = BG.drain_notifications()
192+ if notifs and messages:
193+ notif_text = "\n".join(
194+ f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
195+ )
196+ messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
197+ messages.append({"role": "assistant", "content": "Noted background results."})
212198 response = client.messages.create(
213199 model=MODEL, system=SYSTEM, messages=messages,
214200 tools=TOOLS, max_tokens=8000,
215201 )
216202 messages.append({"role": "assistant", "content": response.content})
217203 if response.stop_reason != "tool_use":
218204 return
219205 results = []
220206 for block in response.content:
221207 if block.type == "tool_use":
222208 handler = TOOL_HANDLERS.get(block.name)
223209 try:
224210 output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
225211 except Exception as e:
226212 output = f"Error: {e}"
227213 print(f"> {block.name}: {str(output)[:200]}")
228214 results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
229215 messages.append({"role": "user", "content": results})
230216
231217
232218if __name__ == "__main__":
233219 history = []
234220 while True:
235221 try:
236- query = input("\033[36ms07 >> \033[0m")
222+ query = input("\033[36ms08 >> \033[0m")
237223 except (EOFError, KeyboardInterrupt):
238224 break
239225 if query.strip().lower() in ("q", "exit", ""):
240226 break
241227 history.append({"role": "user", "content": query})
242228 agent_loop(history)
243229 response_content = history[-1]["content"]
244230 if isinstance(response_content, list):
245231 for block in response_content:
246232 if hasattr(block, "text"):
247233 print(block.text)
248234 print()