{-|
Module : Tubes
Description : Stream processing with a series of tubes
Maintainer : gatlin@niltag.net
Stability : experimental

My interest in stream processing was re-ignited by the excellent project
@https://github.com/iokasimov/pipeline@

Pipeline casts the Pipes / Conduit-style of programming in terms of 'CPS'.

Since I have my own special 'CPS' I have plagiarized Pipeline's approach.
My goal is to determine how this relates to 'Orc' and then have one module
subsume the other (or learn a valuable reason for why not).
-}

{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE StrictData #-}

module Tubes
  ( -- * A series of tubes
    -- ** Construction and evaluation
    Series
  , deliver
  , yield
  , await
  , finish
  , embed
  , Generator
  , Async
  , AsyncGenerator
    -- ** Combination
  , (><)
    -- * Utility
  , Tube(..)
  , Source(..)
  , Sink(..)
  , pause
  , suspend )
where

import Data.Void (Void)
import LCPS (CPS(..))

-- | The head of a stream processing series.
newtype Source i t r = Source { forall i (t :: * -> *) r. Source i t r -> Sink i t r -> t r
play  :: Sink i t r -> t r }

-- | The reservoir at the end of a stream processing pipeline.
newtype Sink   o t r = Sink   { forall o (t :: * -> *) r. Sink o t r -> o -> Source o t r -> t r
resume :: o -> Source o t r -> t r }

-- | An intermediate link in the series of tubes.
newtype Tube i o r t a = Tube { forall i o r (t :: * -> *) a.
Tube i o r t a -> Source i t r -> Sink o t r -> t r
flow :: Source i t r -> Sink o t r -> t r }
  deriving (forall a b. (a -> b) -> Tube i o r t a -> Tube i o r t b
forall i o r (t :: * -> *) a b.
a -> Tube i o r t b -> Tube i o r t a
forall i o r (t :: * -> *) a b.
(a -> b) -> Tube i o r t a -> Tube i o r t b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> Tube i o r t b -> Tube i o r t a
$c<$ :: forall i o r (t :: * -> *) a b.
a -> Tube i o r t b -> Tube i o r t a
fmap :: forall a b. (a -> b) -> Tube i o r t a -> Tube i o r t b
$cfmap :: forall i o r (t :: * -> *) a b.
(a -> b) -> Tube i o r t a -> Tube i o r t b
Functor)

-- | A 'Series' of 'Tube's is the (delimited) continuation embedding into some
-- base type @t :: * -> *@.
-- 'Series' may be connected into, well, a series via the '(><)' operator.
-- When the series finishes evaluating it will result in a value of type @r@.
type Series i o t a r = CPS r (Tube i o r t) a
type Generator t o = Series Void o t () ()
type Async t i  = Series i Void t () ()
type AsyncGenerator t i o = Series i o t () ()

-- | A covariant mapping of a 'Source i t r' into 'Source o t r'.
pause :: (() -> Tube i o r t a) -> Source i t r -> Source o t r
pause :: forall i o r (t :: * -> *) a.
(() -> Tube i o r t a) -> Source i t r -> Source o t r
pause () -> Tube i o r t a
next Source i t r
ik = forall i (t :: * -> *) r. (Sink i t r -> t r) -> Source i t r
Source forall a b. (a -> b) -> a -> b
$ \Sink o t r
ok -> (forall i o r (t :: * -> *) a.
Tube i o r t a -> Source i t r -> Sink o t r -> t r
flow forall a b. (a -> b) -> a -> b
$! () -> Tube i o r t a
next ()) Source i t r
ik Sink o t r
ok

-- | A contravariant mapping of a 'Sink o t r' into a 'Sink i t r'.
suspend :: (i -> Tube i o r t a) -> Sink o t r -> Sink i t r
suspend :: forall i o r (t :: * -> *) a.
(i -> Tube i o r t a) -> Sink o t r -> Sink i t r
suspend i -> Tube i o r t a
next Sink o t r
ok = forall o (t :: * -> *) r. (o -> Source o t r -> t r) -> Sink o t r
Sink forall a b. (a -> b) -> a -> b
$ \(!i
v) Source i t r
ik -> let !next_v :: Tube i o r t a
next_v = i -> Tube i o r t a
next i
v in forall i o r (t :: * -> *) a.
Tube i o r t a -> Source i t r -> Sink o t r -> t r
flow Tube i o r t a
next_v Source i t r
ik Sink o t r
ok

-- | Blocking-wait for an upstream value from the 'Series'.
await :: Series i o t i r
await :: forall i o (t :: * -> *) r. Series i o t i r
await = forall {k} answer (m :: k -> *) (result :: k).
((answer -> m result) -> m result) -> CPS result m answer
CPS forall a b. (a -> b) -> a -> b
$ \(!i -> Tube i o r t r
k) -> forall i o r (t :: * -> *) a.
(Source i t r -> Sink o t r -> t r) -> Tube i o r t a
Tube forall a b. (a -> b) -> a -> b
$ \Source i t r
ik Sink o t r
ok ->
  let !suspended :: Sink i t r
suspended = forall i o r (t :: * -> *) a.
(i -> Tube i o r t a) -> Sink o t r -> Sink i t r
suspend i -> Tube i o r t r
k Sink o t r
ok
      !playing :: t r
playing = forall i (t :: * -> *) r. Source i t r -> Sink i t r -> t r
play Source i t r
ik Sink i t r
suspended
  in  t r
playing

-- | Yield a value downstream in the 'Series'.
yield :: o -> Series i o t () r
yield :: forall o i (t :: * -> *) r. o -> Series i o t () r
yield !o
v = forall {k} answer (m :: k -> *) (result :: k).
((answer -> m result) -> m result) -> CPS result m answer
CPS forall a b. (a -> b) -> a -> b
$ \(!() -> Tube i o r t r
k) -> forall i o r (t :: * -> *) a.
(Source i t r -> Sink o t r -> t r) -> Tube i o r t a
Tube forall a b. (a -> b) -> a -> b
$ \Source i t r
ik Sink o t r
ok ->
  let !paused :: Source o t r
paused = forall i o r (t :: * -> *) a.
(() -> Tube i o r t a) -> Source i t r -> Source o t r
pause () -> Tube i o r t r
k Source i t r
ik
      !resumed :: t r
resumed = forall o (t :: * -> *) r. Sink o t r -> o -> Source o t r -> t r
resume Sink o t r
ok o
v Source o t r
paused
  in  t r
resumed

-- | A 'Series' which does nothing.
finish :: Monad t => Series i o t () ()
finish :: forall (t :: * -> *) i o. Monad t => Series i o t () ()
finish = forall {k} answer (m :: k -> *) (result :: k).
((answer -> m result) -> m result) -> CPS result m answer
CPS forall a b. (a -> b) -> a -> b
$ \() -> Tube i o () t ()
_ -> forall i o r (t :: * -> *) a.
(Source i t r -> Sink o t r -> t r) -> Tube i o r t a
Tube forall a b. (a -> b) -> a -> b
$ \Source i t ()
_ Sink o t ()
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

-- | Embeds a value with side effects into an appropriate 'Series'.
embed :: Monad t => t a -> Series i o t a ()
embed :: forall (t :: * -> *) a i o. Monad t => t a -> Series i o t a ()
embed !t a
e = forall {k} answer (m :: k -> *) (result :: k).
((answer -> m result) -> m result) -> CPS result m answer
CPS forall a b. (a -> b) -> a -> b
$ \a -> Tube i o () t ()
next -> forall i o r (t :: * -> *) a.
(Source i t r -> Sink o t r -> t r) -> Tube i o r t a
Tube forall a b. (a -> b) -> a -> b
$ \Source i t ()
ik Sink o t ()
ok -> t a
e forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \(!a
v) ->
  let !next_v :: Tube i o () t ()
next_v = a -> Tube i o () t ()
next a
v
      !flowing :: t ()
flowing = forall i o r (t :: * -> *) a.
Tube i o r t a -> Source i t r -> Sink o t r -> t r
flow Tube i o () t ()
next_v Source i t ()
ik Sink o t ()
ok
  in  t ()
flowing

-- | Constructs a 'Series' of 'Tube's.
(><)
  :: forall i e a o t. Monad t
  => Series i e t () ()
  -> Series e o t () ()
  -> Series i o t a ()
~(CPS !(() -> Tube i e () t ()) -> Tube i e () t ()
p) >< :: forall i e a o (t :: * -> *).
Monad t =>
Series i e t () () -> Series e o t () () -> Series i o t a ()
>< ~(CPS !(() -> Tube e o () t ()) -> Tube e o () t ()
q) = forall {k} answer (m :: k -> *) (result :: k).
((answer -> m result) -> m result) -> CPS result m answer
CPS forall a b. (a -> b) -> a -> b
$ \a -> Tube i o () t ()
_ -> forall i o r (t :: * -> *) a.
(Source i t r -> Sink o t r -> t r) -> Tube i o r t a
Tube forall a b. (a -> b) -> a -> b
$ \Source i t ()
ik Sink o t ()
ok ->
  forall i o r (t :: * -> *) a.
Tube i o r t a -> Source i t r -> Sink o t r -> t r
flow Tube e o () t ()
q_end (forall i o r (t :: * -> *) a.
(() -> Tube i o r t a) -> Source i t r -> Source o t r
pause (\() -> let !p_end :: Tube i e () t ()
p_end = (() -> Tube i e () t ()) -> Tube i e () t ()
p forall b c d. b -> Tube c d () t ()
end in Tube i e () t ()
p_end) Source i t ()
ik) Sink o t ()
ok where

  end :: b -> Tube c d () t ()
  end :: forall b c d. b -> Tube c d () t ()
end b
_ = forall i o r (t :: * -> *) a.
(Source i t r -> Sink o t r -> t r) -> Tube i o r t a
Tube forall a b. (a -> b) -> a -> b
$ \Source c t ()
_ Sink d t ()
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

  !q_end :: Tube e o () t ()
q_end = (() -> Tube e o () t ()) -> Tube e o () t ()
q forall b c d. b -> Tube c d () t ()
end

-- | "...deliver[s] vast amounts of information" from a 'Series' of tubes. :)
deliver
  :: Monad t
  => Series i o t r r
  -> t r
deliver :: forall (t :: * -> *) i o r. Monad t => Series i o t r r -> t r
deliver ~(CPS !(r -> Tube i o r t r) -> Tube i o r t r
p) = forall i o r (t :: * -> *) a.
Tube i o r t a -> Source i t r -> Sink o t r -> t r
flow ((r -> Tube i o r t r) -> Tube i o r t r
p (\(!r
r) -> forall i o r (t :: * -> *) a.
(Source i t r -> Sink o t r -> t r) -> Tube i o r t a
Tube forall a b. (a -> b) -> a -> b
$ \Source i t r
_ Sink o t r
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure r
r)) forall i (t :: * -> *) r. Source i t r
i forall o (t :: * -> *) r. Sink o t r
o where
  i :: Source i t r
  !i :: forall i (t :: * -> *) r. Source i t r
i = forall i (t :: * -> *) r. (Sink i t r -> t r) -> Source i t r
Source forall a b. (a -> b) -> a -> b
$ \(!Sink i t r
o') -> forall i (t :: * -> *) r. Source i t r -> Sink i t r -> t r
play forall i (t :: * -> *) r. Source i t r
i Sink i t r
o'

  o :: Sink o t r
  !o :: forall o (t :: * -> *) r. Sink o t r
o = forall o (t :: * -> *) r. (o -> Source o t r -> t r) -> Sink o t r
Sink forall a b. (a -> b) -> a -> b
$ \(!o
v) (!Source o t r
i') -> forall o (t :: * -> *) r. Sink o t r -> o -> Source o t r -> t r
resume forall o (t :: * -> *) r. Sink o t r
o o
v Source o t r
i'