diff --git a/.fixme/log b/.fixme/log index 55f3b5e0..85920119 100644 --- a/.fixme/log +++ b/.fixme/log @@ -206,4 +206,5 @@ fixme-set "assigned" "voidlizard" "AR3Ppzm1E2" fixme-set "workflow" "test" "AR3Ppzm1E2" fixme-set "workflow" "test" "BZjzN7BjQ4" fixme-set "workflow" "wip" "6spiDvVE3q" -fixme-set "assigned" "voidlizard" "6spiDvVE3q" \ No newline at end of file +fixme-set "assigned" "voidlizard" "6spiDvVE3q" +fixme-set "workflow" "test" "6spiDvVE3q" \ No newline at end of file diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 96ce8e7e..531778c9 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -13,8 +13,8 @@ import HBS2.Hash import HBS2.Merkle import HBS2.Net.PeerLocator import HBS2.Net.Proto -import HBS2.Net.Proto.Peer import HBS2.Net.Proto.Definition +import HBS2.Net.Proto.Peer import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated import HBS2.Storage @@ -27,13 +27,9 @@ import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Trans.Maybe -import Control.Concurrent.STM.TSem as Sem import Data.ByteString.Lazy (ByteString) -import Data.Cache (Cache) import Data.Cache qualified as Cache import Data.Foldable hiding (find) -import Data.Hashable -import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.IntMap (IntMap) import Data.IntMap qualified as IntMap @@ -41,12 +37,7 @@ import Data.IntSet qualified as IntSet import Data.List qualified as List import Data.Maybe import Data.Set qualified as Set -import Data.Set (Set) import Lens.Micro.Platform -import Numeric ( showGFloat ) -import Prettyprinter -import System.Random.Shuffle -import Type.Reflection getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) @@ -169,20 +160,6 @@ processBlock h = do -- So make sure that this peer really answered to -- GetBlockSize request -type DownloadFromPeerStuff e m = ( MyPeer e - , MonadIO m - , Request e (BlockInfo e) m - , Request e (BlockChunks e) m - , MonadReader (PeerEnv e ) m - , PeerMessaging e - , HasProtocol e (BlockInfo e) - , EventListener e (BlockInfo e) m - , EventListener e (BlockChunks e) m - , Sessions e (BlockChunks e) m - , Sessions e (PeerInfo e) m - , Block ByteString ~ ByteString - , HasStorage m - ) downloadFromWithPeer :: forall e m . DownloadFromPeerStuff e m => Peer e @@ -378,6 +355,7 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO , EventListener e (BlockAnnounce e) m , EventListener e (PeerHandshake e) m , EventEmitter e (BlockChunks e) m + , EventEmitter e (DownloadReq e) m , Sessions e (BlockChunks e) m , Sessions e (PeerInfo e) m , Sessions e (KnownPeer e) m diff --git a/hbs2-peer/app/DownloadQ.hs b/hbs2-peer/app/DownloadQ.hs new file mode 100644 index 00000000..fc59e314 --- /dev/null +++ b/hbs2-peer/app/DownloadQ.hs @@ -0,0 +1,107 @@ +{-# Language AllowAmbiguousTypes #-} +module DownloadQ where + +import HBS2.Prelude +import HBS2.Clock +import HBS2.Hash +import HBS2.Events +import HBS2.Data.Types.Refs +import HBS2.Actors.Peer +import HBS2.Storage +import HBS2.Merkle +import HBS2.System.Logger.Simple + +import PeerTypes +import PeerConfig + +import Data.Map qualified as Map +import Data.Foldable +import Control.Concurrent.STM +import Data.ByteString.Char8 qualified as B8 +import Data.List (nub) +import Data.Maybe +import Data.Functor +import Data.Function +import Control.Exception +import Control.Monad +import System.IO + + +downloadLogAppend :: forall e m . ( MonadIO m + , EventEmitter e (DownloadReq e) m + , DownloadFromPeerStuff e m + ) => Hash HbSync -> m () +downloadLogAppend h = do + emit @e DownloadReqKey (DownloadReqData h) + +noLogFile :: MonadIO m => m () +noLogFile = err "download log not defined" + +downloadQueue :: forall e m . ( MyPeer e + , DownloadFromPeerStuff e m + , EventListener e (DownloadReq e) m + ) => PeerConfig -> DownloadEnv e -> m () + +downloadQueue conf denv = do + + sto <- getStorage + hq <- liftIO newTQueueIO + + pause @'Seconds 1 + + let qfile' = cfgValue @PeerDownloadLogKey conf :: Maybe String + + subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do + liftIO $ atomically $ writeTQueue hq h + + maybe1 qfile' noLogFile $ \fn -> forever do + + debug $ "downloadQueue" <+> pretty fn + + liftIO do + + r <- catchAny (B8.readFile fn) (\e -> whimper e >> pure "") + + let hashes = B8.lines r & mapMaybe (fromStringMay . B8.unpack) & nub :: [Hash HbSync] + + fromq <- liftIO $ atomically $ flushTQueue hq + let hashesWip = nub ( hashes <> fromq ) + + errnum <- newTVarIO mempty + + let walk h = walkMerkle h (getBlock sto) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do + case hr of + Left{} -> atomically $ modifyTVar errnum (mappend [(h,True)]) + Right (hrr :: [HashRef]) -> do + forM_ hrr $ \(HashRef hx) -> do + mblk <- hasBlock sto hx + case mblk of + Nothing -> atomically $ modifyTVar errnum (mappend [(h,True)]) + _ -> pure () + + for_ hashesWip walk + + loosers <- readTVarIO errnum <&> Map.fromListWith (||) <&> Map.filter id + + -- debug $ vcat (fmap pretty (Map.toList loosers)) + + let leftovers = [ x | x <- hashesWip , Map.member x loosers ] + + for_ leftovers $ withDownload denv . addDownload + + catchAny ( B8.writeFile fn ( B8.unlines (fmap (B8.pack.show.pretty) leftovers) ) ) + whimper + + debug "downloadQueue okay" + + -- TODO: remove-downloadQueue-pause-hardcode + pause @'Seconds 300 + -- FIXME: only-debug-20-sec + + where + whimper e = err (pretty $ show e) + + catchAny :: IO a -> (SomeException -> IO a) -> IO a + catchAny = Control.Exception.catch + + diff --git a/hbs2-peer/app/PeerConfig.hs b/hbs2-peer/app/PeerConfig.hs index 90592cd4..b6a3647d 100644 --- a/hbs2-peer/app/PeerConfig.hs +++ b/hbs2-peer/app/PeerConfig.hs @@ -37,6 +37,11 @@ type C = MegaParsec pattern Key :: forall {c}. Id -> [Syntax c] -> [Syntax c] pattern Key n ns <- SymbolVal n : ns +data PeerDownloadLogKey + +instance HasCfgKey PeerDownloadLogKey (Maybe String) where + key = "download-log" + cfgName :: FilePath cfgName = "config" @@ -85,6 +90,11 @@ peerConfigInit mbfp = liftIO do appendFile (dircfgName) ";; hbs2-peer config file" appendFile (dircfgName) defConfigData +peerConfDef :: String +peerConfDef = [qc| + download-log "./download-log" +|] + peerConfigRead :: MonadIO m => Maybe FilePath -> m PeerConfig peerConfigRead mbfp = do @@ -112,7 +122,9 @@ peerConfigRead mbfp = do debug $ pretty cfgPath - confData <- liftIO $ readFile cfgPath <&> parseTop <&> either mempty id + confData' <- liftIO $ readFile cfgPath <&> parseTop <&> either mempty id + + let confData = confData' <> either mempty id (parseTop peerConfDef) debug $ pretty confData @@ -125,6 +137,10 @@ peerConfigRead mbfp = do kp <- liftIO $ canonicalizePath (dir Text.unpack p) pure $ List @C co [Symbol co "storage", Literal co (mkLit (Text.pack kp)) ] + List co (Key "download-log" [LitStrVal p]) -> do + kp <- liftIO $ canonicalizePath (dir Text.unpack p) + pure $ List @C co [Symbol co "download-log", Literal co (mkLit (Text.pack kp)) ] + x -> pure x pure $ PeerConfig config diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index a2587e9e..f7009b05 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -31,6 +31,7 @@ import HBS2.System.Logger.Simple qualified as Log import RPC import PeerTypes import BlockDownload +import DownloadQ import PeerInfo import PeerConfig import Bootstrap @@ -482,6 +483,8 @@ runPeer opts = Exception.handle myException $ do peerThread (blockDownloadLoop denv) + peerThread (downloadQueue conf denv) + peerThread $ forever $ do cmd <- liftIO $ atomically $ readTQueue rpcQ case cmd of @@ -551,6 +554,7 @@ runPeer opts = Exception.handle myException $ do | otherwise -> do + downloadLogAppend @e h withDownload denv $ do processBlock h @@ -584,8 +588,9 @@ runPeer opts = Exception.handle myException $ do let fetchAction h = do debug $ "fetchAction" <+> pretty h - liftIO $ withPeerM penv - $ withDownload denv (processBlock h) + liftIO $ withPeerM penv $ do + downloadLogAppend @e h + withDownload denv (processBlock h) let peersAction _ = do who <- thatPeer (Proxy @(RPC e)) diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 1fc8fa93..85ed392b 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -3,9 +3,12 @@ {-# Language UndecidableInstances #-} module PeerTypes where +import HBS2.Actors.Peer import HBS2.Clock import HBS2.Defaults +import HBS2.Events import HBS2.Hash +import HBS2.Net.Messaging.UDP (UDP) import HBS2.Net.Proto import HBS2.Net.Proto.BlockInfo import HBS2.Net.Proto.Definition @@ -13,7 +16,8 @@ import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated import HBS2.Storage import HBS2.System.Logger.Simple -import HBS2.Net.Messaging.UDP (UDP) + +import PeerInfo import Control.Concurrent.Async import Control.Concurrent.STM @@ -25,8 +29,49 @@ import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.Maybe import Lens.Micro.Platform +import Data.Hashable +import Type.Reflection import Numeric (showGFloat) -import Prettyprinter + + +type MyPeer e = (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) + +data DownloadReq e + +data instance EventKey e (DownloadReq e) = + DownloadReqKey + deriving (Generic,Typeable,Eq) + +instance Typeable (DownloadReq e) => Hashable (EventKey e (DownloadReq e)) where + hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) + where + p = Proxy @DownloadReq + +newtype instance Event e (DownloadReq e) = + DownloadReqData (Hash HbSync) + deriving (Typeable) + +instance EventType ( Event e (DownloadReq e) ) where + isPersistent = True + +instance Expires (EventKey e (DownloadReq e)) where + expiresIn = const Nothing + +type DownloadFromPeerStuff e m = ( MyPeer e + , MonadIO m + , Request e (BlockInfo e) m + , Request e (BlockChunks e) m + , MonadReader (PeerEnv e ) m + , PeerMessaging e + , HasProtocol e (BlockInfo e) + , EventListener e (BlockInfo e) m + , EventListener e (BlockChunks e) m + , Sessions e (BlockChunks e) m + , Sessions e (PeerInfo e) m + , Block ByteString ~ ByteString + , HasStorage m + ) + calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)] calcBursts bu pieces = go seed @@ -104,8 +149,6 @@ data DownloadEnv e = makeLenses 'DownloadEnv -class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e -instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e) newDownloadEnv = liftIO do diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 6d1ba6c8..af574dac 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -53,6 +53,7 @@ common common-deps , unordered-containers , vector , interpolatedstring-perl6 + , filelock common shared-properties ghc-options: @@ -105,6 +106,7 @@ executable hbs2-peer main-is: PeerMain.hs other-modules: BlockDownload + , DownloadQ , Bootstrap , PeerInfo , RPC