good but fucked a little bit

This commit is contained in:
voidlizard 2024-11-11 16:35:34 +03:00
parent 79a352a83f
commit ca995a8228
1 changed files with 135 additions and 55 deletions

View File

@ -60,6 +60,7 @@ import Data.Maybe
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString.Lazy (ByteString)
import Data.Vector qualified as V
import Data.Vector ((!?))
import Data.ByteString qualified as BS
import Data.List qualified as L
import Data.Coerce
@ -69,8 +70,11 @@ import Control.Concurrent.STM qualified as STM
import UnliftIO.Concurrent
import System.Random
import Lens.Micro.Platform
import Streaming.Prelude qualified as S
import System.Random qualified as R
import System.Random.Shuffle qualified as Shuffle
import Streaming.Prelude qualified as S
data DownloadError e =
@ -141,7 +145,7 @@ queryBlockSizeFromPeer cache e h peer = do
s <- lift $ findBlockSize @e cache _peerSignKey h
debug $ "FOUND CACHED VALUE" <+> pretty h <+> pretty s
-- debug $ "FOUND CACHED VALUE" <+> pretty h <+> pretty s
maybe none (exit . Just) s
@ -558,7 +562,7 @@ data BlockFetchResult =
| BlockAlreadyHere
data Work =
RequestSize HashRef (Maybe Integer -> IO ())
RequestSize HashRef (Maybe Integer -> IO ())
| FetchBlock HashRef Integer (BlockFetchResult -> IO ())
downloadDispatcher :: forall e m . ( e ~ L4Proto
@ -572,14 +576,18 @@ downloadDispatcher brains env = flip runContT pure do
pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), TQueue Work))
-- tasks <- newTVarIO ( HPSQ.empty :: HashPSQ (Work e) Double (TVar Int) )
_blkNum <- newTVarIO 0
wip <- newTVarIO ( mempty :: HashMap HashRef NominalDiffTime )
sizeRq <- newTVarIO ( HPSQ.empty @HashRef @TimeSpec @() )
sizeRqWip <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) TimeSpec)
sizeCache <- newTVarIO ( mempty :: HashMap (Peer e, HashRef) (Maybe Integer) )
downWip <- newTVarIO ( HPSQ.empty @(Peer e, HashRef) @TimeSpec @Integer )
downWip <- newTVarIO ( HPSQ.empty @(Peer e, HashRef) @Double @Integer )
choosen <- newTVarIO ( mempty :: HashSet HashRef )
stat <- newTVarIO ( mempty :: HashMap (Peer e) Double )
fuckup <- newTVarIO ( mempty :: HashMap HashRef (HashSet (Peer e)) )
-- done <- newTVarIO ( mempty :: HashSet HashRef )
void $ ContT $ withAsync $ manageThreads pts
void $ ContT $ withAsync $ manageThreads stat pts
sto <- withPeerM env getStorage
@ -594,15 +602,23 @@ downloadDispatcher brains env = flip runContT pure do
modifyTVar wip (HM.insert (HashRef h) 10)
let onBlockSize p h s = do
let color = if isJust s then green else red
debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p
now <- getTimeCoarse
atomically do
modifyTVar sizeRq (HPSQ.delete h)
modifyTVar sizeCache (HM.insert (p,h) s)
modifyTVar sizeRqWip (HM.delete (p,h))
maybe1 s none $ \size -> do
modifyTVar downWip (HPSQ.insert (p,h) now size)
modifyTVar sizeRqWip (HM.delete (p,h))
modifyTVar sizeRq (HPSQ.delete h)
-- let color = if isJust s then green else red
-- debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p
-- dtt <- randomRIO (-0.05, 0.05)
let dtt = 0
here <- hasBlock sto (coerce h) <&> isJust
unless here do
dt <- readTVarIO stat <&> (+dtt) . fromMaybe 1.0 . HM.lookup p
atomically do
-- blkNum <- stateTVar _blkNum (\x -> (x, succ x))
modifyTVar sizeCache (HM.insert (p,h) s)
maybe1 s none $ \size -> do
modifyTVar downWip (HPSQ.insert (p,h) dt size)
parseQ <- newTQueueIO
@ -624,17 +640,32 @@ downloadDispatcher brains env = flip runContT pure do
BlockAlreadyHere -> do
debug $ yellow "ALREADY HAVE BLOCK" <+> pretty h
deleteBlockFromWip h
-- deleteBlockFromWip h
BlockFetched bs -> do
debug $ green "GOT BLOCK" <+> pretty h <+> pretty (LBS.length bs) <+> pretty p
-- debug $ green "GOT BLOCK" <+> pretty h <+> pretty (LBS.length bs) <+> pretty p
void $ putBlock sto bs
atomically $ writeTQueue parseQ (h, bs)
deleteBlockFromWip h
-- atomically $ modifyTVar done (HS.insert h)
-- deleteBlockFromWip h
BlockFetchError -> do
now <- getTimeCoarse
debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p
atomically $ modifyTVar sizeRq (HPSQ.insert h now ())
atomically $ modifyTVar choosen (HS.delete h)
ContT $ withAsync $ forever do
let blkz = readTVarIO choosen <&> fmap (,5) . HS.toList
polling (Polling 1 1) blkz $ \h -> do
here <- hasBlock sto (coerce h) <&> isJust
if here then do
liftIO $ deleteBlockFromWip h
else do
now <- getTimeCoarse
atomically do
modifyTVar sizeRq (HPSQ.insert h now ())
modifyTVar choosen (HS.delete h)
ContT $ withAsync $ forever do
let blkz = readTVarIO wip <&> fmap (,10) . HM.keys
@ -644,16 +675,22 @@ downloadDispatcher brains env = flip runContT pure do
when here $ do
liftIO $ deleteBlockFromWip h
-- ContT $ withAsync do
-- let blkz = readTVarIO wip <&> HM.toList
-- polling (Polling 1 1) blkz $ \h -> do
-- debug $ "POLL BLOCK" <+> pretty h
-- atomically $ modifyTVar wip (HM.adjust (*1.10) h)
-- here <- hasBlock sto (coerce h) <&> isJust
-- now <- getTimeCoarse
-- unless here $ atomically do
-- modifyTVar sizeRq (HPSQ.insert h now ())
-- modifyTVar choosen (HS.delete h)
ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do
atomically do
w <- readTVar wip
cs <- readTVar sizeCache <&> HM.toList
writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, HM.member hi w ])
-- ContT $ withAsync do
-- let blkz = readTVarIO wip <&> HM.toList
-- polling (Polling 1 1) blkz $ \h -> do
-- debug $ "POLL BLOCK" <+> pretty h
-- atomically $ modifyTVar wip (HM.adjust (*1.10) h)
-- here <- hasBlock sto (coerce h) <&> isJust
-- now <- getTimeCoarse
-- unless here $ atomically do
-- modifyTVar sizeRq (HPSQ.insert h now ())
-- modifyTVar choosen (HS.delete h)
ContT $ withAsync $ forever do
(h,bs) <- atomically $ readTQueue parseQ
@ -685,26 +722,49 @@ downloadDispatcher brains env = flip runContT pure do
modifyTVar sizeRqWip (HM.insert (p,h) now)
ContT $ withAsync $ forever do
atomically do
peers <- readTVar pts
let n = HM.size peers * 20
when ( n == 0 ) retry
(w1,dw,peerz) <- atomically do
(w,rest) <- readTVar downWip <&> L.splitAt n . HPSQ.toList
peers <- readTVar pts
writeTVar downWip (HPSQ.fromList rest)
let n = HM.size peers
when (L.null w) retry
when (n == 0) retry
for_ w $ \e@((p,h),prio,size) -> do
case HM.lookup p peers of
Nothing -> modifyTVar downWip (HPSQ.insert (p,h) prio size)
Just (_,q) -> do
here <- readTVar choosen <&> HS.member h
unless here do
writeTQueue q (FetchBlock h size (onBlock p h))
modifyTVar choosen (HS.insert h)
already <- readTVar choosen
dw <- readTVar downWip
let total = [ x | x@((p,_),_,_) <- L.take 256 (HPSQ.toList dw), HM.member p peers ]
let queue = total
let qq = [ (h, HS.singleton p)
| ((p,h),_,_) <- queue, not (HS.member h already), HM.member p peers
] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>)
when (L.null qq) retry
for_ total $ \(k,_,_) -> do
modifyTVar downWip (HPSQ.delete k)
pure (qq,dw,peers)
for_ w1 $ \(h,pps) -> do
i <- randomIO @Int <&> (`mod` V.length pps) . abs
debug $ blue "CHOOSIN PEER" <+> pretty h <+> pretty i <+> pretty (V.length pps)
flip runContT pure do
p0 <- ContT $ maybe1 (pps !? i) (warn $ red "FUCKED PEER!")
(_,who) <- ContT $ maybe1 (HM.lookup p0 peerz) (warn $ red "FUCKED QUEUE")
(_,size) <- ContT $ maybe1 (HPSQ.lookup (p0,h) dw) (warn $ red "FUCKED SIZE")
atomically do
modifyTVar fuckup (HM.insertWith (<>) h (HS.singleton p0))
writeTQueue who (FetchBlock h size (onBlock p0 h))
modifyTVar choosen (HS.insert h)
forever $ (>> pause @'Seconds 10) do
@ -713,16 +773,21 @@ downloadDispatcher brains env = flip runContT pure do
sdw <- readTVarIO downWip <&> HPSQ.size
scsize <- readTVarIO sizeCache <&> HM.size
chooSize <- readTVarIO choosen <&> HS.size
fuck <- readTVarIO fuckup <&> HM.toList
let fucks = [ 1 | (h,p) <- fuck, HS.size p > 1 ] & sum
debug $ yellow $ "wip0" <+> pretty sw0
<+> "wip1:" <+> pretty srqw
<+> "wip2" <+> pretty sdw
<+> "wip3" <+> pretty chooSize
<+> "sizes" <+> pretty scsize
<+> "fuckup" <+> pretty fucks
where
manageThreads pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
manageThreads stat pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
debug "MANAGE THREADS"
peers <- getKnownPeers @e <&> HS.fromList
@ -731,7 +796,7 @@ downloadDispatcher brains env = flip runContT pure do
here <- readTVarIO pts <&> HM.member p
unless here do
work <- newTQueueIO
a <- async (peerThread p work)
a <- async (peerThread stat p work)
atomically $ modifyTVar pts (HM.insert p (a,work))
loosers <- atomically do
@ -743,7 +808,9 @@ downloadDispatcher brains env = flip runContT pure do
mapM_ cancel (fmap (fst.snd) loosers)
peerThread p work = flip runContT pure do
peerThread stat p work = flip runContT pure do
btimes <- newTVarIO ( mempty :: [Double] )
sto <- withPeerM env getStorage
@ -758,30 +825,43 @@ downloadDispatcher brains env = flip runContT pure do
bmt <- ContT $ withAsync $ runBurstMachine bm
tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
tss <- readTVarIO btimes
unless (L.null tss) do
let avg = sum tss / realToFrac (L.length tss)
atomically $ modifyTVar stat (HM.insert p avg)
twork <- ContT $ withAsync $ forever do
w <- atomically $ readTQueue work
case w of
RequestSize h answ -> do
here <- hasBlock sto (coerce h) <&> isJust
unless here do
debug $ yellow "RequestSize" <+> pretty h <+> pretty p
s <- queryBlockSizeFromPeer brains env (coerce h) p
case s of
Left{} -> none
Right s -> liftIO (answ s)
here <- hasBlock sto (coerce h)
case here of
Just s -> liftIO (answ (Just s))
Nothing -> do
s <- queryBlockSizeFromPeer brains env (coerce h) p
case s of
Left{} -> liftIO (answ Nothing)
Right s -> liftIO (answ s)
FetchBlock h s answ -> flip fix 0 $ \next i -> do
here <- hasBlock sto (coerce h) <&> isJust
if here then do
liftIO $ answ BlockAlreadyHere
else do
debug $ yellow "START TO DOWNLOAD" <+> pretty h <+> pretty p
-- debug $ yellow "START TO DOWNLOAD" <+> pretty h <+> pretty p
bu <- lift $ getCurrentBurst bm
r <- lift $ downloadFromPeer (TimeoutSec 30) bu (KnownSize s) env (coerce h) p
t0 <- getTimeCoarse
r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize s) env (coerce h) p
t1 <- getTimeCoarse
case r of
Right bs -> liftIO $ answ (BlockFetched bs)
Right bs -> do
let dtsec = realToFrac (toNanoSeconds (TimeoutTS (t1 - t0))) / 1e9
atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
liftIO $ answ (BlockFetched bs)
Left{} | i >= 5 -> liftIO $ answ BlockFetchError
| otherwise -> next (succ i)
@ -790,5 +870,5 @@ downloadDispatcher brains env = flip runContT pure do
pause @'Seconds 10
debug $ yellow "I'm thread" <+> pretty p
void $ waitAnyCatchCancel [bmt,bs,twork]
void $ waitAnyCatchCancel [bmt,bs,twork,tstat]