better-download-algorithm

This commit is contained in:
Dmitry Zuikov 2023-04-01 09:30:28 +03:00
parent 3f47cd102c
commit 56c0ac4bc1
19 changed files with 734 additions and 372 deletions

View File

@ -0,0 +1,7 @@
TODO: flood-protection
Если пир сидит на плохом канале и пробует скачать, у него
ничего не получается, он пробует опять и опять и в итоге
забивает весь канал. Нужно давать ему сколько-то попыток
и временно банить. Соответственно, нужен механизм
временного бана пира.

View File

@ -0,0 +1,6 @@
TODO: hbs2-peer-ban-peers-rpc
Сделать команду и всё остальное для бана пира прямо в рантайме
без перечитывания конфига.
Нужно для тестирования и выявления проблемы '[warn] lost peer auth'
и окончательного победы над ней, кроме того, ну и так пригодится.

View File

@ -0,0 +1,13 @@
TODO: new-download-test
1. Занести единственного пира в whitelist
2. Скачать большой блок данных
3. Замерять скорость/наличия провалов в скачивании
4. Удалить скачанные данные
5. Удалить whitelist
6. Удалить скачанный блок
7. Повторить скачивание
8. Замерить скорость
9. По итогам решить, срочно ли нужно
делать алгоритм выбора пиров в Brains

View File

@ -0,0 +1,4 @@
TODO: implement-recursive-block-delete
В hbs2 del -r или del --recursure
Смотрит, если это дерево --- то пытается обойти
его рекурсивно и удалить все блоки.

View File

@ -5,15 +5,24 @@ import HBS2.Hash
import HBS2.Data.Types import HBS2.Data.Types
import HBS2.Merkle import HBS2.Merkle
import HBS2.System.Logger.Simple
import Data.Foldable (for_)
import Control.Monad.Trans.Maybe
import Codec.Serialise (deserialiseOrFail) import Codec.Serialise (deserialiseOrFail)
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.Either import Data.Either
import Data.Function import Data.Function
import Data.Functor import Data.Functor
data BlobType = Merkle (Hash HbSync) import Data.Maybe
import Control.Concurrent.STM
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict (HashMap)
data BlobType = Merkle (MTree [HashRef])
| MerkleAnn (MTreeAnn [HashRef]) | MerkleAnn (MTreeAnn [HashRef])
| AnnRef (Hash HbSync) | AnnRef AnnotatedHashRef
| SeqRef SequentialRef | SeqRef SequentialRef
| Blob (Hash HbSync) | Blob (Hash HbSync)
deriving (Show,Data) deriving (Show,Data)
@ -23,9 +32,96 @@ tryDetect :: Hash HbSync -> ByteString -> BlobType
tryDetect hash obj = rights [mbAnn, mbLink, mbMerkle, mbSeq] & headDef orBlob tryDetect hash obj = rights [mbAnn, mbLink, mbMerkle, mbSeq] & headDef orBlob
where where
mbLink = deserialiseOrFail @AnnotatedHashRef obj >> pure (AnnRef hash) mbLink = deserialiseOrFail @AnnotatedHashRef obj <&> AnnRef
mbMerkle = deserialiseOrFail @(MTree [HashRef]) obj >> pure (Merkle hash) mbMerkle = deserialiseOrFail @(MTree [HashRef]) obj <&> Merkle
mbSeq = deserialiseOrFail @SequentialRef obj <&> SeqRef mbSeq = deserialiseOrFail @SequentialRef obj <&> SeqRef
mbAnn = deserialiseOrFail obj <&> MerkleAnn mbAnn = deserialiseOrFail obj <&> MerkleAnn
orBlob = Blob hash orBlob = Blob hash
data ScanLevel = ScanShallow | ScanDeep
-- TODO: control-nesting-level-to-avoid-abuse
-- TODO: switch-all-merkle-walks-to-deep-scan
-- TODO: asap-make-it-support-encryption
-- Передавать параметры расшифровки через тайпкласс
-- Сделать реализацию тайпкласса для MonadIO по умолчанию,
-- будет возращать блоки как есть
--
deepScan :: MonadIO m
=> ScanLevel
-> ( Hash HbSync -> m () ) -- ^ missed block handler
-> Hash HbSync -- ^ root
-> ( Hash HbSync -> m (Maybe ByteString) ) -- ^ block reading function
-> ( Hash HbSync -> m () ) -- ^ sink function
-> m ()
deepScan l miss from reader sink = do
tv <- liftIO $ newTVarIO mempty
deepScan_ tv (HashRef from)
where
deepScan_ tv item = do
here <- reader (fromHashRef item) <&> isJust
when here do
sink (fromHashRef item)
void $ runMaybeT $ do
blk <- MaybeT $ reader (fromHashRef item)
let what = tryDetect (fromHashRef item) blk
case what of
Blob{} -> pure ()
Merkle t -> do
lift $ walkTree t
MerkleAnn ann -> case _mtaCrypt ann of
NullEncryption -> do
lift $ walkTree (_mtaTree ann)
-- FIXME: ASAP-support-encryption
CryptAccessKeyNaClAsymm{} -> do
err "deepScan does not support encryption yet"
pure ()
SeqRef (SequentialRef _ (AnnotatedHashRef ann hx)) -> do
lift $ maybe1 ann (pure ()) sinkDeep
lift $ sinkDeep hx
AnnRef (AnnotatedHashRef ann hx) -> do
lift $ maybe1 ann (pure ()) sinkDeep
lift $ sinkDeep hx
where
deep = case l of
ScanDeep -> True
_ -> False
sinkDeep h = do
visited <- liftIO $ readTVarIO tv <&> HashMap.member h
unless visited do
liftIO $ atomically $ modifyTVar tv (HashMap.insert h ())
sinkDeep_ h
sinkDeep_ h | deep = deepScan_ tv h
| otherwise = walk (fromHashRef h)
stepInside = \case
Left x -> miss x
Right ( hxx :: [HashRef] ) -> do
for_ hxx sinkDeep
walkTree t = do
walkMerkleTree t reader stepInside
walk h = walkMerkle h reader stepInside

View File

@ -60,10 +60,10 @@ defBlockBanTime :: TimeSpec
defBlockBanTime = toTimeSpec defBlockBanTimeSec defBlockBanTime = toTimeSpec defBlockBanTimeSec
defBlockPostponeTime :: TimeSpec defBlockPostponeTime :: TimeSpec
defBlockPostponeTime = toTimeSpec ( 60 :: Timeout 'Seconds) defBlockPostponeTime = toTimeSpec ( 45 :: Timeout 'Seconds)
defBlockBanTimeSec :: Timeout 'Seconds defBlockBanTimeSec :: Timeout 'Seconds
defBlockBanTimeSec = 60 :: Timeout 'Seconds defBlockBanTimeSec = 30 :: Timeout 'Seconds
defBlockWipTimeout :: TimeSpec defBlockWipTimeout :: TimeSpec
defBlockWipTimeout = defCookieTimeout defBlockWipTimeout = defCookieTimeout
@ -103,4 +103,9 @@ defDownloadFails = 100
defUsefulLimit :: Double defUsefulLimit :: Double
defUsefulLimit = 0.25 defUsefulLimit = 0.25
defInterBlockDelay :: Timeout 'Seconds
defInterBlockDelay = 0.0085
defBlockReqNum :: Integral a => a
defBlockReqNum = 2

View File

@ -44,7 +44,7 @@ instance Serialise IPv6
newtype IPAddrPort e = newtype IPAddrPort e =
IPAddrPort (IP, Word16) IPAddrPort (IP, Word16)
deriving (Generic) deriving stock (Generic,Eq,Ord)
instance Serialise (IPAddrPort e) instance Serialise (IPAddrPort e)
@ -60,6 +60,13 @@ instance IsString (IPAddrPort e) where
where where
(h,p) = fromMaybe (error "no parse IPAddrPort") (getHostPort (Text.pack s)) (h,p) = fromMaybe (error "no parse IPAddrPort") (getHostPort (Text.pack s))
instance FromStringMaybe (IPAddrPort e) where
fromStringMay x = IPAddrPort <$> ( (,) <$> ip <*> fmap fromIntegral po)
where
hp = getHostPort (Text.pack x)
ip = readMay . fst =<< hp
po = snd <$> hp
getHostPort :: Text -> Maybe (String, PortNumber) getHostPort :: Text -> Maybe (String, PortNumber)
getHostPort s = parseOnly p s & either (const Nothing) Just getHostPort s = parseOnly p s & either (const Nothing) Just
where where

View File

@ -1,4 +1,5 @@
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
{-# Language UndecidableInstances #-}
module HBS2.Net.Messaging.UDP where module HBS2.Net.Messaging.UDP where
import HBS2.Clock import HBS2.Clock
@ -58,7 +59,7 @@ instance Pretty (Peer UDP) where
makeLenses 'PeerUDP makeLenses 'PeerUDP
instance MonadIO m => IsPeerAddr UDP m where instance (FromStringMaybe (IPAddrPort UDP), MonadIO m) => IsPeerAddr UDP m where
type instance PeerAddr UDP = IPAddrPort UDP type instance PeerAddr UDP = IPAddrPort UDP
toPeerAddr p = pure $ fromString $ show $ pretty p toPeerAddr p = pure $ fromString $ show $ pretty p

View File

@ -97,64 +97,66 @@ blockChunksProto adapter (BlockChunks c p) = do
peer <- thatPeer (Proxy @(BlockChunks e)) peer <- thatPeer (Proxy @(BlockChunks e))
auth <- find (KnownPeerKey peer) id <&> isJust auth <- find (KnownPeerKey peer) id <&> isJust
when auth do
case p of case p of
BlockGetChunks h size n1 num -> do BlockGetChunks h size n1 num | auth -> do
bsz' <- blkSize adapter h bsz' <- blkSize adapter h
maybe1 bsz' (pure ()) $ \bsz -> do maybe1 bsz' (pure ()) $ \bsz -> do
let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)] let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)]
let offsets = take (fromIntegral num) $ drop (fromIntegral n1) $ zip offsets' [0..] let offsets = take (fromIntegral num) $ drop (fromIntegral n1) $ zip offsets' [0..]
-- liftIO $ print $ "sending " <+> pretty (length offsets) -- liftIO $ print $ "sending " <+> pretty (length offsets)
-- <+> "chunks for block" -- <+> "chunks for block"
-- <+> pretty h -- <+> pretty h
-- for_ offsets $ \((o,sz),i) -> deferred proto do -- for_ offsets $ \((o,sz),i) -> deferred proto do
for_ offsets $ \((o,sz),i) -> deferred proto do for_ offsets $ \((o,sz),i) -> deferred proto do
-- liftIO $ print $ "send chunk " <+> pretty i <+> pretty sz -- liftIO $ print $ "send chunk " <+> pretty i <+> pretty sz
chunk <- blkChunk adapter h o sz chunk <- blkChunk adapter h o sz
maybe (pure ()) (response_ . BlockChunk @e i) chunk maybe (pure ()) (response_ . BlockChunk @e i) chunk
BlockGetAllChunks h size -> do BlockGetAllChunks h size | auth -> do
me <- ownPeer @e me <- ownPeer @e
who <- thatPeer proto who <- thatPeer proto
bsz' <- blkSize adapter h bsz' <- blkSize adapter h
maybe1 bsz' (pure ()) $ \bsz -> do maybe1 bsz' (pure ()) $ \bsz -> do
let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)] let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)]
let offsets = zip offsets' [0..] let offsets = zip offsets' [0..]
-- liftIO $ print $ "sending " <+> pretty (length offsets) -- liftIO $ print $ "sending " <+> pretty (length offsets)
-- <+> "chunks for block" -- <+> "chunks for block"
-- <+> pretty h -- <+> pretty h
for_ offsets $ \((o,sz),i) -> deferred proto do for_ offsets $ \((o,sz),i) -> deferred proto do
chunk <- blkChunk adapter h o sz chunk <- blkChunk adapter h o sz
maybe (pure ()) (response_ . BlockChunk @e i) chunk maybe (pure ()) (response_ . BlockChunk @e i) chunk
BlockChunk n bs -> deferred proto do BlockChunk n bs | auth -> deferred proto do
who <- thatPeer proto who <- thatPeer proto
me <- ownPeer @e me <- ownPeer @e
h <- blkGetHash adapter (who, c) h <- blkGetHash adapter (who, c)
maybe1 h (response_ (BlockLost @e)) $ \hh -> do maybe1 h (response_ (BlockLost @e)) $ \hh -> do
void $ blkAcceptChunk adapter (c, who, hh, n, bs) void $ blkAcceptChunk adapter (c, who, hh, n, bs)
BlockNoChunks {} -> do BlockNoChunks {} -> do
-- TODO: notification -- TODO: notification
pure () pure ()
BlockLost{} -> do BlockLost{} -> do
liftIO $ print "GOT BLOCK LOST MESSAGE - means IO ERROR" liftIO $ print "GOT BLOCK LOST MESSAGE - means IO ERROR"
pure () pure ()
_ -> do
pure ()
where where
proto = Proxy @(BlockChunks e) proto = Proxy @(BlockChunks e)

View File

@ -31,6 +31,7 @@ blockSizeProto :: forall e m . ( MonadIO m
-> BlockInfo e -> BlockInfo e
-> m () -> m ()
-- FIXME: with-auth-combinator
blockSizeProto getBlockSize evHasBlock = blockSizeProto getBlockSize evHasBlock =
\case \case
GetBlockSize h -> do GetBlockSize h -> do

View File

@ -6,6 +6,7 @@ module HBS2.Net.Proto.Types
( module HBS2.Net.Proto.Types ( module HBS2.Net.Proto.Types
) where ) where
import HBS2.Prelude (FromStringMaybe(..))
import HBS2.Clock import HBS2.Clock
import Data.Kind import Data.Kind
@ -43,13 +44,15 @@ class HasPeerNonce e m where
peerNonce :: m PeerNonce peerNonce :: m PeerNonce
data WithCookie e p = WithCookie (Cookie e) p data WithCookie e p = WithCookie (Cookie e) p
class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where
data family (Peer e) :: Type data family (Peer e) :: Type
class Monad m => IsPeerAddr e m where class ( FromStringMaybe (PeerAddr e)
, Eq (PeerAddr e)
, Monad m
) => IsPeerAddr e m where
type family PeerAddr e :: Type type family PeerAddr e :: Type
toPeerAddr :: Peer e -> m (PeerAddr e) toPeerAddr :: Peer e -> m (PeerAddr e)

View File

@ -4,7 +4,6 @@
module BlockDownload where module BlockDownload where
import HBS2.Actors.Peer import HBS2.Actors.Peer
import HBS2.Base58
import HBS2.Clock import HBS2.Clock
import HBS2.Data.Detect import HBS2.Data.Detect
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
@ -39,64 +38,56 @@ import Data.IntMap qualified as IntMap
import Data.IntSet qualified as IntSet import Data.IntSet qualified as IntSet
import Data.List qualified as List import Data.List qualified as List
import Data.Maybe import Data.Maybe
import Data.Set qualified as Set
import Lens.Micro.Platform import Lens.Micro.Platform
import Control.Concurrent import System.Random (randomRIO)
import System.Random.Shuffle (shuffleM)
getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) getBlockForDownload :: forall e m . (MonadIO m, IsPeerAddr e m, MyPeer e)
getBlockForDownload = do
q <- asks (view downloadQ)
inq <- asks (view blockInQ)
liftIO $ atomically $ do
h <- readTQueue q
modifyTVar' inq (HashMap.delete h)
pure h
withBlockForDownload :: forall e m . (MonadIO m, MyPeer e, HasStorage m, HasPeerLocator e m)
=> Peer e => Peer e
-> BlockDownloadM e m () -> BlockDownloadM e m (Maybe (Hash HbSync))
-> (Hash HbSync -> BlockDownloadM e m ())
-> BlockDownloadM e m ()
withBlockForDownload p noBlockAction action = do
-- FIXME: busyloop-e46ad5e0
--
sto <- lift getStorage
h <- getBlockForDownload
here <- liftIO $ hasBlock sto h <&> isJust
getBlockForDownload peer = do
pa <- lift $ toPeerAddr peer
tinq <- asks (view blockInQ)
brains <- asks (view downloadBrains) brains <- asks (view downloadBrains)
prop <- asks (view blockProposed)
should <- shouldDownloadBlock brains p h inq <- liftIO $ readTVarIO tinq
let size = HashMap.size inq
if | here -> processBlock h if size == 0 then
| should -> onBlockDownloadAttempt brains p h >> action h pure Nothing
| otherwise -> noBlockAction >> addDownload mzero h else do
i <- randomRIO (0, size - 1)
let blk = HashMap.keys inq !! i
peers <- advisePeersForBlock @e brains blk
addBlockInfo :: (MonadIO m, MyPeer e) proposed <- liftIO $ Cache.lookup prop (blk, peer) <&> isJust
=> Peer e
-> Hash HbSync
-> Integer
-> BlockDownloadM e m ()
addBlockInfo pip h size = do r <- if | proposed -> do
-- debug $ "addBlockInfo" <+> pretty h <+> pretty pip <+> pretty size pure Nothing
tv <- asks (view blockPeers)
let mySize = HashMap.singleton pip size
liftIO $ atomically
$ modifyTVar tv (HashMap.insertWith (<>) h mySize)
getPeersForBlock :: (MonadIO m, MyPeer e) | List.null peers -> do
=> Hash HbSync pure $ Just blk
-> BlockDownloadM e m [(Peer e, Integer)]
getPeersForBlock h = do | pa `elem` peers -> do
tv <- asks (view blockPeers) pure $ Just blk
liftIO $ readTVarIO tv <&> foldMap HashMap.toList
. maybeToList | otherwise -> do
. HashMap.lookup h newOne <- shouldDownloadBlock @e brains peer blk
let chance = if newOne then 1 else 5
lucky <- liftIO $ shuffleM (True : replicate chance False) <&> headDef False
if lucky then
pure $ Just blk
else do
pure Nothing
case r of
Nothing -> none
Just h -> do
liftIO $ Cache.insert prop (h, peer) ()
pure r
processBlock :: forall e m . ( MonadIO m processBlock :: forall e m . ( MonadIO m
, HasStorage m , HasStorage m
@ -152,14 +143,15 @@ processBlock h = do
addDownload parent (fromHashRef b) addDownload parent (fromHashRef b)
Just (AnnRef h) -> do Just (AnnRef (AnnotatedHashRef ann hx)) -> do
addDownload parent h maybe1 ann none $ addDownload parent . fromHashRef
addDownload parent (fromHashRef hx)
Just (MerkleAnn ann) -> do Just (MerkleAnn ann) -> do
case (_mtaMeta ann) of case _mtaMeta ann of
NoMetaData -> pure () NoMetaData -> pure ()
ShortMetadata {} -> pure () ShortMetadata {} -> pure ()
AnnHashRef h -> addDownload parent h AnnHashRef hx -> addDownload parent hx
case (_mtaCrypt ann) of case (_mtaCrypt ann) of
NullEncryption -> pure () NullEncryption -> pure ()
@ -389,6 +381,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, Pretty (Peer e) , Pretty (Peer e)
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
, PeerMessaging e , PeerMessaging e
, IsPeerAddr e m
) )
=> DownloadEnv e -> m () => DownloadEnv e -> m ()
blockDownloadLoop env0 = do blockDownloadLoop env0 = do
@ -399,6 +392,8 @@ blockDownloadLoop env0 = do
pl <- getPeerLocator @e pl <- getPeerLocator @e
pause @'Seconds 3.81
void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do
pause @'Seconds 60 pause @'Seconds 60
debug "I'm peer thread sweeping thread" debug "I'm peer thread sweeping thread"
@ -424,11 +419,11 @@ blockDownloadLoop env0 = do
debug $ "peers to delete" <+> pretty (length r) debug $ "peers to delete" <+> pretty (length r)
for_ r $ delPeerThread . fst for_ r $ killPeerThread . fst
void $ liftIO $ async $ forever $ withPeerM e do void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 5 pause @'Seconds 1
debug "I'm a peer maintaining thread" -- debug "I'm a peer maintaining thread"
brains <- withDownload env0 $ asks (view downloadBrains) brains <- withDownload env0 $ asks (view downloadBrains)
pee <- knownPeers @e pl pee <- knownPeers @e pl
@ -454,8 +449,7 @@ blockDownloadLoop env0 = do
withDownload env0 $ newPeerThread p runPeer withDownload env0 $ newPeerThread p runPeer
| not auth -> do | not auth -> do
withDownload env0 $ delPeerThread p pure ()
-- pure ()
| otherwise -> pure () | otherwise -> pure ()
@ -508,24 +502,8 @@ blockDownloadLoop env0 = do
pause @'Seconds 5 -- FIXME: put to defaults pause @'Seconds 5 -- FIXME: put to defaults
-- we need to show download stats -- we need to show download stats
tinfo <- asks (view blockPeers) wipNum <- asks (view blockInQ) >>= liftIO . readTVarIO <&> HashMap.size
binfo <- liftIO $ readTVarIO tinfo let po = 0
wip <- asks (view blockWip)
liftIO $ Cache.purgeExpired wip
aliveWip <- Set.fromList <$> liftIO (Cache.keys wip)
let alive = HashMap.fromList [ (h,i)
| (h,i) <- HashMap.toList binfo
, Set.member h aliveWip
]
liftIO $ atomically $ writeTVar tinfo alive
po <- postponedNum
wipNum <- liftIO $ Cache.size wip
notice $ "maintain blocks wip" <+> pretty wipNum notice $ "maintain blocks wip" <+> pretty wipNum
<+> "postponed" <+> "postponed"
@ -535,10 +513,12 @@ blockDownloadLoop env0 = do
mapM_ processBlock blks mapM_ processBlock blks
fix \next -> do proposed <- asks (view blockProposed)
pause @'Seconds 30
debug "I'm a download loop. I don't do anything anymore" forever do
next pause @'Seconds 20
debug "block download loop. does not do anything"
liftIO $ Cache.purgeExpired proposed
postponedLoop :: forall e m . ( MyPeer e postponedLoop :: forall e m . ( MyPeer e
@ -553,48 +533,34 @@ postponedLoop :: forall e m . ( MyPeer e
postponedLoop env0 = do postponedLoop env0 = do
e <- ask e <- ask
void $ liftIO $ async $ withPeerM e $ withDownload env0 do pause @'Seconds 2.57
forever do
pause @'Seconds 10
mt <- asks (view downloadQ) >>= liftIO . atomically . isEmptyTQueue
debug $ "queue monitor thread" <+> "EMPTY:" <+> pretty mt
void $ liftIO $ async $ withPeerM e $ withDownload env0 do void $ liftIO $ async $ withPeerM e $ withDownload env0 do
-- wip <- asks (blockWip) >>= liftIO . Cache.keys q <- asks (view blockDelayTo)
wip0 <- asks (view blockWip) >>= liftIO . Cache.keys <&> length fix \next -> do
twip <- liftIO $ newTVarIO wip0 w <- liftIO $ atomically $ readTQueue q
pause defInterBlockDelay
forever do addDownload mzero w
pause @'Seconds 10 -- ws <- liftIO $ atomically $ flushTQueue q
wip1 <- asks (view blockWip) >>= liftIO . Cache.keys -- for_ (w:ws) $ addDownload mzero
wip2 <- liftIO $ readTVarIO twip next
trace $ "download stuck check" <+> pretty (length wip1) <+> pretty wip2
when (length wip1 == wip2 && not (null wip1)) do
debug "download stuck"
for_ wip1 $ \h -> do
removeFromWip h
addDownload Nothing h
wip3 <- asks (view blockWip) >>= liftIO . Cache.keys
liftIO $ atomically $ writeTVar twip (length wip3)
void $ liftIO $ async $ withPeerM e $ withDownload env0 do void $ liftIO $ async $ withPeerM e $ withDownload env0 do
forever do forever do
pause @'Seconds 30 pause @'Seconds 20
trace "UNPOSTPONE LOOP" trace "UNPOSTPONE LOOP"
po <- asks (view blockPostponedTo) >>= liftIO . Cache.toList po <- asks (view blockPostponedTo) >>= liftIO . Cache.toList
for_ po $ \(h, _, expired) -> do for_ po $ \(h, _, expired) -> do
when (isJust expired) do when (isJust expired) do
unpostponeBlock h unpostponeBlock h
peerDownloadLoop :: forall e m . ( MyPeer e peerDownloadLoop :: forall e m . ( MyPeer e
, Sessions e (KnownPeer e) m , Sessions e (KnownPeer e) m
, Request e (BlockInfo e) m , Request e (BlockInfo e) m
, EventListener e (BlockInfo e) m , EventListener e (BlockInfo e) m
, DownloadFromPeerStuff e m , DownloadFromPeerStuff e m
, HasPeerLocator e m , HasPeerLocator e m
, IsPeerAddr e m
, m ~ PeerM e IO , m ~ PeerM e IO
) => Peer e -> BlockDownloadM e m () ) => Peer e -> BlockDownloadM e m ()
peerDownloadLoop peer = do peerDownloadLoop peer = do
@ -605,6 +571,8 @@ peerDownloadLoop peer = do
pe <- lift ask pe <- lift ask
e <- ask e <- ask
brains <- asks (view downloadBrains)
let doBlockSizeRequest h = do let doBlockSizeRequest h = do
q <- liftIO newTQueueIO q <- liftIO newTQueueIO
lift do lift do
@ -653,21 +621,25 @@ peerDownloadLoop peer = do
Right{} -> do Right{} -> do
trace $ "OK" <+> pretty peer <+> "dowloaded block" <+> pretty h trace $ "OK" <+> pretty peer <+> "dowloaded block" <+> pretty h
onBlockDownloaded brains peer h
processBlock h processBlock h
liftIO $ atomically do liftIO $ atomically do
writeTVar downFail 0 writeTVar downFail 0
modifyTVar downBlk succ modifyTVar downBlk succ
let noBlkAction = do let warnExit = warn $ "peer loop exit" <+> pretty peer
liftIO yield -- let stopLoop = none
forever do idle <- liftIO $ newTVarIO 0
fix \next -> do
let thenNext m = m >> next
liftIO do liftIO do
Cache.purgeExpired sizeCache Cache.purgeExpired sizeCache
Cache.purgeExpired noBlock Cache.purgeExpired noBlock
npi <- newPeerInfo npi <- newPeerInfo
auth' <- lift $ find (KnownPeerKey peer) id auth' <- lift $ find (KnownPeerKey peer) id
@ -678,53 +650,68 @@ peerDownloadLoop peer = do
let noAuth = do let noAuth = do
let authNone = if isNothing auth' then "noauth" else "" let authNone = if isNothing auth' then "noauth" else ""
warn ( "lost peer auth" <+> pretty peer <+> pretty authNone ) warn ( "lost peer auth" <+> pretty peer <+> pretty authNone )
pause @'Seconds 1 warnExit
-- liftIO $ withPeerM pe $ sendPing @e peer
-- -- FIXME: time-hardcode
-- pause @'Seconds 3
-- found <- lift $ find (KnownPeerKey peer) id <&> isJust
-- unless found do
-- warn ( "peer lost. stopping peer loop" <+> pretty peer )
maybe1 mbauth noAuth $ \(_,_) -> do maybe1 mbauth noAuth $ \_ -> do
withBlockForDownload peer noBlkAction $ \h -> do pt' <- getPeerThread peer
-- TODO: insert-busyloop-counter-for-block-request
-- trace $ "withBlockForDownload" <+> pretty peer <+> pretty h
mbSize <- liftIO $ Cache.lookup sizeCache h maybe1 pt' warnExit $ \pt -> do
noBlk <- liftIO $ Cache.lookup noBlock h <&> isJust
case mbSize of liftIO $ atomically $ modifyTVar (view peerBlocksWip pt) (max 0 . pred)
Just size -> do
trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size
updateBlockPeerSize h peer size
tryDownload pinfo h size
Nothing | noBlk -> do mbh <- getBlockForDownload peer
trace $ pretty peer <+> "does not have block" <+> pretty h
addDownload mzero h
Nothing -> do case mbh of
incBlockSizeReqCount h Nothing -> thenNext do
idleNum <- liftIO $ atomically $ stateTVar idle $ \x -> (x, succ x)
r <- doBlockSizeRequest h when (idleNum > 5) do
trace $ "peer IDLE" <+> pretty peer
liftIO $ atomically $ writeTVar idle 0
x <- lift $ randomRIO (2.85, 10.47)
pause @'Seconds (realToFrac x)
case r of Just h -> thenNext do
Left{} -> failedDownload peer h
Right Nothing -> do liftIO $ atomically $ writeTVar idle 0
-- FIXME: non-existent-block-ruins-all
here <- liftIO $ Cache.lookup noBlock h <&> isJust
unless here $ lift $ onBlockDownloadAttempt brains peer h
liftIO $ Cache.insert noBlock h ()
addDownload mzero h trace $ "start download block" <+> pretty peer <+> pretty h
Right (Just s) -> do mbSize <- liftIO $ Cache.lookup sizeCache h
updateBlockPeerSize h peer s noBlk <- liftIO $ Cache.lookup noBlock h <&> isJust
tryDownload pinfo h s
case mbSize of
Just size -> do
trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size
tryDownload pinfo h size
Nothing | noBlk -> do
trace $ pretty peer <+> "does not have block" <+> pretty h
addDownload mzero h
Nothing -> do
r <- doBlockSizeRequest h
case r of
Left{} -> failedDownload peer h
Right Nothing -> do
here <- liftIO $ Cache.lookup noBlock h <&> isJust
unless here $
liftIO $ Cache.insert noBlock h ()
addDownload mzero h
Right (Just s) -> do
tryDownload pinfo h s
warnExit
void $ delPeerThreadData peer
-- NOTE: this is an adapter for a ResponseM monad -- NOTE: this is an adapter for a ResponseM monad
-- because response is working in ResponseM monad (ha!) -- because response is working in ResponseM monad (ha!)

View File

@ -12,17 +12,32 @@ import HBS2.System.Logger.Simple
import Data.Maybe import Data.Maybe
import Control.Monad import Control.Monad
import Control.Exception
import Control.Concurrent.STM import Control.Concurrent.STM
import Data.HashMap.Strict import Control.Concurrent.Async
import Lens.Micro.Platform import Lens.Micro.Platform
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict (HashMap)
import Data.List qualified as List import Data.List qualified as List
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Text.InterpolatedString.Perl6 (qc)
import Database.SQLite.Simple
import Database.SQLite.Simple.FromField
import System.Random (randomRIO)
import Data.Word
class HasBrains e a where class HasBrains e a where
onKnownPeers :: MonadIO m => a -> [Peer e] -> m () onKnownPeers :: MonadIO m => a -> [Peer e] -> m ()
onBlockDownloadAttempt :: MonadIO m => a -> Peer e -> Hash HbSync -> m () onBlockDownloadAttempt :: ( MonadIO m
, IsPeerAddr e m
)
=> a
-> Peer e
-> Hash HbSync
-> m ()
onBlockDownloaded :: MonadIO m onBlockDownloaded :: MonadIO m
=> a => a
@ -53,6 +68,11 @@ class HasBrains e a where
-> Hash HbSync -> Hash HbSync
-> m Bool -> m Bool
advisePeersForBlock :: (MonadIO m, FromStringMaybe (PeerAddr e))
=> a
-> Hash HbSync
-> m [PeerAddr e]
type NoBrains = () type NoBrains = ()
instance Pretty (Peer e) => HasBrains e NoBrains where instance Pretty (Peer e) => HasBrains e NoBrains where
@ -74,6 +94,8 @@ instance Pretty (Peer e) => HasBrains e NoBrains where
shouldDownloadBlock _ _ _ = pure True shouldDownloadBlock _ _ _ = pure True
advisePeersForBlock _ _ = pure mempty
data SomeBrains e = forall a . HasBrains e a => SomeBrains a data SomeBrains e = forall a . HasBrains e a => SomeBrains a
instance HasBrains e (SomeBrains e) where instance HasBrains e (SomeBrains e) where
@ -84,11 +106,15 @@ instance HasBrains e (SomeBrains e) where
claimBlockCameFrom (SomeBrains a) = claimBlockCameFrom @e a claimBlockCameFrom (SomeBrains a) = claimBlockCameFrom @e a
shouldPostponeBlock (SomeBrains a) = shouldPostponeBlock @e a shouldPostponeBlock (SomeBrains a) = shouldPostponeBlock @e a
shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a
advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a
data BasicBrains e = data BasicBrains e =
BasicBrains BasicBrains
{ _brainsPeers :: TVar [Peer e] { _brainsPeers :: TVar [Peer e]
, _brainsPostponeDown :: TVar (HashMap (Peer e, Hash HbSync) Int ) , _brainsPostponeDown :: TVar (HashMap (Peer e, Hash HbSync) Int )
, _brainsExpire :: Cache (Hash HbSync) ()
, _brainsDb :: Connection
, _brainsPipeline :: TQueue (IO ())
} }
makeLenses 'BasicBrains makeLenses 'BasicBrains
@ -100,28 +126,34 @@ cleanupPostponed b h = do
let flt (_,h1) _ = h1 /= h let flt (_,h1) _ = h1 /= h
liftIO $ atomically $ modifyTVar po $ HashMap.filterWithKey flt liftIO $ atomically $ modifyTVar po $ HashMap.filterWithKey flt
instance Hashable (Peer e) => HasBrains e (BasicBrains e) where instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) where
onKnownPeers br ps = do onKnownPeers br ps = do
trace "BRAINS: onKnownPeers" -- trace "BRAINS: onKnownPeers"
let tv = view brainsPeers br let tv = view brainsPeers br
liftIO $ atomically $ writeTVar tv ps liftIO $ atomically $ writeTVar tv ps
onBlockDownloadAttempt b peer h = do onBlockDownloadAttempt b peer h = do
trace "BRAINS: onBlockDownloadAttempt" -- trace "BRAINS: onBlockDownloadAttempt"
let doAlter = HashMap.alter (maybe (Just 0) (Just . succ)) (peer,h) noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null
liftIO $ atomically $ modifyTVar (view brainsPostponeDown b) doAlter unless noPeers do
let cache = view brainsExpire b
liftIO $ Cache.insert cache h ()
let doAlter = HashMap.alter (maybe (Just 0) (Just . succ)) (peer,h)
liftIO $ atomically $ modifyTVar (view brainsPostponeDown b) doAlter
onBlockDownloaded b p h = do onBlockDownloaded b p h = do
trace "BRAINS: onBlockDownloaded" -- trace $ "BRAINS: onBlockDownloaded" <+> pretty p <+> pretty h
cleanupPostponed b h cleanupPostponed b h
updateOP b $ insertPeer b h p
onBlockPostponed b h = do onBlockPostponed b h = do
trace "BRAINS: onBlockPostponed" trace $ "BRAINS: onBlockPostponed" <+> pretty h
cleanupPostponed b h cleanupPostponed b h
claimBlockCameFrom _ _ _ = do claimBlockCameFrom b f t = do
trace "BRAINS: claimBlockCameFrom" -- trace $ "BRAINS: claimBlockCameFrom" <+> pretty f <+> pretty t
updateOP b $ insertAncestor b f t
shouldPostponeBlock b h = do shouldPostponeBlock b h = do
peers <- liftIO $ readTVarIO (view brainsPeers b) peers <- liftIO $ readTVarIO (view brainsPeers b)
@ -136,18 +168,202 @@ instance Hashable (Peer e) => HasBrains e (BasicBrains e) where
pure postpone pure postpone
shouldDownloadBlock b p h = do shouldDownloadBlock b p h = do
noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null
downs <- liftIO $ readTVarIO (view brainsPostponeDown b) downs <- liftIO $ readTVarIO (view brainsPostponeDown b)
pure $ HashMap.lookup (p,h) downs & fromMaybe 0 & (<2) pure $ noPeers || (HashMap.lookup (p,h) downs & fromMaybe 0 & (<2))
advisePeersForBlock b h = do
r <- liftIO $ findPeers b h
pure $ mapMaybe fromStringMay r
updateOP :: forall e m . MonadIO m => BasicBrains e -> IO () -> m ()
updateOP br op = do
let pip = view brainsPipeline br
liftIO $ atomically $ writeTQueue pip (liftIO op)
insertAncestor :: BasicBrains e
-> Hash HbSync -- ^ parent
-> Hash HbSync -- ^ child
-> IO ()
insertAncestor br parent child = do
-- trace $ "INSERT ANCESTOR" <+> pretty parent <+> pretty child
let conn = view brainsDb br
void $ liftIO $ execute conn [qc|
insert into ancestors (child, parent) values (?,?)
on conflict (child,parent) do nothing
|] (show $ pretty child, show $ pretty parent)
insertPeer :: forall e . Pretty (Peer e)
=> BasicBrains e
-> Hash HbSync -- ^ block
-> Peer e -- ^ peer
-> IO ()
insertPeer br blk peer = do
-- trace $ "INSERT PEER" <+> pretty peer <+> pretty blk
let conn = view brainsDb br
void $ liftIO $ execute conn [qc|
insert into seenby (block, peer) values (?,?)
on conflict (block,peer) do nothing
|] (show $ pretty blk, show $ pretty peer)
newtype DBData a = DBData { fromDBData :: a }
instance FromField (DBData (Hash HbSync)) where
fromField = fmap (DBData . fromString) . fromField @String
getAncestors :: forall e m . (MonadIO m)
=> BasicBrains e
-> Hash HbSync
-> m [Hash HbSync]
getAncestors br child = do
let conn = view brainsDb br
let sql = [qc|
WITH RECURSIVE ancestor_list(parent) AS (
SELECT parent
FROM ancestors
WHERE child = ?
UNION
SELECT a.parent
FROM ancestors a
JOIN ancestor_list al ON a.child = al.parent
)
SELECT parent FROM ancestor_list;
|]
liftIO $ query conn sql (Only (show $ pretty child) )
<&> fmap (fromDBData . fromOnly)
findPeers :: BasicBrains e
-> Hash HbSync
-> IO [String]
findPeers br child = do
let conn = view brainsDb br
let sql = [qc|
WITH RECURSIVE ancestor_list(parent) AS (
SELECT parent
FROM ancestors
WHERE child = ?
UNION
SELECT a.parent
FROM ancestors a
JOIN ancestor_list al ON a.child = al.parent
)
SELECT s.peer
FROM ancestor_list a
JOIN seenby s on s.block = a.parent;
|]
liftIO $ query conn sql (Only (show $ pretty child) ) <&> fmap fromOnly
cleanupHashes :: BasicBrains e
-> IO ()
cleanupHashes br = do
debug "BRAINS: cleanup caches"
let conn = view brainsDb br
let sql = [qc|
SAVEPOINT zzz1;
DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600;
DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600;
RELEASE SAVEPOINT zzz1;
|]
r <- try $ liftIO $ execute_ conn sql
case r of
Right{} -> pure ()
Left (e :: SomeException) -> err $ "BRAINS: " <+> viaShow e
transactional :: BasicBrains e -> IO () -> IO ()
transactional brains action = do
n <- randomRIO @Word16 (1, maxBound)
let sp = [qc|sp{n}|] :: String
let conn = view brainsDb brains
execute_ conn [qc|SAVEPOINT {sp}|]
try action >>= \case
Right{} -> do
execute_ conn [qc|RELEASE SAVEPOINT {sp}|]
Left ( _ :: SomeException ) -> do
execute_ conn [qc|ROLLBACK TO SAVEPOINT {sp}|]
-- FIXME: eventually-close-db
newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) => m (BasicBrains e) newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) => m (BasicBrains e)
newBasicBrains = liftIO do newBasicBrains = liftIO do
conn <- open ":memory:"
execute_ conn [qc|
create table if not exists ancestors
( child text not null
, parent text not null
, ts DATE DEFAULT (datetime('now','localtime'))
, primary key (child,parent))
|]
execute_ conn [qc|
create table if not exists seenby
( block text not null
, peer text not null
, ts DATE DEFAULT (datetime('now','localtime'))
, primary key (block,peer))
|]
BasicBrains <$> newTVarIO mempty BasicBrains <$> newTVarIO mempty
<*> newTVarIO mempty <*> newTVarIO mempty
<*> Cache.newCache (Just (toTimeSpec (30 :: Timeout 'Seconds)))
<*> pure conn
<*> newTQueueIO
runBasicBrains :: MonadIO m => BasicBrains e -> m () runBasicBrains :: MonadIO m => BasicBrains e -> m ()
runBasicBrains brains = forever do runBasicBrains brains = do
pause @'Seconds 20
debug "BRAINS!" let pip = view brainsPipeline brains
pure() let expire = view brainsExpire brains
-- FIXME: async-error-handling
void $ liftIO $ async $ forever do
pause @'Seconds 5
-- transactional brains do
w <- atomically $ readTQueue pip
ws <- atomically $ flushTQueue pip
transactional brains (sequence_ (w:ws))
void $ liftIO $ async $ forever do
pause @'Seconds 60
updateOP brains (cleanupHashes brains)
void $ forever do
pause @'Seconds 15
ee <- liftIO $ Cache.toList expire
let eee = [ h | (h,_,Just{}) <- ee ]
forM_ eee $ \h -> do
cleanupPostponed brains h
liftIO $ Cache.purgeExpired expire

View File

@ -403,7 +403,7 @@ instance ( Monad m
-- runPeer :: forall e . (e ~ UDP, Nonce (RefLogUpdate e) ~ BS.ByteString) => PeerOpts -> IO () -- runPeer :: forall e . (e ~ UDP, Nonce (RefLogUpdate e) ~ BS.ByteString) => PeerOpts -> IO ()
runPeer :: forall e . (e ~ UDP) => PeerOpts -> IO () runPeer :: forall e . (e ~ UDP, FromStringMaybe (PeerAddr e)) => PeerOpts -> IO ()
runPeer opts = Exception.handle myException $ do runPeer opts = Exception.handle myException $ do
@ -473,8 +473,6 @@ runPeer opts = Exception.handle myException $ do
notice $ "run peer" <+> pretty (AsBase58 (view peerSignPk pc)) notice $ "run peer" <+> pretty (AsBase58 (view peerSignPk pc))
s <- simpleStorageInit @HbSync (Just pref) s <- simpleStorageInit @HbSync (Just pref)
let blk = liftIO . hasBlock s let blk = liftIO . hasBlock s
@ -882,7 +880,7 @@ rpcClientMain opt action = do
setLoggingOff @DEBUG setLoggingOff @DEBUG
action action
withRPC :: RPCOpt -> RPC UDP -> IO () withRPC :: FromStringMaybe (PeerAddr UDP) => RPCOpt -> RPC UDP -> IO ()
withRPC o cmd = rpcClientMain o $ do withRPC o cmd = rpcClientMain o $ do
hSetBuffering stdout LineBuffering hSetBuffering stdout LineBuffering
@ -992,7 +990,7 @@ withRPC o cmd = rpcClientMain o $ do
void $ waitAnyCatchCancel [mrpc, prpc] void $ waitAnyCatchCancel [mrpc, prpc]
runRpcCommand :: RPCOpt -> RPCCommand -> IO () runRpcCommand :: FromStringMaybe (IPAddrPort UDP) => RPCOpt -> RPCCommand -> IO ()
runRpcCommand opt = \case runRpcCommand opt = \case
POKE -> withRPC opt RPCPoke POKE -> withRPC opt RPCPoke
PING s _ -> withRPC opt (RPCPing s) PING s _ -> withRPC opt (RPCPing s)

View File

@ -135,28 +135,26 @@ data BlockState =
makeLenses 'BlockState makeLenses 'BlockState
data PeerTask e = DoDownload newtype PeerTask e = DoDownload (Hash HbSync)
deriving newtype (Pretty)
data PeerThread e = data PeerThread e =
PeerThread PeerThread
{ _peerThreadAsync :: Async () { _peerThreadAsync :: Async ()
, _peerThreadMailbox :: TQueue (PeerTask e) , _peerThreadMailbox :: TQueue (PeerTask e)
, _peerBlocksWip :: TVar Int
} }
makeLenses 'PeerThread makeLenses 'PeerThread
data DownloadEnv e = data DownloadEnv e =
DownloadEnv DownloadEnv
{ _downloadQ :: TQueue (Hash HbSync) { _blockInQ :: TVar (HashMap (Hash HbSync) ())
, _peerBusy :: TVar (HashMap (Peer e) ())
, _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) )
, _blockWip :: Cache (Hash HbSync) ()
, _blockState :: TVar (HashMap (Hash HbSync) BlockState)
, _blockInQ :: TVar (HashMap (Hash HbSync) ())
, _peerThreads :: TVar (HashMap (Peer e) (PeerThread e)) , _peerThreads :: TVar (HashMap (Peer e) (PeerThread e))
, _blockStored :: Cache (Hash HbSync) ()
, _blockPostponed :: TVar (HashMap (Hash HbSync) () ) , _blockPostponed :: TVar (HashMap (Hash HbSync) () )
, _blockPostponedTo :: Cache (Hash HbSync) () , _blockPostponedTo :: Cache (Hash HbSync) ()
, _blockDelayTo :: TQueue (Hash HbSync)
, _blockProposed :: Cache (Hash HbSync, Peer e) ()
, _downloadBrains :: SomeBrains e , _downloadBrains :: SomeBrains e
} }
@ -165,16 +163,12 @@ makeLenses 'DownloadEnv
newDownloadEnv :: (MonadIO m, MyPeer e, HasBrains e brains) => brains -> m (DownloadEnv e) newDownloadEnv :: (MonadIO m, MyPeer e, HasBrains e brains) => brains -> m (DownloadEnv e)
newDownloadEnv brains = liftIO do newDownloadEnv brains = liftIO do
DownloadEnv <$> newTQueueIO DownloadEnv <$> newTVarIO mempty
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO mempty <*> newTVarIO mempty
<*> Cache.newCache (Just defBlockWipTimeout)
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> Cache.newCache (Just defBlockWipTimeout)
<*> newTVarIO mempty
<*> Cache.newCache (Just defBlockBanTime) <*> Cache.newCache (Just defBlockBanTime)
<*> newTQueueIO
<*> Cache.newCache (Just (toTimeSpec (2 :: Timeout 'Seconds)))
<*> pure (SomeBrains brains) <*> pure (SomeBrains brains)
newtype BlockDownloadM e m a = newtype BlockDownloadM e m a =
@ -190,47 +184,6 @@ newtype BlockDownloadM e m a =
withDownload :: (MyPeer e, HasPeerLocator e m, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a withDownload :: (MyPeer e, HasPeerLocator e m, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a
withDownload e m = runReaderT ( fromBlockDownloadM m ) e withDownload e m = runReaderT ( fromBlockDownloadM m ) e
setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m ()
setBlockState h s = do
sh <- asks (view blockState)
liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s)
setBlockHasSize :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
setBlockHasSize h = do
blk <- fetchBlockState h
liftIO $ atomically $ writeTVar (view bsHasSize blk) True
fetchBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState
fetchBlockState h = do
sh <- asks (view blockState)
liftIO do
now <- getTimeCoarse
tvlast <- newTVarIO now
tvreq <- newTVarIO 0
tvsz <- newTVarIO False
let defState = BlockState now tvreq tvlast tvsz
atomically $ stateTVar sh $ \hm -> case HashMap.lookup h hm of
Nothing -> (defState, HashMap.insert h defState hm)
Just x -> (x, hm)
delBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
delBlockState h = do
sh <- asks (view blockState)
liftIO $ atomically $ modifyTVar' sh (HashMap.delete h)
incBlockSizeReqCount :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
incBlockSizeReqCount h = do
blk <- fetchBlockState h
now <- liftIO getTimeCoarse
seen <- liftIO $ readTVarIO (view bsLastSeen blk)
let elapsed = realToFrac (toNanoSecs (now - seen)) / 1e9
noSize <- liftIO $ readTVarIO (view bsHasSize blk) <&> not
when (elapsed > 1.0 && noSize) do
liftIO $ atomically $ do
writeTVar (view bsLastSeen blk) now
modifyTVar (view bsReqSizeTimes blk) succ
isBlockHereCached :: forall e m . ( MyPeer e isBlockHereCached :: forall e m . ( MyPeer e
, MonadIO m , MonadIO m
@ -239,27 +192,9 @@ isBlockHereCached :: forall e m . ( MyPeer e
=> Hash HbSync -> BlockDownloadM e m Bool => Hash HbSync -> BlockDownloadM e m Bool
isBlockHereCached h = do isBlockHereCached h = do
szcache <- asks (view blockStored)
sto <- lift getStorage sto <- lift getStorage
liftIO $ hasBlock sto h <&> isJust
cached <- liftIO $ Cache.lookup szcache h
case cached of
Just{} -> pure True
Nothing -> liftIO do
blk <- hasBlock sto h <&> isJust
when blk $ Cache.insert szcache h ()
pure blk
checkForDownload :: forall e m . ( MyPeer e
, MonadIO m
, HasPeerLocator e (BlockDownloadM e m)
, HasStorage m -- (BlockDownloadM e m)
)
=> ByteString -> BlockDownloadM e m ()
checkForDownload lbs = do
pure ()
type DownloadConstr e m = ( MyPeer e type DownloadConstr e m = ( MyPeer e
, MonadIO m , MonadIO m
@ -277,42 +212,18 @@ addDownload :: forall e m . ( DownloadConstr e m
addDownload mbh h = do addDownload mbh h = do
tinq <- asks (view blockInQ) tinq <- asks (view blockInQ)
brains <- asks (view downloadBrains) brains <- asks (view downloadBrains)
here <- isBlockHereCached h
postponed <- isPostponed h if here then do
removeFromWip h
unless postponed do else do
maybe1 mbh none $ \hp -> claimBlockCameFrom @e brains hp h maybe1 mbh none $ \hp -> claimBlockCameFrom @e brains hp h
postpone <- shouldPostponeBlock @e brains h postpone <- shouldPostponeBlock @e brains h
if postpone then do
when postpone do
-- trace $ "addDownload postpone" <+> pretty postpone <+> pretty h
postponeBlock h postponeBlock h
else do
doAdd <- do liftIO $ atomically $ stateTVar tinq liftIO $ atomically $ modifyTVar tinq $ HashMap.insert h ()
\hm -> case HashMap.lookup h hm of
Nothing -> (True, HashMap.insert h () hm)
Just{} -> (False, HashMap.insert h () hm)
notHere <- isBlockHereCached h <&> not
when (doAdd && notHere && not postpone) do
trace $ "addDownload" <+> pretty h
q <- asks (view downloadQ)
wip <- asks (view blockWip)
liftIO do
atomically $ do
modifyTVar tinq $ HashMap.insert h ()
writeTQueue q h
Cache.insert wip h ()
postponedNum :: forall e m . (MyPeer e, MonadIO m) => BlockDownloadM e m Int postponedNum :: forall e m . (MyPeer e, MonadIO m) => BlockDownloadM e m Int
postponedNum = do postponedNum = do
@ -324,14 +235,22 @@ isPostponed h = do
po <- asks (view blockPostponed) >>= liftIO . readTVarIO po <- asks (view blockPostponed) >>= liftIO . readTVarIO
pure $ HashMap.member h po pure $ HashMap.member h po
delayLittleBit :: forall e m . (MyPeer e, MonadIO m) => Hash HbSync -> BlockDownloadM e m ()
delayLittleBit h = do
q <- asks (view blockDelayTo)
liftIO $ atomically $ writeTQueue q h
postponeBlock :: forall e m . (MyPeer e, MonadIO m) => Hash HbSync -> BlockDownloadM e m () postponeBlock :: forall e m . (MyPeer e, MonadIO m) => Hash HbSync -> BlockDownloadM e m ()
postponeBlock h = do postponeBlock h = do
brains <- asks (view downloadBrains) brains <- asks (view downloadBrains)
po <- asks (view blockPostponed) po <- asks (view blockPostponed)
tto <- asks (view blockPostponedTo) tto <- asks (view blockPostponedTo)
tinq <- asks (view blockInQ)
liftIO $ do liftIO $ do
liftIO $ atomically $ modifyTVar tinq $ HashMap.delete h
already <- atomically $ readTVar po <&> HashMap.member h already <- atomically $ readTVar po <&> HashMap.member h
unless already do unless already do
atomically $ modifyTVar po (HashMap.insert h ()) atomically $ modifyTVar po (HashMap.insert h ())
@ -353,16 +272,8 @@ unpostponeBlock h = do
removeFromWip :: (MyPeer e, MonadIO m) => Hash HbSync -> BlockDownloadM e m () removeFromWip :: (MyPeer e, MonadIO m) => Hash HbSync -> BlockDownloadM e m ()
removeFromWip h = do removeFromWip h = do
wip <- asks (view blockWip)
st <- asks (view blockState)
sz <- asks (view blockPeers)
tinq <- asks (view blockInQ) tinq <- asks (view blockInQ)
liftIO $ Cache.delete wip h
liftIO $ atomically $ do liftIO $ atomically $ do
modifyTVar' st (HashMap.delete h)
modifyTVar' sz (HashMap.delete h)
modifyTVar' tinq (HashMap.delete h) modifyTVar' tinq (HashMap.delete h)
hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool
@ -370,14 +281,47 @@ hasPeerThread p = do
threads <- asks (view peerThreads) threads <- asks (view peerThreads)
liftIO $ readTVarIO threads <&> HashMap.member p liftIO $ readTVarIO threads <&> HashMap.member p
getPeerThreads :: (MyPeer e, MonadIO m) => BlockDownloadM e m [(Peer e, PeerThread e)]
delPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m () getPeerThreads = do
delPeerThread p = do
debug $ "delPeerThread" <+> pretty p
threads <- asks (view peerThreads) threads <- asks (view peerThreads)
pt <- liftIO $ atomically $ stateTVar threads (\x -> let t = HashMap.lookup p x liftIO $ atomically $ readTVar threads <&> HashMap.toList
in (t, HashMap.delete p x))
getPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerThread e))
getPeerThread p = do
threads <- asks (view peerThreads)
liftIO $ atomically $ readTVar threads <&> HashMap.lookup p
getPeerTask :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerTask e))
getPeerTask p = do
threads <- asks (view peerThreads)
pt' <- liftIO $ atomically $ readTVar threads <&> HashMap.lookup p
maybe1 pt' (pure Nothing) $ \pt -> do
liftIO $ atomically $ readTQueue (view peerThreadMailbox pt) <&> Just
addPeerTask :: (MyPeer e, MonadIO m)
=> Peer e
-> PeerTask e
-> BlockDownloadM e m ()
addPeerTask p t = do
trace $ "ADD-PEER-TASK" <+> pretty p <+> pretty t
threads <- asks (view peerThreads)
liftIO $ atomically $ do
pt' <- readTVar threads <&> HashMap.lookup p
maybe1 pt' none $ \pt -> do
writeTQueue (view peerThreadMailbox pt) t
modifyTVar (view peerBlocksWip pt) succ
delPeerThreadData :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerThread e))
delPeerThreadData p = do
debug $ "delPeerThreadData" <+> pretty p
threads <- asks (view peerThreads)
liftIO $ atomically $ stateTVar threads (\x -> let t = HashMap.lookup p x
in (t, HashMap.delete p x))
killPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m ()
killPeerThread p = do
debug $ "delPeerThread" <+> pretty p
pt <- delPeerThreadData p
maybe1 pt (pure ()) $ liftIO . cancel . view peerThreadAsync maybe1 pt (pure ()) $ liftIO . cancel . view peerThreadAsync
newPeerThread :: ( MyPeer e newPeerThread :: ( MyPeer e
@ -395,10 +339,23 @@ newPeerThread p m = do
void $ lift $ fetch True npi (PeerInfoKey p) id void $ lift $ fetch True npi (PeerInfoKey p) id
q <- liftIO newTQueueIO q <- liftIO newTQueueIO
let pt = PeerThread m q tnum <- liftIO $ newTVarIO 0
let pt = PeerThread m q tnum
threads <- asks (view peerThreads) threads <- asks (view peerThreads)
liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt
getPeerTaskWip :: ( MyPeer e
, MonadIO m
-- , Sessions e (PeerInfo e) m
-- , Sessions e (PeerInfo e) (BlockDownloadM e m)
)
=> Peer e
-> BlockDownloadM e m Int
getPeerTaskWip p = do
threads <- asks (view peerThreads)
pt' <- liftIO $ atomically $ readTVar threads <&> HashMap.lookup p
maybe1 pt' (pure 0) $ \pt -> do
liftIO $ readTVarIO (view peerBlocksWip pt)
failedDownload :: forall e m . ( MyPeer e failedDownload :: forall e m . ( MyPeer e
, MonadIO m , MonadIO m
@ -416,24 +373,6 @@ failedDownload p h = do
-- FIXME: brains-download-fail -- FIXME: brains-download-fail
updateBlockPeerSize :: forall e m . (MyPeer e, MonadIO m)
=> Hash HbSync
-> Peer e
-> Integer
-> BlockDownloadM e m ()
updateBlockPeerSize h p s = do
tv <- asks (view blockPeers)
setBlockHasSize h
let alt = \case
Nothing -> Just $ HashMap.singleton p s
Just hm -> Just $ HashMap.insert p s hm
liftIO $ atomically $ modifyTVar tv (HashMap.alter alt h)
forKnownPeers :: forall e m . ( MonadIO m forKnownPeers :: forall e m . ( MonadIO m
, HasPeerLocator e m , HasPeerLocator e m
, Sessions e (KnownPeer e) m , Sessions e (KnownPeer e) m

View File

@ -148,8 +148,9 @@ reflogWorker conf adapter = do
SeqRef (SequentialRef _ (AnnotatedHashRef _ ref)) -> do SeqRef (SequentialRef _ (AnnotatedHashRef _ ref)) -> do
liftIO $ reflogDownload adapter (fromHashRef ref) liftIO $ reflogDownload adapter (fromHashRef ref)
AnnRef ref -> do -- TODO: asap-download-annotation-as-well
liftIO $ reflogDownload adapter ref AnnRef (AnnotatedHashRef _ ref) -> do
liftIO $ reflogDownload adapter (fromHashRef ref)
-- TODO: support-other-data-structures -- TODO: support-other-data-structures
_ -> pure () _ -> pure ()

View File

@ -45,6 +45,7 @@ common common-deps
, split , split
, stm , stm
, streaming , streaming
, sqlite-simple
, temporary , temporary
, text , text
, timeit , timeit

View File

@ -9,7 +9,6 @@ import HBS2.Net.Auth.AccessKey
import HBS2.Net.Auth.Credentials import HBS2.Net.Auth.Credentials
import HBS2.Net.Messaging.UDP (UDP) import HBS2.Net.Messaging.UDP (UDP)
import HBS2.Net.Proto.Definition() import HBS2.Net.Proto.Definition()
import HBS2.Net.Proto.Types
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Storage.Simple import HBS2.Storage.Simple
import HBS2.Storage.Simple.Extra import HBS2.Storage.Simple.Extra
@ -17,36 +16,31 @@ import HBS2.OrDie
import HBS2.Net.Proto.ACB import HBS2.Net.Proto.ACB
import Control.Arrow ((&&&)) import HBS2.System.Logger.Simple hiding (info)
import HBS2.System.Logger.Simple qualified as Log
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Monad.Trans.State.Strict
import Crypto.Saltine.Core.Box qualified as Encrypt import Crypto.Saltine.Core.Box qualified as Encrypt
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as BS8 import Data.ByteString.Char8 qualified as BS8
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.Either
import Data.Function import Data.Function
import Data.Functor import Data.Functor
import Data.List qualified as List import Data.List qualified as List
import Data.Map.Strict qualified as Map import Data.Map.Strict qualified as Map
import Data.Monoid qualified as Monoid import Data.Monoid qualified as Monoid
import Data.Text (Text)
import Data.Set qualified as Set import Data.Set qualified as Set
import Data.UUID qualified as UUID
import Data.UUID.V4 qualified as UUID
import Options.Applicative import Options.Applicative
import Prettyprinter
import System.Directory import System.Directory
import Data.Maybe import Data.Maybe
import Lens.Micro.Platform import Lens.Micro.Platform
-- import System.FilePath.Posix -- import System.FilePath.Posix
import System.IO import System.IO
import System.Exit import System.Exit
import System.ProgressBar
import Codec.Serialise import Codec.Serialise
@ -54,6 +48,24 @@ import Streaming.Prelude qualified as S
-- import Streaming qualified as S -- import Streaming qualified as S
logPrefix s = set loggerTr (s <>)
tracePrefix :: SetLoggerEntry
tracePrefix = logPrefix "[trace] "
debugPrefix :: SetLoggerEntry
debugPrefix = logPrefix "[debug] "
errorPrefix :: SetLoggerEntry
errorPrefix = logPrefix "[error] "
warnPrefix :: SetLoggerEntry
warnPrefix = logPrefix "[warn] "
noticePrefix :: SetLoggerEntry
noticePrefix = logPrefix "[notice] "
newtype CommonOpts = newtype CommonOpts =
CommonOpts CommonOpts
{ _coPref :: Maybe StoragePrefix { _coPref :: Maybe StoragePrefix
@ -142,7 +154,7 @@ runCat opts ss = do
liftIO $ do liftIO $ do
let walk h = walkMerkle h (getBlock ss) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do let stepInside hr =
case hr of case hr of
Left hx -> void $ hPrint stderr $ "missed block:" <+> pretty hx Left hx -> void $ hPrint stderr $ "missed block:" <+> pretty hx
Right (hrr :: [HashRef]) -> do Right (hrr :: [HashRef]) -> do
@ -155,6 +167,9 @@ runCat opts ss = do
Nothing -> die $ show $ "missed block: " <+> pretty hx Nothing -> die $ show $ "missed block: " <+> pretty hx
Just blk -> LBS.putStr blk Just blk -> LBS.putStr blk
let walk h = walkMerkle h (getBlock ss) stepInside
-- FIXME: switch-to-deep-scan
-- TODO: to-the-library -- TODO: to-the-library
let walkAnn :: MTreeAnn [HashRef] -> IO () let walkAnn :: MTreeAnn [HashRef] -> IO ()
walkAnn ann = do walkAnn ann = do
@ -202,7 +217,8 @@ runCat opts ss = do
case q of case q of
Blob h -> getBlock ss h >>= maybe (die "blob not found") LBS.putStr Blob h -> getBlock ss h >>= maybe (die "blob not found") LBS.putStr
Merkle h -> walk h Merkle t -> walkMerkleTree t (getBlock ss) stepInside
MerkleAnn ann -> walkAnn ann MerkleAnn ann -> walkAnn ann
-- FIXME: what-if-multiple-seq-ref-? -- FIXME: what-if-multiple-seq-ref-?
@ -483,6 +499,15 @@ runRefLogGet s ss = do
withStore :: Data opts => opts -> ( SimpleStorage HbSync -> IO () ) -> IO () withStore :: Data opts => opts -> ( SimpleStorage HbSync -> IO () ) -> IO ()
withStore opts f = do withStore opts f = do
setLogging @DEBUG debugPrefix
setLogging @INFO defLog
setLogging @ERROR errorPrefix
setLogging @WARN warnPrefix
setLogging @NOTICE noticePrefix
setLoggingOff @TRACE
xdg <- getXdgDirectory XdgData defStorePath <&> fromString xdg <- getXdgDirectory XdgData defStorePath <&> fromString
let pref = uniLastDef xdg opts :: StoragePrefix let pref = uniLastDef xdg opts :: StoragePrefix
@ -496,6 +521,13 @@ withStore opts f = do
_ <- waitAnyCatch w _ <- waitAnyCatch w
setLoggingOff @DEBUG
setLoggingOff @INFO
setLoggingOff @ERROR
setLoggingOff @WARN
setLoggingOff @NOTICE
setLoggingOff @TRACE
pure () pure ()
main :: IO () main :: IO ()
@ -511,7 +543,8 @@ main = join . customExecParser (prefs showHelpOnError) $
<> command "cat" (info pCat (progDesc "cat block")) <> command "cat" (info pCat (progDesc "cat block"))
<> command "hash" (info pHash (progDesc "calculates hash")) <> command "hash" (info pHash (progDesc "calculates hash"))
<> command "fsck" (info pFsck (progDesc "check storage constistency")) <> command "fsck" (info pFsck (progDesc "check storage constistency"))
<> command "del" ( info pDel (progDesc "del block")) <> command "deps" ( info pDeps (progDesc "print dependencies"))
<> command "del" ( info pDel (progDesc "del block"))
<> command "keyring-new" (info pNewKey (progDesc "generates a new keyring")) <> command "keyring-new" (info pNewKey (progDesc "generates a new keyring"))
<> command "keyring-list" (info pKeyList (progDesc "list public keys from keyring")) <> command "keyring-list" (info pKeyList (progDesc "list public keys from keyring"))
<> command "keyring-key-add" (info pKeyAdd (progDesc "adds a new keypair into the keyring")) <> command "keyring-key-add" (info pKeyAdd (progDesc "adds a new keypair into the keyring"))
@ -626,12 +659,53 @@ main = join . customExecParser (prefs showHelpOnError) $
forM_ rs $ \(h,f) -> do forM_ rs $ \(h,f) -> do
print $ fill 24 (pretty f) <+> pretty h print $ fill 24 (pretty f) <+> pretty h
pDeps = do
-- TODO: reflog-del-command-- TODO: reflog-del-command
pDel = do
o <- common o <- common
h <- strArgument ( metavar "HASH" ) h <- strArgument ( metavar "HASH" )
pure $ withStore o $ \sto -> do pure $ withStore o $ \sto -> do
delBlock sto h deepScan ScanDeep (const none) h (getBlock sto) $ \ha -> do
print $ pretty ha
-- TODO: reflog-del-command-- TODO: reflog-del-command
pDel = do
o <- common
recurse <- optional (flag' True ( short 'r' <> long "recursive" <> help "try to delete all blocks recursively" )
) <&> fromMaybe False
dontAsk <- optional ( flag' True ( short 'y' <> long "yes" <> help "don't ask permission to delete block")
) <&> fromMaybe False
h <- strArgument ( metavar "HASH" )
pure $ withStore o $ \sto -> do
setLogging @TRACE tracePrefix
hSetBuffering stdin NoBuffering
q <- liftIO newTQueueIO
if not recurse then
liftIO $ atomically $ writeTQueue q h
else do
-- hPrint stderr $ "recurse" <+> pretty h
deepScan ScanDeep (const none) h (getBlock sto) $ \ha -> do
liftIO $ atomically $ writeTQueue q ha
deps <- liftIO $ atomically $ flushTQueue q
forM_ deps $ \d -> do
doDelete <- if dontAsk then do
pure True
else do
hPutStr stderr $ show $ "Are you sure to delete block" <+> pretty d <+> "[y/n]: "
y <- getChar
hPutStrLn stderr ""
pure $ y `elem` ['y','Y']
when doDelete do
delBlock sto d
hFlush stderr
print $ "deleted" <+> pretty d
hFlush stdout

View File

@ -90,6 +90,7 @@ executable hbs2
, uniplate , uniplate
, uuid , uuid
, terminal-progress-bar , terminal-progress-bar
, stm
hs-source-dirs: . hs-source-dirs: .
default-language: Haskell2010 default-language: Haskell2010