From 098a748e7e5f44e28f5cd89c02c1550fc1e5e116 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Mon, 3 Apr 2023 11:25:54 +0300 Subject: [PATCH] udp-download-fix --- .fixme/log | 2 +- hbs2-core/lib/HBS2/Defaults.hs | 2 +- hbs2-peer/app/BlockDownload.hs | 39 +++-------- hbs2-peer/app/Brains.hs | 118 ++++++++++++++++++++++++++++++++- 4 files changed, 126 insertions(+), 35 deletions(-) diff --git a/.fixme/log b/.fixme/log index 80056e89..d9739835 100644 --- a/.fixme/log +++ b/.fixme/log @@ -1,2 +1,2 @@ -(fixme-set "workflow" "test" "8vUEBAHeh9") \ No newline at end of file +(fixme-set "workflow" "test" "2HmuD6jV8H") \ No newline at end of file diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index fcb0de41..5d99a5fe 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -70,7 +70,7 @@ defBlockWipTimeout :: TimeSpec defBlockWipTimeout = defCookieTimeout defBlockInfoTimeout :: Timeout 'Seconds -defBlockInfoTimeout = 20 +defBlockInfoTimeout = 5 defBlockInfoTimeoutSpec :: TimeSpec defBlockInfoTimeoutSpec = toTimeSpec defBlockInfoTimeout diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 8a6c565e..186aa646 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -569,9 +569,6 @@ peerDownloadLoop :: forall e m . ( MyPeer e ) => Peer e -> BlockDownloadM e m () peerDownloadLoop peer = do - sizeCache <- liftIO $ Cache.newCache @_ @Integer (Just defBlockSizeCacheTime) - noBlock <- liftIO $ Cache.newCache (Just defBlockBanTime) - pe <- lift ask e <- ask @@ -583,8 +580,8 @@ peerDownloadLoop peer = do subscribe @e (BlockSizeEventKey h) $ \case BlockSizeEvent (p1,_,s) -> do when (p1 == peer) do - liftIO $ Cache.insert sizeCache h s liftIO $ atomically $ writeTQueue q (Just s) + onBlockSize brains peer h s NoBlockEvent{} -> do -- TODO: ban-block-for-some-seconds @@ -610,6 +607,7 @@ peerDownloadLoop peer = do trace $ pretty peer <+> "block" <+> pretty h <+> "is already here" processBlock h else do + lift $ onBlockDownloadAttempt brains peer h let downFail = view peerDownloadFail pinfo let downBlk = view peerDownloadedBlk pinfo @@ -640,10 +638,6 @@ peerDownloadLoop peer = do let thenNext m = m >> next - liftIO do - Cache.purgeExpired sizeCache - Cache.purgeExpired noBlock - npi <- newPeerInfo auth' <- lift $ find (KnownPeerKey peer) id @@ -680,39 +674,24 @@ peerDownloadLoop peer = do liftIO $ atomically $ writeTVar idle 0 - lift $ onBlockDownloadAttempt brains peer h - trace $ "start download block" <+> pretty peer <+> pretty h - mbSize <- liftIO $ Cache.lookup sizeCache h - noBlk <- liftIO $ Cache.lookup noBlock h <&> isJust + mbSize2 <- blockSize brains peer h - case mbSize of + case mbSize2 of Just size -> do trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size tryDownload pinfo h size - Nothing | noBlk -> do - trace $ pretty peer <+> "does not have block" <+> pretty h - addDownload mzero h - Nothing -> do - r <- doBlockSizeRequest h - case r of - Left{} -> failedDownload peer h - - Right Nothing -> do - here <- liftIO $ Cache.lookup noBlock h <&> isJust - - unless here $ - liftIO $ Cache.insert noBlock h () - - addDownload mzero h - - Right (Just s) -> do + (Right (Just s)) -> do tryDownload pinfo h s + pure () + + _ -> pure () + warnExit void $ delPeerThreadData peer diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index f3d67027..00f0ac22 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -26,11 +26,21 @@ import Database.SQLite.Simple import Database.SQLite.Simple.FromField import System.Random (randomRIO) import Data.Word +import Data.Either class HasBrains e a where onKnownPeers :: MonadIO m => a -> [Peer e] -> m () + onBlockSize :: ( MonadIO m + , IsPeerAddr e m + ) + => a + -> Peer e + -> Hash HbSync + -> Integer + -> m () + onBlockDownloadAttempt :: ( MonadIO m , IsPeerAddr e m ) @@ -73,12 +83,23 @@ class HasBrains e a where -> Hash HbSync -> m [PeerAddr e] + blockSize :: forall m . MonadIO m + => a + -> Peer e + -> Hash HbSync + -> m (Maybe Integer) + + blockSize _ _ _ = pure Nothing + type NoBrains = () instance Pretty (Peer e) => HasBrains e NoBrains where onKnownPeers _ ps = pure () + onBlockSize _ _ _ _ = do + pure () + onBlockDownloadAttempt _ p h = do pure () @@ -100,6 +121,7 @@ data SomeBrains e = forall a . HasBrains e a => SomeBrains a instance HasBrains e (SomeBrains e) where onKnownPeers (SomeBrains a) = onKnownPeers a + onBlockSize (SomeBrains a) = onBlockSize a onBlockDownloadAttempt (SomeBrains a) = onBlockDownloadAttempt a onBlockDownloaded (SomeBrains a) = onBlockDownloaded a onBlockPostponed (SomeBrains a) = onBlockPostponed @e a @@ -107,6 +129,9 @@ instance HasBrains e (SomeBrains e) where shouldPostponeBlock (SomeBrains a) = shouldPostponeBlock @e a shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a + blockSize (SomeBrains a) = blockSize @e a + +newtype CommitCmd = CommitCmd { onCommited :: IO () } data BasicBrains e = BasicBrains @@ -115,6 +140,7 @@ data BasicBrains e = , _brainsExpire :: Cache (Hash HbSync) () , _brainsDb :: Connection , _brainsPipeline :: TQueue (IO ()) + , _brainsCommit :: TQueue CommitCmd } makeLenses 'BasicBrains @@ -133,6 +159,14 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe let tv = view brainsPeers br liftIO $ atomically $ writeTVar tv ps + onBlockSize b p h size = do + updateOP b $ insertSize b p h size + commitNow b True + -- FIXME: wait-till-really-commited + sz <- liftIO $ selectBlockSize b p h + trace $ "BRAINS: onBlockSize" <+> pretty p <+> pretty h <+> pretty sz + pure () + onBlockDownloadAttempt b peer h = do -- trace "BRAINS: onBlockDownloadAttempt" noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null @@ -176,11 +210,67 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe r <- liftIO $ findPeers b h pure $ mapMaybe fromStringMay r + blockSize b p h = do + liftIO $ selectBlockSize b p h + +commitNow :: forall e m . MonadIO m + => BasicBrains e + -> Bool + -> m () + +commitNow br doWait = do + w <- liftIO newTQueueIO + + let answer | doWait = do + atomically $ writeTQueue w () + | otherwise = pure () + + liftIO $ atomically $ writeTQueue (view brainsCommit br) (CommitCmd answer) + + when doWait $ liftIO do + void $ atomically $ do + readTQueue w >> flushTQueue w + updateOP :: forall e m . MonadIO m => BasicBrains e -> IO () -> m () updateOP br op = do let pip = view brainsPipeline br liftIO $ atomically $ writeTQueue pip (liftIO op) +insertSize :: forall e . Pretty (Peer e) + => BasicBrains e + -> Peer e + -> Hash HbSync + -> Integer + -> IO () + +insertSize br p h s = do + + + let conn = view brainsDb br + + void $ liftIO $ execute conn [qc| + insert into blocksize (block, peer, size) values (?,?,?) + on conflict (block,peer) do update set size = ? + |] (show $ pretty h, show $ pretty p, s, s) + + +selectBlockSize :: forall e . Pretty (Peer e) + => BasicBrains e + -> Peer e + -> Hash HbSync + -> IO (Maybe Integer) +selectBlockSize br p h = do + + let conn = view brainsDb br + + liftIO $ query conn [qc| + select size + from blocksize + where + block = ? and peer = ? + limit 1 + |] (show $ pretty h, show $ pretty p) <&> fmap fromOnly <&> listToMaybe + insertAncestor :: BasicBrains e -> Hash HbSync -- ^ parent -> Hash HbSync -- ^ child @@ -288,6 +378,7 @@ SAVEPOINT zzz1; DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600; DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600; +DELETE FROM blocksize WHERE strftime('%s','now') - strftime('%s', ts) > 300; RELEASE SAVEPOINT zzz1; @@ -309,7 +400,8 @@ transactional brains action = do Right{} -> do execute_ conn [qc|RELEASE SAVEPOINT {sp}|] - Left ( _ :: SomeException ) -> do + Left ( e :: SomeException ) -> do + err $ "BRAINS: " <+> viaShow e execute_ conn [qc|ROLLBACK TO SAVEPOINT {sp}|] -- FIXME: eventually-close-db @@ -334,25 +426,45 @@ newBasicBrains = liftIO do , primary key (block,peer)) |] + execute_ conn [qc| + create table if not exists blocksize + ( block text not null + , peer text not null + , size int + , ts DATE DEFAULT (datetime('now','localtime')) + , primary key (block,peer)) + |] + BasicBrains <$> newTVarIO mempty <*> newTVarIO mempty <*> Cache.newCache (Just (toTimeSpec (30 :: Timeout 'Seconds))) <*> pure conn <*> newTQueueIO + <*> newTQueueIO runBasicBrains :: MonadIO m => BasicBrains e -> m () runBasicBrains brains = do let pip = view brainsPipeline brains let expire = view brainsExpire brains + let commit = view brainsCommit brains -- FIXME: async-error-handling void $ liftIO $ async $ forever do - pause @'Seconds 5 - -- transactional brains do + + ewaiters <- race (pause @'Seconds 5) $ do + atomically $ do + c <- readTQueue commit + cs <- flushTQueue commit + pure (c:cs) + + let waiters = fromRight mempty ewaiters & fmap onCommited + w <- atomically $ readTQueue pip ws <- atomically $ flushTQueue pip + transactional brains (sequence_ (w:ws)) + sequence_ waiters void $ liftIO $ async $ forever do pause @'Seconds 60