Overview

A Worker is a specialized queue-based agent that processes messages sent to a MeshAgent room queue. Other agents or applications can push tasks to the queue and the Worker will handle them in the background. This is helpful for long‑running or asynchronous jobs that shouldn’t block an interactive chat agent.

Two ways to build a Worker

  1. CLI: Run production-ready workers with a single command. Configure queue, tools, and rules using flags. Ideal for most use cases.
  2. SDK: Extend the base Worker class with custom code when you need deeper integrations or specialized behaviors.

In this guide you will learn

  • When to use Worker
  • How to run and deploy a Worker with the MeshAgent CLI
  • How to build and deploy a Worker with the MeshAgent SDK
  • How the Worker works including constructor parameters, lifecycle, processing flow, and hooks

When to use Worker

  • Process background jobs pushed by apps or other agents
  • Run long or repetitive jobs off the main chat thread
  • Execute non-interactive tasks where no follow-up questions are desired
  • Batch operations that process multiple items sequentially
  • Schedule or trigger work externally and have the agent pick it up
If you need an interactive assistant, use ChatBot. For real-time speech, see VoiceBot. For a callable function/agent pattern with structured schemas, see TaskRunner.

Run and deploy a Worker with the CLI

Step 1: Run a Worker from the CLI

Let’s run a Worker that listens on a queue and can write files to room storage.
bash
# Authenticate to MeshAgent if not already signed in
meshagent setup

# Call a worker into your room
meshagent worker join --room quickstart --agent-name worker --queue tasks --require-storage \
  --room-rules "agents/worker/rules.txt" --rule "Process queued tasks and save results to storage."
When you add the --room-rules "agents/worker/rules.txt" flag and supply a file path for the rules, the file will be created if it does not already exist. The path is relative to room storage. The --queue flag is required.
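For example, agents/worker/rules.txt might contain plain-text instructions like these (illustrative; write whatever rules fit your tasks):
Process queued tasks and save results to storage.
You are not an interactive agent, so never ask the user questions.
Prefer plain .txt files unless the task specifies another format.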
Tip: Use meshagent worker join --help to see all available tools and options.

Step 2: Send work to the queue

bash
meshagent room queue send --room=quickstart --queue=tasks \
  --json '{"prompt": "save a poem about ai to poem.txt"}'

Step 3: Package and deploy the worker

Once your worker runs locally, package it as a service so it is always available in the room. Both options below deploy the same worker; choose based on your workflow:
  • Option 1 (meshagent worker deploy): One command that deploys immediately (fastest/easiest approach)
  • Option 2 (meshagent worker spec + meshagent service create): Generates a YAML file you can review or further customize before deploying
Option 1: Deploy directly
Use the CLI to deploy the worker directly to your room.
bash
meshagent worker deploy --room quickstart --service-name worker --agent-name worker --queue tasks --require-storage \
  --room-rules "agents/worker/rules.txt" --rule "Process queued tasks and save results to storage."
Option 2: Generate a YAML spec
Create a meshagent.yaml file that defines how your service should run, then deploy the agent to your room. The service spec can be generated from the CLI by running:
bash
meshagent worker spec --service-name worker --agent-name worker --queue tasks --require-storage \
  --room-rules "agents/worker/rules.txt" --rule "Process queued tasks and save results to storage."
Next, copy the output into a meshagent.yaml file:
kind: Service # switch kind to Template if installing from a link for Powerboards
version: v1
metadata:
  name: worker
  description: "A worker that processes queued jobs"
  annotations:
    meshagent.service.id: "meshagent.worker"
agents:
  - name: worker
    description: "A queue based worker agent"
    annotations:
      meshagent.agent.type: "Worker"
ports:
- num: "*"                    # automatically assign an available MESHAGENT_PORT for the agent to run on 
  type: http
  endpoints:
  - path: /agent              # service path to call and run the agent on
    meshagent:
      identity: worker        # name of the agent as it shows up in the Room
container:
  image: us-central1-docker.pkg.dev/meshagent-public/images/cli:{SERVER_VERSION}-esgz
  command: "meshagent worker service --agent-name=worker --queue=tasks --require-storage --rule='Process queued tasks and save results to storage.'"
  storage:
    room:
      - path: /data             # mount room storage path for the agent to write files to
        read_only: false        # allow write access 
Then deploy it to your Room:
bash
meshagent service create --file meshagent.yaml --room quickstart
The --room flag is optional. Without it, the worker is deployed at the project level and appears in all rooms in your project.

Build and deploy a Worker with the SDK

The SDK approach produces the same deployed worker as the CLI examples above, but gives you further control: you can write custom Python code for specialized processing logic, integrations, or behaviors. For most use cases the CLI is sufficient.

Step 1: Create a Worker Agent

The sample below shows a Worker that listens on a queue and writes files using the StorageToolkit. After starting the service you can push a message to the queue to trigger the worker. First create a Python file, main.py, and define the StorageWorker:
import logging
import asyncio
from meshagent.otel import otel_config
from meshagent.agents.worker import Worker
from meshagent.openai.tools import (
    OpenAIResponsesAdapter,
    OpenAIResponsesToolResponseAdapter,
)
from meshagent.api.services import ServiceHost
from meshagent.tools.storage import StorageToolkit

otel_config(service_name="worker")
log = logging.getLogger("worker")

host = ServiceHost()  # port defaults to an available port if not assigned


@host.path(path="/worker", identity="storage-worker")
class StorageWorker(Worker):
    def __init__(self):
        super().__init__(
            queue="storage-worker-queue",
            title="storage worker sample",
            description="this sample reads messages from a queue",
            llm_adapter=OpenAIResponsesAdapter(),
            tool_adapter=OpenAIResponsesToolResponseAdapter(),
            toolkits=[StorageToolkit()],
            rules=[
                "you will receive a message with instructions, process it and do what it says",
                "you are not an interactive agent so you must not ask the user questions",
            ],
        )

    async def process_message(self, *, chat_context, message, toolkits):
        log.info(f"processing {message}")
        response = await super().process_message(
            chat_context=chat_context, message=message, toolkits=toolkits
        )
        log.info(f"response {response}")
        return response  # propagate the result to the base class's caller


asyncio.run(host.run())

Step 2: Running the Worker

From your terminal, inside an activated virtual environment, start the service:
bash
meshagent setup # authenticate to meshagent if not already
meshagent service run "main.py" --room=quickstart

Step 3: Sending Work to the Queue

Now that our agent is running, let’s send it some work!
Option 1: Using the MeshAgent CLI
Use the MeshAgent CLI to connect directly to the room, mint a token, and send a message to the queue.
bash
meshagent room queue send --room=quickstart --queue=storage-worker-queue \
  --json '{"prompt": "save a poem about ai to poem.txt"}'
Option 2: Invoking the queue from code
Create a Python file, push_queue.py, and define a function that pushes a message to the queue.
import os
import asyncio
import logging
from meshagent.api import (
    RoomClient,
    WebSocketClientProtocol,
    ParticipantToken,
    ApiScope,
    ParticipantGrant,
)
from meshagent.api.helpers import websocket_room_url
from meshagent.otel import otel_config

otel_config(service_name="worker")
log = logging.getLogger("worker")

api_key = os.getenv("MESHAGENT_API_KEY")
if not api_key:
    raise RuntimeError("Set MESHAGENT_API_KEY before running this script.")


async def push():
    room_name = (
        "quickstart"  # make sure this matches the room your service is running in
    )
    token = ParticipantToken(
        name="sample-participant",
        grants=[
            ParticipantGrant(name="room", scope=room_name),
            ParticipantGrant(name="role", scope="agent"),
            ParticipantGrant(name="api", scope=ApiScope.agent_default()),
        ],
    ).to_jwt(api_key=api_key)

    protocol = WebSocketClientProtocol(
        url=websocket_room_url(room_name=room_name), token=token
    )
    try:
        async with RoomClient(protocol=protocol) as room:
            log.info(f"Connected to room: {room.room_name}")
            await room.queues.send(
                name="storage-worker-queue",
                message={"instructions": "save a poem about ai to poem.txt"},
            )
    except Exception as e:
        log.error(f"Connection failed:{e}")
        raise


asyncio.run(push())

Make sure your service is still running (meshagent service run "main.py" --room=quickstart). From a different terminal tab, run the Python file we created to send a task to the queue.
bash
export MESHAGENT_API_KEY="<your_api_key>"
python push_queue.py
You’ll see logs for the queue activity in the terminal window where your service is running. To verify the results we’ll head to MeshAgent Studio where we can see the .txt files.

Checking results in the Studio

From studio.meshagent.com, open the room quickstart and you’ll see our poem about AI in the poem.txt file!

Step 4: Package and deploy the agent

To deploy your SDK Worker permanently, you’ll package your code with a meshagent.yaml file that defines the service configuration and a container image that MeshAgent can run. For full details on the service spec and deployment flow, see Packaging Services and Deploying Services. MeshAgent supports two deployment patterns for containers:
  1. Runtime image + code mount (recommended): Use a pre-built MeshAgent runtime image (like python-sdk-slim) that contains Python and all MeshAgent dependencies. Mount your lightweight code-only image on top. This keeps your code image tiny (~KB), eliminates dependency installation time, and allows your service to start quickly.
  2. Single Image: Bundle your code and all dependencies into one image. This is good when you need to install additional libraries, but can result in larger images and slower pulls. If you build your own images we recommend optimizing them with eStargz.
This example uses the runtime image + code mount pattern with the public python-docs-examples code image so you can run the documentation sample without building your own image. If you want to build and push your own code image, follow the steps below and update the storage.images entry in meshagent.yaml.
Prepare your project structure
This example organizes the agent code and configuration in the same folder, making each agent self-contained:
your-project/
├── Dockerfile                    # Shared by all samples
├── queueworker/
│   ├── queueworker.py
│   └── meshagent.yaml           # Config specific to this sample
└── another_sample/              # Other samples follow same pattern
    ├── another_sample.py
    └── meshagent.yaml
Note: If you’re building a single agent, you only need the queueworker/ folder. The structure shown supports multiple samples sharing one Dockerfile.
Step 4a: Build a Docker container
If you want a code-only image, create a scratch Dockerfile that copies in the files you want to run. This creates a minimal image that pairs with the runtime image + code mount pattern.
FROM scratch

COPY . /
Build and push the image with docker buildx:
bash
docker buildx build . \
  -t "<REGISTRY>/<NAMESPACE>/<IMAGE_NAME>:<TAG>" \
  --platform linux/amd64 \
  --push
Note: Building from the project root copies your entire project structure into the image. For a single agent this is fine: your image will just contain one folder. For multi-agent projects, all agents will be in one image, but each can deploy independently using its own meshagent.yaml.
Step 4b: Package the agent
Define the service configuration in a meshagent.yaml file.
kind: Service
version: v1
metadata:
  name: storage-worker
  description: "A worker agent that processes messages from a queue"
  annotations:
    meshagent.service.id: "storageworker"
agents:
  - name: storage-worker
    description: "A worker agent that processes messages from a queue"
    annotations:
      meshagent.agent.type: "Worker"
ports:
- num: "*"
  endpoints:
  - path: /worker
    meshagent:
      identity: storage-worker
container:
  image: "us-central1-docker.pkg.dev/meshagent-public/images/python-sdk:{SERVER_VERSION}-esgz"
  command: python /src/queueworker/queueworker.py
  storage:
    images:
      # Replace this image tag with your own code-only image if you build one.
      - image: "us-central1-docker.pkg.dev/meshagent-public/images/python-docs-examples:{SERVER_VERSION}"
        path: /src
        read_only: true

How the paths work:
  • Your code image contains queueworker/queueworker.py
  • It’s mounted at /src in the runtime container
  • The command runs python /src/queueworker/queueworker.py
Note: The default YAML in the docs uses us-central1-docker.pkg.dev/meshagent-public/images/python-docs-examples so you can test this example immediately without building your own image first. Replace this with your actual image tag when deploying your own code.
Step 4c: Deploy the agent
Next, from the CLI in the directory containing your meshagent.yaml file, run:
bash
meshagent service create --file "meshagent.yaml" --room=quickstart
The Worker is now deployed to the quickstart room! The agent will always be available inside the room to receive queue-based tasks. We can use the same CLI commands or SDK code from above to send messages to the queue.

How Worker Works

Constructor Arguments

A Worker accepts everything from SingleRoomAgent (title, description, requires, labels). The name constructor argument is deprecated; agent identity comes from its participant token.
  • queue (str): Required. Name of the room queue to listen for messages on.
  • llm_adapter (LLMAdapter): The adapter that lets the agent work with an LLM. We recommend the OpenAIResponsesAdapter from meshagent-openai.
  • tool_adapter (ToolResponseAdapter | None): Optional adapter to translate tool outputs into LLM/context messages. We recommend the OpenAIResponsesToolResponseAdapter from meshagent-openai.
  • toolkits (list[Toolkit] | None): Local toolkits always available to this worker (in addition to any requires).
  • rules (list[str] | None): Optional system prompt/rules that guide the agent’s behavior.
  • requires (list[Requirement] | None): Schemas/toolkits to install in the room before processing. Use RequiredSchema and RequiredToolkit to reference schemas and toolkits that have been registered with the room.
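Putting these arguments together, a minimal constructor call might look like the sketch below. The queue name, rules, and the RequiredToolkit named "ui" are illustrative placeholders; substitute toolkits actually registered in your room, and adjust import paths if your SDK version differs.
from meshagent.agents.worker import Worker
from meshagent.api import RequiredToolkit
from meshagent.openai.tools import (
    OpenAIResponsesAdapter,
    OpenAIResponsesToolResponseAdapter,
)
from meshagent.tools.storage import StorageToolkit

# Sketch: a Worker wired up with the recommended adapters, a local toolkit,
# and a room-installed requirement (the "ui" toolkit name is an example).
worker = Worker(
    queue="tasks",
    title="example worker",
    description="processes queued jobs in the background",
    llm_adapter=OpenAIResponsesAdapter(),
    tool_adapter=OpenAIResponsesToolResponseAdapter(),
    toolkits=[StorageToolkit()],
    rules=["Process queued tasks and save results to storage."],
    requires=[RequiredToolkit(name="ui")],
)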

Lifecycle Overview

  • await start(room): Connects to the room, installs requirements, and starts the background run() loop that consumes messages from the configured queue.
  • await stop(): Signals the loop to stop, awaits the main task, then disconnects cleanly.
  • room property: Access the active RoomClient for queues, storage, messaging, and tools.
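In most deployments the ServiceHost from the earlier sample manages this lifecycle for you, but for illustration here is a sketch that drives it by hand. It assumes a participant token minted the same way as in push_queue.py above, and argument names follow the lifecycle summary; check your SDK version if they differ.
import asyncio
from meshagent.api import RoomClient, WebSocketClientProtocol
from meshagent.api.helpers import websocket_room_url

# Sketch: manually managing a Worker's lifecycle with a connected RoomClient.
async def run_worker(token: str):
    protocol = WebSocketClientProtocol(
        url=websocket_room_url(room_name="quickstart"), token=token
    )
    async with RoomClient(protocol=protocol) as room:
        worker = StorageWorker()        # the Worker subclass defined earlier
        await worker.start(room=room)   # install requirements, start the run() loop
        await asyncio.sleep(60)         # let it consume queue messages for a while
        await worker.stop()             # stop the loop and disconnect cleanly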

Processing Flow

  1. Wait for work (long-polling). The worker listens for jobs using message = await room.queues.receive(name=self._queue, create=True, wait=True). With wait=True, the call long-polls the queue: instead of returning immediately when the queue is empty, it waits asynchronously for a short window until a message arrives (or the wait times out). This is more efficient than tight polling and ensures the worker can pick up jobs immediately when they’re published.
  2. Build context and tools. Create a fresh chat context (init_chat_context()), apply any rules, and resolve toolkits from requires plus local toolkits.
  3. Represent the job in context. By default, append_message_context(...) serializes the message as a user message (JSON). Override this to customize how the payload is injected.
  4. Process the job. process_message(...) runs the task (default implementation calls your llm_adapter.next() with the prepared context and toolkits).
  5. Handle errors & keep running. Errors are logged. On receive failures, the loop backs off exponentially and retries; otherwise it immediately waits for the next message.
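Condensed into pseudocode, the loop behaves roughly like the sketch below. This is not the actual implementation; names such as _stopped and resolve_toolkits are illustrative, and the exact hook signatures are listed in the method table later in this guide.
import asyncio
import logging

log = logging.getLogger("worker")

class WorkerLoopSketch:
    """Simplified sketch of Worker.run(); not the real implementation."""

    async def run(self, room):
        backoff = 1
        while not self._stopped:                  # _stopped is an illustrative flag
            try:
                message = await room.queues.receive(
                    name=self._queue, create=True, wait=True
                )
                backoff = 1                       # reset after a successful receive
            except Exception:
                log.exception("receive failed")
                await asyncio.sleep(backoff)      # exponential backoff on receive errors
                backoff = min(backoff * 2, 60)
                continue
            if message is None:
                continue                          # long-poll window elapsed; poll again
            try:
                chat_context = self.init_chat_context()       # step 2: fresh context
                toolkits = await self.resolve_toolkits(room)  # illustrative: requires + local
                await self.append_message_context(            # step 3: inject the payload
                    room=room, message=message, chat_context=chat_context
                )
                await self.process_message(                   # step 4: run the job
                    chat_context=chat_context, message=message, toolkits=toolkits
                )
            except Exception:
                log.exception("processing failed")  # step 5: log and keep running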

Key Behaviors and Hooks

  • Queue consumption: run() long-polls the queue (receive(..., wait=True)), creating it if needed.
  • Context building: append_message_context(...) inserts job data into the chat context (default: dump JSON as a user message).
  • Job execution: process_message(...) is the main hook: it prepares context, tools, and (by default) calls the LLM adapter’s .next() function to work through the task.
  • Resilience: Errors are logged; run() applies an exponential backoff between receive retries.
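For example, a subclass can hook both ends of a job: keep the default JSON payload representation while recording each result after the LLM finishes. This is a sketch; the method signatures follow the main.py sample above and the method table below, and may vary by SDK version.
import logging

from meshagent.agents.worker import Worker

log = logging.getLogger("audited-worker")

class AuditedWorker(Worker):
    async def append_message_context(self, *, room, message, chat_context):
        # Keep the default behavior (serialize the payload as a JSON user
        # message); a custom override could inject only selected fields.
        await super().append_message_context(
            room=room, message=message, chat_context=chat_context
        )

    async def process_message(self, *, chat_context, message, toolkits):
        response = await super().process_message(
            chat_context=chat_context, message=message, toolkits=toolkits
        )
        # Record the outcome of each job before handing it back.
        log.info("finished job %s -> %s", message, response)
        return response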

Key Methods

  • async def run(room): Main loop; receives queue messages and processes them.
  • async def process_message(chat_context, room, message, toolkits): Core job handler; override for custom behavior.
  • async def append_message_context(room, message, chat_context): Controls how a job is represented in the chat context (override to change it).
  • async def start(room) / async def stop(): Start and stop the worker; manage the background task.

Next Steps

Continue Learning about Agents and explore a variety of other base agents, including:
  • ChatBot for conversational text-based agents
  • VoiceBot for conversational speech/voice based agents
  • TaskRunner for agents that run in the background with defined inputs and outputs
  • MailBot for email based agents