This commit is contained in:
Dmitry Zuikov 2023-01-18 18:47:21 +03:00
parent 4256a3663f
commit 0bc07eb912
3 changed files with 90 additions and 15 deletions

View File

@ -9,6 +9,7 @@ module HBS2.Actors.ChunkWriter
, newBlock
, delBlock
, writeChunk
, getHash
) where
import HBS2.Prelude
@ -32,6 +33,9 @@ import System.FilePath
import System.IO.Error
import System.IO
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue qualified as Q
-- TODO: cache file handles
newtype ChunkId = ChunkId FilePath
@ -40,7 +44,7 @@ newtype ChunkId = ChunkId FilePath
data ChunkWriter h m =
ChunkWriter
{ _pipeline :: Pipeline m ()
{ _pipeline :: Pipeline IO ()
, _dir :: FilePath
, storage :: forall a . (Key h ~ Hash h, Storage a h ByteString m) => a
}
@ -50,13 +54,13 @@ makeLenses 'ChunkWriter
runChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
runChunkWriter w = do
liftIO $ createDirectoryIfMissing True ( w ^. dir )
runPipeline ( w ^. pipeline)
liftIO $ runPipeline ( w ^. pipeline)
stopChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
stopChunkWriter w = stopPipeline ( w ^. pipeline )
stopChunkWriter w = liftIO $ stopPipeline ( w ^. pipeline )
newChunkWriterIO :: Maybe FilePath -> IO (ChunkWriter h m)
newChunkWriterIO tmp = do
newChunkWriterIO tmp = do
pip <- newPipeline defChunkWriterQ
def <- getXdgDirectory XdgData (defStorePath </> "temp-chunks")
@ -65,7 +69,7 @@ newChunkWriterIO tmp = do
pure $
ChunkWriter
{ _pipeline = undefined
{ _pipeline = pip
, _dir = d
, storage = undefined
}
@ -90,14 +94,22 @@ newBlock w salt h size = liftIO do
where
fn = makeFileName w salt h
delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> m ()
delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h))
=> ChunkWriter h m -> salt -> Hash h -> m ()
delBlock w salt h = liftIO do
void $ tryJust (guard . isDoesNotExistError) (removeFile fn)
where
fn = makeFileName w salt h
writeChunk :: (Hashable salt, MonadIO m, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> Offset -> ByteString -> m ()
writeChunk w salt h o bs = liftIO do
writeChunk :: (Hashable salt, MonadIO m, Pretty (Hash h))
=> ChunkWriter h m
-> salt
-> Hash h
-> Offset
-> ByteString -> m ()
writeChunk w salt h o bs = addJob (w ^. pipeline) $ liftIO do
withBinaryFile fn ReadWriteMode $ \fh -> do
hSeek fh AbsoluteSeek (fromIntegral o)
B.hPutStr fh bs
@ -106,3 +118,31 @@ writeChunk w salt h o bs = liftIO do
where
fn = makeFileName w salt h
-- Blocking!
-- we need to write last chunk before this will happen
-- FIXME: incremental calculation,
-- streaming, blah-blah
getHash :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, MonadIO m
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m (Hash h)
getHash w salt h = liftIO do
q <- Q.newTBQueueIO 1
addJob (w ^. pipeline) do
h1 <- hashObject @h <$> B.readFile fn
atomically $ Q.writeTBQueue q h1
atomically $ Q.readTBQueue q
where
fn = makeFileName w salt h

View File

@ -2,17 +2,18 @@ module HBS2.Prelude
( module Data.String
, module Safe
, MonadIO(..)
, void, guard
, void, guard, when, unless
, maybe1
, Hashable
, lift
) where
import Data.String (IsString(..))
import Safe
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad (void,guard)
import Control.Monad (void,guard,when,unless)
import Data.Hashable (Hashable)
import Control.Monad.Trans.Class (lift)
maybe1 :: Maybe a -> b -> (a -> b) -> b
maybe1 mb n j = maybe n j mb

View File

@ -53,10 +53,15 @@ newtype ChunkNum = ChunkNum Word16
deriving stock (Eq,Ord,Show,Data,Generic)
-- FIXME: peer should be a part of the key
-- therefore, key is ( p | cookie )
-- but client's cookie in protocol should be just ( cookie :: Word32 )
data Sessions e =
Sessions
{ _sBlockHash :: Cache (Cookie e) (Hash HbSync)
, _sBlockChunkSize :: Cache (Cookie e) ChunkSize
, _sBlockOffset :: Cache (Cookie e) Offset
, _sBlockWritten :: Cache (Cookie e) Size
, _sBlockSizes :: Cache (Hash HbSync) (Map (Peer e) Size)
, _sBlockSize :: Cache (Hash HbSync) Size
}
@ -108,8 +113,6 @@ blockChunksProto adapter (BlockChunks c p) =
BlockGetAllChunks h size -> deferred proto do
bsz <- blkSize adapter h
debug $ "bzs" <+> pretty bsz
let offsets' = calcChunks (fromJust bsz) (fromIntegral size) :: [(Offset, Size)]
let offsets = zip offsets' [0..]
@ -177,6 +180,8 @@ emptySessions =
<*> Cache.newCache (Just defBlockInfoTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
newSession :: (Eq k, Hashable k,MonadIO m)
=> s
@ -258,15 +263,44 @@ runFakePeer se env = do
-- УДАЛЯЕМ КУКУ?
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
let chuKey = (p,c)
let bslen = fromIntegral $ B8.length bs
-- TODO: log this situation
mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing
mbChSize <- MaybeT $ getSession se sBlockChunkSize c
mbChSize <- MaybeT $ getSession se sBlockChunkSize c
let offset = fromIntegral n * fromIntegral mbChSize :: Offset
updSession se offset sBlockOffset c (max offset)
liftIO $ do
-- newBlock cww (p,c) h mbSize
writeChunk cww (p,c) h offset bs
writeChunk cww chuKey h offset bs
updSession se 0 sBlockWritten c (+bslen)
maxOff <- MaybeT $ getSession se sBlockOffset c
written <- MaybeT $ getSession se sBlockWritten c
let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize
&& written >= mbSize
when mbDone $ lift do
deferred (Proxy @(BlockChunks e)) $ do
debug "THIS BLOCK MAYBE DONE"
h1 <- liftIO $ getHash cww chuKey h
when ( h1 == h ) $ do
debug $ "THIS BLOCK IS DEFINETLY DONE" <+> pretty h1
-- ПОСЧИТАТЬ ХЭШ
-- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК
-- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
-- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ,
-- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ
-- ЧАНКИ, ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ.
-- ТАК НЕ ПОЙДЕТ
-- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся
-- ОТКУДА УЗНАТЬ РАЗМЕР БЛОКА?
-- ДОПУСТИМ, ОТ БЛОКИНФО?