hbs2/hbs2-peer/app/RefChan.hs

950 lines
34 KiB
Haskell
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{-# Language AllowAmbiguousTypes #-}
{-# Language TemplateHaskell #-}
module RefChan (
RefChanWorkerEnv(..)
, refChanWorkerEnvHeadQ
, refChanWorkerEnvDownload
, refChanOnHeadFn
, refChanWriteTranFn
, refChanValidateTranFn
, refChanNotifyRelyFn
, refChanWorker
, runRefChanRelyWorker
, refChanWorkerEnv
, refChanNotifyOnUpdated
) where
import HBS2.Prelude.Plated
import HBS2.Actors.Peer
import HBS2.Base58
import HBS2.Hash
import HBS2.Clock
import HBS2.Data.Detect
import HBS2.Defaults
import HBS2.Data.Types.Refs
import HBS2.Data.Types.SignedBox
import HBS2.Events
import HBS2.Merkle
import HBS2.Net.Auth.Credentials
import HBS2.Net.Messaging.Unix
import HBS2.Net.Proto
import HBS2.Net.Proto.Definition()
import HBS2.Net.Proto.Peer
import HBS2.Net.Proto.RefChan
import HBS2.Net.Proto.Sessions
import HBS2.Storage
import HBS2.System.Logger.Simple
import PeerTypes
import PeerConfig
import BlockDownload
import Brains
import Data.Dynamic
import Codec.Serialise
import Control.Concurrent.STM (flushTQueue)
import Control.Exception ()
import Control.Monad.Except ()
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.ByteString (ByteString)
import Data.Either
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HashSet
import Data.Heap ()
import Data.Coerce
-- import Data.Heap qualified as Heap
import Data.List qualified as List
import Data.Maybe
import Data.Text qualified as Text
import Lens.Micro.Platform
import Data.Generics.Product
import UnliftIO
import Streaming.Prelude qualified as S
import Streaming()
{- HLINT ignore "Use newtype instead of data" -}
data DataNotReady = DataNotReady deriving (Show)
instance Exception DataNotReady
type OnDownloadComplete = HashRef -> IO ()
data RefChanValidator =
RefChanValidator
{ rcvInbox :: TQueue (RefChanValidate UNIX, MVar (RefChanValidate UNIX))
, rcvAsync :: Async ()
}
data RefChanNotifier =
RefChanNotifier
{ rcnPeer :: Peer UNIX
, rcnInbox :: TQueue (RefChanNotify UNIX)
, rcnAsync :: Async ()
}
data RefChanWorkerEnv e =
RefChanWorkerEnv
{ _refChanWorkerConf :: PeerConfig
, _refChanPeerEnv :: PeerEnv e
, _refChanWorkerEnvDEnv :: DownloadEnv e
, _refChanWorkerEnvHeadQ :: TQueue (RefChanId e, RefChanHeadBlockTran e)
, _refChanWorkerEnvDownload :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete)))
, _refChanWorkerEnvNotify :: TVar (HashMap (RefChanId e) ())
, _refChanWorkerEnvWriteQ :: TQueue HashRef
, _refChanWorkerValidators :: TVar (HashMap (RefChanId e) RefChanValidator)
-- FIXME: peer-addr-to-have-multiple-actors-on-single-box
-- нужно ключом держать Peer e (SockAddr)
-- что бы можно было завести несколько акторов на одном
-- боксе в целях отладки.
, _refChanWorkerNotifiers :: TVar (HashMap (RefChanId e) [RefChanNotifier])
, _refChanWorkerNotifiersInbox :: TQueue (RefChanNotify e) -- ^ to rely messages from clients to gossip
, _refChanWorkerNotifiersDone :: Cache (Hash HbSync) ()
, _refChanWorkerLocalRelyDone :: Cache (Peer UNIX, Hash HbSync) ()
}
makeLenses 'RefChanWorkerEnv
refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e)
=> PeerConfig
-> PeerEnv e
-> DownloadEnv e
-> m (RefChanWorkerEnv e)
refChanWorkerEnv conf pe de = liftIO $ RefChanWorkerEnv @e conf pe de
<$> newTQueueIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> Cache.newCache (Just defRequestLimit)
<*> Cache.newCache (Just defRequestLimit)
refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
refChanOnHeadFn env chan tran = do
atomically $ writeTQueue (view refChanWorkerEnvHeadQ env) (chan, tran)
refChanWriteTranFn :: MonadIO m => RefChanWorkerEnv e -> HashRef -> m ()
refChanWriteTranFn env href = do
atomically $ writeTQueue (view refChanWorkerEnvWriteQ env) href
refChanValidateTranFn :: forall e m . ( MonadUnliftIO m
, ForRefChans e, e ~ L4Proto
, HasNonces (RefChanValidate UNIX) m
)
=> RefChanWorkerEnv e
-> RefChanId e
-> HashRef
-> m Bool
refChanValidateTranFn env chan htran = do
mbv <- readTVarIO (view refChanWorkerValidators env) <&> HashMap.lookup chan
-- отправить запрос в соответствующий... что?
-- ждать ответа
debug $ "VALIDATE TRAN" <+> pretty (AsBase58 chan) <+> pretty htran
r <- maybe1 mbv (pure True) $ \(RefChanValidator q _) -> do
answ <- newEmptyMVar
nonce <- newNonce @(RefChanValidate UNIX)
atomically $ writeTQueue q (RefChanValidate nonce chan (Validate @UNIX htran), answ)
withMVar answ $ \msg -> case rcvData msg of
Accepted{} -> pure True
_ -> pure False
debug $ "TRANS VALIDATION RESULT: " <+> pretty htran <+> pretty r
pure r
-- FIXME: leak-when-block-never-really-updated
refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m ()
refChanNotifyOnUpdated env chan = do
atomically $ modifyTVar (_refChanWorkerEnvNotify env) (HashMap.insert chan ())
refChanNotifyRelyFn :: forall e m . ( MonadUnliftIO m
, ForRefChans e, e ~ L4Proto
)
=> RefChanWorkerEnv e
-> RefChanId e
-> RefChanNotify e
-> m ()
refChanNotifyRelyFn env chan msg@(Notify _ (SignedBox k box s)) = do
debug "refChanNotifyRelyFn"
-- FIXME: multiple-hash-object-msg-performance
let h0 = hashObject @HbSync (serialise msg)
void $ runMaybeT do
guard =<< liftIO (Cache.lookup (view refChanWorkerNotifiersDone env) h0 <&> isNothing)
liftIO $ Cache.insert (view refChanWorkerNotifiersDone env) h0 ()
-- RefChanNotifier q _ <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan)
notifiers <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan)
forM_ notifiers $ \(RefChanNotifier _ q _) -> do
atomically $ writeTQueue q (Notify @UNIX chan (SignedBox k box s))
refChanNotifyRelyFn _ _ _ = pure ()
refChanAddDownload :: forall e m . ( m ~ PeerM e IO
, MyPeer e
)
=> RefChanWorkerEnv e
-> RefChanId e
-> HashRef
-> OnDownloadComplete
-> m ()
refChanAddDownload env chan r onComlete = do
penv <- ask
t <- getTimeCoarse
withPeerM penv $ withDownload (_refChanWorkerEnvDEnv env)
$ processBlock @e (fromHashRef r)
atomically $ modifyTVar (view refChanWorkerEnvDownload env) (HashMap.insert r (chan,(t, onComlete)))
data NotifyEnv =
NotifyEnv
{ _notifyClient :: Fabriq UNIX
, _notifySelf :: Peer UNIX
}
newtype NotifyProtoM m a = NotifyProto { fromNotifyProto :: ReaderT NotifyEnv m a }
deriving newtype ( Functor
, Applicative
, Monad
, MonadIO
, MonadUnliftIO
, MonadReader NotifyEnv
, MonadTrans
)
runNotifyProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> NotifyProtoM m a -> m a
runNotifyProtoM bus m = runReaderT (fromNotifyProto m) (NotifyEnv (Fabriq bus) (msgUnixSelf bus))
instance Monad m => HasFabriq UNIX (NotifyProtoM m) where
getFabriq = asks _notifyClient
instance Monad m => HasOwnPeer UNIX (NotifyProtoM m) where
ownPeer = asks _notifySelf
refChanNotifyRpcProto :: forall e m . ( MonadIO m
, Request e (RefChanNotify e) m
-- , HasPeerNonce UNIX m
, e ~ UNIX
-- , m ~ PeerM e IO
)
=> RefChanWorkerEnv L4Proto
-> RefChanNotify e
-> m ()
refChanNotifyRpcProto env msg@(ActionRequest chan action) = do
let penv = view refChanPeerEnv env
case action of
RefChanAnnounceBlock h -> do
debug $ "RefChanNotify: RefChanAnnounceBlock" <+> pretty h
liftIO $ withPeerM penv $ do
sto <- getStorage
sz' <- liftIO $ hasBlock sto (fromHashRef h)
maybe1 sz' none $ \sz -> do
ann <- simpleBlockAnnounce @L4Proto sz (fromHashRef h)
gossip ann
pure ()
RefChanFetch h -> do
debug $ "RefChanNotify: RefChanFetch" <+> pretty h
liftIO $ withPeerM penv $ do
refChanAddDownload env chan h (const $ pure ())
where
proto = Proxy @(RefChanNotify e)
refChanNotifyRpcProto env msg@(Notify chan (SignedBox pk box si)) = do
debug "GOT MESSAGE FROM CLIENT"
atomically $ writeTQueue (view refChanWorkerNotifiersInbox env) (Notify @L4Proto chan (SignedBox pk box si))
-- тут мы должны переслать всем, кроме отправителя
let h0 = hashObject @HbSync (serialise msg)
-- FIXME: squash-this-copypaste
void $ runMaybeT do
notifiers <- MaybeT $ liftIO (readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan)
forM_ notifiers $ \(RefChanNotifier peer q _) -> do
let lkey = (peer, h0)
-- guard =<< liftIO (Cache.lookup (view refChanWorkerLocalRelyDone env) lkey <&> isNothing)
-- liftIO $ Cache.insert (view refChanWorkerLocalRelyDone env) lkey ()
atomically $ writeTQueue q msg
refChanWorkerInitNotifiers :: forall e m . ( MonadIO m
, MonadUnliftIO m
, MyPeer e
-- , ForRefChans e
-- , ForRefChans UNIX
-- , m ~ PeerM e IO
, e ~ L4Proto
)
=> RefChanWorkerEnv e
-> m ()
refChanWorkerInitNotifiers env = do
debug "refChanWorkerInitNotifiers"
let (PeerConfig syn) = view refChanWorkerConf env
let notifiers = [ mkV rc x | ListVal [ SymbolVal "notify"
, SymbolVal "refchan"
, LitStrVal rc
, ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ]
] <- syn
] & catMaybes
forM_ notifiers $ \(rc, sa) -> do
debug $ "** NOTIFIER FOR" <+> pretty (AsBase58 rc, sa)
q <- newTQueueIO
val <- async $ liftIO $ notifierThread rc sa q
let rcn = RefChanNotifier (fromString sa) q val
atomically $ modifyTVar (_refChanWorkerNotifiers env) (HashMap.insertWith (<>) rc [rcn])
where
mkV :: Text -> Text -> Maybe (RefChanId e, String)
mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc)
notifierThread _ sa q = do
debug $ ">>> NOTIFIER THREAD FOR" <+> pretty sa
client <- newMessagingUnix False 1.0 sa
msg <- async $ runMessagingUnix client
runNotifyProtoM client do
proto <- async $ runProto [ makeResponse (refChanNotifyRpcProto env) ]
forever do
req <- atomically $ readTQueue q
debug "Rely notification request"
request @UNIX (fromString sa) req
wait proto
mapM_ wait [msg]
data ValidateEnv =
ValidateEnv
{ _validateClient :: Fabriq UNIX
, _validateSelf :: Peer UNIX
}
newtype ValidateProtoM m a = ValidateProto { fromValidateProto :: ReaderT ValidateEnv m a }
deriving newtype ( Functor
, Applicative
, Monad
, MonadIO
, MonadUnliftIO
, MonadReader ValidateEnv
, MonadTrans
)
runValidateProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> ValidateProtoM m a -> m a
runValidateProtoM tran m = runReaderT (fromValidateProto m) (ValidateEnv (Fabriq tran) (msgUnixSelf tran))
instance Monad m => HasFabriq UNIX (ValidateProtoM m) where
getFabriq = asks _validateClient
instance Monad m => HasOwnPeer UNIX (ValidateProtoM m) where
ownPeer = asks _validateSelf
refChanValidateProto :: forall e m . ( MonadIO m
, Request e (RefChanValidate e) m
, Response e (RefChanValidate e) m
, e ~ UNIX
)
=> Cache (RefChanValidateNonce e) (MVar (RefChanValidate e))
-> RefChanValidate e
-> m ()
refChanValidateProto waiters msg = do
debug $ "GOT ANSWER FROM VALIDATOR" <+> pretty msg
case rcvData msg of
Accepted h -> emitAnswer h msg
Rejected h -> emitAnswer h msg
_ -> none
where
emitAnswer h m = liftIO do
debug $ "EMIT ANSWER" <+> pretty h
mbAnsw <- Cache.lookup waiters (rcvNonce m)
maybe1 mbAnsw none $ \answ -> do
putMVar answ m
refChanWorkerInitValidators :: forall e m . ( MonadIO m
, MonadUnliftIO m
-- , MyPeer e
-- , ForRefChans e
-- , ForRefChans UNIX
-- , m ~ PeerM e IO
, e ~ L4Proto
)
=> RefChanWorkerEnv e
-> m ()
refChanWorkerInitValidators env = do
debug "refChanWorkerInitValidators"
let (PeerConfig syn) = view refChanWorkerConf env
let validators = [ mkV rc x | ListVal [ SymbolVal "validate"
, SymbolVal "refchan"
, LitStrVal rc
, ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ]
] <- syn
] & catMaybes
forM_ validators $ \(rc, sa) -> do
debug $ "** VALIDATOR FOR" <+> pretty (AsBase58 rc, sa)
here <- readTVarIO (_refChanWorkerValidators env) <&> HashMap.member rc
unless here do
q <- newTQueueIO
val <- async $ validatorThread rc sa q
let rcv = RefChanValidator q val
atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.insert rc rcv)
where
mkV :: Text -> Text -> Maybe (RefChanId e, String)
mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc)
-- FIXME: make-thread-respawning
validatorThread chan sa q = liftIO do
client <- newMessagingUnix False 1.0 sa
msg <- async $ runMessagingUnix client
-- FIXME: hardcoded-timeout
waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds)))
-- FIXME: hardcoded-timeout
waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds)))
runValidateProtoM client do
poke <- async $ forever do
pause @'Seconds 10
mv <- newEmptyMVar
nonce <- newNonce @(RefChanValidate UNIX)
atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv)
z <- async $ runProto
[ makeResponse (refChanValidateProto waiters)
]
forever do
(req, answ) <- atomically $ readTQueue q
case rcvData req of
Validate href -> do
debug $ "DO REQUEST VALIDATE" <+> pretty href <+> pretty sa
liftIO $ Cache.insert waiters (rcvNonce req) answ
let pa = fromString sa
request pa req
Poke{} -> do
debug "DO SEND POKE"
let pa = fromString sa
request pa req
Poke{} -> do
debug "DO SEND POKE"
let pa = fromString sa
pure ()
--
-- request pa req
_ -> pure ()
(_, r) <- waitAnyCatch [z,poke]
debug $ "SOMETHING WRONG:" <+> viaShow r
cancel msg
warn $ "validatorThread is terminated for some reasons" <+> pretty (AsBase58 chan)
runRefChanRelyWorker :: forall e m .
( MonadIO m
, m ~ PeerM e IO
, e ~ L4Proto
)
=> RefChanWorkerEnv e
-> RefChanAdapter e (ResponseM e m)
-> IO ()
runRefChanRelyWorker env adapter = liftIO $ forever do
withPeerM (view refChanPeerEnv env) do
me <- ownPeer @e
-- FIXME: use-bounded-queue-ASAP
mess <- atomically $ readTQueue (view refChanWorkerNotifiersInbox env)
runResponseM me $ do
refChanNotifyProto True adapter mess
refChanWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m
, MyPeer e
, HasStorage m
, Request e (RefChanHead e) m
, Request e (RefChanRequest e) m
, Sessions e (KnownPeer e) m
, Signatures s
, s ~ Encryption e
, IsRefPubKey s
-- , Pretty (AsBase58 (PubKey 'Sign s))
, ForRefChans e
, EventListener e (RefChanRound e) m
, EventListener e (RefChanRequest e) m
, Sessions e (RefChanRound e) m
, m ~ PeerM e IO
, e ~ L4Proto
)
=> RefChanWorkerEnv e
-> SomeBrains e
-> m ()
refChanWorker env brains = do
penv <- ask
mergeQ <- newTQueueIO
-- FIXME: resume-on-exception
hw <- async (refChanHeadMon penv)
-- FIXME: insist-more-during-download
-- что-то частая ситуация, когда блоки
-- с трудом докачиваются. надо бы
-- разобраться. возможно переделать
-- механизм скачивания блоков
downloads <- async monitorHeadDownloads
polls <- async refChanPoll
wtrans <- async refChanWriter
cleanup1 <- async cleanupRounds
merge <- async (logMergeProcess env mergeQ)
sto <- getStorage
liftIO $ refChanWorkerInitValidators env
liftIO $ refChanWorkerInitNotifiers env
subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do
debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val
h <- liftIO $ getRef sto (RefChanLogKey @s chan)
-- игнорируем, если синхронно
unless ((HashRef <$> h) == Just val) do
refChanAddDownload env chan val $ \href -> do
debug $ "BLOCK DOWNLOADED" <+> pretty href
atomically $ writeTQueue mergeQ (chan, href)
atomically $ writeTQueue mergeQ (chan, val)
forever do
pause @'Seconds 10
debug "I'm refchan worker"
mapM_ waitCatch [hw,downloads,polls,wtrans,merge,cleanup1]
where
cleanupRounds = do
rounds <- newTVarIO HashSet.empty
subscribe @e RefChanRoundEventKey $ \(RefChanRoundEvent rcrk) -> do
atomically $ modifyTVar rounds (HashSet.insert rcrk)
debug $ "ON ROUND STARTED" <+> pretty rcrk
forever do
-- FIXME: use-polling-function-and-respect-wait
pause @'Seconds 10
now <- getTimeCoarse
xs <- readTVarIO rounds <&> HashSet.toList
forM_ xs $ \x -> do
void $ runMaybeT do
se <- MaybeT $ find @e x id
closed <- readTVarIO (view refChanRoundClosed se)
trans <- atomically $ readTVar (view refChanRoundTrans se) <&> HashSet.toList
let ttl = view refChanRoundTTL se
when (closed || ttl <= now) do
lift $ expire x
when closed do
forM_ trans $ \t -> do
debug $ "WRITING TRANS" <+> pretty t
lift $ refChanWriteTranFn env t
atomically $ modifyTVar rounds (HashSet.delete x)
debug $ "CLEANUP ROUND" <+> pretty x
refChanWriter = do
sto <- getStorage
forever do
pause @'Seconds 1
_ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env)
htrans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env)
trans <- forM htrans $ \h -> runMaybeT do
blk <- MaybeT $ liftIO (getBlock sto (fromHashRef h))
upd <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just
case upd of
Propose chan _ -> pure (RefChanLogKey @(Encryption e) chan, h)
Accept chan _ -> pure (RefChanLogKey @(Encryption e) chan, h)
let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ]
-- FIXME: process-in-parallel
forM_ (HashMap.toList byChan) $ \(c,new) -> do
mbLog <- liftIO $ getRef sto c
hashes <- maybe1 mbLog (pure mempty) $ readLog (getBlock sto) . HashRef
-- FIXME: might-be-problems-on-large-logs
let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList
-- -- FIXME: remove-chunk-num-hardcode
let pt = toPTree (MaxSize 256) (MaxNum 256) hashesNew
nref <- makeMerkle 0 pt $ \(_,_,bss) -> void $ liftIO $ putBlock sto bss
liftIO $ updateRef sto c nref
debug $ "REFCHANLOG UPDATED:" <+> pretty c <+> pretty nref
refChanPoll = do
let listRefs = listPolledRefs @e brains "refchan" <&> fmap (over _2 ( (*60) . fromIntegral) )
polling (Polling 5 5) listRefs $ \ref -> do
debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref)
broadCastMessage (RefChanGetHead @e ref)
broadCastMessage (RefChanRequest @e ref)
monitorHeadDownloads = forever do
pause @'Seconds 1
all <- atomically $ readTVar (view refChanWorkerEnvDownload env) <&> HashMap.toList
now <- getTimeCoarse
-- FIXME: change-to-polling-functions
-- FIXME: consider-timeouts-or-leak-is-possible
rest <- forM all $ \(r,item@(chan,(t,onComplete))) -> do
here <- checkDownloaded r
if here then do
liftIO $ onComplete r
-- refChanOnHeadFn env chan (RefChanHeadBlockTran r)
pure mempty
else do
-- FIXME: fix-timeout-hardcode
let expired = realToFrac (toNanoSecs $ now - t) / 1e9 > 600
if expired then pure mempty else pure [(r,item)]
atomically $ writeTVar (view refChanWorkerEnvDownload env) (HashMap.fromList (mconcat rest))
-- FIXME: in-parallel?
refChanHeadMon pe = liftIO $ withPeerM pe do
forever do
(chan, RefChanHeadBlockTran hr) <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env)
here <- checkDownloaded hr
if not here then do
refChanAddDownload env chan hr (withPeerM pe . refChanOnHeadFn env chan . RefChanHeadBlockTran)
trace $ "BLOCK IS NOT HERE" <+> pretty hr
else do
sto <- getStorage
trace $ "BLOCK IS HERE" <+> pretty hr
-- читаем блок
lbs <- readBlobFromTree (getBlock sto) hr <&> fromMaybe mempty
let what = unboxSignedBox @(RefChanHeadBlock e) @e lbs
notify <- atomically $ do
no <- readTVar (_refChanWorkerEnvNotify env) <&> HashMap.member chan
modifyTVar (_refChanWorkerEnvNotify env) (HashMap.delete chan)
pure no
case what of
Nothing -> err $ "malformed head block" <+> pretty hr
Just (pk,blk) | pk == chan -> do
let rkey = RefChanHeadKey @s pk
debug $ "Good head block" <+> pretty hr <+> "processing..."
ourVersion <- runMaybeT do
cur <- MaybeT $ liftIO $ getRef sto rkey
lbss <- MaybeT $ readBlobFromTree (getBlock sto) (HashRef cur)
(_, blkOur) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e lbss
pure $ view refChanHeadVersion blkOur
let v0 = fromMaybe 0 ourVersion
let v1 = view refChanHeadVersion blk
if v1 > v0 then do
debug $ "UPDATING HEAD BLOCK" <+> pretty (v1, v0)
liftIO $ updateRef sto rkey (fromHashRef hr)
-- если это мы сами его обновили - то неплохо бы
-- всем разослать уведомление. А как?
--
-- TODO: update-acl-here
forM_ (HashMap.keys $ view refChanHeadPeers blk) $ \pip -> do
debug $ "ADD PEER ACL" <+> pretty (AsBase58 chan) <+> pretty(AsBase58 pip)
forM_ (view refChanHeadAuthors blk) $ \au -> do
debug $ "ADD AUTHOR ACL" <+> pretty (AsBase58 chan) <+> pretty(AsBase58 au)
when notify do
debug $ "NOTIFY-ALL-HEAD-UPDATED" <+> pretty (AsBase58 pk) <+> pretty hr
broadCastMessage (RefChanHead @e pk (RefChanHeadBlockTran hr))
else do
debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0)
_ -> debug "not subscribed to this refchan"
pure ()
-- распаковываем блок
-- вытаскиваем ключ из блока?
pure ()
data MergeEnv e =
MergeEnv
{ mergeSto :: AnyStorage
, mergeHeads :: TVar (HashMap HashRef (RefChanHeadBlock e) )
}
-- FIXME: possible-performance-issues
-- Выглядит довольно медленно. Вероятно,
-- можно быстрее.
-- В частности, кэшировать уже обработанные логи
logMergeProcess :: forall e s m . ( MonadUnliftIO m
, MyPeer e
, ForRefChans e
, HasStorage m
, Signatures s
, Pretty (AsBase58 (PubKey 'Sign s))
, s ~ Encryption e
, m ~ PeerM e IO
)
=> RefChanWorkerEnv e
-> TQueue (RefChanId e, HashRef)
-> m ()
logMergeProcess env q = do
sto <- getStorage
menv <- MergeEnv sto <$> newTVarIO mempty
forever do
-- FIXME: fix-hardcoded-timeout
pause @'Seconds 1
_ <- atomically $ peekTQueue q
logs <- liftIO $ atomically $ flushTQueue q
let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ]
& HashMap.toList
& fmap (over _2 List.nub)
-- FIXME: in-parallel
mapM_ (logMergeChan menv sto) byChan
where
getHead :: MergeEnv e
-> HashRef
-> m (Maybe (RefChanHeadBlock e))
getHead e h = do
let sto = mergeSto e
hd <- readTVarIO (mergeHeads e) <&> HashMap.lookup h
here <- liftIO $ hasBlock sto (fromHashRef h) <&> isJust
unless here do
warn $ "refchan. head is missed:" <+> pretty h
pure ()
case hd of
Just x -> pure (Just x)
Nothing -> runMaybeT do
hdblob <- MaybeT $ readBlobFromTree ( liftIO . getBlock sto ) h
(_, headblk) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e hdblob
atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk)
pure headblk
downloadMissedHead :: AnyStorage -> RefChanId e -> HashRef -> m ()
downloadMissedHead sto chan headRef = do
penv <- ask
here <- liftIO $ hasBlock sto (fromHashRef headRef) <&> isJust
unless here do
refChanAddDownload env chan headRef (withPeerM penv . refChanOnHeadFn env chan . RefChanHeadBlockTran)
logMergeChan menv sto (chan, logs) = do
penv <- ask
let readFn = getBlock sto
void $ runMaybeT do
let chanKey = RefChanLogKey @s chan
-- FIXME: wont-work-if-no-reference-yet
-- не сработает если ссылка новая
(mergeSet, merge) <- liftIO (getRef sto chanKey) >>= \case
Nothing -> do
new <- mconcat <$> mapM (lift . readLog readFn) logs
pure (HashSet.fromList new, not (List.null new))
Just h -> do
current <- lift $ readLog readFn (HashRef h) <&> HashSet.fromList
new <- mconcat <$> mapM (lift . readLog readFn) (filter (/= HashRef h) logs)
let mergeSet = HashSet.fromList new <> current
pure (mergeSet, not (List.null new))
guard merge
-- итак, тут приехал весь лог, который есть у пира
-- логично искать подтверждения только в нём. если
-- пир принял транзы без достаточного количества
-- подтверждений, то он сам лошара.
-- каждую транзу рассматриваем один раз, если
-- она смержена.
-- если она не смержена --- может быть, надо её
-- в какой-то reject список заносить
-- распаковать, отсортировать по головам сначала
-- потом бежим по головам, достаём головы
-- проверяем acl-ы на соответствие историческим головам
-- потом связываем каждый accept с соответствующим propose
-- потом считаем количество accept для каждого propose
-- потом, если всё ок -- пишем accept-ы и propose-ы у которых
-- больше quorum подтверждений для актуальной головы
let mergeList = HashSet.toList mergeSet
-- если какие-то транзакции отсутствуют - пытаемся их скачать
-- и надеемся на лучшее (лог сойдется в следующий раз)
forM_ mergeList $ \href -> do
mblk <- liftIO $ getBlock sto (fromHashRef href)
maybe1 mblk (lift $ refChanAddDownload env chan href dontHandle) dontHandle
r <- forM mergeList $ \href -> runMaybeT do
blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href)
tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk
& either (const Nothing) Just
case tran of
Propose _ box -> do
(pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box
(ak, _) <- MaybeT $ pure $ unboxSignedBox0 box
lift $ lift $ downloadMissedHead sto chan headRef
hd <- MaybeT $ lift $ getHead menv headRef
let quo = view refChanHeadQuorum hd & fromIntegral
guard $ checkACL hd (Just pk) ak
pure [(href, (quo,mempty))]
Accept _ box -> do
(pk, AcceptTran headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box
lift $ lift $ downloadMissedHead sto chan headRef
hd <- MaybeT $ lift $ getHead menv headRef
let quo = view refChanHeadQuorum hd & fromIntegral
guard $ HashMap.member pk (view refChanHeadPeers hd)
pure [(hashRef, (quo,[href]))]
let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) )
let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r))
& HashMap.toList
new <- S.toList_ do
forM_ permitted $ \(prop, (qx, accs)) -> do
when (length accs >= qx) do
S.yield prop
S.each accs
debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new)
forM_ new $ \tnew -> do
debug $ "TRANS TO MERGE" <+> pretty tnew
let merged = HashSet.fromList new & HashSet.toList
let pt = toPTree (MaxSize 256) (MaxNum 256) merged
unless (List.null merged) do
liftIO do
nref <- makeMerkle 0 pt $ \(_,_,bss) -> do
void $ putBlock sto bss
debug $ "NEW REFCHAN" <+> pretty chanKey <+> pretty nref
updateRef sto chanKey nref