
Mastering Reactive Programming with Spring Data R2DBC

Aug 15, 2024
Mikhail Polivakha

Spring Data R2DBC is the part of the Spring Data project that brings R2DBC (Reactive Relational Database Connectivity) to Spring, enabling developers to build applications that combine relational databases with reactive programming.

This article offers a guide to Spring Data R2DBC, starting with basic concepts and moving to more complex subjects as we go. We will

  • Explore the core concepts of reactive programming and R2DBC,
  • Offer a step-by-step tutorial for building your first project with Spring Data R2DBC, and 
  • Discuss more advanced topics such as differences between R2DBC and JDBC and nuances of working with R2DBC.

If you already have an understanding of reactive programming and are familiar with the key concepts of R2DBC, feel free to jump to the section you are most interested in.

The code for all examples is available on GitHub; links are provided in the corresponding sections.

Introduction

What is reactive programming

Reactive programming is a declarative programming paradigm based on asynchronous (non-blocking) event processing. It aims at more efficient use of runtime resources while a task is in progress: a thread does not block or busy-wait for the environment to answer (e.g., for data to arrive on a Unix domain or TCP/IP socket). This can be achieved by different means, for instance, via the Linux epoll API (which Netty, for example, uses internally). As a result, reactive programming enables the creation of programs that can work with dynamic data flows.

Reactive programming relies heavily on the Observer pattern and the concept of producers and consumers. A consumer registers itself with the producer to signal that it wants to receive events from the stream (either only future events or all of them). The producer then notifies its consumers automatically of any state changes. Several consumers can subscribe to one producer.
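The producer/consumer relationship described above can be sketched in plain Java without any reactive library. This is only an illustration of the Observer idea: PriceProducer and its methods are hypothetical names, not part of Reactor or Spring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ObserverSketch {

    /** Hypothetical producer used for illustration only. */
    static final class PriceProducer {
        private final List<Consumer<Double>> consumers = new ArrayList<>();

        /** A consumer registers itself to receive all future events. */
        void subscribe(Consumer<Double> consumer) {
            consumers.add(consumer);
        }

        /** The producer pushes each new event to every registered consumer. */
        void publish(double price) {
            for (Consumer<Double> consumer : consumers) {
                consumer.accept(price);
            }
        }
    }

    /** Registers two consumers at different times and returns what each one received. */
    static List<List<Double>> run() {
        PriceProducer producer = new PriceProducer();
        List<Double> first = new ArrayList<>();
        List<Double> second = new ArrayList<>();

        producer.subscribe(first::add);
        producer.publish(10.0);          // only the first consumer is subscribed at this point
        producer.subscribe(second::add);
        producer.publish(20.5);          // both consumers receive this event

        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The second consumer sees only the event published after it subscribed, which corresponds to the "from now on" subscription mode mentioned above.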

Another important concept of reactive programming is the reactive data stream: a flow of time-ordered data of undetermined volume that is delivered in a push fashion. This means that a consumer doesn’t poll the producer for each event. Instead, it specifies the action to be taken when an event happens, and the producer is responsible for pushing the event to all consumers that have subscribed.

Reactive data streams can also be processed using different functions: you can aggregate, filter, map, merge streams, etc.

On the whole, reactive programming doesn’t demonstrate better overall performance than its imperative counterpart, but reactive applications can potentially scale better under load. So, you can try it out with your application and see whether there are any benefits for your particular case. 

What is Spring Data R2DBC

Developing a reactive app on top of JDBC-based APIs such as Spring Data JDBC is challenging because JDBC access to the database is synchronous. So, the R2DBC specification was designed to provide a common set of features and interfaces that reactive drivers should have. Spring Data R2DBC abstracts these drivers away and provides the familiar Spring Data tools on top of them.

The following reactive database drivers are currently supported:

  • H2 (io.r2dbc:r2dbc-h2)
  • MariaDB (org.mariadb:r2dbc-mariadb)
  • Microsoft SQL Server (io.r2dbc:r2dbc-mssql)
  • MySQL (io.asyncer:r2dbc-mysql)
  • jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)
  • Postgres (io.r2dbc:r2dbc-postgresql)
  • Oracle (com.oracle.database.r2dbc:oracle-r2dbc)

Spring Data R2DBC is fairly similar to Spring Data JDBC, the reason being that the code base of the two is strongly interlaced. So if you are familiar with the latter, mastering R2DBC will be easier. However, there are some differences, some of them quite significant. Most of these differences are related to entity mapping. We will discuss these differences in more detail below.

Differences aside (for a while), Spring Data R2DBC offers a selection of features for convenient database interaction, such as R2dbcEntityTemplate or Repository interfaces with support for custom queries. 

In the next section, we will see how we can implement CRUD operations with R2DBC API by creating a simple, yet fully functional demo application. 

Basics of Spring Data R2DBC

Project setup: dependencies, database configuration

The full code for the application below is available on GitHub.

Prerequisites:

The best way to start a new Spring project is to visit Spring Initializr. We will need at least the following dependencies:

  • Spring Data R2DBC
  • PostgreSQL Driver
  • Spring Reactive Web
  • Liquibase Migration

Generate the project and open it in your IDE.

First of all, let’s configure the database connection. Although it is possible to configure Spring Data R2DBC via an application.properties or YAML file when using it with Spring Boot, we're going to configure the minimal required setup (such as the ConnectionFactory) manually.
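For comparison, the property-based route would look roughly like the fragment below. It relies on Spring Boot's spring.r2dbc.* auto-configuration properties; the values are assumptions that simply mirror the local PostgreSQL setup used in the rest of this article.

```properties
# Spring Boot auto-configuration alternative (not used in this article's demo)
spring.r2dbc.url=r2dbc:postgresql://127.0.0.1:5432/customersdb
spring.r2dbc.username=postgres
spring.r2dbc.password=12345
```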

Create an ApplicationConfiguration class that extends AbstractR2dbcConfiguration, which contains essential bean declarations for the correct R2DBC functioning.

The class requires three annotations: @Configuration, @EnableR2dbcRepositories (to be able to use reactive repository interfaces provided by Spring Data), and @EnableR2dbcAuditing (to enable auditing).

We need to create a ConnectionFactory object in the ApplicationConfiguration class, which is similar to DataSource in JDBC. As we use PostgreSQL, we can create ConnectionFactory with the help of PostgresqlConnectionFactory by passing necessary parameters for database access.

@Configuration(proxyBeanMethods = true)
@EnableR2dbcRepositories
@EnableR2dbcAuditing
public class ApplicationConfiguration extends AbstractR2dbcConfiguration {

    @Value("${r2dbc.host}")
    private String host;

    @Value("${r2dbc.port}")
    private Integer port;

    @Value("${r2dbc.password}")
    private String password;

    @Value("${r2dbc.db}")
    private String database;

    @Value("${r2dbc.username}")
    private String username;

    @Bean
    @Override
    public ConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                .host(host)
                .port(port)
                .username(username)
                .database(database)
                .password(password)
                .build()
        );
    }

    @Bean
    public ReactiveTransactionManager reactiveTransactionManager() {
        return new R2dbcTransactionManager(connectionFactory());
    }

    @Bean
    public TransactionalOperator transactionalOperator() {
        return TransactionalOperator.create(reactiveTransactionManager());
    }

}

The application.properties file contains the following configurations. This application uses a locally installed PostgreSQL server. Note that R2DBC, just like JDBC, can’t create database schema based on the domain classes, so we need to provide a schema ourselves. The most conventional way is to use a Flyway or Liquibase migration tool.

spring.application.name=r2dbc

r2dbc.host=127.0.0.1
r2dbc.port=5432
r2dbc.db=customersdb
r2dbc.username=postgres
r2dbc.password=12345

spring.liquibase.url=jdbc:postgresql://localhost:5432/customersdb
spring.liquibase.user=postgres
spring.liquibase.password=12345
spring.liquibase.change-log=classpath:db/changelog/db.changelog-master.yaml
logging.level.org.springframework.data.r2dbc=debug

As we added the Liquibase dependency when creating the project, there’s already a db.changelog directory. So, the final touch would be to create a schema and populate our database with some sample data.

The db.changelog-master.yaml file:

databaseChangeLog:
  - include:
      file: db/changelog/changelog-1-schema.sql
  - include:
      file: db/changelog/changelog-2-data.sql

The changelog-1-schema.sql file (the ALTER SEQUENCE part changes the current value of the id so that we can insert data without conflicts with the existing records):

--liquibase formatted sql
--changeset catherine:create-multiple-tables splitStatements:true endDelimiter:;

CREATE TABLE customers (
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL
);

CREATE TABLE orders (
id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
customer_id BIGINT NOT NULL REFERENCES customers(id) ON DELETE CASCADE,
price DOUBLE PRECISION NOT NULL,
created_at TIMESTAMP NOT NULL
);

ALTER SEQUENCE customers_id_seq RESTART WITH 101;
ALTER SEQUENCE orders_id_seq RESTART WITH 101;

The changelog-2-data.sql file:

--liquibase formatted sql
--changeset catherine:populate-tables-with-data splitStatements:true endDelimiter:;

INSERT INTO customers (id, name, email)
VALUES
(1, 'Smith Paul', '[email protected]'),
(2, 'Doe Jane', '[email protected]');

INSERT INTO orders (id, customer_id, price, created_at)
VALUES
(1, 1, 123.77, '2024-03-23T20:31:34.873822Z'),
(2, 1, 20.5, '2024-06-10T15:06:34.873822Z'),
(3, 2, 765.0, '2024-04-24T23:21:34.873822Z');

Finally, let’s create classes for our domain objects: Customer and Order.

The Customer class:

@Table(name = "customers")
public class Customer {
    @Id
    private Long id;
    private String name;
    private String email;
    @Transient
    private List<Order> orders = new ArrayList<>();

    public List<Order> getOrders() {
        return orders == null ? new ArrayList<>() : orders;
    }

    // getters, setters, constructors, equals and hashCode

}

The @Table annotation indicates that the class is a candidate for database mapping. The @Id annotation marks the primary key. What we should also note here is the @Transient annotation. As Spring Data R2DBC can’t automatically map relations, we should exclude these fields from being stored in the database.

The Order class:

@Table(name = "orders")
public class Order {
    @Id
    private Long id;
    private Long customerId;
    private double price;
    @CreatedDate
    private Instant createdAt;

// getters, setters, constructors, equals and hashCode

}

Here, we should note the @CreatedDate auditing annotation, which marks the field that holds the date the entity was created.

Finally, a CustomerDTO is a simple record class:

public record CustomerDTO (Long id,
                           String name,
                           String email,
                           List<Order> orders) {
}

Spring Data R2DBC Repositories

The next step is to create repositories for our entities.

There are several reactive repositories you can extend your repositories from. Their functionality is similar to that of the traditional Spring Data Repositories:

  • ReactiveCrudRepository provides common methods for finding, saving, and deleting entities;
  • ReactiveSortingRepository provides sorting capabilities for retrieving entities.

In addition, there’s support for RxJava3CrudRepository and RxJava3SortingRepository that use RxJava 3 types.

For example, let’s create repository interfaces for Customer and Order extending from ReactiveCrudRepository:

public interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {
}
public interface OrderRepository extends ReactiveCrudRepository<Order, Long>  {

    Flux<Order> findAllByCustomerId(Long customerId);
    Mono<Void> deleteAllByCustomerId(Long customerId);

}

The key difference is that these interfaces enable us to handle database interaction in a reactive way using Flux and Mono provided by Project Reactor. Before moving on, let’s look at these classes in more detail to understand what’s going on under the hood of R2DBC.

Working with Flux and Mono

Flux and Mono are implementations of the Publisher interface, which provides a stream of elements in response to the demand received from a Subscriber. A Publisher can have many Subscribers, but each Subscriber can subscribe only once to a single Publisher. After a Subscriber subscribes to a Publisher with subscribe(Subscriber), the Publisher can send one or several onNext(T t) notifications with data, followed by onComplete() in the case of a successful terminal state or onError(Throwable t) in the case of a failed terminal state. But the Publisher sends data notifications only after Subscription.request(long) is invoked, where the long argument is the number of elements the Subscriber is ready to accept.
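The demand-driven protocol described above can be observed with the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams Publisher, Subscriber, and Subscription. The sketch below is deliberately simplified: RangePublisher and RecordingSubscriber are hypothetical helpers, the publisher is synchronous and single-subscriber, and it skips spec details such as rejecting non-positive demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandSketch {

    /** Hypothetical synchronous publisher emitting 1..count, strictly on demand. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Deliver at most n elements; without demand nothing is sent.
                    for (long i = 0; i < n && !done && next <= count; i++) {
                        subscriber.onNext(next++);
                    }
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete(); // successful terminal signal
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscriber that records every signal and leaves demand control to the caller. */
    static final class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> signals = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { signals.add("onNext(" + item + ")"); }
        @Override public void onError(Throwable t) { signals.add("onError"); }
        @Override public void onComplete() { signals.add("onComplete"); }
    }

    public static void main(String[] args) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new RangePublisher(3).subscribe(subscriber);
        System.out.println(subscriber.signals); // no demand signalled yet
        subscriber.subscription.request(2);
        System.out.println(subscriber.signals); // two elements delivered
        subscriber.subscription.request(5);
        System.out.println(subscriber.signals); // remaining element, then completion
    }
}
```

Reactor's Flux and Mono follow the same contract, but operators such as subscribe() or collectList() usually issue the request(n) calls for you.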

Now, returning to Flux and Mono. Mono emits at most one element via onNext() and then terminates with onComplete() or onError(). Flux, in turn, emits 0 to N elements via onNext() and after that completes successfully or with an error.

We will see these classes in action in the following section. 

REST API examples for creating, reading, updating, and deleting data

Let’s start with retrieving a customer by id. We will need the following method in our CustomerService class:

    public Mono<CustomerDTO> findById(Long id) {

        return customerRepository.findById(id)
                .switchIfEmpty(Mono.error(new NoSuchElementException()))
                .flatMap(this::getOrdersForCustomer)
                .as(transactionalOperator::transactional);
    }

The findById() method accesses the database and retrieves the entity if it is there. The switchIfEmpty() method is responsible for providing a fallback to another Mono in case this one completes without data. In our case, it creates a Mono that terminates with a specified error.

In case we do have a Mono<Customer>, we need to map it to a DTO and add a list of orders. That’s what flatMap() does: it transforms the value emitted by our Mono asynchronously and returns a value emitted by another Mono.

We also need to add transaction management here and in all queries because if there are queries issued in different transactions, we could potentially break the consistency of the data. 

As we already know, R2DBC can’t automatically load the entity relations, so we need to do that manually. Let’s do that in a separate getOrdersForCustomer() method:

    private Mono<CustomerDTO> getOrdersForCustomer(Customer customer) {

        return Mono.just(customer)
                .zipWith(orderRepository.findAllByCustomerId(customer.getId()).collectList())
                .as(transactionalOperator::transactional)
                .map(result -> new CustomerDTO(
                        result.getT1().getId(),
                        result.getT1().getName(),
                        result.getT1().getEmail(),
                        result.getT2()));

    }

What do we have here?

  • just() creates a new Mono that emits a specified item, in our case, the customer;
  • zipWith() combines the result of this mono with a new one (in this case, a List of orders for a given customer) into a Tuple2<T1,T2>;
  • map() enables us to use Tuple2 to create a CustomerDTO with all required fields, including a List of orders.
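For readers more familiar with the JDK's CompletableFuture than with Reactor, the composition above has a rough analogue: thenCompose() plays the role of flatMap(), and thenCombine() the role of zipWith() followed by map(). This is only an analogy under stated assumptions: the in-memory maps and all type names below are hypothetical stand-ins for the repositories, and, unlike a Mono, a CompletableFuture starts executing eagerly and has no notion of demand.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class CompositionAnalogy {

    record Customer(long id, String name) {}
    record CustomerView(long id, String name, List<Double> orderPrices) {}

    // In-memory stand-ins for the two repositories (assumption, for illustration only).
    static final Map<Long, Customer> CUSTOMERS = Map.of(1L, new Customer(1L, "Smith Paul"));
    static final Map<Long, List<Double>> ORDERS = Map.of(1L, List.of(123.77, 20.5));

    static CompletableFuture<Customer> findCustomer(long id) {
        return CompletableFuture.supplyAsync(() -> CUSTOMERS.get(id));
    }

    static CompletableFuture<List<Double>> findOrderPrices(long customerId) {
        return CompletableFuture.supplyAsync(() -> ORDERS.getOrDefault(customerId, List.of()));
    }

    static CompletableFuture<CustomerView> findView(long id) {
        return findCustomer(id)
                // like flatMap(): the function itself returns an asynchronous result
                .thenCompose(customer -> CompletableFuture.completedFuture(customer)
                        // like zipWith() + map(): pair the customer with its orders
                        .thenCombine(findOrderPrices(customer.id()),
                                (c, prices) -> new CustomerView(c.id(), c.name(), prices)));
    }

    public static void main(String[] args) {
        // join() blocks and is used here only to print the result of the demo.
        System.out.println(findView(1L).join());
    }
}
```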

The method for finding all customers is fairly similar, except that in this case, it returns a Flux of CustomerDTOs:

    public Flux<CustomerDTO> findAll() {
        return customerRepository.findAll()
                .flatMap(this::getOrdersForCustomer)
                .as(transactionalOperator::transactional);
    }

Let’s see how to create a customer:

    public Mono<CustomerDTO> createCustomer(Customer customer) {
        if (customer.getId() != null) {
            return Mono.error(new IllegalArgumentException());
        }

        return customerRepository.save(customer)
                .map(newCustomer -> new CustomerDTO(
                        newCustomer.getId(),
                        newCustomer.getName(),
                        newCustomer.getEmail(),
                        newCustomer.getOrders()))
                .as(transactionalOperator::transactional);

    }

The method returns a Mono<CustomerDTO> in case of a successful transaction, or a Mono that terminates with a specified error. 

If we want to update a customer, we should first check its existence in the database:

    private Mono<Boolean> customerExists(Long id) {
        return customerRepository.existsById(id).handle((exists, sink) -> {
            if (Boolean.FALSE.equals(exists)) {
                sink.error(new IllegalArgumentException());
            } else {
                sink.next(exists);
            }
        });
    }

Let’s look at what’s happening here. existsById() returns a Mono<Boolean>, which we then pass through the handle() operator. It calls a BiConsumer with the emitted value and an output sink (SynchronousSink) for each onNext(). If the boolean equals false, we call SynchronousSink.error(Throwable). Otherwise, we call SynchronousSink.next(Object).

After that, we can create a method for updating a customer and also for creating an order for a customer:

    public Mono<CustomerDTO> updateCustomer(Customer customer) {
        return customerExists(customer.getId())
                .then(customerRepository.save(customer))
                .map(newCustomer -> new CustomerDTO(
                        newCustomer.getId(),
                        newCustomer.getName(),
                        newCustomer.getEmail(),
                        newCustomer.getOrders()))
                .as(transactionalOperator::transactional);

    }

    public Mono<Void> createOrderForCustomer(Order order) {
        if (order.getId() != null) {
            return Mono.error(new IllegalArgumentException());
        }

        return customerExists(order.getCustomerId())
                .then(orderRepository.save(order)).then()
                .as(transactionalOperator::transactional);

    }

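As for deleting a customer, we must first delete all orders associated with the given customer, and only then can we delete the customer itself. Note that deleteAllByCustomerId() returns a Mono<Void>, which completes without ever emitting an element, so the second step must be chained with then() rather than flatMap() (a flatMap() callback would never be invoked):

    public Mono<Void> deleteCustomer(Long id) {

        return orderRepository.deleteAllByCustomerId(id)
                .then(customerRepository.deleteById(id))
                .as(transactionalOperator::transactional);
    }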

The CustomerController class is pretty straightforward and resembles the controllers we are used to writing (the full code can be found on GitHub):

@RestController
@RequestMapping("/customers")
public class CustomerController {

    private final CustomerService customerService;

    public CustomerController(CustomerService customerService) {
        this.customerService = customerService;
    }


    @GetMapping("/{id}")
    @ResponseStatus(HttpStatus.OK)
    public Mono<CustomerDTO> findCustomerById(@PathVariable Long id) {
        return customerService.findById(id);

    }

    @GetMapping
    @ResponseStatus(HttpStatus.OK)
    public Flux<CustomerDTO> findAllCustomers() {
        return customerService.findAll();
    }

//other methods

}

Our first R2DBC application is ready! But the code above touches only the tip of an iceberg. The next section covers the intricacies of working with R2DBC in more complex scenarios. 

Advanced Topics: Spring Data R2DBC vs Spring Data JDBC

As we stated, in terms of features and functionality, Spring Data R2DBC is not that much different from Spring Data JDBC. The reason for this is a significant overlap in their codebases (let alone the fact that all Spring Data modules have a common module inside). But still, there are some discrepancies. Let’s discuss them one by one.

Embedded entities mapping

Mapping of embedded entities is generally not supported in Spring Data R2DBC. It is not that it is difficult or impossible to implement; it is just not ready yet. Spring Data JDBC, on the other hand, can handle embedded entities, although there are some subtleties.

Working with relationships

Here, we need to pause for a moment. There is a problem: relationships in Spring Data R2DBC are not supported in principle. There has been a lot of discussion on this topic in various threads. The very first question is absolutely legitimate: how did it happen? Why are they generally not supported, and why was such a product released at all?

It is important to understand the reasoning behind this. Let's start from the beginning, more precisely, from the process of loading entities.

Eager relations

Spring Data R2DBC does not have lazy loading in any form, just like Spring Data JDBC. There are some conceptual reasons for this. But in the case of R2DBC, the problem is not even conceptual; it is more related to the fact that we do not want to block when working in the world of reactive programming (otherwise, the whole point is lost). Imagine that an entity has a getter such as:

Department getDepartment();

With lazy loading, calling this method will block the thread; we simply have no choice. That is because the return value is a Department, a very concrete domain object that must be fully available when the method returns.

We cannot do it any other way. Someone might say that we can do it like this:

Publisher<Department> getDepartment();

Or something similar. Theoretically, we can, but in this case the data model becomes strongly tied to Reactor. The main question is: is this bad? There are many opinions on this, but the Spring Data core team decided that it is, and chose not to do it this way.
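The trade-off between the two getter signatures can be sketched with JDK types only. Here CompletableFuture stands in for a reactive Publisher, and Employee, Department, and the loader are hypothetical names; this is an illustration of the dilemma, not how Spring Data is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class LazyLoadingSketch {

    record Department(String name) {}

    /** Hypothetical entity whose department is loaded on first access. */
    static final class Employee {
        private final Supplier<CompletableFuture<Department>> loader;

        Employee(Supplier<CompletableFuture<Department>> loader) {
            this.loader = loader;
        }

        /** Plain getter: it must return a Department now, so the calling thread waits. */
        Department getDepartment() {
            return loader.get().join(); // blocks until the query result arrives
        }

        /** Asynchronous accessor: returns immediately, but leaks an async type into the model. */
        CompletableFuture<Department> getDepartmentAsync() {
            return loader.get();
        }
    }

    public static void main(String[] args) {
        Employee employee = new Employee(
                () -> CompletableFuture.supplyAsync(() -> new Department("R&D")));

        System.out.println(employee.getDepartment().name());

        employee.getDepartmentAsync()
                .thenAccept(d -> System.out.println(d.name()))
                .join(); // only so that the demo does not exit before the callback runs
    }
}
```

The first method keeps the domain model clean but blocks; the second never blocks but forces every caller of the entity to deal with an asynchronous type.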

Another possible option would be to take an approach similar to Hibernate, that is, to proxy the returned entity. The problem is that in Hibernate, the boundaries and the consistency of lazy loading are strictly defined by the open session and a single transaction. But we do not have a notion of a session in Spring Data R2DBC.

Therefore, the only viable option for now is to follow the Spring Data JDBC path and always work in eager loading mode.

Loading of relationships

It should be understood that it is not always possible to load an aggregate root along with its dependencies in one query with a couple of JOINs. There are cases where such an approach breaks pagination, and the framework would simply be forced to send several different queries, for example when dealing with the Cartesian product problem together with pagination.
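To see why a JOIN interferes with pagination, here is a small in-memory simulation that uses the same shape as the sample data from this article (customer 1 has two orders, customer 2 has one). The join() and customersOnFirstPage() helpers are hypothetical and merely imitate what "JOIN ... LIMIT n" does to row counts.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class JoinPaginationSketch {

    record Order(long id, long customerId) {}
    record JoinedRow(long customerId, long orderId) {}

    /** Simulates "customers JOIN orders": one row per (customer, order) pair. */
    static List<JoinedRow> join(List<Long> customerIds, List<Order> orders) {
        return customerIds.stream()
                .flatMap(cid -> orders.stream()
                        .filter(o -> o.customerId() == cid)
                        .map(o -> new JoinedRow(cid, o.id())))
                .toList();
    }

    /** Applies LIMIT to the joined rows and returns the distinct customers on that "page". */
    static Set<Long> customersOnFirstPage(List<JoinedRow> rows, int limit) {
        Set<Long> ids = new LinkedHashSet<>();
        rows.stream().limit(limit).forEach(r -> ids.add(r.customerId()));
        return ids;
    }

    public static void main(String[] args) {
        List<Long> customers = List.of(1L, 2L);
        List<Order> orders = List.of(new Order(1, 1), new Order(2, 1), new Order(3, 2));

        List<JoinedRow> rows = join(customers, orders);
        // A "page of two" cut from the joined rows contains both orders of customer 1
        // and nothing of customer 2, although two customers were intended.
        System.out.println(rows.size() + " joined rows; customers on page: "
                + customersOnFirstPage(rows, 2));
    }
}
```

Because the limit is applied to joined rows rather than to aggregate roots, a correct page of customers needs either a separate query for the ids or one query per relation.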

But the root of the problem here is that mapping data from the database to objects is essentially a synchronous process. If we send queries to the database at this stage, we get blocking behavior again. And there is simply no straightforward solution in this situation. Either we do not support relationships, or we block during mapping.

But! It should be noted that overall, if there is an opportunity to fetch everything in one query, then in such a mode, support for relationships is theoretically possible. It's just that it would require writing quite a complex mechanism that currently does not exist.

Fluent API for DML operations

To end this section on a lighter note, it is worth mentioning that Spring Data R2DBC supports a fluent query builder API that differs from what Spring Data JDBC offers. Overall, it's quite convenient: it’s still challenging to compete with the Criteria API, but it's getting close. This API is implemented on top of the same R2dbcEntityTemplate.

Conclusion

To conclude, we need to emphasize the following points:

  • Reactive applications in general do not outperform regular (imperative) apps, but they tend to scale better under high load.
  • JDBC API was initially designed to perform synchronous communication with RDBMS. R2DBC is a spec designed for drivers as an alternative way to interact with the database in an asynchronous way.
  • Spring Data R2DBC is an even higher abstraction layer over the R2DBC compliant drivers that provides an easy way to work with the DB in a reactive manner.
  • Spring Data R2DBC and Spring Data JDBC share a lot of features, mostly because of an overlap in their codebases. 
  • Spring Data R2DBC still lacks some functionality, part of which is problematic to implement from both conceptual and practical points of view.

Still, Spring Data R2DBC is a promising project. Humanity is still exploring ways to communicate reactively with an RDBMS. As we saw, there are some challenges. For now, perhaps, reactive apps would work better with other databases, such as MongoDB, that do not have many of the problems described above. But that’s another story.

 
