Utilize the PostgreSQL database with the Agent
Introduction
This example demonstrates how to use the uAgents library to create agents that interact with a PostgreSQL database in a Docker Compose setup. One agent inserts employee data into the database, while another retrieves it. Together they show how agents, PostgreSQL, and Docker can be combined to manage data flow and communication in a distributed system.
Project Structure
.postgres-database-with-an-agent
├── docker-compose.yml
├── Dockerfile
├── README.md
└── src
    ├── constants.py
    ├── db
    │   ├── db_connection.py
    │   ├── __init__.py
    │   ├── models
    │   │   └── models.py
    │   └── schemas
    │       └── employees.sql
    └── main.py
Agent with PostgreSQL database
Set up the PostgreSQL connection with Docker using Docker Compose
This section details the files involved in setting up the PostgreSQL connection within your project.
db_connection.py: Contains the functions to establish and close the connection to the PostgreSQL database using the psycopg2 library. The create_connection function connects to the database with provided credentials, while the close_connection function ensures the connection is safely terminated.
import psycopg2
from psycopg2 import OperationalError
 
def create_connection(dbname, user, password, host="localhost", port="5432"):
    """
    Create a connection to the PostgreSQL database.
 
    :param dbname: Name of the database
    :param user: Database user
    :param password: User's password
    :param host: Database host
    :param port: Database port
    :return: Connection object or None if connection fails
    """
    try:
        conn = psycopg2.connect(
            dbname=dbname, user=user, password=password, host=host, port=port
        )
        print("Connection successful")
        return conn
    except OperationalError as error:
        print(f"Error connecting to PostgreSQL database: {error}")
        return None
 
def close_connection(conn):
    if conn:
        conn.close()
        print("Connection closed")
docker-compose.yml: Configures the Docker services, including the PostgreSQL database and the application. It defines the environment variables for the database connection and maps the database schema from the host to the container.
version: "3.8"
services:
  db:
    container_name: postgres_container
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: ${DB_USER}
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: ${DB_NAME}
    volumes:
      - "postgres:/var/lib/postgresql/data"
      - ./src/db/schemas/employees.sql:/docker-entrypoint-initdb.d/employees.sql
    ports:
      - "5432:5432"
    networks:
      - agent_network
  app:
    build: .
    container_name: poetry_app
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    depends_on:
      - db
    networks:
      - agent_network
    environment:
      DB_USER: ${DB_USER}
      DB_PASSWORD: ${DB_PASSWORD}
      DB_NAME: ${DB_NAME}
    command: poetry run python ./src/main.py
 
volumes:
  postgres:
 
networks:
  agent_network:
    driver: bridge
Dockerfile: Builds the application container, installing dependencies via Poetry and setting up the environment to run the application. The container exposes port 8000 for the application service.
FROM python:3.12-slim
ENV PATH="$PATH:/root/.local/bin"
 
RUN apt-get update && \
    apt-get install -y curl gcc && \
    curl -sSL https://install.python-poetry.org/ | python3 -
 
WORKDIR /app
ADD pyproject.toml poetry.lock /app/
RUN poetry install
ADD . /app
EXPOSE 8000
 
ENTRYPOINT ["poetry", "run"]
CMD ["python", "main.py"]
Defining the Employees Database Schema and Model
The Employees class defines a model representing employee data as a dictionary. The GetEmployees class represents a model used to request employee information, with a flag indicating whether a response is expected.
The employees.sql script defines a schema for an Employees table in a PostgreSQL database, if it doesn’t already exist. This table includes columns for employee ID, first name, last name, birth date, and salary.
from uagents import Model
 
class Employees(Model):
    employees_data: dict
 
class GetEmployees(Model):
    reply_back: bool
 
CREATE TABLE IF NOT EXISTS Employees (
    EmployeeID INT PRIMARY KEY,
    FirstName VARCHAR(50),
    LastName VARCHAR(50),
    BirthDate DATE,
    Salary DECIMAL(10, 2)
);
Postgres data with agent
This script sets up and runs two agents, db_insert_agent and db_fetch_agent, which interact with a PostgreSQL database to manage employee data. The agents use asynchronous event handling to fetch and insert employee information into the database.
Database Connection
- The create_connection function is used to establish a connection to the PostgreSQL database using parameters defined in db_params.
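With the short form of depends_on used in docker-compose.yml, Compose starts the db container before the app container but does not wait for PostgreSQL to accept connections, so the first connection attempt can fail. The following is a minimal sketch of a retry wrapper, not part of the original example. The name connect_with_retry and its attempts and delay parameters are hypothetical; it only assumes that the function passed in behaves like create_connection and returns None on failure.

```python
import time


def connect_with_retry(connect, attempts=5, delay=2.0, **params):
    """Call connect(**params) until it returns a connection or attempts run out.

    `connect` is assumed to behave like create_connection: it returns a
    connection object on success and None on failure.
    """
    for attempt in range(1, attempts + 1):
        conn = connect(**params)
        if conn is not None:
            return conn
        if attempt < attempts:
            print(f"Attempt {attempt} failed, retrying in {delay} seconds")
            time.sleep(delay)
    # Every attempt failed; mirror create_connection and return None.
    return None
```

In main.py this could be called as connect_with_retry(create_connection, **db_params) wherever create_connection(**db_params) is used.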
Agents
- db_insert_agent: Responsible for inserting employee data into the database.
- db_fetch_agent: Responsible for fetching and reporting employee data from the database.
Event Handlers
- on_startup (db_fetch_agent):
  - Triggered when db_fetch_agent starts.
  - Retrieves and logs the PostgreSQL database version.
  - Sends employee data to db_insert_agent if the version retrieval is successful.
- handle_employee_data (db_insert_agent):
  - Handles messages with employee data.
  - Inserts the received employee data into the Employees table in the database.
- fetch_all_employee_details (db_fetch_agent):
  - Handles messages requesting all employee details.
  - Retrieves all employee records from the Employees table and logs the data.
Database Operations
- Fetching Database Version: Uses the query SELECT version(); to get the PostgreSQL version.
- Inserting Employee Data: Executes an INSERT query to add employee records to the Employees table.
- Fetching Employee Details: Executes a SELECT * FROM Employees query to retrieve all employee records.
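The SELECT returns each record as a tuple in table column order, with BirthDate as a date object and Salary as a Decimal. The sketch below shows one way to turn such rows into dictionaries, as the fetch handler does. The helper name rows_to_employees and the COLUMNS tuple are hypothetical, the None check for empty birth dates is an addition, and the sample row is taken from the expected output of this example.

```python
from datetime import date
from decimal import Decimal

# Column order of the Employees table, as defined in employees.sql.
COLUMNS = ("EmployeeID", "FirstName", "LastName", "BirthDate", "Salary")


def rows_to_employees(rows):
    """Convert row tuples (in table column order) into dictionaries."""
    employees = []
    for row in rows:
        record = dict(zip(COLUMNS, row))
        # BirthDate arrives as a date object; format it as DD-MM-YYYY and
        # leave NULL values (None) untouched.
        if record["BirthDate"] is not None:
            record["BirthDate"] = record["BirthDate"].strftime("%d-%m-%Y")
        employees.append(record)
    return employees


sample_rows = [(0, "john", "wick", date(1999, 8, 29), Decimal("50000.00"))]
print(rows_to_employees(sample_rows))
```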
Execution
- Initializes a Bureau instance.
- Adds both agents (db_insert_agent and db_fetch_agent) to the bureau.
- Runs the bureau, which starts the agents and their event handlers.
Usage
- Startup: On startup, db_fetch_agent will log the database version and send employee data to db_insert_agent.
- Inserting Data: db_insert_agent will insert received employee data into the database.
- Fetching Data: db_fetch_agent will fetch and log all employee details upon request.
from db.db_connection import create_connection
from uagents import Agent, Context, Bureau
from db.models.models import Employees, GetEmployees
from constants import employees_data
from constants import db_params, DB_FETCH_AGENT_ADDRESS
 
def get_db_version():
    """
    Retrieves the PostgreSQL database version.
 
    :return: Database version string or None if retrieval fails
    """
    conn = create_connection(**db_params)
    if conn:
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT version();")
            db_version = cursor.fetchone()
            cursor.close()
            return db_version
        except Exception as error:
            print(f"Error executing query: {error}")
            return None
 
db_insert_agent = Agent(name="db_inserter", seed="db_inserter_seed_phrase")
db_fetch_agent = Agent(name="db_fetcher", seed="db_fetcher_seed_phrase")
 
# Despite its name, DB_FETCH_AGENT_ADDRESS is the address the fetch agent
# sends the Employees message to, i.e. the address of db_insert_agent.
 
@db_fetch_agent.on_event("startup")
async def on_startup(ctx: Context):
    """
    Event handler triggered on agent startup to fetch database version and send employee data.
 
    :param ctx: Context object for handling agent events
    """
    db_version = get_db_version()
    if db_version:
        ctx.logger.info(
            f"Hello, I'm agent {db_insert_agent.name} and my address is {db_insert_agent.address}. PostgreSQL database version: {db_version[0]}"
        )
        await ctx.send(DB_FETCH_AGENT_ADDRESS, Employees(employees_data=employees_data))
    else:
        ctx.logger.info(
            f"Hello, I'm agent {db_insert_agent.name} and my address is {db_insert_agent.address}. Could not retrieve the database version."
        )
 
@db_insert_agent.on_message(model=Employees, replies=GetEmployees)
async def handle_employee_data(ctx: Context, sender: str, msg: Employees):
    """
    Handler for inserting employee data into the database.
 
    :param ctx: Context object for handling agent events
    :param sender: Sender of the message
    :param msg: Message containing employee data
    """
    ctx.logger.info(f"Received request from {sender} {msg.dict()}")
    employee_data = msg.employees_data
    conn = create_connection(**db_params)
    if conn:
        try:
            cursor = conn.cursor()
            insert_query = """
            INSERT INTO Employees (EmployeeID, FirstName, LastName, BirthDate, Salary)
            VALUES (%s, %s, %s, TO_DATE(%s, 'DD-MM-YYYY'), %s)
            """
            cursor.execute(
                insert_query,
                (
                    employee_data["EmployeeID"],
                    employee_data["FirstName"],
                    employee_data["LastName"],
                    employee_data["BirthDate"],
                    employee_data["Salary"],
                ),
            )
            REPLY_BACK = True
            conn.commit()
            cursor.close()
            ctx.logger.info(f"Inserted employee data: {employee_data}")
            await ctx.send(sender, GetEmployees(reply_back=REPLY_BACK))
        except Exception as error:
            ctx.logger.error(f"Error inserting employee data: {error}")
    else:
        ctx.logger.error("Could not connect to the database.")
 
@db_fetch_agent.on_message(model=GetEmployees)
async def fetch_all_employee_details(ctx: Context, sender: str, msg: GetEmployees):
    """
    Handler for fetching all employee details from the database.
 
    :param ctx: Context object for handling agent events
    :param sender: Sender of the message
    :param msg: Message triggering the fetch operation
    """
    if msg.reply_back:
        conn = create_connection(**db_params)
        if conn:
            try:
                cursor = conn.cursor()
                query = "SELECT * FROM Employees"
                cursor.execute(query)
                all_employees = cursor.fetchall()
                cursor.close()
 
                employees_list = []
                for employee in all_employees:
                    employee_info = {
                        "EmployeeID": employee[0],
                        "FirstName": employee[1],
                        "LastName": employee[2],
                        "BirthDate": employee[3].strftime("%d-%m-%Y"),
                        "Salary": employee[4],
                    }
                    employees_list.append(employee_info)
                ctx.logger.info(f"Retrieved all employee data: {employees_list}")
            except Exception as error:
                ctx.logger.error(f"Error retrieving employee data: {error}")
        else:
            ctx.logger.error("Could not connect to the database.")
 
bureau = Bureau()
bureau.add(db_insert_agent)
bureau.add(db_fetch_agent)
 
if __name__ == "__main__":
    bureau.run()
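The functions in main.py close their cursors but leave the connections open. One possible way to guarantee cleanup is a small context manager, sketched below under the assumption that the function passed in behaves like create_connection and returns None on failure. The name managed_connection is hypothetical and not part of the original example.

```python
from contextlib import contextmanager


@contextmanager
def managed_connection(connect, **params):
    """Yield the result of connect(**params) and close it on exit."""
    conn = connect(**params)
    try:
        yield conn
    finally:
        # create_connection returns None on failure, so guard the close.
        if conn is not None:
            conn.close()
```

A handler could then use "with managed_connection(create_connection, **db_params) as conn:" and keep its existing "if conn:" check inside the block; the connection is closed even if the query raises.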
constants.py: Initializes a dictionary for storing employee data and configures the database connection parameters using environment variables. It also defines DB_FETCH_AGENT_ADDRESS, the address that db_fetch_agent sends the employee data to on startup.
import os
 
employees_data = {
    "EmployeeID": "",
    "FirstName": "",
    "LastName": "",
    "BirthDate": "",
    "Salary": 0,
}
 
db_params = {
    "dbname": os.getenv("DB_NAME"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": "db",
    "port": "5432",
}
 
DB_FETCH_AGENT_ADDRESS = (
    "agent1qwg0h3gx2kvqmwadlg0j4r258r7amcfskx2mudz92ztjmtfdclygxrh5esu"
)
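Because os.getenv returns None for a variable that is not set, a missing DB_NAME, DB_USER, or DB_PASSWORD only surfaces later as a connection error. The sketch below shows one way to fail early instead. The function load_db_params is hypothetical and not part of the original constants.py; the host and port defaults mirror the values used there.

```python
import os

REQUIRED_VARS = ("DB_NAME", "DB_USER", "DB_PASSWORD")


def load_db_params(env=os.environ, host="db", port="5432"):
    """Build the connection parameters, failing fast on missing variables."""
    # Treat unset and empty variables the same way.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "dbname": env["DB_NAME"],
        "user": env["DB_USER"],
        "password": env["DB_PASSWORD"],
        "host": host,
        "port": port,
    }
```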
Poetry Dependencies
[tool.poetry.dependencies]
python = "^3.10"
psycopg2-binary = "^2.9.9"
uagents = { version = "^0.13.0", python = ">=3.10,<3.13" }
How to Run This Example
Update the required environment variables
DB_USER=
DB_PASSWORD=
DB_NAME=
Instructions to execute the example.
- Navigate to the root folder of the example.
- Update constants.py with the new entries to store in the database.
- Run docker-compose build
- Run docker-compose up
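When updating the entry in constants.py, note that EmployeeID is stored in an INT column and that the insert query parses BirthDate with TO_DATE(%s, 'DD-MM-YYYY'). The sketch below is a hypothetical pre-flight check, not part of the original example, that could catch a malformed entry before the containers are started; the function name validate_employee and its messages are assumptions.

```python
from datetime import datetime

REQUIRED_KEYS = ("EmployeeID", "FirstName", "LastName", "BirthDate", "Salary")


def validate_employee(entry):
    """Return a list of problems found in an employees_data entry."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in entry]
    if "EmployeeID" in entry:
        try:
            int(entry["EmployeeID"])
        except (TypeError, ValueError):
            problems.append("EmployeeID must be an integer value")
    if "BirthDate" in entry:
        try:
            # Same day-month-year order the insert query expects.
            datetime.strptime(entry["BirthDate"], "%d-%m-%Y")
        except (TypeError, ValueError):
            problems.append("BirthDate must use the DD-MM-YYYY format")
    return problems
```

An empty list means the entry passed these checks; the default entry in constants.py, with its empty strings, would be reported for both EmployeeID and BirthDate.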
Expected Output
poetry_app | Connection successful
poetry_app | INFO:     [db_fetcher]: Hello, I'm agent db_inserter and my address is agent1qwg0h3gx2kvqmwadlg0j4r258r7amcfskx2mudz92ztjmtfdclygxrh5esu. PostgreSQL database version: PostgreSQL 16.3 (Debian 16.3-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
poetry_app | INFO:     [db_inserter]: Received request from agent1qv470qn3vfgn3dwe5z90m8u6qvtn6chrgm4urfzdg2v9qyagln6sgnh4wwg {'employees_data': {'EmployeeID': '0', 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': 50000}}
poetry_app | Connection successful
poetry_app | INFO:     [db_inserter]: Inserted employee data: {'EmployeeID': '0', 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': 50000}
poetry_app | Connection successful
poetry_app | INFO:     [db_fetcher]: Retrieved all employee data: [{'EmployeeID': 0, 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': Decimal('50000.00')}]
poetry_app | INFO:     [bureau]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit)