hbs2/hbs2-peer/app/BlockDownload.hs

775 lines
24 KiB
Haskell
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language TemplateHaskell #-}
{-# Language UndecidableInstances #-}
module BlockDownload where
import HBS2.Actors.Peer
import HBS2.Clock
import HBS2.Data.Detect
import HBS2.Data.Types.Refs
import HBS2.Defaults
import HBS2.Events
import HBS2.Hash
import HBS2.Merkle
import HBS2.Net.PeerLocator
import HBS2.Net.Proto
import HBS2.Net.Proto.Peer
import HBS2.Net.Proto.Definition
import HBS2.Net.Proto.Sessions
import HBS2.Prelude.Plated
import HBS2.Storage
import HBS2.System.Logger.Simple
import PeerInfo
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Control.Concurrent.STM.TSem as Sem
import Data.ByteString.Lazy (ByteString)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Foldable hiding (find)
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.IntMap (IntMap)
import Data.IntMap qualified as IntMap
import Data.IntSet qualified as IntSet
import Data.Maybe
import Data.Set qualified as Set
import Data.Set (Set)
import Lens.Micro.Platform
import Numeric ( showGFloat )
import Prettyprinter
import System.Random.Shuffle
import Type.Reflection
-- | Groups consecutive pieces into bursts holding at most @bu@ pieces each.
--
-- Every resulting pair is @(first piece of the burst, number of pieces)@.
-- Pieces are consumed in list order. A burst always contains at least one
-- piece, so a limit below one yields single-piece bursts.
calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)]
calcBursts bu pieces =
  case pieces of
    []       -> []
    (p : ps) -> collect (p, 1) ps
  where
    -- Extend the current burst while it stays within the limit,
    -- otherwise emit it and open a new one at the next piece.
    collect burst [] = [burst]
    collect (start, len) (q : qs)
      | len + 1 <= bu = collect (start, len + 1) qs
      | otherwise     = (start, len) : collect (q, 1) qs
-- | Per-session state of one block being fetched chunk by chunk.
-- Stored as the session data for the 'BlockChunks' protocol.
data BlockDownload =
  BlockDownload
  { _sBlockHash      :: Hash HbSync -- ^ hash of the block being downloaded
  , _sBlockSize      :: Size -- ^ block size; 'newBlockDownload' starts it at 0
  , _sBlockChunkSize :: ChunkSize -- ^ chunk size used for requests; starts at 0
  , _sBlockChunks    :: TQueue (ChunkNum, ByteString) -- ^ received chunks, written by the protocol adapter and drained by the downloader
  }
  deriving stock (Typeable)

-- Generates the lenses sBlockHash, sBlockSize, sBlockChunkSize, sBlockChunks.
makeLenses 'BlockDownload
-- | Builds a fresh download record for block @h@ with zero block size,
-- zero chunk size and a newly allocated, empty chunk queue.
newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload
newBlockDownload h = liftIO $ do
  chunks <- newTQueueIO
  pure (BlockDownload h 0 0 chunks)
-- | Sessions of the 'BlockChunks' protocol carry a 'BlockDownload' record.
type instance SessionData e (BlockChunks e) = BlockDownload

-- | A download session is identified by the remote peer together with
-- the cookie attached to the chunk requests.
newtype instance SessionKey e (BlockChunks e) =
  DownloadSessionKey (Peer e, Cookie e)
  deriving stock (Generic,Typeable)
-- | Download state of a block as tracked in 'DownloadEnv'.
data BsFSM = Initial     -- ^ queued for download (set by 'addDownload' and when a postponed block is returned)
           | Downloading -- ^ a download attempt from some peer has been started
           | Postpone    -- ^ waited in 'Initial' longer than its allowed time; parked until re-queued
-- | Bookkeeping kept for every block known to the downloader.
data BlockState =
  BlockState
  { _bsStart :: TimeSpec -- ^ monotonic time when the entry was created (or last reset by 'tossPostponed')
  , _bsTimes :: Int      -- ^ how many times an existing entry was touched by 'touchBlockState'
  , _bsState :: BsFSM    -- ^ current download state
  , _bsWipTo :: Double   -- ^ time (compared against elapsed seconds) a block may stay 'Initial' before it is postponed
  }

-- Generates the lenses bsStart, bsTimes, bsState, bsWipTo.
makeLenses 'BlockState
-- | Shared mutable environment of the block downloader.
data DownloadEnv e =
  DownloadEnv
  { _downloadQ      :: TQueue (Hash HbSync) -- ^ blocks waiting to be downloaded
  , _peerBusy       :: TVar   (HashMap (Peer e) ()) -- ^ peers currently claimed by 'withFreePeer'
  , _blockPeers     :: TVar   (HashMap (Hash HbSync) (HashMap (Peer e) Integer) ) -- ^ per block: peers that reported it and the size they reported
  , _blockWip       :: Cache  (Hash HbSync) () -- ^ blocks in progress; entries expire after 'defBlockWipTimeout'
  , _blockState     :: TVar   (HashMap (Hash HbSync) BlockState) -- ^ per-block download state
  , _blockPostponed :: Cache  (Hash HbSync) () -- ^ blocks parked by 'withBlockForDownload' until 'tossPostponed' returns them
  , _blockInQ       :: TVar   (HashMap (Hash HbSync) ()) -- ^ blocks currently sitting in '_downloadQ'; used to avoid duplicate queue entries
  }

-- Generates one lens per field (downloadQ, peerBusy, blockPeers, ...).
makeLenses 'DownloadEnv
-- | Constraint shorthand: everything the downloader needs from a peer
-- address type (equality, hashing for 'HashMap' keys, pretty-printing).
class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e

-- Catch-all instance: any @e@ satisfying the superclass constraints is a 'MyPeer'.
instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e
-- | Allocates an empty download environment: empty queue and maps, a
-- work-in-progress cache whose entries expire after 'defBlockWipTimeout',
-- and a postponed-blocks cache without a default expiration.
newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e)
newDownloadEnv = liftIO $ do
  queue     <- newTQueueIO
  busy      <- newTVarIO mempty
  peers     <- newTVarIO mempty
  wip       <- Cache.newCache (Just defBlockWipTimeout)
  states    <- newTVarIO mempty
  postponed <- Cache.newCache Nothing
  inQueue   <- newTVarIO mempty
  pure DownloadEnv
    { _downloadQ      = queue
    , _peerBusy       = busy
    , _blockPeers     = peers
    , _blockWip       = wip
    , _blockState     = states
    , _blockPostponed = postponed
    , _blockInQ       = inQueue
    }
-- | Monad of the block downloader: a reader over 'DownloadEnv' layered
-- on top of an arbitrary base monad @m@.
newtype BlockDownloadM e m a =
  BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a }
  deriving newtype ( Functor
                   , Applicative
                   , Monad
                   , MonadIO
                   , MonadReader (DownloadEnv e)
                   , MonadTrans
                   )
-- | Runs a downloader action against a brand-new, private 'DownloadEnv'.
runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a
runDownloadM action = do
  env <- newDownloadEnv
  runReaderT (fromBlockDownloadM action) env
-- | Runs a downloader action against an existing, shared 'DownloadEnv'.
withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a
withDownload env action = fromBlockDownloadM action `runReaderT` env
-- | Unconditionally stores state @s@ for block @h@, replacing any
-- previously recorded state.
setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m ()
setBlockState h s =
  asks (view blockState) >>= \states ->
    liftIO . atomically $ modifyTVar' states (HashMap.insert h s)
-- | Time a block is allowed to wait before being postponed: a base of 5
-- plus three times 'defBlockWaitMax' (converted from nanoseconds) for every
-- entry currently in the work-in-progress cache.
--
-- FIXME: use a better-founded formula
calcWaitTime :: MonadIO m => BlockDownloadM e m Double
calcWaitTime = do
  inProgress <- liftIO . Cache.size =<< asks (view blockWip)
  let weight   = realToFrac inProgress * 3
      maxWaitN = realToFrac (toNanoSeconds defBlockWaitMax)
  pure $ 5 + ( (maxWaitN * weight) / 1e9 )
-- | Registers an access to block @h@ and returns its (possibly updated) state.
--
-- * If the block has no state yet, a new one is stored with the current
--   monotonic time, zero touches, state @st@ and the wait limit from
--   'calcWaitTime'.
-- * If the block already has a state, only its touch counter is
--   incremented; @st@ is ignored in that case.
-- * If the resulting state is 'Initial' and more time than its 'bsWipTo'
--   limit has passed since 'bsStart', the state is switched to 'Postpone',
--   stored, and returned.
touchBlockState :: MonadIO m => Hash HbSync -> BsFSM -> BlockDownloadM e m BlockState
touchBlockState h st = do
  sh <- asks (view blockState)
  t  <- liftIO $ getTime MonotonicCoarse
  wo <- calcWaitTime

  let s = BlockState t 0 st wo

  -- Insert-or-bump and read back in one transaction, so the returned
  -- state is exactly what this call stored.
  sn <- liftIO $ atomically $ do
          modifyTVar' sh (HashMap.alter (doAlter s) h)
          readTVar sh <&> fromMaybe s . HashMap.lookup h

  case view bsState sn of
    Initial -> do
      let t0 = view bsStart sn
      -- elapsed time since the entry was created/reset, in seconds
      let dt = realToFrac (toNanoSecs t - toNanoSecs t0) / 1e9
      let waiting = view bsWipTo sn

      if dt > waiting
        then do -- FIXME: remove-hardcode
          debug $ "pospone block" <+> pretty h <+> pretty dt <+> pretty (showGFloat (Just 2) waiting "")
          let sn1 = sn { _bsState = Postpone }
          liftIO $ atomically $ modifyTVar' sh (HashMap.insert h sn1)
          pure sn1
        else
          pure sn

    _ -> pure sn

  where
    -- New entries take the freshly built state; existing ones only get
    -- their touch counter incremented.
    doAlter s1 = \case
      Nothing -> Just s1
      Just s  -> Just $ over bsTimes succ s
-- | Returns the state of block @h@.
--
-- Despite the name this is not a pure read: it delegates to
-- 'touchBlockState', so a missing entry is created as 'Initial', an
-- existing entry gets its touch counter incremented, and an 'Initial'
-- block that waited too long is switched to 'Postpone'.
getBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState
getBlockState h = touchBlockState h Initial
-- | Schedules block @h@ for download unless it is already marked as
-- queued. A newly scheduled block is appended to the download queue,
-- recorded in the work-in-progress cache and touched as 'Initial'.
addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
addDownload h = do
  tinq <- asks (view blockInQ)

  -- Mark the block as queued and learn, in the same transaction,
  -- whether it had been absent before.
  fresh <- liftIO . atomically $ stateTVar tinq $ \queued ->
             (not (HashMap.member h queued), HashMap.insert h () queued)

  when fresh $ do
    q   <- asks (view downloadQ)
    wip <- asks (view blockWip)
    liftIO $ atomically (writeTQueue q h) >> Cache.insert wip h ()
    void (touchBlockState h Initial)
-- | Forgets block @h@: drops it from the work-in-progress cache, from the
-- postponed cache and from the block-state map (in that order).
removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
removeFromWip h = do
  env <- ask
  liftIO $ do
    Cache.delete (view blockWip env) h
    Cache.delete (view blockPostponed env) h
    atomically $ modifyTVar' (view blockState env) (HashMap.delete h)
-- | Runs an action with exclusive use of peer @p@.
--
-- Arguments, in order: the peer; the fallback action, run when the peer
-- cannot be used (it is already busy, or it has no 'KnownPeer' session,
-- i.e. is not authorized); the main action, run while the peer is marked
-- busy. The busy mark is removed once the main action returns.
--
-- Authorization is checked /before/ the peer is claimed. Claiming first
-- would leave a not-yet-authorized peer marked busy forever, because
-- the fallback path never releases the mark.
--
-- NOTE: the mark is not released if the main action throws; the
-- constraints here do not allow a bracket-style cleanup.
withFreePeer :: forall e m .
                ( MyPeer e
                , MonadIO m
                , Sessions e (KnownPeer e) m
                )
             => Peer e
             -> BlockDownloadM e m ()
             -> BlockDownloadM e m ()
             -> BlockDownloadM e m ()
withFreePeer p n m = do
  busy <- asks (view peerBusy)

  auth <- lift $ find (KnownPeerKey p) id <&> isJust

  unless auth do
    debug $ "peer " <+> pretty p <+> "not authorized (yet?)"

  -- Try to claim the peer, but only when it is authorized.
  acquired <-
    if auth
      then liftIO $ atomically $ stateTVar busy $ \s ->
             case HashMap.lookup p s of
               Nothing -> (True, HashMap.insert p () s)
               Just{}  -> (False, s)
      else pure False

  if not acquired
    then n
    else do
      r <- m
      liftIO $ atomically $ modifyTVar' busy $ HashMap.delete p
      pure r
-- | Clears the busy mark of peer @p@ unconditionally.
--
-- NOTE: dangerous! If called in the wrong place or at the wrong time
-- it may cause a drastic download speed degradation.
dismissPeer :: (MyPeer e, MonadIO m)
            => Peer e
            -> BlockDownloadM e m ()
dismissPeer p =
  asks (view peerBusy) >>= \busy ->
    liftIO . atomically $ modifyTVar busy (HashMap.delete p)
-- | Blocks until a hash is available in the download queue, removes it
-- from the queue and clears its "in queue" marker, then returns it.
--
-- Dequeueing and clearing the marker happen in a single STM transaction.
-- Done separately, an 'addDownload' for the same hash arriving between
-- the two steps would still see the marker and be silently dropped.
getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync)
getBlockForDownload = do
  q   <- asks (view downloadQ)
  inq <- asks (view blockInQ)
  liftIO $ atomically $ do
    h <- readTQueue q
    modifyTVar' inq (HashMap.delete h)
    pure h
-- | Takes the next block from the download queue and hands it to
-- @action@, unless the block has become 'Postpone'. A postponed block is
-- instead recorded in the postponed cache with a 10 second expiration.
withBlockForDownload :: MonadIO m
                     => (Hash HbSync -> BlockDownloadM e m ())
                     -> BlockDownloadM e m ()
withBlockForDownload action = do
  postponed <- asks (view blockPostponed)
  h  <- getBlockForDownload
  st <- getBlockState h

  case view bsState st of
    Postpone -> do
      let ttl = toTimeSpec @'Seconds 10 -- FIXME: remove-hardcode
      debug $ "posponed:" <+> pretty h
      liftIO $ Cache.insert' postponed (Just ttl) h ()

    _ -> action h
-- | Records that peer @pip@ reported size @size@ for block @h@. A size
-- previously recorded for the same peer and block is replaced.
addBlockInfo :: (MonadIO m, MyPeer e)
             => Peer e
             -> Hash HbSync
             -> Integer
             -> BlockDownloadM e m ()
addBlockInfo pip h size = do
  tv <- asks (view blockPeers)
  liftIO . atomically . modifyTVar tv $
    HashMap.insertWith (<>) h (HashMap.singleton pip size)
-- | Lists the peers known to have block @h@ together with the size each
-- of them reported. Returns an empty list for an unknown block.
getPeersForBlock :: (MonadIO m, MyPeer e)
                 => Hash HbSync
                 -> BlockDownloadM e m [(Peer e, Integer)]
getPeersForBlock h = do
  tv    <- asks (view blockPeers)
  known <- liftIO (readTVarIO tv)
  pure $ maybe [] HashMap.toList (HashMap.lookup h known)
-- | Inspects block @h@ in local storage and schedules whatever is still
-- missing.
--
-- * Block absent: it is scheduled for download.
-- * Block present: it is removed from the work-in-progress bookkeeping.
--   For a Merkle tree, the tree is walked; nodes reported as missing and
--   leaf blocks not present in storage are scheduled for download.
--   Annotated references and plain blobs need no further work.
processBlock :: forall e m . ( MonadIO m
                             , HasStorage m
                             , Block ByteString ~ ByteString
                             )
             => Hash HbSync
             -> BlockDownloadM e m ()
processBlock h = do
  sto <- lift getStorage

  found <- liftIO $ fmap (tryDetect h) <$> getBlock sto h

  -- FIXME: if the block was found, remove it from wip
  when (isJust found) (removeFromWip h)

  case found of
    Nothing -> addDownload h

    Just Merkle{} -> do
      debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h
      walkMerkle h (liftIO . getBlock sto) $ \(hrr :: Either (Hash HbSync) [HashRef]) ->
        case hrr of
          Left hx -> addDownload hx

          Right refs ->
            for_ refs $ \(HashRef blk) -> do
              here <- liftIO (hasBlock sto blk) <&> isJust
              -- NOTE: re-processing blocks that are already here was
              --       tried and did not make things worse.
              -- FIXME: figure out if it's really required
              unless here (addDownload blk)

    Just AnnRef{} -> pure ()

    Just Blob{} -> pure ()
-- | Downloads block @h@ of size @thisBkSize@ from @peer@, chunk by chunk.
--
-- The block is split into chunks of 'defChunkSize'; chunk numbers are
-- grouped into bursts no larger than the peer's current burst size, and
-- each burst is requested and awaited in turn (at most 'defChunkWaitMax'
-- per burst). A burst that times out is re-queued with a smaller burst
-- size. When every chunk has arrived, the block is reassembled and its
-- hash checked; on a match the session is expired, the block is stored
-- and handed to 'processBlock'.
--
-- NOTE: if the peer does not have the block this causes unpleasant
--       timeouts, so make sure that this peer really answered the
--       GetBlockSize request.
downloadFromWithPeer :: forall e m . ( MyPeer e
                                     , MonadIO m
                                     , Request e (BlockInfo e) m
                                     , Request e (BlockChunks e) m
                                     , MonadReader (PeerEnv e ) m
                                     , PeerMessaging e
                                     , HasProtocol e (BlockInfo e)
                                     , EventListener e (BlockInfo e) m
                                     , EventListener e (BlockChunks e) m
                                     , Sessions e (BlockChunks e) m
                                     , Sessions e (PeerInfo e) m
                                     , Block ByteString ~ ByteString
                                     , HasStorage m
                                     )
                     => Peer e
                     -> Integer
                     -> Hash HbSync
                     -> BlockDownloadM e m ()
downloadFromWithPeer peer thisBkSize h = do

  npi   <- newPeerInfo
  pinfo <- lift $ fetch True npi (PeerInfoKey peer) id

  sto <- lift getStorage

  -- Register a download session keyed by (peer, cookie); incoming chunks
  -- for this cookie are delivered into the session's chunk queue.
  coo <- genCookie (peer,h)
  let key = DownloadSessionKey (peer, coo)
  let chusz = defChunkSize
  dnwld <- newBlockDownload h
  let chuQ = view sBlockChunks dnwld
  let new = set sBlockChunkSize chusz
          . set sBlockSize (fromIntegral thisBkSize)
          $ dnwld

  lift $ update @e new key id

  let burstSizeT = view peerBurst pinfo
  burstSize <- liftIO $ readTVarIO burstSizeT

  let offsets = calcChunks thisBkSize (fromIntegral chusz) :: [(Offset, Size)]
  let chunkNums = [ 0 .. pred (length offsets) ]
  let bursts = calcBursts burstSize chunkNums

  -- debug $ "bursts: " <+> pretty bursts

  -- r: chunks received so far, by chunk number; rq: bursts still to request
  r  <- liftIO $ newTVarIO (mempty :: IntMap ByteString)
  rq <- liftIO newTQueueIO

  for_ bursts $ liftIO . atomically . writeTQueue rq

  fix \next -> do
    burst <- liftIO $ atomically $ tryReadTQueue rq

    case burst of

      Just (i,chunksN) -> do
        let req = BlockGetChunks h chusz (fromIntegral i) (fromIntegral chunksN)
        lift $ request peer (BlockChunks @e coo req)

        -- Wait for the whole burst, but no longer than defChunkWaitMax.
        -- On timeout the result is empty: a partially received burst is
        -- discarded and re-requested as a whole.
        catched <- either id id <$> liftIO ( race ( pause defChunkWaitMax >> pure mempty )
                                                  ( replicateM chunksN
                                                      $ atomically
                                                      $ readTQueue chuQ )
                                           )

        if not (null catched)
          then do
            liftIO $ atomically do
              modifyTVar (view peerDownloaded pinfo) (+chunksN)
              writeTVar  (view peerPingFailed pinfo) 0
          else do
            -- liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ
            updatePeerInfo True pinfo

            newBurst' <- liftIO $ readTVarIO burstSizeT
            let newBurst = max defBurst $ floor (realToFrac newBurst' * 0.5 )

            -- NOTE(review): the missed chunks are counted as downloaded
            -- here as well; confirm that this is intended.
            liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN)

            -- Re-queue exactly the chunks of the failed burst,
            -- i .. i + chunksN - 1, regrouped by the reduced burst size.
            -- An inclusive upper bound of chunksN would also re-request
            -- the first chunk of the following burst, or a chunk past
            -- the end of the block for the last burst.
            let chuchu = calcBursts newBurst [ i + n | n <- [0 .. pred chunksN] ]

            liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ

            debug $ "new burst: " <+> pretty newBurst
            debug $ "missed chunks for request" <+> pretty (i,chunksN)

            for_ chuchu $ liftIO . atomically . writeTQueue rq

        for_ catched $ \(num,bs) -> do
          liftIO $ atomically $ modifyTVar' r (IntMap.insert (fromIntegral num) bs)

        next

      Nothing -> do

        sz <- liftIO $ readTVarIO r <&> IntMap.size

        if sz == length offsets
          then do
            pieces <- liftIO $ readTVarIO r <&> IntMap.elems
            let block = mconcat pieces
            let h1 = hashObject @HbSync block

            if h1 == h
              then do
                -- debug "PROCESS BLOCK"
                lift $ expire @e key
                void $ liftIO $ putBlock sto block
                void $ processBlock h
              else do
                debug "HASH NOT MATCH"
                debug "MAYBE THAT PEER IS JERK"

          else do
            debug "RETRY BLOCK DOWNLOADING / ASK FOR MISSED CHUNKS"
            got <- liftIO $ readTVarIO r <&> IntMap.keysSet
            let need = IntSet.fromList (fmap fromIntegral chunkNums)

            let missed = IntSet.toList $ need `IntSet.difference` got

            -- normally this should not happen
            -- however, let's try to download the tails
            -- one chunk at a time
            for_ missed $ \n -> do
              liftIO $ atomically $ writeTQueue rq (n,1)

            -- Re-enter the request loop so the chunks queued above are
            -- actually requested; without this the queued retries are
            -- never read. Nothing to retry means nothing to loop for.
            unless (null missed) next
-- | The downloader monad exposes the peer locator of its base monad.
instance HasPeerLocator e m => HasPeerLocator e (BlockDownloadM e m) where
  getPeerLocator = lift getPeerLocator
-- | Recomputes the adaptive burst size and error rate of a peer.
--
-- The error rate is the number of new errors since the previous update
-- divided by the elapsed time, rounded down. When the peer downloaded
-- something since the previous update, or when @onError@ is set, the
-- burst grows by 10% if the error rate is zero and shrinks to 55%
-- otherwise, clamped to @[1, defBurstMax]@. When there was no progress
-- the burst shrinks to 75%, but not below 'defBurst'. The "last seen"
-- counters and the timestamp are refreshed in the same transaction.
updatePeerInfo :: MonadIO m => Bool -> PeerInfo e -> m ()
updatePeerInfo onError pinfo = do
  now <- liftIO $ getTime MonotonicCoarse

  liftIO . atomically $ do
    burst     <- readTVar (view peerBurst pinfo)
    errs      <- readTVar (view peerErrors pinfo)
    errsSeen  <- readTVar (view peerErrorsLast pinfo)
    watchedAt <- readTVar (view peerLastWatched pinfo)
    down      <- readTVar (view peerDownloaded pinfo)
    downSeen  <- readTVar (view peerDownloadedLast pinfo)

    let newErrs = realToFrac $ max 0 (errs - errsSeen)
        elapsed = realToFrac (max 1 (toNanoSecs now - toNanoSecs watchedAt)) / 1e9
        eps     = floor (newErrs / elapsed)

        adjusted
          | eps == 0  = ceiling $ realToFrac burst * 1.10 -- FIXME: to defaults
          | otherwise = floor $ realToFrac burst * 0.55

        nextBurst
          | down - downSeen > 0 || onError = max 1 $ min defBurstMax adjusted
          | otherwise                      = max defBurst $ floor (realToFrac burst * 0.75)

    writeTVar (view peerErrorsLast pinfo) errs
    writeTVar (view peerLastWatched pinfo) now
    writeTVar (view peerErrorsPerSec pinfo) eps
    writeTVar (view peerBurst pinfo) nextBurst
    writeTVar (view peerDownloadedLast pinfo) down
-- | Entry point of the block downloader. Starts the background
-- maintenance threads and then runs the download loop forever, all
-- against the shared environment @env0@.
--
-- Background threads started here:
--
-- * every 2 seconds: refresh burst/error statistics of all known peers;
-- * 'tossPostponed': returns postponed blocks to the download queue;
-- * every 20 seconds: log the known peers with their burst and error rate;
-- * every 5 seconds: purge expired work-in-progress entries and drop
--   peer/size info of blocks that are no longer in progress.
--
-- The main loop takes the next block from the queue; if it is not in
-- storage yet, it picks a random peer known to have the block and
-- downloads from it, re-queueing the block when no peer is available or
-- the download does not finish within 'defBlockWaitMax'.
blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
                                  , MonadIO m
                                  , Request e (BlockInfo e) m
                                  , Request e (BlockAnnounce e) m
                                  , HasProtocol e (BlockInfo e)
                                  , HasProtocol e (BlockAnnounce e)
                                  , HasProtocol e (BlockChunks e)
                                  , EventListener e (BlockInfo e) m
                                  , EventListener e (BlockChunks e) m
                                  , EventListener e (BlockAnnounce e) m
                                  , EventListener e (PeerHandshake e) m
                                  , EventEmitter e (BlockChunks e) m
                                  , Sessions e (BlockChunks e) m
                                  , Sessions e (PeerInfo e) m
                                  , Sessions e (KnownPeer e) m
                                  , PeerSessionKey e (PeerInfo e)
                                  , HasStorage m
                                  , Pretty (Peer e)
                                  , Block ByteString ~ ByteString
                                  , PeerMessaging e
                                  )
                  => DownloadEnv e -> m ()
blockDownloadLoop env0 = do

  e    <- ask
  stor <- getStorage

  -- Always empty: the initial 'processBlock' pass below is a no-op.
  let blks = mempty

  pl <- getPeerLocator @e

  -- Periodically recompute the burst size / error rate of every known peer.
  void $ liftIO $ async $ forever $ withPeerM e do
    pause @'Seconds 2

    pee <- knownPeers @e pl
    npi <- newPeerInfo

    for_ pee $ \p -> do
      pinfo <- fetch True npi (PeerInfoKey p) id
      updatePeerInfo False pinfo

  -- Return postponed blocks to the download queue in the background.
  void $ liftIO $ async $ withPeerM e $ withDownload env0 (tossPostponed e)

  -- TODO: peer info loop
  void $ liftIO $ async $ forever $ withPeerM e $ do
    pause @'Seconds 20
    pee <- knownPeers @e pl
    npi <- newPeerInfo

    debug $ "known peers" <+> pretty pee

    for_ pee $ \p -> do
      pinfo <- fetch True npi (PeerInfoKey p) id
      burst <- liftIO $ readTVarIO (view peerBurst pinfo)
      errors <- liftIO $ readTVarIO (view peerErrorsPerSec pinfo)
      debug $ "peer" <+> pretty p <+> "burst: " <+> pretty burst
                                  <+> "errors:" <+> pretty errors
    pure ()

  -- Housekeeping: expire stale work-in-progress entries and keep
  -- peer/size info only for blocks that are still in progress.
  void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do
    pause @'Seconds 5 -- FIXME: put to defaults
    -- we need to show download stats

    tinfo <- asks (view blockPeers)
    binfo <- liftIO $ readTVarIO tinfo
    wip <- asks (view blockWip)

    liftIO $ Cache.purgeExpired wip

    aliveWip <- Set.fromList <$> liftIO (Cache.keys wip)

    let alive = HashMap.fromList [ (h,i)
                                 | (h,i) <- HashMap.toList binfo
                                 , Set.member h aliveWip
                                 ]

    liftIO $ atomically $ writeTVar tinfo alive

    debug $ "maintain blocks wip" <+> pretty (Set.size aliveWip)

  withDownload env0 do

    env <- ask

    -- Put the block back into the download queue for another attempt.
    let again h = do
          -- debug $ "retrying block: " <+> pretty h
          withPeerM e $ withDownload env (addDownload h)

    mapM_ processBlock blks

    fix \next -> do

      withBlockForDownload $ \h -> do

        here <- liftIO $ hasBlock stor h <&> isJust

        unless here do

          peers <- getPeersForBlock h

          -- Nobody is known to have the block yet: listen for size
          -- announcements and ask every authorized peer for its size.
          when (null peers) $ do

            lift do -- in PeerM
              subscribe (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do
                withDownload env (addBlockInfo p1 hx s)

              pips <- knownPeers @e pl
              for_ pips $ \pip -> do
                auth <- find (KnownPeerKey pip) id <&> isJust

                when auth $ request pip (GetBlockSize @e h) -- FIXME: request only known peers
                -- move this to peer locator

          p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster

          -- Run a downloader action from plain IO.
          let withAllShit f = withPeerM e $ withDownload env f

          -- No candidate peer: re-queue the block and move on.
          maybe1 p0 (again h) $ \(p1,size) -> do
            st <- getBlockState h
            setBlockState h (set bsState Downloading st)
            withFreePeer p1 (again h) $
              liftIO do
                -- Give the download at most defBlockWaitMax; on timeout
                -- the block is re-queued, otherwise it is post-processed.
                re <- race ( pause defBlockWaitMax ) $
                        withAllShit $ downloadFromWithPeer p1 size h

                case re of
                  Left{} -> withAllShit (again h)
                  Right{} -> withAllShit (processBlock h)

      next
-- | Periodically returns postponed blocks to the download queue.
--
-- Each round starts after 20 seconds or earlier when a known-peer event
-- arrives. Wake-ups caused by peer events are suppressed for 5 seconds
-- after a round has started. Postponed entries that carry an expiration
-- are returned on every round; entries without one are returned only on
-- rounds triggered by a peer event.
tossPostponed :: forall e m . ( MonadIO m
                              , EventListener e (PeerHandshake e) m
                              , MyPeer e
                              )
              => PeerEnv e
              -> BlockDownloadM e m ()
tossPostponed _penv = do
  wakeups   <- liftIO $ newTBQueueIO 1
  coolDown  <- liftIO $ newTVarIO False
  postponed <- asks (view blockPostponed)

  -- A known-peer event requests an early round, unless a round has just
  -- started or a request is already pending.
  lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do
    muted <- liftIO $ readTVarIO coolDown
    unless muted $ do
      debug "AnyKnownPeerEventKey"
      idle <- liftIO $ atomically $ isEmptyTBQueue wakeups
      when idle $
        liftIO $ atomically $ writeTBQueue wakeups ()

  forever $ do
    woken <- liftIO $ race ( pause @'Seconds 20 ) ( atomically $ readTBQueue wakeups )

    void $ liftIO $ atomically $ flushTBQueue wakeups

    -- Ignore peer events for the next 5 seconds.
    liftIO $ atomically $ writeTVar coolDown True

    void $ liftIO $ async $ do
      pause @'Seconds 5
      atomically $ writeTVar coolDown False

    -- True when this round was triggered by a peer event, not the timer.
    let allBack = either (const False) (const True) woken

    blks <- liftIO $ Cache.toList postponed

    w <- calcWaitTime

    debug $ "tossPostponed" <+> pretty (showGFloat (Just 2) w "")
                            <+> pretty (length blks)

    for_ blks $ \(k, _, expiry) ->
      when (allBack || isJust expiry) (pushBack postponed k)

  where
    -- Drop the block from the postponed cache, reset its state to a
    -- fresh 'Initial' with a new wait limit, and queue it again.
    pushBack cache k = do
      w <- calcWaitTime
      liftIO $ Cache.delete cache k
      st  <- getBlockState k
      now <- liftIO $ getTime MonotonicCoarse
      setBlockState k st { _bsStart = now
                         , _bsState = Initial
                         , _bsWipTo = w
                         }
      debug $ "returning block to downloads" <+> pretty k
      addDownload k
-- | Builds the callback record used by the 'BlockChunks' protocol
-- handler, backed by local storage and the download sessions.
--
-- NOTE: this is an adapter for the ResponseM monad, because the
--       response handler runs in ResponseM. So don't be confused by
--       the types.
mkAdapter :: forall e m . ( m ~ PeerM e IO
                          , HasProtocol e (BlockChunks e)
                          , Hashable (SessionKey e (BlockChunks e))
                          , Sessions e (BlockChunks e) (ResponseM e m)
                          , Typeable (SessionKey e (BlockChunks e))
                          , EventEmitter e (BlockChunks e) m
                          , Pretty (Peer e)
                          , Block ByteString ~ ByteString
                          )
          => m (BlockChunksI e (ResponseM e m ))
mkAdapter = do
  sto <- getStorage
  pure BlockChunksI
    { -- size lookup is answered from local storage
      blkSize = \hash -> liftIO (hasBlock sto hash)

      -- chunk reads are answered from local storage
    , blkChunk = \hash off sz -> liftIO (getChunk sto hash off sz)

      -- hash of the block a download session is fetching, if any
    , blkGetHash = \pc -> find (DownloadSessionKey @e pc) (view sBlockHash)

      -- deliver a received chunk to the matching session's queue;
      -- chunks for unknown sessions are dropped
    , blkAcceptChunk = \(cookie, peer, _hash, num, bytes) -> do
        queue <- find (DownloadSessionKey (peer, cookie)) (view sBlockChunks)
        for_ queue $ \q ->
          liftIO $ atomically $ writeTQueue q (num, bytes)
    }