diff --git a/hbs2-core/lib/HBS2/Actors/OldPeer.hs b/hbs2-core/lib/HBS2/Actors/OldPeer.hs
deleted file mode 100644
index f04c28d5..00000000
--- a/hbs2-core/lib/HBS2/Actors/OldPeer.hs
+++ /dev/null
@@ -1,252 +0,0 @@
-{-# Language TemplateHaskell #-}
-{-# Language UndecidableInstances #-}
-module HBS2.Actors.OldPeer where
-
-import HBS2.Prelude
-import HBS2.Prelude.Plated
-import HBS2.Net.Proto
-import HBS2.Net.Messaging
-import HBS2.Clock
-import HBS2.Actors
-import HBS2.Defaults
-
-
-import Control.Concurrent.Async
-import Control.Monad.Reader
-import Control.Monad.Trans.Maybe
-import Data.ByteString.Lazy ( ByteString )
-import Data.Digest.Murmur32
-import Data.Foldable hiding (find)
-import Data.Hashable
-import Data.Kind
-import Data.Map qualified as Map
-import GHC.TypeLits
-import Lens.Micro.Platform
-import System.Random qualified as Random
-import Data.Cache (Cache)
-import Data.Cache qualified as Cache
-import Data.Dynamic
-import Data.Maybe
-
-import Codec.Serialise hiding (encode,decode)
-
-
-
-data AnyMessage e = AnyMessage Integer (Encoded e)
-                    deriving stock (Generic)
-
-instance Serialise (Encoded e) => Serialise (AnyMessage e)
-
-
-data EngineEnv e = forall bus . ( Messaging bus e ByteString
-                                , Serialise (Encoded e)
-                                ) =>
-  EngineEnv
-  { _peer     :: Maybe (Peer e)
-  , _self     :: Peer e
-  , _sessions :: Cache SKey Dynamic
-  , bus       :: bus
-  , defer     :: Pipeline IO ()
-  }
-
-
-newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a }
-                        deriving newtype ( Functor
-                                         , Applicative
-                                         , Monad
-                                         , MonadIO
-                                         , MonadReader (EngineEnv e)
-                                         , MonadTrans
-                                         )
-
-data ResponseEnv e =
-  ResponseEnv
-  { _engine   :: EngineEnv e
-  , _respPeer :: Peer e
-  }
-
-newtype ResponseM e m a = ResponseM { fromResponse :: ReaderT (ResponseEnv e) m a }
-                          deriving newtype ( Functor
-                                           , Applicative
-                                           , Monad
-                                           , MonadIO
-                                           , MonadReader (ResponseEnv e)
-                                           , MonadTrans
-                                           )
-
-
-makeLenses 'EngineEnv
-
-makeLenses 'ResponseEnv
-
-data AnyProtocol e m = forall p . ( HasProtocol e p
-                                  , Response e p m
-                                  ) =>
-  AnyProtocol
-  { myProtoId   :: Integer
-  , protoDecode :: Encoded e -> Maybe p
-  , protoEncode :: p -> Encoded e
-  , handle      :: p -> m ()
-  }
-
-makeResponse :: forall e p m . ( MonadIO m
-                               , Response e p m
-                               , HasProtocol e p
-                               )
-             => (p -> m ()) -> AnyProtocol e m
-
-makeResponse h = AnyProtocol { myProtoId   = natVal (Proxy @(ProtocolId p))
-                             , protoDecode = decode
-                             , protoEncode = encode
-                             , handle      = h
-                             }
-
-
-runEngineM :: EngineEnv e -> EngineM e m a -> m a
-runEngineM e f = runReaderT (fromEngine f) e
-
-runResponseM :: Monad m => EngineEnv e -> Peer e -> ResponseM e m a -> EngineM e m a
-runResponseM eng p f = lift $ runReaderT (fromResponse f) (ResponseEnv eng p)
-
-instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where
-  request p msg = do
-    let proto = protoId @e @p (Proxy @p)
-    ask >>= \case
-      EngineEnv { _self = s, bus = b} -> do
-        let bs = serialise (AnyMessage @e proto (encode msg))
-        liftIO $ sendTo b (To p) (From s) bs
-
-
-
-instance ( MonadIO m
-         , HasProtocol e p
-         , Eq (SessionKey e p)
-         , Typeable (SessionKey e p)
-         , Typeable (SessionData e p)
-         , Hashable (SessionKey e p)
-         ) => Sessions e p (ResponseM e m) where
-
-  find k f = flip runEngineM (find k f) =<< asks (view engine)
-
-  fetch i d k f = flip runEngineM (fetch i d k f) =<< asks (view engine)
-
-  update d k f = flip runEngineM (update d k f) =<< asks (view engine)
-
-  expire k = flip runEngineM (expire k) =<< asks (view engine)
-
-
-instance ( MonadIO m
-         , HasProtocol e p
-         , Eq (SessionKey e p)
-         , Typeable (SessionKey e p)
-         , Typeable (SessionData e p)
-         , Hashable (SessionKey e p)
-         ) => Sessions e p (EngineM e m) where
-
-
-  find k f = do
-    se <- asks (view sessions)
-    let sk = newSKey @(SessionKey e p) k
-    r <- liftIO $ Cache.lookup se sk
-    case fromDynamic @(SessionData e p) <$> r of
-      Just v  -> pure $ f <$> v
-      Nothing -> pure Nothing
-
-  fetch upd def k fn = do
-    se <- asks (view sessions)
-    let sk = newSKey @(SessionKey e p) k
-    let ddef = toDyn def
-
-    r <- liftIO $ Cache.lookup se sk
-
-    case r of
-      Just v  -> pure $ fn $ fromMaybe def (fromDynamic @(SessionData e p) v )
-      Nothing -> do
-        when upd $ liftIO $ Cache.insert se sk ddef
-        pure (fn def)
-
-  update def k f = do
-    se <- asks (view sessions)
-    val <- fetch @e @p True def k id
-    liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val))
-
-  expire k = do
-    se <- asks (view sessions)
-    liftIO $ Cache.delete se (newSKey @(SessionKey e p) k)
-
-instance (HasProtocol e p, Serialise (Encoded e)) => Response e p (ResponseM e IO) where
-
-  thatPeer _ = asks (view respPeer)
-
-  deferred _ m = do
-    e@(EngineEnv { defer = d }) <- asks (view engine)
-    p <- asks (view respPeer)
-    addJob d (runEngineM e (runResponseM e p m))
-
-  response resp = do
-    env <- ask
-    let p = env ^. respPeer
-    let s = env ^. (engine . self)
-    let proto = protoId @e @p (Proxy @p)
-    let bs = serialise (AnyMessage @e proto (encode resp))
-
-    -- TODO: wrap somehow
-    case env ^. engine of
-      EngineEnv { bus = b } -> liftIO $ sendTo b (To p) (From s) bs
-
-newEnv :: forall e bus m . ( Monad m
-                           , MonadIO m
-                           , Messaging bus e ByteString
-                           , Serialise (Encoded e)
-                           )
-       => Peer e
-       -> bus
-       -> m (EngineEnv e)
-
-newEnv p pipe = do
-  de <- liftIO $ newPipeline defProtoPipelineSize
-  se <- liftIO $ Cache.newCache (Just defCookieTimeout) -- FIXME: some more clever for timeout, i.e. typeclass
-  pure $ EngineEnv Nothing p se pipe de
-
-runPeer :: forall e m a . ( MonadIO m
-                          )
-        => EngineEnv e
-        -> [AnyProtocol e (ResponseM e m)]
-        -> m a
-
-runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do
-
-  let me = env ^. self
-
-  let resp = [ (pid, a) | a@AnyProtocol { myProtoId = pid } <- hh ]
-
-  let disp = Map.fromList resp
-
-  void $ liftIO $ async $ runPipeline d
-
-  runEngineM env $ do
-
-    forever $ do
-      messages <- receive pipe (To me)
-
-      for_ messages $ \(From pip, bs) -> do
-
-        case deserialiseOrFail @(AnyMessage e) bs of
-
-          Left _-> pure () -- liftIO $ print "failed to deserialise"
-
-          Right (AnyMessage n msg) -> do
-
-            local (set peer (Just pip)) do
-
-              ee <- ask
-
-              case Map.lookup n disp of
-                Nothing -> pure ()
-
-                Just (AnyProtocol { protoDecode = decoder
-                                  , handle = h
-                                  }) -> maybe (pure ()) (runResponseM ee pip . h) (decoder msg)
-
-
-
diff --git a/hbs2-tests/test/PeerMain.hs b/hbs2-tests/test/PeerMain.hs
deleted file mode 100644
index 957f4310..00000000
--- a/hbs2-tests/test/PeerMain.hs
+++ /dev/null
@@ -1,527 +0,0 @@
-{-# Language RankNTypes #-}
-{-# Language TemplateHaskell #-}
-{-# Language AllowAmbiguousTypes #-}
-{-# Language UndecidableInstances #-}
-module Main where
-
-import HBS2.Prelude.Plated
-import HBS2.Clock
-import HBS2.Hash
-import HBS2.Actors
--- import HBS2.Net.Messaging
-import HBS2.Net.Proto
-import HBS2.Net.Proto.BlockInfo
-import HBS2.Net.Proto.BlockChunks
-import HBS2.Net.Messaging
-import HBS2.Net.Messaging.Fake
-import HBS2.Actors.Peer
-import HBS2.Defaults
-
-import HBS2.Data.Types.Refs
-import HBS2.Storage
-import HBS2.Storage.Simple
-import HBS2.Storage.Simple.Extra
-import HBS2.Actors.ChunkWriter
-
--- import Test.Tasty hiding (Timeout)
-import Test.Tasty.HUnit
-
-import Codec.Serialise
-import Control.Concurrent.Async
-import Control.Monad
-import Control.Monad.Trans.Maybe
-import Data.ByteString.Lazy (ByteString)
-import Data.ByteString.Lazy.Char8 qualified as B8
-import Data.Cache (Cache)
-import Data.Cache qualified as Cache
-import Data.Foldable hiding (find)
-import Data.Hashable
-import Data.Map (Map)
-import Data.Map qualified as Map
-import Data.Maybe
-import Data.Word
-import Lens.Micro.Platform
-import Prettyprinter
-import System.Directory
-import System.Exit
-import System.FilePath.Posix
-import System.IO
-import Control.Concurrent
-import Data.Default
-import Control.Monad.Reader
-import Data.Dynamic
-import Data.Kind
-
-import Control.Concurrent.STM
-import Control.Concurrent.STM.TQueue qualified as Q
-import Control.Concurrent.STM.TBQueue qualified as TBQ
-import Control.Concurrent.STM.TBQueue (TBQueue)
-
-debug :: (MonadIO m) => Doc ann -> m ()
-debug p = liftIO $ hPrint stderr p
-
-
-
--- FIXME: peer should be a part of the key
---        therefore, key is ( p | cookie )
---        but client's cookie in protocol should be just ( cookie :: Word32 )
-
-
-data BlockDownload =
-  BlockDownload
-  { _sBlockHash      :: Hash HbSync
-  , _sBlockSize      :: Size
-  , _sBlockChunkSize :: ChunkSize
-  , _sBlockOffset    :: Offset
-  , _sBlockWritten   :: Size
-  }
-
-makeLenses 'BlockDownload
-
-newBlockDownload :: Hash HbSync -> BlockDownload
-newBlockDownload h = BlockDownload h 0 0 0 0
-
-data Fake
-
-instance HasPeer Fake where
-  newtype instance Peer Fake = FakePeer Word8
-                               deriving newtype (Hashable,Num,Enum,Real,Integral)
-                               deriving stock (Eq,Ord,Show)
-
-
-instance Pretty (Peer Fake) where
-  pretty (FakePeer n) = parens ("peer" <+> pretty n)
-
-
-instance HasProtocol Fake (BlockSize Fake) where
-  type instance ProtocolId (BlockSize Fake) = 1
-  type instance Encoded Fake = ByteString
-  decode = either (const Nothing) Just . deserialiseOrFail
-  encode = serialise
-
-instance HasProtocol Fake (BlockChunks Fake) where
-  type instance ProtocolId (BlockChunks Fake) = 2
-  type instance Encoded Fake = ByteString
-  decode = either (const Nothing) Just . deserialiseOrFail
-  encode = serialise
-
-
-type instance SessionData Fake (BlockSize Fake)   = BlockSizeSession Fake
-type instance SessionData Fake (BlockChunks Fake) = BlockDownload
-
-newtype instance SessionKey Fake (BlockChunks Fake) =
-  DownloadSessionKey (Peer Fake, Cookie Fake)
-  deriving newtype (Eq, Hashable)
-  deriving stock (Generic)
-
-newtype BlockSizeSession e =
-  BlockSizeSession
-  { _bsBlockSizes :: Map (Peer e) Size
-  }
-
-makeLenses 'BlockSizeSession
-
-instance Ord (Peer e) => Default (BlockSizeSession e) where
-  def = BlockSizeSession mempty
-
-deriving stock instance Show (BlockSizeSession Fake)
-
-
-main :: IO ()
-main = do
-  hSetBuffering stderr LineBuffering
-  test1
-
-  -- defaultMain $
-  --   testGroup "root"
-  --     [
-  --       testCase "test1" test1
-  --     ]
-
-
--- TODO: abstract notifications, since all of this can be typed
---       by key/value
-
-data PeerEvents e (m :: Type -> Type) =
-  PeerEvents
-  { onBlockSize  :: TVar (Map (Hash HbSync) [HasBlockEvent HbSync e m])
-  , onBlockReady :: TVar (Map (Hash HbSync) [OnBlockReady HbSync m])
-  }
-
-newPeerEventsIO :: forall e m . MonadIO m => IO (PeerEvents e m)
-newPeerEventsIO = PeerEvents <$> newTVarIO mempty
-                             <*> newTVarIO mempty
-
-addBlockSizeEventNotify :: forall e m . (MonadIO m)
-                        => PeerEvents e m
-                        -> Hash HbSync
-                        -> HasBlockEvent HbSync e m
-                        -> m ()
-
-addBlockSizeEventNotify pe h e = do
-  void $ liftIO $ atomically $ modifyTVar' (onBlockSize pe) (Map.insertWith (<>) h [e])
-
-addBlockReadyEventNotify :: forall e m . (MonadIO m)
-                         => PeerEvents e m
-                         -> Hash HbSync
-                         -> OnBlockReady HbSync m
-                         -> m ()
-
-addBlockReadyEventNotify pe h e = do
-  void $ liftIO $ atomically $ modifyTVar' (onBlockReady pe) (Map.insertWith (<>) h [e])
-
-emitBlockSizeEvent :: forall e m . MonadIO m
-                   => PeerEvents e m
-                   -> Hash HbSync
-                   -> (Peer e, Hash HbSync, Maybe Integer)
-                   -> m ()
-
-emitBlockSizeEvent pe h event = do
-  ev <- liftIO $ atomically $ stateTVar (onBlockSize pe) alter
-  for_ ev $ \e -> e event
-
-  where
-    alter m =
-      let ev = Map.lookup h m
-      in (mconcat (maybeToList ev), Map.delete h m)
-
-
-emitBlockReadyEvent :: forall e m . MonadIO m
-                    => PeerEvents e m
-                    -> Hash HbSync
-                    -> m ()
-
-emitBlockReadyEvent pe h = do
-  ev <- liftIO $ atomically $ stateTVar (onBlockReady pe) alter
-  for_ ev $ \e -> e h
-
-  where
-    alter m =
-      let ev = Map.lookup h m
-      in (mconcat (maybeToList ev), Map.delete h m)
-
-
--- A reader was running along a reader and saw a reader sitting on a reader.
--- So it grabbed that reader by the scruff and chucked it the hell out.
-newtype PeerM e m a = PeerM { fromPeerM :: ReaderT () (EngineM e m) a }
-                      deriving newtype ( Functor
-                                       , Applicative
-                                       , Monad
-                                       , MonadIO
-                                       , MonadReader ()
-                                       )
-
--- instance MonadTrans (PeerM e) where
---   lift = lift . lift
-
-
-runPeerM p0 bus f = do
-  env <- newEnv p0 bus
-  runEngineM env (runReaderT (fromPeerM f) ())
-
-
-instance Request Fake (BlockSize Fake) (PeerM e IO) where
-  request p proto = undefined
-
-
-runFakePeer :: forall e b . ( e ~ Fake
-                            -- , MonadIO m
-                            , Messaging b e ByteString
-                            -- , Monad m
-                            -- , Sessions Fake (BlockSize Fake)
-                            -- , m ~ ResponseM Fake IO
-                            -- , MonadIO m
-                            -- , Response e p m
-                            -- , EngineM e m
-                            )
-            => PeerEvents e (EngineM e IO)
-            -> Peer e
-            -> b
-            -> PeerM e IO ()
-            -> IO ()
-
-runFakePeer ev p0 bus work = do
-
-  env <- newEnv p0 bus
-
-  let pid = fromIntegral (hash (env ^. self)) :: Word8
-
-  dir <- liftIO $ canonicalizePath ( ".peers" </> show pid)
-
-  let chDir = dir </> "tmp-chunks"
-
-  liftIO $ createDirectoryIfMissing True dir
-
-  let opts = [ StoragePrefix dir
-             ]
-
-  storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync)
-
-  w <- liftIO $ async $ simpleStorageWorker storage
-
-  cww <- newChunkWriterIO storage (Just chDir)
-
-  cw <- async $ runChunkWriter cww
-
-  let size = 1024*1024
-
-  let blk = B8.concat [ fromString (take 1 $ show x)
-                      | x <- replicate size (fromIntegral pid :: Int)
-                      ]
-
-  root <- putAsMerkle storage blk
-
-  debug $ "I'm" <+> pretty pid <+> pretty root
-
-  let handleBlockInfo (p, h, sz') = do
-        maybe1 sz' (pure ()) $ \sz -> do
-          let bsz = fromIntegral sz
-
-          update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
-          lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit
-
-  let adapter =
-        BlockChunksI
-        { blkSize        = hasBlock storage
-        , blkChunk       = getChunk storage
-        , blkGetHash     = \c -> find (DownloadSessionKey c) (view sBlockHash)
-
-        -- AS SOON AS ALL THE CHUNKS HAVE BEEN ACCEPTED (THE LAST CHUNK HAS ARRIVED):
-        -- COMPUTE THE HASH OF WHAT WE ENDED UP WITH
-        -- IF IT CAME OUT RIGHT --- THEN:
-        --   REWRITE THE BLOCK INTO THE STORAGE
-        --   TELL THE WAITING SIDE THAT THE BLOCK HAS BEEN ACCEPTED?
-        --   DELETE THE COOKIE?
-        , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
-
-            let cKey = DownloadSessionKey (p,c)
-
-            -- check if there is a session
-            -- FIXME:
-            -- TODO: log the situation when there is no session
-            dwnld <- MaybeT $ find cKey id
-
-            let bslen = fromIntegral $ B8.length bs
-
-            let mbSize   = view sBlockSize dwnld
-            let mbChSize = view sBlockChunkSize dwnld
-
-            let offset0 = fromIntegral n * fromIntegral mbChSize :: Offset
-
-            liftIO $ do
-              writeChunk cww cKey h offset0 bs
-
-            let written = view sBlockWritten dwnld + bslen
-            let maxOff  = max offset0 (view sBlockOffset dwnld)
-
-            lift $ update dwnld cKey ( set sBlockOffset maxOff
-                                     . set sBlockWritten written
-                                     )
-
-            let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize
-                       && written >= mbSize
-
-            when mbDone $ lift do
-              deferred (Proxy @(BlockChunks e)) $ do
-                h1 <- liftIO $ getHash cww cKey h
-
-                -- COMPUTE THE HASH
-                -- IF IT MATCHES - FINALIZE THE BLOCK
-                -- IF IT DOES NOT MATCH - KEEP WAITING
-                when ( h1 == h ) $ do
-                  liftIO $ commitBlock cww cKey h
-                  expire cKey
-                  lift $ runEngineM env $ emitBlockReadyEvent ev h -- TODO: fix this crazy shit
-
-            when (written > mbSize * defBlockDownloadThreshold) $ do
-              debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p
-              lift $ expire cKey
-              -- IF WE HANG AROUND HERE FOR TOO LONG, WE CAN BE DDOSED
-              -- BY SOMEONE SENDING ONLY SOME OF A BLOCK'S CHUNKS, OR SENDING
-              -- INDIVIDUAL CHUNKS OVER AND OVER, WHILE WE KEEP COMPUTING HASHES.
-              -- THAT WON'T DO,
-              -- SO WE WAIT HERE FOR, SAY, 2*mbSize AND THEN BAIL OUT
-        }
-
-  peer <- async $ runPeer env
-                    [ makeResponse (blockSizeProto (hasBlock storage) handleBlockInfo)
-                    , makeResponse (blockChunksProto adapter)
-                    ]
-
-  runPeerM p0 bus work
-
-  simpleStorageStop storage
-
-  stopChunkWriter cww
-
-  mapM_ cancel [w,cw,peer]
-
-
--- TODO: set up a map/cache with per-block statistics:
---       e.g. how long a block has been sitting there, and so on.
--- If a block is in this cache and has not been downloaded yet, then
---    ... try downloading it again?
---    ... increase the timeout
---    ... if no new peers have shown up
---    ... remember which peer we have already asked and try
---    ... to ask the others?
---    ... keep a per-block list of whom it is better to ask?
---    ... with a whitelist and a blacklist in there
---    ... won't that end up taking way too much space?
---
---    ... and then, in short, there is also a WiP cache
---    ... and one more thread that handles all of this, for example:
---    ... take the block statistics, take the WiP set
---    ... if a block is not WiP and is still in the map,
---    then we add it
---    ... to the queue.
---
---    ... for each block, record which peers we have already asked (a Set)
---    ... for each block, record when the process started
---
---
-
-
-blockDownloadLoop :: forall e . PeerM e IO ()
-blockDownloadLoop = do
-
-  let who = FakePeer 1
-  let blkHash = ""
-  request who (GetBlockSize @Fake blkHash)
-
-  pure ()
-
-
--- blockDownloadLoop :: PeerEvents Fake m -> Peer e -> PeerM () m ()
-blockDownloadLoop1 ev0 p1 = do
-
-
-  let ini = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
-            , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
-            ]
-
-  blkQ <- liftIO $ do
-    b <- newTBQueueIO defBlockDownloadQ
-    traverse_ (atomically . TBQ.writeTBQueue b) ini
-    pure b
-
-
-  -- TODO: random shuffle and take X
-  --       pull in new peers from wherever we can
-  --       for each block, decide where to fetch it from:
-  --       shuffle (white-list) <> shuffle (black-list)
-  --
-  --
-  let knownPeers = [p1]
-
-  fix \next -> do
-
-    -- We wait forever here. That is both right and wrong.
-
-    blkHash <- liftIO $ atomically $ TBQ.readTBQueue blkQ
-
-    -- TODO: check if this block is already here;
-    --       maybe emit an event to continue -> parse/seek for content
-
-    -- TODO: kill the notifications once the block is downloaded
-    --       or something else has happened to it
-    --
-    --       add cleanup actions here:
-    --       added a notification --- an action to
-    --       clean up that notification
-    --
-    --       added some other junk -- an action to
-    --       clean it up
-    --
-    --       every action has a deadline,
-    --       and there is a process that takes the ones whose deadline
-    --       has expired and runs them
-    --       ?
-
-    addBlockReadyEventNotify ev0 blkHash $ \hash -> do
-      debug $ "DOWNLOADED BLOCK" <+> pretty hash <+> "NOW WHAT?"
-
-      -- DIG OUT THE STORAGE (how?)
-      -- THE ONE STUFFED INTO THE READER?
-
-      obj <- undefined -- getBlock ss hash
-
-      let mbLink = deserialiseOrFail @AnnotatedHashRef obj
-
-      pure ()
-
-
-      -- -- TODO: look at what kind of block this is
-      -- --       if it is a Merkle tree - download it recursively
-      -- --       if it is a reference  - look at what kind of reference it is
-      -- --       check the proofs
-      -- --       download recursively
-
-    -- TODO: we need to track this; maybe the block has already been found,
-    --       or else ask all the peers
-
-    addBlockSizeEventNotify ev0 blkHash $ \case
-      (p, h, Just size) -> do
-        coo <- genCookie (p,blkHash)
-        let key = DownloadSessionKey (p, coo)
-        let chusz = defChunkSize
-
-        let new =   set sBlockChunkSize chusz
-                  . set sBlockSize (fromIntegral size)
-                  $ newBlockDownload blkHash
-
-        update @Fake new key id
-        request p (BlockChunks coo (BlockGetAllChunks @Fake blkHash chusz)) -- FIXME: nice construction
-
-      _ -> pure ()
-
-    -- TODO: check whether we already have a block-size for this block;
-    --       if so, pick a random peer that has it
-    --       and schedule the download
-
-    -- FIRST COME, FIRST SERVED
-    for_ knownPeers $ \who ->
-      request who (GetBlockSize @Fake blkHash)
-
-    next
-
-
-
-test1 :: IO ()
-test1 = do
-
-  hSetBuffering stderr LineBuffering
-
-  fake <- newFakeP2P True
-
-  void $ race (pause (10 :: Timeout 'Seconds)) $ do
-
-    let p0 = 0 :: Peer Fake
-    let p1 = 1 :: Peer Fake
-
-    ev1 <- newPeerEventsIO
-    ev0 <- newPeerEventsIO
-
-    p1Thread <- async $ runFakePeer ev1 p1 fake $ forever $ liftIO yield
-
-    p0Thread <- async $ runFakePeer ev0 p0 fake $ do
-      blockDownloadLoop
-
-      -- blockDownloadLoop ev0 p1
-
-    let peerz = p0Thread : [p1Thread]
-
-
-    pause ( 5 :: Timeout 'Seconds)
-
-    mapM_ cancel peerz
-
-    (_, e) <- waitAnyCatchCancel peerz
-
-    debug (pretty $ show e)
-    debug "we're done"
-    assertBool "success" True
-    exitSuccess
-
-  assertBool "failed" False
-
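
Note for readers of this removal: the central pattern in the deleted OldPeer.hs is the existential AnyProtocol wrapper, which pairs a numeric protocol id with a decoder and a handler so the peer loop can dispatch incoming (id, payload) messages through a Map. The standalone sketch below illustrates only that pattern; it uses simplified stand-in names and types (AnyMessage carrying a String payload, plain IO handlers) rather than the hbs2 API, and is not part of the diff above.

{-# LANGUAGE ExistentialQuantification #-}
module Main where

import Data.Map (Map)
import qualified Data.Map as Map

-- Simplified stand-in for a decoded wire message: protocol id plus payload.
data AnyMessage = AnyMessage Integer String

-- Existential wrapper: hides the concrete protocol message type 'p',
-- keeping only the id, the decoder and the handler, as OldPeer.hs did.
data AnyProtocol m = forall p . AnyProtocol
  { protoId     :: Integer
  , protoDecode :: String -> Maybe p
  , protoHandle :: p -> m ()
  }

-- Dispatch one message: look the protocol up by id, decode the payload,
-- and run the handler; unknown ids and undecodable payloads are ignored,
-- mirroring the behaviour of the deleted runPeer loop.
dispatch :: Applicative m => Map Integer (AnyProtocol m) -> AnyMessage -> m ()
dispatch disp (AnyMessage n payload) =
  case Map.lookup n disp of
    Nothing                              -> pure ()
    Just (AnyProtocol _ decoder handler) ->
      maybe (pure ()) handler (decoder payload)

main :: IO ()
main = do
  let protos = [ AnyProtocol 1 Just             (\s -> putStrLn ("proto1: " <> s))
               , AnyProtocol 2 (Just . reverse) (\s -> putStrLn ("proto2: " <> s))
               ]
      disp   = Map.fromList [ (pid, a) | a@(AnyProtocol pid _ _) <- protos ]
  -- The third message has no registered protocol and is silently dropped.
  mapM_ (dispatch disp) [ AnyMessage 1 "hello", AnyMessage 2 "dlrow", AnyMessage 3 "ignored" ]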