faster-download-log-append

This commit is contained in:
Dmitry Zuikov 2023-02-25 07:27:08 +03:00
parent 8f0b8f0dc2
commit d7c93c8d3e
4 changed files with 41 additions and 11 deletions

View File

@ -219,4 +219,6 @@ fixme-set "assigned" "voidlizard" "BhME2nDpbd"
fixme-set "workflow" "test" "BhME2nDpbd"
fixme-set "workflow" "wip" "39Fc5R5uXU"
fixme-set "assigned" "voidlizard" "39Fc5R5uXU"
fixme-set "assigned" "voidlizard" "39Fc5R5uXU"
fixme-set "workflow" "test" "39Fc5R5uXU"
fixme-set "workflow" "backlog" "HcrvggGcAs"

View File

@ -38,6 +38,7 @@ blockSizeProto getBlockSize evHasBlock =
NoBlock h -> do
that <- thatPeer (Proxy @(BlockInfo e))
emit @e (BlockSizeEventKey h) (NoBlockEvent that)
evHasBlock ( that, h, Nothing )
BlockSize h sz -> do
@ -57,7 +58,9 @@ newtype instance EventKey e (BlockInfo e) =
deriving instance Hashable (EventKey e (BlockInfo e))
newtype instance Event e (BlockInfo e) =
BlockSizeEvent (Peer e, Hash HbSync, Integer)
data instance Event e (BlockInfo e) =
BlockSizeEvent (Peer e, Hash HbSync, Integer)
| NoBlockEvent (Peer e)
deriving stock (Typeable)

View File

@ -582,8 +582,12 @@ peerDownloadLoop peer = do
r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do
blksq <- liftIO newTQueueIO
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (_,_,s)) -> do
liftIO $ atomically $ writeTQueue blksq s
subscribe @e (BlockSizeEventKey h) $ \case
(BlockSizeEvent (_,_,s)) -> do
liftIO $ atomically $ writeTQueue blksq (Just s)
(NoBlockEvent p) -> do
debug $ "NoBlockEvent" <+> pretty p <+> pretty h
liftIO $ atomically $ writeTQueue blksq Nothing
request peer (GetBlockSize @e h)
@ -594,7 +598,10 @@ peerDownloadLoop peer = do
liftIO $ atomically $ modifyTVar downFail succ
addDownload h
Right size -> do
Right Nothing -> do
addDownload h -- this is a legit situation; it is handled above (block ban... etc).
Right (Just size) -> do
r2 <- liftIO $ race ( pause defBlockWaitMax )
$ withPeerM e
$ withDownload ee

View File

@ -17,6 +17,7 @@ import PeerConfig
import Data.Map qualified as Map
import Data.Foldable
import Control.Concurrent.STM
import Control.Concurrent.STM.TSem
import Data.ByteString.Char8 qualified as B8
import Data.List (nub)
import Data.Maybe
@ -24,6 +25,7 @@ import Data.Functor
import Data.Function
import Control.Exception
import Control.Monad
import Control.Concurrent.Async
import System.IO
@ -46,42 +48,56 @@ downloadQueue conf denv = do
sto <- getStorage
hq <- liftIO newTQueueIO
fsem <- liftIO $ atomically $ newTSem 1
pause @'Seconds 1
pause @'Seconds 2
let qfile' = cfgValue @PeerDownloadLogKey conf :: Maybe String
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
liftIO $ atomically $ writeTQueue hq h
maybe1 qfile' noLogFile $ \fn -> do
void $ liftIO $ async $ forever $ do
pause @'Seconds 10
fromq <- liftIO $ atomically $ flushTQueue hq
unless (null fromq) do
atomically $ waitTSem fsem
catchAny ( B8.appendFile fn ( B8.unlines (fmap (B8.pack.show.pretty) fromq) ) )
whimper
atomically $ signalTSem fsem
maybe1 qfile' noLogFile $ \fn -> forever do
debug $ "downloadQueue" <+> pretty fn
liftIO do
-- FIXME: will-crash-on-big-logs
atomically $ waitTSem fsem
r <- catchAny (B8.readFile fn) (\e -> whimper e >> pure "")
atomically $ signalTSem fsem
let hashes = B8.lines r & mapMaybe (fromStringMay . B8.unpack) & nub :: [Hash HbSync]
fromq <- liftIO $ atomically $ flushTQueue hq
let hashesWip = nub ( hashes <> fromq )
errnum <- newTVarIO mempty
errnum <- newTQueueIO
let walk h = walkMerkle h (getBlock sto) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do
case hr of
Left{} -> atomically $ modifyTVar errnum (mappend [(h,True)])
Left{} -> atomically $ writeTQueue errnum (h,True)
Right (hrr :: [HashRef]) -> do
forM_ hrr $ \(HashRef hx) -> do
mblk <- hasBlock sto hx
case mblk of
Nothing -> atomically $ modifyTVar errnum (mappend [(h,True)])
Nothing -> atomically $ writeTQueue errnum (h,True)
_ -> pure ()
for_ hashesWip walk
loosers <- readTVarIO errnum <&> Map.fromListWith (||) <&> Map.filter id
loosers <- atomically $ flushTQueue errnum <&> Map.fromListWith (||) <&> Map.filter id
-- debug $ vcat (fmap pretty (Map.toList loosers))
@ -89,8 +105,10 @@ downloadQueue conf denv = do
for_ leftovers $ withDownload denv . addDownload
atomically $ waitTSem fsem
catchAny ( B8.writeFile fn ( B8.unlines (fmap (B8.pack.show.pretty) leftovers) ) )
whimper
atomically $ signalTSem fsem
debug "downloadQueue okay"