fixed-block-download-control

This commit is contained in:
Dmitry Zuikov 2023-10-20 11:16:11 +03:00
parent 44ada95e3f
commit 98e589fe6f
15 changed files with 214 additions and 120 deletions

View File

@ -179,7 +179,7 @@ walkMerkle' root flookup sink = go root
bs0 <- lift $ flookup hash
bs <- MaybeT $ maybe1 bs0 (sink (Left hash) >> pure Nothing) (pure . Just)
bs <- MaybeT $ maybe1 bs0 (sink (Left hash) >> pure mzero) (pure . Just)
let t1 = deserialiseOrFail @(MTree a) bs
@ -188,7 +188,7 @@ walkMerkle' root flookup sink = go root
runWithAnnTree hash bs = do
let t = deserialiseOrFail @(MTreeAnn a) bs
case t of
Left{} -> lift (sink (Left hash)) >> mzero
Left{} -> pure ()
Right (MTreeAnn { _mtaTree = t1 }) -> runWithTree t1
runWithTree t = lift do

View File

@ -40,6 +40,9 @@ import Data.Text qualified as Text
import Data.Hashable
import Prettyprinter
import Data.Word
import GHC.Generics
import Data.Time.Clock (NominalDiffTime(..))
import Codec.Serialise
none :: forall m . Monad m => m ()
none = pure ()
@ -50,8 +53,10 @@ maybe1 mb n j = maybe n j mb
eitherToMaybe :: Either a b -> Maybe b
eitherToMaybe = either (const Nothing) Just
newtype AsFileName a = AsFileName a
-- deriving instance Generic NominalDiffTime
-- instance Serialise NominalDiffTime
newtype AsFileName a = AsFileName a
instance Pretty a => Pretty (AsFileName a) where
pretty (AsFileName f) = pretty x <> "@" <> uniq

View File

@ -9,7 +9,7 @@ module HBS2.Prelude.Plated
import Data.Data
import Data.Generics.Uniplate.Data()
import Data.Generics.Uniplate.Operations
import GHC.Generics(Generic)
import GHC.Generics()
import Safe
import HBS2.Prelude
@ -25,3 +25,5 @@ uniFirstMay = headMay . universeBi
uniFirstDef :: forall from to . (Data from, Data to) => to -> from -> to
uniFirstDef d = headDef d . universeBi

View File

@ -20,6 +20,7 @@ import HBS2.Net.Proto.RefLog
import HBS2.Net.Proto.Sessions
import HBS2.Prelude.Plated
import HBS2.Storage
import HBS2.Storage.Operations.Missed
import HBS2.System.Logger.Simple
import PeerTypes
@ -143,7 +144,7 @@ processBlock h = do
Just (SeqRef (SequentialRef n (AnnotatedHashRef a' b))) -> do
maybe1 a' none $ \a -> do
debug $ "GOT AnnotatedHashRef" <+> pretty a
addDownload parent (fromHashRef a)
processBlock (fromHashRef a)
addDownload parent (fromHashRef b)

View File

@ -1,3 +1,4 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-}
{-# Language TemplateHaskell #-}
@ -10,6 +11,7 @@ module Brains
import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.Net.Proto.RefChan(ForRefChans)
import HBS2.Data.Types.Refs
import HBS2.Net.Proto
import HBS2.Hash
import HBS2.Base58
@ -20,13 +22,14 @@ import HBS2.System.Logger.Simple
import PeerConfig
import Crypto.Saltine.Core.Box qualified as Encrypt
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Reader
import Control.Exception
import Control.Concurrent.STM
import Crypto.Saltine.Core.Box qualified as Encrypt
import Database.SQLite.Simple
import Database.SQLite.Simple.FromField
import Database.SQLite.Simple.ToField
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Either
@ -35,6 +38,7 @@ import Data.HashMap.Strict qualified as HashMap
import Data.List qualified as List
import Data.Maybe
import Data.Text qualified as Text
import Data.Time.Clock (UTCTime)
import Data.Word
import Lens.Micro.Platform
import System.Directory
@ -45,6 +49,16 @@ import UnliftIO (MonadUnliftIO(..),async,race)
data PeerBrainsDb
-- FIXME: move-that-orphans-somewhere
instance ToField HashRef where
toField h = toField (show $ pretty h)
instance FromField HashRef where
fromField = fmap fromString . fromField @String
instance Monad m => HasCfgKey PeerBrainsDb (Maybe String) m where
key = "brains"
@ -90,6 +104,10 @@ instance ( Hashable (Peer e)
listTCPPexCandidates = liftIO . selectTCPPexCandidates
listDownloads = liftIO . selectDownloads
delDownload br what = updateOP br (deleteDownload br what)
onKnownPeers br ps = do
trace $ "BRAINS: onKnownPeers" <+> pretty ps
let tv = view brainsPeers br
@ -118,19 +136,27 @@ instance ( Hashable (Peer e)
liftIO $ Cache.insert cache h ()
let doAlter = HashMap.alter (maybe (Just 0) (Just . succ)) (peer,h)
liftIO $ atomically $ modifyTVar (view brainsPostponeDown b) doAlter
-- updateOP b $ insertDownload b
onBlockDownloaded b p h = do
-- trace $ "BRAINS: onBlockDownloaded" <+> pretty p <+> pretty h
cleanupPostponed b h
updateOP b $ insertPeer b h p
updateOP b do
insertPeer b h p
onBlockPostponed b h = do
-- trace $ "BRAINS: onBlockPostponed" <+> pretty h
cleanupPostponed b h
claimBlockCameFrom b f t = do
claimBlockCameFrom b (Just f) t = do
-- trace $ "BRAINS: claimBlockCameFrom" <+> pretty f <+> pretty t
updateOP b $ insertAncestor b f t
updateOP b do
insertAncestor b f t
insertDownload b (Just $ HashRef f) (HashRef t)
claimBlockCameFrom b p h = do
updateOP b do
insertDownload b (HashRef <$> p) (HashRef h)
shouldPostponeBlock b h = do
peers <- liftIO $ readTVarIO (view brainsPeers b)
@ -587,6 +613,42 @@ deletePeerAsymmKey' br key =
WHERE peer = ?
|] (Only key)
insertDownload :: forall e . ( e ~ L4Proto)
=> BasicBrains e
-> Maybe HashRef
-> HashRef
-> IO ()
insertDownload br parent hash = do
let conn = view brainsDb br
liftIO $ execute conn [qc|
insert into statedb.download (hash, parent)
values (?, ?)
on conflict (hash) do update set parent = excluded.parent
|] (hash, parent)
deleteDownload :: forall e . (e ~ L4Proto)
=> BasicBrains e
-> HashRef
-> IO ()
deleteDownload br hash = do
let conn = view brainsDb br
liftIO $ execute conn [qc|
delete from statedb.download where hash = ?
|] (Only hash)
selectDownloads :: forall e . (e ~ L4Proto)
=> BasicBrains e
-> IO [(HashRef, Integer)]
selectDownloads br = do
let conn = view brainsDb br
liftIO $ query_ conn [qc|
select hash, ts from statedb.download
|]
---
-- FIXME: eventually-close-db
@ -619,6 +681,14 @@ newBasicBrains cfg = liftIO do
create table if not exists statedb.processed ( hash text not null primary key );
|]
execute_ conn [qc|
create table if not exists statedb.download
( hash text not null primary key
, parent text null
, ts INTEGER DEFAULT (strftime('%s','now'))
);
|]
execute_ conn [qc|
create table if not exists ancestors
( child text not null

View File

@ -111,7 +111,6 @@ checkBlockAnnounce conf denv nonce pa h = void $ runMaybeT do
guard accept
lift do
downloadLogAppend @e h
withDownload denv $ do
processBlock h

View File

@ -3,128 +3,53 @@ module DownloadQ where
import HBS2.Prelude
import HBS2.Clock
import HBS2.Hash
import HBS2.Events
import HBS2.Data.Types.Refs
import HBS2.Actors.Peer
import HBS2.Net.PeerLocator
import HBS2.Peer.Brains
import HBS2.Storage
import HBS2.Merkle
import HBS2.Storage.Operations.Missed
import HBS2.System.Logger.Simple
import PeerTypes
import PeerConfig
import BlockDownload (processBlock)
import Data.Map qualified as Map
import Data.Foldable
import Control.Concurrent.STM
import Control.Concurrent.STM.TSem
import Data.ByteString.Char8 qualified as B8
import Data.List (nub)
import Data.Maybe
import Control.Exception
import Control.Monad
import Control.Monad.Reader
import Control.Concurrent.Async
downloadLogAppend :: forall e m . ( MonadIO m
, EventEmitter e (DownloadReq e) m
, DownloadFromPeerStuff e m
) => Hash HbSync -> m ()
downloadLogAppend h = do
emit @e DownloadReqKey (DownloadReqData h)
noLogFile :: MonadIO m => m ()
noLogFile = err "download log not defined"
import Lens.Micro.Platform
downloadQueue :: forall e m . ( MyPeer e
, DownloadFromPeerStuff e m
, HasPeerLocator e (BlockDownloadM e m)
, HasPeerLocator e m
, EventListener e (DownloadReq e) m
) => PeerConfig -> DownloadEnv e -> m ()
, HasStorage m
) => PeerConfig
-> SomeBrains e
-> DownloadEnv e -> m ()
downloadQueue (PeerConfig syn) denv = do
downloadQueue _ brains denv = do
debug "DownloadQ started"
down <- listDownloads @e brains
sto <- getStorage
hq <- liftIO newTQueueIO
fsem <- liftIO $ atomically $ newTSem 1
pause @'Seconds 2
withDownload denv do
forM_ down $ \(HashRef h,_) -> do
missed <- findMissedBlocks sto (HashRef h)
for_ missed $ \h -> do
debug $ "DownloadQ:" <+> pretty h
addDownload mzero (fromHashRef h)
let qfile' = runReader (cfgValue @PeerDownloadLogKey) syn
-- FIXME: timeout-hardcodes
let refs = listDownloads @e brains <&> fmap (set _2 10)
subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do
liftIO $ atomically $ writeTQueue hq h
polling (Polling 5 10) refs $ \ref -> do
missed <- findMissedBlocks sto ref
maybe1 qfile' noLogFile $ \fn -> do
void $ liftIO $ async $ forever $ do
pause @'Seconds 10
fromq <- liftIO $ atomically $ flushTQueue hq
unless (null fromq) do
atomically $ waitTSem fsem
catchAny ( B8.appendFile fn ( B8.unlines (fmap (B8.pack.show.pretty) fromq) ) )
whimper
atomically $ signalTSem fsem
debug $ "DownloadQ. check" <+> pretty ref <+> pretty (length missed)
maybe1 qfile' noLogFile $ \fn -> forever do
debug $ "downloadQueue" <+> pretty fn
lo <- liftIO do
-- FIXME: will-crash-on-big-logs
atomically $ waitTSem fsem
r <- catchAny (B8.readFile fn) (\e -> whimper e >> pure "")
atomically $ signalTSem fsem
let hashes = B8.lines r & mapMaybe (fromStringMay . B8.unpack) & nub :: [Hash HbSync]
fromq <- liftIO $ atomically $ flushTQueue hq
let hashesWip = nub ( hashes <> fromq )
errnum <- newTQueueIO
let walk h = walkMerkle h (getBlock sto) $ \(hr :: Either (Hash HbSync) [HashRef]) -> do
case hr of
Left{} -> atomically $ writeTQueue errnum (h,True)
Right (hrr :: [HashRef]) -> do
forM_ hrr $ \(HashRef hx) -> do
mblk <- hasBlock sto hx
case mblk of
Nothing -> atomically $ writeTQueue errnum (h,True)
_ -> pure ()
for_ hashesWip walk
loosers <- atomically $ flushTQueue errnum <&> Map.fromListWith (||) <&> Map.filter id
-- debug $ vcat (fmap pretty (Map.toList loosers))
let leftovers = [ x | x <- hashesWip , Map.member x loosers ]
atomically $ waitTSem fsem
catchAny ( B8.writeFile fn ( B8.unlines (fmap (B8.pack.show.pretty) leftovers) ) )
whimper
atomically $ signalTSem fsem
pure leftovers
for_ lo $ withDownload denv . processBlock
debug "downloadQueue okay"
-- TODO: remove-downloadQueue-pause-hardcode
pause @'Seconds 150
-- FIXME: only-debug-20-sec
where
whimper e = err (pretty $ show e)
catchAny :: IO a -> (SomeException -> IO a) -> IO a
catchAny = Control.Exception.catch
when (null missed) do
delDownload @e brains ref

View File

@ -3,14 +3,16 @@ module Fetch where
import HBS2.Prelude
import HBS2.Actors.Peer
import HBS2.Data.Types.Refs
import HBS2.Storage.Operations.Missed
import HBS2.Net.Proto.Types
import HBS2.System.Logger.Simple
import PeerTypes
import DownloadQ
import BlockDownload
import Data.Foldable (for_)
fetchHash :: forall e m . (e ~ L4Proto, MonadIO m)
=> PeerEnv e
-> DownloadEnv e
@ -20,8 +22,11 @@ fetchHash :: forall e m . (e ~ L4Proto, MonadIO m)
fetchHash penv denv href = do
debug $ "fetchAction" <+> pretty h
liftIO $ withPeerM penv $ do
downloadLogAppend @e h
withDownload denv (processBlock h)
sto <- getStorage
missed <- findMissedBlocks sto href
for_ missed $ \miss -> do
withDownload denv (processBlock (fromHashRef miss))
where
h = fromHashRef href

View File

@ -87,24 +87,28 @@ import Crypto.Saltine (sodiumInit)
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS
import Data.Cache qualified as Cache
import Data.HashSet qualified as HashSet
import Data.List qualified as L
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Maybe
import Data.Set qualified as Set
import Data.Set (Set)
import Data.HashSet qualified as HashSet
import Data.Time.Clock (UTCTime, NominalDiffTime, diffUTCTime, getCurrentTime)
import Data.Time.Clock.POSIX
import Data.Time.LocalTime
import Data.Time.Format
import Lens.Micro.Platform as Lens
import Network.Socket
import Options.Applicative
import Prettyprinter.Render.Terminal
import System.Directory
import System.Environment
import System.Exit
import System.IO
import System.Mem
import System.Metrics
import System.Posix.Process
import System.Environment
import Prettyprinter.Render.Terminal
import UnliftIO.Exception qualified as U
-- import UnliftIO.STM
@ -226,6 +230,7 @@ runCLI = do
<> command "refchan" (info pRefChan (progDesc "refchan commands"))
<> command "peers" (info pPeers (progDesc "show known peers"))
<> command "pexinfo" (info pPexInfo (progDesc "show pex"))
<> command "download" (info pDownload (progDesc "download management"))
<> command "poll" (info pPoll (progDesc "polling management"))
<> command "log" (info pLog (progDesc "set logging level"))
-- FIXME: bring-back-dialogue-over-separate-socket
@ -435,9 +440,39 @@ runCLI = do
<+> pretty what
<> line
pDownload = hsubparser ( command "list" (info pDownList (progDesc "list current downloads" ))
<> command "del" (info pDownDel (progDesc "delete download" ))
)
pDownDel = do
rpc <- pRpcCommon
href <- argument hashP ( metavar "HASH-REF")
pure $ withMyRPC @PeerAPI rpc $ \caller -> do
void $ runMaybeT do
toMPlus =<< callService @RpcDownloadDel caller href
pDownList = do
rpc <- pRpcCommon
pure $ withMyRPC @PeerAPI rpc $ \caller -> do
void $ runMaybeT do
-- now <- getU
d <- toMPlus =<< callService @RpcDownloadList caller ()
now <- liftIO getPOSIXTime
liftIO $ print $ vcat (fmap (fmt now) d)
pure ()
where
fmt now (h,u) = pretty (AsBase58 h) <+> pretty diff
where
delta = now - fromIntegral u
diff = formatTime defaultTimeLocale "%d:%H:%M:%S" delta
refP :: ReadM (PubKey 'Sign HBS2Basic)
refP = maybeReader fromStringMay
hashP :: ReadM HashRef
hashP = maybeReader fromStringMay
myException :: SomeException -> IO ()
myException e = err ( show e )
@ -924,10 +959,11 @@ runPeer opts = U.handle (\e -> myException e
peerThread "blockDownloadLoop" (blockDownloadLoop denv)
peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv)
peerThread "encryptionHandshakeWorker"
(EncryptionKeys.encryptionHandshakeWorker @e conf pc encryptionHshakeAdapter)
peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait)
-- FIXME: clumsy-code
@ -937,7 +973,6 @@ runPeer opts = U.handle (\e -> myException e
peerThread "postponedLoop" (postponedLoop denv)
peerThread "downloadQueue" (downloadQueue conf denv)
peerThread "reflogWorker" (reflogWorker @e conf (SomeBrains brains) rwa)

View File

@ -309,7 +309,7 @@ addDownload mbh h = do
if here then do
removeFromWip h
else do
maybe1 mbh none $ \hp -> claimBlockCameFrom @e brains hp h
claimBlockCameFrom @e brains mbh h
liftIO $ atomically $ modifyTVar tinq $ HashMap.insert h ()
postponedNum :: forall e m . (MyPeer e, MonadIO m) => BlockDownloadM e m Int

View File

@ -0,0 +1,27 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# Language UndecidableInstances #-}
module RPC2.Downloads where
import HBS2.Prelude.Plated
import HBS2.Net.Proto.Service
import HBS2.Peer.Brains
import HBS2.System.Logger.Simple
import HBS2.Net.Proto.Definition()
import HBS2.Peer.RPC.API.Peer
import HBS2.Peer.RPC.Internal.Types
instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcDownloadList where
handleMethod _ = do
brains <- getRpcContext @PeerAPI <&> rpcBrains
debug $ "rpc.downloadList"
listDownloads @L4Proto brains
instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcDownloadDel where
handleMethod href = do
brains <- getRpcContext @PeerAPI <&> rpcBrains
debug $ "rpc.downloadDel" <+> pretty href
delDownload @L4Proto brains href

View File

@ -16,4 +16,5 @@ import RPC2.RefChan()
import RPC2.Die()
import RPC2.LogLevel()
import RPC2.Poll()
import RPC2.Downloads()

View File

@ -180,6 +180,7 @@ executable hbs2-peer
, RPC2.PexInfo
, RPC2.Ping
, RPC2.Poll
, RPC2.Downloads
, RPC2.RefLog
, RPC2.RefChan
, PeerTypes

View File

@ -8,6 +8,7 @@ import HBS2.Net.Proto
import HBS2.Hash
import Data.Word
import HBS2.Data.Types.Refs (HashRef(..))
-- TODO: rename
class HasBrains e a where
@ -42,6 +43,12 @@ class HasBrains e a where
listTCPPexCandidates :: MonadIO m => a -> m [PeerAddr e]
listTCPPexCandidates _ = pure mempty
listDownloads :: MonadIO m => a -> m [(HashRef, Integer)]
listDownloads _ = pure mempty
delDownload :: MonadIO m => a -> HashRef -> m ()
delDownload _ _ = pure ()
onKnownPeers :: MonadIO m => a -> [Peer e] -> m ()
onKnownPeers _ _ = none
@ -82,7 +89,7 @@ class HasBrains e a where
claimBlockCameFrom :: MonadIO m
=> a
-> Hash HbSync
-> Maybe (Hash HbSync)
-> Hash HbSync
-> m ()
@ -146,6 +153,10 @@ instance HasBrains e (SomeBrains e) where
getClientTCP (SomeBrains a) = getClientTCP @e a
setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a
listTCPPexCandidates (SomeBrains a) = listTCPPexCandidates @e a
listDownloads (SomeBrains a) = listDownloads @e a
delDownload (SomeBrains a) = delDownload @e a
onKnownPeers (SomeBrains a) = onKnownPeers a
onBlockSize (SomeBrains a) = onBlockSize a
onBlockDownloadAttempt (SomeBrains a) = onBlockDownloadAttempt a

View File

@ -9,6 +9,7 @@ import HBS2.Actors.Peer
import HBS2.Peer.RPC.Internal.Types
import Data.Time.Clock.POSIX (POSIXTime)
import Control.Monad.Reader
import Data.ByteString.Lazy (ByteString)
import Codec.Serialise
@ -26,6 +27,9 @@ data RpcPollList
data RpcPollAdd
data RpcPollDel
data RpcDownloadList
data RpcDownloadDel
type PeerAPI = '[ RpcPoke
, RpcPing
, RpcAnnounce
@ -37,6 +41,8 @@ type PeerAPI = '[ RpcPoke
, RpcPollList
, RpcPollAdd
, RpcPollDel
, RpcDownloadList
, RpcDownloadDel
]
instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where
@ -74,6 +80,12 @@ type instance Output RpcFetch = ()
type instance Input RpcPollList= ()
type instance Output RpcPollList = [(PubKey 'Sign HBS2Basic, String, Int)]
type instance Input RpcDownloadList = ()
type instance Output RpcDownloadList = [(HashRef, Integer)]
type instance Input RpcDownloadDel = HashRef
type instance Output RpcDownloadDel = ()
type instance Input RpcPollAdd = (PubKey 'Sign HBS2Basic, String, Int)
type instance Output RpcPollAdd = ()