From c8395f38ed15479c5119d631133413fa3e223436 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Tue, 18 Apr 2023 13:21:12 +0300 Subject: [PATCH] making-reflogs-eventually-constistent --- hbs2-peer/app/Brains.hs | 85 ++++++++++++++++++++++++++++++++++++- hbs2-peer/app/PeerConfig.hs | 5 ++- hbs2-peer/app/PeerMain.hs | 9 +++- hbs2-peer/app/RefLog.hs | 6 ++- 4 files changed, 99 insertions(+), 6 deletions(-) diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index 0526cef7..a41645b9 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -10,6 +10,8 @@ import HBS2.Hash import HBS2.System.Logger.Simple +import PeerConfig + import Data.Maybe import Control.Monad import Control.Exception @@ -27,6 +29,8 @@ import Database.SQLite.Simple.FromField import System.Random (randomRIO) import Data.Word import Data.Either +import System.Directory +import System.FilePath class HasBrains e a where @@ -91,6 +95,20 @@ class HasBrains e a where blockSize _ _ _ = pure Nothing + isReflogProcessed :: (MonadIO m) + => a + -> Hash HbSync + -> m Bool + + isReflogProcessed _ _ = pure False + + setReflogProcessed :: (MonadIO m) + => a + -> Hash HbSync + -> m () + + setReflogProcessed _ _ = pure () + type NoBrains = () instance Pretty (Peer e) => HasBrains e NoBrains where @@ -130,6 +148,8 @@ instance HasBrains e (SomeBrains e) where shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a blockSize (SomeBrains a) = blockSize @e a + isReflogProcessed (SomeBrains a) = isReflogProcessed @e a + setReflogProcessed (SomeBrains a) = setReflogProcessed @e a newtype CommitCmd = CommitCmd { onCommited :: IO () } @@ -215,6 +235,12 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe blockSize b p h = do liftIO $ selectBlockSize b p h + isReflogProcessed b h = do + liftIO $ selectReflogProcessed b h + + setReflogProcessed b h = do + updateOP b $ insertReflogProcessed b h + commitNow :: forall e m . MonadIO m => BasicBrains e -> Bool @@ -256,6 +282,38 @@ insertSize br p h s = do |] (show $ pretty h, show $ pretty p, s, s) +insertReflogProcessed :: BasicBrains e + -> Hash HbSync + -> IO () + +insertReflogProcessed br h = do + + + let conn = view brainsDb br + + void $ liftIO $ execute conn [qc| + insert into statedb.processed (hash) values (?) + on conflict (hash) do nothing + |] (Only (show $ pretty h)) + +selectReflogProcessed :: forall e . Pretty (Peer e) + => BasicBrains e + -> Hash HbSync + -> IO Bool +selectReflogProcessed br h = do + + let conn = view brainsDb br + + liftIO $ query conn [qc| + select 1 + from statedb.processed + where hash = ? + limit 1 + |] (Only (show $ pretty h)) <&> fmap (fromOnly @Int) + <&> listToMaybe + <&> isJust + + selectBlockSize :: forall e . Pretty (Peer e) => BasicBrains e -> Peer e @@ -407,11 +465,34 @@ transactional brains action = do execute_ conn [qc|ROLLBACK TO SAVEPOINT {sp}|] -- FIXME: eventually-close-db -newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) => m (BasicBrains e) -newBasicBrains = liftIO do +newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) + => PeerConfig + -> m (BasicBrains e) + +newBasicBrains cfg = liftIO do + + sdir <- peerStateDirDefault + + liftIO $ createDirectoryIfMissing True sdir + + let stateDb = sdir "brains.db" conn <- open ":memory:" + execute_ conn [qc|ATTACH DATABASE '{stateDb}' as statedb|] + + execute_ conn [qc| + create table if not exists statedb.processed ( hash text not null primary key ); + |] + + execute_ conn [qc| + create table if not exists ancestors + ( child text not null + , parent text not null + , ts DATE DEFAULT (datetime('now','localtime')) + , primary key (child,parent)) + |] + execute_ conn [qc| create table if not exists ancestors ( child text not null diff --git a/hbs2-peer/app/PeerConfig.hs b/hbs2-peer/app/PeerConfig.hs index 4f219862..57cbacef 100644 --- a/hbs2-peer/app/PeerConfig.hs +++ b/hbs2-peer/app/PeerConfig.hs @@ -80,6 +80,9 @@ peerConfigDefault = liftIO $ catchAny :: IO a -> (SomeException -> IO a) -> IO a catchAny = Control.Exception.catch +peerStateDirDefault :: MonadIO m => m FilePath +peerStateDirDefault = liftIO $ getXdgDirectory XdgData "hbs2-peer" + defConfigData :: String defConfigData = [qc| @@ -155,7 +158,7 @@ peerConfigRead mbfp = do -- debug $ pretty confData - config <- transformBiM (canonicalizeConfPaths ["key", "storage", "download-log"] dir) confData + config <- transformBiM (canonicalizeConfPaths ["key", "storage", "download-log", "state-dir"] dir) confData pure $ PeerConfig config where diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index dc5bfa83..7ec05b85 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -518,7 +518,7 @@ runPeer opts = Exception.handle myException $ do messMcast <- async $ runMessagingUDP mcast `catch` (\(e::SomeException) -> throwIO e ) - brains <- newBasicBrains @e + brains <- newBasicBrains @e conf brainsThread <- async $ runBasicBrains brains @@ -571,7 +571,12 @@ runPeer opts = Exception.handle myException $ do let doDownload h = do - withPeerM penv $ withDownload denv (addDownload mzero h) + pro <- isReflogProcessed @e brains h + if pro then do + withPeerM penv $ withDownload denv (addDownload mzero h) + else do + withPeerM penv $ withDownload denv (processBlock h) + setReflogProcessed @e brains h let doFetchRef puk = do withPeerM penv $ do diff --git a/hbs2-peer/app/RefLog.hs b/hbs2-peer/app/RefLog.hs index e9e6d013..684f9d2a 100644 --- a/hbs2-peer/app/RefLog.hs +++ b/hbs2-peer/app/RefLog.hs @@ -290,7 +290,11 @@ reflogWorker conf adapter = do case hr of Left ha -> do atomically $ modifyTVar missed (ha:) - Right (_ :: [HashRef]) -> pure () + Right (hs :: [HashRef]) -> do + w <- mapM ( hasBlock sto . fromHashRef ) hs <&> fmap isJust + let mi = [ hx | (False,hx) <- zip w hs ] + for_ mi $ \hx -> liftIO $ atomically $ modifyTVar missed (fromHashRef hx:) + liftIO $ readTVarIO missed