diff --git a/README.md b/README.md new file mode 100644 index 00000000..6a929d0f --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ + + + +## How to launch a peer + +Example: +``` + +hbs2-peer run -p .peers/1 -k .peers/1/key -l addr:port -r rpcaddr:rpcport + +``` + diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index c392f63c..97dc07fd 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -13,7 +13,10 @@ defMessageQueueSize :: Integral a => a defMessageQueueSize = 65536 defBurst :: Integral a => a -defBurst = 16 +defBurst = 4 + +defBurstMax :: Integral a => a +defBurstMax = 256 -- defChunkSize :: Integer defChunkSize :: Integral a => a @@ -47,6 +50,9 @@ defCookieTimeoutSec = 1200 defCookieTimeout :: TimeSpec defCookieTimeout = toTimeSpec defCookieTimeoutSec +defBlockWipTimeout :: TimeSpec +defBlockWipTimeout = toTimeSpec defCookieTimeoutSec + defBlockInfoTimeout :: Timeout 'Seconds defBlockInfoTimeout = 2 diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 5b6c1a37..6111f77e 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -44,7 +44,6 @@ blockSizeProto getBlockSize evHasBlock = emit @e (BlockSizeEventKey h) (BlockSizeEvent (that, h, sz)) evHasBlock ( that, h, Just sz ) - newtype instance SessionKey e (BlockInfo e) = BlockSizeKey (Hash HbSync) deriving stock (Typeable,Eq,Show) diff --git a/hbs2-core/lib/HBS2/System/Logger/Simple.hs b/hbs2-core/lib/HBS2/System/Logger/Simple.hs index 499640d8..ee4b47e0 100644 --- a/hbs2-core/lib/HBS2/System/Logger/Simple.hs +++ b/hbs2-core/lib/HBS2/System/Logger/Simple.hs @@ -12,7 +12,7 @@ module HBS2.System.Logger.Simple , notice , info , setLogging - , asIs + , defLog , loggerTr , module HBS2.System.Logger.Simple.Class ) where @@ -39,8 +39,8 @@ data LoggerEntry = makeLenses 'LoggerEntry -asIs :: a -> a -asIs = id +defLog :: a -> a +defLog = id {-# OPTIONS_GHC -fno-cse #-} {-# NOINLINE loggers #-} diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index c41a3275..02560827 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -21,18 +21,22 @@ import HBS2.System.Logger.Simple import PeerInfo -import Data.Foldable hiding (find) import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Trans.Maybe import Data.ByteString.Lazy (ByteString) +import Data.Cache (Cache) +import Data.Cache qualified as Cache +import Data.Foldable hiding (find) import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.IntMap (IntMap) import Data.IntMap qualified as IntMap import Data.IntSet qualified as IntSet import Data.Maybe +import Data.Set qualified as Set +import Data.Set (Set) import Lens.Micro.Platform import Prettyprinter import System.Random.Shuffle @@ -50,7 +54,6 @@ calcBursts bu pieces = go seed go [x] = [x] go [] = [] - data BlockDownload = BlockDownload { _sBlockHash :: Hash HbSync @@ -74,11 +77,16 @@ newtype instance SessionKey e (BlockChunks e) = deriving stock (Generic,Typeable) +-- data MyBlkInfo e = +-- MyBlkInfo (Peer e) Integer + -- deriving stock (Eq,Ord) data DownloadEnv e = DownloadEnv - { _downloadQ :: TQueue (Hash HbSync) - , _peerBusy :: TVar (HashMap (Peer e) ()) + { _downloadQ :: TQueue (Hash HbSync) + , _peerBusy :: TVar (HashMap (Peer e) ()) + , _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) ) + , _blockWip :: Cache (Hash HbSync) () } makeLenses 'DownloadEnv @@ -90,6 +98,8 @@ newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e) newDownloadEnv = liftIO do DownloadEnv <$> newTQueueIO <*> newTVarIO mempty + <*> newTVarIO mempty + <*> Cache.newCache (Just defBlockWipTimeout) newtype BlockDownloadM e m a = BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } @@ -110,9 +120,16 @@ withDownload e m = runReaderT ( fromBlockDownloadM m ) e addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m () addDownload h = do q <- asks (view downloadQ) - liftIO $ atomically $ writeTQueue q h - -- debug $ "addDownload" <+> pretty h - -- pause ( 0.25 :: Timeout 'Seconds ) + wip <- asks (view blockWip) + + liftIO do + atomically $ writeTQueue q h + Cache.insert wip h () + +removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m () +removeFromWip h = do + wip <- asks (view blockWip) + liftIO $ Cache.delete wip h withFreePeer :: (MyPeer e, MonadIO m) => Peer e @@ -134,11 +151,46 @@ withFreePeer p n m = do liftIO $ atomically $ modifyTVar busy $ HashMap.delete p pure r +-- NOTE: dangerous! if called in +-- wrong place/wrong time, +-- if may cause a drastical +-- download speed degradation + +dismissPeer :: (MyPeer e, MonadIO m) + => Peer e + -> BlockDownloadM e m () +dismissPeer p = do + busy <- asks (view peerBusy) + liftIO $ atomically $ modifyTVar busy $ HashMap.delete p + getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) getBlockForDownload = do q <- asks (view downloadQ) liftIO $ atomically $ readTQueue q +addBlockInfo :: (MonadIO m, MyPeer e) + => Peer e + -> Hash HbSync + -> Integer + -> BlockDownloadM e m () + +addBlockInfo pip h size = do + -- debug $ "addBlockInfo" <+> pretty h <+> pretty pip <+> pretty size + tv <- asks (view blockPeers) + let mySize = HashMap.singleton pip size + liftIO $ atomically + $ modifyTVar tv (HashMap.insertWith (<>) h mySize) + +getPeersForBlock :: (MonadIO m, MyPeer e) + => Hash HbSync + -> BlockDownloadM e m [(Peer e, Integer)] + +getPeersForBlock h = do + tv <- asks (view blockPeers) + liftIO $ readTVarIO tv <&> foldMap HashMap.toList + . maybeToList + . HashMap.lookup h + processBlock :: forall e m . ( MonadIO m , HasStorage m , Block ByteString ~ ByteString @@ -152,6 +204,10 @@ processBlock h = do bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h) + -- FIXME: если блок нашёлся, то удаляем его из wip + + when (isJust bt) (removeFromWip h) + case bt of Nothing -> addDownload h @@ -169,14 +225,28 @@ processBlock h = do if here then do debug $ "block" <+> pretty blk <+> "is already here" + processBlock blk -- NOTE: хуже не стало + -- FIXME: processBlock h + -- может быть, в этом причина того, + -- что мы периодически не докачиваем? + -- + -- может быть, нужно рекурсировать, что бы + -- посмотреть, что это за блок и что у нас + -- из него есть? + pure () -- we don't need to recurse, cause walkMerkle is recursing for us - else + else do addDownload blk + Just (Blob{}) -> do pure () +-- NOTE: if peer does not have a block, it may +-- cause to an unpleasant timeouts +-- So make sure that this peer really answered to +-- GetBlockSize request downloadFromWithPeer :: forall e m . ( MyPeer e , MonadIO m @@ -193,136 +263,159 @@ downloadFromWithPeer :: forall e m . ( MyPeer e , HasStorage m ) => Peer e + -> Integer -> Hash HbSync -> BlockDownloadM e m () -downloadFromWithPeer peer h = do - +downloadFromWithPeer peer thisBkSize h = do npi <- newPeerInfo pinfo <- lift $ fetch True npi (PeerInfoKey peer) id - waitSize <- liftIO $ newTBQueueIO 1 + sto <- lift getStorage - lift $ do - subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do - when ( p1 == peer ) $ do - liftIO $ atomically $ writeTBQueue waitSize s + coo <- genCookie (peer,h) + let key = DownloadSessionKey (peer, coo) + let chusz = defChunkSize + dnwld <- newBlockDownload h + let chuQ = view sBlockChunks dnwld + let new = set sBlockChunkSize chusz + . set sBlockSize (fromIntegral thisBkSize) + $ dnwld - request @e peer (GetBlockSize @e h) + lift $ update @e new key id - esize <- liftIO $ race ( pause defBlockInfoTimeout ) do -- FIXME: block size wait time - atomically $ readTBQueue waitSize + let burstSizeT = view peerBurst pinfo - let mbSize = either (const Nothing) Just esize + burstSize <- liftIO $ readTVarIO burstSizeT - sto <- lift $ getStorage + let offsets = calcChunks thisBkSize (fromIntegral chusz) :: [(Offset, Size)] - case mbSize of - Nothing -> void $ addDownload h - Just thisBkSize -> do + let chunkNums = [ 0 .. pred (length offsets) ] - coo <- genCookie (peer,h) - let key = DownloadSessionKey (peer, coo) - let chusz = defChunkSize - dnwld <- newBlockDownload h - let chuQ = view sBlockChunks dnwld - let new = set sBlockChunkSize chusz - . set sBlockSize (fromIntegral thisBkSize) - $ dnwld + let bursts = calcBursts burstSize chunkNums - lift $ update @e new key id + -- debug $ "bursts: " <+> pretty bursts - let burstSizeT = view peerBurst pinfo + r <- liftIO $ newTVarIO (mempty :: IntMap ByteString) + rq <- liftIO newTQueueIO - burstSize <- liftIO $ readTVarIO burstSizeT + for_ bursts $ liftIO . atomically . writeTQueue rq - let offsets = calcChunks thisBkSize (fromIntegral chusz) :: [(Offset, Size)] + fix \next -> do + burst <- liftIO $ atomically $ tryReadTQueue rq - let chunkNums = [ 0 .. pred (length offsets) ] + case burst of - let bursts = calcBursts burstSize chunkNums + Just (i,chunksN) -> do + let req = BlockGetChunks h chusz (fromIntegral i) (fromIntegral chunksN) + lift $ request peer (BlockChunks @e coo req) - -- debug $ "bursts: " <+> pretty bursts + -- TODO: here wait for all requested chunks! + -- FIXME: it may blocks forever, so must be timeout and retry - r <- liftIO $ newTVarIO (mempty :: IntMap ByteString) - rq <- liftIO newTQueueIO + catched <- either id id <$> liftIO ( race ( pause defChunkWaitMax >> pure mempty ) + ( replicateM chunksN + $ atomically + $ readTQueue chuQ ) - for_ bursts $ liftIO . atomically . writeTQueue rq + ) + if not (null catched) then do + liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN) + else do - fix \next -> do - burst <- liftIO $ atomically $ tryReadTQueue rq + liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ + updatePeerInfo pinfo - case burst of + newBurst <- liftIO $ readTVarIO burstSizeT - Just (i,chunksN) -> do - let req = BlockGetChunks h chusz (fromIntegral i) (fromIntegral chunksN) - lift $ request peer (BlockChunks @e coo req) + liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN) - -- TODO: here wait for all requested chunks! - -- FIXME: it may blocks forever, so must be timeout and retry + let chuchu = calcBursts newBurst [ i + n | n <- [0 .. chunksN] ] - catched <- either id id <$> liftIO ( race ( pause defChunkWaitMax >> pure mempty ) - ( replicateM chunksN - $ atomically - $ readTQueue chuQ ) + liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ - ) - when (null catched) $ do + debug $ "new burst: " <+> pretty newBurst + debug $ "missed chunks for request" <+> pretty (i,chunksN) - -- nerfing peer burst size. - -- FIXME: we need a thread that will be reset them again + for_ chuchu $ liftIO . atomically . writeTQueue rq - newBurst <- liftIO $ atomically - $ stateTVar burstSizeT $ \c -> let v = max 1 (c `div` 2) - in (v,v) + for_ catched $ \(num,bs) -> do + liftIO $ atomically $ modifyTVar' r (IntMap.insert (fromIntegral num) bs) - let chuchu = calcBursts newBurst [ i + n | n <- [0 .. chunksN] ] + next - debug $ "new burst: " <+> pretty newBurst - debug $ "missed chunks for request" <+> pretty (i,chunksN) + Nothing -> do - for_ chuchu $ liftIO . atomically . writeTQueue rq + sz <- liftIO $ readTVarIO r <&> IntMap.size - for_ catched $ \(num,bs) -> do - liftIO $ atomically $ modifyTVar' r (IntMap.insert (fromIntegral num) bs) + if sz == length offsets then do + pieces <- liftIO $ readTVarIO r <&> IntMap.elems + let block = mconcat pieces + let h1 = hashObject @HbSync block - next + if h1 == h then do + -- debug "PROCESS BLOCK" + lift $ expire @e key + void $ liftIO $ putBlock sto block + void $ processBlock h + else do + debug "HASH NOT MATCH" + debug "MAYBE THAT PEER IS JERK" - Nothing -> do + else do + debug "RETRY BLOCK DOWNLOADING / ASK FOR MISSED CHUNKS" + got <- liftIO $ readTVarIO r <&> IntMap.keysSet + let need = IntSet.fromList (fmap fromIntegral chunkNums) - sz <- liftIO $ readTVarIO r <&> IntMap.size + let missed = IntSet.toList $ need `IntSet.difference` got - if sz == length offsets then do - pieces <- liftIO $ readTVarIO r <&> IntMap.elems - let block = mconcat pieces - let h1 = hashObject @HbSync block - - if h1 == h then do - -- debug "PROCESS BLOCK" - lift $ expire @e key - void $ liftIO $ putBlock sto block - void $ processBlock h - else do - debug "HASH NOT MATCH" - debug "MAYBE THAT PEER IS JERK" - - else do - debug "RETRY BLOCK DOWNLOADING / ASK FOR MISSED CHUNKS" - got <- liftIO $ readTVarIO r <&> IntMap.keysSet - let need = IntSet.fromList (fmap fromIntegral chunkNums) - - let missed = IntSet.toList $ need `IntSet.difference` got - - -- normally this should not happen - -- however, let's try do download the tails - -- by one chunk a time - for_ missed $ \n -> do - liftIO $ atomically $ writeTQueue rq (n,1) + -- normally this should not happen + -- however, let's try do download the tails + -- by one chunk a time + for_ missed $ \n -> do + liftIO $ atomically $ writeTQueue rq (n,1) instance HasPeerLocator e m => HasPeerLocator e (BlockDownloadM e m) where getPeerLocator = lift getPeerLocator + +updatePeerInfo :: MonadIO m => PeerInfo e -> m () +updatePeerInfo pinfo = do + + t1 <- liftIO $ getTime MonotonicCoarse + + void $ liftIO $ atomically $ do + + bu <- readTVar (view peerBurst pinfo) + errs <- readTVar (view peerErrors pinfo) + errsLast <- readTVar (view peerErrorsLast pinfo) + t0 <- readTVar (view peerLastWatched pinfo) + down <- readTVar (view peerDownloaded pinfo) + downLast <- readTVar (view peerDownloadedLast pinfo) + + let dE = realToFrac $ max 0 (errs - errsLast) + let dT = realToFrac (max 1 (toNanoSecs t1 - toNanoSecs t0)) / 1e9 + + let eps = floor (dE / dT) + + let bu1 = if down - downLast > 0 then + max 1 $ min defBurstMax + $ ceiling + $ if eps == 0 then + realToFrac bu * 1.05 -- FIXME: to defaults + else + realToFrac bu * 0.65 + else + max defBurst $ ceiling (realToFrac bu * 0.65) + + writeTVar (view peerErrorsLast pinfo) errs + writeTVar (view peerLastWatched pinfo) t1 + writeTVar (view peerErrorsPerSec pinfo) eps + writeTVar (view peerBurst pinfo) bu1 + writeTVar (view peerDownloadedLast pinfo) down + + blockDownloadLoop :: forall e m . ( m ~ PeerM e IO , MonadIO m , Request e (BlockInfo e) m @@ -354,6 +447,18 @@ blockDownloadLoop env0 = do pl <- getPeerLocator @e + + void $ liftIO $ async $ forever $ withPeerM e do + pause @'Seconds 0.5 + + pee <- knownPeers @e pl + npi <- newPeerInfo + + + for_ pee $ \p -> do + pinfo <- fetch True npi (PeerInfoKey p) id + updatePeerInfo pinfo + -- TODO: peer info loop void $ liftIO $ async $ forever $ withPeerM e $ do pause @'Seconds 20 @@ -365,17 +470,40 @@ blockDownloadLoop env0 = do for_ pee $ \p -> do pinfo <- fetch True npi (PeerInfoKey p) id - burst <- liftIO $ readTVarIO (view peerBurst pinfo) + burst <- liftIO $ readTVarIO (view peerBurst pinfo) + errors <- liftIO $ readTVarIO (view peerErrorsPerSec pinfo) debug $ "peer" <+> pretty p <+> "burst: " <+> pretty burst + <+> "errors:" <+> pretty errors pure () + void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do + pause @'Seconds 5 -- FIXME: put to defaults + -- we need to show download stats + + tinfo <- asks (view blockPeers) + binfo <- liftIO $ readTVarIO tinfo + wip <- asks (view blockWip) + + liftIO $ Cache.purgeExpired wip + + aliveWip <- Set.fromList <$> liftIO (Cache.keys wip) + + let alive = HashMap.fromList [ (h,i) + | (h,i) <- HashMap.toList binfo + , Set.member h aliveWip + ] + + liftIO $ atomically $ writeTVar tinfo alive + + debug $ "maintain blocks wip" <+> pretty (Set.size aliveWip) + withDownload env0 do env <- ask let again h = do - debug $ "block fucked: " <+> pretty h - withPeerM e $ withDownload env (processBlock h) + -- debug $ "retrying block: " <+> pretty h + withPeerM e $ withDownload env (addDownload h) mapM_ processBlock blks @@ -387,13 +515,46 @@ blockDownloadLoop env0 = do unless here do - void $ runMaybeT $ do - p <- MaybeT $ knownPeers @e pl >>= liftIO . shuffleM <&> headMay + peers <- getPeersForBlock h - liftIO $ race ( pause defBlockWaitMax >> again h ) do - withPeerM e $ withDownload env $ do -- NOTE: really crazy shit - withFreePeer p (processBlock h >> pause (0.1 :: Timeout 'Seconds)) do - downloadFromWithPeer p h + when (null peers) $ do + + lift do -- in PeerM + subscribe (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do + withDownload env (addBlockInfo p1 hx s) + + pips <- knownPeers @e pl + for_ pips $ \pip -> request pip (GetBlockSize @e h) + + p <- knownPeers @e pl >>= liftIO . shuffleM + + -- FIXME: нам не повезло с пиром => сидим ждём defBlockWaitMax и скачивание + -- простаивает. + -- + -- Нужно: сначала запросить всех у кого есть блок. + -- Потом выбрать победителей и попытаться скачать + -- у них, запомнив размер в кэше. + -- + -- Когда находим блоки -- то сразу же асинхронно запрашиваем + -- размеры, что бы по приходу сюда они уже были + + + -- debug $ "known peers" <+> pretty p + -- debug $ "peers/blocks" <+> pretty peers + + p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster + + let withAllShit f = withPeerM e $ withDownload env f + + maybe1 p0 (again h) $ \(p1,size) -> do + withFreePeer p1 (again h) $ + liftIO do + re <- race ( pause defBlockWaitMax ) $ + withAllShit $ downloadFromWithPeer p1 size h + + case re of + Left{} -> withAllShit (again h) + Right{} -> withAllShit (processBlock h) next diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index fc41f853..365419be 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -11,9 +11,15 @@ import Lens.Micro.Platform import Control.Concurrent.STM.TVar -newtype PeerInfo e = +data PeerInfo e = PeerInfo - { _peerBurst :: TVar Int + { _peerBurst :: TVar Int + , _peerErrors :: TVar Int + , _peerErrorsLast :: TVar Int + , _peerErrorsPerSec :: TVar Int + , _peerLastWatched :: TVar TimeSpec + , _peerDownloaded :: TVar Int + , _peerDownloadedLast :: TVar Int } deriving stock (Generic,Typeable) @@ -23,7 +29,12 @@ makeLenses 'PeerInfo newPeerInfo :: MonadIO m => m (PeerInfo e) newPeerInfo = liftIO do PeerInfo <$> newTVarIO defBurst - + <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO 0 type instance SessionData e (PeerInfo e) = PeerInfo e diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 13740085..046ce81b 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -65,6 +65,7 @@ data RPCCommand = | ANNOUNCE (Hash HbSync) | PING (PeerAddr UDP) | CHECK PeerNonce (PeerAddr UDP) (Hash HbSync) + | FETCH (Hash HbSync) data PeerOpts = PeerOpts @@ -86,10 +87,10 @@ main = do sodiumInit setLogging @DEBUG (set loggerTr ("[debug] " <>)) - setLogging @INFO asIs - setLogging @ERROR asIs - setLogging @WARN asIs - setLogging @NOTICE asIs + setLogging @INFO defLog + setLogging @ERROR defLog + setLogging @WARN defLog + setLogging @NOTICE defLog withSimpleLogger runCLI @@ -106,6 +107,7 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ <> command "poke" (info pPoke (progDesc "poke peer by rpc")) <> command "announce" (info pAnnounce (progDesc "announce block")) <> command "ping" (info pPing (progDesc "ping another peer")) + <> command "fetch" (info pFetch (progDesc "fetch block")) ) common = do @@ -145,6 +147,11 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ h <- strArgument ( metavar "HASH" ) pure $ runRpcCommand rpc (ANNOUNCE h) + pFetch = do + rpc <- pRpcCommon + h <- strArgument ( metavar "HASH" ) + pure $ runRpcCommand rpc (FETCH h) + pPing = do rpc <- pRpcCommon h <- strArgument ( metavar "ADDR" ) @@ -289,6 +296,12 @@ runPeer opts = Exception.handle myException $ do debug $ "Got authorized peer!" <+> pretty p <+> pretty (AsBase58 (view peerSignKey d)) + + void $ liftIO $ async $ withPeerM env do + pause @'Seconds 1 + debug "sending first peer announce" + request localMulticast (PeerAnnounce @UDP pnonce) + void $ liftIO $ async $ withPeerM env $ forever $ do pause defPeerAnnounceTime -- FIXME: setting! debug "sending local peer announce" @@ -339,6 +352,8 @@ runPeer opts = Exception.handle myException $ do withDownload denv $ do processBlock h + _ -> pure () + me <- liftIO $ async $ withPeerM env $ do runProto @UDP @@ -359,10 +374,16 @@ runPeer opts = Exception.handle myException $ do let pingAction pa = do liftIO $ atomically $ writeTQueue rpcQ (PING pa) + let fetchAction h = do + debug $ "fetchAction" <+> pretty h + liftIO $ withPeerM penv + $ withDownload denv (processBlock h) + let arpc = RpcAdapter pokeAction dontHandle annAction pingAction + fetchAction rpc <- async $ runRPC udp1 do runProto @UDP @@ -408,12 +429,6 @@ emitToPeer env k e = liftIO $ withPeerM env (emit k e) withRPC :: String -> RPC UDP -> IO () withRPC saddr cmd = withSimpleLogger do - setLogging @DEBUG asIs - setLogging @INFO asIs - setLogging @ERROR asIs - setLogging @WARN asIs - setLogging @NOTICE asIs - as <- parseAddr (fromString saddr) <&> fmap (PeerUDP . addrAddress) let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as @@ -437,6 +452,8 @@ withRPC saddr cmd = withSimpleLogger do RPCPing{} -> pause @'Seconds 0.1 >> liftIO exitSuccess + RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess + _ -> pure () void $ liftIO $ waitAnyCatchCancel [proto] @@ -445,15 +462,17 @@ withRPC saddr cmd = withSimpleLogger do where adapter = RpcAdapter dontHandle - (const $ debug "alive-and-kicking" >> liftIO exitSuccess) + (const $ notice "alive-and-kicking" >> liftIO exitSuccess) (const $ liftIO exitSuccess) (const $ debug "wat?") + dontHandle runRpcCommand :: String -> RPCCommand -> IO () runRpcCommand saddr = \case POKE -> withRPC saddr (RPCPoke @UDP) PING s -> withRPC saddr (RPCPing s) ANNOUNCE h -> withRPC saddr (RPCAnnounce @UDP h) + FETCH h -> withRPC saddr (RPCFetch @UDP h) _ -> pure () diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index e7c42379..44a87d00 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -18,6 +18,7 @@ data RPC e = | RPCPing (PeerAddr e) | RPCPokeAnswer | RPCAnnounce (Hash HbSync) + | RPCFetch (Hash HbSync) deriving stock (Generic) @@ -44,6 +45,7 @@ data RpcAdapter e m = , rpcOnPokeAnswer :: RPC e -> m () , rpcOnAnnounce :: Hash HbSync -> m () , rpcOnPing :: PeerAddr e -> m () + , rpcOnFetch :: Hash HbSync -> m () } newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a } @@ -85,4 +87,5 @@ rpcHandler adapter = \case p@RPCPokeAnswer{} -> rpcOnPokeAnswer adapter p (RPCAnnounce h) -> rpcOnAnnounce adapter h (RPCPing pa) -> rpcOnPing adapter pa + (RPCFetch h) -> rpcOnFetch adapter h