mirror of https://github.com/voidlizard/hbs2
wip, block download rework
This commit is contained in:
parent
d07db60f74
commit
0326f08392
|
@ -190,7 +190,7 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do
|
||||||
. set sBlockSize (fromIntegral size)
|
. set sBlockSize (fromIntegral size)
|
||||||
$ down
|
$ down
|
||||||
|
|
||||||
(what, rs) <- liftIO $ withPeerM env do
|
eblk <- liftIO $ withPeerM env do
|
||||||
update @e new key id
|
update @e new key id
|
||||||
|
|
||||||
debug $ "FUCKIN WAIT FOR CHUNKS!" <+> pretty h
|
debug $ "FUCKIN WAIT FOR CHUNKS!" <+> pretty h
|
||||||
|
@ -199,30 +199,48 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do
|
||||||
|
|
||||||
let total = L.length $ calcChunks size (fromIntegral chunkSize)
|
let total = L.length $ calcChunks size (fromIntegral chunkSize)
|
||||||
|
|
||||||
blk <- atomically do
|
flip runContT pure do
|
||||||
wtf <- readTVar _sBlockChunks2
|
|
||||||
unless (IntMap.size wtf == total) retry
|
|
||||||
pure wtf
|
|
||||||
|
|
||||||
atomically $ flushTQueue chuQ
|
worker <- ContT $ withAsync do
|
||||||
|
atomically do
|
||||||
|
wtf <- readTVar _sBlockChunks2
|
||||||
|
unless (IntMap.size wtf == total) retry
|
||||||
|
pure wtf
|
||||||
|
|
||||||
|
waiter <- ContT $ withAsync $ fix \next -> do
|
||||||
|
r <- race (pause t) (atomically $ readTQueue chuQ)
|
||||||
|
case r of
|
||||||
|
Left{} -> cancel worker
|
||||||
|
Right{} -> next
|
||||||
|
|
||||||
|
result <- waitCatch worker
|
||||||
|
|
||||||
|
void $ ContT $ bracket none $ const do
|
||||||
|
cancel waiter
|
||||||
|
atomically $ flushTQueue chuQ
|
||||||
|
expire @e key
|
||||||
|
|
||||||
|
pure result
|
||||||
|
|
||||||
|
callCC \exit2 -> do
|
||||||
|
|
||||||
|
blk <- case eblk of
|
||||||
|
Right x -> pure x
|
||||||
|
Left{} -> exit2 (Left (PeerRequestTimeout peer))
|
||||||
|
|
||||||
let rs = LBS.concat $ IntMap.elems blk
|
let rs = LBS.concat $ IntMap.elems blk
|
||||||
|
|
||||||
ha <- enqueueBlock sto rs
|
ha <- enqueueBlock sto rs
|
||||||
|
|
||||||
expire @e key
|
case ha of
|
||||||
|
Nothing -> pure $ Left StorageError
|
||||||
|
|
||||||
pure (ha, rs)
|
Just h1 | h1 == h -> do
|
||||||
|
pure $ Right rs
|
||||||
|
|
||||||
case what of
|
Just h1 -> do
|
||||||
Nothing -> pure $ Left StorageError
|
delBlock sto h1
|
||||||
|
pure $ Left (PeerBlockHashMismatch peer)
|
||||||
Just h1 | h1 == h -> do
|
|
||||||
pure $ Right rs
|
|
||||||
|
|
||||||
Just h1 -> do
|
|
||||||
delBlock sto h1
|
|
||||||
pure $ Left (PeerBlockHashMismatch peer)
|
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue