Release: 1.1.0b1 | Release Date: not released

SQLAlchemy 1.1 Documentation

Source code for examples.performance.single_inserts

"""In this series of tests, we're looking at a method that inserts a row
within a distinct transaction, and afterwards returns to essentially a
"closed" state.   This would be analogous to an API call that starts up
a database connection, inserts the row, commits and closes.

"""
from . import Profiler

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine, bindparam, pool
from sqlalchemy.orm import Session

Base = declarative_base()
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    description = Column(String(255))


Profiler.init("single_inserts", num=10000)


@Profiler.setup
def setup_database(dburl, echo, num):
    global engine
    engine = create_engine(dburl, echo=echo)
    if engine.dialect.name == 'sqlite':
        engine.pool = pool.StaticPool(creator=engine.pool._creator)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


@Profiler.profile
def test_orm_commit(n):
    """Individual INSERT/COMMIT pairs via the ORM"""

    for i in range(n):
        session = Session(bind=engine)
        session.add(
            Customer(
                name='customer name %d' % i,
                description='customer description %d' % i)
        )
        session.commit()


@Profiler.profile
def test_bulk_save(n):
    """Individual INSERT/COMMIT pairs using the "bulk" API """

    for i in range(n):
        session = Session(bind=engine)
        session.bulk_save_objects([
            Customer(
                name='customer name %d' % i,
                description='customer description %d' % i
            )])
        session.commit()


@Profiler.profile
def test_bulk_insert_dictionaries(n):
    """Individual INSERT/COMMIT pairs using the "bulk" API with dictionaries"""

    for i in range(n):
        session = Session(bind=engine)
        session.bulk_insert_mappings(Customer, [
            dict(
                name='customer name %d' % i,
                description='customer description %d' % i
            )])
        session.commit()


@Profiler.profile
def test_core(n):
    """Individual INSERT/COMMIT pairs using Core."""

    for i in range(n):
        with engine.begin() as conn:
            conn.execute(
                Customer.__table__.insert(),
                dict(
                    name='customer name %d' % i,
                    description='customer description %d' % i
                )
            )


@Profiler.profile
def test_core_query_caching(n):
    """Individual INSERT/COMMIT pairs using Core with query caching"""

    cache = {}
    ins = Customer.__table__.insert()
    for i in range(n):
        with engine.begin() as conn:
            conn.execution_options(compiled_cache=cache).execute(
                ins,
                dict(
                    name='customer name %d' % i,
                    description='customer description %d' % i
                )
            )


@Profiler.profile
def test_dbapi_raw_w_connect(n):
    """Individual INSERT/COMMIT pairs w/ DBAPI + connection each time"""

    _test_dbapi_raw(n, True)


@Profiler.profile
def test_dbapi_raw_w_pool(n):
    """Individual INSERT/COMMIT pairs w/ DBAPI + connection pool"""

    _test_dbapi_raw(n, False)


def _test_dbapi_raw(n, connect):
    compiled = Customer.__table__.insert().values(
        name=bindparam('name'),
        description=bindparam('description')).\
        compile(dialect=engine.dialect)

    if compiled.positional:
        args = (
            ('customer name %d' % i, 'customer description %d' % i)
            for i in range(n))
    else:
        args = (
            dict(
                name='customer name %d' % i,
                description='customer description %d' % i
            )
            for i in range(n)
        )
    sql = str(compiled)

    if connect:
        for arg in args:
            # there's no connection pool, so if these were distinct
            # calls, we'd be connecting each time
            conn = engine.pool._creator()
            cursor = conn.cursor()
            cursor.execute(sql, arg)
            lastrowid = cursor.lastrowid
            conn.commit()
            conn.close()
    else:
        for arg in args:
            conn = engine.raw_connection()
            cursor = conn.cursor()
            cursor.execute(sql, arg)
            lastrowid = cursor.lastrowid
            conn.commit()
            conn.close()


if __name__ == '__main__':
    Profiler.main()