@akobor

Wiring up Micronaut, jOOQ, Flyway and Testcontainers with R2DBC

Adam Kobor

Updated on 2022-11-28 with examples of using Kotlin coroutines in the repository layer. The proposed solution is just a PoC, and as you'll see, it's pretty rough around the edges.

Using an application framework that is non-blocking by nature is nice, but if you do a lot of database operations in a blocking way, you can't really take advantage of it. Many modern JVM frameworks encourage a reactive approach, mostly because, when implemented properly, it simply outperforms the "blocking" implementation. This advantage comes at a price: reactive programming has a fairly steep learning curve and can make debugging a nightmare, but it also offers the most convenient solution for several use cases.

JDBC has been the de facto standard database connectivity API in the JVM ecosystem since its introduction. It is blocking by nature, and that's totally fine; there is nothing to do about it. Because JDBC simply cannot work in a non-blocking way, using it to talk to a database from reactive code is not an option, and that's why R2DBC was created. R2DBC is built on the Reactive Streams specification, which means it provides a fully reactive, non-blocking API. It was considered somewhat experimental until recently, but nowadays more and more libraries and frameworks officially support it, which is great news. In this article I'll show you how to integrate it with Micronaut, Flyway, jOOQ and Testcontainers to achieve a comprehensive, production-ready setup for your cloud-native microservices.

Used tools & the goal

We'll be using the following tools/technologies/libraries:

  • Micronaut 3.7.x (with the official micronaut-r2dbc module)
  • PostgreSQL 13 (with the official r2dbc-postgresql driver)
  • Reactor 3.x.x
  • jOOQ 3.17.* (and Etienne Studer's great Gradle plugin for jOOQ)
  • Flyway 8.5.x
  • Testcontainers (to be able to test the integration E2E)

A few words about the versions mentioned above:

  • Micronaut introduced the official R2DBC - jOOQ integration in 3.6.0
  • jOOQ has had built-in support for constructing a DSLContext from an io.r2dbc.spi.ConnectionFactory since 3.15.0, but it also had a limitation in reactive transaction handling that wasn't fixed until 3.17.0 came out

My goal was to:

  1. Bootstrap a Micronaut application that is able to build up and maintain an R2DBC connection to a PostgreSQL database.
  2. Let jOOQ use this R2DBC connection to execute SQL queries in a non-blocking way.
  3. Use Flyway for schema migrations by letting it use a JDBC connection (Flyway doesn't support R2DBC).
  4. Set up Testcontainers in a way that would fit for both jOOQ and Flyway.

The database schema

Let's take a very simple example with the following tables:

-- V1__Initial_schema.sql

create table account
(
    id         bigserial primary key,
    name       varchar(255) not null,
    deleted_at timestamp null

);

create table address
(
    id           bigserial primary key,
    account_id   bigint       not null references account (id),
    full_address varchar(255) not null
);

create index if not exists idx_address_account_id on address (account_id);

insert into account(name, deleted_at)
values ('John', null);
insert into account(name, deleted_at)
values ('Adam', now());
insert into account(name, deleted_at)
values ('Regina', null);

insert into address(account_id, full_address)
values (1, 'Some Street in some city 12');
insert into address(account_id, full_address)
values (2, 'Another nice street somewhere');

Setting up Flyway with JDBC

Since Flyway doesn't support R2DBC connections, we have to tell Flyway where the database it should operate on is. This can be achieved by setting the database's JDBC URL under Flyway's own block in application.yml, instead of under the datasources.* property.

# application.yml

flyway:
  datasources:
    default:
      enabled: true
      default-schema: r2dbc-poc
      url: ${DB_URL:`jdbc:postgresql://localhost:5432/postgres`}
      username: ${DB_USER:`postgres`}
      password: ${DB_PASSWORD:`pass`}
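
For a plain PostgreSQL setup, the JDBC URL that Flyway needs and the R2DBC URL used elsewhere differ only in their scheme prefix. As a minimal sketch, a hypothetical helper could derive one from the other, for example in a build script or a test fixture. The name toJdbcUrl is made up for illustration and is not part of Micronaut or Flyway, and the sketch assumes a simple URL without R2DBC-specific parts such as a pool: segment or driver options:

```kotlin
// Hypothetical helper, not part of any library: derives a JDBC URL from a plain R2DBC URL.
fun toJdbcUrl(r2dbcUrl: String): String {
    val prefix = "r2dbc:"
    require(r2dbcUrl.startsWith(prefix)) { "Not an R2DBC URL: $r2dbcUrl" }
    // Only the scheme prefix is swapped; host, port and database name are kept as they are.
    return "jdbc:" + r2dbcUrl.removePrefix(prefix)
}
```

With the defaults used in this post, toJdbcUrl("r2dbc:postgresql://localhost:5432/postgres") gives the JDBC URL shown in the Flyway block above.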

Setting up Micronaut & jOOQ with R2DBC

Fortunately, Micronaut has an official micronaut-r2dbc module that automatically sets up an io.r2dbc.spi.ConnectionFactory bean, while the micronaut-jooq module is responsible for provisioning an org.jooq.DSLContext bean based on this ConnectionFactory bean.

What do you need to achieve this?

First of all, you need to add the following dependencies to your project (besides the "standard" ones that are shipped with Micronaut by default):

// build.gradle.kts

dependencies {
    implementation("io.micronaut.r2dbc:micronaut-r2dbc-core")
    implementation("io.micronaut.sql:micronaut-jooq")
    runtimeOnly("org.postgresql:postgresql")
    runtimeOnly("org.postgresql:r2dbc-postgresql")
}

You also have to set up the R2DBC datasource in your application.yml:

r2dbc:
  datasources:
    default:
      url: ${DB_URL:`r2dbc:postgresql://localhost:5432/postgres`}
      username: ${DB_USER:`postgres`}
      password: ${DB_PASSWORD:`pass`}

That's all? Yes, that's all. After these steps you'll be able to inject a DSLContext that can be used to execute queries over an R2DBC connection.

Fine-tuning jOOQ and setting up its generator

To configure jOOQ's generator, we'll use the nu.studer.jooq Gradle plugin, so we have to add it to the plugins block in our Gradle config. Also, because jOOQ's reactive transaction API is only available from version 3.17.0, we have to override the version inherited from Micronaut's BOM. Furthermore, recent jOOQ versions use the jakarta.* namespace for several imports instead of the old javax.* ones, so we also have to add these dependencies explicitly to our classpath.

Please be aware that overriding the versions of dependencies that come from Micronaut's BOM is considered experimental, so make sure you have thorough tests to catch any unwanted errors in your application!

All in all, you have to add something like this to your build.gradle.kts:

// build.gradle.kts

// Here we override the version of every jOOQ-related dependency
buildscript {
    configurations["classpath"].resolutionStrategy.eachDependency {
        if (requested.group == "org.jooq") {
            useVersion("3.17.6")
        }
    }
}

plugins {
    // ...
    id("nu.studer.jooq") version "8.0.0"
}

dependencies {
    // ...
    implementation("jakarta.validation:jakarta.validation-api:3.0.2")
    implementation("jakarta.persistence:jakarta.persistence-api:3.1.0")
}

Regarding the generator, you'll need another dependency in your build.gradle, and also a configuration block. The configuration block below is just an example:

// build.gradle.kts

dependencies {
    // ...
    jooqGenerator("org.postgresql:postgresql:42.3.3")
}

val dbUrl = System.getenv("DB_URL") ?: "jdbc:postgresql://localhost:5432/postgres"
val dbUser = System.getenv("DB_USER") ?: "postgres"
val dbPassword = System.getenv("DB_PASSWORD") ?: "pass"
val dbSchema = "r2dbc-poc"
val dbDriver = "org.postgresql.Driver"
val jooqVersion = "3.17.5"

jooq {
    version.set(jooqVersion)

    configurations {
        create("main") {
            // To prevent the unnecessary regeneration of your schema sources! 
            generateSchemaSourceOnCompilation.set(false)

            jooqConfiguration.apply {
                jdbc.apply {
                    driver = dbDriver
                    url = dbUrl
                    user = dbUser
                    password = dbPassword
                }
                generator.apply {
                    name = "org.jooq.codegen.KotlinGenerator"
                    database.apply {
                        inputSchema = dbSchema
                        excludes = "flyway_schema_history" // We simply exclude Flyway's own meta table from the generation
                    }
                    generate.apply {
                        isDeprecated = false
                        isValidationAnnotations = true
                        isJpaAnnotations = true // This is a must have if you want to build a GraalVM native image from your application
                        isImmutablePojos = true
                    }
                    target.apply {
                        directory = "src/main/kotlin/jooq"
                        packageName = "com.akobor.r2dbcdemo"
                    }
                }
            }
        }
    }
}

With all of this in place, you should be able to generate the schema source files from your existing database with the generateJooq Gradle task.

Writing queries the reactive way - with Reactor

We're all set, so we can start writing the repository. Writing queries with jOOQ in a reactive way differs only slightly from the "regular" way. For these examples we'll use the Reactor library along with some handy Kotlin utility functions that ship with the io.projectreactor.kotlin:reactor-kotlin-extensions package.

Basically, you have to keep two things in mind:

  • you should wrap every query in a Mono or a Flux
  • you should never call .fetch(), .fetchInto() or any similar method you may be used to putting at the end of your queries, because these calls will immediately block the underlying thread
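
The first rule works because publishers are lazy: nothing is executed until something subscribes. Here is a minimal sketch of that behaviour using only Reactor, with a counter standing in for the query execution. The names executions and countingQuery are made up for illustration, and no jOOQ or database is involved:

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for "query executions"; a real setup would hit the database instead.
val executions = AtomicInteger(0)

// Building the Mono does not run the callable; it only describes the work.
fun countingQuery(): Mono<Int> = Mono.fromCallable { executions.incrementAndGet() }
```

Calling countingQuery() leaves the counter untouched. The callable only runs once the Mono is subscribed to, for example when it is returned from a controller action or when .block() is called in a test.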

Here are a few examples:

// ReactiveAccountRepository.kt

package com.akobor

import com.akobor.r2dbcdemo.tables.Account.Companion.ACCOUNT
import com.akobor.r2dbcdemo.tables.Address.Companion.ADDRESS
import jakarta.inject.Singleton
import org.jooq.DSLContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono

@Singleton
class ReactiveAccountRepository(private val ctx: DSLContext) {

    fun getAccounts(): Flux<AccountDetailsWithAddress> =
        Flux.from(ctx.getAccountQuery())
            .map { r -> r.into(AccountDetailsWithAddress::class.java) }

    private fun DSLContext.getAccountQuery() =
        select(
            ACCOUNT.ID,
            ACCOUNT.NAME,
            ACCOUNT.DELETED_AT,
            ADDRESS.FULL_ADDRESS
        ).from(ACCOUNT).leftJoin(ADDRESS).on(ADDRESS.ACCOUNT_ID.eq(ACCOUNT.ID))

    fun getAccount(accountId: Long): Mono<AccountDetailsWithAddress> = ctx
        .getAccountQuery()
        .where(ACCOUNT.ID.eq(accountId))
        .toMono().map { r -> r.into(AccountDetailsWithAddress::class.java) }
}

The getAccounts() method wraps a simple select in a Flux and calls .map() on its items to transform each record into a Kotlin data class called AccountDetailsWithAddress. If you call this method from a controller action that also returns a Flux<AccountDetailsWithAddress>, the response of that endpoint will be an array of AccountDetailsWithAddress objects.
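
The AccountDetailsWithAddress class itself is not shown in this post. Judging from the selected columns and from how the tests further down use it, it could look roughly like the sketch below. The exact property types and nullability are assumptions, and AccountCreateDto (used in the transaction example and in the tests) is sketched the same way. In a Micronaut application these classes may also need an annotation such as @Introspected, which is left out here to keep the sketch dependency-free:

```kotlin
import java.time.LocalDateTime

// Assumed shape: one property per column selected by getAccountQuery().
data class AccountDetailsWithAddress(
    val id: Long,
    val name: String,
    val deletedAt: LocalDateTime?, // account.deleted_at is nullable
    val fullAddress: String?       // null when the left join finds no address
)

// Assumed shape of the payload used when creating an account.
data class AccountCreateDto(
    val name: String,
    val fullAddress: String?,
    val deletedAt: LocalDateTime?
)
```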

The getAccount(accountId: Long) method is very similar, but it returns only one account (or an empty Mono if there is no account with the given ID), so we can use a Mono here instead of a Flux.
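
If a caller would rather treat a missing account as an error than as an empty result, the empty Mono can be converted explicitly. This is a minimal sketch using Reactor's switchIfEmpty. The orNotFound name and the choice of NoSuchElementException are assumptions made for illustration, not something the PoC does:

```kotlin
import reactor.core.publisher.Mono

// Hypothetical extension: turns an empty Mono into an error signal, leaves non-empty ones untouched.
fun <T : Any> Mono<T>.orNotFound(id: Long): Mono<T> =
    switchIfEmpty(Mono.error(NoSuchElementException("No account with id $id")))
```

It could be applied as accountRepository.getAccount(accountId).orNotFound(accountId), leaving the repository method itself unchanged.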

How about transactions?

Handling database transactions with reactive streams can be a bit tricky, because basically you have to pass the same DSLContext through the whole chain of streams, but fortunately it's possible with the most recent versions of jOOQ (from 3.17.*). Look at the following example:

// ReactiveAccountRepository.kt

fun getAccount(accountId: Long, ctx: DSLContext = this.ctx): Mono<AccountDetailsWithAddress> = ctx
    .getAccountQuery()
    .where(ACCOUNT.ID.eq(accountId))
    .toMono().map { r -> r.into(AccountDetailsWithAddress::class.java) }

fun insertAccountWithAddress(accountWithAddressDto: AccountCreateDto): Mono<AccountDetailsWithAddress> =
    // We initiate a transaction with jOOQ's new reactive transaction API
    Flux.from(ctx.transactionPublisher { trx ->
        trx.dsl()
            .insertInto(ACCOUNT)
            .columns(ACCOUNT.NAME, ACCOUNT.DELETED_AT)
            .values(accountWithAddressDto.name, accountWithAddressDto.deletedAt)
            .returningResult(ACCOUNT.ID)
            .toMono()
            .flatMap { insertedAccount ->
                if (!accountWithAddressDto.fullAddress.isNullOrBlank()) {
                    trx.dsl().insertInto(ADDRESS)
                        .columns(ADDRESS.ACCOUNT_ID, ADDRESS.FULL_ADDRESS)
                        .values(insertedAccount.value1(), accountWithAddressDto.fullAddress)
                        .returningResult(ADDRESS.ACCOUNT_ID).toMono()
                } else insertedAccount.toMono()
            }.flatMap { insertedAccount -> getAccount(insertedAccount.value1()!!, trx.dsl()) }
    }).single()

We had to modify the getAccount() method too, because we have to be able to pass an optional DSLContext from the transaction, otherwise we couldn't get the freshly inserted record (because the transaction will be committed only at the end of the transactionPublisher block).

Looking at the insertAccountWithAddress() method, we can see the following steps:

  1. We start a transaction inside a Flux with the DSLContext.transactionPublisher {} call, and inside it:
  2. We insert an ACCOUNT record and return its ID.
  3. We check whether there is an address in the payload. If there is, we insert an ADDRESS record as well; otherwise we just pass the account's ID on to the next step.
  4. We read the freshly inserted record back from the database and return it as a Mono<AccountDetailsWithAddress>.

If anything goes south while we're doing these things, jOOQ will automatically roll back the current transaction.
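
The rollback is driven by the error signal of the publisher returned from the transactionPublisher block. The sketch below uses plain Reactor with no database to show why later steps never run once an earlier one fails: the error short-circuits the chain and reaches the subscriber, which is the point where jOOQ would roll back. The step names and the executedSteps list are invented for illustration:

```kotlin
import reactor.core.publisher.Mono

// Records which steps actually ran; stands in for statements sent to the database.
val executedSteps = mutableListOf<String>()

// A step that logs itself and optionally fails, emitting its own name on success.
fun step(name: String, fail: Boolean = false): Mono<String> = Mono.fromCallable {
    executedSteps += name
    if (fail) throw IllegalStateException("$name failed")
    name
}

// Three chained steps, mirroring insert account -> insert address -> re-read account.
fun pipeline(failSecond: Boolean): Mono<String> =
    step("insertAccount")
        .flatMap { step("insertAddress", fail = failSecond) }
        .flatMap { step("getAccount") }
```

When pipeline(failSecond = true) is subscribed to, only the first two steps are recorded and the subscriber receives the IllegalStateException. With failSecond = false, all three steps run in order.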

Putting it all together

By writing a simple controller we can easily test these things E2E:

// ReactiveAccountController.kt

package com.akobor

import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@Controller("/reactive/accounts")
class ReactiveAccountController(private val accountRepository: ReactiveAccountRepository) : ReactiveAccountOperations {

    override fun getAccounts(): Flux<AccountDetailsWithAddress> = accountRepository.getAccounts()

    override fun getAccountById(accountId: Long): Mono<AccountDetailsWithAddress> =
        accountRepository.getAccount(accountId)

    override fun createAccount(createDto: AccountCreateDto): Mono<AccountDetailsWithAddress> =
        accountRepository.insertAccountWithAddress(createDto)
}

interface ReactiveAccountOperations {

    @Get("/")
    fun getAccounts(): Flux<AccountDetailsWithAddress>

    @Get("/{accountId}")
    fun getAccountById(accountId: Long): Mono<AccountDetailsWithAddress>

    @Post("/")
    fun createAccount(@Body createDto: AccountCreateDto): Mono<AccountDetailsWithAddress>
}

Testing with Testcontainers

Using a PostgreSQL Testcontainer with R2DBC doesn't really differ from using one with JDBC, fortunately. If you're not familiar with Testcontainers and Micronaut, you should read this first. Assuming that you already have a setup based on the mentioned post, you should make the following changes to the class you use to configure Testcontainers:

// TestDbContainer.kt

package com.akobor

import org.testcontainers.containers.PostgreSQLContainer

class TestDbContainer : PostgreSQLContainer<TestDbContainer>("postgres:13") {
    companion object {
        private lateinit var instance: TestDbContainer

        fun start() {
            if (!Companion::instance.isInitialized) {
                instance = TestDbContainer()
                instance.start()
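                // The database name is added explicitly so the URL does not depend on the driver's default
                val r2dbcUrl = "r2dbc:postgresql://${instance.host}:${instance.firstMappedPort}/${instance.databaseName}"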

                System.setProperty("r2dbc.datasources.default.url", r2dbcUrl)
                System.setProperty("r2dbc.datasources.default.username", instance.username)
                System.setProperty("r2dbc.datasources.default.password", instance.password)

                System.setProperty("flyway.datasources.default.url", instance.jdbcUrl)
                System.setProperty("flyway.datasources.default.username", instance.username)
                System.setProperty("flyway.datasources.default.password", instance.password)
            }
        }

        fun stop() {
            instance.stop()
        }
    }
}

As you can see, the only magic here is that you have to set the container instance's URL, username and password for both configuration groups (r2dbc.datasources.* and flyway.datasources.*). Also, you have to construct the R2DBC URL manually.
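
Constructing the R2DBC URL by hand is just string formatting. Here is a small sketch of a helper that builds it from the container's coordinates. buildR2dbcUrl is a hypothetical name, and making the database segment optional is an assumption on my side rather than something the PoC requires:

```kotlin
// Hypothetical helper: assembles a PostgreSQL R2DBC URL from its parts.
fun buildR2dbcUrl(host: String, port: Int, database: String? = null): String {
    val base = "r2dbc:postgresql://$host:$port"
    // The database segment is optional; without it the URL ends after the port.
    return if (database.isNullOrBlank()) base else "$base/$database"
}
```

Inside TestDbContainer.start() it could be called as buildR2dbcUrl(instance.host, instance.firstMappedPort, instance.databaseName).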

With this setup we can now write E2E tests to see if everything works:

// ReactiveAccountControllerTest.kt

package com.akobor

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.inspectors.forNone
import io.kotest.inspectors.forOne
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.test.extensions.kotest.annotation.MicronautTest
import java.time.LocalDateTime

@MicronautTest
class ReactiveAccountControllerTest(
    accountClient: ReactiveAccountClient, 
    accountRepository: ReactiveAccountRepository
) : DatabaseStringSpec({

    "getting all the accounts from the DB should work E2E" {

        val accounts = accountClient.getAccounts().collectList().block()!!

        accounts shouldHaveSize 3
        accounts.forOne { it.name shouldBe "Adam" }
        accounts.forOne { it.name shouldBe "Regina" }
        accounts.forOne { it.name shouldBe "John" }
    }

    "getting one specific account from the DB should work E2E" {

        val account = accountClient.getAccountById(1).block()!!

        account.id shouldBe 1
        account.deletedAt shouldBe null
        account.fullAddress shouldBe "Some Street in some city 12"
        account.name shouldBe "John"
    }

    "creating a new account should work E2E" {

        val accountWithAddress = AccountCreateDto(
            name = "Test Person 1",
            fullAddress = "Some address",
            deletedAt = LocalDateTime.now()
        )
        val accountWithoutAddress = AccountCreateDto(
            name = "Test Person 1",
            fullAddress = null,
            deletedAt = null
        )
        val createdAccountWithAddress = accountClient.createAccount(accountWithAddress).block()!!
        val createdAccountWithoutAddress = accountClient.createAccount(accountWithoutAddress).block()!!

        createdAccountWithAddress.id shouldNotBe null
        createdAccountWithAddress.name shouldBe accountWithAddress.name
        createdAccountWithAddress.fullAddress shouldBe accountWithAddress.fullAddress
        createdAccountWithAddress.deletedAt shouldBe accountWithAddress.deletedAt

        createdAccountWithoutAddress.id shouldNotBe null
        createdAccountWithoutAddress.name shouldBe accountWithoutAddress.name
        createdAccountWithoutAddress.fullAddress shouldBe null
        createdAccountWithoutAddress.deletedAt shouldBe null
    }

    "if something bad happens during an account creation, it should be rolled back" {

        val accountToCreate = AccountCreateDto(
            name = "Test Person 1",
            // We pass a very long string to force a DB related exception
            fullAddress = "Some address".repeat(100),
            deletedAt = LocalDateTime.now()
        )

        shouldThrow<HttpClientResponseException> { 
            accountClient.createAccount(accountToCreate).block() 
        }

        val accountsInTheDb = accountRepository.getAccounts().collectList().block()!!

        accountsInTheDb.forNone { it.name shouldBe accountToCreate.name }
    }
})

@Client("/reactive/accounts")
interface ReactiveAccountClient : ReactiveAccountOperations

How about Kotlin coroutines? [UPDATE]

It's also possible to use Kotlin coroutines in the repository layer, but you'll still have to use Reactor under the hood. To enable this, you'll need to add the implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.6.4") dependency to your project. As the following example shows, the approach works, but you'll need some glue code to make it easier to read:

// CoroutineAccountRepository.kt

package com.akobor

import com.akobor.r2dbcdemo.tables.Account.Companion.ACCOUNT
import com.akobor.r2dbcdemo.tables.Address.Companion.ADDRESS
import jakarta.inject.Singleton
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.jooq.Configuration
import org.jooq.DSLContext
import org.jooq.Record
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Glue code
suspend inline fun <reified T : Any, R : Record> Publisher<R>.fetchInto(): List<T> =
    Flux.from(this).map { r -> r.into(T::class.java) }.asFlow().toList()

suspend inline fun <reified T : Any, R : Record> Publisher<R>.fetchOneInto(): T? =
    Mono.from(this).map { r -> r.into(T::class.java) }.awaitSingleOrNull()

suspend fun <T : Any> transactionWithSingleResult(
    ctx: DSLContext,
    transactional: (Configuration) -> Flow<T>
): T = Flux.from(ctx.transactionPublisher { trx -> transactional.invoke(trx).asFlux() }).awaitSingle()


@Singleton
class CoroutineAccountRepository(private val ctx: DSLContext) {

    suspend fun getAccounts(): List<AccountDetailsWithAddress> = ctx.getAccountQuery().fetchInto()

    private fun DSLContext.getAccountQuery() =
        select(
            ACCOUNT.ID,
            ACCOUNT.NAME,
            ACCOUNT.DELETED_AT,
            ADDRESS.FULL_ADDRESS
        ).from(ACCOUNT).leftJoin(ADDRESS).on(ADDRESS.ACCOUNT_ID.eq(ACCOUNT.ID))

    suspend fun getAccount(accountId: Long, ctx: DSLContext = this.ctx): AccountDetailsWithAddress? =
        ctx.getAccountQuery().where(ACCOUNT.ID.eq(accountId)).fetchOneInto()

    suspend fun insertAccountWithAddress(accountWithAddressDto: AccountCreateDto): AccountDetailsWithAddress {
        return transactionWithSingleResult(ctx) { trx ->
            trx.dsl()
                .insertInto(ACCOUNT)
                .columns(ACCOUNT.NAME, ACCOUNT.DELETED_AT)
                .values(accountWithAddressDto.name, accountWithAddressDto.deletedAt)
                .returningResult(ACCOUNT.ID)
                .asFlow()
                .map { insertedAccount ->
                    if (!accountWithAddressDto.fullAddress.isNullOrBlank()) {
                        trx.dsl().insertInto(ADDRESS)
                            .columns(ADDRESS.ACCOUNT_ID, ADDRESS.FULL_ADDRESS)
                            .values(insertedAccount.value1(), accountWithAddressDto.fullAddress)
                            .returningResult(ADDRESS.ACCOUNT_ID)
                            .awaitFirst()
                    } else insertedAccount
                }
                .map { getAccount(it.value1()!!, trx.dsl())!! }
        }
    }
}
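
To see in isolation what the glue code relies on, here is a minimal sketch of the two bridges from kotlinx-coroutines-reactor it uses, run against in-memory publishers instead of jOOQ queries. The function names collectAll, awaitOrNull and demo are made up for this example:

```kotlin
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Collects every element of a publisher into a list, suspending instead of blocking.
suspend fun <T : Any> collectAll(source: Flux<T>): List<T> = source.asFlow().toList()

// Awaits at most one element; an empty Mono becomes null rather than an error.
suspend fun <T : Any> awaitOrNull(source: Mono<T>): T? = source.awaitSingleOrNull()

// Bridges back into blocking code, e.g. for a quick check in a test.
fun demo(): Pair<List<Int>, String?> = runBlocking {
    collectAll(Flux.just(1, 2, 3)) to awaitOrNull(Mono.empty<String>())
}
```

The fetchInto() and fetchOneInto() helpers above follow the same pattern, with an extra .map() step that converts each jOOQ record into the target class.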

If you want to check out the complete PoC, you can find it on my GitHub: https://github.com/adamkobor/micronaut-r2dbc-jooq-flyway-testcontainers-poc

