Eventual Consistency in Microservices

Microservices misconception

Even though the Microservices architecture has been around for some years now, there is still a terrible misconception circulating around this subject, especially in teams that are trying to move from a Monolith to Microservices without prior knowledge of how to achieve this.

Availability is one of the main reasons why we decide to go with Microservices: in a Monolith, there is a high chance that the whole system becomes unavailable due to a change in any area of the application. We do not want to stop customers from ordering because of a change in a financing calculator. However, implementing a system with Microservices is inherently more difficult than a Monolith, as simple things that we take for granted become more complex to achieve.

One thing that is no longer available is the inter-service method call. While in a Monolith the OrderService class could call the PaymentService class directly, this is not possible with Microservices, as each class resides in a separate domain and service. For this reason, we need to ask ourselves how to mitigate the situation when one of the services is unavailable. We also need to take into account that network latency will affect performance. So, instead of making a simple method call, we now have other issues on our plate.

Another thing that is even more problematic is the fact that there is no central database anymore. While in a Monolith there is usually a single, central database, and requests to the system are handled in a single transaction, in Microservices there is a separate database per service. This implies that synchronization between the services is needed. You could argue that distributed transactions (2-phase commit) can be implemented, but as this is basically a synchronous call, we would end up in a similar situation as the inter-service method call scenario, asking ourselves what to do in case of unavailability. Additionally, we might be limited in our choices, as not all technologies (e.g. Kafka) support 2PC.

Eventual consistency

Let's take the example of an online shop during Black Friday. We are expecting large numbers of customers in a short period of time. What do we prefer? To have as many customers as possible place orders asynchronously, and accept that orders will not be processed right away (some might not even be processed at all)? Or to place orders synchronously, and accept the possibility that customers won't be able to order at all because the payment service becomes unavailable? As we can see, in this case, availability is preferred over consistency.

What is Eventual Consistency?

Eventual consistency (EC) means the system will become consistent at some point in time. EC is achieved through asynchronous communication between services. It is a way of implementing transactions that span multiple services without affecting availability. In order to implement EC, we have to leverage the Saga pattern, which in turn uses the Outbox and Message Relay patterns. Don't worry, as we will do a deep dive into all of them.

Saga

As its name suggests, it is a story. It is composed of a sequence of local transactions (Spring's @Transactional), each updating data within a single service. The synchronization is done using an event-driven approach, by exchanging messages that signal the start of the next local transaction, or a rollback if needed. As already stated, it leverages the Outbox and Message Relay patterns in order to achieve this.

Outbox

Ensures that an event is sent as part of a database transaction, by saving the event in an Events table and leveraging a Message Relay to read the changes and produce messages. This is necessary, since tools like Kafka do not participate in a distributed transaction. While you could argue that with the help of Spring nested transactions you can achieve the same result, there is always the chance that the message is sent while the database record is not yet committed. For more details please read this article, as it is a very good explanation of how Spring transaction managers behave.
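
As a minimal sketch (the OrderService, OrderRepository, OrderEventsRepository and OrderPlacedEvent names are hypothetical, used only for illustration; the real services of this post are shown later), the outbox pattern boils down to saving the entity and the event in the same local transaction:

@Service
public class OrderService {

    private final OrderRepository orderRepository;              // business data
    private final OrderEventsRepository orderEventsRepository;  // the Events (outbox) collection

    @Autowired
    public OrderService(OrderRepository orderRepository, OrderEventsRepository orderEventsRepository) {
        this.orderRepository = orderRepository;
        this.orderEventsRepository = orderEventsRepository;
    }

    @Transactional // the order and the event are committed (or rolled back) together
    public Order placeOrder(Order order) {
        Order saved = orderRepository.save(order);
        // the event is only persisted here, never published directly;
        // the Message Relay picks it up from the database and sends it to Kafka
        orderEventsRepository.save(new OrderPlacedEvent(saved.getId()));
        return saved;
    }
}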

Message Relay

It is a mechanism for polling the database in order to publish events from the Events table. While we could implement this ourselves, it is a better approach to use tools that already do this. We will soon see an example of using a CDC (change data capture) system like Debezium, which reads the database logs and publishes events to Kafka.
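
To make this concrete: with the outbox routing we configure later in this post, saving a CustomerPendingRegistrationEvent in the Events collection results in a Kafka message on the outbox.event.customer topic (routed by aggregateType), keyed by the aggregateId, with the eventType and eventId as headers and the payload field as the message body, roughly like this (illustrative values):

topic:   outbox.event.customer
key:     653fd2c1a7b1e934d4e2b111 (the aggregateId)
headers: eventType=CustomerPendingRegistrationEvent, eventId=653fd2c1a7b1e934d4e2b112
value:   {"customerDto": {"id": "653fd2c1a7b1e934d4e2b111", "firstName": "John", "lastName": "Doe", ...}}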

Lack of isolation

While a saga is the way to implement eventual consistency, it does come with some problems that need to be addressed. Sagas are not fully ACID; they are only ACD. This means that there is no isolation like in a traditional transaction. A saga can see and modify the uncommitted data of another saga that is executing concurrently. This can generate the following concurrency anomalies:

Lost updates

You place an order and immediately cancel it. This can still result in a placed order, if the cancel order saga finishes executing while the place order saga is in progress: the cancel order saga's update was lost.

Dirty reads

You place an order for the last item in stock, but you do not have enough credit, so a rollback will take place. In the meantime, another user places the same order but gets rejected, as there are no more items left in stock. The second saga performed a dirty read of the stock while the first saga was in progress.

Non-repeatable reads

You place an order for the last item in stock, but before this saga finishes updating the stock, another user places the same order and finishes first. If the first saga were to read the stock again, it would get a non-repeatable read, as the stock has been modified in the meantime.

Mitigation strategies for lack of isolation

There are different mitigation strategies for the lack of isolation. Some of them solve only certain concurrency anomalies, while one in particular, the semantic lock, can be used to solve all of them:

Semantic lock

An application-level lock: a flag (e.g. a *_PENDING status) set on a record marks that a saga is still working on it, so other sagas must wait or abort. This is the strategy that solves all anomalies.

Commutative updates

Design the updates so that they can be executed in any order (e.g. debit and credit operations on an account).

Pessimistic view

Reorder the steps of the saga so that the business risk caused by a dirty read is minimized.

Reread value

Reread the record before updating it and abort the saga if it has changed in the meantime (a form of optimistic locking).

Version file

Record the operations performed on a record, so that operations arriving out of order can be reordered correctly.

By value

Choose the concurrency mechanism per request, based on the business risk involved.

How does semantic lock fix anomalies?

Lost updates

The cancel order saga finds the order flagged as *_PENDING by the place order saga and waits (or aborts) instead of updating it concurrently, so no update is lost.

Dirty reads

A saga does not read records that are flagged as pending, so it never bases decisions on data (such as a stock level) that another saga might still roll back.

Non-repeatable reads

While a saga holds the semantic lock on a record, no other saga can modify that record, so rereading it yields the same value.
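
As a minimal sketch (Stock, StockState, StockRepository and ConcurrentSagaException are hypothetical names, not part of the example project), a semantic lock is just a state flag checked before processing:

@Service
public class StockService {

    private final StockRepository stockRepository; // hypothetical repository

    @Autowired
    public StockService(StockRepository stockRepository) {
        this.stockRepository = stockRepository;
    }

    @Transactional
    public void reserveItem(String stockId) {
        Stock stock = stockRepository.findById(stockId).orElseThrow();
        // semantic lock check: another saga is still processing this record
        if (stock.getState() == StockState.UPDATE_PENDING) {
            // either retry later or abort this saga
            throw new ConcurrentSagaException("Stock " + stockId + " is locked by another saga");
        }
        stock.setState(StockState.UPDATE_PENDING); // acquire the semantic lock
        stock.setQuantity(stock.getQuantity() - 1);
        stockRepository.save(stock);
        // the lock is released (state set back to AVAILABLE) by the saga's final
        // retriable step, or by the compensating transaction in case of rollback
    }
}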

What if I want more performance and I am willing to take the risk?

Semantic lock can have an impact on performance, as sagas wait in line to do their processing. However, since the operations are asynchronous, for most scenarios this should not be a big deal. If you still want more performance, you can still leverage the semantic lock to be aware of the possible anomalies and either apply the lock selectively (e.g. only when a few items are left in stock) and/or apply business mitigation strategies like offering a voucher or sending a follow-up email.

Structure of a Saga

There are 3 types of steps/transactions that a saga is composed of:

Compensable transaction

A step that can be undone by a compensating transaction if the saga fails later on.

Pivot transaction

The go/no-go point of the saga. Once the pivot commits, the saga must run to completion.

Retriable transaction

A step that follows the pivot and is guaranteed to succeed. It is simply retried until it does.
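
Mapped onto the customer registration saga that we implement below, the steps would be (see the marker annotations later in this post):

// Compensable: createPendingCustomerRegistration (customer-service),
//              undone by rejectRegistration if the saga fails
// Pivot:       authorizeCustomer (payment-service), the go/no-go point of the saga
// Retriable:   approveRegistration (customer-service), executed after the pivot
//              and retried until it succeeds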

Saga types

There are 2 ways of implementing a saga:

Choreography

There is no central coordinator; each participant publishes events and the others react to them. This is the approach used in this post.

Orchestration

A central orchestrator (a dedicated class or service) tells each participant which local transaction to execute.

Customer registration choreography Saga

The full code for this example can be found here. While we could have used frameworks like Axon or Eventuate for implementing the Saga, I find it easier to just use an event-driven approach without any framework. For our example, we are going to implement a customer registration flow. Upon registering, the customer's card is checked in order to deduct the authorization fee (no actual deduction takes place, we just check the balance). Below is the use case diagram.

In the component diagram below, we can see that there are 2 microservices (customer and payment) involved in this use case. Each service has its own database. For this example we use MongoDB, but Debezium supports a wide range of databases. In order for Debezium to work, we need a replica set for each database. The communication is made possible by Debezium, a change data capture system based on Kafka Connect, which reads the database logs and pushes all the changes to Kafka in the form of events. Debezium implements the outbox and message relay patterns for us. For more information about how Debezium works, please visit this link.

Firstly, we need an environment. Below is the docker-compose file that can be used for spinning everything up. We need Debezium, Kafka, Zookeeper (a Debezium requirement), Kafka-UI for checking the messages, and a replica set of 2 MongoDB instances. We will use the same replica set for both microservices, but each will have its own database. In a real production environment, we would likely want a separate replica set for each microservice. In order to connect to the replica set, we need to add 127.0.0.1 mongodb1 mongodb2 to the hosts file.

version: "3.8"
networks:
  ms-network:
    driver: bridge
    name: ms-network
services:
  #ZOOKEEPER
  zookeeper:
    container_name: zookeeper
    image: "bitnami/zookeeper:3.9.1"
    networks:
      - ms-network
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - KAFKA_OPTS=-Dzookeeper.4lw.commands.whitelist=ruok
    healthcheck:
      test: ["CMD", "echo", "ruok", "|", "nc", "127.0.0.1", "2181"]
      interval: 10s
      timeout: 10s
      retries: 2
  #KAFKA
  kafka:
    container_name: kafka
    image: "bitnami/kafka:3.6.0"
    networks:
      - ms-network
    ports:
      - "9093:9093"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      zookeeper:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "bash", "-c", "unset" , "JMX_PORT" ,";" ,"kafka-topics.sh","--zookeeper","zookeeper:2181","--list"]
      interval: 10s
      timeout: 10s
      retries: 2
  #MONGODB1
  mongodb1:
    container_name: mongodb1
    image: "mongo:5.0.22"
    ports:
      - "27017:27017"
    networks:
      - ms-network
    command: ["--replSet", "local-mongodb-replica-set", "--bind_ip_all"]
    healthcheck:
      test: ["CMD","mongo", "--eval", "db.adminCommand('ping')"]
      interval: 10s
      timeout: 10s
      retries: 2
  #MONGODB2
  mongodb2:
    container_name: mongodb2
    image: "mongo:5.0.22"
    expose:
      - 27018
    ports:
      - "27018:27018"
    networks:
      - ms-network
    command: ["--port", "27018", "--replSet", "local-mongodb-replica-set", "--bind_ip_all"]
    depends_on: [mongodb1]
    healthcheck:
      test: ["CMD","mongo","--port","27018", "--eval", "db.adminCommand('ping')"]
      interval: 10s
      timeout: 10s
      retries: 2
  #DEBEZIUM
  debezium:
    container_name: debezium
    image: "quay.io/debezium/connect:2.4.0.Final"
    networks:
      - ms-network
    ports:
      - "8083:8083"
    environment:
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - BOOTSTRAP_SERVERS=kafka:9092
    links:
      - "zookeeper:zookeeper"
      - "kafka:kafka"
      - "mongodb1:mongodb1"
      - "mongodb2:mongodb2"
    depends_on:
      zookeeper:
        condition: service_healthy
      kafka:
        condition: service_healthy
      mongodb1:
        condition: service_healthy
      mongodb2:
        condition: service_healthy
    healthcheck:
      test: [ "CMD", "curl", "http://127.0.0.1:8083/connectors" ]
      interval: 30s
      timeout: 10s
      retries: 10
  #KAFKA-UI
  kafka-ui:
    container_name: kafka-ui
    image: "provectuslabs/kafka-ui:latest"
    platform: linux/amd64
    networks:
      - ms-network
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
    depends_on:
      zookeeper:
        condition: service_healthy
      kafka:
        condition: service_healthy
      debezium:
        condition: service_healthy
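
Note that docker-compose starts the two mongod instances, but the replica set itself still has to be initiated once. The exact command depends on your setup; a minimal version, run from a host with the mongo shell installed, could look like this:

mongo --host mongodb1:27017 --eval 'rs.initiate({_id: "local-mongodb-replica-set", members: [{_id: 0, host: "mongodb1:27017"}, {_id: 1, host: "mongodb2:27018"}]})'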

After the environment is up and running, we need to use Postman in order to configure the Debezium MongoDB connector. Debezium offers a REST API for managing connectors. Adding a new connector is done via a POST request to http://localhost:8083/connectors/ with the following body:

{
  "name": "eventual-consistency-poc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
        "mongodb.connection.string": "mongodb://mongodb1:27017,mongodb2:27018/?replicaSet=local-mongodb-replica-set&readPreference=primary&serverSelectionTimeoutMS=5000&connectTimeoutMS=10000",  
    "mongodb.members.auto.discover":"true",
    "mongodb.name": "mongodb-connector",
    "mongodb.connection.mode": "replica_set",
    "topic.prefix": "mongodb",
    "database.include.list": "customers-db,payments-db",
    "collection.exclude.list":"customers-db.consumed-events,payments-db.consumed-events",
    "time.precision.mode": "connect",
    "value.converter.schemas.enable":"false",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "transforms":"outbox",
    "transforms.outbox.type":"io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter",
    "transforms.outbox.collection.expand.json.payload":"true",
    "transforms.outbox.collection.fields.additional.placement":"type:header:eventType,_id:header:eventId",
    "transforms.outbox.collection.field.event.key":"aggregateId",
    "transforms.outbox.route.by.field":"aggregateType",
    "transforms.outbox.predicate":"isOutboxTable",
    "predicates":"isOutboxTable",
    "predicates.isOutboxTable.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isOutboxTable.pattern":".*events"
  }
}
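
If you prefer the command line over Postman, the same request can be sent with curl (assuming the body above is saved as connector.json):

curl -i -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors/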

If everything is good, we should see a 201 response code. We can check the available connectors by using a GET request to the same endpoint:

[ "eventual-consistency-poc-connector" ]

That was all for the environment setup. What is left is to implement our microservices. In this example we use Spring Boot and Spring Cloud. For simplicity's sake, the project is a multi-module Maven project. Normally, each microservice should stand on its own. There is also a shared library (common-lib) that holds classes common to both microservices. While in theory this is not a good idea, as long as there is someone responsible for maintaining it, and they do it well, I would deem it acceptable in a real scenario. Each microservice also has a client library that holds DTOs and such. Each client library is used by the other microservice.

parent pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.lowheap.poc</groupId>
    <artifactId>eventual-consistency</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>eventual-consistency</name>
    <description>Eventual consistency POC</description>
    <packaging>pom</packaging>

    <properties>
        <spring-cloud.version>2022.0.4</spring-cloud.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <modules>
        <module>customer/customer-service</module>
        <module>customer/customer-service-client</module>
        <module>payment/payment-service</module>
        <module>payment/payment-service-client</module>
        <module>common-lib</module>
    </modules>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.yaml</groupId>
                    <artifactId>snakeyaml</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.yaml</groupId>
                    <artifactId>snakeyaml</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>1.9.2.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.modelmapper</groupId>
            <artifactId>modelmapper</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.16.0</version>
        </dependency>
    </dependencies>
</project>

Common-lib

The shared library holds events, services, repositories and other utility classes that are used in both microservices. All events, besides ConsumedEvent, inherit from the base DomainEvent class, which follows the structure required by Debezium's outbox implementation. Debezium supports the addition of a payload field that is used to hold the actual event contents. Because Kafka guarantees at-least-once delivery, we also need the ConsumedEvent, with its service and repository, as we need to track which events were already consumed.

DomainEvent.java

@Data
@NoArgsConstructor
@SuperBuilder
public abstract class DomainEvent implements Serializable {

    private static final long serialVersionUID = 4945887390590304513L;
    @Id
    private String id;
    private String aggregateType;
    private String aggregateId;
    private String type;
}

ConsumedEvent.java

@Document(collection= "consumed-events")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ConsumedEvent implements Serializable {
    private static final long serialVersionUID = 8049100074666503841L;
    @Id
    private String id;
    private String eventId;
    private LocalDateTime timestamp;
    private String eventType;
}

ConsumedEventServiceImpl.java

@Service
@Transactional
@Slf4j
public class ConsumedEventServiceImpl implements ConsumedEventService {

    private final ConsumedEventRepository consumedEventRepository;

    @Autowired
    public ConsumedEventServiceImpl(ConsumedEventRepository consumedEventRepository) {
        this.consumedEventRepository = consumedEventRepository;
    }

    @Override
    public ConsumedEvent save(ConsumedEvent consumedEvent) {
        return consumedEventRepository.save(consumedEvent);
    }

    @Override
    public ConsumedEvent findByEventId(String eventId) {
        return consumedEventRepository.findByEventId(eventId);
    }

    @Override
    @Scheduled(cron = "${consumed.events.cleanup.cron}")
    public void cleanup() {
        log.info("Cleaning up consumed events at {}", LocalDateTime.now());
        consumedEventRepository.deleteAll();
    }
}

ConsumedEventRepository.java

@Repository
public interface ConsumedEventRepository extends MongoRepository<ConsumedEvent, String> {

    ConsumedEvent findByEventId(String eventId);
}

Customer Service

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.lowheap.poc</groupId>
        <artifactId>eventual-consistency</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>
    <artifactId>customer-service</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>customer-service</name>
    <description>Customer service</description>
    <dependencies>
        <dependency>
            <groupId>com.lowheap.poc</groupId>
            <artifactId>common-lib</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.lowheap.poc</groupId>
            <artifactId>customer-service-client</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.lowheap.poc</groupId>
            <artifactId>payment-service-client</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

Holds the configuration for connecting to MongoDB and Kafka. Customer Service needs to connect to the Payment Service topic and vice versa, but since we might not be interested in all the events that each service sends, we use the interestingEvents property to tell our service which events it should take into account. There is also a cleanup cron, in order to schedule the cleanup of consumed events.

server.port=9990
spring.application.name=customer-service
server.servlet.context-path=/customer-service
spring.jackson.serialization.FAIL_ON_EMPTY_BEANS=false

#mongo
spring.data.mongodb.uri=mongodb://mongodb1:27017,mongodb2:27018
spring.data.mongodb.database=customers-db
spring.data.mongodb.auto-index-creation=true

#kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9093
#functionRouter is used for routing to different consumers by a routing expression
spring.cloud.stream.bindings.functionRouter-in-0.destination=outbox.event.payment
#groups together multiple instances of this application and treats them as the same logical consumer
#(when scaling this should be provided by environment variable so there will be multiple groups, each with a number of instances)
spring.cloud.stream.bindings.functionRouter-in-0.group=customerConsumerGroup1
spring.cloud.stream.bindings.functionRouter.content-type=application/json
#enable dead letter topic. All failed events will go here. If the name is not specified, each consumer will have its own DLT
spring.cloud.stream.kafka.default.consumer.enableDlq=true
spring.cloud.stream.kafka.default.consumer.dlqName=DLT
#pick up the first message that was sent when this consumer was down
spring.cloud.stream.binder.kafka.start-offset=earliest
#enable routing
spring.cloud.stream.function.routing.enabled=true
#helps in deserializing custom headers
spring.cloud.stream.kafka.binder.headerMapperBeanName=defaultKafkaHeaderMapper

interestingEvents=PaymentAuthorizedEvent;PaymentNotAuthorizedEvent
#Should take into account the period that Kafka is configured to keep the logs. When Kafka removes the logs,
#consumed events can also be removed, as there is no chance of re-consuming a previously sent event.
consumed.events.cleanup.cron=0 1 0 * * ?

CustomerServiceApplication.java

Holds the configuration of the Spring Cloud custom router, which takes into account only the interesting events, by filtering on the eventType header of the Kafka event. We also use the Mongo transaction manager, as we need to insert into 2 collections, and MongoDB supports multi-document transactions starting with version 4.0.

@SpringBootApplication(scanBasePackages = {"com.lowheap.poc"})
@EnableScheduling
public class CustomerServiceApplication {

	@Value("#{'${interestingEvents}'.split(';')}")
	private List<String> interestingEvents;

	public static void main(String[] args) {
		SpringApplication.run(CustomerServiceApplication.class, args);
	}

    /**
     * Helps in deserializing custom headers
     */
	@Bean
    public KafkaHeaderMapper defaultKafkaHeaderMapper(){
        DefaultKafkaHeaderMapper defaultKafkaHeaderMapper = new DefaultKafkaHeaderMapper();
		defaultKafkaHeaderMapper.setRawMappedHeaders(Map.of("eventType", true, "eventId", true, KafkaHeaders.RECEIVED_KEY, true));
        return defaultKafkaHeaderMapper;
    }

	@Bean
	public ModelMapper modelMapper() {
		return new ModelMapper();
	}

	/**
	 * Route by eventType header. The name of the consumer bean should be the event type.
	 */
	@Bean
	public MessageRoutingCallback customRouter() {
		return new MessageRoutingCallback() {
			@Override
			public String routingResult(Message<?> message) {
				String eventType = (String) message.getHeaders().get("eventType");
				if (!interestingEvents.contains(eventType)) {
					return "NotInterestedEvent";
				}
				return eventType;
			}
		};
	}

	@Bean
	public MongoTransactionManager mongoTransactionManager(MongoDatabaseFactory factory) {
		return new MongoTransactionManager(factory);
	}
}

CustomerController.java

Nothing fancy here. Just a regular REST controller.

@RestController
@Slf4j
@RequestMapping(path = "/customers/")
public class CustomerController {

    private final CustomerService customerService;
    private final ModelMapper modelMapper;

    @Autowired
    public CustomerController(CustomerService customerService, ModelMapper modelMapper) {
        this.customerService = customerService;
        this.modelMapper = modelMapper;
    }

    @PostMapping
    public ResponseEntity<CustomerDto> register(@RequestBody CustomerDto customerDto) {
        Customer customer = customerService.createPendingCustomerRegistration(modelMapper.map(customerDto, Customer.class));
        log.info("Saved customer {}", customer);
        return new ResponseEntity<>(modelMapper.map(customer, CustomerDto.class), OK);
    }

    @GetMapping(value = "/{id}")
    public ResponseEntity<CustomerDto> findById(@PathVariable("id") String id) {
        Customer customer = customerService.findById(id);
        log.info("Found customer {}", customer);
        return new ResponseEntity<>(modelMapper.map(customer, CustomerDto.class), OK);

    }
}

CustomerServiceImpl.java

As we can see in the create/update methods, after saving the Customer we also need to save an event to the customer-events collection. Every create/update operation needs to save an event as its last step. From this point on, Debezium reads the logs of MongoDB and pushes them to Kafka. This class also contains the dead-lock detection scheduler.

Please note that the @SagaStart, @SagaEnd, @Compensable, @Retriable, @Pivot, @CompensationFor, @DeadLockDetectionFor and @DeadLockCleanupFor annotations are just marker annotations used for a better understanding of the flow.

@Service
@Transactional
@Slf4j
public class CustomerServiceImpl implements CustomerService {

    private final CustomerRepository customerRepository;
    private final CustomerEventsRepository customerEventsRepository;
    private final ModelMapper modelMapper;


    @Autowired
    public CustomerServiceImpl(CustomerRepository customerRepository, CustomerEventsRepository customerEventsRepository, ModelMapper modelMapper) {
        this.customerRepository = customerRepository;
        this.customerEventsRepository = customerEventsRepository;
        this.modelMapper = modelMapper;
    }

    @Override
    @Compensable(sagas = {CUSTOMER_REGISTRATION_SAGA})
    @SagaStart(sagas = {CUSTOMER_REGISTRATION_SAGA})
    public Customer createPendingCustomerRegistration(Customer customer) {
        customer.setStatus(RegistrationStatus.REGISTRATION_PENDING);
        customer.setCreatedAt(LocalDateTime.now());
        customer = customerRepository.save(customer);
        CustomerPendingRegistrationEventPayload payload = CustomerPendingRegistrationEventPayload.builder().customerDto(modelMapper.map(customer, CustomerDto.class)).build();
        CustomerPendingRegistrationEvent customerPendingRegistrationEvent = CustomerPendingRegistrationEvent.builder().aggregateId(customer.getId()).aggregateType(CUSTOMER).type(CUSTOMER_PENDING_REGISTRATION_EVENT).payload(payload).build();
        customerEventsRepository.save(customerPendingRegistrationEvent);
        return customer;
    }

    @Override
    @Retriable(sagas = {CUSTOMER_REGISTRATION_SAGA})
    @SagaEnd(sagas = {CUSTOMER_REGISTRATION_SAGA})
    public void approveRegistration(String customerId) {
        log.info("Registration success");
        updateRegistrationStatus(customerId, RegistrationStatus.REGISTRATION_SUCCESS, null);
    }

    @Override
    @CompensationFor(sagas = {CUSTOMER_REGISTRATION_SAGA}, compensableOperations = {"createPendingCustomerRegistration"})
    @SagaEnd(sagas = {CUSTOMER_REGISTRATION_SAGA})
    public void rejectRegistration(String customerId, String reason) {
        log.info("Registration failed");
        updateRegistrationStatus(customerId, RegistrationStatus.REGISTRATION_FAILED, reason);
    }

    @Override
    public Customer findById(String id) {
        Optional<Customer> customer = customerRepository.findById(id);
        if(customer.isEmpty()){
            throw new BusinessException(CUSTOMER_NOT_FOUND);
        }
        return customer.get();
    }

    @Override
    @Scheduled(fixedRate = 50000)
    @DeadLockDetectionFor(sagas = {CUSTOMER_REGISTRATION_SAGA})
    public void checkForRegistrationDeadLock() {
        log.info("Checking for registration dead-locks...");
        int deadLocks = 0;
        List<Customer> pendingRegistrationCustomers = customerRepository.findByStatus(RegistrationStatus.REGISTRATION_PENDING);
        for(Customer customer : pendingRegistrationCustomers){
            if(Duration.between(customer.getCreatedAt(), LocalDateTime.now()).toMinutes() > 5){
                log.info("Registration dead-lock detected for customer {}", customer.getId());
                rejectRegistration(customer.getId(), REGISTRATION_DEAD_LOCK_DETECTED);
                String name = new StringBuilder(customer.getFirstName()).append(" ").append(customer.getLastName()).toString();
                CardDto cardDto = modelMapper.map(customer.getCard(), CardDto.class);
                CustomerRegistrationDeadLockEvent deadLockEvent = CustomerRegistrationDeadLockEvent.builder()
                        .aggregateId(customer.getId()).aggregateType(CUSTOMER).type(CUSTOMER_REGISTRATION_DEAD_LOCK_EVENT)
                        .payload(CustomerRegistrationDeadLockEventPayload.builder().name(name).card(cardDto).build()).build();
                customerEventsRepository.save(deadLockEvent);
                deadLocks++;
            }
        }
        log.info("{} registration dead-locks found", deadLocks);

    }

    private void updateRegistrationStatus(String customerId, RegistrationStatus status, String failReason){
        Optional<Customer> customerOptional = customerRepository.findById(customerId);
        if(customerOptional.isEmpty()){
            //the message should be put on dlq
            throw new IllegalArgumentException(CUSTOMER_NOT_FOUND);
        }
        Customer customer = customerOptional.get();
        customer.setStatus(status);
        customer = customerRepository.save(customer);
        CustomerDto customerDto = modelMapper.map(customer, CustomerDto.class);
        if(status == RegistrationStatus.REGISTRATION_SUCCESS) {
            CustomerRegisteredEventPayload payload = CustomerRegisteredEventPayload.builder().customerDto(customerDto).build();
            CustomerRegisteredEvent customerRegisteredEvent = CustomerRegisteredEvent.builder().aggregateId(customer.getId()).aggregateType(CUSTOMER).type(CUSTOMER_REGISTERED_EVENT).payload(payload).build();
            customerEventsRepository.save(customerRegisteredEvent);
        }else{
            CustomerNotRegisteredEventPayload payload = CustomerNotRegisteredEventPayload.builder().reason(failReason).customerDto(customerDto).build();
            CustomerNotRegisteredEvent customerNotRegisteredEvent = CustomerNotRegisteredEvent.builder().aggregateId(customer.getId()).aggregateType(CUSTOMER).type(CUSTOMER_NOT_REGISTERED_EVENT).payload(payload).build();
            customerEventsRepository.save(customerNotRegisteredEvent);
        }
    }

}

Repositories

There are 2 mongo repositories, one for customer and one for the customer events.

@Repository
public interface CustomerRepository extends MongoRepository<Customer, String> {
    List<Customer> findByStatus(RegistrationStatus status);
}
@Repository
public interface CustomerEventsRepository extends MongoRepository<CustomerEvent, String> {
}

Kafka consumers

Customer Service holds 3 consumers, out of which 2 process payment events (PaymentAuthorizedEventConsumer, PaymentNotAuthorizedEventConsumer). Each consumer checks if the event was already consumed, as Kafka guarantees at-least-once delivery, and saves the consumed event to the consumed-events collection. NotInterestedConsumer just logs the events that are not interesting, as Kafka delivers them anyway.

@Service(PAYMENT_AUTHORIZED_EVENT)
@Slf4j
@Transactional
public class PaymentAuthorizedEventConsumer implements Consumer<Message<PaymentAuthorizedEventPayload>> {

    private final CustomerService customerService;
    private final ConsumedEventService consumedEventService;

    @Autowired
    public PaymentAuthorizedEventConsumer(CustomerService customerService, ConsumedEventService consumedEventService) {
        this.customerService = customerService;
        this.consumedEventService = consumedEventService;
    }


    @Override
    public void accept(Message<PaymentAuthorizedEventPayload> event) {
        String eventId = (String) event.getHeaders().get("eventId");
        //skip an already consumed event
        if (consumedEventService.findByEventId(eventId) != null) {
            log.info("Event with id " + eventId + " already processed");
            return;
        }
        customerService.approveRegistration(event.getPayload().getCustomerId());
        //save the consumed event
        consumedEventService.save(ConsumedEvent.builder().eventId(eventId).timestamp(LocalDateTime.now()).build());
    }
}
@Service(PAYMENT_NOT_AUTHORIZED_EVENT)
@Slf4j
@Transactional
public class PaymentNotAuthorizedEventConsumer implements Consumer<Message<PaymentAuthorizationFailedEventPayload>> {

    private final CustomerService customerService;
    private final ConsumedEventRepository consumedEventRepository;

    @Autowired
    public PaymentNotAuthorizedEventConsumer(CustomerService customerService, ConsumedEventRepository consumedEventRepository) {
        this.customerService = customerService;
        this.consumedEventRepository = consumedEventRepository;
    }

    @Override
    public void accept(Message<PaymentAuthorizationFailedEventPayload> event) {
        String eventId = (String) event.getHeaders().get("eventId");
        //skip an already consumed event
        if(consumedEventRepository.findByEventId(eventId) != null){
            log.info("Event with id " + eventId + " already processed");
            return;
        }
        customerService.rejectRegistration(event.getPayload().getCustomerId(), event.getPayload().getReason());
        //save the consumed event
        consumedEventRepository.save(ConsumedEvent.builder().eventId(eventId).timestamp(LocalDateTime.now()).build());
    }
}
@Service("NotInterestedEvent")
@Slf4j
public class NotInterestedConsumer implements Consumer<Message<Object>> {
    @Override
    public void accept(Message<Object> objectMessage) {
        log.info("Not interested about event :" + objectMessage.getHeaders().get("eventType"));
    }
}

Events

There are multiple events, but they all follow the same pattern: inherit from CustomerEvent (which inherits from DomainEvent) and add a payload field, which holds all the data we need.

@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class CustomerRegisteredEvent extends CustomerEvent{
    private static final long serialVersionUID = 2046835779768635862L;
    private CustomerRegisteredEventPayload payload;
}

Payment Service

The structure is similar to Customer Service. It follows the same patterns.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.lowheap.poc</groupId>
        <artifactId>eventual-consistency</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>
    <artifactId>payment-service</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>payment-service</name>
    <description>Payment service</description>


    <dependencies>
        <dependency>
            <groupId>com.lowheap.poc</groupId>
            <artifactId>common-lib</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.lowheap.poc</groupId>
            <artifactId>payment-service-client</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.lowheap.poc</groupId>
            <artifactId>customer-service-client</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

server.port=9991
spring.application.name=payment-service
server.servlet.context-path=/payment-service
spring.jackson.serialization.FAIL_ON_EMPTY_BEANS=false
spring.data.mongodb.uri=mongodb://mongodb1:27017,mongodb2:27018
spring.data.mongodb.database=payments-db
spring.data.mongodb.auto-index-creation=true
#kafka
spring.cloud.stream.kafka.binder.brokers=localhost:9093
#functionRouter is used for routing to different consumers by a routing expression
spring.cloud.stream.bindings.functionRouter-in-0.destination=outbox.event.customer
#groups together multiple instances of this application and treats them as the same logical consumer
#(when scaling this should be provided by environment variable so there will be multiple groups, each with a number of instances)
spring.cloud.stream.bindings.functionRouter-in-0.group=paymentConsumerGroup1
spring.cloud.stream.bindings.functionRouter.content-type=application/json
#enable dead letter topic. All failed events will go here. If the name is not specified, each consumer will have its own DLT
spring.cloud.stream.kafka.default.consumer.enableDlq=true
spring.cloud.stream.kafka.default.consumer.dlqName=DLT
#pick up the first message that was sent when this consumer was down
spring.cloud.stream.binder.kafka.start-offset=earliest
#enable routing
spring.cloud.stream.function.routing.enabled=true
#helps in deserializing custom headers
spring.cloud.stream.kafka.binder.headerMapperBeanName=defaultKafkaHeaderMapper
interestingEvents=CustomerPendingRegistrationEvent;CustomerRegistrationDeadLockEvent
#Should take into account the period that Kafka is configured to keep the logs. When Kafka removes the logs,
#consumed events can also be removed, as there is no chance of re-consuming a previously sent event.
consumed.events.cleanup.cron=0 1 0 * * ?

authorization.fee=2
company.name=SOME-COMPANY
company.bankAccount=1234 5678 9101 1121
company.bankName=SOME-BANK

PaymentServiceApplication.java

@SpringBootApplication(scanBasePackages = {"com.lowheap.poc"})
public class PaymentServiceApplication {

    @Value("#{'${interestingEvents}'.split(';')}")
    private List<String> interestingEvents;


    public static void main(String[] args) {
        SpringApplication.run(PaymentServiceApplication.class, args);
    }


    @Bean
    public ModelMapper modelMapper() {
        return new ModelMapper();
    }

    /**
     * Helps in deserializing custom headers
     */
    @Bean
    public KafkaHeaderMapper defaultKafkaHeaderMapper(){
        DefaultKafkaHeaderMapper defaultKafkaHeaderMapper = new DefaultKafkaHeaderMapper();
        defaultKafkaHeaderMapper.setRawMappedHeaders(Map.of("eventType", true, "eventId", true, KafkaHeaders.RECEIVED_KEY, true));
        return defaultKafkaHeaderMapper;
    }

    /**
     * Route by eventType header. The name of the consumer bean should be the event type.
     */
    @Bean
    public MessageRoutingCallback customRouter() {
        return new MessageRoutingCallback() {
            @Override
            public String routingResult(Message<?> message) {
                String eventType = (String) message.getHeaders().get("eventType");
                if (!interestingEvents.contains(eventType)) {
                    return "NotInterestedEvent";
                }
                return eventType;
            }
        };
    }

    @Bean
    public MongoTransactionManager mongoTransactionManager(MongoDatabaseFactory factory) {
        return new MongoTransactionManager(factory);
    }
}

PaymentServiceImpl.java

This class also contains a dead-lock cleanup step.

@Service
@Transactional
@Slf4j
public class PaymentServiceImpl implements PaymentService{

    private final PaymentRepository paymentRepository;
    private final PaymentEventsRepository paymentEventsRepository;
    private final ModelMapper modelMapper;

    @Value("${authorization.fee}")
    private Double authorizationFee;
    @Value("${company.name}")
    private String companyName;
    @Value("${company.bankAccount}")
    private String companyBankAccount;
    @Value("${company.bankName}")
    private String companyBankName;

    @Autowired
    public PaymentServiceImpl(PaymentRepository paymentRepository, PaymentEventsRepository paymentEventsRepository,ModelMapper modelMapper) {
        this.paymentRepository = paymentRepository;
        this.paymentEventsRepository = paymentEventsRepository;
        this.modelMapper = modelMapper;
    }

    @Override
    @Pivot(sagas = {CUSTOMER_REGISTRATION_SAGA})
    public Payment authorizeCustomer(Customer customer) {
        Payment payment = buildPaymentFromCustomer(customer, true);
        if (payment.getPayer().getCard().getBalance() < authorizationFee) {
            //do compensation
            return failCustomerAuthorization(customer, Messages.NO_BALANCE_LEFT);
        }
        payment = paymentRepository.save(payment);
        PaymentAuthorizedEventPayload payload = PaymentAuthorizedEventPayload.builder().customerId(customer.getId()).payment(modelMapper.map(payment, PaymentDto.class)).build();
        //save the new event to be sent
        PaymentAuthorizedEvent paymentAuthorizedEvent = PaymentAuthorizedEvent.builder().payload(payload).type(PAYMENT_AUTHORIZED_EVENT)
                .aggregateId(payment.getId()).aggregateType(PAYMENT).build();
        paymentEventsRepository.save(paymentAuthorizedEvent);
        log.info("Customer payment authorized");
        return payment;
    }

    @Override
    @CompensationFor(sagas = {CUSTOMER_REGISTRATION_SAGA}, compensableEvents = {CUSTOMER_PENDING_REGISTRATION_EVENT})
    public Payment failCustomerAuthorization(Customer customer, String reason) {
        Payment payment = buildPaymentFromCustomer(customer, false);
        payment = paymentRepository.save(payment);
        PaymentAuthorizationFailedEvent paymentAuthorizationFailedEvent = PaymentAuthorizationFailedEvent.builder()
                .payload(PaymentAuthorizationFailedEventPayload.builder().customerId(customer.getId()).reason(reason).payment(modelMapper.map(payment, PaymentDto.class)).build())
                .type(PAYMENT_NOT_AUTHORIZED_EVENT).aggregateId(payment.getId()).aggregateType(PAYMENT).build();
        paymentEventsRepository.save(paymentAuthorizationFailedEvent);
        log.info("Customer payment not authorized");
        return payment;
    }

    @Override
    @DeadLockCleanupFor(sagas = {CUSTOMER_REGISTRATION_SAGA})
    public void revertAuthorizationPayment(Payer payer) {
        log.info("Deleting authorization fee payments for payer {} if any" , payer.getName());
        paymentRepository.deleteByPayer(payer);
    }


    private Payment buildPaymentFromCustomer(Customer customer, boolean authorized) {
        String name = customer.getFirstName() + " " + customer.getLastName();
        Card card = modelMapper.map(customer.getCard(), Card.class);
        Payer payer = Payer.builder().name(name).card(card).build();
        Payee company = Payee.builder().name(companyName).bankAccount(companyBankAccount).bankName(companyBankName).build();
        return Payment.builder().payer(payer).payee(company).amount(authorizationFee).authorized(authorized).build();
    }
}

Repositories

@Repository
public interface PaymentRepository extends MongoRepository<Payment, String> {
    List<Payment> deleteByPayer(Payer payer);
}
@Repository
public interface PaymentEventsRepository extends MongoRepository<PaymentEvent, String> {
}

Kafka consumers

@Service(CUSTOMER_PENDING_REGISTRATION_EVENT)
@Slf4j
@Transactional
public class CustomerPendingRegistrationEventConsumer implements Consumer<Message<CustomerPendingRegistrationEventPayload>> {

    private final PaymentService paymentService;
    private final ConsumedEventService consumedEventService;
    private final ModelMapper modelMapper;

    @Autowired
    public CustomerPendingRegistrationEventConsumer(PaymentService paymentService, ConsumedEventService consumedEventService, ModelMapper modelMapper) {
        this.paymentService = paymentService;
        this.consumedEventService = consumedEventService;
        this.modelMapper = modelMapper;
    }

    @Override
    public void accept(Message<CustomerPendingRegistrationEventPayload> event) {
        String eventId = (String) event.getHeaders().get("eventId");
        //skip an already consumed event
        if (consumedEventService.findByEventId(eventId) != null) {
            log.info("Event with id " + eventId + " already processed");
            return;
        }
        CustomerDto customer = event.getPayload().getCustomerDto();
        paymentService.authorizeCustomer(modelMapper.map(customer, Customer.class));
        //save the consumed event
        consumedEventService.save(ConsumedEvent.builder().eventId(eventId).timestamp(LocalDateTime.now()).build());
    }
}
@Service("NotInterestedEvent")
@Slf4j
public class NotInterestedConsumer implements Consumer<Message<Object>> {
    @Override
    public void accept(Message<Object> objectMessage) {
        log.info("Not interested about event :" + objectMessage.getHeaders().get("eventType"));
    }
}

Events

The same idea as in Customer Service applies. The only difference is that all events extend PaymentEvent.

@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class PaymentAuthorizedEvent extends PaymentEvent {
    private static final long serialVersionUID = 3701425076046479598L;
    private PaymentAuthorizedEventPayload payload;
}

How to test the example?

I encourage you to take the code from GitHub and run it. After you have started the environment and both services, you can use Postman to send a POST request to http://localhost:9990/customer-service/customers/ with the following JSON:

{
    "firstName": "John",
    "lastName": "Doe",
    "card": {
        "number": "1234567890",
        "holderName": "John Doe",
        "cvv": 120,
        "validThru": "2024-05-04",
        "balance": 100
    }
}

This is the successful case, so the customer should be registered, as there is enough balance available. The response is a customer with status REGISTRATION_PENDING.

In order to check if the registration was successful, you need to send a GET request to http://localhost:9990/customer-service/customers/{id-of-the-customer-from-postman-response}.

Testing the unsuccessful scenario implies changing the balance to a value lower than 2, as the application checks the balance against the authorization fee of 2. In the end, the customer will have the status REGISTRATION_FAILED.
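
The same flow can also be exercised with curl (replace the id in the second command with the one returned by the first):

curl -X POST http://localhost:9990/customer-service/customers/ \
  -H "Content-Type: application/json" \
  -d '{"firstName":"John","lastName":"Doe","card":{"number":"1234567890","holderName":"John Doe","cvv":120,"validThru":"2024-05-04","balance":100}}'

curl http://localhost:9990/customer-service/customers/<customer-id>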

Kafka UI

Kafka UI can be accessed at http://localhost:8080/. Here we can see all the topics and the messages. We do not create any topics ourselves; Debezium handles everything on our behalf. The services just need to subscribe to the outbox.* topics.

Conclusion

This has been a long post, in which we have seen the caveats of implementing transactions in microservices. I would even dare say that if you do not have these kinds of problems in your current transition from Monolith to Microservices, you have most likely implemented a Distributed Monolith. However, this should not discourage you from adopting the Microservice architecture, as it provides many advantages over a Monolith. I encourage you to read the Microservice Patterns book, as it is a good source for learning microservices.
