Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UNIX socket to serve RPC #83

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions msgpack-aeson/msgpack-aeson.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ library
hs-source-dirs: src
exposed-modules: Data.MessagePack.Aeson

build-depends: base >= 4.7 && < 4.14
, aeson >= 0.8.0.2 && < 0.12
|| >= 1.0 && < 1.5
, bytestring >= 0.10.4 && < 0.11
, msgpack >= 1.1.0 && < 1.2
, scientific >= 0.3.2 && < 0.4
, text >= 1.2.3 && < 1.3
, unordered-containers >= 0.2.5 && < 0.3
, vector >= 0.10.11 && < 0.13
, deepseq >= 1.3 && < 1.5
build-depends: base == 4.14.*
, aeson == 1.5.*
, bytestring == 0.10.*
, msgpack == 1.2.*
, scientific == 0.3.*
, text == 1.2.*
, unordered-containers == 0.2.*
, vector == 0.12.*
, deepseq == 1.4.*

default-language: Haskell2010

Expand All @@ -48,7 +47,7 @@ test-suite msgpack-aeson-test
, aeson
, msgpack
-- test-specific dependencies
, tasty == 1.2.*
, tasty-hunit == 0.10.*
, tasty
, tasty-hunit

default-language: Haskell2010
35 changes: 18 additions & 17 deletions msgpack-rpc/msgpack-rpc.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: 1.12
name: msgpack-rpc
version: 1.0.0
version: 1.1.0

synopsis: A MessagePack-RPC Implementation
description: A MessagePack-RPC Implementation <http://msgpack.org/>
Expand All @@ -26,19 +26,19 @@ library
exposed-modules: Network.MessagePack.Server
Network.MessagePack.Client

build-depends: base >= 4.5 && < 4.13
, bytestring >= 0.10.4 && < 0.11
, text >= 1.2.3 && < 1.3
, network >= 2.6 && < 2.9
|| >= 3.0 && < 3.1
, mtl >= 2.2.1 && < 2.3
, monad-control >= 1.0.0.0 && < 1.1
, conduit >= 1.2.3.1 && < 1.3
, conduit-extra >= 1.1.3.4 && < 1.3
, binary-conduit >= 1.2.3 && < 1.3
, exceptions >= 0.8 && < 0.11
, binary >= 0.7.1 && < 0.9
, msgpack >= 1.1.0 && < 1.2
build-depends: base == 4.14.*
, binary == 0.8.*
, bytestring == 0.10.*
, binary-conduit == 1.3.*
, conduit == 1.3.*
, conduit-extra == 1.3.*
, exceptions == 0.10.*
, msgpack == 1.2.*
, mtl == 2.2.*
, monad-control == 1.0.*
, network == 3.1.*
, streaming-commons == 0.2.*
, text == 1.2.*

test-suite msgpack-rpc-test
default-language: Haskell2010
Expand All @@ -49,9 +49,10 @@ test-suite msgpack-rpc-test
build-depends: msgpack-rpc
-- inherited constraints via `msgpack-rpc`
, base
, conduit-extra == 1.3.*
, mtl
, network
-- test-specific dependencies
, async == 2.2.*
, tasty == 1.2.*
, tasty-hunit == 0.10.*
, async
, tasty
, tasty-hunit
40 changes: 33 additions & 7 deletions msgpack-rpc/src/Network/MessagePack/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,28 @@

module Network.MessagePack.Client (
-- * MessagePack Client type
Client, execClient,
Client, execClient, execClientUnix,

-- * Call RPC method
call,

-- * RPC error
RpcError(..),

-- * Settings
ClientSettings,
clientSettings,
U.ClientSettingsUnix,
SN.clientSettingsUnix,

-- * Getters & setters
SN.serverSettingsUnix,
SN.getReadBufferSize,
SN.setReadBufferSize,
getAfterBind,
setAfterBind,
getPort,
setPort,
) where

import Control.Applicative
Expand All @@ -49,25 +64,36 @@ import qualified Data.ByteString as S
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Network
import qualified Data.Conduit.Network.Unix as U
import Data.Conduit.Serialization.Binary
import Data.MessagePack
import qualified Data.Streaming.Network as SN
import Data.Typeable
import System.IO

clientSettingsUnix :: FilePath -> U.ClientSettingsUnix
clientSettingsUnix = U.clientSettings

newtype Client a
= ClientT { runClient :: StateT Connection IO a }
deriving (Functor, Applicative, Monad, MonadIO, MonadThrow)

-- | RPC connection type
data Connection
= Connection
!(ResumableSource IO S.ByteString)
!(Sink S.ByteString IO ())
!(SealedConduitT () S.ByteString IO ())
!(ConduitT S.ByteString Void IO ())
!Int

execClient :: S.ByteString -> Int -> Client a -> IO ()
execClient host port m =
runTCPClient (clientSettings port host) $ \ad -> do
execClient :: ClientSettings -> Client a -> IO ()
execClient settings m =
runTCPClient settings $ \ad -> do
(rsrc, _) <- appSource ad $$+ return ()
void $ evalStateT (runClient m) (Connection rsrc (appSink ad) 0)

execClientUnix :: U.ClientSettingsUnix -> Client a -> IO ()
execClientUnix settings m =
U.runUnixClient settings $ \ad -> do
(rsrc, _) <- appSource ad $$+ return ()
void $ evalStateT (runClient m) (Connection rsrc (appSink ad) 0)

Expand Down Expand Up @@ -97,7 +123,7 @@ rpcCall :: String -> [Object] -> Client Object
rpcCall methodName args = ClientT $ do
Connection rsrc sink msgid <- CMS.get
(rsrc', res) <- lift $ do
CB.sourceLbs (pack (0 :: Int, msgid, methodName, args)) $$ sink
runConduit $ CB.sourceLbs (pack (0 :: Int, msgid, methodName, args)) .| sink
rsrc $$++ sinkGet Binary.get
CMS.put $ Connection rsrc' sink (msgid + 1)

Expand Down
66 changes: 51 additions & 15 deletions msgpack-rpc/src/Network/MessagePack/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,40 @@ module Network.MessagePack.Server (
method,
-- * Start RPC server
serve,
serveUnix,

-- * RPC server settings
ServerSettings,
serverSettings,
U.ServerSettingsUnix,

-- * Getters & setters
SN.serverSettingsUnix,
SN.getReadBufferSize,
SN.setReadBufferSize,
getAfterBind,
setAfterBind,
getPort,
setPort,
) where

import Conduit (MonadUnliftIO)
import Control.Applicative
import Control.Monad
import Control.Monad.Catch
import Control.Monad.Trans
import Control.Monad.Trans.Control
import Data.Binary
import Data.ByteString (ByteString)
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Network
import qualified Data.Conduit.Network.Unix as U
import Data.Conduit.Serialization.Binary
import Data.List
import Data.MessagePack
import Data.MessagePack.Result
import qualified Data.Streaming.Network as SN
import Data.Typeable

-- ^ MessagePack RPC method
Expand Down Expand Up @@ -100,25 +120,41 @@ method :: MethodType m f
-> Method m
method name body = Method name $ toBody body

-- | Start RPC server with a set of RPC methods.
serve :: (MonadBaseControl IO m, MonadIO m, MonadCatch m, MonadThrow m)
=> Int -- ^ Port number
-> [Method m] -- ^ list of methods
-- | Start an RPC server with a set of RPC methods on a TCP socket.
serve :: (MonadBaseControl IO m, MonadUnliftIO m, MonadIO m, MonadCatch m, MonadThrow m)
=> ServerSettings -- ^ settings
-> [Method m] -- ^ list of methods
-> m ()
serve port methods = runGeneralTCPServer (serverSettings port "*") $ \ad -> do
serve settings methods = runGeneralTCPServer settings $ \ad -> do
(rsrc, _) <- appSource ad $$+ return ()
(_ :: Either ParseError ()) <- try $ processRequests rsrc (appSink ad)
(_ :: Either ParseError ()) <- try $ processRequests methods rsrc (appSink ad)
return ()
where
processRequests rsrc sink = do
(rsrc', res) <- rsrc $$++ do
obj <- sinkGet get
case fromObject obj of
Error e -> throwM $ ServerError e
Success req -> lift $ getResponse (req :: Request)
_ <- CB.sourceLbs (pack res) $$ sink
processRequests rsrc' sink

-- | Start an RPC server with a set of RPC methods on a Unix domain socket.
serveUnix :: (MonadBaseControl IO m, MonadIO m, MonadCatch m, MonadThrow m)
=> U.ServerSettingsUnix
-> [Method m] -- ^ list of methods
-> m ()
serveUnix settings methods = liftBaseWith $ \run ->
U.runUnixServer settings $ \ad -> void . run $ do
(rsrc, _) <- appSource ad $$+ return ()
(_ :: Either ParseError ()) <- try $ processRequests methods rsrc (appSink ad)
return ()

processRequests :: (MonadThrow m)
=> [Method m] -- ^ list of methods
-> SealedConduitT () ByteString m ()
-> ConduitT ByteString Void m a
-> m b
processRequests methods rsrc sink = do
(rsrc', res) <- rsrc $$++ do
obj <- sinkGet get
case fromObject obj of
Error err -> throwM $ ServerError $ "invalid request: " ++ err
Success req -> lift $ getResponse (req :: Request)
_ <- runConduit $ CB.sourceLbs (pack res) .| sink
processRequests methods rsrc' sink
where
getResponse (rtype, msgid, methodName, args) = do
when (rtype /= 0) $
throwM $ ServerError $ "request type is not 0, got " ++ show rtype
Expand Down
61 changes: 51 additions & 10 deletions msgpack-rpc/test/test.hs
Original file line number Diff line number Diff line change
@@ -1,26 +1,66 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Chan
import Control.Monad.Trans
import Test.Tasty
import Test.Tasty.HUnit

import Network.MessagePack.Client
import Network.MessagePack.Server
import Network.Socket (withSocketsDo)
import Network.Socket (Socket, withSocketsDo)

import System.IO (openTempFile)

port :: Int
port = 5000

main :: IO ()
main = withSocketsDo $ defaultMain $
testGroup "simple service"
[ testCase "test" $ server `race_` (threadDelay 1000 >> client) ]
main = do
(f, _) <- openTempFile "/tmp" "socket.sock"
withSocketsDo $ defaultMain $
testGroup "simple service"
[ testCase "test TCP" $ testClientServer (clientTCP port) (serverTCP port)
, testCase "test Unix" $ testClientServer (clientUnix f) (serverUnix f) ]

testClientServer :: IO () -> ((Socket -> IO ()) -> IO ()) -> IO ()
testClientServer client server = do
(okChan :: Chan ()) <- newChan
forkIO $ server (const $ writeChan okChan ())
readChan okChan
client

serverTCP :: Int -> (Socket -> IO ()) -> IO ()
serverTCP port afterBind =
serve (setAfterBind afterBind $ serverSettings port "*")
[ method "add" add
, method "echo" echo
]
where
add :: Int -> Int -> Server Int
add x y = return $ x + y

echo :: String -> Server String
echo s = return $ "***" ++ s ++ "***"

server :: IO ()
server =
serve port
clientTCP :: Int -> IO ()
clientTCP port = execClient (clientSettings port "localhost") $ do
r1 <- add 123 456
liftIO $ r1 @?= 123 + 456
r2 <- echo "hello"
liftIO $ r2 @?= "***hello***"
where
add :: Int -> Int -> Client Int
add = call "add"

echo :: String -> Client String
echo = call "echo"

serverUnix :: FilePath -> (Socket -> IO ()) -> IO ()
serverUnix path afterBind =
serveUnix (setAfterBind afterBind $ serverSettingsUnix path)
[ method "add" add
, method "echo" echo
]
Expand All @@ -31,8 +71,8 @@ server =
echo :: String -> Server String
echo s = return $ "***" ++ s ++ "***"

client :: IO ()
client = execClient "localhost" port $ do
clientUnix :: FilePath -> IO ()
clientUnix path = execClientUnix (clientSettingsUnix path) $ do
r1 <- add 123 456
liftIO $ r1 @?= 123 + 456
r2 <- echo "hello"
Expand All @@ -43,3 +83,4 @@ client = execClient "localhost" port $ do

echo :: String -> Client String
echo = call "echo"

Loading