diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index adfc05f5..fbff8bfe 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -65,6 +65,7 @@ library exposed-modules: HBS2.Actors + , HBS2.Actors.ChunkWriter , HBS2.Clock , HBS2.Data.Types , HBS2.Data.Types.Refs @@ -98,6 +99,8 @@ library , containers , cryptonite , deepseq + , directory + , filepath , hashable , interpolatedstring-perl6 , memory diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 33fc1a9d..51ac62a6 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -16,6 +16,9 @@ defStorePath = "hbs2" defPipelineSize :: Int defPipelineSize = 100 +defChunkWriterQ :: Integral a => a +defChunkWriterQ = 100 + -- typical block hash 530+ chunks * parallel wip blocks amount defProtoPipelineSize :: Int defProtoPipelineSize = 65536 @@ -23,3 +26,6 @@ defProtoPipelineSize = 65536 defCookieTimeout :: TimeSpec defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) +defBlockInfoTimeout :: TimeSpec +defBlockInfoTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) + diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index 76cebd73..de6c03a2 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -2,14 +2,16 @@ module HBS2.Prelude ( module Data.String , module Safe , MonadIO(..) - , void + , void, guard , maybe1 + , Hashable ) where import Data.String (IsString(..)) import Safe import Control.Monad.IO.Class (MonadIO(..)) -import Control.Monad (void) +import Control.Monad (void,guard) +import Data.Hashable (Hashable) maybe1 :: Maybe a -> b -> (a -> b) -> b diff --git a/hbs2-core/lib/HBS2/Storage.hs b/hbs2-core/lib/HBS2/Storage.hs index 7e93494f..0d33132a 100644 --- a/hbs2-core/lib/HBS2/Storage.hs +++ b/hbs2-core/lib/HBS2/Storage.hs @@ -23,11 +23,11 @@ newtype StoragePrefix = StoragePrefix { fromPrefix :: FilePath } type family Block block :: Type newtype Offset = Offset Integer - deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable) + deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable,Pretty) deriving stock (Show) newtype Size = Size Integer - deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable) + deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable,Pretty) deriving stock (Show) class ( Monad m diff --git a/hbs2-tests/test/Main.hs b/hbs2-tests/test/Main.hs index 75fcdb34..6e949af1 100644 --- a/hbs2-tests/test/Main.hs +++ b/hbs2-tests/test/Main.hs @@ -14,6 +14,7 @@ import HBS2.Defaults import HBS2.Storage.Simple import HBS2.Storage.Simple.Extra +import HBS2.Actors.ChunkWriter -- import Test.Tasty hiding (Timeout) import Test.Tasty.HUnit @@ -35,6 +36,9 @@ import System.FilePath.Posix import System.IO import Data.Cache (Cache) import Data.Cache qualified as Cache +import Data.Map (Map) +import Data.Map qualified as Map +import Control.Monad.Trans.Maybe debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p @@ -49,9 +53,12 @@ newtype ChunkNum = ChunkNum Word16 deriving stock (Eq,Ord,Show,Data,Generic) -newtype Sessions e = +data Sessions e = Sessions - { _sBlockHash :: Cache (Cookie e) (Hash HbSync) + { _sBlockHash :: Cache (Cookie e) (Hash HbSync) + , _sBlockChunkSize :: Cache (Cookie e) ChunkSize + , _sBlockSizes :: Cache (Hash HbSync) (Map (Peer e) Size) + , _sBlockSize :: Cache (Hash HbSync) Size } makeLenses 'Sessions @@ -64,7 +71,7 @@ data BlockChunksI e m = { blkSize :: GetBlockSize HbSync m , blkChunk :: GetBlockChunk HbSync m , blkGetHash :: Cookie e -> m (Maybe (Hash HbSync)) - , blkAcceptChunk :: (Hash HbSync, ChunkNum, ByteString) -> m () + , blkAcceptChunk :: Response e (BlockChunks e) m => (Cookie e, Peer e, Hash HbSync, ChunkNum, ByteString) -> m () } @@ -107,15 +114,15 @@ blockChunksProto adapter (BlockChunks c p) = let offsets = zip offsets' [0..] for_ offsets $ \((o,sz),i) -> do - p <- thatPeer proto chunk <- blkChunk adapter h o sz maybe (pure ()) (response_ . BlockChunk @e i) chunk BlockChunk n bs -> do h <- blkGetHash adapter c + who <- thatPeer proto maybe1 h (response_ (BlockLost @e)) $ \hh -> do - blkAcceptChunk adapter (hh, n, bs) + blkAcceptChunk adapter (c, who, hh, n, bs) BlockNoChunks {} -> do -- TODO: notification @@ -165,14 +172,11 @@ main = do emptySessions :: IO (Sessions e) -emptySessions = do - - bh <- Cache.newCache (Just defCookieTimeout) - - pure $ - Sessions - { _sBlockHash = bh - } +emptySessions = + Sessions <$> Cache.newCache (Just defCookieTimeout) + <*> Cache.newCache (Just defBlockInfoTimeout) + <*> Cache.newCache (Just defBlockInfoTimeout) + <*> Cache.newCache (Just defBlockInfoTimeout) newSession :: (Eq k, Hashable k,MonadIO m) => s @@ -187,9 +191,16 @@ newSession se l k v = do withNewSession se l k v m = newSession se l k v >> m -getSession se l k = do +getSession' se l k fn = do let cache = view l se - liftIO $ Cache.lookup cache k + liftIO $ Cache.lookup cache k <&> fmap fn + +getSession se l k = getSession' se l k id + +updSession se def l k fn = liftIO do + let cache = view l se + v <- Cache.fetchWithCache cache k (const $ pure def) + Cache.insert cache k (fn v) runFakePeer :: forall e . e ~ Fake => Sessions e -> EngineEnv e -> IO () runFakePeer se env = do @@ -198,6 +209,8 @@ runFakePeer se env = do dir <- canonicalizePath ( ".peers" show pid) + let chDir = dir "tmp-chunks" + createDirectoryIfMissing True dir let opts = [ StoragePrefix dir @@ -207,6 +220,10 @@ runFakePeer se env = do w <- async $ simpleStorageWorker storage + cww <- newChunkWriterIO (Just chDir) + + cw <- async $ runChunkWriter cww + let size = 1024*1024 let blk = B8.concat [ fromString (take 1 $ show x) @@ -217,19 +234,50 @@ runFakePeer se env = do debug $ "I'm" <+> pretty pid <+> pretty root - let handleBlockInfo (p, h, sz) = do - debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz + let handleBlockInfo (p, h, sz') = do + maybe1 sz' (pure ()) $ \sz -> do + let bsz = fromIntegral sz + updSession se mempty sBlockSizes h (Map.insert p bsz) + updSession se bsz sBlockSize h (const bsz) - blkCookies <- Cache.newCache @(Cookie e) @(Hash HbSync) (Just defCookieTimeout) + debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz' let adapter = BlockChunksI { blkSize = hasBlock storage , blkChunk = getChunk storage , blkGetHash = getSession se sBlockHash - , blkAcceptChunk = \(h,n,bs) -> debug $ "got chunk" <+> pretty h - <+> pretty n - <+> pretty (B8.length bs) + + -- И ЧТО ТУТ ДЕЛАТЬ. + -- ЗАПИСАТЬ ЧАНК В ФАЙЛ КУДА-ТО НА TMP (КУДА? + -- КАК ТОЛЬКО ПРИНЯЛИ ВСЕ ЧАНКИ (ПРИШЁЛ ПОСЛЕДНИЙ ЧАНК): + -- СЧИТАЕМ ХЭШ ТОГО, ЧТО ПОЛУЧИЛОСЬ + -- ЕСЛИ ПОЛУЧИЛОСЬ ХОРОШО --- ТО: + -- ПЕРЕЗАПИСЫВАЕМ БЛОК В СТОРЕЙДЖ + -- ГОВОРИМ ОЖИДАЮЩЕЙ СТОРОНЕ, ЧТО БЛОК ПРИНЯТ? + -- УДАЛЯЕМ КУКУ? + , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do + + -- TODO: log this situation + mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing + mbChSize <- MaybeT $ getSession se sBlockChunkSize c + + let offset = fromIntegral n * fromIntegral mbChSize :: Offset + + liftIO $ do + -- newBlock cww (p,c) h mbSize + writeChunk cww (p,c) h offset bs + + -- ОТКУДА УЗНАТЬ РАЗМЕР БЛОКА? + -- ДОПУСТИМ, ОТ БЛОКИНФО? + -- ЕСЛИ НИЧЕГО НЕТ? => BLOCK_LOST + debug $ "got chunk" <+> pretty p + <+> pretty h + <+> pretty n + <+> parens ("off:" <+> pretty offset) + <+> pretty (B8.length bs) + -- <+> parens (pretty mbSize) + -- <+> braces ("chunkSize:" <+> pretty mbChSize) } runPeer env @@ -237,11 +285,13 @@ runFakePeer se env = do , makeResponse (blockChunksProto adapter) ] - cancel w - simpleStorageStop storage - pure () + stopChunkWriter cww + + pause ( 0.25 :: Timeout 'Seconds) + + mapM_ cancel [w,cw] test1 :: IO () @@ -258,7 +308,7 @@ test1 = do mtS <- emptySessions @Fake let ee = zip (repeat mtS) envs - void $ race (pause (2 :: Timeout 'Seconds)) $ do + void $ race (pause (5 :: Timeout 'Seconds)) $ do peerz <- mapM (async . uncurry runFakePeer) ee @@ -271,11 +321,17 @@ test1 = do let h = fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" + + -- TODO: generate unique cookie!! let cookie = 0 let s0 = (fst . head) ee + -- getSession' se sBlockSizes h ??? + withNewSession s0 sBlockHash cookie h $ do - request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h defChunkSize)) + let chsz = defChunkSize + updSession s0 chsz sBlockChunkSize cookie (const chsz) + request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h chsz)) pure () @@ -289,20 +345,8 @@ test1 = do -- НО ХЗ ГДЕ ДЕРЖАТЬ САМ КЭШ для конкретного подпротокола -- request p1 (BlockGetAllChunks @Fake 0 (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) - -- Q1: ЧТО ДЕЛАТЬ - -- Q1.1: КАК КУКА ПОПАДЁТ в то, где работает "adapter" - -- Q2: КАК ДЕЛАТЬ ЗАПРОСЫ - -- - -- ОТСЮДА СЛЕДУЕТ: Cookie должны жить в Engine и быть там доступны - -- В монаде Response тоже должна быть кука - -- - -- НУ есть кука и чо? какие данные с ней ассоциированы? - -- какого блеать типа? - -- - -- Как быть с тем, что кука может не поддерживаться подпротоколом? - -- Требовать HasCookie у всех? - pause ( 1 :: Timeout 'Seconds) + pause ( 2 :: Timeout 'Seconds) mapM_ cancel peerz