hbs2/hbs2-peer/app/BlockDownload.hs

893 lines
29 KiB
Haskell
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
module BlockDownload where
import HBS2.Peer.Prelude
import HBS2.Base58
import HBS2.Actors.Peer
import HBS2.Data.Types.Peer
import HBS2.Data.Detect
import HBS2.Data.Types.Refs
import HBS2.Data.Bundle
import HBS2.Data.Types.SignedBox
import HBS2.Defaults
import HBS2.Events
import HBS2.Hash
import HBS2.Merkle
import HBS2.Net.PeerLocator
import HBS2.Peer.Proto
import HBS2.Storage
import HBS2.Storage.Operations.Missed
import HBS2.Misc.PrettyStuff
import PeerTypes
import PeerInfo
import Brains
import DownloadMon
import Control.Concurrent.STM qualified as STM
import Control.Monad.Trans.Cont
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.Cache qualified as Cache
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict qualified as HM
import Data.HashSet (HashSet)
import Data.HashSet qualified as HS
import Data.IntMap.Strict (IntMap)
import Data.IntMap.Strict qualified as IntMap
import Data.IntSet qualified as IntSet
import Data.Maybe
import Data.Either
import Data.ByteString.Lazy (ByteString)
import Data.List qualified as L
import Lens.Micro.Platform
import Codec.Serialise
import Data.Hashable
import System.Random.Shuffle (shuffleM)
import Control.Concurrent (getNumCapabilities)
import Streaming.Prelude qualified as S
import System.Random
import UnliftIO
-- | Scale applied to fractional peer weights before they are rounded
-- to integers (the rounded values are used as peer-ranking keys).
trimFactor :: Double
trimFactor = 1e2
-- | Download a single block from one particular peer, burst by burst.
--
-- NOTE: if the peer does not actually have the block, this results in
-- unpleasant timeouts. So make sure the peer has really answered the
-- GetBlockSize request before calling this function.
--
-- Arguments: the peer to ask, the size of the block, and the block hash.
--
-- Returns @Just block@ when all chunks have arrived and the assembled
-- content hashes to the requested hash, and @Nothing@ on a hash mismatch.
-- Missing chunks are re-requested for as long as the block is incomplete,
-- so this function has no overall timeout of its own.
downloadFromWithPeer :: forall e m . ( DownloadFromPeerStuff e m
                                     , e ~ L4Proto
                                     , HasPeerLocator e (BlockDownloadM e m) )
                     => Peer e
                     -> Integer
                     -> Hash HbSync
                     -> BlockDownloadM e m (Maybe ByteString)
downloadFromWithPeer peer thisBkSize h = do

  npi   <- newPeerInfo
  pinfo <- lift $ fetch True npi (PeerInfoKey peer) id

  let chunkSize = case view sockType peer of
        UDP -> defChunkSize
        TCP -> defChunkSize

  coo <- genCookie (peer,h)
  let key   = DownloadSessionKey (peer, coo)
  let chusz = fromIntegral chunkSize -- defChunkSize

  dnwld <- newBlockDownload h
  let chuQ = view sBlockChunks dnwld
  let new  = set sBlockChunkSize chusz
               . set sBlockSize (fromIntegral thisBkSize)
               $ dnwld

  trace $ "downloadFromWithPeer STARTED" <+> pretty coo

  -- register the download session under (peer, cookie)
  lift $ update @e new key id

  let burstSizeT = view peerBurst pinfo
  burstSize <- liftIO $ readTVarIO burstSizeT

  let offsets   = calcChunks thisBkSize (fromIntegral chusz) :: [(Offset, Size)]
  let chunkNums = [ 0 .. pred (length offsets) ]
  let bursts    = calcBursts burstSize chunkNums

  -- chunk number -> chunk payload, as stored in the session
  let chunksT = view sBlockChunks2 new

  -- queue of pending (first chunk, chunk count) requests
  rq <- liftIO newTQueueIO
  for_ bursts $ liftIO . atomically . writeTQueue rq

  result <- fix \next -> do
    burst <- liftIO $ atomically $ tryReadTQueue rq

    case burst of

      Just (i,chunksN) -> do
        let req = BlockGetChunks h chusz (fromIntegral i) (fromIntegral chunksN)

        -- drop stale notifications before issuing a new request
        void $ liftIO $ atomically $ STM.flushTQueue chuQ

        lift $ request peer (BlockChunks @e coo req)

        -- Wait until every chunk of this burst is present in the chunk map.
        -- Gives up if nothing at all arrives within one second.
        let waity = do
              fix \zzz -> do
                wt <- race ( pause @'Seconds 1 ) (atomically $ peekTQueue chuQ >> STM.flushTQueue chuQ)
                case wt of
                  Left{}  -> pure False
                  Right{} -> do
                    d <- atomically do
                      m <- readTVar chunksT
                      hc <- forM [i .. i + chunksN-1 ] $ \j -> do
                        pure (IntMap.member j m)
                      pure ( and hc )
                    if d then pure True else zzz

        -- the whole burst gets at most three seconds
        catched <- race (pause @'Seconds 3 >> pure False) waity <&> either id id

        void $ liftIO $ atomically $ STM.flushTQueue chuQ

        if catched then do
          liftIO $ atomically do
            modifyTVar (view peerDownloaded pinfo) (+chunksN)
            writeTVar (view peerPingFailed pinfo) 0
        else do
          liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ
          updatePeerInfo True peer pinfo

          newBurst <- liftIO $ readTVarIO burstSizeT

          -- Re-queue exactly the chunks of the failed burst, re-split by the
          -- (possibly reduced) burst size. The range must be
          -- [i .. i + chunksN - 1]: including i + chunksN would request a
          -- chunk outside this burst, and for the last burst a chunk past
          -- the end of the block, which can never be received.
          --
          -- A failed burst is deliberately NOT credited to peerDownloaded.
          let chuchu = calcBursts newBurst [ i .. i + chunksN - 1 ]

          -- NOTE(review): peerErrors is incremented a second time here, after
          -- the congestion-control update above. Kept as is, since the
          -- double penalty may be intentional -- confirm.
          liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ

          trace $ "new burst: " <+> pretty newBurst
          trace1 $ red $ "missed chunks for request" <+> pretty (peer,i,chunksN)

          for_ chuchu $ liftIO . atomically . writeTQueue rq

        next

      Nothing -> do

        sz <- liftIO $ readTVarIO chunksT <&> IntMap.size

        if sz >= length offsets then do
          pieces <- liftIO $ readTVarIO chunksT <&> IntMap.elems
          let block = mconcat pieces
          let h1 = hashObject @HbSync block

          if h1 == h then do
            trace $ "PROCESS BLOCK" <+> pretty coo <+> pretty h
            pure (Just block)
          else do
            debug $ red "HASH NOT MATCH / PEER MAYBE JERK"
            pure Nothing

        else do
          debug $ red $ "RETRY BLOCK DOWNLOADING / ASK FOR MISSED CHUNKS"
          got <- liftIO $ readTVarIO chunksT <&> IntMap.keysSet
          let need   = IntSet.fromList (fmap fromIntegral chunkNums)
          let missed = IntSet.toList $ need `IntSet.difference` got

          -- normally this should not happen;
          -- however, let's try to download the tail
          -- one chunk at a time
          for_ missed $ \n -> do
            debug $ "MISSED CHUNK" <+> pretty coo <+> pretty n
            liftIO $ atomically $ writeTQueue rq (n,1)

          next

  lift $ expire @e key
  debug $ yellow $ "downloadFromWithPeer EXIT" <+> pretty coo
  pure result
-- | 'BlockDownloadM' obtains the peer locator from the monad it wraps.
instance HasPeerLocator e m => HasPeerLocator e (BlockDownloadM e m) where
  getPeerLocator = lift getPeerLocator
-- NOTE: updatePeerInfo is CC
--   updatePeerInfo is actually doing CC (congestion control)
--
-- The 'Bool' argument tells whether the update is triggered by an error.
--
-- For TCP peers the burst is simply set to 256.
--
-- For other peers, and only when something was downloaded since the last
-- update or the update is triggered by an error:
--
--   * with no errors per (whole) second and no error trigger, the burst
--     grows by 10%, capped by the recorded burst maximum
--     (or 'defBurstMax' when none is recorded);
--
--   * otherwise the second-largest burst seen so far becomes the new
--     burst maximum and the burst falls back to the fifth-largest burst
--     seen so far (or 'defBurst' when there is no such value).
updatePeerInfo :: forall e m . (e ~ L4Proto, MonadIO m) => Bool -> Peer e -> PeerInfo e -> m ()
updatePeerInfo _ p pinfo | view sockType p == TCP = do
  liftIO $ atomically $ writeTVar (view peerBurst pinfo) 256

updatePeerInfo onError _ pinfo = do

  now <- liftIO getTimeCoarse

  liftIO $ atomically do

    burst     <- readTVar (view peerBurst pinfo)
    burstCap  <- readTVar (view peerBurstMax pinfo)
    burstHist <- readTVar (view peerBurstSet pinfo)
    errs      <- readTVar (view peerErrors pinfo)
    errsSeen  <- readTVar (view peerErrorsLast pinfo)
    before    <- readTVar (view peerLastWatched pinfo)
    down      <- readTVar (view peerDownloaded pinfo)
    downSeen  <- readTVar (view peerDownloadedLast pinfo)

    -- errors per second since the previous update, rounded down
    let newErrors = realToFrac $ max 0 (errs - errsSeen)
    let elapsed   = realToFrac (max 1 (toNanoSecs now - toNanoSecs before)) / 1e9
    let errRate   = floor (newErrors / elapsed)

    -- how many burst values are remembered
    let window = min 10 $ 4 * (defBurstMax - defBurst)

    -- burst values seen so far, largest first
    let history = IntSet.toDescList burstHist

    when (down - downSeen > 0 || onError) do

      (burst', hist') <-
        if onError || errRate /= 0 then do
          -- back off
          writeTVar (view peerBurstMax pinfo) (headMay $ drop 1 history)
          let shrunk = headDef defBurst $ drop 4 history
          pure (shrunk, trimDown window $ IntSet.insert shrunk burstHist)
        else do
          -- speed up
          let cap   = fromMaybe defBurstMax burstCap
          let grown = min cap (ceiling (realToFrac burst * 1.10))
          pure (grown, trimUp window $ IntSet.insert grown burstHist)

      writeTVar (view peerErrorsLast pinfo) errs
      writeTVar (view peerLastWatched pinfo) now
      writeTVar (view peerErrorsPerSec pinfo) errRate
      writeTVar (view peerBurst pinfo) burst'
      writeTVar (view peerBurstSet pinfo) hist'
      writeTVar (view peerDownloadedLast pinfo) down

  where
    -- drop the smallest remembered burst once the window is full
    trimUp n s
      | IntSet.size s >= n = IntSet.deleteMin s
      | otherwise          = s

    -- drop the largest remembered burst once the window is full
    trimDown n s
      | IntSet.size s >= n = IntSet.deleteMax s
      | otherwise          = s
-- | A pair that is compared by its first component only;
-- the second component is ignored by '=='.
data ByFirst a b = ByFirst a b

instance Eq a => Eq (ByFirst a b) where
  ByFirst x _ == ByFirst y _ = x == y
-- | Hashes the first component only, in agreement with the 'Eq' instance.
instance Hashable a => Hashable (ByFirst a b) where
  hashWithSalt salt (ByFirst k _) = salt `hashWithSalt` k
-- | Handler for a received block-size answer @(peer, hash, size)@:
-- logs it (green when a size is present, red otherwise), raises the
-- dirty flag of the download environment and enqueues the answer
-- into its size-received queue.
downloadOnBlockSize :: (MonadIO m, IsPeerAddr e m, MyPeer e)
                    => DownloadEnv e
                    -> (Peer e, Hash HbSync, Maybe Integer)
                    -> m ()
downloadOnBlockSize denv item@(p,h,size) = do
  let paint = maybe red (const green) size
  debug $ paint "GOT BLOCK SIZE" <+> pretty p <+> pretty h <+> pretty size
  atomically $ writeTVar (_blockInDirty denv) True
  atomically $ writeTQueue (_blockSizeRecvQ denv) item
-- | The block download scheduler. Starts a set of worker threads
-- connected by queues and then reports the number of blocks in
-- progress every five seconds, forever.
--
-- Data flow between the workers:
--
--   * @checkQ@: a block already in storage goes to @parseQ@,
--     otherwise to @sizeQ@;
--
--   * @parseQ@: missed sub-blocks are scheduled, the block is processed
--     and removed from the download queue;
--
--   * @sizeQ@: peers are asked for the block size; as soon as some peer
--     is known to have the block, it goes to @fetchQ@;
--
--   * @sizeRQ@: received size answers are recorded and the corresponding
--     blocks go to @fetchQ@;
--
--   * @fetchQ@: a download worker picks a peer and downloads the block;
--     on success the block goes to @parseQ@, otherwise back to @checkQ@.
--
-- @busy@ maps a peer nonce to its current workload factor; peers whose
-- factor is not below @workloadFactor@ are not given new downloads.
blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
                                  , MonadIO m
                                  , Request e (BlockInfo e) m
                                  , Request e (BlockAnnounce e) m
                                  , HasProtocol e (BlockInfo e)
                                  , HasProtocol e (BlockAnnounce e)
                                  , HasProtocol e (BlockChunks e)
                                  , EventListener e (BlockInfo e) m
                                  , EventListener e (BlockChunks e) m
                                  , EventListener e (BlockAnnounce e) m
                                  , EventListener e (PeerHandshake e) m
                                  , EventListener e (RefLogUpdateEv e) m
                                  , EventListener e (RefLogRequestAnswer e) m
                                  , EventEmitter e (BlockChunks e) m
                                  , EventEmitter e (DownloadReq e) m
                                  , Sessions e (BlockChunks e) m
                                  , Sessions e (PeerInfo e) m
                                  , Sessions e (KnownPeer e) m
                                  , PeerSessionKey e (PeerInfo e)
                                  , HasStorage m
                                  , Pretty (Peer e)
                                  , PeerMessaging e
                                  , IsPeerAddr e m
                                  , HasPeerLocator e m
                                  , e ~ L4Proto
                                  )
                  => DownloadEnv e -> m ()
blockDownloadLoop env0 = do

  -- how long a size request counts towards the peer's workload
  let blkInfoLock = 5 :: Timeout 'Seconds

  let workloadFactor = 1.10

  e   <- ask
  sto <- getStorage

  -- number of download workers / size workers
  let downT = 8
  let sizeT = 1

  inQ    <- withDownload env0 $ asks (view blockInQ)
  checkQ <- withDownload env0 $ asks (view blockCheckQ)
  sizeQ  <- newTQueueIO
  fetchQ <- newTQueueIO
  parseQ <- newTQueueIO
  sizeRQ <- withDownload env0 $ asks (view blockSizeRecvQ)

  -- FIXME: cleanup-nonce
  nonces <- newTVarIO (mempty :: HashMap (Peer e) PeerNonce)

  -- FIXME: cleanup-busy
  busy <- newTVarIO (mempty :: HashMap PeerNonce Double)

  rates   <- newTVarIO (mempty :: IntMap.IntMap [(Peer e,PeerNonce)])
  fetchH  <- newTVarIO (mempty :: HashSet (Hash HbSync))
  sizes   <- newTVarIO (mempty :: HashMap (Peer e, Hash HbSync) (Maybe Integer, TimeSpec))
  sizeReq <- newTVarIO (mempty :: HashMap (Hash HbSync) TimeSpec)
  seen    <- newTVarIO (mempty :: HashMap (Hash HbSync) Int)

  flip runContT pure do

    void $ ContT $ withAsync updatePeers

    -- UPDATE-STATS-LOOP
    void $ ContT $ withAsync $ updateRates e rates nonces

    -- CLEANUP: every two minutes forget everything that refers to blocks
    -- no longer in the download queue or to peers without a rate.
    -- One worker is enough: the cleanup is a single idempotent transaction.
    void $ ContT $ withAsync do
      forever do
        pause @'Seconds 120
        atomically do
          q <- readTVar inQ
          let isInQ x = HashMap.member x q
          modifyTVar' fetchH  (HS.filter isInQ)
          modifyTVar' sizeReq (HM.filterWithKey (curry (isInQ . fst)))
          modifyTVar' sizes   (HM.filterWithKey (curry (isInQ . snd . fst)))
          modifyTVar' seen    (HM.filterWithKey (curry (isInQ . fst)))

          livePeers <- readTVar rates <&> mconcat . IntMap.elems
          let liveNonce = HS.fromList (fmap snd livePeers)
          let livePeer  = HS.fromList (fmap fst livePeers)

          modifyTVar' busy   (HM.filterWithKey (\x _ -> HS.member x liveNonce))
          modifyTVar' nonces (HM.filterWithKey (\x _ -> HS.member x livePeer))

    -- CHECK: route a block to parsing (already here) or to size lookup
    replicateM_ downT $ ContT $ withAsync do
      forever do
        blk  <- atomically $ readTQueue checkQ
        here <- hasBlock sto blk <&> isJust
        if not here then do
          atomically $ writeTQueue sizeQ blk
        else do
          atomically $ writeTQueue parseQ blk

    -- PARSE: schedule missed sub-blocks, process the block and
    -- remove it from the download queue
    void $ ContT $ withAsync do
      forever do
        blk <- atomically $ readTQueue parseQ
        withDownload env0 do
          blks <- findMissedBlocks sto (HashRef blk)
          for_ blks $ \b -> do
            addDownload (Just blk) (fromHashRef b)
          processBlock blk
          deleteBlockFromQ blk

    -- SIZE ANSWERS: record received sizes and schedule fetching
    void $ ContT $ withAsync do
      forever do

        -- block until there is at least one answer, then take them all
        items <- atomically do
          peekTQueue sizeRQ >> STM.flushTQueue sizeRQ

        now <- getTimeCoarse

        todo <- atomically do
          w <- for items $ \(p,h,s) -> do
            modifyTVar sizes (HashMap.insert (p,h) (s, now))
            readTVar nonces <&> HashMap.lookup p >>= \case
              Nothing    -> pure ()
              -- the peer has answered: decrease its workload factor.
              -- This must be @subtract 0.01@ (v - 0.01), like the other
              -- decrements in this function; the section @(0.01-)@ would
              -- compute 0.01 - v instead.
              Just nonce -> setBusySTM nonce busy (Just (setFactor 0 (subtract 0.01)))
            pure h
          pure (L.nub w)

        for_ todo $ \b -> do
          here <- hasBlock sto b <&> isJust

          already <- atomically do
            readTVar fetchH <&> HS.member b

          when (not here && not already) do
            atomically $ writeTQueue fetchQ b

    -- SIZE RE-REQUEST: poll blocks with outstanding size requests
    replicateM_ sizeT $ ContT $ withAsync do

      -- TODO: trim-sizeReq
      let blocks = readTVarIO sizeReq <&> HashMap.keys <&> fmap (,2)

      polling (Polling 1 1) blocks $ \h -> do
        pips <- readTVarIO nonces <&> HashMap.keys

        for_ pips $ \p -> do
          here <- lookupSizeIO sizes p h <&> isRight

          if here then do
            atomically $ modifyTVar sizeReq (HashMap.delete h)
          else
            request p (GetBlockSize @e h)

    -- SIZE: find out which peers have the block
    replicateM_ sizeT $ ContT $ withAsync do

      forever do

        -- wait until at least one peer has a rate
        blk <- atomically do
          readTVar rates <&> not . IntMap.null >>= STM.check
          readTQueue sizeQ

        debug $ green "PEER SIZE THREAD" <+> pretty blk

        -- peers, best rate first
        r <- readTVarIO rates <&> IntMap.toDescList
                              <&> foldMap snd

        answ <- for r $ \(p,nonce) -> do
          lookupSizeIO sizes p blk >>= \case

            -- already asked; the peer says it does not have the block
            Left{} -> do
              npi <- newPeerInfo
              PeerInfo{..} <- fetch True npi (PeerInfoKey p) id

              atomically do
                setBusySTM nonce busy (Just (setFactor 0 (+(-0.01))))
                modifyTVar _peerDownloadMiss succ
                modifyTVar seen (HashMap.insertWith (+) blk 1)
                modifyTVar sizeReq (HashMap.delete blk)

              debug $ red "NONE:" <+> pretty p <+> pretty blk
              pure 0

            -- already asked; the peer answered with a size
            Right (Just w) -> do
              atomically do
                setBusySTM nonce busy (Just (setFactor 0 (+(-0.01))))
                modifyTVar sizeReq (HashMap.delete blk)

              debug $ red "SIZE:" <+> pretty p <+> pretty blk <+> pretty w
              pure 1

            -- not asked yet
            Right Nothing -> do
              (doReq, f) <- atomically do
                f <- lookupBusySTM nonce busy
                if f > workloadFactor then
                  pure (False, f)
                else do
                  setBusySTM nonce busy (Just (setFactor 0.01 (+0.01)))
                  pure (True, f)

              debug $ green "BUSY" <+> pretty p <+> pretty f

              when doReq do
                debug $ red "SEND REQUEST FOR SIZE" <+> pretty p <+> pretty blk

                -- release the workload taken by this request later,
                -- whether or not the peer answers
                void $ async do
                  pause blkInfoLock
                  atomically (setBusySTM nonce busy (Just (setFactor 0 (+(-0.01)))))

                withPeerM e $ request p (GetBlockSize @e blk)
                now <- getTimeCoarse
                atomically $ modifyTVar sizeReq (HashMap.insert blk now)

              pure 0

        if sum answ > 0 then do
          atomically do
            here <- readTVar fetchH <&> HS.member blk
            -- the block is available somewhere: forget the miss counter.
            -- (A bare @readTVar seen <&> HM.delete blk@ would compute the
            -- new map and throw it away, leaving @seen@ untouched.)
            modifyTVar' seen (HM.delete blk)
            unless here $
              writeTQueue fetchQ blk
        else do
          howMany <- readTVarIO seen <&> (fromMaybe 0 . HashMap.lookup blk)
          -- FIXME: hardcode
          when (howMany < 10) do
            atomically $ writeTQueue sizeQ blk

    -- RETRY: periodically give blocks nobody had another chance
    void $ ContT $ withAsync do
      -- FIXME: ban-time-hardcode
      let loosers = readTVarIO seen <&> fmap (,120) . HashMap.keys
      polling (Polling 1 10) loosers $ \it -> do
        atomically $ writeTQueue checkQ it
        atomically $ modifyTVar seen (HashMap.delete it)

    -- DOWNLOAD: pick a peer for a block and download it
    replicateM_ downT $ ContT $ withAsync do

      forever do

        -- A fresh generator for every block: a generator created once
        -- outside the loop is never advanced, so 'randomRs' would return
        -- the very same list of weights on each iteration.
        gen <- newStdGen

        flip runContT pure $ callCC \exit -> do

          blk <- atomically $ readTQueue fetchQ

          atomically do
            modifyTVar fetchH (HS.insert blk)

          here <- hasBlock sto blk <&> isJust

          when here $ exit ()

          debug $ green "PEER DOWNLOAD THREAD" <+> pretty blk

          -- TODO: already-downloaded-possible

          -- random jitter added to the peer rates
          let ws = round . (*trimFactor) <$> randomRs (0, 2.5) gen

          work <- lift $ race (pause @'Seconds 60) $ atomically do
            r0  <- readTVar rates <&> IntMap.toList
            bsy <- readTVar busy

            -- bonus for peers with a low workload
            let bx nonce =
                  round $ trimFactor * (1.75 / (1.0 + fromMaybe 0 (HashMap.lookup nonce bsy)))

            -- peers ordered by descending total weight
            let w = [ (-(v + w0 + bx nonce), p)
                    | (v, (w0, peers)) <- zip ws r0, p@(_,nonce) <- peers
                    ] & L.sortOn fst & fmap snd

            avail' <- for w $ \(peer,nonce) -> do
              p  <- readTVar busy <&> HashMap.lookup nonce
              sz <- lookupSizeSTM sizes peer blk
              if p < Just workloadFactor then
                pure (Just (peer,nonce, sz))
              else
                pure Nothing

            let avail = catMaybes avail'

            -- retry the transaction until some peer is not overloaded
            STM.check (not $ L.null avail)

            found <- for avail $ \(pip, nonce, msz) -> case msz of
              Right (Just sz) -> do
                pure $ Just (blk, pip, nonce, sz)

              _ -> pure Nothing

            case headMay (catMaybes found) of
              Nothing -> do
                -- no available peer is known to have the block:
                -- send it back to the size lookup
                writeTQueue checkQ blk
                modifyTVar fetchH (HS.delete blk)
                pure Nothing

              Just what@(_,_,nonce,_) -> do
                setBusySTM nonce busy (Just (setFactor 1.0 (+1.0)))
                pure $ Just what

          case work of
            Right (Just (b,p,nonce,s)) -> do
              debug $ green "WORKER CHOOSEN" <+> pretty p <+> pretty blk <+> pretty s

              r <- lift $ race (pause @'Seconds 60) (withDownload env0 $ downloadFromWithPeer p s b)

              atomically do
                setBusySTM nonce busy (Just (setFactor 0 (const 0)))

              npi <- newPeerInfo
              PeerInfo{..} <- lift $ fetch True npi (PeerInfoKey p) id

              debug $ green "DOWNLOAD DONE!" <+> pretty p <+> pretty blk <+> pretty s <+> pretty (isRight r)

              atomically $ modifyTVar fetchH (HS.delete blk)

              case r of
                Right (Just block) -> do
                  mh <- putBlock sto block
                  atomically do
                    modifyTVar _peerDownloaded succ
                    modifyTVar _peerDownloadedBlk succ

                  case mh of
                    Nothing -> err $ red "storage write error!"
                    Just h  -> do
                      atomically $ writeTQueue parseQ h

                _ -> do
                  debug $ red "DOWNLOAD FAILED / TIMEOUT"

                  atomically do
                    modifyTVar _peerDownloadFail succ
                    modifyTVar _peerErrors succ
                    writeTQueue checkQ blk

            -- The transaction has already sent the block back to checkQ
            -- and removed it from fetchH.
            Right Nothing -> do
              debug $ red "WAIT FOR PEERS TIMEOUT" <+> pretty blk
              atomically $ writeTVar busy mempty

            -- Real timeout: the transaction did not commit, so the block is
            -- still marked as being fetched. Unmark it and send it back to
            -- checkQ; otherwise it would stay in fetchH and never be
            -- scheduled for download again.
            Left{} -> do
              debug $ red "WAIT FOR PEERS TIMEOUT" <+> pretty blk
              atomically do
                writeTVar busy mempty
                modifyTVar' fetchH (HS.delete blk)
                writeTQueue checkQ blk

    -- report the number of blocks in progress
    forever do
      withPeerM e $ withDownload env0 do
        pause @'Seconds 5
        wip <- asks _blockInQ >>= readTVarIO <&> HashMap.size
        notice $ yellow "wip" <+> pretty wip

  where

    -- Workload update function for 'HashMap.alter':
    -- a missing entry becomes @d@, an existing one is changed by @f@
    -- and clamped at zero from below.
    setFactor d f = \case
      Nothing -> Just d
      Just v  -> Just (max 0 (f v))

    -- 'Nothing' removes the workload entry of the nonce,
    -- @Just fn@ alters it with @fn@.
    setBusySTM nonce busy = \case
      Nothing -> modifyTVar busy (HashMap.delete nonce)
      Just fn -> modifyTVar busy (HashMap.alter fn nonce)

    -- workload factor of the nonce, zero when unknown
    lookupBusySTM nonce busy =
      readTVar busy <&> fromMaybe 0 . HashMap.lookup nonce

    -- What is known about the size of block @h@ at peer @p@:
    -- @Right Nothing@ -- no answer recorded, @Right (Just x)@ -- size @x@,
    -- @Left ()@ -- an answer without a size is recorded.
    lookupSizeSTM sizes p h =
      readTVar sizes
        <&> HashMap.lookup (p,h)
        <&> \case
              Nothing          -> Right Nothing
              Just (Just x,_)  -> Right (Just x)
              Just (Nothing,_) -> Left ()

    lookupSizeIO sizes p h = do
      atomically $ lookupSizeSTM sizes p h

    -- Every 20 seconds recompute the rate of every known peer that has
    -- a PeerInfo session and replace the rates table. Also records the
    -- nonce of each such peer.
    updateRates e rates nonces = withPeerM e do

      let wRtt = 5
      let wUdp = 1.75
      let wTcp = 1.0
      let wS   = 1.5
      let eps  = 1e-8

      forever do
        pause @'Seconds 20

        new <- S.toList_ do
          withPeerM e $ forKnownPeers @e $ \peer pd -> do
            pinfo <- find (PeerInfoKey peer) id

            maybe1 pinfo none $ \pip -> do

              let nonce = _peerOwnNonce pd

              atomically $ modifyTVar nonces (HashMap.insert peer nonce)

              sr <- readTVarIO (_peerDownloaded pip)
              er <- readTVarIO (_peerDownloadFail pip)

              -- success ratio; eps keeps the division defined
              let s = (eps + realToFrac sr) / (eps + realToFrac (sr + er))

              {- HLINT ignore "Functor law" -}
              rtt <- medianPeerRTT pip
                       <&> fmap ( (/1e9) . realToFrac )
                       <&> fromMaybe 1.0

              let (udp,tcp) = case view sockType peer of
                    UDP -> (0, wUdp * 1.0)
                    TCP -> (wTcp * 1.0, 0)

              let r = udp + tcp + wS*s

              lift $ S.yield (peer, nonce, (r, rtt))

        let maxRtt = maximumDef 1.0 [ rtt | (_, _, (_, rtt)) <- new ]

        let mkRate s rtt = round $ trimFactor * (s + wRtt * (1 / (1 + rtt / maxRtt)))

        let newRates = [ (mkRate s rtt, [(p,nonce)] )
                       | (p, nonce, (s, rtt)) <- new
                       ]

        atomically do
          writeTVar rates (IntMap.fromListWith (<>) newRates)

        debug $ green "PEER RATES" <+> line <> vcat (fmap fmt newRates)

      where
        fmt (r,prs) = pretty r <+> hcat (fmap (pretty . view _1) prs)

    -- Every three seconds run the congestion-control update
    -- for every known peer.
    updatePeers = do
      e  <- ask
      pl <- getPeerLocator @e
      forever $ withPeerM e do
        pause @'Seconds 3.0
        pee <- knownPeers @e pl
        npi <- newPeerInfo

        for_ pee $ \p -> do
          pinfo <- fetch True npi (PeerInfoKey p) id
          updatePeerInfo False p pinfo
-- | Look at a block and schedule the download of whatever it refers to.
--
-- The block is read from storage and its type is detected. A block that
-- is not in storage is scheduled for download itself. Otherwise it is
-- removed from the download queue and, depending on its type, the hashes
-- it references are scheduled with this block as their parent.
-- A plain blob is additionally probed for being a bundle reference;
-- if it is one, the bundle is scheduled and imported once downloaded.
processBlock :: forall e m . ( MonadIO m
                             , HasStorage m
                             , MyPeer e
                             , ForSignedBox e
                             , HasPeerLocator e (BlockDownloadM e m)
                             )
             => Hash HbSync
             -> BlockDownloadM e m ()
processBlock h = do

  sto     <- lift getStorage
  _brains <- asks (view downloadBrains)

  let parent = Just h

  block <- liftIO $ getBlock sto h

  let detected = tryDetect h <$> block

  -- FIXME: if the block has been found, remove it from wip
  when (isJust detected) $ deleteBlockFromQ h

  -- Merkle-walk callback: schedule a missing tree node, or every
  -- referenced block that is not in storage yet
  let handleHrr (hrr :: Either (Hash HbSync) [HashRef]) =
        case hrr of
          Left missing -> addDownload parent missing
          Right refs   ->
            for_ refs $ \(HashRef blk) -> do
              here <- liftIO (hasBlock sto blk) <&> isJust
              -- blocks already in storage are left alone
              -- FIXME: figure out if processing them again is really required
              unless here $ addDownload parent blk

  case detected of
    -- the block itself is not here yet
    Nothing -> addDownload Nothing h

    Just (SeqRef (SequentialRef _ (AnnotatedHashRef a' b))) -> do
      maybe1 a' none $ \a -> do
        debug $ "GOT AnnotatedHashRef" <+> pretty a
        processBlock (fromHashRef a)

      addDownload parent (fromHashRef b)

    Just (AnnRef (AnnotatedHashRef ann hx)) -> do
      maybe1 ann none $ addDownload parent . fromHashRef
      addDownload parent (fromHashRef hx)

    Just (MerkleAnn ann) -> do
      case _mtaMeta ann of
        NoMetaData       -> pure ()
        ShortMetadata {} -> pure ()
        AnnHashRef hx    -> addDownload parent hx

      case _mtaCrypt ann of
        NullEncryption             -> pure ()
        CryptAccessKeyNaClAsymm kh -> addDownload parent kh
        EncryptGroupNaClSymm kh _  -> addDownload parent kh

      trace $ "GOT WRAPPED MERKLE. requesting nodes/leaves" <+> pretty h
      walkMerkleTree (_mtaTree ann) (liftIO . getBlock sto) handleHrr

    Just (Merkle{}) -> do
      trace $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h
      walkMerkle h (liftIO . getBlock sto) handleHrr

    Just (Blob{}) -> do
      -- NOTE: bundle-ref-detection-note
      --   Handling BundleRefValue inside tryDetect is too costly,
      --   because it requires a lot of constraints that were
      --   never meant to be there. As a temporary measure,
      --   we try to detect a Bundle here.
      mon <- asks (view downloadMon)
      void $ runMaybeT do
        bs <- MaybeT $ pure block

        -- TODO: check-if-we-somehow-trust-this-key
        (_pk, BundleRefSimple ref) <- MaybeT $ pure $ deserialiseOrFail @(BundleRefValue e) bs
                                         & either (const Nothing) unboxBundleRef

        debug $ "GOT BundleRefValue" <+> parens (pretty ref)

        -- import the bundle as soon as it has been downloaded
        downloadMonAdd mon ref do
          debug $ "Downloaded bundle:" <+> pretty ref
          r <- importBundle sto (void . putBlock sto . snd) ref
          case r of
            Right{} -> debug $ "Imported bundle: " <+> pretty ref
            Left e  -> err (viaShow e)

        lift $ addDownload parent (fromHashRef ref)

  where
    unboxBundleRef (BundleRefValue box) = unboxSignedBox0 box
-- NOTE: this is an adapter for the ResponseM monad,
--       because the response handler runs in ResponseM (ha!).
--       So don't be confused by the types.
--
-- Builds the 'BlockChunksI' callbacks on top of the local storage and
-- the download sessions: block size and chunk reads go to the storage,
-- the hash of a download is taken from its session, and an accepted
-- chunk is pushed both to the chunk queue and to the chunk map of the
-- session it belongs to. A chunk without a session is dropped.
mkAdapter :: forall e m . ( m ~ PeerM e IO
                          , HasProtocol e (BlockChunks e)
                          , Hashable (SessionKey e (BlockChunks e))
                          , Sessions e (BlockChunks e) (ResponseM e m)
                          , Typeable (SessionKey e (BlockChunks e))
                          , EventEmitter e (BlockChunks e) m
                          , Pretty (Peer e)
                          )
          => m (BlockChunksI e (ResponseM e m ))
mkAdapter = do
  storage <- getStorage
  pure BlockChunksI
    { blkSize    = liftIO . hasBlock storage
    , blkChunk   = \h o s -> liftIO (getChunk storage h o s)
    , blkGetHash = \c -> find (DownloadSessionKey @e c) (view sBlockHash)
    , blkAcceptChunk = \(cookie, peer, _, num, bytes) -> void $ runMaybeT do
        let sessionKey = DownloadSessionKey (peer, cookie)

        -- only used to report a lost session
        known <- lift $ find sessionKey (view sBlockChunks)

        when (isNothing known) do
          debug $ "session lost for peer !" <+> pretty peer

        -- stops here when there is no session
        session <- MaybeT $ find sessionKey id

        let chunkQ   = view sBlockChunks session
        let chunkMap = view sBlockChunks2 session

        liftIO $ atomically do
          writeTQueue chunkQ (num, bytes)
          modifyTVar' chunkMap (IntMap.insert (fromIntegral num) bytes)
    }