--------------------------------------------------------------------------------

-- Copyright © 2009, Galois, Inc.
-- Copyright © 2018, DFINITY Stiftung
-- All rights reserved.
--
-- Redistribution and use in source and binary forms, with or without
-- modification, are permitted provided that the following conditions
-- are met:
--
--   * Redistributions of source code must retain the above copyright
--     notice, this list of conditions and the following disclaimer.
--   * Redistributions in binary form must reproduce the above copyright
--     notice, this list of conditions and the following disclaimer in
--     the documentation and/or other materials provided with the
--     distribution.
--   * Neither the name of the Galois, Inc. nor the names of its
--     contributors may be used to endorse or promote products derived
--     from this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-- "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-- LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-- FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-- COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-- INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
-- BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-- LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-- CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-- LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-- ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-- POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------------------

-- |
-- Module     : Control.Concurrent.Classy.BoundedChan
-- Copyright  : © 2009 Galois Inc.
--            , © 2018 DFINITY Stiftung
-- Maintainer : DFINITY USA Research <team@dfinity.org>
--
-- Implements bounded channels. These channels differ from normal 'Chan's in
-- that they are guaranteed to contain no more than a certain number of
-- elements. This is ideal when you may be writing to a channel faster than
-- you are able to read from it.
--
-- This module supports all the functions of "Control.Concurrent.Chan" except
-- 'unGetChan' and 'dupChan', which are not supported for bounded channels.
--
-- Extra consistency: This version enforces that if thread Alice writes
-- e1 followed by e2 then e1 will be returned by readBoundedChan before e2.
-- Conversely, if thead Bob reads e1 followed by e2 then it was true that
-- writeBoundedChan e1 preceded writeBoundedChan e2.
--
-- Previous versions did not enforce this consistency: if writeBoundedChan were
-- preempted between putMVars or killThread arrived between putMVars then it
-- can fail.  Similarly it might fail if readBoundedChan were stopped after putMVar
-- and before the second takeMVar.  An unlucky pattern of several such deaths
-- might actually break the invariants of the array in an unrecoverable way
-- causing all future reads and writes to block.

--------------------------------------------------------------------------------

module Control.Concurrent.Classy.BoundedChan
  ( BoundedChan
  , newBoundedChan
  , writeBoundedChan
  , trywriteBoundedChan
  , readBoundedChan
  , tryreadBoundedChan
  , isEmptyBoundedChan
  , writeList2BoundedChan
  ) where

--------------------------------------------------------------------------------

import           Control.Monad                  (replicateM)
import           Data.Array                     (Array, listArray, (!))

import qualified Control.Concurrent.Classy.MVar as MVar
import           Control.Monad.Catch            (mask_, onException)
import           Control.Monad.Conc.Class       (MonadConc(MVar))

--------------------------------------------------------------------------------

-- | A 'BoundedChan' is an abstract data type representing a bounded channel.
--
-- @since 1.6.2.0
data BoundedChan m a
  = BoundedChan
    { forall (m :: * -> *) a. BoundedChan m a -> Int
_size     :: Int
    , forall (m :: * -> *) a. BoundedChan m a -> Array Int (MVar m a)
_contents :: Array Int (MVar m a)
    , forall (m :: * -> *) a. BoundedChan m a -> MVar m Int
_writePos :: MVar m Int
    , forall (m :: * -> *) a. BoundedChan m a -> MVar m Int
_readPos  :: MVar m Int
    }
  deriving ()

-- TODO: check if the fields of BoundedChan could be strict / unpacked

--------------------------------------------------------------------------------

-- Versions of modifyMVar and withMVar that do not 'restore' the previous mask
-- state when running 'io', with added modification strictness.
-- The lack of 'restore' may make these perform better than the normal version.
-- Moving strictness here makes using them more pleasant.

{-# INLINE modifyMVarMask #-}
modifyMVarMask :: (MonadConc m) => MVar m a -> (a -> m (a, b)) -> m b
modifyMVarMask :: forall (m :: * -> *) a b.
MonadConc m =>
MVar m a -> (a -> m (a, b)) -> m b
modifyMVarMask MVar m a
m a -> m (a, b)
callback = forall (m :: * -> *) a. MonadMask m => m a -> m a
mask_ forall a b. (a -> b) -> a -> b
$ do
  a
a <- forall (m :: * -> *) a. MonadConc m => MVar m a -> m a
MVar.takeMVar MVar m a
m
  (a
a', b
b) <- a -> m (a, b)
callback a
a forall (m :: * -> *) a b. MonadCatch m => m a -> m b -> m a
`onException` forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
MVar.putMVar MVar m a
m a
a
  forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
MVar.putMVar MVar m a
m forall a b. (a -> b) -> a -> b
$! a
a'
  forall (f :: * -> *) a. Applicative f => a -> f a
pure b
b

{-# INLINE modifyMVarMask_ #-}
modifyMVarMask_ :: (MonadConc m) => MVar m a -> (a -> m a) -> m ()
modifyMVarMask_ :: forall (m :: * -> *) a.
MonadConc m =>
MVar m a -> (a -> m a) -> m ()
modifyMVarMask_ MVar m a
m a -> m a
callback =
  forall (m :: * -> *) a. MonadMask m => m a -> m a
mask_ forall a b. (a -> b) -> a -> b
$ do
    a
a <- forall (m :: * -> *) a. MonadConc m => MVar m a -> m a
MVar.takeMVar MVar m a
m
    a
a' <- a -> m a
callback a
a forall (m :: * -> *) a b. MonadCatch m => m a -> m b -> m a
`onException` forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
MVar.putMVar MVar m a
m a
a
    forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
MVar.putMVar MVar m a
m forall a b. (a -> b) -> a -> b
$! a
a'

{-# INLINE withMVarMask #-}
withMVarMask :: (MonadConc m) => MVar m a -> (a -> m b) -> m b
withMVarMask :: forall (m :: * -> *) a b.
MonadConc m =>
MVar m a -> (a -> m b) -> m b
withMVarMask MVar m a
m a -> m b
callback =
  forall (m :: * -> *) a. MonadMask m => m a -> m a
mask_ forall a b. (a -> b) -> a -> b
$ do
    a
a <- forall (m :: * -> *) a. MonadConc m => MVar m a -> m a
MVar.takeMVar MVar m a
m
    b
b <- a -> m b
callback a
a forall (m :: * -> *) a b. MonadCatch m => m a -> m b -> m a
`onException` forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
MVar.putMVar MVar m a
m a
a
    forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
MVar.putMVar MVar m a
m a
a
    forall (f :: * -> *) a. Applicative f => a -> f a
pure b
b

--------------------------------------------------------------------------------

-- |
-- @newBoundedChan n@ returns a channel than can contain no more than @n@
-- elements.
--
-- @since 1.6.2.0
newBoundedChan :: (MonadConc m) => Int -> m (BoundedChan m a)
newBoundedChan :: forall (m :: * -> *) a. MonadConc m => Int -> m (BoundedChan m a)
newBoundedChan Int
x = do
  [MVar m a]
entls <- forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
x forall (m :: * -> *) a. MonadConc m => m (MVar m a)
MVar.newEmptyMVar
  MVar m Int
wpos  <- forall (m :: * -> *) a. MonadConc m => a -> m (MVar m a)
MVar.newMVar Int
0
  MVar m Int
rpos  <- forall (m :: * -> *) a. MonadConc m => a -> m (MVar m a)
MVar.newMVar Int
0
  let entries :: Array Int (MVar m a)
entries = forall i e. Ix i => (i, i) -> [e] -> Array i e
listArray (Int
0, Int
x forall a. Num a => a -> a -> a
- Int
1) [MVar m a]
entls
  forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall (m :: * -> *) a.
Int
-> Array Int (MVar m a)
-> MVar m Int
-> MVar m Int
-> BoundedChan m a
BoundedChan Int
x Array Int (MVar m a)
entries MVar m Int
wpos MVar m Int
rpos)

-- |
-- Write an element to the channel. If the channel is full, this routine will
-- block until it is able to write. Blockers wait in a fair FIFO queue.
--
-- @since 1.6.2.0
writeBoundedChan :: (MonadConc m) => BoundedChan m a -> a -> m ()
writeBoundedChan :: forall (m :: * -> *) a. MonadConc m => BoundedChan m a -> a -> m ()
writeBoundedChan (BoundedChan Int
size Array Int (MVar m a)
contents MVar m Int
wposMV MVar m Int
_) a
x =
  forall (m :: * -> *) a.
MonadConc m =>
MVar m a -> (a -> m a) -> m ()
modifyMVarMask_ MVar m Int
wposMV forall a b. (a -> b) -> a -> b
$ \Int
wpos -> do
    forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
MVar.putMVar (Array Int (MVar m a)
contents forall i e. Ix i => Array i e -> i -> e
! Int
wpos) a
x
    forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. Enum a => a -> a
succ Int
wpos forall a. Integral a => a -> a -> a
`mod` Int
size) -- only advance when putMVar succeeds

-- |
-- A variant of 'writeBoundedChan' which, instead of blocking when the channel is
-- full, simply aborts and does not write the element. Note that this routine
-- can still block while waiting for write access to the channel.
--
-- @since 1.6.2.0
trywriteBoundedChan :: (MonadConc m) => BoundedChan m a -> a -> m Bool
trywriteBoundedChan :: forall (m :: * -> *) a.
MonadConc m =>
BoundedChan m a -> a -> m Bool
trywriteBoundedChan (BoundedChan Int
size Array Int (MVar m a)
contents MVar m Int
wposMV MVar m Int
_) a
x =
  forall (m :: * -> *) a b.
MonadConc m =>
MVar m a -> (a -> m (a, b)) -> m b
modifyMVarMask MVar m Int
wposMV forall a b. (a -> b) -> a -> b
$ \Int
wpos -> do
    Bool
success <- forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m Bool
MVar.tryPutMVar (Array Int (MVar m a)
contents forall i e. Ix i => Array i e -> i -> e
! Int
wpos) a
x
    -- only advance when putMVar succeeds
    let wpos' :: Int
wpos' = if Bool
success then forall a. Enum a => a -> a
succ Int
wpos forall a. Integral a => a -> a -> a
`mod` Int
size else Int
wpos
    forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
wpos', Bool
success)

-- |
-- Read an element from the channel. If the channel is empty, this routine
-- will block until it is able to read. Blockers wait in a fair FIFO queue.
--
-- @since 1.6.2.0
readBoundedChan :: (MonadConc m) => BoundedChan m a -> m a
readBoundedChan :: forall (m :: * -> *) a. MonadConc m => BoundedChan m a -> m a
readBoundedChan (BoundedChan Int
size Array Int (MVar m a)
contents MVar m Int
_ MVar m Int
rposMV) =
  forall (m :: * -> *) a b.
MonadConc m =>
MVar m a -> (a -> m (a, b)) -> m b
modifyMVarMask MVar m Int
rposMV forall a b. (a -> b) -> a -> b
$ \Int
rpos -> do
    a
a <- forall (m :: * -> *) a. MonadConc m => MVar m a -> m a
MVar.takeMVar (Array Int (MVar m a)
contents forall i e. Ix i => Array i e -> i -> e
! Int
rpos)
    forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. Enum a => a -> a
succ Int
rpos forall a. Integral a => a -> a -> a
`mod` Int
size, a
a) -- only advance when takeMVar succeeds

-- |
-- A variant of 'readBoundedChan' which, instead of blocking when the channel is
-- empty, immediately returns 'Nothing'. Otherwise, 'tryreadBoundedChan' returns
-- @'Just' a@ where @a@ is the element read from the channel. Note that this
-- routine can still block while waiting for read access to the channel.
--
-- @since 1.6.2.0
tryreadBoundedChan :: (MonadConc m) => BoundedChan m a -> m (Maybe a)
tryreadBoundedChan :: forall (m :: * -> *) a.
MonadConc m =>
BoundedChan m a -> m (Maybe a)
tryreadBoundedChan (BoundedChan Int
size Array Int (MVar m a)
contents MVar m Int
_ MVar m Int
rposMV) =
  forall (m :: * -> *) a b.
MonadConc m =>
MVar m a -> (a -> m (a, b)) -> m b
modifyMVarMask MVar m Int
rposMV forall a b. (a -> b) -> a -> b
$ \Int
rpos -> do
    Maybe a
ma <- forall (m :: * -> *) a. MonadConc m => MVar m a -> m (Maybe a)
MVar.tryTakeMVar (Array Int (MVar m a)
contents forall i e. Ix i => Array i e -> i -> e
! Int
rpos)
    -- only advance when takeMVar succeeds
    let rpos' :: Int
rpos' = case Maybe a
ma of
                  Just a
_  -> forall a. Enum a => a -> a
succ Int
rpos forall a. Integral a => a -> a -> a
`mod` Int
size
                  Maybe a
Nothing -> Int
rpos
    forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
rpos', Maybe a
ma)

-- |
-- Returns 'True' if the supplied channel is empty.
--
-- NOTE: This may block on an empty channel if there is a blocked reader.
-- NOTE: This function is deprecated.
--
-- @since 1.6.2.0
{-# DEPRECATED isEmptyBoundedChan
               "This isEmptyBoundedChan can block, no non-blocking substitute yet" #-}
isEmptyBoundedChan :: (MonadConc m) => BoundedChan m a -> m Bool
isEmptyBoundedChan :: forall (m :: * -> *) a. MonadConc m => BoundedChan m a -> m Bool
isEmptyBoundedChan (BoundedChan Int
_ Array Int (MVar m a)
contents MVar m Int
_ MVar m Int
rposMV) =
  forall (m :: * -> *) a b.
MonadConc m =>
MVar m a -> (a -> m b) -> m b
withMVarMask MVar m Int
rposMV forall a b. (a -> b) -> a -> b
$ \Int
rpos ->
    forall (m :: * -> *) a. MonadConc m => MVar m a -> m Bool
MVar.isEmptyMVar (Array Int (MVar m a)
contents forall i e. Ix i => Array i e -> i -> e
! Int
rpos)

-- |
-- Write a list of elements to the channel.
-- If the channel becomes full, this routine will block until it can write.
-- Competing writers may interleave with this one.
--
-- @since 1.6.2.0
writeList2BoundedChan :: (MonadConc m) => BoundedChan m a -> [a] -> m ()
writeList2BoundedChan :: forall (m :: * -> *) a.
MonadConc m =>
BoundedChan m a -> [a] -> m ()
writeList2BoundedChan = forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadConc m => BoundedChan m a -> a -> m ()
writeBoundedChan

--------------------------------------------------------------------------------