Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed PostgresDriver (for jvmMain) to correctly make use of the connection pool. #54

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,35 @@ In JDBC, the placeholder would be `?` but with libpq, we will pass `$1`, `$2`, e
This feature implementation tries to follow Spring's `NamedParameterJdbcTemplate` as close as possible.
[NamedParameterJdbcTemplate](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.html)

## Logging

Currently, we don't have implemented the logging layer, we're still thinking on how could implement it.
So, any contributions are welcome.
Although, at the moment, and only for the `jvmTest`, we make use of the `slf4j` libraries in order to make the development process easier.

## Development

### Local Build

By default, this project will attempt to build for all targets. If you have a linux machine and only want to build
the `linuxX64` and `linuxArm64` targets, you can do:
By default, this project will attempt to build for all targets.
If you have a linux machine and only want to build the `linuxX64` and `linuxArm64` targets, you can do:

```shell
./gradlew build -Ptargets=linuxX64,linuxArm64
```

for a macOS:

```shell
./gradlew build -Ptargets=macosArm64
```

Additionally, you can build for JVM:

```shell
./gradlew build -Ptargets=macosArm64,jvm
```

## FAQ

### Two HomeBrews
Expand All @@ -91,4 +109,4 @@ TODO - clarify this better:
2. Two homebrews is good for macosarm and macosX, but it isn't enough for linuxX64
3. For linuxX64 I had to brew install libpq in linux and copy over the files to macos
4. https://discuss.kotlinlang.org/t/how-to-determine-linkeropts-at-build-time/17402/2
5. https://github.com/JetBrains/kotlin-native/issues/1534
5. https://github.com/JetBrains/kotlin-native/issues/1534
15 changes: 6 additions & 9 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ kotlin {
mingwX64("mingwX64")
*/

// android, ios, watchos, tvos, js will never(?) be supported
// Android, ios, watchOS, tvos, js will never(?) be supported.
applyDefaultHierarchyTemplate()
sourceSets {
configureEach {
Expand All @@ -71,14 +71,16 @@ kotlin {
if (chosenTargets.contains("jvm")) {
val jvmMain by getting {
dependencies {
implementation("org.springframework.data:spring-data-r2dbc:3.2.4")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.8.0")
implementation("org.postgresql:r2dbc-postgresql:1.0.4.RELEASE")
implementation("io.r2dbc:r2dbc-pool:1.0.1.RELEASE")
}
}
val jvmTest by getting {
dependencies {
// https://mvnrepository.com/artifact/org.slf4j/slf4j-api
implementation("org.slf4j:slf4j-api:2.0.13")
implementation("org.slf4j:slf4j-reload4j:2.0.13")
smyrgeorge marked this conversation as resolved.
Show resolved Hide resolved
implementation("org.postgresql:r2dbc-postgresql:1.0.4.RELEASE")
implementation("org.jetbrains.kotlin:kotlin-test:1.9.23")
}
Expand All @@ -95,11 +97,7 @@ val create by tasks.registering(DockerCreateContainer::class) {
dependsOn(pull)
containerName.set("test")
imageId.set("postgres:15-alpine")
this.envVars.set(
mapOf(
"POSTGRES_PASSWORD" to "postgres"
)
)
envVars.set(mapOf("POSTGRES_PASSWORD" to "postgres"))
hostConfig.portBindings.set(listOf("5678:5432"))

/*
Expand All @@ -109,7 +107,7 @@ val create by tasks.registering(DockerCreateContainer::class) {
healthCheck.timeout.set(1000000000)
healthCheck.retries.set(3)
healthCheck.startPeriod.set(100000000000000)
*/
*/
}
val start by tasks.registering(DockerStartContainer::class) {
dependsOn(create)
Expand Down Expand Up @@ -187,7 +185,6 @@ tasks {

}


java {
targetCompatibility = JavaVersion.VERSION_21
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ interface PostgresDriver {
suspend fun <T> execute(sql: String, paramSource: SqlParameterSource, handler: (ResultSet) -> T): List<T>
suspend fun execute(sql: String, namedParameters: Map<String, Any?> = emptyMap()): Long
suspend fun execute(sql: String, paramSource: SqlParameterSource): Long

/**
* Warm-up the connection pool.
*/
suspend fun warmup()
}
95 changes: 62 additions & 33 deletions src/jvmMain/kotlin/io/github/moreirasantos/pgkn/PostgresDriver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,58 @@ import io.github.moreirasantos.pgkn.paramsource.MapSqlParameterSource
import io.github.moreirasantos.pgkn.paramsource.SqlParameterSource
import io.github.moreirasantos.pgkn.resultset.PostgresResultSet
import io.github.moreirasantos.pgkn.resultset.ResultSet
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.PoolingConnectionFactoryProvider.INITIAL_SIZE
import io.r2dbc.pool.PoolingConnectionFactoryProvider.MAX_SIZE
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactoryOptions.*
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions.DATABASE
import io.r2dbc.spi.ConnectionFactoryOptions.DRIVER
import io.r2dbc.spi.ConnectionFactoryOptions.HOST
import io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD
import io.r2dbc.spi.ConnectionFactoryOptions.PORT
import io.r2dbc.spi.ConnectionFactoryOptions.PROTOCOL
import io.r2dbc.spi.ConnectionFactoryOptions.USER
import io.r2dbc.spi.ConnectionFactoryOptions.builder
import io.r2dbc.spi.Result
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirst
import java.util.*


@Suppress("LongParameterList")
suspend fun PostgresDriver(
fun PostgresDriver(
host: String,
port: Int = 5432,
database: String,
user: String,
password: String,
poolSize: Int = 20
): PostgresDriver {

): PostgresDriver = PostgresDriverPool(
host = host,
port = port,
database = database,
user = user,
password = password,
poolSize = poolSize
)

private class PostgresDriverPool(
host: String,
port: Int,
database: String,
user: String,
password: String,
poolSize: Int
) : PostgresDriver {

val connectionFactory = ConnectionFactories.get(
// https://github.com/r2dbc/r2dbc-pool
private val pool: ConnectionFactory = ConnectionFactories.get(
builder()
.option(DRIVER, "pool")
.option(PROTOCOL, "postgresql")
Expand All @@ -36,21 +64,20 @@ suspend fun PostgresDriver(
.option(USER, user)
.option(PASSWORD, password)
.option(DATABASE, database)
.option(INITIAL_SIZE, poolSize)
.option(MAX_SIZE, poolSize)
smyrgeorge marked this conversation as resolved.
Show resolved Hide resolved
.build()
)

val connection = connectionFactory.create().awaitFirst()

return PostgresDriverPool(connection = connection)
}

private class PostgresDriverPool(private val connection: Connection) : PostgresDriver {
override suspend fun <T> execute(sql: String, namedParameters: Map<String, Any?>, handler: (ResultSet) -> T) =
override suspend fun <T> execute(
sql: String,
namedParameters: Map<String, Any?>,
handler: (ResultSet) -> T
): List<T> =
if (namedParameters.isEmpty()) doExecute(sql).handleResults(handler)
else execute(sql, MapSqlParameterSource(namedParameters), handler)

override suspend fun <T> execute(sql: String, paramSource: SqlParameterSource, handler: (ResultSet) -> T) =
override suspend fun <T> execute(sql: String, paramSource: SqlParameterSource, handler: (ResultSet) -> T): List<T> =
doExecute(sql, paramSource).handleResults(handler)

override suspend fun execute(sql: String, namedParameters: Map<String, Any?>): Long =
Expand All @@ -60,28 +87,30 @@ private class PostgresDriverPool(private val connection: Connection) : PostgresD
override suspend fun execute(sql: String, paramSource: SqlParameterSource): Long =
doExecute(sql, paramSource).returnCount()

private fun doExecute(sql: String): Flow<Result> {
return connection.createStatement(sql).execute().asFlow()
override suspend fun warmup() {
// ConnectionFactories.get(..) creates a [ConnectionPool] wrapping an underlying [ConnectionFactory].
(pool as ConnectionPool).warmup().awaitFirst()
}

private fun doExecute(sql: String, paramSource: SqlParameterSource) = (paramSource.parameterNames ?: emptyArray())
.fold(connection.createStatement(sql)) { acc, name ->
paramSource.getValue(name)
?.let { acc.bind(name, it) }
?: acc.bindNull(name, Any::class.java)
}
.execute()
.asFlow()
private suspend fun doExecute(sql: String): Flow<Result> =
pool.create().awaitFirst().createStatement(sql).execute().asFlow()

private suspend fun doExecute(sql: String, paramSource: SqlParameterSource): Flow<Result> =
(paramSource.parameterNames ?: emptyArray()).fold(
pool.create().awaitFirst().createStatement(sql)
) { acc, name ->
paramSource.getValue(name)?.let { acc.bind(name, it) }
?: acc.bindNull(name, Any::class.java)
}.execute().asFlow()

// Await First and toList both suspend?
@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun <T> Flow<Result>.handleResults(handler: (ResultSet) -> T) = flatMapConcat {
return@flatMapConcat it.map { row -> Optional.ofNullable(handler(PostgresResultSet(row))) }.asFlow()
}.map { it.orElse(null) }.toList()
private suspend fun <T> Flow<Result>.handleResults(handler: (ResultSet) -> T): List<T> =
flatMapConcat { it.map { row -> Optional.ofNullable(handler(PostgresResultSet(row))) }.asFlow() }
.map { it.orElse(null) }.toList()

@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun Flow<Result>.returnCount() = flatMapConcat {
it.rowsUpdated.asFlow()
}.fold(0L) { accumulator, value -> accumulator + value }

private suspend fun Flow<Result>.returnCount(): Long =
flatMapConcat { it.rowsUpdated.asFlow() }
.fold(0L) { accumulator, value -> accumulator + value }
}
9 changes: 9 additions & 0 deletions src/jvmTest/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=INFO, A1

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.github.moreirasantos.pgkn.sql.parseSql
import io.github.moreirasantos.pgkn.sql.substituteNamedParameters
import kotlinx.cinterop.*
import libpq.*

@Suppress("LongParameterList")
@OptIn(ExperimentalForeignApi::class)
fun PostgresDriver(
Expand Down Expand Up @@ -62,6 +63,10 @@ private class PostgresDriverPool(

override suspend fun execute(sql: String, paramSource: SqlParameterSource) =
pool.invoke { it.execute(sql, paramSource) }

override suspend fun warmup() {
// Intentionally left empty since the default behaviour warmups the connection pool.
}
}

internal sealed interface PostgresDriverUnit {
Expand Down
Loading