3. Messaging & Protocol
This chapter is the protocol reference: the envelope, the codec, the subject scheme, the unified conversation model, the memory model, and the full message catalog. Everything here lives in rysh-shared/msg (with type-aliased re-exports in rysh-cli/internal/msg).
3.1 The envelope and codec
NATSEnvelope (msg/codec.go:52)
Every NATS message is a JSON envelope. The exact definition:
type NATSEnvelope struct {
TypeTag string `json:"t"` // message type discriminator string constant
ReplyTo string `json:"r"` // NATS reply subject; empty for fire-and-forget
Payload json.RawMessage `json:"p"` // JSON inner message, embedded verbatim
}
Using json.RawMessage for the payload means the inner JSON is embedded as-is, avoiding the ~33% base64 inflation a []byte field would cause. (The Chrome extension's TS port uses a base64-encoded JSON payload, and the browser_action tool decodes a local envelope with a []byte payload — see the gotcha in §4.5 and §8.)
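To make the size argument concrete, the following sketch marshals the same inner JSON through the envelope above and through a hypothetical variant with a []byte payload. The bytesEnvelope type and the inner {"content":"hi"} payload are illustrative assumptions, not part of rysh-shared/msg.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NATSEnvelope mirrors the struct shown above.
type NATSEnvelope struct {
	TypeTag string          `json:"t"`
	ReplyTo string          `json:"r"`
	Payload json.RawMessage `json:"p"`
}

// bytesEnvelope is a hypothetical variant with a []byte payload, used only
// to show the base64 encoding that encoding/json applies to byte slices.
type bytesEnvelope struct {
	TypeTag string `json:"t"`
	ReplyTo string `json:"r"`
	Payload []byte `json:"p"`
}

// encodeBoth marshals the same inner JSON through both envelope shapes.
func encodeBoth(tag string, inner []byte) (raw, b64 []byte, err error) {
	raw, err = json.Marshal(NATSEnvelope{TypeTag: tag, Payload: inner})
	if err != nil {
		return nil, nil, err
	}
	b64, err = json.Marshal(bytesEnvelope{TypeTag: tag, Payload: inner})
	return raw, b64, err
}

func main() {
	inner := []byte(`{"content":"hi"}`) // hypothetical inner message
	raw, b64, err := encodeBoth("memory.append", inner)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // payload appears as a nested JSON object
	fmt.Println(string(b64)) // payload appears as a base64 string
	fmt.Println(len(raw), "<", len(b64))
}
```

The RawMessage form also keeps the payload readable in logs and in tools such as nats sub, which the base64 form does not.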
CodecRegistry (msg/codec.go)
A bidirectional map between type tags and Go types:
| Method | Purpose |
|---|---|
| Register(tag, typeName, decodeFn) | register a message type and its decoder |
| Decode(tag, payload) | turn (tag, json) into a typed Go value |
| TagOf(msg) | get the tag for an outgoing Go value |
Decoders are built with a generic jsonDecoder[T]() (codec.go:145) — note an empty payload decodes to a zero-valued *T with no error:
func jsonDecoder[T any]() func([]byte) (interface{}, error) {
return func(b []byte) (interface{}, error) {
v := new(T)
if len(b) > 0 {
if err := json.Unmarshal(b, v); err != nil { return nil, err }
}
return v, nil // returns *T
}
}
TagOf keys off fmt.Sprintf("%T", msg) and returns "" for an unregistered type (the publisher then errors "publisher: unregistered message type %T"). Decode returns "codec: unknown TypeTag %q" on a miss.
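The behavior described above (zero value on an empty payload, "" from TagOf for an unregistered type, an error from Decode on an unknown tag) can be reproduced with a minimal stand-in registry. This is a sketch, not the code in msg/codec.go: the two-map layout, the MsgPing type and the example.ping tag are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type decodeFn func([]byte) (interface{}, error)

// CodecRegistry is a simplified stand-in: two maps, no locking.
type CodecRegistry struct {
	decoders map[string]decodeFn // tag -> decoder
	tags     map[string]string   // "%T" type name -> tag
}

func NewCodecRegistry() *CodecRegistry {
	return &CodecRegistry{decoders: map[string]decodeFn{}, tags: map[string]string{}}
}

func (c *CodecRegistry) Register(tag, typeName string, fn decodeFn) {
	c.decoders[tag] = fn
	c.tags[typeName] = tag
}

func (c *CodecRegistry) Decode(tag string, payload []byte) (interface{}, error) {
	fn, ok := c.decoders[tag]
	if !ok {
		return nil, fmt.Errorf("codec: unknown TypeTag %q", tag)
	}
	return fn(payload)
}

// TagOf returns "" for a type that was never registered.
func (c *CodecRegistry) TagOf(msg interface{}) string {
	return c.tags[fmt.Sprintf("%T", msg)]
}

func jsonDecoder[T any]() decodeFn {
	return func(b []byte) (interface{}, error) {
		v := new(T)
		if len(b) > 0 {
			if err := json.Unmarshal(b, v); err != nil {
				return nil, err
			}
		}
		return v, nil // *T, zero-valued when the payload is empty
	}
}

// MsgPing is a hypothetical message type used only for this example.
type MsgPing struct {
	Seq int `json:"seq"`
}

// newExampleRegistry registers MsgPing under its pointer type name,
// assuming messages are passed around as pointers.
func newExampleRegistry() *CodecRegistry {
	reg := NewCodecRegistry()
	reg.Register("example.ping", fmt.Sprintf("%T", &MsgPing{}), jsonDecoder[MsgPing]())
	return reg
}

func main() {
	reg := newExampleRegistry()
	v, err := reg.Decode("example.ping", []byte(`{"seq":7}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(v.(*MsgPing).Seq)
	fmt.Printf("%q %q\n", reg.TagOf(&MsgPing{}), reg.TagOf(42))
	_, err = reg.Decode("nope", nil)
	fmt.Println(err)
}
```

Because the lookup key is the "%T" string, a value type and its pointer type are distinct keys; passing MsgPing{} instead of &MsgPing{} to TagOf in this sketch would return "".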
DefaultCodecRegistry() registers, in order (codec.go:163-194): the 8 output tags (MsgPaneOutputAppend, MsgPaneShellOutputAppend, MsgPaneAIOutputAppend, MsgPaneChatOutputAppend, MsgPaneRyshOutputAppend, MsgPaneExternalOutputAppend, MsgPaneStatusUpdate, MsgPipelineOutputAppend); the 6 history tags; the 2 unified tags (MsgConversationAppend, MsgConversationHistoryAppend); then RegisterAgenticCodecs, RegisterBrowserCodecs, RegisterChatbotCodecs, registerMemoryCodecs. (Only DefaultCodecRegistry lives in codec.go; RegisterAgenticCodecs is defined in the separate file msg/agentic_codec.go.) The CLI's DefaultCodecRegistry starts from the shared one and adds ~200 CLI-specific tags (workspace/tab/lane/pane-group routing, snapshots, sharing, agents, humanoids, attention, raw terminal/relay).
RequestEnvelope (codec.go:64)
When a message arrives with ReplyTo set (or NATS's built-in reply subject), the bridge wraps the decoded inner message in a RequestEnvelope whose .Reply(resp) serializes the response and publishes it to the reply subject:
type RequestEnvelope struct {
Inner interface{}
ReplyTo string
NC *nats.Conn
Codecs *CodecRegistry
}
func (r *RequestEnvelope) Reply(responseMsg interface{}) error {
tag := r.Codecs.TagOf(responseMsg)
if tag == "" { return fmt.Errorf("reply: unknown message type %T", responseMsg) }
payload, _ := json.Marshal(responseMsg)
env := NATSEnvelope{TypeTag: tag, Payload: payload} // ReplyTo deliberately empty in a reply
data, _ := json.Marshal(env)
return r.NC.Publish(r.ReplyTo, data)
}
This is how snapshot and CLI request/reply work.
graph LR
Msg["typed Go msg"] -->|TagOf| Tag
Tag --> Env["NATSEnvelope{t,r,p=JSON}"]
Env -->|publish| Subj["NATS subject"]
Subj -->|NATSBridge| Dec["CodecRegistry.Decode"]
Dec -->|"r empty"| Mailbox["actor mailbox (inner msg)"]
Dec -->|"r set"| Req["RequestEnvelope (.Reply)"]
Req --> Mailbox
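The routing decision in the diagram, and the reply path, can be exercised without a NATS server. In the sketch below the *nats.Conn and *CodecRegistry fields are replaced by two plain functions (an assumption made so the example is self-contained), snapshotReply and its tag are hypothetical, and Reply returns marshal errors instead of discarding them.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type NATSEnvelope struct {
	TypeTag string          `json:"t"`
	ReplyTo string          `json:"r"`
	Payload json.RawMessage `json:"p"`
}

// RequestEnvelope swaps the connection and registry for two functions
// (an assumption) so the example needs no network.
type RequestEnvelope struct {
	Inner   interface{}
	ReplyTo string
	Publish func(subject string, data []byte) error
	TagOf   func(msg interface{}) string
}

// Reply follows the same steps as the method above, but surfaces marshal errors.
func (r *RequestEnvelope) Reply(resp interface{}) error {
	tag := r.TagOf(resp)
	if tag == "" {
		return fmt.Errorf("reply: unknown message type %T", resp)
	}
	payload, err := json.Marshal(resp)
	if err != nil {
		return fmt.Errorf("reply: marshal payload: %w", err)
	}
	data, err := json.Marshal(NATSEnvelope{TypeTag: tag, Payload: payload})
	if err != nil {
		return fmt.Errorf("reply: marshal envelope: %w", err)
	}
	return r.Publish(r.ReplyTo, data)
}

// route mirrors the diagram: with no reply subject the inner message is
// delivered as-is; otherwise it is wrapped so the receiver can call Reply.
func route(inner interface{}, replyTo string,
	pub func(string, []byte) error, tagOf func(interface{}) string) interface{} {
	if replyTo == "" {
		return inner
	}
	return &RequestEnvelope{Inner: inner, ReplyTo: replyTo, Publish: pub, TagOf: tagOf}
}

// snapshotReply is a hypothetical response type.
type snapshotReply struct {
	Panes int `json:"panes"`
}

func main() {
	pub := func(subject string, data []byte) error {
		fmt.Printf("%s <- %s\n", subject, data)
		return nil
	}
	tagOf := func(msg interface{}) string {
		if _, ok := msg.(*snapshotReply); ok {
			return "example.snapshot_reply"
		}
		return ""
	}
	delivered := route("snapshot request", "_INBOX.abc123", pub, tagOf)
	if req, ok := delivered.(*RequestEnvelope); ok {
		if err := req.Reply(&snapshotReply{Panes: 2}); err != nil {
			panic(err)
		}
	}
}
```

An actor that receives a *RequestEnvelope type-switches on Inner to handle the request and then calls Reply exactly once.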
3.2 Subject / topic scheme (msg/topics.go)
Subjects are prefixed by a session prefix (default rysh; SetSessionPrefix changes it per session so multiple daemons can coexist). The builder (topics.go:29) is trivial — no escaping, set-once at startup with no locking:
var sessionPrefix = "rysh"
func SetSessionPrefix(session string) { if session == "" { session = "default" }; sessionPrefix = session }
func T(parts ...string) string { return sessionPrefix + "." + strings.Join(parts, ".") }
So T("pane", paneID, "output", mode) → {session}.pane.{paneID}.output.{mode}.
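A short runnable copy of the builder shows the three behaviors worth remembering: the default prefix, the "default" fallback for an empty session name, and the absence of escaping. The pane IDs and the "work" session name are made up for the example.

```go
package main

import (
	"fmt"
	"strings"
)

var sessionPrefix = "rysh"

// SetSessionPrefix replaces the prefix; an empty name falls back to "default".
func SetSessionPrefix(session string) {
	if session == "" {
		session = "default"
	}
	sessionPrefix = session
}

// T joins the parts with dots under the current session prefix.
func T(parts ...string) string {
	return sessionPrefix + "." + strings.Join(parts, ".")
}

func main() {
	fmt.Println(T("pane", "p1", "output", "ai")) // rysh.pane.p1.output.ai

	SetSessionPrefix("work")
	fmt.Println(T("pane", "p1", "inbox")) // work.pane.p1.inbox

	SetSessionPrefix("")
	fmt.Println(T("ws", "snapshot")) // default.ws.snapshot

	// No escaping: a dot inside an ID silently adds a subject token.
	fmt.Println(T("pane", "a.b", "inbox")) // default.pane.a.b.inbox
}
```

The last line is the practical consequence of "no escaping": IDs that may contain ".", "*" or ">" need to be sanitized by the caller before they reach T.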
Per-pane subjects
| Subject | Direction | Purpose |
|---|---|---|
| pane.{id}.inbox | →pane | submit input / pane commands |
| pane.{id}.output | pane→ | merged output (shell + ai only) |
| pane.{id}.output.{mode} | pane→ | per-mode output |
| pane.{id}.history[.{mode}] | pane→ | conversation history append |
| pane.{id}.status | pane→ | status update |
| pane.{id}.llm_prompt_execution.inbox | →agent | prompts, cancel, approval responses |
| pane.{id}.llm_prompt_execution.output | agent→ | streamed agentic output |
| pane.{id}.llm_prompt_execution.status | agent→ | phase/iteration status |
| pane.{id}.approval.request | agent→ | approval request |
| pane.{id}.approval.response | →agent | approval decision |
| pane.{id}.browser.request | agent→ext | browser action request |
| pane.{id}.browser.response | ext→agent | browser action result |
| pane.{id}.memory.{mode} / .memory.summarize | both | conversation memory |
| pane.{id}.relay.data | both | native-speed raw PTY relay |
Other subjects
| Subject | Purpose |
|---|---|
| ws.inbox / ws.snapshot | workspace command inbox / snapshot request-reply |
| tab.{id}.inbox | tab commands |
| agent.{name}.inbox / agent.registry.inbox | headless agents |
| humanoid.{name}.inbox | humanoid agents |
| pane-group.{id}.inbox | ephemeral approval-pane creation |
| share.registry.inbox | share registry |
Server (cross-machine) subjects
The server scopes every subject to a workspace tenant, identified by its UUID:
ws.{workspaceID}.share.{shareID}.output share output stream
ws.{workspaceID}.share.{shareID}.output.layout periodic layout document (tab/lane/group shares)
ws.{workspaceID}.share.{shareID}.command inbound command (control mode)
ws.{workspaceID}.share.{shareID}.command.ack command acknowledgement
ws.{workspaceID}.share.{shareID}.{register|unregister|heartbeat|subscriber} control plane
rysh.pane.{paneID}.... server-side agentic panes (same per-pane scheme)
rysh.channel.{type}.inbound.{workspaceID} inbound external-channel messages
_INBOX.> is the NATS request/reply inbox subtree. The subject-ACL permits exactly ws.{workspaceID}.> and _INBOX.> per authenticated client.
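For readers unfamiliar with NATS wildcards, the sketch below shows what those two permission patterns admit. It is an illustration of NATS subject semantics ("*" matches exactly one token, ">" matches one or more trailing tokens), not the server's ACL code; subjectMatches, allowed and the workspace ID "abc" are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject is covered by a NATS-style pattern:
// "*" matches exactly one token, ">" matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only valid as the last token and needs at least one token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// allowed checks a subject against the two patterns the text lists.
func allowed(workspaceID, subject string) bool {
	return subjectMatches("ws."+workspaceID+".>", subject) ||
		subjectMatches("_INBOX.>", subject)
}

func main() {
	for _, subj := range []string{
		"ws.abc.share.s1.output", // own workspace: allowed
		"ws.xyz.share.s1.output", // another workspace: denied
		"_INBOX.r4nd0m.1",        // request/reply inbox: allowed
		"ws.abc",                 // ">" needs at least one more token: denied
	} {
		fmt.Println(subj, allowed("abc", subj))
	}
}
```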
3.3 The unified conversation model (msg/conversation.go)
A single ConversationMessage type replaced ~12 legacy per-mode message types. This is the central data structure of the whole protocol. Verbatim (conversation.go:86):
type ConversationMessage struct {
TurnID string `json:"turn_id"`
TurnType TurnType `json:"turn_type"` // question | answer
ConversationType ConversationType `json:"conversation_type"` // shell|ai|rysh|chat|email|slack|chatbot
InputType InputType `json:"input_type"` // shell|prompt|command|approval|message
MessageSource MessageSource `json:"message_source"` // human|ai|external|agent|subagent|humanoid|system
Content string `json:"content"`
TimestampMs int64 `json:"timestamp_ms"`
Sensitive bool `json:"sensitive,omitempty"`
SubjectToShare bool `json:"subject_to_share,omitempty"`
Role string `json:"role,omitempty"` // visitor, assistant, human, system
Streaming bool `json:"streaming,omitempty"`// partial chunk of an in-progress answer
Origin *MessageOrigin `json:"origin,omitempty"` // (not in proto) tab-mirroring provenance
}
The constant values are lowercase strings (note these differ in case from the proto enum names):
// ConversationType
ConvShell = "shell"    ConvAI = "ai"    ConvRysh = "rysh"    ConvChat = "chat"
ConvEmail = "email"    ConvSlack = "slack"    ConvChatbot = "chatbot"
// TurnType
TurnQuestion = "question"    TurnAnswer = "answer"
// InputType
InputShell = "shell"    InputPrompt = "prompt"    InputCommand = "command"
InputApproval = "approval"    InputMessage = "message"
// MessageSource
SourceHuman = "human"    SourceAI = "ai"    SourceExternal = "external"    SourceAgent = "agent"
SourceSubagent = "subagent"    SourceHumanoid = "humanoid"    SourceSystem = "system"
MessageOrigin (all fields ,omitempty) carries mirroring provenance so a mirrored tab can attribute a message to its source pane and the mirror pane rendering it:
type MessageOrigin struct {
SourceShareID, SourceTabID, SourcePaneID, SourcePaneAlias string
MirrorTabID, MirrorPaneID string
}
Convenience constructors (conversation.go:140-258) preset the fields: NewShellQuestion/Answer (Source=Human/System, SubjectToShare=true), NewAIQuestion/Answer (the answer carries Streaming), NewRyshQuestion/Answer (no SubjectToShare), NewChatMessage, and NewChannelMessage (sets Sensitive). NewTurnID() returns a UUID; NowMs() returns time.Now().UnixMilli().
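To show what one of these messages looks like on the wire, here is a sketch that builds a shell question and marshals it. newShellQuestion is a hypothetical stand-in for NewShellQuestion: its signature takes the turn ID and timestamp explicitly to keep the output deterministic, and the choice of InputShell for the input type is an assumption. Origin is omitted for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type (
	TurnType         string
	ConversationType string
	InputType        string
	MessageSource    string
)

const (
	TurnQuestion TurnType         = "question"
	ConvShell    ConversationType = "shell"
	InputShell   InputType        = "shell"
	SourceHuman  MessageSource    = "human"
)

// ConversationMessage copies the fields above, minus Origin.
type ConversationMessage struct {
	TurnID           string           `json:"turn_id"`
	TurnType         TurnType         `json:"turn_type"`
	ConversationType ConversationType `json:"conversation_type"`
	InputType        InputType        `json:"input_type"`
	MessageSource    MessageSource    `json:"message_source"`
	Content          string           `json:"content"`
	TimestampMs      int64            `json:"timestamp_ms"`
	Sensitive        bool             `json:"sensitive,omitempty"`
	SubjectToShare   bool             `json:"subject_to_share,omitempty"`
	Role             string           `json:"role,omitempty"`
	Streaming        bool             `json:"streaming,omitempty"`
}

// newShellQuestion is a hypothetical stand-in for NewShellQuestion.
func newShellQuestion(turnID, content string, tsMs int64) *ConversationMessage {
	return &ConversationMessage{
		TurnID: turnID, TurnType: TurnQuestion, ConversationType: ConvShell,
		InputType: InputShell, MessageSource: SourceHuman,
		Content: content, TimestampMs: tsMs, SubjectToShare: true,
	}
}

// encode returns the JSON wire form of a message.
func encode(cm *ConversationMessage) (string, error) {
	out, err := json.Marshal(cm)
	return string(out), err
}

func main() {
	wire, err := encode(newShellQuestion("turn-1", "ls -la", 1700000000000))
	if err != nil {
		panic(err)
	}
	fmt.Println(wire)
}
```

Because of omitempty, false booleans and an empty Role never appear on the wire, so a consumer cannot distinguish "sensitive: false" from "field absent"; both decode to false.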
Conversation types and their properties
| Mode | Dual-publish? | Has memory? | Backed by |
|---|---|---|---|
| shell | ✅ | ✗ | a PTY |
| ai | ✅ | ✅ | the agentic loop |
| rysh | ✗ | ✗ | ## system commands |
| chat | ✗ | ✗ | a chat buffer |
| email | ✗ | ✅ | a humanoid channel |
| slack | ✗ | ✅ | a humanoid channel |
| chatbot | ✗ | ✅ | the website widget / server pane |
- Dual-publish (IsDualPublish()): shell and ai publish to both the per-mode topic and the merged output topic.
- Memory (HasMemory()): ai, email, slack, chatbot summarize older turns into memory entries.
func (ct ConversationType) IsDualPublish() bool { return ct == ConvShell || ct == ConvAI }
func (ct ConversationType) HasMemory() bool {
switch ct {
case ConvAI, ConvEmail, ConvSlack, ConvChatbot: return true
}
return false
}
Wire wrappers
| Type | Subject pattern |
|---|---|
| MsgConversationAppend{message} | pane.{id}.output[.{mode}] |
| MsgConversationHistoryAppend{message} | pane.{id}.history[.{mode}] |
The NATSPublisher.SendConversation helper publishes to the per-mode topic always, plus the merged topic for dual-publish modes.
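A subscriber deciding which subject to listen on may find it useful to see the rule as a pure function. The sketch below lists the output subjects one message lands on; outputSubjects is a hypothetical helper, and T is hard-wired to the default "rysh" prefix for brevity.

```go
package main

import (
	"fmt"
	"strings"
)

type ConversationType string

const (
	ConvShell ConversationType = "shell"
	ConvAI    ConversationType = "ai"
	ConvChat  ConversationType = "chat"
)

func (ct ConversationType) IsDualPublish() bool { return ct == ConvShell || ct == ConvAI }

// T assumes the default "rysh" session prefix.
func T(parts ...string) string { return "rysh." + strings.Join(parts, ".") }

// outputSubjects is a hypothetical helper listing where one message lands:
// always the per-mode subject, plus the merged subject for dual-publish modes.
func outputSubjects(paneID string, ct ConversationType) []string {
	subjects := []string{T("pane", paneID, "output", string(ct))}
	if ct.IsDualPublish() {
		subjects = append(subjects, T("pane", paneID, "output"))
	}
	return subjects
}

func main() {
	for _, ct := range []ConversationType{ConvShell, ConvAI, ConvChat} {
		fmt.Println(ct, outputSubjects("p1", ct))
	}
}
```

A consumer that wants everything a pane shows in its main view subscribes to the merged subject; one that wants chat, rysh or channel traffic has to subscribe to the per-mode subject, since those modes never reach the merged one.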
3.4 The memory model (msg/memory.go)
Long conversations are compressed: older turns are summarized by the LLM into MemoryEntry records, which are prepended to the system prompt on subsequent prompts.
type MemoryEntry struct {
ID string `json:"id"` // UUID
Summary string `json:"summary"` // LLM-generated
TurnIDs []string `json:"turn_ids"`
Mode ConversationType `json:"mode"` // not a bare string — the ConversationType enum
CreatedAtMs int64 `json:"created_at_ms"`
TurnCount int `json:"turn_count"`
}
type MemoryState struct {
PaneID string `json:"pane_id"`
Mode ConversationType `json:"mode"`
Entries []MemoryEntry `json:"entries"` // capped at MaxMemoryEntries = 50 (oldest merged)
TotalSummarizedTurns int `json:"total_summarized_turns"`
}
sequenceDiagram
participant PA as Pane / MemoryManager
participant LLM as Summarizer (Claude)
participant M as LLMPromptExecutionActor
PA->>LLM: MsgMemorySummarize (turns, MemorySummarizationPrompt)
LLM-->>PA: summary (≤500 words: decisions/files/context/TODOs)
PA->>PA: append MemoryEntry (merge oldest if > 50)
PA->>M: MsgMemoryStateUpdate(state)
M->>M: prepend FormatMemoryForPrompt(state) to system prompt
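The "append MemoryEntry (merge oldest if > 50)" step can be sketched as follows. The source does not spell out the merge rule, so the one used here (fold the two oldest entries together by joining summaries and concatenating turn IDs) is an assumption, as is incrementing TotalSummarizedTurns by the entry's TurnCount. The structs are trimmed to the fields the example needs.

```go
package main

import "fmt"

const MaxMemoryEntries = 50

type MemoryEntry struct {
	ID        string
	Summary   string
	TurnIDs   []string
	TurnCount int
}

type MemoryState struct {
	Entries              []MemoryEntry
	TotalSummarizedTurns int
}

// appendEntry adds e and, while the list is over the cap, folds the two
// oldest entries into one. The merge rule is an assumption for illustration.
func appendEntry(s *MemoryState, e MemoryEntry) {
	s.Entries = append(s.Entries, e)
	s.TotalSummarizedTurns += e.TurnCount
	for len(s.Entries) > MaxMemoryEntries {
		a, b := s.Entries[0], s.Entries[1]
		merged := MemoryEntry{
			ID:        a.ID,
			Summary:   a.Summary + "\n" + b.Summary,
			TurnIDs:   append(append([]string{}, a.TurnIDs...), b.TurnIDs...),
			TurnCount: a.TurnCount + b.TurnCount,
		}
		s.Entries = append([]MemoryEntry{merged}, s.Entries[2:]...)
	}
}

func main() {
	var s MemoryState
	for i := 0; i < 51; i++ {
		id := fmt.Sprintf("t%d", i)
		appendEntry(&s, MemoryEntry{
			ID: "m-" + id, Summary: "summary " + id,
			TurnIDs: []string{id}, TurnCount: 1,
		})
	}
	// 50 entries remain; the oldest now covers two turns.
	fmt.Println(len(s.Entries), s.Entries[0].TurnCount, s.TotalSummarizedTurns)
}
```

Whatever the real merge rule is, the invariant to preserve is the one this sketch keeps: the sum of TurnCount over all entries equals TotalSummarizedTurns, so no summarized turn is lost when the cap is hit.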
FormatMemoryForPrompt(state) (memory.go:105) renders a ## Conversation Memory section — one ### Memory N (covers K turns, created <ago>) block per entry (where <ago> is just now / Nm ago / Nh ago / Nd ago) — followed by a ## Recent Conversation header. MemorySummarizationPrompt(mode, turns) (memory.go:145) is the condensing prompt:
You are summarizing a conversation for future context. The conversation took
place in <mode> mode. Produce a concise summary (max 500 words) that captures:
1. Key decisions made
2. Files modified or discussed
3. Important context that would help a future prompt understand what happened
4. Any unresolved issues or TODOs
Do NOT include raw code blocks unless they are critical to understanding.
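The section layout that FormatMemoryForPrompt produces can be approximated as below. This is a sketch of the described output shape, not the code at memory.go:105: the cut-offs between "just now", minutes, hours and days, the 1-based numbering, and the blank lines between blocks are assumptions, and formatMemory takes the current time as a parameter so the result is reproducible.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

type MemoryEntry struct {
	Summary     string
	CreatedAtMs int64
	TurnCount   int
}

// ago renders an age in the four forms the text lists. The cut-offs
// (one minute, one hour, one day) are assumptions.
func ago(d time.Duration) string {
	switch {
	case d < time.Minute:
		return "just now"
	case d < time.Hour:
		return fmt.Sprintf("%dm ago", int(d/time.Minute))
	case d < 24*time.Hour:
		return fmt.Sprintf("%dh ago", int(d/time.Hour))
	default:
		return fmt.Sprintf("%dd ago", int(d/(24*time.Hour)))
	}
}

// formatMemory is a hypothetical approximation of FormatMemoryForPrompt.
func formatMemory(entries []MemoryEntry, now time.Time) string {
	var b strings.Builder
	b.WriteString("## Conversation Memory\n\n")
	for i, e := range entries {
		age := now.Sub(time.UnixMilli(e.CreatedAtMs))
		fmt.Fprintf(&b, "### Memory %d (covers %d turns, created %s)\n%s\n\n",
			i+1, e.TurnCount, ago(age), e.Summary)
	}
	b.WriteString("## Recent Conversation\n")
	return b.String()
}

func main() {
	now := time.UnixMilli(1700000000000)
	entries := []MemoryEntry{
		{Summary: "Refactored the codec.", CreatedAtMs: now.Add(-3 * time.Hour).UnixMilli(), TurnCount: 12},
		{Summary: "Fixed a flaky test.", CreatedAtMs: now.Add(-5 * time.Minute).UnixMilli(), TurnCount: 4},
	}
	fmt.Print(formatMemory(entries, now))
}
```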
3.5 Message catalog
Agentic control plane (msg/agentic_messages.go)
| Tag / Type | Role |
|---|---|
| MsgAgenticPrompt{RequestID, Prompt} | user prompt → LLMPromptExecutionActor |
| MsgAgenticCancel | cancel the in-flight orchestrator |
| MsgOrchestratorDone{OrchestratorID, Success, Summary, FilesChanged, Errors, Conversation} | orchestrator→parent (Conversation is in-process only) |
| MsgToolCall{ToolCallID, ToolName, Parameters} | tool dispatch |
| MsgToolResult{ToolCallID, ToolName, Content, Error, ExitCode, Approved} | tool result |
| MsgAgenticOutput{OrchestratorID, Type, Content, Metadata} | streamed output; Type ∈ thinking/text/tool_call/tool_result/diff/error |
| MsgAgenticStatus{OrchestratorID, Phase, Iteration, MaxIterations, ActiveTools} | status bar; Phase ∈ planning/executing/waiting_approval/compacting/done/error |
| MsgApprovalRequest{RequestID, OrchestratorID, ToolCallID, Type, Description, Diff, Choices} | request approval |
| MsgApprovalResponse{RequestID, Decision, Reason, ChoiceIdx, RespondingPaneID} | user decision |
| MsgSpawnSubOrchestrator / MsgSubOrchestratorResult | defined but unused; the real sub_agent (§4.2) spawns an in-process child OrchestratorActor and returns via the standard MsgOrchestratorDone, not these wire types |
| MsgGetConversationHistory{LastN} / MsgConversationHistoryReply{PaneID, Turns} | history query (Turns is []ConversationTurnInfo, each carrying []ToolCallInfo; these are serializable shapes mirroring the provider's turns) |
| MsgRestoreConversation{Conversation} | restore persisted conversation on restart |
| MsgCreateApprovalPane / MsgDestroyApprovalPane / MsgPaneSetApprovalPaneGroups | ephemeral approval-pane flow |
| MsgSetChatOutputPane{PaneID} | reroute agent output to a chat buffer |
| MsgMemoryStateUpdate{State} | inject memory state into the agent |
Approval types: diff, destructive_action, choice, question.
Approval decisions: yes, yes_always, no, no_with_explanation, choice_selected.
The three most important structs verbatim (agentic_messages.go:79,133,144):
type MsgAgenticOutput struct {
OrchestratorID string `json:"orchestrator_id"`
Type string `json:"type"` // thinking|text|tool_call|tool_result|diff|error
Content string `json:"content"`
Metadata map[string]string `json:"metadata,omitempty"`
}
type MsgApprovalRequest struct {
RequestID, OrchestratorID, ToolCallID string
Type ApprovalType `json:"type"` // diff|destructive_action|choice|question
Description string `json:"description"`
Diff *DiffPayload `json:"diff,omitempty"` // DiffPayload{FilePath, UnifiedDiff}
Choices []Choice `json:"choices,omitempty"` // Choice{Label, Description}
}
type MsgApprovalResponse struct {
RequestID string `json:"request_id"`
Decision ApprovalDecision `json:"decision"`
Reason string `json:"reason,omitempty"`
ChoiceIdx int `json:"choice_idx,omitempty"`
RespondingPaneID string `json:"responding_pane_id,omitempty"`
}
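One consequence of the tags on MsgApprovalResponse is worth seeing in action: ChoiceIdx is an int with omitempty, so selecting the first choice (index 0) sends no choice_idx key at all. The sketch below round-trips two responses to show that the value still decodes correctly. The constant name DecisionChoiceSelected and the roundTrip helper are assumptions; the string value "choice_selected" comes from the list above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type ApprovalDecision string

// DecisionChoiceSelected is an assumed constant name for "choice_selected".
const DecisionChoiceSelected ApprovalDecision = "choice_selected"

type MsgApprovalResponse struct {
	RequestID        string           `json:"request_id"`
	Decision         ApprovalDecision `json:"decision"`
	Reason           string           `json:"reason,omitempty"`
	ChoiceIdx        int              `json:"choice_idx,omitempty"`
	RespondingPaneID string           `json:"responding_pane_id,omitempty"`
}

// roundTrip marshals a response and decodes it again, returning both forms.
func roundTrip(in MsgApprovalResponse) (string, MsgApprovalResponse, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return "", MsgApprovalResponse{}, err
	}
	var out MsgApprovalResponse
	err = json.Unmarshal(data, &out)
	return string(data), out, err
}

func main() {
	for _, idx := range []int{0, 2} {
		wire, out, err := roundTrip(MsgApprovalResponse{
			RequestID: "req-1", Decision: DecisionChoiceSelected, ChoiceIdx: idx,
		})
		if err != nil {
			panic(err)
		}
		fmt.Println(wire, "->", out.ChoiceIdx)
	}
}
```

A Go receiver is unaffected because the missing key decodes to 0. A non-Go consumer (for example a TypeScript client) should treat an absent choice_idx as 0 when the decision is choice_selected rather than as "no choice made".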
Gotcha: MsgOrchestratorDone.Conversation has the json tag "-", so the full conversation travels only in-process (proto.actor mailbox), never over NATS. That is how tool-result context (e.g. an email draft_id) survives across orchestrator sessions.
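The effect of the "-" tag is easy to verify in isolation. The struct below is a trimmed, hypothetical version of MsgOrchestratorDone: the JSON key names and the []string element type of Conversation are assumptions, and only the "-" tag is taken from the source.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MsgOrchestratorDone is trimmed for the example; key names and the
// Conversation element type are assumptions.
type MsgOrchestratorDone struct {
	OrchestratorID string   `json:"orchestrator_id"`
	Success        bool     `json:"success"`
	Summary        string   `json:"summary"`
	Conversation   []string `json:"-"` // never serialized
}

// wireForm returns what the message would look like if it were sent over NATS.
func wireForm(done MsgOrchestratorDone) (string, error) {
	data, err := json.Marshal(done)
	return string(data), err
}

func main() {
	done := MsgOrchestratorDone{
		OrchestratorID: "o1",
		Success:        true,
		Summary:        "done",
		Conversation:   []string{"tool_result: draft_id=42"},
	}
	wire, err := wireForm(done)
	if err != nil {
		panic(err)
	}
	fmt.Println(wire) // the Conversation field is absent

	var decoded MsgOrchestratorDone
	if err := json.Unmarshal([]byte(wire), &decoded); err != nil {
		panic(err)
	}
	fmt.Println(len(decoded.Conversation)) // 0: the context did not survive the wire
}
```

Any design that moves the parent actor out of process would therefore lose this context unless the conversation is carried by some other means.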
Memory (msg/memory.go)
MsgMemoryAppend, MsgMemoryGet/MsgMemoryGetReply, MsgMemorySummarize{PaneID, Mode, Turns}, MsgMemorySummarizeDone. (Tags use a dotted form, e.g. memory.append.)
Browser control (msg/browser_messages.go)
| Type | Direction |
|---|---|
| MsgBrowserActionRequest{RequestID, Action, Params} | server agent → Chrome extension |
| MsgBrowserActionResponse{RequestID, Success, Result, Error, Screenshot} | extension → server agent |
Chatbot (msg/chatbot_messages.go)
Session lifecycle for the embedded widget: MsgChatbotSessionCreated/Closed, MsgChatbotTakeover/Release (human operator), MsgChatbotHumanReply, MsgChatbotVisitorTyping, MsgChatbotSessionList/Reply. Support type ChatbotSessionInfo.
Legacy (msg/messages.go)
Six deprecated MsgPane*OutputAppend + six MsgPane*HistoryAppend + MsgPaneStatusUpdate + MsgPipelineOutputAppend — retained only for codec compatibility during migration to the unified ConversationMessage.
3.6 The publisher and message logging
NATSPublisher (msg/publisher.go) wraps a *nats.Conn + *CodecRegistry:
- Send(subject, msg): fire-and-forget.
- Request(subject, msg, timeout): request/reply via inbox.
- SendConversation/SendConversationHistory: dual-publish-aware (the per-mode send's error is discarded; only the merged send propagates):
func (p *NATSPublisher) SendConversation(paneID string, cm *ConversationMessage) error {
wrapped := &MsgConversationAppend{Message: cm}
mode := string(cm.ConversationType)
_ = p.Send(T("pane", paneID, "output", mode), wrapped) // always per-mode
if cm.ConversationType.IsDualPublish() { // shell or ai only
return p.Send(T("pane", paneID, "output"), wrapped) // → merged topic
}
return nil
}
- Request(subject, msg, timeout): creates a nats.NewInbox(), subscribes a buffered channel before publishing (this avoids a race with a fast reply), and decodes the reply through the codec. There is no default timeout; the caller supplies it.
- SendMemoryAppend, SendMemorySummarize, SendBrowserActionRequest.
- Flush() → nc.Flush(): important before a terminal status update, so that output ordering is guaranteed.
MessageLogger (msg/msglog.go) optionally writes every SEND/RECV/REQ/REPLY to .rysh/actor-comms.log (enabled by config or RYSH_LOG_MESSAGES=true) with file:line, actor, subject, tag, byte size, and a payload excerpt — invaluable for debugging the distributed actor system.
3.7 Relationship to rysh-proto
The .proto files in rysh-proto define ConversationMessage, MemoryEntry/MemoryState, the pane-message wrappers, and NATSEnvelope as reference schemas. They are not used for serialization at runtime — the Go structs above are canonical and the wire format is JSON. The proto enum names (e.g. SHELL, AI) differ in case from the Go string constants ("shell", "ai"). See 9. rysh-proto.