
Agent Handlers

Introduction

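Within the uAgents Framework, you can decorate functions with handlers so that they are triggered only when the uAgents library detects a specific condition, such as an elapsed interval, an incoming message or query, or a startup or shutdown event.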

Below, we show how to use the following different event handlers:

  1. Interval tasks: .on_interval()
  2. Handle messages: .on_message()
  3. Answer queries: .on_query()
  4. Triggered by event: .on_event()

Creating an interval task with on_interval() handler

Sometimes an agent will need to perform a task periodically. To do this we can use the on_interval() decorator which periodically repeats a given function for the agent. For instance, an agent could send a message every 2 seconds to another agent.
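To make the idea of a periodic task concrete, here is a minimal sketch in plain asyncio that calls a coroutine at a fixed period. It is an illustration only and not how uAgents implements on_interval(); the helper name run_every and its max_runs parameter are hypothetical.

```python
import asyncio


async def run_every(period: float, task, max_runs: int) -> int:
    """Call `task` every `period` seconds, `max_runs` times (hypothetical helper)."""
    runs = 0
    while runs < max_runs:
        await task()
        runs += 1
        if runs < max_runs:
            # Wait one period before the next call.
            await asyncio.sleep(period)
    return runs


greetings = []


async def say_hello() -> None:
    # Stand-in for the body of an on_interval() handler.
    greetings.append("Hello!")


# A short period keeps the demo fast; an agent would typically use seconds.
completed = asyncio.run(run_every(0.01, say_hello, max_runs=3))
```

Unlike this sketch, an agent's interval task keeps running until the agent is stopped.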

Let’s get started and create our first interval task!

Imports needed

Walk-through

  1. Let’s create a Python script for this task, and name it interval_task.py. On Windows:

    echo. > interval_task.py
  2. Then import the necessary classes from uagents library, Agent and Context, and create our agent:

interval_task.py
from uagents import Agent, Context

agent = Agent(
    name="alice",
    seed="alice recovery phrase",
    port=8000,
    endpoint=["http://localhost:8000/submit"],
)
  3. Create a function to handle the startup event, which will introduce the agent:
interval_task.py
@agent.on_event("startup")
async def introduce_agent(ctx: Context):
    ctx.logger.info(f"Hello, I'm agent {agent.name} and my address is {agent.address}.")
  4. We can now define our agent’s interval behavior. We want our agent to log a message every 2 seconds using the on_interval decorator:
interval_task.py
@agent.on_interval(period=2.0)
async def say_hello(ctx: Context):
    ctx.logger.info("Hello!")


if __name__ == "__main__":
    agent.run()

The output will be printed out using the ctx.logger.info() method.

  5. Save the script.

The overall script should look as follows:

interval_task.py
from uagents import Agent, Context

agent = Agent(
    name="alice",
    seed="alice recovery phrase",
    port=8000,
    endpoint=["http://localhost:8000/submit"],
)


@agent.on_event("startup")
async def introduce_agent(ctx: Context):
    ctx.logger.info(f"Hello, I'm agent {agent.name} and my address is {agent.address}.")


@agent.on_interval(period=2.0)
async def say_hello(ctx: Context):
    ctx.logger.info("Hello!")


if __name__ == "__main__":
    agent.run()

Run the script

Run the script: python interval_task.py

The output should be as follows:

INFO: [alice]: Registration on Almanac API successful
INFO: [alice]: Registering on almanac contract...
INFO: [alice]: Registering on almanac contract...complete
INFO: [alice]: Agent inspector available at https://agentverse.ai/inspect/?uri=http%3A//127.0.0.1%3A8000&address=agent1qww3ju3h6kfcuqf54gkghvt2pqe8qp97a7nzm2vp8plfxflc0epzcjsv79t
INFO: [alice]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO: [alice]: Hello, I'm agent alice and my address is agent1qww3ju3h6kfcuqf54gkghvt2pqe8qp97a7nzm2vp8plfxflc0epzcjsv79t.
INFO: [alice]: Hello!
INFO: [alice]: Hello!
INFO: [alice]: Hello!

Handle messages using the on_message() handler

We now showcase a scenario where three agents, named alice, bob, and charles, use a custom protocol to communicate. In the example, Alice and Bob support the protocol, whereas Charles attempts to send broadcast messages to all agents using the protocol. Agents use the on_message() handler which allows them to handle messages matching specific data models.

Let’s get started!

Walk-through

  1. First of all, let’s create a Python script for this task, and name it broadcast.py. On Windows:

    echo. > broadcast.py
  2. We then need to import the Agent, Bureau, Context, Model, and Protocol classes from the uagents library. Then, let’s create the 3 different agents using the class Agent. Each agent is initialized with a unique name and a seed phrase for wallet recovery.

broadcast.py
from uagents import Agent, Bureau, Context, Model, Protocol

# create agents
# alice and bob will support the protocol
# charles will try to reach all agents supporting the protocol
alice = Agent(
    name="alice",
    seed="alice recovery phrase",
    port=8000,
    endpoint=["http://127.0.0.1:8000/submit"],
)
bob = Agent(
    name="bob",
    seed="bob recovery phrase",
    port=8001,
    endpoint=["http://127.0.0.1:8001/submit"],
)
charles = Agent(
    name="charles",
    seed="charles recovery phrase",
    port=8002,
    endpoint=["http://127.0.0.1:8002/submit"],
)

It is optional but useful to include a seed parameter when creating an agent to set fixed addresses. Otherwise, random addresses will be generated every time you run the agent.
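The effect of a fixed seed can be illustrated with a small stand-alone sketch: an identifier derived from a seed phrase is the same on every run, while one derived from fresh random bytes is not. The derivation below (a SHA-256 hash with a hypothetical demo1 prefix) is an assumption made for illustration and is not the scheme uAgents uses to compute agent addresses.

```python
import hashlib
import secrets
from typing import Optional


def demo_address(seed: Optional[str] = None) -> str:
    """Return a toy identifier; NOT the real uAgents address derivation."""
    # With no seed, fall back to random bytes, so the result changes every call.
    material = seed.encode() if seed is not None else secrets.token_bytes(32)
    return "demo1" + hashlib.sha256(material).hexdigest()[:16]


# Same seed phrase, same identifier.
fixed_a = demo_address("alice recovery phrase")
fixed_b = demo_address("alice recovery phrase")

# No seed, a different identifier each time.
random_a = demo_address()
random_b = demo_address()
```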

  3. Let’s then define the message data models to specify the type of messages being handled and exchanged by the agents. We define two data models, BroadcastExampleRequest and BroadcastExampleResponse. Finally, create a protocol named proto with version 1.0:
broadcast.py
class BroadcastExampleRequest(Model):
    pass


class BroadcastExampleResponse(Model):
    text: str


# define protocol
proto = Protocol(name="proto", version="1.0")
  4. Let’s now define a message handler function for incoming messages of type BroadcastExampleRequest in the protocol:
broadcast.py
@proto.on_message(model=BroadcastExampleRequest, replies=BroadcastExampleResponse)
async def handle_request(ctx: Context, sender: str, _msg: BroadcastExampleRequest):
    await ctx.send(
        sender, BroadcastExampleResponse(text=f"Hello from {ctx.agent.name}")
    )

Here we defined a handle_request() function, which runs whenever a request is received and sends a response back to the sender. The .on_message() decorator indicates that this function is triggered whenever a message of type BroadcastExampleRequest is received. The response contains a greeting with the name of the agent that handled the request (ctx.agent.name), not the name of the agent that sent it.
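The idea of triggering a handler only for messages that match a data model can be sketched without the library. The ToyAgent class and its deliver() method below are hypothetical stand-ins written for this illustration; they are not the uagents.Agent API, and plain dataclasses replace Model.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class BroadcastExampleRequest:
    pass


@dataclass
class BroadcastExampleResponse:
    text: str


class ToyAgent:
    """Hypothetical stand-in for an agent; not the uagents.Agent class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._handlers: Dict[type, Callable] = {}

    def on_message(self, model: type) -> Callable:
        # Register the decorated function for messages of exactly this type.
        def register(func: Callable) -> Callable:
            self._handlers[model] = func
            return func

        return register

    def deliver(self, sender: str, msg: object) -> Optional[object]:
        # Look up the handler by the message's type; ignore unknown types.
        handler = self._handlers.get(type(msg))
        return handler(self, sender, msg) if handler else None


bob = ToyAgent("bob")


@bob.on_message(model=BroadcastExampleRequest)
def handle_request(agent: ToyAgent, sender: str, _msg: BroadcastExampleRequest):
    # The greeting carries the name of the agent handling the request.
    return BroadcastExampleResponse(text=f"Hello from {agent.name}")


reply = bob.deliver("charles", BroadcastExampleRequest())
ignored = bob.deliver("charles", "not a known model")
```

Here reply holds the greeting from bob, while the second message is dropped because no handler is registered for its type.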

  5. Now, we need to include the protocol into the agents. Specifically, the protocol is included in both alice and bob agents. This means they will follow the rules defined in the protocol when communicating:
broadcast.py
alice.include(proto)
bob.include(proto)
Note: After the first registration in the Almanac smart contract, it will take about 5 minutes before the agents can be found through the protocol.

  6. It is now time to define the behavior and function of charles agent:
broadcast.py
@charles.on_interval(period=5)
async def say_hello(ctx: Context):
    status_list = await ctx.broadcast(proto.digest, message=BroadcastExampleRequest())
    ctx.logger.info(f"Trying to contact {len(status_list)} agents.")


@charles.on_message(model=BroadcastExampleResponse)
async def handle_response(ctx: Context, sender: str, msg: BroadcastExampleResponse):
    ctx.logger.info(f"Received response from {sender}: {msg.text}")

In the first part, we use the .on_interval() decorator to define an interval behavior for this agent when the script is being run. In this case, the agent will execute the say_hello() function every 5 seconds. The Context object is a collection of data and functions related to the agent. Inside the say_hello() function, the agent uses the ctx.broadcast() method to send a broadcast message. The message is of type BroadcastExampleRequest() and it is being sent using the protocol’s digest (proto.digest).

Then, we use the .on_message() decorator on the handle_response() function. This function handles all incoming messages of type BroadcastExampleResponse from other agents. When a response is received, the agent logs an informational message using the ctx.logger.info() method. The message includes the sender’s address and the text content of the response.
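As a rough mental model of broadcasting by protocol digest, the following stand-alone sketch keeps a dictionary from a digest to the addresses of agents that included the protocol. Everything here is hypothetical: the registry, the include() and broadcast() helpers, the placeholder digest string, and the choice to skip the sender are assumptions for illustration, and the real lookup through the Almanac is not shown.

```python
from typing import Dict, List, Set

# Hypothetical registry: protocol digest -> addresses of agents that included it.
registry: Dict[str, Set[str]] = {}


def include(address: str, digest: str) -> None:
    """Record that the agent at `address` supports the protocol `digest`."""
    registry.setdefault(digest, set()).add(address)


def broadcast(digest: str, sender: str) -> List[str]:
    """Return the addresses a broadcast would target, excluding the sender."""
    return sorted(addr for addr in registry.get(digest, set()) if addr != sender)


PROTO_DIGEST = "proto:demo-digest"  # placeholder, not a real protocol digest

# alice and bob include the protocol; charles does not.
include("alice-address", PROTO_DIGEST)
include("bob-address", PROTO_DIGEST)

targets = broadcast(PROTO_DIGEST, sender="charles-address")
unknown = broadcast("proto:other", sender="charles-address")
```

With two agents registered under the digest, the sketch targets two addresses; a digest nobody has included yields an empty list.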

  7. We are now ready to set up a Bureau object for agents to be run together at the same time, and we add alice, bob, and charles to it using the bureau.add() method:
broadcast.py
bureau = Bureau(port=8000, endpoint="http://localhost:8000/submit")
bureau.add(alice)
bureau.add(bob)
bureau.add(charles)

if __name__ == "__main__":
    bureau.run()

The bureau is assigned to listen on port=8000 and specifies an endpoint at "http://localhost:8000/submit" for submitting data.

  8. Save the script.

The overall script should look as follows:

broadcast.py
from uagents import Agent, Bureau, Context, Model, Protocol

# create agents
# alice and bob will support the protocol
# charles will try to reach all agents supporting the protocol
alice = Agent(
    name="alice",
    seed="alice recovery phrase",
    port=8000,
    endpoint=["http://127.0.0.1:8000/submit"],
)
bob = Agent(
    name="bob",
    seed="bob recovery phrase",
    port=8001,
    endpoint=["http://127.0.0.1:8001/submit"],
)
charles = Agent(
    name="charles",
    seed="charles recovery phrase",
    port=8002,
    endpoint=["http://127.0.0.1:8002/submit"],
)


class BroadcastExampleRequest(Model):
    pass


class BroadcastExampleResponse(Model):
    text: str


# define protocol
proto = Protocol(name="proto", version="1.0")


@proto.on_message(model=BroadcastExampleRequest, replies=BroadcastExampleResponse)
async def handle_request(ctx: Context, sender: str, _msg: BroadcastExampleRequest):
    await ctx.send(
        sender, BroadcastExampleResponse(text=f"Hello from {ctx.agent.name}")
    )


# include protocol
# Note: after the first registration on the almanac smart contract, it will
# take about 5 minutes before the agents can be found through the protocol
alice.include(proto)
bob.include(proto)


# let charles send the message to all agents supporting the protocol
@charles.on_interval(period=5)
async def say_hello(ctx: Context):
    status_list = await ctx.broadcast(proto.digest, message=BroadcastExampleRequest())
    ctx.logger.info(f"Trying to contact {len(status_list)} agents.")


@charles.on_message(model=BroadcastExampleResponse)
async def handle_response(ctx: Context, sender: str, msg: BroadcastExampleResponse):
    ctx.logger.info(f"Received response from {sender}: {msg.text}")


bureau = Bureau(port=8000, endpoint="http://localhost:8000/submit")
bureau.add(alice)
bureau.add(bob)
bureau.add(charles)

if __name__ == "__main__":
    bureau.run()

Run the script

Make sure to have activated your virtual environment correctly.

Run the script: python broadcast.py

The output would be:

INFO: [alice]: Registration on Almanac API successful
INFO: [alice]: Registering on almanac contract...
INFO: [alice]: Registering on almanac contract...complete
INFO: [ bob]: Registration on Almanac API successful
INFO: [ bob]: Registering on almanac contract...
INFO: [ bob]: Registering on almanac contract...complete
INFO: [charles]: Registration on Almanac API successful
INFO: [charles]: Registration on Almanac API successful
INFO: [charles]: Registering on almanac contract...
INFO: [bureau]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO: [charles]: Trying to contact 2 agents.
INFO: [charles]: Received response from agent1q0mau8vkmg78xx0sh8cyl4tpl4ktx94pqp2e94cylu6haugt2hd7j9vequ7: Hello from bob
INFO: [charles]: Received response from agent1qww3ju3h6kfcuqf54gkghvt2pqe8qp97a7nzm2vp8plfxflc0epzcjsv79t: Hello from alice

Answer queries with on_query() handler

The on_query() handler registers a function as a handler for incoming queries that match a specified Model. This decorator enables the agent to respond to queries in an event-driven manner.

Walk-through

Agent’s script

For the agent, the script sets up an agent to handle incoming queries. It defines two models: TestRequest and Response. Upon startup, it logs the agent’s details. The core functionality lies in the query_handler, decorated with @agent.on_query(), which processes received queries and sends back a predefined response. This demonstrates creating responsive agents within the uagents Framework, showcasing how they can interact with other agents or functions in an asynchronous, event-driven architecture.

agent.py
from uagents import Agent, Context, Model


class TestRequest(Model):
    message: str


class Response(Model):
    text: str


# Initialize the agent with its configuration.
agent = Agent(
    name="your_agent_name_here",
    seed="your_agent_seed_here",
    port=8001,
    endpoint="http://localhost:8001/submit",
)


@agent.on_event("startup")
async def startup(ctx: Context):
    ctx.logger.info(f"Starting up {agent.name}")
    ctx.logger.info(f"With address: {agent.address}")
    ctx.logger.info(f"And wallet address: {agent.wallet.address()}")


# Decorator to handle incoming queries.
@agent.on_query(model=TestRequest, replies={Response})
async def query_handler(ctx: Context, sender: str, _query: TestRequest):
    ctx.logger.info("Query received")
    try:
        # do something here
        await ctx.send(sender, Response(text="success"))
    except Exception:
        await ctx.send(sender, Response(text="fail"))


# Main execution block to run the agent.
if __name__ == "__main__":
    agent.run()

The agent is created using the Agent class from uagents library. You can initialize it by providing it with a name, seed, port, and endpoint. It defines an on_event() handler for the startup event, where it logs information about the agent’s initialization. It defines an on_query() handler for handling queries of type TestRequest. Upon receiving a query, it processes it and sends back a Response. The agent is then set to run.

Proxy

The proxy is implemented using FastAPI. It sets up two routes: "/" for a simple root message and "/endpoint" for receiving requests. When a POST request is made to "/endpoint" with a JSON payload containing a TestRequest, it triggers the make_agent_call function. Inside make_agent_call, it calls agent_query to communicate with the agent. The agent receives the query, processes it, and sends back a response. The proxy receives the response from the agent and sends back a success message along with the response text.

Let’s explore the Proxy code script step-by-step:

  1. First of all, navigate to the directory where you want to create your project.

  2. Create a Python script named on_query.py. On Windows:

    echo. > on_query.py
  3. We need to import json, FastAPI and Request from fastapi, and Model, query, and Envelope from uagents. We then define the query format using the TestRequest class as a subclass of Model:

on_query.py
import json

from fastapi import FastAPI, Request
from uagents import Model
from uagents.query import query
from uagents.envelope import Envelope

AGENT_ADDRESS = "agent1qt6ehs6kqdgtrsduuzslqnrzwkrcn3z0cfvwsdj22s27kvatrxu8sy3vag0"


class TestRequest(Model):
    message: str
  4. Create the agent_query() function to send a query to the agent and decode the response received:
on_query.py
async def agent_query(req):
    response = await query(destination=AGENT_ADDRESS, message=req, timeout=15)
    if isinstance(response, Envelope):
        data = json.loads(response.decode_payload())
        return data["text"]
    return response
  5. Initialize a FastAPI app:
on_query.py
app = FastAPI()
  6. Define a root endpoint to test the server:
on_query.py
@app.get("/")
def read_root():
    return "Hello from the Agent controller"
  7. Define an endpoint to make agent calls:
on_query.py
@app.post("/endpoint")
async def make_agent_call(req: Request):
    model = TestRequest.parse_obj(await req.json())
    try:
        res = await agent_query(model)
        return f"successful call - agent response: {res}"
    except Exception:
        return "unsuccessful agent call"
  8. Save the script. Remember that you need to set AGENT_ADDRESS to the address of your own agent to run this code correctly.

The overall script should look as follows:

on_query.py
import json

from fastapi import FastAPI, Request
from uagents import Model
from uagents.query import query
from uagents.envelope import Envelope

AGENT_ADDRESS = "agent1qt6ehs6kqdgtrsduuzslqnrzwkrcn3z0cfvwsdj22s27kvatrxu8sy3vag0"


class TestRequest(Model):
    message: str


async def agent_query(req):
    response = await query(destination=AGENT_ADDRESS, message=req, timeout=15)
    if isinstance(response, Envelope):
        data = json.loads(response.decode_payload())
        return data["text"]
    return response


app = FastAPI()


@app.get("/")
def read_root():
    return "Hello from the Agent controller"


@app.post("/endpoint")
async def make_agent_call(req: Request):
    model = TestRequest.parse_obj(await req.json())
    try:
        res = await agent_query(model)
        return f"successful call - agent response: {res}"
    except Exception:
        return "unsuccessful agent call"
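The branching in agent_query() can be tried in isolation with a stand-in envelope. ToyEnvelope below is hypothetical and exists only for this sketch; treating the payload as base64-encoded JSON is an assumption about the envelope format, and extract_text is an illustrative name that mirrors the logic of agent_query() without contacting an agent.

```python
import base64
import json
from dataclasses import dataclass


@dataclass
class ToyEnvelope:
    """Hypothetical stand-in for an envelope; the base64 encoding is an assumption."""

    payload: str

    def decode_payload(self) -> str:
        return base64.b64decode(self.payload).decode()


def extract_text(response: object) -> object:
    # Mirror agent_query(): decode envelopes, pass anything else through as-is.
    if isinstance(response, ToyEnvelope):
        data = json.loads(response.decode_payload())
        return data["text"]
    return response


# Build a payload shaped like the agent's Response model ({"text": ...}).
encoded = base64.b64encode(json.dumps({"text": "success"}).encode()).decode()

from_envelope = extract_text(ToyEnvelope(payload=encoded))
passthrough = extract_text("some non-envelope status")
```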

Run the example

In separate terminals:

  1. Run the FastAPI proxy (the module name matches the on_query.py script created above): uvicorn on_query:app

  2. Run the agent: python agent.py

  3. Query the agent via the proxy: curl -d '{"message": "test"}' -H "Content-Type: application/json" -X POST http://localhost:8000/endpoint
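If you prefer to query the proxy from Python instead of curl, the same POST request can be built with the standard library. This is a minimal sketch: the function name build_query_request is hypothetical, and the commented-out send step assumes the proxy and the agent are already running as described above.

```python
import json
import urllib.request

PROXY_URL = "http://localhost:8000/endpoint"


def build_query_request(message: str) -> urllib.request.Request:
    """Build the same POST request as the curl command, without sending it."""
    body = json.dumps({"message": message}).encode()
    return urllib.request.Request(
        PROXY_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_query_request("test")

# To actually send it (requires the proxy and the agent to be running):
# with urllib.request.urlopen(request, timeout=20) as resp:
#     print(resp.read().decode())
```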

Catching events with on_event() handler

The uAgents library catches two lifecycle events that an agent can handle: startup, when the agent starts, and shutdown, when it stops.

Here’s an example:

on_event("startup")

@agent.on_event("startup")
async def introduce_agent(ctx: Context):
    ctx.logger.info(f"Hello, I'm agent {agent.name} and my address is {agent.address}.")
    ...

on_event("shutdown")

@agent.on_event("shutdown")
async def say_goodbye(ctx: Context):
    ctx.logger.info(f"Hello, I'm agent {agent.name} and I am shutting down")
    ...
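The ordering of the two events can be sketched in plain asyncio: startup handlers run before the main work, and shutdown handlers run afterwards even if the main work fails. The on_event() helper and run() function below are hypothetical and written only for this illustration; they are not the uAgents implementation.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[], Awaitable[None]]
handlers: Dict[str, List[Handler]] = {"startup": [], "shutdown": []}
log: List[str] = []


def on_event(event: str) -> Callable[[Handler], Handler]:
    """Register a coroutine for a named lifecycle event (hypothetical helper)."""

    def register(func: Handler) -> Handler:
        handlers[event].append(func)
        return func

    return register


@on_event("startup")
async def introduce() -> None:
    log.append("startup")


@on_event("shutdown")
async def say_goodbye() -> None:
    log.append("shutdown")


async def run(main: Handler) -> None:
    for handler in handlers["startup"]:
        await handler()
    try:
        await main()
    finally:
        # Shutdown handlers run even if the main task raises.
        for handler in handlers["shutdown"]:
            await handler()


async def main_task() -> None:
    log.append("running")


asyncio.run(run(main_task))
```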