Skip to content

Commit

Permalink
Remove streamly implementation.
Browse files Browse the repository at this point in the history
I don't quite understand the performance tradeoff yet. some things are faster and some things are slower. Can't tell what's going on for real.
  • Loading branch information
mchav committed Jan 16, 2025
1 parent 4788db1 commit 916922c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/haskell-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ jobs:
compilerVersion: 9.8.3
setup-method: ghcup-vanilla
allow-failure: false
- compiler: ghc-9.10.1
compilerKind: ghc
compilerVersion: 9.4.8
setup-method: ghcup
allow-failure: false
- compiler: ghc-9.6.6
compilerKind: ghc
compilerVersion: 9.6.6
Expand Down
6 changes: 0 additions & 6 deletions dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ library dataframe-lib
directory >= 1.3.0.0,
hashable >= 1.2,
statistics >= 0.13,
streamly >= 0.1,
streamly-core >= 0.2,
text >= 2.0,
time >= 1.12,
vector ^>= 0.13,
Expand Down Expand Up @@ -84,8 +82,6 @@ library dataframe-lib-dev
directory >= 1.3.0.0,
hashable >= 1.2,
statistics >= 0.13,
streamly >= 0.1,
streamly-core >= 0.2,
text >= 2.0,
time >= 1.12,
vector ^>= 0.13,
Expand Down Expand Up @@ -121,8 +117,6 @@ executable dataframe
directory >= 1.3.0.0,
hashable >= 1.2,
statistics >= 0.13,
streamly >= 0.1,
streamly-core >= 0.2,
text >= 2.0,
time >= 1.12,
vector ^>= 0.13,
Expand Down
74 changes: 32 additions & 42 deletions src/Data/DataFrame/IO/CSV.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,22 @@ import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as VU
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Unboxed.Mutable as VUM
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.Internal.Data.Array as Array (compactOnByte, toStream)
import qualified Streamly.FileSystem.File as File
import qualified Streamly.FileSystem.Handle as Handle
import qualified Streamly.Unicode.Stream as Stream

import Control.Applicative ((<$>), (<|>), (<*>), (<*), (*>), many)
import Control.Monad (forM_, zipWithM_, unless, void, zipWithM)
import Control.Monad (forM_, zipWithM_, unless, void)
import Data.Attoparsec.Text
import Data.Char
import Data.DataFrame.Internal.Column (Column(..), freezeColumn', writeColumn, columnLength)
import Data.DataFrame.Internal.DataFrame (DataFrame(..))
import Data.DataFrame.Internal.Parsing
import Data.DataFrame.Operations.Typing
import Data.Either
import Data.Function (on, (&))
import Data.Function (on)
import Data.Maybe
import Data.Type.Equality
( TestEquality (testEquality),
type (:~:) (Refl)
)
import Data.Word
import GHC.Conc (numCapabilities)
import GHC.IO.Handle (Handle)
import GHC.IO (unsafePerformIO)
import Prelude hiding (concat, takeWhile)
import System.IO
import Type.Reflection
Expand Down Expand Up @@ -86,15 +74,15 @@ readSeparated c opts path = withFile path ReadMode $ \handle -> do
-- Initialize mutable vectors for each column
let numColumns = length columnNames
dataRow <- parseSep c <$> TIO.hGetLine handle
totalRows <- wc path
totalRows <- countRows path
let actualRows = if hasHeader opts then totalRows - 1 else totalRows
nullIndices <- VM.new numColumns
VM.set nullIndices []
mutableCols <- VM.new numColumns
getInitialDataVectors actualRows mutableCols dataRow

-- Read rows into the mutable vectors
fillColumns c mutableCols nullIndices handle
fillColumns actualRows c mutableCols nullIndices handle

-- Freeze the mutable vectors into immutable ones
nulls' <- V.unsafeFreeze nullIndices
Expand All @@ -110,51 +98,42 @@ getInitialDataVectors :: Int -> VM.IOVector Column -> [T.Text] -> IO ()
getInitialDataVectors n mCol xs = do
forM_ (zip [0..] xs) $ \(i, x) -> do
col <- case inferValueType x of
"Int" -> MutableUnboxedColumn <$> ((VUM.unsafeNew n :: IO (VUM.IOVector Int)) >>= \c -> VUM.write c 0 (fromMaybe 0 $ readInt x) >> return c)
"Double" -> MutableUnboxedColumn <$> ((VUM.unsafeNew n :: IO (VUM.IOVector Double)) >>= \c -> VUM.write c 0 (fromMaybe 0 $ readDouble x) >> return c)
_ -> MutableBoxedColumn <$> ((VM.unsafeNew n :: IO (VM.IOVector T.Text)) >>= \c -> VM.write c 0 x >> return c)
"Int" -> MutableUnboxedColumn <$> ((VUM.new n :: IO (VUM.IOVector Int)) >>= \c -> VUM.write c 0 (fromMaybe 0 $ readInt x) >> return c)
"Double" -> MutableUnboxedColumn <$> ((VUM.new n :: IO (VUM.IOVector Double)) >>= \c -> VUM.write c 0 (fromMaybe 0 $ readDouble x) >> return c)
_ -> MutableBoxedColumn <$> ((VM.new n :: IO (VM.IOVector T.Text)) >>= \c -> VM.write c 0 x >> return c)
VM.write mCol i col

inferValueType :: T.Text -> T.Text
inferValueType s = let
inferValueType s = let
example = s
in case readInt example of
Just _ -> "Int"
Nothing -> case readDouble example of
Just _ -> "Double"
Nothing -> "Other"

wc :: String -> IO Int
wc file = File.read file
& Stream.decodeUtf8
& Stream.fold (Fold.foldl' (\l ch -> if ch == '\n' then l + 1 else l) 0)

fillColumns :: Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> Handle -> IO ()
fillColumns c mutableCols nullIndices handle =
Handle.read handle
& Stream.decodeUtf8
& Stream.splitOn (== '\n') Fold.toList
& Stream.filter (not . null)
& Stream.parMapM (Stream.maxThreads numCapabilities) (return . T.pack)
& Stream.zipWith (,) (Stream.fromList [1..])
& Stream.fold (Fold.drainMapM (parseLine c mutableCols nullIndices))

parseLine :: Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> (Int, T.Text) -> IO ()
parseLine c mutableCols nullIndices (!i, !arr) = do
zipWithM_ (writeValue mutableCols nullIndices i) [0..] (parseSep c arr)
-- | Reads rows from the handle and stores values in mutable vectors.
fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> Handle -> IO ()
fillColumns n c mutableCols nullIndices handle = do
forM_ [1..n] $ \i -> do
isEOF <- hIsEOF handle
unless isEOF $ do
row <- parseSep c <$> TIO.hGetLine handle
zipWithM_ (writeValue mutableCols nullIndices i) [0..] row

-- | Writes a value into the appropriate column, resizing the vector if necessary.
writeValue :: VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> Int -> Int -> T.Text -> IO ()
writeValue mutableCols nullIndices count colIndex value = do
col <- VM.unsafeRead mutableCols colIndex
col <- VM.read mutableCols colIndex
res <- writeColumn count value col
let modify value = VM.unsafeModify nullIndices ((count, value) :) colIndex
either modify (const (return ())) res
case res of
Left value -> VM.modify nullIndices ((count, value) :) colIndex
Right _ -> return ()

-- | Freezes a mutable vector into an immutable one, trimming it to the actual row count.
freezeColumn :: VM.IOVector Column -> V.Vector [(Int, T.Text)] -> ReadOptions -> Int -> IO (Maybe Column)
freezeColumn mutableCols nulls opts colIndex = do
col <- VM.unsafeRead mutableCols colIndex
col <- VM.read mutableCols colIndex
Just <$> freezeColumn' (nulls V.! colIndex) col

-- | Constructs a dataframe column, optionally inferring types.
Expand Down Expand Up @@ -201,6 +180,17 @@ lineEnd =
void (char '\n') <|> void (string "\r\n") <|> void (char '\r')
<?> "end of line"


-- | First pass to count rows for exact allocation
countRows :: FilePath -> IO Int
countRows path = withFile path ReadMode $ \handle -> do
let loop !n = do
eof <- hIsEOF handle
if eof
then return n
else TLIO.hGetLine handle >> loop (n + 1)
loop 0

writeCsv :: String -> DataFrame -> IO ()
writeCsv = writeSeparated ','

Expand Down

0 comments on commit 916922c

Please sign in to comment.