bundle-basic-implementation

This commit is contained in:
Dmitry Zuikov 2023-09-20 05:45:08 +03:00
parent 67892fca02
commit a5628a19ed
13 changed files with 274 additions and 35 deletions

View File

@ -13,6 +13,7 @@ fixme-files docs/pep*.txt
fixme-files docs/drafts/**/*.txt
fixme-files docs/pr/**/*.txt
fixme-files docs/todo/**/*.txt
fixme-files docs/notes/**/*.txt
fixme-files-ignore .direnv/** dist-newstyle/**

View File

@ -1,3 +1,13 @@
(fixme-set "assigned" "voidlizard" "CmfGGmDAuC")
(fixme-set "assigned" "voidlizard" "4Bm5kS8t54" )
(fixme-set "assigned" "voidlizard" "8i2gUFTTnH" )
(fixme-set "assigned" "voidlizard" "CPhvijEXN2" )
(fixme-set "assigned" "HPoqtobDAT" "voidlizard")
(fixme-set "workflow" "test" "HPoqtobDAT")
(fixme-set "assigned" "voidlizard" "3HwTgQQXvC")
(fixme-set "workflow" "test" "CmfGGmDAuC")
(fixme-set "workflow" "test" "3HwTgQQXvC")
(fixme-set "workflow" "test" "4Bm5kS8t54")
(fixme-set "workflow" "test" "8i2gUFTTnH")
(fixme-set "workflow" "test" "CPhvijEXN2")
(fixme-set "workflow" "backlog" "6WZaH3NXuH")

61
docs/notes/bundles.txt Normal file
View File

@ -0,0 +1,61 @@
Базовая реализация бандлов
Основная идея:
Берём список HashRef, каждый HashRef может ссылаться
на какие-то объекты и создаем журнал (замыкание),
куда складываем все нужные объекты.
Это может быть нужно для ускорения передачи
большого числа маленьких объектов там, где это
нужно.
Минусы: создаются объекты-обёртки,
дублируются данные в бандлах и БД.
Решение:
TODO: introduce-bundle-ttl
удалять бандлы и BundleRefValue,
которые никто не трогал дольше, чем
ttl.
Делаем bundle:
```
hbs2 bundle create < bundle-refs
<SOME-HASH>
```
В bundle-refs список хэшей (HashRef) в текстовом виде
Делаем ссылку на bundle:
```
hbs2 bundle create-ref -k some.key <SOME-HASH>
<SOME-OTHER-HASH>
```
Создаем объект типа BundleRefValue и подписывает его
неким ключом.
Идея в том, что даже если мы не создатели Bundle и BundleRefValue,
мы можем его сохранять и распространять, сохраняя оригинальную
подпись.
Клиенты уже сами решают, доверяют они ключу подписанта или нет.
Распространение bundle:
```
hbs2-peer announce <SOME-OTHER-HASH>
```
Теперь те, кто вообще слушают анонсы от пира --- получат
блок BundleRefValue, проверят подпись, поставят Bundle
на скачивание, и когда он скачается - импортируют объекты
из него.

View File

@ -7,10 +7,14 @@ import HBS2.Storage
import HBS2.Storage.Operations
import HBS2.Hash
import HBS2.Data.Types.Refs
import HBS2.Data.Types.SignedBox
import HBS2.Net.Proto.Types
import HBS2.Net.Auth.Credentials
import HBS2.Data.Detect
import Data.Word
import Data.Function
import Codec.Compression.GZip as GZip
import Codec.Serialise
import Control.Monad
@ -20,12 +24,33 @@ import Data.ByteString.Lazy.Char8 qualified as LBS
import Data.Functor
import Data.List qualified as List
import Data.Either
import Data.Maybe
import Streaming.Prelude qualified as S
import Streaming()
{- HLINT ignore "Use newtype instead of data" -}
data BundleRefValue e =
BundleRefValue (SignedBox BundleRef e)
deriving stock (Generic)
instance ForSignedBox e => Serialise (BundleRefValue e)
data BundleRef =
BundleRefSimple HashRef
deriving stock (Generic)
instance Serialise BundleRef
makeBundleRefValue :: forall e . (ForSignedBox e, Signatures (Encryption e))
=> PubKey 'Sign (Encryption e)
-> PrivKey 'Sign (Encryption e)
-> BundleRef
-> BundleRefValue e
makeBundleRefValue pk sk ref = BundleRefValue $ makeSignedBox @e pk sk ref
-- у нас может быть много способов хранить данные:
-- сжимать целиком (эффективно, но медленно)
@ -80,29 +105,32 @@ createBundle :: ( MonadIO m
-> [HashRef]
-> m (Maybe HashRef)
createBundle sto refs = runMaybeT do
-- читать блок из сторейджа
blocks <- forM refs $ \href -> do
blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href)
createBundle sto refs = do
let readBlock = liftIO . getBlock sto
let compressed = compressWith params blk
let size = LBS.length compressed
let section = BundleSection (fromIntegral size) (Just href)
-- FIXME: handle-errors-on-missed-blocks
blocks <- S.toList_ $ forM_ refs $ \hr -> do
deepScan ScanDeep (const none) (fromHashRef hr) readBlock $ \ha -> do
blk' <- readBlock ha
let href = HashRef ha
maybe1 blk' none $ \blk -> do
let compressed = compressWith params blk
let size = LBS.length compressed
let section = BundleSection (fromIntegral size) (Just href)
let sbs = serialise section
let pad = sectionHeadSize - LBS.length sbs
let pads = LBS.replicate pad '\x0'
pure (sbs <> pads <> compressed)
let sbs = serialise section
let pad = sectionHeadSize - LBS.length sbs
let pads = LBS.replicate pad '\x0'
S.yield (sbs <> pads <> compressed)
let buHead = serialise (BundleHeadSimple sectionHeadSize)
let buPadded = buHead <> LBS.replicate (bundleHeadSize - LBS.length buHead) '\x0'
let blob = buPadded <> mconcat blocks
-- FIXME: streamed-write-as-merkle
wtf <- liftIO $ writeAsMerkle sto blob
pure (HashRef wtf)
pure $ Just (HashRef wtf)
where
params = defaultCompressParams { compressLevel = bestSpeed }

View File

@ -33,6 +33,7 @@ instance ( Eq (PubKey 'Sign (Encryption e))
type ForSignedBox e = ( Serialise ( PubKey 'Sign (Encryption e))
, FromStringMaybe (PubKey 'Sign (Encryption e))
, Serialise (Signature (Encryption e))
, Signatures (Encryption e)
, Hashable (PubKey 'Sign (Encryption e))
)

View File

@ -69,6 +69,7 @@ makeLenses 'RefChanHeadBlockSmall
type ForRefChans e = ( Serialise ( PubKey 'Sign (Encryption e))
, Pretty (AsBase58 (PubKey 'Sign (Encryption e)))
, FromStringMaybe (PubKey 'Sign (Encryption e))
, Signatures (Encryption e)
, Serialise (Signature (Encryption e))
, Hashable (PubKey 'Sign (Encryption e))
)

View File

@ -1,12 +1,13 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
{-# Language MultiWayIf #-}
module BlockDownload where
import HBS2.Actors.Peer
import HBS2.Clock
import HBS2.Data.Detect
import HBS2.Data.Types.Refs
import HBS2.Data.Bundle
import HBS2.Data.Types.SignedBox
import HBS2.Defaults
import HBS2.Events
import HBS2.Hash
@ -24,6 +25,7 @@ import HBS2.System.Logger.Simple
import PeerTypes
import PeerInfo
import Brains
import DownloadMon
import Control.Concurrent.Async
import Control.Concurrent.STM
@ -45,6 +47,7 @@ import Data.Maybe
import Lens.Micro.Platform
import System.Random (randomRIO)
import System.Random.Shuffle (shuffleM)
import Codec.Serialise
getBlockForDownload :: forall e m . (MonadIO m, IsPeerAddr e m, MyPeer e, HasStorage m)
=> Peer e
@ -91,6 +94,7 @@ getBlockForDownload peer = do
processBlock :: forall e m . ( MonadIO m
, HasStorage m
, MyPeer e
, ForSignedBox e
, HasPeerLocator e (BlockDownloadM e m)
)
=> Hash HbSync
@ -104,14 +108,15 @@ processBlock h = do
let parent = Just h
bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h)
block <- liftIO $ getBlock sto h
let bt = tryDetect h <$> block
-- FIXME: если блок нашёлся, то удаляем его из wip
when (isJust bt) (removeFromWip h)
let handleHrr = \(hrr :: Either (Hash HbSync) [HashRef]) -> do
let handleHrr (hrr :: Either (Hash HbSync) [HashRef]) = do
case hrr of
Left hx -> addDownload parent hx
Right hr -> do
@ -163,8 +168,36 @@ processBlock h = do
walkMerkle h (liftIO . getBlock sto) handleHrr
Just (Blob{}) -> do
-- NOTE: bundle-ref-detection-note
-- добавлять обработку BundleRefValue в tryDetect
-- слишком накладно, т.к. требует большого количества
-- констрейнтов, которые не предполагались там
-- изначально. Как временная мера -- пробуем Bundle
-- обнаруживать здесь.
mon <- asks (view downloadMon)
runMaybeT do
bs <- MaybeT $ pure block
-- TODO: check-if-we-somehow-trust-this-key
(pk, BundleRefSimple ref) <- MaybeT $ pure $ deserialiseOrFail @(BundleRefValue e) bs
& either (const Nothing) unboxBundleRef
debug $ "GOT BundleRefValue" <+> parens (pretty ref)
downloadMonAdd mon ref do
debug $ "Downloaded bundle:" <+> pretty ref
r <- importBundle sto (void . putBlock sto . snd) ref
case r of
Right{} -> debug $ "Imported bundle: " <+> pretty ref
Left e -> err (viaShow e)
lift $ addDownload parent (fromHashRef ref)
pure ()
where
unboxBundleRef (BundleRefValue box) = unboxSignedBox0 box
-- NOTE: if peer does not have a block, it may
-- cause to an unpleasant timeouts
-- So make sure that this peer really answered to
@ -429,6 +462,10 @@ blockDownloadLoop env0 = do
let withAllStuff = withPeerM e . withDownload env0
-- FIXME: exception-handling
void $ liftIO $ async $ withPeerM e do
downloadMonLoop (view downloadMon env0)
void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 30

View File

@ -7,6 +7,7 @@ import HBS2.Actors.Peer
import HBS2.Clock
import HBS2.Data.Detect
import HBS2.Data.Types.Refs
import HBS2.Data.Types.SignedBox
import HBS2.Defaults
import HBS2.Events
import HBS2.Hash
@ -66,6 +67,7 @@ blockHttpDownloadLoop :: forall e m .
, PeerSessionKey e (PeerInfo e)
, Pretty (Peer e)
, IsPeerAddr e m
, ForSignedBox e
)
=> DownloadEnv e -> m ()
blockHttpDownloadLoop denv = do

View File

@ -0,0 +1,44 @@
{-# Language TemplateHaskell #-}
module DownloadMon where
import HBS2.Prelude.Plated
import HBS2.System.Logger.Simple
import HBS2.Actors.Peer
import PeerTypes
import Data.Functor
import Data.HashMap.Strict qualified as HashMap
import UnliftIO
import Control.Monad
import Lens.Micro.Platform
downloadMonLoop :: ( MonadIO m
, HasStorage m
)
=> DownloadMonEnv
-> m ()
downloadMonLoop env = do
debug "I'm a download monitor"
-- FIXME: timeout-hardcodes
let refs = readTVarIO (view downloads env) <&> HashMap.keys <&> fmap (,2)
polling (Polling 2.5 1) refs $ \ref -> do
debug $ "DownloadMon. check" <+> pretty ref
done <- checkDownloaded ref
when done do
mbAction <- atomically $ do
a <- readTVar (view downloads env) <&> HashMap.lookup ref
modifyTVar (view downloads env) (HashMap.delete ref)
pure a
forM_ mbAction $ \action -> liftIO $ async $ liftIO action

View File

@ -8,7 +8,10 @@ module PeerTypes where
import HBS2.Actors.Peer
import HBS2.Actors.Peer.Types
import HBS2.Clock
import HBS2.Data.Types.SignedBox
import HBS2.Data.Types.Peer
import HBS2.Data.Types.Refs
import HBS2.Data.Detect
import HBS2.Defaults
import HBS2.Events
import HBS2.Hash
@ -33,7 +36,7 @@ import PeerConfig
import Prelude hiding (log)
import Data.Foldable (for_)
import Control.Concurrent.Async
import Control.Concurrent.STM
-- import Control.Concurrent.STM
import Control.Monad.Reader
import Control.Monad.Writer qualified as W
import Crypto.Saltine.Core.Box qualified as Encrypt
@ -52,11 +55,15 @@ import Data.IntMap (IntMap)
import Data.IntSet (IntSet)
import Data.Text qualified as Text
import Data.Text.Encoding qualified as TE
import Data.Time.Clock (NominalDiffTime)
import Data.Heap qualified as Heap
import Data.Heap (Heap,Entry(..))
import Data.Time.Clock
import Data.Word
import Data.List qualified as List
import UnliftIO.STM
import Streaming.Prelude qualified as S
data PeerInfo e =
PeerInfo
@ -123,6 +130,7 @@ type MyPeer e = ( Eq (Peer e)
, Hashable (Peer e)
, Pretty (Peer e)
, HasPeer e
, ForSignedBox e
)
data DownloadReq e
@ -152,6 +160,7 @@ instance Expires (EventKey e (DownloadReq e)) where
type DownloadFromPeerStuff e m = ( MyPeer e
, MonadIO m
, ForSignedBox e
, Request e (BlockInfo e) m
, Request e (BlockChunks e) m
, MonadReader (PeerEnv e ) m
@ -213,7 +222,24 @@ data BlockState =
makeLenses 'BlockState
data DownloadMonEnv =
DownloadMonEnv
{ _downloads :: TVar (HashMap HashRef (IO ()))
}
makeLenses 'DownloadMonEnv
downloadMonNew :: MonadIO m => m DownloadMonEnv
downloadMonNew = DownloadMonEnv <$> newTVarIO mempty
downloadMonAdd :: forall m . MonadIO m
=> DownloadMonEnv
-> HashRef
-> IO ()
-> m ()
downloadMonAdd env h whenDone = do
atomically $ modifyTVar (view downloads env) (HashMap.insert h whenDone)
data DownloadEnv e =
DownloadEnv
@ -222,6 +248,7 @@ data DownloadEnv e =
, _blockPostponedTo :: Cache (Hash HbSync) ()
, _blockDelayTo :: TQueue (Hash HbSync)
, _blockProposed :: Cache (Hash HbSync, Peer e) ()
, _downloadMon :: DownloadMonEnv
, _downloadBrains :: SomeBrains e
}
@ -235,6 +262,7 @@ newDownloadEnv brains = liftIO do
<*> Cache.newCache (Just defBlockBanTime)
<*> newTQueueIO
<*> Cache.newCache (Just (toTimeSpec (2 :: Timeout 'Seconds)))
<*> downloadMonNew
<*> pure (SomeBrains brains)
newtype BlockDownloadM e m a =
@ -429,6 +457,20 @@ mkPeerMeta conf penv = do
elem k = W.tell . L.singleton . (k ,)
-- FIXME: slow-deep-scan-exception-seems-not-working
checkDownloaded :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Bool
checkDownloaded hr = do
sto <- getStorage
let readBlock h = liftIO $ getBlock sto h
result <- S.toList_ $
deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) readBlock $ \ha -> do
unless (fromHashRef hr == ha) do
here <- liftIO $ hasBlock sto ha
S.yield here
pure $ maybe False (not . List.null) $ sequence result
data Polling =
Polling
{ waitBefore :: NominalDiffTime
@ -443,6 +485,8 @@ polling :: forall a m . (MonadIO m, Hashable a)
polling o listEntries action = do
-- FIXME: might-be-concurrent
pause (TimeoutNDT (waitBefore o))
now0 <- getTimeCoarse

View File

@ -162,19 +162,6 @@ refChanAddDownload env chan r onComlete = do
atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,(t, onComlete)))
-- FIXME: slow-deep-scan-exception-seems-not-working
checkDownloaded :: forall m . (MonadIO m, HasStorage m) => HashRef -> m Bool
checkDownloaded hr = do
sto <- getStorage
let readBlock h = liftIO $ getBlock sto h
result <- S.toList_ $
deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) readBlock $ \ha -> do
unless (fromHashRef hr == ha) do
here <- liftIO $ hasBlock sto ha
S.yield here
pure $ maybe False (not . List.null) $ sequence result
readLog :: forall m . ( MonadUnliftIO m )

View File

@ -132,6 +132,7 @@ executable hbs2-peer
other-modules: BlockDownload
, BlockHttpDownload
, DownloadQ
, DownloadMon
, EncryptionKeys
, Bootstrap
, PeerInfo

View File

@ -533,6 +533,7 @@ main = join . customExecParser (prefs showHelpOnError) $
pBundle = hsubparser ( command "create" (info pBundleCreate (progDesc "create bundle"))
<> command "list" (info pBundleList (progDesc "list bundle"))
<> command "import" (info pBundleImport (progDesc "import objects from bundle"))
<> command "create-ref" (info pBundleCreateRef (progDesc "create bundle ref block"))
)
pBundleCreate = do
@ -551,6 +552,27 @@ main = join . customExecParser (prefs showHelpOnError) $
print $ pretty bundle
pBundleCreateRef = do
o <- common
kr <- strOption (long "keyring" <> short 'k' <> help "owner credentials")
hash <- strArgument (metavar "HASHREF")
pure $ withStore o $ \sto -> do
sc <- BS.readFile kr
creds <- pure (parseCredentials @(Encryption L4Proto) (AsCredFile sc)) `orDie` "bad keyring file"
let sk = view peerSignSk creds
let pk = view peerSignPk creds
ref <- pure (fromStringMay hash) `orDie` "invalid HASHREF"
let refval = makeBundleRefValue @L4Proto pk sk (BundleRefSimple ref)
mh <- putBlock sto (serialise refval)
maybe1 mh exitFailure $ \h -> do
print $ pretty h
exitSuccess
pBundleImport = do
o <- common