experimental persistent-download-queue

This commit is contained in:
Dmitry Zuikov 2023-02-24 13:17:25 +03:00
parent 962afec828
commit afe5c09a18
7 changed files with 184 additions and 32 deletions

View File

@ -207,3 +207,4 @@ fixme-set "workflow" "test" "AR3Ppzm1E2"
fixme-set "workflow" "test" "BZjzN7BjQ4"
fixme-set "workflow" "wip" "6spiDvVE3q"
fixme-set "assigned" "voidlizard" "6spiDvVE3q"
fixme-set "workflow" "test" "6spiDvVE3q"

View File

@ -13,8 +13,8 @@ import HBS2.Hash
import HBS2.Merkle
import HBS2.Net.PeerLocator
import HBS2.Net.Proto
import HBS2.Net.Proto.Peer
import HBS2.Net.Proto.Definition
import HBS2.Net.Proto.Peer
import HBS2.Net.Proto.Sessions
import HBS2.Prelude.Plated
import HBS2.Storage
@ -27,13 +27,9 @@ import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Control.Concurrent.STM.TSem as Sem
import Data.ByteString.Lazy (ByteString)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Foldable hiding (find)
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.IntMap (IntMap)
import Data.IntMap qualified as IntMap
@ -41,12 +37,7 @@ import Data.IntSet qualified as IntSet
import Data.List qualified as List
import Data.Maybe
import Data.Set qualified as Set
import Data.Set (Set)
import Lens.Micro.Platform
import Numeric ( showGFloat )
import Prettyprinter
import System.Random.Shuffle
import Type.Reflection
getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync)
@ -169,20 +160,6 @@ processBlock h = do
-- So make sure that this peer really answered to
-- GetBlockSize request
type DownloadFromPeerStuff e m = ( MyPeer e
, MonadIO m
, Request e (BlockInfo e) m
, Request e (BlockChunks e) m
, MonadReader (PeerEnv e ) m
, PeerMessaging e
, HasProtocol e (BlockInfo e)
, EventListener e (BlockInfo e) m
, EventListener e (BlockChunks e) m
, Sessions e (BlockChunks e) m
, Sessions e (PeerInfo e) m
, Block ByteString ~ ByteString
, HasStorage m
)
downloadFromWithPeer :: forall e m . DownloadFromPeerStuff e m
=> Peer e
@ -378,6 +355,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, EventListener e (BlockAnnounce e) m
, EventListener e (PeerHandshake e) m
, EventEmitter e (BlockChunks e) m
, EventEmitter e (DownloadReq e) m
, Sessions e (BlockChunks e) m
, Sessions e (PeerInfo e) m
, Sessions e (KnownPeer e) m

107
hbs2-peer/app/DownloadQ.hs Normal file
View File

@ -0,0 +1,107 @@
{-# Language AllowAmbiguousTypes #-}
module DownloadQ where
import HBS2.Prelude
import HBS2.Clock
import HBS2.Hash
import HBS2.Events
import HBS2.Data.Types.Refs
import HBS2.Actors.Peer
import HBS2.Storage
import HBS2.Merkle
import HBS2.System.Logger.Simple
import PeerTypes
import PeerConfig
import Data.Map qualified as Map
import Data.Foldable
import Control.Concurrent.STM
import Data.ByteString.Char8 qualified as B8
import Data.List (nub)
import Data.Maybe
import Data.Functor
import Data.Function
import Control.Exception
import Control.Monad
import System.IO
downloadLogAppend :: forall e m . ( MonadIO m
, EventEmitter e (DownloadReq e) m
, DownloadFromPeerStuff e m
) => Hash HbSync -> m ()
downloadLogAppend h = do
emit @e DownloadReqKey (DownloadReqData h)
noLogFile :: MonadIO m => m ()
noLogFile = err "download log not defined"
downloadQueue :: forall e m . ( MyPeer e
, DownloadFromPeerStuff e m
, EventListener e (DownloadReq e) m
) => PeerConfig -> DownloadEnv e -> m ()
downloadQueue conf denv = do
sto <- getStorage
hq <- liftIO newTQueueIO
pause @'Seconds 1
let qfile' = cfgValue @PeerDownloadLogKey conf :: Maybe String
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
liftIO $ atomically $ writeTQueue hq h
maybe1 qfile' noLogFile $ \fn -> forever do
debug $ "downloadQueue" <+> pretty fn
liftIO do
r <- catchAny (B8.readFile fn) (\e -> whimper e >> pure "")
let hashes = B8.lines r & mapMaybe (fromStringMay . B8.unpack) & nub :: [Hash HbSync]
fromq <- liftIO $ atomically $ flushTQueue hq
let hashesWip = nub ( hashes <> fromq )
errnum <- newTVarIO mempty
let walk h = walkMerkle h (getBlock sto) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do
case hr of
Left{} -> atomically $ modifyTVar errnum (mappend [(h,True)])
Right (hrr :: [HashRef]) -> do
forM_ hrr $ \(HashRef hx) -> do
mblk <- hasBlock sto hx
case mblk of
Nothing -> atomically $ modifyTVar errnum (mappend [(h,True)])
_ -> pure ()
for_ hashesWip walk
loosers <- readTVarIO errnum <&> Map.fromListWith (||) <&> Map.filter id
-- debug $ vcat (fmap pretty (Map.toList loosers))
let leftovers = [ x | x <- hashesWip , Map.member x loosers ]
for_ leftovers $ withDownload denv . addDownload
catchAny ( B8.writeFile fn ( B8.unlines (fmap (B8.pack.show.pretty) leftovers) ) )
whimper
debug "downloadQueue okay"
-- TODO: remove-downloadQueue-pause-hardcode
pause @'Seconds 300
-- FIXME: only-debug-20-sec
where
whimper e = err (pretty $ show e)
catchAny :: IO a -> (SomeException -> IO a) -> IO a
catchAny = Control.Exception.catch

View File

@ -37,6 +37,11 @@ type C = MegaParsec
pattern Key :: forall {c}. Id -> [Syntax c] -> [Syntax c]
pattern Key n ns <- SymbolVal n : ns
data PeerDownloadLogKey
instance HasCfgKey PeerDownloadLogKey (Maybe String) where
key = "download-log"
cfgName :: FilePath
cfgName = "config"
@ -85,6 +90,11 @@ peerConfigInit mbfp = liftIO do
appendFile (dir</>cfgName) ";; hbs2-peer config file"
appendFile (dir</>cfgName) defConfigData
peerConfDef :: String
peerConfDef = [qc|
download-log "./download-log"
|]
peerConfigRead :: MonadIO m => Maybe FilePath -> m PeerConfig
peerConfigRead mbfp = do
@ -112,7 +122,9 @@ peerConfigRead mbfp = do
debug $ pretty cfgPath
confData <- liftIO $ readFile cfgPath <&> parseTop <&> either mempty id
confData' <- liftIO $ readFile cfgPath <&> parseTop <&> either mempty id
let confData = confData' <> either mempty id (parseTop peerConfDef)
debug $ pretty confData
@ -125,6 +137,10 @@ peerConfigRead mbfp = do
kp <- liftIO $ canonicalizePath (dir </> Text.unpack p)
pure $ List @C co [Symbol co "storage", Literal co (mkLit (Text.pack kp)) ]
List co (Key "download-log" [LitStrVal p]) -> do
kp <- liftIO $ canonicalizePath (dir </> Text.unpack p)
pure $ List @C co [Symbol co "download-log", Literal co (mkLit (Text.pack kp)) ]
x -> pure x
pure $ PeerConfig config

View File

@ -31,6 +31,7 @@ import HBS2.System.Logger.Simple qualified as Log
import RPC
import PeerTypes
import BlockDownload
import DownloadQ
import PeerInfo
import PeerConfig
import Bootstrap
@ -482,6 +483,8 @@ runPeer opts = Exception.handle myException $ do
peerThread (blockDownloadLoop denv)
peerThread (downloadQueue conf denv)
peerThread $ forever $ do
cmd <- liftIO $ atomically $ readTQueue rpcQ
case cmd of
@ -551,6 +554,7 @@ runPeer opts = Exception.handle myException $ do
| otherwise -> do
downloadLogAppend @e h
withDownload denv $ do
processBlock h
@ -584,8 +588,9 @@ runPeer opts = Exception.handle myException $ do
let fetchAction h = do
debug $ "fetchAction" <+> pretty h
liftIO $ withPeerM penv
$ withDownload denv (processBlock h)
liftIO $ withPeerM penv $ do
downloadLogAppend @e h
withDownload denv (processBlock h)
let peersAction _ = do
who <- thatPeer (Proxy @(RPC e))

View File

@ -3,9 +3,12 @@
{-# Language UndecidableInstances #-}
module PeerTypes where
import HBS2.Actors.Peer
import HBS2.Clock
import HBS2.Defaults
import HBS2.Events
import HBS2.Hash
import HBS2.Net.Messaging.UDP (UDP)
import HBS2.Net.Proto
import HBS2.Net.Proto.BlockInfo
import HBS2.Net.Proto.Definition
@ -13,7 +16,8 @@ import HBS2.Net.Proto.Sessions
import HBS2.Prelude.Plated
import HBS2.Storage
import HBS2.System.Logger.Simple
import HBS2.Net.Messaging.UDP (UDP)
import PeerInfo
import Control.Concurrent.Async
import Control.Concurrent.STM
@ -25,8 +29,49 @@ import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.Maybe
import Lens.Micro.Platform
import Data.Hashable
import Type.Reflection
import Numeric (showGFloat)
import Prettyprinter
type MyPeer e = (Eq (Peer e), Hashable (Peer e), Pretty (Peer e))
data DownloadReq e
data instance EventKey e (DownloadReq e) =
DownloadReqKey
deriving (Generic,Typeable,Eq)
instance Typeable (DownloadReq e) => Hashable (EventKey e (DownloadReq e)) where
hashWithSalt salt _ = hashWithSalt salt (someTypeRep p)
where
p = Proxy @DownloadReq
newtype instance Event e (DownloadReq e) =
DownloadReqData (Hash HbSync)
deriving (Typeable)
instance EventType ( Event e (DownloadReq e) ) where
isPersistent = True
instance Expires (EventKey e (DownloadReq e)) where
expiresIn = const Nothing
type DownloadFromPeerStuff e m = ( MyPeer e
, MonadIO m
, Request e (BlockInfo e) m
, Request e (BlockChunks e) m
, MonadReader (PeerEnv e ) m
, PeerMessaging e
, HasProtocol e (BlockInfo e)
, EventListener e (BlockInfo e) m
, EventListener e (BlockChunks e) m
, Sessions e (BlockChunks e) m
, Sessions e (PeerInfo e) m
, Block ByteString ~ ByteString
, HasStorage m
)
calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)]
calcBursts bu pieces = go seed
@ -104,8 +149,6 @@ data DownloadEnv e =
makeLenses 'DownloadEnv
class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e
instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e
newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e)
newDownloadEnv = liftIO do

View File

@ -53,6 +53,7 @@ common common-deps
, unordered-containers
, vector
, interpolatedstring-perl6
, filelock
common shared-properties
ghc-options:
@ -105,6 +106,7 @@ executable hbs2-peer
main-is: PeerMain.hs
other-modules: BlockDownload
, DownloadQ
, Bootstrap
, PeerInfo
, RPC