refactor refchans, download dependencies

This commit is contained in:
Dmitry Zuikov 2023-11-15 04:51:17 +03:00
parent 63caa3b5b7
commit 6ed1605841
1 changed files with 99 additions and 57 deletions

View File

@ -58,6 +58,7 @@ import Data.HashSet qualified as HashSet
import Data.Heap () import Data.Heap ()
import Data.List qualified as List import Data.List qualified as List
import Data.Maybe import Data.Maybe
import Data.ByteString.Lazy qualified as LBS
import Data.Text qualified as Text import Data.Text qualified as Text
import Lens.Micro.Platform import Lens.Micro.Platform
import UnliftIO import UnliftIO
@ -295,7 +296,7 @@ refChanWorkerInitNotifiers :: forall e m . ( MonadIO m
, MyPeer e , MyPeer e
-- , ForRefChans e -- , ForRefChans e
-- , ForRefChans UNIX -- , ForRefChans UNIX
-- , m ~ PeerM e IO , m ~ PeerM e IO
, e ~ L4Proto , e ~ L4Proto
) )
=> RefChanWorkerEnv e => RefChanWorkerEnv e
@ -303,6 +304,9 @@ refChanWorkerInitNotifiers :: forall e m . ( MonadIO m
refChanWorkerInitNotifiers env = do refChanWorkerInitNotifiers env = do
penv <- ask
debug "refChanWorkerInitNotifiers" debug "refChanWorkerInitNotifiers"
let (PeerConfig syn) = view refChanWorkerConf env let (PeerConfig syn) = view refChanWorkerConf env
@ -315,10 +319,12 @@ refChanWorkerInitNotifiers env = do
] & catMaybes ] & catMaybes
forM_ notifiers $ \(rc, sa) -> do forM_ notifiers $ \(rc, sa) -> do
debug $ "** NOTIFIER FOR" <+> pretty (AsBase58 rc, sa)
q <- newTQueueIO q <- newTQueueIO
val <- async $ liftIO $ notifierThread rc sa q
-- FIXME: restart-notifiers
-- сейчас если один нотификатор упал -- упадут все
-- сделать так, что бы рестартовал только один нотификатор
val <- asyncLinked $ withPeerM penv (notifierThread rc sa q)
let rcn = RefChanNotifier (fromString sa) q val let rcn = RefChanNotifier (fromString sa) q val
@ -329,24 +335,30 @@ refChanWorkerInitNotifiers env = do
mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc)
notifierThread _ sa q = do notifierThread rc sa q = flip runContT (either throwIO (const none)) do
debug $ ">>> NOTIFIER THREAD FOR" <+> pretty sa debug $ ">>> Notifier thread started" <+> pretty (AsBase58 rc, sa)
client <- newMessagingUnix False 1.0 sa client <- newMessagingUnix False 1.0 sa
msg <- async $ runMessagingUnix client
runNotifyProtoM client do msg <- ContT $ withAsync $ runMessagingUnix client
proto <- async $ runProto [ makeResponse (refChanNotifyRpcProto env) ]
disp <- ContT $ withAsync $ runNotifyProtoM client do
runProto [ makeResponse (refChanNotifyRpcProto env) ]
rely <- ContT $ withAsync $ runNotifyProtoM client do
forever do forever do
req <- atomically $ readTQueue q req <- atomically $ readTQueue q
debug "Rely notification request" debug "Rely notification request"
request @UNIX (fromString sa) req request @UNIX (fromString sa) req
wait proto r <- waitAnyCatchCancel [msg, disp, rely]
mapM_ wait [msg] warn $ ">>> Notifier thread for" <+> pretty sa <+> "terminated" <+> viaShow (snd r)
atomically $ modifyTVar (_refChanWorkerNotifiers env) (HashMap.delete rc)
pure (snd r)
data ValidateEnv = data ValidateEnv =
@ -432,7 +444,7 @@ refChanWorkerInitValidators env = do
unless here do unless here do
q <- newTQueueIO q <- newTQueueIO
val <- async $ validatorThread rc sa q val <- asyncLinked $ validatorThread rc sa q
let rcv = RefChanValidator q val let rcv = RefChanValidator q val
atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.insert rc rcv) atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.insert rc rcv)
@ -442,29 +454,24 @@ refChanWorkerInitValidators env = do
mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc)
-- FIXME: make-thread-respawning -- FIXME: make-thread-respawning
validatorThread chan sa q = liftIO do validatorThread chan sa q = liftIO $ flip runContT (either throwIO (const none)) do
client <- newMessagingUnix False 1.0 sa client <- newMessagingUnix False 1.0 sa
msg <- async $ runMessagingUnix client msg <- ContT $ withAsync $ runMessagingUnix client
-- FIXME: hardcoded-timeout -- FIXME: hardcoded-timeout
waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) waiters <- liftIO $ Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds)))
-- FIXME: hardcoded-timeout disp <- ContT $ withAsync $ runValidateProtoM client do
waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) runProto [ makeResponse (refChanValidateProto waiters)
]
runValidateProtoM client do poke <- ContT $ withAsync $ runValidateProtoM client do
poke <- async $ forever do
pause @'Seconds 10 pause @'Seconds 10
mv <- newEmptyMVar mv <- newEmptyMVar
nonce <- newNonce @(RefChanValidate UNIX) nonce <- newNonce @(RefChanValidate UNIX)
atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv) atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv)
z <- async $ runProto rely <- ContT $ withAsync $ runValidateProtoM client do
[ makeResponse (refChanValidateProto waiters)
]
forever do
(req, answ) <- atomically $ readTQueue q (req, answ) <- atomically $ readTQueue q
case rcvData req of case rcvData req of
Validate href -> do Validate href -> do
@ -478,21 +485,16 @@ refChanWorkerInitValidators env = do
let pa = fromString sa let pa = fromString sa
request pa req request pa req
Poke{} -> do
debug "DO SEND POKE"
let pa = fromString sa
pure ()
--
-- request pa req
_ -> pure () _ -> pure ()
r <- waitAnyCatchCancel [msg, disp, poke, rely]
(_, r) <- waitAnyCatch [z,poke] atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.delete chan)
debug $ "SOMETHING WRONG:" <+> viaShow r
warn $ "*** RefChan validate thread:" <+> pretty sa <+> viaShow (snd r)
pure (snd r)
cancel msg
warn $ "validatorThread is terminated for some reasons" <+> pretty (AsBase58 chan)
runRefChanRelyWorker :: forall e m . runRefChanRelyWorker :: forall e m .
( MonadIO m ( MonadIO m
@ -561,7 +563,7 @@ refChanWorker env brains = do
polls <- ContT $ withAsync (refChanPoll penv) polls <- ContT $ withAsync (refChanPoll penv)
wtrans <- ContT $ withAsync (refChanWriter penv) wtrans <- ContT $ withAsync (liftIO $ withPeerM penv $ refChanWriter penv)
cleanup1 <- ContT $ withAsync (liftIO (cleanupRounds penv)) cleanup1 <- ContT $ withAsync (liftIO (cleanupRounds penv))
@ -571,7 +573,7 @@ refChanWorker env brains = do
liftIO $ refChanWorkerInitValidators env liftIO $ refChanWorkerInitValidators env
liftIO $ refChanWorkerInitNotifiers env lift $ refChanWorkerInitNotifiers env
liftIO $ withPeerM penv do liftIO $ withPeerM penv do
subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do
@ -626,13 +628,17 @@ refChanWorker env brains = do
when closed do when closed do
forM_ trans $ \t -> do forM_ trans $ \t -> do
debug $ "WRITING TRANS" <+> pretty t debug $ "WRITING TRANS" <+> pretty t
lift $ refChanWriteTranFn env t lift $ refChanWriteTranFn env t
atomically $ modifyTVar rounds (HashSet.delete x) atomically $ modifyTVar rounds (HashSet.delete x)
debug $ "CLEANUP ROUND" <+> pretty x debug $ "CLEANUP ROUND" <+> pretty x
refChanWriter penv = withPeerM penv do
refChanWriter penv = do
sto <- getStorage sto <- getStorage
forever do forever do
pause @'Seconds 1 pause @'Seconds 1
@ -646,7 +652,10 @@ refChanWorker env brains = do
upd <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just upd <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just
case upd of case upd of
Propose chan _ -> pure (RefChanLogKey @(Encryption e) chan, h) Propose chan box -> do
lift $ tryDownloadContent env chan box
pure (RefChanLogKey @(Encryption e) chan, h)
Accept chan _ -> pure (RefChanLogKey @(Encryption e) chan, h) Accept chan _ -> pure (RefChanLogKey @(Encryption e) chan, h)
let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ] let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ]
@ -773,6 +782,26 @@ refChanWorker env brains = do
pure () pure ()
tryDownloadContent env chan box = runMaybeT do
(_, ProposeTran _ pbox) <- MaybeT $ pure $ unboxSignedBox0 box
(_, bss) <- MaybeT $ pure $ unboxSignedBox0 pbox
let what = tryDetect (hashObject bss) (LBS.fromStrict bss)
down <- case what of
AnnRef (AnnotatedHashRef a b ) -> do
pure $ b : maybeToList a
SeqRef (SequentialRef _ (AnnotatedHashRef a b))-> do
pure $ b : maybeToList a
_ -> pure mempty
for_ (List.nub down) $ \blk -> do
lift $ refChanAddDownload env chan blk dontHandle
data MergeEnv e = data MergeEnv e =
MergeEnv MergeEnv
{ mergeSto :: AnyStorage { mergeSto :: AnyStorage
@ -892,6 +921,8 @@ logMergeProcess penv env q = withPeerM penv do
let mergeList = HashSet.toList mergeSet let mergeList = HashSet.toList mergeSet
downQ <- newTQueueIO
-- если какие-то транзакции отсутствуют - пытаемся их скачать -- если какие-то транзакции отсутствуют - пытаемся их скачать
-- и надеемся на лучшее (лог сойдется в следующий раз) -- и надеемся на лучшее (лог сойдется в следующий раз)
forM_ mergeList $ \href -> do forM_ mergeList $ \href -> do
@ -907,8 +938,11 @@ logMergeProcess penv env q = withPeerM penv do
case tran of case tran of
Propose _ box -> do Propose _ box -> do
(pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box (pk, ProposeTran headRef pbox) <- MaybeT $ pure $ unboxSignedBox0 box
(ak, _) <- MaybeT $ pure $ unboxSignedBox0 box
liftIO $ withPeerM penv $ tryDownloadContent env chan box
(ak, bss) <- MaybeT $ pure $ unboxSignedBox0 pbox
lift $ lift $ downloadMissedHead sto chan headRef lift $ lift $ downloadMissedHead sto chan headRef
@ -941,14 +975,22 @@ logMergeProcess penv env q = withPeerM penv do
debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new) debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new)
forM_ new $ \tnew -> do let merged = HashSet.fromList new
debug $ "TRANS TO MERGE" <+> pretty tnew
let merged = HashSet.fromList new & HashSet.toList deps <- atomically $ flushTQueue downQ
let pt = toPTree (MaxSize 256) (MaxNum 256) merged for_ deps $ \(parent, AnnotatedHashRef a r) -> do
when (parent `HashSet.member` merged) do
debug $ "#@#@#@ !!! RefChan.download deps" <+> pretty a <+> pretty r
lift $ refChanAddDownload env chan r dontHandle
maybe1 a none $ \ann -> do
lift $ refChanAddDownload env chan ann dontHandle
unless (HashSet.null merged) do
let pt = toPTree (MaxSize 256) (MaxNum 256) (HashSet.toList merged)
unless (List.null merged) do
liftIO do liftIO do
nref <- makeMerkle 0 pt $ \(_,_,bss) -> do nref <- makeMerkle 0 pt $ \(_,_,bss) -> do
void $ putBlock sto bss void $ putBlock sto bss