A Worker is a specialized queue-based agent that processes messages sent to a MeshAgent room queue. Other agents or applications can push tasks to the queue and the Worker will handle them in the background. This is helpful for long‑running or asynchronous jobs that shouldn’t block an interactive chat agent.

When to Use It

  • Process background jobs pushed by apps or other agents
  • Run long or repetitive jobs off the main chat thread
  • Execute non-interactive tasks where no follow-up questions are desired
  • Batch operations that process multiple items sequentially
  • Schedule or trigger work externally and have the agent pick it up
If you need an interactive assistant, use ChatBot. For real-time speech, see VoiceBot. For a callable function/agent pattern with structured schemas, see TaskRunner.

Constructor Arguments

A Worker accepts everything from SingleRoomAgent (name, title, description, requires, labels) plus queue-specific and LLM-related parameters.
  • queue (str): Required. Name of the room queue to listen for messages on.
  • llm_adapter (LLMAdapter): The adapter that connects the agent to an LLM. We recommend the OpenAIResponsesAdapter from meshagent-openai.
  • tool_adapter (ToolResponseAdapter | None): Optional adapter that translates tool outputs into LLM/context messages. We recommend the OpenAIResponsesToolResponseAdapter from meshagent-openai.
  • toolkits (list[Toolkit] | None): Local toolkits always available to this worker (in addition to any requires).
  • rules (list[str] | None): Optional system prompt/rules that guide the agent's behavior.
  • requires (list[Requirement] | None): Schemas/toolkits to install in the room before processing. Use RequiredSchema and RequiredToolkit to reference toolkits and schemas already registered with the room.
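As a minimal sketch of a constructor call, using the same imports as the full example later on this page (the queue name, rules, and toolkit choice here are illustrative):

import os

from meshagent.agents.worker import Worker
from meshagent.openai.tools import (
    OpenAIResponsesAdapter,
    OpenAIResponsesToolResponseAdapter,
)
from meshagent.tools.storage import StorageToolkit

worker = Worker(
    queue=os.getenv("WORKER_QUEUE", "jobs"),  # required: the room queue to consume
    name="background-worker",
    title="background worker",
    description="processes jobs pushed to its queue",
    llm_adapter=OpenAIResponsesAdapter(),
    tool_adapter=OpenAIResponsesToolResponseAdapter(),
    toolkits=[StorageToolkit()],  # local toolkits, always available to this worker
    rules=["process each job without asking the user questions"],
    # requires=[...] can list RequiredSchema/RequiredToolkit entries to install
    # toolkits and schemas that are already registered with the room
)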

Lifecycle Overview

  • await start(room): Connects to the room, installs requirements, and starts the background run() loop that consumes messages from the configured queue.
  • await stop(): Signals the loop to stop, awaits the main task, then disconnects cleanly.
  • room property: Access the active RoomClient for queues, storage, messaging, and tools.
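In practice a ServiceHost (as in the example below) manages this lifecycle for you. As a rough sketch of the underlying calls, assuming an already-connected RoomClient named room and a hypothetical Worker subclass MyWorker:

worker = MyWorker()
await worker.start(room=room)  # connects, installs requirements, begins the run() loop
try:
    ...  # the worker is now consuming queue messages in the background
    client = worker.room  # the active RoomClient, available while started
finally:
    await worker.stop()  # signals the loop to stop, awaits it, disconnects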

Processing Flow

  1. Wait for work (long-polling). The worker listens for jobs using:

     message = await room.queues.receive(name=self._queue, create=True, wait=True)

     With wait=True, the call long-polls the queue: instead of returning immediately when the queue is empty, it waits asynchronously for a short window until a message arrives (or the wait times out). This is more efficient than tight polling and ensures the worker can pick up jobs immediately when they’re published.
  2. Build context and tools. Create a fresh chat context (init_chat_context()), apply any rules, and resolve toolkits from requires plus local toolkits.
  3. Represent the job in context. By default, append_message_context(...) serializes the message as a user message (JSON). Override this to customize how the payload is injected.
  4. Process the job. process_message(...) runs the task (default implementation calls your llm_adapter.next() with the prepared context and toolkits).
  5. Handle errors & keep running. Errors are logged. On receive failures, the loop backs off exponentially and retries; otherwise it immediately waits for the next message.
You can customize steps (2)–(4) by overriding decode_message(...), append_message_context(...), or process_message(...).
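For example, a sketch of what such overrides could look like (the subclass name and payload validation are illustrative; the keyword signatures follow the Key Methods section below):

from meshagent.agents.worker import Worker


class ReportWorker(Worker):
    def decode_message(self, message):
        # Normalize/validate the raw payload before it enters the chat context.
        if "instructions" not in message:
            raise ValueError("job payload must include 'instructions'")
        return message

    async def append_message_context(self, *, room, message, chat_context):
        # Change how the job is shown to the LLM; by default the message
        # is serialized to JSON as a user message.
        await super().append_message_context(
            room=room, message=message, chat_context=chat_context
        )

    async def process_message(self, *, chat_context, room, message, toolkits):
        # Wrap the default LLM-driven processing, e.g. to add logging
        # or post-process the response.
        return await super().process_message(
            chat_context=chat_context, room=room, message=message, toolkits=toolkits
        )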

Key Behaviors and Hooks

  • Queue consumption: run() long-polls the queue (receive(..., wait=True)), creating it if needed.
  • Message decoding: decode_message(message) lets you normalize incoming payloads.
  • Context building: append_message_context(...) inserts job data into the chat context (default: dump JSON as a user message).
  • Job execution: process_message(...) is the main hook: it prepares context, tools, and (by default) calls the LLM adapter’s .next() function to work through the task.
  • Resilience: Errors are logged; run() applies an exponential backoff between receive retries.

Key Methods

  • async def run(room): Main loop; receives queue messages and processes them.
  • async def process_message(chat_context, room, message, toolkits): Core job handler; override for custom behavior.
  • async def append_message_context(room, message, chat_context): Controls how a job is represented in the chat context (override to change).
  • def decode_message(message): Normalizes/validates incoming message payloads.
  • async def start(room) / async def stop(): Start/stop the worker and manage the background task.

Example: Creating a Storage Queue Worker

The sample below shows a Worker that listens on a queue and writes files using the StorageToolkit. After starting the service, you can push a message to the queue to trigger the worker. First, create a Python file, main.py, and define the StorageWorker:
import os
import logging
import asyncio
from meshagent.otel import otel_config
from meshagent.agents.worker import Worker
from meshagent.openai.tools import (
    OpenAIResponsesAdapter,
    OpenAIResponsesToolResponseAdapter,
)
from meshagent.api.services import ServiceHost
from meshagent.tools.storage import StorageToolkit

otel_config(service_name="worker")
log = logging.getLogger("worker")
log.info(f"Listening on {os.getenv('WORKER_QUEUE')}")

host = ServiceHost()  # port defaults to an available port if not assigned


@host.path(path="/worker", identity="storage-worker")
class StorageWorker(Worker):
    def __init__(self):
        super().__init__(
            queue=os.getenv("WORKER_QUEUE"),
            name="storage-worker",
            title="storage worker sample",
            description="this sample reads messages from a queue",
            llm_adapter=OpenAIResponsesAdapter(),
            tool_adapter=OpenAIResponsesToolResponseAdapter(),
            toolkits=[StorageToolkit()],
            rules=[
                "you will receive a message with instructions, process it and do what it says",
                "you are not an interactive agent so you must not ask the user questions",
            ],
        )

    async def process_message(self, *, chat_context, room, message, toolkits):
        log.info(f"processing {message}")
        response = await super().process_message(
            chat_context=chat_context, room=room, message=message, toolkits=toolkits
        )
        log.info(f"response {response}")


asyncio.run(host.run())

Running the Worker

From your terminal, inside an activated virtual environment, start the service:
export WORKER_QUEUE=test 
meshagent setup # authenticate to meshagent if not already
meshagent service run "main.py" --room=queue-test

Sending Work to the Queue

Now that our agent is running, let’s send it some work!

Option 1: Using the MeshAgent CLI

Use the MeshAgent CLI to directly connect to the room, mint a token, and send a message to the queue.
meshagent queue send --room=queue-test --queue=test \
  --json '{"instructions": "save a poem about ai to poem.txt"}'

Option 2: Invoking the Queue from Code

Create a Python file, push_queue.py, and define a function that pushes a message to the queue.
import os
import asyncio
import logging
from meshagent.api import (
    RoomClient,
    WebSocketClientProtocol,
    ParticipantToken,
    ApiScope,
    ParticipantGrant,
)
from meshagent.api.helpers import websocket_room_url
from meshagent.otel import otel_config

otel_config(service_name="worker")
log = logging.getLogger("worker")

api_key = os.getenv("MESHAGENT_API_KEY")
if not api_key:
    raise RuntimeError("Set MESHAGENT_API_KEY before running this script.")


async def push():
    room_name = (
        "queue-test"  # make sure this matches the room your service is running in
    )
    token = ParticipantToken(
        name="sample-participant",
        grants=[
            ParticipantGrant(name="room", scope=room_name),
            ParticipantGrant(name="role", scope="agent"),
            ParticipantGrant(name="api", scope=ApiScope.agent_default()),
        ],
    ).to_jwt(api_key=api_key)

    protocol = WebSocketClientProtocol(
        url=websocket_room_url(room_name=room_name), token=token
    )
    try:
        async with RoomClient(protocol=protocol) as room:
            log.info(f"Connected to room: {room.room_name}")
            await room.queues.send(
                name=os.getenv("WORKER_QUEUE"),
                message={"instructions": "save a poem about ai to poem.txt"},
            )
    except Exception as e:
        log.error(f"Connection failed:{e}")
        raise


asyncio.run(push())

Make sure your service is still running (meshagent service run "main.py" --room=queue-test). From a different terminal tab, run the Python file we created to send a task to the queue.
python push_queue.py
You’ll see logs for the queue activity in the terminal window where your service is running. To verify the results we’ll head to MeshAgent Studio where we can see the .txt files.

Checking Results in the Studio

From studio.meshagent.com, open the room queue-test and you’ll see our poem about AI in the poem.txt file!

Next Steps

Continue learning about agents and explore a variety of other base agents, including:
  • ChatBot for conversational text-based agents
  • VoiceBot for conversational speech/voice based agents
  • TaskRunner for agents that run in the background with defined inputs and outputs
  • MailWorker for email based agents