mirror of https://github.com/voidlizard/hbs2
moar betta
This commit is contained in:
parent
0fc86e34ad
commit
6d1fcf419d
|
@ -456,7 +456,6 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
|
|||
coo <- genCookie (peer,h)
|
||||
let key = DownloadSessionKey (peer, coo)
|
||||
down@BlockDownload{..} <- newBlockDownload h
|
||||
let chuQ = _sBlockChunks
|
||||
let new = set sBlockChunkSize chunkSize
|
||||
. set sBlockSize (fromIntegral size)
|
||||
$ down
|
||||
|
@ -477,40 +476,44 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
|
|||
|
||||
flip fix 0 \again n -> do
|
||||
|
||||
wx <- atomically do
|
||||
void $ flushTQueue chuQ
|
||||
readTVar _wx
|
||||
|
||||
let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN))
|
||||
|
||||
lift $ request peer req
|
||||
|
||||
t0 <- getTimeCoarse
|
||||
|
||||
let watchdog = fix \next -> do
|
||||
r <- race (pause @'MilliSeconds wx) do
|
||||
void $ atomically $ readTQueue chuQ
|
||||
either (const none) (const next) r
|
||||
_num <- newTVarIO 0
|
||||
|
||||
r <- liftIO $ race watchdog do
|
||||
atomically do
|
||||
pieces <- readTVar _sBlockChunks2
|
||||
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
|
||||
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
|
||||
wx <- readTVarIO _wx
|
||||
|
||||
let w0 = 2.0 :: Timeout 'MilliSeconds
|
||||
|
||||
let watchdog = flip fix 0 \next x -> do
|
||||
r <- race (pause @'MilliSeconds wx) do
|
||||
atomically do
|
||||
y <- readTVar _num
|
||||
if x == y then retry else pure y
|
||||
either (const none) next r
|
||||
|
||||
r <- liftIO $ pause w0 >> race watchdog do
|
||||
atomically do
|
||||
pieces <- readTVar _sBlockChunks2
|
||||
writeTVar _num ( IntMap.size pieces )
|
||||
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
|
||||
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
|
||||
|
||||
t1 <- getTimeCoarse
|
||||
|
||||
atomically do
|
||||
void $ flushTQueue chuQ
|
||||
when (isRight r) do
|
||||
-- wx0 <- readTVar _wx
|
||||
let nano = toNanoSeconds $ TimeoutTS (t1 - t0)
|
||||
let wx1 = 5 * realToFrac nano / 1e6 -- millis
|
||||
let wx1 = 100 * realToFrac nano / 1e6 -- millis
|
||||
writeTVar _wx wx1
|
||||
|
||||
case r of
|
||||
Left{} -> do
|
||||
if n < 2 then do
|
||||
debug $ red "Retry" <+> pretty i <+> pretty chunkN <+> pretty h <+> pretty peer
|
||||
again (succ n)
|
||||
else do
|
||||
exit2 (Left $ DownloadStuckError (HashRef h) peer)
|
||||
|
|
|
@ -184,11 +184,10 @@ calcBursts bu pieces = go seed
|
|||
|
||||
data BlockDownload =
|
||||
BlockDownload
|
||||
{ _sBlockHash :: Hash HbSync
|
||||
, _sBlockSize :: Size
|
||||
, _sBlockChunkSize :: ChunkSize
|
||||
, _sBlockChunks :: TQueue (ChunkNum, ByteString)
|
||||
, _sBlockChunks2 :: TVar (IntMap ByteString)
|
||||
{ _sBlockHash :: !(Hash HbSync)
|
||||
, _sBlockSize :: !Size
|
||||
, _sBlockChunkSize :: !ChunkSize
|
||||
, _sBlockChunks2 :: !(TVar (IntMap ByteString))
|
||||
}
|
||||
deriving stock (Typeable)
|
||||
|
||||
|
@ -196,8 +195,7 @@ makeLenses 'BlockDownload
|
|||
|
||||
newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload
|
||||
newBlockDownload h = liftIO do
|
||||
BlockDownload h 0 0 <$> newTQueueIO <*> newTVarIO mempty
|
||||
|
||||
BlockDownload h 0 0 <$> newTVarIO mempty
|
||||
|
||||
type instance SessionData e (BlockChunks e) = BlockDownload
|
||||
|
||||
|
@ -208,20 +206,6 @@ newtype instance SessionKey e (BlockChunks e) =
|
|||
deriving newtype instance Hashable (SessionKey L4Proto (BlockChunks L4Proto))
|
||||
deriving stock instance Eq (SessionKey L4Proto (BlockChunks L4Proto))
|
||||
|
||||
data BlkS =
|
||||
BlkNew
|
||||
| BlkSizeAsked TimeSpec
|
||||
| BlkDownloadStarted TimeSpec
|
||||
|
||||
data BlockState =
|
||||
BlockState
|
||||
{ _bsStart :: TimeSpec
|
||||
, _bsWip :: Maybe TimeSpec
|
||||
, _bsState :: TVar BlkS
|
||||
}
|
||||
|
||||
makeLenses 'BlockState
|
||||
|
||||
data DownloadMonEnv =
|
||||
DownloadMonEnv
|
||||
{ _downloads :: TVar (HashMap HashRef (IO ()))
|
||||
|
@ -447,21 +431,10 @@ mkAdapter = do
|
|||
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
|
||||
let cKey = DownloadSessionKey (p,c)
|
||||
|
||||
dodo <- lift $ find cKey (view sBlockChunks)
|
||||
|
||||
unless (isJust dodo) $ do
|
||||
debug $ "session lost for peer !" <+> pretty p
|
||||
|
||||
-- debug $ "FINDING-SESSION:" <+> pretty c <+> pretty n
|
||||
-- debug $ "GOT SHIT" <+> pretty c <+> pretty n
|
||||
|
||||
se <- MaybeT $ find cKey id
|
||||
let dwnld = view sBlockChunks se
|
||||
let dwnld2 = view sBlockChunks2 se
|
||||
|
||||
-- debug $ "WRITE SHIT" <+> pretty c <+> pretty n
|
||||
liftIO $ atomically do
|
||||
writeTQueue dwnld (n, bs)
|
||||
modifyTVar' dwnld2 (IntMap.insert (fromIntegral n) bs)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue