paint-brush
Concurrency management in DDDby@ayacaste
137 reads

Concurrency management in DDD

by Anton MusatovJanuary 6th, 2025
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

Explore how Domain-Driven Design tackles concurrency issues in complex legacy systems, based on real-world experiences and practical insights.
featured image - Concurrency management in DDD
Anton Musatov HackerNoon profile picture
0-item

In several previous articles, I described in detail various nuances of organizing code in a large, complex legacy project using the Domain-Driven Design methodology. Today, I want to discuss another issue I encountered while developing this system - managing concurrency issues.


To recap, our project automated the procurement processes of companies in the B2B segment. The system supported multiple types of trading procedures that organizers could conduct online using flexible configurations. Participants submitted their bids and offers, and the organizer analyzed them to choose the best option and sign a contract with the winner. This approach not only simplified the procurement process but also significantly increased its efficiency through automation and heightened competition.


The project architecture was organized according to DDD principles and included four layers:

  1. Client Code - framework, external API controllers, views
  2. Application Layer - internal API for working with the domain, organized based on the Command pattern
  3. Domain - all business logic of the application, isolated from other layers
  4. Infrastructure - supporting code ensuring the functionality of the above layers: persistent data storage, queues, integration events


The primary entity in the system was the trading procedure - TradeEntity. The business required us to address the issue of managing concurrent access. This included preventing simple collisions, such as parallel editing of a procedure by different users from the organizer's side, and handling more complex scenarios.


For instance, an organizer might change the key conditions of a procedure while a participant was submitting a bid. As a result, the bid might no longer meet the updated conditions, leading to risks of errors and disputes. Another example was multiple participants submitting bids simultaneously. The first bid might change the state of the procedure by extending it, recalculating rankings, or otherwise affecting the subsequent processing of other bids.


After discussing various scenarios with the business, I proposed implementing a unified locking mechanism for the trading procedure. This mechanism ensured control over critical actions by both organizers and participants. Whenever a state change occurred, the system checked and acquired a lock to prevent potential conflicts.


The technical aspects of implementing the locking mechanism are not part of the business logic of trading procedures. Thus, the first step was to create a simple interface for working with locks at the domain level:

interface ILockService
{
    public function getLock(name: string, timeout: int): bool

    public function releaseLock(name: string): bool

    public function isUsedLock(name: string): bool
}


I placed the implementation of this interface at the infrastructure level. In this project, the locking mechanism was based on MySQL, but other tools could be used in different projects.

final class LockService implements ILockService
{
    public function __construct(private dbConnection: DbConnection) {}

    public function getLock(name: string, timeout: int): bool
    {
        return (bool) this->dbConnection
            ->query("SELECT GET_LOCK('" . name . "', " . timeout . ")")
            ->fetchFirstFromFirstRow()
    }

    public function releaseLock(name: string): bool
    {
        return (bool) this->dbConnection
            ->query("SELECT RELEASE_LOCK('" . name . "')")
            ->fetchFirstFromFirstRow()
    }

    public function isUsedLock(name: string): bool
    {
        return (bool) this->dbConnection
            ->query("SELECT IS_USED_LOCK('" . name . "')")
            ->fetchFirstFromFirstRow()
    }
}


For convenient unit testing, a Null implementation of the service was also created:

final class NullLockService implements ILockService
{
    public function getLock(name: string, timeout: int): bool
    {
        return true
    }

    public function releaseLock(name: string): bool
    {
        return true
    }

    public function isUsedLock(name: string): bool
    {
        return false
    }
}


The ILockService interface provides basic methods for managing locks. Next, it was necessary to determine what exactly should be locked and how. This logic relates to business rules, so I created a separate service at the domain level:

final class TradeLockService
{
    private const LOCK_TIMEOUT = 5

    public function __construct(
        private lockService: ILockService,
        private tradeRepository: ITradeRepository,
    ) {}

    public function lockTrade(tradeId: TradeId): bool
    {
        lockName = this->getLockName(tradeId)
        
        if (null === lockName) {
            return true
        }

        return this->lockService->getLock(lockName, self::LOCK_TIMEOUT)
    }

    public function releaseTradeLock(tradeId: TradeId): void
    {
        lockName = this->getLockName(tradeId)

        if (null === lockName) {
            return
        }

        $this->lockService->releaseLock(lockName)
    }

    private function getLockName(tradeId: TradeId): ?string
    {
        if (tradeId->isNull()) {
            return null
        }

        trade = this->tradeRepository->findBySpec(new TradeIdSpec(tradeId))
        if (trade->isNull()) {
            return null
        }

        lockName = 'trade_lock_';
        if (trade instanceof ITradeLotEntity) {
            return lockName . trade->getContainerId()
        }

        return lockName . trade->getId()
    }
}


The TradeLockService applies locks at the procedure level. If the procedure is a lot, the container it belongs to is locked. The lock timeout is 5 seconds.


The next step was to integrate this service into the Application layer commands. To avoid duplicating logic, I created a command decorator responsible for lock management:

final class TradeConcurrencyCommand extends AbstractCommand
{
    public function __construct(
        private tradeId: TradeIdDTO,
        private command: ICommand,
    ) {}

    public function execute(): mixed
    {
        tradeIdMapper = this->getDIContainer()->get(ITradeIdMapper::class)
        tradeId = tradeIdMapper->mapTradeIdDTO(this->tradeId)

        tradeLockService = this->getDIContainer()->make(TradeLockService::class)
        if (! tradeLockService->lockTrade(tradeId)) {
            throw new ConcurrencyException()
        }

        try {
            result = this->command->execute()
            tradeLockService->releaseTradeLock(tradeId)
            return result
        }
        catch (AbstractAppException exception) {
            tradeLockService->releaseTradeLock(tradeId)
            throw exception
        }
    }
}


When incorporating it into another command, a static constructor was added, and the regular constructor was marked private:

final class PublishTradeCommand extends AbstractCommand
{
    private function __construct(
        private userId: int,
        private tradeId: TradeIdDTO,
    ) { }

    public static function get(
        userId: int,
        tradeId: TradeIdDTO,
    ): TradeConcurrencyCommand
    {
        return new TradeConcurrencyCommand(
            tradeId,
            new self(userId, tradeId),
        );
    }


    public function execute(): void
    {
        ...
    }
}


Thus, the TradeConcurrencyCommand checks the possibility of acquiring a lock with a 5-second timeout for each call. In case of failure, a ConcurrencyException is thrown. If the lock is successfully acquired, the provided command is executed, and then the lock is released. This approach became a crucial part of the project architecture, ensuring data reliability and reducing collision risks.


This method provided comprehensive locking for various critical actions involving the procedure entity and other related entities. However, it did not always solve the issue of using outdated data during edits.


For example, if two users simultaneously edited a trading procedure (User A and User B), they could both load the initial data, make changes to different fields, and then attempt to save. User A might acquire a lock and save their data, and then User B might also successfully acquire a lock and overwrite some of the changes made by User A.


To address this issue, entity versioning can be used. When saving changes, the entity's version is incremented. If the version saved in the database does not match the current entity version, the save attempt should fail.


At the domain level, the trading procedure entity can include a version:

abstract class AbstractTradeEntity extends AbstractEntity implements ITradeEntity
{
    protected version: int
}


The version is populated when mapping data from the database model to the entity. Users cannot modify the version at the Application Layer level. During repository save operations, the version's validity is checked:

final class TradePersistentRepository extends AbstractRepository implements ITradeRepository
{
    public function save(trade: ITradeEntity): void
    {
        DbModel = (new TradeMapper())->mapTradeToDbModel(trade)
        DbModel->saveWithVersionCheck()
        ...
    }
}

final class TradeDbModel extends DbModel
{
    public function saveWithVersionCheck(): void
    {
        if (this->getIsNew()) {
            this->save()
            return
        }

        transactionId = 'save_trade_with_version_check_' . this->id
        dbConnection = this->getDbConnection()
        dbConnection->startTransaction(transactionId)
        version = this->rawQuery("
            SELECT
                version
            FROM
                %%TABLE_NAME%%
            WHERE
                id = " . this->id . "
            FOR UPDATE
        ")
        ->fetchFirstFromFirstRow()

        if ((int) version !== this->data['version']) {
            dbConnection->commitTransaction(transactionId)
            throw new VersionException()
        }

        this->data['version']++
        this->save()
        dbConnection->commitTransaction(transactionId)
    }
}


In this case, I used a MySQL transaction and a SELECT FOR UPDATE query. Such a query locks the table row, preventing simultaneous data modifications by other queries until the current transaction is completed. If the version from the database does not match the entity version, an exception is thrown. On successful save, the version is incremented.


This versioning approach should be implemented for all entities requiring safe concurrent editing.

Summary

The mechanisms described above are justified if:

  • The system is actively used by many users simultaneously.
  • Conflicts due to concurrent access can lead to critical errors or data loss.


However, it is important to consider the complexity of implementing and maintaining such solutions. If the system is not highly loaded or concurrency is unlikely, these changes may be excessive.