Utilize the PostgreSQL database with the Agent

Introduction

This example shows how to use the uAgents library to create agents that read from and write to a PostgreSQL database in a Docker Compose setup. One agent inserts employee data into the database, and a second agent retrieves it. Together they illustrate how agents, PostgreSQL, and Docker can be combined to manage data flow and communication in a distributed system.

Project Structure

.postgres-database-with-an-agent
├── docker-compose.yml
├── Dockerfile
├── README.md
└── src
    ├── constants.py
    ├── db
    │   ├── db_connection.py
    │   ├── __init__.py
    │   ├── models
    │   │   └── models.py
    │   └── schemas
    │       └── employees.sql
    └── main.py

Agent with PostgreSQL database

Set up the PostgreSQL connection with Docker using Docker Compose

This section details the files involved in setting up the PostgreSQL connection within your project.

db_connection.py: Contains the functions to establish and close the connection to the PostgreSQL database using the psycopg2 library. The create_connection function connects to the database with provided credentials, while the close_connection function ensures the connection is safely terminated.

db_connection.py
import psycopg2
from psycopg2 import OperationalError


def create_connection(dbname, user, password, host="localhost", port="5432"):
    """
    Create a connection to the PostgreSQL database.

    :param dbname: Name of the database
    :param user: Database user
    :param password: User's password
    :param host: Database host
    :param port: Database port
    :return: Connection object or None if connection fails
    """
    try:
        conn = psycopg2.connect(
            dbname=dbname, user=user, password=password, host=host, port=port
        )
        print("Connection successful")
        return conn
    except OperationalError as error:
        print(f"Error connecting to PostgreSQL database: {error}")
        return None


def close_connection(conn):
    if conn:
        conn.close()
        print("Connection closed")
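Because create_connection and close_connection are separate calls, it is easy to forget the second one when a query raises. The sketch below shows one possible way to pair them with a context manager. The name managed_connection and the FakeConnection stand-in are hypothetical and not part of the example project; the connect argument is assumed to be any callable that returns a connection or None, as create_connection does.

```python
from contextlib import contextmanager


@contextmanager
def managed_connection(connect, **params):
    """Yield a connection from `connect(**params)` and always close it.

    `connect` is any callable returning a DB-API style connection or None,
    for example the create_connection function above (assumption).
    """
    conn = connect(**params)
    if conn is None:
        raise ConnectionError("Could not connect to the database")
    try:
        yield conn
    finally:
        conn.close()  # runs even if the body of the `with` block raises


class FakeConnection:
    """Stand-in used only to demonstrate the helper without a database."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


fake = FakeConnection()
with managed_connection(lambda **_: fake, dbname="demo") as conn:
    print(conn.closed)  # False while the block is running
print(fake.closed)      # True once the block has exited
```

In the project itself, the call would presumably look like `with managed_connection(create_connection, **db_params) as conn:`.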

docker-compose.yml: Configures the Docker services, including the PostgreSQL database and the application. It defines the environment variables for the database connection and maps the database schema from the host to the container.

version: "3.8"

services:
  db:
    container_name: postgres_container
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: ${DB_USER}
      POSTGRES_PASSWORD: ${DB_PASSWORD}
      POSTGRES_DB: ${DB_NAME}
    volumes:
      - "postgres:/var/lib/postgresql/data"
      - ./src/db/schemas/employees.sql:/docker-entrypoint-initdb.d/employees.sql
    ports:
      - "5432:5432"
    networks:
      - agent_network

  app:
    build: .
    container_name: poetry_app
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    depends_on:
      - db
    networks:
      - agent_network
    environment:
      DB_USER: ${DB_USER}
      DB_PASSWORD: ${DB_PASSWORD}
      DB_NAME: ${DB_NAME}
    command: poetry run python ./src/main.py

volumes:
  postgres:

networks:
  agent_network:
    driver: bridge
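Note that the short form of depends_on used here only waits for the db container to start, not for PostgreSQL to accept connections, so the first connection attempt from the app can fail. One possible mitigation is a small retry loop, sketched below. The helper connect_with_retry and its parameters are hypothetical; connect is assumed to be a zero-argument callable that returns None on failure, which matches the behaviour of create_connection.

```python
import time


def connect_with_retry(connect, attempts=5, delay=2.0, sleep=time.sleep):
    """Call `connect()` until it returns a connection or attempts run out.

    `connect` is a zero-argument callable that returns None on failure.
    `sleep` is injectable so the helper can be tested without waiting.
    """
    for attempt in range(1, attempts + 1):
        conn = connect()
        if conn is not None:
            return conn
        if attempt < attempts:
            sleep(delay)  # wait before the next try
    return None


# Demonstration with a stand-in that fails twice, then succeeds.
calls = {"count": 0}


def flaky_connect():
    calls["count"] += 1
    return "connection" if calls["count"] >= 3 else None


result = connect_with_retry(flaky_connect, attempts=5, delay=0.0)
print(result, calls["count"])  # connection 3
```

In main.py this could be used as `connect_with_retry(lambda: create_connection(**db_params))`.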

Dockerfile: Builds the application container, installing dependencies via Poetry and setting up the environment to run the application. The container exposes port 8000 for the application service.

FROM python:3.12-slim

ENV PATH="$PATH:/root/.local/bin"

RUN apt-get update && \
    apt-get install -y curl gcc && \
    curl -sSL https://install.python-poetry.org/ | python3 -

WORKDIR /app

ADD pyproject.toml poetry.lock /app/
RUN poetry install

ADD . /app

EXPOSE 8000

ENTRYPOINT ["poetry", "run"]
# main.py lives in src/ (see the project structure); docker-compose.yml overrides this command.
CMD ["python", "src/main.py"]

Defining the Employees Database Schema and Model

The Employees class defines a model representing employee data as a dictionary. The GetEmployees class represents a model used to request employee information, with a flag indicating whether a response is expected.

The employees.sql script defines a schema for an Employees table in a PostgreSQL database, if it doesn’t already exist. This table includes columns for employee ID, first name, last name, birth date, and salary.

models.py
from uagents import Model


class Employees(Model):
    employees_data: dict


class GetEmployees(Model):
    reply_back: bool

employees.sql
CREATE TABLE IF NOT EXISTS Employees (
    EmployeeID INT PRIMARY KEY,
    FirstName VARCHAR(50),
    LastName VARCHAR(50),
    BirthDate DATE,
    Salary DECIMAL(10, 2)
);
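Since the Employees model carries a plain dict, nothing checks the payload against the table definition before the INSERT runs. The following sketch shows one way such a check could look. The function validate_employee is hypothetical and not part of the example project; the rules are derived from the column types above and from the DD-MM-YYYY date format that main.py passes to TO_DATE.

```python
from datetime import datetime
from decimal import Decimal, InvalidOperation


def validate_employee(data: dict) -> list[str]:
    """Return a list of problems found in an employee dictionary."""
    problems = []
    try:
        int(data["EmployeeID"])  # column type: INT PRIMARY KEY
    except (KeyError, TypeError, ValueError):
        problems.append("EmployeeID must be an integer")
    for field in ("FirstName", "LastName"):  # column type: VARCHAR(50)
        value = data.get(field)
        if not isinstance(value, str) or len(value) > 50:
            problems.append(f"{field} must be a string of at most 50 characters")
    try:
        datetime.strptime(data.get("BirthDate", ""), "%d-%m-%Y")
    except (TypeError, ValueError):
        problems.append("BirthDate must use the DD-MM-YYYY format")
    try:
        salary = Decimal(str(data.get("Salary")))
        # DECIMAL(10, 2) allows at most 8 digits before the decimal point.
        if abs(salary) >= Decimal("100000000"):
            problems.append("Salary exceeds DECIMAL(10, 2)")
    except InvalidOperation:
        problems.append("Salary must be numeric")
    return problems


sample = {
    "EmployeeID": "0",
    "FirstName": "john",
    "LastName": "wick",
    "BirthDate": "29-08-1999",
    "Salary": 50000,
}
print(validate_employee(sample))  # []
```

A handler could call this before opening a connection and log the returned problems instead of attempting the INSERT.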

Postgres data with agent

This script sets up and runs two agents, db_insert_agent and db_fetch_agent, which interact with a PostgreSQL database to manage employee data. The agents use asynchronous event handling to fetch and insert employee information into the database.

Database Connection

  • The create_connection function is used to establish a connection to the PostgreSQL database using parameters defined in db_params.

Agents

  • db_insert_agent: Responsible for inserting employee data into the database.
  • db_fetch_agent: Responsible for fetching and reporting employee data from the database.

Event Handlers

  • on_startup (db_fetch_agent):

    • Triggered when db_fetch_agent starts.
    • Retrieves and logs the PostgreSQL database version.
    • Sends employee data to db_insert_agent if the version retrieval is successful.
  • handle_employee_data (db_insert_agent):

    • Handles messages with employee data.
    • Inserts the received employee data into the Employees table in the database.
  • fetch_all_employee_details (db_fetch_agent):

    • Handles messages requesting all employee details.
    • Retrieves all employee records from the Employees table and logs the data.

Database Operations

  • Fetching Database Version: Uses the query SELECT version(); to get the PostgreSQL version.
  • Inserting Employee Data: Executes an INSERT query to add employee records to the Employees table.
  • Fetching Employee Details: Executes a SELECT * FROM Employees query to retrieve all employee records.
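The fetch step returns each row as a tuple in column order, with BirthDate as a datetime.date and Salary as a Decimal. The sketch below illustrates the conversion from such tuples to dictionaries using sample data rather than a live database. The helper rows_to_dicts and the COLUMNS tuple are hypothetical names; the None guard is an addition, reflecting that the BirthDate column is nullable in the schema.

```python
from datetime import date
from decimal import Decimal

COLUMNS = ("EmployeeID", "FirstName", "LastName", "BirthDate", "Salary")


def rows_to_dicts(rows):
    """Convert (id, first, last, birth_date, salary) tuples to dictionaries."""
    employees = []
    for row in rows:
        record = dict(zip(COLUMNS, row))
        birth = record["BirthDate"]
        # The column is nullable, so guard against None before formatting.
        record["BirthDate"] = birth.strftime("%d-%m-%Y") if birth else None
        employees.append(record)
    return employees


# Sample rows shaped like the result of cursor.fetchall().
rows = [(0, "john", "wick", date(1999, 8, 29), Decimal("50000.00"))]
print(rows_to_dicts(rows))
```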

Execution

  • Initializes a Bureau instance.
  • Adds both agents (db_insert_agent and db_fetch_agent) to the bureau.
  • Runs the bureau, which starts the agents and their event handlers.

Usage

  1. Startup: On startup, db_fetch_agent will log the database version and send employee data to db_insert_agent.
  2. Inserting Data: db_insert_agent will insert received employee data into the database.
  3. Fetching Data: db_fetch_agent will fetch and log all employee details upon request.
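To make the order of these three steps concrete, the sketch below replays the same message flow with plain asyncio queues. It is only a stand-in for illustration: it does not use the uAgents library or PostgreSQL, the dataclasses merely mirror the fields of the Employees and GetEmployees models, and run_flow is a hypothetical name.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Employees:
    employees_data: dict


@dataclass
class GetEmployees:
    reply_back: bool


async def run_flow(employee: dict) -> list[str]:
    """Replay the three usage steps with queues instead of real agents."""
    log = []
    table = []  # stands in for the Employees table
    inserter_inbox = asyncio.Queue()
    fetcher_inbox = asyncio.Queue()

    async def inserter():
        msg = await inserter_inbox.get()
        table.append(msg.employees_data)  # step 2: insert the record
        log.append("inserted")
        await fetcher_inbox.put(GetEmployees(reply_back=True))

    async def fetcher():
        log.append("startup")  # step 1: startup, send the employee data
        await inserter_inbox.put(Employees(employees_data=employee))
        reply = await fetcher_inbox.get()
        if reply.reply_back:
            log.append(f"fetched {len(table)} row(s)")  # step 3: fetch

    await asyncio.gather(inserter(), fetcher())
    return log


print(asyncio.run(run_flow({"EmployeeID": 0, "FirstName": "john"})))
# ['startup', 'inserted', 'fetched 1 row(s)']
```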
main.py
from constants import DB_FETCH_AGENT_ADDRESS, db_params, employees_data
from db.db_connection import create_connection
from db.models.models import Employees, GetEmployees
from uagents import Agent, Bureau, Context


def get_db_version():
    """
    Retrieves the PostgreSQL database version.

    :return: Database version row or None if retrieval fails
    """
    conn = create_connection(**db_params)
    if conn:
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT version();")
            db_version = cursor.fetchone()
            cursor.close()
            return db_version
        except Exception as error:
            print(f"Error executing query: {error}")
            return None
        finally:
            conn.close()  # release the connection whether or not the query succeeded
    return None


db_insert_agent = Agent(name="db_inserter", seed="db_inserter_seed_phrase")
db_fetch_agent = Agent(name="db_fetcher", seed="db_fetcher_seed_phrase")


@db_fetch_agent.on_event("startup")
async def on_startup(ctx: Context):
    """
    Event handler triggered on agent startup to fetch database version and send employee data.

    :param ctx: Context object for handling agent events
    """
    db_version = get_db_version()
    # This handler runs on db_fetch_agent but logs the name and address of
    # db_insert_agent, which is the recipient of the message sent below.
    if db_version:
        ctx.logger.info(
            f"Hello, I'm agent {db_insert_agent.name} and my address is {db_insert_agent.address}. PostgreSQL database version: {db_version[0]}"
        )
        # Despite its name, DB_FETCH_AGENT_ADDRESS is the address the employee
        # data is sent to, where handle_employee_data receives it.
        await ctx.send(DB_FETCH_AGENT_ADDRESS, Employees(employees_data=employees_data))
    else:
        ctx.logger.info(
            f"Hello, I'm agent {db_insert_agent.name} and my address is {db_insert_agent.address}. Could not retrieve the database version."
        )


@db_insert_agent.on_message(model=Employees, replies=GetEmployees)
async def handle_employee_data(ctx: Context, sender: str, msg: Employees):
    """
    Handler for inserting employee data into the database.

    :param ctx: Context object for handling agent events
    :param sender: Sender of the message
    :param msg: Message containing employee data
    """
    ctx.logger.info(f"Received request from {sender} {msg.dict()}")
    employee_data = msg.employees_data
    conn = create_connection(**db_params)
    if conn:
        try:
            cursor = conn.cursor()
            insert_query = """
                INSERT INTO Employees (EmployeeID, FirstName, LastName, BirthDate, Salary)
                VALUES (%s, %s, %s, TO_DATE(%s, 'DD-MM-YYYY'), %s)
            """
            cursor.execute(
                insert_query,
                (
                    employee_data["EmployeeID"],
                    employee_data["FirstName"],
                    employee_data["LastName"],
                    employee_data["BirthDate"],
                    employee_data["Salary"],
                ),
            )
            conn.commit()
            cursor.close()
            ctx.logger.info(f"Inserted employee data: {employee_data}")
            # Ask the sender to fetch and log all stored employees.
            await ctx.send(sender, GetEmployees(reply_back=True))
        except Exception as error:
            ctx.logger.error(f"Error inserting employee data: {error}")
        finally:
            conn.close()
    else:
        ctx.logger.error("Could not connect to the database.")


@db_fetch_agent.on_message(model=GetEmployees)
async def fetch_all_employee_details(ctx: Context, sender: str, msg: GetEmployees):
    """
    Handler for fetching all employee details from the database.

    :param ctx: Context object for handling agent events
    :param sender: Sender of the message
    :param msg: Message triggering the fetch operation
    """
    if msg.reply_back:
        conn = create_connection(**db_params)
        if conn:
            try:
                cursor = conn.cursor()
                query = "SELECT * FROM Employees"
                cursor.execute(query)
                all_employees = cursor.fetchall()
                cursor.close()
                employees_list = []
                for employee in all_employees:
                    employee_info = {
                        "EmployeeID": employee[0],
                        "FirstName": employee[1],
                        "LastName": employee[2],
                        "BirthDate": employee[3].strftime("%d-%m-%Y"),
                        "Salary": employee[4],
                    }
                    employees_list.append(employee_info)
                ctx.logger.info(f"Retrieved all employee data: {employees_list}")
            except Exception as error:
                ctx.logger.error(f"Error retrieving employee data: {error}")
            finally:
                conn.close()
        else:
            ctx.logger.error("Could not connect to the database.")


bureau = Bureau()
bureau.add(db_insert_agent)
bureau.add(db_fetch_agent)

if __name__ == "__main__":
    bureau.run()

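The constants file initializes a dictionary for the employee record to store and configures the database connection parameters from environment variables. It also defines DB_FETCH_AGENT_ADDRESS, the address to which db_fetch_agent sends the employee data on startup. As the expected output shows, this is the address of db_insert_agent, despite the constant's name.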

constants.py
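import os

# Fill in the employee record to insert. BirthDate must use the DD-MM-YYYY
# format, which main.py parses with TO_DATE(%s, 'DD-MM-YYYY').
employees_data = {
    "EmployeeID": "",
    "FirstName": "",
    "LastName": "",
    "BirthDate": "",
    "Salary": 0,
}

db_params = {
    "dbname": os.getenv("DB_NAME"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": "db",  # service name of the PostgreSQL container in docker-compose.yml
    "port": "5432",
}

# Address that db_fetch_agent sends the employee data to.
DB_FETCH_AGENT_ADDRESS = (
    "agent1qwg0h3gx2kvqmwadlg0j4r258r7amcfskx2mudz92ztjmtfdclygxrh5esu"
)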
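Because os.getenv returns None for an unset variable, a missing DB_NAME, DB_USER, or DB_PASSWORD only surfaces later as a connection error. The sketch below shows one way to fail early with a clearer message. The function load_db_params is hypothetical and not part of the example project; the variable names, host, and port are taken from the file above.

```python
import os


def load_db_params(env=os.environ, host="db", port="5432"):
    """Build connection parameters, failing early if a variable is missing."""
    required = {"dbname": "DB_NAME", "user": "DB_USER", "password": "DB_PASSWORD"}
    missing = [name for name in required.values() if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    params = {key: env[name] for key, name in required.items()}
    params.update(host=host, port=port)
    return params


# Demonstration with an explicit mapping instead of the real environment.
demo_env = {"DB_NAME": "company", "DB_USER": "admin", "DB_PASSWORD": "secret"}
print(load_db_params(demo_env)["dbname"])  # company
```

The env parameter accepts any mapping, which keeps the helper easy to exercise without touching the process environment.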

Poetry Dependencies

pyproject.toml
[tool.poetry.dependencies]
python = "^3.10"
psycopg2-binary = "^2.9.9"
uagents = { version = "^0.13.0", python = ">=3.10,<3.13" }

How to Run This Example

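Update the required environment variables. Docker Compose substitutes the ${DB_USER}, ${DB_PASSWORD}, and ${DB_NAME} references in docker-compose.yml from a .env file in the project root or from the shell environment, so copy .env.example to .env and fill in the values.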

.env.example
DB_USER=
DB_PASSWORD=
DB_NAME=

Instructions to execute the example.

  • Navigate to the root folder of the example.
  • Update the constants file (src/constants.py) with the employee entry to store in the database.
  • Run docker-compose build
  • Run docker-compose up

Expected Output

poetry_app | Connection successful
poetry_app | INFO: [db_fetcher]: Hello, I'm agent db_inserter and my address is agent1qwg0h3gx2kvqmwadlg0j4r258r7amcfskx2mudz92ztjmtfdclygxrh5esu. PostgreSQL database version: PostgreSQL 16.3 (Debian 16.3-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit
poetry_app | INFO: [db_inserter]: Received request from agent1qv470qn3vfgn3dwe5z90m8u6qvtn6chrgm4urfzdg2v9qyagln6sgnh4wwg {'employees_data': {'EmployeeID': '0', 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': 50000}}
poetry_app | Connection successful
poetry_app | INFO: [db_inserter]: Inserted employee data: {'EmployeeID': '0', 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': 50000}
poetry_app | Connection successful
poetry_app | INFO: [db_fetcher]: Retrieved all employee data: [{'EmployeeID': 0, 'FirstName': 'john', 'LastName': 'wick', 'BirthDate': '29-08-1999', 'Salary': Decimal('50000.00')}]
poetry_app | INFO: [bureau]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit)