MessagingClient enables sending messages between participants and agents in a room, as well as handling streaming messages (MessageStreamWriter and MessageStreamReader).


Overview

Messaging is central to any application that involves dynamic collaboration, whether between two people, two software agents, or a mix of both. Layering real-time messaging on top of your existing MeshAgent-powered infrastructure lets you cover a wide range of use cases, from a simple text chat between human users to a pipeline in which AI-driven agents and humans collaborate on tasks.


Why Messaging?

  1. Real-time collaboration
    Effective collaboration happens in real time. The Messaging API allows participants—be they humans or agents—to exchange messages instantly. This immediacy is essential for systems that demand quick feedback loops, like interactive help desk solutions, live analytics dashboards, or agent-based problem solving.

  2. Uniform interface for agents and humans
    The Messaging API treats all participants the same, regardless of whether they’re powered by artificial intelligence or controlled by a human. A single, consistent abstraction lowers the complexity of real-time, multi-user applications. You focus on creating features; the API handles participant states, message routing, and attachment handling.

  3. Structured data exchange
    Our Messaging API supports sending JSON-based messages and optional binary attachments. This makes it easy to share structured data, multimedia files, or AI model outputs. Instead of rolling your own data-transmission system, you can rely on standardized endpoints and message formats.

  4. Broadcast and point-to-point
    Send targeted messages to a single participant or simultaneously reach everyone in a room. This flexibility simplifies building features like announcements, shared notifications, team-based workflows, or private agent-human DM channels.

  5. Extendable via Streams
    For more advanced scenarios—like sending large files or streaming partial updates—the Messaging API provides a streaming protocol. Your agents can pass chunks of data incrementally, maintaining a continuous exchange without blocking the rest of your system.


Core Features

Participant Model

Any entity in a room, human or agent, is represented by a Participant. Each Participant has a unique identifier and can be addressed individually for one-on-one interactions, or collectively for broadcasts. This single abstraction unifies how you approach messaging, so your app doesn’t need separate code paths for humans vs. agents.

JSON-Powered Messages

Your messages can include arbitrary JSON data, which makes them easy to parse and manipulate. Whether you’re sending a text prompt to an AI agent, user profile updates, or a command to start a workflow, JSON-serialization keeps your messages compatible across the entire system.

Optional Attachments

In addition to JSON, you can attach binary data directly to a message. This is especially useful for file uploads, voice notes, images, or model outputs that need to be transferred as raw bytes.
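
The split between a JSON payload and a binary attachment can be checked before anything is sent. The sketch below is only an illustration: `build_message` is a hypothetical helper, not part of MessagingClient, and it assumes attachments are plain `bytes` in Python. It rejects payloads that are not JSON-serializable (for example, raw bytes placed inside the message body) so that binary data ends up in the attachment instead.

```python
import json
from typing import Any, Dict, Optional


def build_message(
    message_type: str, message: Any, attachment: Optional[bytes] = None
) -> Dict[str, Any]:
    """Hypothetical helper: validate message parts and return them as a dict."""
    try:
        # json.dumps raises TypeError for unsupported types such as bytes.
        json.dumps(message)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"message is not JSON-serializable: {exc}") from exc
    if attachment is not None and not isinstance(attachment, (bytes, bytearray)):
        raise TypeError("attachment must be bytes")

    fields: Dict[str, Any] = {"type": message_type, "message": message}
    if attachment is not None:
        fields["attachment"] = bytes(attachment)
    return fields


# Structured data goes in the message, raw bytes go in the attachment.
voice_note = build_message("chat", {"text": "voice note"}, b"\x01\x02\x03")
```

The returned dict mirrors the `type`, `message`, and `attachment` parameters described in this document; how it is handed to the client is left to your application.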

Stream Communication

When your application requires transferring large or continuous data—like audio/video streams, logs, or sensor outputs—Streams help you break the data into manageable chunks, while preserving the real-time nature of the exchange.
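
To make the idea of "manageable chunks" concrete, here is a minimal sketch of splitting a byte string into ordered pieces, each paired with a small header. `iter_chunks` and the header fields `chunkNumber` and `totalChunks` are assumptions made for illustration; the actual header schema is whatever your application defines.

```python
from typing import Iterator, Tuple


def iter_chunks(data: bytes, chunk_size: int) -> Iterator[Tuple[dict, bytes]]:
    """Yield (header, payload) pairs that cover `data` in order."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    total = (len(data) + chunk_size - 1) // chunk_size  # ceiling division
    for index in range(total):
        start = index * chunk_size
        # Hypothetical header fields; choose whatever your receiver expects.
        header = {"chunkNumber": index + 1, "totalChunks": total}
        yield header, data[start:start + chunk_size]


chunks = list(iter_chunks(b"abcdefghij", 4))
```

Each pair could then be wrapped in a MessageStreamChunk and written to a stream; the receiver can use the header to detect missing or out-of-order pieces.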

Event-Driven Architecture

Register event handlers for specific message types so your application logic reacts immediately. For example, you can trigger a follow-up task whenever an agent returns a particular response, or display a system notification any time a participant updates their attributes.
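
One way to organize handlers by message type is a small dispatch table. The sketch below is an assumption-laden illustration, not the library's own mechanism: `MessageDispatcher` is a hypothetical class that you would call from whatever callback receives incoming messages.

```python
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class MessageDispatcher:
    """Hypothetical helper that routes payloads to handlers by message type."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def on_type(self, message_type: str, handler: Handler) -> None:
        """Register a handler for one message type."""
        self._handlers.setdefault(message_type, []).append(handler)

    def dispatch(self, message_type: str, payload: Dict[str, Any]) -> int:
        """Call every handler registered for the type; return how many ran."""
        handlers = self._handlers.get(message_type, [])
        for handler in handlers:
            handler(payload)
        return len(handlers)


seen: List[str] = []
dispatcher = MessageDispatcher()
dispatcher.on_type("chat", lambda payload: seen.append(payload["text"]))
handled = dispatcher.dispatch("chat", {"text": "hello"})
unhandled = dispatcher.dispatch("announcement", {"content": "ignored"})
```

Returning the number of handlers that ran makes it easy to log or count message types that nothing is listening for.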

  • MessagingClient – A high-level client that:
    • Sends and receives messages between room participants
    • Manages stream-based messages via createStream()
    • Tracks remote participants and their attributes
    • Can enable or disable the messaging subsystem
  • MessageStreamChunk – Represents a single chunk in a stream, containing a header and optional binary data.
  • MessageStreamWriter – A writer for sending streaming data (chunks) to a remote participant.
  • MessageStreamReader – A reader that receives streaming data (chunks) from a remote participant.

Example Usage

Below are some examples demonstrating how to use MessagingClient for basic messaging and streaming use cases.

Basic Example: Sending and Receiving Messages

# Suppose you already have a RoomClient instance named `room`

# Enable messaging (allows receiving participant and stream events)
await room.messaging.enable()

# Acquire a participant instance from somewhere; you might discover it via events
my_participant = ... # type: RemoteParticipant

# Send a simple message (keyword arguments match the sendMessage signature)
await room.messaging.send_message(
    to=my_participant,
    type="chat",
    message={"text": "Hello from Python!"},
)

# Broadcast a message to all participants
await room.messaging.broadcast_message(
    type="announcement",
    message={"content": "Hello everyone!"},
)

# Disable messaging
await room.messaging.disable()

Example: Creating and Handling Streams

# Create a new stream
stream_writer = await room.messaging.create_stream(
    to=my_participant,
    header={"streamPurpose": "file-transfer"}
)

# Write chunks
await stream_writer.write(MessageStreamChunk(
    header={"chunkNumber": 1},
    data=b"some binary data"
))

# Close the stream
await stream_writer.close()

enable(onStreamAccept?)

Enables the messaging subsystem on the server. Optionally accepts a callback that is triggered whenever a remote participant opens an incoming stream.

await room.messaging.enable(
    lambda reader: print(f"New stream opened: {reader._streamId}"))

disable()

Disables the messaging subsystem on the server.

await room.messaging.disable()

sendMessage(to, type, message, attachment?)

Sends a message to a specific participant, optionally including a binary attachment.

Parameters:

  • to – The participant to receive the message.
  • type – A string that categorizes or labels the message.
  • message – A JSON-serializable object containing message data.
  • attachment – (Optional) Binary data to attach (bytes in Python).

await room.messaging.send_message(
    to=my_participant,
    type="chat",
    message={"text": "Hey!"},
    attachment=b"\x01\x02\x03",
)

broadcastMessage(type, message, attachment?)

Broadcasts a message to all participants in the room.

Parameters:

  • type – A string that categorizes or labels the message.
  • message – A JSON-serializable object containing message data.
  • attachment – (Optional) Binary data to attach (bytes in Python).

await room.messaging.broadcast_message(
    type="announcement",
    message={
        "title": "Server Maintenance",
        "content": "We will be down tonight.",
    },
)

createStream(to, header)

Creates a new stream directed at a single participant. Returns a MessageStreamWriter once the remote participant accepts the stream.

Parameters:

  • to – The participant to receive the stream.
  • header – A JSON-serializable object describing the stream.

Returns
An awaitable that resolves to a MessageStreamWriter once the remote participant accepts the stream.

writer = await room.messaging.create_stream(
  to=my_participant,
  header={"purpose": "video-stream"}
)

remoteParticipants

Provides an iterable collection of all known remote participants.

for participant in room.messaging.remote_participants:
    print(f"Participant: {participant.id}, role: {participant.role}")
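
Because the collection is iterable, ordinary Python filtering works on it. The sketch below uses a stand-in `FakeParticipant` dataclass with the `id` and `role` attributes shown above so it runs without a room; `participants_with_role` is a hypothetical helper, and the role values are made up for the example.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class FakeParticipant:
    """Stand-in for a remote participant; only `id` and `role` are modeled."""
    id: str
    role: str


def participants_with_role(
    participants: Iterable[FakeParticipant], role: str
) -> List[FakeParticipant]:
    """Return the participants whose role matches, preserving order."""
    return [p for p in participants if p.role == role]


# Illustrative data; in a real room you would pass the client's collection.
known = [
    FakeParticipant(id="p1", role="user"),
    FakeParticipant(id="p2", role="agent"),
    FakeParticipant(id="p3", role="agent"),
]
agents = participants_with_role(known, "agent")
```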

MessageStreamChunk

Represents a chunk of data in a stream, consisting of a header and optional binary data.


MessageStreamWriter

A writer for sending streaming data to a remote participant, created via MessagingClient.createStream().

  • write(MessageStreamChunk)
    Sends a chunk of data in the ongoing stream.

  • close()
    Closes the stream, signaling to the remote participant that no more chunks will be sent.

# Example usage after acquiring a MessageStreamWriter
await stream_writer.write(MessageStreamChunk(
  header={"part": 1},
  data=b"some binary data"
))
await stream_writer.close()
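
When a payload spans many chunks, it helps to guarantee that the stream is closed even if a write fails partway through. The following sketch shows one way to do that with `try`/`finally`. It runs against stand-ins: the `MessageStreamChunk` dataclass and `RecordingWriter` here are simplified fakes written for this example, and `send_bytes` is a hypothetical helper that only assumes the writer exposes awaitable `write` and `close` methods as described above.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class MessageStreamChunk:
    """Stand-in for the SDK class: a header plus optional binary data."""
    header: dict
    data: Optional[bytes] = None


class RecordingWriter:
    """Fake writer that records chunks instead of sending them."""

    def __init__(self) -> None:
        self.chunks: List[MessageStreamChunk] = []
        self.closed = False

    async def write(self, chunk: MessageStreamChunk) -> None:
        self.chunks.append(chunk)

    async def close(self) -> None:
        self.closed = True


async def send_bytes(writer, data: bytes, chunk_size: int = 16_384) -> int:
    """Write `data` in order as numbered chunks, then close the stream."""
    sent = 0
    try:
        for offset in range(0, len(data), chunk_size):
            part = sent + 1
            await writer.write(MessageStreamChunk(
                header={"part": part},
                data=data[offset:offset + chunk_size],
            ))
            sent = part
    finally:
        # Close even when a write raises, so the receiver is not left waiting.
        await writer.close()
    return sent


recording = RecordingWriter()
parts_sent = asyncio.run(send_bytes(recording, b"x" * 10, chunk_size=4))
```

With a real writer obtained from createStream, the same helper would be awaited from your existing event loop rather than through `asyncio.run`.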

  1. Event Emission
    MessagingClient extends EventEmitter<RoomMessageEvent>. It emits events whenever messages are received or streams are opened or closed. You can listen for the "message" event or for custom events such as "participant_added" and "participant_removed":

room.messaging.on(
    "message",
    lambda evt: print("Received a message of type:", evt.message.type))

room.messaging.on(
    "participant_added",
    lambda evt: print("A new participant joined:", evt.message.message))

  2. Remote Participants
    MessagingClient automatically keeps track of participants joining or leaving when the messaging subsystem is enabled. You can access them via remoteParticipants.

  3. Error Handling
    • If a request fails or the server returns an error, the underlying RoomClient may raise an exception.
    • Stream operations (createStream, write, etc.) may also fail if the remote participant rejects the stream or if the connection is lost.

  4. Extensibility
    You can add features such as chunk-level encryption, compression, or custom handshake logic by extending MessagingClient or by wrapping the provided stream objects (MessageStreamWriter / MessageStreamReader).

  5. Performance
    For large or frequent streams, consider buffering or batching chunk writes in your client logic. The streaming mechanism in MessagingClient is designed to handle incremental data transfer, but network considerations remain important for efficient performance.
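
The wrapping and batching ideas in the notes above can be combined in a small wrapper around a writer. This is a sketch under stated assumptions, not a built-in feature: `BatchingWriter` is a hypothetical class, the `batch` header field is invented for the example, and `MessageStreamChunk` and `RecordingWriter` are simplified stand-ins so the code runs without a room. The wrapper buffers small writes and forwards them as one chunk once a size threshold is reached.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class MessageStreamChunk:
    """Stand-in for the SDK class: a header plus optional binary data."""
    header: dict
    data: Optional[bytes] = None


class RecordingWriter:
    """Fake writer used only to exercise the wrapper."""

    def __init__(self) -> None:
        self.chunks: List[MessageStreamChunk] = []
        self.closed = False

    async def write(self, chunk: MessageStreamChunk) -> None:
        self.chunks.append(chunk)

    async def close(self) -> None:
        self.closed = True


class BatchingWriter:
    """Coalesces small writes into chunks of at least `min_batch_bytes`."""

    def __init__(self, inner, min_batch_bytes: int) -> None:
        if min_batch_bytes <= 0:
            raise ValueError("min_batch_bytes must be positive")
        self._inner = inner
        self._min = min_batch_bytes
        self._buffer = bytearray()
        self._batch = 0

    async def write(self, data: bytes) -> None:
        self._buffer.extend(data)
        if len(self._buffer) >= self._min:
            await self.flush()

    async def flush(self) -> None:
        if not self._buffer:
            return
        self._batch += 1
        chunk = MessageStreamChunk(
            header={"batch": self._batch}, data=bytes(self._buffer)
        )
        self._buffer.clear()
        await self._inner.write(chunk)

    async def close(self) -> None:
        try:
            await self.flush()  # send whatever is still buffered
        finally:
            await self._inner.close()


async def _demo() -> RecordingWriter:
    inner = RecordingWriter()
    writer = BatchingWriter(inner, min_batch_bytes=8)
    for piece in (b"abc", b"def", b"ghi", b"jk"):
        await writer.write(piece)
    await writer.close()
    return inner


demo_inner = asyncio.run(_demo())
```

Batching trades a little latency for fewer, larger chunks; a threshold that suits one network may be wrong for another, so treat the value as something to measure rather than a recommendation.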