hbs2/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs

265 lines
6.4 KiB
Haskell

{-# Language UndecidableInstances #-}
module HBS2.Actors.ChunkWriter
( ChunkWriter
, newChunkWriterIO
, runChunkWriter
, stopChunkWriter
, delBlock
, commitBlock
, writeChunk
, getHash
, blocksInProcess
) where
import HBS2.Prelude
import HBS2.Actors
import HBS2.Hash
import HBS2.Storage
import HBS2.Defaults
import HBS2.Net.Proto.Sessions
import Data.Functor
import Data.ByteString.Lazy (ByteString)
import Prettyprinter
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar as TV
import Data.Typeable
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap)
class ( Eq salt
, Eq (Hash h)
, Hashable salt
, Hashable (Hash h)
, Typeable salt
, Typeable (Hash h)
, Hashed h ByteString
) => ChunkKey salt h
instance ( Hashable salt
, Typeable salt
, Eq salt
, Eq (Hash h)
, Hashable (Hash h)
, Typeable (Hash h)
, Hashed h ByteString
) => ChunkKey salt h
data Chunk h = P (IntMap ByteString)
| S ByteString
instance Hashed h ByteString => Monoid (Chunk h) where
mempty = P mempty
instance Hashed h ByteString => Semigroup (Chunk h) where
(<>) (P a) (P b) = P ( a <> b )
(<>) (S s1) (S s2) = S s3
where
s3 = s1 <> s2
(<>) p@(P{}) (S s) = S s3
where
(S s1) = toS p
s3 = s1 <> s
(<>) (S s) p@(P{}) = S s3
where
(S s1) = toS p
s3 = s <> s1
mkP :: Offset -> ByteString -> Chunk h
mkP o b = P (IntMap.singleton (fromIntegral o) b)
toS :: Hashed h ByteString => Chunk h -> Chunk h
toS s@(S{}) = s
toS (P xs) = S s
where
s = mconcat $ IntMap.elems xs
data ChunkWriter h m = forall a . ( MonadIO m
, Storage a h ByteString m
, Block ByteString ~ ByteString
) =>
ChunkWriter
{ stopped :: TVar Bool
, pipeline :: Pipeline IO ()
, storage :: a
, perBlock :: !(TVar (HashMap SKey (Chunk h)))
}
-- FIXME: delete lost blocks!
blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int
blocksInProcess cw = do
liftIO $ readTVarIO (perBlock cw) <&> HashMap.size
runChunkWriter :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
, MonadIO m )
=> ChunkWriter h IO -> m ()
runChunkWriter = runChunkWriter2
runChunkWriter2 :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
, MonadIO m )
=> ChunkWriter h IO -> m ()
runChunkWriter2 w = do
-- liftIO $ createDirectoryIfMissing True ( dir w )
let tv = perBlock w
liftIO $ runPipeline (pipeline w)
-- fix \next -> do
-- keys <- liftIO $ readTVarIO tv <&> (L.take 20 . HashMap.keys)
-- liftIO $ forConcurrently_ keys $ \f -> flush w f
-- pause ( 1.00 :: Timeout 'Seconds)
-- next
stopChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
stopChunkWriter w = do
liftIO $ atomically $ writeTVar (stopped w) True
newChunkWriterIO :: forall h a m . ( Key h ~ Hash h, h ~ HbSync
, Storage a h ByteString m
, Block ByteString ~ ByteString
, MonadIO m
)
=> a
-> Maybe FilePath
-> m (ChunkWriter h m)
newChunkWriterIO s _ = do
pip <- newPipeline defChunkWriterQ
mt <- liftIO $ newTVarIO mempty
running <- liftIO $ newTVarIO False
pure $
ChunkWriter
{ stopped = running
, pipeline = pip
, storage = s
, perBlock = mt
}
delBlock :: (MonadIO m, ChunkKey salt h, Pretty (Hash h))
=> ChunkWriter h IO
-> salt
-> Hash h
-> m ()
delBlock w salt h = liftIO do
let k = newSKey (salt, h)
let cache = perBlock w
liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete k
writeChunk :: ( ChunkKey salt h
, MonadIO m
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> Offset
-> ByteString -> m ()
writeChunk = writeChunk2
getHash :: forall salt h m .
( ChunkKey salt h
, m ~ IO
, Block ByteString ~ ByteString
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m (Maybe (Hash h))
getHash = getHash2
commitBlock :: forall salt h m .
( ChunkKey salt h
, Hashed h ByteString
, Block ByteString ~ ByteString
, m ~ IO
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m ()
commitBlock = commitBlock2
writeChunk2 :: (ChunkKey salt h, MonadIO m, Pretty (Hash h))
=> ChunkWriter h m
-> salt
-> Hash h
-> Offset
-> ByteString -> m ()
writeChunk2 w salt h o !bs = do
let cache = perBlock w
let k = newSKey (salt, h)
liftIO $ do
atomically $ modifyTVar cache (HashMap.insertWith (<>) k (mkP o bs) )
getHash2 :: forall salt h m .
( ChunkKey salt h
, Hashed h ByteString
, m ~ IO
, Block ByteString ~ ByteString
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h IO
-> salt
-> Hash h
-> m (Maybe (Hash h))
getHash2 w salt h = do
let k = newSKey (salt, h)
chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k
case chunk of
Just (S s) -> pure (Just (hashObject s))
_ -> pure Nothing
commitBlock2 :: forall salt h m .
( ChunkKey salt h
, Hashed h ByteString
, Block ByteString ~ ByteString
, m ~ IO
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m ()
commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do
let k = newSKey (salt, h)
chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k
case chunk of
Just (S s) -> void $ putBlock stor s >> delBlock w salt h
_ -> pure () -- FIXME: error