diff --git a/.fixme/log b/.fixme/log index 900836af..d413e4aa 100644 --- a/.fixme/log +++ b/.fixme/log @@ -219,4 +219,6 @@ fixme-set "assigned" "voidlizard" "BhME2nDpbd" fixme-set "workflow" "test" "BhME2nDpbd" fixme-set "workflow" "wip" "39Fc5R5uXU" -fixme-set "assigned" "voidlizard" "39Fc5R5uXU" \ No newline at end of file +fixme-set "assigned" "voidlizard" "39Fc5R5uXU" +fixme-set "workflow" "test" "39Fc5R5uXU" +fixme-set "workflow" "backlog" "HcrvggGcAs" \ No newline at end of file diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 0c83e05e..85a1b07f 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -38,6 +38,7 @@ blockSizeProto getBlockSize evHasBlock = NoBlock h -> do that <- thatPeer (Proxy @(BlockInfo e)) + emit @e (BlockSizeEventKey h) (NoBlockEvent that) evHasBlock ( that, h, Nothing ) BlockSize h sz -> do @@ -57,7 +58,9 @@ newtype instance EventKey e (BlockInfo e) = deriving instance Hashable (EventKey e (BlockInfo e)) -newtype instance Event e (BlockInfo e) = - BlockSizeEvent (Peer e, Hash HbSync, Integer) +data instance Event e (BlockInfo e) = + BlockSizeEvent (Peer e, Hash HbSync, Integer) + | NoBlockEvent (Peer e) deriving stock (Typeable) + diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 531778c9..9fc45e29 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -582,8 +582,12 @@ peerDownloadLoop peer = do r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do blksq <- liftIO newTQueueIO - subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (_,_,s)) -> do - liftIO $ atomically $ writeTQueue blksq s + subscribe @e (BlockSizeEventKey h) $ \case + (BlockSizeEvent (_,_,s)) -> do + liftIO $ atomically $ writeTQueue blksq (Just s) + (NoBlockEvent p) -> do + debug $ "NoBlockEvent" <+> pretty p <+> pretty h + liftIO $ atomically $ writeTQueue blksq Nothing request peer (GetBlockSize @e h) @@ -594,7 +598,10 @@ peerDownloadLoop peer = do liftIO $ atomically $ modifyTVar downFail succ addDownload h - Right size -> do + Right Nothing -> do + addDownload h -- this is a legit situation; it is handled above (block ban... etc). + + Right (Just size) -> do r2 <- liftIO $ race ( pause defBlockWaitMax ) $ withPeerM e $ withDownload ee diff --git a/hbs2-peer/app/DownloadQ.hs b/hbs2-peer/app/DownloadQ.hs index fc59e314..02df5754 100644 --- a/hbs2-peer/app/DownloadQ.hs +++ b/hbs2-peer/app/DownloadQ.hs @@ -17,6 +17,7 @@ import PeerConfig import Data.Map qualified as Map import Data.Foldable import Control.Concurrent.STM +import Control.Concurrent.STM.TSem import Data.ByteString.Char8 qualified as B8 import Data.List (nub) import Data.Maybe @@ -24,6 +25,7 @@ import Data.Functor import Data.Function import Control.Exception import Control.Monad +import Control.Concurrent.Async import System.IO @@ -46,42 +48,56 @@ downloadQueue conf denv = do sto <- getStorage hq <- liftIO newTQueueIO + fsem <- liftIO $ atomically $ newTSem 1 - pause @'Seconds 1 + pause @'Seconds 2 let qfile' = cfgValue @PeerDownloadLogKey conf :: Maybe String subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do liftIO $ atomically $ writeTQueue hq h + maybe1 qfile' noLogFile $ \fn -> do + void $ liftIO $ async $ forever $ do + pause @'Seconds 10 + fromq <- liftIO $ atomically $ flushTQueue hq + unless (null fromq) do + atomically $ waitTSem fsem + catchAny ( B8.appendFile fn ( B8.unlines (fmap (B8.pack.show.pretty) fromq) ) ) + whimper + atomically $ signalTSem fsem + maybe1 qfile' noLogFile $ \fn -> forever do debug $ "downloadQueue" <+> pretty fn liftIO do + -- FIXME: will-crash-on-big-logs + atomically $ waitTSem fsem r <- catchAny (B8.readFile fn) (\e -> whimper e >> pure "") + atomically $ signalTSem fsem let hashes = B8.lines r & mapMaybe (fromStringMay . B8.unpack) & nub :: [Hash HbSync] fromq <- liftIO $ atomically $ flushTQueue hq let hashesWip = nub ( hashes <> fromq ) - errnum <- newTVarIO mempty + errnum <- newTQueueIO let walk h = walkMerkle h (getBlock sto) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do case hr of - Left{} -> atomically $ modifyTVar errnum (mappend [(h,True)]) + Left{} -> atomically $ writeTQueue errnum (h,True) Right (hrr :: [HashRef]) -> do forM_ hrr $ \(HashRef hx) -> do mblk <- hasBlock sto hx case mblk of - Nothing -> atomically $ modifyTVar errnum (mappend [(h,True)]) + Nothing -> atomically $ writeTQueue errnum (h,True) _ -> pure () for_ hashesWip walk - loosers <- readTVarIO errnum <&> Map.fromListWith (||) <&> Map.filter id + loosers <- atomically $ flushTQueue errnum <&> Map.fromListWith (||) <&> Map.filter id -- debug $ vcat (fmap pretty (Map.toList loosers)) @@ -89,8 +105,10 @@ downloadQueue conf denv = do for_ leftovers $ withDownload denv . addDownload + atomically $ waitTSem fsem catchAny ( B8.writeFile fn ( B8.unlines (fmap (B8.pack.show.pretty) leftovers) ) ) whimper + atomically $ signalTSem fsem debug "downloadQueue okay"