# Pipes
# Producers
A Producer is some monadic action that can yield values for downstream consumption:
type Producer b = Proxy X () () b
yield :: Monad m => a -> Producer a m ()
For example:
naturals :: Monad m => Producer Int m ()
naturals = each [1..] -- each is a utility function exported by Pipes
We can of course have Producers that are functions of other values too:
naturalsUntil :: Monad m => Int -> Producer Int m ()
naturalsUntil n = each [1..n]
# Connecting Pipes
Use >-> (opens new window) to connect Producers, Consumers and Pipes to compose larger Pipe functions.
printNaturals :: MonadIO m => Effect m ()
printNaturals = naturalsUntil 10 >-> intToStr >-> fancyPrint
Producer, Consumer, Pipe, and Effect types are all defined in terms of the general Proxy type. Therefore >-> (opens new window) can be used for a variety of purposes. Types defined by the left argument must match the type consumed by the right argument:
(>->) :: Monad m => Producer b m r -> Consumer b m r -> Effect m r
(>->) :: Monad m => Producer b m r -> Pipe b c m r -> Producer c m r
(>->) :: Monad m => Pipe a b m r -> Consumer b m r -> Consumer a m r
(>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
# Pipes
Pipes (opens new window) can both await and yield.
type Pipe a b = Proxy () a () b
This Pipe awaits an Int and converts it to a String:
intToStr :: Monad m => Pipe Int String m ()
intToStr = forever $ await >>= (yield . show)
# Running Pipes with runEffect
We use runEffect (opens new window) to run our Pipe:
main :: IO ()
main = do
runEffect $ naturalsUntil 10 >-> intToStr >-> fancyPrint
Note that runEffect requires an Effect, which is a self-contained Proxy with no inputs or outputs:
runEffect :: Monad m => Effect m r -> m r
type Effect = Proxy X () () X
(where X is the empty type, also known as Void).
# Consumers
A Consumer can only await values from upstream.
type Consumer a = Proxy () a () X
await :: Monad m => Consumer a m a
For example:
fancyPrint :: MonadIO m => Consumer String m ()
fancyPrint = forever $ do
numStr <- await
liftIO $ putStrLn ("I received: " ++ numStr)
# The Proxy monad transformer
pipes's core data type is the Proxy monad transformer. Pipe, Producer, Consumer and so on are defined in terms of Proxy.
Since Proxy is a monad transformer, definitions of Pipes take the form of monadic scripts which await and yield values, additionally performing effects from the base monad m.
# Combining Pipes and Network communication
Pipes supports simple binary communication between a client and a server
In this example:
- a client connects and sends a
FirstMessage - the server receives and answers
DoSomething 0 - the client receives and answers
DoNothing - step 2 and 3 are repeated indefinitely
The command data type exchanged over the network:
-- Command.hs
{-# LANGUAGE DeriveGeneric #-}
module Command where
import Data.Binary
import GHC.Generics (Generic)
data Command = FirstMessage
| DoNothing
| DoSomething Int
deriving (Show,Generic)
instance Binary Command
Here, the server waits for a client to connect:
module Server where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Command as C
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as PipesPrelude
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.map
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answers the same command that we have receveid
-- impure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "received message = " ++ (show c)
return $ C.DoSomething 0
-- whatever incoming command `c` from the client, answer DoSomething 0
main :: IO ()
main = PNT.serve (PNT.Host "127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Remote connection from ip = " ++ (show remoteAddress)
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
-- if we want to use the pureHandler
--commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.Encode >-> PNT.toSocket connectionSocket
return ()
The client connects thus:
module Client where
import Pipes
import qualified Pipes.Binary as PipesBinary
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Prelude as PipesPrelude
import qualified Pipes.Parse as PP
import qualified Command as C
pageSize :: Int
pageSize = 4096
-- pure handler, to be used with PipesPrelude.amp
pureHandler :: C.Command -> C.Command
pureHandler c = c -- answer the same command received from the server
-- inpure handler, to be used with PipesPremude.mapM
sideffectHandler :: MonadIO m => C.Command -> m C.Command
sideffectHandler c = do
liftIO $ putStrLn $ "Received: " ++ (show c)
return C.DoNothing -- whatever is received from server, answer DoNothing
main :: IO ()
main = PNT.connect ("127.0.0.1") "23456" $
\(connectionSocket, remoteAddress) -> do
putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress)
sendFirstMessage connectionSocket
_ <- runEffect $ do
let bytesReceiver = PNT.fromSocket connectionSocket pageSize
let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver
commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket
return ()
sendFirstMessage :: PNT.Socket -> IO ()
sendFirstMessage s = do
_ <- runEffect $ do
let encodedProducer = PipesBinary.encode C.FirstMessage
encodedProducer >-> PNT.toSocket s
return ()
# Remarks
As the hackage page (opens new window) describes:
pipes is a clean and powerful stream processing library that lets you build and connect reusable streaming components
Programs implemented through streaming can often be succinct and composable, with simple, short functions allowing you to "slot in or out" features easily with the backing of the Haskell type system.
await :: Monad m => Consumer' a m a
Pulls a value from upstream, where a is our input type.
yield :: Monad m => a -> Producer' a m ()
Produce a value, where a is the output type.
It's highly recommended you read through the embedded Pipes.Tutorial (opens new window) package which gives an excellent overview of the core concepts of Pipes and how Producer, Consumer and Effect interact.