Worker is a specialized queue-based agent that processes messages sent to a MeshAgent room queue. Other agents or
applications can push tasks to the queue and the Worker will handle them in the background. This is helpful for
long‑running or asynchronous jobs that shouldn’t block an interactive chat agent.
## When to Use It
- Process background jobs pushed by apps or other agents
- Run long or repetitive jobs off the main chat thread
- Execute non-interactive tasks where no follow-up questions are desired
- Batch operations that process multiple items sequentially
- Schedule or trigger work externally and have the agent pick it up
## Constructor Arguments

A Worker accepts everything from SingleRoomAgent (`name`, `title`, `description`, `requires`, `labels`) plus queue-specific and LLM-based parameters.
| Argument | Type | Description |
|---|---|---|
| `queue` | `str` | Required. Name of the room queue to listen on for messages. |
| `llm_adapter` | `LLMAdapter` | The adapter that lets the agent work with an LLM. We recommend the `OpenAIResponsesAdapter` from `meshagent-openai`. |
| `tool_adapter` | `ToolResponseAdapter \| None` | Optional adapter that translates tool outputs into LLM/context messages. We recommend the `OpenAIResponsesToolResponseAdapter` from `meshagent-openai`. |
| `toolkits` | `list[Toolkit] \| None` | Local toolkits always available to this worker (in addition to any `requires`). |
| `rules` | `list[str] \| None` | Optional system prompt/rules that guide the agent's behavior. |
| `requires` | `list[Requirement] \| None` | Schemas/toolkits to install in the room before processing. Use `RequiredSchema` and `RequiredToolkit` to reference schemas and toolkits already registered with the room. |
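For example, constructing a worker with these arguments might look like the sketch below (the `meshagent.agents` and `meshagent.openai` import paths are assumptions; adjust them to your installed packages):

```python
from meshagent.agents import Worker                   # import path may differ
from meshagent.openai import OpenAIResponsesAdapter   # from the meshagent-openai package

worker = Worker(
    name="storage-worker",
    title="Storage Worker",
    description="Processes queued jobs in the background",
    queue="storage-queue",                 # the room queue to long-poll
    llm_adapter=OpenAIResponsesAdapter(),
    rules=["Complete each queued task and report the result."],
)
```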
## Lifecycle Overview

- `await start(room)`: Connects to the room, installs requirements, and starts the background `run()` loop that consumes messages from the configured queue.
- `await stop()`: Signals the loop to stop, awaits the main task, then disconnects cleanly.
- `room` property: Access the active `RoomClient` for queues, storage, messaging, and tools.
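In practice a service host usually drives this lifecycle for you, but it can also be managed by hand. A rough sketch, assuming you already have a connected `room`:

```python
# Sketch: driving the Worker lifecycle manually.
await worker.start(room=room)   # connect, install requirements, begin the run() loop
try:
    await do_other_work()       # placeholder: the worker consumes jobs in the background
finally:
    await worker.stop()         # signal the loop to stop and disconnect cleanly
```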
## Processing Flow

1. Wait for work (long-polling). The worker listens for jobs using `message = await room.queues.receive(name=self._queue, create=True, wait=True)`. With `wait=True`, the call long-polls the queue: instead of returning immediately when the queue is empty, it waits asynchronously for a short window until a message arrives (or the wait times out). This is more efficient than tight polling and ensures the worker can pick up jobs immediately when they're published.
2. Build context and tools. Create a fresh chat context (`init_chat_context()`), apply any rules, and resolve toolkits from `requires` plus local toolkits.
3. Represent the job in context. By default, `append_message_context(...)` serializes the message as a user message (JSON). Override this to customize how the payload is injected.
4. Process the job. `process_message(...)` runs the task (the default implementation calls your `llm_adapter.next()` with the prepared context and toolkits).
5. Handle errors and keep running. Errors are logged. On receive failures, the loop backs off exponentially and retries; otherwise it immediately waits for the next message.
To customize any of these steps, override `decode_message(...)`, `append_message_context(...)`, or `process_message(...)`, as in the sketch below.
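For example, a worker that expects a JSON payload with a `prompt` field (a shape invented for this illustration) might override two of these hooks. This is a minimal sketch; check the exact base-class signatures in the Key Methods table below against your SDK version:

```python
from meshagent.agents import Worker  # import path may differ in your SDK version

class PromptWorker(Worker):
    def decode_message(self, message):
        # Normalize/validate the payload; this example expects a "prompt" key.
        if "prompt" not in message:
            raise ValueError("queue message must include a 'prompt' field")
        return message

    async def append_message_context(self, *, room, message, chat_context):
        # Inject the job as a plain-text user message instead of the default JSON dump.
        # `append_user_message` is an assumed name; use whatever your chat context
        # class exposes for adding user messages.
        chat_context.append_user_message(message["prompt"])
```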
## Key Behaviors and Hooks

- Queue consumption: `run()` long-polls the queue (`receive(..., wait=True)`), creating it if needed.
- Message decoding: `decode_message(message)` lets you normalize incoming payloads.
- Context building: `append_message_context(...)` inserts job data into the chat context (default: dump JSON as a user message).
- Job execution: `process_message(...)` is the main hook: it prepares context and tools, then (by default) calls the LLM adapter's `.next()` function to work through the task.
- Resilience: Errors are logged; `run()` applies an exponential backoff between receive retries. A simplified sketch of this loop follows.
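Put together, the consumption loop behaves roughly like the sketch below. This is a simplified illustration of the behavior described above, not the actual implementation; `self._stopping` and `self._handle` are invented names:

```python
import asyncio
import logging

async def run(self, *, room):
    backoff = 1.0
    while not self._stopping:  # hypothetical stop flag set by stop()
        try:
            # Long-poll the queue, creating it if it doesn't exist yet.
            message = await room.queues.receive(name=self._queue, create=True, wait=True)
            backoff = 1.0  # a successful receive resets the backoff
        except Exception:
            logging.exception("receive failed; backing off before retrying")
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, 30.0)  # exponential backoff (cap assumed)
            continue
        if message is None:
            continue  # the wait timed out; poll again
        try:
            # Decode the message, build the chat context, and process the job.
            await self._handle(room=room, message=message)
        except Exception:
            logging.exception("job failed; continuing with the next message")
```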
## Key Methods

| Method | Description |
|---|---|
| `async def run(room)` | Main loop: receives queue messages and processes them. |
| `async def process_message(chat_context, room, message, toolkits)` | Core job handler; override for custom behavior. |
| `async def append_message_context(room, message, chat_context)` | How a job is represented in the chat context (override to change). |
| `def decode_message(message)` | Normalize/validate incoming message payloads. |
| `async def start(room)` / `async def stop()` | Start/stop the worker and manage the background task. |
## Example: Creating a Storage Queue Worker

The sample below shows a Worker that listens on a queue and writes files using the `StorageToolkit`. After starting the service you can push a message to the queue to trigger the worker.

First create a Python file, main.py, and define our StorageWorker:
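A minimal sketch of that file is shown below. It follows the `ServiceHost` routing pattern used in other MeshAgent examples; the `StorageToolkit` import path and the decorator route are assumptions to verify against your SDK:

```python
import asyncio

from meshagent.agents import Worker                       # import path may differ
from meshagent.api.services import ServiceHost
from meshagent.openai import OpenAIResponsesAdapter, OpenAIResponsesToolResponseAdapter
from meshagent.tools.storage import StorageToolkit        # assumed import path

service = ServiceHost()

# Register the worker at a path so `meshagent service run` can host it.
@service.path("/storage-worker")
class StorageWorker(Worker):
    def __init__(self):
        super().__init__(
            name="storage-worker",
            title="Storage Worker",
            description="Writes queued content to files in room storage",
            queue="storage-queue",                        # the queue this example listens on
            llm_adapter=OpenAIResponsesAdapter(),
            tool_adapter=OpenAIResponsesToolResponseAdapter(),
            toolkits=[StorageToolkit()],                  # gives the LLM file-writing tools
            rules=[
                "You are a background worker. Complete each queued task and "
                "save any requested content to a file in room storage."
            ],
        )

asyncio.run(service.run())
```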
### Running the Worker

From your terminal, inside an activated virtual environment, start the service:
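```bash
meshagent service run "main.py" --room=queue-test
```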
### Sending Work to the Queue

Now that our agent is running, let's send it some work!

#### Option 1: Using the MeshAgent CLI

Use the MeshAgent CLI to connect directly to the room, mint a token, and send a message to the queue.
#### Option 2: Invoking the Queue from Code

Create a Python file, push_queue.py, and define a function to push a message to the queue.
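A sketch of push_queue.py is below. It assumes a `room.queues.send(...)` method mirroring the `room.queues.receive(...)` call the worker uses, plus the token and connection helpers commonly shown in MeshAgent examples; verify all of these names against your SDK:

```python
import asyncio
import os

# These imports follow common MeshAgent examples but are assumptions here.
from meshagent.api import ParticipantToken, RoomClient, WebSocketClientProtocol
from meshagent.api.helpers import websocket_room_url

async def push_task():
    room_name = "queue-test"

    # Mint a token for this script (assumes MESHAGENT_PROJECT_ID,
    # MESHAGENT_KEY_ID, and MESHAGENT_SECRET are set in your environment).
    token = ParticipantToken(
        name="queue-pusher",
        project_id=os.environ["MESHAGENT_PROJECT_ID"],
        api_key_id=os.environ["MESHAGENT_KEY_ID"],
    )
    token.add_room_grant(room_name)

    async with RoomClient(
        protocol=WebSocketClientProtocol(
            url=websocket_room_url(room_name=room_name),
            token=token.to_jwt(token=os.environ["MESHAGENT_SECRET"]),
        )
    ) as room:
        # `send` is assumed to mirror the worker's `receive` call.
        await room.queues.send(
            name="storage-queue",
            message={"prompt": "Write a short poem about AI and save it to poems.txt"},
        )

if __name__ == "__main__":
    asyncio.run(push_task())
```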
Keep the worker service running (`meshagent service run "main.py" --room=queue-test`). From a different tab in your terminal, run the Python file we created to send a task to the queue:
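```bash
python push_queue.py
```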
### Checking Results in the Studio

From studio.meshagent.com, open the room `queue-test` and you'll see our poem about AI in the `poems.txt` file!
## Next Steps

Continue learning about agents and explore a variety of other base agents, including:

- ChatBot for conversational text-based agents
- VoiceBot for conversational speech/voice-based agents
- TaskRunner for agents that run in the background with defined inputs and outputs
- MailWorker for email-based agents