This commit is contained in:
Dmitry Zuikov 2023-01-23 10:13:48 +03:00
parent fe27c56c35
commit 2d06149e25
2 changed files with 239 additions and 14 deletions

View File

@ -16,12 +16,17 @@ import HBS2.Actors
import HBS2.Hash
import HBS2.Storage
import HBS2.Defaults
import HBS2.Clock
import Data.Functor
import Data.Function
import Control.Exception
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as B
-- import Data.Cache (Cache)
-- import Data.Cache qualified as Cache
import Data.Foldable
import Data.Traversable
import Data.Hashable (hash)
import Data.Maybe
import Data.Word
@ -34,10 +39,18 @@ import System.IO.Temp
import Control.Concurrent.Async
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue qualified as Q
import Control.Concurrent.STM.TSem qualified as Sem
import Control.Concurrent.STM.TSem (TSem)
-- TODO: cache file handles
import Control.Concurrent.STM.TQueue qualified as Q0
import Control.Concurrent
--
--
--TODO: cache file handles
newtype ChunkId = ChunkId FilePath
deriving newtype (IsString)
@ -48,20 +61,72 @@ data ChunkWriter h m = forall a . ( MonadIO m
, Block ByteString ~ ByteString
) =>
ChunkWriter
{ pipeline :: Pipeline m ()
{ stopped :: TVar Bool
, pipeline :: Pipeline m ()
, dir :: FilePath
, storage :: a
, perBlock :: Cache FilePath (TQueue (Handle -> IO ()))
, semFlush :: Cache FilePath TSem
}
runChunkWriter :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
, MonadIO m )
=> ChunkWriter h m -> m ()
runChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
runChunkWriter w = do
runChunkWriter = runChunkWriter2
runChunkWriter1 :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
, MonadIO m )
=> ChunkWriter h m -> m ()
runChunkWriter1 w = do
liftIO $ createDirectoryIfMissing True ( dir w )
runPipeline (pipeline w)
runChunkWriter2 :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
, MonadIO m )
=> ChunkWriter h m -> m ()
runChunkWriter2 w = do
liftIO $ createDirectoryIfMissing True ( dir w )
let cache = perBlock w
fix \next -> do
-- kks <- liftIO $ take 1 <$> Cache.keys cache
-- liftIO $ for_ kks $ \h -> flush w h
-- pause ( 1 :: Timeout 'Seconds )
-- yield
-- next
stop <- liftIO $ readTVarIO (stopped w)
if stop then do
ks <- liftIO $ take 20 <$> Cache.keys cache
for_ ks $ \k -> flush w k
else do
ks <- liftIO $ Cache.keys cache
amount <- for ks $ \k -> flush w k
if (sum amount == 0) then do
pause ( 0.5 :: Timeout 'Seconds )
else do
liftIO $ print ("flushed:" <+> pretty (sum amount))
stopChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
stopChunkWriter w = stopPipeline ( pipeline w )
stopChunkWriter w = do
liftIO $ atomically $ writeTVar (stopped w) True
stopChunkWriter1 :: MonadIO m => ChunkWriter h m -> m ()
stopChunkWriter1 w = do
let cache = perBlock w
stopPipeline ( pipeline w )
newChunkWriterIO :: forall h a m . ( Key h ~ Hash h, h ~ HbSync
, Storage a h ByteString m
@ -78,11 +143,19 @@ newChunkWriterIO s tmp = do
def <- liftIO $ getXdgDirectory XdgData (defStorePath </> "temp-chunks")
let d = fromMaybe def tmp
mt <- liftIO $ Cache.newCache Nothing
sem <- liftIO $ Cache.newCache Nothing
running <- liftIO $ newTVarIO False
pure $
ChunkWriter
{ pipeline = pip
{ stopped = running
, pipeline = pip
, dir = d
, storage = s
, perBlock = mt
, semFlush = sem
}
makeFileName :: (Hashable salt, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> FilePath
@ -113,14 +186,63 @@ delBlock w salt h = liftIO do
where
fn = makeFileName w salt h
writeChunk :: (Hashable salt, MonadIO m, Pretty (Hash h))
writeChunk :: ( Hashable salt
, MonadIO m
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> Offset
-> ByteString -> m ()
writeChunk w salt h o bs = addJob (pipeline w) $ liftIO do
writeChunk = writeChunk2
getHash :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, MonadIO m
, Block ByteString ~ ByteString
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m (Hash h)
getHash = getHash2
commitBlock :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, Block ByteString ~ ByteString
, MonadIO m
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m ()
commitBlock = commitBlock2
writeChunk1 :: (Hashable salt, MonadIO m, Pretty (Hash h))
=> ChunkWriter h m
-> salt
-> Hash h
-> Offset
-> ByteString -> m ()
writeChunk1 w salt h o bs = addJob (pipeline w) $ liftIO do
-- writeChunk w salt h o bs = liftIO do
-- print $ "writeChunk:" <+> pretty fn
withBinaryFile fn ReadWriteMode $ \fh -> do
hSeek fh AbsoluteSeek (fromIntegral o)
B.hPutStr fh bs
@ -129,23 +251,47 @@ writeChunk w salt h o bs = addJob (pipeline w) $ liftIO do
where
fn = makeFileName w salt h
writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq (Hash h))
=> ChunkWriter h m
-> salt
-> Hash h
-> Offset
-> ByteString -> m ()
writeChunk2 w salt h o bs = do
let cache = perBlock w
liftIO $ do
q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO
atomically $ Q0.writeTQueue q $ \fh -> do
-- withBinaryFile fn ReadWriteMode $ \fh -> do
hSeek fh AbsoluteSeek (fromIntegral o)
B.hPutStr fh bs
-- hFlush fh
where
fn = makeFileName w salt h
-- Blocking!
-- we need to write last chunk before this will happen
-- FIXME: incremental calculation,
-- streaming, blah-blah
getHash :: forall salt h m .
getHash1 :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, MonadIO m
, Block ByteString ~ ByteString
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m (Hash h)
getHash w salt h = liftIO do
getHash1 w salt h = liftIO do
q <- Q.newTBQueueIO 1
@ -159,7 +305,57 @@ getHash w salt h = liftIO do
fn = makeFileName w salt h
commitBlock :: forall salt h m .
flush w fn = do
let cache = perBlock w
let scache = semFlush w
liftIO $ do
q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO
s <- Cache.fetchWithCache scache fn $ const (atomically $ Sem.newTSem 2)
atomically $ Sem.waitTSem s
Cache.delete cache fn
flushed <- atomically (Q0.flushTQueue q)
liftIO $ do
-- withBinaryFile fn ReadWriteMode $ \fh -> do
withFile fn ReadWriteMode $ \fh -> do
for_ flushed $ \f -> f fh
atomically $ Sem.signalTSem s
pure (length flushed)
-- Blocking!
-- we need to write last chunk before this will happen
-- FIXME: incremental calculation,
-- streaming, blah-blah
getHash2 :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, MonadIO m
, Block ByteString ~ ByteString
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m (Hash h)
getHash2 w salt h = do
flush w fn
h1 <- liftIO $ hashObject @h <$> B.readFile fn
pure h1
where
fn = makeFileName w salt h
commitBlock1 :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, Block ByteString ~ ByteString
@ -171,7 +367,7 @@ commitBlock :: forall salt h m .
-> Hash h
-> m ()
commitBlock w@(ChunkWriter {storage = stor}) salt h = do
commitBlock1 w@(ChunkWriter {storage = stor}) salt h = do
q <- liftIO $ Q.newTBQueueIO 1
addJob (pipeline w) (liftIO $ B.readFile fn >>= atomically . Q.writeTBQueue q)
@ -186,3 +382,31 @@ commitBlock w@(ChunkWriter {storage = stor}) salt h = do
fn = makeFileName w salt h
commitBlock2 :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, Block ByteString ~ ByteString
, MonadIO m
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m ()
commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do
let cache = perBlock w
let scache = semFlush w
flush w fn
s <- liftIO $ B.readFile fn
void $ putBlock stor s
delBlock w salt h
liftIO $ Cache.delete cache fn
liftIO $ Cache.delete scache fn
where
fn = makeFileName w salt h

View File

@ -147,7 +147,7 @@ runTestPeer p zu = do
cww <- newChunkWriterIO stor (Just chDir)
sw <- liftIO $ replicateM 8 $ async $ simpleStorageWorker stor
cw <- liftIO $ replicateM 1 $ async $ runChunkWriter cww
cw <- liftIO $ replicateM 16 $ async $ runChunkWriter cww
zu stor cww
@ -260,7 +260,7 @@ blockDownloadLoop = do
env <- ask
pip <- asks (view envDeferred)
debug "process block!"
-- debug "process block!"
liftIO $ addJob pip $ withPeerM env $ do
-- void $ liftIO $ async $ withPeerM env $ do
@ -353,6 +353,7 @@ mkAdapter cww = do
when ( h1 == h ) $ do
liftIO $ commitBlock cww cKey h
expire cKey
-- debug "hash matched!"
emit @e (BlockChunksEventKey h) (BlockReady h)
when (written > mbSize * defBlockDownloadThreshold) $ do