Monitoring DDL Events in Postgres

Tracking DDL (Data Definition Language) operations in PostgreSQL is crucial for security and change auditing. In this article, I will walk through, step by step, how to set up a system that logs DDL commands and sends notifications via email. Additionally, I will demonstrate how to monitor these operations using Prometheus and Grafana.

Step 1: Monitoring DDL Events

PostgreSQL supports monitoring DDL events through “event triggers.” The example SQL below creates an admin.log_ddl_cbsdb table and defines a function that logs DDL operations to it. The function ships each log row to a central database via dblink_exec, so the dblink extension must be installed on the monitored database first (CREATE EXTENSION dblink;):

CREATE TABLE admin.log_ddl_cbsdb (
    id SERIAL PRIMARY KEY,
    user_name TEXT,
    ddl_date TIMESTAMP,
    object_name TEXT,
    object_type TEXT,
    schema_name TEXT,
    client_host TEXT,
    command_tag TEXT,
    sql_statement TEXT
);
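
One optional addition of my own, not part of the original setup: the weekly report in Step 2 and any dashboard queries filter and sort on ddl_date, so an index on that column keeps them fast as the log grows:

CREATE INDEX log_ddl_cbsdb_ddl_date_idx
    ON admin.log_ddl_cbsdb (ddl_date DESC);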

CREATE OR REPLACE FUNCTION log_ddl_events()
RETURNS event_trigger AS $$
DECLARE
    r RECORD;
    query TEXT;
BEGIN
    -- For CREATE and ALTER operations
    IF TG_EVENT = 'ddl_command_end' THEN
        FOR r IN
            SELECT
                object_identity AS object_name,  -- unlike objid::regclass, this resolves every object type
                object_type,
                schema_name,
                command_tag
            FROM
                pg_event_trigger_ddl_commands()
        LOOP
            -- %L quotes every value safely; hand-quoting '%s' breaks on embedded quotes.
            -- inet_client_addr() is evaluated locally so the audit row records the
            -- real client address, not this server's address as seen by the remote.
            query := format(
                $q$
                INSERT INTO admin.log_ddl_cbsdb
                (user_name, ddl_date, object_name, object_type, schema_name, client_host, command_tag, sql_statement)
                VALUES (%L, clock_timestamp(), %L, %L, %L, %L, %L, %L)
                $q$,
                current_user,
                COALESCE(r.object_name, 'N/A'),
                COALESCE(r.object_type, 'N/A'),
                COALESCE(r.schema_name, 'N/A'),
                inet_client_addr(),
                r.command_tag,
                current_query()
            );
            -- Ship the audit row to the central log database (requires the dblink extension)
            PERFORM dblink_exec(
                'host=***** port=***** dbname=**** user=****',
                query
            );
        END LOOP;
    END IF;
    -- For DROP operations
    IF TG_EVENT = 'sql_drop' THEN
        FOR r IN
            SELECT
                object_name,
                object_type,
                schema_name
            FROM
                pg_event_trigger_dropped_objects()
            WHERE
                object_type = 'table' -- Log only tables
        LOOP
            query := format(
                $q$
                INSERT INTO admin.log_ddl_cbsdb
                (user_name, ddl_date, object_name, object_type, schema_name, client_host, command_tag, sql_statement)
                VALUES (%L, clock_timestamp(), %L, %L, %L, %L, 'DROP', %L)
                $q$,
                current_user,
                COALESCE(r.object_name, 'N/A'),
                COALESCE(r.object_type, 'N/A'),
                COALESCE(r.schema_name, 'N/A'),
                inet_client_addr(),
                current_query()
            );
            -- Same pattern: %L-quoted values shipped to the central log database
            PERFORM dblink_exec(
                'host=10.**.**.** port=***** dbname=***** user=******',
                query
            );
        END LOOP;
    END IF;
END;
$$ LANGUAGE plpgsql;



CREATE EVENT TRIGGER log_ddl_on_command_end
ON ddl_command_end
EXECUTE FUNCTION log_ddl_events();


CREATE EVENT TRIGGER log_ddl_on_drop
ON sql_drop
EXECUTE FUNCTION log_ddl_events();

This code captures DDL operations and logs them into the admin.log_ddl_cbsdb table: the two event triggers attach the function to both ddl_command_end and sql_drop, and the drop branch logs dropped tables only. A quick smoke test is shown below.
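
Before moving on, you can verify the setup (my suggestion, not part of the original flow) by firing both triggers with a throwaway table and then inspecting the audit log. Because the function writes through dblink, run the SELECT on the central database named in the connection string:

-- Fire both event triggers with a harmless round trip
CREATE TABLE public.ddl_smoke_test (id int);
DROP TABLE public.ddl_smoke_test;

-- On the central audit database: both events should now be visible
SELECT user_name, ddl_date, object_name, command_tag
FROM admin.log_ddl_cbsdb
ORDER BY ddl_date DESC
LIMIT 5;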

Step 2: Sending Notifications via Email

You can use Python to fetch data from the table and send it as an email. Here’s how:

Required Libraries

import pandas as pd
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from sqlalchemy import create_engine
from datetime import datetime, timedelta

Database and Email Configuration

The following script fetches data from multiple databases and sends email notifications for the last week’s records:

DB_SETTINGS = [
    {"host": "10.**.**.**", "port": ****, "dbname": "****", "user": "****", "password": "****"},
    {"host": "10.**.**.**", "port": ****, "dbname": "****", "user": "****", "password": "****"},
    # Additional databases can be added...
]

SMTP_SERVER = "*****"
SMTP_PORT = 25 # or 587
EMAIL_USER = "******"
EMAIL_PASS = "******"
EMAIL_RECEIVER = "******"

def create_db_engine(db_settings):
    db_url = f"postgresql+psycopg2://{db_settings['user']}:{db_settings['password']}@{db_settings['host']}:{db_settings['port']}/{db_settings['dbname']}"
    return create_engine(db_url)


def fetch_data():
    combined_data = []
    one_week_ago = datetime.now() - timedelta(weeks=1)
    # Generated locally, so interpolating this timestamp into the SQL is safe
    one_week_ago_str = one_week_ago.strftime('%Y-%m-%d %H:%M:%S')
    for db in DB_SETTINGS:
        try:
            engine = create_db_engine(db)
            query = f"""
                SELECT * FROM admin.log_ddl_cbsdb
                WHERE ddl_date >= '{one_week_ago_str}'
                AND user_name <> 'postgres'
                ORDER BY ddl_date DESC
                LIMIT 15;
            """
            df = pd.read_sql_query(query, engine)
            df['Database'] = db['dbname']
            combined_data.append(df)
        except Exception as e:
            print(f"Error connecting to {db['dbname']}: {e}")
    if not combined_data:
        return pd.DataFrame()
    # Drop all-NULL columns so the combined HTML table stays compact
    combined_data = [df.dropna(axis=1, how='all') for df in combined_data]
    return pd.concat(combined_data, ignore_index=True)

def send_email(content):
    msg = MIMEMultipart()
    msg["From"] = EMAIL_USER
    msg["To"] = EMAIL_RECEIVER
    msg["Subject"] = "Postgres Cluster Table Changes"
    msg.attach(MIMEText(content, "html"))
    try:
        server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
        server.starttls()
        server.login(EMAIL_USER, EMAIL_PASS)
        server.send_message(msg)
        server.quit()
        print("Email sent successfully.")
    except Exception as e:
        print(f"Error sending email: {e}")


def main():
    data = fetch_data()
    if not data.empty:
        send_email(data.to_html())


if __name__ == "__main__":
    main()
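
The script runs once and exits, so it needs an external scheduler. A minimal option is a weekly cron entry; the script path below is illustrative:

# Send the DDL report every Monday at 08:00
0 8 * * 1 /usr/bin/python3 /opt/scripts/ddl_report.py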

Step 3: Monitoring with Prometheus and Grafana

To monitor DDL changes over time, you can integrate Grafana into your setup. Grafana provides a user-friendly interface for visualizing these metrics. First, ensure that you have Prometheus properly configured to collect data from your PostgreSQL database. Once Prometheus is up and running, follow these steps to add a data source and build visualizations in Grafana:
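
Prometheus cannot read the admin.log_ddl_cbsdb table on its own; a common bridge is the community postgres_exporter with a custom query file. The snippet below is a sketch under that assumption (the file name, five-minute window, and metric name are my own choices), using the exporter's queries.yaml format loaded with --extend.query-path:

# queries.yaml -- custom metric definition for postgres_exporter
ddl_events:
  # Count audit rows written in the last five minutes
  query: "SELECT count(*) AS recent FROM admin.log_ddl_cbsdb WHERE ddl_date > now() - interval '5 minutes'"
  metrics:
    - recent:
        usage: "GAUGE"
        description: "DDL events logged in the last five minutes"

Prometheus then scrapes the result as ddl_events_recent, which the Grafana steps below can graph.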

Add the Prometheus Data Source:

  • Open Grafana and navigate to “Configuration” > “Data Sources.”
  • Click “Add Data Source” and select “Prometheus.”
  • Enter your Prometheus server URL and click “Save & Test.”

Build a Dashboard:

  • Go to “Dashboards” > “New Dashboard.”
  • Add panels with queries to display metrics such as DDL events logged per hour, most affected tables, or user activity trends (a sample panel query follows this list).

Explore Visualizations:

  • Use Grafana’s rich library of visualization tools like bar graphs, heatmaps, and tables to customize your dashboard and monitor DDL trends effectively.
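
As a concrete starting point for the “DDL events per hour” panel mentioned above: if you point Grafana's built-in PostgreSQL data source directly at the audit database (an alternative to the Prometheus path, and an assumption on my part), a time-series panel can use a query like this:

SELECT
    date_trunc('hour', ddl_date) AS "time",
    count(*) AS ddl_events
FROM admin.log_ddl_cbsdb
WHERE ddl_date > now() - interval '24 hours'
GROUP BY 1
ORDER BY 1;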

Advantages of Logging Over Triggers

  • Performance: Triggers can introduce slight overhead, especially during high-frequency DDL operations, whereas logging is a native feature optimized for minimal performance impact.
  • Centralized Logs: With PostgreSQL logs, you can manage and analyze data across multiple clusters without modifying individual database schemas.
  • Simplicity: Configuring log-based monitoring is generally simpler than creating and maintaining trigger-based mechanisms.

To enable DDL logging, modify the PostgreSQL configuration (postgresql.conf) to include the following settings:

log_statement = 'ddl'
log_min_duration_statement = 0    # optional; logs every statement with its duration

Note that log_statement = 'ddl' is what actually captures DDL commands; log_min_duration_statement = 0 additionally logs every statement with its execution time, which is useful for timing analysis but can make the logs very noisy.

After applying these settings and reloading the configuration (both parameters take effect on a reload, e.g. SELECT pg_reload_conf();, so a full server restart is not required), all DDL statements will be logged. You can then use tools like grep to extract and analyze these logs. For instance, the following Bash script provides a quick way to pull DDL operations out of the log file:

#!/bin/bash

LOG_FILE="/var/lib/pgsql/data/log/postgresql.log"
OUTPUT_FILE="/var/log/ddl_changes.log"

# Match only logged statements that begin with a DDL keyword, rather than
# every line that merely mentions CREATE/ALTER/DROP somewhere in its text
grep -iE "LOG:  statement: (CREATE|ALTER|DROP)" "$LOG_FILE" > "$OUTPUT_FILE"

echo "DDL changes logged to $OUTPUT_FILE"

Integration with Monitoring Tools

For a more advanced setup, you can parse logs using ELK (Elasticsearch, Logstash, Kibana) or similar log management tools. These systems provide a centralized platform to collect, analyze, and visualize logs across your entire infrastructure.
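
Short of deploying the full stack, the structured parsing Logstash would perform can be sketched in a few lines of Python. This is an illustration of my own; it assumes the default log_line_prefix of '%m [%p] ' and the log path used in the Bash script above:

import re

# Matches lines such as:
# 2024-05-01 10:15:02.123 UTC [4242] LOG:  statement: CREATE TABLE t (id int)
LINE_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} [\d:.]+ \S+) "
    r"\[(?P<pid>\d+)\] LOG:  statement: (?P<stmt>(?:CREATE|ALTER|DROP)\b.*)$",
    re.IGNORECASE,
)

def parse_ddl(log_path="/var/lib/pgsql/data/log/postgresql.log"):
    # Yield one dict (timestamp, pid, statement) per DDL line in the log
    with open(log_path) as f:
        for line in f:
            m = LINE_RE.match(line)
            if m:
                yield m.groupdict()

if __name__ == "__main__":
    for event in parse_ddl():
        print(event["ts"], event["stmt"])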

Conclusion: Best Practices and Recommendations

The method you choose for monitoring DDL events largely depends on your specific use case and system requirements:

  1. Trigger-Based Monitoring: Ideal for granular logging and additional actions like remote database integration. However, this approach is more suitable for smaller-scale systems or specific auditing needs.
  2. Log-Based Monitoring: A more efficient and scalable solution for large databases or environments with multiple clusters. Combined with tools like Bash scripts, ELK stack, or Prometheus, this method offers broader monitoring capabilities.

When implementing any monitoring system, balance performance against comprehensiveness. Triggers provide detailed information directly at the database level, while log-based approaches offer flexibility and scalability for complex environments. For the best results, consider combining both methods on critical systems to ensure thorough auditing and operational insight.