Akka-persistence-jdbc is a plugin for akka-persistence that writes journal entries to a configured JDBC database. It supports writing journal messages and snapshots to two tables: the 'journal' table and the 'snapshot' table.
By setting the appropriate Journal and SnapshotStore classes in the application.conf, you can choose the following databases
- H2 (tested, works on 1.4.179)
- Postgresql (tested, works on v9.3.1)
- MySql (tested, works on 5.6.19 MySQL Community Server (GPL))
- Oracle XE (tested, works on Oracle XE 11g r2)
Note: The plugin should work on other databases too. When you wish to have support for a database, contact me and we can work something out.
Start of Disclaimer:
This plugin should not be used in production, ever! For a good, stable and scalable solution use Apache Cassandra with the akka-persistence-cassandra plugin Only use this plug-in for study projects and proof of concepts. Please use Docker and spotify/cassandra You have been warned!
End of Disclaimer
To include the JDBC plugin into your sbt project, add the following lines to your build.sbt file:
resolvers += "dnvriend at bintray" at "http://dl.bintray.com/dnvriend/maven"
libraryDependencies += "com.github.dnvriend" %% "akka-persistence-jdbc" % "1.1.1"
For Maven users, add the following to the pom.xml
<dependency>
<groupId>com.github.dnvriend</groupId>
<artifactId>akka-persistence-jdbc_2.10</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.github.dnvriend</groupId>
<artifactId>akka-persistence-jdbc_2.11</artifactId>
<version>1.1.1</version>
</dependency>
Add the following to the repositories section of the pom:
<repository>
<snapshots><enabled>false</enabled></snapshots>
<id>central</id>
<name>bintray</name>
<url>http://dl.bintray.com/dnvriend/maven</url>
</repository>
- ScalikeJDBC 2.2.2 -> 2.2.4
- Java 8 binary, so it needs Java 8, you still use Java 6 or 7, upgrade! :P
- Using the much faster Java8 java.util.Base64 encoder/decoder
- Bulk insert for journal entries (non-oracle only, sorry)
- Initial support for JNDI, needs testing though
- Merged Paul Roman Fix typo in journal log message #14, thanks!
- Merged Pavel Boldyrev Fix MS SQL Server support #15 (can not test it though, needs Vagrant), thanks!
- Merged Pavel Boldyrev Fix Oracle SQL
MERGE
statement usage #13 which fixes issue #9 (java.sql.SQLRecoverableException: No more data to read from socket #9), thanks! - Change to the Oracle schema, it needs a stored procedure definition.
- ScalikeJDBC 2.1.2 -> 2.2.2
- Merged miguel-vila Adds ´validationQuery´ configuration parameter #10, thanks!
- Removed Informix support: I just don't have a working Informix docker image (maybe someone can create one and publish it?)
- ScalikeJDBC 2.1.1 -> 2.1.2
- Moved to bintray
- Merged mwkohout fix using Oracle's MERGE on issue #3, thanks!
- Fixed - Issue3: Handling save attempts with duplicate snapshot ids and persistence ids
- Fixed - Issue5: Connection pool is being redefined when using journal and snapshot store
- Akka 2.3.5 -> 2.3.6
- ScalikeJDBC 2.1.0 -> 2.1.1
- Added schema name configuration for the journal and snapshot
- Added table name configuration for the journal and snapshot
- ScalikeJDBC 2.0.5 -> 2.1.0
- Akka 2.3.4 -> 2.3.5
- IBM Informix 12.10 supported
- Oracle XE 11g supported
- scalikejdbc 2.0.4 -> 2.0.5
- akka-persistence-testkit 0.3.3 -> 0.3.4
- Release to Maven Central
- Tested against MySQL/5.6.19 MySQL Community Server (GPL)
- Tested against H2/1.4.179
- Added the snapshot store
- Refactored the JdbcSyncWriteJournal so it supports the following databases:
- Using Martin Krasser's akka-persistence-testkit to test the akka-persistence-jdbc plugin.
- Update to Akka 2.3.4
- Using ScalikeJDBC as the JDBC access library instead of my home-brew one.
- Initial commit
In application.conf place the following:
akka {
persistence {
journal.plugin = "jdbc-journal"
snapshot-store.plugin = "jdbc-snapshot-store"
}
}
jdbc-journal {
class = "akka.persistence.jdbc.journal.PostgresqlSyncWriteJournal"
}
jdbc-snapshot-store {
class = "akka.persistence.jdbc.snapshot.PostgresqlSyncSnapshotStore"
}
jdbc-connection {
username ="admin"
password = "admin"
driverClassName = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost:5432/mydb"
journalSchemaName = "public"
journalTableName = "journal"
snapshotSchemaName = "public"
snapshotTableName = "snapshot"
}
Note: You can drop the schema name (username/database name) part of the query by setting the schema names to "", the query that runs will not have the . format but only the tablename, like so:
jdbc-connection {
journalSchemaName = ""
snapshotSchemaName = ""
}
validationQuery: You can add the validationQuery key to the jdbc-connection so the connection pool will execute this query from time to time so the database connection will not time out when not in use. The example below can be used with the Oracle database. When you do not use the validationQuery, please remove the key:
jdbc-connection {
validationQuery = "select 1 from dual"
}
The jdbc-journal and jdbc-snapshot-store sections are optional. The defaults are the PostgresqlSyncWriteJournal and the PostgresqlSyncSnapshotStore.
To alter the database dialect for the journal, change the class to:
class = "akka.persistence.jdbc.journal.PostgresqlSyncWriteJournal"
class = "akka.persistence.jdbc.journal.MysqlSyncWriteJournal"
class = "akka.persistence.jdbc.journal.H2SyncWriteJournal"
class = "akka.persistence.jdbc.journal.OracleSyncWriteJournal"
To alter the database dialect of the snapshot-store, change the class to:
class = "akka.persistence.jdbc.snapshot.PostgresqlSyncSnapshotStore"
class = "akka.persistence.jdbc.snapshot.MysqlSyncSnapshotStore"
class = "akka.persistence.jdbc.snapshot.H2SyncSnapshotStore"
class = "akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore"
JNDI (UNTESTED):
ScalikeJDBC has support for JNDI connections, and using that example it should be as easy as adding the jndiPath to the configuration with the dataSourceName, and off you go:
jdbc-connection {
journalSchemaName = "public"
journalTableName = "journal"
snapshotSchemaName = "public"
snapshotTableName = "snapshot"
jndiPath = ""
dataSourceName = ""
}
The following schema works on H2
CREATE TABLE IF NOT EXISTS public.journal (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
marker VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number)
);
CREATE TABLE IF NOT EXISTS public.snapshot (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr BIGINT NOT NULL,
snapshot TEXT NOT NULL,
created BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr)
);
The following schema works on Postgresql
CREATE TABLE IF NOT EXISTS public.journal (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
marker VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number)
);
CREATE TABLE IF NOT EXISTS public.snapshot (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr BIGINT NOT NULL,
snapshot TEXT NOT NULL,
created BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr)
);
The following schema works on MS SQL Server
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'dbo.journal') AND type in (N'U'))
CREATE TABLE dbo.journal (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
marker VARCHAR(255) NOT NULL,
message NVARCHAR(MAX) NOT NULL,
created DATETIME NOT NULL,
PRIMARY KEY (persistence_id, sequence_number)
)
GO
IF NOT EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'dbo.snapshot') AND type in (N'U'))
CREATE TABLE dbo.snapshot (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr BIGINT NOT NULL,
snapshot NVARCHAR(MAX) NOT NULL,
created BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr)
)
GO
Note: Please set the schema names to "dbo" like so:
journalSchemaName = "dbo"
journalTableName = "journal"
snapshotSchemaName = "dbo"
snapshotTableName = "snapshot"
The following schema works on MySQL
CREATE TABLE IF NOT EXISTS journal (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
marker VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number)
);
CREATE TABLE IF NOT EXISTS snapshot (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr BIGINT NOT NULL,
snapshot TEXT NOT NULL,
created BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr)
);
Note: Please set the schema names to "" like so:
journalSchemaName = ""
journalTableName = "journal"
snapshotSchemaName = ""
snapshotTableName = "snapshot"
When you wish to use the schemaName for the database name, then fill the schema names to be the database name
The following schema works on Oracle
CREATE TABLE public.journal (
persistence_id VARCHAR(255) NOT NULL,
sequence_number NUMERIC NOT NULL,
marker VARCHAR(255) NOT NULL,
message CLOB NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number)
);
CREATE TABLE public.snapshot (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr NUMERIC NOT NULL,
snapshot CLOB NOT NULL,
created NUMERIC NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr)
);
CREATE OR REPLACE PROCEDURE sp_save_snapshot(
p_persistence_id IN VARCHAR,
p_sequence_nr IN NUMERIC,
p_snapshot IN CLOB,
p_created IN NUMERIC
) IS
BEGIN
INSERT INTO public.snapshot (persistence_id, sequence_nr, snapshot, created)
VALUES
(p_persistence_id, p_sequence_nr, p_snapshot, p_created);
EXCEPTION
WHEN DUP_VAL_ON_INDEX THEN
UPDATE public.snapshot
SET snapshot = p_snapshot, created = p_created
WHERE persistence_id = p_persistence_id AND sequence_nr = p_sequence_nr;
END;
Note1:
Name of the "snapshots" table is also specified in sp_save_snapshot
body. If you change table names don't forget to update SP as well.
Note2: Oracle schemas are users really. To use a username for the schema name, set the configuration like:
journalSchemaName = "system"
journalTableName = "journal"
snapshotSchemaName = "system"
snapshotTableName = "snapshot"
To not use schema names, set the configuration to:
journalSchemaName = ""
journalTableName = "journal"
snapshotSchemaName = ""
snapshotTableName = "snapshot"
The application.conf for Postgresql should be:
akka {
persistence {
journal.plugin = "jdbc-journal"
snapshot-store.plugin = "jdbc-snapshot-store"
}
}
jdbc-journal {
class = "akka.persistence.jdbc.journal.PostgresqlSyncWriteJournal"
}
jdbc-snapshot-store {
class = "akka.persistence.jdbc.snapshot.PostgresqlSyncSnapshotStore"
}
jdbc-connection {
username ="admin"
password = "admin"
driverClassName = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost:5432/mydb"
journalSchemaName = "public"
journalTableName = "journal"
snapshotSchemaName = "public"
snapshotTableName = "snapshot"
}
The application.conf for MS SQL Server should be:
akka {
persistence {
journal.plugin = "jdbc-journal"
snapshot-store.plugin = "jdbc-snapshot-store"
}
}
jdbc-journal {
class = "akka.persistence.jdbc.journal.MSSqlServerSyncWriteJournal"
}
jdbc-snapshot-store {
class = "akka.persistence.jdbc.snapshot.MSSqlServerSyncSnapshotStore"
}
mssql {
host = "localhost"
port = "1433"
database = "master"
}
jdbc-connection {
username = "sa"
password = "password"
driverClassName = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
url = "jdbc:sqlserver://"${mssql.host}":"${mssql.port}";databaseName=${mssql.database}"
journalSchemaName = "dbo"
journalTableName = "journal"
snapshotSchemaName = "dbo"
snapshotTableName = "snapshot"
validationQuery = "select 1"
}
The application.conf for MySQL should be:
akka {
persistence {
journal.plugin = "jdbc-journal"
snapshot-store.plugin = "jdbc-snapshot-store"
}
}
jdbc-journal {
class = "akka.persistence.jdbc.journal.MysqlSyncWriteJournal"
}
jdbc-snapshot-store {
class = "akka.persistence.jdbc.snapshot.MysqlSyncSnapshotStore"
}
jdbc-connection {
username ="admin"
password = "admin"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost/test"
journalSchemaName = ""
journalTableName = "journal"
snapshotSchemaName = ""
snapshotTableName = "snapshot"
}
The application.conf for H2 should be:
akka {
persistence {
journal.plugin = "jdbc-journal"
snapshot-store.plugin = "jdbc-snapshot-store"
}
}
jdbc-journal {
class = "akka.persistence.jdbc.journal.H2SyncWriteJournal"
}
jdbc-snapshot-store {
class = "akka.persistence.jdbc.snapshot.H2SyncSnapshotStore"
}
jdbc-connection {
username ="sa"
password = ""
driverClassName = "org.h2.Driver"
url = "jdbc:h2:mem:test;MODE=PostgreSQL;DB_CLOSE_DELAY=-1"
journalSchemaName = "public"
journalTableName = "journal"
snapshotSchemaName = "public"
snapshotTableName = "snapshot"
}
The application.conf for Oracle should be:
akka {
persistence {
journal.plugin = "jdbc-journal"
snapshot-store.plugin = "jdbc-snapshot-store"
}
}
jdbc-journal {
class = "akka.persistence.jdbc.journal.OracleSyncWriteJournal"
}
jdbc-snapshot-store {
class = "akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore"
}
jdbc-connection {
username ="system"
password = "oracle"
driverClassName = "oracle.jdbc.OracleDriver"
url = "jdbc:oracle:thin:@//192.168.99.99:49161/xe"
journalSchemaName = "system"
journalTableName = "journal"
snapshotSchemaName = "system"
snapshotTableName = "snapshot"
}
- Install boot2docker for your operating system (I use OSX)
- Add the shell integration configuration
- Execute the appropriate test shell script