# Adding New Entity Types
This guide walks through the process of adding support for a new PostgreSQL entity type to pgdelta, using indexes as a comprehensive example.
## Overview
Adding a new entity type to pgdelta involves several steps:
- **Model Creation**: Define the PostgreSQL object model
- **Extraction**: Extract objects from PostgreSQL catalogs
- **Diffing**: Compare objects between catalogs
- **Change Types**: Define create/drop/alter operations
- **SQL Generation**: Generate DDL from change objects
- **Testing**: Comprehensive test coverage
- **Documentation**: Update entity documentation
## Step-by-Step Guide: Adding Index Support
Let's walk through adding index support to demonstrate the complete process.
### Step 1: Model Creation
Create the PostgreSQL model in `src/pgdelta/model/pg_index.py`:
"""PostgreSQL index model."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from sqlalchemy import text
from .base import BasePgModel
if TYPE_CHECKING:
from sqlalchemy.orm import Session
@dataclass(frozen=True)
class PgIndex(BasePgModel):
"""PostgreSQL index model."""
# Identity fields (uniquely identify the index)
indexname: str = field(metadata={"tag": "identity"})
schemaname: str = field(metadata={"tag": "identity"})
# Data fields (index properties)
tablename: str = field(metadata={"tag": "data"})
index_definition: str = field(metadata={"tag": "data"})
is_unique: bool = field(metadata={"tag": "data"})
is_primary: bool = field(metadata={"tag": "data"})
is_exclusion: bool = field(metadata={"tag": "data"})
# Internal fields (PostgreSQL internals)
oid: int = field(metadata={"tag": "internal"})
@property
def stable_id(self) -> str:
"""Cross-database portable identifier."""
return f"i:{self.schemaname}.{self.indexname}"
@property
def table_stable_id(self) -> str:
"""Stable ID of the table this index is on."""
return f"t:{self.schemaname}.{self.tablename}"
def extract_indexes(session: Session) -> list[PgIndex]:
"""Extract indexes from PostgreSQL."""
# Use PostgreSQL's information_schema and pg_* tables
query = text("""
SELECT
i.indexname,
i.schemaname,
i.tablename,
pg_get_indexdef(pi.oid) as index_definition,
pi.indisunique as is_unique,
pi.indisprimary as is_primary,
pi.indisexclusion as is_exclusion,
pi.oid
FROM pg_indexes i
JOIN pg_class pc ON pc.relname = i.tablename
JOIN pg_namespace pn ON pn.nspname = i.schemaname AND pn.oid = pc.relnamespace
JOIN pg_index pi ON pi.indexrelid = (
SELECT oid FROM pg_class
WHERE relname = i.indexname
AND relnamespace = pn.oid
)
WHERE i.schemaname NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
ORDER BY i.schemaname, i.tablename, i.indexname
""")
result = session.execute(query)
indexes = []
for row in result:
index = PgIndex(
indexname=row.indexname,
schemaname=row.schemaname,
tablename=row.tablename,
index_definition=row.index_definition,
is_unique=row.is_unique,
is_primary=row.is_primary,
is_exclusion=row.is_exclusion,
oid=row.oid,
)
indexes.append(index)
return indexes
**Key points:**

- Use `@dataclass(frozen=True)` for immutability
- Tag fields with metadata: `identity`, `data`, or `internal` (see the sketch below for how the tags can drive comparison)
- Implement the `stable_id` property for cross-database identification
- Use PostgreSQL system catalogs for extraction
- Filter out system schemas
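The metadata tags are what drive comparison later in the pipeline: `internal` fields such as `oid` differ between databases and must be ignored when deciding whether two objects are "the same". As a rough sketch (the actual `BasePgModel` implementation may differ), `semantic_equality` can be derived directly from the tags:

```python
from dataclasses import fields

# Sketch of a tag-driven comparison, as BasePgModel might implement it:
# compare only "identity" and "data" fields, skip "internal" ones.
def semantic_equality(self, other: object) -> bool:
    if type(self) is not type(other):
        return False
    return all(
        getattr(self, f.name) == getattr(other, f.name)
        for f in fields(self)
        if f.metadata.get("tag") in ("identity", "data")
    )
```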
### Step 2: Catalog Integration
Update `src/pgdelta/catalog.py` to include indexes:
```python
from .model.pg_index import PgIndex, extract_indexes


@dataclass(frozen=True)
class PgCatalog:
    """Immutable PostgreSQL catalog snapshot."""

    # ... existing fields ...
    indexes: dict[str, PgIndex]  # Keyed by stable_id
    # ... rest of fields ...


def extract_catalog(session: Session) -> PgCatalog:
    """Extract complete catalog from PostgreSQL session."""
    # Extract all object types
    namespaces = extract_namespaces(session)
    classes = extract_classes(session)
    attributes = extract_attributes(session)
    constraints = extract_constraints(session)
    indexes = extract_indexes(session)  # Add this line
    # ... other extractions ...

    return PgCatalog(
        namespaces={ns.stable_id: ns for ns in namespaces},
        classes={cls.stable_id: cls for cls in classes},
        attributes={attr.stable_id: attr for attr in attributes},
        constraints={cons.stable_id: cons for cons in constraints},
        indexes={idx.stable_id: idx for idx in indexes},  # Add this line
        # ... other collections ...
    )
```
### Step 3: Diffing Logic
Create diff logic in `src/pgdelta/diff/orchestrator.py`:
```python
def diff_catalogs(master: PgCatalog, branch: PgCatalog) -> list[DDL]:
    """Generate changes to transform master into branch."""
    changes = []
    # ... existing diffs ...

    # Add index diffing
    changes.extend(diff_indexes(master.indexes, branch.indexes))

    # ... rest of diffs ...
    return changes


def diff_indexes(
    master_indexes: dict[str, PgIndex],
    branch_indexes: dict[str, PgIndex],
) -> list[DDL]:
    """Diff indexes between catalogs."""
    changes = []

    # Find indexes to create (in branch but not master)
    for stable_id, branch_index in branch_indexes.items():
        if stable_id not in master_indexes:
            changes.append(CreateIndex(stable_id=stable_id, index=branch_index))

    # Find indexes to drop (in master but not branch)
    for stable_id, master_index in master_indexes.items():
        if stable_id not in branch_indexes:
            changes.append(DropIndex(stable_id=stable_id, index=master_index))

    # Find indexes to alter (in both but different)
    for stable_id, master_index in master_indexes.items():
        if stable_id in branch_indexes:
            branch_index = branch_indexes[stable_id]
            if not master_index.semantic_equality(branch_index):
                # For indexes, we typically drop and recreate
                changes.append(DropIndex(stable_id=stable_id, index=master_index))
                changes.append(CreateIndex(stable_id=stable_id, index=branch_index))

    return changes
```
### Step 4: Change Types
Create change types in `src/pgdelta/changes/index/`:
#### `src/pgdelta/changes/index/__init__.py`
"""Index change types."""
from .create import CreateIndex
from .drop import DropIndex
__all__ = ["CreateIndex", "DropIndex"]
#### `src/pgdelta/changes/index/create.py`
"""Create index change type and SQL generation."""
from dataclasses import dataclass
from ...model.pg_index import PgIndex
@dataclass(frozen=True)
class CreateIndex:
"""Create index change."""
stable_id: str # i:namespace.index_name
index: PgIndex
def generate_create_index_sql(change: CreateIndex) -> str:
"""Generate CREATE INDEX SQL from the stored index definition."""
# PostgreSQL's pg_get_indexdef() returns the complete CREATE INDEX statement
index_def = change.index.index_definition
# Ensure it ends with a semicolon for consistency
if not index_def.endswith(";"):
index_def += ";"
return index_def
#### `src/pgdelta/changes/index/drop.py`
"""Drop index change type and SQL generation."""
from dataclasses import dataclass
from ...model.pg_index import PgIndex
@dataclass(frozen=True)
class DropIndex:
"""Drop index change."""
stable_id: str # i:namespace.index_name
index: PgIndex
def generate_drop_index_sql(change: DropIndex) -> str:
"""Generate DROP INDEX SQL."""
quoted_schema = f'"{change.index.schemaname}"'
quoted_index = f'"{change.index.indexname}"'
return f"DROP INDEX {quoted_schema}.{quoted_index};"
### Step 5: SQL Generation Integration
Update `src/pgdelta/changes/dispatcher.py`:
```python
from .index import CreateIndex, DropIndex
from .index.create import generate_create_index_sql
from .index.drop import generate_drop_index_sql


def generate_sql(change: DDL) -> str:
    """Generate SQL for a change object using structural pattern matching."""
    match change:
        # ... existing cases ...
        case CreateIndex() as create_index:
            return generate_create_index_sql(create_index)
        case DropIndex() as drop_index:
            return generate_drop_index_sql(drop_index)
        # ... rest of cases ...
        case _:
            msg = f"Unsupported change type: {type(change)}"
            raise NotImplementedError(msg)
```
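With the dispatcher handling both change types, producing a migration script is just a matter of mapping `generate_sql` over the diff. A minimal sketch, assuming `master_catalog` and `branch_catalog` were obtained via `extract_catalog`:

```python
# Diff two catalog snapshots and render the resulting DDL.
changes = diff_catalogs(master_catalog, branch_catalog)
migration_script = "\n".join(generate_sql(change) for change in changes)
print(migration_script)
```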
### Step 6: Dependencies
Update dependency tracking in `src/pgdelta/model/pg_depend.py`:
```python
def extract_depends(
    session: Session,
    namespaces: list[PgNamespace],
    classes: list[PgClass],
    constraints: list[PgConstraint],
    indexes: list[PgIndex],  # Add this parameter
    # ... other parameters ...
) -> list[PgDepend]:
    """Extract dependencies from pg_depend."""
    # ... existing OID mappings ...

    # Map index OIDs (indexes also live in pg_class)
    for index in indexes:
        oid_to_stable_id[("pg_class", index.oid)] = index.stable_id

    # ... rest of function ...
```
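The `oid_to_stable_id` mapping is what lets raw `pg_depend` rows be translated into edges between stable IDs, which in turn drive the ordering of generated DDL. A hypothetical illustration only (the attribute names on `row` are placeholders, not pgdelta's actual column handling):

```python
# Hypothetical: resolve both sides of a pg_depend row to stable IDs.
dependent = oid_to_stable_id.get((row.dependent_catalog, row.dependent_oid))
referenced = oid_to_stable_id.get((row.referenced_catalog, row.referenced_oid))
if dependent is not None and referenced is not None:
    # An index depends on its table: the table must be created
    # before the index and dropped after it.
    edges.append((dependent, referenced))
```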
### Step 7: Testing
Create comprehensive tests in `tests/`:
#### Unit Tests: `tests/unit/test_index.py`
"""Unit tests for index functionality."""
import pytest
from pgdelta.model.pg_index import PgIndex
from pgdelta.changes.index import CreateIndex, DropIndex
from pgdelta.changes.index.create import generate_create_index_sql
from pgdelta.changes.index.drop import generate_drop_index_sql
def test_pg_index_stable_id():
"""Test PgIndex stable_id property."""
index = PgIndex(
indexname="idx_users_email",
schemaname="public",
tablename="users",
index_definition="CREATE INDEX ...",
is_unique=False,
is_primary=False,
is_exclusion=False,
oid=12345,
)
assert index.stable_id == "i:public.idx_users_email"
def test_pg_index_table_stable_id():
"""Test PgIndex table_stable_id property."""
index = PgIndex(
indexname="idx_users_email",
schemaname="public",
tablename="users",
index_definition="CREATE INDEX ...",
is_unique=False,
is_primary=False,
is_exclusion=False,
oid=12345,
)
assert index.table_stable_id == "t:public.users"
def test_create_index_sql_generation():
"""Test CREATE INDEX SQL generation."""
index = PgIndex(
indexname="idx_users_email",
schemaname="public",
tablename="users",
index_definition='CREATE INDEX "idx_users_email" ON "public"."users" ("email")',
is_unique=False,
is_primary=False,
is_exclusion=False,
oid=12345,
)
change = CreateIndex(
stable_id="i:public.idx_users_email",
index=index
)
sql = generate_create_index_sql(change)
assert 'CREATE INDEX "idx_users_email"' in sql
assert 'ON "public"."users"' in sql
assert sql.endswith(";")
def test_drop_index_sql_generation():
"""Test DROP INDEX SQL generation."""
index = PgIndex(
indexname="idx_users_email",
schemaname="public",
tablename="users",
index_definition='CREATE INDEX "idx_users_email" ON "public"."users" ("email")',
is_unique=False,
is_primary=False,
is_exclusion=False,
oid=12345,
)
change = DropIndex(
stable_id="i:public.idx_users_email",
index=index
)
sql = generate_drop_index_sql(change)
assert sql == 'DROP INDEX "public"."idx_users_email";'
#### Integration Tests: `tests/integration/test_index_roundtrip.py`
"""Integration tests for index roundtrip fidelity."""
import pytest
from sqlalchemy import text
from pgdelta.catalog import extract_catalog
from pgdelta.changes.dispatcher import generate_sql
def test_index_creation_roundtrip(postgres_session):
"""Test index creation roundtrip fidelity."""
# Create table and index
postgres_session.execute(text("""
CREATE TABLE test_table (
id SERIAL PRIMARY KEY,
email TEXT NOT NULL
);
CREATE INDEX idx_test_email ON test_table (email);
"""))
postgres_session.commit()
# Extract catalog
catalog = extract_catalog(postgres_session)
# Find the index
index = None
for idx in catalog.indexes.values():
if idx.indexname == "idx_test_email":
index = idx
break
assert index is not None
assert index.tablename == "test_table"
assert index.schemaname == "public"
assert not index.is_unique
assert not index.is_primary
def test_unique_index_roundtrip(postgres_session):
"""Test unique index roundtrip fidelity."""
# Create table with unique index
postgres_session.execute(text("""
CREATE TABLE test_table (
id SERIAL PRIMARY KEY,
email TEXT NOT NULL
);
CREATE UNIQUE INDEX idx_test_email_unique ON test_table (email);
"""))
postgres_session.commit()
# Extract catalog
catalog = extract_catalog(postgres_session)
# Find the unique index
index = None
for idx in catalog.indexes.values():
if idx.indexname == "idx_test_email_unique":
index = idx
break
assert index is not None
assert index.is_unique
assert not index.is_primary
def test_index_diff_and_generation(postgres_session):
"""Test index diff and SQL generation."""
# Create initial table
postgres_session.execute(text("""
CREATE TABLE test_table (
id SERIAL PRIMARY KEY,
email TEXT NOT NULL
);
"""))
postgres_session.commit()
# Extract master catalog
master_catalog = extract_catalog(postgres_session)
# Add index
postgres_session.execute(text("""
CREATE INDEX idx_test_email ON test_table (email);
"""))
postgres_session.commit()
# Extract branch catalog
branch_catalog = extract_catalog(postgres_session)
# Generate diff
changes = master_catalog.diff(branch_catalog)
# Should have one CreateIndex change
assert len(changes) == 1
assert isinstance(changes[0], CreateIndex)
# Generate SQL
sql = generate_sql(changes[0])
assert "CREATE INDEX" in sql
assert "idx_test_email" in sql
assert "test_table" in sql
### Step 8: Documentation
Update the entity documentation in `docs/entities/indexes.md`:
````markdown
# Indexes

PostgreSQL indexes improve query performance by providing faster data access paths.

## PostgreSQL Specification

### CREATE INDEX Syntax

```sql
CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] name ]
    ON [ ONLY ] table_name [ USING method ]
    ( { column_name | ( expression ) } [ ... ] )
    [ WHERE predicate ]
```

## pgdelta Support

### ✅ Currently Supported

- CREATE INDEX (regular and unique)
- DROP INDEX
- All index types (B-tree, Hash, GIN, GiST, SP-GiST, BRIN)
- Partial indexes with WHERE clauses
- Functional indexes with expressions
- Multi-column indexes

### ❌ Not Yet Supported

- ALTER INDEX operations
- CONCURRENTLY option (not needed for schema migration)

## Usage Examples

### Basic Index Creation

```python
target_sql = """
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    email TEXT NOT NULL
);
CREATE INDEX idx_users_email ON users (email);
"""
```

## Implementation Details

### Index Model

```python
@dataclass(frozen=True)
class PgIndex:
    indexname: str
    schemaname: str
    tablename: str
    index_definition: str
    is_unique: bool
    is_primary: bool
    is_exclusion: bool
    oid: int
```

### SQL Generation

```python
def generate_create_index_sql(change: CreateIndex) -> str:
    """Generate CREATE INDEX SQL from stored definition."""
    return change.index.index_definition + ";"
```
````
### Step 9: Update Public API

Update `src/pgdelta/__init__.py` to expose the new types:

```python
from .changes.index import CreateIndex, DropIndex

__all__ = [
    # ... existing exports ...
    "CreateIndex",
    "DropIndex",
    # ... rest of exports ...
]
```
## Common Patterns
### 1. Model Design Pattern
```python
@dataclass(frozen=True)
class PgEntity(BasePgModel):
    """Template for new entity models."""

    # Identity fields (what makes this object unique)
    name: str = field(metadata={"tag": "identity"})
    schema: str = field(metadata={"tag": "identity"})

    # Data fields (object properties that matter for DDL)
    property1: str = field(metadata={"tag": "data"})
    property2: bool = field(metadata={"tag": "data"})

    # Internal fields (PostgreSQL implementation details)
    oid: int = field(metadata={"tag": "internal"})

    @property
    def stable_id(self) -> str:
        """Cross-database portable identifier."""
        return f"prefix:{self.schema}.{self.name}"
```
### 2. Extraction Pattern
```python
def extract_entities(session: Session) -> list[PgEntity]:
    """Extract entities from PostgreSQL."""
    # Query the appropriate system catalogs (pg_entities is a placeholder)
    query = text("""
        SELECT
            entity_name,
            schema_name,
            entity_property1,
            entity_property2,
            entity_oid
        FROM pg_entities e
        JOIN pg_namespace n ON n.oid = e.schema_oid
        WHERE n.nspname NOT IN ('information_schema', 'pg_catalog')
        ORDER BY schema_name, entity_name
    """)
    result = session.execute(query)
    entities = []
    for row in result:
        entity = PgEntity(
            name=row.entity_name,
            schema=row.schema_name,
            property1=row.entity_property1,
            property2=row.entity_property2,
            oid=row.entity_oid,
        )
        entities.append(entity)
    return entities
```
### 3. Diffing Pattern
```python
def diff_entities(
    master_entities: dict[str, PgEntity],
    branch_entities: dict[str, PgEntity],
) -> list[DDL]:
    """Diff entities between catalogs."""
    changes = []

    # Create entities that exist in branch but not master
    for stable_id, branch_entity in branch_entities.items():
        if stable_id not in master_entities:
            changes.append(CreateEntity(stable_id=stable_id, entity=branch_entity))

    # Drop entities that exist in master but not branch
    for stable_id, master_entity in master_entities.items():
        if stable_id not in branch_entities:
            changes.append(DropEntity(stable_id=stable_id, entity=master_entity))

    # Alter entities that exist in both but are different
    for stable_id, master_entity in master_entities.items():
        if stable_id in branch_entities:
            branch_entity = branch_entities[stable_id]
            if not master_entity.semantic_equality(branch_entity):
                changes.append(AlterEntity(
                    stable_id=stable_id,
                    old_entity=master_entity,
                    new_entity=branch_entity,
                ))

    return changes
```
### 4. SQL Generation Pattern
```python
def generate_create_entity_sql(change: CreateEntity) -> str:
    """Generate CREATE ENTITY SQL."""
    quoted_schema = f'"{change.entity.schema}"'
    quoted_name = f'"{change.entity.name}"'
    sql_parts = [f"CREATE ENTITY {quoted_schema}.{quoted_name}"]

    # Add entity-specific properties
    # (in real code, escape embedded quotes in string values)
    if change.entity.property1:
        sql_parts.append(f"WITH PROPERTY1 = '{change.entity.property1}'")
    if change.entity.property2:
        sql_parts.append("WITH PROPERTY2")

    return " ".join(sql_parts) + ";"
```
### 5. Testing Pattern
```python
def test_entity_roundtrip(postgres_session):
    """Test entity roundtrip fidelity."""
    # Create entity
    postgres_session.execute(text("""
        CREATE ENTITY test_entity WITH PROPERTY1 = 'value';
    """))
    postgres_session.commit()

    # Extract catalog
    catalog = extract_catalog(postgres_session)

    # Verify entity exists
    entity_id = "prefix:public.test_entity"
    assert entity_id in catalog.entities

    # Verify properties
    entity = catalog.entities[entity_id]
    assert entity.name == "test_entity"
    assert entity.schema == "public"
    assert entity.property1 == "value"
    assert entity.property2 is True

    # Test SQL generation (empty_catalog would be a fixture wrapping an
    # empty database snapshot)
    changes = empty_catalog.diff(catalog)
    sql = generate_sql(changes[0])
    assert "CREATE ENTITY" in sql
    assert "test_entity" in sql
```
## Best Practices
### 1. Model Design

- **Immutable**: Always use `@dataclass(frozen=True)`
- **Field tagging**: Tag all fields with appropriate metadata
- **Stable IDs**: Use consistent, cross-database identifiers
- **Inheritance**: Extend `BasePgModel` for semantic equality
### 2. Extraction

- **System catalogs**: Use PostgreSQL's system catalogs
- **Schema filtering**: Exclude system schemas
- **Ordering**: Order results for consistent output
- **Error handling**: Handle missing or invalid objects (one possible approach is sketched after this list)
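On the error-handling point, one possible policy (a sketch, not taken from pgdelta itself) is to skip rows that fail model validation rather than aborting the whole extraction, while logging them for investigation. Whether to skip or fail fast is a design decision for each entity type:

```python
import logging

logger = logging.getLogger(__name__)

for row in result:
    try:
        entities.append(
            PgEntity(
                name=row.entity_name,
                schema=row.schema_name,
                property1=row.entity_property1,
                property2=row.entity_property2,
                oid=row.entity_oid,
            )
        )
    except (TypeError, ValueError):
        # Don't let one malformed object poison the whole snapshot.
        logger.warning("Skipping malformed entity row: %r", row)
```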
### 3. Diffing

- **Semantic equality**: Use the model's `semantic_equality` method
- **Change types**: Create appropriate change objects
- **Completeness**: Handle create, drop, and alter operations
### 4. SQL Generation

- **Quoting**: Always quote identifiers (see the sketch after this list)
- **Formatting**: Use consistent SQL formatting
- **Completeness**: Generate complete, valid SQL
- **Error handling**: Validate inputs and handle edge cases
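Note that the naive f-string quoting shown in `drop.py` above breaks on identifiers that themselves contain double quotes. A safer helper (a sketch, not pgdelta's actual API) follows PostgreSQL's rule of doubling embedded quotes:

```python
def quote_ident(name: str) -> str:
    """Quote an identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


assert quote_ident("idx_users_email") == '"idx_users_email"'
assert quote_ident('weird"name') == '"weird""name"'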
### 5. Testing

- **Real PostgreSQL**: Use actual PostgreSQL instances
- **Roundtrip fidelity**: Test extract → diff → generate → apply (sketched below)
- **Edge cases**: Test unusual but valid scenarios
- **Performance**: Test with realistic data volumes
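A full roundtrip test exercises every stage of the pipeline. The sketch below assumes two fixtures, `source_session` and `target_session`, connected to independent scratch databases; the fixture names are illustrative:

```python
def test_index_roundtrip_apply(source_session, target_session):
    """Extract -> diff -> generate -> apply -> re-extract -> compare."""
    source_session.execute(text("""
        CREATE TABLE users (id SERIAL PRIMARY KEY, email TEXT NOT NULL);
        CREATE INDEX idx_users_email ON users (email);
    """))
    source_session.commit()

    desired = extract_catalog(source_session)
    current = extract_catalog(target_session)

    # Apply the generated DDL to the target database.
    for change in current.diff(desired):
        target_session.execute(text(generate_sql(change)))
    target_session.commit()

    # The target should now contain the same indexes as the source.
    assert extract_catalog(target_session).indexes.keys() == desired.indexes.keys()
```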
## Troubleshooting
### Common Issues
#### Model Not Found

```python
# Error: model not imported in catalog.py
# Solution: add the import alongside the other model imports
from .model.pg_entity import PgEntity, extract_entities
```
#### SQL Generation Errors

```python
# Error: change type not handled in the dispatcher
# Solution: add a case to the generate_sql() match statement
match change:
    case CreateEntity() as create_entity:
        return generate_create_entity_sql(create_entity)
```
#### Dependency Issues

```python
# Error: objects created in the wrong order
# Solution: ensure dependencies are properly tracked in pg_depend.py

# Map entity OIDs
for entity in entities:
    oid_to_stable_id[("pg_entity", entity.oid)] = entity.stable_id
```
## Validation Checklist

- [ ] Model extends `BasePgModel`
- [ ] All fields have appropriate metadata tags
- [ ] `stable_id` property is implemented
- [ ] Extraction function queries system catalogs
- [ ] Catalog integration includes the new entity type
- [ ] Diff logic handles create/drop/alter operations
- [ ] Change types are immutable dataclasses
- [ ] SQL generation produces valid DDL
- [ ] Dispatcher handles all change types
- [ ] Dependencies are tracked in `pg_depend.py`
- [ ] Comprehensive unit tests exist
- [ ] Integration tests run against real PostgreSQL
- [ ] Roundtrip fidelity tests pass
- [ ] Documentation is updated
- [ ] Public API exports the new types
## Summary
Adding a new entity type to pgdelta requires:
- Defining the model with proper field metadata
- Implementing extraction from PostgreSQL catalogs
- Creating diff logic to detect changes
- Defining change types for operations
- Implementing SQL generation for each operation
- Adding dependency tracking for proper ordering
- Writing comprehensive tests with real PostgreSQL
- Updating documentation and public API
This systematic approach ensures that new entity types integrate seamlessly with pgdelta's architecture while maintaining correctness, performance, and maintainability.