hbs2/hbs2-peer/app/RefChan.hs

1053 lines
38 KiB
Haskell
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{-# Language AllowAmbiguousTypes #-}
{-# Language TemplateHaskell #-}
module RefChan (
RefChanWorkerEnv(..)
, refChanWorkerEnvHeadQ
, refChanWorkerEnvDownload
, refChanOnHeadFn
, refChanWriteTranFn
, refChanValidateTranFn
, refChanNotifyRelyFn
, refChanWorker
, runRefChanRelyWorker
, refChanWorkerEnv
, refChanNotifyOnUpdated
, refChanWorkerEnvSetProbe
) where
import HBS2.Prelude.Plated
import HBS2.Actors.Peer
import HBS2.Base58
import HBS2.Hash
import HBS2.Data.Detect
import HBS2.Defaults
import HBS2.Data.Types.Refs
import HBS2.Data.Types.SignedBox
import HBS2.Events
import HBS2.Merkle
import HBS2.Net.Auth.Credentials
import HBS2.Net.Messaging.Unix
import HBS2.Peer.Proto.Peer
import HBS2.Peer.Proto.RefChan
import HBS2.Peer.Proto.RefChan.Adapter
import HBS2.Net.Proto.Notify (SomeNotifySource(..))
import HBS2.Peer.Notify
import HBS2.Net.Proto.Sessions
import HBS2.Storage
import PeerTypes hiding (downloads)
import PeerConfig
import Brains
import Control.Monad.Trans.Cont
import Codec.Serialise
import Control.Concurrent.STM (flushTQueue)
import Control.Exception ()
import Control.Monad.Except ()
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.Coerce
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet qualified as HashSet
import Data.Heap ()
import Data.List qualified as List
import Data.Maybe
import Data.ByteString.Lazy qualified as LBS
import Data.Text qualified as Text
import Lens.Micro.Platform
import UnliftIO
import Streaming.Prelude qualified as S
import Streaming()
{- HLINT ignore "Use newtype instead of data" -}
-- | Exception value, presumably meant to signal that requested data is
-- not available yet.
-- NOTE(review): nothing in the visible part of this module throws it.
data DataNotReady = DataNotReady
  deriving stock (Show)

instance Exception DataNotReady
-- | Callback attached to a tracked download. The download monitor in
-- 'refChanWorker' calls it with the reference once that reference is
-- reported as downloaded.
type OnDownloadComplete = HashRef -> IO ()
-- | Handle of a validator thread registered for a refchan.
data RefChanValidator =
  RefChanValidator
    { -- | Requests for the validator, each paired with the 'MVar' the
      -- answer is delivered to.
      rcvInbox :: TQueue (RefChanValidate UNIX, MVar (RefChanValidate UNIX))
      -- | The thread serving this validator.
    , rcvAsync :: Async ()
    }
-- | Handle of a notifier thread registered for a refchan.
data RefChanNotifier =
  RefChanNotifier
    { -- | Unix-socket peer the notifications are sent to.
      rcnPeer  :: Peer UNIX
      -- | Messages queued for delivery to that peer.
    , rcnInbox :: TQueue (RefChanNotify UNIX)
      -- | The thread serving this notifier.
    , rcnAsync :: Async ()
    }
-- | Shared state of the refchan worker and of the callbacks that feed it.
-- Field order matters: 'refChanWorkerEnv' fills the constructor positionally.
data RefChanWorkerEnv e =
  RefChanWorkerEnv
    { _refChanWorkerConf           :: PeerConfig
    , _refChanPeerEnv              :: PeerEnv e
    , _refChanNotifySource         :: SomeNotifySource (RefChanEvents e)
      -- head block transactions waiting to be processed
    , _refChanWorkerEnvHeadQ       :: TQueue (RefChanId e, RefChanHeadBlockTran e)
      -- tracked downloads: owning refchan, registration time, completion callback
    , _refChanWorkerEnvDownload    :: TVar (HashMap HashRef (RefChanId e, (TimeSpec, OnDownloadComplete)))
      -- refchans whose next head update should be broadcast
    , _refChanWorkerEnvNotify      :: TVar (HashMap (RefChanId e) ())
      -- transactions waiting to be written into the refchan log
    , _refChanWorkerEnvWriteQ      :: TQueue HashRef
    , _refChanWorkerValidators     :: TVar (HashMap (RefChanId e) RefChanValidator)
      -- FIXME: peer-addr-to-have-multiple-actors-on-single-box
      --   the key should be Peer e (SockAddr), so that several
      --   actors can be started on a single box for debugging
      --   purposes.
    , _refChanWorkerNotifiers      :: TVar (HashMap (RefChanId e) [RefChanNotifier])
    , _refChanWorkerNotifiersInbox :: TQueue (RefChanNotify e) -- ^ to rely messages from clients to gossip
    , _refChanWorkerNotifiersDone  :: Cache (Hash HbSync) ()
    , _refChanWorkerLocalRelyDone  :: Cache (Peer UNIX, Hash HbSync) ()
    , _refChanWorkerProbe          :: TVar AnyProbe
    }

makeLenses 'RefChanWorkerEnv
-- | Replace the probe stored in the worker environment. The worker reads
-- it each time it reports its counters.
refChanWorkerEnvSetProbe :: forall m e . (MonadIO m, ForRefChans e)
                         => RefChanWorkerEnv e
                         -> AnyProbe
                         -> m ()
refChanWorkerEnvSetProbe env probe =
  liftIO $ atomically $ writeTVar (_refChanWorkerProbe env) probe
-- | Allocate a fresh worker environment: empty queues and maps, two caches
-- with 'defRequestLimit' expiration and a dummy probe.
refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e)
                 => PeerConfig
                 -> PeerEnv e
                 -> SomeNotifySource (RefChanEvents e)
                 -> m (RefChanWorkerEnv e)
refChanWorkerEnv conf pe nsource = liftIO do
  -- allocation order follows the field order of 'RefChanWorkerEnv'
  headQ         <- newTQueueIO
  dl            <- newTVarIO mempty
  notifyFlags   <- newTVarIO mempty
  writeQ        <- newTQueueIO
  validators    <- newTVarIO mempty
  notifiers     <- newTVarIO mempty
  notifierInbox <- newTQueueIO
  notifiersDone <- Cache.newCache (Just defRequestLimit)
  localRelyDone <- Cache.newCache (Just defRequestLimit)
  probe         <- newTVarIO (AnyProbe ())
  pure $ RefChanWorkerEnv @e conf pe nsource
           headQ
           dl
           notifyFlags
           writeQ
           validators
           notifiers
           notifierInbox
           notifiersDone
           localRelyDone
           probe
-- | Queue a head block transaction of @chan@ for the head monitor of
-- 'refChanWorker'.
refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
refChanOnHeadFn env chan tran =
  atomically $ writeTQueue (env ^. refChanWorkerEnvHeadQ) (chan, tran)
-- | Queue a transaction reference for the log writer of 'refChanWorker'.
refChanWriteTranFn :: MonadIO m => RefChanWorkerEnv e -> HashRef -> m ()
refChanWriteTranFn env href =
  atomically $ writeTQueue (env ^. refChanWorkerEnvWriteQ) href
-- | Ask the validator registered for @chan@ (if any) whether transaction
-- @htran@ is acceptable.
--
-- Returns 'True' when no validator is registered for the refchan or when
-- the validator answers 'Accepted'; returns 'False' on any other answer
-- and when no answer arrives within the timeout.
--
-- The wait is bounded because the validator thread keeps the answer
-- 'MVar' in a cache that expires after 10 seconds: once the entry is
-- gone the answer can never be delivered, and an unbounded wait would
-- block the caller forever.
refChanValidateTranFn :: forall e m . ( MonadUnliftIO m
                                      , ForRefChans e, e ~ L4Proto
                                      , HasNonces (RefChanValidate UNIX) m
                                      )
                      => RefChanWorkerEnv e
                      -> RefChanId e
                      -> HashRef
                      -> m Bool
refChanValidateTranFn env chan htran = do
  mbv <- readTVarIO (view refChanWorkerValidators env) <&> HashMap.lookup chan

  -- send the request to the corresponding validator
  -- and wait for the answer
  debug $ "VALIDATE TRAN" <+> pretty (AsBase58 chan) <+> pretty htran

  r <- maybe1 mbv (pure True) $ \(RefChanValidator q _) -> do
    answ  <- newEmptyMVar
    nonce <- newNonce @(RefChanValidate UNIX)
    atomically $ writeTQueue q (RefChanValidate nonce chan (Validate @UNIX htran), answ)

    -- FIXME: hardcoded-timeout
    --   must not exceed the lifetime of the waiters cache
    --   in the validator thread
    verdict <- race (pause @'Seconds 10) (readMVar answ)

    case verdict of
      Right msg | Accepted{} <- rcvData msg -> pure True
      Right _ -> pure False
      Left () -> do
        warn $ "TRANS VALIDATION TIMEOUT:" <+> pretty htran
        pure False

  debug $ "TRANS VALIDATION RESULT: " <+> pretty htran <+> pretty r
  pure r
-- | Flag @chan@ so that the head monitor of 'refChanWorker' broadcasts the
-- head block the next time it updates the head of this refchan. The flag
-- is cleared when the monitor processes a head block of the refchan.
--
-- FIXME: leak-when-block-never-really-updated
refChanNotifyOnUpdated :: (MonadIO m, ForRefChans e) => RefChanWorkerEnv e -> RefChanId e -> m ()
refChanNotifyOnUpdated env chan =
  atomically $ modifyTVar (env ^. refChanWorkerEnvNotify) (HashMap.insert chan ())
-- | Forward a 'Notify' message to every local notifier registered for
-- @chan@. A message whose hash is already present in the
-- notifiers-done cache is dropped; any other kind of message is ignored.
refChanNotifyRelyFn :: forall e m . ( MonadUnliftIO m
                                    , ForRefChans e, e ~ L4Proto
                                    )
                    => RefChanWorkerEnv e
                    -> RefChanId e
                    -> RefChanNotify e
                    -> m ()
refChanNotifyRelyFn env chan msg@(Notify _ (SignedBox k box s)) = do
  debug "refChanNotifyRelyFn"

  let seen = view refChanWorkerNotifiersDone env

  -- FIXME: multiple-hash-object-msg-performance
  let h0 = hashObject @HbSync (serialise msg)

  fresh <- liftIO $ Cache.lookup seen h0 <&> isNothing

  when fresh do
    liftIO $ Cache.insert seen h0 ()
    registered <- readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan
    forM_ (fromMaybe mempty registered) $ \(RefChanNotifier _ q _) ->
      atomically $ writeTQueue q (Notify @UNIX chan (SignedBox k box s))

refChanNotifyRelyFn _ _ _ = pure ()
-- | Schedule the download of @r@ and register it in the download map of
-- the worker environment, together with the owning refchan, the current
-- coarse time and the completion callback.
refChanAddDownload :: forall e m . ( m ~ PeerM e IO
                                   , MyPeer e
                                   )
                   => RefChanWorkerEnv e
                   -> RefChanId e
                   -> HashRef
                   -> OnDownloadComplete
                   -> m ()
refChanAddDownload env chan r onComplete = do
  penv  <- ask
  began <- getTimeCoarse
  liftIO $ withPeerM penv $ addDownload @e Nothing (fromHashRef r)
  let entry = (chan, (began, onComplete))
  atomically $ modifyTVar (env ^. refChanWorkerEnvDownload) (HashMap.insert r entry)
-- | Reader environment of 'NotifyProtoM': the messaging fabric and the
-- own address on the unix socket.
data NotifyEnv =
  NotifyEnv
    { _notifyClient :: Fabriq UNIX
    , _notifySelf   :: Peer UNIX
    }
-- | Monad in which the notifier protocol handlers run: a reader over
-- 'NotifyEnv'.
newtype NotifyProtoM m a =
  NotifyProto { fromNotifyProto :: ReaderT NotifyEnv m a }
  deriving newtype
    ( Functor
    , Applicative
    , Monad
    , MonadIO
    , MonadUnliftIO
    , MonadReader NotifyEnv
    , MonadTrans
    )
-- | Run a 'NotifyProtoM' action over the given unix messaging bus.
runNotifyProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> NotifyProtoM m a -> m a
runNotifyProtoM bus action = runReaderT (fromNotifyProto action) nenv
  where
    nenv = NotifyEnv (Fabriq bus) (msgUnixSelf bus)
-- | The fabric is taken from the reader environment.
instance Monad m => HasFabriq UNIX (NotifyProtoM m) where
  getFabriq = _notifyClient <$> ask

-- | The own peer address is taken from the reader environment.
instance Monad m => HasOwnPeer UNIX (NotifyProtoM m) where
  ownPeer = _notifySelf <$> ask
-- | Handler for messages received from a local notify client over the
-- unix socket.
--
-- * 'ActionRequest' with 'RefChanAnnounceBlock': if the block is in the
--   storage, gossip its announce.
-- * 'ActionRequest' with 'RefChanFetch': register a download of the
--   reference with a no-op completion callback.
-- * 'Notify': push the message to the notifiers inbox of the worker
--   environment and to the inbox of every notifier registered for the
--   refchan.
refChanNotifyRpcProto :: forall e m . ( MonadIO m
                                      , Request e (RefChanNotify e) m
                                      -- , HasPeerNonce UNIX m
                                      , e ~ UNIX
                                      -- , m ~ PeerM e IO
                                      )
                      => RefChanWorkerEnv L4Proto
                      -> RefChanNotify e
                      -> m ()
refChanNotifyRpcProto env (ActionRequest chan action) =
  case action of
    RefChanAnnounceBlock h -> do
      debug $ "RefChanNotify: RefChanAnnounceBlock" <+> pretty h
      liftIO $ withPeerM penv do
        sto <- getStorage
        sz' <- liftIO $ hasBlock sto (fromHashRef h)
        maybe1 sz' none $ \sz -> do
          ann <- simpleBlockAnnounce @L4Proto sz (fromHashRef h)
          gossip ann
          pure ()

    RefChanFetch h -> do
      debug $ "RefChanNotify: RefChanFetch" <+> pretty h
      liftIO $ withPeerM penv $
        refChanAddDownload env chan h (const $ pure ())
  where
    penv = view refChanPeerEnv env

refChanNotifyRpcProto env msg@(Notify chan (SignedBox pk box si)) = do
  debug "GOT MESSAGE FROM CLIENT"
  atomically $ writeTQueue (view refChanWorkerNotifiersInbox env) (Notify @L4Proto chan (SignedBox pk box si))

  -- here we are supposed to forward the message to everyone
  -- except the sender
  --
  -- FIXME: squash-this-copypaste
  -- NOTE: de-duplication per (peer, message hash) through
  --   the local-rely-done cache is currently switched off,
  --   so every registered notifier receives the message
  registered <- readTVarIO (view refChanWorkerNotifiers env) <&> HashMap.lookup chan
  forM_ (fromMaybe mempty registered) $ \(RefChanNotifier _ q _) ->
    atomically $ writeTQueue q msg
-- | Start a notifier thread for every notifier entry of the peer config
-- (a list of the symbols @notify@, @refchan@, a refchan id string and a
-- nested list @socket unix <path>@) and register it under its refchan.
--
-- Several notifiers may be registered for the same refchan. When a
-- notifier thread stops it unregisters only itself (entries whose peer
-- address equals its own socket address); previously the whole refchan
-- entry was deleted, which also dropped notifiers that were still alive.
refChanWorkerInitNotifiers :: forall e m . ( MonadIO m
                                           , MonadUnliftIO m
                                           , MyPeer e
                                           -- , ForRefChans e
                                           -- , ForRefChans UNIX
                                           , m ~ PeerM e IO
                                           , e ~ L4Proto
                                           )
                           => RefChanWorkerEnv e
                           -> m ()
refChanWorkerInitNotifiers env = do
  penv <- ask

  debug "refChanWorkerInitNotifiers"

  let (PeerConfig syn) = view refChanWorkerConf env

  let notifiers = [ mkV rc x | ListVal [ SymbolVal "notify"
                                       , SymbolVal "refchan"
                                       , LitStrVal rc
                                       , ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ]
                                       ] <- syn
                  ] & catMaybes

  forM_ notifiers $ \(rc, sa) -> do
    q <- newTQueueIO
    -- FIXME: restart-notifiers
    --   currently, if one notifier crashes, all of them go down;
    --   make it so that only the crashed notifier is restarted
    val <- asyncLinked $ withPeerM penv (notifierThread rc sa q)
    let rcn = RefChanNotifier (fromString sa) q val
    atomically $ modifyTVar (_refChanWorkerNotifiers env) (HashMap.insertWith (<>) rc [rcn])

  where
    -- Parse the refchan id; entries with a malformed id are skipped.
    mkV :: Text -> Text -> Maybe (RefChanId e, String)
    mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc)

    -- Runs the unix messaging client, the protocol dispatcher, the
    -- relay loop and the cache purger until any of them finishes.
    notifierThread rc sa q = flip runContT (either throwIO (const none)) do

      debug $ ">>> Notifier thread started" <+> pretty (AsBase58 rc, sa)

      client <- newMessagingUnix False 1.0 sa

      msg <- ContT $ withAsync $ runMessagingUnix client

      disp <- ContT $ withAsync $ runNotifyProtoM client do
        runProto [ makeResponse (refChanNotifyRpcProto env) ]

      rely <- ContT $ withAsync $ runNotifyProtoM client do
        forever do
          req <- atomically $ readTQueue q
          debug "Rely notification request"
          request @UNIX (fromString sa) req

      kill <- ContT $ withAsync $ forever do
        pause @'Seconds 30
        let RefChanWorkerEnv{..} = env
        liftIO $ Cache.purgeExpired _refChanWorkerNotifiersDone
        liftIO $ Cache.purgeExpired _refChanWorkerLocalRelyDone

      r <- waitAnyCatchCancel [msg, disp, rely, kill]

      warn $ ">>> Notifier thread for" <+> pretty sa <+> "terminated" <+> viaShow (snd r)

      -- unregister this notifier only; drop the refchan key
      -- when no notifier is left for it
      let self = fromString sa :: Peer UNIX
      let withoutSelf ns =
            case filter ((/= self) . rcnPeer) ns of
              []   -> Nothing
              rest -> Just rest

      atomically $ modifyTVar (_refChanWorkerNotifiers env) (HashMap.update withoutSelf rc)

      pure (snd r)
-- | Reader environment of 'ValidateProtoM': the messaging fabric and the
-- own address on the unix socket.
data ValidateEnv =
  ValidateEnv
    { _validateClient :: Fabriq UNIX
    , _validateSelf   :: Peer UNIX
    }
-- | Monad in which the validator protocol handlers run: a reader over
-- 'ValidateEnv'.
newtype ValidateProtoM m a =
  ValidateProto { fromValidateProto :: ReaderT ValidateEnv m a }
  deriving newtype
    ( Functor
    , Applicative
    , Monad
    , MonadIO
    , MonadUnliftIO
    , MonadReader ValidateEnv
    , MonadTrans
    )
-- | Run a 'ValidateProtoM' action over the given unix messaging bus.
runValidateProtoM :: (MonadIO m, PeerMessaging UNIX) => MessagingUnix -> ValidateProtoM m a -> m a
runValidateProtoM tran action = runReaderT (fromValidateProto action) venv
  where
    venv = ValidateEnv (Fabriq tran) (msgUnixSelf tran)
-- | The fabric is taken from the reader environment.
instance Monad m => HasFabriq UNIX (ValidateProtoM m) where
  getFabriq = _validateClient <$> ask

-- | The own peer address is taken from the reader environment.
instance Monad m => HasOwnPeer UNIX (ValidateProtoM m) where
  ownPeer = _validateSelf <$> ask
-- | Handler for answers coming back from a validator.
--
-- 'Accepted' and 'Rejected' answers are delivered to the 'MVar' stored in
-- @waiters@ under the nonce of the message; answers with an unknown (or
-- expired) nonce and all other message kinds are ignored.
--
-- Delivery uses 'tryPutMVar': the waiting side leaves the 'MVar' full
-- after reading it, so a second answer with the same nonce would
-- otherwise block this handler (and with it the whole dispatcher)
-- forever. Only the first answer counts; later duplicates are dropped.
refChanValidateProto :: forall e m . ( MonadIO m
                                     , Request e (RefChanValidate e) m
                                     , Response e (RefChanValidate e) m
                                     , e ~ UNIX
                                     )
                     => Cache (RefChanValidateNonce e) (MVar (RefChanValidate e))
                     -> RefChanValidate e
                     -> m ()
refChanValidateProto waiters msg = do
  debug $ "GOT ANSWER FROM VALIDATOR" <+> pretty msg
  case rcvData msg of
    Accepted h -> emitAnswer h msg
    Rejected h -> emitAnswer h msg
    _          -> none

  where
    emitAnswer h m = liftIO do
      debug $ "EMIT ANSWER" <+> pretty h
      mbAnsw <- Cache.lookup waiters (rcvNonce m)
      maybe1 mbAnsw none $ \answ -> do
        void $ tryPutMVar answ m
-- | Start a validator thread for every validator entry of the peer config
-- (a list of the symbols @validate@, @refchan@, a refchan id string and a
-- nested list @socket unix <path>@). At most one validator is registered
-- per refchan; entries for a refchan that already has one are skipped.
--
-- The poke and relay workers of a validator thread run in a loop. They
-- used to perform a single step and return, and since the thread stops
-- as soon as any of its workers finishes, every validator went down
-- (and was unregistered) after the first poke, i.e. after ten seconds.
refChanWorkerInitValidators :: forall e m . ( MonadIO m
                                            , MonadUnliftIO m
                                            -- , MyPeer e
                                            -- , ForRefChans e
                                            -- , ForRefChans UNIX
                                            -- , m ~ PeerM e IO
                                            , e ~ L4Proto
                                            )
                            => RefChanWorkerEnv e
                            -> m ()
refChanWorkerInitValidators env = do
  debug "refChanWorkerInitValidators"

  let (PeerConfig syn) = view refChanWorkerConf env

  let validators = [ mkV rc x | ListVal [ SymbolVal "validate"
                                        , SymbolVal "refchan"
                                        , LitStrVal rc
                                        , ListVal [ SymbolVal "socket", SymbolVal "unix", LitStrVal x ]
                                        ] <- syn
                   ] & catMaybes

  forM_ validators $ \(rc, sa) -> do
    debug $ "** VALIDATOR FOR" <+> pretty (AsBase58 rc, sa)

    here <- readTVarIO (_refChanWorkerValidators env) <&> HashMap.member rc

    unless here do
      q <- newTQueueIO
      val <- asyncLinked $ validatorThread rc sa q
      let rcv = RefChanValidator q val
      atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.insert rc rcv)

  where
    -- Parse the refchan id; entries with a malformed id are skipped.
    mkV :: Text -> Text -> Maybe (RefChanId e, String)
    mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc)

    -- FIXME: make-thread-respawning
    validatorThread chan sa q = liftIO $ flip runContT (either throwIO (const none)) do
      client <- newMessagingUnix False 1.0 sa
      msg <- ContT $ withAsync $ runMessagingUnix client

      -- answer MVars by request nonce
      -- FIXME: hardcoded-timeout
      waiters <- liftIO $ Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds)))

      disp <- ContT $ withAsync $ runValidateProtoM client do
        runProto [ makeResponse (refChanValidateProto waiters)
                 ]

      -- every ten seconds queue a Poke for the relay loop
      poke <- ContT $ withAsync $ runValidateProtoM client do
        forever do
          pause @'Seconds 10
          mv <- newEmptyMVar
          nonce <- newNonce @(RefChanValidate UNIX)
          atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv)

      -- relay loop: send queued requests to the validator socket
      rely <- ContT $ withAsync $ runValidateProtoM client do
        forever do
          (req, answ) <- atomically $ readTQueue q
          case rcvData req of
            Validate href -> do
              debug $ "DO REQUEST VALIDATE" <+> pretty href <+> pretty sa
              -- remember where to put the answer before asking
              liftIO $ Cache.insert waiters (rcvNonce req) answ
              let pa = fromString sa
              request pa req

            Poke{} -> do
              debug "DO SEND POKE"
              let pa = fromString sa
              request pa req

            _ -> pure ()

      r <- waitAnyCatchCancel [msg, disp, poke, rely]

      atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.delete chan)

      warn $ "*** RefChan validate thread:" <+> pretty sa <+> viaShow (snd r)

      pure (snd r)
-- | Endless loop that takes messages from the notifiers inbox of the
-- worker environment and feeds each of them to 'refChanNotifyProto' as
-- if it came from the own peer.
runRefChanRelyWorker :: forall e m .
                        ( MonadIO m
                        , m ~ PeerM e IO
                        , e ~ L4Proto
                        )
                     => RefChanWorkerEnv e
                     -> RefChanAdapter e (ResponseM e m)
                     -> IO ()
runRefChanRelyWorker env adapter = liftIO (forever step)
  where
    step = withPeerM (view refChanPeerEnv env) do
      me <- ownPeer @e
      -- FIXME: use-bounded-queue-ASAP
      mess <- atomically $ readTQueue (view refChanWorkerNotifiersInbox env)
      runResponseM me (refChanNotifyProto True adapter mess)
{- HLINT ignore "Functor law" -}
-- | Main refchan worker.
--
-- Starts the head monitor, the download monitor, the refchan poller, the
-- log writer, the round cleaner, the log merger and the probe reporter,
-- initialises validators and notifiers, subscribes to refchan request
-- events, and then waits until any of the threads finishes. At that point
-- the rest are cancelled and, if the finished thread failed, its exception
-- is rethrown.
--
-- The download monitor only removes the entries it has actually inspected
-- (completed or expired) from the download map. It used to overwrite the
-- whole map with the remainder of its snapshot, which silently dropped
-- every download registered while the scan was running.
refChanWorker :: forall e s m . ( MonadIO m
                                , MonadUnliftIO m
                                , MyPeer e
                                , HasStorage m
                                , Request e (RefChanHead e) m
                                , Request e (RefChanRequest e) m
                                , Sessions e (KnownPeer e) m
                                , Signatures s
                                , s ~ Encryption e
                                , IsRefPubKey s
                                -- , Pretty (AsBase58 (PubKey 'Sign s))
                                , ForRefChans e
                                , EventListener e (RefChanRound e) m
                                , EventListener e (RefChanRequest e) m
                                , Sessions e (RefChanRound e) m
                                , m ~ PeerM e IO
                                , e ~ L4Proto
                                )
              => RefChanWorkerEnv e
              -> SomeBrains e
              -> m ()
refChanWorker env@RefChanWorkerEnv{..} brains = do

  penv <- ask

  mergeQ <- newTQueueIO

  -- FIXME: resume-on-exception

  -- FIXME: insist-more-during-download
  --   it happens quite often that blocks are
  --   downloaded with difficulty. this has to be
  --   investigated; maybe the block download
  --   mechanism should be reworked
  --
  -- all these threads are needed together: if one of
  -- them falls, everything is cancelled and the failure
  -- is propagated upwards.
  -- hence a bracket (withAsync) around every thread

  flip runContT (either throwIO (const none) . snd) do

    hw        <- ContT $ withAsync (refChanHeadMon penv)
    downloads <- ContT $ withAsync (monitorHeadDownloads penv)
    polls     <- ContT $ withAsync (refChanPoll penv)
    wtrans    <- ContT $ withAsync (liftIO $ withPeerM penv $ refChanWriter)
    cleanup1  <- ContT $ withAsync (liftIO (cleanupRounds penv))
    merge     <- ContT $ withAsync (liftIO $ logMergeProcess penv env mergeQ)

    sto <- lift getStorage

    liftIO $ refChanWorkerInitValidators env

    lift $ refChanWorkerInitNotifiers env

    liftIO $ withPeerM penv do
      subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do
        debug $ "RefChanRequestEvent" <+> pretty (AsBase58 chan) <+> pretty val

        h <- liftIO $ getRef sto (RefChanLogKey @s chan)

        -- ignore the event if we are already in sync
        unless ((HashRef <$> h) == Just val) do

          refChanAddDownload env chan val $ \href -> do
            debug $ "BLOCK DOWNLOADED" <+> pretty href
            atomically $ writeTQueue mergeQ (chan, href)

          atomically $ writeTQueue mergeQ (chan, val)

    -- periodically report the sizes of the download and notifier maps
    probeReporter <- ContT $ withAsync $ forever do
      pause @'Seconds 10
      probe <- readTVarIO _refChanWorkerProbe
      values <- atomically do
        refChanWorkerEnvDownloadSize <- readTVar _refChanWorkerEnvDownload <&> HashMap.size
        refChanWorkerNotifiersSize   <- readTVar _refChanWorkerNotifiers <&> HashMap.size
        pure [ ("refChanWorkerEnvDownloadSize", fromIntegral refChanWorkerEnvDownloadSize)
             , ("refChanWorkerNotifiersSize",   fromIntegral refChanWorkerNotifiersSize)
             ]
      acceptReport probe values

    debug "I'm refchan worker"

    waitAnyCatchCancel [hw,downloads,polls,wtrans,merge,cleanup1,probeReporter]

  where

    -- Remembers every started round and, every ten seconds, expires the
    -- rounds that are closed or past their TTL. Transactions of a closed
    -- round are queued for writing.
    cleanupRounds penv = withPeerM penv do

      rounds <- newTVarIO HashSet.empty

      subscribe @e RefChanRoundEventKey $ \(RefChanRoundEvent rcrk) -> do
        atomically $ modifyTVar rounds (HashSet.insert rcrk)
        debug $ "ON ROUND STARTED" <+> pretty rcrk

      forever do
        -- FIXME: use-polling-function-and-respect-wait
        pause @'Seconds 10

        now <- getTimeCoarse
        xs <- readTVarIO rounds <&> HashSet.toList

        forM_ xs $ \x -> do

          void $ runMaybeT do
            se <- MaybeT $ find @e x id

            closed <- readTVarIO (view refChanRoundClosed se)
            trans <- atomically $ readTVar (view refChanRoundTrans se) <&> HashSet.toList

            let ttl = view refChanRoundTTL se

            when (closed || ttl <= now) do
              lift $ expire x

              when closed do
                forM_ trans $ \t -> do
                  debug $ "WRITING TRANS" <+> pretty t
                  lift $ refChanWriteTranFn env t

              atomically $ modifyTVar rounds (HashSet.delete x)
              debug $ "CLEANUP ROUND" <+> pretty x

    -- Once a second (and only when the write queue is non-empty) takes
    -- all queued transactions, groups them by refchan, appends them to
    -- the refchan log, stores the new merkle tree and updates the
    -- reference.
    refChanWriter = do
      sto <- getStorage
      forever do
        pause @'Seconds 1

        -- blocks until there is something to write
        _ <- atomically $ peekTQueue (view refChanWorkerEnvWriteQ env)

        htrans <- liftIO $ atomically $ flushTQueue (view refChanWorkerEnvWriteQ env)

        trans <- forM htrans $ \h -> runMaybeT do
          blk <- MaybeT $ liftIO (getBlock sto (fromHashRef h))
          upd <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just

          case upd of
            Propose chan box -> do
              lift $ tryDownloadContent env chan box
              pure (RefChanLogKey @(Encryption e) chan, h)

            Accept chan _ -> pure (RefChanLogKey @(Encryption e) chan, h)

        let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ]

        -- FIXME: thread-num-hardcode-to-remove
        pooledForConcurrentlyN_ 4 (HashMap.toList byChan) $ \(c,new) -> do
          mbLog <- liftIO $ getRef sto c
          hashes <- maybe1 mbLog (pure mempty) $ readLog (getBlock sto) . HashRef

          -- FIXME: might-be-problems-on-large-logs
          let hashesNew = HashSet.fromList (hashes <> new) & HashSet.toList

          -- FIXME: remove-chunk-num-hardcode
          -- $class: hardcode
          let pt = toPTree (MaxSize 256) (MaxNum 256) hashesNew

          nref <- makeMerkle 0 pt $ \(_,_,bss) -> void $ liftIO $ putBlock sto bss

          -- TODO: ASAP-notify-on-refchan-update
          -- $workflow: wip

          updateRef sto c nref

          notifyOnRefChanUpdated env c nref

    -- Polls the refchans listed by the brains: asks peers for the head
    -- and for the log of each of them.
    refChanPoll penv = withPeerM penv do

      let listRefs = listPolledRefs @e brains (Just "refchan")
                        <&> fmap (\(a,_,b) -> (a,b))
                        <&> fmap (over _2 ( (*60) . fromIntegral) )

      polling (Polling 5 5) listRefs $ \ref -> do
        debug $ "POLLING REFCHAN" <+> pretty (AsBase58 ref)
        broadCastMessage (RefChanGetHead @e ref)
        broadCastMessage (RefChanRequest @e ref)

    -- Once a second inspects the tracked downloads: fires the callback
    -- of every completed one and forgets completed and expired entries.
    monitorHeadDownloads penv = withPeerM penv $ forever do
      pause @'Seconds 1
      pending <- readTVarIO (view refChanWorkerEnvDownload env) <&> HashMap.toList

      now <- getTimeCoarse

      -- FIXME: change-to-polling-functions
      -- FIXME: consider-timeouts-or-leak-is-possible
      finished <- forM pending $ \(r, (_, (t, onComplete))) -> do
        here <- checkDownloaded r
        if here then do
          liftIO $ onComplete r
          -- refChanOnHeadFn env chan (RefChanHeadBlockTran r)
          pure [(r, t)]
        else do
          -- FIXME: fix-timeout-hardcode
          let expired = realToFrac (toNanoSecs $ now - t) / 1e9 > 600
          pure [ (r, t) | expired ]

      -- Remove exactly the inspected entries. An entry that was
      -- re-registered meanwhile carries another timestamp and is
      -- kept, as are entries added during the scan.
      let gone = HashMap.fromList (mconcat finished)
      let dropSeen v@(_, (t', _)) t = if t' == t then Nothing else Just v

      atomically $ modifyTVar (view refChanWorkerEnvDownload env)
                              (\m -> HashMap.differenceWith dropSeen m gone)

    -- Processes queued head block transactions: downloads the head block
    -- when it is missing, otherwise validates it and updates the head
    -- reference if the version is newer than ours.
    -- FIXME: in-parallel?
    refChanHeadMon pe = liftIO $ withPeerM pe do
      forever do
        (chan, RefChanHeadBlockTran hr) <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env)

        here <- checkDownloaded hr

        if not here then do
          -- come back here (through the head queue) when downloaded
          refChanAddDownload env chan hr (withPeerM pe . refChanOnHeadFn env chan . RefChanHeadBlockTran)
          trace $ "BLOCK IS NOT HERE" <+> pretty hr
        else do
          sto <- getStorage
          trace $ "BLOCK IS HERE" <+> pretty hr
          -- read the block
          lbs <- readBlobFromTree (getBlock sto) hr <&> fromMaybe mempty
          let what = unboxSignedBox @(RefChanHeadBlock e) @s lbs

          -- read and clear the "broadcast on update" flag
          notify <- atomically $ do
            no <- readTVar _refChanWorkerEnvNotify <&> HashMap.member chan
            modifyTVar _refChanWorkerEnvNotify (HashMap.delete chan)
            pure no

          case what of
            Nothing -> err $ "malformed head block" <+> pretty hr

            Just (pk,blk) | pk == chan -> do
              let rkey = RefChanHeadKey @s pk

              debug $ "Good head block" <+> pretty hr <+> "processing..."

              ourVersion <- runMaybeT do
                cur <- MaybeT $ liftIO $ getRef sto rkey
                lbss <- MaybeT $ readBlobFromTree (getBlock sto) (HashRef cur)
                (_, blkOur) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @s lbss
                pure $ view refChanHeadVersion blkOur

              let v0 = fromMaybe 0 ourVersion
              let v1 = view refChanHeadVersion blk

              if v1 > v0 then do
                debug $ "UPDATING HEAD BLOCK" <+> pretty (v1, v0)
                liftIO $ updateRef sto rkey (fromHashRef hr)
                -- if it was us who updated it, it would be nice
                -- to notify everyone. But how?
                --
                -- TODO: update-acl-here

                forM_ (HashMap.keys $ view refChanHeadPeers blk) $ \pip -> do
                  debug $ "ADD PEER ACL" <+> pretty (AsBase58 chan) <+> pretty(AsBase58 pip)

                forM_ (view refChanHeadAuthors blk) $ \au -> do
                  debug $ "ADD AUTHOR ACL" <+> pretty (AsBase58 chan) <+> pretty(AsBase58 au)

                when notify do
                  debug $ "NOTIFY-ALL-HEAD-UPDATED" <+> pretty (AsBase58 pk) <+> pretty hr
                  broadCastMessage (RefChanHead @e pk (RefChanHeadBlockTran hr))

              else do
                debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0)

            _ -> debug "not subscribed to this refchan"

          pure ()
          -- unpack the block
          -- extract the key from the block?

        pure ()
-- | Unpack a proposal box and, when its payload is detected as an
-- annotated or a sequential reference, schedule the download of the
-- referenced block and of its annotation (if present). Yields 'Nothing'
-- when either of the two signed boxes cannot be unpacked.
tryDownloadContent env chan box = runMaybeT do
  (_, ProposeTran _ pbox) <- MaybeT $ pure $ unboxSignedBox0 box
  (_, bss) <- MaybeT $ pure $ unboxSignedBox0 pbox

  let wanted = case tryDetect (hashObject bss) (LBS.fromStrict bss) of
        AnnRef (AnnotatedHashRef a b)                   -> b : maybeToList a
        SeqRef (SequentialRef _ (AnnotatedHashRef a b)) -> b : maybeToList a
        _                                               -> mempty

  for_ (List.nub wanted) $ \blk ->
    lift $ refChanAddDownload env chan blk dontHandle
-- | State of the log merge process: the storage and a cache of head
-- blocks that were already read, keyed by their reference.
data MergeEnv e =
  MergeEnv
    { mergeSto   :: AnyStorage
    , mergeHeads :: TVar (HashMap HashRef (RefChanHeadBlock e))
    }
-- | Log merge loop. Roughly once a second (and only when the queue is
-- non-empty) it takes all queued (refchan, log reference) pairs, groups
-- them by refchan, removes duplicate references and merges the logs of
-- every refchan into the local one.
--
-- FIXME: possible-performance-issues
-- Looks rather slow. Probably
-- it can be made faster.
-- In particular, by caching the logs that were already processed
logMergeProcess :: forall e s m . ( MonadUnliftIO m
, MyPeer e
, ForRefChans e
, HasStorage m
, Signatures s
, Pretty (AsBase58 (PubKey 'Sign s))
, s ~ Encryption e
, m ~ PeerM e IO
)
=> PeerEnv e
-> RefChanWorkerEnv e
-> TQueue (RefChanId e, HashRef)
-> IO ()
logMergeProcess penv env q = withPeerM penv do
sto <- getStorage
menv <- MergeEnv sto <$> newTVarIO mempty
forever do
-- FIXME: fix-hardcoded-timeout
pause @'Seconds 1
-- blocks until the queue has at least one element
_ <- atomically $ peekTQueue q
logs <- liftIO $ atomically $ flushTQueue q
let byChan = HashMap.fromListWith (<>) [ (x,[y]) | (x,y) <- logs ]
& HashMap.toList
& fmap (over _2 List.nub)
-- FIXME: in-parallel
mapM_ (logMergeChan menv sto) byChan
where
-- Look a head block up in the cache of the merge environment; on a miss
-- read it from the storage, unpack it and put it into the cache.
-- A head that is absent from the storage is only reported with a warning.
getHead :: MergeEnv e
-> HashRef
-> m (Maybe (RefChanHeadBlock e))
getHead e h = do
let sto = mergeSto e
hd <- readTVarIO (mergeHeads e) <&> HashMap.lookup h
here <- liftIO $ hasBlock sto (fromHashRef h) <&> isJust
unless here do
warn $ "refchan. head is missed:" <+> pretty h
pure ()
case hd of
Just x -> pure (Just x)
Nothing -> runMaybeT do
hdblob <- MaybeT $ readBlobFromTree ( liftIO . getBlock sto ) h
(_, headblk) <- MaybeT $ pure $ unboxSignedBox @_ @s hdblob
atomically $ modifyTVar (mergeHeads e) (HashMap.insert h headblk)
pure headblk
-- Schedule the download of a head block that is absent from the storage;
-- once downloaded it is passed to 'refChanOnHeadFn'.
downloadMissedHead :: EventEmitter e (DownloadReq e) m
=> AnyStorage
-> RefChanId e
-> HashRef
-> m ()
downloadMissedHead sto chan headRef = do
penv <- ask
here <- liftIO $ hasBlock sto (fromHashRef headRef) <&> isJust
unless here do
refChanAddDownload env chan headRef (withPeerM penv . refChanOnHeadFn env chan . RefChanHeadBlockTran)
-- Merge the given logs of one refchan into the local log of that refchan.
logMergeChan menv sto (chan, logs) = do
penv <- ask
let readFn = getBlock sto
void $ runMaybeT do
let chanKey = RefChanLogKey @s chan
-- FIXME: wont-work-if-no-reference-yet
-- will not work if the reference is new
-- mergeSet: our current log (if any) plus everything read from the
-- incoming logs; merge: whether the incoming logs contained anything
(mergeSet, merge) <- liftIO (getRef sto chanKey) >>= \case
Nothing -> do
new <- mconcat <$> mapM (lift . readLog readFn) logs
pure (HashSet.fromList new, not (List.null new))
Just h -> do
current <- lift $ readLog readFn (HashRef h) <&> HashSet.fromList
new <- mconcat <$> mapM (lift . readLog readFn) (filter (/= HashRef h) logs)
let mergeSet = HashSet.fromList new <> current
pure (mergeSet, not (List.null new))
guard merge
-- so, the whole log that the peer has arrived here;
-- it makes sense to look for the confirmations only in it. if
-- the peer accepted transactions without a sufficient number of
-- confirmations, that is its own problem.
-- every transaction is considered once, if
-- it is merged.
-- if it is not merged --- maybe it should be
-- put into some reject list
-- unpack, sort by heads first
-- then walk over the heads, fetch the heads
-- check the ACLs against the historical heads
-- then link every accept to the corresponding propose
-- then count the accepts for every propose
-- then, if everything is ok -- write the accepts and the proposes
-- that have more than quorum confirmations for the actual head
let mergeList = HashSet.toList mergeSet
-- NOTE(review): nothing visible in this block writes to downQ, so the
-- dependency pass below appears to run over an empty list -- confirm
downQ <- newTQueueIO
-- if some transactions are missing - try to download them
-- and hope for the best (the log will converge next time)
forM_ mergeList $ \href -> do
mblk <- liftIO $ getBlock sto (fromHashRef href)
maybe1 mblk (lift $ refChanAddDownload env chan href dontHandle) dontHandle
-- for every transaction that can be read, decoded and that passes the
-- check against its head: (proposal reference, (quorum, accepts))
r <- forM mergeList $ \href -> runMaybeT do
blk <- MaybeT $ liftIO $ getBlock sto (fromHashRef href)
tran <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk
& either (const Nothing) Just
case tran of
Propose _ box -> do
(pk, ProposeTran headRef pbox) <- MaybeT $ pure $ unboxSignedBox0 box
liftIO $ withPeerM penv $ tryDownloadContent env chan box
(ak, bss) <- MaybeT $ pure $ unboxSignedBox0 pbox
lift $ lift $ downloadMissedHead sto chan headRef
hd <- MaybeT $ lift $ getHead menv headRef
let quo = view refChanHeadQuorum hd & fromIntegral
guard $ checkACL ACLUpdate hd (Just pk) ak
pure [(href, (quo,mempty))]
Accept _ box -> do
(pk, AcceptTran _ headRef hashRef) <- MaybeT $ pure $ unboxSignedBox0 box
lift $ lift $ downloadMissedHead sto chan headRef
hd <- MaybeT $ lift $ getHead menv headRef
let quo = view refChanHeadQuorum hd & fromIntegral
-- only accepts signed by a peer listed in the head are counted
guard $ HashMap.member pk (view refChanHeadPeers hd)
pure [(hashRef, (quo,[href]))]
-- entries of the same proposal: the largest quorum, the union of accepts
let merge1 (q1, hs1) (q2, hs2) = (max q1 q2, List.nub (hs1 <> hs2) )
let permitted = HashMap.fromListWith merge1 (mconcat (catMaybes r))
& HashMap.toList
-- NOTE(review): presumably a proposal and its accepts are emitted together
-- once the number of accepts reaches the quorum -- nesting to be confirmed
new <- S.toList_ do
forM_ permitted $ \(prop, (qx, accs)) -> do
when (length accs >= qx) do
S.yield prop
S.each accs
debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new)
let merged = HashSet.fromList new
deps <- atomically $ flushTQueue downQ
for_ deps $ \(parent, AnnotatedHashRef a r) -> do
when (parent `HashSet.member` merged) do
debug $ "#@#@#@ !!! RefChan.download deps" <+> pretty a <+> pretty r
lift $ refChanAddDownload env chan r dontHandle
maybe1 a none $ \ann -> do
lift $ refChanAddDownload env chan ann dontHandle
unless (HashSet.null merged) do
-- FIXME: sub-optimal-partition
-- remove this hardcode of sizes,
-- it leads to excessively small blocks
let pt = toPTree (MaxSize 256) (MaxNum 256) (HashSet.toList merged)
liftIO do
nref <- makeMerkle 0 pt $ \(_,_,bss) -> do
void $ putBlock sto bss
updateRef sto chanKey nref
notifyOnRefChanUpdated env chanKey nref
-- | Emit a 'RefChanUpdated' notification for the refchan behind the log
-- key @c@ through the notify source of the worker environment, then log
-- the new reference.
notifyOnRefChanUpdated :: forall e s m . ( ForRefChans e
                                         , s ~ Encryption e
                                         , MonadUnliftIO m
                                         )
                       => RefChanWorkerEnv e
                       -> RefChanLogKey s
                       -> Hash HbSync
                       -> m ()
notifyOnRefChanUpdated wenv c nref = do
  emitNotify (_refChanNotifySource wenv)
             (RefChanNotifyKey (coerce c), RefChanUpdated (coerce c) (HashRef nref))
  debug $ "REFCHAN UPDATED:" <+> pretty c <+> pretty nref