Skip to content

Commit

Permalink
Merge pull request #37 from ambarltd/file-connector
Browse files Browse the repository at this point in the history
Real support for a File Connector
  • Loading branch information
lazamar authored Dec 13, 2024
2 parents 7cec51d + 34c182d commit 0250711
Show file tree
Hide file tree
Showing 16 changed files with 496 additions and 223 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ data_sources:
partitioningColumn: aggregate_id

# Connect to a MySQL database
- id: postgres_source
- id: mysql_source
description: Main events store
type: mysql
host: localhost
Expand Down Expand Up @@ -71,6 +71,16 @@ data_sources:
autoIncrementingColumn: id
partitioningColumn: aggregate_id

# Use a plain text file as a data source.
# Each line must be a valid JSON object.
# Values are projected as they are added.
- id: file_source
description: My file JSON event store
type: file
path: ./path/to/source.file
incrementingField: id
partitioningField: aggregate_id

# Connections to your endpoint.
# The Emulator will send data read from the databases to these endpoints.
data_destinations:
Expand Down
1 change: 1 addition & 0 deletions emulator.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ test-suite emulator-tests
Test.Config
Test.Queue
Test.Connector
Test.Connector.File
Test.Connector.MySQL
Test.Connector.PostgreSQL
Test.Connector.MicrosoftSQLServer
Expand Down
12 changes: 11 additions & 1 deletion examples/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ data_sources:
partitioningColumn: aggregate_id

# Connect to a MySQL database
- id: postgres_source
- id: mysql_source
description: Main events store
type: mysql
host: localhost
Expand Down Expand Up @@ -56,6 +56,16 @@ data_sources:
autoIncrementingColumn: id
partitioningColumn: aggregate_id

# Use a plain text file as a data source.
# Each line must be a valid JSON object.
# Values are projected as they are added.
- id: file_source
description: My file JSON event store
type: file
path: ./path/to/source.file
incrementingField: id
partitioningField: aggregate_id

# Connections to your endpoint.
# The Emulator will send data read from the databases to these endpoints.
data_destinations:
Expand Down
12 changes: 7 additions & 5 deletions src/Ambar/Emulator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import System.Directory (doesFileExist)
import System.FilePath ((</>))

import Ambar.Emulator.Connector (Connector(..), connect, partitioner, encoder)
import Ambar.Emulator.Connector.File (FileConnectorState, mkFileConnector)
import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServerState)
import Ambar.Emulator.Connector.MySQL (MySQLState)
import Ambar.Emulator.Connector.Postgres (PostgreSQLState)
Expand Down Expand Up @@ -98,7 +99,7 @@ emulate logger_ config env = do
SourcePostgreSQL _ -> StatePostgres def
SourceMySQL _ -> StateMySQL def
SourceSQLServer _ -> StateSQLServer def
SourceFile _ -> StateFile ()
SourceFile{} -> StateFile def

projectAll queue = forConcurrently_ (c_destinations env) (project queue)

Expand Down Expand Up @@ -137,7 +138,7 @@ data SavedState
= StatePostgres PostgreSQLState
| StateMySQL MySQLState
| StateSQLServer SQLServerState
| StateFile ()
| StateFile FileConnectorState
deriving (Generic)
deriving anyclass (ToJSON, FromJSON)

Expand Down Expand Up @@ -168,10 +169,11 @@ toConnectorConfig source sstate =
StateSQLServer state ->
return $ ConnectorConfig source sqlserver state StateSQLServer
_ -> incompatible
SourceFile path ->
SourceFile path partitioningField incrementingField ->
case sstate of
StateFile () ->
return $ ConnectorConfig source path () StateFile
StateFile state -> do
fileconn <- mkFileConnector path partitioningField incrementingField
return $ ConnectorConfig source fileconn state StateFile
_ -> incompatible
where
incompatible = throwIO $ ErrorCall $
Expand Down
9 changes: 6 additions & 3 deletions src/Ambar/Emulator/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import qualified Data.Yaml as Yaml
import Ambar.Emulator.Connector.MicrosoftSQLServer (SQLServer(..))
import Ambar.Emulator.Connector.Postgres (PostgreSQL(..))
import Ambar.Emulator.Connector.MySQL (MySQL(..))
import Ambar.Emulator.Connector.File (FileConnector(..))
import Ambar.Transport (SubmissionError)
import Ambar.Transport.Http (Endpoint, User, Password)

Expand Down Expand Up @@ -58,7 +57,7 @@ data DataSource = DataSource
}

data Source
= SourceFile FileConnector
= SourceFile { sf_path :: FilePath, sf_partitioningField :: Text, sf_incrementingField :: Text }
| SourcePostgreSQL PostgreSQL
| SourceMySQL MySQL
| SourceSQLServer SQLServer
Expand Down Expand Up @@ -150,7 +149,11 @@ instance FromJSON DataSource where
c_incrementingColumn <- o .: "autoIncrementingColumn"
return $ SourceSQLServer SQLServer{..}

parseFile o = SourceFile . FileConnector <$> (o .: "path")
parseFile o = do
sf_path <- o .: "path"
sf_partitioningField <- o .: "partitioningField"
sf_incrementingField <- o .: "incrementingField"
return $ SourceFile{..}

parseDataDestination
:: Map (Id DataSource) DataSource
Expand Down
238 changes: 215 additions & 23 deletions src/Ambar/Emulator/Connector/File.hs
Original file line number Diff line number Diff line change
@@ -1,47 +1,239 @@
module Ambar.Emulator.Connector.File
( FileConnector(..)
) where
( FileConnector
, FileConnectorState
, FileRecord
, mkFileConnector
, write
, c_path
) where

{-| File connector.
Read JSON values from a file.
One value per line.
-}

import Control.Concurrent (MVar, newMVar, withMVar)
import Control.Concurrent.STM
( STM
, TMVar
, TVar
, newTVarIO
, readTVar
, atomically
, writeTVar
, newTMVarIO
, modifyTVar
, retry
, takeTMVar
, putTMVar
)
import Control.Exception (bracket)
import Control.Monad (forever, when)
import qualified Data.Aeson as Json
import Control.Monad (forM_)
import qualified Data.ByteString.Lazy.Char8 as Char8
import qualified Data.Aeson.KeyMap as KeyMap
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as Char8
import qualified Data.ByteString.Lazy as LB
import qualified Data.Text.Lazy as Text
import qualified Data.Text.Lazy.Encoding as Text
import Data.Default (Default)
import Data.Maybe (fromMaybe)
import Data.String (IsString(fromString))
import Data.Text (Text)
import qualified Data.Text.Lazy as LText
import qualified Data.Text.Lazy.Encoding as LText
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import GHC.Generics (Generic)
import GHC.IO.FD (FD)
import System.Directory (getFileSize)
import System.IO
( Handle
, hSeek
, openFile
, hSeek
, hIsEOF
, hClose
, IOMode(..)
, SeekMode(..)
)
import Prettyprinter ((<+>))

import qualified Ambar.Emulator.Connector as C
import Ambar.Emulator.Queue.Partition.File
( openNonLockingWritableFD
, writeFD
)
import Ambar.Emulator.Queue.Topic (modPartitioner)
import Ambar.Emulator.Queue.Topic (Producer)
import qualified Ambar.Emulator.Queue.Topic as Topic
import Utils.Async (withAsyncThrow)
import Utils.Logger (fatal, logInfo)
import Utils.Logger (SimpleLogger, fatal, logInfo)
import Utils.Delay (Duration, delay, millis)
import Utils.Prettyprinter (prettyJSON, renderPretty, commaSeparated)

data FileConnector = FileConnector FilePath
_POLLING_INTERVAL :: Duration
_POLLING_INTERVAL = millis 50

data FileConnector = FileConnector
{ c_path :: FilePath
, c_partitioningField :: Text
, c_incrementingField :: Text
, c_state :: TVar FileConnectorState
, c_readHandle :: TMVar Handle
, c_writeHandle :: MVar FD
, c_getFileSize :: IO Integer
}

-- | We don't close these file descriptors because we consider that
-- this is only used during tests.
mkFileConnector :: FilePath -> Text -> Text -> IO FileConnector
mkFileConnector path partitioningField incrementingField = do
size <- getFileSize path
varState <- newTVarIO (FileConnectorState size 0)
varWriteHandle <- do
fd <- openNonLockingWritableFD path
newMVar fd
varReadHandle <- do
readHandle <- openFile path ReadMode
newTMVarIO readHandle
return $ FileConnector
path
partitioningField
incrementingField
varState
varReadHandle
varWriteHandle
(getFileSize path)

-- Does not work in the presence of external writers to the same file.
write :: FileConnector -> Json.Value -> IO ()
write FileConnector{..} json = do
withMVar c_writeHandle $ \fd -> do
let entry = LB.toStrict (Json.encode json) <> "\n"
entrySize = fromIntegral (BS.length entry)
writeFD fd entry
atomically $ modifyTVar c_state $ \state ->
state { c_fileSize = c_fileSize state + entrySize }

data FileConnectorState = FileConnectorState
{ c_fileSize :: Integer
, c_offset :: Integer
}
deriving (Show, Generic)
deriving anyclass (Json.ToJSON, Json.FromJSON, Default)

newtype FileRecord = FileRecord Json.Value

instance C.Connector FileConnector where
type ConnectorState FileConnector = ()
type ConnectorState FileConnector = FileConnectorState
type ConnectorRecord FileConnector = FileRecord
partitioner = modPartitioner (const 1)
encoder (FileRecord value) = LB.toStrict $ Json.encode value
connect (FileConnector path) logger () producer f =
withAsyncThrow worker $ f (return ())
connect = connect

connect
:: FileConnector
-> SimpleLogger
-> FileConnectorState
-> Producer (FileRecord)
-> (STM FileConnectorState -> IO a)
-> IO a
connect conn@(FileConnector {..}) logger initState producer f = do
h <- atomically $ do
writeTVar c_state initState
takeTMVar c_readHandle
hSeek h AbsoluteSeek (c_offset initState)
atomically $ putTMVar c_readHandle h
withAsyncThrow updateFileSize $
withAsyncThrow worker $
f (readTVar c_state)
where
updateFileSize = forever $ do
newSize <- c_getFileSize
delay _POLLING_INTERVAL -- also serves to wait until any writing finishes
atomically $ do
FileConnectorState fsize offset <- readTVar c_state
when (fsize < newSize) $
writeTVar c_state $ FileConnectorState newSize offset

worker = forever $ do
value <- readNext
let record = FileRecord value
Topic.write producer record
logResult record

logResult record =
logInfo logger $ renderPretty $
"ingested." <+> commaSeparated
[ "incrementing_value:" <+> prettyJSON (incrementingValue conn record)
, "partitioning_value:" <+> prettyJSON (partitioningValue conn record)
]

-- | Blocks until there is something to read.
readNext :: IO Json.Value
readNext =
withReadLock $ \readHandle -> do
bs <- Char8.hGetLine readHandle
value <- case Json.eitherDecode $ LB.fromStrict bs of
Left e -> fatal logger $ unlines
[ "Unable to decode value from source:"
, show e
, Text.unpack $ Text.decodeUtf8 bs
]
Right v -> return v
let entrySize = fromIntegral $ BS.length bs + BS.length "\n"
atomically $ modifyTVar c_state $ \state ->
state { c_offset = c_offset state + entrySize }
return value

withReadLock :: (Handle -> IO a) -> IO a
withReadLock = bracket acquire release
where
worker = do
bs <- Char8.readFile path
forM_ (Char8.lines bs) $ \line -> do
value <- case Json.eitherDecode line of
Left e -> fatal logger $ unlines
[ "Unable to decode value from source:"
, show e
, Text.unpack $ Text.decodeUtf8 bs
]
Right v -> return v
Topic.write producer (FileRecord value)
logInfo logger $ "ingested. " <> Text.decodeUtf8 line
acquire = do
-- wait till there is data to read and take the lock.
(h, offset) <- atomically $ do
FileConnectorState fsize offset <- readTVar c_state
when (fsize <= offset) retry
h <- takeTMVar c_readHandle
return (h, offset)

-- For some reason, if the file we are reading is updated by an external
-- program (like the user manually adding an entry) the file reading library
-- don't detect that EOF has moved. In this case we have to close this handle
-- and open a new one.
eof <- hIsEOF h
if not eof
then return h
else do
hClose h
h' <- openFile c_path ReadMode
hSeek h' AbsoluteSeek offset
return h'

release readHandle = atomically $
putTMVar c_readHandle readHandle


partitioningValue :: FileConnector -> FileRecord -> Json.Value
partitioningValue FileConnector{..} r = getField c_partitioningField r

incrementingValue :: FileConnector -> FileRecord -> Json.Value
incrementingValue FileConnector{..} r = getField c_incrementingField r

getField :: Text -> FileRecord -> Json.Value
getField field (FileRecord json) =
fromMaybe err $ do
o <- getObject json
let key = fromString $ Text.unpack field
v <- KeyMap.lookup key o
return $ v
where
err = error $ Text.unpack $ "invalid serial value in :" <> jsonToTxt json

jsonToTxt :: Json.Value -> Text
jsonToTxt = LText.toStrict . LText.decodeUtf8 . Json.encode

getObject :: Json.Value -> Maybe Json.Object
getObject = \case
Json.Object o -> Just o
_ -> Nothing

Loading

0 comments on commit 0250711

Please sign in to comment.