udp-download-fix

This commit is contained in:
Dmitry Zuikov 2023-04-03 11:25:54 +03:00
parent c687c3654b
commit 098a748e7e
4 changed files with 126 additions and 35 deletions

View File

@ -1,2 +1,2 @@
(fixme-set "workflow" "test" "8vUEBAHeh9")
(fixme-set "workflow" "test" "2HmuD6jV8H")

View File

@ -70,7 +70,7 @@ defBlockWipTimeout :: TimeSpec
defBlockWipTimeout = defCookieTimeout
defBlockInfoTimeout :: Timeout 'Seconds
defBlockInfoTimeout = 20
defBlockInfoTimeout = 5
defBlockInfoTimeoutSpec :: TimeSpec
defBlockInfoTimeoutSpec = toTimeSpec defBlockInfoTimeout

View File

@ -569,9 +569,6 @@ peerDownloadLoop :: forall e m . ( MyPeer e
) => Peer e -> BlockDownloadM e m ()
peerDownloadLoop peer = do
sizeCache <- liftIO $ Cache.newCache @_ @Integer (Just defBlockSizeCacheTime)
noBlock <- liftIO $ Cache.newCache (Just defBlockBanTime)
pe <- lift ask
e <- ask
@ -583,8 +580,8 @@ peerDownloadLoop peer = do
subscribe @e (BlockSizeEventKey h) $ \case
BlockSizeEvent (p1,_,s) -> do
when (p1 == peer) do
liftIO $ Cache.insert sizeCache h s
liftIO $ atomically $ writeTQueue q (Just s)
onBlockSize brains peer h s
NoBlockEvent{} -> do
-- TODO: ban-block-for-some-seconds
@ -610,6 +607,7 @@ peerDownloadLoop peer = do
trace $ pretty peer <+> "block" <+> pretty h <+> "is already here"
processBlock h
else do
lift $ onBlockDownloadAttempt brains peer h
let downFail = view peerDownloadFail pinfo
let downBlk = view peerDownloadedBlk pinfo
@ -640,10 +638,6 @@ peerDownloadLoop peer = do
let thenNext m = m >> next
liftIO do
Cache.purgeExpired sizeCache
Cache.purgeExpired noBlock
npi <- newPeerInfo
auth' <- lift $ find (KnownPeerKey peer) id
@ -680,39 +674,24 @@ peerDownloadLoop peer = do
liftIO $ atomically $ writeTVar idle 0
lift $ onBlockDownloadAttempt brains peer h
trace $ "start download block" <+> pretty peer <+> pretty h
mbSize <- liftIO $ Cache.lookup sizeCache h
noBlk <- liftIO $ Cache.lookup noBlock h <&> isJust
mbSize2 <- blockSize brains peer h
case mbSize of
case mbSize2 of
Just size -> do
trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size
tryDownload pinfo h size
Nothing | noBlk -> do
trace $ pretty peer <+> "does not have block" <+> pretty h
addDownload mzero h
Nothing -> do
r <- doBlockSizeRequest h
case r of
Left{} -> failedDownload peer h
Right Nothing -> do
here <- liftIO $ Cache.lookup noBlock h <&> isJust
unless here $
liftIO $ Cache.insert noBlock h ()
addDownload mzero h
Right (Just s) -> do
(Right (Just s)) -> do
tryDownload pinfo h s
pure ()
_ -> pure ()
warnExit
void $ delPeerThreadData peer

View File

@ -26,11 +26,21 @@ import Database.SQLite.Simple
import Database.SQLite.Simple.FromField
import System.Random (randomRIO)
import Data.Word
import Data.Either
class HasBrains e a where
onKnownPeers :: MonadIO m => a -> [Peer e] -> m ()
onBlockSize :: ( MonadIO m
, IsPeerAddr e m
)
=> a
-> Peer e
-> Hash HbSync
-> Integer
-> m ()
onBlockDownloadAttempt :: ( MonadIO m
, IsPeerAddr e m
)
@ -73,12 +83,23 @@ class HasBrains e a where
-> Hash HbSync
-> m [PeerAddr e]
blockSize :: forall m . MonadIO m
=> a
-> Peer e
-> Hash HbSync
-> m (Maybe Integer)
blockSize _ _ _ = pure Nothing
type NoBrains = ()
instance Pretty (Peer e) => HasBrains e NoBrains where
onKnownPeers _ ps = pure ()
onBlockSize _ _ _ _ = do
pure ()
onBlockDownloadAttempt _ p h = do
pure ()
@ -100,6 +121,7 @@ data SomeBrains e = forall a . HasBrains e a => SomeBrains a
instance HasBrains e (SomeBrains e) where
onKnownPeers (SomeBrains a) = onKnownPeers a
onBlockSize (SomeBrains a) = onBlockSize a
onBlockDownloadAttempt (SomeBrains a) = onBlockDownloadAttempt a
onBlockDownloaded (SomeBrains a) = onBlockDownloaded a
onBlockPostponed (SomeBrains a) = onBlockPostponed @e a
@ -107,6 +129,9 @@ instance HasBrains e (SomeBrains e) where
shouldPostponeBlock (SomeBrains a) = shouldPostponeBlock @e a
shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a
advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a
blockSize (SomeBrains a) = blockSize @e a
newtype CommitCmd = CommitCmd { onCommited :: IO () }
data BasicBrains e =
BasicBrains
@ -115,6 +140,7 @@ data BasicBrains e =
, _brainsExpire :: Cache (Hash HbSync) ()
, _brainsDb :: Connection
, _brainsPipeline :: TQueue (IO ())
, _brainsCommit :: TQueue CommitCmd
}
makeLenses 'BasicBrains
@ -133,6 +159,14 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe
let tv = view brainsPeers br
liftIO $ atomically $ writeTVar tv ps
onBlockSize b p h size = do
updateOP b $ insertSize b p h size
commitNow b True
-- FIXME: wait-till-really-commited
sz <- liftIO $ selectBlockSize b p h
trace $ "BRAINS: onBlockSize" <+> pretty p <+> pretty h <+> pretty sz
pure ()
onBlockDownloadAttempt b peer h = do
-- trace "BRAINS: onBlockDownloadAttempt"
noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null
@ -176,11 +210,67 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe
r <- liftIO $ findPeers b h
pure $ mapMaybe fromStringMay r
blockSize b p h = do
liftIO $ selectBlockSize b p h
commitNow :: forall e m . MonadIO m
=> BasicBrains e
-> Bool
-> m ()
commitNow br doWait = do
w <- liftIO newTQueueIO
let answer | doWait = do
atomically $ writeTQueue w ()
| otherwise = pure ()
liftIO $ atomically $ writeTQueue (view brainsCommit br) (CommitCmd answer)
when doWait $ liftIO do
void $ atomically $ do
readTQueue w >> flushTQueue w
updateOP :: forall e m . MonadIO m => BasicBrains e -> IO () -> m ()
updateOP br op = do
let pip = view brainsPipeline br
liftIO $ atomically $ writeTQueue pip (liftIO op)
insertSize :: forall e . Pretty (Peer e)
=> BasicBrains e
-> Peer e
-> Hash HbSync
-> Integer
-> IO ()
insertSize br p h s = do
let conn = view brainsDb br
void $ liftIO $ execute conn [qc|
insert into blocksize (block, peer, size) values (?,?,?)
on conflict (block,peer) do update set size = ?
|] (show $ pretty h, show $ pretty p, s, s)
selectBlockSize :: forall e . Pretty (Peer e)
=> BasicBrains e
-> Peer e
-> Hash HbSync
-> IO (Maybe Integer)
selectBlockSize br p h = do
let conn = view brainsDb br
liftIO $ query conn [qc|
select size
from blocksize
where
block = ? and peer = ?
limit 1
|] (show $ pretty h, show $ pretty p) <&> fmap fromOnly <&> listToMaybe
insertAncestor :: BasicBrains e
-> Hash HbSync -- ^ parent
-> Hash HbSync -- ^ child
@ -288,6 +378,7 @@ SAVEPOINT zzz1;
DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600;
DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600;
DELETE FROM blocksize WHERE strftime('%s','now') - strftime('%s', ts) > 300;
RELEASE SAVEPOINT zzz1;
@ -309,7 +400,8 @@ transactional brains action = do
Right{} -> do
execute_ conn [qc|RELEASE SAVEPOINT {sp}|]
Left ( _ :: SomeException ) -> do
Left ( e :: SomeException ) -> do
err $ "BRAINS: " <+> viaShow e
execute_ conn [qc|ROLLBACK TO SAVEPOINT {sp}|]
-- FIXME: eventually-close-db
@ -334,25 +426,45 @@ newBasicBrains = liftIO do
, primary key (block,peer))
|]
execute_ conn [qc|
create table if not exists blocksize
( block text not null
, peer text not null
, size int
, ts DATE DEFAULT (datetime('now','localtime'))
, primary key (block,peer))
|]
BasicBrains <$> newTVarIO mempty
<*> newTVarIO mempty
<*> Cache.newCache (Just (toTimeSpec (30 :: Timeout 'Seconds)))
<*> pure conn
<*> newTQueueIO
<*> newTQueueIO
runBasicBrains :: MonadIO m => BasicBrains e -> m ()
runBasicBrains brains = do
let pip = view brainsPipeline brains
let expire = view brainsExpire brains
let commit = view brainsCommit brains
-- FIXME: async-error-handling
void $ liftIO $ async $ forever do
pause @'Seconds 5
-- transactional brains do
ewaiters <- race (pause @'Seconds 5) $ do
atomically $ do
c <- readTQueue commit
cs <- flushTQueue commit
pure (c:cs)
let waiters = fromRight mempty ewaiters & fmap onCommited
w <- atomically $ readTQueue pip
ws <- atomically $ flushTQueue pip
transactional brains (sequence_ (w:ws))
sequence_ waiters
void $ liftIO $ async $ forever do
pause @'Seconds 60