diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index c4864057..a8e43aeb 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -58,6 +58,7 @@ import Data.HashSet qualified as HashSet import Data.Heap () import Data.List qualified as List import Data.Maybe +import Data.ByteString.Lazy qualified as LBS import Data.Text qualified as Text import Lens.Micro.Platform import UnliftIO @@ -295,7 +296,7 @@ refChanWorkerInitNotifiers :: forall e m . ( MonadIO m , MyPeer e -- , ForRefChans e -- , ForRefChans UNIX - -- , m ~ PeerM e IO + , m ~ PeerM e IO , e ~ L4Proto ) => RefChanWorkerEnv e @@ -303,6 +304,9 @@ refChanWorkerInitNotifiers :: forall e m . ( MonadIO m refChanWorkerInitNotifiers env = do + + penv <- ask + debug "refChanWorkerInitNotifiers" let (PeerConfig syn) = view refChanWorkerConf env @@ -315,10 +319,12 @@ refChanWorkerInitNotifiers env = do ] & catMaybes forM_ notifiers $ \(rc, sa) -> do - debug $ "** NOTIFIER FOR" <+> pretty (AsBase58 rc, sa) - q <- newTQueueIO - val <- async $ liftIO $ notifierThread rc sa q + + -- FIXME: restart-notifiers + -- сейчас если один нотификатор упал -- упадут все + -- сделать так, что бы рестартовал только один нотификатор + val <- asyncLinked $ withPeerM penv (notifierThread rc sa q) let rcn = RefChanNotifier (fromString sa) q val @@ -329,24 +335,30 @@ refChanWorkerInitNotifiers env = do mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) - notifierThread _ sa q = do + notifierThread rc sa q = flip runContT (either throwIO (const none)) do - debug $ ">>> NOTIFIER THREAD FOR" <+> pretty sa + debug $ ">>> Notifier thread started" <+> pretty (AsBase58 rc, sa) client <- newMessagingUnix False 1.0 sa - msg <- async $ runMessagingUnix client - runNotifyProtoM client do - proto <- async $ runProto [ makeResponse (refChanNotifyRpcProto env) ] + msg <- ContT $ withAsync $ runMessagingUnix client + disp <- ContT $ withAsync $ runNotifyProtoM client do + runProto [ makeResponse (refChanNotifyRpcProto env) ] + + rely <- ContT $ withAsync $ runNotifyProtoM client do forever do req <- atomically $ readTQueue q debug "Rely notification request" request @UNIX (fromString sa) req - wait proto + r <- waitAnyCatchCancel [msg, disp, rely] - mapM_ wait [msg] + warn $ ">>> Notifier thread for" <+> pretty sa <+> "terminated" <+> viaShow (snd r) + + atomically $ modifyTVar (_refChanWorkerNotifiers env) (HashMap.delete rc) + + pure (snd r) data ValidateEnv = @@ -432,7 +444,7 @@ refChanWorkerInitValidators env = do unless here do q <- newTQueueIO - val <- async $ validatorThread rc sa q + val <- asyncLinked $ validatorThread rc sa q let rcv = RefChanValidator q val atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.insert rc rcv) @@ -442,57 +454,47 @@ refChanWorkerInitValidators env = do mkV rc x = (,Text.unpack x) <$> fromStringMay @(RefChanId e) (Text.unpack rc) -- FIXME: make-thread-respawning - validatorThread chan sa q = liftIO do + validatorThread chan sa q = liftIO $ flip runContT (either throwIO (const none)) do client <- newMessagingUnix False 1.0 sa - msg <- async $ runMessagingUnix client + msg <- ContT $ withAsync $ runMessagingUnix client -- FIXME: hardcoded-timeout - waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) + waiters <- liftIO $ Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) - -- FIXME: hardcoded-timeout - waiters <- Cache.newCache (Just (toTimeSpec (10 :: Timeout 'Seconds))) + disp <- ContT $ withAsync $ runValidateProtoM client do + runProto [ makeResponse (refChanValidateProto waiters) + ] - runValidateProtoM client do - - poke <- async $ forever do + poke <- ContT $ withAsync $ runValidateProtoM client do pause @'Seconds 10 mv <- newEmptyMVar nonce <- newNonce @(RefChanValidate UNIX) atomically $ writeTQueue q (RefChanValidate @UNIX nonce chan Poke, mv) - z <- async $ runProto - [ makeResponse (refChanValidateProto waiters) - ] + rely <- ContT $ withAsync $ runValidateProtoM client do + (req, answ) <- atomically $ readTQueue q + case rcvData req of + Validate href -> do + debug $ "DO REQUEST VALIDATE" <+> pretty href <+> pretty sa + liftIO $ Cache.insert waiters (rcvNonce req) answ + let pa = fromString sa + request pa req - forever do - (req, answ) <- atomically $ readTQueue q - case rcvData req of - Validate href -> do - debug $ "DO REQUEST VALIDATE" <+> pretty href <+> pretty sa - liftIO $ Cache.insert waiters (rcvNonce req) answ - let pa = fromString sa - request pa req + Poke{} -> do + debug "DO SEND POKE" + let pa = fromString sa + request pa req - Poke{} -> do - debug "DO SEND POKE" - let pa = fromString sa - request pa req + _ -> pure () - Poke{} -> do - debug "DO SEND POKE" - let pa = fromString sa - pure () - -- - -- request pa req + r <- waitAnyCatchCancel [msg, disp, poke, rely] - _ -> pure () + atomically $ modifyTVar (_refChanWorkerValidators env) (HashMap.delete chan) + warn $ "*** RefChan validate thread:" <+> pretty sa <+> viaShow (snd r) - (_, r) <- waitAnyCatch [z,poke] - debug $ "SOMETHING WRONG:" <+> viaShow r + pure (snd r) - cancel msg - warn $ "validatorThread is terminated for some reasons" <+> pretty (AsBase58 chan) runRefChanRelyWorker :: forall e m . ( MonadIO m @@ -561,7 +563,7 @@ refChanWorker env brains = do polls <- ContT $ withAsync (refChanPoll penv) - wtrans <- ContT $ withAsync (refChanWriter penv) + wtrans <- ContT $ withAsync (liftIO $ withPeerM penv $ refChanWriter penv) cleanup1 <- ContT $ withAsync (liftIO (cleanupRounds penv)) @@ -571,7 +573,7 @@ refChanWorker env brains = do liftIO $ refChanWorkerInitValidators env - liftIO $ refChanWorkerInitNotifiers env + lift $ refChanWorkerInitNotifiers env liftIO $ withPeerM penv do subscribe @e RefChanRequestEventKey $ \(RefChanRequestEvent chan val) -> do @@ -626,13 +628,17 @@ refChanWorker env brains = do when closed do forM_ trans $ \t -> do + debug $ "WRITING TRANS" <+> pretty t + lift $ refChanWriteTranFn env t atomically $ modifyTVar rounds (HashSet.delete x) debug $ "CLEANUP ROUND" <+> pretty x - refChanWriter penv = withPeerM penv do + + + refChanWriter penv = do sto <- getStorage forever do pause @'Seconds 1 @@ -646,7 +652,10 @@ refChanWorker env brains = do upd <- MaybeT $ pure $ deserialiseOrFail @(RefChanUpdate e) blk & either (const Nothing) Just case upd of - Propose chan _ -> pure (RefChanLogKey @(Encryption e) chan, h) + Propose chan box -> do + lift $ tryDownloadContent env chan box + pure (RefChanLogKey @(Encryption e) chan, h) + Accept chan _ -> pure (RefChanLogKey @(Encryption e) chan, h) let byChan = HashMap.fromListWith (<>) [ (x, [y]) | (x,y) <- catMaybes trans ] @@ -773,6 +782,26 @@ refChanWorker env brains = do pure () + +tryDownloadContent env chan box = runMaybeT do + (_, ProposeTran _ pbox) <- MaybeT $ pure $ unboxSignedBox0 box + (_, bss) <- MaybeT $ pure $ unboxSignedBox0 pbox + + let what = tryDetect (hashObject bss) (LBS.fromStrict bss) + + down <- case what of + AnnRef (AnnotatedHashRef a b ) -> do + pure $ b : maybeToList a + + SeqRef (SequentialRef _ (AnnotatedHashRef a b))-> do + pure $ b : maybeToList a + + _ -> pure mempty + + for_ (List.nub down) $ \blk -> do + lift $ refChanAddDownload env chan blk dontHandle + + data MergeEnv e = MergeEnv { mergeSto :: AnyStorage @@ -892,6 +921,8 @@ logMergeProcess penv env q = withPeerM penv do let mergeList = HashSet.toList mergeSet + downQ <- newTQueueIO + -- если какие-то транзакции отсутствуют - пытаемся их скачать -- и надеемся на лучшее (лог сойдется в следующий раз) forM_ mergeList $ \href -> do @@ -907,8 +938,11 @@ logMergeProcess penv env q = withPeerM penv do case tran of Propose _ box -> do - (pk, ProposeTran headRef box) <- MaybeT $ pure $ unboxSignedBox0 box - (ak, _) <- MaybeT $ pure $ unboxSignedBox0 box + (pk, ProposeTran headRef pbox) <- MaybeT $ pure $ unboxSignedBox0 box + + liftIO $ withPeerM penv $ tryDownloadContent env chan box + + (ak, bss) <- MaybeT $ pure $ unboxSignedBox0 pbox lift $ lift $ downloadMissedHead sto chan headRef @@ -941,14 +975,22 @@ logMergeProcess penv env q = withPeerM penv do debug $ "new trans to merge" <+> pretty (AsBase58 chan) <+> pretty (length new) - forM_ new $ \tnew -> do - debug $ "TRANS TO MERGE" <+> pretty tnew + let merged = HashSet.fromList new - let merged = HashSet.fromList new & HashSet.toList + deps <- atomically $ flushTQueue downQ - let pt = toPTree (MaxSize 256) (MaxNum 256) merged + for_ deps $ \(parent, AnnotatedHashRef a r) -> do + when (parent `HashSet.member` merged) do + + debug $ "#@#@#@ !!! RefChan.download deps" <+> pretty a <+> pretty r + lift $ refChanAddDownload env chan r dontHandle + maybe1 a none $ \ann -> do + lift $ refChanAddDownload env chan ann dontHandle + + unless (HashSet.null merged) do + + let pt = toPTree (MaxSize 256) (MaxNum 256) (HashSet.toList merged) - unless (List.null merged) do liftIO do nref <- makeMerkle 0 pt $ \(_,_,bss) -> do void $ putBlock sto bss