
3. Messaging & Protocol

This chapter is the protocol reference: the envelope, the codec, the subject scheme, the unified conversation model, the memory model, and the full message catalog. Everything here lives in rysh-shared/msg (with type-aliased re-exports in rysh-cli/internal/msg).


3.1 The envelope and codec

NATSEnvelope (msg/codec.go:52)

Every NATS message is a JSON envelope. The exact definition:

type NATSEnvelope struct {
    TypeTag string          `json:"t"` // message type discriminator string constant
    ReplyTo string          `json:"r"` // NATS reply subject; empty for fire-and-forget
    Payload json.RawMessage `json:"p"` // JSON inner message, embedded verbatim
}

Using json.RawMessage for the payload means the inner JSON is embedded as-is, avoiding the ~33% base64 inflation a []byte field would cause. (The Chrome extension's TS port uses a base64-encoded JSON payload, and the browser_action tool decodes a local envelope with a []byte payload — see the gotcha in §4.5 and §8.)
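A quick way to see the difference is to marshal the same inner JSON twice: once with the documented NATSEnvelope, and once with a hypothetical bytesEnvelope whose payload is a []byte. The names bytesEnvelope and encodeBoth are illustrative only; they are not part of rysh-shared/msg.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NATSEnvelope mirrors the documented wire envelope.
type NATSEnvelope struct {
	TypeTag string          `json:"t"`
	ReplyTo string          `json:"r"`
	Payload json.RawMessage `json:"p"` // embedded as JSON, not re-encoded
}

// bytesEnvelope is a hypothetical []byte variant used only for comparison.
type bytesEnvelope struct {
	TypeTag string `json:"t"`
	ReplyTo string `json:"r"`
	Payload []byte `json:"p"` // encoding/json writes []byte as a base64 string
}

// encodeBoth marshals the same inner JSON with both envelope shapes.
func encodeBoth(tag string, inner []byte) (raw, b64 string, err error) {
	r, err := json.Marshal(NATSEnvelope{TypeTag: tag, Payload: json.RawMessage(inner)})
	if err != nil {
		return "", "", err
	}
	b, err := json.Marshal(bytesEnvelope{TypeTag: tag, Payload: inner})
	if err != nil {
		return "", "", err
	}
	return string(r), string(b), nil
}

func main() {
	raw, b64, err := encodeBoth("demo", []byte(`{"a":1}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(raw) // {"t":"demo","r":"","p":{"a":1}}
	fmt.Println(b64) // {"t":"demo","r":"","p":"eyJhIjoxfQ=="}
}
```

Note that ReplyTo has no omitempty, so "r":"" is present even on fire-and-forget messages and replies.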

CodecRegistry (msg/codec.go)

A bidirectional map between type tags and Go types:

Method                              Purpose
Register(tag, typeName, decodeFn)   register a message type and its decoder
Decode(tag, payload)                turn (tag, json) into a typed Go value
TagOf(msg)                          get the tag for an outgoing Go value

Decoders are built with a generic jsonDecoder[T]() (codec.go:145). Note that an empty payload decodes to a zero-valued *T with no error:

func jsonDecoder[T any]() func([]byte) (interface{}, error) {
    return func(b []byte) (interface{}, error) {
        v := new(T)
        if len(b) > 0 {
            if err := json.Unmarshal(b, v); err != nil { return nil, err }
        }
        return v, nil // returns *T
    }
}

TagOf keys off fmt.Sprintf("%T", msg) and returns "" for an unregistered type (the publisher then errors "publisher: unregistered message type %T"). Decode returns "codec: unknown TypeTag %q" on a miss.

DefaultCodecRegistry() registers, in order (codec.go:163-194):
  • the 8 output tags (MsgPaneOutputAppend, MsgPaneShellOutputAppend, MsgPaneAIOutputAppend, MsgPaneChatOutputAppend, MsgPaneRyshOutputAppend, MsgPaneExternalOutputAppend, MsgPaneStatusUpdate, MsgPipelineOutputAppend);
  • the 6 history tags;
  • the 2 unified tags (MsgConversationAppend, MsgConversationHistoryAppend);
  • then the tags added by RegisterAgenticCodecs, RegisterBrowserCodecs, RegisterChatbotCodecs and registerMemoryCodecs.

Only DefaultCodecRegistry lives in codec.go; RegisterAgenticCodecs is defined in the separate file msg/agentic_codec.go. The CLI's DefaultCodecRegistry starts from the shared one and adds ~200 CLI-specific tags (workspace/tab/lane/pane-group routing, snapshots, sharing, agents, humanoids, attention, raw terminal/relay).

RequestEnvelope (codec.go:64)

When a message arrives with ReplyTo set (or carries NATS's built-in reply subject), the bridge wraps the decoded inner message in a RequestEnvelope whose .Reply(resp) method serializes the response and publishes it to the reply subject:

type RequestEnvelope struct {
    Inner   interface{}
    ReplyTo string
    NC      *nats.Conn
    Codecs  *CodecRegistry
}
func (r *RequestEnvelope) Reply(responseMsg interface{}) error {
    tag := r.Codecs.TagOf(responseMsg)
    if tag == "" { return fmt.Errorf("reply: unknown message type %T", responseMsg) }
    payload, _ := json.Marshal(responseMsg)
    env := NATSEnvelope{TypeTag: tag, Payload: payload} // ReplyTo deliberately empty in a reply
    data, _ := json.Marshal(env)
    return r.NC.Publish(r.ReplyTo, data)
}

This is how snapshot and CLI request/reply work.

graph LR
    Msg["typed Go msg"] -->|TagOf| Tag
    Tag --> Env["NATSEnvelope{t,r,p=JSON}"]
    Env -->|publish| Subj["NATS subject"]
    Subj -->|NATSBridge| Dec["CodecRegistry.Decode"]
    Dec -->|"r empty"| Mailbox["actor mailbox (inner msg)"]
    Dec -->|"r set"| Req["RequestEnvelope (.Reply)"]
    Req --> Mailbox
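The branch in the graph (r empty goes straight to the mailbox, r set gets wrapped) can be sketched without a NATS server by swapping *nats.Conn for a publish function. This is a trimmed, hypothetical variant of the documented RequestEnvelope: publishFunc, dispatch, Ping and Pong are illustrative names, and the single hard-coded tag stands in for a CodecRegistry lookup.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type NATSEnvelope struct {
	TypeTag string          `json:"t"`
	ReplyTo string          `json:"r"`
	Payload json.RawMessage `json:"p"`
}

// publishFunc stands in for (*nats.Conn).Publish so the sketch runs without a server.
type publishFunc func(subject string, data []byte) error

// RequestEnvelope: the NATS connection and codec are replaced by a publish func and a tag lookup.
type RequestEnvelope struct {
	Inner   interface{}
	ReplyTo string
	publish publishFunc
	tagOf   func(interface{}) string
}

// Reply wraps resp in an envelope with an empty ReplyTo and publishes it.
func (r *RequestEnvelope) Reply(resp interface{}) error {
	tag := r.tagOf(resp)
	if tag == "" {
		return fmt.Errorf("reply: unknown message type %T", resp)
	}
	payload, err := json.Marshal(resp)
	if err != nil {
		return err
	}
	data, err := json.Marshal(NATSEnvelope{TypeTag: tag, Payload: payload})
	if err != nil {
		return err
	}
	return r.publish(r.ReplyTo, data)
}

// Ping and Pong are hypothetical message types.
type Ping struct {
	N int `json:"n"`
}
type Pong struct {
	N int `json:"n"`
}

func tagOf(m interface{}) string {
	switch m.(type) {
	case *Ping:
		return "ping"
	case *Pong:
		return "pong"
	}
	return ""
}

// dispatch decodes one envelope and delivers the bare message or a RequestEnvelope.
func dispatch(data []byte, pub publishFunc, deliver func(interface{})) error {
	var env NATSEnvelope
	if err := json.Unmarshal(data, &env); err != nil {
		return err
	}
	if env.TypeTag != "ping" { // a real bridge would ask the CodecRegistry here
		return fmt.Errorf("codec: unknown TypeTag %q", env.TypeTag)
	}
	inner := &Ping{}
	if len(env.Payload) > 0 {
		if err := json.Unmarshal(env.Payload, inner); err != nil {
			return err
		}
	}
	if env.ReplyTo == "" {
		deliver(inner) // fire-and-forget: the mailbox gets the inner message
		return nil
	}
	deliver(&RequestEnvelope{Inner: inner, ReplyTo: env.ReplyTo, publish: pub, tagOf: tagOf})
	return nil
}

func main() {
	pub := func(subject string, data []byte) error {
		fmt.Println(subject, string(data))
		return nil
	}
	actor := func(m interface{}) {
		if req, ok := m.(*RequestEnvelope); ok {
			_ = req.Reply(&Pong{N: req.Inner.(*Ping).N + 1})
		}
	}
	in := []byte(`{"t":"ping","r":"_INBOX.abc","p":{"n":1}}`)
	if err := dispatch(in, pub, actor); err != nil {
		panic(err)
	}
	// prints: _INBOX.abc {"t":"pong","r":"","p":{"n":2}}
}
```

Because the reply is just another envelope, the requester can decode it through the same codec path as any other inbound message.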

3.2 Subject / topic scheme (msg/topics.go)

Subjects are prefixed by a session prefix (default rysh; SetSessionPrefix changes it per session so that multiple daemons can coexist). The builder T (topics.go:29) is trivial: it does no escaping, and the prefix is set once at startup with no locking:

var sessionPrefix = "rysh"
func SetSessionPrefix(session string) { if session == "" { session = "default" }; sessionPrefix = session }
func T(parts ...string) string { return sessionPrefix + "." + strings.Join(parts, ".") }

So T("pane", paneID, "output", mode) → {session}.pane.{paneID}.output.{mode}. Note that SetSessionPrefix("") falls back to "default", not to the initial value "rysh".

Per-pane subjects

Subject                                      Direction    Purpose
pane.{id}.inbox                              →pane        submit input / pane commands
pane.{id}.output                             pane→        merged output (shell + ai only)
pane.{id}.output.{mode}                      pane→        per-mode output
pane.{id}.history[.{mode}]                   pane→        conversation history append
pane.{id}.status                             pane→        status update
pane.{id}.llm_prompt_execution.inbox         →agent       prompts, cancel, approval responses
pane.{id}.llm_prompt_execution.output        agent→       streamed agentic output
pane.{id}.llm_prompt_execution.status        agent→       phase/iteration status
pane.{id}.approval.request                   agent→       approval request
pane.{id}.approval.response                  →agent       approval decision
pane.{id}.browser.request                    agent→ext    browser action request
pane.{id}.browser.response                   ext→agent    browser action result
pane.{id}.memory.{mode} / .memory.summarize  both         conversation memory
pane.{id}.relay.data                         both         native-speed raw PTY relay
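If these subjects are built in several places, small wrappers around T keep the table and the code in sync. The helper names below (paneInbox, paneOutput and so on) are hypothetical; the text only documents T itself, which is copied verbatim.

```go
package main

import (
	"fmt"
	"strings"
)

var sessionPrefix = "rysh"

// T is the documented builder: no escaping, plain "." join.
func T(parts ...string) string { return sessionPrefix + "." + strings.Join(parts, ".") }

// Hypothetical helpers, one per row of the per-pane table.
func paneInbox(id string) string { return T("pane", id, "inbox") }

func paneOutput(id, mode string) string {
	if mode == "" {
		return T("pane", id, "output") // merged topic (shell + ai only)
	}
	return T("pane", id, "output", mode)
}

func paneHistory(id, mode string) string {
	if mode == "" {
		return T("pane", id, "history")
	}
	return T("pane", id, "history", mode)
}

func agentInbox(id string) string      { return T("pane", id, "llm_prompt_execution", "inbox") }
func approvalRequest(id string) string { return T("pane", id, "approval", "request") }
func browserResponse(id string) string { return T("pane", id, "browser", "response") }

func main() {
	fmt.Println(paneOutput("p1", "ai")) // rysh.pane.p1.output.ai
	fmt.Println(paneHistory("p1", ""))  // rysh.pane.p1.history
	fmt.Println(agentInbox("p1"))       // rysh.pane.p1.llm_prompt_execution.inbox
	// T does not escape, so a "." inside an ID silently adds subject tokens.
	fmt.Println(paneInbox("a.b")) // rysh.pane.a.b.inbox
}
```

The last line shows why pane IDs must never contain "." (or NATS wildcard characters): T would happily build a subject with a different token structure.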

Other subjects

Subject                                      Purpose
ws.inbox / ws.snapshot                       workspace command inbox / snapshot request-reply
tab.{id}.inbox                               tab commands
agent.{name}.inbox / agent.registry.inbox    headless agents
humanoid.{name}.inbox                        humanoid agents
pane-group.{id}.inbox                        ephemeral approval-pane creation
share.registry.inbox                         share registry

Server (cross-machine) subjects

The server scopes subjects to the workspace tenant by UUID:

ws.{workspaceID}.share.{shareID}.output           share output stream
ws.{workspaceID}.share.{shareID}.output.layout    periodic layout document (tab/lane/group shares)
ws.{workspaceID}.share.{shareID}.command           inbound command (control mode)
ws.{workspaceID}.share.{shareID}.command.ack        command acknowledgement
ws.{workspaceID}.share.{shareID}.{register|unregister|heartbeat|subscriber}   control plane
rysh.pane.{paneID}....                              server-side agentic panes (same per-pane scheme)
rysh.channel.{type}.inbound.{workspaceID}           inbound external-channel messages

_INBOX.> is the NATS request/reply inbox subtree. The subject-ACL permits exactly ws.{workspaceID}.> and _INBOX.> per authenticated client.
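The ACL relies on NATS token wildcards: * matches exactly one token, and >, which may only appear last, matches one or more trailing tokens. The sketch below reimplements that rule to show which subjects a client for one workspace can reach. subjectMatches and allowed are hypothetical helpers for intuition only, not necessarily how rysh enforces the ACL.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches applies NATS token wildcards: "*" matches exactly one token,
// and ">" (valid only as the last token) matches one or more remaining tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(p) == len(s)
}

// allowed mirrors the documented per-client grants.
func allowed(workspaceID, subject string) bool {
	for _, pat := range []string{"ws." + workspaceID + ".>", "_INBOX.>"} {
		if subjectMatches(pat, subject) {
			return true
		}
	}
	return false
}

func main() {
	const ws = "3f1e2d4c-5b6a-4789-8abc-def012345678" // illustrative workspace UUID
	fmt.Println(allowed(ws, "ws."+ws+".share.s1.output")) // true
	fmt.Println(allowed(ws, "ws.other.share.s1.output"))  // false
	fmt.Println(allowed(ws, "_INBOX.abc123"))             // true
	fmt.Println(allowed(ws, "rysh.pane.p1.output"))       // false
}
```

Note that ws.{workspaceID}.> does not match the bare ws.{workspaceID} subject, since > requires at least one further token.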


3.3 The unified conversation model (msg/conversation.go)

A single ConversationMessage type replaced ~12 legacy per-mode message types. This is the central data structure of the whole protocol. Verbatim (conversation.go:86):

type ConversationMessage struct {
    TurnID           string           `json:"turn_id"`
    TurnType         TurnType         `json:"turn_type"`          // question | answer
    ConversationType ConversationType `json:"conversation_type"` // shell|ai|rysh|chat|email|slack|chatbot
    InputType        InputType        `json:"input_type"`        // shell|prompt|command|approval|message
    MessageSource    MessageSource    `json:"message_source"`    // human|ai|external|agent|subagent|humanoid|system
    Content          string           `json:"content"`
    TimestampMs      int64            `json:"timestamp_ms"`
    Sensitive        bool             `json:"sensitive,omitempty"`
    SubjectToShare   bool             `json:"subject_to_share,omitempty"`
    Role             string           `json:"role,omitempty"`     // visitor, assistant, human, system
    Streaming        bool             `json:"streaming,omitempty"` // partial chunk of an in-progress answer
    Origin           *MessageOrigin   `json:"origin,omitempty"`   // (not in proto) tab-mirroring provenance
}

The constant values are lowercase strings (note these differ in case from the proto enum names):

// ConversationType                    TurnType                  InputType                 MessageSource
ConvShell   = "shell"   ConvEmail   = "email"     TurnQuestion = "question"   InputShell    = "shell"     SourceHuman    = "human"
ConvAI      = "ai"      ConvSlack   = "slack"      TurnAnswer   = "answer"     InputPrompt   = "prompt"    SourceAI       = "ai"
ConvRysh    = "rysh"    ConvChatbot = "chatbot"                                InputCommand  = "command"   SourceExternal = "external"
ConvChat    = "chat"                                                           InputApproval = "approval"  SourceAgent    = "agent"
                                                                               InputMessage  = "message"   SourceSubagent = "subagent"
                                                                                                           SourceHumanoid = "humanoid"
                                                                                                           SourceSystem   = "system"

MessageOrigin (all fields ,omitempty) carries mirroring provenance so a mirrored tab can attribute a message to its source pane and the mirror pane rendering it:

type MessageOrigin struct {
    SourceShareID, SourceTabID, SourcePaneID, SourcePaneAlias string
    MirrorTabID, MirrorPaneID                                 string
}

Convenience constructors (conversation.go:140-258) preset the fields: NewShellQuestion/Answer (Source=Human/System, SubjectToShare=true), NewAIQuestion/Answer (the answer carries Streaming), NewRyshQuestion/Answer (no SubjectToShare), NewChatMessage, and NewChannelMessage (sets Sensitive). NewTurnID() returns a UUID; NowMs() returns time.Now().UnixMilli().
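NewTurnID's implementation is not shown here. A stdlib-only stand-in, assuming a random (version 4) UUID is what is wanted, could look like this; newTurnID and nowMs are illustrative names, and the real code may well use a UUID library instead.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"time"
)

// newTurnID returns a random version-4 UUID in the usual 8-4-4-4-12 hex form.
func newTurnID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // version 4
	b[8] = (b[8] & 0x3f) | 0x80 // RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// nowMs mirrors the documented NowMs: milliseconds since the Unix epoch.
func nowMs() int64 { return time.Now().UnixMilli() }

func main() {
	id, err := newTurnID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id, nowMs())
}
```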

Conversation types and their properties

Mode      Dual-publish?   Has memory?   Backed by
shell     yes             no            a PTY
ai        yes             yes           the agentic loop
rysh      no              no            ## system commands
chat      no              no            a chat buffer
email     no              yes           a humanoid channel
slack     no              yes           a humanoid channel
chatbot   no              yes           the website widget / server pane
  • Dual-publish (IsDualPublish()): shell and ai publish to both the per-mode topic and the merged output topic.
  • Memory (HasMemory()): ai, email, slack, chatbot summarize older turns into memory entries.
func (ct ConversationType) IsDualPublish() bool { return ct == ConvShell || ct == ConvAI }
func (ct ConversationType) HasMemory() bool {
    switch ct {
    case ConvAI, ConvEmail, ConvSlack, ConvChatbot: return true
    }
    return false
}


Wire wrappers

Type                                     Subject pattern
MsgConversationAppend{message}           pane.{id}.output[.{mode}]
MsgConversationHistoryAppend{message}    pane.{id}.history[.{mode}]

The NATSPublisher.SendConversation helper publishes to the per-mode topic always, plus the merged topic for dual-publish modes.


3.4 The memory model (msg/memory.go)

Long conversations are compressed: older turns are summarized by the LLM into MemoryEntry records, which are prepended to the system prompt on subsequent prompts.

type MemoryEntry struct {
    ID            string           `json:"id"`              // UUID
    Summary       string           `json:"summary"`         // LLM-generated
    TurnIDs       []string         `json:"turn_ids"`
    Mode          ConversationType `json:"mode"`            // not a bare string — the ConversationType enum
    CreatedAtMs   int64            `json:"created_at_ms"`
    TurnCount     int              `json:"turn_count"`
}
type MemoryState struct {
    PaneID                 string           `json:"pane_id"`
    Mode                   ConversationType `json:"mode"`
    Entries                []MemoryEntry    `json:"entries"`  // capped at MaxMemoryEntries = 50 (oldest merged)
    TotalSummarizedTurns   int              `json:"total_summarized_turns"`
}
sequenceDiagram
    participant PA as Pane / MemoryManager
    participant LLM as Summarizer (Claude)
    participant M as LLMPromptExecutionActor
    PA->>LLM: MsgMemorySummarize (turns, MemorySummarizationPrompt)
    LLM-->>PA: summary (≤500 words: decisions/files/context/TODOs)
    PA->>PA: append MemoryEntry (merge oldest if > 50)
    PA->>M: MsgMemoryStateUpdate(state)
    M->>M: prepend FormatMemoryForPrompt(state) to system prompt

FormatMemoryForPrompt(state) (memory.go:105) renders a ## Conversation Memory section with one ### Memory N (covers K turns, created <ago>) block per entry, where <ago> is just now, Nm ago, Nh ago or Nd ago. The section is followed by a ## Recent Conversation header. MemorySummarizationPrompt(mode, turns) (memory.go:145) builds the condensing prompt:

You are summarizing a conversation for future context. The conversation took
place in <mode> mode. Produce a concise summary (max 500 words) that captures:
1. Key decisions made
2. Files modified or discussed
3. Important context that would help a future prompt understand what happened
4. Any unresolved issues or TODOs

Do NOT include raw code blocks unless they are critical to understanding.

3.5 Message catalog

Agentic control plane (msg/agentic_messages.go)

  • MsgAgenticPrompt{RequestID, Prompt}: user prompt → LLMPromptExecutionActor
  • MsgAgenticCancel: cancel the in-flight orchestrator
  • MsgOrchestratorDone{OrchestratorID, Success, Summary, FilesChanged, Errors, Conversation}: orchestrator → parent (Conversation is in-process only)
  • MsgToolCall{ToolCallID, ToolName, Parameters}: tool dispatch
  • MsgToolResult{ToolCallID, ToolName, Content, Error, ExitCode, Approved}: tool result
  • MsgAgenticOutput{OrchestratorID, Type, Content, Metadata}: streamed output; Type ∈ thinking/text/tool_call/tool_result/diff/error
  • MsgAgenticStatus{OrchestratorID, Phase, Iteration, MaxIterations, ActiveTools}: status bar; Phase ∈ planning/executing/waiting_approval/compacting/done/error
  • MsgApprovalRequest{RequestID, OrchestratorID, ToolCallID, Type, Description, Diff, Choices}: request approval
  • MsgApprovalResponse{RequestID, Decision, Reason, ChoiceIdx, RespondingPaneID}: user decision
  • MsgSpawnSubOrchestrator / MsgSubOrchestratorResult: defined but unused; the real sub_agent (§4.2) spawns an in-process child OrchestratorActor and returns via the standard MsgOrchestratorDone, not these wire types
  • MsgGetConversationHistory{LastN} / MsgConversationHistoryReply{PaneID, Turns}: history query (Turns is []ConversationTurnInfo, each carrying []ToolCallInfo; both are serializable shapes mirroring the provider's turns)
  • MsgRestoreConversation{Conversation}: restore a persisted conversation on restart
  • MsgCreateApprovalPane / MsgDestroyApprovalPane / MsgPaneSetApprovalPaneGroups: ephemeral approval-pane flow
  • MsgSetChatOutputPane{PaneID}: reroute agent output to a chat buffer
  • MsgMemoryStateUpdate{State}: inject memory state into the agent

Approval types: diff, destructive_action, choice, question. Approval decisions: yes, yes_always, no, no_with_explanation, choice_selected.

The three most important structs verbatim (agentic_messages.go:79,133,144):

type MsgAgenticOutput struct {
    OrchestratorID string            `json:"orchestrator_id"`
    Type           string            `json:"type"` // thinking|text|tool_call|tool_result|diff|error
    Content        string            `json:"content"`
    Metadata       map[string]string `json:"metadata,omitempty"`
}
type MsgApprovalRequest struct {
    RequestID, OrchestratorID, ToolCallID string
    Type        ApprovalType `json:"type"`        // diff|destructive_action|choice|question
    Description string       `json:"description"`
    Diff        *DiffPayload `json:"diff,omitempty"`     // DiffPayload{FilePath, UnifiedDiff}
    Choices     []Choice     `json:"choices,omitempty"`  // Choice{Label, Description}
}
type MsgApprovalResponse struct {
    RequestID        string           `json:"request_id"`
    Decision         ApprovalDecision `json:"decision"`
    Reason           string           `json:"reason,omitempty"`
    ChoiceIdx        int              `json:"choice_idx,omitempty"`
    RespondingPaneID string           `json:"responding_pane_id,omitempty"`
}
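One consequence of ChoiceIdx carrying omitempty: selecting the first choice (index 0) drops choice_idx from the wire entirely, yet decoding still restores 0 because that is the int zero value. The round trip below uses the documented MsgApprovalResponse; the ApprovalDecision constant names and the roundTrip helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type ApprovalDecision string

// Hypothetical constant names for two of the documented decision values.
const (
	DecisionYes            ApprovalDecision = "yes"
	DecisionChoiceSelected ApprovalDecision = "choice_selected"
)

type MsgApprovalResponse struct {
	RequestID        string           `json:"request_id"`
	Decision         ApprovalDecision `json:"decision"`
	Reason           string           `json:"reason,omitempty"`
	ChoiceIdx        int              `json:"choice_idx,omitempty"`
	RespondingPaneID string           `json:"responding_pane_id,omitempty"`
}

// roundTrip encodes a response and decodes it again, returning both forms.
func roundTrip(in MsgApprovalResponse) (string, MsgApprovalResponse, error) {
	var out MsgApprovalResponse
	b, err := json.Marshal(in)
	if err != nil {
		return "", out, err
	}
	err = json.Unmarshal(b, &out)
	return string(b), out, err
}

func main() {
	wire, out, err := roundTrip(MsgApprovalResponse{RequestID: "r1", Decision: DecisionChoiceSelected, ChoiceIdx: 0})
	fmt.Println(wire, out.ChoiceIdx, err)
	// {"request_id":"r1","decision":"choice_selected"} 0 <nil>
}
```

The flip side is that a consumer cannot use the presence of choice_idx to distinguish "first choice" from "no choice"; it has to look at Decision instead.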

Gotcha: MsgOrchestratorDone.Conversation has the json tag "-" — the full conversation travels only in-process (proto.actor mailbox), never over NATS. That is how tool-result context (e.g. an email draft_id) survives across orchestrator sessions.

Memory (msg/memory.go)

MsgMemoryAppend, MsgMemoryGet/MsgMemoryGetReply, MsgMemorySummarize{PaneID, Mode, Turns}, MsgMemorySummarizeDone. (Tags use a dotted form, e.g. memory.append.)

Browser control (msg/browser_messages.go)

Type                                                                      Direction
MsgBrowserActionRequest{RequestID, Action, Params}                        server agent → Chrome extension
MsgBrowserActionResponse{RequestID, Success, Result, Error, Screenshot}   extension → server agent

Chatbot (msg/chatbot_messages.go)

Session lifecycle for the embedded widget: MsgChatbotSessionCreated/Closed, MsgChatbotTakeover/Release (human operator), MsgChatbotHumanReply, MsgChatbotVisitorTyping, MsgChatbotSessionList/Reply. Support type ChatbotSessionInfo.

Legacy (msg/messages.go)

Six deprecated MsgPane*OutputAppend + six MsgPane*HistoryAppend + MsgPaneStatusUpdate + MsgPipelineOutputAppend — retained only for codec compatibility during migration to the unified ConversationMessage.


3.6 The publisher and message logging

NATSPublisher (msg/publisher.go) wraps a *nats.Conn + *CodecRegistry:

  • Send(subject, msg) — fire-and-forget.
  • SendConversation / SendConversationHistory: dual-publish-aware. The per-mode send's error is discarded and only the merged send's error propagates, so for modes that are not dual-publish a failed send is never reported:
func (p *NATSPublisher) SendConversation(paneID string, cm *ConversationMessage) error {
    wrapped := &MsgConversationAppend{Message: cm}
    mode := string(cm.ConversationType)
    _ = p.Send(T("pane", paneID, "output", mode), wrapped)   // always per-mode
    if cm.ConversationType.IsDualPublish() {                  // shell or ai only
        return p.Send(T("pane", paneID, "output"), wrapped)   // → merged topic
    }
    return nil
}
  • Request(subject, msg, timeout) — creates a nats.NewInbox(), subscribes a buffered channel before publishing (avoids a race), and decodes the reply through the codec. No default timeout — the caller supplies it.
  • SendMemoryAppend, SendMemorySummarize, SendBrowserActionRequest.
  • Flush() calls nc.Flush(); this matters before publishing a terminal status, so that output ordering is guaranteed.

MessageLogger (msg/msglog.go) optionally writes every SEND/RECV/REQ/REPLY to .rysh/actor-comms.log (enabled by config or RYSH_LOG_MESSAGES=true) with file:line, actor, subject, tag, byte size, and a payload excerpt — invaluable for debugging the distributed actor system.


3.7 Relationship to rysh-proto

The .proto files in rysh-proto define ConversationMessage, MemoryEntry/MemoryState, the pane-message wrappers, and NATSEnvelope as reference schemas. They are not used for serialization at runtime — the Go structs above are canonical and the wire format is JSON. The proto enum names (e.g. SHELL, AI) differ in case from the Go string constants ("shell", "ai"). See 9. rysh-proto.