This commit is contained in:
voidlizard 2024-12-03 12:35:24 +03:00
parent 4941c5442c
commit 1012368cb5
1 changed files with 106 additions and 75 deletions

View File

@ -52,8 +52,13 @@ import Numeric
import UnliftIO
import Control.Concurrent.STM.TSem (TSem)
import Control.Concurrent.STM.TSem qualified as TSem
import UnliftIO.Concurrent
import UnliftIO.STM
import UnliftIO.Exception as U
import Control.Exception qualified as E
import Lens.Micro.Platform
import System.Random
import System.Random.Shuffle (shuffleM,shuffle')
@ -582,6 +587,13 @@ data DCB =
newDcbSTM :: TimeSpec -> Maybe HashRef -> STM DCB
newDcbSTM ts parent = DCB ts parent <$> newTVar 0 <*> newTVar False
data DownloadSweepOnIdle =
DownloadSweepOnIdle
deriving stock (Show,Typeable)
instance Exception DownloadSweepOnIdle
data PSt =
PChoose
| PInit HashRef DCB
@ -596,101 +608,121 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto
-> SomeBrains e
-> PeerEnv e
-> m ()
downloadDispatcher probe brains env = flip runContT pure do
downloadDispatcher probe brains env = forever $ flip runContT pure do
debug $ red "downloadDispatcher spawned!"
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) )
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) )
wip <- newTVarIO ( mempty :: HashMap HashRef DCB )
parseQ <- newTQueueIO
wip <- newTVarIO ( mempty :: HashMap HashRef DCB )
parseQ <- newTQueueIO
let
onBlockSTM :: HashRef -> STM ()
onBlockSTM = writeTQueue parseQ
let
onBlockSTM :: HashRef -> STM ()
onBlockSTM = writeTQueue parseQ
insertNewDownloadSTM :: TimeSpec -> HashRef -> STM ()
insertNewDownloadSTM now ha = do
already <- readTVar wip <&> HM.member ha
unless already do
dcb <- newDcbSTM now mzero
modifyTVar wip (HM.insert ha dcb)
insertNewDownloadSTM :: TimeSpec -> HashRef -> STM ()
insertNewDownloadSTM now ha = do
already <- readTVar wip <&> HM.member ha
unless already do
dcb <- newDcbSTM now mzero
modifyTVar wip (HM.insert ha dcb)
insertNewDownload :: forall m1 . MonadIO m1 => HashRef -> m1 ()
insertNewDownload ha = do
now <- getTimeCoarse
atomically $ insertNewDownloadSTM now ha
newDownload @e brains ha
insertNewDownload :: forall m1 . MonadIO m1 => HashRef -> m1 ()
insertNewDownload ha = do
now <- getTimeCoarse
atomically $ insertNewDownloadSTM now ha
newDownload @e brains ha
void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts
void $ ContT $ bracket none $ const do
readTVarIO pts <&> fmap fst . HM.elems >>= mapM_ cancel
sto <- withPeerM env getStorage
void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts
liftIO $ withPeerM env do
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
here <- hasBlock sto h <&> isJust
unless here do
debug $ green "New download request" <+> pretty h
insertNewDownload (HashRef h)
sto <- withPeerM env getStorage
dupes <- newTVarIO ( mempty :: HashMap HashRef Int )
liftIO $ withPeerM env do
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
here <- hasBlock sto h <&> isJust
unless here do
debug $ green "New download request" <+> pretty h
insertNewDownload (HashRef h)
ContT $ withAsync $ forever $ pause @'Seconds 10 >> do
acceptReport probe =<< S.toList_ do
wip <- readTVarIO wip <&> HM.size
pn <- readTVarIO pts <&> HM.size
S.yield ( "wip", fromIntegral wip )
S.yield ( "peerThreads", fromIntegral pn )
dupes <- newTVarIO ( mempty :: HashMap HashRef Int )
ContT $ withAsync do
polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do
atomically $ modifyTVar dupes (HM.delete h)
ContT $ withAsync $ forever $ pause @'Seconds 10 >> do
acceptReport probe =<< S.toList_ do
wip <- readTVarIO wip <&> HM.size
pn <- readTVarIO pts <&> HM.size
S.yield ( "wip", fromIntegral wip )
S.yield ( "peerThreads", fromIntegral pn )
ContT $ withAsync do
pause @'Seconds 10
forever $ (>> pause @'Seconds 60) $ do
down <- listDownloads @e brains
for down \(h,_) -> do
already <- readTVarIO wip <&> HM.member h
checked <- readTVarIO dupes <&> HM.member h
unless checked do
here <- hasBlock sto (coerce h) <&> isJust
when here do
atomically $ modifyTVar dupes (HM.insertWith (+) h 1)
delDownload @e brains h
unless already do
missed <- findMissedBlocks sto h
for_ missed insertNewDownload
ContT $ withAsync do
polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do
atomically $ modifyTVar dupes (HM.delete h)
ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do
debug "Sweep blocks"
atomically do
total <- readTVar wip <&> HM.toList
ContT $ withAsync do
pause @'Seconds 10
forever $ (>> pause @'Seconds 60) $ do
down <- listDownloads @e brains
for down \(h,_) -> do
already <- readTVarIO wip <&> HM.member h
checked <- readTVarIO dupes <&> HM.member h
unless checked do
here <- hasBlock sto (coerce h) <&> isJust
when here do
atomically $ modifyTVar dupes (HM.insertWith (+) h 1)
delDownload @e brains h
unless already do
missed <- findMissedBlocks sto h
for_ missed insertNewDownload
alive <- for total $ \e@(h,DCB{..}) -> do
down <- readTVar dcbDownloaded
if down then
pure Nothing
else
pure (Just e)
ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do
debug "Sweep blocks"
atomically do
total <- readTVar wip <&> HM.toList
writeTVar wip (HM.fromList (catMaybes alive))
alive <- for total $ \e@(h,DCB{..}) -> do
down <- readTVar dcbDownloaded
if down then
pure Nothing
else
pure (Just e)
ContT $ withAsync $ forever do
what <- atomically $ readTQueue parseQ
missed <- findMissedBlocks sto what
for_ missed insertNewDownload
writeTVar wip (HM.fromList (catMaybes alive))
forever $ (>> pause @'Seconds 10) do
sw0 <- readTVarIO wip <&> HM.size
n <- countDownloads @e brains
debug $ yellow $ "wip" <+> pretty sw0 <+> parens (pretty n)
ContT $ withAsync $ forever do
what <- atomically $ readTQueue parseQ
missed <- findMissedBlocks sto what
for_ missed insertNewDownload
idle <- ContT $ withAsync $ do
t0 <- getTimeCoarse
flip fix t0 $ \next ti -> do
num <- readTVarIO wip <&> HM.size
t1 <- getTimeCoarse
if num /= 0 then do
pause @Seconds 5 >> next t1
else do
let idle = expired (TimeoutSec 600) (t1 - ti)
-- debug $ blue "EXPIRED" <+> pretty (idle,t1,ti)
when idle $ throwIO DownloadSweepOnIdle
pause @Seconds 5
next t0
ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do
sw0 <- readTVarIO wip <&> HM.size
n <- countDownloads @e brains
debug $ yellow $ "wip" <+> pretty sw0 <+> parens (pretty n)
void $ waitCatch idle
where
manageThreads :: (HashRef -> STM ())
-> TVar (HashMap HashRef DCB)
-> TVar (HashMap (Peer e) (Async (), PeerNonce))
-> m ()
-- manageThreads :: (HashRef -> STM ())
-- -> TVar (HashMap HashRef DCB)
-- -> TVar (HashMap (Peer e) (Async (), PeerNonce))
-- -> m ()
manageThreads onBlock wip pts = do
_pnum <- newTVarIO 1
@ -711,7 +743,6 @@ downloadDispatcher probe brains env = flip runContT pure do
maybe1 mpd (pure ()) $
\PeerData{..} -> S.yield (p, _peerOwnNonce)
for_ (HM.toList peers) $ \(p,nonce) -> do
here <- readTVarIO pts <&> HM.member p