Wiring up Micronaut, jOOQ, Flyway and Testcontainers with R2DBC
Updated on 2022-11-28 with examples of using Kotlin coroutines in the repository layer. The proposed solution is just a PoC, and as you'll see it's pretty rough around the edges.
Using an application framework that is non-blocking by nature is a nice thing, but if you do a lot of database operations in a blocking way, you can't really take advantage of it. Many modern JVM frameworks encourage a reactive approach, mostly because - when implemented properly - it simply outperforms the "blocking" implementation. This advantage comes at a price: reactive programming has quite a steep learning curve and sometimes makes debugging a nightmare, but it also offers the most convenient solution for several use cases.
JDBC has been the de facto standard database connectivity API in the JVM ecosystem since its introduction, but it's blocking by nature - and that's totally fine, there is nothing to do about it. JDBC simply cannot work in a non-blocking way, so using it to communicate with a database from our reactive code is not an option, and that's why R2DBC was made. R2DBC is built on the Reactive Streams specification, which means that it provides a fully reactive, non-blocking API. It was considered somewhat experimental until recent months, but nowadays more and more libraries and frameworks are starting to support it officially, which is great news. In this article I'll show you how to integrate it with Micronaut, Flyway, jOOQ and Testcontainers to achieve a comprehensive, production-ready setup for your cloud-native microservices.
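To get a feel for what "fully reactive" means here, the sketch below runs a single query with the raw R2DBC SPI and Reactor (assuming the `r2dbc-postgresql` driver and Reactor are on the classpath). The jOOQ integration we'll set up later hides all of this plumbing:

```kotlin
import io.r2dbc.spi.ConnectionFactories
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Every step - connecting, executing, reading rows, closing - is a Reactive
// Streams Publisher; nothing happens until someone subscribes.
val connectionFactory = ConnectionFactories.get("r2dbc:postgresql://localhost:5432/postgres")

val one: Mono<Int> = Mono.usingWhen(
    connectionFactory.create(),
    { conn ->
        Flux.from(conn.createStatement("select 1 as x").execute())
            .flatMap { result -> result.map { row, _ -> row.get("x", Integer::class.java)!!.toInt() } }
            .next()
    },
    // The connection is released when the inner stream terminates
    { conn -> conn.close() }
)
```

Nobody writes queries against the bare SPI by hand for long - this is exactly the boilerplate jOOQ will take over.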
Used tools & the goal
We'll be using the following tools/technologies/libraries:
- Micronaut 3.7.x (with the official `micronaut-r2dbc` module)
- PostgreSQL 13 (with the official `r2dbc-postgresql` driver)
- Reactor 3.x.x
- jOOQ 3.17.* (and Etienne Studer's great Gradle plugin for jOOQ)
- Flyway 8.5.x
- Testcontainers (to be able to test the integration E2E)
A few words about the versions mentioned above:
- Micronaut introduced the official R2DBC - jOOQ integration in 3.6.0
- jOOQ has built-in support for constructing a `DSLContext` from an `io.r2dbc.spi.ConnectionFactory` since 3.15.0, but it also had a limitation regarding reactive transaction handling that wasn't fixed until 3.17.0 came out
My goal was to:
- Bootstrap a Micronaut application that is able to build up and maintain an R2DBC connection to a PostgreSQL database.
- Let jOOQ use this R2DBC connection to be able to execute SQL queries in a non-blocking way.
- Use Flyway for schema migrations by letting it use a JDBC connection (Flyway doesn't support R2DBC).
- Set up Testcontainers in a way that would fit for both jOOQ and Flyway.
The database schema
Let's take a very simple example with the following tables:
-- V1__Initial_schema.sql
create table account
(
id bigserial primary key,
name varchar(255) not null,
deleted_at timestamp null
);
create table address
(
id bigserial primary key,
account_id bigint not null references account (id),
full_address varchar(255) not null
);
create index if not exists idx_address_account_id on address (account_id);
insert into account(name, deleted_at)
values ('John', null);
insert into account(name, deleted_at)
values ('Adam', now());
insert into account(name, deleted_at)
values ('Regina', null);
insert into address(account_id, full_address)
values (1, 'Some Street in some city 12');
insert into address(account_id, full_address)
values (2, 'Another nice street somewhere');
Setting up Flyway with JDBC
Since Flyway doesn't support R2DBC connections, we have to tell Flyway which database it should operate on. This can simply be achieved by setting the database's JDBC URL under Flyway's own block in the `application.yml`, instead of setting it under the `datasources.*` property.
# application.yml
flyway:
  datasources:
    default:
      enabled: true
      default-schema: r2dbc-poc
      url: ${DB_URL:`jdbc:postgresql://localhost:5432/postgres`}
      username: ${DB_USER:`postgres`}
      password: ${DB_PASSWORD:`pass`}
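The snippet above assumes Micronaut's Flyway integration is already on the classpath; if it isn't yet, something like this should be added to the build (the plain JDBC driver is needed because Flyway migrates over JDBC, not R2DBC):

```kotlin
// build.gradle.kts
dependencies {
    // Micronaut's Flyway module picks up the flyway.datasources.* configuration
    implementation("io.micronaut.flyway:micronaut-flyway")
    // Flyway itself runs over JDBC, so the blocking driver stays on the runtime classpath
    runtimeOnly("org.postgresql:postgresql")
}
```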
Setting up Micronaut & jOOQ with R2DBC
Fortunately, Micronaut has an official `micronaut-r2dbc` module that automatically sets up an `io.r2dbc.spi.ConnectionFactory` bean, while the `micronaut-jooq` module is responsible for provisioning an `org.jooq.DSLContext` bean based on this `ConnectionFactory` bean.
What do you need to achieve this?
First of all, you need to add the following dependencies to your project (besides the "standard" ones that are shipped with Micronaut by default):
// build.gradle.kts
dependencies {
implementation("io.micronaut.r2dbc:micronaut-r2dbc-core")
implementation("io.micronaut.sql:micronaut-jooq")
runtimeOnly("org.postgresql:postgresql")
runtimeOnly("org.postgresql:r2dbc-postgresql")
}
You also have to set up the R2DBC datasource in your `application.yml`:
r2dbc:
  datasources:
    default:
      url: ${DB_URL:`r2dbc:postgresql://localhost:5432/postgres`}
      username: ${DB_USER:`postgres`}
      password: ${DB_PASSWORD:`pass`}
That's all? Yes, that's all - after these steps you'll be able to inject a `DSLContext` that can be used to execute queries over an R2DBC connection.
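As a quick smoke test of the wiring, here's a minimal, hypothetical repository (the name and query are illustrative, not part of the PoC) that injects the `DSLContext` and runs a trivial select reactively:

```kotlin
import jakarta.inject.Singleton
import org.jooq.DSLContext
import reactor.core.publisher.Mono

@Singleton
class PingRepository(private val ctx: DSLContext) {
    // jOOQ queries are Reactive Streams Publishers themselves, so wrapping one
    // in a Mono defers execution until subscription - no thread is blocked here.
    fun ping(): Mono<Int> = Mono.from(ctx.selectOne()).map { it.value1() }
}
```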
Fine-tuning jOOQ and setting up its generator
To configure jOOQ's generator, we'll use the `nu.studer.jooq` Gradle plugin, so we have to add it to the plugins block in our Gradle config. Also, because jOOQ's reactive transaction API is only available from version 3.17.0, we have to override the version that is inherited from Micronaut's BOM. Furthermore, jOOQ's most recent version uses the `jakarta.*` namespace for several imports instead of the old `javax.*` ones, so we also have to add these dependencies explicitly to our classpath.
Please be aware that overriding the versions of dependencies that originally come from Micronaut's BOM is considered experimental, so make sure that you have thorough tests to prevent any unwanted errors in your application!
All in all, you have to add something like this to your `build.gradle.kts`:
// build.gradle.kts
// Here we override the version of every jOOQ-related dependency
buildscript {
configurations["classpath"].resolutionStrategy.eachDependency {
if (requested.group == "org.jooq") {
useVersion("3.17.6")
}
}
}
plugins {
// ...
id("nu.studer.jooq") version "8.0.0"
}
dependencies {
// ...
implementation("jakarta.validation:jakarta.validation-api:3.0.2")
implementation("jakarta.persistence:jakarta.persistence-api:3.1.0")
}
Regarding the generator, you'll need another dependency in your `build.gradle.kts`, and also a configuration block. The configuration block below is just an example:
// build.gradle.kts
dependencies {
// ...
jooqGenerator("org.postgresql:postgresql:42.3.3")
}
val dbUrl = System.getenv("DB_URL") ?: "jdbc:postgresql://localhost:5432/postgres"
val dbUser = System.getenv("DB_USER") ?: "postgres"
val dbPassword = System.getenv("DB_PASSWORD") ?: "pass"
val dbSchema = "r2dbc-poc"
val dbDriver = "org.postgresql.Driver"
val jooqVersion = "3.17.6" // Keep this in sync with the version forced in the buildscript block
jooq {
version.set(jooqVersion)
configurations {
create("main") {
// To prevent the unnecessary regeneration of your schema sources!
generateSchemaSourceOnCompilation.set(false)
jooqConfiguration.apply {
jdbc.apply {
driver = dbDriver
url = dbUrl
user = dbUser
password = dbPassword
}
generator.apply {
name = "org.jooq.codegen.KotlinGenerator"
database.apply {
inputSchema = dbSchema
excludes = "flyway_schema_history" // We simply exclude Flyway's own meta table from the generation
}
generate.apply {
isDeprecated = false
isValidationAnnotations = true
isJpaAnnotations = true // This is a must have if you want to build a GraalVM native image from your application
isImmutablePojos = true
}
target.apply {
directory = "src/main/kotlin/jooq"
packageName = "com.akobor.r2dbcdemo"
}
}
}
}
}
}
After all these things, you should be able to generate the schema source files based on your existing database with the `generateJooq` Gradle task.
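In practice, the task reads the live schema over JDBC, so the database has to be up and already migrated before you run it. Assuming the environment variables used in the config above, a typical local invocation might look like this:

```shell
# Apply the Flyway migrations first (e.g. by starting the app once),
# then regenerate the jOOQ sources from the live schema:
DB_URL="jdbc:postgresql://localhost:5432/postgres" DB_USER=postgres DB_PASSWORD=pass ./gradlew generateJooq
```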
Writing queries the reactive way - with Reactor
We're all set, so we can start writing our reactive repository. Writing queries with jOOQ in a reactive way differs only a little from the "regular" way. For these examples we'll use the Reactor library along with some handy Kotlin-related utility functions that are shipped in the `io.projectreactor.kotlin:reactor-kotlin-extensions` package.
Basically, you have to keep two things in mind:
- you should wrap every query in a `Mono` or a `Flux`
- you should never call `.fetch()` or `.fetchInto()` or anything similar that you're used to at the end of your queries, because these calls will immediately block the underlying thread
Here are a few examples:
// ReactiveAccountRepository.kt
package com.akobor
import com.akobor.r2dbcdemo.tables.Account.Companion.ACCOUNT
import com.akobor.r2dbcdemo.tables.Address.Companion.ADDRESS
import jakarta.inject.Singleton
import org.jooq.DSLContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono
@Singleton
class ReactiveAccountRepository(private val ctx: DSLContext) {
fun getAccounts(): Flux<AccountDetailsWithAddress> =
Flux.from(ctx.getAccountQuery())
.map { r -> r.into(AccountDetailsWithAddress::class.java) }
private fun DSLContext.getAccountQuery() =
select(
ACCOUNT.ID,
ACCOUNT.NAME,
ACCOUNT.DELETED_AT,
ADDRESS.FULL_ADDRESS
).from(ACCOUNT).leftJoin(ADDRESS).on(ADDRESS.ACCOUNT_ID.eq(ACCOUNT.ID))
fun getAccount(accountId: Long): Mono<AccountDetailsWithAddress> = ctx
.getAccountQuery()
.where(ACCOUNT.ID.eq(accountId))
.toMono().map { r -> r.into(AccountDetailsWithAddress::class.java) }
}
The `getAccounts()` method wraps a simple select into a `Flux` and calls `.map()` on its items to transform the result into a Kotlin data class called `AccountDetailsWithAddress`. If you integrate this method into a controller action that also returns a `Flux<AccountDetailsWithAddress>`, then the result of a call to this endpoint will be an array of `AccountDetailsWithAddress`.
The `getAccount(accountId: Long)` method is very similar, but it returns only one account (or an empty `Mono` if there is no account with the given ID), so we can use a `Mono` here instead of a `Flux`.
How about transactions?
Handling database transactions with reactive streams can be a bit tricky, because you basically have to pass the same `DSLContext` through the whole chain of streams, but fortunately it's possible with the most recent versions of jOOQ (from 3.17.*). Look at the following example:
// AccountRepository.kt
fun getAccount(accountId: Long, ctx: DSLContext = this.ctx): Mono<AccountDetailsWithAddress> = ctx
.getAccountQuery()
.where(ACCOUNT.ID.eq(accountId))
.toMono().map { r -> r.into(AccountDetailsWithAddress::class.java) }
fun insertAccountWithAddress(accountWithAddressDto: AccountCreateDto): Mono<AccountDetailsWithAddress> =
// We initiate a transaction with jOOQ's new reactive transaction API
Flux.from(ctx.transactionPublisher { trx ->
trx.dsl()
.insertInto(ACCOUNT)
.columns(ACCOUNT.NAME, ACCOUNT.DELETED_AT)
.values(accountWithAddressDto.name, accountWithAddressDto.deletedAt)
.returningResult(ACCOUNT.ID)
.toMono()
.flatMap { insertedAccount ->
if (!accountWithAddressDto.fullAddress.isNullOrBlank()) {
trx.dsl().insertInto(ADDRESS)
.columns(ADDRESS.ACCOUNT_ID, ADDRESS.FULL_ADDRESS)
.values(insertedAccount.value1(), accountWithAddressDto.fullAddress)
.returningResult(ADDRESS.ACCOUNT_ID).toMono()
} else insertedAccount.toMono()
}.flatMap { insertedAccount -> getAccount(insertedAccount.value1()!!, trx.dsl()) }
}).single()
We had to modify the `getAccount()` method too, because we have to be able to pass in an optional `DSLContext` from the transaction; otherwise we couldn't fetch the freshly inserted record (because the transaction is committed only at the end of the `transactionPublisher` block).
Looking at the `insertAccountWithAddress()` method, we can see the following steps:
- We start a transaction inside a `Flux` with the `DSLContext.transactionPublisher {}` call, and inside this:
  - We insert an `ACCOUNT` record and return its ID.
  - We check if there is an address in the payload, and insert an `ADDRESS` record as well, or just pass the account's ID to the next step.
  - We get the freshly inserted record from the database to return it as a `Mono<AccountDetailsWithAddress>`.
If anything goes south while we're doing these things, jOOQ will automatically roll back the current transaction.
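Here's a hypothetical sketch of that behavior, reusing the `ctx` and `ACCOUNT` references from the repository above (the `insertThenFail` name and the forced error are purely illustrative):

```kotlin
// Any error signal emitted inside the transactionPublisher lambda makes jOOQ
// roll back the transaction instead of committing it.
fun insertThenFail(): Mono<Long> =
    Mono.from(ctx.transactionPublisher { trx ->
        trx.dsl()
            .insertInto(ACCOUNT)
            .columns(ACCOUNT.NAME, ACCOUNT.DELETED_AT)
            .values("Ghost", null)
            .returningResult(ACCOUNT.ID)
            .toMono()
            // The "Ghost" record above is never committed because of this error
            .flatMap { Mono.error<Long>(IllegalStateException("forced failure")) }
    })
```

The E2E test at the end of this article ("it should be rolled back") verifies the same thing through the HTTP layer.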
Putting all this stuff together
By writing a simple controller, we can easily test these things E2E:
// ReactiveAccountController.kt
package com.akobor
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@Controller("/reactive/accounts")
class ReactiveAccountController(private val accountRepository: ReactiveAccountRepository) : ReactiveAccountOperations {
override fun getAccounts(): Flux<AccountDetailsWithAddress> = accountRepository.getAccounts()
override fun getAccountById(accountId: Long): Mono<AccountDetailsWithAddress> =
accountRepository.getAccount(accountId)
override fun createAccount(createDto: AccountCreateDto): Mono<AccountDetailsWithAddress> =
accountRepository.insertAccountWithAddress(createDto)
}
interface ReactiveAccountOperations {
@Get("/")
fun getAccounts(): Flux<AccountDetailsWithAddress>
@Get("/{accountId}")
fun getAccountById(accountId: Long): Mono<AccountDetailsWithAddress>
@Post("/")
fun createAccount(@Body createDto: AccountCreateDto): Mono<AccountDetailsWithAddress>
}
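With the application running locally (assuming Micronaut's default port 8080 and JSON serialization of the DTOs), the endpoints can also be exercised by hand, for example:

```shell
# List all accounts (returns a JSON array of AccountDetailsWithAddress)
curl -s http://localhost:8080/reactive/accounts

# Create an account with an address
curl -s -X POST http://localhost:8080/reactive/accounts \
  -H 'Content-Type: application/json' \
  -d '{"name": "Jane", "fullAddress": "1 Example Road", "deletedAt": null}'
```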
Testing with Testcontainers
Using a PostgreSQL Testcontainer with R2DBC fortunately doesn't really differ from using one with JDBC. If you're not familiar with Testcontainers and Micronaut, you should read this first. Assuming that you already have a setup based on the mentioned post, you should make the following changes to the class you used to configure Testcontainers:
// TestDbContainer.kt
package com.akobor
import org.testcontainers.containers.PostgreSQLContainer
class TestDbContainer : PostgreSQLContainer<TestDbContainer>("postgres:13") {
companion object {
private lateinit var instance: TestDbContainer
fun start() {
if (!Companion::instance.isInitialized) {
instance = TestDbContainer()
instance.start()
val r2dbcUrl = "r2dbc:postgresql://${instance.host}:${instance.firstMappedPort}/${instance.databaseName}"
System.setProperty("r2dbc.datasources.default.url", r2dbcUrl)
System.setProperty("r2dbc.datasources.default.username", instance.username)
System.setProperty("r2dbc.datasources.default.password", instance.password)
System.setProperty("flyway.datasources.default.url", instance.jdbcUrl)
System.setProperty("flyway.datasources.default.username", instance.username)
System.setProperty("flyway.datasources.default.password", instance.password)
}
}
fun stop() {
instance.stop()
}
}
}
As you can see, the only magic here is that you have to set the container instance's URL, username and password for both configuration groups (`r2dbc.datasources.*` and `flyway.datasources.*`). Also, you have to construct the R2DBC URL manually.
With this setup, we are now able to write E2E tests to see if everything works:
// ReactiveAccountControllerTest.kt
package com.akobor
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.inspectors.forNone
import io.kotest.inspectors.forOne
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.test.extensions.kotest.annotation.MicronautTest
import java.time.LocalDateTime
@MicronautTest
class ReactiveAccountControllerTest(
accountClient: ReactiveAccountClient,
accountRepository: ReactiveAccountRepository
) : DatabaseStringSpec({
"getting all the accounts from the DB should work E2E" {
val accounts = accountClient.getAccounts().collectList().block()!!
accounts shouldHaveSize 3
accounts.forOne { it.name shouldBe "Adam" }
accounts.forOne { it.name shouldBe "Regina" }
accounts.forOne { it.name shouldBe "John" }
}
"getting one specific account from the DB should work E2E" {
val account = accountClient.getAccountById(1).block()!!
account.id shouldBe 1
account.deletedAt shouldBe null
account.fullAddress shouldBe "Some Street in some city 12"
account.name shouldBe "John"
}
"creating a new account should work E2E" {
val accountWithAddress = AccountCreateDto(
name = "Test Person 1",
fullAddress = "Some address",
deletedAt = LocalDateTime.now()
)
val accountWithoutAddress = AccountCreateDto(
name = "Test Person 1",
fullAddress = null,
deletedAt = null
)
val createdAccountWithAddress = accountClient.createAccount(accountWithAddress).block()!!
val createdAccountWithoutAddress = accountClient.createAccount(accountWithoutAddress).block()!!
createdAccountWithAddress.id shouldNotBe null
createdAccountWithAddress.name shouldBe accountWithAddress.name
createdAccountWithAddress.fullAddress shouldBe accountWithAddress.fullAddress
createdAccountWithAddress.deletedAt shouldBe accountWithAddress.deletedAt
createdAccountWithoutAddress.id shouldNotBe null
createdAccountWithoutAddress.name shouldBe accountWithoutAddress.name
createdAccountWithoutAddress.fullAddress shouldBe null
createdAccountWithoutAddress.deletedAt shouldBe null
}
"if something bad happens during an account creation, it should be rolled back" {
val accountToCreate = AccountCreateDto(
name = "Test Person 1",
// We pass a very long string to force a DB related exception
fullAddress = "Some address".repeat(100),
deletedAt = LocalDateTime.now()
)
shouldThrow<HttpClientResponseException> {
accountClient.createAccount(accountToCreate).block()
}
val accountsInTheDb = accountRepository.getAccounts().collectList().block()!!
accountsInTheDb.forNone { it.name shouldBe accountToCreate.name }
}
})
@Client("/reactive/accounts")
interface ReactiveAccountClient : ReactiveAccountOperations
How about Kotlin coroutines? [UPDATE]
It's also possible to use Kotlin coroutines in the repository layer, but you'll have to use Reactor under the hood in this case too. To make this possible, you'll need to add the `implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.6.4")` dependency to your project.
You can see in the following example that the approach works, but you'll need some glue code to make it easier to read:
// CoroutineAccountRepository.kt
package com.akobor
import com.akobor.r2dbcdemo.tables.Account.Companion.ACCOUNT
import com.akobor.r2dbcdemo.tables.Address.Companion.ADDRESS
import jakarta.inject.Singleton
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.jooq.Configuration
import org.jooq.DSLContext
import org.jooq.Record
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
// Glue code
suspend inline fun <reified T : Any, R : Record> Publisher<R>.fetchInto(): List<T> =
Flux.from(this).map { r -> r.into(T::class.java) }.asFlow().toList()
suspend inline fun <reified T : Any, R : Record> Publisher<R>.fetchOneInto(): T? =
Mono.from(this).map { r -> r.into(T::class.java) }.awaitSingleOrNull()
suspend fun <T : Any> transactionWithSingleResult(
ctx: DSLContext,
transactional: (Configuration) -> Flow<T>
): T = Flux.from(ctx.transactionPublisher { trx -> transactional.invoke(trx).asFlux() }).awaitSingle()
@Singleton
class CoroutineAccountRepository(private val ctx: DSLContext) {
suspend fun getAccounts(): List<AccountDetailsWithAddress> = ctx.getAccountQuery().fetchInto()
private fun DSLContext.getAccountQuery() =
select(
ACCOUNT.ID,
ACCOUNT.NAME,
ACCOUNT.DELETED_AT,
ADDRESS.FULL_ADDRESS
).from(ACCOUNT).leftJoin(ADDRESS).on(ADDRESS.ACCOUNT_ID.eq(ACCOUNT.ID))
suspend fun getAccount(accountId: Long, ctx: DSLContext = this.ctx): AccountDetailsWithAddress? =
ctx.getAccountQuery().where(ACCOUNT.ID.eq(accountId)).fetchOneInto()
suspend fun insertAccountWithAddress(accountWithAddressDto: AccountCreateDto): AccountDetailsWithAddress {
return transactionWithSingleResult(ctx) { trx ->
trx.dsl()
.insertInto(ACCOUNT)
.columns(ACCOUNT.NAME, ACCOUNT.DELETED_AT)
.values(accountWithAddressDto.name, accountWithAddressDto.deletedAt)
.returningResult(ACCOUNT.ID)
.asFlow()
.map { insertedAccount ->
if (!accountWithAddressDto.fullAddress.isNullOrBlank()) {
trx.dsl().insertInto(ADDRESS)
.columns(ADDRESS.ACCOUNT_ID, ADDRESS.FULL_ADDRESS)
.values(insertedAccount.value1(), accountWithAddressDto.fullAddress)
.returningResult(ADDRESS.ACCOUNT_ID)
.awaitFirst()
} else insertedAccount
}
.map { getAccount(it.value1()!!, trx.dsl())!! }
}
}
}
If you want to check out the complete PoC, you can find it on my GitHub: https://github.com/adamkobor/micronaut-r2dbc-jooq-flyway-testcontainers-poc