diff --git a/src/Ambar/Emulator/Connector/File.hs b/src/Ambar/Emulator/Connector/File.hs index 032f6c1..67a6fb5 100644 --- a/src/Ambar/Emulator/Connector/File.hs +++ b/src/Ambar/Emulator/Connector/File.hs @@ -50,6 +50,8 @@ import System.IO , hSeek , openFile , hSeek + , hIsEOF + , hClose , IOMode(..) , SeekMode(..) ) @@ -63,7 +65,6 @@ import Ambar.Emulator.Queue.Partition.File import Ambar.Emulator.Queue.Topic (modPartitioner) import Ambar.Emulator.Queue.Topic (Producer) import qualified Ambar.Emulator.Queue.Topic as Topic -import Ambar.Record (Value(..)) import Utils.Async (withAsyncThrow) import Utils.Logger (SimpleLogger, fatal, logInfo) import Utils.Delay (Duration, delay, millis) @@ -113,32 +114,6 @@ write FileConnector{..} json = do atomically $ modifyTVar c_state $ \state -> state { c_fileSize = c_fileSize state + entrySize } -readNext :: SimpleLogger -> TVar FileConnectorState -> TMVar Handle -> IO Json.Value -readNext logger varState varReadHandle = - withReadLock $ \readHandle -> do - bs <- Char8.hGetLine readHandle - value <- case Json.eitherDecode $ LB.fromStrict bs of - Left e -> fatal logger $ unlines - [ "Unable to decode value from source:" - , show e - , Text.unpack $ Text.decodeUtf8 bs - ] - Right v -> return v - let entrySize = fromIntegral $ BS.length bs + BS.length "\n" - atomically $ modifyTVar varState $ \state -> - state { c_offset = c_offset state + entrySize } - return value - where - withReadLock = bracket acquire release - where - acquire = atomically $ do - FileConnectorState fsize pos <- readTVar varState - when (fsize <= pos) retry - takeTMVar varReadHandle - - release readHandle = atomically $ - putTMVar varReadHandle readHandle - data FileConnectorState = FileConnectorState { c_fileSize :: Integer , c_offset :: Integer @@ -172,12 +147,6 @@ connect conn@(FileConnector {..}) logger initState producer f = do withAsyncThrow worker $ f (readTVar c_state) where - worker = forever $ do - value <- readNext logger c_state c_readHandle - let record = FileRecord value - Topic.write producer record - logResult record - updateFileSize = forever $ do newSize <- c_getFileSize delay _POLLING_INTERVAL -- also serves to wait until any writing finishes @@ -186,6 +155,12 @@ connect conn@(FileConnector {..}) logger initState producer f = do when (fsize < newSize) $ writeTVar c_state $ FileConnectorState newSize offset + worker = forever $ do + value <- readNext + let record = FileRecord value + Topic.write producer record + logResult record + logResult record = logInfo logger $ renderPretty $ "ingested." <+> commaSeparated @@ -193,20 +168,64 @@ connect conn@(FileConnector {..}) logger initState producer f = do , "partitioning_value:" <+> prettyJSON (partitioningValue conn record) ] -partitioningValue :: FileConnector -> FileRecord -> Value + -- | Blocks until there is something to read. + readNext :: IO Json.Value + readNext = + withReadLock $ \readHandle -> do + bs <- Char8.hGetLine readHandle + value <- case Json.eitherDecode $ LB.fromStrict bs of + Left e -> fatal logger $ unlines + [ "Unable to decode value from source:" + , show e + , Text.unpack $ Text.decodeUtf8 bs + ] + Right v -> return v + let entrySize = fromIntegral $ BS.length bs + BS.length "\n" + atomically $ modifyTVar c_state $ \state -> + state { c_offset = c_offset state + entrySize } + return value + + withReadLock :: (Handle -> IO a) -> IO a + withReadLock = bracket acquire release + where + acquire = do + -- wait till there is data to read and take the lock. + (h, offset) <- atomically $ do + FileConnectorState fsize offset <- readTVar c_state + when (fsize <= offset) retry + h <- takeTMVar c_readHandle + return (h, offset) + + -- For some reason, if the file we are reading is updated by an external + -- program (like the user manually adding an entry) the file reading library + -- don't detect that EOF has moved. In this case we have to close this handle + -- and open a new one. + eof <- hIsEOF h + if not eof + then return h + else do + hClose h + h' <- openFile c_path ReadMode + hSeek h' AbsoluteSeek offset + return h' + + release readHandle = atomically $ + putTMVar c_readHandle readHandle + + +partitioningValue :: FileConnector -> FileRecord -> Json.Value partitioningValue FileConnector{..} r = getField c_partitioningField r -incrementingValue :: FileConnector -> FileRecord -> Value +incrementingValue :: FileConnector -> FileRecord -> Json.Value incrementingValue FileConnector{..} r = getField c_incrementingField r -getField :: Text -> FileRecord -> Value +getField :: Text -> FileRecord -> Json.Value getField field (FileRecord json) = fromMaybe err $ do o <- getObject json let key = fromString $ Text.unpack field v <- KeyMap.lookup key o - let txt = jsonToTxt v - return $ Json txt v + return $ v where err = error $ Text.unpack $ "invalid serial value in :" <> jsonToTxt json