diff --git a/README.md b/README.md index 962c428..c673baf 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ A YAML configuration file should describe all sources and destinations to be use data_sources: # Connect to a PostgreSQL database - - id: my_postgres_source + - id: postgres_source description: Main events store type: postgres host: localhost @@ -36,7 +36,7 @@ data_sources: partitioningColumn: aggregate_id # Connect to a MySQL database - - id: my_mysql_source + - id: postgres_source description: Main events store type: mysql host: localhost @@ -53,6 +53,24 @@ data_sources: autoIncrementingColumn: id partitioningColumn: aggregate_id + # Connect to an SQLServer database + - id: sqlserver_source + description: Main events store + type: sqlserver + host: localhost + port: 1433 + username: my_user + password: my_pass + database: my_db + table: events_table + columns: + - id + - aggregate_id + - sequence_number + - payload + autoIncrementingColumn: id + partitioningColumn: aggregate_id + # Connections to your endpoint. # The Emulator will send data read from the databases to these endpoints. data_destinations: @@ -66,8 +84,9 @@ data_destinations: password: password123 sources: - - my_mysql_source - - my_postgres_source + - postgres_source + - sqlserver_source + - file_source # Send data to a file. One entry per line. - id: file_destination @@ -76,8 +95,9 @@ data_destinations: path: ./temp.file sources: - - my_mysql_source - - my_postgres_source + - postgres_source + - sqlserver_source + - file_source ``` ## Running the program diff --git a/examples/config.yml b/examples/config.yml index fee3b26..2a45380 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -38,6 +38,24 @@ data_sources: autoIncrementingColumn: id partitioningColumn: aggregate_id + # Connect to an SQLServer database + - id: sqlserver_source + description: Main events store + type: sqlserver + host: localhost + port: 1433 + username: my_user + password: my_pass + database: my_db + table: events_table + columns: + - id + - aggregate_id + - sequence_number + - payload + autoIncrementingColumn: id + partitioningColumn: aggregate_id + # Connections to your endpoint. # The Emulator will send data read from the databases to these endpoints. data_destinations: @@ -51,8 +69,9 @@ data_destinations: password: password123 sources: - - postgres source - - file source + - postgres_source + - sqlserver_source + - file_source # Send data to a file. One entry per line. - id: file_destination @@ -62,4 +81,5 @@ data_destinations: sources: - postgres_source + - sqlserver_source - file_source diff --git a/src/Ambar/Emulator.hs b/src/Ambar/Emulator.hs index 7c8940b..c8fdf58 100644 --- a/src/Ambar/Emulator.hs +++ b/src/Ambar/Emulator.hs @@ -15,9 +15,10 @@ import GHC.Generics (Generic) import System.Directory (doesFileExist) import System.FilePath (()) -import Ambar.Emulator.Connector.Postgres (PostgreSQLState) -import Ambar.Emulator.Connector.MySQL (MySQLState) import Ambar.Emulator.Connector (Connector(..), connect, partitioner, encoder) +import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServerState) +import Ambar.Emulator.Connector.MySQL (MySQLState) +import Ambar.Emulator.Connector.Postgres (PostgreSQLState) import qualified Ambar.Emulator.Projector as Projector import Ambar.Emulator.Projector (Projection(..)) @@ -96,6 +97,7 @@ emulate logger_ config env = do case s_source source of SourcePostgreSQL _ -> StatePostgres def SourceMySQL _ -> StateMySQL def + SourceSQLServer _ -> StateSQLServer def SourceFile _ -> StateFile () projectAll queue = forConcurrently_ (c_destinations env) (project queue) @@ -134,6 +136,7 @@ newtype EmulatorState = EmulatorState data SavedState = StatePostgres PostgreSQLState | StateMySQL MySQLState + | StateSQLServer SQLServerState | StateFile () deriving (Generic) deriving anyclass (ToJSON, FromJSON) @@ -160,6 +163,11 @@ toConnectorConfig source sstate = StateMySQL state -> return $ ConnectorConfig source msql state StateMySQL _ -> incompatible + SourceSQLServer sqlserver -> + case sstate of + StateSQLServer state -> + return $ ConnectorConfig source sqlserver state StateSQLServer + _ -> incompatible SourceFile path -> case sstate of StateFile () -> diff --git a/src/Ambar/Emulator/Config.hs b/src/Ambar/Emulator/Config.hs index 10eb8c0..7416672 100644 --- a/src/Ambar/Emulator/Config.hs +++ b/src/Ambar/Emulator/Config.hs @@ -27,6 +27,7 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import qualified Data.Yaml as Yaml +import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..)) import Ambar.Emulator.Connector.Postgres (PostgreSQL(..)) import Ambar.Emulator.Connector.MySQL (MySQL(..)) import Ambar.Emulator.Connector.File (FileConnector(..)) @@ -60,6 +61,7 @@ data Source = SourceFile FileConnector | SourcePostgreSQL PostgreSQL | SourceMySQL MySQL + | SourceSQLServer SQLServer data DataDestination = DataDestination { d_id :: Id DataDestination @@ -104,6 +106,7 @@ instance FromJSON DataSource where case t of "postgres" -> parsePostgreSQL o "mysql" -> parseMySQL o + "sqlserver" -> parseSQLServer o "file" -> parseFile o _ -> fail $ unwords [ "Invalid data source type: '" <> t <> "'." @@ -135,6 +138,18 @@ instance FromJSON DataSource where c_incrementingColumn <- o .: "autoIncrementingColumn" return $ SourceMySQL MySQL{..} + parseSQLServer o = do + c_host <- o .: "host" + c_port <- o .: "port" + c_username <- o .: "username" + c_password <- o .: "password" + c_database <- o .: "database" + c_table <- o .: "table" + c_columns <- o .: "columns" + c_partitioningColumn <- o .: "partitioningColumn" + c_incrementingColumn <- o .: "autoIncrementingColumn" + return $ SourceSQLServer SQLServer{..} + parseFile o = SourceFile . FileConnector <$> (o .: "path") parseDataDestination diff --git a/src/Ambar/Emulator/Projector.hs b/src/Ambar/Emulator/Projector.hs index 577cd91..ef6c602 100644 --- a/src/Ambar/Emulator/Projector.hs +++ b/src/Ambar/Emulator/Projector.hs @@ -26,6 +26,7 @@ import GHC.Generics (Generic) import Ambar.Emulator.Config (Id(..), DataDestination, DataSource(..), Source(..)) import Ambar.Emulator.Queue.Topic (Topic, ReadError(..), PartitionCount(..)) import qualified Ambar.Emulator.Queue.Topic as Topic +import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..)) import Ambar.Emulator.Connector.Postgres (PostgreSQL(..)) import Ambar.Emulator.Connector.MySQL (MySQL(..)) import Ambar.Transport (Transport) @@ -119,6 +120,12 @@ relevantFields source (Payload value) = renderPretty $ | field <- [c_serialColumn, c_partitioningColumn] , Just v <- [KeyMap.lookup (fromString $ Text.unpack field) o] ] + SourceSQLServer SQLServer{..} -> + fillSep $ + [ pretty field <> ":" <+> prettyJSON v + | field <- [c_incrementingColumn, c_partitioningColumn] + , Just v <- [KeyMap.lookup (fromString $ Text.unpack field) o] + ] where withObject f = case value of diff --git a/tests/Test/Utils/Docker.hs b/tests/Test/Utils/Docker.hs index 55272fd..34e59ef 100644 --- a/tests/Test/Utils/Docker.hs +++ b/tests/Test/Utils/Docker.hs @@ -3,9 +3,7 @@ module Test.Utils.Docker , withDocker ) where -import Control.Concurrent.Async (race, wait, withAsync) import Control.Concurrent (MVar, newMVar, modifyMVar) -import Control.Concurrent.Async (race_) import Control.Exception (ErrorCall(..), throwIO) import Control.Monad (forM_) import System.Exit (ExitCode(..)) @@ -24,15 +22,13 @@ import System.Process ( CreateProcess(..) , StdStream(..) , proc - , waitForProcess + , getProcessExitCode , createPipe - , cleanupProcess - , terminateProcess , withCreateProcess ) import System.IO.Unsafe (unsafePerformIO) import Utils.Async (withAsyncThrow) -import Utils.Delay (delay, seconds) +import Utils.Delay (seconds, every) data DockerCommand = DockerRun @@ -64,34 +60,24 @@ withDocker debug tag cmd act = } withCreateProcess create $ \stdin stdout stderr p -> do let pinfo = (stdin, stdout, stderr, p) - withAsync (waitFor name pinfo) $ \a -> do - r <- race (wait a) $ if debug - then tracing name hread act - else act hread - terminate pinfo - case r of - Left _ -> error "impossible" - Right v -> return v + withAsyncThrow (waitFor name pinfo) $ do + if debug + then tracing name hread act + else act hread where - terminate pinfo = race_ (cleanupProcess pinfo) (forceEnd pinfo) - - forceEnd (_,_,_,p) = do - putStrLn $ "Forcing end of container: " <> tag - delay (seconds 5) - terminateProcess p - withPipe f = do (hread, hwrite) <- createPipe hSetBuffering hread LineBuffering hSetBuffering hwrite LineBuffering f hread hwrite - waitFor name (_,_,_,p) = do - exit <- waitForProcess p - throwIO $ ErrorCall $ case exit of - ExitSuccess -> "unexpected successful termination of container " <> name - ExitFailure code -> - "docker failed with exit code" <> show code <> " for container " <> name + waitFor name (_,_,_,p) = every (seconds 1) $ do + mexit <- getProcessExitCode p + forM_ mexit $ \exit -> + throwIO $ ErrorCall $ case exit of + ExitSuccess -> "unexpected successful termination of container " <> name + ExitFailure code -> + "docker failed with exit code" <> show code <> " for container " <> name mkName = do number <- modifyMVar dockerImageNumber $ \n -> return (n + 1, n) diff --git a/tests/Test/Utils/SQL.hs b/tests/Test/Utils/SQL.hs index b3ecdb6..9e6a1a4 100644 --- a/tests/Test/Utils/SQL.hs +++ b/tests/Test/Utils/SQL.hs @@ -29,7 +29,7 @@ import Data.Maybe (fromMaybe) import Data.Text (Text) import GHC.Generics (Generic) import System.IO.Unsafe (unsafePerformIO) -import Test.Hspec (Spec, it, shouldBe) +import Test.Hspec (Spec, it, shouldBe, sequential) import Test.Hspec.Expectations.Contrib (annotate) import Ambar.Emulator.Connector (partitioner, encoder, Connector(..)) @@ -81,7 +81,7 @@ testGenericSQL -> (db -> table -> connector) -> Config table -> Spec -testGenericSQL od withConnection mkConfig conf = do +testGenericSQL od withConnection mkConfig conf = sequential $ do -- checks that our tests can connect to postgres it "connects" $ with (PartitionCount 1) $ \conn table _ _ -> do