Skip to content

Commit

Permalink
Go all in on r2dbc
Browse files Browse the repository at this point in the history
  • Loading branch information
mattupstate committed Dec 8, 2023
1 parent 4f09e20 commit deea631
Show file tree
Hide file tree
Showing 43 changed files with 977 additions and 957 deletions.
6 changes: 4 additions & 2 deletions acme-data/acme-data-scheduling/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ dependencies {

testImplementation(project(":acme-lib:acme-lib-liquibase"))
testImplementation(libs.com.zaxxer.hikariCP)
testImplementation(libs.io.kotest.kotest.runner.junit5)
testImplementation(libs.org.postgresql)
testImplementation(libs.org.slf4j.slf4j.api)
testImplementation(libs.org.postgresql.r2dbc)
testImplementation(libs.org.slf4j.slf4j.simple)
testImplementation(libs.org.testcontainers.postgresql)
testImplementation(libs.io.kotest.kotest.runner.junit5)
testImplementation(libs.org.testcontainers.r2dbc)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.acme.core.PersistenceMetaData
import com.acme.jooq.asExcluded
import com.acme.scheduling.Appointment
import com.acme.sql.scheduling.tables.references.APPOINTMENTS
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.jooq.DSLContext
Expand All @@ -20,10 +22,10 @@ class JooqAppointmentAggregateRepository(
private val clock: Clock = Clock.systemUTC()
) : AggregateRepository<Appointment, Appointment.Id> {

override fun find(id: Appointment.Id): PersistedAggregate<Appointment>? =
override suspend fun find(id: Appointment.Id): PersistedAggregate<Appointment>? =
dsl.selectFrom(APPOINTMENTS)
.where(APPOINTMENTS.ID.eq(id.value))
.fetchOne {
.awaitFirstOrNull()?.let {
PersistedAggregate(
aggregate = Json.decodeFromString(it.aggregate!!.data()),
metaData = PersistenceMetaData(
Expand All @@ -34,15 +36,16 @@ class JooqAppointmentAggregateRepository(
)
}

override fun get(id: Appointment.Id): PersistedAggregate<Appointment> = getOrThrow(id) { NoSuchElementException() }
override suspend fun get(id: Appointment.Id): PersistedAggregate<Appointment> =
getOrThrow(id) { NoSuchElementException() }

override fun getOrThrow(id: Appointment.Id, block: () -> Throwable): PersistedAggregate<Appointment> =
override suspend fun getOrThrow(id: Appointment.Id, block: () -> Throwable): PersistedAggregate<Appointment> =
find(id) ?: throw block()

override fun exists(id: Appointment.Id): Boolean =
dsl.fetchExists(APPOINTMENTS, APPOINTMENTS.ID.eq(id.value))
override suspend fun exists(id: Appointment.Id): Boolean =
dsl.selectOne().from(APPOINTMENTS).where(APPOINTMENTS.ID.eq(id.value)).awaitFirstOrNull() != null

override fun save(aggregate: Appointment) {
override suspend fun save(aggregate: Appointment) {
val now = LocalDateTime.ofInstant(Instant.now(clock), ZoneOffset.UTC)
val json = JSONB.valueOf(Json.encodeToString(aggregate))

Expand All @@ -65,6 +68,7 @@ class JooqAppointmentAggregateRepository(
.set(APPOINTMENTS.AGGREGATE, APPOINTMENTS.AGGREGATE.asExcluded())
.set(APPOINTMENTS.REVISION, APPOINTMENTS.REVISION.add(1))
.set(APPOINTMENTS.UPDATED_AT, now)
.execute()
.returning()
.awaitFirst()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.acme.core.PersistenceMetaData
import com.acme.jooq.asExcluded
import com.acme.scheduling.Client
import com.acme.sql.scheduling.tables.references.CLIENTS
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.jooq.DSLContext
Expand All @@ -18,10 +20,10 @@ class JooqClientAggregateRepository(
private val clock: Clock = Clock.systemUTC()
) : AggregateRepository<Client, Client.Id> {

override fun find(id: Client.Id): PersistedAggregate<Client>? =
override suspend fun find(id: Client.Id): PersistedAggregate<Client>? =
dsl.selectFrom(CLIENTS)
.where(CLIENTS.ID.eq(id.value))
.fetchOne {
.awaitFirstOrNull()?.let {
PersistedAggregate(
aggregate = Json.decodeFromString(it.aggregate!!.data()),
metaData = PersistenceMetaData(
Expand All @@ -32,15 +34,15 @@ class JooqClientAggregateRepository(
)
}

override fun get(id: Client.Id): PersistedAggregate<Client> = getOrThrow(id) { NoSuchElementException() }
override suspend fun get(id: Client.Id): PersistedAggregate<Client> = getOrThrow(id) { NoSuchElementException() }

override fun getOrThrow(id: Client.Id, block: () -> Throwable): PersistedAggregate<Client> =
override suspend fun getOrThrow(id: Client.Id, block: () -> Throwable): PersistedAggregate<Client> =
find(id) ?: throw block()

override fun exists(id: Client.Id): Boolean =
dsl.fetchExists(CLIENTS, CLIENTS.ID.eq(id.value))
override suspend fun exists(id: Client.Id): Boolean =
dsl.selectOne().from(CLIENTS).where(CLIENTS.ID.eq(id.value)).awaitFirstOrNull() != null

override fun save(aggregate: Client) {
override suspend fun save(aggregate: Client) {
val now = LocalDateTime.now(clock)
val json = JSONB.valueOf(Json.encodeToString(aggregate))

Expand All @@ -63,6 +65,7 @@ class JooqClientAggregateRepository(
.set(CLIENTS.AGGREGATE, CLIENTS.AGGREGATE.asExcluded())
.set(CLIENTS.REVISION, CLIENTS.REVISION.add(1))
.set(CLIENTS.UPDATED_AT, now)
.execute()
.returning()
.awaitFirst()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.acme.core.PersistenceMetaData
import com.acme.jooq.asExcluded
import com.acme.scheduling.Practice
import com.acme.sql.scheduling.tables.Practices.Companion.PRACTICES
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.jooq.DSLContext
Expand All @@ -18,10 +20,10 @@ class JooqPracticeAggregateRepository(
private val clock: Clock = Clock.systemUTC()
) : AggregateRepository<Practice, Practice.Id> {

override fun find(id: Practice.Id): PersistedAggregate<Practice>? =
override suspend fun find(id: Practice.Id): PersistedAggregate<Practice>? =
dsl.selectFrom(PRACTICES)
.where(PRACTICES.ID.eq(id.value))
.fetchOne {
.awaitFirstOrNull()?.let {
PersistedAggregate(
aggregate = Json.decodeFromString(it.aggregate!!.data()),
metaData = PersistenceMetaData(
Expand All @@ -32,15 +34,15 @@ class JooqPracticeAggregateRepository(
)
}

override fun get(id: Practice.Id): PersistedAggregate<Practice> = getOrThrow(id) { NoSuchElementException() }
override suspend fun get(id: Practice.Id): PersistedAggregate<Practice> = getOrThrow(id) { NoSuchElementException() }

override fun getOrThrow(id: Practice.Id, block: () -> Throwable): PersistedAggregate<Practice> =
override suspend fun getOrThrow(id: Practice.Id, block: () -> Throwable): PersistedAggregate<Practice> =
find(id) ?: throw block()

override fun exists(id: Practice.Id): Boolean =
dsl.fetchExists(PRACTICES, PRACTICES.ID.eq(id.value))
override suspend fun exists(id: Practice.Id): Boolean =
dsl.selectOne().from(PRACTICES).where(PRACTICES.ID.eq(id.value)).awaitFirstOrNull() != null

override fun save(aggregate: Practice) {
override suspend fun save(aggregate: Practice) {
val now = LocalDateTime.now(clock)
val json = JSONB.valueOf(Json.encodeToString(aggregate))

Expand All @@ -63,6 +65,7 @@ class JooqPracticeAggregateRepository(
.set(PRACTICES.AGGREGATE, PRACTICES.AGGREGATE.asExcluded())
.set(PRACTICES.REVISION, PRACTICES.REVISION.add(1))
.set(PRACTICES.UPDATED_AT, now)
.execute()
.returning()
.awaitFirst()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.acme.core.PersistenceMetaData
import com.acme.jooq.asExcluded
import com.acme.scheduling.Practitioner
import com.acme.sql.scheduling.tables.Practitioners.Companion.PRACTITIONERS
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.jooq.DSLContext
Expand All @@ -18,10 +20,10 @@ class JooqPractitionerAggregateRepository(
private val clock: Clock = Clock.systemUTC()
) : AggregateRepository<Practitioner, Practitioner.Id> {

override fun find(id: Practitioner.Id): PersistedAggregate<Practitioner>? =
override suspend fun find(id: Practitioner.Id): PersistedAggregate<Practitioner>? =
dsl.selectFrom(PRACTITIONERS)
.where(PRACTITIONERS.ID.eq(id.value))
.fetchOne {
.awaitFirstOrNull()?.let {
PersistedAggregate(
aggregate = Json.decodeFromString(it.aggregate!!.data()),
metaData = PersistenceMetaData(
Expand All @@ -32,15 +34,16 @@ class JooqPractitionerAggregateRepository(
)
}

override fun get(id: Practitioner.Id): PersistedAggregate<Practitioner> = getOrThrow(id) { NoSuchElementException() }
override suspend fun get(id: Practitioner.Id): PersistedAggregate<Practitioner> =
getOrThrow(id) { NoSuchElementException() }

override fun getOrThrow(id: Practitioner.Id, block: () -> Throwable): PersistedAggregate<Practitioner> =
override suspend fun getOrThrow(id: Practitioner.Id, block: () -> Throwable): PersistedAggregate<Practitioner> =
find(id) ?: throw block()

override fun exists(id: Practitioner.Id): Boolean =
dsl.fetchExists(PRACTITIONERS, PRACTITIONERS.ID.eq(id.value))
override suspend fun exists(id: Practitioner.Id): Boolean =
dsl.selectOne().from(PRACTITIONERS).where(PRACTITIONERS.ID.eq(id.value)).awaitFirstOrNull() != null

override fun save(aggregate: Practitioner) {
override suspend fun save(aggregate: Practitioner) {
val now = LocalDateTime.now(clock)
val json = JSONB.valueOf(Json.encodeToString(aggregate))

Expand All @@ -63,6 +66,7 @@ class JooqPractitionerAggregateRepository(
.set(PRACTITIONERS.AGGREGATE, PRACTITIONERS.AGGREGATE.asExcluded())
.set(PRACTITIONERS.REVISION, PRACTITIONERS.REVISION.add(1))
.set(PRACTITIONERS.UPDATED_AT, now)
.execute()
.returning()
.awaitFirst()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import java.time.LocalDateTime

class JooqAppointmentAggregateRepositoryTest : ShouldSpec({

val jooq = listener(TestDatabaseListener())
val now = LocalDateTime.now()

val appointment = Appointment(
Expand All @@ -30,7 +29,7 @@ class JooqAppointmentAggregateRepositoryTest : ShouldSpec({
)

should("save new aggregate") {
jooq.testTransaction {
testTransaction {
val time = timeFixtureFactory()
val repo = JooqAppointmentAggregateRepository(it.dsl(), time.clock)
repo.save(appointment)
Expand All @@ -45,7 +44,7 @@ class JooqAppointmentAggregateRepositoryTest : ShouldSpec({
}

should("update existing aggregate and increment revision") {
jooq.testTransaction {
testTransaction {
val createTime = timeFixtureFactory()
val createRepo = JooqAppointmentAggregateRepository(it.dsl(), createTime.clock)
createRepo.save(appointment)
Expand All @@ -69,7 +68,7 @@ class JooqAppointmentAggregateRepositoryTest : ShouldSpec({
}

should("throw NoSuchElementException") {
jooq.testTransaction {
testTransaction {
val repo = JooqAppointmentAggregateRepository(it.dsl())
shouldThrow<NoSuchElementException> {
repo.get(appointment.id)
Expand All @@ -78,7 +77,7 @@ class JooqAppointmentAggregateRepositoryTest : ShouldSpec({
}

should("throw user supplied exception") {
jooq.testTransaction {
testTransaction {
val repo = JooqAppointmentAggregateRepository(it.dsl())
shouldThrow<FakeException> {
repo.getOrThrow(appointment.id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import io.kotest.matchers.shouldBe

class JooqClientAggregateRepositoryTest : ShouldSpec({

val jooq = listener(TestDatabaseListener())

val client = Client(
id = Client.Id("Client123"),
names = setOf(
Expand All @@ -29,7 +27,7 @@ class JooqClientAggregateRepositoryTest : ShouldSpec({
)

should("should save new aggregate") {
jooq.testTransaction {
testTransaction {
val time: TimeFixture = timeFixtureFactory()
val repo = JooqClientAggregateRepository(it.dsl(), time.clock)
repo.save(client)
Expand All @@ -44,7 +42,7 @@ class JooqClientAggregateRepositoryTest : ShouldSpec({
}

should("update existing aggregate and increment revision") {
jooq.testTransaction {
testTransaction {
val createTime: TimeFixture = timeFixtureFactory()
val createRepo = JooqClientAggregateRepository(it.dsl(), createTime.clock)
createRepo.save(client)
Expand All @@ -63,7 +61,7 @@ class JooqClientAggregateRepositoryTest : ShouldSpec({
}

should("should throw NoSuchElementException") {
jooq.testTransaction {
testTransaction {
val repo = JooqClientAggregateRepository(it.dsl())
shouldThrow<NoSuchElementException> {
repo.get(client.id)
Expand All @@ -72,7 +70,7 @@ class JooqClientAggregateRepositoryTest : ShouldSpec({
}

should("should throw user supplied exception") {
jooq.testTransaction {
testTransaction {
val repo = JooqClientAggregateRepository(it.dsl())
shouldThrow<FakeException> {
repo.getOrThrow(client.id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import io.kotest.matchers.shouldBe

class JooqPracticeAggregateRepositoryTest : ShouldSpec({

val jooq = listener(TestDatabaseListener())

val practice = Practice(
id = Practice.Id("PracticeID"),
name = Practice.Name("Practice & Associates"),
Expand All @@ -28,7 +26,7 @@ class JooqPracticeAggregateRepositoryTest : ShouldSpec({
)

should("save new aggregate") {
jooq.testTransaction {
testTransaction {
val time: TimeFixture = timeFixtureFactory()
val repo = JooqPracticeAggregateRepository(it.dsl(), time.clock)
repo.save(practice)
Expand All @@ -43,7 +41,7 @@ class JooqPracticeAggregateRepositoryTest : ShouldSpec({
}

should("update an existing aggregate and increment revision") {
jooq.testTransaction {
testTransaction {
val createTime = timeFixtureFactory()
val createRepo = JooqPracticeAggregateRepository(it.dsl(), createTime.clock)
createRepo.save(practice)
Expand All @@ -64,7 +62,7 @@ class JooqPracticeAggregateRepositoryTest : ShouldSpec({
}

should("throw NoSuchElementException") {
jooq.testTransaction {
testTransaction {
val repo = JooqPracticeAggregateRepository(it.dsl())
shouldThrow<NoSuchElementException> {
repo.get(practice.id)
Expand All @@ -73,7 +71,7 @@ class JooqPracticeAggregateRepositoryTest : ShouldSpec({
}

should("throw user supplied exception") {
jooq.testTransaction {
testTransaction {
val repo = JooqPracticeAggregateRepository(it.dsl())
shouldThrow<FakeException> {
repo.getOrThrow(practice.id) {
Expand Down
Loading

0 comments on commit deea631

Please sign in to comment.