block receiving fsm

This commit is contained in:
Dmitry Zuikov 2023-01-18 20:04:26 +03:00
parent 0ebebc4c87
commit 630cacc960
2 changed files with 39 additions and 10 deletions

View File

@ -6,6 +6,7 @@ module HBS2.Actors.ChunkWriter
, stopChunkWriter
, newBlock
, delBlock
, commitBlock
, writeChunk
, getHash
) where
@ -39,9 +40,9 @@ newtype ChunkId = ChunkId FilePath
deriving newtype (IsString)
deriving stock (Eq,Ord,Show)
data ChunkWriter h m = forall a . (Storage a h ByteString m) =>
data ChunkWriter h m = forall a . (MonadIO m, Storage a h ByteString m, Block ByteString ~ ByteString) =>
ChunkWriter
{ pipeline :: Pipeline IO ()
{ pipeline :: Pipeline m ()
, dir :: FilePath
, storage :: a
}
@ -51,30 +52,31 @@ data ChunkWriter h m = forall a . (Storage a h ByteString m) =>
runChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
runChunkWriter w = do
liftIO $ createDirectoryIfMissing True ( dir w )
liftIO $ runPipeline (pipeline w)
runPipeline (pipeline w)
stopChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
stopChunkWriter w = liftIO $ stopPipeline ( pipeline w )
stopChunkWriter w = stopPipeline ( pipeline w )
newChunkWriterIO :: forall h a m . ( Key h ~ Hash h
, Storage a h ByteString m
, Monad m
, Block ByteString ~ ByteString
, MonadIO m
)
=> a
-> Maybe FilePath
-> IO (ChunkWriter h m)
-> m (ChunkWriter h m)
newChunkWriterIO s tmp = do
pip <- newPipeline defChunkWriterQ
def <- getXdgDirectory XdgData (defStorePath </> "temp-chunks")
def <- liftIO $ getXdgDirectory XdgData (defStorePath </> "temp-chunks")
let d = fromMaybe def tmp
pure $
ChunkWriter
{ pipeline = pip
, dir = d
, storage = s
, storage = s
}
makeFileName :: (Hashable salt, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> FilePath
@ -140,7 +142,7 @@ getHash w salt h = liftIO do
q <- Q.newTBQueueIO 1
addJob (pipeline w) do
addJob (pipeline w) $ liftIO do
h1 <- hashObject @h <$> B.readFile fn
atomically $ Q.writeTBQueue q h1
@ -149,3 +151,30 @@ getHash w salt h = liftIO do
where
fn = makeFileName w salt h
commitBlock :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, MonadIO m
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m ()
commitBlock w@(ChunkWriter {storage = stor}) salt h = do
q <- liftIO $ Q.newTBQueueIO 1
addJob (pipeline w) (liftIO $ B.readFile fn >>= atomically . Q.writeTBQueue q)
s <- liftIO $ atomically $ Q.readTBQueue q
void $ putBlock stor s
delBlock w salt h
where
fn = makeFileName w salt h

View File

@ -292,7 +292,7 @@ runFakePeer se env = do
when ( h1 == h ) $ do
debug $ "THIS BLOCK IS DEFINITLY DONE" <+> pretty h1
commitChunk cw chuKey h
liftIO $ commitBlock cww chuKey h
-- ПОСЧИТАТЬ ХЭШ
-- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК