4. The Agentic Engine
The agentic engine lives in rysh-shared/agentic and rysh-shared/provider. It is the single implementation of "an LLM that uses tools" reused by the CLI (local panes, agents, humanoids) and the server (browser panes, chatbots). This chapter explains the per-pane manager, the orchestrator loop, the approval flow, and the provider abstraction.
4.1 Two actors: manager and orchestrator
| Actor | Lifetime | Responsibility |
|---|---|---|
| LLMPromptExecutionActor | per pane (long-lived) | owns the conversation, system prompt, auto-approval set, memory state, and KV persistence; spawns one orchestrator per prompt |
| OrchestratorActor | per prompt (ephemeral) | runs the autonomous tool-use loop to completion, then reports back |
```mermaid
graph TD
Inbox["pane.{id}.llm_prompt_execution.inbox"] --> M["LLMPromptExecutionActor"]
M -->|"on MsgAgenticPrompt"| Spawn["spawn OrchestratorActor orch-{id}"]
Spawn --> O["OrchestratorActor.runLoop (goroutine)"]
O -->|MsgOrchestratorDone| M
M -->|"persist (throttled 2s)"| KV["JetStream KV<br/>{paneID}.llm_conversation"]
```
LLMPromptExecutionActor (llm_prompt_execution_actor.go)
State (key fields, llm_prompt_execution_actor.go:64):
- conversation []provider.ConversationTurn, systemPrompt, and paneName
- autoApproved map[string]bool
- activeOrch *actor.PID, activeOrchID, and cancelFn context.CancelFunc
- memoryState *msg.MemoryState
- kvStore nats.KeyValue and lastKVWrite time.Time
- the inbox/output/status subjects, chatOutputPaneID, and approvalPaneGroups []string
- contextTokenLimit (settable via SetContextTokenLimit; default DefaultContextTokenLimit = 160000), threaded into each orchestrator to drive compaction

maxIterations defaults to 20 when ≤0.
On *actor.Started it builds a NATSBridge and subscribes to its inbox and to pane.{id}.approval.response (approval responses are forwarded to the active orchestrator).
Message handling (Receive):
- MsgAgenticPrompt → handlePrompt: cancel/stop any running orchestrator, append the user turn, trim to the last 50 turns, build the effective system prompt (base + FormatMemoryForPrompt(memory)), and spawn a fresh OrchestratorActor with a cancellable context.
- MsgAgenticCancel → cancel the active orchestrator.
- MsgOrchestratorDone → replace the local conversation with the orchestrator's full version (preserving tool-result context such as a draft_id across runs), trim, and persist to KV (throttled to 2s; flushed immediately on shutdown).
- MsgApprovalResponse → forward to the active orchestrator.
- MsgSetChatOutputPane, MsgPaneSetApprovalPaneGroups, MsgMemoryStateUpdate, MsgRestoreConversation, and MsgGetConversationHistory (LastN defaults to 20 and is clamped to at most 100).
Lightly condensed excerpts from handlePrompt (:200) and handleOrchestratorDone (:278):
```go
// handlePrompt: append user turn, keep last 50, build sys+memory, spawn fresh orchestrator
a.conversation = append(a.conversation, provider.ConversationTurn{Role: "user", Content: m.Prompt})
if len(a.conversation) > 50 { a.conversation = a.conversation[len(a.conversation)-50:] }
effectivePrompt := a.buildEffectiveSystemPrompt() // base + FormatMemoryForPrompt(memory)
orchCtx, cancelFn := context.WithCancel(context.Background())
pid, _ := ctx.SpawnNamed(props, "orch-"+orchID[:8])
// handleOrchestratorDone: replace local convo with the orchestrator's full version
if len(m.Conversation) > 0 {
a.conversation = m.Conversation // preserves tool-result context (e.g. draft_id)
} else if m.Summary != "" {
a.conversation = append(a.conversation, provider.ConversationTurn{Role: "assistant", Content: m.Summary})
}
if len(a.conversation) > 50 { a.conversation = a.conversation[len(a.conversation)-50:] }
a.persistConversation()
```
KV persistence is throttled to 2s (persistConversation, :357); a flushConversation on *actor.Stopping bypasses the throttle:
```go
if a.kvStore == nil || time.Since(a.lastKVWrite) < 2*time.Second { return }
data, _ := json.Marshal(a.conversation)
_, _ = a.kvStore.Put(a.paneID+".llm_conversation", data) // key: <paneID>.llm_conversation
a.lastKVWrite = time.Now()
```
Output routing precedence (emitOutput, :396): it always emits a MsgAgenticOutput to the output subject first, then routes the unified ConversationMessage by precedence: pipeline output subject → chat output pane (as ConvChat, role assistant) → default per-pane ConvAI output.
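The precedence can be sketched as a pure function, assuming destinations are plain strings. The chat-pane: and pane: labels are placeholders for illustration, not real subject names.

```go
package main

import "fmt"

// destinations lists where one chunk of output goes, in publish order.
// The destination strings are illustrative labels, not real NATS subjects.
func destinations(outputSubject, pipelineSubject, chatOutputPaneID, paneID string) []string {
	// MsgAgenticOutput is always published to the output subject first.
	dests := []string{outputSubject}
	// The unified ConversationMessage then goes to exactly one place.
	switch {
	case pipelineSubject != "":
		dests = append(dests, pipelineSubject)
	case chatOutputPaneID != "":
		dests = append(dests, "chat-pane:"+chatOutputPaneID+" (ConvChat, role assistant)")
	default:
		dests = append(dests, "pane:"+paneID+" (ConvAI)")
	}
	return dests
}

func main() {
	fmt.Println(destinations("out", "", "chat42", "p1"))
	fmt.Println(destinations("out", "pipe.7", "chat42", "p1"))
}
```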
4.2 The orchestrator loop (orchestrator.go)
The orchestrator is spawned with a cloned tool registry, a copy of the conversation, a copy of the auto-approval map, a cancellable context, a max-iteration cap, a contextTokenLimit, and a subAgentDepth (0 at the top level; NewOrchestratorActor's trailing parameter). Optional knobs set on the actor before run: a permission policy (SetPermissionPolicy, §4.3), a max wall-clock duration (SetMaxDuration, below), a retry policy and model-fallback on the provider (§4.4). Phases: planning → executing → waiting_approval → compacting → done | error.
Phase note.
PhaseEvaluating ("evaluating") is still defined in orchestrator.go but is never assigned anywhere, so it is dead code. The phase actually entered between iterations when the context fills up is PhaseCompacting ("compacting"), added with the context-window compaction feature (see below).
```mermaid
flowchart TD
Start([runLoop]) --> Check{"ctx cancelled?<br/>iteration ≥ max?"}
Check -->|yes| ErrStop["emit error / stop"]
Check -->|no| Compact["maybeCompact()<br/>(summarize+drop oldest turns<br/>if input tokens > 75% limit)"]
Compact --> Status["emitStatus(executing)<br/>emitContextStatus (context %)<br/>iteration++ · check maxDuration"]
Status --> Call["stream if provider is a StreamingProvider,<br/>else CompleteWithTools<br/>(provider retries internally per RetryPolicy)"]
Call -->|"error (retries exhausted)"| ErrStop
Call -->|ok| Emit["text: live-emit deltas while streaming,<br/>else emit text blocks; accumulate assistant content<br/>(+ thinking blocks)"]
Emit --> HasTools{tool calls?}
HasTools -->|"no & end_turn"| Done["append assistant turn<br/>→ Done"]
HasTools -->|"no & max_tokens"| Trunc["emit '[truncated, continuing...]'<br/>→ loop"]
HasTools -->|yes| ExecAll["append assistant turn (tool_use)<br/>executeTool for each<br/>append role:tool results"]
ExecAll --> Check
Done --> Defer
Trunc --> Check
ErrStop --> Defer
Defer["defer: Flush NATS →<br/>emit terminal status →<br/>Send(parent, MsgOrchestratorDone{conversation})"]
```
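The struct literal sets loopThreshold: 3 (used by loop detection below). Because the constructor Clone()s the registry and copies the conversation and auto-approval map, the orchestrator's goroutine works on its own copies of that state; the manager only adopts the result when MsgOrchestratorDone arrives.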
Loop detection (orchestrator.go:138)
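detectLoop builds a key from toolName:params (capped at 200 bytes; despite the paramsHash field name, the key is the raw string, not a hash), keeps the last 20 calls, and blocks the call (returning an error to the LLM as a tool result) once the same key occurs ≥3 times in that window, counting the current call: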
```go
hash := toolName + ":" + string(params)
if len(hash) > 200 { hash = hash[:200] }
o.callHistory = append(o.callHistory, loopEntry{toolName, hash})
if len(o.callHistory) > 20 { o.callHistory = o.callHistory[len(o.callHistory)-20:] }
count := 0
for _, e := range o.callHistory { if e.paramsHash == hash { count++ } }
return count >= threshold // threshold = 3
```
Provider error retry & duration cap
Provider error handling moved into the provider (provider/retry.go, §4.4): CompleteWithTools/CompleteWithToolsStream internally retry transient errors with exponential backoff and optional model fallback. The orchestrator no longer does loop-level retry — the old "retry once after 2s" sleep is gone; on a returned error it just emits the error, sets PhaseError, and stops.
Separately, SetMaxDuration(d) (orchestrator.go:249) imposes a wall-clock cap: when d > 0, each iteration checks elapsed time and aborts with [Reached maximum duration (d)] → PhaseError. 0 means no cap.
Context-window compaction (orchestrator.go:941, :959–:1006)
Every loop iteration begins with o.maybeCompact() (:941). When the last reported input-token count (from the provider's Usage; see §4.4) exceeds 75% of contextTokenLimit, the orchestrator enters PhaseCompacting and shrinks the conversation in place:
- Defaults (orchestrator.go:35–43): DefaultContextTokenLimit = 160000, compactionThreshold = 0.75, compactionKeepRecentTurns = 12.
- safeCompactionCutIndex (:989) finds a cut point that keeps the most recent ~12 turns without splitting an assistant tool_use from its matching tool_result (which would make the next API call invalid).
- summarizeTurns (:1006) asks the LLM to summarize the dropped prefix into a single synthetic turn, using the DefaultCompactionSummarizePrompt template (a var with a {{transcript}} slot, overridable by rysh-cli; see §6.5). If that call fails, it falls back to mechanicalDigest (a deterministic, non-LLM summary).
- The summary replaces the dropped turns, so the loop continues with a compacted history.
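The trigger and the cut-point invariant can be sketched as follows. This is one way to satisfy "never split a tool_use from its tool_result", assuming tool results always directly follow the assistant turn that requested them; turn, shouldCompact, and safeCut are illustrative, and the real safeCompactionCutIndex may choose its cut differently. At the default limit the trigger point is 0.75 × 160000 = 120000 input tokens.

```go
package main

import "fmt"

// turn keeps only what the cut-point logic needs (not provider.ConversationTurn).
type turn struct {
	Role       string   // "user", "assistant", or "tool"
	ToolCalls  []string // tool_use IDs on an assistant turn
	ToolCallID string   // the tool_use ID a "tool" turn answers
}

const (
	defaultContextTokenLimit  = 160000
	compactionThreshold       = 0.75
	compactionKeepRecentTurns = 12
)

// shouldCompact reports whether input tokens exceed 75% of the limit.
func shouldCompact(inputTokens, limit int) bool {
	return float64(inputTokens) > compactionThreshold*float64(limit)
}

// safeCut returns i such that turns[:i] is summarized and turns[i:] is kept.
// It starts at len-keep and walks backwards past "tool" turns, so a kept
// tool_result never loses the assistant tool_use it answers. 0 means nothing to drop.
func safeCut(turns []turn, keep int) int {
	cut := len(turns) - keep
	if cut <= 0 {
		return 0
	}
	for cut > 0 && turns[cut].Role == "tool" {
		cut--
	}
	return cut
}

func main() {
	fmt.Println(shouldCompact(120000, defaultContextTokenLimit)) // false: exactly 75%
	fmt.Println(shouldCompact(120001, defaultContextTokenLimit)) // true

	turns := []turn{
		{Role: "user"},
		{Role: "assistant", ToolCalls: []string{"t1", "t2"}},
		{Role: "tool", ToolCallID: "t1"},
		{Role: "tool", ToolCallID: "t2"},
		{Role: "assistant"},
	}
	for i := len(turns); i < 15; i++ {
		role := "user"
		if i%2 == 0 {
			role = "assistant"
		}
		turns = append(turns, turn{Role: role})
	}
	// The naive cut (15-12 = 3) would keep result t2 without its tool_use,
	// so the cut moves back to index 1 and 14 turns are kept.
	fmt.Println(safeCut(turns, compactionKeepRecentTurns)) // 1
}
```

Walking the cut backwards keeps slightly more than 12 turns, which matches the "~12" wording above.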
After each model response the orchestrator also calls emitContextStatus, which publishes a status line like [agentic] context N% (X/Y tok) | iter i/max derived from resp.Usage.TotalInputTokens(). Compaction is covered by agentic/compaction_test.go.
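The status-line arithmetic, sketched with a local Usage mirror of the §4.4 type; contextStatus is a hypothetical helper and its integer-percent rounding is an assumption. Summing all three input buckets matters because, with prompt caching on, the API reports tokens written to or read from the cache separately from input_tokens, so InputTokens alone would understate how full the context is.

```go
package main

import "fmt"

// Usage mirrors the four token counters listed in §4.4.
type Usage struct {
	InputTokens              int
	OutputTokens             int
	CacheCreationInputTokens int
	CacheReadInputTokens     int
}

// TotalInputTokens sums uncached input and both cache buckets.
func (u Usage) TotalInputTokens() int {
	return u.InputTokens + u.CacheCreationInputTokens + u.CacheReadInputTokens
}

// contextStatus renders the status line with an integer (truncated) percentage.
func contextStatus(u Usage, limit, iter, maxIter int) string {
	used := u.TotalInputTokens()
	return fmt.Sprintf("[agentic] context %d%% (%d/%d tok) | iter %d/%d",
		used*100/limit, used, limit, iter, maxIter)
}

func main() {
	u := Usage{InputTokens: 2000, OutputTokens: 500, CacheCreationInputTokens: 6000, CacheReadInputTokens: 72000}
	fmt.Println(contextStatus(u, 160000, 3, 20))
	// [agentic] context 50% (80000/160000 tok) | iter 3/20
}
```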
Real sub-agents (agentic/sub_agent.go)
The sub_agent tool is intercepted in executeTool (by name SubAgentToolName) before registry dispatch and routed to handleSubAgent (orchestrator.go:1233), which spawns a child OrchestratorActor — a genuine nested agent, not a placeholder.
- Params: SubAgentParams{Task (required), Context, SystemPrompt, AllowedTools}. The child gets its own context window, a focused system prompt (DefaultSubAgentSystemPrompt, a var overridable by rysh-cli), a filtered tool registry (BuildSubAgentRegistry: empty AllowedTools ⇒ clone the full parent set; otherwise only the named tools), and a fresh ReadTracker.
- Limits (sub_agent.go:27–37): MaxSubAgentDepth = 2 (spawning deeper returns an error), DefaultSubAgentMaxIterations = 25, DefaultSubAgentTimeout = 15 min.
- Mechanism: the run-loop goroutine sends an internal subAgentSpawnReq self-message, and handleSubAgentSpawn (:1324) calls ctx.SpawnNamed("sub-orch-"+id[:8]) inside Receive, because the actor context is only safe to use from the actor's own message loop, not from the run-loop goroutine. The child's MsgOrchestratorDone routes back to the parent (Receive case at :269–283), matched by OrchestratorID. The child inherits the parent's context, so cancellation propagates. The child's transcript is dropped: only a summary (FormatSubAgentSummary) is returned, and the child's FilesChanged are merged up. No NATS/wire message types are involved (the legacy MsgSpawnSubOrchestrator/MsgSubOrchestratorResult remain defined but unused).
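The registry-filtering and depth rules can be sketched as below. The sketch assumes depth 0 for the top-level orchestrator and parentDepth+1 for a child, so MaxSubAgentDepth = 2 allows a child and a grandchild; tool, registry, and checkSpawn are hypothetical stand-ins, and silently skipping unknown AllowedTools names is this sketch's choice, not documented behaviour.

```go
package main

import (
	"fmt"
	"sort"
)

// tool is a placeholder for a ToolExecutor.
type tool struct{ desc string }

type registry struct{ executors map[string]tool }

func (r *registry) names() []string {
	out := make([]string, 0, len(r.executors))
	for n := range r.executors {
		out = append(out, n)
	}
	sort.Strings(out)
	return out
}

// buildSubAgentRegistry: empty allowed ⇒ copy every parent tool; otherwise
// copy only the named tools (unknown names are skipped in this sketch).
func buildSubAgentRegistry(parent *registry, allowed []string) *registry {
	child := &registry{executors: make(map[string]tool)}
	if len(allowed) == 0 {
		for name, t := range parent.executors {
			child.executors[name] = t
		}
		return child
	}
	for _, name := range allowed {
		if t, ok := parent.executors[name]; ok {
			child.executors[name] = t
		}
	}
	return child
}

const maxSubAgentDepth = 2

// checkSpawn assumes the top level is depth 0 and a child runs at parentDepth+1.
func checkSpawn(parentDepth int) error {
	if parentDepth+1 > maxSubAgentDepth {
		return fmt.Errorf("sub_agent: depth %d exceeds MaxSubAgentDepth=%d", parentDepth+1, maxSubAgentDepth)
	}
	return nil
}

func main() {
	parent := &registry{executors: map[string]tool{"bash": {}, "file_read": {}, "grep": {}}}
	fmt.Println(buildSubAgentRegistry(parent, nil).names())                             // [bash file_read grep]
	fmt.Println(buildSubAgentRegistry(parent, []string{"grep", "file_write"}).names()) // [grep]
	fmt.Println(checkSpawn(0), checkSpawn(1), checkSpawn(2) != nil)                    // <nil> <nil> true
}
```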
Why defer flushes NATS
The terminal MsgOrchestratorDone and final status must arrive after all streamed output. The deferred block runs pub.Flush() → emit terminal status → Send(parent, MsgOrchestratorDone{... Conversation: o.conversation}), guaranteeing output ordering on the async NATS connection before the parent merges the conversation.
4.3 Tool invocation & the approval flow
executeTool is the gate between the LLM and the host:
```mermaid
flowchart TD
TC["tool call"] --> SubA{"name == sub_agent?"}
SubA -->|yes| HSA["handleSubAgent<br/>(spawn child orchestrator)"]
SubA -->|no| Loop{"detectLoop?"}
Loop -->|repeated| Block["return error to LLM"]
Loop -->|ok| Perm{"permission policy<br/>Decide()"}
Perm -->|PermDeny| Deny["return policy error to LLM"]
Perm -->|"PermAllow"| KnownA["skip approval prompt"]
Perm -->|"PermAsk / none"| Known{"tool exists?"}
KnownA --> Known
Known -->|no| Unknown["return 'unknown tool'"]
Known -->|yes| Stale{"file_edit/multi_edit<br/>& stale on disk?"}
Stale -->|stale| Reedit["return 'changed since read,<br/>re-read first' error"]
Stale -->|ok| EmitTC["emit tool_call"]
EmitTC --> Req{"RequiresApproval(input)?"}
Req -->|"no / PermAllow"| Exec["executor.Execute"]
Req -->|yes| Auto{"auto-approved key?"}
Auto -->|yes| Exec
Auto -->|no| Kind{"file_edit / file_write?"}
Kind -->|yes| Preview["executeWithPreview:<br/>execute → diff → ask → apply/discard"]
Kind -->|"no (destructive)"| Pre["executeWithPreApproval:<br/>ask BEFORE executing"]
Preview --> Wait["waitForApproval<br/>(approval.response, 5min)"]
Pre --> Wait
Wait -->|"yes / yes_always"| Exec
Wait -->|"no / no_with_explanation"| Reject["return rejection text as tool result<br/>(so the LLM sees it)"]
Wait -->|"timeout / no response"| Skip["treated as No<br/>(Reason: timeout)"]
Skip --> Reject
Exec --> Shape["shapeToolOutput<br/>(head+tail truncation)"]
Shape --> Track["updateReadTrackerOnSuccess"]
Track --> Result["append role:tool result"]
```
Besides loop detection (§4.2), three gates run before the approval flow, in this order: the sub_agent interception (§4.2), the permission policy (Decide), and the stale-edit check. Every successful tool output is then passed through shapeToolOutput. The classic approval paths remain:
- Preview-then-confirm (file_edit/file_write): execute first to compute the unified diff, emit the diff, send MsgApprovalRequest{Type: diff}, wait, then apply or discard.
- Ask-before-execute (other destructive tools, e.g. dangerous bash): send MsgApprovalRequest{Type: destructive_action} before running.
- Auto-approval (buildApprovalKey, :1088): yes_always records a key in autoApproved so future matching calls skip the prompt:
```go
// file_edit / file_write / file_read → "<tool>:<file_path>"
// bash → "bash:<first word of command>"
// everything else → "<tool>"
```
waitForApproval (:771) subscribes to pane.{id}.approval.response, matches the RequestID, and times out after 5 minutes (defaulting to No); it also returns No on context cancellation:
```go
select {
case resp := <-ch: return resp
case <-time.After(5 * time.Minute): return &MsgApprovalResponse{Decision: DecisionNo, Reason: "timeout"}
case <-o.ctx.Done(): return &MsgApprovalResponse{Decision: DecisionNo, Reason: "cancelled"}
}
```
- Routing (publishApprovalRequest, :1189): if no approval pane-groups are configured, publish to the legacy pane.{id}.approval.request (TUI global approval mode). Otherwise send MsgCreateApprovalPane to each pane-group.{id}.inbox (ephemeral approval panes), which are torn down via MsgDestroyApprovalPane after resolution.
ApprovalManager (approval.go) is the TUI-side helper: PublishApproval(requestID, decision, reason) and SubscribeApprovalRequests(cb).
Permission policy (agentic/permissions.go)
A declarative policy that runs before the approval flow, letting a host pre-authorize or block tool calls without a human prompt. PermissionPolicy{Rules} holds PermissionRule{Tool, Match, Decision} entries, each yielding PermAllow, PermDeny, or PermAsk (the zero value and default). Decide(toolName, params) returns the Decision of the first matching rule (Tool == "" matches any tool, Match == "" any arguments; an empty policy ⇒ PermAsk). The match target is per-tool (bash → command, web_fetch → url, web_search/grep → pattern, everything else → file_path) and is compared with a glob engine: * matches within a path segment, ** matches across /, ? matches a single character, and prefix* is a starts-with match. Integration: PermDeny → immediate structured error to the model; PermAllow → skip the approval prompt (deliberately not persisted into autoApproved, so policy decisions don't leak across runs); PermAsk → the normal approval flow. The policy is installed via SetPermissionPolicy (orchestrator.go:243); nil disables it.
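A compact sketch of first-match evaluation and the glob rules. Assumptions: params arrive already decoded as a map[string]string (the real Decide takes the raw tool params), matching is byte-wise, ** may match any run of characters including / (so src/**/*.go needs at least one directory below src/), and a pattern whose only wildcard is a trailing * is treated as a starts-with match, which is this sketch's reading of "prefix*".

```go
package main

import (
	"fmt"
	"strings"
)

type PermDecision int

const (
	PermAsk   PermDecision = iota // zero value: fall through to the approval flow
	PermAllow                     // skip the approval prompt (not persisted)
	PermDeny                      // return a structured error to the model
)

type PermissionRule struct {
	Tool, Match string // "" matches any tool / any argument
	Decision    PermDecision
}

type PermissionPolicy struct{ Rules []PermissionRule }

// matchField names the argument each tool is matched on; other tools use file_path.
var matchField = map[string]string{"bash": "command", "web_fetch": "url", "web_search": "pattern", "grep": "pattern"}

func matchTarget(tool string, params map[string]string) string {
	if f, ok := matchField[tool]; ok {
		return params[f]
	}
	return params["file_path"]
}

// globMatch: * stays within one path segment, ** may cross '/', ? is one non-'/' byte.
func globMatch(pattern, s string) bool {
	for len(pattern) > 0 {
		switch {
		case strings.HasPrefix(pattern, "**"):
			for i := 0; i <= len(s); i++ {
				if globMatch(pattern[2:], s[i:]) {
					return true
				}
			}
			return false
		case pattern[0] == '*':
			for i := 0; i <= len(s); i++ {
				if globMatch(pattern[1:], s[i:]) {
					return true
				}
				if i < len(s) && s[i] == '/' {
					return false // a single * may not consume '/'
				}
			}
			return false
		case pattern[0] == '?':
			if s == "" || s[0] == '/' {
				return false
			}
		default:
			if s == "" || s[0] != pattern[0] {
				return false
			}
		}
		pattern, s = pattern[1:], s[1:] // a literal or ? consumed one byte
	}
	return s == ""
}

// matches treats a pattern whose only wildcard is a trailing * as starts-with.
func matches(pattern, target string) bool {
	if stem := strings.TrimSuffix(pattern, "*"); stem != pattern && !strings.ContainsAny(stem, "*?") {
		return strings.HasPrefix(target, stem)
	}
	return globMatch(pattern, target)
}

// Decide returns the Decision of the first matching rule, else PermAsk.
func (p *PermissionPolicy) Decide(tool string, params map[string]string) PermDecision {
	if p == nil {
		return PermAsk
	}
	target := matchTarget(tool, params)
	for _, r := range p.Rules {
		if (r.Tool == "" || r.Tool == tool) && (r.Match == "" || matches(r.Match, target)) {
			return r.Decision
		}
	}
	return PermAsk
}

func main() {
	policy := &PermissionPolicy{Rules: []PermissionRule{
		{Tool: "bash", Match: "rm *", Decision: PermDeny},
		{Tool: "file_edit", Match: "src/**/*.go", Decision: PermAllow},
	}}
	fmt.Println(policy.Decide("bash", map[string]string{"command": "rm -rf /tmp/x"}) == PermDeny)      // true
	fmt.Println(policy.Decide("file_edit", map[string]string{"file_path": "src/a/b.go"}) == PermAllow) // true
}
```

Rule order is the whole conflict-resolution story: because the first match wins, a narrow PermDeny has to be listed before any broader PermAllow that would also match.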
Stale-edit detection (agentic/read_tracker.go)
A per-orchestrator ReadTracker (fresh per sub-agent) prevents the model from editing a file that changed on disk since it read it. Record after a successful file_read; Refresh after a successful edit/write; staleCheck before a file_edit/multi_edit (only these two are gated — file_write has replace semantics and is not). The check fast-passes on matching mtime+size, otherwise recomputes a SHA-256 — a mismatch (or a deleted file) returns a "changed on disk since you read it… re-read with file_read" error. updateReadTrackerOnSuccess (orchestrator.go:604) drives the bookkeeping, keyed on the file_path param and skipped when the tool errored. Not covered: apply_patch and files mutated inside bash.
Tool-output shaping (agentic/output_shape.go)
Every executed tool's output passes through shapeToolOutput before returning to the model. Tunable package vars: MaxToolOutputBytes = 50000, MaxToolOutputLines = 600, ToolOutputHeadLines = 200, ToolOutputTailLines = 200. It always annotates Metadata["byte_count"]/["line_count"]; when either threshold is exceeded it keeps the head + tail with a ... [N lines / M bytes omitted] ... marker (truncation_mode = head_tail), falling back to byte-halving for a single pathologically-long line (truncation_mode = bytes). Tool errors are left intact.
4.4 The provider abstraction (provider/agentic_provider.go)
```go
type Provider interface {
Name() string
Complete(ctx, prompt) (string, error)
}
type AgenticProvider interface { // extends Provider
CompleteWithTools(ctx, []ConversationTurn, []ToolSpec, systemPrompt) (*AgenticResponse, error)
}
type StreamingProvider interface { // OPTIONAL capability; orchestrator type-asserts for it
CompleteWithToolsStream(ctx, []ConversationTurn, []ToolSpec, systemPrompt, StreamCallback) (*AgenticResponse, error)
}
```
Core types:
| Type | Fields |
|---|---|
| ConversationTurn | Role (user/assistant/tool), Content, ToolCalls []ToolCallRequest, ToolCallID, IsError |
| ToolSpec | Name, Description, Parameters (json.RawMessage: a JSON Schema, not a string) |
| ToolCallRequest | ID, Name, Input |
| AgenticResponse | TextBlocks, ToolCalls, ThinkingBlocks []ThinkingBlock, StopReason (end_turn / tool_use / max_tokens / stop_sequence), Usage |
| Usage | InputTokens, OutputTokens, CacheCreationInputTokens, CacheReadInputTokens; TotalInputTokens() sums the input and both cache buckets |
| ThinkingBlock | Text, Signature (opaque; must be echoed verbatim on follow-up turns) |
The Usage field is what feeds compaction (§4.2) and the context-% status line.
ClaudeAgenticProvider
Implements the Anthropic Messages API (POST {apiURL}/v1/messages), supporting both the non-streaming and streaming paths plus retry/backoff, model fallback, and extended thinking. Defaults (consts at agentic_provider.go:15–18, applied in NewClaudeAgenticProvider at :139): anthropicVersion = "2023-06-01", base model claude-sonnet-4-20250514, apiURL https://api.anthropic.com. The default maxTokens is now per-model via DefaultMaxTokensForModel when the caller passes ≤0 (model_defaults.go: Sonnet 8192, Opus/Haiku/unknown 4096). The CLI still explicitly overrides to claude-opus-4-5 / maxTokens 8192. The HTTP client has no timeout — cancellation is via ctx only. Headers: Content-Type: application/json, x-api-key, anthropic-version.
Prompt caching (on by default)
The provider enables Anthropic prompt caching by default (cacheEnabled: true, toggle via SetCacheEnabled). Because of this, the request System field is typed interface{} and is emitted as a []systemBlock rather than a plain string: buildSystemBlocks/applyToolCacheBreakpoint/applyTrailingMessageCacheBreakpoint insert cache_control: {type: "ephemeral"} breakpoints on the system prompt, the last tool definition, and the trailing message. The response's usage (including cache_creation_input_tokens / cache_read_input_tokens) is parsed into AgenticResponse.Usage. Caching is covered by provider/caching_test.go.
buildMessages maps turns to Claude content blocks. Crucially, tool turns are sent as user messages containing tool_result blocks (agentic_provider.go:446):
```go
case "tool":
block := contentBlock{Type: "tool_result", ToolUseID: turn.ToolCallID, Content: turn.Content, IsError: turn.IsError}
messages = append(messages, agenticMessage{Role: "user", Content: []contentBlock{block}})
```
Assistant turns carry text + tool_use (+ thinking) blocks (and an assistant turn with none is dropped).
buildToolDefs maps ToolSpec → {name, description, input_schema}. parseResponse extracts text, tool_use, and thinking blocks plus the stop reason; a non-200 response is formatted as "claude-agentic: %s (HTTP %d): %s".
Streaming (provider/streaming.go)
ClaudeAgenticProvider also implements the optional StreamingProvider. The orchestrator opts in by type assertion and falls back to CompleteWithTools when a provider doesn't implement it — so a non-streaming provider behaves exactly as before. Streaming is enabled per request via Stream: true (omitempty, so non-streaming request bytes are unchanged) with Accept: text/event-stream. parseClaudeStream consumes SSE StreamEvents (message_start, content_block_start, text_delta, tool_use_delta, thinking_delta, content_block_stop, message_delta, message_stop; unknown events skipped, ping ignored; 4 MB max line) and assembles an AgenticResponse identical in shape to the non-streaming one. The orchestrator live-emits only text_delta chunks (tool input must be complete valid JSON before dispatch); a stream that ends without message_stop returns a partial response plus a transient error that the retry layer handles.
Retry / backoff / model fallback (provider/retry.go)
Transient errors are retried inside the provider (retryWithPolicy). DefaultRetryPolicy: 5 attempts, exponential backoff base 1s → cap 30s, ±20% jitter, honours Retry-After. Retried: HTTP 408/429/529 and all 5xx, plus transport timeouts / connection-reset / EOF / refused. Not retried (fail fast): other 4xx and context.Canceled/DeadlineExceeded. When the primary's budget is exhausted, an optional model fallback (SetModelFallback) makes one final attempt against a secondary model. Configure via SetRetryPolicy/SetModelFallback; MaxAttempts:1 disables retries. (This replaces the orchestrator's old "retry once after 2s".)
Extended thinking (provider/thinking.go)
SetThinking(cfg) enables Anthropic extended thinking: ThinkingConfig{Type:"enabled", BudgetTokens} is sent as the request thinking field (DefaultThinkingConfig = {enabled, 8192}; nil disables). Responses carry ThinkingBlock{Text, Signature} — the opaque Signature must be echoed verbatim on subsequent turns. Streaming surfaces thinking_delta + a separately-arriving signature.
StaticAgenticProvider returns a fixed response when no API key is configured (used for offline/dev and for chatbot panes without a key).
4.5 Tools framework (tools/registry.go)
```go
type ToolExecutor interface {
Execute(ctx, params json.RawMessage) (*ToolOutput, error)
Spec() ToolSpec
RequiresApproval(params json.RawMessage) bool
}
```
- ToolOutput{Content, Error, ExitCode, Metadata map[string]string}: Metadata["file_path"] drives diff approvals, among other things.
- ToolConfig{WorkDir, BashTimeout, BlockedCommands, MaxFileSize, BlockedPaths}.
- ToolRegistry: an RWMutex-guarded map[string]ToolExecutor with Register, Unregister(name) bool (retracts a dynamically added tool, e.g. when an MCP server or Forge integration is removed), Get, AllSpecs() (sorted; becomes the tools array sent to Claude), Names(), and Clone() (shallow: shared executor pointers, independent map):
```go
func (r *ToolRegistry) Clone() *ToolRegistry {
r.mu.RLock(); defer r.mu.RUnlock()
clone := &ToolRegistry{executors: make(map[string]ToolExecutor, len(r.executors))}
for k, v := range r.executors { clone.executors[k] = v } // executor refs shared, map independent
return clone
}
```
Clone() is the mechanism for per-pane tool injection: each pane clones the base registry and adds NATS-dependent tools.
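A runnable model of the per-pane pattern, with ToolExecutor cut down to a Name() method; namedTool and NewToolRegistry are hypothetical helpers, and this Register signature is a simplification.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// ToolExecutor is cut down to a name for this sketch.
type ToolExecutor interface{ Name() string }

type namedTool string

func (n namedTool) Name() string { return string(n) }

type ToolRegistry struct {
	mu        sync.RWMutex
	executors map[string]ToolExecutor
}

func NewToolRegistry() *ToolRegistry {
	return &ToolRegistry{executors: make(map[string]ToolExecutor)}
}

func (r *ToolRegistry) Register(e ToolExecutor) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.executors[e.Name()] = e
}

// Clone copies the map (so later Registers stay local) but shares the executors.
func (r *ToolRegistry) Clone() *ToolRegistry {
	r.mu.RLock()
	defer r.mu.RUnlock()
	clone := NewToolRegistry()
	for k, v := range r.executors {
		clone.executors[k] = v
	}
	return clone
}

func (r *ToolRegistry) Names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.executors))
	for n := range r.executors {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	base := NewToolRegistry()
	base.Register(namedTool("web_search"))
	base.Register(namedTool("web_fetch"))

	pane := base.Clone()                     // per-pane copy
	pane.Register(namedTool("page_context")) // NATS-dependent tool, this pane only

	fmt.Println(base.Names()) // [web_fetch web_search]
	fmt.Println(pane.Names()) // [page_context web_fetch web_search]
}
```

Because Clone() is shallow, an executor registered in the base registry is the same value in every pane, so any mutable state it holds is shared across panes and has to be safe for concurrent use.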
RequiresApproval is dynamic per call — e.g. bash only requires approval for dangerous patterns; browser_action only for execute_js. The ToolSpec.RequiresApproval bool is just a hint.
Host-agnostic tools shipped in rysh-shared/tools
| Tool | Approval | Purpose |
|---|---|---|
| list_tools | no | list all registered tools |
| web_search | no | Brave Search API (X-Subscription-Token) |
| web_fetch | no | fetch a URL; text/html/raw extraction |
| browser_action | only execute_js | control the user's Chrome via the extension over NATS (23 actions) |
| page_context | no | read the current tab's {url,title,selected_text,body_text} (single-use LoadAndDelete) |
The full filesystem/shell/git/email toolbox lives in rysh-cli (§6.4). rysh-shared only ships the host-agnostic ones, so both CLI and server can compose registries from a common base.
4.6 The NATS↔actor bridge (bridge/bridge.go)
NATSBridge is what lets an actor "subscribe" to NATS as if it were just receiving mailbox messages.
```mermaid
sequenceDiagram
participant A as Actor (Started)
participant B as NATSBridge
participant N as NATS
A->>B: bridge.New(...); AddSubject(s)
B->>N: Subscribe(s)
N-->>B: raw message
B->>B: unmarshal NATSEnvelope → Decode(tag, payload)
alt ReplyTo set
B->>A: RequestEnvelope (inner + .Reply)
else fire-and-forget
B->>A: decoded inner message
end
A->>A: Stopping → bridge.Stop() (unsubscribe all)
```
The decode-and-dispatch core (bridge.go:59) — a request is detected from either the envelope r field or the raw NATS m.Reply, so both the custom Request path and a plain nats.Conn.Request work:
```go
decoded, err := b.codecs.Decode(env.TypeTag, env.Payload)
replyTo := env.ReplyTo
if replyTo == "" { replyTo = m.Reply }
if replyTo != "" {
b.system.Root.Send(b.pid, &msg.RequestEnvelope{Inner: decoded, ReplyTo: replyTo, NC: b.nc, Codecs: b.codecs})
} else {
b.system.Root.Send(b.pid, decoded) // fire-and-forget: inner sent directly
}
```
Each actor creates one bridge in its *actor.Started handler and calls AddSubject() for each subject it owns; Stop() unsubscribes all on *actor.Stopping. SetPaneID makes log lines read pane:{8char}. This uniform delivery is why an actor's Receive can treat NATS traffic and in-process sends identically.
4.7 How CLI and server consume the engine
- Server (rysh-server/internal/agentic/service.go): builds a DefaultCodecRegistry + NewNATSPublisher, picks ClaudeAgenticProvider or StaticAgenticProvider, builds a base ToolRegistry (web_search/web_fetch), and per pane clones it, adding page_context, browser_action, and list_tools (mode PaneToolsBrowser), or uses zero tools (PaneToolsNone) for untrusted chatbot panes. It spawns one NewLLMPromptExecutionActor per pane.
- CLI (rysh-cli/internal/agentic): re-exports the shared actors via type aliases so its own packages keep their import paths. It registers the large local toolbox into the base registry, then per pane clones it and adds NATS-dependent tools (ask_user, pane_inspect, pane_send, context_store, todo, and email tools for humanoids) before spawning NewLLMPromptExecutionActor.
This is the payoff of the shared design: one agentic loop, three trust profiles, composed by which tools the registry contains.