From a5628a19ed7408bade3d6ffcb8583a06a3896168 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 20 Sep 2023 05:45:08 +0300 Subject: [PATCH] bundle-basic-implementation --- .fixme/config | 1 + .fixme/log | 14 ++++- docs/notes/bundles.txt | 61 ++++++++++++++++++++++ hbs2-core/lib/HBS2/Data/Bundle.hs | 56 +++++++++++++++----- hbs2-core/lib/HBS2/Data/Types/SignedBox.hs | 1 + hbs2-core/lib/HBS2/Net/Proto/RefChan.hs | 1 + hbs2-peer/app/BlockDownload.hs | 45 ++++++++++++++-- hbs2-peer/app/BlockHttpDownload.hs | 2 + hbs2-peer/app/DownloadMon.hs | 44 ++++++++++++++++ hbs2-peer/app/PeerTypes.hs | 48 ++++++++++++++++- hbs2-peer/app/RefChan.hs | 13 ----- hbs2-peer/hbs2-peer.cabal | 1 + hbs2/Main.hs | 22 ++++++++ 13 files changed, 274 insertions(+), 35 deletions(-) create mode 100644 docs/notes/bundles.txt create mode 100644 hbs2-peer/app/DownloadMon.hs diff --git a/.fixme/config b/.fixme/config index 0c2d92c8..bacbca51 100644 --- a/.fixme/config +++ b/.fixme/config @@ -13,6 +13,7 @@ fixme-files docs/pep*.txt fixme-files docs/drafts/**/*.txt fixme-files docs/pr/**/*.txt fixme-files docs/todo/**/*.txt +fixme-files docs/notes/**/*.txt fixme-files-ignore .direnv/** dist-newstyle/** diff --git a/.fixme/log b/.fixme/log index bf7ca57e..3eeb29e2 100644 --- a/.fixme/log +++ b/.fixme/log @@ -1,3 +1,13 @@ +(fixme-set "assigned" "voidlizard" "CmfGGmDAuC") +(fixme-set "assigned" "voidlizard" "4Bm5kS8t54" ) +(fixme-set "assigned" "voidlizard" "8i2gUFTTnH" ) +(fixme-set "assigned" "voidlizard" "CPhvijEXN2" ) -(fixme-set "assigned" "HPoqtobDAT" "voidlizard") -(fixme-set "workflow" "test" "HPoqtobDAT") \ No newline at end of file + +(fixme-set "assigned" "voidlizard" "3HwTgQQXvC") +(fixme-set "workflow" "test" "CmfGGmDAuC") +(fixme-set "workflow" "test" "3HwTgQQXvC") +(fixme-set "workflow" "test" "4Bm5kS8t54") +(fixme-set "workflow" "test" "8i2gUFTTnH") +(fixme-set "workflow" "test" "CPhvijEXN2") +(fixme-set "workflow" "backlog" "6WZaH3NXuH") \ No newline at end of file diff --git a/docs/notes/bundles.txt b/docs/notes/bundles.txt new file mode 100644 index 00000000..42eee39a --- /dev/null +++ b/docs/notes/bundles.txt @@ -0,0 +1,61 @@ + +Базовая реализация бандлов + +Основная идея: + +Берём список HashRef, каждый HashRef может ссылаться +на какие-то объекты и создаем журнал (замыкание), +куда складываем все нужные объекты. + +Это может быть нужно для ускорения передачи +большого числа маленьких объектов там, где это +нужно. + +Минусы: создаются объекты-обёртки, +дублируются данные в бандлах и БД. + +Решение: + +TODO: introduce-bundle-ttl + удалять бандлы и BundleRefValue, + которые никто не трогал дольше, чем + ttl. + +Делаем bundle: + +``` + hbs2 bundle create < bundle-refs + + +``` +В bundle-refs список хэшей (HashRef) в текстовом виде + + +Делаем ссылку на bundle: + +``` + hbs2 bundle create-ref -k some.key + +``` + +Создаем объект типа BundleRefValue и подписывает его +неким ключом. + +Идея в том, что даже если мы не создатели Bundle и BundleRefValue, +мы можем его сохранять и распространять, сохраняя оригинальную +подпись. + +Клиенты уже сами решают, доверяют они ключу подписанта или нет. + +Распространение bundle: + +``` + hbs2-peer announce + +``` + +Теперь те, кто вообще слушают анонсы от пира --- получат +блок BundleRefValue, проверят подпись, поставят Bundle +на скачивание, и когда он скачается - импортируют объекты +из него. + diff --git a/hbs2-core/lib/HBS2/Data/Bundle.hs b/hbs2-core/lib/HBS2/Data/Bundle.hs index 82c7702d..49b1c63b 100644 --- a/hbs2-core/lib/HBS2/Data/Bundle.hs +++ b/hbs2-core/lib/HBS2/Data/Bundle.hs @@ -7,10 +7,14 @@ import HBS2.Storage import HBS2.Storage.Operations import HBS2.Hash import HBS2.Data.Types.Refs +import HBS2.Data.Types.SignedBox +import HBS2.Net.Proto.Types +import HBS2.Net.Auth.Credentials import HBS2.Data.Detect import Data.Word +import Data.Function import Codec.Compression.GZip as GZip import Codec.Serialise import Control.Monad @@ -20,12 +24,33 @@ import Data.ByteString.Lazy.Char8 qualified as LBS import Data.Functor import Data.List qualified as List import Data.Either +import Data.Maybe import Streaming.Prelude qualified as S import Streaming() {- HLINT ignore "Use newtype instead of data" -} +data BundleRefValue e = + BundleRefValue (SignedBox BundleRef e) + deriving stock (Generic) + +instance ForSignedBox e => Serialise (BundleRefValue e) + +data BundleRef = + BundleRefSimple HashRef + deriving stock (Generic) + +instance Serialise BundleRef + + +makeBundleRefValue :: forall e . (ForSignedBox e, Signatures (Encryption e)) + => PubKey 'Sign (Encryption e) + -> PrivKey 'Sign (Encryption e) + -> BundleRef + -> BundleRefValue e + +makeBundleRefValue pk sk ref = BundleRefValue $ makeSignedBox @e pk sk ref -- у нас может быть много способов хранить данные: -- сжимать целиком (эффективно, но медленно) @@ -80,29 +105,32 @@ createBundle :: ( MonadIO m -> [HashRef] -> m (Maybe HashRef) -createBundle sto refs = runMaybeT do - -- читать блок из сторейджа - blocks <- forM refs $ \href -> do - blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href) +createBundle sto refs = do + let readBlock = liftIO . getBlock sto - let compressed = compressWith params blk - let size = LBS.length compressed - let section = BundleSection (fromIntegral size) (Just href) + -- FIXME: handle-errors-on-missed-blocks + blocks <- S.toList_ $ forM_ refs $ \hr -> do + deepScan ScanDeep (const none) (fromHashRef hr) readBlock $ \ha -> do + blk' <- readBlock ha + let href = HashRef ha + maybe1 blk' none $ \blk -> do + let compressed = compressWith params blk + let size = LBS.length compressed + let section = BundleSection (fromIntegral size) (Just href) - let sbs = serialise section - let pad = sectionHeadSize - LBS.length sbs - let pads = LBS.replicate pad '\x0' - - pure (sbs <> pads <> compressed) + let sbs = serialise section + let pad = sectionHeadSize - LBS.length sbs + let pads = LBS.replicate pad '\x0' + S.yield (sbs <> pads <> compressed) let buHead = serialise (BundleHeadSimple sectionHeadSize) let buPadded = buHead <> LBS.replicate (bundleHeadSize - LBS.length buHead) '\x0' let blob = buPadded <> mconcat blocks + -- FIXME: streamed-write-as-merkle wtf <- liftIO $ writeAsMerkle sto blob - - pure (HashRef wtf) + pure $ Just (HashRef wtf) where params = defaultCompressParams { compressLevel = bestSpeed } diff --git a/hbs2-core/lib/HBS2/Data/Types/SignedBox.hs b/hbs2-core/lib/HBS2/Data/Types/SignedBox.hs index c8c4b0f9..c50bb115 100644 --- a/hbs2-core/lib/HBS2/Data/Types/SignedBox.hs +++ b/hbs2-core/lib/HBS2/Data/Types/SignedBox.hs @@ -33,6 +33,7 @@ instance ( Eq (PubKey 'Sign (Encryption e)) type ForSignedBox e = ( Serialise ( PubKey 'Sign (Encryption e)) , FromStringMaybe (PubKey 'Sign (Encryption e)) , Serialise (Signature (Encryption e)) + , Signatures (Encryption e) , Hashable (PubKey 'Sign (Encryption e)) ) diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index ead0d71b..ab7fed9e 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -69,6 +69,7 @@ makeLenses 'RefChanHeadBlockSmall type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e)) , Pretty (AsBase58 (PubKey 'Sign (Encryption e))) , FromStringMaybe (PubKey 'Sign (Encryption e)) + , Signatures (Encryption e) , Serialise (Signature (Encryption e)) , Hashable (PubKey 'Sign (Encryption e)) ) diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 1f4acf23..7eaedf38 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -1,12 +1,13 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} {-# Language UndecidableInstances #-} -{-# Language MultiWayIf #-} module BlockDownload where import HBS2.Actors.Peer import HBS2.Clock import HBS2.Data.Detect import HBS2.Data.Types.Refs +import HBS2.Data.Bundle +import HBS2.Data.Types.SignedBox import HBS2.Defaults import HBS2.Events import HBS2.Hash @@ -24,6 +25,7 @@ import HBS2.System.Logger.Simple import PeerTypes import PeerInfo import Brains +import DownloadMon import Control.Concurrent.Async import Control.Concurrent.STM @@ -45,6 +47,7 @@ import Data.Maybe import Lens.Micro.Platform import System.Random (randomRIO) import System.Random.Shuffle (shuffleM) +import Codec.Serialise getBlockForDownload :: forall e m . (MonadIO m, IsPeerAddr e m, MyPeer e, HasStorage m) => Peer e @@ -91,6 +94,7 @@ getBlockForDownload peer = do processBlock :: forall e m . ( MonadIO m , HasStorage m , MyPeer e + , ForSignedBox e , HasPeerLocator e (BlockDownloadM e m) ) => Hash HbSync @@ -104,14 +108,15 @@ processBlock h = do let parent = Just h - bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h) + block <- liftIO $ getBlock sto h + + let bt = tryDetect h <$> block -- FIXME: если блок нашёлся, то удаляем его из wip when (isJust bt) (removeFromWip h) - let handleHrr = \(hrr :: Either (Hash HbSync) [HashRef]) -> do - + let handleHrr (hrr :: Either (Hash HbSync) [HashRef]) = do case hrr of Left hx -> addDownload parent hx Right hr -> do @@ -163,8 +168,36 @@ processBlock h = do walkMerkle h (liftIO . getBlock sto) handleHrr Just (Blob{}) -> do + -- NOTE: bundle-ref-detection-note + -- добавлять обработку BundleRefValue в tryDetect + -- слишком накладно, т.к. требует большого количества + -- констрейнтов, которые не предполагались там + -- изначально. Как временная мера -- пробуем Bundle + -- обнаруживать здесь. + mon <- asks (view downloadMon) + runMaybeT do + bs <- MaybeT $ pure block + + -- TODO: check-if-we-somehow-trust-this-key + (pk, BundleRefSimple ref) <- MaybeT $ pure $ deserialiseOrFail @(BundleRefValue e) bs + & either (const Nothing) unboxBundleRef + + debug $ "GOT BundleRefValue" <+> parens (pretty ref) + + downloadMonAdd mon ref do + debug $ "Downloaded bundle:" <+> pretty ref + r <- importBundle sto (void . putBlock sto . snd) ref + case r of + Right{} -> debug $ "Imported bundle: " <+> pretty ref + Left e -> err (viaShow e) + + lift $ addDownload parent (fromHashRef ref) + pure () + where + unboxBundleRef (BundleRefValue box) = unboxSignedBox0 box + -- NOTE: if peer does not have a block, it may -- cause to an unpleasant timeouts -- So make sure that this peer really answered to @@ -429,6 +462,10 @@ blockDownloadLoop env0 = do let withAllStuff = withPeerM e . withDownload env0 + -- FIXME: exception-handling + void $ liftIO $ async $ withPeerM e do + downloadMonLoop (view downloadMon env0) + void $ liftIO $ async $ forever $ withPeerM e do pause @'Seconds 30 diff --git a/hbs2-peer/app/BlockHttpDownload.hs b/hbs2-peer/app/BlockHttpDownload.hs index 8a2e91a6..a0deff83 100644 --- a/hbs2-peer/app/BlockHttpDownload.hs +++ b/hbs2-peer/app/BlockHttpDownload.hs @@ -7,6 +7,7 @@ import HBS2.Actors.Peer import HBS2.Clock import HBS2.Data.Detect import HBS2.Data.Types.Refs +import HBS2.Data.Types.SignedBox import HBS2.Defaults import HBS2.Events import HBS2.Hash @@ -66,6 +67,7 @@ blockHttpDownloadLoop :: forall e m . , PeerSessionKey e (PeerInfo e) , Pretty (Peer e) , IsPeerAddr e m + , ForSignedBox e ) => DownloadEnv e -> m () blockHttpDownloadLoop denv = do diff --git a/hbs2-peer/app/DownloadMon.hs b/hbs2-peer/app/DownloadMon.hs new file mode 100644 index 00000000..5288958b --- /dev/null +++ b/hbs2-peer/app/DownloadMon.hs @@ -0,0 +1,44 @@ +{-# Language TemplateHaskell #-} +module DownloadMon where + +import HBS2.Prelude.Plated + +import HBS2.System.Logger.Simple + +import HBS2.Actors.Peer +import PeerTypes + +import Data.Functor +import Data.HashMap.Strict qualified as HashMap + +import UnliftIO + +import Control.Monad +import Lens.Micro.Platform + + +downloadMonLoop :: ( MonadIO m + , HasStorage m + ) + => DownloadMonEnv + -> m () + +downloadMonLoop env = do + + debug "I'm a download monitor" + + -- FIXME: timeout-hardcodes + let refs = readTVarIO (view downloads env) <&> HashMap.keys <&> fmap (,2) + + polling (Polling 2.5 1) refs $ \ref -> do + debug $ "DownloadMon. check" <+> pretty ref + done <- checkDownloaded ref + when done do + mbAction <- atomically $ do + a <- readTVar (view downloads env) <&> HashMap.lookup ref + modifyTVar (view downloads env) (HashMap.delete ref) + pure a + + forM_ mbAction $ \action -> liftIO $ async $ liftIO action + + diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index e0251e31..48bdd439 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -8,7 +8,10 @@ module PeerTypes where import HBS2.Actors.Peer import HBS2.Actors.Peer.Types import HBS2.Clock +import HBS2.Data.Types.SignedBox import HBS2.Data.Types.Peer +import HBS2.Data.Types.Refs +import HBS2.Data.Detect import HBS2.Defaults import HBS2.Events import HBS2.Hash @@ -33,7 +36,7 @@ import PeerConfig import Prelude hiding (log) import Data.Foldable (for_) import Control.Concurrent.Async -import Control.Concurrent.STM +-- import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Writer qualified as W import Crypto.Saltine.Core.Box qualified as Encrypt @@ -52,11 +55,15 @@ import Data.IntMap (IntMap) import Data.IntSet (IntSet) import Data.Text qualified as Text import Data.Text.Encoding qualified as TE -import Data.Time.Clock (NominalDiffTime) import Data.Heap qualified as Heap import Data.Heap (Heap,Entry(..)) import Data.Time.Clock import Data.Word +import Data.List qualified as List + +import UnliftIO.STM + +import Streaming.Prelude qualified as S data PeerInfo e = PeerInfo @@ -123,6 +130,7 @@ type MyPeer e = ( Eq (Peer e) , Hashable (Peer e) , Pretty (Peer e) , HasPeer e + , ForSignedBox e ) data DownloadReq e @@ -152,6 +160,7 @@ instance Expires (EventKey e (DownloadReq e)) where type DownloadFromPeerStuff e m = ( MyPeer e , MonadIO m + , ForSignedBox e , Request e (BlockInfo e) m , Request e (BlockChunks e) m , MonadReader (PeerEnv e ) m @@ -213,7 +222,24 @@ data BlockState = makeLenses 'BlockState +data DownloadMonEnv = + DownloadMonEnv + { _downloads :: TVar (HashMap HashRef (IO ())) + } +makeLenses 'DownloadMonEnv + +downloadMonNew :: MonadIO m => m DownloadMonEnv +downloadMonNew = DownloadMonEnv <$> newTVarIO mempty + +downloadMonAdd :: forall m . MonadIO m + => DownloadMonEnv + -> HashRef + -> IO () + -> m () + +downloadMonAdd env h whenDone = do + atomically $ modifyTVar (view downloads env) (HashMap.insert h whenDone) data DownloadEnv e = DownloadEnv @@ -222,6 +248,7 @@ data DownloadEnv e = , _blockPostponedTo :: Cache (Hash HbSync) () , _blockDelayTo :: TQueue (Hash HbSync) , _blockProposed :: Cache (Hash HbSync, Peer e) () + , _downloadMon :: DownloadMonEnv , _downloadBrains :: SomeBrains e } @@ -235,6 +262,7 @@ newDownloadEnv brains = liftIO do <*> Cache.newCache (Just defBlockBanTime) <*> newTQueueIO <*> Cache.newCache (Just (toTimeSpec (2 :: Timeout 'Seconds))) + <*> downloadMonNew <*> pure (SomeBrains brains) newtype BlockDownloadM e m a = @@ -429,6 +457,20 @@ mkPeerMeta conf penv = do elem k = W.tell . L.singleton . (k ,) +-- FIXME: slow-deep-scan-exception-seems-not-working +checkDownloaded :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Bool +checkDownloaded hr = do + sto <- getStorage + let readBlock h = liftIO $ getBlock sto h + + result <- S.toList_ $ + deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) readBlock $ \ha -> do + unless (fromHashRef hr == ha) do + here <- liftIO $ hasBlock sto ha + S.yield here + + pure $ maybe False (not . List.null) $ sequence result + data Polling = Polling { waitBefore :: NominalDiffTime @@ -443,6 +485,8 @@ polling :: forall a m . (MonadIO m, Hashable a) polling o listEntries action = do + -- FIXME: might-be-concurrent + pause (TimeoutNDT (waitBefore o)) now0 <- getTimeCoarse diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index fc6e4824..7b16fb52 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -162,19 +162,6 @@ refChanAddDownload env chan r onComlete = do atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,(t, onComlete))) --- FIXME: slow-deep-scan-exception-seems-not-working -checkDownloaded :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Bool -checkDownloaded hr = do - sto <- getStorage - let readBlock h = liftIO $ getBlock sto h - - result <- S.toList_ $ - deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) readBlock $ \ha -> do - unless (fromHashRef hr == ha) do - here <- liftIO $ hasBlock sto ha - S.yield here - - pure $ maybe False (not . List.null) $ sequence result readLog :: forall m . ( MonadUnliftIO m ) diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 0f951bdf..c0cf535d 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -132,6 +132,7 @@ executable hbs2-peer other-modules: BlockDownload , BlockHttpDownload , DownloadQ + , DownloadMon , EncryptionKeys , Bootstrap , PeerInfo diff --git a/hbs2/Main.hs b/hbs2/Main.hs index 7537fe9d..56b87dc1 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -533,6 +533,7 @@ main = join . customExecParser (prefs showHelpOnError) $ pBundle = hsubparser ( command "create" (info pBundleCreate (progDesc "create bundle")) <> command "list" (info pBundleList (progDesc "list bundle")) <> command "import" (info pBundleImport (progDesc "import objects from bundle")) + <> command "create-ref" (info pBundleCreateRef (progDesc "create bundle ref block")) ) pBundleCreate = do @@ -551,6 +552,27 @@ main = join . customExecParser (prefs showHelpOnError) $ print $ pretty bundle + pBundleCreateRef = do + o <- common + kr <- strOption (long "keyring" <> short 'k' <> help "owner credentials") + hash <- strArgument (metavar "HASHREF") + + pure $ withStore o $ \sto -> do + sc <- BS.readFile kr + creds <- pure (parseCredentials @(Encryption L4Proto) (AsCredFile sc)) `orDie` "bad keyring file" + + let sk = view peerSignSk creds + let pk = view peerSignPk creds + + ref <- pure (fromStringMay hash) `orDie` "invalid HASHREF" + + let refval = makeBundleRefValue @L4Proto pk sk (BundleRefSimple ref) + + mh <- putBlock sto (serialise refval) + + maybe1 mh exitFailure $ \h -> do + print $ pretty h + exitSuccess pBundleImport = do o <- common