
Python API

pgdelta provides a Python API for programmatic schema diffing and DDL generation.

Installation

pip install pgdelta

Basic Usage

from pgdelta import PgCatalog, generate_sql
from pgdelta.catalog import extract_catalog
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Connect to databases
source_engine = create_engine("postgresql://user:pass@localhost/source_db")
target_engine = create_engine("postgresql://user:pass@localhost/target_db")

with Session(source_engine) as source_session, Session(target_engine) as target_session:
    # Extract schemas
    source_catalog = extract_catalog(source_session)
    target_catalog = extract_catalog(target_session)

    # Generate the changes that transform target_catalog to match source_catalog
    changes = target_catalog.diff(source_catalog)

    # Generate SQL statements
    sql_statements = [generate_sql(change) for change in changes]

    for sql in sql_statements:
        print(sql)

API Reference

Core Functions

pgdelta.catalog.extract_catalog(session)

Extract catalog from PostgreSQL database session.

Source code in src/pgdelta/catalog.py
def extract_catalog(session: Session) -> PgCatalog:
    """Extract catalog from PostgreSQL database session."""
    # Extract namespaces (schemas)
    namespaces = extract_namespaces(session)

    # Extract classes (tables, views, etc.)
    classes = extract_classes(session)

    # Extract attributes (columns)
    attributes = extract_attributes(session)

    # Extract constraints
    constraints = extract_constraints(session)

    # Extract indexes
    indexes = extract_indexes(session)

    # Extract sequences
    sequences = extract_sequences(session)

    # Extract RLS policies
    policies = extract_policies(session)

    # Extract procedures/functions
    procedures = extract_procedures(session)

    # Extract triggers
    triggers = extract_triggers(session)

    # Extract types
    namespace_oids = [ns.oid for ns in namespaces]
    types = extract_types(session, namespace_oids)

    # Extract dependencies from pg_depend
    depends = extract_depends(
        session,
        namespaces,
        classes,
        constraints,
        indexes,
        sequences,
        policies,
        procedures,
        triggers,
        types,
    )

    # Extract view dependencies from pg_rewrite and add them to depends list
    view_deps = extract_view_dependencies_as_pg_depend(session, classes)
    depends.extend(view_deps)

    return catalog(
        namespaces=namespaces,
        classes=classes,
        attributes=attributes,
        constraints=constraints,
        indexes=indexes,
        sequences=sequences,
        policies=policies,
        procedures=procedures,
        triggers=triggers,
        types=types,
        depends=depends,
    )

Classes

pgdelta.PgCatalog dataclass

Immutable PostgreSQL catalog snapshot.

Source code in src/pgdelta/catalog.py
@dataclass(frozen=True)
class PgCatalog:
    """Immutable PostgreSQL catalog snapshot."""

    namespaces: dict[str, PgNamespace]  # Keyed by stable_id (nspname)
    classes: dict[str, PgClass]  # Keyed by stable_id (relkind:namespace.relname)
    attributes: dict[str, PgAttribute]  # Keyed by stable_id (namespace.table.column)
    constraints: dict[
        str, PgConstraint
    ]  # Keyed by stable_id (namespace.table.constraint_name)
    indexes: dict[str, PgIndex]  # Keyed by stable_id (i:namespace.index_name)
    sequences: dict[str, PgSequence]  # Keyed by stable_id (S:namespace.seqname)
    policies: dict[str, PgPolicy]  # Keyed by stable_id (P:namespace.table.policy)
    procedures: dict[
        str, PgProc
    ]  # Keyed by stable_id (function:namespace.name(argtypes))
    triggers: dict[str, PgTrigger]  # Keyed by stable_id (trigger:namespace.table.name)
    types: dict[str, PgType]  # Keyed by stable_id (type:namespace.typename)
    depends: list[PgDepend]  # All dependencies

    def diff(self, branch: PgCatalog) -> list[DDL]:
        """Generate changes to transform this catalog to the branch catalog."""
        from .diff.orchestrator import diff_catalogs

        return diff_catalogs(self, branch)

    def get_class_attributes(self, class_stable_id: str) -> list[PgAttribute]:
        """Get all attributes for a class (table/view/etc)."""
        attributes = []
        for attr in self.attributes.values():
            if attr.class_stable_id == class_stable_id:
                attributes.append(attr)

        # Sort by column number for consistent ordering
        return sorted(attributes, key=lambda col: col.attnum)

    def semantically_equals(self, other: PgCatalog) -> bool:
        """
        Check if two catalogs are semantically equal.

        This compares the logical structure of the database catalogs,
        ignoring implementation details like OIDs, file nodes, and statistics.
        """

        def _compare(left: BasePgModel | None, right: BasePgModel | None) -> bool:
            if left is None or right is None:
                return False
            return left.semantic_equality(right)

        def _key(item: BasePgModel) -> str:
            return item.__class__.__name__ + item.stable_id

        left_entities: list[BasePgModel] = []
        right_entities: list[BasePgModel] = []
        for field in fields(self):
            # We don't need to compare dependencies
            if field.name != "depends":
                field_values = getattr(self, field.name).values()
                other_field_values = getattr(other, field.name).values()

                left_entities += field_values
                right_entities += other_field_values

        return (
            flu(left_entities)
            .join_full(
                right_entities,
                key=_key,
                other_key=_key,
            )
            .map(lambda x: _compare(x[0], x[1]))
            .filter(lambda x: not x)
            .first(default=None)
        ) is None
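
Because every collection is keyed by stable_id, lookups are plain dict access. A minimal sketch, assuming a catalog extracted as in Basic Usage (the "app.users" names are hypothetical):

# Classes are keyed "relkind:namespace.relname"; 'r' = ordinary table
users_table = catalog.classes.get("r:app.users")
email_index = catalog.indexes.get("i:app.users_email_idx")

if users_table is not None:
    for col in catalog.get_class_attributes(users_table.stable_id):
        print(col.stable_id)  # "namespace.table.column"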

diff(branch)

Generate changes to transform this catalog to the branch catalog.

Source code in src/pgdelta/catalog.py
def diff(self, branch: PgCatalog) -> list[DDL]:
    """Generate changes to transform this catalog to the branch catalog."""
    from .diff.orchestrator import diff_catalogs

    return diff_catalogs(self, branch)
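
Note the direction: self is the current state and branch is the desired state, so the returned DDL migrates current toward desired. A one-line reminder (variable names illustrative):

# DDL that evolves what is deployed (current) into what we want (desired)
changes = current_catalog.diff(desired_catalog)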

get_class_attributes(class_stable_id)

Get all attributes for a class (table/view/etc).

Source code in src/pgdelta/catalog.py
def get_class_attributes(self, class_stable_id: str) -> list[PgAttribute]:
    """Get all attributes for a class (table/view/etc)."""
    attributes = []
    for attr in self.attributes.values():
        if attr.class_stable_id == class_stable_id:
            attributes.append(attr)

    # Sort by column number for consistent ordering
    return sorted(attributes, key=lambda col: col.attnum)
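
Because results are sorted by attnum, column order is deterministic across calls. A quick sketch (the stable_id is hypothetical):

cols = catalog.get_class_attributes("r:app.users")
print([c.attnum for c in cols])  # e.g. [1, 2, ...]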

semantically_equals(other)

Check if two catalogs are semantically equal.

This compares the logical structure of the database catalogs, ignoring implementation details like OIDs, file nodes, and statistics.

Source code in src/pgdelta/catalog.py
def semantically_equals(self, other: PgCatalog) -> bool:
    """
    Check if two catalogs are semantically equal.

    This compares the logical structure of the database catalogs,
    ignoring implementation details like OIDs, file nodes, and statistics.
    """

    def _compare(left: BasePgModel | None, right: BasePgModel | None) -> bool:
        if left is None or right is None:
            return False
        return left.semantic_equality(right)

    def _key(item: BasePgModel) -> str:
        return item.__class__.__name__ + item.stable_id

    left_entities: list[BasePgModel] = []
    right_entities: list[BasePgModel] = []
    for field in fields(self):
        # We don't need to compare dependencies
        if field.name != "depends":
            field_values = getattr(self, field.name).values()
            other_field_values = getattr(other, field.name).values()

            left_entities += field_values
            right_entities += other_field_values

    return (
        flu(left_entities)
        .join_full(
            right_entities,
            key=_key,
            other_key=_key,
        )
        .map(lambda x: _compare(x[0], x[1]))
        .filter(lambda x: not x)
        .first(default=None)
    ) is None

Functions

pgdelta.generate_sql(change)

Generate SQL for a DDL change.

Source code in src/pgdelta/changes/dispatcher.py
def generate_sql(change: DDL) -> str:
    """Generate SQL for a DDL change."""
    sql = ""

    match change:
        case CreateSchema():
            sql = generate_create_schema_sql(change)
        case DropSchema():
            sql = generate_drop_schema_sql(change)
        case CreateTable():
            sql = generate_create_table_sql(change)
        case DropTable():
            sql = generate_drop_table_sql(change)
        case AlterTable():
            sql = generate_alter_table_sql(change)
        case CreateView():
            sql = generate_create_view_sql(change)
        case DropView():
            sql = generate_drop_view_sql(change)
        case ReplaceView():
            sql = generate_replace_view_sql(change)
        case CreateMaterializedView():
            sql = generate_create_materialized_view_sql(change)
        case DropMaterializedView():
            sql = generate_drop_materialized_view_sql(change)
        case ReplaceMaterializedView():
            sql = generate_replace_materialized_view_sql(change)
        case CreateConstraint():
            sql = generate_create_constraint_sql(change)
        case DropConstraint():
            sql = generate_drop_constraint_sql(change)
        case AlterConstraint():
            sql = generate_alter_constraint_sql(change)
        case CreateFunction():
            sql = generate_create_function_sql(change)
        case DropFunction():
            sql = generate_drop_function_sql(change)
        case ReplaceFunction():
            sql = generate_replace_function_sql(change)
        case CreateIndex():
            sql = generate_create_index_sql(change)
        case DropIndex():
            sql = generate_drop_index_sql(change)
        case AlterIndex():
            raise NotImplementedError("ALTER INDEX operations are not yet implemented")
        case CreateSequence():
            sql = generate_create_sequence_sql(change)
        case DropSequence():
            sql = generate_drop_sequence_sql(change)
        case AlterSequence():
            sql = generate_alter_sequence_sql(change)
        case CreatePolicy():
            sql = generate_create_policy_sql(change)
        case DropPolicy():
            sql = generate_drop_policy_sql(change)
        case AlterPolicy():
            sql = generate_alter_policy_sql(change)
        case RenamePolicyTo():
            sql = generate_rename_policy_sql(change)
        case CreateTrigger():
            sql = generate_create_trigger_sql(change)
        case DropTrigger():
            sql = generate_drop_trigger_sql(change)
        case CreateType():
            sql = generate_create_type_sql(change)
        case DropType():
            sql = generate_drop_type_sql(change)
        case (
            AlterTypeOwnerTo()
            | AlterTypeRename()
            | AlterTypeSetSchema()
            | AlterTypeAddAttribute()
            | AlterTypeDropAttribute()
            | AlterTypeAlterAttribute()
            | AlterTypeAddValue()
            | AlterTypeRenameValue()
        ):
            sql = generate_alter_type_sql(change)
        case _:
            assert_never(change)

    # Log SQL generation
    logger.debug(
        "sql.generated",
        extra={
            "change_type": type(change).__name__,
            "stable_id": getattr(change, "stable_id", None),
            "sql": sql,
        },
    )

    return sql
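
Note that some dispatch arms raise rather than return SQL (ALTER INDEX above). Callers feeding arbitrary change lists may want to guard the call; a minimal sketch:

statements = []
for change in changes:
    try:
        statements.append(generate_sql(change))
    except NotImplementedError as exc:
        # e.g. AlterIndex is matched but not yet implemented
        print(f"Skipping {type(change).__name__}: {exc}")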

Exceptions

pgdelta.PgDeltaError

Bases: Exception

Base exception class for all pgdelta-specific exceptions.

This serves as the root of the exception hierarchy and allows users to catch all pgdelta-related exceptions with a single except clause.

Source code in src/pgdelta/exceptions.py
class PgDeltaError(Exception):
    """
    Base exception class for all pgdelta-specific exceptions.

    This serves as the root of the exception hierarchy and allows users
    to catch all pgdelta-related exceptions with a single except clause.
    """

    pass
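
A single except clause therefore covers every pgdelta failure mode; continuing the Basic Usage variables:

try:
    changes = target_catalog.diff(source_catalog)
except PgDeltaError as exc:
    print(f"pgdelta error: {exc}")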

pgdelta.DependencyResolutionError

Bases: PgDeltaError

Exception raised when dependency resolution fails.

This exception is raised when the dependency resolver encounters an unresolvable situation, such as cyclic dependencies between database objects.

Source code in src/pgdelta/exceptions.py
class DependencyResolutionError(PgDeltaError):
    """
    Exception raised when dependency resolution fails.

    This exception is raised when the dependency resolver encounters
    an unresolvable situation, such as cyclic dependencies between
    database objects.
    """

    pass

pgdelta.CyclicDependencyError

Bases: DependencyResolutionError

Exception raised when a cyclic dependency is detected.

This specific exception is raised when the dependency resolver detects a cycle in the dependency graph that cannot be resolved through standard topological sorting.

Source code in src/pgdelta/exceptions.py
class CyclicDependencyError(DependencyResolutionError):
    """
    Exception raised when a cyclic dependency is detected.

    This specific exception is raised when the dependency resolver
    detects a cycle in the dependency graph that cannot be resolved
    through standard topological sorting.
    """

    def __init__(self, message: str = "Cyclic dependency detected in DDL operations"):
        """
        Initialize the cyclic dependency error.

        Args:
            message: Custom error message describing the cyclic dependency
        """
        super().__init__(message)
        self.message = message

__init__(message='Cyclic dependency detected in DDL operations')

Initialize the cyclic dependency error.

Parameters:

    message (str): Custom error message describing the cyclic dependency.
        Default: 'Cyclic dependency detected in DDL operations'
Source code in src/pgdelta/exceptions.py
def __init__(self, message: str = "Cyclic dependency detected in DDL operations"):
    """
    Initialize the cyclic dependency error.

    Args:
        message: Custom error message describing the cyclic dependency
    """
    super().__init__(message)
    self.message = message

Advanced Usage

Custom Connection Handling

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from pgdelta.catalog import extract_catalog

def extract_from_database(connection_string: str) -> PgCatalog:
    """Extract catalog from a database connection string."""
    engine = create_engine(connection_string)
    with Session(engine) as session:
        return extract_catalog(session)

# Usage
source_catalog = extract_from_database("postgresql://user:pass@localhost/db1")
target_catalog = extract_from_database("postgresql://user:pass@localhost/db2")

Filtering Changes

from pgdelta.changes import CreateTable, DropTable

# Extract catalogs
source_catalog = extract_catalog(source_session)
target_catalog = extract_catalog(target_session)

# Get all changes
all_changes = target_catalog.diff(source_catalog)

# Filter only table creation changes
table_creates = [change for change in all_changes if isinstance(change, CreateTable)]

# Filter only table drops
table_drops = [change for change in all_changes if isinstance(change, DropTable)]

# Generate SQL for specific changes
create_sql = [generate_sql(change) for change in table_creates]

Semantic Equality Checking

# Check if two catalogs are semantically identical
if source_catalog.semantically_equals(target_catalog):
    print("Schemas are identical")
else:
    print("Schemas differ")
    changes = source_catalog.diff(target_catalog)
    print(f"Found {len(changes)} changes")

Working with Individual Models

Catalog collections are plain dicts, so iteration is ordinary Python. Field names below follow the pg_catalog naming used by pgdelta's models; attnum appears in the catalog source above, while attname and attnotnull are assumed to follow the same convention:

from pgdelta.model import PgClass, PgAttribute

# Relations live in `classes`, keyed by stable_id ("relkind:namespace.relname");
# the "r:" prefix marks ordinary tables.
for stable_id, rel in source_catalog.classes.items():
    if not stable_id.startswith("r:"):
        continue
    print(f"Table: {stable_id}")

    # Columns come back sorted by attnum
    for column in source_catalog.get_class_attributes(stable_id):
        print(f"  Column: {column.attname}")
        if column.attnotnull:
            print("    NOT NULL")

Error Handling

from pgdelta import DependencyResolutionError, CyclicDependencyError, generate_sql

try:
    changes = source_catalog.diff(target_catalog)
    sql_statements = [generate_sql(change) for change in changes]

except CyclicDependencyError as e:
    # Catch the subclass before its parent, or this branch is unreachable
    print(f"Cyclic dependency detected: {e}")

except DependencyResolutionError as e:
    print(f"Could not resolve dependencies: {e}")

except Exception as e:
    print(f"Unexpected error: {e}")

Integration Examples

Flask Application

from flask import Flask, request, jsonify
from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Flask(__name__)

@app.route('/diff', methods=['POST'])
def generate_diff():
    """Generate schema diff between two databases."""
    data = request.json
    source_url = data['source_url']
    target_url = data['target_url']

    try:
        source_engine = create_engine(source_url)
        target_engine = create_engine(target_url)

        with Session(source_engine) as source_session, \
             Session(target_engine) as target_session:

            source_catalog = extract_catalog(source_session)
            target_catalog = extract_catalog(target_session)

            changes = source_catalog.diff(target_catalog)
            sql_statements = [generate_sql(change) for change in changes]

            return jsonify({
                'success': True,
                'sql': sql_statements,
                'change_count': len(changes)
            })

    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500

Django Management Command

from django.core.management.base import BaseCommand
from django.db import connection
from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

class Command(BaseCommand):
    help = 'Generate schema diff'

    def add_arguments(self, parser):
        parser.add_argument('--target-url', required=True)
        parser.add_argument('--output', required=False)

    def handle(self, *args, **options):
        # Build the source URL from Django's database connection settings
        s = connection.settings_dict
        django_url = (
            f"postgresql://{s['USER']}:{s['PASSWORD']}"
            f"@{s['HOST']}:{s['PORT']}/{s['NAME']}"
        )

        source_engine = create_engine(django_url)
        target_engine = create_engine(options['target_url'])

        with Session(source_engine) as source_session, \
             Session(target_engine) as target_session:

            source_catalog = extract_catalog(source_session)
            target_catalog = extract_catalog(target_session)

            changes = source_catalog.diff(target_catalog)

            if not changes:
                self.stdout.write("No changes detected")
                return

            sql_statements = [generate_sql(change) for change in changes]

            if options['output']:
                with open(options['output'], 'w') as f:
                    f.write('\n'.join(sql_statements))
                self.stdout.write(f"Wrote {len(sql_statements)} statements to {options['output']}")
            else:
                for sql in sql_statements:
                    self.stdout.write(sql)

Async Usage

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session
from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog

def _extract(sync_conn):
    """Run the sync extraction against the sync Connection that run_sync provides."""
    with Session(bind=sync_conn) as session:
        return extract_catalog(session)

async def async_diff():
    """Example of using pgdelta with async SQLAlchemy.

    extract_catalog currently requires a sync session, so we bridge with
    AsyncConnection.run_sync(), which hands the underlying sync connection
    to a regular function.
    """
    source_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db1")
    target_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db2")

    async with source_engine.connect() as source_conn, \
               target_engine.connect() as target_conn:
        source_catalog = await source_conn.run_sync(_extract)
        target_catalog = await target_conn.run_sync(_extract)

    changes = source_catalog.diff(target_catalog)
    return [generate_sql(change) for change in changes]

# Usage
async def main():
    statements = await async_diff()
    for sql in statements:
        print(sql)

asyncio.run(main())

Testing

Unit Testing with pytest

import pytest
from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from testcontainers.postgres import PostgresContainer

@pytest.fixture
def postgres_container():
    """Pytest fixture providing a PostgreSQL container."""
    with PostgresContainer("postgres:17") as container:
        yield container

def test_table_creation_diff(postgres_container):
    """Test that table creation is detected correctly."""

    # Get connection URL
    url = postgres_container.get_connection_url()
    engine = create_engine(url)

    with Session(engine) as session:
        # Create initial schema
        session.execute(text("CREATE SCHEMA test"))
        session.commit()

        # Extract empty catalog
        empty_catalog = extract_catalog(session)

        # Add a table
        session.execute(text("""
            CREATE TABLE test.users (
                id SERIAL PRIMARY KEY,
                email TEXT NOT NULL
            )
        """))
        session.commit()

        # Extract catalog with table
        table_catalog = extract_catalog(session)

        # Generate diff
        changes = empty_catalog.diff(table_catalog)

        # Should have one CREATE TABLE change
        assert len(changes) == 1
        sql = generate_sql(changes[0])
        assert "CREATE TABLE" in sql
        assert "test" in sql
        assert "users" in sql

Integration Testing

from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

def test_roundtrip_fidelity():
    """Test that Extract → Diff → Generate → Apply produces identical schemas."""

    # Setup two identical databases
    source_engine = create_engine("postgresql://user:pass@localhost/source")
    target_engine = create_engine("postgresql://user:pass@localhost/target")

    with Session(source_engine) as source_session, \
         Session(target_engine) as target_session:

        # Apply initial schema to source
        source_session.execute(text("""
            CREATE SCHEMA app;
            CREATE TABLE app.users (
                id SERIAL PRIMARY KEY,
                email TEXT UNIQUE NOT NULL
            );
        """))
        source_session.commit()

        # Extract catalogs
        source_catalog = extract_catalog(source_session)
        target_catalog = extract_catalog(target_session)

        # Generate migration
        changes = target_catalog.diff(source_catalog)
        sql_statements = [generate_sql(change) for change in changes]

        # Apply migration to target
        for sql in sql_statements:
            target_session.execute(text(sql))
        target_session.commit()

        # Extract final catalog
        final_catalog = extract_catalog(target_session)

        # Should be semantically identical
        assert source_catalog.semantically_equals(final_catalog)

Performance Considerations

Large Schema Handling

# For very large schemas, consider extracting only specific schemas
from pgdelta.catalog import extract_catalog

# Schema-scoped extraction (not yet implemented, but planned)
# catalog = extract_catalog(session, schema_filter=['public', 'app'])

# Current approach: extract everything, then filter by stable_id,
# which embeds the namespace ("relkind:namespace.relname")
catalog = extract_catalog(session)
wanted = ("r:public.", "r:app.")  # ordinary tables in public/app
filtered_tables = [
    cls for stable_id, cls in catalog.classes.items()
    if stable_id.startswith(wanted)
]

Best Practices

  1. Use context managers for database connections
  2. Handle exceptions appropriately for production code
  3. Test with real databases using testcontainers
  4. Validate generated SQL before applying to production (see the sketch after this list)
  5. Use semantic equality to verify migrations
  6. Filter changes when you only need specific types
  7. Test with representative data before production use
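
A minimal validation sketch for practice 4, assuming a staging_engine pointing at a disposable copy of production: apply the generated statements in a transaction, then roll back so nothing persists. (Statements that cannot run inside a transaction, such as CREATE INDEX CONCURRENTLY, need a different approach.)

from sqlalchemy import text

with staging_engine.connect() as conn:
    trans = conn.begin()
    try:
        for sql in sql_statements:
            conn.execute(text(sql))  # fails fast on invalid SQL
    finally:
        trans.rollback()  # validation only; never commit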