moar betta

This commit is contained in:
voidlizard 2024-11-14 14:30:16 +03:00
parent fc9d1fc4e8
commit 54d7e1af6f
2 changed files with 25 additions and 49 deletions

View File

@ -456,7 +456,6 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
coo <- genCookie (peer,h) coo <- genCookie (peer,h)
let key = DownloadSessionKey (peer, coo) let key = DownloadSessionKey (peer, coo)
down@BlockDownload{..} <- newBlockDownload h down@BlockDownload{..} <- newBlockDownload h
let chuQ = _sBlockChunks
let new = set sBlockChunkSize chunkSize let new = set sBlockChunkSize chunkSize
. set sBlockSize (fromIntegral size) . set sBlockSize (fromIntegral size)
$ down $ down
@ -477,40 +476,44 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
flip fix 0 \again n -> do flip fix 0 \again n -> do
wx <- atomically do
void $ flushTQueue chuQ
readTVar _wx
let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN)) let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN))
lift $ request peer req lift $ request peer req
t0 <- getTimeCoarse t0 <- getTimeCoarse
let watchdog = fix \next -> do _num <- newTVarIO 0
r <- race (pause @'MilliSeconds wx) do
void $ atomically $ readTQueue chuQ
either (const none) (const next) r
r <- liftIO $ race watchdog do wx <- readTVarIO _wx
let w0 = 2.0 :: Timeout 'MilliSeconds
let watchdog = flip fix 0 \next x -> do
r <- race (pause @'MilliSeconds wx) do
atomically do
y <- readTVar _num
if x == y then retry else pure y
either (const none) next r
r <- liftIO $ pause w0 >> race watchdog do
atomically do atomically do
pieces <- readTVar _sBlockChunks2 pieces <- readTVar _sBlockChunks2
writeTVar _num ( IntMap.size pieces )
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ] let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
t1 <- getTimeCoarse t1 <- getTimeCoarse
atomically do atomically do
void $ flushTQueue chuQ
when (isRight r) do when (isRight r) do
-- wx0 <- readTVar _wx
let nano = toNanoSeconds $ TimeoutTS (t1 - t0) let nano = toNanoSeconds $ TimeoutTS (t1 - t0)
let wx1 = 5 * realToFrac nano / 1e6 -- millis let wx1 = 100 * realToFrac nano / 1e6 -- millis
writeTVar _wx wx1 writeTVar _wx wx1
case r of case r of
Left{} -> do Left{} -> do
if n < 2 then do if n < 2 then do
debug $ red "Retry" <+> pretty i <+> pretty chunkN <+> pretty h <+> pretty peer
again (succ n) again (succ n)
else do else do
exit2 (Left $ DownloadStuckError (HashRef h) peer) exit2 (Left $ DownloadStuckError (HashRef h) peer)

View File

@ -184,11 +184,10 @@ calcBursts bu pieces = go seed
data BlockDownload = data BlockDownload =
BlockDownload BlockDownload
{ _sBlockHash :: Hash HbSync { _sBlockHash :: !(Hash HbSync)
, _sBlockSize :: Size , _sBlockSize :: !Size
, _sBlockChunkSize :: ChunkSize , _sBlockChunkSize :: !ChunkSize
, _sBlockChunks :: TQueue (ChunkNum, ByteString) , _sBlockChunks2 :: !(TVar (IntMap ByteString))
, _sBlockChunks2 :: TVar (IntMap ByteString)
} }
deriving stock (Typeable) deriving stock (Typeable)
@ -196,8 +195,7 @@ makeLenses 'BlockDownload
newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload
newBlockDownload h = liftIO do newBlockDownload h = liftIO do
BlockDownload h 0 0 <$> newTQueueIO <*> newTVarIO mempty BlockDownload h 0 0 <$> newTVarIO mempty
type instance SessionData e (BlockChunks e) = BlockDownload type instance SessionData e (BlockChunks e) = BlockDownload
@ -208,20 +206,6 @@ newtype instance SessionKey e (BlockChunks e) =
deriving newtype instance Hashable (SessionKey L4Proto (BlockChunks L4Proto)) deriving newtype instance Hashable (SessionKey L4Proto (BlockChunks L4Proto))
deriving stock instance Eq (SessionKey L4Proto (BlockChunks L4Proto)) deriving stock instance Eq (SessionKey L4Proto (BlockChunks L4Proto))
data BlkS =
BlkNew
| BlkSizeAsked TimeSpec
| BlkDownloadStarted TimeSpec
data BlockState =
BlockState
{ _bsStart :: TimeSpec
, _bsWip :: Maybe TimeSpec
, _bsState :: TVar BlkS
}
makeLenses 'BlockState
data DownloadMonEnv = data DownloadMonEnv =
DownloadMonEnv DownloadMonEnv
{ _downloads :: TVar (HashMap HashRef (IO ())) { _downloads :: TVar (HashMap HashRef (IO ()))
@ -447,21 +431,10 @@ mkAdapter = do
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
let cKey = DownloadSessionKey (p,c) let cKey = DownloadSessionKey (p,c)
dodo <- lift $ find cKey (view sBlockChunks)
unless (isJust dodo) $ do
debug $ "session lost for peer !" <+> pretty p
-- debug $ "FINDING-SESSION:" <+> pretty c <+> pretty n
-- debug $ "GOT SHIT" <+> pretty c <+> pretty n
se <- MaybeT $ find cKey id se <- MaybeT $ find cKey id
let dwnld = view sBlockChunks se
let dwnld2 = view sBlockChunks2 se let dwnld2 = view sBlockChunks2 se
-- debug $ "WRITE SHIT" <+> pretty c <+> pretty n
liftIO $ atomically do liftIO $ atomically do
writeTQueue dwnld (n, bs)
modifyTVar' dwnld2 (IntMap.insert (fromIntegral n) bs) modifyTVar' dwnld2 (IntMap.insert (fromIntegral n) bs)
} }