Skip to content

Commit

Permalink
Handle files updating outside of the RTS
Browse files Browse the repository at this point in the history
  • Loading branch information
lazamar committed Dec 13, 2024
1 parent b7844f1 commit e313f4c
Showing 1 changed file with 57 additions and 38 deletions.
95 changes: 57 additions & 38 deletions src/Ambar/Emulator/Connector/File.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import System.IO
, hSeek
, openFile
, hSeek
, hIsEOF
, hClose
, IOMode(..)
, SeekMode(..)
)
Expand All @@ -63,7 +65,6 @@ import Ambar.Emulator.Queue.Partition.File
import Ambar.Emulator.Queue.Topic (modPartitioner)
import Ambar.Emulator.Queue.Topic (Producer)
import qualified Ambar.Emulator.Queue.Topic as Topic
import Ambar.Record (Value(..))
import Utils.Async (withAsyncThrow)
import Utils.Logger (SimpleLogger, fatal, logInfo)
import Utils.Delay (Duration, delay, millis)
Expand Down Expand Up @@ -113,32 +114,6 @@ write FileConnector{..} json = do
atomically $ modifyTVar c_state $ \state ->
state { c_fileSize = c_fileSize state + entrySize }

-- | Read the next line from the source file and decode it as JSON.
--
-- Blocks (via STM 'retry') while the recorded file size is not greater than
-- the recorded read offset, then holds the read handle taken from
-- @varReadHandle@ for the duration of the read. After a successful decode,
-- the offset stored in @varState@ is advanced by the size of the entry.
--
-- A line that cannot be decoded as JSON is reported through 'fatal'.
readNext :: SimpleLogger -> TVar FileConnectorState -> TMVar Handle -> IO Json.Value
readNext logger varState varReadHandle =
  withReadLock $ \readHandle -> do
    bs <- Char8.hGetLine readHandle
    value <- case Json.eitherDecode $ LB.fromStrict bs of
      Left e -> fatal logger $ unlines
        [ "Unable to decode value from source:"
        , show e
        , renderLine bs
        ]
      Right v -> return v
    -- The offset is tracked in bytes: the line itself plus its newline.
    let entrySize = fromIntegral $ BS.length bs + BS.length "\n"
    atomically $ modifyTVar varState $ \state ->
      state { c_offset = c_offset state + entrySize }
    return value
  where
  -- Render the offending line for the error message. The strict
  -- 'Text.decodeUtf8' throws on invalid UTF-8, which would replace the
  -- intended error report with a 'UnicodeException'; fall back to the
  -- escaped byte representation instead.
  renderLine bs = either (const $ show bs) Text.unpack (Text.decodeUtf8' bs)

  withReadLock = bracket acquire release
    where
    -- Wait until there is unread data, then take the handle (the lock).
    acquire = atomically $ do
      FileConnectorState fsize pos <- readTVar varState
      when (fsize <= pos) retry
      takeTMVar varReadHandle

    -- Hand the handle back so the next reader can proceed.
    release readHandle = atomically $
      putTMVar varReadHandle readHandle

data FileConnectorState = FileConnectorState
{ c_fileSize :: Integer
, c_offset :: Integer
Expand Down Expand Up @@ -172,12 +147,6 @@ connect conn@(FileConnector {..}) logger initState producer f = do
withAsyncThrow worker $
f (readTVar c_state)
where
-- Ingestion loop: read the next value, wrap it in a 'FileRecord', write it
-- to the producer and log what was ingested. Never returns.
worker = forever $ do
  record <- FileRecord <$> readNext logger c_state c_readHandle
  Topic.write producer record
  logResult record

updateFileSize = forever $ do
newSize <- c_getFileSize
delay _POLLING_INTERVAL -- also serves to wait until any writing finishes
Expand All @@ -186,27 +155,77 @@ connect conn@(FileConnector {..}) logger initState producer f = do
when (fsize < newSize) $
writeTVar c_state $ FileConnectorState newSize offset

-- Ingestion loop: read the next value, wrap it in a 'FileRecord', write it
-- to the producer and log what was ingested. Never returns.
worker = forever $ do
  record <- FileRecord <$> readNext
  Topic.write producer record
  logResult record

-- Log the incrementing and partitioning values of an ingested record.
logResult record =
  let fields =
        [ "incrementing_value:" <+> prettyJSON (incrementingValue conn record)
        , "partitioning_value:" <+> prettyJSON (partitioningValue conn record)
        ]
   in logInfo logger . renderPretty $ "ingested." <+> commaSeparated fields

partitioningValue :: FileConnector -> FileRecord -> Value
-- | Blocks until there is something to read, then reads one line from the
-- source file and decodes it as JSON.
--
-- After a successful decode, the offset stored in @c_state@ is advanced by
-- the size of the entry. A line that cannot be decoded as JSON is reported
-- through 'fatal'.
readNext :: IO Json.Value
readNext =
  withReadLock $ \readHandle -> do
    bs <- Char8.hGetLine readHandle
    value <- case Json.eitherDecode $ LB.fromStrict bs of
      Left e -> fatal logger $ unlines
        [ "Unable to decode value from source:"
        , show e
          -- The strict 'Text.decodeUtf8' throws on invalid UTF-8, which would
          -- replace this error report with a 'UnicodeException'; fall back to
          -- the escaped byte representation instead.
        , either (const $ show bs) Text.unpack (Text.decodeUtf8' bs)
        ]
      Right v -> return v
    -- The offset is tracked in bytes: the line itself plus its newline.
    let entrySize = fromIntegral $ BS.length bs + BS.length "\n"
    atomically $ modifyTVar c_state $ \state ->
      state { c_offset = c_offset state + entrySize }
    return value

-- | Run an action with the read handle, waiting first until the recorded
-- file size exceeds the recorded read offset.
--
-- The handle is taken from @c_readHandle@ for the duration of the action and
-- put back afterwards. If the handle taken reports EOF even though the state
-- says there is unread data, it is closed and replaced by a fresh handle
-- positioned at the current offset.
--
-- NOTE(review): 'bracket' only runs the release step when acquisition
-- succeeds, so an exception from the reopen path would leave @c_readHandle@
-- empty -- confirm whether that matters to callers.
withReadLock :: (Handle -> IO a) -> IO a
withReadLock = bracket acquire release
  where
  release = atomically . putTMVar c_readHandle

  acquire = do
    (current, position) <- atomically awaitData
    stale <- hIsEOF current
    if stale
      then reopenAt position current
      else return current

  -- Wait until there is data to read and take the lock.
  awaitData = do
    FileConnectorState size position <- readTVar c_state
    when (size <= position) retry
    current <- takeTMVar c_readHandle
    return (current, position)

  -- When the file is updated by an external program (for example, a user
  -- manually appending an entry) the existing handle does not notice that
  -- EOF has moved, so it is closed and a new one is opened at the same
  -- offset.
  reopenAt position stale = do
    hClose stale
    fresh <- openFile c_path ReadMode
    hSeek fresh AbsoluteSeek position
    return fresh


-- | Look up the connector's configured partitioning field in a record.
partitioningValue :: FileConnector -> FileRecord -> Json.Value
partitioningValue conn = getField (c_partitioningField conn)

incrementingValue :: FileConnector -> FileRecord -> Value
-- | Look up the connector's configured incrementing field in a record.
incrementingValue :: FileConnector -> FileRecord -> Json.Value
incrementingValue conn = getField (c_incrementingField conn)

getField :: Text -> FileRecord -> Value
getField :: Text -> FileRecord -> Json.Value
getField field (FileRecord json) =
fromMaybe err $ do
o <- getObject json
let key = fromString $ Text.unpack field
v <- KeyMap.lookup key o
let txt = jsonToTxt v
return $ Json txt v
return $ v
where
err = error $ Text.unpack $ "invalid serial value in :" <> jsonToTxt json

Expand Down

0 comments on commit e313f4c

Please sign in to comment.