From 72d0c8222cb7c27fac5a92390e92115ef5dacea1 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 14 Jul 2023 19:08:14 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Actors/Peer.hs | 4 + hbs2-core/lib/HBS2/Net/Proto/RefChan.hs | 45 +++++- hbs2-peer/app/PeerMain.hs | 5 +- hbs2-peer/app/PeerTypes.hs | 15 ++ hbs2-peer/app/RefChan.hs | 193 +++++++++++++++++++++++- 5 files changed, 243 insertions(+), 19 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index e0b0e475..7a61b374 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -36,12 +36,14 @@ import Data.HashMap.Strict qualified as HashMap import Control.Concurrent.STM.TVar import Control.Concurrent.STM import Data.Hashable (hash) +import UnliftIO (MonadUnliftIO(..)) import Codec.Serialise (serialise, deserialiseOrFail) import Prettyprinter hiding (pipe) + data AnyStorage = forall zu . ( Block ByteString ~ ByteString , Storage zu HbSync ByteString IO ) => AnyStorage zu @@ -156,6 +158,7 @@ newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } , Monad , MonadReader (PeerEnv e) , MonadIO + , MonadUnliftIO ) @@ -166,6 +169,7 @@ newtype ResponseM e m a = ResponseM { fromResponse :: ReaderT (ResponseEnv e) m , MonadReader (ResponseEnv e) , MonadIO , MonadTrans + , MonadUnliftIO ) newtype ResponseEnv e = diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 4c235762..e268593c 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -4,7 +4,7 @@ module HBS2.Net.Proto.RefChan where import HBS2.Prelude.Plated --- import HBS2.Hash +import HBS2.Hash -- import HBS2.Clock import HBS2.Net.Proto import HBS2.Net.Auth.Credentials @@ -28,6 +28,8 @@ import Data.Maybe import Data.Text qualified as Text import Lens.Micro.Platform +import Data.Hashable hiding (Hashed) + {- HLINT ignore "Use newtype instead of data" -} type RefChanId e = PubKey 'Sign (Encryption e) @@ -61,10 +63,35 @@ type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e)) instance ForRefChans e => Serialise (RefChanHeadBlock e) instance ForRefChans e => Serialise (SignedBox p e) + + +newtype RefChanHeadKey s = RefChanHeadKey (PubKey 'Sign s) + +deriving stock instance IsRefPubKey s => Eq (RefChanHeadKey s) + +instance IsRefPubKey s => Hashable (RefChanHeadKey s) where + hashWithSalt s k = hashWithSalt s (hashObject @HbSync k) + +instance IsRefPubKey s => Hashed HbSync (RefChanHeadKey s) where + hashObject (RefChanHeadKey pk) = hashObject ("refchanhead|" <> serialise pk) + +instance IsRefPubKey s => FromStringMaybe (RefChanHeadKey s) where + fromStringMay s = RefChanHeadKey <$> fromStringMay s + +instance IsRefPubKey s => IsString (RefChanHeadKey s) where + fromString s = fromMaybe (error "bad public key base58") (fromStringMay s) + +instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (AsBase58 (RefChanHeadKey s)) where + pretty (AsBase58 (RefChanHeadKey k)) = pretty (AsBase58 k) + +instance Pretty (AsBase58 (PubKey 'Sign s )) => Pretty (RefChanHeadKey s) where + pretty (RefChanHeadKey k) = pretty (AsBase58 k) + + -- блок головы может быть довольно большой. -- поэтому посылаем его, как merkle tree newtype RefChanHeadBlockTran e = - RefChanHeadBlockTran HashRef + RefChanHeadBlockTran { fromRefChanHeadBlockTran :: HashRef } deriving stock (Generic) instance Serialise (RefChanHeadBlockTran e) @@ -78,7 +105,7 @@ instance ForRefChans e => Serialise (RefChanHead e) data RefChanHeadAdapter e m = RefChanHeadAdapter - { _refChanHeadOnHead :: RefChanHeadBlockTran e -> m () + { refChanHeadOnHead :: RefChanHeadBlockTran e -> m () } refChanHeadProto :: forall e s m . ( MonadIO m @@ -107,9 +134,9 @@ refChanHeadProto self adapter msg = do guard (auth || self) case msg of - RefChanHead pkt _ -> do - trace $ "RefChanHead" <+> pretty self - pure () + RefChanHead chan pkt -> do + trace $ "RefChanHead" <+> pretty self <+> pretty (AsBase58 chan) + lift $ refChanHeadOnHead adapter pkt RefChanGetHead _ -> do -- прочитать ссылку @@ -130,7 +157,7 @@ makeSignedBox pk sk msg = SignedBox @p @e pk bs sign unboxSignedBox :: forall p e . (Serialise p, ForRefChans e, Signatures (Encryption e)) => LBS.ByteString - -> Maybe p + -> Maybe (PubKey 'Sign (Encryption e), p) unboxSignedBox bs = runIdentity $ runMaybeT do @@ -140,7 +167,9 @@ unboxSignedBox bs = runIdentity $ runMaybeT do guard $ verifySign @(Encryption e) pk sign bs - MaybeT $ pure $ deserialiseOrFail @p (LBS.fromStrict bs) & either (const Nothing) Just + p <- MaybeT $ pure $ deserialiseOrFail @p (LBS.fromStrict bs) & either (const Nothing) Just + + pure (pk, p) instance ForRefChans e => FromStringMaybe (RefChanHeadBlock e) where fromStringMay str = RefChanHeadBlockSmall <$> version diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 01b14495..b1f6b7c8 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -559,10 +559,10 @@ runPeer opts = U.handle (\e -> myException e pause @'Seconds 600 liftIO $ Cache.purgeExpired nbcache - rce <- refChanWorkerEnv conf + rce <- refChanWorkerEnv conf denv let refChanHeadAdapter = RefChanHeadAdapter - { _refChanHeadOnHead = dontHandle + { refChanHeadOnHead = refChanOnHead rce } let pexFilt pips = do @@ -785,7 +785,6 @@ runPeer opts = U.handle (\e -> myException e peerThread "reflogWorker" (reflogWorker @e conf rwa) - -- FIXME: reflogWorker-env peerThread "refChanWorker" (refChanWorker @e rce) peerThread "ping pong" $ forever $ do diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 9aa36b5a..4a1c4310 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -345,6 +345,21 @@ failedDownload p h = do addDownload mzero h -- FIXME: brains-download-fail +broadCastMessage :: forall e p m . ( MonadIO m + , MyPeer e + , HasPeerLocator e m + , HasProtocol e p + , Request e p m + , Sessions e (KnownPeer e) m + ) + => p -> m () + +broadCastMessage msg = do + -- TODO: broadcast-reflog-update + trace "broadCastMessage" + forKnownPeers $ \pip _ -> do + trace $ "send msg to peer" <+> pretty pip + request @e pip msg forKnownPeers :: forall e m . ( MonadIO m , HasPeerLocator e m diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index c6758e53..1c5defda 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -1,43 +1,220 @@ {-# Language AllowAmbiguousTypes #-} -module RefChan where +{-# Language TemplateHaskell #-} +module RefChan ( + RefChanWorkerEnv(..) + , refChanWorkerEnvHeadQ + , refChaWorkerEnvDownload + , refChanOnHead + , refChanWorker + , refChanWorkerEnv + ) where import HBS2.Prelude.Plated import HBS2.Actors.Peer import HBS2.Base58 import HBS2.Clock +import HBS2.Data.Detect import HBS2.Data.Types.Refs import HBS2.Net.Auth.Credentials +import HBS2.Net.Proto import HBS2.Net.Proto.RefChan import HBS2.Net.Proto.Types +import HBS2.Storage import HBS2.System.Logger.Simple import PeerTypes import PeerConfig +import BlockDownload -import Control.Monad +import Control.Monad.Reader +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import UnliftIO +import Lens.Micro.Platform +import Control.Monad.Except (throwError, runExceptT) +import Data.Maybe +import Control.Exception () +import Control.Monad.Trans.Maybe +import Streaming.Prelude qualified as S +import Streaming qualified as S -data RefChanWorkerEnv e = RefChanWorkerEnv +{- HLINT ignore "Use newtype instead of data" -} + +data DataNotReady = DataNotReady deriving (Show) + +instance Exception DataNotReady + +data RefChanWorkerEnv e = + RefChanWorkerEnv + { _refChanWorkerEnvDownload :: DownloadEnv e + , _refChanWorkerEnvHeadQ :: TQueue (RefChanHeadBlockTran e) + , _refChaWorkerEnvDownload :: TVar (HashMap HashRef ()) -- таймстемп можно + } + +makeLenses 'RefChanWorkerEnv refChanWorkerEnv :: forall m e . MonadIO m => PeerConfig + -> DownloadEnv e -> m (RefChanWorkerEnv e) -refChanWorkerEnv _ = pure $ RefChanWorkerEnv @e +refChanWorkerEnv _ de = liftIO $ RefChanWorkerEnv @e de <$> newTQueueIO + <*> newTVarIO mempty -refChanWorker :: forall e s m . ( MonadIO m, MyPeer e + +refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanHeadBlockTran e -> m () +refChanOnHead env tran = do + atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) tran + +refChanAddDownload :: forall e m . ( m ~ PeerM e IO + , MyPeer e + , Block ByteString ~ ByteString + ) + => RefChanWorkerEnv e -> HashRef -> m () +refChanAddDownload env r = do + penv <- ask + withPeerM penv $ withDownload (_refChanWorkerEnvDownload env) + $ processBlock @e (fromHashRef r) + + atomically $ modifyTVar (view refChaWorkerEnvDownload env) (HashMap.insert r ()) + +checkDownloaded :: forall m . (MonadIO m, HasStorage m, Block ByteString ~ ByteString) => HashRef -> m Bool +checkDownloaded hr = do + sto <- getStorage + let readBlock h = liftIO $ getBlock sto h + result <- runExceptT $ deepScan ScanDeep (const $ throwError DataNotReady) (fromHashRef hr) readBlock dontHandle + pure $ either (const False) (const True) result + + +-- FIXME: move-to-library +readBlob :: forall m . ( MonadIO m + , HasStorage m + , Block ByteString ~ ByteString + ) + => HashRef + -> m (Maybe ByteString) + +readBlob hr = do + sto <- getStorage + let readBlock h = liftIO $ getBlock sto h + + chunks <- S.toList_ $ + deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) readBlock $ \ha -> do + unless (fromHashRef hr == ha) do + readBlock ha >>= S.yield + + let mfo acc el = case (acc, el) of + (Nothing, Just s) -> Just [s] + (_, Nothing) -> Nothing + (Just ss, Just s) -> Just (s:ss) + + pure $ LBS.concat . reverse <$> foldl mfo Nothing chunks + +refChanWorker :: forall e s m . ( MonadIO m + , MonadUnliftIO m + , MyPeer e , HasStorage m , Signatures s , s ~ Encryption e , IsRefPubKey s , Pretty (AsBase58 (PubKey 'Sign s)) + , Block ByteString ~ ByteString + , ForRefChans e + , m ~ PeerM e IO ) => RefChanWorkerEnv e -> m () -refChanWorker _ = forever do - pause @'Seconds 10 - debug "I'm refchan worker" +refChanWorker env = do + + hw <- async refChanHeadMon + downloads <- async monitorDownloads + + forever do + pause @'Seconds 10 + debug "I'm refchan worker" + + mapM_ wait [hw,downloads] + + where + + monitorDownloads = forever do + pause @'Seconds 2 + all <- atomically $ readTVar (view refChaWorkerEnvDownload env) <&> HashMap.keys + + -- FIXME: consider-timeouts-or-leak-is-possible + rest <- forM all $ \r -> do + here <- checkDownloaded r + if here then do + refChanOnHead env (RefChanHeadBlockTran r) + pure mempty + else do + pure [(r,())] + + atomically $ writeTVar (view refChaWorkerEnvDownload env) (HashMap.fromList (mconcat rest)) + + -- FIXME: in-parallel? + refChanHeadMon = do + forever do + RefChanHeadBlockTran hr <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env) + -- debug $ "DROP HEAD UPDATE" <+> pretty (fromRefChanHeadBlockTran tran) + + here <- checkDownloaded hr + + if not here then do + refChanAddDownload env hr + trace $ "BLOCK IS NOT HERE" <+> pretty hr + else do + trace $ "BLOCK IS HERE" <+> pretty hr + -- читаем блок + lbs <- readBlob hr <&> fromMaybe mempty + let what = unboxSignedBox @(RefChanHeadBlock e) @e lbs + + case what of + Nothing -> err $ "malformed head block" <+> pretty hr + + Just (pk,blk) -> do + let rkey = RefChanHeadKey @s pk + + sto <- getStorage + + debug $ "Good head block" <+> pretty hr <+> "processing..." + + ourVersion <- runMaybeT do + + + cur <- MaybeT $ liftIO $ getRef sto rkey + + lbss <- MaybeT $ readBlob (HashRef cur) + + (_, blkOur) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e lbss + + pure $ view refChanHeadVersion blkOur + + let v0 = fromMaybe 0 ourVersion + let v1 = view refChanHeadVersion blk + + if v1 > v0 then do + debug $ "UPDATING HEAD BLOCK" <+> pretty (v1, v0) + liftIO $ updateRef sto rkey (fromHashRef hr) + else do + debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0) + + pure () + -- распаковываем блок + -- вытаскиваем ключ из блока? + + pure () + + -- если всё скачано --- то обрабатываем. + -- если не скачано -- то говорим качать и ждём. как ждём? + -- помещаем в фигню, которая download запускает, и время от времени ждёт, + -- пока скачается. как скачается -- убирает из своего локального стейта, + -- и пихает транзу обратно в эту очередь, допустим. +