Learn Claude Code
Back to Tasks

Compact → Tasks

s06 (205 LOC) → s07 (207 LOC)

LOC Delta

+2 lines

New Tools

4

task_create, task_update, task_list, task_get
New Classes

1

TaskManager
New Functions

0

Compact

Three-Layer Compression

205 LOC

5 tools: bash, read_file, write_file, edit_file, compact

memory

Tasks

Task Graph + Dependencies

207 LOC

8 tools: bash, read_file, write_file, edit_file, task_create, task_update, task_list, task_get

planning

Source Code Diff

s06 (s06_context_compact.py) -> s07 (s07_task_system.py)
11#!/usr/bin/env python3
2-# Harness: compression -- clean memory for infinite sessions.
2+# Harness: persistent tasks -- goals that outlive any single conversation.
33"""
4-s06_context_compact.py - Compact
4+s07_task_system.py - Tasks
55
6-Three-layer compression pipeline so the agent can work forever:
6+Tasks persist as JSON files in .tasks/ so they survive context compression.
7+Each task has a dependency graph (blockedBy/blocks).
78
8- Every turn:
9- +------------------+
10- | Tool call result |
11- +------------------+
12- |
13- v
14- [Layer 1: micro_compact] (silent, every turn)
15- Replace tool_result content older than last 3
16- with "[Previous: used {tool_name}]"
17- |
18- v
19- [Check: tokens > 50000?]
20- | |
21- no yes
22- | |
23- v v
24- continue [Layer 2: auto_compact]
25- Save full transcript to .transcripts/
26- Ask LLM to summarize conversation.
27- Replace all messages with [summary].
28- |
29- v
30- [Layer 3: compact tool]
31- Model calls compact -> immediate summarization.
32- Same as auto, triggered manually.
9+ .tasks/
10+ task_1.json {"id":1, "subject":"...", "status":"completed", ...}
11+ task_2.json {"id":2, "blockedBy":[1], "status":"pending", ...}
12+ task_3.json {"id":3, "blockedBy":[2], "blocks":[], ...}
3313
34-Key insight: "The agent can forget strategically and keep working forever."
14+ Dependency resolution:
15+ +----------+ +----------+ +----------+
16+ | task 1 | --> | task 2 | --> | task 3 |
17+ | complete | | blocked | | blocked |
18+ +----------+ +----------+ +----------+
19+ | ^
20+ +--- completing task 1 removes it from task 2's blockedBy
21+
22+Key insight: "State that survives compression -- because it's outside the conversation."
3523"""
3624
3725import json
3826import os
3927import subprocess
40-import time
4128from pathlib import Path
4229
4330from anthropic import Anthropic
4431from dotenv import load_dotenv
4532
4633load_dotenv(override=True)
4734
4835if os.getenv("ANTHROPIC_BASE_URL"):
4936 os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
5037
5138WORKDIR = Path.cwd()
5239client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
5340MODEL = os.environ["MODEL_ID"]
41+TASKS_DIR = WORKDIR / ".tasks"
5442
55-SYSTEM = f"You are a coding agent at {WORKDIR}. Use tools to solve tasks."
43+SYSTEM = f"You are a coding agent at {WORKDIR}. Use task tools to plan and track work."
5644
57-THRESHOLD = 50000
58-TRANSCRIPT_DIR = WORKDIR / ".transcripts"
59-KEEP_RECENT = 3
6045
46+# -- TaskManager: CRUD with dependency graph, persisted as JSON files --
47+class TaskManager:
48+ def __init__(self, tasks_dir: Path):
49+ self.dir = tasks_dir
50+ self.dir.mkdir(exist_ok=True)
51+ self._next_id = self._max_id() + 1
6152
62-def estimate_tokens(messages: list) -> int:
63- """Rough token count: ~4 chars per token."""
64- return len(str(messages)) // 4
53+ def _max_id(self) -> int:
54+ ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
55+ return max(ids) if ids else 0
6556
57+ def _load(self, task_id: int) -> dict:
58+ path = self.dir / f"task_{task_id}.json"
59+ if not path.exists():
60+ raise ValueError(f"Task {task_id} not found")
61+ return json.loads(path.read_text())
6662
67-# -- Layer 1: micro_compact - replace old tool results with placeholders --
68-def micro_compact(messages: list) -> list:
69- # Collect (msg_index, part_index, tool_result_dict) for all tool_result entries
70- tool_results = []
71- for msg_idx, msg in enumerate(messages):
72- if msg["role"] == "user" and isinstance(msg.get("content"), list):
73- for part_idx, part in enumerate(msg["content"]):
74- if isinstance(part, dict) and part.get("type") == "tool_result":
75- tool_results.append((msg_idx, part_idx, part))
76- if len(tool_results) <= KEEP_RECENT:
77- return messages
78- # Find tool_name for each result by matching tool_use_id in prior assistant messages
79- tool_name_map = {}
80- for msg in messages:
81- if msg["role"] == "assistant":
82- content = msg.get("content", [])
83- if isinstance(content, list):
84- for block in content:
85- if hasattr(block, "type") and block.type == "tool_use":
86- tool_name_map[block.id] = block.name
87- # Clear old results (keep last KEEP_RECENT)
88- to_clear = tool_results[:-KEEP_RECENT]
89- for _, _, result in to_clear:
90- if isinstance(result.get("content"), str) and len(result["content"]) > 100:
91- tool_id = result.get("tool_use_id", "")
92- tool_name = tool_name_map.get(tool_id, "unknown")
93- result["content"] = f"[Previous: used {tool_name}]"
94- return messages
63+ def _save(self, task: dict):
64+ path = self.dir / f"task_{task['id']}.json"
65+ path.write_text(json.dumps(task, indent=2))
9566
67+ def create(self, subject: str, description: str = "") -> str:
68+ task = {
69+ "id": self._next_id, "subject": subject, "description": description,
70+ "status": "pending", "blockedBy": [], "blocks": [], "owner": "",
71+ }
72+ self._save(task)
73+ self._next_id += 1
74+ return json.dumps(task, indent=2)
9675
97-# -- Layer 2: auto_compact - save transcript, summarize, replace messages --
98-def auto_compact(messages: list) -> list:
99- # Save full transcript to disk
100- TRANSCRIPT_DIR.mkdir(exist_ok=True)
101- transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
102- with open(transcript_path, "w") as f:
103- for msg in messages:
104- f.write(json.dumps(msg, default=str) + "\n")
105- print(f"[transcript saved: {transcript_path}]")
106- # Ask LLM to summarize
107- conversation_text = json.dumps(messages, default=str)[:80000]
108- response = client.messages.create(
109- model=MODEL,
110- messages=[{"role": "user", "content":
111- "Summarize this conversation for continuity. Include: "
112- "1) What was accomplished, 2) Current state, 3) Key decisions made. "
113- "Be concise but preserve critical details.\n\n" + conversation_text}],
114- max_tokens=2000,
115- )
116- summary = response.content[0].text
117- # Replace all messages with compressed summary
118- return [
119- {"role": "user", "content": f"[Conversation compressed. Transcript: {transcript_path}]\n\n{summary}"},
120- {"role": "assistant", "content": "Understood. I have the context from the summary. Continuing."},
121- ]
76+ def get(self, task_id: int) -> str:
77+ return json.dumps(self._load(task_id), indent=2)
12278
79+ def update(self, task_id: int, status: str = None,
80+ add_blocked_by: list = None, add_blocks: list = None) -> str:
81+ task = self._load(task_id)
82+ if status:
83+ if status not in ("pending", "in_progress", "completed"):
84+ raise ValueError(f"Invalid status: {status}")
85+ task["status"] = status
86+ # When a task is completed, remove it from all other tasks' blockedBy
87+ if status == "completed":
88+ self._clear_dependency(task_id)
89+ if add_blocked_by:
90+ task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
91+ if add_blocks:
92+ task["blocks"] = list(set(task["blocks"] + add_blocks))
93+ # Bidirectional: also update the blocked tasks' blockedBy lists
94+ for blocked_id in add_blocks:
95+ try:
96+ blocked = self._load(blocked_id)
97+ if task_id not in blocked["blockedBy"]:
98+ blocked["blockedBy"].append(task_id)
99+ self._save(blocked)
100+ except ValueError:
101+ pass
102+ self._save(task)
103+ return json.dumps(task, indent=2)
123104
124-# -- Tool implementations --
105+ def _clear_dependency(self, completed_id: int):
106+ """Remove completed_id from all other tasks' blockedBy lists."""
107+ for f in self.dir.glob("task_*.json"):
108+ task = json.loads(f.read_text())
109+ if completed_id in task.get("blockedBy", []):
110+ task["blockedBy"].remove(completed_id)
111+ self._save(task)
112+
113+ def list_all(self) -> str:
114+ tasks = []
115+ for f in sorted(self.dir.glob("task_*.json")):
116+ tasks.append(json.loads(f.read_text()))
117+ if not tasks:
118+ return "No tasks."
119+ lines = []
120+ for t in tasks:
121+ marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
122+ blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
123+ lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
124+ return "\n".join(lines)
125+
126+
127+TASKS = TaskManager(TASKS_DIR)
128+
129+
130+# -- Base tool implementations --
125131def safe_path(p: str) -> Path:
126132 path = (WORKDIR / p).resolve()
127133 if not path.is_relative_to(WORKDIR):
128134 raise ValueError(f"Path escapes workspace: {p}")
129135 return path
130136
131137def run_bash(command: str) -> str:
132138 dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
133139 if any(d in command for d in dangerous):
134140 return "Error: Dangerous command blocked"
135141 try:
136142 r = subprocess.run(command, shell=True, cwd=WORKDIR,
137143 capture_output=True, text=True, timeout=120)
138144 out = (r.stdout + r.stderr).strip()
139145 return out[:50000] if out else "(no output)"
140146 except subprocess.TimeoutExpired:
141147 return "Error: Timeout (120s)"
142148
143149def run_read(path: str, limit: int = None) -> str:
144150 try:
145151 lines = safe_path(path).read_text().splitlines()
146152 if limit and limit < len(lines):
147153 lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
148154 return "\n".join(lines)[:50000]
149155 except Exception as e:
150156 return f"Error: {e}"
151157
152158def run_write(path: str, content: str) -> str:
153159 try:
154160 fp = safe_path(path)
155161 fp.parent.mkdir(parents=True, exist_ok=True)
156162 fp.write_text(content)
157163 return f"Wrote {len(content)} bytes"
158164 except Exception as e:
159165 return f"Error: {e}"
160166
161167def run_edit(path: str, old_text: str, new_text: str) -> str:
162168 try:
163169 fp = safe_path(path)
164- content = fp.read_text()
165- if old_text not in content:
170+ c = fp.read_text()
171+ if old_text not in c:
166172 return f"Error: Text not found in {path}"
167- fp.write_text(content.replace(old_text, new_text, 1))
173+ fp.write_text(c.replace(old_text, new_text, 1))
168174 return f"Edited {path}"
169175 except Exception as e:
170176 return f"Error: {e}"
171177
172178
173179TOOL_HANDLERS = {
174- "bash": lambda **kw: run_bash(kw["command"]),
175- "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
176- "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
177- "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
178- "compact": lambda **kw: "Manual compression requested.",
180+ "bash": lambda **kw: run_bash(kw["command"]),
181+ "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
182+ "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
183+ "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
184+ "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
185+ "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("addBlockedBy"), kw.get("addBlocks")),
186+ "task_list": lambda **kw: TASKS.list_all(),
187+ "task_get": lambda **kw: TASKS.get(kw["task_id"]),
179188}
180189
181190TOOLS = [
182191 {"name": "bash", "description": "Run a shell command.",
183192 "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
184193 {"name": "read_file", "description": "Read file contents.",
185194 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
186195 {"name": "write_file", "description": "Write content to file.",
187196 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
188197 {"name": "edit_file", "description": "Replace exact text in file.",
189198 "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
190- {"name": "compact", "description": "Trigger manual conversation compression.",
191- "input_schema": {"type": "object", "properties": {"focus": {"type": "string", "description": "What to preserve in the summary"}}}},
199+ {"name": "task_create", "description": "Create a new task.",
200+ "input_schema": {"type": "object", "properties": {"subject": {"type": "string"}, "description": {"type": "string"}}, "required": ["subject"]}},
201+ {"name": "task_update", "description": "Update a task's status or dependencies.",
202+ "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}, "addBlockedBy": {"type": "array", "items": {"type": "integer"}}, "addBlocks": {"type": "array", "items": {"type": "integer"}}}, "required": ["task_id"]}},
203+ {"name": "task_list", "description": "List all tasks with status summary.",
204+ "input_schema": {"type": "object", "properties": {}}},
205+ {"name": "task_get", "description": "Get full details of a task by ID.",
206+ "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
192207]
193208
194209
195210def agent_loop(messages: list):
196211 while True:
197- # Layer 1: micro_compact before each LLM call
198- micro_compact(messages)
199- # Layer 2: auto_compact if token estimate exceeds threshold
200- if estimate_tokens(messages) > THRESHOLD:
201- print("[auto_compact triggered]")
202- messages[:] = auto_compact(messages)
203212 response = client.messages.create(
204213 model=MODEL, system=SYSTEM, messages=messages,
205214 tools=TOOLS, max_tokens=8000,
206215 )
207216 messages.append({"role": "assistant", "content": response.content})
208217 if response.stop_reason != "tool_use":
209218 return
210219 results = []
211- manual_compact = False
212220 for block in response.content:
213221 if block.type == "tool_use":
214- if block.name == "compact":
215- manual_compact = True
216- output = "Compressing..."
217- else:
218- handler = TOOL_HANDLERS.get(block.name)
219- try:
220- output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
221- except Exception as e:
222- output = f"Error: {e}"
222+ handler = TOOL_HANDLERS.get(block.name)
223+ try:
224+ output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
225+ except Exception as e:
226+ output = f"Error: {e}"
223227 print(f"> {block.name}: {str(output)[:200]}")
224228 results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
225229 messages.append({"role": "user", "content": results})
226- # Layer 3: manual compact triggered by the compact tool
227- if manual_compact:
228- print("[manual compact]")
229- messages[:] = auto_compact(messages)
230230
231231
232232if __name__ == "__main__":
233233 history = []
234234 while True:
235235 try:
236- query = input("\033[36ms06 >> \033[0m")
236+ query = input("\033[36ms07 >> \033[0m")
237237 except (EOFError, KeyboardInterrupt):
238238 break
239239 if query.strip().lower() in ("q", "exit", ""):
240240 break
241241 history.append({"role": "user", "content": query})
242242 agent_loop(history)
243243 response_content = history[-1]["content"]
244244 if isinstance(response_content, list):
245245 for block in response_content:
246246 if hasattr(block, "text"):
247247 print(block.text)
248248 print()