Skip to content

Commit

Permalink
Merge pull request #34 from ambarltd/sql-server-config
Browse files Browse the repository at this point in the history
Add SQLServer config options
  • Loading branch information
lazamar authored Dec 12, 2024
2 parents 4fcbfff + 065b546 commit bbe5b61
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 39 deletions.
32 changes: 26 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ A YAML configuration file should describe all sources and destinations to be use
data_sources:

# Connect to a PostgreSQL database
- id: my_postgres_source
- id: postgres_source
description: Main events store
type: postgres
host: localhost
Expand All @@ -36,7 +36,7 @@ data_sources:
partitioningColumn: aggregate_id

# Connect to a MySQL database
- id: my_mysql_source
- id: postgres_source
description: Main events store
type: mysql
host: localhost
Expand All @@ -53,6 +53,24 @@ data_sources:
autoIncrementingColumn: id
partitioningColumn: aggregate_id

# Connect to an SQLServer database
- id: sqlserver_source
description: Main events store
type: sqlserver
host: localhost
port: 1433
username: my_user
password: my_pass
database: my_db
table: events_table
columns:
- id
- aggregate_id
- sequence_number
- payload
autoIncrementingColumn: id
partitioningColumn: aggregate_id

# Connections to your endpoint.
# The Emulator will send data read from the databases to these endpoints.
data_destinations:
Expand All @@ -66,8 +84,9 @@ data_destinations:
password: password123

sources:
- my_mysql_source
- my_postgres_source
- postgres_source
- sqlserver_source
- file_source

# Send data to a file. One entry per line.
- id: file_destination
Expand All @@ -76,8 +95,9 @@ data_destinations:
path: ./temp.file

sources:
- my_mysql_source
- my_postgres_source
- postgres_source
- sqlserver_source
- file_source
```
## Running the program
Expand Down
24 changes: 22 additions & 2 deletions examples/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,24 @@ data_sources:
autoIncrementingColumn: id
partitioningColumn: aggregate_id

# Connect to an SQLServer database
- id: sqlserver_source
description: Main events store
type: sqlserver
host: localhost
port: 1433
username: my_user
password: my_pass
database: my_db
table: events_table
columns:
- id
- aggregate_id
- sequence_number
- payload
autoIncrementingColumn: id
partitioningColumn: aggregate_id

# Connections to your endpoint.
# The Emulator will send data read from the databases to these endpoints.
data_destinations:
Expand All @@ -51,8 +69,9 @@ data_destinations:
password: password123

sources:
- postgres source
- file source
- postgres_source
- sqlserver_source
- file_source

# Send data to a file. One entry per line.
- id: file_destination
Expand All @@ -62,4 +81,5 @@ data_destinations:

sources:
- postgres_source
- sqlserver_source
- file_source
12 changes: 10 additions & 2 deletions src/Ambar/Emulator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import GHC.Generics (Generic)
import System.Directory (doesFileExist)
import System.FilePath ((</>))

import Ambar.Emulator.Connector.Postgres (PostgreSQLState)
import Ambar.Emulator.Connector.MySQL (MySQLState)
import Ambar.Emulator.Connector (Connector(..), connect, partitioner, encoder)
import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServerState)
import Ambar.Emulator.Connector.MySQL (MySQLState)
import Ambar.Emulator.Connector.Postgres (PostgreSQLState)

import qualified Ambar.Emulator.Projector as Projector
import Ambar.Emulator.Projector (Projection(..))
Expand Down Expand Up @@ -96,6 +97,7 @@ emulate logger_ config env = do
case s_source source of
SourcePostgreSQL _ -> StatePostgres def
SourceMySQL _ -> StateMySQL def
SourceSQLServer _ -> StateSQLServer def
SourceFile _ -> StateFile ()

projectAll queue = forConcurrently_ (c_destinations env) (project queue)
Expand Down Expand Up @@ -134,6 +136,7 @@ newtype EmulatorState = EmulatorState
data SavedState
= StatePostgres PostgreSQLState
| StateMySQL MySQLState
| StateSQLServer SQLServerState
| StateFile ()
deriving (Generic)
deriving anyclass (ToJSON, FromJSON)
Expand All @@ -160,6 +163,11 @@ toConnectorConfig source sstate =
StateMySQL state ->
return $ ConnectorConfig source msql state StateMySQL
_ -> incompatible
SourceSQLServer sqlserver ->
case sstate of
StateSQLServer state ->
return $ ConnectorConfig source sqlserver state StateSQLServer
_ -> incompatible
SourceFile path ->
case sstate of
StateFile () ->
Expand Down
15 changes: 15 additions & 0 deletions src/Ambar/Emulator/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Yaml as Yaml

import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..))
import Ambar.Emulator.Connector.Postgres (PostgreSQL(..))
import Ambar.Emulator.Connector.MySQL (MySQL(..))
import Ambar.Emulator.Connector.File (FileConnector(..))
Expand Down Expand Up @@ -60,6 +61,7 @@ data Source
= SourceFile FileConnector
| SourcePostgreSQL PostgreSQL
| SourceMySQL MySQL
| SourceSQLServer SQLServer

data DataDestination = DataDestination
{ d_id :: Id DataDestination
Expand Down Expand Up @@ -104,6 +106,7 @@ instance FromJSON DataSource where
case t of
"postgres" -> parsePostgreSQL o
"mysql" -> parseMySQL o
"sqlserver" -> parseSQLServer o
"file" -> parseFile o
_ -> fail $ unwords
[ "Invalid data source type: '" <> t <> "'."
Expand Down Expand Up @@ -135,6 +138,18 @@ instance FromJSON DataSource where
c_incrementingColumn <- o .: "autoIncrementingColumn"
return $ SourceMySQL MySQL{..}

parseSQLServer o = do
c_host <- o .: "host"
c_port <- o .: "port"
c_username <- o .: "username"
c_password <- o .: "password"
c_database <- o .: "database"
c_table <- o .: "table"
c_columns <- o .: "columns"
c_partitioningColumn <- o .: "partitioningColumn"
c_incrementingColumn <- o .: "autoIncrementingColumn"
return $ SourceSQLServer SQLServer{..}

parseFile o = SourceFile . FileConnector <$> (o .: "path")

parseDataDestination
Expand Down
7 changes: 7 additions & 0 deletions src/Ambar/Emulator/Projector.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import GHC.Generics (Generic)
import Ambar.Emulator.Config (Id(..), DataDestination, DataSource(..), Source(..))
import Ambar.Emulator.Queue.Topic (Topic, ReadError(..), PartitionCount(..))
import qualified Ambar.Emulator.Queue.Topic as Topic
import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..))
import Ambar.Emulator.Connector.Postgres (PostgreSQL(..))
import Ambar.Emulator.Connector.MySQL (MySQL(..))
import Ambar.Transport (Transport)
Expand Down Expand Up @@ -119,6 +120,12 @@ relevantFields source (Payload value) = renderPretty $
| field <- [c_serialColumn, c_partitioningColumn]
, Just v <- [KeyMap.lookup (fromString $ Text.unpack field) o]
]
SourceSQLServer SQLServer{..} ->
fillSep $
[ pretty field <> ":" <+> prettyJSON v
| field <- [c_incrementingColumn, c_partitioningColumn]
, Just v <- [KeyMap.lookup (fromString $ Text.unpack field) o]
]
where
withObject f =
case value of
Expand Down
40 changes: 13 additions & 27 deletions tests/Test/Utils/Docker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ module Test.Utils.Docker
, withDocker
) where

import Control.Concurrent.Async (race, wait, withAsync)
import Control.Concurrent (MVar, newMVar, modifyMVar)
import Control.Concurrent.Async (race_)
import Control.Exception (ErrorCall(..), throwIO)
import Control.Monad (forM_)
import System.Exit (ExitCode(..))
Expand All @@ -24,15 +22,13 @@ import System.Process
( CreateProcess(..)
, StdStream(..)
, proc
, waitForProcess
, getProcessExitCode
, createPipe
, cleanupProcess
, terminateProcess
, withCreateProcess
)
import System.IO.Unsafe (unsafePerformIO)
import Utils.Async (withAsyncThrow)
import Utils.Delay (delay, seconds)
import Utils.Delay (seconds, every)

data DockerCommand
= DockerRun
Expand Down Expand Up @@ -64,34 +60,24 @@ withDocker debug tag cmd act =
}
withCreateProcess create $ \stdin stdout stderr p -> do
let pinfo = (stdin, stdout, stderr, p)
withAsync (waitFor name pinfo) $ \a -> do
r <- race (wait a) $ if debug
then tracing name hread act
else act hread
terminate pinfo
case r of
Left _ -> error "impossible"
Right v -> return v
withAsyncThrow (waitFor name pinfo) $ do
if debug
then tracing name hread act
else act hread
where
terminate pinfo = race_ (cleanupProcess pinfo) (forceEnd pinfo)

forceEnd (_,_,_,p) = do
putStrLn $ "Forcing end of container: " <> tag
delay (seconds 5)
terminateProcess p

withPipe f = do
(hread, hwrite) <- createPipe
hSetBuffering hread LineBuffering
hSetBuffering hwrite LineBuffering
f hread hwrite

waitFor name (_,_,_,p) = do
exit <- waitForProcess p
throwIO $ ErrorCall $ case exit of
ExitSuccess -> "unexpected successful termination of container " <> name
ExitFailure code ->
"docker failed with exit code" <> show code <> " for container " <> name
waitFor name (_,_,_,p) = every (seconds 1) $ do
mexit <- getProcessExitCode p
forM_ mexit $ \exit ->
throwIO $ ErrorCall $ case exit of
ExitSuccess -> "unexpected successful termination of container " <> name
ExitFailure code ->
"docker failed with exit code" <> show code <> " for container " <> name

mkName = do
number <- modifyMVar dockerImageNumber $ \n -> return (n + 1, n)
Expand Down
4 changes: 2 additions & 2 deletions tests/Test/Utils/SQL.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import Data.Maybe (fromMaybe)
import Data.Text (Text)
import GHC.Generics (Generic)
import System.IO.Unsafe (unsafePerformIO)
import Test.Hspec (Spec, it, shouldBe)
import Test.Hspec (Spec, it, shouldBe, sequential)
import Test.Hspec.Expectations.Contrib (annotate)

import Ambar.Emulator.Connector (partitioner, encoder, Connector(..))
Expand Down Expand Up @@ -81,7 +81,7 @@ testGenericSQL
-> (db -> table -> connector)
-> Config table
-> Spec
testGenericSQL od withConnection mkConfig conf = do
testGenericSQL od withConnection mkConfig conf = sequential $ do
-- checks that our tests can connect to postgres
it "connects" $
with (PartitionCount 1) $ \conn table _ _ -> do
Expand Down

0 comments on commit bbe5b61

Please sign in to comment.