mirror of https://github.com/voidlizard/hbs2
343 lines
8.5 KiB
Haskell
343 lines
8.5 KiB
Haskell
module HBS2.Actors.ChunkWriter
|
|
( ChunkWriter
|
|
, ChunkId
|
|
, newChunkWriterIO
|
|
, runChunkWriter
|
|
, stopChunkWriter
|
|
, delBlock
|
|
, commitBlock
|
|
, writeChunk
|
|
, getHash
|
|
, blocksInProcess
|
|
) where
|
|
|
|
import HBS2.Prelude
|
|
import HBS2.Actors
|
|
import HBS2.Hash
|
|
import HBS2.Storage
|
|
import HBS2.Defaults
|
|
import HBS2.Clock
|
|
|
|
import Control.Monad.Trans.Maybe
|
|
import Data.List qualified as L
|
|
import Data.Functor
|
|
import Data.Function
|
|
import Control.Exception
|
|
import Data.ByteString.Lazy (ByteString)
|
|
import Data.ByteString.Lazy qualified as B
|
|
import Data.ByteString qualified as BS
|
|
-- import Data.Cache (Cache)
|
|
-- import Data.Cache qualified as Cache
|
|
import Data.Foldable
|
|
import Data.Traversable
|
|
import Data.Hashable (hash)
|
|
import Data.Maybe
|
|
import Data.Word
|
|
import Prettyprinter
|
|
import System.Directory
|
|
import System.FilePath
|
|
import System.IO.Error
|
|
import System.IO
|
|
import System.IO.Temp
|
|
import System.FileLock
|
|
|
|
import Control.Concurrent.Async
|
|
|
|
import Control.Monad.Except
|
|
import Control.Monad
|
|
import Data.Cache (Cache)
|
|
import Data.Cache qualified as Cache
|
|
import Control.Concurrent.STM
|
|
import Control.Concurrent.STM.TVar as TV
|
|
import Control.Concurrent.STM.TBQueue qualified as Q
|
|
import Control.Concurrent.STM.TSem qualified as Sem
|
|
import Control.Concurrent.STM.TSem (TSem)
|
|
|
|
import Control.Concurrent.MVar as MVar
|
|
|
|
import Control.Concurrent.STM.TQueue qualified as Q0
|
|
import Control.Concurrent
|
|
|
|
import Data.HashMap.Strict (HashMap)
|
|
import Data.HashMap.Strict qualified as HashMap
|
|
|
|
--
|
|
--
|
|
--TODO: cache file handles
|
|
|
|
newtype ChunkId = ChunkId FilePath
|
|
deriving newtype (IsString)
|
|
deriving stock (Eq,Ord,Show)
|
|
|
|
data ChunkWriter h m = forall a . ( MonadIO m
|
|
, Storage a h ByteString m
|
|
, Block ByteString ~ ByteString
|
|
) =>
|
|
ChunkWriter
|
|
{ stopped :: TVar Bool
|
|
, pipeline :: Pipeline IO ()
|
|
, dir :: FilePath
|
|
, storage :: a
|
|
, perBlock :: !(TVar (HashMap FilePath [Handle -> IO ()]))
|
|
, perBlockLock :: !(TVar (HashMap FilePath TSem))
|
|
}
|
|
|
|
|
|
blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int
|
|
blocksInProcess cw = do
|
|
liftIO $ readTVarIO (perBlock cw) <&> HashMap.size
|
|
|
|
runChunkWriter :: forall h m . ( Eq (Hash h)
|
|
, Hashable (Hash h)
|
|
, MonadIO m )
|
|
=> ChunkWriter h IO -> m ()
|
|
|
|
runChunkWriter = runChunkWriter2
|
|
|
|
|
|
runChunkWriter2 :: forall h m . ( Eq (Hash h)
|
|
, Hashable (Hash h)
|
|
, MonadIO m )
|
|
=> ChunkWriter h IO -> m ()
|
|
|
|
runChunkWriter2 w = do
|
|
liftIO $ createDirectoryIfMissing True ( dir w )
|
|
let tv = perBlock w
|
|
liftIO $ runPipeline (pipeline w)
|
|
-- fix \next -> do
|
|
-- keys <- liftIO $ readTVarIO tv <&> (L.take 20 . HashMap.keys)
|
|
-- liftIO $ forConcurrently_ keys $ \f -> flush w f
|
|
-- pause ( 1.00 :: Timeout 'Seconds)
|
|
-- next
|
|
|
|
stopChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
|
|
stopChunkWriter w = do
|
|
liftIO $ atomically $ writeTVar (stopped w) True
|
|
|
|
newChunkWriterIO :: forall h a m . ( Key h ~ Hash h, h ~ HbSync
|
|
, Storage a h ByteString m
|
|
, Block ByteString ~ ByteString
|
|
, MonadIO m
|
|
)
|
|
=> a
|
|
-> Maybe FilePath
|
|
-> m (ChunkWriter h m)
|
|
|
|
newChunkWriterIO s tmp = do
|
|
pip <- newPipeline defChunkWriterQ
|
|
|
|
def <- liftIO $ getXdgDirectory XdgData (defStorePath </> "temp-chunks")
|
|
let d = fromMaybe def tmp
|
|
|
|
mt <- liftIO $ newTVarIO mempty
|
|
mts <- liftIO $ newTVarIO mempty
|
|
|
|
running <- liftIO $ newTVarIO False
|
|
|
|
pure $
|
|
ChunkWriter
|
|
{ stopped = running
|
|
, pipeline = pip
|
|
, dir = d
|
|
, storage = s
|
|
, perBlock = mt
|
|
, perBlockLock = mts
|
|
}
|
|
|
|
makeFileName :: (Hashable salt, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> FilePath
|
|
makeFileName w salt h = dir w </> suff
|
|
where
|
|
suff = show $ pretty (fromIntegral (hash salt) :: Word32) <> "@" <> pretty h
|
|
|
|
delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h))
|
|
=> ChunkWriter h IO -> salt -> Hash h -> m ()
|
|
|
|
delBlock w salt h = liftIO do
|
|
|
|
let cache = perBlock w
|
|
let se = perBlockLock w
|
|
|
|
-- lock <- getLock w fn
|
|
|
|
flush w fn
|
|
|
|
-- atomically $ Sem.waitTSem lock
|
|
|
|
void $ runExceptT $ liftIO $ removeFile fn
|
|
|
|
liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete fn
|
|
liftIO $ atomically $ TV.modifyTVar' se $ HashMap.delete fn
|
|
|
|
-- atomically $ Sem.signalTSem lock
|
|
|
|
where
|
|
fn = makeFileName w salt h
|
|
|
|
writeChunk :: ( Hashable salt
|
|
, MonadIO m
|
|
, Pretty (Hash h)
|
|
, Hashable (Hash h), Eq (Hash h)
|
|
)
|
|
=> ChunkWriter h m
|
|
-> salt
|
|
-> Hash h
|
|
-> Offset
|
|
-> ByteString -> m ()
|
|
|
|
writeChunk = writeChunk2
|
|
|
|
|
|
getHash :: forall salt h m .
|
|
( Hashable salt
|
|
, Hashed h ByteString
|
|
, m ~ IO
|
|
, Block ByteString ~ ByteString
|
|
, Pretty (Hash h)
|
|
, Hashable (Hash h), Eq (Hash h)
|
|
)
|
|
=> ChunkWriter h m
|
|
-> salt
|
|
-> Hash h
|
|
-> m (Maybe (Hash h))
|
|
|
|
getHash = getHash2
|
|
|
|
|
|
commitBlock :: forall salt h m .
|
|
( Hashable salt
|
|
, Hashed h ByteString
|
|
, Block ByteString ~ ByteString
|
|
, m ~ IO
|
|
, Pretty (Hash h)
|
|
, Hashable (Hash h), Eq (Hash h)
|
|
)
|
|
=> ChunkWriter h m
|
|
-> salt
|
|
-> Hash h
|
|
-> m ()
|
|
|
|
commitBlock = commitBlock2
|
|
|
|
writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq (Hash h))
|
|
=> ChunkWriter h m
|
|
-> salt
|
|
-> Hash h
|
|
-> Offset
|
|
-> ByteString -> m ()
|
|
|
|
writeChunk2 w salt h o !bs = do
|
|
|
|
let cache = perBlock w
|
|
|
|
let action fh = do
|
|
void $ runExceptT $ liftIO $ do
|
|
hSeek fh AbsoluteSeek (fromIntegral o)
|
|
B.hPutStr fh bs -- (BS.copy (B.toStrict bs))
|
|
hFlush fh
|
|
|
|
liftIO $ do
|
|
atomically $ modifyTVar cache (HashMap.insertWith (<>) fn [action])
|
|
|
|
where
|
|
fn = makeFileName w salt h
|
|
|
|
getLock w fn = do
|
|
_lock <- atomically $ Sem.newTSem 1
|
|
let locks = perBlockLock w
|
|
atomically $ stateTVar locks $ \x ->
|
|
case HashMap.lookup fn x of
|
|
Nothing -> (_lock, HashMap.insert fn _lock x)
|
|
Just s -> (s, x)
|
|
|
|
flush :: ChunkWriter h IO -> FilePath -> IO ()
|
|
flush w fn = do
|
|
let cache = perBlock w
|
|
|
|
let pip = pipeline w
|
|
|
|
|
|
q <- liftIO $ Q.newTBQueueIO 1
|
|
|
|
-- addJob pip $ do
|
|
|
|
lock <- getLock w fn
|
|
|
|
race (pause (2 :: Timeout 'Seconds)) $ do
|
|
void $ runExceptT $ liftIO $ do
|
|
atomically $ Sem.waitTSem lock
|
|
mbactions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v))
|
|
maybe1 mbactions (pure ()) $ \actions -> do
|
|
withBinaryFile fn ReadWriteMode $ \h -> do
|
|
for_ actions $ \f -> f h
|
|
|
|
atomically $ Sem.signalTSem lock
|
|
void $ liftIO $ atomically $ Q.writeTBQueue q ()
|
|
|
|
void $ liftIO $ atomically $ Q.readTBQueue q
|
|
|
|
|
|
-- Blocking!
|
|
-- we need to write last chunk before this will happen
|
|
-- FIXME: incremental calculation,
|
|
-- streaming, blah-blah
|
|
getHash2 :: forall salt h m .
|
|
( Hashable salt
|
|
, Hashed h ByteString
|
|
, m ~ IO
|
|
, Block ByteString ~ ByteString
|
|
, Pretty (Hash h)
|
|
, Hashable (Hash h), Eq (Hash h)
|
|
)
|
|
=> ChunkWriter h IO
|
|
-> salt
|
|
-> Hash h
|
|
-> m (Maybe (Hash h))
|
|
|
|
getHash2 w salt h = do
|
|
|
|
flush w fn
|
|
|
|
runMaybeT $ do
|
|
res <- liftIO $! runExceptT $ liftIO do
|
|
( B.readFile fn >>= \s -> pure $ hashObject @h s )
|
|
|
|
MaybeT $! pure $! either (const Nothing) Just res
|
|
|
|
where
|
|
fn = makeFileName w salt h
|
|
|
|
|
|
commitBlock2 :: forall salt h m .
|
|
( Hashable salt
|
|
, Hashed h ByteString
|
|
, Block ByteString ~ ByteString
|
|
, m ~ IO
|
|
, Pretty (Hash h)
|
|
, Hashable (Hash h), Eq (Hash h)
|
|
)
|
|
=> ChunkWriter h m
|
|
-> salt
|
|
-> Hash h
|
|
-> m ()
|
|
|
|
commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do
|
|
|
|
flush w fn
|
|
|
|
exists <- doesFileExist fn
|
|
|
|
when exists $ do
|
|
|
|
res <- liftIO $ runExceptT $! liftIO ( B.readFile fn )
|
|
|
|
case res of
|
|
Left _ -> pure ()
|
|
Right s -> do
|
|
void $ putBlock stor s
|
|
delBlock w salt h
|
|
|
|
where
|
|
fn = makeFileName w salt h
|
|
|
|
|