diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index d5d62e81..b36994a2 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -9,6 +9,7 @@ module HBS2.Actors.ChunkWriter , newBlock , delBlock , writeChunk + , getHash ) where import HBS2.Prelude @@ -32,6 +33,9 @@ import System.FilePath import System.IO.Error import System.IO +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue qualified as Q + -- TODO: cache file handles newtype ChunkId = ChunkId FilePath @@ -40,7 +44,7 @@ newtype ChunkId = ChunkId FilePath data ChunkWriter h m = ChunkWriter - { _pipeline :: Pipeline m () + { _pipeline :: Pipeline IO () , _dir :: FilePath , storage :: forall a . (Key h ~ Hash h, Storage a h ByteString m) => a } @@ -50,13 +54,13 @@ makeLenses 'ChunkWriter runChunkWriter :: MonadIO m => ChunkWriter h m -> m () runChunkWriter w = do liftIO $ createDirectoryIfMissing True ( w ^. dir ) - runPipeline ( w ^. pipeline) + liftIO $ runPipeline ( w ^. pipeline) stopChunkWriter :: MonadIO m => ChunkWriter h m -> m () -stopChunkWriter w = stopPipeline ( w ^. pipeline ) +stopChunkWriter w = liftIO $ stopPipeline ( w ^. pipeline ) newChunkWriterIO :: Maybe FilePath -> IO (ChunkWriter h m) -newChunkWriterIO tmp = do +newChunkWriterIO tmp = do pip <- newPipeline defChunkWriterQ def <- getXdgDirectory XdgData (defStorePath "temp-chunks") @@ -65,7 +69,7 @@ newChunkWriterIO tmp = do pure $ ChunkWriter - { _pipeline = undefined + { _pipeline = pip , _dir = d , storage = undefined } @@ -90,14 +94,22 @@ newBlock w salt h size = liftIO do where fn = makeFileName w salt h -delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> m () +delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h)) + => ChunkWriter h m -> salt -> Hash h -> m () + delBlock w salt h = liftIO do void $ tryJust (guard . isDoesNotExistError) (removeFile fn) where fn = makeFileName w salt h -writeChunk :: (Hashable salt, MonadIO m, Pretty (Hash h)) => ChunkWriter h m -> salt -> Hash h -> Offset -> ByteString -> m () -writeChunk w salt h o bs = liftIO do +writeChunk :: (Hashable salt, MonadIO m, Pretty (Hash h)) + => ChunkWriter h m + -> salt + -> Hash h + -> Offset + -> ByteString -> m () + +writeChunk w salt h o bs = addJob (w ^. pipeline) $ liftIO do withBinaryFile fn ReadWriteMode $ \fh -> do hSeek fh AbsoluteSeek (fromIntegral o) B.hPutStr fh bs @@ -106,3 +118,31 @@ writeChunk w salt h o bs = liftIO do where fn = makeFileName w salt h +-- Blocking! +-- we need to write last chunk before this will happen +-- FIXME: incremental calculation, +-- streaming, blah-blah +getHash :: forall salt h m . + ( Hashable salt + , Hashed h ByteString + , MonadIO m + , Pretty (Hash h) + ) + => ChunkWriter h m + -> salt + -> Hash h + -> m (Hash h) + +getHash w salt h = liftIO do + + q <- Q.newTBQueueIO 1 + + addJob (w ^. pipeline) do + h1 <- hashObject @h <$> B.readFile fn + atomically $ Q.writeTBQueue q h1 + + atomically $ Q.readTBQueue q + + where + fn = makeFileName w salt h + diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index de6c03a2..a7353a86 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -2,17 +2,18 @@ module HBS2.Prelude ( module Data.String , module Safe , MonadIO(..) - , void, guard + , void, guard, when, unless , maybe1 , Hashable + , lift ) where import Data.String (IsString(..)) import Safe import Control.Monad.IO.Class (MonadIO(..)) -import Control.Monad (void,guard) +import Control.Monad (void,guard,when,unless) import Data.Hashable (Hashable) - +import Control.Monad.Trans.Class (lift) maybe1 :: Maybe a -> b -> (a -> b) -> b maybe1 mb n j = maybe n j mb diff --git a/hbs2-tests/test/Main.hs b/hbs2-tests/test/Main.hs index 6e949af1..25727805 100644 --- a/hbs2-tests/test/Main.hs +++ b/hbs2-tests/test/Main.hs @@ -53,10 +53,15 @@ newtype ChunkNum = ChunkNum Word16 deriving stock (Eq,Ord,Show,Data,Generic) +-- FIXME: peer should be a part of the key +-- therefore, key is ( p | cookie ) +-- but client's cookie in protocol should be just ( cookie :: Word32 ) data Sessions e = Sessions { _sBlockHash :: Cache (Cookie e) (Hash HbSync) , _sBlockChunkSize :: Cache (Cookie e) ChunkSize + , _sBlockOffset :: Cache (Cookie e) Offset + , _sBlockWritten :: Cache (Cookie e) Size , _sBlockSizes :: Cache (Hash HbSync) (Map (Peer e) Size) , _sBlockSize :: Cache (Hash HbSync) Size } @@ -108,8 +113,6 @@ blockChunksProto adapter (BlockChunks c p) = BlockGetAllChunks h size -> deferred proto do bsz <- blkSize adapter h - debug $ "bzs" <+> pretty bsz - let offsets' = calcChunks (fromJust bsz) (fromIntegral size) :: [(Offset, Size)] let offsets = zip offsets' [0..] @@ -177,6 +180,8 @@ emptySessions = <*> Cache.newCache (Just defBlockInfoTimeout) <*> Cache.newCache (Just defBlockInfoTimeout) <*> Cache.newCache (Just defBlockInfoTimeout) + <*> Cache.newCache (Just defBlockInfoTimeout) + <*> Cache.newCache (Just defBlockInfoTimeout) newSession :: (Eq k, Hashable k,MonadIO m) => s @@ -258,15 +263,44 @@ runFakePeer se env = do -- УДАЛЯЕМ КУКУ? , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do + let chuKey = (p,c) + + let bslen = fromIntegral $ B8.length bs -- TODO: log this situation mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing - mbChSize <- MaybeT $ getSession se sBlockChunkSize c + mbChSize <- MaybeT $ getSession se sBlockChunkSize c let offset = fromIntegral n * fromIntegral mbChSize :: Offset + updSession se offset sBlockOffset c (max offset) + liftIO $ do -- newBlock cww (p,c) h mbSize - writeChunk cww (p,c) h offset bs + writeChunk cww chuKey h offset bs + updSession se 0 sBlockWritten c (+bslen) + + maxOff <- MaybeT $ getSession se sBlockOffset c + written <- MaybeT $ getSession se sBlockWritten c + + let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize + && written >= mbSize + + when mbDone $ lift do + deferred (Proxy @(BlockChunks e)) $ do + debug "THIS BLOCK MAYBE DONE" + h1 <- liftIO $ getHash cww chuKey h + + when ( h1 == h ) $ do + debug $ "THIS BLOCK IS DEFINETLY DONE" <+> pretty h1 + + -- ПОСЧИТАТЬ ХЭШ + -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК + -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ + -- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ, + -- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ + -- ЧАНКИ, ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ. + -- ТАК НЕ ПОЙДЕТ + -- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся -- ОТКУДА УЗНАТЬ РАЗМЕР БЛОКА? -- ДОПУСТИМ, ОТ БЛОКИНФО?