DatabaseClient is the central class in the R2DBC core package. It handles the creation and release of resources, which helps avoid common mistakes such as forgetting to close a connection. It performs the basic tasks of the core R2DBC workflow (such as creating and executing statements), leaving application code to provide the SQL and extract the results. The DatabaseClient class:

  • Executes SQL queries

  • Runs update statements and stored procedure calls

  • Iterates over Result instances

  • Catches R2DBC exceptions and transforms them into a typed, more meaningful exception hierarchy, defined in the org.springframework.dao package. (See Consistent Hierarchy of Exceptions)

The client has a functional, fluent API that uses reactive types for declarative composition.

When you use DatabaseClient in your code, you only need to implement java.util.function interfaces, which gives them a clearly defined contract. Given a Connection provided by the DatabaseClient class, a Function callback creates a Publisher. The same is true for mapping functions that extract a result from a Row.

You can use DatabaseClient inside a DAO implementation by directly instantiating it with a reference to ConnectionFactory, or you can configure it in the Spring IoC container and pass it to DAO objects as a bean reference.

The simplest way to create a DatabaseClient object is through a static factory method, as shown below:

Java
DatabaseClient client = DatabaseClient.create(connectionFactory);
Kotlin
val client = DatabaseClient.create(connectionFactory)
ConnectionFactory should always be configured as a bean in the Spring IoC container.

The previous method creates a DatabaseClient with default settings.

You can also get an instance of Builder from DatabaseClient.builder(). You can configure the client by calling the following methods:

  • ….bindMarkers(…): Supply a specific BindMarkersFactory to configure how named parameters are translated into database bind markers.

  • ….executeFunction(…): Set the ExecuteFunction that determines how Statement objects are run.

  • ….namedParameters(false): Disable named parameter expansion. Enabled by default.

Dialects are resolved by BindMarkersFactoryResolver from a ConnectionFactory, typically by inspecting ConnectionFactoryMetadata. You can let Spring auto-discover your BindMarkersFactory by registering a class that implements org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider through META-INF/spring.factories. BindMarkersFactoryResolver discovers bind marker provider implementations from the class path by using SpringFactoriesLoader.

The following databases are currently supported:

  • H2

  • MariaDB

  • Microsoft SQL Server

  • MySQL

  • Postgres

All SQL issued by this class is logged at the DEBUG level under the category corresponding to the fully qualified class name of the client instance (typically DefaultDatabaseClient). Additionally, each execution registers a checkpoint in the reactive sequence to aid debugging.

The following sections provide some examples of DatabaseClient usage. These examples are not an exhaustive list of all the functionality exposed by DatabaseClient. See the accompanying javadoc for that.

Statement execution

DatabaseClient provides the basic functionality of running a statement. The following example shows what you need to include for minimal but fully functional code that creates a new table:

Java

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .then();
Kotlin

client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .await()

DatabaseClient is designed for convenient, fluent usage. It exposes intermediate, continuation, and terminal methods at each stage of the execution specification. The previous example uses then() to return a completion Publisher that completes as soon as the query (or queries, if the SQL contains multiple statements) completes.

sql(…) accepts either the SQL query string or a Supplier<String> that defers the actual creation of the query until it is run.

Building queries (SELECT)

SQL queries can return values through Row objects or the number of affected rows. DatabaseClient can return the number of updated rows or the rows themselves, depending on the issued query.

The following query retrieves the columns id and name from the table:

Java

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
        .fetch().first();
Kotlin

val first = client.sql("SELECT id, name FROM person")
        .fetch().awaitSingle()

The following query uses a bind variable:

Java

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().first();
Kotlin

val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitSingle()

You may have noticed the use of fetch() in the example above. fetch() is a continuation operator that lets you specify how much data you want to consume.

Calling first() returns the first row of the result and discards the remaining rows. You can consume data with the following operators:

  • first() returns the first row of the entire result. Its Kotlin coroutine variant is named awaitSingle() for non-nullable return values and awaitSingleOrNull() if the value is optional.

  • one() returns exactly one result and fails if the result contains more rows. With Kotlin coroutines, use awaitOne() for exactly one value or awaitOneOrNull() if the value may be null.

  • all() returns all rows of the result. When using Kotlin coroutines, use flow().

  • rowsUpdated() returns the number of affected rows (the INSERT/UPDATE/DELETE count). Its Kotlin coroutine variant is named awaitRowsUpdated().
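
The difference between first() and one() can be illustrated without a database. The following is a minimal sketch only: a List<String> stands in for the rows of a result, and the class name, method names, and the IllegalStateException are placeholders chosen for this illustration, not part of the DatabaseClient API.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in: a List<String> plays the role of the rows in a result.
final class FetchSemanticsSketch {

    // Like first(): take the first row and silently ignore the rest.
    static Optional<String> first(List<String> rows) {
        return rows.stream().findFirst();
    }

    // Like one(): more than one row is treated as an error.
    static Optional<String> one(List<String> rows) {
        if (rows.size() > 1) {
            throw new IllegalStateException("Expected at most 1 row but got " + rows.size());
        }
        return rows.stream().findFirst();
    }

    public static void main(String[] args) {
        List<String> rows = List.of("Joe", "Ann");
        System.out.println(first(rows)); // prints Optional[Joe]
        try {
            one(rows);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints the size complaint
        }
    }
}
```

Prefer one() when more than one matching row would indicate a data problem, and first() when any extra rows are expected and can be ignored.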

Without specifying further mapping details, queries return tabular results as a Map, whose keys are case-insensitive column names that map to column values.
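
To illustrate what case-insensitive keys mean in practice, here is a small sketch that uses a TreeMap with String.CASE_INSENSITIVE_ORDER as a stand-in. This is an assumption made for the illustration; the Map type actually returned by DatabaseClient may differ.

```java
import java.util.Map;
import java.util.TreeMap;

// Stand-in for a tabular result row; not the Map implementation Spring returns.
final class CaseInsensitiveRowSketch {

    // Builds a row-like Map whose keys ignore case.
    static Map<String, Object> row(String column, Object value) {
        Map<String, Object> row = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        row.put(column, value);
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> row = row("NAME", "Joe");
        System.out.println(row.get("name")); // prints Joe
        System.out.println(row.get("Name")); // prints Joe
    }
}
```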

You can take control over result mapping by supplying a Function<Row, T> that is called for each Row so that it can return arbitrary values (singular values, collections, Maps, and objects).

The following example retrieves the name column and produces its value:

Java

Flux<String> names = client.sql("SELECT name FROM person")
        .map(row -> row.get("name", String.class))
        .all();
Kotlin

val names = client.sql("SELECT name FROM person")
        .map { row: Row -> row.get("name", String::class.java) }
        .flow()
What about null?

Relational database results can contain null values. The Reactive Streams specification forbids the emission of null values. That requirement mandates proper null handling in the extractor function. Although you can obtain null values from a Row, you must not emit a null value. You must wrap any null values in an object (for example, Optional for singular values) to make sure that a null value is never returned directly by your extractor function.
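
The wrapping described above can be sketched as follows. This is a hedged illustration only: a Map<String, Object> stands in for a Row so that the snippet runs without a database, and the class and constant names are hypothetical. With a real Row, the same idea would apply to the value returned by row.get("name", String.class).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class NullSafeMappingSketch {

    // Stand-in for a Function<Row, T>: a Map<String, Object> plays the role of Row.
    // The column value is wrapped so the function itself never returns null.
    static final Function<Map<String, Object>, Optional<String>> NAME =
            row -> Optional.ofNullable((String) row.get("name"));

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("name", null); // simulates a SQL NULL in the name column
        System.out.println(NAME.apply(row)); // prints Optional.empty
    }
}
```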

Updating (INSERT, UPDATE, and DELETE) using DatabaseClient

The only difference for modifying statements is that they typically do not return tabular data, so you use rowsUpdated() to consume the result.

The following example shows an UPDATE statement that returns the number of updated rows:

Java

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().rowsUpdated();
Kotlin

val affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitRowsUpdated()

Binding values to queries

A typical application requires parameterized SQL statements to select or update rows according to some input. These are typically SELECT statements constrained by a WHERE clause, or INSERT and UPDATE statements that accept input parameters. Parameterized statements carry the risk of SQL injection if parameters are not escaped properly. DatabaseClient uses the R2DBC bind API to eliminate the risk of SQL injection for query parameters. You can provide a parameterized SQL statement with the sql(…) operator and bind parameters to the actual Statement. Your R2DBC driver then runs the statement by using a prepared statement and parameter substitution.

Parameter binding supports two binding strategies:

  • By index, using zero-based parameter indices.

  • By name, using the placeholder name.

The following example shows parameter binding for a query:

client.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
    .bind("id", "joe")
    .bind("name", "Joe")
    .bind("age", 34);
R2DBC Native Bind Markers

R2DBC uses database-native bind markers that depend on the actual database vendor. As an example, Postgres uses indexed markers, such as $1, $2, $n. Another example is SQL Server, which uses named bind markers prefixed with @.

This is different from JDBC, which requires ? as bind markers. In JDBC, the actual drivers translate ? bind markers to database-native markers as part of their statement execution.

Spring Framework's R2DBC support lets you use native bind markers or named bind markers with the :name syntax.

Named parameter support uses a BindMarkersFactory instance to expand named parameters to native bind markers at the time of query execution, which gives you a certain degree of query portability across database vendors.

The query preprocessor unrolls named Collection parameters into a series of bind markers to remove the need to build the query dynamically based on the number of arguments. Nested object arrays are expanded to allow usage of (for example) select lists.
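
To make the expansion concrete, the following is a simplified sketch of the idea, not Spring's actual implementation. It rewrites :name placeholders into Postgres-style indexed markers and unrolls Collection values into one marker per element. The class NamedParameterSketch and its expand method are hypothetical, and the sketch deliberately ignores string literals, comments, casts such as ::text, and nested arrays.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Spring or R2DBC.
final class NamedParameterSketch {

    // Matches a colon followed by an identifier, for example ":ages".
    private static final Pattern NAMED = Pattern.compile(":([A-Za-z_][A-Za-z0-9_]*)");

    // Holds the rewritten SQL and the values in marker order ($1 first).
    static final class Expanded {
        final String sql;
        final List<Object> values;

        Expanded(String sql, List<Object> values) {
            this.sql = sql;
            this.values = values;
        }
    }

    static Expanded expand(String sql, Map<String, Object> params) {
        List<Object> values = new ArrayList<>();
        StringBuilder out = new StringBuilder();
        Matcher m = NAMED.matcher(sql);
        while (m.find()) {
            String name = m.group(1);
            if (!params.containsKey(name)) {
                throw new IllegalArgumentException("No value bound for :" + name);
            }
            Object value = params.get(name);
            // A Collection becomes one marker per element; any other value a single marker.
            Collection<?> items = (value instanceof Collection)
                    ? (Collection<?>) value
                    : Collections.singletonList(value);
            List<String> markers = new ArrayList<>();
            for (Object item : items) {
                values.add(item);
                markers.add("$" + values.size());
            }
            m.appendReplacement(out, Matcher.quoteReplacement(String.join(", ", markers)));
        }
        m.appendTail(out);
        return new Expanded(out.toString(), values);
    }

    public static void main(String[] args) {
        Expanded e = expand(
                "SELECT id, name, state FROM table WHERE age IN (:ages) AND state = :state",
                Map.of("ages", List.of(35, 50), "state", "active"));
        System.out.println(e.sql);    // ... WHERE age IN ($1, $2) AND state = $3
        System.out.println(e.values); // [35, 50, active]
    }
}
```

For a different vendor, only the marker format would change (for example, @-prefixed names for SQL Server), which is the portability benefit described above.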

Consider the following query:

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50)) 

The previous query can be parameterized and executed as follows:

Java

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann",  50});
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
    .bind("tuples", tuples);
Kotlin

val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
    .bind("tuples", tuples)
Usage of select lists is vendor-dependent.

The following example shows a simpler option using IN predicates:

Java

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
    .bind("ages", Arrays.asList(35, 50));
Kotlin

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
    .bind("ages", listOf(35, 50))
R2DBC itself does not support Collection-like values. Nevertheless, expanding a given List as in the example above works for named parameters in Spring's R2DBC support, for example for use in IN clauses as shown above. However, inserting or updating array-typed columns (for example, in Postgres) requires an array type that is supported by the underlying R2DBC driver: typically a Java array, such as String[] to update a text[] column. Do not pass Collection<String> or the like as an array parameter.

Statement Filters

Sometimes you need to fine-tune the options for the Statement itself before it is run. Register a Statement filter (StatementFilterFunction) through the DatabaseClient to intercept and modify statements as they execute, as shown in the following example:

Java

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
    .filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
    .bind("name", …)
    .bind("state", …);
Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
            .filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
            .bind("name", …)
            .bind("state", …)

DatabaseClient also exposes a simplified filter(…) overload that accepts a Function<Statement, Statement>:

Java

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
    .filter(statement -> statement.returnGeneratedValues("id"));
client.sql("SELECT id, name, state FROM table")
    .filter(statement -> statement.fetchSize(25));
Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
    .filter { statement -> statement.returnGeneratedValues("id") }
client.sql("SELECT id, name, state FROM table")
    .filter { statement -> statement.fetchSize(25) }

StatementFilterFunction implementations allow filtering of the Statement as well as filtering of Result objects.
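
The reason a single filter can act on both sides is that it wraps the execute step: it sees the statement before delegating and the result after. The following sketch shows that shape with plain Java stand-ins. The Execute and Filter interfaces here are hypothetical simplifications (a String plays the role of a Statement, a List<String> the role of a Result), not the real R2DBC or Spring types.

```java
import java.util.List;

final class StatementFilterSketch {

    // Hypothetical stand-ins for the execute function and the filter function.
    interface Execute {
        List<String> execute(String statement);
    }

    interface Filter {
        List<String> filter(String statement, Execute next);
    }

    // Decorates an Execute so that the filter runs around it.
    static Execute wrap(Filter filter, Execute target) {
        return statement -> filter.filter(statement, target);
    }

    public static void main(String[] args) {
        // Pretend driver: echoes the statement back as two rows.
        Execute driver = statement -> List.of(statement + " #1", statement + " #2");

        Filter filter = (statement, next) -> {
            // Adjust the statement before it runs ...
            List<String> rows = next.execute(statement + " /* tuned */");
            // ... and post-process the result afterwards.
            return rows.subList(0, 1);
        };

        System.out.println(wrap(filter, driver).execute("SELECT 1"));
        // prints [SELECT 1 /* tuned */ #1]
    }
}
```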

Best practices for working with DatabaseClient

Once configured, instances of the DatabaseClient class are thread safe. This is important because it means that you can configure one DatabaseClient instance and then safely inject that shared reference into multiple DAOs (or repositories). The DatabaseClient is stateful because it stores a reference to the ConnectionFactory, but this state is not a conversational state.

A common practice when using the DatabaseClient class is to configure a ConnectionFactory in your Spring configuration file and then dependency-inject that shared ConnectionFactory bean into your DAO classes. The DatabaseClient is created in the setter for the ConnectionFactory. This leads to DAOs that resemble the following:

Java

public class R2dbcCorporateEventDao implements CorporateEventDao {
    private DatabaseClient databaseClient;
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.databaseClient = DatabaseClient.create(connectionFactory);
    }
    // Method implementations for CorporateEventDao with R2DBC support follow...
}
Kotlin

class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {
    private val databaseClient = DatabaseClient.create(connectionFactory)
    // Method implementations for CorporateEventDao with R2DBC support follow...
}

An alternative to explicit configuration is to use component scanning and annotation support for dependency injection. In this case, you can annotate the class with @Component (which makes it a candidate for component scanning) and annotate the ConnectionFactory setter method with @Autowired. The following example shows how to do so:

Java

@Component 
public class R2dbcCorporateEventDao implements CorporateEventDao {
    private DatabaseClient databaseClient;
    @Autowired
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.databaseClient = DatabaseClient.create(connectionFactory); 
    }
    // Method implementations for CorporateEventDao with R2DBC support follow...
}
  1. Annotate the class with @Component.
  2. Annotate the ConnectionFactory setter with @Autowired.
  3. Create a new DatabaseClient using ConnectionFactory.
Kotlin

@Component
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {
    private val databaseClient = DatabaseClient.create(connectionFactory)
    // Method implementations for CorporateEventDao with R2DBC support follow...
}
  1. Annotate the class using @Component.
  2. Constructor injection of the ConnectionFactory.
  3. Create a new DatabaseClient using ConnectionFactory.

Regardless of which of the initialization styles described above you choose to use (or not), it is seldom necessary to create a new instance of the DatabaseClient class each time you want to run SQL. Once configured, a DatabaseClient instance is thread safe. If your application accesses multiple databases, you may want multiple DatabaseClient instances, which requires multiple ConnectionFactory instances and, subsequently, multiple differently configured DatabaseClient instances.

Getting auto-generated keys

INSERT statements may generate keys when inserting rows into a table that defines an auto-increment or identity column. To get full control over the name of the column to generate, register a StatementFilterFunction that requests the generated key for the desired column.

Java

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
    .filter(statement -> statement.returnGeneratedValues("id"))
    .map(row -> row.get("id", Integer.class))
    .first();
// generatedId emits the generated key once the INSERT statement has finished
Kotlin

val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
    .filter { statement -> statement.returnGeneratedValues("id") }
    .map { row -> row.get("id", Integer::class.java) }
    .awaitOne()
// generatedId holds the generated key once the INSERT statement has finished