Pipes
Producers
Section titled “Producers”A Producer is some monadic action that can yield values for downstream consumption:
type Producer b = Proxy X () () byield :: Monad m => a -> Producer a m ()For example:
naturals :: Monad m => Producer Int m ()naturals = each [1..] -- each is a utility function exported by PipesWe can of course have Producers that are functions of other values too:
naturalsUntil :: Monad m => Int -> Producer Int m ()naturalsUntil n = each [1..n]Connecting Pipes
Section titled “Connecting Pipes”Use >-> to connect Producers, Consumers and Pipes to compose larger Pipe functions.
printNaturals :: MonadIO m => Effect m ()printNaturals = naturalsUntil 10 >-> intToStr >-> fancyPrintProducer, Consumer, Pipe, and Effect types are all defined in terms of the general Proxy type. Therefore >-> can be used for a variety of purposes. Types defined by the left argument must match the type consumed by the right argument:
(>->) :: Monad m => Producer b m r -> Consumer b m r -> Effect m r(>->) :: Monad m => Producer b m r -> Pipe b c m r -> Producer c m r(>->) :: Monad m => Pipe a b m r -> Consumer b m r -> Consumer a m r(>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m rPipes can both await and yield.
type Pipe a b = Proxy () a () bThis Pipe awaits an Int and converts it to a String:
intToStr :: Monad m => Pipe Int String m ()intToStr = forever $ await >>= (yield . show)Running Pipes with runEffect
Section titled “Running Pipes with runEffect”We use runEffect to run our Pipe:
main :: IO ()main = do runEffect $ naturalsUntil 10 >-> intToStr >-> fancyPrintNote that runEffect requires an Effect, which is a self-contained Proxy with no inputs or outputs:
runEffect :: Monad m => Effect m r -> m rtype Effect = Proxy X () () X(where X is the empty type, also known as Void).
Consumers
Section titled “Consumers”A Consumer can only await values from upstream.
type Consumer a = Proxy () a () Xawait :: Monad m => Consumer a m aFor example:
fancyPrint :: MonadIO m => Consumer String m ()fancyPrint = forever $ do numStr <- await liftIO $ putStrLn ("I received: " ++ numStr)The Proxy monad transformer
Section titled “The Proxy monad transformer”pipes’s core data type is the Proxy monad transformer. Pipe, Producer, Consumer and so on are defined in terms of Proxy.
Since Proxy is a monad transformer, definitions of Pipes take the form of monadic scripts which await and yield values, additionally performing effects from the base monad m.
Combining Pipes and Network communication
Section titled “Combining Pipes and Network communication”Pipes supports simple binary communication between a client and a server
In this example:
- a client connects and sends a
FirstMessage - the server receives and answers
DoSomething 0 - the client receives and answers
DoNothing - step 2 and 3 are repeated indefinitely
The command data type exchanged over the network:
-- Command.hs{-# LANGUAGE DeriveGeneric #-}module Command whereimport Data.Binaryimport GHC.Generics (Generic)
data Command = FirstMessage | DoNothing | DoSomething Int deriving (Show,Generic)
instance Binary CommandHere, the server waits for a client to connect:
module Server where
import Pipesimport qualified Pipes.Binary as PipesBinaryimport qualified Pipes.Network.TCP as PNTimport qualified Command as Cimport qualified Pipes.Parse as PPimport qualified Pipes.Prelude as PipesPrelude
pageSize :: IntpageSize = 4096
-- pure handler, to be used with PipesPrelude.mappureHandler :: C.Command -> C.CommandpureHandler c = c -- answers the same command that we have receveid
-- impure handler, to be used with PipesPremude.mapMsideffectHandler :: MonadIO m => C.Command -> m C.CommandsideffectHandler c = do liftIO $ putStrLn $ "received message = " ++ (show c) return $ C.DoSomething 0 -- whatever incoming command `c` from the client, answer DoSomething 0
main :: IO ()main = PNT.serve (PNT.Host "127.0.0.1") "23456" $ \(connectionSocket, remoteAddress) -> do putStrLn $ "Remote connection from ip = " ++ (show remoteAddress) _ <- runEffect $ do let bytesReceiver = PNT.fromSocket connectionSocket pageSize let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket -- if we want to use the pureHandler --commandDecoder >-> PipesPrelude.map pureHandler >-> for cat PipesBinary.Encode >-> PNT.toSocket connectionSocket return ()The client connects thus:
module Client where
import Pipesimport qualified Pipes.Binary as PipesBinaryimport qualified Pipes.Network.TCP as PNTimport qualified Pipes.Prelude as PipesPreludeimport qualified Pipes.Parse as PPimport qualified Command as C
pageSize :: IntpageSize = 4096
-- pure handler, to be used with PipesPrelude.amppureHandler :: C.Command -> C.CommandpureHandler c = c -- answer the same command received from the server
-- inpure handler, to be used with PipesPremude.mapMsideffectHandler :: MonadIO m => C.Command -> m C.CommandsideffectHandler c = do liftIO $ putStrLn $ "Received: " ++ (show c) return C.DoNothing -- whatever is received from server, answer DoNothing
main :: IO ()main = PNT.connect ("127.0.0.1") "23456" $ \(connectionSocket, remoteAddress) -> do putStrLn $ "Connected to distant server ip = " ++ (show remoteAddress) sendFirstMessage connectionSocket _ <- runEffect $ do let bytesReceiver = PNT.fromSocket connectionSocket pageSize let commandDecoder = PP.parsed PipesBinary.decode bytesReceiver commandDecoder >-> PipesPrelude.mapM sideffectHandler >-> for cat PipesBinary.encode >-> PNT.toSocket connectionSocket return ()
sendFirstMessage :: PNT.Socket -> IO ()sendFirstMessage s = do _ <- runEffect $ do let encodedProducer = PipesBinary.encode C.FirstMessage encodedProducer >-> PNT.toSocket s return ()Remarks
Section titled “Remarks”As the hackage page describes:
pipes is a clean and powerful stream processing library that lets you build and connect reusable streaming components
Programs implemented through streaming can often be succinct and composable, with simple, short functions allowing you to “slot in or out” features easily with the backing of the Haskell type system.
await :: Monad m => Consumer' a m a
Pulls a value from upstream, where a is our input type.
yield :: Monad m => a -> Producer' a m ()
Produce a value, where a is the output type.
It’s highly recommended you read through the embedded Pipes.Tutorial package which gives an excellent overview of the core concepts of Pipes and how Producer, Consumer and Effect interact.