flowchart TD
L["30-line Loop"] --> T["Tool Dispatch Map"]
T --> S["Subagents (fresh context)"]
T --> C["Compaction (when context fills)"]
T --> SK["Skills (lazy-load knowledge)"]
T --> H["Hooks (deterministic control)"]
S --> GA["Gas Town (many loops)"]
style L fill:#dbeafe,stroke:#1e40af,color:#1e40af
style T fill:#dcfce7,stroke:#166534,color:#166534
style S fill:#fef3c7,stroke:#92400e,color:#92400e
style C fill:#f3e8ff,stroke:#6b21a8,color:#6b21a8
style SK fill:#fce7f3,stroke:#9d174d,color:#9d174d
style H fill:#dcfce7,stroke:#166534,color:#166534
style GA fill:#fee2e2,stroke:#b91c1c,color:#b91c1c
6 Build a Claude Code Clone
30 Lines to a Working Agent — Then Layer Everything On
You’ve learned how the ReAct loop works (Chapter 2), how context accumulates and compacts (Chapter 3), and how extensions customize behavior (Chapter 4). Now build it yourself. Starting from a 30-line Python loop using the raw Anthropic Messages API, this chapter progressively adds tool dispatch, subagents, compaction, skills, and hooks, then assembles a complete agent — a task graph, worktree isolation, a teammate protocol, and error recovery — proving that Claude Code’s sophistication grows from a trivial core and that every later mechanism layers on without touching the loop. Based on the open-source Learn Claude Code course by shareAI.
6.1 The Loop Is 30 Lines
Every concept from Chapter 2 — the message array, tool calls, tool results, the “text or tool call?” decision — compiles down to this:
import subprocess

import anthropic

client = anthropic.Anthropic()
MODEL = "claude-sonnet-4-20250514"
SYSTEM = "You are a coding assistant. Use tools to complete tasks."
TOOLS = [{
    "name": "bash",
    "description": "Run a shell command and return its output.",
    "input_schema": {
        "type": "object",
        "properties": {"command": {"type": "string"}},
        "required": ["command"],
    },
}]

def run_bash(command: str) -> str:
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    return result.stdout + result.stderr

def agent_loop(query: str):
    messages = [{"role": "user", "content": query}]
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return response.content[0].text  # Done: final text response
        # Execute tool calls, collect results
        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = run_bash(block.input["command"])
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role": "user", "content": results})

That’s a complete autonomous agent. It reads files, runs tests, installs packages, and self-corrects, because the while True loop keeps spinning until the model decides it’s done.
6.1.1 Map to Chapter 2’s concepts
| Chapter 2 concept | Code equivalent |
|---|---|
| Message array | messages list |
| System message | SYSTEM string passed to API |
| Tool definitions | TOOLS list with name + schema |
| “Text or tool call?” decision | response.stop_reason != "tool_use" |
| Tool execution | run_bash(block.input["command"]) |
| Tool result appended to array | messages.append({"role": "user", "content": results}) |
| Loop continues | while True |
The write-back step — appending tool results as a new user message — is the single most important line. Without it, the model never sees what happened. With it, the loop is self-sustaining.
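To watch the write-back step at work without an API key, the sketch below swaps the real client for a scripted stand-in. `fake_model`, `fake_tool`, `demo_loop`, and the dict-shaped blocks are assumptions made for illustration; the real SDK returns typed objects, not dicts.

```python
# A scripted stand-in for the model: it asks for a tool until it can see a tool
# result, then answers. All names here are illustrative, not part of the SDK.

def fake_model(messages: list) -> dict:
    """Return a tool call until a tool result is visible, then a final answer."""
    last = messages[-1]["content"]
    saw_result = isinstance(last, list) and any(
        b.get("type") == "tool_result" for b in last
    )
    if saw_result:
        return {"stop_reason": "end_turn",
                "content": [{"type": "text", "text": f"Tool said: {last[0]['content']}"}]}
    return {"stop_reason": "tool_use",
            "content": [{"type": "tool_use", "id": "call_1",
                         "input": {"command": "echo hi"}}]}


def fake_tool(command: str) -> str:
    """Pretend to run a command; no shell is involved."""
    return f"ran `{command}`"


def demo_loop(query: str, write_back: bool = True, max_turns: int = 5):
    messages = [{"role": "user", "content": query}]
    for _ in range(max_turns):
        response = fake_model(messages)
        messages.append({"role": "assistant", "content": response["content"]})
        if response["stop_reason"] != "tool_use":
            return response["content"][0]["text"], len(messages)
        results = [{"type": "tool_result", "tool_use_id": b["id"],
                    "content": fake_tool(b["input"]["command"])}
                   for b in response["content"] if b["type"] == "tool_use"]
        if write_back:
            messages.append({"role": "user", "content": results})
    return None, len(messages)  # never finished within max_turns


answer, n_messages = demo_loop("say hi")
stuck, _ = demo_loop("say hi", write_back=False)
```

With write-back enabled the stand-in finishes after one tool round trip. With it disabled the stand-in never sees a result, keeps requesting the same tool, and the loop ends only because of the turn cap.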
6.2 Adding Tools Without Changing the Loop
A single bash tool is powerful but dangerous. The next step is a dispatch map — a dictionary routing tool names to handler functions:
from pathlib import Path

# Resolve once so the containment check below compares absolute paths
WORKDIR = Path(".").resolve()

def safe_path(p: str) -> Path:
    """Prevent directory escape."""
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_read(path: str, limit: int | None = None) -> str:
    text = safe_path(path).read_text()
    lines = text.splitlines()
    if limit and limit < len(lines):
        lines = lines[:limit]
    return "\n".join(lines)[:50_000]  # cap what goes back into context

def run_write(path: str, content: str) -> str:
    safe_path(path).write_text(content)
    return f"Written: {path}"

def run_edit(path: str, old_text: str, new_text: str) -> str:
    p = safe_path(path)
    text = p.read_text()
    if old_text not in text:
        return f"ERROR: old_text not found in {path}"
    p.write_text(text.replace(old_text, new_text, 1))  # first occurrence only
    return f"Edited: {path}"

# The dispatch map: add tools here, never touch the loop
TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
}

The loop’s tool execution block becomes:
for block in response.content:
    if block.type == "tool_use":
        handler = TOOL_HANDLERS.get(block.name)
        output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
        results.append({
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": output,
        })

The loop itself never changes. You add capabilities by adding entries to TOOL_HANDLERS and corresponding schemas to TOOLS. This is why the course’s tagline is “Bash is all you need”: the loop structure established in s01 carries the entire 19-chapter curriculum.
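As a sketch of what "add an entry, never touch the loop" can look like in practice, the block below registers one more tool. The `list_dir` tool, its schema, and the `register_tool` and `dispatch` helpers are hypothetical names introduced for this example; they are not taken from the course.

```python
import tempfile
from pathlib import Path

TOOLS: list[dict] = []      # schemas the model sees
TOOL_HANDLERS: dict = {}    # name -> callable, the dispatch map


def register_tool(name: str, description: str, properties: dict, handler) -> None:
    """Add a schema and its handler together so the two cannot drift apart."""
    TOOLS.append({
        "name": name,
        "description": description,
        "input_schema": {"type": "object", "properties": properties,
                         "required": list(properties)},
    })
    TOOL_HANDLERS[name] = handler


def dispatch(name: str, tool_input: dict) -> str:
    """The same lookup the loop performs: unknown names become an error string."""
    handler = TOOL_HANDLERS.get(name)
    return handler(**tool_input) if handler else f"Unknown tool: {name}"


def run_list_dir(path: str) -> str:
    """Hypothetical tool: sorted names of the entries in a directory."""
    return "\n".join(sorted(p.name for p in Path(path).iterdir()))


register_tool("list_dir", "List the entries in a directory.",
              {"path": {"type": "string"}}, run_list_dir)

with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "b.txt").write_text("b")
    (Path(tmp) / "a.txt").write_text("a")
    listing = dispatch("list_dir", {"path": tmp})
    missing = dispatch("delete_everything", {})
```

Registering the schema and the handler in one call is a design choice of this sketch, not of the chapter's code. It avoids advertising a tool to the model that the dispatch map cannot serve.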
6.3 Subagents as Context Boundaries
Chapter 4 explained that subagents get their own context window. Here’s what that means in code:
def run_subagent(task: str) -> str:
    """Spawn a child agent with fresh context. Return only its final answer."""
    child_messages = [{"role": "user", "content": task}]
    while True:
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=child_messages,
            tools=TOOLS, max_tokens=8000,  # TOOLS has no "task" tool, which prevents recursion
        )
        child_messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            # Return ONLY the final text; all intermediate context is discarded
            return response.content[0].text
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input) if handler else f"Unknown: {block.name}"
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
        child_messages.append({"role": "user", "content": results})

The subagent is the same loop running on its own child_messages list, seeded with nothing but the task: a fresh context. It might read 10 files and run 5 commands, but the parent only sees the final summary. All intermediate tool calls and results are discarded when the function returns.
This is the “disposable scratch pad” pattern: fresh context in, short summary out. The parent’s message array stays lean.
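The size difference between the two contexts is easy to show with a stand-in. In the sketch below, `scripted_child` is a hypothetical replacement for the subagent that fakes five bulky tool round trips instead of calling a model. The character counts are only a rough proxy for tokens.

```python
def scripted_child(task: str) -> tuple[str, int]:
    """Stand-in for a subagent: 'works' in its own list, returns a short summary."""
    child_messages = [{"role": "user", "content": task}]
    for step in range(5):  # pretend each step is a tool call plus its bulky result
        child_messages.append({"role": "assistant", "content": f"tool call {step}"})
        child_messages.append({"role": "user", "content": "x" * 2_000})
    summary = f"Finished: {task} (5 steps)"
    return summary, sum(len(str(m)) for m in child_messages)


parent_messages = [
    {"role": "user", "content": "Audit the repo"},
    {"role": "assistant", "content": [
        {"type": "tool_use", "id": "call_1", "name": "task",
         "input": {"prompt": "count TODO comments"}}]},
]
summary, child_chars = scripted_child("count TODO comments")

# Only the summary crosses the boundary, as a single tool result.
parent_messages.append({"role": "user", "content": [
    {"type": "tool_result", "tool_use_id": "call_1", "content": summary}]})
parent_chars = sum(len(str(m)) for m in parent_messages)
```

The child accumulates more than ten thousand characters of scratch work, while the parent grows by one short tool result.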
6.4 Context Compaction
Chapter 3 explained that context fills up and gets compressed. Here’s the implementation — a multi-lever approach:
def count_tokens(messages: list) -> int:
    """Approximate token count (roughly four characters per token)."""
    return sum(len(str(m)) // 4 for m in messages)

def micro_compact(messages: list, keep_recent: int = 6) -> list:
    """Replace old tool results with one-line placeholders."""
    if len(messages) <= keep_recent:
        return messages
    cutoff = len(messages) - keep_recent
    compacted = []
    for i, msg in enumerate(messages):
        content = msg.get("content", "")
        if i < cutoff and msg["role"] == "user" and isinstance(content, list):
            # Keep each tool_result block and its tool_use_id so it still pairs
            # with the assistant's tool_use; drop only the bulky output.
            content = [
                {**item, "content": "[Previous tool result compacted]"}
                if isinstance(item, dict) and item.get("type") == "tool_result" else item
                for item in content
            ]
            msg = {**msg, "content": content}
        compacted.append(msg)
    return compacted

def full_compact(messages: list) -> list:
    """Use the LLM itself to summarize the conversation so far."""
    transcript = "\n".join(str(m) for m in messages)
    summary = client.messages.create(
        model=MODEL,
        messages=[{"role": "user", "content":
            f"Summarize this agent conversation. Preserve: "
            f"(1) what was accomplished, (2) current state, "
            f"(3) what remains to do.\n\n{transcript[:50000]}"
        }],
        max_tokens=2000,
    ).content[0].text
    return [{"role": "user", "content":
        f"[Session compacted. Summary of prior work:]\n{summary}\n\n"
        f"Continue from where you left off."
    }]

Integrated into the loop:
def agent_loop(query: str):
    messages = [{"role": "user", "content": query}]
    while True:
        # Compaction check before each API call
        if count_tokens(messages) > 50_000:
            messages = full_compact(messages)
        else:
            messages = micro_compact(messages)
        response = client.messages.create(...)
        # ... rest of loop unchanged

This is exactly the mechanism from Chapter 3’s “compaction is lossy compression”, now visible as code. The agent doesn’t even know it happened; it just sees a summary of prior work and continues.
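The sketch below runs placeholder compaction on a synthetic history to show roughly what it saves. It assumes, as the Messages API requires, that every tool_use block must be answered by a tool_result with the same id, so it blanks the payload and keeps the block. `estimate_tokens` and `compact_tool_results` are illustrative names, and the four-characters-per-token figure is a rough heuristic.

```python
def estimate_tokens(messages: list) -> int:
    """Rough heuristic: about four characters per token."""
    return sum(len(str(m)) // 4 for m in messages)


def compact_tool_results(messages: list, keep_recent: int = 2) -> list:
    """Blank out old tool_result payloads but keep their tool_use_id."""
    cutoff = len(messages) - keep_recent
    out = []
    for i, msg in enumerate(messages):
        content = msg["content"]
        if i < cutoff and msg["role"] == "user" and isinstance(content, list):
            content = [
                {**b, "content": "[compacted]"} if b.get("type") == "tool_result" else b
                for b in content
            ]
            msg = {**msg, "content": content}  # new dict: the input is not mutated
        out.append(msg)
    return out


# Synthetic history: one query, then three tool round trips with 4,000-character outputs.
history = [{"role": "user", "content": "Fix the failing test"}]
for n in range(3):
    history.append({"role": "assistant", "content": [
        {"type": "tool_use", "id": f"call_{n}", "name": "bash",
         "input": {"command": "pytest"}}]})
    history.append({"role": "user", "content": [
        {"type": "tool_result", "tool_use_id": f"call_{n}", "content": "F" * 4_000}]})

before = estimate_tokens(history)
compacted = compact_tool_results(history)
after = estimate_tokens(compacted)
kept_ids = [b["tool_use_id"] for m in compacted if isinstance(m["content"], list)
            for b in m["content"] if b.get("type") == "tool_result"]
```

The two oldest results shrink to placeholders and the most recent one survives intact. All three ids remain, so the history is still a valid sequence of tool calls and answers.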
6.5 Skills: Lazy-Loading Knowledge
Chapter 4 explained skills as on-demand knowledge packs. The implementation uses a two-layer architecture:
Layer 1: a cheap directory in the system prompt (~100 tokens per skill):
import yaml  # PyYAML, a third-party package

class SkillLoader:
    def __init__(self, skills_dir: Path):
        self.skills = {}
        for skill_file in skills_dir.glob("*/SKILL.md"):
            text = skill_file.read_text()
            # Parse YAML frontmatter: text between the first two '---' markers
            _, frontmatter, body = text.split("---", 2)
            meta = yaml.safe_load(frontmatter)
            self.skills[meta["name"]] = {
                "description": meta["description"],
                "body": body.strip(),
            }

    def get_directory(self) -> str:
        """Layer 1: names + descriptions for system prompt."""
        lines = ["Available skills:"]
        for name, skill in self.skills.items():
            lines.append(f"  - {name}: {skill['description']}")
        return "\n".join(lines)

    def load_skill(self, name: str) -> str:
        """Layer 2: full body, loaded on demand via tool call."""
        skill = self.skills.get(name)
        if not skill:
            return f"Unknown skill: {name}"
        return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"

Layer 2: the full content, loaded via a load_skill tool when the model decides it needs it:
skill_loader = SkillLoader(Path("skills"))  # directory name is illustrative
TOOL_HANDLERS["load_skill"] = lambda **kw: skill_loader.load_skill(kw["name"])

With the directory from get_directory() included in the system prompt, the model sees “Available skills: deploy, db-migration, lecture-notes…” in every prompt. When the user says “deploy to staging,” the model calls load_skill("deploy") and gets the full procedure, consuming tokens only when relevant.
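To make the two layers concrete, the sketch below writes one skill file into a temporary directory and compares what each layer costs. It deliberately avoids PyYAML by parsing only flat `key: value` frontmatter, which is a simplification. The `deploy` skill and its contents are invented for the example, as is the `parse_skill` helper.

```python
import tempfile
from pathlib import Path

SKILL_TEXT = """---
name: deploy
description: Deploy the app to staging or production.
---
# Deploy procedure
1. Run the test suite.
2. Build the release artifact.
3. Push to the target environment and watch the health check.
"""


def parse_skill(text: str) -> dict:
    """Split '---' frontmatter from the body; handles flat 'key: value' pairs only."""
    _, frontmatter, body = text.split("---", 2)
    meta = {}
    for line in frontmatter.strip().splitlines():
        key, _sep, value = line.partition(":")
        meta[key.strip()] = value.strip()
    return {"name": meta["name"], "description": meta["description"],
            "body": body.strip()}


with tempfile.TemporaryDirectory() as tmp:
    skill_dir = Path(tmp) / "deploy"
    skill_dir.mkdir()
    (skill_dir / "SKILL.md").write_text(SKILL_TEXT)
    skills = {s["name"]: s for s in
              (parse_skill(f.read_text()) for f in Path(tmp).glob("*/SKILL.md"))}

# Layer 1 is always in the system prompt; layer 2 is returned only when requested.
directory = "\n".join(f"  - {n}: {s['description']}" for n, s in skills.items())
full_body = skills["deploy"]["body"]
```

Even for this tiny skill the directory line is well under half the size of the body. A real procedure of several hundred lines widens that gap considerably.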
6.6 Hooks: Deterministic Control Outside the Loop
Chapter 4’s hooks fire at named moments. The implementation reads from a config file and runs shell commands:
import subprocess, os, json

def load_hooks(path: str = ".hooks.json") -> dict:
    if not Path(path).exists():
        return {}
    return json.loads(Path(path).read_text()).get("hooks", {})

def run_hooks(event: str, context: dict, hooks_config: dict) -> dict:
    """Run all matching hooks for an event. Returns {blocked, messages}."""
    result = {"blocked": False, "block_reason": "", "messages": []}
    for hook in hooks_config.get(event, []):
        matcher = hook.get("matcher")
        if matcher and matcher != context.get("tool_name"):
            continue  # Doesn't match this tool
        env = {
            **os.environ,
            "HOOK_EVENT": event,
            "HOOK_TOOL_NAME": context.get("tool_name", ""),
            "HOOK_TOOL_INPUT": json.dumps(context.get("tool_input", {})),
        }
        proc = subprocess.run(hook["command"], shell=True, env=env,
                              capture_output=True, text=True)
        if proc.returncode == 1 and event == "PreToolUse":
            result["blocked"] = True
            result["block_reason"] = proc.stderr or "Blocked by hook"
        elif proc.returncode == 2:
            result["messages"].append(proc.stderr)
    return result

In the loop:
# Before tool execution
pre = run_hooks("PreToolUse", {"tool_name": block.name, "tool_input": block.input}, hooks)
if pre["blocked"]:
    output = f"BLOCKED: {pre['block_reason']}"
else:
    output = handler(**block.input)
    run_hooks("PostToolUse", {"tool_name": block.name, "tool_output": output}, hooks)

The loop doesn’t know what the hooks do; it just calls them at the right moments. A lint hook, a security check, a logging hook: all are configured externally, at zero context cost.
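A minimal sketch of the exit-code contract used above, in which status 1 blocks a PreToolUse event. The hook here is an inline Python one-liner passed as an argument list, chosen so the example runs on any platform. The chapter's version runs a shell command string instead, and the "deny rm -rf" policy and the `check_pre_tool_use` helper are made up for illustration.

```python
import json
import os
import subprocess
import sys
from typing import Optional

# Hypothetical hook: block any bash command containing "rm -rf".
DENY_RM = (
    "import json, os, sys; "
    "cmd = json.loads(os.environ['HOOK_TOOL_INPUT']).get('command', ''); "
    "sys.stderr.write('rm -rf is not allowed') if 'rm -rf' in cmd else None; "
    "sys.exit(1 if 'rm -rf' in cmd else 0)"
)
HOOKS = {"PreToolUse": [{"matcher": "bash", "command": [sys.executable, "-c", DENY_RM]}]}


def check_pre_tool_use(tool_name: str, tool_input: dict) -> Optional[str]:
    """Return a block reason if any matching hook exits with status 1, else None."""
    for hook in HOOKS.get("PreToolUse", []):
        if hook.get("matcher") not in (None, tool_name):
            continue  # hook is scoped to a different tool
        env = {**os.environ, "HOOK_TOOL_NAME": tool_name,
               "HOOK_TOOL_INPUT": json.dumps(tool_input)}
        proc = subprocess.run(hook["command"], env=env, capture_output=True, text=True)
        if proc.returncode == 1:
            return proc.stderr or "Blocked by hook"
    return None


blocked = check_pre_tool_use("bash", {"command": "rm -rf /tmp/build"})
allowed = check_pre_tool_use("bash", {"command": "ls"})
skipped = check_pre_tool_use("read_file", {"path": "rm -rf"})
```

The third call shows the matcher at work: the hook is scoped to `bash`, so it never runs for `read_file`, whatever the input contains.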
6.7 The Full Picture
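Every feature from Chapters 2–4 is now code, layered as in the flowchart that opens this chapter: the loop feeds a tool dispatch map, and subagents, compaction, skills, and hooks all attach to that map.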
The Learn Claude Code course continues beyond what we’ve covered here — adding permission systems (s07), memory (s09), task graphs (s12), background execution (s13), agent teams (s15-s16), autonomous agents (s17), and worktree isolation (s18). All 19 sessions layer onto the same 30-line loop without restructuring it. The next section makes that claim concrete: it reassembles every mechanism into one agent, and the core is still the loop you’ve already read.
6.8 Putting It All Together: One Loop, Every Mechanism
A complete agent — the kind that runs for hours, coordinates with peers, and survives a flaky network — is the payoff for everything above. It is a few thousand lines, but it contains no new kind of idea: it puts dispatch, permission, hooks, todos, subagents, skills, compaction, memory, a task graph, error recovery, background tasks, cron, teams, and MCP back into the same loop. This section builds the load-bearing additions the earlier sections promised but didn’t show. The snippets are trimmed to their teaching core — production code adds more edge-case handling, but no new structure.
6.8.1 A task graph the agent can plan against
Once an agent does multi-step work, “what’s left, and what’s blocked?” needs a durable home outside the context window. A task is a small record on disk; dependencies are just a list of other task ids that must finish first:
from dataclasses import dataclass, asdict
import json
from pathlib import Path

TASKS_DIR = Path(".tasks")
TASKS_DIR.mkdir(exist_ok=True)

@dataclass
class Task:
    id: str
    subject: str
    status: str                  # pending | in_progress | completed
    owner: str | None
    blockedBy: list[str]         # ids that must be completed first
    worktree: str | None = None  # sandbox bound to this task, if any (Section 6.8.2)

def _path(tid): return TASKS_DIR / f"{tid}.json"
def load_task(tid): return Task(**json.loads(_path(tid).read_text()))
def save_task(t): _path(t.id).write_text(json.dumps(asdict(t), indent=2))

def can_start(tid: str) -> bool:
    """Runnable only when every blocker exists and is completed."""
    return all(_path(d).exists() and load_task(d).status == "completed"
               for d in load_task(tid).blockedBy)

def claim_task(tid: str, owner: str = "agent") -> str:
    task = load_task(tid)
    if task.status != "pending": return f"{tid} is {task.status}"
    if not can_start(tid): return f"{tid} is still blocked"
    task.owner, task.status = owner, "in_progress"
    save_task(task)
    return f"Claimed {tid} ({task.subject})"

def complete_task(tid: str) -> str:
    task = load_task(tid); task.status = "completed"; save_task(task)
    # Report only pending tasks that listed tid as a blocker and can now start
    unblocked = [t.subject for p in TASKS_DIR.glob("task_*.json")
                 if (t := load_task(p.stem)).status == "pending"
                 and tid in t.blockedBy and can_start(t.id)]
    return f"Completed {tid}" + (f" (unblocked: {unblocked})" if unblocked else "")

The graph is just files. can_start is a pure dependency check, and complete_task reports which tasks its completion unblocked, which is exactly the signal an autonomous agent (or a teammate) needs to pick up the next available work without being told.
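A short walkthrough helps show how completions ripple through the graph. The sketch below is a condensed, dict-based variant of the functions above, written against a throwaway temporary directory in place of `.tasks`. The four task ids and their dependencies are invented for the example.

```python
import json
import tempfile
from pathlib import Path

tasks_dir = Path(tempfile.mkdtemp())  # throwaway stand-in for .tasks


def save(task: dict) -> None:
    (tasks_dir / f"{task['id']}.json").write_text(json.dumps(task))


def load(tid: str) -> dict:
    return json.loads((tasks_dir / f"{tid}.json").read_text())


def can_start(tid: str) -> bool:
    """True when every blocker exists on disk and is completed."""
    return all((tasks_dir / f"{d}.json").exists() and load(d)["status"] == "completed"
               for d in load(tid)["blockedBy"])


def complete(tid: str) -> list[str]:
    """Mark a task completed; return ids of pending tasks this completion unblocked."""
    task = load(tid)
    task["status"] = "completed"
    save(task)
    return sorted(t["id"] for p in tasks_dir.glob("task_*.json")
                  if (t := load(p.stem))["status"] == "pending"
                  and tid in t["blockedBy"] and can_start(t["id"]))


# schema -> api -> tests, with docs depending on both schema and api
for tid, blockers in [("task_schema", []), ("task_api", ["task_schema"]),
                      ("task_tests", ["task_api"]),
                      ("task_docs", ["task_schema", "task_api"])]:
    save({"id": tid, "status": "pending", "blockedBy": blockers})

ready_at_start = sorted(p.stem for p in tasks_dir.glob("task_*.json") if can_start(p.stem))
after_schema = complete("task_schema")
after_api = complete("task_api")
```

Finishing `task_schema` frees only `task_api`, because `task_docs` still waits on the API. Finishing `task_api` then frees both `task_docs` and `task_tests` at once.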
6.8.2 Worktrees: real isolation for parallel work
When more than one agent edits the same repo, a shared working tree is a data-loss hazard — one agent’s checkout yanks the files out from under another. Git worktrees give each branch its own directory. The agent gets a tool that wraps git worktree add, plus strict name validation, because the name becomes a filesystem path:
import re, subprocess

WORKTREES_DIR = Path(".worktrees"); WORKTREES_DIR.mkdir(exist_ok=True)
VALID_NAME = re.compile(r'[A-Za-z0-9._-]{1,64}')

def create_worktree(name: str, task_id: str = "") -> str:
    # fullmatch also rejects a trailing newline; "." and ".." would alias real directories
    if not VALID_NAME.fullmatch(name or "") or name in {".", ".."}:
        return f"Error: invalid worktree name '{name}'"  # tool-layer safety boundary
    path = WORKTREES_DIR / name
    if path.exists():
        return f"Worktree '{name}' already exists"
    r = subprocess.run(["git", "worktree", "add", "-b", f"wt/{name}", str(path), "HEAD"],
                       capture_output=True, text=True)
    if r.returncode != 0:
        return f"Git error: {r.stderr.strip()}"
    if task_id:  # bind the task to its sandbox
        t = load_task(task_id); t.worktree = name; save_task(t)
    return f"Worktree '{name}' created at {path}"

Note where the validation lives: in the tool, before git ever sees the name, rather than relying on git to reject something afterward. Removal is the mirror image, and deliberately refuses to delete a worktree with uncommitted changes unless forced (discard_changes=true) or explicitly kept for review. The agent now has a safe place to do parallel work; the task graph above records which sandbox each task owns.
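The name check is small enough to exercise on its own. The sketch below is one way to write it, assuming the same character set and length limit as above; `is_valid_worktree_name` is an illustrative helper, not a function from the course.

```python
import re

VALID_NAME = re.compile(r"[A-Za-z0-9._-]{1,64}")


def is_valid_worktree_name(name: str) -> bool:
    """Accept short names built from letters, digits, dot, underscore, hyphen only."""
    if name in {".", ".."}:  # pure-dot names would alias existing directories
        return False
    return VALID_NAME.fullmatch(name or "") is not None


candidates = ["feature-login", "fix_123", "../../etc", "a/b", "", "x" * 65, "..", "ok\n"]
verdicts = {c: is_valid_worktree_name(c) for c in candidates}
```

Using `fullmatch` matters for the last candidate: in Python, a pattern anchored with `$` and applied with `match` also matches just before a trailing newline, so `"ok\n"` would slip through.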
6.8.3 Subagents, with the safety boundary wired in
The subagent from Section 6.3 returns in the full system, but now it runs tool calls through the same permission and hook layer as the parent, and caps its own iterations so a confused child can’t spin forever:
def spawn_subagent(description: str) -> str:
    messages = [{"role": "user", "content": description}]
    for _ in range(30):  # bounded: no runaway child
        response = client.messages.create(
            model=MODEL, system=SUB_SYSTEM,  # "...do not spawn more agents."
            messages=messages, tools=SUB_TOOLS, max_tokens=8000)
        messages.append({"role": "assistant", "content": response.content})
        if not has_tool_use(response.content):
            break  # return only the final summary
        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            blocked = trigger_hooks("PreToolUse", block)  # same permission layer
            output = str(blocked) if blocked else \
                call_tool_handler(SUB_HANDLERS.get(block.name), block.input, block.name)
            results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
        messages.append({"role": "user", "content": results})
    return extract_text(messages[-1]["content"])

This is the same fresh-context, short-summary-out pattern as before, but the subagent has no task tool (no recursion), a bounded loop, and its tool calls are gated by the very same PreToolUse hooks the parent uses. Isolation of context and enforcement of policy are independent: the child gets its own message array, but not its own rules.
6.8.4 Teams and a plan-approval protocol
Multiple long-lived agents need two things the subagent pattern lacks: a way to message each other, and a way to gate an action on another agent’s approval. The message bus is one append-only inbox file per agent:
class MessageBus:
    def send(self, frm, to, content, msg_type="message", metadata=None):
        inbox = MAILBOX_DIR / f"{to}.jsonl"
        with open(inbox, "a") as f:
            f.write(json.dumps({"from": frm, "to": to, "content": content,
                                "type": msg_type, "metadata": metadata or {}}) + "\n")

    def read_inbox(self, agent):
        inbox = MAILBOX_DIR / f"{agent}.jsonl"
        if not inbox.exists(): return []
        msgs = [json.loads(l) for l in inbox.read_text().splitlines() if l.strip()]
        inbox.unlink()  # read-once: drain the mailbox
        return msgs

A protocol layers request/response semantics on top of raw messages. Each request gets an id; a reply is only valid if it references a pending request of the matching type, so one approval can’t accidentally satisfy a different pending ask:
@dataclass
class ProtocolState:
    request_id: str; type: str; sender: str; target: str; status: str; payload: str

pending_requests: dict[str, ProtocolState] = {}

def request_plan(teammate: str, task: str) -> str:
    rid = new_request_id()
    pending_requests[rid] = ProtocolState(rid, "plan_approval", "lead", teammate, "pending", task)
    BUS.send("lead", teammate, f"Submit a plan for: {task}", "plan_request", {"request_id": rid})
    return f"Asked {teammate} to plan {task} (request {rid})"

def match_response(response_type: str, request_id: str, approve: bool):
    state = pending_requests.get(request_id)
    if not state or not response_type.startswith(state.type):
        return  # wrong request or wrong type: ignore
    state.status = "approved" if approve else "rejected"

This is the spine of the “orchestrator + teammates” pattern from Chapter 12: the lead spawns teammates, asks for a plan, and waits for explicit approval before any teammate touches files. All of it is expressed as messages on disk, with no special infrastructure.
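The round trip can be traced end to end with a throwaway mailbox directory. The sketch below is a condensed, function-based variant of the bus and the matcher. The request id `req-1`, the teammate name `alice`, and the response type strings are illustrative. The extra check that a request is still pending is an addition of this sketch, so that a replayed reply cannot overwrite an earlier decision.

```python
import json
import tempfile
from pathlib import Path

mailbox_dir = Path(tempfile.mkdtemp())  # throwaway stand-in for the mailbox directory
pending: dict[str, dict] = {}           # request_id -> {"type": ..., "status": ...}


def send(to: str, content: str, msg_type: str, metadata: dict) -> None:
    with open(mailbox_dir / f"{to}.jsonl", "a") as f:
        f.write(json.dumps({"to": to, "content": content,
                            "type": msg_type, "metadata": metadata}) + "\n")


def read_inbox(agent: str) -> list[dict]:
    inbox = mailbox_dir / f"{agent}.jsonl"
    if not inbox.exists():
        return []
    msgs = [json.loads(line) for line in inbox.read_text().splitlines() if line.strip()]
    inbox.unlink()  # read-once: drain the mailbox
    return msgs


def match_response(response_type: str, request_id: str, approve: bool) -> bool:
    """Apply a reply only if it names a pending request of the matching type."""
    state = pending.get(request_id)
    if (not state or state["status"] != "pending"
            or not response_type.startswith(state["type"])):
        return False
    state["status"] = "approved" if approve else "rejected"
    return True


# The lead asks "alice" for a plan.
pending["req-1"] = {"type": "plan_approval", "status": "pending"}
send("alice", "Submit a plan for: add login", "plan_request", {"request_id": "req-1"})

inbox = read_inbox("alice")
wrong_type = match_response("shutdown_response", "req-1", approve=True)
wrong_id = match_response("plan_approval_response", "req-9", approve=True)
accepted = match_response("plan_approval_response", "req-1", approve=True)
replayed = match_response("plan_approval_response", "req-1", approve=False)
second_read = read_inbox("alice")
```

Only the reply that names the right request and the right type changes state; the other three are ignored, and the drained mailbox reads as empty the second time.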
6.8.5 Surviving the real world: error recovery
A loop that runs for hours will hit rate limits (429), overloaded servers (529), and prompts that grow past the context window. Production agents wrap every model call in a retry policy with exponential backoff and jitter, and escalate when a class of error repeats:
import random, time

def retry_delay(attempt: int) -> float:
    base = min(BASE_DELAY_MS * (2 ** attempt), 32_000) / 1000
    return base + random.uniform(0, base * 0.25)  # backoff + jitter

def with_retry(fn, state: RecoveryState):
    for attempt in range(MAX_RETRIES):
        try:
            result = fn(); state.consecutive_529 = 0; return result
        except Exception as e:
            name, msg = type(e).__name__.lower(), str(e).lower()
            if "ratelimit" in name or "429" in msg:  # transient → wait
                time.sleep(retry_delay(attempt)); continue
            if "overloaded" in name or "529" in msg:  # repeated → fail over
                state.consecutive_529 += 1
                if state.consecutive_529 >= MAX_CONSECUTIVE_529 and FALLBACK_MODEL:
                    state.current_model = FALLBACK_MODEL  # smaller/cheaper model
                time.sleep(retry_delay(attempt)); continue
            raise  # unknown → don't mask it
    raise RuntimeError("Max retries exceeded")

The companion case is the prompt that’s simply too long. Rather than crash, the loop catches that specific error once, runs a reactive compaction to summarize old turns, and retries. It is the same lossy-compression idea from Section 6.4, now triggered by failure instead of a token threshold. Note the discipline in the except: transient errors are retried, a recurring class triggers a model fallback, and anything unrecognized is re-raised rather than silently swallowed.
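To see the backoff schedule and the retry behavior without waiting on real sleeps, the sketch below injects the sleep function and the random source. `BASE_DELAY_MS = 500` and `MAX_RETRIES = 5` are assumed values, not taken from the course. `RateLimited` is a hypothetical stand-in for the SDK's rate-limit error, and this cut-down `with_retry` handles only that one error class.

```python
import random

BASE_DELAY_MS = 500  # assumed value
MAX_RETRIES = 5      # assumed value


def backoff_ceiling(attempt: int) -> float:
    """Deterministic part of the delay, in seconds: doubles, then caps at 32 s."""
    return min(BASE_DELAY_MS * (2 ** attempt), 32_000) / 1000


def retry_delay(attempt: int, rng: random.Random) -> float:
    base = backoff_ceiling(attempt)
    return base + rng.uniform(0, base * 0.25)  # up to 25% jitter on top


class RateLimited(Exception):
    """Hypothetical stand-in for the SDK's rate-limit error."""


def with_retry(fn, sleep, rng):
    """Retry RateLimited up to MAX_RETRIES times; anything else propagates at once."""
    for attempt in range(MAX_RETRIES):
        try:
            return fn()
        except RateLimited:
            sleep(retry_delay(attempt, rng))
    raise RuntimeError("Max retries exceeded")


calls = {"n": 0}


def flaky():
    """Fail twice with a rate limit, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RateLimited("429")
    return "ok"


slept: list[float] = []
result = with_retry(flaky, sleep=slept.append, rng=random.Random(0))
schedule = [backoff_ceiling(a) for a in range(8)]
```

Under these assumed constants the ceiling doubles from 0.5 s up to the 32 s cap, which it reaches on the seventh attempt. Passing `sleep` and `rng` as parameters keeps the policy testable; production code would pass `time.sleep`.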
6.8.6 The loop that ties it together
Here is the final agent_loop, stripped to its skeleton. Read it against the 30-line loop from Section 6.1 — it is the same shape, with each new mechanism slotted into the cycle at the point where it belongs:
def agent_loop(messages: list, context: dict):
    tools, handlers = assemble_tool_pool()  # builtin tools + every MCP server's tools
    state = RecoveryState()
    while True:
        for job in consume_cron_queue():  # scheduled work shows up as a user turn
            messages.append({"role": "user", "content": f"[Scheduled] {job.prompt}"})
        inject_background_notifications(messages)  # finished background tasks report back
        prepare_context(messages)  # compaction / tool-result budget
        try:
            response = call_llm(messages, context, tools, state)  # wrapped in with_retry
        except Exception as e:
            if is_prompt_too_long_error(e):  # reactive compaction, then retry
                messages[:] = reactive_compact(messages); continue
            raise
        messages.append({"role": "assistant", "content": response.content})
        if not has_tool_use(response.content):
            trigger_hooks("Stop", messages)  # the "are we really done?" gate
            return
        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            if blocked := trigger_hooks("PreToolUse", block):  # permission + policy
                output = str(blocked)
            elif should_run_background(block.name, block.input):  # slow tool → detach
                output = start_background_task(block, handlers)
            else:
                output = call_tool_handler(handlers.get(block.name), block.input, block.name)
                trigger_hooks("PostToolUse", block, output)
            results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
        messages.append({"role": "user", "content": build_user_content(results)})  # write-back

Everything is here, and nothing has displaced the core. Cron and background notifications enter as ordinary user turns. Context preparation and recovery wrap the model call. Permission, background routing, and hooks sit between “the model asked for a tool” and “the tool ran.” And the last line is still the write-back, the single most important line from Section 6.1. The 30-line loop didn’t get replaced; it got surrounded.
6.8.7 …and the rest
For brevity this section skipped several subsystems that follow the identical “layer on, don’t restructure” rule. Each is a handful of functions:
| Mechanism | What it adds | Where it plugs in |
|---|---|---|
| Background tasks | slow tools (build, pytest, install) return a placeholder immediately; real output arrives later as a task_notification | the should_run_background branch in the loop |
| Cron | durable cron-expression jobs persisted to disk; a scheduler thread fires them | consume_cron_queue at the top of the loop |
| MCP | external tool servers merged into one prefixed (mcp__server__tool) pool | assemble_tool_pool builds tools + handlers |
| Memory | transcripts and durable task/job state on disk survive restarts | written throughout; re-read on resume |
None of it is surprising once you’ve internalized the loop. Each subsystem is a small, self-contained module that the loop calls at a well-defined moment — which is the whole thesis of this chapter, demonstrated at scale.
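As one illustration of how small these modules can be, here is a sketch of the prefixing idea behind the MCP row. It is hypothetical: a real MCP client discovers each server's tools over the protocol, whereas this version takes them as plain dictionaries. The signature of `assemble_tool_pool` here, which takes arguments, is an assumption, as are the server and tool names.

```python
def assemble_tool_pool(builtin: dict, servers: dict) -> dict:
    """Merge builtin handlers with each server's tools as mcp__<server>__<tool>."""
    pool = dict(builtin)  # copy, so the builtin map is left untouched
    for server, tools in servers.items():
        for tool, handler in tools.items():
            pool[f"mcp__{server}__{tool}"] = handler
    return pool


builtin = {"bash": lambda **kw: "builtin bash"}
servers = {
    "github": {"search": lambda **kw: "github search"},
    "docs": {"search": lambda **kw: "docs search"},  # same tool name, different server
}

pool = assemble_tool_pool(builtin, servers)
names = sorted(pool)
github_result = pool["mcp__github__search"]()
docs_result = pool["mcp__docs__search"]()
```

The prefix is what lets two servers expose a tool with the same name without colliding. The dispatch lookup in the loop stays a plain dictionary access.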
6.9 Key Takeaways
- A complete autonomous agent is a while True loop that sends messages, executes tool calls, and appends results: 30 lines of Python
- The write-back step (appending tool results as a user message) is the single most important line; it’s what makes the loop self-sustaining
- Tools are added via a dispatch map (TOOL_HANDLERS dict), never by modifying the loop
- Subagents are the same loop with a fresh message list: fresh context in, only the final answer out
- Compaction summarizes old messages when tokens exceed a threshold; the agent doesn’t know it happened
- Skills load full knowledge on demand via a tool call, keeping the system prompt lean
- Hooks run shell commands at named moments (PreToolUse, PostToolUse) with zero context cost
- A complete agent reassembles every mechanism (task graph, worktrees, teams + approval protocol, error recovery, background, cron, MCP) into one program, and the core is still the same loop
- Durable state (tasks, jobs, transcripts) lives on disk as files, so it survives restarts and is shared between agents without special infrastructure
- Isolation of context (subagents, worktrees) and enforcement of policy (permission, hooks) are independent layers; a fresh context still obeys the same rules
- Every feature layers on without changing the core loop; the 30-line foundation carries the entire system