diff --git a/docs/todo/flood-protection.txt b/docs/todo/flood-protection.txt new file mode 100644 index 00000000..835ef62b --- /dev/null +++ b/docs/todo/flood-protection.txt @@ -0,0 +1,7 @@ +TODO: flood-protection + Если пир сидит на плохом канале и пробует скачать, у него + ничего не получается, он пробует опять и опять и в итоге + забивает весь канал. Нужно давать ему сколько-то попыток + и временно банить. Соответственно, нужен механизм + временного бана пира. + diff --git a/docs/todo/hbs2-peer-ban-peers-rpc.txt b/docs/todo/hbs2-peer-ban-peers-rpc.txt new file mode 100644 index 00000000..2161ad3a --- /dev/null +++ b/docs/todo/hbs2-peer-ban-peers-rpc.txt @@ -0,0 +1,6 @@ +TODO: hbs2-peer-ban-peers-rpc + Сделать команду и всё остальное для бана пира прямо в рантайме + без перечитывания конфига. + + Нужно для тестирования и выявления проблемы '[warn] lost peer auth' + и окончательного победы над ней, кроме того, ну и так пригодится. diff --git a/docs/todo/new-download-testing.txt b/docs/todo/new-download-testing.txt new file mode 100644 index 00000000..51fb028e --- /dev/null +++ b/docs/todo/new-download-testing.txt @@ -0,0 +1,13 @@ +TODO: new-download-test + 1. Занести единственного пира в whitelist + 2. Скачать большой блок данных + 3. Замерять скорость/наличия провалов в скачивании + 4. Удалить скачанные данные + 5. Удалить whitelist + 6. Удалить скачанный блок + 7. Повторить скачивание + 8. Замерить скорость + 9. По итогам решить, срочно ли нужно + делать алгоритм выбора пиров в Brains + + diff --git a/docs/todo/recursive-block-del.txt b/docs/todo/recursive-block-del.txt new file mode 100644 index 00000000..2390014e --- /dev/null +++ b/docs/todo/recursive-block-del.txt @@ -0,0 +1,4 @@ +TODO: implement-recursive-block-delete + В hbs2 del -r или del --recursure + Смотрит, если это дерево --- то пытается обойти + его рекурсивно и удалить все блоки. diff --git a/hbs2-core/lib/HBS2/Data/Detect.hs b/hbs2-core/lib/HBS2/Data/Detect.hs index 01276884..00815636 100644 --- a/hbs2-core/lib/HBS2/Data/Detect.hs +++ b/hbs2-core/lib/HBS2/Data/Detect.hs @@ -5,15 +5,24 @@ import HBS2.Hash import HBS2.Data.Types import HBS2.Merkle +import HBS2.System.Logger.Simple + +import Data.Foldable (for_) +import Control.Monad.Trans.Maybe import Codec.Serialise (deserialiseOrFail) import Data.ByteString.Lazy (ByteString) import Data.Either import Data.Function import Data.Functor -data BlobType = Merkle (Hash HbSync) +import Data.Maybe +import Control.Concurrent.STM +import Data.HashMap.Strict qualified as HashMap +import Data.HashMap.Strict (HashMap) + +data BlobType = Merkle (MTree [HashRef]) | MerkleAnn (MTreeAnn [HashRef]) - | AnnRef (Hash HbSync) + | AnnRef AnnotatedHashRef | SeqRef SequentialRef | Blob (Hash HbSync) deriving (Show,Data) @@ -23,9 +32,96 @@ tryDetect :: Hash HbSync -> ByteString -> BlobType tryDetect hash obj = rights [mbAnn, mbLink, mbMerkle, mbSeq] & headDef orBlob where - mbLink = deserialiseOrFail @AnnotatedHashRef obj >> pure (AnnRef hash) - mbMerkle = deserialiseOrFail @(MTree [HashRef]) obj >> pure (Merkle hash) + mbLink = deserialiseOrFail @AnnotatedHashRef obj <&> AnnRef + mbMerkle = deserialiseOrFail @(MTree [HashRef]) obj <&> Merkle mbSeq = deserialiseOrFail @SequentialRef obj <&> SeqRef mbAnn = deserialiseOrFail obj <&> MerkleAnn orBlob = Blob hash +data ScanLevel = ScanShallow | ScanDeep + + +-- TODO: control-nesting-level-to-avoid-abuse + +-- TODO: switch-all-merkle-walks-to-deep-scan + +-- TODO: asap-make-it-support-encryption +-- Передавать параметры расшифровки через тайпкласс +-- Сделать реализацию тайпкласса для MonadIO по умолчанию, +-- будет возращать блоки как есть +-- + +deepScan :: MonadIO m + => ScanLevel + -> ( Hash HbSync -> m () ) -- ^ missed block handler + -> Hash HbSync -- ^ root + -> ( Hash HbSync -> m (Maybe ByteString) ) -- ^ block reading function + -> ( Hash HbSync -> m () ) -- ^ sink function + -> m () +deepScan l miss from reader sink = do + tv <- liftIO $ newTVarIO mempty + + deepScan_ tv (HashRef from) + + where + + deepScan_ tv item = do + + here <- reader (fromHashRef item) <&> isJust + + when here do + sink (fromHashRef item) + + void $ runMaybeT $ do + blk <- MaybeT $ reader (fromHashRef item) + + let what = tryDetect (fromHashRef item) blk + + case what of + Blob{} -> pure () + + Merkle t -> do + lift $ walkTree t + + MerkleAnn ann -> case _mtaCrypt ann of + NullEncryption -> do + lift $ walkTree (_mtaTree ann) + + -- FIXME: ASAP-support-encryption + CryptAccessKeyNaClAsymm{} -> do + err "deepScan does not support encryption yet" + pure () + + SeqRef (SequentialRef _ (AnnotatedHashRef ann hx)) -> do + lift $ maybe1 ann (pure ()) sinkDeep + lift $ sinkDeep hx + + AnnRef (AnnotatedHashRef ann hx) -> do + lift $ maybe1 ann (pure ()) sinkDeep + lift $ sinkDeep hx + + where + + deep = case l of + ScanDeep -> True + _ -> False + + sinkDeep h = do + visited <- liftIO $ readTVarIO tv <&> HashMap.member h + unless visited do + liftIO $ atomically $ modifyTVar tv (HashMap.insert h ()) + sinkDeep_ h + + sinkDeep_ h | deep = deepScan_ tv h + | otherwise = walk (fromHashRef h) + + stepInside = \case + Left x -> miss x + Right ( hxx :: [HashRef] ) -> do + for_ hxx sinkDeep + + walkTree t = do + walkMerkleTree t reader stepInside + + walk h = walkMerkle h reader stepInside + diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 340ff9f3..53c5a21b 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -60,10 +60,10 @@ defBlockBanTime :: TimeSpec defBlockBanTime = toTimeSpec defBlockBanTimeSec defBlockPostponeTime :: TimeSpec -defBlockPostponeTime = toTimeSpec ( 60 :: Timeout 'Seconds) +defBlockPostponeTime = toTimeSpec ( 45 :: Timeout 'Seconds) defBlockBanTimeSec :: Timeout 'Seconds -defBlockBanTimeSec = 60 :: Timeout 'Seconds +defBlockBanTimeSec = 30 :: Timeout 'Seconds defBlockWipTimeout :: TimeSpec defBlockWipTimeout = defCookieTimeout @@ -103,4 +103,9 @@ defDownloadFails = 100 defUsefulLimit :: Double defUsefulLimit = 0.25 +defInterBlockDelay :: Timeout 'Seconds +defInterBlockDelay = 0.0085 + +defBlockReqNum :: Integral a => a +defBlockReqNum = 2 diff --git a/hbs2-core/lib/HBS2/Net/IP/Addr.hs b/hbs2-core/lib/HBS2/Net/IP/Addr.hs index 650948de..8275cf62 100644 --- a/hbs2-core/lib/HBS2/Net/IP/Addr.hs +++ b/hbs2-core/lib/HBS2/Net/IP/Addr.hs @@ -44,7 +44,7 @@ instance Serialise IPv6 newtype IPAddrPort e = IPAddrPort (IP, Word16) - deriving (Generic) + deriving stock (Generic,Eq,Ord) instance Serialise (IPAddrPort e) @@ -60,6 +60,13 @@ instance IsString (IPAddrPort e) where where (h,p) = fromMaybe (error "no parse IPAddrPort") (getHostPort (Text.pack s)) +instance FromStringMaybe (IPAddrPort e) where + fromStringMay x = IPAddrPort <$> ( (,) <$> ip <*> fmap fromIntegral po) + where + hp = getHostPort (Text.pack x) + ip = readMay . fst =<< hp + po = snd <$> hp + getHostPort :: Text -> Maybe (String, PortNumber) getHostPort s = parseOnly p s & either (const Nothing) Just where diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index 4bcd3cee..28725edd 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -1,4 +1,5 @@ {-# Language TemplateHaskell #-} +{-# Language UndecidableInstances #-} module HBS2.Net.Messaging.UDP where import HBS2.Clock @@ -58,7 +59,7 @@ instance Pretty (Peer UDP) where makeLenses 'PeerUDP -instance MonadIO m => IsPeerAddr UDP m where +instance (FromStringMaybe (IPAddrPort UDP), MonadIO m) => IsPeerAddr UDP m where type instance PeerAddr UDP = IPAddrPort UDP toPeerAddr p = pure $ fromString $ show $ pretty p diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs index 2603b7fa..23e8f72e 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -97,64 +97,66 @@ blockChunksProto adapter (BlockChunks c p) = do peer <- thatPeer (Proxy @(BlockChunks e)) auth <- find (KnownPeerKey peer) id <&> isJust - when auth do - case p of + case p of - BlockGetChunks h size n1 num -> do + BlockGetChunks h size n1 num | auth -> do - bsz' <- blkSize adapter h + bsz' <- blkSize adapter h - maybe1 bsz' (pure ()) $ \bsz -> do + maybe1 bsz' (pure ()) $ \bsz -> do - let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)] - let offsets = take (fromIntegral num) $ drop (fromIntegral n1) $ zip offsets' [0..] + let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)] + let offsets = take (fromIntegral num) $ drop (fromIntegral n1) $ zip offsets' [0..] - -- liftIO $ print $ "sending " <+> pretty (length offsets) - -- <+> "chunks for block" - -- <+> pretty h + -- liftIO $ print $ "sending " <+> pretty (length offsets) + -- <+> "chunks for block" + -- <+> pretty h - -- for_ offsets $ \((o,sz),i) -> deferred proto do - for_ offsets $ \((o,sz),i) -> deferred proto do - -- liftIO $ print $ "send chunk " <+> pretty i <+> pretty sz - chunk <- blkChunk adapter h o sz - maybe (pure ()) (response_ . BlockChunk @e i) chunk + -- for_ offsets $ \((o,sz),i) -> deferred proto do + for_ offsets $ \((o,sz),i) -> deferred proto do + -- liftIO $ print $ "send chunk " <+> pretty i <+> pretty sz + chunk <- blkChunk adapter h o sz + maybe (pure ()) (response_ . BlockChunk @e i) chunk - BlockGetAllChunks h size -> do + BlockGetAllChunks h size | auth -> do - me <- ownPeer @e - who <- thatPeer proto + me <- ownPeer @e + who <- thatPeer proto - bsz' <- blkSize adapter h + bsz' <- blkSize adapter h - maybe1 bsz' (pure ()) $ \bsz -> do + maybe1 bsz' (pure ()) $ \bsz -> do - let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)] - let offsets = zip offsets' [0..] + let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)] + let offsets = zip offsets' [0..] - -- liftIO $ print $ "sending " <+> pretty (length offsets) - -- <+> "chunks for block" - -- <+> pretty h + -- liftIO $ print $ "sending " <+> pretty (length offsets) + -- <+> "chunks for block" + -- <+> pretty h - for_ offsets $ \((o,sz),i) -> deferred proto do - chunk <- blkChunk adapter h o sz - maybe (pure ()) (response_ . BlockChunk @e i) chunk + for_ offsets $ \((o,sz),i) -> deferred proto do + chunk <- blkChunk adapter h o sz + maybe (pure ()) (response_ . BlockChunk @e i) chunk - BlockChunk n bs -> deferred proto do - who <- thatPeer proto - me <- ownPeer @e - h <- blkGetHash adapter (who, c) + BlockChunk n bs | auth -> deferred proto do + who <- thatPeer proto + me <- ownPeer @e + h <- blkGetHash adapter (who, c) - maybe1 h (response_ (BlockLost @e)) $ \hh -> do - void $ blkAcceptChunk adapter (c, who, hh, n, bs) + maybe1 h (response_ (BlockLost @e)) $ \hh -> do + void $ blkAcceptChunk adapter (c, who, hh, n, bs) - BlockNoChunks {} -> do - -- TODO: notification - pure () + BlockNoChunks {} -> do + -- TODO: notification + pure () - BlockLost{} -> do - liftIO $ print "GOT BLOCK LOST MESSAGE - means IO ERROR" - pure () + BlockLost{} -> do + liftIO $ print "GOT BLOCK LOST MESSAGE - means IO ERROR" + pure () + + _ -> do + pure () where proto = Proxy @(BlockChunks e) diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 3745dc10..f57cec69 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -31,6 +31,7 @@ blockSizeProto :: forall e m . ( MonadIO m -> BlockInfo e -> m () +-- FIXME: with-auth-combinator blockSizeProto getBlockSize evHasBlock = \case GetBlockSize h -> do diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 3ac65fef..1cafd717 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -6,6 +6,7 @@ module HBS2.Net.Proto.Types ( module HBS2.Net.Proto.Types ) where +import HBS2.Prelude (FromStringMaybe(..)) import HBS2.Clock import Data.Kind @@ -43,13 +44,15 @@ class HasPeerNonce e m where peerNonce :: m PeerNonce - data WithCookie e p = WithCookie (Cookie e) p class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where data family (Peer e) :: Type -class Monad m => IsPeerAddr e m where +class ( FromStringMaybe (PeerAddr e) + , Eq (PeerAddr e) + , Monad m + ) => IsPeerAddr e m where type family PeerAddr e :: Type toPeerAddr :: Peer e -> m (PeerAddr e) diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 11996f2d..d0923d71 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -4,7 +4,6 @@ module BlockDownload where import HBS2.Actors.Peer -import HBS2.Base58 import HBS2.Clock import HBS2.Data.Detect import HBS2.Data.Types.Refs @@ -39,64 +38,56 @@ import Data.IntMap qualified as IntMap import Data.IntSet qualified as IntSet import Data.List qualified as List import Data.Maybe -import Data.Set qualified as Set import Lens.Micro.Platform -import Control.Concurrent +import System.Random (randomRIO) +import System.Random.Shuffle (shuffleM) -getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) -getBlockForDownload = do - q <- asks (view downloadQ) - inq <- asks (view blockInQ) - liftIO $ atomically $ do - h <- readTQueue q - modifyTVar' inq (HashMap.delete h) - pure h - -withBlockForDownload :: forall e m . (MonadIO m, MyPeer e, HasStorage m, HasPeerLocator e m) +getBlockForDownload :: forall e m . (MonadIO m, IsPeerAddr e m, MyPeer e) => Peer e - -> BlockDownloadM e m () - -> (Hash HbSync -> BlockDownloadM e m ()) - -> BlockDownloadM e m () - -withBlockForDownload p noBlockAction action = do - -- FIXME: busyloop-e46ad5e0 - -- - sto <- lift getStorage - - h <- getBlockForDownload - - here <- liftIO $ hasBlock sto h <&> isJust + -> BlockDownloadM e m (Maybe (Hash HbSync)) +getBlockForDownload peer = do + pa <- lift $ toPeerAddr peer + tinq <- asks (view blockInQ) brains <- asks (view downloadBrains) + prop <- asks (view blockProposed) - should <- shouldDownloadBlock brains p h + inq <- liftIO $ readTVarIO tinq + let size = HashMap.size inq - if | here -> processBlock h - | should -> onBlockDownloadAttempt brains p h >> action h - | otherwise -> noBlockAction >> addDownload mzero h + if size == 0 then + pure Nothing + else do + i <- randomRIO (0, size - 1) + let blk = HashMap.keys inq !! i + peers <- advisePeersForBlock @e brains blk -addBlockInfo :: (MonadIO m, MyPeer e) - => Peer e - -> Hash HbSync - -> Integer - -> BlockDownloadM e m () + proposed <- liftIO $ Cache.lookup prop (blk, peer) <&> isJust -addBlockInfo pip h size = do - -- debug $ "addBlockInfo" <+> pretty h <+> pretty pip <+> pretty size - tv <- asks (view blockPeers) - let mySize = HashMap.singleton pip size - liftIO $ atomically - $ modifyTVar tv (HashMap.insertWith (<>) h mySize) + r <- if | proposed -> do + pure Nothing -getPeersForBlock :: (MonadIO m, MyPeer e) - => Hash HbSync - -> BlockDownloadM e m [(Peer e, Integer)] + | List.null peers -> do + pure $ Just blk -getPeersForBlock h = do - tv <- asks (view blockPeers) - liftIO $ readTVarIO tv <&> foldMap HashMap.toList - . maybeToList - . HashMap.lookup h + | pa `elem` peers -> do + pure $ Just blk + + | otherwise -> do + newOne <- shouldDownloadBlock @e brains peer blk + let chance = if newOne then 1 else 5 + lucky <- liftIO $ shuffleM (True : replicate chance False) <&> headDef False + if lucky then + pure $ Just blk + else do + pure Nothing + + case r of + Nothing -> none + Just h -> do + liftIO $ Cache.insert prop (h, peer) () + + pure r processBlock :: forall e m . ( MonadIO m , HasStorage m @@ -152,14 +143,15 @@ processBlock h = do addDownload parent (fromHashRef b) - Just (AnnRef h) -> do - addDownload parent h + Just (AnnRef (AnnotatedHashRef ann hx)) -> do + maybe1 ann none $ addDownload parent . fromHashRef + addDownload parent (fromHashRef hx) Just (MerkleAnn ann) -> do - case (_mtaMeta ann) of + case _mtaMeta ann of NoMetaData -> pure () ShortMetadata {} -> pure () - AnnHashRef h -> addDownload parent h + AnnHashRef hx -> addDownload parent hx case (_mtaCrypt ann) of NullEncryption -> pure () @@ -389,6 +381,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO , Pretty (Peer e) , Block ByteString ~ ByteString , PeerMessaging e + , IsPeerAddr e m ) => DownloadEnv e -> m () blockDownloadLoop env0 = do @@ -399,6 +392,8 @@ blockDownloadLoop env0 = do pl <- getPeerLocator @e + pause @'Seconds 3.81 + void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do pause @'Seconds 60 debug "I'm peer thread sweeping thread" @@ -424,11 +419,11 @@ blockDownloadLoop env0 = do debug $ "peers to delete" <+> pretty (length r) - for_ r $ delPeerThread . fst + for_ r $ killPeerThread . fst void $ liftIO $ async $ forever $ withPeerM e do - pause @'Seconds 5 - debug "I'm a peer maintaining thread" + pause @'Seconds 1 + -- debug "I'm a peer maintaining thread" brains <- withDownload env0 $ asks (view downloadBrains) pee <- knownPeers @e pl @@ -454,8 +449,7 @@ blockDownloadLoop env0 = do withDownload env0 $ newPeerThread p runPeer | not auth -> do - withDownload env0 $ delPeerThread p - -- pure () + pure () | otherwise -> pure () @@ -508,24 +502,8 @@ blockDownloadLoop env0 = do pause @'Seconds 5 -- FIXME: put to defaults -- we need to show download stats - tinfo <- asks (view blockPeers) - binfo <- liftIO $ readTVarIO tinfo - wip <- asks (view blockWip) - - liftIO $ Cache.purgeExpired wip - - aliveWip <- Set.fromList <$> liftIO (Cache.keys wip) - - let alive = HashMap.fromList [ (h,i) - | (h,i) <- HashMap.toList binfo - , Set.member h aliveWip - ] - - liftIO $ atomically $ writeTVar tinfo alive - - po <- postponedNum - - wipNum <- liftIO $ Cache.size wip + wipNum <- asks (view blockInQ) >>= liftIO . readTVarIO <&> HashMap.size + let po = 0 notice $ "maintain blocks wip" <+> pretty wipNum <+> "postponed" @@ -535,10 +513,12 @@ blockDownloadLoop env0 = do mapM_ processBlock blks - fix \next -> do - pause @'Seconds 30 - debug "I'm a download loop. I don't do anything anymore" - next + proposed <- asks (view blockProposed) + + forever do + pause @'Seconds 20 + debug "block download loop. does not do anything" + liftIO $ Cache.purgeExpired proposed postponedLoop :: forall e m . ( MyPeer e @@ -553,48 +533,34 @@ postponedLoop :: forall e m . ( MyPeer e postponedLoop env0 = do e <- ask - void $ liftIO $ async $ withPeerM e $ withDownload env0 do - forever do - pause @'Seconds 10 - mt <- asks (view downloadQ) >>= liftIO . atomically . isEmptyTQueue - debug $ "queue monitor thread" <+> "EMPTY:" <+> pretty mt + pause @'Seconds 2.57 void $ liftIO $ async $ withPeerM e $ withDownload env0 do - -- wip <- asks (blockWip) >>= liftIO . Cache.keys - wip0 <- asks (view blockWip) >>= liftIO . Cache.keys <&> length - twip <- liftIO $ newTVarIO wip0 - - forever do - pause @'Seconds 10 - wip1 <- asks (view blockWip) >>= liftIO . Cache.keys - wip2 <- liftIO $ readTVarIO twip - trace $ "download stuck check" <+> pretty (length wip1) <+> pretty wip2 - - when (length wip1 == wip2 && not (null wip1)) do - debug "download stuck" - for_ wip1 $ \h -> do - removeFromWip h - addDownload Nothing h - - wip3 <- asks (view blockWip) >>= liftIO . Cache.keys - liftIO $ atomically $ writeTVar twip (length wip3) + q <- asks (view blockDelayTo) + fix \next -> do + w <- liftIO $ atomically $ readTQueue q + pause defInterBlockDelay + addDownload mzero w + -- ws <- liftIO $ atomically $ flushTQueue q + -- for_ (w:ws) $ addDownload mzero + next void $ liftIO $ async $ withPeerM e $ withDownload env0 do forever do - pause @'Seconds 30 + pause @'Seconds 20 trace "UNPOSTPONE LOOP" po <- asks (view blockPostponedTo) >>= liftIO . Cache.toList for_ po $ \(h, _, expired) -> do when (isJust expired) do unpostponeBlock h - peerDownloadLoop :: forall e m . ( MyPeer e , Sessions e (KnownPeer e) m , Request e (BlockInfo e) m , EventListener e (BlockInfo e) m , DownloadFromPeerStuff e m , HasPeerLocator e m + , IsPeerAddr e m , m ~ PeerM e IO ) => Peer e -> BlockDownloadM e m () peerDownloadLoop peer = do @@ -605,6 +571,8 @@ peerDownloadLoop peer = do pe <- lift ask e <- ask + brains <- asks (view downloadBrains) + let doBlockSizeRequest h = do q <- liftIO newTQueueIO lift do @@ -653,21 +621,25 @@ peerDownloadLoop peer = do Right{} -> do trace $ "OK" <+> pretty peer <+> "dowloaded block" <+> pretty h + onBlockDownloaded brains peer h processBlock h liftIO $ atomically do writeTVar downFail 0 modifyTVar downBlk succ - let noBlkAction = do - liftIO yield + let warnExit = warn $ "peer loop exit" <+> pretty peer + -- let stopLoop = none - forever do + idle <- liftIO $ newTVarIO 0 + + fix \next -> do + + let thenNext m = m >> next liftIO do Cache.purgeExpired sizeCache Cache.purgeExpired noBlock - npi <- newPeerInfo auth' <- lift $ find (KnownPeerKey peer) id @@ -678,53 +650,68 @@ peerDownloadLoop peer = do let noAuth = do let authNone = if isNothing auth' then "noauth" else "" warn ( "lost peer auth" <+> pretty peer <+> pretty authNone ) - pause @'Seconds 1 - -- liftIO $ withPeerM pe $ sendPing @e peer - -- -- FIXME: time-hardcode - -- pause @'Seconds 3 - -- found <- lift $ find (KnownPeerKey peer) id <&> isJust - -- unless found do - -- warn ( "peer lost. stopping peer loop" <+> pretty peer ) + warnExit - maybe1 mbauth noAuth $ \(_,_) -> do + maybe1 mbauth noAuth $ \_ -> do - withBlockForDownload peer noBlkAction $ \h -> do - -- TODO: insert-busyloop-counter-for-block-request - -- trace $ "withBlockForDownload" <+> pretty peer <+> pretty h + pt' <- getPeerThread peer - mbSize <- liftIO $ Cache.lookup sizeCache h - noBlk <- liftIO $ Cache.lookup noBlock h <&> isJust + maybe1 pt' warnExit $ \pt -> do - case mbSize of - Just size -> do - trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size - updateBlockPeerSize h peer size - tryDownload pinfo h size + liftIO $ atomically $ modifyTVar (view peerBlocksWip pt) (max 0 . pred) - Nothing | noBlk -> do - trace $ pretty peer <+> "does not have block" <+> pretty h - addDownload mzero h + mbh <- getBlockForDownload peer - Nothing -> do - incBlockSizeReqCount h + case mbh of + Nothing -> thenNext do + idleNum <- liftIO $ atomically $ stateTVar idle $ \x -> (x, succ x) - r <- doBlockSizeRequest h + when (idleNum > 5) do + trace $ "peer IDLE" <+> pretty peer + liftIO $ atomically $ writeTVar idle 0 + x <- lift $ randomRIO (2.85, 10.47) + pause @'Seconds (realToFrac x) - case r of - Left{} -> failedDownload peer h + Just h -> thenNext do - Right Nothing -> do - -- FIXME: non-existent-block-ruins-all - here <- liftIO $ Cache.lookup noBlock h <&> isJust + liftIO $ atomically $ writeTVar idle 0 - unless here $ - liftIO $ Cache.insert noBlock h () + lift $ onBlockDownloadAttempt brains peer h - addDownload mzero h + trace $ "start download block" <+> pretty peer <+> pretty h - Right (Just s) -> do - updateBlockPeerSize h peer s - tryDownload pinfo h s + mbSize <- liftIO $ Cache.lookup sizeCache h + noBlk <- liftIO $ Cache.lookup noBlock h <&> isJust + + case mbSize of + Just size -> do + trace $ "HAS SIZE:" <+> pretty peer <+> pretty h <+> pretty size + tryDownload pinfo h size + + Nothing | noBlk -> do + trace $ pretty peer <+> "does not have block" <+> pretty h + addDownload mzero h + + Nothing -> do + + r <- doBlockSizeRequest h + + case r of + Left{} -> failedDownload peer h + + Right Nothing -> do + here <- liftIO $ Cache.lookup noBlock h <&> isJust + + unless here $ + liftIO $ Cache.insert noBlock h () + + addDownload mzero h + + Right (Just s) -> do + tryDownload pinfo h s + + warnExit + void $ delPeerThreadData peer -- NOTE: this is an adapter for a ResponseM monad -- because response is working in ResponseM monad (ha!) diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index f6d19b9b..f3d67027 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -12,17 +12,32 @@ import HBS2.System.Logger.Simple import Data.Maybe import Control.Monad +import Control.Exception import Control.Concurrent.STM -import Data.HashMap.Strict +import Control.Concurrent.Async import Lens.Micro.Platform import Data.HashMap.Strict qualified as HashMap +import Data.HashMap.Strict (HashMap) import Data.List qualified as List +import Data.Cache (Cache) +import Data.Cache qualified as Cache +import Text.InterpolatedString.Perl6 (qc) +import Database.SQLite.Simple +import Database.SQLite.Simple.FromField +import System.Random (randomRIO) +import Data.Word class HasBrains e a where onKnownPeers :: MonadIO m => a -> [Peer e] -> m () - onBlockDownloadAttempt :: MonadIO m => a -> Peer e -> Hash HbSync -> m () + onBlockDownloadAttempt :: ( MonadIO m + , IsPeerAddr e m + ) + => a + -> Peer e + -> Hash HbSync + -> m () onBlockDownloaded :: MonadIO m => a @@ -53,6 +68,11 @@ class HasBrains e a where -> Hash HbSync -> m Bool + advisePeersForBlock :: (MonadIO m, FromStringMaybe (PeerAddr e)) + => a + -> Hash HbSync + -> m [PeerAddr e] + type NoBrains = () instance Pretty (Peer e) => HasBrains e NoBrains where @@ -74,6 +94,8 @@ instance Pretty (Peer e) => HasBrains e NoBrains where shouldDownloadBlock _ _ _ = pure True + advisePeersForBlock _ _ = pure mempty + data SomeBrains e = forall a . HasBrains e a => SomeBrains a instance HasBrains e (SomeBrains e) where @@ -84,11 +106,15 @@ instance HasBrains e (SomeBrains e) where claimBlockCameFrom (SomeBrains a) = claimBlockCameFrom @e a shouldPostponeBlock (SomeBrains a) = shouldPostponeBlock @e a shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a + advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a data BasicBrains e = BasicBrains { _brainsPeers :: TVar [Peer e] , _brainsPostponeDown :: TVar (HashMap (Peer e, Hash HbSync) Int ) + , _brainsExpire :: Cache (Hash HbSync) () + , _brainsDb :: Connection + , _brainsPipeline :: TQueue (IO ()) } makeLenses 'BasicBrains @@ -100,28 +126,34 @@ cleanupPostponed b h = do let flt (_,h1) _ = h1 /= h liftIO $ atomically $ modifyTVar po $ HashMap.filterWithKey flt -instance Hashable (Peer e) => HasBrains e (BasicBrains e) where +instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) where onKnownPeers br ps = do - trace "BRAINS: onKnownPeers" + -- trace "BRAINS: onKnownPeers" let tv = view brainsPeers br liftIO $ atomically $ writeTVar tv ps onBlockDownloadAttempt b peer h = do - trace "BRAINS: onBlockDownloadAttempt" - let doAlter = HashMap.alter (maybe (Just 0) (Just . succ)) (peer,h) - liftIO $ atomically $ modifyTVar (view brainsPostponeDown b) doAlter + -- trace "BRAINS: onBlockDownloadAttempt" + noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null + unless noPeers do + let cache = view brainsExpire b + liftIO $ Cache.insert cache h () + let doAlter = HashMap.alter (maybe (Just 0) (Just . succ)) (peer,h) + liftIO $ atomically $ modifyTVar (view brainsPostponeDown b) doAlter onBlockDownloaded b p h = do - trace "BRAINS: onBlockDownloaded" + -- trace $ "BRAINS: onBlockDownloaded" <+> pretty p <+> pretty h cleanupPostponed b h + updateOP b $ insertPeer b h p onBlockPostponed b h = do - trace "BRAINS: onBlockPostponed" + trace $ "BRAINS: onBlockPostponed" <+> pretty h cleanupPostponed b h - claimBlockCameFrom _ _ _ = do - trace "BRAINS: claimBlockCameFrom" + claimBlockCameFrom b f t = do + -- trace $ "BRAINS: claimBlockCameFrom" <+> pretty f <+> pretty t + updateOP b $ insertAncestor b f t shouldPostponeBlock b h = do peers <- liftIO $ readTVarIO (view brainsPeers b) @@ -136,18 +168,202 @@ instance Hashable (Peer e) => HasBrains e (BasicBrains e) where pure postpone shouldDownloadBlock b p h = do + noPeers <- liftIO $ readTVarIO (view brainsPeers b) <&> List.null downs <- liftIO $ readTVarIO (view brainsPostponeDown b) - pure $ HashMap.lookup (p,h) downs & fromMaybe 0 & (<2) + pure $ noPeers || (HashMap.lookup (p,h) downs & fromMaybe 0 & (<2)) + advisePeersForBlock b h = do + r <- liftIO $ findPeers b h + pure $ mapMaybe fromStringMay r + +updateOP :: forall e m . MonadIO m => BasicBrains e -> IO () -> m () +updateOP br op = do + let pip = view brainsPipeline br + liftIO $ atomically $ writeTQueue pip (liftIO op) + +insertAncestor :: BasicBrains e + -> Hash HbSync -- ^ parent + -> Hash HbSync -- ^ child + -> IO () + +insertAncestor br parent child = do + + -- trace $ "INSERT ANCESTOR" <+> pretty parent <+> pretty child + + let conn = view brainsDb br + + void $ liftIO $ execute conn [qc| + insert into ancestors (child, parent) values (?,?) + on conflict (child,parent) do nothing + |] (show $ pretty child, show $ pretty parent) + + +insertPeer :: forall e . Pretty (Peer e) + => BasicBrains e + -> Hash HbSync -- ^ block + -> Peer e -- ^ peer + -> IO () + +insertPeer br blk peer = do + + -- trace $ "INSERT PEER" <+> pretty peer <+> pretty blk + + let conn = view brainsDb br + + void $ liftIO $ execute conn [qc| + insert into seenby (block, peer) values (?,?) + on conflict (block,peer) do nothing + |] (show $ pretty blk, show $ pretty peer) + + +newtype DBData a = DBData { fromDBData :: a } + +instance FromField (DBData (Hash HbSync)) where + fromField = fmap (DBData . fromString) . fromField @String + +getAncestors :: forall e m . (MonadIO m) + => BasicBrains e + -> Hash HbSync + -> m [Hash HbSync] + +getAncestors br child = do + + let conn = view brainsDb br + + let sql = [qc| +WITH RECURSIVE ancestor_list(parent) AS ( + SELECT parent + FROM ancestors + WHERE child = ? + UNION + SELECT a.parent + FROM ancestors a + JOIN ancestor_list al ON a.child = al.parent +) +SELECT parent FROM ancestor_list; +|] + + liftIO $ query conn sql (Only (show $ pretty child) ) + <&> fmap (fromDBData . fromOnly) + + +findPeers :: BasicBrains e + -> Hash HbSync + -> IO [String] + +findPeers br child = do + + let conn = view brainsDb br + + let sql = [qc| +WITH RECURSIVE ancestor_list(parent) AS ( + SELECT parent + FROM ancestors + WHERE child = ? + UNION + SELECT a.parent + FROM ancestors a + JOIN ancestor_list al ON a.child = al.parent +) + +SELECT s.peer + FROM ancestor_list a + JOIN seenby s on s.block = a.parent; +|] + + liftIO $ query conn sql (Only (show $ pretty child) ) <&> fmap fromOnly + + +cleanupHashes :: BasicBrains e + -> IO () + +cleanupHashes br = do + + debug "BRAINS: cleanup caches" + + let conn = view brainsDb br + + let sql = [qc| +SAVEPOINT zzz1; + +DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600; +DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600; + +RELEASE SAVEPOINT zzz1; + + |] + + r <- try $ liftIO $ execute_ conn sql + + case r of + Right{} -> pure () + Left (e :: SomeException) -> err $ "BRAINS: " <+> viaShow e + +transactional :: BasicBrains e -> IO () -> IO () +transactional brains action = do + n <- randomRIO @Word16 (1, maxBound) + let sp = [qc|sp{n}|] :: String + let conn = view brainsDb brains + execute_ conn [qc|SAVEPOINT {sp}|] + try action >>= \case + Right{} -> do + execute_ conn [qc|RELEASE SAVEPOINT {sp}|] + + Left ( _ :: SomeException ) -> do + execute_ conn [qc|ROLLBACK TO SAVEPOINT {sp}|] + +-- FIXME: eventually-close-db newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) => m (BasicBrains e) newBasicBrains = liftIO do + + conn <- open ":memory:" + + execute_ conn [qc| + create table if not exists ancestors + ( child text not null + , parent text not null + , ts DATE DEFAULT (datetime('now','localtime')) + , primary key (child,parent)) + |] + + execute_ conn [qc| + create table if not exists seenby + ( block text not null + , peer text not null + , ts DATE DEFAULT (datetime('now','localtime')) + , primary key (block,peer)) + |] + BasicBrains <$> newTVarIO mempty <*> newTVarIO mempty + <*> Cache.newCache (Just (toTimeSpec (30 :: Timeout 'Seconds))) + <*> pure conn + <*> newTQueueIO runBasicBrains :: MonadIO m => BasicBrains e -> m () -runBasicBrains brains = forever do - pause @'Seconds 20 - debug "BRAINS!" - pure() +runBasicBrains brains = do + + let pip = view brainsPipeline brains + let expire = view brainsExpire brains + + -- FIXME: async-error-handling + void $ liftIO $ async $ forever do + pause @'Seconds 5 + -- transactional brains do + w <- atomically $ readTQueue pip + ws <- atomically $ flushTQueue pip + transactional brains (sequence_ (w:ws)) + + void $ liftIO $ async $ forever do + pause @'Seconds 60 + updateOP brains (cleanupHashes brains) + + void $ forever do + pause @'Seconds 15 + ee <- liftIO $ Cache.toList expire + let eee = [ h | (h,_,Just{}) <- ee ] + forM_ eee $ \h -> do + cleanupPostponed brains h + liftIO $ Cache.purgeExpired expire diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index dccff01b..abbbee74 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -403,7 +403,7 @@ instance ( Monad m -- runPeer :: forall e . (e ~ UDP, Nonce (RefLogUpdate e) ~ BS.ByteString) => PeerOpts -> IO () -runPeer :: forall e . (e ~ UDP) => PeerOpts -> IO () +runPeer :: forall e . (e ~ UDP, FromStringMaybe (PeerAddr e)) => PeerOpts -> IO () runPeer opts = Exception.handle myException $ do @@ -473,8 +473,6 @@ runPeer opts = Exception.handle myException $ do notice $ "run peer" <+> pretty (AsBase58 (view peerSignPk pc)) - - s <- simpleStorageInit @HbSync (Just pref) let blk = liftIO . hasBlock s @@ -882,7 +880,7 @@ rpcClientMain opt action = do setLoggingOff @DEBUG action -withRPC :: RPCOpt -> RPC UDP -> IO () +withRPC :: FromStringMaybe (PeerAddr UDP) => RPCOpt -> RPC UDP -> IO () withRPC o cmd = rpcClientMain o $ do hSetBuffering stdout LineBuffering @@ -992,7 +990,7 @@ withRPC o cmd = rpcClientMain o $ do void $ waitAnyCatchCancel [mrpc, prpc] -runRpcCommand :: RPCOpt -> RPCCommand -> IO () +runRpcCommand :: FromStringMaybe (IPAddrPort UDP) => RPCOpt -> RPCCommand -> IO () runRpcCommand opt = \case POKE -> withRPC opt RPCPoke PING s _ -> withRPC opt (RPCPing s) diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index aba347fb..3790aebf 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -135,28 +135,26 @@ data BlockState = makeLenses 'BlockState -data PeerTask e = DoDownload +newtype PeerTask e = DoDownload (Hash HbSync) + deriving newtype (Pretty) data PeerThread e = PeerThread { _peerThreadAsync :: Async () , _peerThreadMailbox :: TQueue (PeerTask e) + , _peerBlocksWip :: TVar Int } makeLenses 'PeerThread data DownloadEnv e = DownloadEnv - { _downloadQ :: TQueue (Hash HbSync) - , _peerBusy :: TVar (HashMap (Peer e) ()) - , _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) ) - , _blockWip :: Cache (Hash HbSync) () - , _blockState :: TVar (HashMap (Hash HbSync) BlockState) - , _blockInQ :: TVar (HashMap (Hash HbSync) ()) + { _blockInQ :: TVar (HashMap (Hash HbSync) ()) , _peerThreads :: TVar (HashMap (Peer e) (PeerThread e)) - , _blockStored :: Cache (Hash HbSync) () , _blockPostponed :: TVar (HashMap (Hash HbSync) () ) , _blockPostponedTo :: Cache (Hash HbSync) () + , _blockDelayTo :: TQueue (Hash HbSync) + , _blockProposed :: Cache (Hash HbSync, Peer e) () , _downloadBrains :: SomeBrains e } @@ -165,16 +163,12 @@ makeLenses 'DownloadEnv newDownloadEnv :: (MonadIO m, MyPeer e, HasBrains e brains) => brains -> m (DownloadEnv e) newDownloadEnv brains = liftIO do - DownloadEnv <$> newTQueueIO + DownloadEnv <$> newTVarIO mempty <*> newTVarIO mempty <*> newTVarIO mempty - <*> Cache.newCache (Just defBlockWipTimeout) - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> Cache.newCache (Just defBlockWipTimeout) - <*> newTVarIO mempty <*> Cache.newCache (Just defBlockBanTime) + <*> newTQueueIO + <*> Cache.newCache (Just (toTimeSpec (2 :: Timeout 'Seconds))) <*> pure (SomeBrains brains) newtype BlockDownloadM e m a = @@ -190,47 +184,6 @@ newtype BlockDownloadM e m a = withDownload :: (MyPeer e, HasPeerLocator e m, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a withDownload e m = runReaderT ( fromBlockDownloadM m ) e -setBlockState :: MonadIO m => Hash HbSync -> BlockState -> BlockDownloadM e m () -setBlockState h s = do - sh <- asks (view blockState) - liftIO $ atomically $ modifyTVar' sh (HashMap.insert h s) - -setBlockHasSize :: MonadIO m => Hash HbSync -> BlockDownloadM e m () -setBlockHasSize h = do - blk <- fetchBlockState h - liftIO $ atomically $ writeTVar (view bsHasSize blk) True - -fetchBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m BlockState -fetchBlockState h = do - sh <- asks (view blockState) - liftIO do - now <- getTimeCoarse - tvlast <- newTVarIO now - tvreq <- newTVarIO 0 - tvsz <- newTVarIO False - let defState = BlockState now tvreq tvlast tvsz - atomically $ stateTVar sh $ \hm -> case HashMap.lookup h hm of - Nothing -> (defState, HashMap.insert h defState hm) - Just x -> (x, hm) - - -delBlockState :: MonadIO m => Hash HbSync -> BlockDownloadM e m () -delBlockState h = do - sh <- asks (view blockState) - liftIO $ atomically $ modifyTVar' sh (HashMap.delete h) - -incBlockSizeReqCount :: MonadIO m => Hash HbSync -> BlockDownloadM e m () -incBlockSizeReqCount h = do - blk <- fetchBlockState h - now <- liftIO getTimeCoarse - seen <- liftIO $ readTVarIO (view bsLastSeen blk) - let elapsed = realToFrac (toNanoSecs (now - seen)) / 1e9 - noSize <- liftIO $ readTVarIO (view bsHasSize blk) <&> not - - when (elapsed > 1.0 && noSize) do - liftIO $ atomically $ do - writeTVar (view bsLastSeen blk) now - modifyTVar (view bsReqSizeTimes blk) succ isBlockHereCached :: forall e m . ( MyPeer e , MonadIO m @@ -239,27 +192,9 @@ isBlockHereCached :: forall e m . ( MyPeer e => Hash HbSync -> BlockDownloadM e m Bool isBlockHereCached h = do - szcache <- asks (view blockStored) sto <- lift getStorage + liftIO $ hasBlock sto h <&> isJust - cached <- liftIO $ Cache.lookup szcache h - - case cached of - Just{} -> pure True - Nothing -> liftIO do - blk <- hasBlock sto h <&> isJust - when blk $ Cache.insert szcache h () - pure blk - -checkForDownload :: forall e m . ( MyPeer e - , MonadIO m - , HasPeerLocator e (BlockDownloadM e m) - , HasStorage m -- (BlockDownloadM e m) - ) - => ByteString -> BlockDownloadM e m () - -checkForDownload lbs = do - pure () type DownloadConstr e m = ( MyPeer e , MonadIO m @@ -277,42 +212,18 @@ addDownload :: forall e m . ( DownloadConstr e m addDownload mbh h = do tinq <- asks (view blockInQ) - brains <- asks (view downloadBrains) + here <- isBlockHereCached h - postponed <- isPostponed h - - unless postponed do - + if here then do + removeFromWip h + else do maybe1 mbh none $ \hp -> claimBlockCameFrom @e brains hp h - postpone <- shouldPostponeBlock @e brains h - - when postpone do - -- trace $ "addDownload postpone" <+> pretty postpone <+> pretty h + if postpone then do postponeBlock h - - doAdd <- do liftIO $ atomically $ stateTVar tinq - \hm -> case HashMap.lookup h hm of - Nothing -> (True, HashMap.insert h () hm) - Just{} -> (False, HashMap.insert h () hm) - - notHere <- isBlockHereCached h <&> not - - when (doAdd && notHere && not postpone) do - - trace $ "addDownload" <+> pretty h - - q <- asks (view downloadQ) - wip <- asks (view blockWip) - - liftIO do - atomically $ do - modifyTVar tinq $ HashMap.insert h () - writeTQueue q h - - Cache.insert wip h () - + else do + liftIO $ atomically $ modifyTVar tinq $ HashMap.insert h () postponedNum :: forall e m . (MyPeer e, MonadIO m) => BlockDownloadM e m Int postponedNum = do @@ -324,14 +235,22 @@ isPostponed h = do po <- asks (view blockPostponed) >>= liftIO . readTVarIO pure $ HashMap.member h po + +delayLittleBit :: forall e m . (MyPeer e, MonadIO m) => Hash HbSync -> BlockDownloadM e m () +delayLittleBit h = do + q <- asks (view blockDelayTo) + liftIO $ atomically $ writeTQueue q h + postponeBlock :: forall e m . (MyPeer e, MonadIO m) => Hash HbSync -> BlockDownloadM e m () postponeBlock h = do brains <- asks (view downloadBrains) po <- asks (view blockPostponed) tto <- asks (view blockPostponedTo) + tinq <- asks (view blockInQ) liftIO $ do + liftIO $ atomically $ modifyTVar tinq $ HashMap.delete h already <- atomically $ readTVar po <&> HashMap.member h unless already do atomically $ modifyTVar po (HashMap.insert h ()) @@ -353,16 +272,8 @@ unpostponeBlock h = do removeFromWip :: (MyPeer e, MonadIO m) => Hash HbSync -> BlockDownloadM e m () removeFromWip h = do - wip <- asks (view blockWip) - st <- asks (view blockState) - sz <- asks (view blockPeers) tinq <- asks (view blockInQ) - - liftIO $ Cache.delete wip h - liftIO $ atomically $ do - modifyTVar' st (HashMap.delete h) - modifyTVar' sz (HashMap.delete h) modifyTVar' tinq (HashMap.delete h) hasPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m Bool @@ -370,14 +281,47 @@ hasPeerThread p = do threads <- asks (view peerThreads) liftIO $ readTVarIO threads <&> HashMap.member p - -delPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m () -delPeerThread p = do - debug $ "delPeerThread" <+> pretty p +getPeerThreads :: (MyPeer e, MonadIO m) => BlockDownloadM e m [(Peer e, PeerThread e)] +getPeerThreads = do threads <- asks (view peerThreads) - pt <- liftIO $ atomically $ stateTVar threads (\x -> let t = HashMap.lookup p x - in (t, HashMap.delete p x)) + liftIO $ atomically $ readTVar threads <&> HashMap.toList +getPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerThread e)) +getPeerThread p = do + threads <- asks (view peerThreads) + liftIO $ atomically $ readTVar threads <&> HashMap.lookup p + +getPeerTask :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerTask e)) +getPeerTask p = do + threads <- asks (view peerThreads) + pt' <- liftIO $ atomically $ readTVar threads <&> HashMap.lookup p + maybe1 pt' (pure Nothing) $ \pt -> do + liftIO $ atomically $ readTQueue (view peerThreadMailbox pt) <&> Just + +addPeerTask :: (MyPeer e, MonadIO m) + => Peer e + -> PeerTask e + -> BlockDownloadM e m () +addPeerTask p t = do + trace $ "ADD-PEER-TASK" <+> pretty p <+> pretty t + threads <- asks (view peerThreads) + liftIO $ atomically $ do + pt' <- readTVar threads <&> HashMap.lookup p + maybe1 pt' none $ \pt -> do + writeTQueue (view peerThreadMailbox pt) t + modifyTVar (view peerBlocksWip pt) succ + +delPeerThreadData :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m (Maybe (PeerThread e)) +delPeerThreadData p = do + debug $ "delPeerThreadData" <+> pretty p + threads <- asks (view peerThreads) + liftIO $ atomically $ stateTVar threads (\x -> let t = HashMap.lookup p x + in (t, HashMap.delete p x)) + +killPeerThread :: (MyPeer e, MonadIO m) => Peer e -> BlockDownloadM e m () +killPeerThread p = do + debug $ "delPeerThread" <+> pretty p + pt <- delPeerThreadData p maybe1 pt (pure ()) $ liftIO . cancel . view peerThreadAsync newPeerThread :: ( MyPeer e @@ -395,10 +339,23 @@ newPeerThread p m = do void $ lift $ fetch True npi (PeerInfoKey p) id q <- liftIO newTQueueIO - let pt = PeerThread m q + tnum <- liftIO $ newTVarIO 0 + let pt = PeerThread m q tnum threads <- asks (view peerThreads) liftIO $ atomically $ modifyTVar threads $ HashMap.insert p pt +getPeerTaskWip :: ( MyPeer e + , MonadIO m + -- , Sessions e (PeerInfo e) m + -- , Sessions e (PeerInfo e) (BlockDownloadM e m) + ) + => Peer e + -> BlockDownloadM e m Int +getPeerTaskWip p = do + threads <- asks (view peerThreads) + pt' <- liftIO $ atomically $ readTVar threads <&> HashMap.lookup p + maybe1 pt' (pure 0) $ \pt -> do + liftIO $ readTVarIO (view peerBlocksWip pt) failedDownload :: forall e m . ( MyPeer e , MonadIO m @@ -416,24 +373,6 @@ failedDownload p h = do -- FIXME: brains-download-fail -updateBlockPeerSize :: forall e m . (MyPeer e, MonadIO m) - => Hash HbSync - -> Peer e - -> Integer - -> BlockDownloadM e m () - -updateBlockPeerSize h p s = do - tv <- asks (view blockPeers) - - setBlockHasSize h - - let alt = \case - Nothing -> Just $ HashMap.singleton p s - Just hm -> Just $ HashMap.insert p s hm - - liftIO $ atomically $ modifyTVar tv (HashMap.alter alt h) - - forKnownPeers :: forall e m . ( MonadIO m , HasPeerLocator e m , Sessions e (KnownPeer e) m diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index c3abfb4a..40487a9e 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -148,8 +148,9 @@ reflogWorker conf adapter = do SeqRef (SequentialRef _ (AnnotatedHashRef _ ref)) -> do liftIO $ reflogDownload adapter (fromHashRef ref) - AnnRef ref -> do - liftIO $ reflogDownload adapter ref + -- TODO: asap-download-annotation-as-well + AnnRef (AnnotatedHashRef _ ref) -> do + liftIO $ reflogDownload adapter (fromHashRef ref) -- TODO: support-other-data-structures _ -> pure () diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 40eba0db..bd3a28da 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -45,6 +45,7 @@ common common-deps , split , stm , streaming + , sqlite-simple , temporary , text , timeit diff --git a/hbs2/Main.hs b/hbs2/Main.hs index cc967bd9..8ae9cb13 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -9,7 +9,6 @@ import HBS2.Net.Auth.AccessKey import HBS2.Net.Auth.Credentials import HBS2.Net.Messaging.UDP (UDP) import HBS2.Net.Proto.Definition() -import HBS2.Net.Proto.Types import HBS2.Prelude.Plated import HBS2.Storage.Simple import HBS2.Storage.Simple.Extra @@ -17,36 +16,31 @@ import HBS2.OrDie import HBS2.Net.Proto.ACB -import Control.Arrow ((&&&)) +import HBS2.System.Logger.Simple hiding (info) +import HBS2.System.Logger.Simple qualified as Log + import Control.Concurrent.Async +import Control.Concurrent.STM import Control.Monad -import Control.Monad.IO.Class import Control.Monad.Trans.Maybe -import Control.Monad.Trans.State.Strict import Crypto.Saltine.Core.Box qualified as Encrypt import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as BS8 import Data.ByteString.Lazy (ByteString) -import Data.Either import Data.Function import Data.Functor import Data.List qualified as List import Data.Map.Strict qualified as Map import Data.Monoid qualified as Monoid -import Data.Text (Text) import Data.Set qualified as Set -import Data.UUID qualified as UUID -import Data.UUID.V4 qualified as UUID import Options.Applicative -import Prettyprinter import System.Directory import Data.Maybe import Lens.Micro.Platform -- import System.FilePath.Posix import System.IO import System.Exit -import System.ProgressBar import Codec.Serialise @@ -54,6 +48,24 @@ import Streaming.Prelude qualified as S -- import Streaming qualified as S +logPrefix s = set loggerTr (s <>) + +tracePrefix :: SetLoggerEntry +tracePrefix = logPrefix "[trace] " + +debugPrefix :: SetLoggerEntry +debugPrefix = logPrefix "[debug] " + +errorPrefix :: SetLoggerEntry +errorPrefix = logPrefix "[error] " + +warnPrefix :: SetLoggerEntry +warnPrefix = logPrefix "[warn] " + +noticePrefix :: SetLoggerEntry +noticePrefix = logPrefix "[notice] " + + newtype CommonOpts = CommonOpts { _coPref :: Maybe StoragePrefix @@ -142,7 +154,7 @@ runCat opts ss = do liftIO $ do - let walk h = walkMerkle h (getBlock ss) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do + let stepInside hr = case hr of Left hx -> void $ hPrint stderr $ "missed block:" <+> pretty hx Right (hrr :: [HashRef]) -> do @@ -155,6 +167,9 @@ runCat opts ss = do Nothing -> die $ show $ "missed block: " <+> pretty hx Just blk -> LBS.putStr blk + let walk h = walkMerkle h (getBlock ss) stepInside + + -- FIXME: switch-to-deep-scan -- TODO: to-the-library let walkAnn :: MTreeAnn [HashRef] -> IO () walkAnn ann = do @@ -202,7 +217,8 @@ runCat opts ss = do case q of Blob h -> getBlock ss h >>= maybe (die "blob not found") LBS.putStr - Merkle h -> walk h + Merkle t -> walkMerkleTree t (getBlock ss) stepInside + MerkleAnn ann -> walkAnn ann -- FIXME: what-if-multiple-seq-ref-? @@ -483,6 +499,15 @@ runRefLogGet s ss = do withStore :: Data opts => opts -> ( SimpleStorage HbSync -> IO () ) -> IO () withStore opts f = do + + setLogging @DEBUG debugPrefix + setLogging @INFO defLog + setLogging @ERROR errorPrefix + setLogging @WARN warnPrefix + setLogging @NOTICE noticePrefix + + setLoggingOff @TRACE + xdg <- getXdgDirectory XdgData defStorePath <&> fromString let pref = uniLastDef xdg opts :: StoragePrefix @@ -496,6 +521,13 @@ withStore opts f = do _ <- waitAnyCatch w + setLoggingOff @DEBUG + setLoggingOff @INFO + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + setLoggingOff @TRACE + pure () main :: IO () @@ -511,7 +543,8 @@ main = join . customExecParser (prefs showHelpOnError) $ <> command "cat" (info pCat (progDesc "cat block")) <> command "hash" (info pHash (progDesc "calculates hash")) <> command "fsck" (info pFsck (progDesc "check storage constistency")) - <> command "del" ( info pDel (progDesc "del block")) + <> command "deps" ( info pDeps (progDesc "print dependencies")) + <> command "del" ( info pDel (progDesc "del block")) <> command "keyring-new" (info pNewKey (progDesc "generates a new keyring")) <> command "keyring-list" (info pKeyList (progDesc "list public keys from keyring")) <> command "keyring-key-add" (info pKeyAdd (progDesc "adds a new keypair into the keyring")) @@ -626,12 +659,53 @@ main = join . customExecParser (prefs showHelpOnError) $ forM_ rs $ \(h,f) -> do print $ fill 24 (pretty f) <+> pretty h - - -- TODO: reflog-del-command-- TODO: reflog-del-command - pDel = do + pDeps = do o <- common h <- strArgument ( metavar "HASH" ) + pure $ withStore o $ \sto -> do - delBlock sto h + deepScan ScanDeep (const none) h (getBlock sto) $ \ha -> do + print $ pretty ha + -- TODO: reflog-del-command-- TODO: reflog-del-command + pDel = do + o <- common + recurse <- optional (flag' True ( short 'r' <> long "recursive" <> help "try to delete all blocks recursively" ) + ) <&> fromMaybe False + + dontAsk <- optional ( flag' True ( short 'y' <> long "yes" <> help "don't ask permission to delete block") + ) <&> fromMaybe False + + h <- strArgument ( metavar "HASH" ) + + pure $ withStore o $ \sto -> do + + setLogging @TRACE tracePrefix + hSetBuffering stdin NoBuffering + + q <- liftIO newTQueueIO + + if not recurse then + liftIO $ atomically $ writeTQueue q h + else do + -- hPrint stderr $ "recurse" <+> pretty h + deepScan ScanDeep (const none) h (getBlock sto) $ \ha -> do + liftIO $ atomically $ writeTQueue q ha + + deps <- liftIO $ atomically $ flushTQueue q + + forM_ deps $ \d -> do + doDelete <- if dontAsk then do + pure True + else do + hPutStr stderr $ show $ "Are you sure to delete block" <+> pretty d <+> "[y/n]: " + y <- getChar + hPutStrLn stderr "" + pure $ y `elem` ['y','Y'] + + when doDelete do + delBlock sto d + hFlush stderr + print $ "deleted" <+> pretty d + hFlush stdout diff --git a/hbs2/hbs2.cabal b/hbs2/hbs2.cabal index 1fd3f006..ec761066 100644 --- a/hbs2/hbs2.cabal +++ b/hbs2/hbs2.cabal @@ -90,6 +90,7 @@ executable hbs2 , uniplate , uuid , terminal-progress-bar + , stm hs-source-dirs: . default-language: Haskell2010