diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 69bad7d5..a58cc3fb 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -1,5 +1,6 @@ module HBS2.Defaults where +import HBS2.Clock import Data.String defChunkSize :: Integer @@ -14,4 +15,10 @@ defStorePath = "hbs2" defPipelineSize :: Int defPipelineSize = 100 +-- typical block hash 530+ chunks * parallel wip blocks amount +defProtoPipelineSize :: Int +defProtoPipelineSize = 65536 + +defCookieTimeout :: TimeSpec +defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) diff --git a/hbs2-core/lib/HBS2/Net/Peer.hs b/hbs2-core/lib/HBS2/Net/Peer.hs index 127c64a1..d2a1ab25 100644 --- a/hbs2-core/lib/HBS2/Net/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Peer.hs @@ -135,7 +135,7 @@ newEnv :: forall e bus m . ( Monad m -> m (EngineEnv e) newEnv p pipe = do - de <- liftIO $ newPipeline defPipelineSize + de <- liftIO $ newPipeline defProtoPipelineSize pure $ EngineEnv Nothing p pipe de runPeer :: forall e m a . ( MonadIO m diff --git a/hbs2-core/lib/HBS2/Net/Proto.hs b/hbs2-core/lib/HBS2/Net/Proto.hs index a29435bb..79eeb7a8 100644 --- a/hbs2-core/lib/HBS2/Net/Proto.hs +++ b/hbs2-core/lib/HBS2/Net/Proto.hs @@ -4,11 +4,12 @@ module HBS2.Net.Proto ) where import HBS2.Prelude.Plated +import HBS2.Hash import HBS2.Net.Proto.Types +dontHandle :: Applicative f => a -> f () +dontHandle = const $ pure () -newtype BlockInfo = BlockInfo Integer - deriving stock (Eq, Data) - +type GetBlockSize h m = Hash h -> m (Maybe Integer) diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 97554306..2b979ac7 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -13,7 +13,6 @@ data BlockSize e = GetBlockSize (Hash HbSync) type HasBlockEvent h e m = (Peer e, Hash h, Maybe Integer) -> m () -type GetBlockSize h m = Hash h -> m (Maybe Integer) instance Serialise (BlockSize e) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index ebd60b87..6e27b79e 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -13,6 +13,9 @@ import Control.Monad.IO.Class -- e -> Transport (like, UDP or TChan) -- p -> L4 Protocol (like Ping/Pong) +class HasCookie p where + type family Cookie p :: Type + class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where data family (Peer e) :: Type diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index 2c0d14fa..76cebd73 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -3,6 +3,7 @@ module HBS2.Prelude , module Safe , MonadIO(..) , void + , maybe1 ) where import Data.String (IsString(..)) @@ -11,5 +12,6 @@ import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad (void) - +maybe1 :: Maybe a -> b -> (a -> b) -> b +maybe1 mb n j = maybe n j mb diff --git a/hbs2-core/lib/HBS2/Storage.hs b/hbs2-core/lib/HBS2/Storage.hs index 2f0e0eb0..6c926098 100644 --- a/hbs2-core/lib/HBS2/Storage.hs +++ b/hbs2-core/lib/HBS2/Storage.hs @@ -1,13 +1,15 @@ {-# Language FunctionalDependencies #-} module HBS2.Storage where -import Data.Kind -import Data.Hashable hiding (Hashed) -import Prettyprinter - import HBS2.Hash import HBS2.Prelude.Plated +import Data.Kind +import Data.Hashable hiding (Hashed) +import Lens.Micro.Platform +import Prettyprinter + + class Pretty (Hash h) => IsKey h where type Key h :: Type @@ -47,3 +49,12 @@ class ( Monad m +calcChunks :: forall a b . (Integral a, Integral b) + => Integer -- | block size + -> Integer -- | chunk size + -> [(a, b)] + +calcChunks s1 s2 = fmap (over _1 fromIntegral . over _2 fromIntegral) chu + where + chu = fmap (,s2) (takeWhile ( Doc ann -> m () +debug p = liftIO $ hPrint stderr p + +newtype ChunkSize = ChunkSize Word16 + deriving newtype (Num,Enum,Real,Integral) + deriving stock (Eq,Ord,Show,Data,Generic) + + +newtype ChunkNum = ChunkNum Word16 + deriving newtype (Num,Enum,Real,Integral) + deriving stock (Eq,Ord,Show,Data,Generic) + + + +type GetBlockChunk h = forall m . MonadIO m => Hash h -> Offset -> Size -> m (Maybe ByteString) + +type MyCookie e = Cookie (BlockChunks e) + +data BlockChunksI e m = + BlockChunksI + { blkSize :: GetBlockSize HbSync m + , blkChunk :: GetBlockChunk HbSync + , blkGetHash :: MyCookie e -> m (Maybe (Hash HbSync)) + , blkAcceptChunk :: (Hash HbSync, ChunkNum, ByteString) -> m () + } + +data BlockChunks e = BlockGetAllChunks (MyCookie e) (Hash HbSync) ChunkSize + | BlockNoChunks (MyCookie e) + | BlockChunk (MyCookie e) ChunkNum ByteString + | BlockLost (MyCookie e) + deriving stock (Generic) + + +instance HasCookie (BlockChunks e) where + type instance Cookie (BlockChunks e) = Word32 + +instance Serialise ChunkSize +instance Serialise ChunkNum +instance Serialise (BlockChunks e) + +blockChunksProto :: forall e m . ( MonadIO m + , Response e (BlockChunks e) m + ) + => BlockChunksI e m + -> BlockChunks e + -> m () + +blockChunksProto adapter = + \case + BlockGetAllChunks c h size -> deferred proto do + bsz <- blkSize adapter h + + let offsets' = calcChunks (fromJust bsz) (fromIntegral size) :: [(Offset, Size)] + let offsets = zip offsets' [0..] + + for_ offsets $ \((o,sz),i) -> do + chunk <- blkChunk adapter h o sz + maybe (pure ()) (response . BlockChunk @e c i) chunk + + BlockChunk c n bs -> do + -- TODO: getHashByCookie c + h <- blkGetHash adapter c + + maybe1 h (response (BlockLost @e c)) $ \hh -> do + blkAcceptChunk adapter (hh, n, bs) + + BlockNoChunks {} -> do + -- TODO: notification + pure () + + BlockLost{} -> do + pure () + + where + proto = Proxy @(BlockChunks e) data Fake @@ -40,8 +123,6 @@ instance HasPeer Fake where instance Pretty (Peer Fake) where pretty (FakePeer n) = parens ("peer" <+> pretty n) -debug :: (MonadIO m) => Doc ann -> m () -debug p = liftIO $ hPrint stderr p instance HasProtocol Fake (BlockSize Fake) where type instance ProtocolId (BlockSize Fake) = 1 @@ -49,10 +130,11 @@ instance HasProtocol Fake (BlockSize Fake) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise - - -dontHandle :: Applicative f => a -> f () -dontHandle = const $ pure () +instance HasProtocol Fake (BlockChunks Fake) where + type instance ProtocolId (BlockChunks Fake) = 2 + type instance Encoded Fake = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise main :: IO () main = do @@ -66,7 +148,7 @@ main = do -- ] -runFakePeer :: EngineEnv Fake -> IO () +runFakePeer :: forall e . e ~ Fake => EngineEnv e -> IO () runFakePeer env = do let pid = fromIntegral (hash (env ^. self)) :: Word8 @@ -97,8 +179,19 @@ runFakePeer env = do let handleBlockInfo (p, h, sz) = do debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz + blkCookies <- Cache.newCache @(Cookie (BlockChunks e)) @(Hash HbSync) (Just defCookieTimeout) + + let adapter = + BlockChunksI + { blkSize = hasBlock storage + , blkChunk = getChunk storage + , blkGetHash = liftIO . Cache.lookup blkCookies + , blkAcceptChunk = dontHandle + } + runPeer env [ makeResponse (blockSizeProto (hasBlock storage) handleBlockInfo) + , makeResponse (blockChunksProto adapter) ] cancel w @@ -128,6 +221,22 @@ test1 = do request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) + -- request p1 (BlockGetAllChunks @Fake 0 (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) + + -- Я ЗАПРОСИЛ БЛОК + -- У МЕНЯ НЕТ КУКИ + -- МНЕ ПРИШЛИ ЧАНКИ + -- КУКИ НЕТ -> ГОВОРЮ "БЛОК ЛОСТ" + -- Q1: ЧТО ДЕЛАТЬ + -- Q1.1: КАК КУКА ПОПАДЁТ в то, где работает "adapter" + -- Q2: КАК ДЕЛАТЬ ЗАПРОСЫ + -- + -- ОТСЮДА СЛЕДУЕТ: Cookie должны жить в Engine и быть там доступны + -- В монаде Response тоже должна быть кука + -- + -- Как быть с тем, что кука может не поддерживаться подпортоколом? + -- Требовать HasCookie у всех? + pause ( 0.5 :: Timeout 'Seconds) mapM_ cancel peerz @@ -136,7 +245,7 @@ test1 = do debug (pretty $ show e) debug "we're done" - assertBool "sucess" True + assertBool "success" True exitSuccess assertBool "failed" False