diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 29781766..e08d1fa5 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -35,9 +35,9 @@ import Data.Hashable (hash) import Prettyprinter hiding (pipe) -data AnyStorage = forall zu . Storage zu HbSync ByteString IO => AnyStorage zu +data AnyStorage = forall zu . (Block ByteString ~ ByteString, Storage zu HbSync ByteString IO) => AnyStorage zu -instance (IsKey HbSync, Key HbSync ~ Hash HbSync) => Storage AnyStorage HbSync ByteString IO where +instance (IsKey HbSync, Key HbSync ~ Hash HbSync, Block ByteString ~ ByteString) => Storage AnyStorage HbSync ByteString IO where putBlock (AnyStorage s) = putBlock s enqueueBlock (AnyStorage s) = enqueueBlock s diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index c4cf1d51..8075771a 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -20,6 +20,7 @@ instance Serialise (BlockSize e) blockSizeProto :: forall e m . ( MonadIO m , Response e (BlockSize e) m + , EventEmitter e (BlockSize e) m ) => GetBlockSize HbSync m -> HasBlockEvent HbSync e m @@ -40,9 +41,9 @@ blockSizeProto getBlockSize evHasBlock = BlockSize h sz -> do that <- thatPeer (Proxy @(BlockSize e)) + emit @e (BlockSizeEventKey h) (BlockSizeEvent (that, h, sz)) evHasBlock ( that, h, Just sz ) - newtype instance SessionKey e (BlockSize e) = BlockSizeKey (Hash HbSync) deriving stock (Typeable,Eq,Show) @@ -54,7 +55,8 @@ newtype instance EventKey e (BlockSize e) = deriving stock (Typeable, Eq) deriving newtype (Hashable) -data instance Event e (BlockSize e) = - BlockSizeEvent +newtype instance Event e (BlockSize e) = + BlockSizeEvent (Peer e, Hash HbSync, Integer) deriving stock (Typeable) + diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs index aee4881d..f35a023d 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple/Extra.hs @@ -10,8 +10,6 @@ import HBS2.Data.Types.Refs import HBS2.Defaults import Data.Bifunctor -import Data.Functor -import Control.Monad import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as B import Data.Function @@ -19,14 +17,13 @@ import Data.Function import Streaming.Prelude qualified as S import Streaming qualified as S -import Prettyprinter import System.IO pieces :: Integral a => a pieces = 8192 class SimpleStorageExtra a where - putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString,Block ByteString~ByteString) => SimpleStorage h -> a -> IO MerkleHash + putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString, Block ByteString ~ ByteString) => SimpleStorage h -> a -> IO MerkleHash readChunked :: MonadIO m => Handle -> Int -> S.Stream (S.Of ByteString) m () readChunked handle size = fuu @@ -52,7 +49,7 @@ instance SimpleStorageExtra Handle where pure (MerkleHash root) -instance SimpleStorageExtra ByteString where +instance Block ByteString ~ ByteString => SimpleStorageExtra ByteString where putAsMerkle ss bs = do hashes <- S.each (B.unpack bs) diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index e498ad50..b717f50c 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -18,16 +18,18 @@ import HBS2.Prelude.Plated import HBS2.Storage import HBS2.Storage.Simple import HBS2.Storage.Simple.Extra +import HBS2.Defaults import Test.Tasty.HUnit import Codec.Serialise hiding (encode,decode) import Control.Concurrent.Async import Control.Monad.Reader +import Control.Monad.Trans.Maybe import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy.Char8 qualified as B8 import Data.Default -import Data.Foldable +import Data.Foldable hiding (find) import Data.Map (Map) import Data.Map qualified as Map import Data.Word @@ -51,9 +53,12 @@ data BlockDownload = , _sBlockOffset :: Offset , _sBlockWritten :: Size } + deriving stock (Typeable) makeLenses 'BlockDownload +newBlockDownload :: Hash HbSync -> BlockDownload +newBlockDownload h = BlockDownload h 0 0 0 0 instance HasPeer Fake where newtype instance Peer Fake = FakePeer Word8 @@ -83,12 +88,11 @@ instance HasProtocol Fake (BlockChunks Fake) where type instance SessionData e (BlockSize e) = BlockSizeSession e -type instance SessionData Fake (BlockChunks Fake) = BlockDownload +type instance SessionData e (BlockChunks e) = BlockDownload -newtype instance SessionKey Fake (BlockChunks Fake) = - DownloadSessionKey (Peer Fake, Cookie Fake) - deriving newtype (Eq, Hashable) - deriving stock (Generic) +newtype instance SessionKey e (BlockChunks e) = + DownloadSessionKey (Peer e, Cookie e) + deriving stock (Generic,Typeable) newtype BlockSizeSession e = BlockSizeSession @@ -102,7 +106,8 @@ instance Ord (Peer e) => Default (BlockSizeSession e) where deriving stock instance Show (BlockSizeSession Fake) - +deriving newtype instance Hashable (SessionKey Fake (BlockChunks Fake)) +deriving stock instance Eq (SessionKey Fake (BlockChunks Fake)) runTestPeer :: Peer Fake -> (SimpleStorage HbSync -> IO ()) @@ -136,7 +141,7 @@ handleBlockInfo :: forall e m . ( MonadIO m , Default (SessionData e (BlockSize e)) , Ord (Peer e) , Pretty (Peer e) - , EventEmitter e (BlockSize e) m + -- , EventEmitter e (BlockSize e) m ) => (Peer e, Hash HbSync, Maybe Integer) @@ -146,16 +151,18 @@ handleBlockInfo (p, h, sz') = do maybe1 sz' (pure ()) $ \sz -> do let bsz = fromIntegral sz update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) - liftIO $ debug $ "got block:" <+> pretty (p, h, sz) - emit @e (BlockSizeEventKey h) BlockSizeEvent - -- FIXME: turn back on event notification - -- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit - blockDownloadLoop :: forall e . ( HasProtocol e (BlockSize e) + , HasProtocol e (BlockChunks e) , Request e (BlockSize e) (PeerM e IO) + , Request e (BlockChunks e) (PeerM e IO) , EventListener e (BlockSize e) (PeerM e IO) + , Sessions e (BlockSize e) (PeerM e IO) + , Hashable (SessionKey e (BlockChunks e)) + , Typeable (SessionKey e (BlockChunks e)) + , Eq (SessionKey e (BlockChunks e)) , Num (Peer e) + -- , Ord (Peer e) ) => PeerM e IO () blockDownloadLoop = do @@ -168,8 +175,17 @@ blockDownloadLoop = do debug $ "subscribing to" <+> pretty h - subscribe @e @(BlockSize e) (BlockSizeEventKey h) $ \_ -> do + subscribe @e @(BlockSize e) (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do debug $ "can't believe this shit works" <+> pretty h + coo <- genCookie (p,h) + let key = DownloadSessionKey (p, coo) + let chusz = defChunkSize + let new = set sBlockChunkSize chusz + . set sBlockSize (fromIntegral s) + $ newBlockDownload h + + update @e new key id + request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction request 1 (GetBlockSize @e h) @@ -179,6 +195,77 @@ blockDownloadLoop = do pause ( 0.85 :: Timeout 'Seconds ) next +mkAdapter :: forall e m . ( m ~ PeerM e IO + , HasProtocol e (BlockChunks e) + ) => m (BlockChunksI e (ResponseM e m )) +mkAdapter = do + -- storage <- asks (view _envS + storage <- getStorage + pure $ + BlockChunksI + { blkSize = hasBlock storage + , blkChunk = getChunk storage + , blkGetHash = \c -> find (DownloadSessionKey c) (view sBlockHash) + + -- КАК ТОЛЬКО ПРИНЯЛИ ВСЕ ЧАНКИ (ПРИШЁЛ ПОСЛЕДНИЙ ЧАНК): + -- СЧИТАЕМ ХЭШ ТОГО, ЧТО ПОЛУЧИЛОСЬ + -- ЕСЛИ ПОЛУЧИЛОСЬ ХОРОШО --- ТО: + -- ПЕРЕЗАПИСЫВАЕМ БЛОК В СТОРЕЙДЖ + -- ГОВОРИМ ОЖИДАЮЩЕЙ СТОРОНЕ, ЧТО БЛОК ПРИНЯТ? + -- УДАЛЯЕМ КУКУ? + , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do + + let cKey = DownloadSessionKey (p,c) + + -- check if there is a session + -- FIXME: + -- TODO: log situation when no session + dwnld <- MaybeT $ find cKey id + + let bslen = fromIntegral $ B8.length bs + + let mbSize = view sBlockSize dwnld + let mbChSize = view sBlockChunkSize dwnld + + let offset0 = fromIntegral n * fromIntegral mbChSize :: Offset + + liftIO $ do + writeChunk cww cKey h offset0 bs + + let written = view sBlockWritten dwnld + bslen + let maxOff = max offset0 (view sBlockOffset dwnld) + + lift $ update dwnld cKey ( set sBlockOffset maxOff + . set sBlockWritten written + ) + + let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize + && written >= mbSize + + when mbDone $ lift do + deferred (Proxy @(BlockChunks e)) $ do + h1 <- liftIO $ getHash cww cKey h + + -- ПОСЧИТАТЬ ХЭШ + -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК + -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ + when ( h1 == h ) $ do + liftIO $ commitBlock cww cKey h + expire cKey + -- FIXME: return this event! + -- lift $ runEngineM env $ emitBlockReadyEvent ev h -- TODO: fix this crazy shit + + when (written > mbSize * defBlockDownloadThreshold) $ do + debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p + lift $ expire cKey + -- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ, + -- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ + -- ЧАНКИ ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ. + -- ТАК НЕ ПОЙДЕТ + -- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся + } + + main :: IO () main = do hSetBuffering stderr LineBuffering @@ -204,21 +291,22 @@ main = do debug $ "I'm" <+> pretty p <+> pretty root runPeerM (AnyStorage s) fake p $ do + adapter <- mkAdapter runProto @Fake [ makeResponse (blockSizeProto findBlk dontHandle) - -- , makeResponse (blockChunksProto undefined) + , makeResponse (blockChunksProto adapter) ] our <- async $ runTestPeer p0 $ \s -> do let blk = hasBlock s runPeerM (AnyStorage s) fake p0 $ do + adapter <- mkAdapter env <- ask - as <- liftIO $ async $ withPeerM env blockDownloadLoop runProto @Fake [ makeResponse (blockSizeProto blk handleBlockInfo) - -- , makeResponse (blockChunksProto undefined) + , makeResponse (blockChunksProto adapter) ] liftIO $ cancel as