Python API¶
pgdelta provides a Python API for programmatic schema diffing and DDL generation.
Installation¶
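Assuming the package is published on PyPI under the name pgdelta, it can be installed with pip:

pip install pgdelta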
Basic Usage¶
from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Connect to databases
source_engine = create_engine("postgresql://user:pass@localhost/source_db")
target_engine = create_engine("postgresql://user:pass@localhost/target_db")

with Session(source_engine) as source_session, Session(target_engine) as target_session:
    # Extract schemas
    source_catalog = extract_catalog(source_session)
    target_catalog = extract_catalog(target_session)

    # Generate changes that transform the target schema to match the source
    changes = target_catalog.diff(source_catalog)

    # Generate SQL statements
    sql_statements = [generate_sql(change) for change in changes]

    for sql in sql_statements:
        print(sql)
API Reference¶
Core Functions¶
pgdelta.catalog.extract_catalog(session)¶
Extract catalog from PostgreSQL database session.
Source code in src/pgdelta/catalog.py
Classes¶
pgdelta.PgCatalog dataclass¶
Immutable PostgreSQL catalog snapshot.
Source code in src/pgdelta/catalog.py
diff(branch)¶
Generate the changes required to transform this catalog into the branch catalog.
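In other words, the receiver is the current state and the argument is the desired state. A minimal sketch (the catalog names are illustrative):

# `changes` lists the operations that turn current_catalog into desired_catalog
changes = current_catalog.diff(desired_catalog)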
get_class_attributes(class_stable_id)¶
Get all attributes for a class (table/view/etc).
Source code in src/pgdelta/catalog.py
semantically_equals(other)¶
Check if two catalogs are semantically equal.
This compares the logical structure of the database catalogs, ignoring implementation details like OIDs, file nodes, and statistics.
Source code in src/pgdelta/catalog.py
Functions¶
pgdelta.generate_sql(change)¶
Generate SQL for a DDL change.
Source code in src/pgdelta/changes/dispatcher.py
Exceptions¶
pgdelta.PgDeltaError¶
Bases: Exception
Base exception class for all pgdelta-specific exceptions.
This serves as the root of the exception hierarchy and allows users to catch all pgdelta-related exceptions with a single except clause.
Source code in src/pgdelta/exceptions.py
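For example, a single handler can cover any pgdelta failure. A minimal sketch, assuming catalogs extracted as in Basic Usage:

from pgdelta import PgDeltaError

try:
    changes = source_catalog.diff(target_catalog)
except PgDeltaError as e:
    # Catches DependencyResolutionError, CyclicDependencyError, and any
    # other pgdelta-specific exception in one clause
    print(f"pgdelta error: {e}")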
pgdelta.DependencyResolutionError¶
Bases: PgDeltaError
Exception raised when dependency resolution fails.
This exception is raised when the dependency resolver encounters an unresolvable situation, such as cyclic dependencies between database objects.
Source code in src/pgdelta/exceptions.py
pgdelta.CyclicDependencyError¶
Bases: DependencyResolutionError
Exception raised when a cyclic dependency is detected.
This specific exception is raised when the dependency resolver detects a cycle in the dependency graph that cannot be resolved through standard topological sorting.
Source code in src/pgdelta/exceptions.py
__init__(message='Cyclic dependency detected in DDL operations')¶
Initialize the cyclic dependency error.
Parameters:
Name | Type | Description | Default
---|---|---|---
`message` | `str` | Custom error message describing the cyclic dependency | `'Cyclic dependency detected in DDL operations'`
Source code in src/pgdelta/exceptions.py
Advanced Usage¶
Custom Connection Handling¶
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from pgdelta import PgCatalog
from pgdelta.catalog import extract_catalog

def extract_from_database(connection_string: str) -> PgCatalog:
    """Extract a catalog from a database connection string."""
    engine = create_engine(connection_string)
    with Session(engine) as session:
        return extract_catalog(session)

# Usage
source_catalog = extract_from_database("postgresql://user:pass@localhost/db1")
target_catalog = extract_from_database("postgresql://user:pass@localhost/db2")
Filtering Changes¶
from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog
from pgdelta.changes import CreateTable, DropTable

# Extract catalogs
source_catalog = extract_catalog(source_session)
target_catalog = extract_catalog(target_session)

# Get all changes
all_changes = target_catalog.diff(source_catalog)

# Filter only table creation changes
table_creates = [change for change in all_changes if isinstance(change, CreateTable)]

# Filter only table drops
table_drops = [change for change in all_changes if isinstance(change, DropTable)]

# Generate SQL for specific changes
create_sql = [generate_sql(change) for change in table_creates]
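Before filtering, it can help to summarize a diff by change type. A small sketch using only the standard library:

from collections import Counter

# Count changes per change class, e.g. Counter({'CreateTable': 3, 'DropTable': 1})
summary = Counter(type(change).__name__ for change in all_changes)
print(summary)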
Semantic Equality Checking¶
# Check if two catalogs are semantically identical
if source_catalog.semantically_equals(target_catalog):
    print("Schemas are identical")
else:
    print("Schemas differ")
    changes = source_catalog.diff(target_catalog)
    print(f"Found {len(changes)} changes")
Working with Individual Models¶
from pgdelta.model import PgClass, PgAttribute

# Access individual tables
for table in source_catalog.tables:
    print(f"Table: {table.schema}.{table.name}")

    # Access columns
    for column in table.columns:
        print(f"  Column: {column.name} ({column.type_name})")

        # Check column properties
        if not column.is_nullable:
            print("    NOT NULL")
        if column.has_default:
            print(f"    DEFAULT {column.default}")
Error Handling¶
from pgdelta import CyclicDependencyError, DependencyResolutionError, generate_sql

try:
    changes = source_catalog.diff(target_catalog)
    sql_statements = [generate_sql(change) for change in changes]
except CyclicDependencyError as e:
    # Catch the subclass before its base class; otherwise this branch is unreachable
    print(f"Cyclic dependency detected: {e}")
except DependencyResolutionError as e:
    print(f"Could not resolve dependencies: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
Integration Examples¶
Flask Application¶
from flask import Flask, request, jsonify
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog

app = Flask(__name__)

@app.route('/diff', methods=['POST'])
def generate_diff():
    """Generate a schema diff between two databases."""
    data = request.json
    source_url = data['source_url']
    target_url = data['target_url']

    try:
        source_engine = create_engine(source_url)
        target_engine = create_engine(target_url)

        with Session(source_engine) as source_session, \
                Session(target_engine) as target_session:
            source_catalog = extract_catalog(source_session)
            target_catalog = extract_catalog(target_session)

            changes = source_catalog.diff(target_catalog)
            sql_statements = [generate_sql(change) for change in changes]

            return jsonify({
                'success': True,
                'sql': sql_statements,
                'change_count': len(changes),
            })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e),
        }), 500
Django Management Command¶
from django.core.management.base import BaseCommand
from django.db import connection
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog

class Command(BaseCommand):
    help = 'Generate schema diff'

    def add_arguments(self, parser):
        parser.add_argument('--target-url', required=True)
        parser.add_argument('--output', required=False)

    def handle(self, *args, **options):
        # Use Django's database connection settings for the source
        settings = connection.settings_dict
        django_url = (
            f"postgresql://{settings['USER']}:{settings['PASSWORD']}@"
            f"{settings['HOST']}:{settings['PORT']}/{settings['NAME']}"
        )

        source_engine = create_engine(django_url)
        target_engine = create_engine(options['target_url'])

        with Session(source_engine) as source_session, \
                Session(target_engine) as target_session:
            source_catalog = extract_catalog(source_session)
            target_catalog = extract_catalog(target_session)

            changes = source_catalog.diff(target_catalog)

            if not changes:
                self.stdout.write("No changes detected")
                return

            sql_statements = [generate_sql(change) for change in changes]

            if options['output']:
                with open(options['output'], 'w') as f:
                    f.write('\n'.join(sql_statements))
                self.stdout.write(
                    f"Wrote {len(sql_statements)} statements to {options['output']}"
                )
            else:
                for sql in sql_statements:
                    self.stdout.write(sql)
Async Usage¶
import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session

from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog

async def async_diff():
    """Example of using pgdelta with async SQLAlchemy."""
    # Note: extract_catalog currently requires sync sessions, so the async
    # connections are bridged into sync sessions via run_sync
    source_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db1")
    target_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db2")

    def _extract(sync_conn):
        # Runs inside run_sync, where synchronous SQLAlchemy calls are safe
        with Session(bind=sync_conn) as session:
            return extract_catalog(session)

    async with source_engine.connect() as source_conn, \
            target_engine.connect() as target_conn:
        source_catalog = await source_conn.run_sync(_extract)
        target_catalog = await target_conn.run_sync(_extract)

    changes = source_catalog.diff(target_catalog)
    return [generate_sql(change) for change in changes]

# Usage
async def main():
    statements = await async_diff()
    for sql in statements:
        print(sql)

asyncio.run(main())
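An alternative that avoids the greenlet bridging entirely is to keep extraction fully synchronous and push it onto a worker thread so the event loop stays responsive. A sketch, with illustrative helper name and URLs, assuming plain sync engines:

import asyncio

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog

def sync_diff(source_url: str, target_url: str) -> list[str]:
    """Synchronous extract, diff, and generate, suitable for a worker thread."""
    source_engine = create_engine(source_url)
    target_engine = create_engine(target_url)
    with Session(source_engine) as source_session, \
            Session(target_engine) as target_session:
        source_catalog = extract_catalog(source_session)
        target_catalog = extract_catalog(target_session)
        changes = target_catalog.diff(source_catalog)
        return [generate_sql(change) for change in changes]

async def threaded_main():
    # asyncio.to_thread (Python 3.9+) runs the blocking work off the event loop
    statements = await asyncio.to_thread(
        sync_diff,
        "postgresql://user:pass@localhost/db1",
        "postgresql://user:pass@localhost/db2",
    )
    for sql in statements:
        print(sql)

asyncio.run(threaded_main())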
Testing¶
Unit Testing with pytest¶
import pytest
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from testcontainers.postgres import PostgresContainer

from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog

@pytest.fixture
def postgres_container():
    """Pytest fixture providing a PostgreSQL container."""
    with PostgresContainer("postgres:17") as container:
        yield container

def test_table_creation_diff(postgres_container):
    """Test that table creation is detected correctly."""
    # Get connection URL
    url = postgres_container.get_connection_url()
    engine = create_engine(url)

    with Session(engine) as session:
        # Create initial schema
        session.execute(text("CREATE SCHEMA test"))
        session.commit()

        # Extract empty catalog
        empty_catalog = extract_catalog(session)

        # Add a table
        session.execute(text("""
            CREATE TABLE test.users (
                id SERIAL PRIMARY KEY,
                email TEXT NOT NULL
            )
        """))
        session.commit()

        # Extract catalog with table
        table_catalog = extract_catalog(session)

        # Generate diff
        changes = empty_catalog.diff(table_catalog)

        # Should have one CREATE TABLE change
        assert len(changes) == 1
        sql = generate_sql(changes[0])
        assert "CREATE TABLE" in sql
        assert "test" in sql
        assert "users" in sql
Integration Testing¶
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

from pgdelta import generate_sql
from pgdelta.catalog import extract_catalog

def test_roundtrip_fidelity():
    """Test that Extract → Diff → Generate → Apply produces identical schemas."""
    # Set up two databases: source gets the schema, target starts empty
    source_engine = create_engine("postgresql://user:pass@localhost/source")
    target_engine = create_engine("postgresql://user:pass@localhost/target")

    with Session(source_engine) as source_session, \
            Session(target_engine) as target_session:
        # Apply initial schema to source
        source_session.execute(text("""
            CREATE SCHEMA app;
            CREATE TABLE app.users (
                id SERIAL PRIMARY KEY,
                email TEXT UNIQUE NOT NULL
            );
        """))
        source_session.commit()

        # Extract catalogs
        source_catalog = extract_catalog(source_session)
        target_catalog = extract_catalog(target_session)

        # Generate migration that brings target up to source
        changes = target_catalog.diff(source_catalog)
        sql_statements = [generate_sql(change) for change in changes]

        # Apply migration to target
        for sql in sql_statements:
            target_session.execute(text(sql))
        target_session.commit()

        # Extract final catalog
        final_catalog = extract_catalog(target_session)

        # Should be semantically identical
        assert source_catalog.semantically_equals(final_catalog)
Performance Considerations¶
Large Schema Handling¶
# For very large schemas, consider extracting only specific schemas
from pgdelta.catalog import extract_catalog

# Extract only specific schemas (not yet implemented, but planned)
# catalog = extract_catalog(session, schema_filter=['public', 'app'])

# Current approach: extract everything, then filter
catalog = extract_catalog(session)
filtered_tables = [t for t in catalog.tables if t.schema in ['public', 'app']]
Best Practices¶
- Use context managers for database connections
- Handle exceptions appropriately for production code
- Test with real databases using testcontainers
- Validate generated SQL before applying to production (see the transaction sketch below)
- Use semantic equality to verify migrations
- Filter changes when you only need specific types
- Test with representative data before production use
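One way to act on several of these points at once is to apply generated SQL inside a single transaction and roll back unless explicitly confirmed. A sketch, assuming the engine setup from earlier examples (the confirm flag is illustrative):

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

def apply_statements(url: str, sql_statements: list[str], confirm: bool = False) -> None:
    """Apply generated DDL in one transaction; roll back for a dry run."""
    engine = create_engine(url)
    with Session(engine) as session:
        for sql in sql_statements:
            session.execute(text(sql))
        if confirm:
            session.commit()    # apply for real
        else:
            session.rollback()  # dry run: most PostgreSQL DDL is transactional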