advanced ddd building blocks

Implementing the Repository pattern using SqlAlchemy (part 2)

In the previous post, we implemented a really simple file-based repository that used the pickle module. As we didn't use a database, we didn't have to deal with creating tables and columns, and all the domain models were directly serialized from a memory to a byte stream (and vice versa). Such an approach was ok to introduce the concept of a repository, but in a production system we will need some kind of a database to store the application state.

As a side note, last time I wrote that repositories are used in loading and saving entities. In fact, repositories work with aggregates (which are composites of entities) but for the sake of simplicity, I'm sticking with entities.

This time we are going to use SQL Alchemy as a persistence mechanism for the entities. We will keep the original interface, but all the implementation details will be encapsulated in the SqlAlchemyListingRepository. Let's start with a revised interface to a ListingRepository, which additionally supports adding, removing, and persisting Listing domain entities:

# somewhere in the domain layer

class ListingRepository(metaclass=abc.ABCMeta):
    """An interface to listing repository"""

    @abc.abstractmethod
    def add(self, entity: Listing):
        """Adds new entity to a repository"""
        raise NotImplementedError()

    @abc.abstractmethod
    def remove(self, entity: Listing):
        """Removes existing entity from a repository"""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_by_id(id: ListingId) -> Listing:
        """Retrieves entity by its identity"""
        raise NotImplementedError()

    def __getitem__(self, index) -> Listing:
        return self.get_by_id(index)

In Domain-Driven Design, the goal of a repository is to encapsulate the logic required to access domain objects. Therefore, the interface to a repository is a part of a domain layer, while the actual implementation of this interface belongs to the infrastructure layer. As a consequence, the domain code stays clean - it is isolated from any technical concerns, and it is database agnostic.

A bit of theory...

Here are some concepts I'd like to introduce and discuss before we jump right into the implementation of the SQL Alchemy repository:

1. ORM Model ≠ Domain Object

Entities are Python classes - this is not a surprise. They contain attributes: primitive types, value objects, enums, references to other entities, and collections of all the above. In memory, this forms a graph-like structure of nested fields. Just as a reminder, this is how our Listing entity looks like (at this point we do not care about the logic):

@dataclass
class Listing(Entity):
    id: int
    name: str
    min_price: Money
    ....

On the other hand, databases organize data in a flat, tabular fashion. Python classes and relational data models do not work very well together, and ORMs were introduced to overcome this problem. However, because of the mapping between model attributes and table columns, ORM models are still tightly coupled to a database and a specific ORM implementation:

class ListingModel(ModelBase):
    __tablename__ = "listing"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    min_price__amount = Column(Integer)
    min_price__currency = Column(String(3))

This raises 2 problems:

  1. ORM Models shouldn't be used in the domain layer. ORM Models are harder to test when compared to Domain Objects (they need a database to exist). Also, we want the business layer to remain clean, without any dependencies on underlying storage mechanisms. ListingModel is tightly coupled to SQL Alchemy, and having such a dependency in a business layer is a no-go. However, it's still fine to use SQLA Models in some cases, i.e. if we do basic CRUD operations that do not require business logic.

  2. There is no easy way to have a multi-field value object (i.e. min_price Money composed of amount and currency) to multiple columns in the model. Theoretically, we could add some getters and setters for Value Objects:

class ListingModel(ModelBase):
    ...
    min_price__amount = Column(Integer)
    min_price__currency = Column(String(3))

    def get_min_price(self) -> Money:
        return Money(self.min_price__amount, self.min_price__currency)

    def set_min_price(self, value: Money):
        self.min_price__amount = value.amount
        self.min_price__currency = value.currency

...but this contradicts the idea of value objects being immutable (you can still change an individual attribute of a model that logically belongs to a value object).

To overcome these problems, we need a mechanism to transfer data between a business layer - a data mapper.

2. Data Mapper

We already know that we don't want to use Models in the domain layer. Models are SQL Alchemy-specific, as they are composed of Columns, ForeignKeys, relationships, etc. We do not want any of the infrastructure details to leak into the domain. We could still use Models for querying data (as it's okay to query the database through other channels than repos), but this is beyond the scope of this article.

What we need to do is to:

  • map models to entities when reading data from a repo (i.e. get_by_id(...) call returns a domain entity),
  • map entities to models when any data changes are about to be saved.

This can be implemented in 2 ways:

  1. We can use SQL Alchemy imperative mapping to map data from SQL tables to pure Python classes. Cosmic Python follows this approach. Here, all the attributes of a model are automatically translated from/to a domain object, via mapper_registry.map_imperatively(). However, this technique is limited to models consisting only of primitive types. If our domain model has multi-attribute value objects, then we need a more sophisticated approach.

  2. We can implement our own data mapper for translating to and from our domain model. To do the mapping we will need 2 functions: XYZ_model_to_entity and XYZ_entity_to_model. The repository will use the data mapper to do all the translations. This is the approach we are going to take in the implementation of SqlAlchemyListingRepository.

3. Identity Map

One of the essential elements in the repository pattern is the Identity Map, which is used to improve performance by providing a context-specific, in-memory cache to prevent duplicate retrieval of the same object data from the database during a single transaction. Here is another definition:

An Identity Map keeps a record of all objects that have been read from the database in a single business transaction. Whenever you want an object, you check the Identity Map first to see if you already have it.
Martin Fowler

This is pretty straightforward: when we query the repository for an entity, we will check the identity map first. If the entity is already present in the map, the repository will simply return a reference to a cached object. Otherwise, we will read the model from a database, transform it with a data mapper, store it in the identity map and return a reference. When saving an entity, we will use a data mapper to transform it into a model instance and then persist it using the built-in SQL Alchemy session mechanism.

The repository pattern implementation

Now that we know what are the building blocks of a repository, let's look at all the pieces that we need to implement the SQL Alchemy Listing Repository.

Domain objects

Here is the data part of our domain model. For clarity reasons, all the logic was removed from domain objects.

from uuid import UUID
from dataclasses import dataclass

# some type aliases
ListingId = UUID
BidderId = UUID

@dataclass
class Money:
    """A value object that represents money"""
    amount: int
    currency: str

@dataclass
class Bid:
    """A value object that represents a bid placed on a listing by a buyer"""
    bidder_id: BidderId
    price: Money

@dataclass
class Listing(Entity):
    """An entity that represents a listing with an ask price and all the bids already placed on this item"""
    id: int
    name: str
    min_price: Money
    bids: List[Bid] = field(default_factory=list)

Here are the interesting things to notice. A Listing is composed of some primitive types (i.e. id, name), a single value object (min_price), and a list of value objects (bids). For some undisclosed reason, let's assume that we intentionally use a list so that we can preserve the order of bids (otherwise we could use an unordered set).

Data model

The above domain objects are reflected as database models as follows:

import uuid
from infrastructure.sqlalchemy_common import Base, Column, UUID, String, Integer, ForeignKey, relationship

UniqueIdentifer = UUID(as_uuid=True)

class BidModel(Base):
    """ Stores Bid value object"""
    __tablename__ = "bid"
    # composite primary key
    listing_id = Column(UniqueIdentifier,
                        ForeignKey("listing.id"),
                        primary_key=True)

    # since bids are stored in an ordered collection (list), an index column is required
    idx = Column(Integer, primary_key=True)

    bidder_id = Column(UniqueIdentifier)
    price__amount = Column(Integer)
    price__currency = Column(String(3))

    # parent relationship
    listing = relationship("ListingModel", back_populates="bids")

class ListingModel(Base):
    __tablename__ = "listing"

    id = Column(UniqueIdentifier, primary_key=True, default=uuid.uuid4)
    name = Column(String(30))

    min_price__amount = Column(Integer)
    min_price__currency = Column(String(3))

    bids = relationship("BidModel",
                        order_by="BidModel.idx.asc()",
                        cascade="save-update, merge, delete, delete-orphan")

As you can see, we do not have a separate table for Money - these are stored as primitive fields in the corresponding models (i. e. price__amount, price__currency and min_price__amount, min_price__currency). However, we are storing bids in a separate BidModel as there is a 1 to many relationship between ListingModel and BidModel. Also, bids are ordered by idx field to preserve the order of the elements in Listing.bids list, and there is a cascade option set in such a way to keep the Listing.bids and the table rows in sync when Bid value objects are removed from the list. Also, take note that is using a composite primary key composed of listing_id and idx as it is enough to identify a bid at a database level.

Data mapper

The data mapper logic is nothing fancy - just a rather tedious mapping from one type to the other. One small interesting bit to notice is packing/unpacking value objects.

from infrastructure.models import ListingModel, BidModel
from domain.entities import Listing
from domain.value_objects import Money, Bid

def listing_model_to_entity(instance: ListingModel) -> Listing:
    def map_bid_to_value_object(bid: BidModel) -> Bid:
        return Bid(
            bidder_id=bid.bidder_id, 
            price=Money(amount=bid.price__amount, currency=bid.price__currency)
        )

    return Listing(
        id=instance.id,
        name=instance.name,
        min_price=Money(amount=instance.min_price__amount, currency=instance.min_price__currency),
        bids=[map_bid_to_value_object(bid) for bid in instance.bids]
    )

def listing_entity_to_model(listing: Listing, existing=None) -> ListingModel:
    def map_bid_to_model(idx: int, bid: Bid) -> BidModel:
        return BidModel(bidder_id=bid.bidder_id, price__amount=bid.price.amount, price__currency=bid.price.currency, idx=idx)

    return ListingModel(
        id=listing.id,
        name=listing.name,
        min_price__amount=listing.min_price.amount,
        min_price__currency=listing.min_price.currency,
        bids=[map_bid_to_model(idx, bid) for idx, bid in enumerate(listing.bids)]

Listing Repository

Finally, let's look at the implementation of a repository:

from sqlalchemy.orm import Session
from domain.repositories import ListingRepository
from domain.entities import Listing
from infrastructure.models import ListingModel
from infrastructure.data_mappers import listing_model_to_entity, listing_entity_to_model

# a sentinel value for keeping track of entities removed from the repository
REMOVED = object()

class SqlAlchemyListingRepository(ListingRepository):
    """SqlAlchemy implementation of ListingRepository"""

    def __init__(self, session: Session, identity_map=None):
        self.session = session
        self._identity_map = identity_map or dict()

    def add(self, entity: Listing):
        self._identity_map[entity.id] = entity
        instance = listing_entity_to_model(entity)
        self.session.add(instance)

    def remove(self, entity: Listing):
        self._check_not_removed(entity)
        self._identity_map[entity.id] = REMOVED
        listing_model = self.session.query(ListingModel).get(entity.id)
        self.session.delete(listing_model)

    def get_by_id(self, id):
        instance = self.session.query(ListingModel).get(id)
        return self._get_entity(instance, listing_model_to_entity)

    def get_by_name(self, name):
        instance = self.session.query(ListingModel).filter_by(name=name).one()
        return self._get_entity(instance, listing_model_to_entity)

    def _get_entity(self, instance, mapper_func):
        if instance is None:
            return None
        entity = listing_model_to_entity(instance)
        self._check_not_removed(entity)

        if entity.id in self._identity_map:
            return self._identity_map[entity.id]

        self._identity_map[entity.id] = entity
        return entity

    def __getitem__(self, key):
        return self.get_by_id(key)

    def _check_not_removed(self, entity):
        assert self._identity_map.get(entity.id, None) is not REMOVED, f"Entity {entity.id} already removed"

    def persist(self, entity: Listing):
        self._check_not_removed(entity)
        assert entity.id in self._identity_map, "Cannon persist entity which is unknown to the repo. Did you forget to call repo.add() for this entity?"
        instance = listing_entity_to_model(entity)
        merged = self.session.merge(instance)
        self.session.add(merged)

    def persist_all(self):
        for entity in self._identity_map:
            if entity is not REMOVED:
                self.persist(entity)

There are a couple of things worth noting here:

  • we are passing a SQL AlchemySession and the Identity Map instances via the __init__. There should be one session and one identity map per business transaction.
  • when add()ing a new entity to a repository, we store the entity in the identity map and the corresponding model in the SQL Alchemy session.
  • when entity state is being changed
  • to persist the changes made to the entity, use persist() or persist_all() methods. When an entity is persisted, its state is translated back to the model using listing_entity_to_model data mapper.
  • persist()ing the changes and commit()ing the session is not a responsibility of a repository - and this is intended. Coordinating the writes to the database should be handled by a Unit Of Work.
  • this repository implementation is not ideal in terms of memory consumption (in fact we use 2 identity maps here: one which is used by a repo, and the other one which is a part of SQLA session) but I consider it to be good enough for our purposes.

The use case

This is how we could publish a listing via publish_listing_use_case function:

engine = create_engine("sqlite+pysqlite:///:memory:",
                       echo=True,
                       future=True)
Base.metadata.create_all(engine)

def publish_listing_use_case(listing_id: ListingId, repository: ListingRepository):
    listing = repository.get_by_id(listing_id)
    listing.publish()

def execute_publish_listing_via_unit_of_work():
    identity_map = []
    with Session(engine) as session:
        repository = SqlAlchemyListingRepository(session, identity_map)
        publish_listing_use_case(listing_id=..., repository=repository)
        repository.persist_all()

execute_publish_listing_via_unit_of_work()

As we can see, there is no explicit saving of the entity state in the database by the repository at this point. All the heavy lifting is handled by execute_publish_listing_via_unit_of_work function which is responsible for creating a session, instantiating the repository, calling a use case function, and saving all the results. In general, this is a responsibility of a Unit of Work which will be introduced later.

Conclusion & Final remarks

As we can see, the actual implementation of a repository pattern is a lot of work. We need to define a data model for our domain object, configure the mapping between these two, then implement the repository itself. We also need some logic to keep a model instance and entity in sync, so that if the entity changed it will be saved automatically in a database. Another layer of abstraction certainly means much more work. So is it all worth it? When should we use it? What are the benefits?

First of all, we should use the repository pattern if our intention is to change the state of the entity by executing the business logic. It would be an overkill if all we want to do is to query the data (i.e. searching, sorting, filtering) and display it on the screen.

And about the benefits: we get a separation of concerns - the business layer does not need to know the data source nor track the changes. Repositories are interchangeable - the code is easier to test and more maintainable in the longer perspective.

.