Параллелизм, акторная модель и Kafka в системах финансовых транзакций

Введение в проблематику

Управление параллельными финансовыми транзакциями, или, проще, параллелизмом — одна из самых сложных задач, стоящих перед разработчиками и системными архитекторами. Проблемы параллелизма (concurrency) возникают при одновременной обработке нескольких транзакций, что может привести к потенциальным конфликтам и несоответствиям данных. Эти проблемы проявляются в различных формах, таких как обнуление счетов, дублирование транзакций или несоответствие записей — все это может серьезно подорвать надежность системы и доверие к ней. 

В финансовом мире, где ставки исключительно высоки, даже одна ошибка может привести к значительным финансовым потерям, нарушениям нормативных требований и репутационному ущербу для организации. Поэтому очень важно внедрить надежные механизмы для эффективной обработки параллелизма, обеспечивающие целостность и надежность системы.

Сложности с приложениями для денежных переводов

На первый взгляд управление балансом счета клиента может казаться простой задачей. Основные операции — зачисление средств на счет, разрешение на снятие средств или перевод средств между счетами — по сути, являются простыми операциями с базой данных. Эти операции обычно связаны с добавлением или вычитанием из баланса счета, при этом основной задачей является предотвращение овердрафта и постоянное поддержание положительного или нулевого баланса. 

Однако в реальности все гораздо сложнее. Перед выполнением любой операции часто необходимо провести ряд проверок смежных систем. Например, система должна убедиться в том, что данный счет действительно существует, для чего обычно запрашивается центральная база данных счетов или соответствующий сервис. Кроме того, система должна убедиться, что счет не заблокирован из-за таких проблем, как подозрительная активность, несоответствие нормативным требованиям или ожидание верификации. 

Эти дополнительные шаги создают уровни сложности, выходящие за рамки простых дебетовых и кредитных операций. Для обеспечения безопасного и точного управления остатками средств клиентов требуются надежные системы проверки и балансировки, что значительно усложняет систему в целом.

Требования реального мира (Знай своего клиента, предотвращение мошенничества и т. д.) 

Рассмотрим практический пример компании, занимающейся денежными переводами, которая позволяет клиентам переводить средства в разных валютах и странах. С точки зрения клиента процесс прост: 

  1. Клиент открывает счет в системе. 
  2. Создается счет в евро для получения денег. 
  3. Клиент создает получателя в системе. 
  4. Клиент инициирует перевод 100 евро в 110 долларов получателю. 
  5. Система ожидает поступления 100 евро. 
  6. После поступления средств они конвертируются в 110 долларов. 
  7. Наконец, система отправляет 110 долларов получателю.

Этот процесс можно визуализировать следующим образом:

financial transactions scheme

Хотя последовательность кажется простой, реальные требования создают дополнительные сложности: 

  1. Верификация платежа
  • Система должна проверить происхождение входящего платежа. 
  • Банковский счет плательщика должен быть валидным. 
  • BIC-код банка должен быть авторизован в системе. 
  • Если платеж поступает из небанковской платежной системы, требуются дополнительные проверки. 
  1. Валидация получателя
  • Банковский счет получателя должен быть активным. 
  1. Валидация клиента
  • Получатель должен пройти различные проверки, такие как верификация личности (например, действительный паспорт и подтверждение по селфи). 
  1. Источник средств и комплаенс
  • В зависимости от суммы входящего перевода может потребоваться проверка источника средств. 
  • Антифрод-система должна проверить входящий платеж. 
  • Ни отправитель, ни получатель не должны фигурировать в санкционных списках. 
  1. Лимиты транзакций и комиссии
  • Система должна рассчитать месячные и годовые лимиты платежей, чтобы определить комиссии. 
  • Если транзакция предполагает конвертацию валюты, система должна обработать курсы обмена валют. 
  1. Аудит и комплаенс
  • Система должна регистрировать все транзакции для целей аудита и соответствия требованиям.

Эти требования значительно усложняют то, что первоначально кажется простым процессом. Кроме того, по результатам этих проверок платеж может потребовать ручной проверки, что еще больше удлиняет процесс оплаты.

Визуализация потока данных и потенциальных точек сбоя

В системе финансовых операций поток данных для обработки входящих платежей включает в себя множество этапов и проверок комплаенса, безопасности и точности. Однако на протяжении всего этого процесса существуют потенциальные точки отказа, особенно когда внешние системы накладывают ограничения, или когда система должна динамически принимать решение о порядке действий на основе данных в реальном времени. 

Стандартный поток входящих платежей

Ниже упрощенная визуализация потока данных при обработке входящего платежа, включая последовательность взаимодействий между различными компонентами:

Транзакции в реальном мире

Пояснение потока

  1. Клиент инициирует платеж: Клиент отправляет платеж в свой банк. 
  2. Банк отправляет платеж: Банк направляет платеж в систему переводов. 
  3. Проверка комплаенса: Система переводов проверяет отправителя и получателя на соответствие нормативным требованиям. 
  4. Верификация: Система проверяет, прошли ли отправитель и получатель необходимые проверки личности и документов. 
  5. Обнаружение мошенничества (антифрод): Выполняется проверка на мошенничество, чтобы убедиться, что платеж не является подозрительным. 
  6. Расчет статистики: Система рассчитывает лимиты транзакций и другие соответствующие показатели. 
  7. Расчет комиссий: Рассчитываются все применимые комиссии. 
  8. Подтверждение: Система подтверждает получение платежа клиентом.

Потенциальные точки сбоя и динамические ограничения

Несмотря на то что описанный выше процесс кажется простым, он может усложняться из-за динамических изменений, например, когда внешняя система накладывает ограничения на учетную запись клиента. 

Вот как может разворачиваться этот процесс, с указанием потенциальных точек сбоя:

Потенциальные точки сбоя

Объяснение потенциальных точек отказа

  1. Динамические ограничения
  • В ходе процесса команда по соблюдению нормативных требований может принять решение об ограничении всех операций определенного клиента из-за санкций или других нормативных причин. Это создает потенциальную точку отказа, когда процесс может быть остановлен или изменен на середине пути. 
  1. Конфликты состояний базы данных
  • После того, как комплаенс принимает решение об ограничении операций, системе переводов (трансферов) необходимо обновить состояние трансфера в базе данных. Сложность здесь заключается в управлении согласованностью состояния, особенно если одновременно выполняется несколько операций или имеются конфликтующие обновления.
  • Система должна обеспечить точное отражение состояния трансфера в базе данных с учетом наложенного ограничения. При отсутствии должного внимания это может привести к несогласованным состояниям или неудачным транзакциям.
  1. Точки принятия решений:
  • Способность системы динамически пересчитывать состояние и решать, принять или отклонить входящий платеж, имеет решающее значение. Любая ошибка в процессе принятия решения может привести к несанкционированным транзакциям, блокировке средств или нарушению законодательства. 

Визуализация потока данных и выявление потенциальных точек отказа в системах финансовых транзакций раскрывает всю сложность и риски, связанные с обработкой платежей. Понимая эти риски, системные архитекторы могут разработать более надежные механизмы для управления состоянием, обработки динамических изменений и обеспечения целостности процесса транзакций.

Традиционные подходы к параллелизму

Существуют различные подходы к решению проблем параллелизма в системах финансовых транзакций.

Транзакции в базе данных и их ограничения

Наиболее простым подходом к управлению параллелизмом являются транзакции в базе данных. Для начала давайте определимся с контекстом: система переводов хранит свои данные в базе данных Postgres. Несмотря на то что топология базы данных может быть различной (расшариваться между несколькими экземплярами, центрами обработки данных, местоположениями или регионами), мы сосредоточимся на простом, единственном экземпляре базы данных Postgres, обрабатывающем как чтение, так и запись.

Чтобы гарантировать, что одна транзакция не перезапишет данные другой, мы можем заблокировать строку, связанную с трансфером:

SELECT * FROM transfers WHERE id = 'ABCD' FOR UPDATE;

Эта команда блокирует строку в начале процесса и снимает блокировку после завершения транзакции. На следующей диаграмме показано, как этот подход решает проблему потерянных обновлений:

Хотя этот подход может решить проблему потерянных обновлений в простых сценариях, он становится менее эффективным по мере масштабирования системы и увеличения количества активных транзакций.

Проблемы масштабирования и исчерпания ресурсов

Рассмотрим последствия масштабирования этого подхода. Предположим, что обработка одного платежа занимает 5 секунд, а система обрабатывает 100 входящих платежей каждую секунду. Это означает 500 активных транзакций в любой момент времени. Каждая из этих транзакций требует подключения к базе данных, что может быстро привести к исчерпанию ресурсов, увеличению задержек и снижению производительности системы, особенно в условиях высокой нагрузки.

Блокировки: Локальные и распределенные

Локальные блокировки — еще один распространенный метод управления параллелизмом в рамках одного экземпляра приложения. Они гарантируют, что критические участки кода будут выполняться только одним потоком одновременно, предотвращая условия гонки и обеспечивая согласованность данных. Реализовать локальные блокировки относительно просто, используя такие конструкции, как синхронизированные блоки или ReentrantLocks в Java, которые эффективно управляют доступом к общим ресурсам в рамках одной системы.

Однако в распределенных средах, где несколько экземпляров приложения должны координировать свои действия, локальные блокировки оказываются неэффективными. В таких сценариях локальная блокировка одного экземпляра не предотвращает конфликтующие действия других экземпляров. Именно здесь на помощь приходят распределенные блокировки. Распределенные блокировки гарантируют, что только один экземпляр приложения может получить доступ к определенному ресурсу в любой момент времени, независимо от того, на каком узле кластера выполняется код.

Реализация распределенных блокировок по своей сути сложнее, часто требуются внешние системы, такие как ZooKeeper, Consul, Hazelcast или Redis, для управления состоянием блокировок на нескольких узлах. Эти системы должны быть высокодоступными и согласованными, чтобы механизм распределенных блокировок не превратился в единую точку отказа или узкое место.

Следующая диаграмма иллюстрирует типичный поток распределенной системы блокировки:

Проблема упорядочивания

В распределенных системах, где несколько узлов одновременно могут запрашивать блокировки, обеспечение прозрачной обработки и поддержание согласованности данных может быть сложной задачей. Упорядочивание очереди запросов на блокировку между узлами сопряжено с рядом трудностей: 

  • Задержка в сети: Изменяющиеся задержки в сети могут затруднить поддержание строгого упорядочения
  • Отказоустойчивость: Механизм упорядочивания должен быть отказоустойчивым и не превращаться в единую точку отказа, добавляя сложности системе.

Ожидание потребителей блокировок и дедлоки

Когда несколько узлов владеют различными ресурсами и ждут друг друга, чтобы освободить блокировки, может возникнуть взаимная блокировка (дедлок), который остановит систему. Чтобы смягчить эту проблему, распределенные блокировки часто включают тайм-ауты. 

Тайм-ауты 

  • Тайм-ауты получения блокировки: Узлы задают максимальное время ожидания блокировки. Если блокировка не будет предоставлена в течение этого времени, запрос прерывается, предотвращая бесконечное ожидание. 
  • Таймауты удержания блокировки: Узлы, удерживающие блокировку, имеют максимальный срок ее удержания. Если это время превышено, блокировка автоматически снимается, чтобы предотвратить неограниченное удержание ресурсов. 
  • Обработка таймаутов: При возникновении тайм-аута система должна легко справляться с ним, будь то повторная попытка, прерывание или запуск компенсирующих действий.

Учитывая эти проблемы, обеспечение надежной обработки платежей в системе, основанной на распределенной блокировке, является сложной задачей. Баланс между необходимостью контроля параллелизма и реалиями распределенных систем требует тщательного планирования и надежного проектирования.

Смена парадигмы: упрощение параллелизма 

Давайте проанализируем наш подход к обработке трансферов. Разбив процесс на мелкие этапы, мы можем упростить каждую операцию, сделав всю систему более управляемой и снизив риск возникновения проблем с параллелизмом.

Когда поступает платеж, он запускает серию проверок, каждая из которых требует вычислений от различных систем. Когда все результаты получены, система принимает решение о дальнейших действиях. Эти шаги напоминают переходы в конечных автоматах (FSM).

Внедрение модели обработки на основе сообщений

Как показано на диаграмме, обработка платежей включает в себя комбинацию команд и переходов состояний. Для каждой команды система определяет начальное состояние и возможные переходы.

Например, если система получает команду [ReceivePayment], она проверяет, находится ли перевод в состоянии создания. Если нет, то ничего не делает. Для команды [ApplyCheckResult] система переводит передачу в состояние либо checks_approved, либо checks_rejected по результатам проверок. 

Эти проверки разработаны для гранулярной и быстрой обработки, поскольку каждая проверка происходит независимо и не изменяет состояние передачи напрямую. Для определения результата проверки требуются только входные данные. 

Как может выглядеть код такой обработки:

interface Check<Input> {
    CheckResult run(Input input);
}

interface Processor<State, Command> {
    State process(State initial, Command command);
}

interface CommandSender<Command> {
    void send(UUID transferId, Command command);
}

Посмотрим, как взаимодействуют эти компоненты в случае отправки, получения и обработки чеков:

enum CheckStatus {
    NEW,
    ACCEPTED,
    REJECTED
}

record Check(UUID transferId, CheckType type, CheckStatus status, Data data);

class CheckProcessor {
    void process(Check check) {
        // Run all required calculations
        // Send result to `TransferProcessor`
    }
}

enum TransferStatus {
    CREATED,
    PAYMENT_RECEIVED,
    CHECKS_SENT,
    CHECKS_PENDING,
    CHECKS_APPROVED,
    CHECKS_REJECTED
}

record Transfer(UUID id, List<Check> checks);

sealed interface Command permits
    ReceivePayment,
    SendChecks,
    ApplyCheckResult {}

class TransferProcessor {
    State process(State state, Command command) {
        // (1) If status == CREATED and command is `ReceivePayment`
        // (2) Write payment details to the state
        // (3) Send command `SendChecks` to self
        // (4) Set status = PAYMENT_RECEIVED

        // (4) If state = PAYMENT_RECEIVED and command is `SendChecks`
        // (5) Calculate all required checks (without processing)
        // (6) Send checks for processing to other processors
        // (7) Set status = CHECKS_SENT

        // (10) If status = CHECKS_SENT or CHECKS_PENDING
        //     and command is ApplyCheckResult
        // (11) Update `transfer.checks()`
        // (12) Compute overall status
        // (13) If all checks are accepted - set status = CHECKS_APPROVED
        // (14) If any of the checks is rejected - set status CHECKS_REJECTED
        // (15) Otherwise - set status = CHECKS_PENDING
    }
}

Такой подход снижает время ожидания обработки за счет разделения вычислений результатов проверки на отдельные процессы, что приводит к уменьшению количества одновременных операций. Однако он не полностью решает проблему обеспечения атомарности обработки команд.

Связь через сообщения

В этой модели связь между различными частями системы происходит через сообщения. Такой подход обеспечивает асинхронную связь, разделение компонентов и повышение гибкости и масштабируемости. Управление сообщениями осуществляется с помощью очередей и брокеров сообщений, которые обеспечивают упорядоченную передачу и прием сообщений. Диаграмма ниже иллюстрирует этот процесс:

Communication through messages

Однократная обработка сообщений

Для обеспечения правильной и последовательной обработки команд очень важно упорядочить и линеаризовать все сообщения в одной передаче. Это означает, что сообщения должны обрабатываться в том порядке, в котором они были отправлены, и никакие два сообщения в одной передаче не должны обрабатываться одновременно. Последовательная обработка гарантирует, что каждый шаг в жизненном цикле транзакции выполняется в правильной последовательности, предотвращая конфликты, повреждение данных или несогласованные состояния. Вот как это работает:

  1. Очередь сообщений: Поддерживается выделенная очередь для каждой передачи, чтобы обеспечить обработку сообщений в порядке их поступления. 
  2. Потребитель: Потребитель получает сообщения из очереди, обрабатывает их и подтверждает успешную обработку.
  3. Последовательная обработка: Потребитель обрабатывает каждое сообщение по очереди, гарантируя, что два сообщения для одной и той же передачи не будут обработаны одновременно.

Долговременное хранение сообщений

Обеспечение долговременного хранения сообщений имеет решающее значение для систем финансовых транзакций, поскольку позволяет системе воспроизвести сообщение, если процессор не сможет обработать команду из-за таких проблем, как сбои внешних платежей, сбои в системе хранения данных или проблемы с сетью.

Представим себе сценарий, в котором команда обработки платежей не выполняется из-за временного сбоя в сети или ошибки базы данных. Без долговременного хранения сообщений эта команда может быть потеряна, что приведет к незавершенным транзакциям или другим несоответствиям. Благодаря долговременному хранению сообщений мы обеспечиваем надежную запись каждой команды и шага транзакции. Если произойдет сбой, система сможет восстановить и воспроизвести сообщение, как только проблема будет решена, обеспечив успешное завершение транзакции.

Долговечное хранение сообщений также неоценимо при работе с внешними платежными системами. Если внешняя система не подтверждает платеж, мы можем воспроизвести сообщение, чтобы повторить операцию без потери важных данных, сохраняя целостность и непротиворечивость наших транзакций.

Кроме того, долговременное хранение сообщений необходимо для аудита и комплаенса, предоставляя надежный лог всех транзакций и действий, выполняемых системой, и облегчая отслеживание и проверку операций в случае необходимости. 

На следующей схеме показано, как работает долговременное хранение сообщений:

Durable message storage

Благодаря использованию долговечного хранилища сообщений система становится более надежной и устойчивой, обеспечивая плавное устранение сбоев без ущерба для целостности данных и доверия клиентов.

Kafka как магистраль обмена сообщениями

Apache Kafka — распределенная потоковая платформа обработки сообщений с высокой пропускной способностью и низкой задержкой. Она широко используется в качестве магистраль для обмена сообщениями в сложных системах, благодаря своей способности эффективно обрабатывать данные, поступающие в режиме реального времени. Рассмотрим основные компоненты Kafka: продюсеры, топики, партиции и маршрутизацию сообщений, чтобы понять, как он работает в распределенной системе. 

Топики и партиции

Топики

В Kafka топик — это название категории или фида, в котором хранятся и публикуются записи. Топики делятся на партиции, для облегчения параллельной обработки и масштабируемости.

Партиции

Каждый топик может быть разделен на несколько партиций, которые являются фундаментальными единицами параллелизма в Kafka. Партиции — это упорядоченные, неизменяемые последовательности записей, постоянно добавляемые в структурированный журнал коммитов. Kafka хранит данные в этих разделах в распределенном кластере брокеров. Каждая партиция реплицируется между несколькими брокерами для обеспечения отказоустойчивости и высокой доступности. Коэффициент репликации определяет количество копий данных, а Kafka автоматически управляет процессом репликации для обеспечения согласованности и надежности данных.

Partition storage

Каждая запись в партиции имеет уникальное смещение, служащее идентификатором позиции записи в партиции. Это смещение позволяет консьюмерам отслеживать свое положение и в случае сбоя продолжать обработку с того места, на котором они остановились.

Маршрутизация сообщений

Маршрутизация сообщений в Kafka — это ключевой механизм, определяющий, как сообщения распределяются между партициями в топике. Существует несколько методов маршрутизации сообщений: 

  • Round-robin: Метод по умолчанию, при котором сообщения равномерно распределяются по всем доступным партициям для обеспечения сбалансированной нагрузки и эффективного использования ресурсов 
  • Маршрутизация на основе ключей: Сообщения с одинаковым ключом направляются в одну и ту же партицию, что полезно для поддержания упорядоченности связанных сообщений и обеспечения их последовательной обработки. Например, все транзакции для определенного счета могут быть направлены в одну и тот же партицию с использованием ID счета в качестве ключа. 
  • Кастомные разделители: Kafka позволяет использовать кастомную логику разделения, чтобы определить, как сообщения должны быть направлены, на основе определенных критериев. Это полезно в случае сложных требований к маршрутизации, для которых недостаточно стандартных методов.

Этот механизм маршрутизации оптимизирует производительность, сохраняет порядок сообщений при необходимости, поддерживает масштабируемость и отказоустойчивость.

Producers

Продюсеры

Продюсеры Kafka отвечают за публикацию записей в топиках. Они могут задавать параметры подтверждения, чтобы контролировать, когда сообщение считается успешно отправленным: 

  • acks=0: Подтверждение не требуется, обеспечивая наименьшую задержку, но не гарантируя доставку 
  • acks=1: Брокер-лидер подтверждает сообщение, гарантируя, что оно было записано в лог лидера. 
  • acks=all: Все синхронизированные реплики должны подтвердить сообщение, обеспечивая наивысший уровень долговечности и отказоустойчивости.

Эти конфигурации позволяют продюсерам Kafka удовлетворять различные требования приложений к доставке и сохранению сообщений, обеспечивая надежное хранение и доступность данных для консьюмеров.

Kafka producers and brokers

Консьюмеры

Консьюмеры Kafka считывают данные из топиков. Ключевым понятием в модели консьюмеров Kafka является группа консьюмеров. Группа консьюмеров состоит из нескольких консьюмеров, задействованных вместе для чтения данных из топика. Каждый консьюмер в группе считывает данные из разных разделов топика, что обеспечивает параллельную обработку и повышенную пропускную способность. 

Когда консьюмер выходит из строя или покидает группу, Kafka автоматически переназначает партиции оставшимся консьюмерам, обеспечивая отказоустойчивость и доступность. Такая динамическая балансировка назначений партиций обеспечивает равномерное распределение нагрузки между консьюмерами в группе, оптимизируя использование ресурсов и эффективность обработки.

Kafka consumer group

Способность Kafka управлять большими объемами данных, обеспечивать отказоустойчивость и поддерживать упорядоченность сообщений делает ее идеальным выбором для использования в качестве основы для обмена сообщениями в распределенных системах, особенно в средах, требующих обработки данных в реальном времени и надежного управления параллелизмом.

Система обмена сообщениями с использованием Kafka

Использование Apache Kafka в качестве магистрали для обмена сообщениями в нашей системе позволяет нам решить различные проблемы, связанные с обработкой сообщений, долговечностью и масштабируемостью. Рассмотрим, как Kafka согласуется с нашими требованиями и облегчает реализацию системы на основе модели Actor.

Одноразовая обработка сообщений

Чтобы убедиться, что сообщения для конкретного трансфера обрабатываются последовательно и без дублирования, можем создать топик Kafka с именем transfer.commands с несколькими партициями. Ключом каждого сообщения будет transferId, что гарантирует, что все команды, относящиеся к конкретному трансферу, будут направлены в одну и ту же партицию. Поскольку партиция может быть одновременно использована только одним консьюмером, такая настройка гарантирует однократную обработку сообщений для каждого трансфера.

Producers partitions and consumers

Долговечное хранилище сообщений

Архитектура Kafka разработана таким образом, чтобы обеспечить долговечность сообщений, сохраняя их между распределенными брокерами. Вот несколько ключевых настроек Kafka, которые повышают долговечность и надежность сообщений: 

  • retention.ms: Определяет, как долго Kafka хранит запись перед удалением; например, установка log.retention.ms=604800000 сохраняет сообщения в течение 7 дней 
  • log.segment.bytes: Управляет размером каждого сегмента журнала; например, установка log.segment.bytes=1073741824 создает новые сегменты через 1 ГБ 
  • min.insync.replicas: Определяет минимальное количество реплик, которые должны подтвердить запись, прежде чем она будет считаться успешной; установка min.insync.replicas=2 гарантирует, что как минимум две реплики подтвердят запись.
  • acks: Параметр продюсера, определяющий количество требуемых подтверждений. Установка acks=all гарантирует, что все синхронизированные реплики должны подтвердить сообщение, обеспечивая высокую долговечность.

Примеры конфигурации для обеспечения долговечности сообщений:

# Example 1: Retention Policy
log.retention.ms=604800000  # Retain messages for 7 days
log.segment.bytes=1073741824  # 1 GB segment size

# Example 2: Replication and Acknowledgment
min.insync.replicas=2  # At least 2 replicas must acknowledge a write
acks=all  # Producer requires acknowledgment from all in-sync replicas

# Example 3: Producer Configuration
acks=all  # Ensures high durability
retries=5  # Number of retries in case of transient failures

Нужная модель: паттерн Actor

В нашей системе обработчик, о котором мы говорили ранее, теперь будет называться Actor. Модель Actor хорошо подходит для управления состояниями и асинхронной обработки команд, что делает ее полезной для нашей системы на основе Kafka.

Основные понятия акторной модели

  • Акторы как фундаментальные единицы: Каждый Актор отвечает за получение сообщений, их обработку и изменение своего внутреннего состояния. Это согласуется с использованием обработчиков команд для каждой передачи.
  • Асинхронная передача сообщений: Общение между Акторами происходит через топики Kafka, что обеспечивает асинхронное взаимодействие. 
  • Изоляция состояний: Каждый Актор поддерживает свое собственное состояние, которое может быть изменено только путем отправки команды Актору. Это обеспечивает контролируемое и последовательное изменение состояния.
  • Последовательная обработка сообщений: Kafka гарантирует, что сообщения внутри партиции будут обрабатываться по порядку, что поддерживает потребность акторной модели в последовательной обработке команд. 
  • Прозрачность местоположения: Акторы могут быть распределены по разным машинам или локациям, что повышает масштабируемость и отказоустойчивость.
  • Отказоустойчивость: Встроенные в Kafka механизмы отказоустойчивости в сочетании с распределенной природой акторной модели гарантируют, что система будет легко справляться с отказами. 
  • Масштабируемость: Масштабируемость системы определяется количеством партиций Kafka. Например, при 64 партициях система может обрабатывать 64 одновременные команды. Архитектура Kafka позволяет масштабироваться, добавляя партиции и консьюмеров по мере необходимости.

Реализация акторной модели в системе

Начнем с описания простого интерфейса управления состоянием:

interface StateStorage<K, S> {
    S newState();
    S get(K key);
    void put(K key, S state);
}

Далее описываем интерфейс Actor:

interface Actor<S, C> {
    S receive(S state, C command);
}

Чтобы интегрировать Kafka, нам нужны helper-интерфейсы для чтения ключей и значений из записей Kafka:

interface KafkaMessageKeyReader<K> { 
    K readKey(byte[] key); 
}

interface KafkaMessageValueReader<V> { 
    V readValue(byte[] value); 
}

Наконец, реализуем KafkaActorConsumer, который управляет взаимодействием между Kafka и нашей акторной системой:

class KafkaActorConsumer<K, S, C> {
    private final Supplier<Actor<S, C>> actorFactory;
    private final StateStorage<K, S> storage;
    private final KafkaMessageKeyReader<K> keyReader;
    private final KafkaMessageValueReader<C> valueReader;

    public KafkaActorConsumer(Supplier<Actor<S, C>> actorFactory, StateStorage<K, S> storage,
                              KafkaMessageKeyReader<K> keyReader, KafkaMessageValueReader<C> valueReader) {
        this.actorFactory = actorFactory;
        this.storage = storage;
        this.keyReader = keyReader;
        this.valueReader = valueReader;
    }

    public void consume(ConsumerRecord<byte[], byte[]> record) {
        // (1) Read the key and value from the record
        K messageKey = keyReader.readKey(record.key());
        C messageValue = valueReader.readValue(record.value());

        // (2) Get the current state from the storage
        S state = storage.get(messageKey);
        if (state == null) {
            state = storage.newState();
        }

        // (3) Get the actor instance
        Actor<S, C> actor = actorFactory.get();

        // (4) Process the message
        S newState = actor.receive(state, messageValue);

        // (5) Save the new state
        storage.put(messageKey, newState);
    }
}

Эта реализация принимает сообщения из Kafka, обрабатывает их с помощью Actor и соответствующим образом обновляет состояние. Для повышения надежности этой системы можно добавить такие дополнительные элементы, как обработка ошибок, ведение журналов и трассировка. 

Объединив мощные возможности Kafka по обмену сообщениями со структурированным управлением состоянием и параллелизмом акторной модели, мы можем построить масштабируемую, устойчивую и эффективную систему обработки финансовых транзакций. Такая система гарантирует, что каждая команда будет обработана правильно, последовательно и с полной гарантией долговечности.

Дополнительные темы

Паттерн Outbox

Паттерн Outbox — это критически важный паттерн проектирования, предназначенный для обеспечения надежной доставки сообщений в распределенных системах, особенно при интеграции PostgreSQL с Kafka. Основная проблема, которую он решает — это риск несоответствий, когда транзакция может быть зафиксирована в PostgreSQL, но соответствующее сообщение не доставлено в Kafka из-за проблем в сети или сбоя системы. Это может привести к рассинхронизации состояния базы данных и потока сообщений.

Паттерн Outbox решает эту проблему, сохраняя сообщения в локальной «выходной» таблице в рамках одной и той же транзакции PostgreSQL. Это гарантирует, что сообщение будет отправлено в Kafka только после успешной фиксации транзакции. Таким образом, обеспечивается семантика доставки точно в одно место, что предотвращает потерю сообщений и обеспечивает согласованность между базой данных и потоком сообщений.

Реализация паттерна Outbox

При наличии паттерна Outbox реализация KafkaActorConsumer и Actor может быть скорректирована с учетом этого паттерна:

record OutboxMessage(UUID id, String topic, byte[] key, Map<String, byte[]> headers, byte[] payload) {}
record ActorReceiveResult<S, M>(S newState, List<M> messages) {}

interface Actor<S, C> {
    ActorReceiveResult<S, OutboxMessage> receive(S state, C command);
}

class KafkaActorConsumer<K, S, C> {
    public void consume(ConsumerRecord<byte[], byte[]> record) {
        // ... other steps
        // (5) Process the message
        var result = actor.receive(state, messageValue);

        // (6) Save the new state
        storage.put(messageKey, result.newState());
    }

    @Transactional
    public void persist(S state, List<OutboxMessage> messages) {
        // (7) Persist the new state
        storage.put(stateKey, state);

        // (8) Persist the outbox messages
        for (OutboxMessage message : messages) {
            outboxTable.save(message);
        }
    }
}

В данной реализации:

  • Actor возвращает результат ActorReceiveResult, содержащий новое состояние и список сообщений outbox, которые необходимо отправить в Kafka. 
  • KafkaActorConsumer обрабатывает эти сообщения и сохраняет состояние и сообщения в выходной таблице в рамках одной транзакции. 
  • После фиксации транзакции внешний процесс (например, Debezium) читает из таблицы и отправляет сообщения в Kafka, обеспечивая доставку точно один раз.

Токсичные сообщения и недоставленные сообщения

В распределенных системах некоторые сообщения могут быть неправильно сформированы или вызывать ошибки, препятствующие успешной обработке. Такие проблемные сообщения часто называют «токсичными». Чтобы справиться с такими сценариями, мы можем реализовать очередь недоставленных сообщений (DLQ). Это специальная очередь, в которую отправляются необработанные сообщения для дальнейшего изучения. Такой подход гарантирует, что эти сообщения не будут блокировать обработку других сообщений, и позволяет устранить первопричину без потери данных. Вот базовая реализация обработки токсичных сообщений:

class ToxicMessage extends Exception {}
class LogicException extends ToxicMessage {}
class SerializationException extends ToxicMessage {}

class DefaultExceptionDecider {
    public boolean isToxic(Throwable e) {
        return e instanceof ToxicMessage;
    }
}

interface DeadLetterProducer {
    void send(ConsumerRecord<?, ?> record, Throwable e);
}

class Consumer {
    private final ExceptionDecider exceptionDecider;
    private final DeadLetterProducer deadLetterProducer;

    void consume(ConsumerRecord<String, String> record) {
        try {
            // process record
        } catch (Exception e) {
            if (exceptionDecider.isToxic(e)) {
                deadLetterProducer.send(record, e);
            } else {
                // throw exception to retry the operation
                throw e;
            }
        }
    }
}

В данной реализации: 

  • ToxicMessage: Базовый класс исключений для любых ошибок, которые считаются «токсичными», то есть их не следует повторять, а лучше отправить в DLQ 
  • DefaultExceptionDecider: Решает, является ли исключение токсичным и должно ли оно вызывать отправку сообщения в DLQ 
  • DeadLetterProducer: Отвечает за отправку сообщений в DLQ
  • Consumer: Обрабатывает сообщения и использует ExceptionDecider и DeadLetterProducer для соответствующей обработки ошибок

Заключение

Используя Kafka в качестве магистрали для обмена сообщениями и реализуя акторную модель, мы можем построить надежную, масштабируемую и отказоустойчивую систему финансовых транзакций. Акторная модель предлагает простой подход к управлению состоянием и параллелизмом, а Kafka предоставляет инструменты, необходимые для надежной обработки, долговечности и разделения сообщений. 

Акторная модель — это не специализированный или сложный фреймворк, а скорее набор простых абстракций, которые могут значительно повысить масштабируемость и надежность системы. Встроенные в Kafka функции, такие как долговечность, упорядочивание и отказоустойчивость сообщений, естественным образом согласуются с принципами акторной модели, позволяя реализовать эти концепции эффективно и результативно, не прибегая к дополнительным фреймворкам.

Внедрение передовых паттернов, таких как Outbox, и обработка токсичных сообщений с помощью DLQ еще больше повышает надежность системы, обеспечивая последовательную обработку сообщений и изящное устранение ошибок. Благодаря такому комплексному подходу система финансовых операций остается надежной, масштабируемой и способной беспрепятственно обрабатывать сложные рабочие процессы.

Никита Мельников, VPE в Atlantic Money

Линкедин


Какой была ваша первая зарплата в QA и как вы искали первую работу?

Мега обсуждение в нашем телеграм-канале о поиске первой работы. Обмен опытом и мнения.

1 КОММЕНТАРИЙ

Подписаться
Уведомить о
guest

1 Комментарий
Старые
Новые Популярные
Межтекстовые Отзывы
Посмотреть все комментарии
Александр
Александр
6 месяцев назад

Статья гипер-серьезная, спасибо

Мы в Telegram

Наш официальный канал
Полезные материалы и тесты
Готовимся к собеседованию
Project- и Product-менеджмент

? Популярное

? Telegram-обсуждения

Наши подписчики обсуждают, как искали первую работу в QA. Некоторые ищут ее прямо сейчас.
Наши подписчики рассказывают о том, как не бояться задавать тупые вопросы и чувствовать себя уверенно в новой команде.
Обсуждаем, куда лучше податься - в менеджмент или по технической ветке?
Говорим о конфликтных ситуациях в команде и о том, как их избежать
$1100*
медианная зарплата в QA в июне 2023

*по результатам опроса QA-инженеров в нашем телеграм-канале

Собеседование

19%*
IT-специалистов переехало или приняло решение о переезде из России по состоянию на конец марта 2022

*по результатам опроса в нашем телеграм-канале

live

Обсуждают сейчас