Skip to content

SQLAlchemy Cheatsheet


Installation & Setup

Install

pip install sqlalchemy
pip install alembic  # For migrations

Basic Imports

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, DateTime, Boolean, Text, Float, Numeric
from sqlalchemy.orm import declarative_base, sessionmaker, relationship, Session
from sqlalchemy import select, update, delete, insert, func, and_, or_, not_
from datetime import datetime

Engine & Session Setup

# NOTE: the three create_engine() calls below all assign to the same `engine`
# name. They are alternatives -- keep only the one matching your database,
# otherwise the last assignment (MySQL) is the one the session factory binds to.

# SQLite
engine = create_engine("sqlite:///example.db", echo=True)

# PostgreSQL
engine = create_engine("postgresql://user:password@localhost/dbname")

# MySQL
engine = create_engine("mysql+pymysql://user:password@localhost/dbname")

# Session factory (bound to `engine`; autoflush is turned off here)
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)

# Create session
session = SessionLocal()

Declarative Base & Models

from datetime import datetime, timezone


def _utcnow():
    """Return the current time as a timezone-aware UTC datetime.

    Replaces ``datetime.utcnow``, which is deprecated since Python 3.12 and
    returns a naive datetime.
    """
    return datetime.now(timezone.utc)


Base = declarative_base()

class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True, nullable=False, index=True)
    is_active = Column(Boolean, default=True)
    # Pass the callable itself (no parentheses) so it is evaluated per INSERT/UPDATE,
    # not once at import time.
    # NOTE(review): timezone=True is only honored by backends with a
    # timezone-aware timestamp type -- confirm for your database.
    created_at = Column(DateTime(timezone=True), default=_utcnow)
    updated_at = Column(DateTime(timezone=True), default=_utcnow, onupdate=_utcnow)

Create Tables

# Issue CREATE TABLE through `engine` for the tables registered on Base.metadata
Base.metadata.create_all(engine)

Basic CRUD Operations

Insert (Create)

# Single insert
user = User(name="Alice", email="alice@example.com")
session.add(user)      # Stage the object in the session
session.commit()       # Flush pending changes and commit the transaction
session.refresh(user)  # Get updated data from DB

# Bulk insert
users = [
    User(name="Bob", email="bob@example.com"),
    User(name="Charlie", email="charlie@example.com")
]
session.add_all(users)
session.commit()

# Insert with returning (2.0 style)
stmt = insert(User).values(name="Dave", email="dave@example.com").returning(User)
result = session.execute(stmt)
new_user = result.scalar_one()  # The single User produced by RETURNING
session.commit()

Read (Query)

# Get all
users = session.query(User).all()

# Get by ID (primary-key lookup; returns None when no row matches).
# Session.get() replaces the legacy Query.get().
user = session.get(User, 1)
user = session.query(User).filter_by(id=1).first()

# Get one (raises if not found or multiple)
user = session.query(User).filter_by(email="alice@example.com").one()

# Get first (None when the table is empty)
user = session.query(User).first()

# Using select() (2.0 style); scalar_one() raises unless exactly one row matches
stmt = select(User).where(User.id == 1)
user = session.execute(stmt).scalar_one()

# Scalar results: scalars() unwraps each row into the User object itself
stmt = select(User)
users = session.execute(stmt).scalars().all()

Update

# Update single object: change the attribute, then commit
user = session.query(User).filter_by(id=1).first()
user.email = "newemail@example.com"
session.commit()

# Bulk update: one UPDATE statement for every row matching the filter
session.query(User).filter(User.is_active == False).update({"is_active": True})
session.commit()

# Update with returning (2.0 style)
stmt = (
    update(User)
    .where(User.id == 1)
    .values(email="updated@example.com")
    .returning(User)
)
# `result` carries the RETURNING rows; this snippet does not read them
result = session.execute(stmt)
session.commit()

Delete

# Delete single object
# NOTE: .first() yields None when no row matches; check before calling delete()
user = session.query(User).filter_by(id=1).first()
session.delete(user)
session.commit()

# Bulk delete: one DELETE statement for every row matching the filter
session.query(User).filter(User.is_active == False).delete()
session.commit()

# Delete with 2.0 style
stmt = delete(User).where(User.id == 1)
session.execute(stmt)
session.commit()

Querying

Basic Filtering

# filter_by (keyword arguments): attribute names with equality only
users = session.query(User).filter_by(name="Alice").all()

# filter (expressions): full column expressions built from the mapped class
users = session.query(User).filter(User.name == "Alice").all()

# Multiple conditions (several criteria in one filter() are ANDed together)
users = session.query(User).filter(
    User.name == "Alice",
    User.is_active == True
).all()

Ordering

# Ascending
users = session.query(User).order_by(User.name.asc()).all()

# Descending
users = session.query(User).order_by(User.created_at.desc()).all()

# Multiple orders: sort by name first, then by created_at (newest first) within equal names
users = session.query(User).order_by(
    User.name.asc(),
    User.created_at.desc()
).all()

# Nulls first/last
users = session.query(User).order_by(User.name.asc().nulls_last()).all()

Limiting & Offsetting

# Limit
users = session.query(User).limit(10).all()

# Offset
users = session.query(User).offset(20).all()

# Pagination
# Order by a unique column: without ORDER BY the database may return rows in
# any order, so pages could overlap or skip rows between requests.
page = 2
per_page = 10
users = (
    session.query(User)
    .order_by(User.id)
    .limit(per_page)
    .offset((page - 1) * per_page)
    .all()
)

# Slice notation (translated to LIMIT/OFFSET, so it needs a stable order too)
users = session.query(User).order_by(User.id)[10:20]  # Items 10-19

Distinct

# SELECT DISTINCT on the name column only; each result is a row holding one value
names = session.query(User.name).distinct().all()

Relationships

One-to-Many

class User(Base):
    """Parent side of the one-to-many: one user owns many addresses."""
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    # Collection of Address objects; back_populates pairs it with Address.user
    addresses = relationship("Address", back_populates="user", cascade="all, delete-orphan")

class Address(Base):
    """Child side of the one-to-many: each address points at one user."""
    __tablename__ = "addresses"
    id = Column(Integer, primary_key=True)
    email = Column(String)
    # Foreign key to users.id, declared with ON DELETE CASCADE
    user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"))
    user = relationship("User", back_populates="addresses")

Many-to-Many

from sqlalchemy import Table

# Association table linking users and groups.
# Both columns form a composite primary key so the same (user, group) pair
# cannot be stored twice.
user_group = Table(
    'user_group',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id'), primary_key=True),
    Column('group_id', Integer, ForeignKey('groups.id'), primary_key=True)
)

class User(Base):
    """User side of the many-to-many, linked to Group through ``user_group``."""
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    groups = relationship("Group", secondary=user_group, back_populates="users")

class Group(Base):
    """Group side of the many-to-many, linked to User through ``user_group``."""
    __tablename__ = "groups"
    id = Column(Integer, primary_key=True)
    users = relationship("User", secondary=user_group, back_populates="groups")

One-to-One

class User(Base):
    """User with at most one Profile."""
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    # uselist=False makes this a single object rather than a list
    profile = relationship("Profile", back_populates="user", uselist=False)

class Profile(Base):
    """Profile belonging to exactly one User."""
    __tablename__ = "profiles"
    id = Column(Integer, primary_key=True)
    # unique=True prevents two profiles from referencing the same user
    user_id = Column(Integer, ForeignKey("users.id"), unique=True)
    user = relationship("User", back_populates="profile")

Advanced Queries

Select with Complex Filtering, Joins, and Ordering

from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload

# Complex query with eager loading
# NOTE: this assumes a User model that has is_active, created_at, addresses
# and profile all at once -- combine the model snippets accordingly.
stmt = (
    select(User)
    .where(
        User.is_active == True,
        User.created_at >= datetime(2024, 1, 1)
    )
    .options(
        selectinload(User.addresses),  # Separate query
        joinedload(User.profile)       # JOIN
    )
    .order_by(
        User.created_at.desc().nulls_last(),
        User.name.asc()
    )
    .limit(100)
)
# unique() de-duplicates result rows before they are unwrapped into User objects
users = session.execute(stmt).unique().scalars().all()

# Chained eager loading
# NOTE: Document and Source are placeholder models that are not defined in this cheatsheet
stmt = (
    select(Document)
    .options(
        selectinload(Document.source).selectinload(Source.items)
    )
)

Subqueries

from sqlalchemy import select

# Scalar subquery: users older than the average age
# NOTE: User.age is not declared on the User models shown in this cheatsheet
subq = select(func.avg(User.age)).scalar_subquery()
query = session.query(User).filter(User.age > subq)

# Subquery as table: per-user address count, joined back to User via subq.c.<column>
subq = select(Address.user_id, func.count().label("address_count")).group_by(Address.user_id).subquery()
query = session.query(User, subq.c.address_count).join(subq, User.id == subq.c.user_id)

Common Table Expressions (CTE)

# Recursive CTE: walk the manager -> report hierarchy from the top down.
# NOTE: User.manager_id is not declared on the User models shown in this cheatsheet.

# Anchor member: start from the root rows only (users without a manager).
# Without this filter the anchor already contains every user and the
# recursive step only adds duplicates.
cte = (
    select(User.id, User.name, User.manager_id)
    .where(User.manager_id.is_(None))
    .cte(name="hierarchy", recursive=True)
)

# Recursive member: users whose manager is already part of the CTE
cte = cte.union_all(
    select(User.id, User.name, User.manager_id)
    .join(cte, User.manager_id == cte.c.id)
)

stmt = select(cte)
results = session.execute(stmt).all()

Window Functions

from sqlalchemy import over

# Row number: number the rows in created_at order
row_num = func.row_number().over(order_by=User.created_at).label('row_num')
query = session.query(User, row_num)

# Partition by: rank salaries (highest first) separately within each department
# NOTE: User.department_id and User.salary are not declared on the User models shown in this cheatsheet
rank = func.rank().over(
    partition_by=User.department_id,
    order_by=User.salary.desc()
).label('salary_rank')
query = session.query(User, rank)

Filtering & Conditions

Comparison Operators

# Equal
users = session.query(User).filter(User.name == "Alice").all()

# Not equal
users = session.query(User).filter(User.name != "Alice").all()

# Greater than, Less than
# NOTE: User.age is not declared on the User models shown in this cheatsheet
users = session.query(User).filter(User.age > 18).all()
users = session.query(User).filter(User.age <= 65).all()

# IN
users = session.query(User).filter(User.id.in_([1, 2, 3])).all()

# NOT IN (~ negates the expression)
users = session.query(User).filter(~User.id.in_([1, 2, 3])).all()

# IS NULL (use is_(None); Python's `is None` cannot be turned into SQL)
users = session.query(User).filter(User.email.is_(None)).all()

# IS NOT NULL
users = session.query(User).filter(User.email.isnot(None)).all()

# LIKE
users = session.query(User).filter(User.name.like("%alice%")).all()

# ILIKE (case-insensitive)
users = session.query(User).filter(User.name.ilike("%alice%")).all()

# BETWEEN
users = session.query(User).filter(User.age.between(18, 65)).all()

Logical Operators

from sqlalchemy import and_, or_, not_

# AND
users = session.query(User).filter(
    and_(User.is_active == True, User.age > 18)
).all()

# OR
users = session.query(User).filter(
    or_(User.name == "Alice", User.name == "Bob")
).all()

# NOT
users = session.query(User).filter(
    not_(User.is_active == False)
).all()

# Combined: is_active AND (age > 18 OR verified)
# NOTE: User.age and User.verified are not declared on the User models shown in this cheatsheet
users = session.query(User).filter(
    and_(
        User.is_active == True,
        or_(User.age > 18, User.verified == True)
    )
).all()

Aggregations & Functions

Count

# Count all (two alternatives: Query.count(), or an explicit COUNT(id) read with scalar())
count = session.query(User).count()
count = session.query(func.count(User.id)).scalar()

# Count with filter
count = session.query(User).filter(User.is_active == True).count()

# Count distinct
count = session.query(func.count(User.name.distinct())).scalar()

Sum, Avg, Min, Max

# NOTE: Order and User.age are not defined in the model snippets of this cheatsheet.
# Each query selects a single aggregate and reads it with scalar().

# Sum
total = session.query(func.sum(Order.total)).scalar()

# Average
avg_age = session.query(func.avg(User.age)).scalar()

# Min/Max
min_age = session.query(func.min(User.age)).scalar()
max_age = session.query(func.max(User.age)).scalar()

Group By

# Group by with aggregates: one row per department with its head count and mean salary
# NOTE: User.department and User.salary are not declared on the User models shown in this cheatsheet
results = session.query(
    User.department,
    func.count(User.id).label("employee_count"),
    func.avg(User.salary).label("avg_salary")
).group_by(User.department).all()

# Having clause: keep only departments with more than 5 users
results = session.query(
    User.department,
    func.count(User.id).label("count")
).group_by(User.department).having(func.count(User.id) > 5).all()

String Functions

# These are SQL expressions, not Python values: pass them to query()/select()/filter().

# Concatenation
# NOTE: User.first_name / User.last_name are not declared on the User models shown in this cheatsheet
full_name = func.concat(User.first_name, ' ', User.last_name).label('full_name')

# Lower/Upper
lower_name = func.lower(User.name)
upper_name = func.upper(User.name)

# Length
name_length = func.length(User.name)

Date Functions

# Current timestamp
from datetime import timedelta

from sqlalchemy import extract, func

now = func.now()
current_date = func.current_date()

# Date parts
# Use the extract() construct, which renders EXTRACT(year FROM created_at).
# func.extract('year', ...) would render an ordinary two-argument call,
# which is not the SQL EXTRACT syntax.
year = extract('year', User.created_at)
month = extract('month', User.created_at)

# Date arithmetic (PostgreSQL)
# A Python timedelta is sent as an interval value; func.interval('1 day')
# would render interval('1 day'), which is not a valid interval literal.
future_date = User.created_at + timedelta(days=1)

Joins & Eager Loading

Explicit Joins

# Inner join (no ON clause given; presumably derived from the User/Address foreign key)
results = session.query(User).join(Address).all()

# Left outer join
results = session.query(User).outerjoin(Address).all()

# Join with condition (explicit ON clause)
# NOTE: Address.is_primary is not declared on the Address model shown in this cheatsheet
results = session.query(User).join(
    Address,
    and_(User.id == Address.user_id, Address.is_primary == True)
).all()

# Multiple joins
# NOTE: City is a placeholder model that is not defined in this cheatsheet
results = session.query(User).join(Address).join(City).all()

Eager Loading Strategies

selectinload (Separate SELECT)

from sqlalchemy.orm import selectinload

# Loads related objects with separate SELECT IN query
users = session.query(User).options(selectinload(User.addresses)).all()

# Chained: load each user's orders, then each order's items
# NOTE: User.orders and Order are not defined in the model snippets of this cheatsheet
users = session.query(User).options(
    selectinload(User.orders).selectinload(Order.items)
).all()

joinedload (JOIN)

from sqlalchemy.orm import joinedload

# Loads related objects using LEFT OUTER JOIN
users = session.query(User).options(joinedload(User.profile)).all()

# Multiple relationships: each relationship can use its own loading strategy
users = session.query(User).options(
    joinedload(User.profile),
    selectinload(User.addresses)
).all()

subqueryload

from sqlalchemy.orm import subqueryload

# Loads related objects with subquery
# NOTE(review): selectinload is generally the recommended alternative -- confirm against the current SQLAlchemy docs
users = session.query(User).options(subqueryload(User.addresses)).all()

raiseload (Prevent N+1)

from sqlalchemy.orm import raiseload

# Raises error if lazy loading attempted
# (accessing user.addresses on these objects fails instead of silently running another query)
users = session.query(User).options(raiseload(User.addresses)).all()

Transactions

Basic Transaction

# Commit on success, roll back and re-raise on failure, always close the session
try:
    user = User(name="Alice")
    session.add(user)
    session.commit()
except Exception:
    # Undo the failed transaction, then let the caller see the original error
    session.rollback()
    raise
finally:
    session.close()

Context Manager

from contextlib import contextmanager

@contextmanager
def get_db():
    """Yield a new session, committing on success and rolling back on error.

    The session is created from ``SessionLocal``. When the ``with`` body
    finishes normally the session is committed. If anything is raised, the
    session is rolled back and the exception propagates unchanged. The
    session is closed in both cases.
    """
    db_session = SessionLocal()
    try:
        yield db_session
        db_session.commit()
    except BaseException:
        # Catches everything, including KeyboardInterrupt, then re-raises
        db_session.rollback()
        raise
    finally:
        db_session.close()

# Usage
# No explicit commit here: get_db() commits when the block exits without an
# error and rolls back otherwise.
with get_db() as db:
    user = User(name="Alice")
    db.add(user)

Savepoints

# begin_nested() returns a transaction object representing the SAVEPOINT.
# Commit or roll back that object: in SQLAlchemy 2.0, session.commit() and
# session.rollback() always act on the outermost transaction instead.
savepoint = session.begin_nested()  # Create savepoint
try:
    # ... operations ...
    savepoint.commit()  # Release savepoint; the outer transaction stays open
except Exception:
    savepoint.rollback()  # Rollback to savepoint; earlier work in the outer transaction is kept

Manual Transaction Control

# Begin transaction explicitly
# NOTE: begin() raises if this session already has a transaction in progress
session.begin()

try:
    # ... operations ...
    session.commit()
except Exception:
    # Undo the transaction and re-raise so the failure is not silently hidden
    session.rollback()
    raise

Session Management

Session Lifecycle

from sqlalchemy.orm import object_session

# Create session
session = SessionLocal()

# Use session
user = session.query(User).first()

# Find the session an object belongs to (None if it is not attached to one).
# Done before close(): closing the session detaches its objects.
# Stored under its own name so the `session` variable is not overwritten.
owning_session = object_session(user)

# Close session
session.close()

Session States

# inspect() is exported from the top-level package; there is no
# `sqlalchemy.inspect` module to import from.
from sqlalchemy import inspect

# Check object state (exactly one of these flags is True for a given object)
insp = inspect(user)
print(insp.persistent)  # In session and has database identity
print(insp.pending)     # In session, no database identity
print(insp.transient)   # Not in session
print(insp.detached)    # Has identity but not in session

Expiring & Refreshing

# Expire object (reload on next access)
session.expire(user)

# Expire all objects
session.expire_all()

# Refresh from database
session.refresh(user)

# Merge an object created outside the session into it.
# NOTE: despite its name, `detached_user` is freshly constructed here and was
# never loaded from a session. merge() returns the instance that belongs to
# the session; keep using `merged_user`, not the object that was passed in.
detached_user = User(id=1, name="Updated")
merged_user = session.merge(detached_user)

Flush vs Commit

# Flush: Send SQL to database, no commit
session.add(user)
session.flush()  # SQL executed, transaction still open

# Commit: Flush + commit transaction
session.commit()

Additional Tips

Raw SQL

# Raw SQL must be wrapped in text(): SQLAlchemy 2.0 rejects plain strings
# passed to session.execute().
from sqlalchemy import text

# Execute raw SQL (values go in as bound parameters, never via string formatting)
result = session.execute(text("SELECT * FROM users WHERE name = :name"), {"name": "Alice"})
for row in result:
    print(row)

# Build the text() clause once and reuse it with different parameters
stmt = text("SELECT * FROM users WHERE name = :name")
result = session.execute(stmt, {"name": "Alice"})

Bulk Operations

# NOTE(review): bulk_insert_mappings / bulk_update_mappings are documented as
# legacy in SQLAlchemy 2.0 -- confirm whether session.execute(insert(User), [...])
# and session.execute(update(User), [...]) fit your version instead.
# Neither call below commits; call session.commit() afterwards.

# Bulk insert (bypasses ORM)
session.bulk_insert_mappings(User, [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"}
])

# Bulk update (each dict carries the primary key "id" of the row to change)
session.bulk_update_mappings(User, [
    {"id": 1, "name": "Alice Updated"},
    {"id": 2, "name": "Bob Updated"}
])

Inspect Database

from sqlalchemy import inspect

# Inspector bound to `engine`, used to read schema information from the database
inspector = inspect(engine)

# Get table names
tables = inspector.get_table_names()

# Get columns
columns = inspector.get_columns('users')

# Get foreign keys
fks = inspector.get_foreign_keys('addresses')