mirror of https://github.com/voidlizard/hbs2
fucked
This commit is contained in:
parent
e560067c0e
commit
0db033b54d
|
@ -552,535 +552,6 @@ instance BlockSizeCache e KnownSize where
|
||||||
findBlockSize (KnownSize s) _ _ = pure (Just s)
|
findBlockSize (KnownSize s) _ _ = pure (Just s)
|
||||||
|
|
||||||
|
|
||||||
downloadDispatcher2 :: forall e m . ( e ~ L4Proto
|
|
||||||
, MonadUnliftIO m
|
|
||||||
)
|
|
||||||
=> SomeBrains e
|
|
||||||
-> PeerEnv e
|
|
||||||
-> m ()
|
|
||||||
|
|
||||||
downloadDispatcher2 brains e = do
|
|
||||||
|
|
||||||
let blkInfoLock = 5 :: Timeout 'Seconds
|
|
||||||
let blkWaitLock = 60 :: Timeout 'Seconds
|
|
||||||
let workloadFactor = 2.5
|
|
||||||
|
|
||||||
sto <- withPeerM e getStorage
|
|
||||||
|
|
||||||
let downT = 16
|
|
||||||
let sizeT = 4
|
|
||||||
|
|
||||||
inQ <- newTVarIO mempty
|
|
||||||
sizeRQ <- newTQueueIO
|
|
||||||
checkQ <- newTQueueIO
|
|
||||||
|
|
||||||
-- inQ <- withDownload env0 $ asks (view blockInQ)
|
|
||||||
-- checkQ <- withDownload env0 $ asks (view blockCheckQ)
|
|
||||||
sizeQ <- newTQueueIO
|
|
||||||
fetchQ <- newTQueueIO
|
|
||||||
parseQ <- newTQueueIO
|
|
||||||
-- sizeRQ <- withDownload env0 $ asks (view blockSizeRecvQ)
|
|
||||||
|
|
||||||
-- FIXME: cleanup-nonce
|
|
||||||
nonces <- newTVarIO (mempty :: HashMap (Peer e) PeerNonce)
|
|
||||||
|
|
||||||
-- FIXME: cleanup-busy
|
|
||||||
busy <- newTVarIO (mempty :: HashMap PeerNonce Double)
|
|
||||||
|
|
||||||
rates <- newTVarIO (mempty :: IntMap.IntMap [(Peer e,PeerNonce)])
|
|
||||||
|
|
||||||
fetchH <- newTVarIO (mempty :: HashSet (Hash HbSync))
|
|
||||||
sizes <- newTVarIO (mempty :: HashMap (Peer e, Hash HbSync) (Maybe Integer, TimeSpec))
|
|
||||||
sizeReq <- newTVarIO (mempty :: HashMap (Hash HbSync) TimeSpec)
|
|
||||||
|
|
||||||
seen <- newTVarIO (mempty :: HashMap (Hash HbSync) Int)
|
|
||||||
|
|
||||||
peerz <- newTVarIO ( mempty :: HashMap (Peer e) BurstMachine)
|
|
||||||
|
|
||||||
defBurst <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
|
|
||||||
|
|
||||||
liftIO $ withPeerM e do
|
|
||||||
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
|
||||||
here <- hasBlock sto h <&> isJust
|
|
||||||
unless here do
|
|
||||||
atomically $ modifyTVar inQ (HM.insert h ())
|
|
||||||
atomically $ writeTQueue sizeQ h
|
|
||||||
|
|
||||||
flip runContT pure do
|
|
||||||
|
|
||||||
-- UPDATE-STATS-LOOP
|
|
||||||
void $ ContT $ withAsync $ updateRates e rates nonces
|
|
||||||
|
|
||||||
void $ ContT $ withAsync $ forever do
|
|
||||||
pips <- withPeerM e $ getKnownPeers @e <&> HS.fromList
|
|
||||||
|
|
||||||
for_ pips $ \p -> do
|
|
||||||
here <- readTVarIO peerz <&> HM.member p
|
|
||||||
|
|
||||||
unless here do
|
|
||||||
newBum <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
|
|
||||||
atomically $ modifyTVar peerz (HM.insert p newBum)
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
modifyTVar peerz ( HM.filterWithKey (\k _ -> HS.member k pips))
|
|
||||||
|
|
||||||
pause @Seconds 5
|
|
||||||
|
|
||||||
void $ ContT $ withAsync do
|
|
||||||
forever do
|
|
||||||
pause @'Seconds 120
|
|
||||||
atomically do
|
|
||||||
q <- readTVar inQ
|
|
||||||
let isInQ x = HM.member x q
|
|
||||||
modifyTVar' fetchH (HS.filter isInQ)
|
|
||||||
modifyTVar' sizeReq (HM.filterWithKey (curry (isInQ . fst)))
|
|
||||||
modifyTVar' sizes (HM.filterWithKey (curry (isInQ . snd . fst)))
|
|
||||||
modifyTVar' seen (HM.filterWithKey (curry (isInQ . fst)))
|
|
||||||
|
|
||||||
livePeers <- readTVar rates <&> mconcat . IntMap.elems
|
|
||||||
let liveNonce = HS.fromList (fmap snd livePeers)
|
|
||||||
let livePeer = HS.fromList (fmap fst livePeers)
|
|
||||||
|
|
||||||
modifyTVar' busy (HM.filterWithKey (\x _ -> HS.member x liveNonce))
|
|
||||||
modifyTVar' nonces (HM.filterWithKey (\x _ -> HS.member x livePeer))
|
|
||||||
|
|
||||||
replicateM_ downT $ ContT $ withAsync do
|
|
||||||
forever do
|
|
||||||
blk <- atomically $ readTQueue checkQ
|
|
||||||
here <- hasBlock sto blk <&> isJust
|
|
||||||
if not here then do
|
|
||||||
atomically $ writeTQueue sizeQ blk
|
|
||||||
else do
|
|
||||||
atomically $ writeTQueue parseQ blk
|
|
||||||
|
|
||||||
void $ ContT $ withAsync $ liftIO $ withPeerM e do
|
|
||||||
forever do
|
|
||||||
blk <- atomically $ readTQueue parseQ
|
|
||||||
|
|
||||||
blks <- findMissedBlocks sto (HashRef blk)
|
|
||||||
|
|
||||||
for_ blks $ \b -> do
|
|
||||||
addDownload @e (Just blk) (fromHashRef b)
|
|
||||||
|
|
||||||
processBlock blk
|
|
||||||
atomically $ modifyTVar inQ (HM.delete (coerce blk))
|
|
||||||
|
|
||||||
replicateM_ 1 $ ContT $ withAsync do
|
|
||||||
forever do
|
|
||||||
|
|
||||||
-- pause @'Seconds 0.25
|
|
||||||
|
|
||||||
items <- atomically do
|
|
||||||
peekTQueue sizeRQ >> STM.flushTQueue sizeRQ
|
|
||||||
|
|
||||||
now <- getTimeCoarse
|
|
||||||
|
|
||||||
todo <- atomically do
|
|
||||||
w <- for items $ \(p,h,s) -> do
|
|
||||||
modifyTVar sizes (HM.insert (p,h) (s, now))
|
|
||||||
readTVar nonces <&> HM.lookup p >>= \case
|
|
||||||
Nothing -> pure ()
|
|
||||||
Just nonce -> setBusySTM nonce busy (Just (setFactor 0 (0.01-)))
|
|
||||||
pure h
|
|
||||||
|
|
||||||
for (L.nub w) pure
|
|
||||||
|
|
||||||
for_ todo $ \b -> do
|
|
||||||
here <- hasBlock sto b <&> isJust
|
|
||||||
|
|
||||||
already <- atomically do
|
|
||||||
readTVar fetchH <&> HS.member b
|
|
||||||
|
|
||||||
when (not here && not already) do
|
|
||||||
atomically $ writeTQueue fetchQ b
|
|
||||||
|
|
||||||
replicateM_ sizeT $ ContT $ withAsync do
|
|
||||||
|
|
||||||
-- TODO: trim-sizeReq
|
|
||||||
let blocks = readTVarIO sizeReq <&> HM.keys <&> fmap (,2)
|
|
||||||
|
|
||||||
polling (Polling 1 1) blocks $ \h -> do
|
|
||||||
pips <- readTVarIO nonces <&> HM.keys
|
|
||||||
s <- readTVarIO sizes <&> HM.toList
|
|
||||||
|
|
||||||
for_ pips $ \p -> do
|
|
||||||
here <- lookupSizeIO sizes p h <&> isRight
|
|
||||||
|
|
||||||
if here then do
|
|
||||||
atomically $ modifyTVar sizeReq (HM.delete h)
|
|
||||||
else
|
|
||||||
withPeerM e $ request p (GetBlockSize @e h)
|
|
||||||
|
|
||||||
|
|
||||||
replicateM_ sizeT $ ContT $ withAsync do
|
|
||||||
|
|
||||||
forever do
|
|
||||||
|
|
||||||
blk <- atomically do
|
|
||||||
readTVar rates <&> not . IntMap.null >>= STM.check
|
|
||||||
readTQueue sizeQ
|
|
||||||
|
|
||||||
debug $ green "PEER SIZE THREAD" <+> pretty blk
|
|
||||||
|
|
||||||
|
|
||||||
r <- readTVarIO rates <&> IntMap.toDescList
|
|
||||||
<&> foldMap snd
|
|
||||||
|
|
||||||
|
|
||||||
answ <- for r $ \(p,nonce) -> do
|
|
||||||
lookupSizeIO sizes p blk >>= \case
|
|
||||||
-- уже спрашивали, отрицает
|
|
||||||
Left{} -> do
|
|
||||||
npi <- newPeerInfo
|
|
||||||
PeerInfo{..} <- withPeerM e $ fetch True npi (PeerInfoKey p) id
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
setBusySTM nonce busy (Just (setFactor 0 (+(-0.01))))
|
|
||||||
modifyTVar _peerDownloadMiss succ
|
|
||||||
modifyTVar seen (HM.insertWith (+) blk 1)
|
|
||||||
modifyTVar sizeReq (HM.delete blk)
|
|
||||||
|
|
||||||
debug $ red "NONE:" <+> pretty p <+> pretty blk
|
|
||||||
pure 0
|
|
||||||
|
|
||||||
-- уже спрашивали, ответил
|
|
||||||
Right (Just w) -> do
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
setBusySTM nonce busy (Just (setFactor 0 (+(-0.01))))
|
|
||||||
modifyTVar sizeReq (HM.delete blk)
|
|
||||||
|
|
||||||
debug $ red "SIZE:" <+> pretty p <+> pretty blk <+> pretty w
|
|
||||||
pure 1
|
|
||||||
|
|
||||||
-- не спрашивали еще
|
|
||||||
Right Nothing -> do
|
|
||||||
(doReq, f) <- atomically do
|
|
||||||
f <- lookupBusySTM nonce busy
|
|
||||||
if f > workloadFactor then
|
|
||||||
pure (False, f)
|
|
||||||
else do
|
|
||||||
setBusySTM nonce busy (Just (setFactor 0.01 (+0.01)))
|
|
||||||
pure (True, f)
|
|
||||||
|
|
||||||
debug $ green "BUSY" <+> pretty p <+> pretty f
|
|
||||||
|
|
||||||
when doReq do
|
|
||||||
debug $ red "SEND REQUEST FOR SIZE" <+> pretty p <+> pretty blk
|
|
||||||
async $ do
|
|
||||||
pause blkInfoLock
|
|
||||||
atomically (setBusySTM nonce busy (Just (setFactor 0 (+(-0.01)))))
|
|
||||||
|
|
||||||
withPeerM e $ request p (GetBlockSize @e blk)
|
|
||||||
now <- getTimeCoarse
|
|
||||||
atomically $ modifyTVar sizeReq (HM.insert blk now)
|
|
||||||
|
|
||||||
pure 0
|
|
||||||
|
|
||||||
if sum answ > 0 then do
|
|
||||||
atomically do
|
|
||||||
here <- readTVar fetchH <&> HS.member blk
|
|
||||||
readTVar seen <&> HM.delete blk
|
|
||||||
unless here $
|
|
||||||
writeTQueue fetchQ blk
|
|
||||||
|
|
||||||
else do
|
|
||||||
howMany <- readTVarIO seen <&> (fromMaybe 0 . HM.lookup blk)
|
|
||||||
pips <- readTVarIO nonces <&> HM.size
|
|
||||||
-- FIXME: hardcode
|
|
||||||
when (howMany < 10) do
|
|
||||||
atomically $ writeTQueue sizeQ blk
|
|
||||||
|
|
||||||
void $ ContT $ withAsync do
|
|
||||||
-- FIXME: ban-time-hardcode
|
|
||||||
let loosers = readTVarIO seen <&> fmap (,120) . HM.keys
|
|
||||||
polling (Polling 1 10) loosers $ \it -> do
|
|
||||||
atomically $ writeTQueue checkQ it
|
|
||||||
atomically $ modifyTVar seen (HM.delete it)
|
|
||||||
|
|
||||||
replicateM_ downT $ ContT $ withAsync do
|
|
||||||
|
|
||||||
gen <- newStdGen
|
|
||||||
|
|
||||||
forever do
|
|
||||||
|
|
||||||
flip runContT pure $ callCC \exit -> do
|
|
||||||
|
|
||||||
blk <- atomically $ readTQueue fetchQ
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
modifyTVar fetchH (HS.insert blk)
|
|
||||||
|
|
||||||
here <- hasBlock sto blk <&> isJust
|
|
||||||
|
|
||||||
when here $ exit ()
|
|
||||||
|
|
||||||
debug $ green "PEER DOWNLOAD THREAD" <+> pretty blk
|
|
||||||
|
|
||||||
-- TODO: already-downloaded-possible
|
|
||||||
|
|
||||||
let ws = round . (*trimFactor) <$> randomRs (0, 2.5) gen
|
|
||||||
|
|
||||||
work <- lift $ race (pause @'Seconds 60) $ atomically do
|
|
||||||
r0 <- readTVar rates <&> IntMap.toList
|
|
||||||
bsy <- readTVar busy
|
|
||||||
|
|
||||||
let bx nonce =
|
|
||||||
round $ trimFactor * (1.75 / (1.0 + fromMaybe 0 (HM.lookup nonce bsy)))
|
|
||||||
|
|
||||||
let w = [ (-(v + w0 + bx nonce), p)
|
|
||||||
| (v, (w0, peers)) <- zip ws r0, p@(_,nonce) <- peers
|
|
||||||
] & L.sortOn fst & fmap snd
|
|
||||||
|
|
||||||
avail' <- for w $ \(peer,nonce) -> do
|
|
||||||
p <- readTVar busy <&> HM.lookup nonce
|
|
||||||
sz <- lookupSizeSTM sizes peer blk
|
|
||||||
if p < Just workloadFactor then
|
|
||||||
pure (Just (peer,nonce, sz))
|
|
||||||
else
|
|
||||||
pure Nothing
|
|
||||||
|
|
||||||
let avail = catMaybes avail'
|
|
||||||
|
|
||||||
STM.check (not $ L.null avail)
|
|
||||||
|
|
||||||
found <- for avail $ \(pip, nonce, msz) -> case msz of
|
|
||||||
Right (Just sz) -> do
|
|
||||||
pure $ Just (blk, pip, nonce, sz)
|
|
||||||
|
|
||||||
_ -> pure Nothing
|
|
||||||
|
|
||||||
case headMay (catMaybes found) of
|
|
||||||
Nothing -> do
|
|
||||||
writeTQueue checkQ blk
|
|
||||||
modifyTVar fetchH (HS.delete blk)
|
|
||||||
pure Nothing
|
|
||||||
|
|
||||||
Just what@(_,_,nonce,_) -> do
|
|
||||||
setBusySTM nonce busy (Just (setFactor 1.0 (+1.0)))
|
|
||||||
pure $ Just what
|
|
||||||
|
|
||||||
case work of
|
|
||||||
Right (Just (b,p,nonce,s)) -> do
|
|
||||||
debug $ green "WORKER CHOOSEN" <+> pretty p <+> pretty blk <+> pretty s
|
|
||||||
|
|
||||||
bum <- readTVarIO peerz <&> HM.findWithDefault defBurst p
|
|
||||||
bu <- liftIO $ getCurrentBurst bum
|
|
||||||
|
|
||||||
r <- lift $ race (pause @'Seconds 60) do
|
|
||||||
downloadFromPeer (TimeoutSec 55) bu (KnownSize s) e (coerce blk) p
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
setBusySTM nonce busy (Just (setFactor 0 (const 0)))
|
|
||||||
|
|
||||||
npi <- newPeerInfo
|
|
||||||
PeerInfo{..} <- withPeerM e $ fetch True npi (PeerInfoKey p) id
|
|
||||||
|
|
||||||
debug $ green "DOWNLOAD DONE!" <+> pretty p <+> pretty blk <+> pretty s <+> pretty (isRight r)
|
|
||||||
|
|
||||||
atomically $ modifyTVar fetchH (HS.delete blk)
|
|
||||||
|
|
||||||
case r of
|
|
||||||
Right block -> do
|
|
||||||
-- mh <- putBlock sto block
|
|
||||||
atomically do
|
|
||||||
modifyTVar _peerDownloaded succ
|
|
||||||
modifyTVar _peerDownloadedBlk succ
|
|
||||||
|
|
||||||
_ -> do
|
|
||||||
debug $ red "DOWNLOAD FAILED / TIMEOUT"
|
|
||||||
|
|
||||||
liftIO $ burstMachineAddErrors bum 1
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
modifyTVar _peerDownloadFail succ
|
|
||||||
modifyTVar _peerErrors succ
|
|
||||||
writeTQueue checkQ blk
|
|
||||||
|
|
||||||
_ -> do
|
|
||||||
debug $ red "WAIT FOR PEERS TIMEOUT" <+> pretty blk
|
|
||||||
atomically $ writeTVar busy mempty
|
|
||||||
|
|
||||||
forever do
|
|
||||||
pause @'Seconds 5
|
|
||||||
wip <- readTVarIO inQ <&> HM.size
|
|
||||||
notice $ yellow "wip" <+> pretty wip
|
|
||||||
|
|
||||||
where
|
|
||||||
|
|
||||||
trimFactor :: Double
|
|
||||||
trimFactor = 100
|
|
||||||
|
|
||||||
setFactor d f = \case
|
|
||||||
Nothing -> Just d
|
|
||||||
Just v -> Just (g v)
|
|
||||||
where
|
|
||||||
g y = f y & max 0
|
|
||||||
|
|
||||||
setBusySTM nonce busy = \case
|
|
||||||
Nothing -> modifyTVar busy (HM.delete nonce)
|
|
||||||
Just fn -> modifyTVar busy (HM.alter fn nonce)
|
|
||||||
|
|
||||||
lookupBusySTM nonce busy =
|
|
||||||
readTVar busy <&> fromMaybe 0 . HM.lookup nonce
|
|
||||||
|
|
||||||
lookupSizeSTM sizes p h = do
|
|
||||||
readTVar sizes
|
|
||||||
<&> HM.lookup (p,h)
|
|
||||||
<&> \case
|
|
||||||
Nothing -> Right Nothing
|
|
||||||
Just (Just x,_) -> Right (Just x)
|
|
||||||
Just (Nothing,_) -> Left ()
|
|
||||||
|
|
||||||
lookupSizeIO sizes p h = do
|
|
||||||
atomically $ lookupSizeSTM sizes p h
|
|
||||||
|
|
||||||
processBlock :: forall e s m . ( MonadIO m
|
|
||||||
, MyPeer e
|
|
||||||
, ForSignedBox s
|
|
||||||
, s ~ Encryption e
|
|
||||||
, HasPeerLocator e m
|
|
||||||
, m ~ PeerM e IO
|
|
||||||
)
|
|
||||||
=> Hash HbSync
|
|
||||||
-> m ()
|
|
||||||
|
|
||||||
processBlock h = do
|
|
||||||
|
|
||||||
sto <- withPeerM e getStorage
|
|
||||||
|
|
||||||
let parent = Just h
|
|
||||||
|
|
||||||
block <- liftIO $ getBlock sto h
|
|
||||||
|
|
||||||
let bt = tryDetect h <$> block
|
|
||||||
|
|
||||||
-- FIXME: если блок нашёлся, то удаляем его из wip
|
|
||||||
|
|
||||||
-- when (isJust bt) (deleteBlockFromQ h)
|
|
||||||
|
|
||||||
let handleHrr (hrr :: Either (Hash HbSync) [HashRef]) = do
|
|
||||||
case hrr of
|
|
||||||
Left hx -> addDownload @e parent hx
|
|
||||||
Right hr -> do
|
|
||||||
|
|
||||||
for_ hr $ \(HashRef blk) -> do
|
|
||||||
|
|
||||||
-- debug $ pretty blk
|
|
||||||
|
|
||||||
here <- liftIO (hasBlock sto blk) <&> isJust
|
|
||||||
|
|
||||||
if here then do
|
|
||||||
pure ()
|
|
||||||
-- debug $ "block" <+> pretty blk <+> "is already here"
|
|
||||||
-- unless (h == blk) do
|
|
||||||
-- processBlock blk -- NOTE: хуже не стало
|
|
||||||
-- FIXME: fugure out if it's really required
|
|
||||||
|
|
||||||
else do
|
|
||||||
addDownload @e parent blk
|
|
||||||
|
|
||||||
case bt of
|
|
||||||
Nothing -> addDownload @e mzero h
|
|
||||||
|
|
||||||
Just (SeqRef (SequentialRef n (AnnotatedHashRef a' b))) -> do
|
|
||||||
maybe1 a' none $ \a -> do
|
|
||||||
debug $ "GOT AnnotatedHashRef" <+> pretty a
|
|
||||||
processBlock (fromHashRef a)
|
|
||||||
|
|
||||||
addDownload @e parent (fromHashRef b)
|
|
||||||
|
|
||||||
Just (AnnRef (AnnotatedHashRef ann hx)) -> do
|
|
||||||
maybe1 ann none $ addDownload @e parent . fromHashRef
|
|
||||||
addDownload @e parent (fromHashRef hx)
|
|
||||||
|
|
||||||
Just (MerkleAnn ann) -> do
|
|
||||||
case _mtaMeta ann of
|
|
||||||
NoMetaData -> pure ()
|
|
||||||
ShortMetadata {} -> pure ()
|
|
||||||
AnnHashRef hx -> addDownload @e parent hx
|
|
||||||
|
|
||||||
case _mtaCrypt ann of
|
|
||||||
NullEncryption -> pure ()
|
|
||||||
CryptAccessKeyNaClAsymm h -> addDownload @e parent h
|
|
||||||
EncryptGroupNaClSymm h _ -> addDownload @e parent h
|
|
||||||
|
|
||||||
trace $ "GOT WRAPPED MERKLE. requesting nodes/leaves" <+> pretty h
|
|
||||||
walkMerkleTree (_mtaTree ann) (liftIO . getBlock sto) handleHrr
|
|
||||||
|
|
||||||
Just (Merkle{}) -> do
|
|
||||||
trace $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h
|
|
||||||
walkMerkle h (liftIO . getBlock sto) handleHrr
|
|
||||||
|
|
||||||
Just (Blob{}) -> do
|
|
||||||
-- NOTE: bundle-ref-detection-note
|
|
||||||
-- добавлять обработку BundleRefValue в tryDetect
|
|
||||||
-- слишком накладно, т.к. требует большого количества
|
|
||||||
-- констрейнтов, которые не предполагались там
|
|
||||||
-- изначально. Как временная мера -- пробуем Bundle
|
|
||||||
-- обнаруживать здесь.
|
|
||||||
pure ()
|
|
||||||
|
|
||||||
where
|
|
||||||
unboxBundleRef (BundleRefValue box) = unboxSignedBox0 box
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
updateRates e rates nonces = withPeerM e do
|
|
||||||
|
|
||||||
let wRtt = 5
|
|
||||||
let wUdp = 1.5
|
|
||||||
let wTcp = 1.1
|
|
||||||
let wS = 1.5
|
|
||||||
let eps = 1e-8
|
|
||||||
|
|
||||||
forever do
|
|
||||||
pause @'Seconds 20
|
|
||||||
|
|
||||||
new <- S.toList_ do
|
|
||||||
withPeerM e $ forKnownPeers @e $ \peer pd -> do
|
|
||||||
pinfo <- find (PeerInfoKey peer) id
|
|
||||||
maybe1 pinfo none $ \pip -> do
|
|
||||||
|
|
||||||
let nonce = _peerOwnNonce pd
|
|
||||||
|
|
||||||
atomically $ modifyTVar nonces (HM.insert peer nonce)
|
|
||||||
|
|
||||||
sr <- readTVarIO (_peerDownloaded pip)
|
|
||||||
er <- readTVarIO (_peerDownloadFail pip)
|
|
||||||
|
|
||||||
let s = (eps + realToFrac sr) / (eps + realToFrac (sr + er))
|
|
||||||
|
|
||||||
{- HLINT ignore "Functor law" -}
|
|
||||||
rtt <- medianPeerRTT pip
|
|
||||||
<&> fmap ( (/1e9) . realToFrac )
|
|
||||||
<&> fromMaybe 1.0
|
|
||||||
|
|
||||||
let (udp,tcp) = case view sockType peer of
|
|
||||||
UDP -> (0, wUdp * 1.0)
|
|
||||||
TCP -> (wTcp * 1.0, 0)
|
|
||||||
|
|
||||||
let r = udp + tcp + wS*s
|
|
||||||
lift $ S.yield (peer, nonce, (r, rtt))
|
|
||||||
|
|
||||||
let maxRtt = maximumDef 1.0 [ rtt | (_, _, (_, rtt)) <- new ]
|
|
||||||
|
|
||||||
let mkRate s rtt = round $ trimFactor * (s + wRtt * (1 / (1 + rtt / maxRtt)))
|
|
||||||
|
|
||||||
let newRates = [ (mkRate s rtt, [(p,nonce)] )
|
|
||||||
| (p, nonce, (s, rtt)) <- new
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
writeTVar rates (IntMap.fromListWith (<>) newRates)
|
|
||||||
|
|
||||||
debug $ green "PEER RATES" <+> line <> vcat (fmap fmt newRates)
|
|
||||||
|
|
||||||
where
|
|
||||||
fmt (r,prs) = pretty r <+> hcat (fmap (pretty . view _1) prs)
|
|
||||||
|
|
||||||
|
|
||||||
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
downloadDispatcher :: forall e m . ( e ~ L4Proto
|
||||||
, MonadUnliftIO m
|
, MonadUnliftIO m
|
||||||
)
|
)
|
||||||
|
@ -1095,14 +566,8 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
let seenLimit = 1000
|
let seenLimit = 1000
|
||||||
|
|
||||||
seen <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef TimeSpec () )
|
|
||||||
|
|
||||||
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime )
|
blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime )
|
||||||
|
|
||||||
let sizeCacheLimit = 10000
|
|
||||||
|
|
||||||
sizeCache <- newTVarIO ( HPSQ.empty :: HashPSQ (HashRef,Peer e) TimeSpec (Maybe Integer) )
|
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
work <- newTQueueIO
|
work <- newTQueueIO
|
||||||
|
@ -1111,153 +576,33 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
readTVarIO pts >>= mapM_ (cancel . fst)
|
readTVarIO pts >>= mapM_ (cancel . fst)
|
||||||
atomically $ writeTVar pts mempty
|
atomically $ writeTVar pts mempty
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
join $ atomically (readTQueue work)
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
pause @'Seconds 600
|
|
||||||
|
|
||||||
debug $ "CLEANUP SEEN"
|
|
||||||
atomically do
|
|
||||||
fix \next -> do
|
|
||||||
n <- readTVar seen <&> HPSQ.size
|
|
||||||
when (n > seenLimit) do
|
|
||||||
modifyTVar seen HPSQ.deleteMin
|
|
||||||
next
|
|
||||||
|
|
||||||
debug $ "CLEANUP SIZES"
|
|
||||||
atomically do
|
|
||||||
fix \next -> do
|
|
||||||
n <- readTVar sizeCache <&> HPSQ.size
|
|
||||||
when (n > sizeCacheLimit) do
|
|
||||||
modifyTVar sizeCache HPSQ.deleteMin
|
|
||||||
next
|
|
||||||
|
|
||||||
liftIO $ withPeerM env do
|
liftIO $ withPeerM env do
|
||||||
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
|
||||||
now <- getTimeCoarse
|
now <- getTimeCoarse
|
||||||
new <- atomically do
|
here <- hasBlock sto h <&> isJust
|
||||||
already <- readTVar seen <&> HPSQ.member (HashRef h)
|
already <- readTVarIO blkQ <&> HPSQ.member (HashRef h)
|
||||||
if already then do
|
unless (here || already) do
|
||||||
pure False
|
atomically $ modifyTVar blkQ (HPSQ.insert (HashRef h) 1 10)
|
||||||
else do
|
|
||||||
modifyTVar seen ( HPSQ.insert (HashRef h) now () )
|
|
||||||
modifyTVar blkQ ( HPSQ.insert (HashRef h) 1 t0 )
|
|
||||||
pure True
|
|
||||||
when new do
|
|
||||||
debug $ green "New download request" <+> pretty h
|
debug $ green "New download request" <+> pretty h
|
||||||
|
|
||||||
let missChk = readTVarIO blkQ <&> fmap tr . HPSQ.toList
|
ContT $ withAsync $ manageThreads pts
|
||||||
where tr (k,_,v) = (k, realToFrac v)
|
|
||||||
|
|
||||||
let shiftPrio t = \case
|
|
||||||
Nothing -> ((), Nothing)
|
|
||||||
Just (p,v) -> ((), Just (succ p, v * t ))
|
|
||||||
|
|
||||||
ContT $ withAsync $
|
ContT $ withAsync do
|
||||||
polling (Polling 1 1) missChk $ \h -> do
|
let sq = readTVarIO blkQ <&> fmap (\(a,_,b) -> (a,b)) . HPSQ.toList
|
||||||
|
polling (Polling 1 1) sq $ \h -> do
|
||||||
pips <- readTVarIO pts <&> HM.keys
|
pips <- readTVarIO pts <&> HM.keys
|
||||||
forConcurrently_ pips $ \p -> do
|
forConcurrently_ pips $ \p -> do
|
||||||
now <- getTimeCoarse
|
s <- queryBlockSizeFromPeer brains env (coerce h) p
|
||||||
here <- readTVarIO sizeCache <&> HPSQ.member (h,p)
|
case s of
|
||||||
hereB <- hasBlock sto (coerce h) <&> isJust
|
Right x -> debug $ red "GOT SIZE FOR BLOCK" <+> pretty h <+> pretty x <+> pretty p
|
||||||
when (not here && not hereB) do
|
_ -> none
|
||||||
size <- queryBlockSizeFromPeer brains env (coerce h) p
|
|
||||||
case size of
|
|
||||||
Left{} -> pure ()
|
|
||||||
Right w -> do
|
|
||||||
atomically $ modifyTVar sizeCache ( HPSQ.insert (h,p) now w )
|
|
||||||
debug $ green "GOT SIZE" <+> pretty w <+> pretty h <+> pretty p
|
|
||||||
|
|
||||||
-- atomically $ modifyTVar blkQ ( snd . HPSQ.alter (shiftPrio 1.10) h )
|
|
||||||
|
|
||||||
|
|
||||||
_wip <- newTVarIO ( mempty :: HashMap HashRef () )
|
|
||||||
|
|
||||||
ContT $ withAsync $ do
|
|
||||||
let ws = readTVarIO _wip <&> fmap (,10) . HM.keys
|
|
||||||
polling (Polling 1 1) ws $ \h -> do
|
|
||||||
atomically $ modifyTVar _wip (HM.delete h)
|
|
||||||
|
|
||||||
ContT $ withAsync $ do
|
|
||||||
|
|
||||||
pause @'Seconds 10
|
|
||||||
|
|
||||||
_i <- newTVarIO 0
|
|
||||||
|
|
||||||
flip runContT pure $ forever do
|
|
||||||
|
|
||||||
blokz <- readTVarIO blkQ <&> fmap (view _1) . HPSQ.toList
|
|
||||||
|
|
||||||
flip runContT pure do
|
|
||||||
|
|
||||||
k <- for blokz $ \what -> callCC \next -> do
|
|
||||||
|
|
||||||
atomically $ modifyTVar _i succ
|
|
||||||
i <- readTVarIO _i
|
|
||||||
|
|
||||||
here <- hasBlock sto (coerce what) <&> isJust
|
|
||||||
|
|
||||||
when here $ next 0
|
|
||||||
|
|
||||||
wip <- readTVarIO _wip <&> HM.member what
|
|
||||||
|
|
||||||
when wip $ next 0
|
|
||||||
|
|
||||||
holders <- readTVarIO sizeCache <&> HPSQ.toList
|
|
||||||
|
|
||||||
wip <- readTVarIO _wip
|
|
||||||
|
|
||||||
let jobs = [ (h,p,s)
|
|
||||||
| ((h,p),_,Just s) <- holders
|
|
||||||
, what == h
|
|
||||||
, not (HM.member h wip)
|
|
||||||
] & V.fromList
|
|
||||||
|
|
||||||
when ( V.null jobs ) $ next 0
|
|
||||||
|
|
||||||
let j = i `mod` V.length jobs
|
|
||||||
let (h,p,s) = V.unsafeIndex jobs j
|
|
||||||
worker <- readTVarIO pts <&> HM.lookup p
|
|
||||||
|
|
||||||
n <- maybe1 worker (pure 0) $ \(_,q) -> do
|
|
||||||
atomically do
|
|
||||||
modifyTVar _wip $ HM.insert what ()
|
|
||||||
writeTQueue q (coerce h,s)
|
|
||||||
pure 1
|
|
||||||
|
|
||||||
debug $ green "FOUND WORKER" <+> pretty p <+> pretty h
|
|
||||||
pure n
|
|
||||||
|
|
||||||
when ( sum k == 0 ) do
|
|
||||||
debug $ red "NOTHING"
|
|
||||||
pause @'Seconds 0.1
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
|
||||||
polling (Polling 1 1) missChk $ \h -> do
|
|
||||||
-- debug $ blue "CHECK MISSED BLOCKS" <+> pretty h
|
|
||||||
|
|
||||||
missed <- findMissedBlocks sto h
|
|
||||||
|
|
||||||
here <- hasBlock sto (coerce h) <&> isJust
|
|
||||||
|
|
||||||
atomically do
|
|
||||||
for_ missed $ \hm -> modifyTVar blkQ (HPSQ.insert (coerce hm) 2 t0)
|
|
||||||
when here do
|
|
||||||
modifyTVar blkQ (HPSQ.delete (coerce h))
|
|
||||||
|
|
||||||
ContT $ withAsync (manageThreads pts)
|
|
||||||
-- ContT $ withAsync (sizeQueryLoop pts rq down)
|
|
||||||
|
|
||||||
forever do
|
forever do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
size <- atomically $ readTVar blkQ <&> HPSQ.size
|
size <- atomically $ readTVar blkQ <&> HPSQ.size
|
||||||
seenSize <- atomically $ readTVar seen <&> HPSQ.size
|
|
||||||
sizeCacheSize <- atomically $ readTVar sizeCache <&> HPSQ.size
|
|
||||||
debug $ yellow $ "I'm download dispatcher"
|
debug $ yellow $ "I'm download dispatcher"
|
||||||
debug $ yellow $ "wip:" <+> pretty size
|
debug $ yellow $ "wip:" <+> pretty size
|
||||||
debug $ yellow $ "seen:" <+> pretty seenSize
|
|
||||||
debug $ yellow $ "sizes:" <+> pretty sizeCacheSize
|
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
|
|
|
@ -1197,7 +1197,7 @@ runPeer opts = respawnOnError opts $ do
|
||||||
peerThread "pexLoop" (pexLoop @e brains tcp)
|
peerThread "pexLoop" (pexLoop @e brains tcp)
|
||||||
|
|
||||||
-- FIXME: new-download-loop
|
-- FIXME: new-download-loop
|
||||||
peerThread "downloadDispatcher" (downloadDispatcher2 (SomeBrains brains) env)
|
peerThread "downloadDispatcher" (downloadDispatcher (SomeBrains brains) env)
|
||||||
|
|
||||||
peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait)
|
peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue