hbs2/hbs2-peer/app/RefChan.hs

221 lines
7.0 KiB
Haskell

{-# Language AllowAmbiguousTypes #-}
{-# Language TemplateHaskell #-}
module RefChan (
RefChanWorkerEnv(..)
, refChanWorkerEnvHeadQ
, refChaWorkerEnvDownload
, refChanOnHead
, refChanWorker
, refChanWorkerEnv
) where
import HBS2.Prelude.Plated
import HBS2.Actors.Peer
import HBS2.Base58
import HBS2.Clock
import HBS2.Data.Detect
import HBS2.Data.Types.Refs
import HBS2.Net.Auth.Credentials
import HBS2.Net.Proto
import HBS2.Net.Proto.RefChan
import HBS2.Net.Proto.Types
import HBS2.Storage
import HBS2.System.Logger.Simple
import PeerTypes
import PeerConfig
import BlockDownload
import Control.Monad.Reader
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import UnliftIO
import Lens.Micro.Platform
import Control.Monad.Except (throwError, runExceptT)
import Data.Maybe
import Control.Exception ()
import Control.Monad.Trans.Maybe
import Streaming.Prelude qualified as S
import Streaming qualified as S
{- HLINT ignore "Use newtype instead of data" -}
-- | Error value thrown (via 'throwError') from the callback that
-- 'checkDownloaded' hands to 'deepScan'; 'checkDownloaded' maps it to 'False'.
-- NOTE(review): presumably that callback fires for blocks missing from
-- storage -- confirm against 'deepScan'.
data DataNotReady
  = DataNotReady
  deriving (Show)

instance Exception DataNotReady
-- | Mutable state shared by the RefChan worker loops.
--
-- Field order matters: 'refChanWorkerEnv' applies the constructor positionally.
data RefChanWorkerEnv e =
  RefChanWorkerEnv
    { -- | Download environment handed to 'withDownload' by 'refChanAddDownload'.
      _refChanWorkerEnvDownload :: DownloadEnv e
      -- | Queue of head-block transactions; written by 'refChanOnHead' and
      -- drained by the head monitor inside 'refChanWorker'.
    , _refChanWorkerEnvHeadQ :: TQueue (RefChanHeadBlockTran e)
      -- | Hashes whose download has been requested and is still being polled.
      -- The value is currently unit; a timestamp could be stored here instead.
    , _refChaWorkerEnvDownload :: TVar (HashMap HashRef ())
    }

-- Generates the lenses 'refChanWorkerEnvDownload', 'refChanWorkerEnvHeadQ'
-- and 'refChaWorkerEnvDownload' (names follow the field names verbatim).
makeLenses 'RefChanWorkerEnv
-- | Builds a fresh 'RefChanWorkerEnv' around the given download environment,
-- with an empty head queue and an empty pending-download map.
-- The 'PeerConfig' argument is currently ignored.
refChanWorkerEnv :: forall m e . MonadIO m
                 => PeerConfig
                 -> DownloadEnv e
                 -> m (RefChanWorkerEnv e)
refChanWorkerEnv _ de = liftIO do
  headQueue <- newTQueueIO
  pendingDownloads <- newTVarIO mempty
  pure (RefChanWorkerEnv @e de headQueue pendingDownloads)
-- | Enqueues a head-block transaction onto the worker's head queue.
refChanOnHead :: MonadIO m => RefChanWorkerEnv e -> RefChanHeadBlockTran e -> m ()
refChanOnHead env tran =
  let headQueue = view refChanWorkerEnvHeadQ env
  in  atomically (writeTQueue headQueue tran)
-- | Runs 'processBlock' for the given hash inside the worker's download
-- environment, then records the hash in the pending-download map so that
-- the download monitor in 'refChanWorker' keeps polling it.
refChanAddDownload :: forall e m . ( m ~ PeerM e IO
                                   , MyPeer e
                                   , Block ByteString ~ ByteString
                                   )
                   => RefChanWorkerEnv e -> HashRef -> m ()
refChanAddDownload env r = do
  peerEnv <- ask
  let downloadEnv = _refChanWorkerEnvDownload env
      pendingVar  = view refChaWorkerEnvDownload env
  withPeerM peerEnv (withDownload downloadEnv (processBlock @e (fromHashRef r)))
  atomically (modifyTVar pendingVar (HashMap.insert r ()))
-- | Runs a deep scan from the given hash and reports whether it finished
-- without the 'DataNotReady' callback being triggered.
-- Returns 'True' when the scan completes, 'False' when it aborts.
checkDownloaded :: forall m . (MonadIO m, HasStorage m, Block ByteString ~ ByteString) => HashRef -> m Bool
checkDownloaded hr = do
  storage <- getStorage
  let fetch h = liftIO $ getBlock storage h
      notReady = const $ throwError DataNotReady
  outcome <- runExceptT (deepScan ScanDeep notReady (fromHashRef hr) fetch dontHandle)
  pure $ case outcome of
    Left _  -> False
    Right _ -> True
-- FIXME: move-to-library
-- | Reads the blob rooted at the given hash by deep-scanning from it and
-- concatenating, in scan order, every block other than the root itself.
--
-- Returns 'Nothing' when
--
--   * the scan produced no chunks at all, or
--   * any chunk is unavailable (the scan's first callback yielded 'Nothing',
--     or 'getBlock' returned 'Nothing' for a visited hash).
--
-- A single unavailable chunk makes the whole result 'Nothing'; a partially
-- assembled blob is never returned.
readBlob :: forall m . ( MonadIO m
                       , HasStorage m
                       , Block ByteString ~ ByteString
                       )
         => HashRef
         -> m (Maybe ByteString)
readBlob hr = do
  sto <- getStorage
  let readBlock h = liftIO $ getBlock sto h

  chunks <- S.toList_ $
    deepScan ScanDeep (const $ S.yield Nothing) (fromHashRef hr) readBlock $ \ha -> do
      -- The root hash itself is not part of the payload.
      unless (fromHashRef hr == ha) do
        readBlock ha >>= S.yield

  pure $ case chunks of
    -- Keep the historical contract: nothing scanned means no blob.
    [] -> Nothing
    -- 'sequence' is 'Nothing' as soon as one chunk is missing, so a gap can
    -- no longer be papered over by the chunks that follow it.
    _  -> LBS.concat <$> sequence chunks
-- | Runs the RefChan worker.
--
-- Starts two background loops and then loops forever itself, emitting a
-- debug message after every 10-unit 'Seconds' pause:
--
--   * the head monitor, which drains the head queue and updates the stored
--     head reference when a newer head block arrives;
--   * the download monitor, which polls pending downloads and re-submits
--     hashes that have become fully available.
--
-- Never returns normally. Both background loops are scoped with 'withAsync',
-- so they are cancelled when this action is cancelled or fails.
refChanWorker :: forall e s m . ( MonadIO m
                                , MonadUnliftIO m
                                , MyPeer e
                                , HasStorage m
                                , Signatures s
                                , s ~ Encryption e
                                , IsRefPubKey s
                                , Pretty (AsBase58 (PubKey 'Sign s))
                                , Block ByteString ~ ByteString
                                , ForRefChans e
                                , m ~ PeerM e IO
                                )
              => RefChanWorkerEnv e
              -> m ()
refChanWorker env =
  -- The heartbeat loop below never finishes, so explicitly waiting on the
  -- children afterwards would be dead code; 'withAsync' instead guarantees
  -- the children do not outlive the worker.
  withAsync refChanHeadMon $ \_headMon ->
    withAsync monitorDownloads $ \_downloads ->
      forever do
        pause @'Seconds 10
        debug "I'm refchan worker"

  where

    pendingVar = view refChaWorkerEnvDownload env

    -- Periodically re-checks every pending hash; finished ones are dropped
    -- from the pending set and pushed back onto the head queue.
    monitorDownloads = forever do
      pause @'Seconds 2
      pending <- readTVarIO pendingVar <&> HashMap.keys

      -- FIXME: consider-timeouts-or-leak-is-possible
      finished <- fmap mconcat $ forM pending $ \r -> do
        here <- checkDownloaded r
        pure [ r | here ]

      -- Delete only the finished hashes instead of overwriting the whole
      -- map: 'refChanAddDownload' may have inserted new entries while the
      -- checks above were running, and a blind write would drop them.
      atomically $ modifyTVar' pendingVar (\m -> foldr HashMap.delete m finished)

      -- Re-submit after the removal, so that a hash the head monitor decides
      -- to download again is not erased from the pending set afterwards.
      mapM_ (refChanOnHead env . RefChanHeadBlockTran) finished

    -- FIXME: in-parallel?
    refChanHeadMon = forever do
      RefChanHeadBlockTran hr <- atomically $ readTQueue (view refChanWorkerEnvHeadQ env)
      -- debug $ "DROP HEAD UPDATE" <+> pretty (fromRefChanHeadBlockTran tran)

      here <- checkDownloaded hr

      if not here
        then do
          -- Not available yet: request the download; the download monitor
          -- will re-enqueue the hash once it is complete.
          refChanAddDownload env hr
          trace $ "BLOCK IS NOT HERE" <+> pretty hr
        else do
          trace $ "BLOCK IS HERE" <+> pretty hr

          -- Read the block; an unreadable blob becomes empty input and is
          -- then reported as malformed below.
          lbs <- readBlob hr <&> fromMaybe mempty

          case unboxSignedBox @(RefChanHeadBlock e) @e lbs of
            Nothing -> err $ "malformed head block" <+> pretty hr

            Just (pk, blk) -> do
              let rkey = RefChanHeadKey @s pk

              sto <- getStorage

              debug $ "Good head block" <+> pretty hr <+> "processing..."

              -- Version of the head block currently referenced by rkey, if
              -- the reference exists and its blob can be read and unboxed.
              ourVersion <- runMaybeT do
                cur <- MaybeT $ liftIO $ getRef sto rkey
                lbss <- MaybeT $ readBlob (HashRef cur)
                (_, blkOur) <- MaybeT $ pure $ unboxSignedBox @(RefChanHeadBlock e) @e lbss
                pure $ view refChanHeadVersion blkOur

              -- NOTE(review): with no current head v0 is 0, so an incoming
              -- block with version 0 is never stored -- confirm intended.
              let v0 = fromMaybe 0 ourVersion
              let v1 = view refChanHeadVersion blk

              if v1 > v0
                then do
                  debug $ "UPDATING HEAD BLOCK" <+> pretty (v1, v0)
                  liftIO $ updateRef sto rkey (fromHashRef hr)
                else
                  debug $ "LEAVING HEAD BLOCK" <+> pretty (v1, v0)

          -- unpack the block
          -- extract the key from the block?

      -- If everything is downloaded, process it.
      -- If it is not downloaded, request the download and wait. How to wait?
      -- Put it into the component that starts the download and periodically
      -- checks whether it has finished; once it has, that component removes
      -- the entry from its local state and pushes the transaction back into
      -- this queue.