new download

This commit is contained in:
Dmitry Zuikov 2023-02-22 12:39:04 +03:00
parent ec2785b1e2
commit 819ac05397
8 changed files with 215 additions and 139 deletions

View File

@ -422,8 +422,8 @@ runPeerM env f = do
void $ liftIO $ stopPipeline de
liftIO $ mapM_ cancel (as <> [sw])
withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m ()
withPeerM env action = void $ runReaderT (fromPeerM action) env
withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m a
withPeerM env action = runReaderT (fromPeerM action) env
runProto :: forall e m . ( MonadIO m
, HasOwnPeer e m

View File

@ -10,13 +10,13 @@ defMaxDatagramRPC :: Int
defMaxDatagramRPC = 4096
defMessageQueueSize :: Integral a => a
defMessageQueueSize = 65536
defMessageQueueSize = 65536*10
defBurst :: Integral a => a
defBurst = 4
defBurst = 8
defBurstMax :: Integral a => a
defBurstMax = 32
defBurstMax = 64
-- defChunkSize :: Integer
defChunkSize :: Integral a => a
@ -32,7 +32,7 @@ defPipelineSize :: Int
defPipelineSize = 16000
defBlockDownloadQ :: Integral a => a
defBlockDownloadQ = 65536
defBlockDownloadQ = 65536*10
defBlockDownloadThreshold :: Integral a => a
defBlockDownloadThreshold = 2
@ -61,11 +61,11 @@ defBlockInfoTimeout = 2
-- how much time wait for block from peer?
defBlockWaitMax :: Timeout 'Seconds
defBlockWaitMax = 1.5 :: Timeout 'Seconds
defBlockWaitMax = 1 :: Timeout 'Seconds
-- how much time wait for block from peer?
defChunkWaitMax :: Timeout 'Seconds
defChunkWaitMax = 0.15 :: Timeout 'Seconds
defChunkWaitMax = 0.5 :: Timeout 'Seconds
defSweepTimeout :: Timeout 'Seconds
defSweepTimeout = 30 -- FIXME: only for debug!
@ -76,5 +76,16 @@ defPeerAnnounceTime = 120
defPexMaxPeers :: Int
defPexMaxPeers = 50
defDownloadFails :: Int
defDownloadFails = 100
-- TODO: peer-does-not-have-a-block-ok
-- Это нормально, когда у пира нет блока.
-- У него может не быть каких-то блоков,
-- а какие-то могут быть. Нужно более умный
-- алгоритм, чем бан пира за отсутствие блока.
defUsefulLimit :: Double
defUsefulLimit = 0.25

View File

@ -70,7 +70,7 @@ instance MonadIO m => IsPeerAddr UDP m where
data MessagingUDP =
MessagingUDP
{ listenAddr :: SockAddr
, sink :: TBQueue (From UDP, ByteString)
, sink :: TQueue (From UDP, ByteString)
, inbox :: TQueue (To UDP, ByteString)
, sock :: TVar Socket
, mcast :: Bool
@ -91,7 +91,7 @@ newMessagingUDPMulticast s = runMaybeT $ do
a <- liftIO $ getSocketName so
liftIO $ MessagingUDP a <$> Q.newTBQueueIO defMessageQueueSize
liftIO $ MessagingUDP a <$> Q0.newTQueueIO
<*> Q0.newTQueueIO
<*> newTVarIO so
<*> pure True
@ -109,7 +109,7 @@ newMessagingUDP reuse saddr =
when reuse $ do
liftIO $ setSocketOption so ReuseAddr 1
liftIO $ MessagingUDP a <$> Q.newTBQueueIO defMessageQueueSize
liftIO $ MessagingUDP a <$> Q0.newTQueueIO
<*> Q0.newTQueueIO
<*> newTVarIO so
<*> pure False
@ -119,7 +119,7 @@ newMessagingUDP reuse saddr =
so <- liftIO $ socket AF_INET Datagram defaultProtocol
sa <- liftIO $ getSocketName so
liftIO $ Just <$> ( MessagingUDP sa <$> Q.newTBQueueIO defMessageQueueSize
liftIO $ Just <$> ( MessagingUDP sa <$> Q0.newTQueueIO
<*> Q0.newTQueueIO
<*> newTVarIO so
<*> pure False
@ -143,7 +143,7 @@ udpWorker env tso = do
-- pause ( 10 :: Timeout 'Seconds )
(msg, from) <- recvFrom so defMaxDatagram
-- liftIO $ print $ "recv:" <+> pretty (BS.length msg)
liftIO $ atomically $ Q.writeTBQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg)
liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg)
sndLoop <- async $ forever $ do
pause ( 10 :: Timeout 'Seconds )
@ -182,5 +182,5 @@ instance Messaging MessagingUDP UDP ByteString where
-- (msg, from) <- recvFrom so defMaxDatagram
-- pure [(From (PeerUDP from), LBS.fromStrict msg)]
liftIO $ atomically $ Q.readTBQueue (sink bus) <&> L.singleton
liftIO $ atomically $ Q0.readTQueue (sink bus) <&> L.singleton

View File

@ -56,7 +56,7 @@ instance HasProtocol UDP (BlockInfo UDP) where
-- FIXME: requestMinPeriod-breaks-fast-block-download
--
requestPeriodLim = ReqLimPerMessage 10
-- requestPeriodLim = ReqLimPerMessage 0.5
instance HasProtocol UDP (BlockChunks UDP) where
type instance ProtocolId (BlockChunks UDP) = 2
@ -79,7 +79,7 @@ instance HasProtocol UDP (PeerHandshake UDP) where
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
requestPeriodLim = ReqLimPerProto 10
requestPeriodLim = ReqLimPerProto 2
instance HasProtocol UDP (PeerAnnounce UDP) where
type instance ProtocolId (PeerAnnounce UDP) = 5

View File

@ -1,5 +1,6 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
{-# Language MultiWayIf #-}
module BlockDownload where
import HBS2.Actors.Peer
@ -48,49 +49,9 @@ import System.Random.Shuffle
import Type.Reflection
none :: forall m . Monad m => m ()
none = pure ()
withFreePeer :: forall e m .
( MyPeer e
, MonadIO m
, Sessions e (KnownPeer e) m
)
=> Peer e
-> BlockDownloadM e m ()
-> BlockDownloadM e m ()
-> BlockDownloadM e m ()
withFreePeer p n m = do
busy <- asks (view peerBusy)
avail <- liftIO $ atomically
$ stateTVar busy $
\s -> case HashMap.lookup p s of
Nothing -> (True, HashMap.insert p () s)
Just{} -> (False, s)
auth <- lift $ find (KnownPeerKey p) id <&> isJust
unless auth do
debug $ "peer " <+> pretty p <+> "not authorized (yet?)"
if not (avail && auth)
then n
else do
r <- m
liftIO $ atomically $ modifyTVar busy $ HashMap.delete p
pure r
-- NOTE: dangerous! if called in
-- wrong place/wrong time,
-- it may cause a drastical
-- download speed degradation
dismissPeer :: (MyPeer e, MonadIO m)
=> Peer e
-> BlockDownloadM e m ()
dismissPeer p = do
busy <- asks (view peerBusy)
liftIO $ atomically $ modifyTVar busy $ HashMap.delete p
getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync)
getBlockForDownload = do
@ -212,20 +173,22 @@ processBlock h = do
-- So make sure that this peer really answered to
-- GetBlockSize request
downloadFromWithPeer :: forall e m . ( MyPeer e
, MonadIO m
, Request e (BlockInfo e) m
, Request e (BlockChunks e) m
, MonadReader (PeerEnv e ) m
, PeerMessaging e
, HasProtocol e (BlockInfo e)
, EventListener e (BlockInfo e) m
, EventListener e (BlockChunks e) m
, Sessions e (BlockChunks e) m
, Sessions e (PeerInfo e) m
, Block ByteString ~ ByteString
, HasStorage m
)
type DownloadFromPeerStuff e m = ( MyPeer e
, MonadIO m
, Request e (BlockInfo e) m
, Request e (BlockChunks e) m
, MonadReader (PeerEnv e ) m
, PeerMessaging e
, HasProtocol e (BlockInfo e)
, EventListener e (BlockInfo e) m
, EventListener e (BlockChunks e) m
, Sessions e (BlockChunks e) m
, Sessions e (PeerInfo e) m
, Block ByteString ~ ByteString
, HasStorage m
)
downloadFromWithPeer :: forall e m . DownloadFromPeerStuff e m
=> Peer e
-> Integer
-> Hash HbSync
@ -258,9 +221,9 @@ downloadFromWithPeer peer thisBkSize h = do
let bursts = calcBursts burstSize chunkNums
-- debug $ "bursts: " <+> pretty bursts
let w = max defChunkWaitMax $ realToFrac (toNanoSeconds defBlockWaitMax) / realToFrac (length bursts) / 1e9 * 2
let burstTime = min defBlockWaitMax (0.8 * realToFrac burstSize * defChunkWaitMax)
let burstTime = realToFrac w :: Timeout 'Seconds -- defChunkWaitMax -- min defBlockWaitMax (0.8 * realToFrac burstSize * defChunkWaitMax)
r <- liftIO $ newTVarIO (mempty :: IntMap ByteString)
rq <- liftIO newTQueueIO
@ -369,25 +332,26 @@ updatePeerInfo onError pinfo = do
t0 <- readTVar (view peerLastWatched pinfo)
down <- readTVar (view peerDownloaded pinfo)
downLast <- readTVar (view peerDownloadedLast pinfo)
-- downFail <- readTVar (view peerDownloadFail pinfo)
-- downBlk <- readTVar (view peerDownloadedBlk pinfo)
let dE = realToFrac $ max 0 (errs - errsLast)
let dT = realToFrac (max 1 (toNanoSecs t1 - toNanoSecs t0)) / 1e9
let eps = floor (dE / dT)
let win = min 10 $ defBurstMax - defBurst - 2
let win = min 10 $ defBurstMax - defBurst - 2
when (down - downLast > 0 || onError) do
(bu1, bus) <- if eps == 0 then do
(bu1, bus) <- if eps == 0 && not onError then do
let bmm = fromMaybe defBurstMax buMax
let buN = min bmm (ceiling $ (realToFrac bu * 1.05))
let buN = min bmm (ceiling (realToFrac bu * 1.05))
pure (buN, trimUp win $ IntSet.insert buN buSet)
else do
let buM = headMay $ drop 1 $ IntSet.toDescList buSet
let buM = headMay $ drop 2 $ IntSet.toDescList buSet
writeTVar (view peerBurstMax pinfo) buM
-- let s = IntSet.size buSet
let buN = headDef defBurst $ drop 2 $ IntSet.toDescList buSet
let buN = headDef defBurst $ drop 8 $ IntSet.toDescList buSet
pure (buN, trimDown win $ IntSet.insert buN buSet)
@ -397,7 +361,7 @@ updatePeerInfo onError pinfo = do
writeTVar (view peerBurst pinfo) bu1
writeTVar (view peerBurstSet pinfo) bus
writeTVar (view peerDownloadedLast pinfo) down
-- writeTVar (view peerUsefulness pinfo) usefulN
where
trimUp n s | IntSet.size s >= n = IntSet.deleteMin s
@ -438,6 +402,29 @@ blockDownloadLoop env0 = do
pl <- getPeerLocator @e
void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 5
debug "I'm a peer maintaining thread"
pee <- knownPeers @e pl
for_ pee $ \p -> do
pinfo' <- find (PeerInfoKey p) id
maybe1 pinfo' none $ \pinfo -> do
fails <- liftIO $ readTVarIO (view peerDownloadFail pinfo)
when (fails >= defDownloadFails) do
warn $ "peer" <+> pretty p <+> "has too many failures:" <+> pretty fails
here <- withDownload env0 $ hasPeerThread p
unless here do
debug $ "peer" <+> pretty p <+> "does not have a thread"
runPeer <- liftIO $ async $ liftIO (withPeerM e $ withDownload env0 (peerDownloadLoop p))
withDownload env0 $ newPeerThread p runPeer
void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 30
@ -450,12 +437,11 @@ blockDownloadLoop env0 = do
void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 1
pause @'Seconds 2
pee <- knownPeers @e pl
npi <- newPeerInfo
for_ pee $ \p -> do
pinfo <- fetch True npi (PeerInfoKey p) id
updatePeerInfo False pinfo
@ -476,9 +462,14 @@ blockDownloadLoop env0 = do
burst <- liftIO $ readTVarIO (view peerBurst pinfo)
buM <- liftIO $ readTVarIO (view peerBurstMax pinfo)
errors <- liftIO $ readTVarIO (view peerErrorsPerSec pinfo)
downFails <- liftIO $ readTVarIO (view peerDownloadFail pinfo)
down <- liftIO $ readTVarIO (view peerDownloadedBlk pinfo)
useful <- liftIO $ readTVarIO (view peerUsefulness pinfo)
debug $ "peer" <+> pretty p <+> "burst:" <+> pretty burst
<+> "burst-max:" <+> pretty buM
<+> "errors:" <+> pretty errors
<+> "errors:" <+> pretty (downFails + errors)
<+> "down:" <+> pretty down
<+> "useful:" <+> pretty useful
pure ()
void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do
@ -506,57 +497,89 @@ blockDownloadLoop env0 = do
env <- ask
let again h = do
-- debug $ "retrying block: " <+> pretty h
withPeerM e $ withDownload env (addDownload h)
mapM_ processBlock blks
fix \next -> do
withBlockForDownload $ \h -> do
here <- liftIO $ hasBlock stor h <&> isJust
unless here do
peers <- getPeersForBlock h
when (null peers) $ do
lift do -- in PeerM
subscribe (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do
withDownload env (addBlockInfo p1 hx s)
pips <- knownPeers @e pl
for_ pips $ \pip -> do
auth <- find (KnownPeerKey pip) id <&> isJust
when auth $ request pip (GetBlockSize @e h) -- FIXME: request only known peers
-- move this to peer locator
p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster
let withAllShit f = withPeerM e $ withDownload env f
maybe1 p0 (again h) $ \(p1,size) -> do
st <- getBlockState h
setBlockState h (set bsState Downloading st)
withFreePeer p1 (again h) $
liftIO do
re <- race ( pause defBlockWaitMax ) $
withAllShit $ downloadFromWithPeer p1 size h
case re of
Left{} -> withAllShit (again h)
Right{} -> withAllShit (processBlock h)
pause @'Seconds 30
debug "I'm a download loop. I don't do anything anymore"
next
peerDownloadLoop :: forall e m . ( MyPeer e
, Sessions e (KnownPeer e) m
, Request e (BlockInfo e) m
, EventListener e (BlockInfo e) m
, DownloadFromPeerStuff e m
, m ~ PeerM e IO
) => Peer e -> BlockDownloadM e m ()
peerDownloadLoop peer = forever do
sto <- lift getStorage
auth <- lift $ find (KnownPeerKey peer) id <&> isJust
pinfo' <- lift $ find (PeerInfoKey peer) id -- (view peerDownloadFail)
maybe1 pinfo' none $ \pinfo -> do
let downFail = view peerDownloadFail pinfo
let downBlk = view peerDownloadedBlk pinfo
failNum <- liftIO $ readTVarIO downFail
-- FIXME: failNum-to-defaults
let notFailed = failNum < defDownloadFails
-- FIXME: better-avoiding-busyloop
-- unless notFailed do
-- pause @'Seconds 1
when (failNum > 5) do
pause @'Seconds defBlockWaitMax
when auth do
withBlockForDownload $ \h -> do
e <- lift ask
ee <- ask
st <- getBlockState h
setBlockState h (set bsState Downloading st)
r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do
blksq <- liftIO newTQueueIO
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (_,_,s)) -> do
liftIO $ atomically $ writeTQueue blksq s
request peer (GetBlockSize @e h)
liftIO $ atomically $ readTQueue blksq
case r1 of
Left{} -> do
liftIO $ atomically $ modifyTVar downFail succ
addDownload h
Right size -> do
r2 <- liftIO $ race ( pause defBlockWaitMax )
$ withPeerM e
$ withDownload ee
$ downloadFromWithPeer peer size h
case r2 of
Left{} -> do
liftIO $ atomically $ modifyTVar downFail succ
addDownload h
-- Right Nothing -> do
-- liftIO $ atomically $ modifyTVar downFail succ
-- addDownload h
Right{} -> do
processBlock h
liftIO $ atomically do
writeTVar downFail 0
modifyTVar downBlk succ
pure ()
-- NOTE: this is an adapter for a ResponseM monad
-- because response is working in ResponseM monad (ha!)
@ -582,6 +605,12 @@ mkAdapter = do
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
let cKey = DownloadSessionKey (p,c)
dodo <- lift $ find cKey (view sBlockChunks)
unless (isJust dodo) $ do
debug $ "session lost for peer !" <+> pretty p
dwnld <- MaybeT $ find cKey (view sBlockChunks)
liftIO $ atomically $ writeTQueue dwnld (n, bs)
}

View File

@ -40,6 +40,9 @@ data PeerInfo e =
, _peerDownloaded :: TVar Int
, _peerDownloadedLast :: TVar Int
, _peerPingFailed :: TVar Int
, _peerDownloadedBlk :: TVar Int
, _peerDownloadFail :: TVar Int
, _peerUsefulness :: TVar Double
}
deriving stock (Generic,Typeable)
@ -58,6 +61,9 @@ newPeerInfo = liftIO do
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
type instance SessionData e (PeerInfo e) = PeerInfo e
@ -98,7 +104,7 @@ pexLoop = do
for_ peers sendPeerExchangeGet
pause @'Seconds 60 -- FIXME: defaults
pause @'Seconds 60 -- FIXME: defaults
peerPingLoop :: forall e m . ( HasPeerLocator e m
, HasPeer e
@ -140,13 +146,15 @@ peerPingLoop = do
for_ pips $ \p -> do
npi <- newPeerInfo
pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed)
pdownfails <- fetch True npi (PeerInfoKey p) (view peerDownloadFail)
liftIO $ atomically $ modifyTVar pfails succ
sendPing @e p
pause @'Seconds 1 -- NOTE: it's okay?
fnum <- liftIO $ readTVarIO pfails
fdown <- liftIO $ readTVarIO pdownfails
when (fnum > 3) do -- FIXME: hardcode!
when (fnum > 10) do -- FIXME: hardcode!
warn $ "removing peer" <+> pretty p <+> "for not responding to our pings"
delPeers pl [p]
expire (PeerInfoKey p)

View File

@ -439,12 +439,19 @@ runPeer opts = Exception.handle myException $ do
addPeers pl [p]
-- TODO: better-handling-for-new-peers
npi <- newPeerInfo
pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed)
liftIO $ atomically $ writeTVar pfails 0
debug $ "Got authorized peer!" <+> pretty p
<+> pretty (AsBase58 (view peerSignKey d))
here <- find @e (KnownPeerKey p) id <&> isJust
unless here do
pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed)
pdownfails <- fetch True npi (PeerInfoKey p) (view peerDownloadFail)
liftIO $ atomically $ writeTVar pfails 0
liftIO $ atomically $ writeTVar pdownfails 0
debug $ "Got authorized peer!" <+> pretty p
<+> pretty (AsBase58 (view peerSignKey d))
void $ liftIO $ async $ withPeerM env do
pause @'Seconds 1
@ -535,9 +542,6 @@ runPeer opts = Exception.handle myException $ do
| otherwise -> do
debug "announce from a known peer"
debug "preparing to dowload shit"
withDownload denv $ do
processBlock h

View File

@ -15,6 +15,7 @@ import HBS2.Storage
import HBS2.System.Logger.Simple
import HBS2.Net.Messaging.UDP (UDP)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.Reader
import Data.ByteString.Lazy (ByteString)
@ -78,6 +79,15 @@ data BlockState =
makeLenses 'BlockState
data PeerTask e = DoDownload
data PeerThread e =
PeerThread
{ _peerThreadAsync :: Async ()
, _peerThreadMailbox :: TQueue (PeerTask e)
}
data DownloadEnv e =
DownloadEnv
{ _downloadQ :: TQueue (Hash HbSync)
@ -87,6 +97,7 @@ data DownloadEnv e =
, _blockState :: TVar (HashMap (Hash HbSync) BlockState)
, _blockPostponed :: Cache (Hash HbSync) ()
, _blockInQ :: TVar (HashMap (Hash HbSync) ())
, _peerThreads :: TVar (HashMap (Peer e) (PeerThread e))
}
makeLenses 'DownloadEnv
@ -103,6 +114,7 @@ newDownloadEnv = liftIO do
<*> newTVarIO mempty
<*> Cache.newCache Nothing
<*> newTVarIO mempty
<*> newTVarIO mempty
newtype BlockDownloadM e m a =
BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a }
@ -181,7 +193,7 @@ addDownload h = do
tinq <- asks (view blockInQ)
doAdd <- liftIO $ atomically $ stateTVar tinq
doAdd <- do liftIO $ atomically $ stateTVar tinq
\hm -> case HashMap.lookup h hm of
Nothing -> (True, HashMap.insert h () hm)
Just{} -> (False, HashMap.insert h () hm)
@ -205,3 +217,15 @@ removeFromWip h = do
liftIO $ Cache.delete po h
liftIO $ atomically $ modifyTVar' st (HashMap.delete h)
hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool
hasPeerThread p = do
threads <- asks (view peerThreads)
liftIO $ readTVarIO threads <&> HashMap.member p
newPeerThread :: (MyPeer e, MonadIO m) => Peer e -> Async () -> BlockDownloadM e m ()
newPeerThread p m = do
q <- liftIO newTQueueIO
let pt = PeerThread m q
threads <- asks (view peerThreads)
liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt