mirror of https://github.com/voidlizard/hbs2
bring back persistent DownloadQ
This commit is contained in:
parent
d75d7ef8cd
commit
051e7c417e
|
@ -606,6 +606,19 @@ downloadDispatcher brains env = flip runContT pure do
|
|||
onBlockSTM :: HashRef -> STM ()
|
||||
onBlockSTM = writeTQueue parseQ
|
||||
|
||||
insertNewDownloadSTM :: TimeSpec -> HashRef -> STM ()
|
||||
insertNewDownloadSTM now ha = do
|
||||
already <- readTVar wip <&> HM.member ha
|
||||
unless already do
|
||||
dcb <- newDcbSTM now mzero
|
||||
modifyTVar wip (HM.insert ha dcb)
|
||||
|
||||
insertNewDownload :: forall m1 . MonadIO m1 => HashRef -> m1 ()
|
||||
insertNewDownload ha = do
|
||||
now <- getTimeCoarse
|
||||
atomically $ insertNewDownloadSTM now ha
|
||||
newDownload @e brains ha
|
||||
|
||||
void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts
|
||||
|
||||
sto <- withPeerM env getStorage
|
||||
|
@ -614,14 +627,30 @@ downloadDispatcher brains env = flip runContT pure do
|
|||
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
||||
here <- hasBlock sto h <&> isJust
|
||||
unless here do
|
||||
now <- getTimeCoarse
|
||||
debug $ green "New download request" <+> pretty h
|
||||
atomically do
|
||||
already <- readTVar wip <&> HM.member (HashRef h)
|
||||
dcb <- newDcbSTM now mzero
|
||||
let w = 1.0 -- realToFrac now
|
||||
unless already do
|
||||
modifyTVar wip (HM.insert (HashRef h) dcb)
|
||||
insertNewDownload (HashRef h)
|
||||
|
||||
dupes <- newTVarIO ( mempty :: HashMap HashRef Int )
|
||||
|
||||
ContT $ withAsync do
|
||||
polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do
|
||||
atomically $ modifyTVar dupes (HM.delete h)
|
||||
|
||||
ContT $ withAsync do
|
||||
pause @'Seconds 10
|
||||
forever $ (>> pause @'Seconds 60) $ do
|
||||
down <- listDownloads @e brains
|
||||
for down \(h,_) -> do
|
||||
already <- readTVarIO wip <&> HM.member h
|
||||
checked <- readTVarIO dupes <&> HM.member h
|
||||
unless checked do
|
||||
here <- hasBlock sto (coerce h) <&> isJust
|
||||
when here do
|
||||
atomically $ modifyTVar dupes (HM.insertWith (+) h 1)
|
||||
delDownload @e brains h
|
||||
unless already do
|
||||
missed <- findMissedBlocks sto h
|
||||
for_ missed insertNewDownload
|
||||
|
||||
ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do
|
||||
debug "Sweep blocks"
|
||||
|
@ -637,22 +666,15 @@ downloadDispatcher brains env = flip runContT pure do
|
|||
|
||||
writeTVar wip (HM.fromList (catMaybes alive))
|
||||
|
||||
|
||||
ContT $ withAsync $ forever do
|
||||
what <- atomically $ readTQueue parseQ
|
||||
missed <- findMissedBlocks sto what
|
||||
now <- getTimeCoarse
|
||||
for_ missed $ \hi -> do
|
||||
atomically do
|
||||
dcb <- newDcbSTM now (Just what)
|
||||
let w = realToFrac now
|
||||
already <- readTVar wip <&> HM.member hi
|
||||
unless already do
|
||||
modifyTVar wip (HM.insert hi dcb)
|
||||
for_ missed insertNewDownload
|
||||
|
||||
forever $ (>> pause @'Seconds 10) do
|
||||
sw0 <- readTVarIO wip <&> HM.size
|
||||
debug $ yellow $ "wip0" <+> pretty sw0
|
||||
n <- countDownloads @e brains
|
||||
debug $ yellow $ "wip" <+> pretty sw0 <+> parens (pretty n)
|
||||
|
||||
where
|
||||
|
||||
|
@ -729,8 +751,8 @@ downloadDispatcher brains env = flip runContT pure do
|
|||
-- pure (ConstBurstMachine 256) -- newBurstMachine 60 256 (Just 256) 0.20 0.10
|
||||
bm <- liftIO do
|
||||
case _sockType p of
|
||||
TCP -> AnyBurstMachine @IO <$> newBurstMachine 3 256 (Just 50) 0.10 0.35
|
||||
UDP -> AnyBurstMachine @IO <$> newBurstMachine 3 256 (Just 50) 0.10 0.35
|
||||
TCP -> AnyBurstMachine @IO <$> newBurstMachine 5 256 (Just 100) 0.10 0.35
|
||||
UDP -> AnyBurstMachine @IO <$> newBurstMachine 5 256 (Just 50) 0.10 0.35
|
||||
|
||||
void $ ContT $ bracket none $ const do
|
||||
debug $ "Cancelling thread for" <+> pretty p
|
||||
|
|
|
@ -122,6 +122,11 @@ instance ( Hashable (Peer e)
|
|||
|
||||
listDownloads = liftIO . selectDownloads
|
||||
|
||||
countDownloads b = do
|
||||
let conn = view brainsDb b
|
||||
liftIO $ query_ conn [qc|select count(hash) from download|]
|
||||
<&> headDef 0 . fmap fromOnly
|
||||
|
||||
listPexInfo = liftIO . selectPexInfo
|
||||
|
||||
updatePexInfo b pex = updateOP b $ insertPexInfo b pex
|
||||
|
@ -130,6 +135,10 @@ instance ( Hashable (Peer e)
|
|||
liftIO $ Cache.insert (view brainsRemoved br) what ()
|
||||
updateOP br (deleteDownload br what)
|
||||
|
||||
newDownload br what = do
|
||||
-- debug $ "Brains: newDownload" <+> pretty what
|
||||
updateOP br (insertDownload br mzero what)
|
||||
|
||||
onKnownPeers br ps = do
|
||||
trace $ "BRAINS: onKnownPeers" <+> pretty ps
|
||||
let tv = view brainsPeers br
|
||||
|
|
|
@ -58,9 +58,15 @@ class HasBrains e a where
|
|||
updatePexInfo :: MonadIO m => a -> [PeerAddr e] -> m ()
|
||||
updatePexInfo _ _ = pure ()
|
||||
|
||||
countDownloads :: MonadIO m => a -> m Int
|
||||
countDownloads _ = pure 0
|
||||
|
||||
listDownloads :: MonadIO m => a -> m [(HashRef, Integer)]
|
||||
listDownloads _ = pure mempty
|
||||
|
||||
newDownload :: MonadIO m => a -> HashRef -> m ()
|
||||
newDownload _ _ = none
|
||||
|
||||
delDownload :: MonadIO m => a -> HashRef -> m ()
|
||||
delDownload _ _ = pure ()
|
||||
|
||||
|
@ -196,7 +202,9 @@ instance HasBrains e (SomeBrains e) where
|
|||
updatePexInfo (SomeBrains a) = updatePexInfo @e a
|
||||
|
||||
listDownloads (SomeBrains a) = listDownloads @e a
|
||||
countDownloads (SomeBrains a) = countDownloads @e a
|
||||
delDownload (SomeBrains a) = delDownload @e a
|
||||
newDownload (SomeBrains a) = newDownload @e a
|
||||
|
||||
onKnownPeers (SomeBrains a) = onKnownPeers a
|
||||
onBlockSize (SomeBrains a) = onBlockSize a
|
||||
|
|
Loading…
Reference in New Issue