From 41da76483c3594b3cc5330cdbf977f1448ac4221 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 3 Feb 2023 09:44:01 +0300 Subject: [PATCH] peer announces and basic logging --- hbs2-core/hbs2-core.cabal | 7 +- hbs2-core/lib/HBS2/Actors/Peer.hs | 63 +- hbs2-core/lib/HBS2/Defaults.hs | 13 +- hbs2-core/lib/HBS2/Events.hs | 4 +- hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs | 14 +- hbs2-core/lib/HBS2/Net/Proto/Definition.hs | 18 + hbs2-core/lib/HBS2/Net/Proto/Peer.hs | 15 +- hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs | 63 ++ hbs2-core/lib/HBS2/Net/Proto/Types.hs | 1 + hbs2-core/lib/HBS2/System/Logger/Simple.hs | 46 ++ hbs2-peer/app/BlockDownload.hs | 14 +- hbs2-peer/app/Logger.hs | 11 - hbs2-peer/app/PeerMain.hs | 101 ++- hbs2-peer/app/RPC.hs | 11 +- hbs2-peer/hbs2-peer.cabal | 2 +- hbs2-tests/hbs2-tests.cabal | 95 +-- hbs2-tests/test/Peer2Main.hs | 699 ------------------ hbs2-tests/test/TestLogger.hs | 20 + 18 files changed, 373 insertions(+), 824 deletions(-) create mode 100644 hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs create mode 100644 hbs2-core/lib/HBS2/System/Logger/Simple.hs delete mode 100644 hbs2-peer/app/Logger.hs delete mode 100644 hbs2-tests/test/Peer2Main.hs create mode 100644 hbs2-tests/test/TestLogger.hs diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 050268f0..52dd641b 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -71,8 +71,8 @@ library , HBS2.Clock , HBS2.Data.Detect , HBS2.Data.Types - , HBS2.Data.Types.Refs , HBS2.Data.Types.Crypto + , HBS2.Data.Types.Refs , HBS2.Defaults , HBS2.Events , HBS2.Hash @@ -89,13 +89,15 @@ library , HBS2.Net.Proto.BlockChunks , HBS2.Net.Proto.BlockInfo , HBS2.Net.Proto.Definition - , HBS2.Net.Proto.Sessions , HBS2.Net.Proto.Peer + , HBS2.Net.Proto.PeerAnnounce + , HBS2.Net.Proto.Sessions , HBS2.Net.Proto.Types , HBS2.OrDie , HBS2.Prelude , HBS2.Prelude.Plated , HBS2.Storage + , HBS2.System.Logger.Simple -- other-modules: @@ -114,6 +116,7 @@ library , cryptonite , deepseq , directory + , fast-logger , filelock , filepath , hashable diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index f0f60bd5..fbd2c305 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -1,7 +1,7 @@ {-# Language TemplateHaskell #-} {-# Language UndecidableInstances #-} {-# Language FunctionalDependencies #-} --- {-# Language AllowAmbiguousTypes #-} +{-# Language AllowAmbiguousTypes #-} module HBS2.Actors.Peer where import HBS2.Actors @@ -21,6 +21,7 @@ import Control.Monad.Trans.Maybe import Control.Concurrent.Async import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) +import Data.ByteString qualified as BS import Data.Cache (Cache) import Data.Cache qualified as Cache import Data.Dynamic @@ -39,6 +40,7 @@ import Codec.Serialise (serialise, deserialiseOrFail) import Prettyprinter hiding (pipe) + data AnyStorage = forall zu . (Block ByteString ~ ByteString, Storage zu HbSync ByteString IO) => AnyStorage zu instance (IsKey HbSync, Key HbSync ~ Hash HbSync, Block ByteString ~ ByteString) => Storage AnyStorage HbSync ByteString IO where @@ -66,6 +68,8 @@ data Fabriq e = forall bus . (Messaging bus e (Encoded e)) => Fabriq bus class HasFabriq e m where getFabriq :: m (Fabriq e) +class HasPeerNonce e m where + peerNonce :: m PeerNonce class Messaging (Fabriq e) e (AnyMessage (Encoded e) e) => PeerMessaging e @@ -125,6 +129,7 @@ makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) data PeerEnv e = PeerEnv { _envSelf :: Peer e + , _envPeerNonce :: PeerNonce , _envFab :: Fabriq e , _envStorage :: AnyStorage , _envPeerLocator :: AnyPeerLocator e @@ -182,9 +187,8 @@ instance Monad m => HasFabriq e (PeerM e m) where instance Monad m => HasStorage (PeerM e m) where getStorage = asks (view envStorage) --- instance Monad m => HasKeys 'Sign e (PeerM e m) where --- getPrivateKey = asks (view (envCred . peerSignSk)) --- getPublicKey = asks (view (envCred . peerSignPk)) +instance Monad m => HasPeerNonce e (PeerM e m) where + peerNonce = asks (view envPeerNonce) instance ( MonadIO m -- , HasProtocol e p @@ -289,8 +293,7 @@ sweep = do liftIO $ atomically $ modifyTVar' sw (<> HashMap.fromList (mconcat alive)) -instance ( HasProtocol e p - , Typeable (EventKey e p) +instance ( Typeable (EventKey e p) , Typeable (Event e p) , Hashable (EventKey e p) , Eq (EventKey e p) @@ -320,22 +323,41 @@ instance ( HasProtocol e p void $ liftIO $ atomically $ modifyTVar' se (HashMap.insert sk (mconcat pers)) -runPeerM :: forall e m . (MonadIO m, HasPeer e, Ord (Peer e), Pretty (Peer e)) - => AnyStorage - -> Fabriq e - -> Peer e - -> PeerM e m () - -> m () -runPeerM s bus p f = do +newPeerEnv :: forall e m . ( MonadIO m + , HasPeer e + , Ord (Peer e) + , Pretty (Peer e) + , HasNonces () m + ) + => AnyStorage + -> Fabriq e + -> Peer e + -> m (PeerEnv e) + +newPeerEnv s bus p = do pl <- AnyPeerLocator <$> newStaticPeerLocator @e mempty - env <- PeerEnv p bus s pl <$> newPipeline defProtoPipelineSize - <*> liftIO (Cache.newCache (Just defCookieTimeout)) - <*> liftIO (newTVarIO mempty) - <*> liftIO (Cache.newCache (Just defCookieTimeout)) - <*> liftIO (newTVarIO mempty) + nonce <- newNonce @() + + PeerEnv p nonce bus s pl <$> newPipeline defProtoPipelineSize + <*> liftIO (Cache.newCache (Just defCookieTimeout)) + <*> liftIO (newTVarIO mempty) + <*> liftIO (Cache.newCache (Just defCookieTimeout)) + <*> liftIO (newTVarIO mempty) + +runPeerM :: forall e m . ( MonadIO m + , HasPeer e + , Ord (Peer e) + , Pretty (Peer e) + , HasNonces () m + ) + => PeerEnv e + -> PeerM e m () + -> m () + +runPeerM env f = do let de = view envDeferred env as <- liftIO $ replicateM 8 $ async $ runPipeline de @@ -440,3 +462,8 @@ instance ( MonadIO m instance (Monad m, HasOwnPeer e m) => HasOwnPeer e (ResponseM e m) where ownPeer = lift ownPeer + +instance (Monad m, HasFabriq e m) => HasFabriq e (ResponseM e m) where + getFabriq = lift getFabriq + + diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 775177d9..c392f63c 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -13,11 +13,11 @@ defMessageQueueSize :: Integral a => a defMessageQueueSize = 65536 defBurst :: Integral a => a -defBurst = 64 +defBurst = 16 -- defChunkSize :: Integer defChunkSize :: Integral a => a -defChunkSize = 1024 +defChunkSize = 1200 defBlockSize :: Integer defBlockSize = 256 * 1024 @@ -42,7 +42,7 @@ defProtoPipelineSize :: Int defProtoPipelineSize = 65536*4 defCookieTimeoutSec :: Timeout 'Seconds -defCookieTimeoutSec = 120 +defCookieTimeoutSec = 1200 defCookieTimeout :: TimeSpec defCookieTimeout = toTimeSpec defCookieTimeoutSec @@ -52,14 +52,17 @@ defBlockInfoTimeout = 2 -- how much time wait for block from peer? defBlockWaitMax :: Timeout 'Seconds -defBlockWaitMax = 3 :: Timeout 'Seconds +defBlockWaitMax = 5.0 :: Timeout 'Seconds -- how much time wait for block from peer? defChunkWaitMax :: Timeout 'Seconds -defChunkWaitMax = 1 :: Timeout 'Seconds +defChunkWaitMax = 1.0 :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds defSweepTimeout = 30 -- FIXME: only for debug! +-- FIXME: debug only! +defPeerAnnounceTime :: Timeout 'Seconds +defPeerAnnounceTime = 30 diff --git a/hbs2-core/lib/HBS2/Events.hs b/hbs2-core/lib/HBS2/Events.hs index 6d7c678c..c64cee9f 100644 --- a/hbs2-core/lib/HBS2/Events.hs +++ b/hbs2-core/lib/HBS2/Events.hs @@ -33,10 +33,10 @@ data family Event e a :: Type type EventHandler e a m = Event e a -> m () -class Monad m => EventListener e a m | a -> e where +class Monad m => EventListener e a m where subscribe :: EventKey e a -> EventHandler e a m -> m () -class Monad m => EventEmitter e a m | a -> e where +class Monad m => EventEmitter e a m where emit :: EventKey e a -> Event e a -> m () class EventType a where diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs index e00b024e..cfa58566 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs @@ -1,4 +1,5 @@ {-# Language TemplateHaskell #-} +{-# Language UndecidableInstances #-} module HBS2.Net.Proto.BlockAnnounce where import HBS2.Prelude.Plated @@ -38,10 +39,10 @@ instance Serialise BlockInfoNonce instance Serialise (BlockAnnounceInfo e) -newtype BlockAnnounce e = BlockAnnounce (BlockAnnounceInfo e) - deriving stock (Eq,Generic,Show) +data BlockAnnounce e = BlockAnnounce PeerNonce (BlockAnnounceInfo e) + deriving stock (Generic) -instance Serialise (BlockAnnounce e) +instance Serialise PeerNonce => Serialise (BlockAnnounce e) makeLenses ''BlockAnnounceInfo @@ -53,16 +54,16 @@ blockAnnounceProto :: forall e m . ( MonadIO m ) => BlockAnnounce e -> m () blockAnnounceProto = \case - BlockAnnounce info -> do + BlockAnnounce n info -> do that <- thatPeer (Proxy @(BlockAnnounce e)) - emit @e BlockAnnounceInfoKey (BlockAnnounceEvent that info) + emit @e BlockAnnounceInfoKey (BlockAnnounceEvent that info n) data instance EventKey e (BlockAnnounce e) = BlockAnnounceInfoKey deriving stock (Typeable, Eq,Generic) data instance Event e (BlockAnnounce e) = - BlockAnnounceEvent (Peer e) (BlockAnnounceInfo e) + BlockAnnounceEvent (Peer e) (BlockAnnounceInfo e) PeerNonce deriving stock (Typeable) instance Typeable (BlockAnnounceInfo e) => Hashable (EventKey e (BlockAnnounce e)) where @@ -74,4 +75,3 @@ instance EventType ( Event e ( BlockAnnounce e) ) where isPersistent = True - diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index eac3b079..4feace58 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -14,6 +14,7 @@ import HBS2.Net.Proto.BlockAnnounce import HBS2.Net.Proto.BlockChunks import HBS2.Net.Proto.BlockInfo import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.PeerAnnounce import HBS2.Defaults import Data.Functor @@ -25,6 +26,7 @@ import Crypto.Saltine.Core.Box qualified as Crypto import Crypto.Saltine.Class qualified as Crypto import Crypto.Saltine.Core.Sign qualified as Sign + type instance PubKey 'Sign e = Sign.PublicKey type instance PrivKey 'Sign e = Sign.SecretKey @@ -57,6 +59,13 @@ instance HasProtocol UDP (PeerHandshake UDP) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise +instance HasProtocol UDP (PeerAnnounce UDP) where + type instance ProtocolId (PeerAnnounce UDP) = 5 + type instance Encoded UDP = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + + instance Expires (SessionKey UDP (BlockInfo UDP)) where expiresIn _ = Just defCookieTimeoutSec @@ -75,12 +84,21 @@ instance Expires (SessionKey UDP (KnownPeer UDP)) where instance Expires (SessionKey UDP (PeerHandshake UDP)) where expiresIn _ = Just 10 +instance Expires (EventKey UDP (PeerAnnounce UDP)) where + expiresIn _ = Nothing + + instance MonadIO m => HasNonces (PeerHandshake UDP) m where type instance Nonce (PeerHandshake UDP) = BS.ByteString newNonce = do n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) pure $ BS.take 32 n +instance MonadIO m => HasNonces () m where + type instance Nonce () = BS.ByteString + newNonce = do + n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) + pure $ BS.take 32 n instance Serialise Sign.Signature diff --git a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs index 416a2c5d..a6d9e90b 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs @@ -5,15 +5,13 @@ module HBS2.Net.Proto.Peer where import HBS2.Base58 import HBS2.Data.Types import HBS2.Events -import HBS2.Net.Auth.Credentials -import HBS2.Net.PeerLocator import HBS2.Net.Proto import HBS2.Clock import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated +import Data.Maybe import Codec.Serialise() -import Data.ByteString.Lazy (ByteString) import Data.ByteString qualified as BS import Data.Hashable import Lens.Micro.Platform @@ -32,9 +30,6 @@ newtype PeerData e = makeLenses 'PeerData -newtype PeerAnnounce e = PeerAnnounce (PeerData e) - deriving stock (Generic) - data PeerHandshake e = PeerPing PingNonce | PeerPong (Signature e) (PeerData e) @@ -74,6 +69,7 @@ sendPing pip = do peerHandShakeProto :: forall e m . ( MonadIO m , Response e (PeerHandshake e) m + , Request e (PeerHandshake e) m , Sessions e (PeerHandshake e) m , Sessions e (KnownPeer e) m , HasNonces (PeerHandshake e) m @@ -99,6 +95,13 @@ peerHandShakeProto = -- TODO: отправить обратно вместе с публичным ключом response (PeerPong @e sign (PeerData (view peerSignPk creds))) + -- TODO: да и пингануть того самим + + se <- find (KnownPeerKey pip) id <&> isJust + + unless se $ do + sendPing pip + PeerPong sign d -> do pip <- thatPeer proto diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs new file mode 100644 index 00000000..95067372 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerAnnounce.hs @@ -0,0 +1,63 @@ +{-# Language UndecidableInstances #-} +module HBS2.Net.Proto.PeerAnnounce where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto +import HBS2.Events + +import Type.Reflection (someTypeRep) +import Data.Hashable +import Codec.Serialise (Serialise) + +-- This subprotocol is assumed to work with a +-- multicast address for local peer discovery. +-- +-- For single cast case seems that PeerHandshake +-- subprotocol is sufficient: +-- peer Bob pings peer Alice, +-- now both of them know each other. +-- +-- For making life easier in a local network, +-- we introduce PeerAnnounce subprotocol. +-- +-- The idea is following: +-- Peer sends PeerAnnounce to a multicast address, +-- all available peers send their pings and now +-- they all know this peer. +-- + +newtype PeerAnnounce e = + PeerAnnounce PeerNonce + deriving stock (Typeable, Generic) + + +peerAnnounceProto :: forall e m . ( MonadIO m + , EventEmitter e (PeerAnnounce e) m + , Response e (PeerAnnounce e) m + ) => PeerAnnounce e -> m () +peerAnnounceProto = + \case + PeerAnnounce nonce -> do + who <- thatPeer (Proxy @(PeerAnnounce e)) + emit @e PeerAnnounceEventKey (PeerAnnounceEvent who nonce) + + +data instance EventKey e (PeerAnnounce e) = + PeerAnnounceEventKey + deriving stock (Typeable, Eq,Generic) + +data instance Event e (PeerAnnounce e) = + PeerAnnounceEvent (Peer e) PeerNonce + deriving stock (Typeable) + +instance Typeable (PeerAnnounce e) => Hashable (EventKey e (PeerAnnounce e)) where + hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) + where + p = Proxy @(PeerAnnounce e) + +instance EventType ( Event e ( PeerAnnounce e) ) where + isPersistent = True + + +instance Serialise PeerNonce => Serialise (PeerAnnounce e) + diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 33344b76..3a906027 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -47,6 +47,7 @@ class HasCookie e p | p -> e where getCookie :: p -> Maybe (Cookie e) getCookie = const Nothing +type PeerNonce = Nonce () data PeerCredentials e = PeerCredentials diff --git a/hbs2-core/lib/HBS2/System/Logger/Simple.hs b/hbs2-core/lib/HBS2/System/Logger/Simple.hs new file mode 100644 index 00000000..688f453f --- /dev/null +++ b/hbs2-core/lib/HBS2/System/Logger/Simple.hs @@ -0,0 +1,46 @@ +{-# Language UndecidableInstances #-} +module HBS2.System.Logger.Simple + ( withSimpleLogger + , debug + ) where + +import Control.Monad +import Data.Foldable +import Control.Monad.IO.Class +import System.Log.FastLogger +import System.Log.FastLogger.LoggerSet +import Data.IORef +import System.IO.Unsafe +import Prettyprinter + +loggers :: IORef (Maybe LoggerSet) +loggers = unsafePerformIO (newIORef Nothing) +{-# NOINLINE loggers #-} + + +withSimpleLogger :: IO () -> IO () +withSimpleLogger program = do + set <- newStdoutLoggerSet 10000 + void $ atomicModifyIORef' loggers $ \case + Nothing -> (Just set, Just set) + Just s -> (Just s, Just s) + program + withLogger flushLogStr + +withLogger :: MonadIO m => (LoggerSet -> m b) -> m () +withLogger f = do + lo <- liftIO $ readIORef loggers + forM_ lo f + +debug :: (MonadIO m, ToLogStr a) => a -> m () +debug s = do + liftIO $ withLogger $ \set -> pushLogStrLn set (toLogStr s) + + +instance {-# OVERLAPPABLE #-} Pretty a => ToLogStr a where + toLogStr p = toLogStr (show (pretty p)) + + +instance {-# OVERLAPPABLE #-} ToLogStr (Doc ann) where + toLogStr p = toLogStr (show p) + diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index c86a3389..c41a3275 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -17,9 +17,9 @@ import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated import HBS2.Storage +import HBS2.System.Logger.Simple import PeerInfo -import Logger import Data.Foldable hiding (find) import Control.Concurrent.Async @@ -344,8 +344,8 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO , Block ByteString ~ ByteString , PeerMessaging e ) - => m () -blockDownloadLoop = do + => DownloadEnv e -> m () +blockDownloadLoop env0 = do e <- ask stor <- getStorage @@ -361,19 +361,21 @@ blockDownloadLoop = do npi <- newPeerInfo + debug $ "known peers" <+> pretty pee + for_ pee $ \p -> do pinfo <- fetch True npi (PeerInfoKey p) id burst <- liftIO $ readTVarIO (view peerBurst pinfo) debug $ "peer" <+> pretty p <+> "burst: " <+> pretty burst pure () - runDownloadM @e $ do + withDownload env0 do env <- ask let again h = do debug $ "block fucked: " <+> pretty h - withPeerM e $ withDownload env (addDownload h) + withPeerM e $ withDownload env (processBlock h) mapM_ processBlock blks @@ -390,7 +392,7 @@ blockDownloadLoop = do liftIO $ race ( pause defBlockWaitMax >> again h ) do withPeerM e $ withDownload env $ do -- NOTE: really crazy shit - withFreePeer p (addDownload h >> pause (0.1 :: Timeout 'Seconds)) do + withFreePeer p (processBlock h >> pause (0.1 :: Timeout 'Seconds)) do downloadFromWithPeer p h next diff --git a/hbs2-peer/app/Logger.hs b/hbs2-peer/app/Logger.hs deleted file mode 100644 index 0cc3efdc..00000000 --- a/hbs2-peer/app/Logger.hs +++ /dev/null @@ -1,11 +0,0 @@ -module Logger where - -import HBS2.Prelude - -import System.IO -import Prettyprinter - -debug :: (MonadIO m) => Doc ann -> m () -debug p = liftIO $ hPrint stderr p - - diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index d7819178..ff6ee440 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -17,14 +17,18 @@ import HBS2.Net.PeerLocator import HBS2.Net.Proto import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.PeerAnnounce import HBS2.Net.Proto.Sessions import HBS2.OrDie import HBS2.Prelude.Plated import HBS2.Storage.Simple +import HBS2.System.Logger.Simple import RPC import BlockDownload +import Data.Maybe +import Crypto.Saltine (sodiumInit) import Data.Function import Control.Concurrent.Async import Control.Concurrent.STM @@ -42,9 +46,6 @@ import System.Directory import System.Exit import System.IO -debug :: (MonadIO m) => Doc ann -> m () -debug p = liftIO $ hPrint stderr p - defStorageThreads :: Integral a => a defStorageThreads = 4 @@ -61,6 +62,7 @@ data RPCCommand = POKE | ANNOUNCE (Hash HbSync) | PING (PeerAddr UDP) + | CHECK PeerNonce (PeerAddr UDP) (Hash HbSync) data PeerOpts = PeerOpts @@ -186,9 +188,18 @@ instance ( Monad m response = lift . response -runPeer :: () => PeerOpts -> IO () -runPeer opts = Exception.handle myException $ do +-- FIXME: Нормальные синхронизированные логи. Можно даже цветные. +-- Ориентированные на Prettyprinter. +-- Без лишнего мусора. + +-- FIXME: Убрать хардкод UDP отовсюду ниже. +-- Вынести в сигнатуру. + +runPeer :: PeerOpts -> IO () +runPeer opts = Exception.handle myException $ withSimpleLogger do + + sodiumInit rpcQ <- newTQueueIO @RPCCommand @@ -217,6 +228,8 @@ runPeer opts = Exception.handle myException $ do `orDie` "assertion: localMulticastPeer not set" + debug $ pretty localMulticast + mess <- newMessagingUDP False (Just (view listenOn opts)) `orDie` "unable listen on the given addr" @@ -235,22 +248,39 @@ runPeer opts = Exception.handle myException $ do messMcast <- async $ runMessagingUDP mcast `catch` (\(e::SomeException) -> throwIO e ) + denv <- newDownloadEnv + + penv <- newPeerEnv (AnyStorage s) (Fabriq mess) (getOwnPeer mess) + loop <- async do - runPeerM (AnyStorage s) (Fabriq mess) (getOwnPeer mess) $ do + runPeerM penv $ do adapter <- mkAdapter env <- ask + pnonce <- peerNonce @UDP + pl <- getPeerLocator @UDP addPeers @UDP pl ps + subscribe @UDP PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent pip nonce) -> do + unless (nonce == pnonce) $ do + debug $ "Got peer announce!" <+> pretty pip + known <- find (KnownPeerKey pip) id <&> isJust + unless known $ sendPing pip + subscribe @UDP KnownPeerEventKey $ \(KnownPeerEvent p d) -> do addPeers pl [p] debug $ "Got authorized peer!" <+> pretty p <+> pretty (AsBase58 (view peerSignKey d)) - as <- liftIO $ async $ withPeerM env blockDownloadLoop + void $ liftIO $ async $ withPeerM env $ forever $ do + pause defPeerAnnounceTime -- FIXME: setting! + debug "sending local peer announce" + request localMulticast (PeerAnnounce @UDP pnonce) + + as <- liftIO $ async $ withPeerM env (blockDownloadLoop denv) rpc <- liftIO $ async $ withPeerM env $ forever $ do cmd <- liftIO $ atomically $ readTQueue rpcQ @@ -269,7 +299,32 @@ runPeer opts = Exception.handle myException $ do maybe1 mbsize (pure ()) $ \size -> do let ann = BlockAnnounceInfo 0 NoBlockInfoMeta size h - request localMulticast (BlockAnnounce @UDP ann) + no <- peerNonce @UDP + request localMulticast (BlockAnnounce @UDP no ann) + + CHECK nonce pa h -> do + pip <- fromPeerAddr @UDP pa + + n1 <- peerNonce @UDP + + unless (nonce == n1) do + + peer <- find @UDP (KnownPeerKey pip) id + + debug $ "received announce from" + <+> pretty pip + <+> pretty h + + case peer of + Nothing -> sendPing @UDP pip + Just{} -> do + debug "announce from a known peer" + debug "preparing to dowload shit" + debug "checking policy, blah-blah-blah. tomorrow" + + withDownload denv $ do + processBlock h + me <- liftIO $ async $ withPeerM env $ do runProto @UDP @@ -300,17 +355,24 @@ runPeer opts = Exception.handle myException $ do [ makeResponse (rpcHandler arpc) ] - ann <- async $ runPeerM (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) $ do + menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) + + ann <- async $ runPeerM menv $ do self <- ownPeer @UDP - subscribe @UDP BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi) -> do + subscribe @UDP BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi no) -> do unless (p == self) do - debug $ "announce" <+> pretty p - <+> pretty (view biHash bi) + pa <- toPeerAddr p + liftIO $ atomically $ writeTQueue rpcQ (CHECK no pa (view biHash bi)) + + subscribe @UDP PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent pip nonce) -> do + -- debug $ "Got peer announce!" <+> pretty pip + emitToPeer penv PeerAnnounceEventKey pe runProto @UDP [ makeResponse blockAnnounceProto + , makeResponse peerAnnounceProto ] void $ waitAnyCatchCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast] @@ -318,8 +380,19 @@ runPeer opts = Exception.handle myException $ do simpleStorageStop s + +emitToPeer :: ( MonadIO m + , EventEmitter e a (PeerM e IO) + ) + => PeerEnv e + -> EventKey e a + -> Event e a + -> m () + +emitToPeer env k e = liftIO $ withPeerM env (emit k e) + withRPC :: String -> RPC UDP -> IO () -withRPC saddr cmd = do +withRPC saddr cmd = withSimpleLogger do as <- parseAddr (fromString saddr) <&> fmap (PeerUDP . addrAddress) let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as @@ -362,3 +435,5 @@ runRpcCommand saddr = \case PING s -> withRPC saddr (RPCPing s) ANNOUNCE h -> withRPC saddr (RPCAnnounce @UDP h) + _ -> pure () + diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index 53372de4..e7c42379 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -2,25 +2,16 @@ {-# Language UndecidableInstances #-} module RPC where - import HBS2.Prelude.Plated import HBS2.Net.Proto import HBS2.Hash -import HBS2.Net.Messaging import HBS2.Net.Messaging.UDP import HBS2.Actors.Peer -import HBS2.Defaults -import Logger - -import Control.Concurrent.Async import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) -import Codec.Serialise (serialise, deserialiseOrFail,Serialise) +import Codec.Serialise (serialise,deserialiseOrFail) import Lens.Micro.Platform -import Data.Text (Text) - -import Prettyprinter data RPC e = RPCPoke diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 0b4b43b9..dc3b8249 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -38,6 +38,7 @@ common common-deps , random , random-shuffle , safe + , saltine ^>=0.2.0.1 , serialise , split , stm @@ -102,7 +103,6 @@ executable hbs2-peer other-modules: BlockDownload , PeerInfo - , Logger , RPC -- other-extensions: diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 00be925e..5e7a92ad 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -121,50 +121,6 @@ test-suite test-cw main-is: TestChunkWriter.hs -executable test-peer-run - import: shared-properties - import: common-deps - default-language: Haskell2010 - - ghc-options: - -- -prof - -- -fprof-auto - - other-modules: - - -- other-extensions: - - -- type: exitcode-stdio-1.0 - hs-source-dirs: test - main-is: Peer2Main.hs - - build-depends: - base ^>=4.15.1.0, hbs2-core, hbs2-storage-simple - , async - , bytestring - , cache - , clock - , containers - , data-default - , directory - , filepath - , hashable - , microlens-platform - , mtl - , mwc-random - , prettyprinter - , QuickCheck - , random - , safe - , serialise - , stm - , streaming - , tasty - , tasty-hunit - , transformers - , uniplate - , vector - executable test-udp import: shared-properties import: common-deps @@ -214,3 +170,54 @@ executable test-udp , vector + +executable test-logger + import: shared-properties + import: common-deps + default-language: Haskell2010 + + ghc-options: + -- -prof + -- -fprof-auto + + other-modules: + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestLogger.hs + + build-depends: + base ^>=4.15.1.0, hbs2-core, hbs2-storage-simple + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , data-textual + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , transformers + , uniplate + , vector + , fast-logger + + diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs deleted file mode 100644 index 951c0d35..00000000 --- a/hbs2-tests/test/Peer2Main.hs +++ /dev/null @@ -1,699 +0,0 @@ -{-# Language TemplateHaskell #-} -{-# Language UndecidableInstances #-} -{-# Language RankNTypes #-} -{-# Language AllowAmbiguousTypes #-} -{-# LANGUAGE MultiWayIf #-} -module Main where - -import HBS2.Prelude -import HBS2.Hash -import HBS2.Actors -import HBS2.Actors.ChunkWriter -import HBS2.Actors.Peer -import HBS2.Clock -import HBS2.Data.Detect -import HBS2.Data.Types -import HBS2.Defaults -import HBS2.Events -import HBS2.Merkle -import HBS2.Net.Messaging.UDP -import HBS2.Net.PeerLocator -import HBS2.Net.Proto -import HBS2.Net.Proto.BlockAnnounce -import HBS2.Net.Proto.BlockChunks -import HBS2.Net.Proto.BlockInfo -import HBS2.Net.Proto.Definition -import HBS2.Net.Proto.Sessions -import HBS2.OrDie -import HBS2.Prelude.Plated -import HBS2.Storage -import HBS2.Storage.Simple - -import Test.Tasty.HUnit - -import System.Random.Shuffle -import Codec.Serialise hiding (encode,decode) -import Control.Concurrent.Async -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue qualified as Q -import Control.Monad.Reader -import Control.Monad.Trans.Maybe -import Data.ByteString.Lazy (ByteString) -import Data.ByteString.Lazy.Char8 qualified as B8 -import Data.Default -import Data.Foldable hiding (find) -import Data.Map (Map) -import Data.Map qualified as Map -import Data.Maybe -import Data.Word -import Lens.Micro.Platform -import Prettyprinter hiding (pipe) -import System.Directory -import System.Exit -import System.FilePath.Posix -import System.IO -import Data.Hashable -import Type.Reflection -import Data.Fixed -import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HashMap -import Data.List qualified as List -import Data.List.Split (chunksOf) - -import Data.IntMap (IntMap) -import Data.IntMap qualified as IntMap -import Data.IntSet qualified as IntSet - -import Data.Dynamic - -debug :: (MonadIO m) => Doc ann -> m () -debug p = liftIO $ hPrint stderr p - - -calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)] -calcBursts bu pieces = go seed - where - seed = fmap (,1) pieces - - go ( (n1,s1) : (n2,s2) : xs ) - | (s1 + s2) <= bu = go ((n1, s1+s2) : xs) - | otherwise = (n1,s1) : go ( (n2,s2) : xs) - - go [x] = [x] - go [] = [] - -type Fake = UDP - -newtype PeerInfo e = - PeerInfo - { _peerBurst :: TVar Int - } - deriving stock (Generic,Typeable) - -makeLenses 'PeerInfo - - -newPeerInfo :: MonadIO m => m (PeerInfo e) -newPeerInfo = liftIO do - PeerInfo <$> newTVarIO defBurst - - -type instance SessionData e (PeerInfo e) = PeerInfo e - -newtype instance SessionKey e (PeerInfo e) = - PeerInfoKey (Peer e) - -deriving newtype instance Hashable (SessionKey Fake (PeerInfo Fake)) -deriving stock instance Eq (SessionKey Fake (PeerInfo Fake)) - -instance Expires (SessionKey Fake (PeerInfo Fake)) where - expiresIn = const (Just 600) - -data BlockDownload = - BlockDownload - { _sBlockHash :: Hash HbSync - , _sBlockSize :: Size - , _sBlockChunkSize :: ChunkSize - , _sBlockChunks :: TQueue (ChunkNum, ByteString) - } - deriving stock (Typeable) - -makeLenses 'BlockDownload - -newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload -newBlockDownload h = do - BlockDownload h 0 0 <$> liftIO newTQueueIO - - -type instance SessionData e (BlockInfo e) = BlockSizeSession e -type instance SessionData e (BlockChunks e) = BlockDownload - -newtype instance SessionKey e (BlockChunks e) = - DownloadSessionKey (Peer e, Cookie e) - deriving stock (Generic,Typeable) - -newtype BlockSizeSession e = - BlockSizeSession - { _bsBlockSizes :: Map (Peer e) Size - } - -makeLenses 'BlockSizeSession - -instance Ord (Peer e) => Default (BlockSizeSession e) where - def = BlockSizeSession mempty - -deriving stock instance Show (BlockSizeSession Fake) - -deriving newtype instance Hashable (SessionKey Fake (BlockChunks Fake)) -deriving stock instance Eq (SessionKey Fake (BlockChunks Fake)) - -runTestPeer :: (Key HbSync ~ Hash HbSync, Storage (SimpleStorage HbSync) HbSync ByteString (ResponseM Fake (PeerM Fake IO))) - => MessagingUDP - -> Peer Fake - -> (SimpleStorage HbSync -> ChunkWriter HbSync IO -> IO ()) - -> IO () - -runTestPeer mess p zu = do - - dir <- liftIO $ canonicalizePath ( ".peers" show (pretty (AsFileName p))) - let chDir = dir "tmp-chunks" - liftIO $ createDirectoryIfMissing True dir - - let opts = [ StoragePrefix dir - ] - - udp <- async $ runMessagingUDP mess - stor <- simpleStorageInit opts - cww <- newChunkWriterIO stor (Just chDir) - - sw <- liftIO $ replicateM 8 $ async $ simpleStorageWorker stor - cw <- liftIO $ replicateM 8 $ async $ runChunkWriter cww - - zu stor cww - - simpleStorageStop stor - stopChunkWriter cww - - mapM_ cancel $ sw <> cw <> [udp] - - -handleBlockInfo :: forall e m . ( MonadIO m - , Sessions e (BlockInfo e) m - , Default (SessionData e (BlockInfo e)) - , Ord (Peer e) - , Pretty (Peer e) - -- , EventEmitter e (BlockSize e) m - ) - - => (Peer e, Hash HbSync, Maybe Integer) - -> m () - -handleBlockInfo (p, h, sz') = do - maybe1 sz' (pure ()) $ \sz -> do - let bsz = fromIntegral sz - update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) - - -data Stats e = - Stats - { _blkNum :: !Int - , _blkNumLast :: !Int - , _timeLast :: !TimeSpec - } - deriving stock (Typeable,Generic) - -makeLenses 'Stats - -instance Default (Stats e) where - def = Stats 0 0 0 - -newStatsIO :: MonadIO m => m (Stats e) -newStatsIO = pure $ Stats 0 0 0 - -type instance SessionData e (Stats e) = Stats e - -instance Serialise TimeSpec -instance Serialise (Stats e) - -data instance SessionKey e (Stats e) = StatsKey - deriving stock (Typeable,Eq) - -instance Typeable (SessionKey e (Stats e)) => Hashable (SessionKey e (Stats e)) where - hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) - where - p = Proxy @(SessionKey e (Stats e)) - - -newtype Speed = Speed (Fixed E1) - deriving newtype (Ord, Eq, Num, Real, Fractional, Show) - -instance Pretty Speed where - pretty (Speed n) = pretty (show n) - - -updateStats :: forall e m . (MonadIO m, Sessions e (Stats e) m) - => Bool -> Int -> m (Stats e) - -updateStats updTime blknum = do - de <- newStatsIO - stats <- fetch @e True de StatsKey id - - t <- if updTime then do - liftIO $ getTime Monotonic - else - pure (view timeLast stats) - - let blkNumNew = view blkNum stats + blknum - - let blast = if updTime then - blkNumNew - else - view blkNumLast stats - - let newStats = set blkNum blkNumNew - . set timeLast t - . set blkNumLast blast - $ stats - - update @e de StatsKey (const newStats) - - pure newStats - -data DownloadEnv e = - DownloadEnv - { _downloadQ :: TQueue (Hash HbSync) - , _peerBusy :: TVar (HashMap (Peer e) ()) - } - -makeLenses 'DownloadEnv - -class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e -instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e - -newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e) -newDownloadEnv = liftIO do - DownloadEnv <$> newTQueueIO - <*> newTVarIO mempty - -newtype BlockDownloadM e m a = - BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } - deriving newtype ( Functor - , Applicative - , Monad - , MonadIO - , MonadReader (DownloadEnv e) - , MonadTrans - ) - -runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a -runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv - -withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a -withDownload e m = runReaderT ( fromBlockDownloadM m ) e - -addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m () -addDownload h = do - q <- asks (view downloadQ) - liftIO $ atomically $ writeTQueue q h - -- debug $ "addDownload" <+> pretty h - -- pause ( 0.25 :: Timeout 'Seconds ) - -withFreePeer :: (MyPeer e, MonadIO m) - => Peer e - -> BlockDownloadM e m () - -> BlockDownloadM e m () - -> BlockDownloadM e m () - -withFreePeer p n m = do - busy <- asks (view peerBusy) - avail <- liftIO $ atomically - $ stateTVar busy $ - \s -> case HashMap.lookup p s of - Nothing -> (True, HashMap.insert p () s) - Just{} -> (False, s) - if not avail - then n - else do - r <- m - liftIO $ atomically $ modifyTVar busy $ HashMap.delete p - pure r - -getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) -getBlockForDownload = do - q <- asks (view downloadQ) - liftIO $ atomically $ readTQueue q - -processBlock :: forall e m . ( MonadIO m - , HasStorage m - , Block ByteString ~ ByteString - ) - => Hash HbSync - -> BlockDownloadM e m () - -processBlock h = do - - sto <- lift getStorage - - bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h) - - case bt of - Nothing -> addDownload h - - Just (AnnRef{}) -> pure () - - Just (Merkle{}) -> do - debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h - walkMerkle h (liftIO . getBlock sto) $ \(hr :: [HashRef]) -> do - - for_ hr $ \(HashRef blk) -> do - - -- debug $ pretty blk - - here <- liftIO (hasBlock sto blk) <&> isJust - - if here then do - debug $ "block" <+> pretty blk <+> "is already here" - pure () -- we don't need to recurse, cause walkMerkle is recursing for us - - else - addDownload blk - - Just (Blob{}) -> do - pure () - - -downloadFromWithPeer :: forall e m . ( MyPeer e - , MonadIO m - , Request e (BlockInfo e) m - , Request e (BlockChunks e) m - , MonadReader (PeerEnv e ) m - , PeerMessaging e - , HasProtocol e (BlockInfo e) - , EventListener e (BlockInfo e) m - , EventListener e (BlockChunks e) m - , Sessions e (BlockChunks e) m - , Sessions e (PeerInfo e) m - , Block ByteString ~ ByteString - , HasStorage m - ) - => Peer e - -> Hash HbSync - -> BlockDownloadM e m () -downloadFromWithPeer peer h = do - - - npi <- newPeerInfo - pinfo <- lift $ fetch True npi (PeerInfoKey peer) id - - waitSize <- liftIO $ newTBQueueIO 1 - - lift $ do - subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do - when ( p1 == peer ) $ do - liftIO $ atomically $ writeTBQueue waitSize s - - request @e peer (GetBlockSize @e h) - - esize <- liftIO $ race ( pause defBlockInfoTimeout ) do -- FIXME: block size wait time - atomically $ readTBQueue waitSize - - let mbSize = either (const Nothing) Just esize - - sto <- lift $ getStorage - - case mbSize of - Nothing -> void $ addDownload h - Just thisBkSize -> do - - coo <- genCookie (peer,h) - let key = DownloadSessionKey (peer, coo) - let chusz = defChunkSize - dnwld <- newBlockDownload h - let chuQ = view sBlockChunks dnwld - let new = set sBlockChunkSize chusz - . set sBlockSize (fromIntegral thisBkSize) - $ dnwld - - lift $ update @e new key id - - let burstSizeT = view peerBurst pinfo - - burstSize <- liftIO $ readTVarIO burstSizeT - - let offsets = calcChunks thisBkSize (fromIntegral chusz) :: [(Offset, Size)] - - let chunkNums = [ 0 .. pred (length offsets) ] - - let bursts = calcBursts burstSize chunkNums - - -- debug $ "bursts: " <+> pretty bursts - - r <- liftIO $ newTVarIO (mempty :: IntMap ByteString) - rq <- liftIO newTQueueIO - - for_ bursts $ liftIO . atomically . writeTQueue rq - - fix \next -> do - burst <- liftIO $ atomically $ tryReadTQueue rq - - case burst of - - Just (i,chunksN) -> do - let req = BlockGetChunks h chusz (fromIntegral i) (fromIntegral chunksN) - lift $ request peer (BlockChunks @e coo req) - - -- TODO: here wait for all requested chunks! - -- FIXME: it may blocks forever, so must be timeout and retry - - catched <- either id id <$> liftIO ( race ( pause defChunkWaitMax >> pure mempty ) - ( replicateM chunksN - $ atomically - $ readTQueue chuQ ) - - ) - when (null catched) $ do - - -- nerfing peer burst size. - -- FIXME: we need a thread that will be reset them again - - newBurst <- liftIO $ atomically - $ stateTVar burstSizeT $ \c -> let v = max 1 (c `div` 2) - in (v,v) - - let chuchu = calcBursts newBurst [ i + n | n <- [0 .. chunksN] ] - - debug $ "new burst: " <+> pretty newBurst - debug $ "missed chunks for request" <+> pretty (i,chunksN) - - for_ chuchu $ liftIO . atomically . writeTQueue rq - - for_ catched $ \(num,bs) -> do - liftIO $ atomically $ modifyTVar' r (IntMap.insert (fromIntegral num) bs) - - next - - Nothing -> do - - sz <- liftIO $ readTVarIO r <&> IntMap.size - - if sz == length offsets then do - pieces <- liftIO $ readTVarIO r <&> IntMap.elems - let block = mconcat pieces - let h1 = hashObject @HbSync block - - if h1 == h then do - -- debug "PROCESS BLOCK" - lift $ expire @e key - void $ liftIO $ putBlock sto block - void $ processBlock h - else do - debug "HASH NOT MATCH" - debug "MAYBE THAT PEER IS JERK" - - else do - debug "RETRY BLOCK DOWNLOADING / ASK FOR MISSED CHUNKS" - got <- liftIO $ readTVarIO r <&> IntMap.keysSet - let need = IntSet.fromList (fmap fromIntegral chunkNums) - - let missed = IntSet.toList $ need `IntSet.difference` got - - -- normally this should not happen - -- however, let's try do download the tails - -- by one chunk a time - for_ missed $ \n -> do - liftIO $ atomically $ writeTQueue rq (n,1) - - -instance HasPeerLocator e m => HasPeerLocator e (BlockDownloadM e m) where - getPeerLocator = lift getPeerLocator - -blockDownloadLoop :: forall e m . ( m ~ PeerM e IO - , MonadIO m - , Request e (BlockInfo e) m - , Request e (BlockAnnounce e) m - , HasProtocol e (BlockInfo e) - , HasProtocol e (BlockAnnounce e) - , HasProtocol e (BlockChunks e) - , EventListener e (BlockInfo e) m - , EventListener e (BlockChunks e) m - , EventListener e (BlockAnnounce e) m - , EventEmitter e (BlockChunks e) m - , Sessions e (BlockInfo e) m - , Sessions e (BlockChunks e) m - , Sessions e (PeerInfo e) m - , PeerSessionKey e (PeerInfo e) - , Typeable (SessionKey e (BlockChunks e)) - , Typeable (SessionKey e (BlockInfo e)) - , HasStorage m - , Pretty (Peer e) - , Block ByteString ~ ByteString - , PeerMessaging e - ) - => m () -blockDownloadLoop = do - - e <- ask - stor <- getStorage - - let blks = [ "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg" - , "ECWYwWXiLgNvCkN1EFpSYqsPcWfnL4bAQADsyZgy1Cbr" - ] - - pl <- getPeerLocator @e - - -- TODO: peer info loop - void $ liftIO $ async $ forever $ withPeerM e $ do - pause @'Seconds 20 - pee <- knownPeers @e pl - - npi <- newPeerInfo - - for_ pee $ \p -> do - pinfo <- fetch True npi (PeerInfoKey p) id - burst <- liftIO $ readTVarIO (view peerBurst pinfo) - debug $ "peer" <+> pretty p <+> "burst: " <+> pretty burst - pure () - - runDownloadM @e $ do - - env <- ask - - let again h = do - debug $ "block fucked: " <+> pretty h - withPeerM e $ withDownload env (addDownload h) - - mapM_ processBlock blks - - fix \next -> do - - h <- getBlockForDownload - - here <- liftIO $ hasBlock stor h <&> isJust - - unless here do - - void $ runMaybeT $ do - p <- MaybeT $ knownPeers @e pl >>= liftIO . shuffleM <&> headMay - - liftIO $ race ( pause defBlockWaitMax >> again h ) do - withPeerM e $ withDownload env $ do -- NOTE: really crazy shit - withFreePeer p (addDownload h >> pause (0.1 :: Timeout 'Seconds)) do - downloadFromWithPeer p h - - next - - --- NOTE: this is an adapter for a ResponseM monad --- because response is working in ResponseM monad (ha!) --- So don't be confused with types --- -mkAdapter :: forall e m . ( m ~ PeerM e IO - , HasProtocol e (BlockChunks e) - , Hashable (SessionKey e (BlockChunks e)) - , Sessions e (BlockChunks e) (ResponseM e m) - , Typeable (SessionKey e (BlockChunks e)) - , Default (SessionData e (Stats e)) - , EventEmitter e (BlockChunks e) m - , Pretty (Peer e) - , Block ByteString ~ ByteString - ) - => m (BlockChunksI e (ResponseM e m )) -mkAdapter = do - storage <- getStorage - pure $ - BlockChunksI - { blkSize = liftIO . hasBlock storage - , blkChunk = \h o s -> liftIO (getChunk storage h o s) - , blkGetHash = \c -> find (DownloadSessionKey @e c) (view sBlockHash) - - , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do - - -- debug "AAAA!" - - let cKey = DownloadSessionKey (p,c) - - -- check if there is a session - -- FIXME: - -- TODO: log situation when no session - - ddd <- lift $ find cKey id - - when (isNothing ddd) $ do - debug "SESSION NOT FOUND!" - - dwnld <- MaybeT $ find cKey (view sBlockChunks) - - liftIO $ atomically $ writeTQueue dwnld (n, bs) - } - - - -main :: IO () -main = do - hSetBuffering stderr LineBuffering - - void $ race (pause (600 :: Timeout 'Seconds)) $ do - - -- fake <- newFakeP2P True <&> Fabriq - - udp0 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001" - udp1 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002" - - let (p0:ps) = [getOwnPeer udp0, getOwnPeer udp1] - - -- others - others <- forM ps $ \p -> async $ runTestPeer udp1 p $ \s cw -> do - let findBlk = hasBlock s - - runPeerM (AnyStorage s) (Fabriq udp1) p $ do - adapter <- mkAdapter - - runProto @Fake - [ makeResponse (blockSizeProto findBlk dontHandle) - , makeResponse (blockChunksProto adapter) - , makeResponse blockAnnounceProto - ] - - our <- async $ runTestPeer udp0 p0 $ \s cw -> do - let blk = hasBlock s - - -- void $ async $ forever $ do - -- pause ( 1 :: Timeout 'Seconds ) - -- wip <- blocksInProcess cw - -- debug $ "blocks wip:" <+> pretty wip - - runPeerM (AnyStorage s) (Fabriq udp0) p0 $ do - adapter <- mkAdapter - env <- ask - - pl <- getPeerLocator @Fake - - addPeers @Fake pl ps - - as <- liftIO $ async $ withPeerM env blockDownloadLoop - - me <- liftIO $ replicateM 1 $ async $ liftIO $ withPeerM env $ do - runProto @Fake - [ makeResponse (blockSizeProto blk handleBlockInfo) - , makeResponse (blockChunksProto adapter) - , makeResponse blockAnnounceProto - ] - - liftIO $ mapM_ wait me - - liftIO $ cancel as - - pause ( 599.9 :: Timeout 'Seconds ) - - mapM_ cancel (our:others) - - (_, e) <- waitAnyCatchCancel (our:others) - - debug (pretty $ show e) - debug "we're done" - assertBool "success" True - exitSuccess - - assertBool "failed" False - - diff --git a/hbs2-tests/test/TestLogger.hs b/hbs2-tests/test/TestLogger.hs new file mode 100644 index 00000000..35f78781 --- /dev/null +++ b/hbs2-tests/test/TestLogger.hs @@ -0,0 +1,20 @@ +module Main where + +import HBS2.System.Logger.Simple + +import Control.Monad +import Control.Concurrent.Async + +import System.Log.FastLogger +import Prettyprinter + +main :: IO () +main = do + withSimpleLogger do + replicateConcurrently_ 1000 do + debug $ "DEBUG" <+> pretty 1000 + + + + +