making-reflogs-eventually-constistent

This commit is contained in:
Dmitry Zuikov 2023-04-18 13:21:12 +03:00
parent eabe7a2b91
commit c8395f38ed
4 changed files with 99 additions and 6 deletions

View File

@ -10,6 +10,8 @@ import HBS2.Hash
import HBS2.System.Logger.Simple
import PeerConfig
import Data.Maybe
import Control.Monad
import Control.Exception
@ -27,6 +29,8 @@ import Database.SQLite.Simple.FromField
import System.Random (randomRIO)
import Data.Word
import Data.Either
import System.Directory
import System.FilePath
class HasBrains e a where
@ -91,6 +95,20 @@ class HasBrains e a where
blockSize _ _ _ = pure Nothing
isReflogProcessed :: (MonadIO m)
=> a
-> Hash HbSync
-> m Bool
isReflogProcessed _ _ = pure False
setReflogProcessed :: (MonadIO m)
=> a
-> Hash HbSync
-> m ()
setReflogProcessed _ _ = pure ()
type NoBrains = ()
instance Pretty (Peer e) => HasBrains e NoBrains where
@ -130,6 +148,8 @@ instance HasBrains e (SomeBrains e) where
shouldDownloadBlock (SomeBrains a) = shouldDownloadBlock @e a
advisePeersForBlock (SomeBrains a) = advisePeersForBlock @e a
blockSize (SomeBrains a) = blockSize @e a
isReflogProcessed (SomeBrains a) = isReflogProcessed @e a
setReflogProcessed (SomeBrains a) = setReflogProcessed @e a
newtype CommitCmd = CommitCmd { onCommited :: IO () }
@ -215,6 +235,12 @@ instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) whe
blockSize b p h = do
liftIO $ selectBlockSize b p h
isReflogProcessed b h = do
liftIO $ selectReflogProcessed b h
setReflogProcessed b h = do
updateOP b $ insertReflogProcessed b h
commitNow :: forall e m . MonadIO m
=> BasicBrains e
-> Bool
@ -256,6 +282,38 @@ insertSize br p h s = do
|] (show $ pretty h, show $ pretty p, s, s)
insertReflogProcessed :: BasicBrains e
-> Hash HbSync
-> IO ()
insertReflogProcessed br h = do
let conn = view brainsDb br
void $ liftIO $ execute conn [qc|
insert into statedb.processed (hash) values (?)
on conflict (hash) do nothing
|] (Only (show $ pretty h))
selectReflogProcessed :: forall e . Pretty (Peer e)
=> BasicBrains e
-> Hash HbSync
-> IO Bool
selectReflogProcessed br h = do
let conn = view brainsDb br
liftIO $ query conn [qc|
select 1
from statedb.processed
where hash = ?
limit 1
|] (Only (show $ pretty h)) <&> fmap (fromOnly @Int)
<&> listToMaybe
<&> isJust
selectBlockSize :: forall e . Pretty (Peer e)
=> BasicBrains e
-> Peer e
@ -407,11 +465,34 @@ transactional brains action = do
execute_ conn [qc|ROLLBACK TO SAVEPOINT {sp}|]
-- FIXME: eventually-close-db
newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) => m (BasicBrains e)
newBasicBrains = liftIO do
newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m)
=> PeerConfig
-> m (BasicBrains e)
newBasicBrains cfg = liftIO do
sdir <- peerStateDirDefault
liftIO $ createDirectoryIfMissing True sdir
let stateDb = sdir </> "brains.db"
conn <- open ":memory:"
execute_ conn [qc|ATTACH DATABASE '{stateDb}' as statedb|]
execute_ conn [qc|
create table if not exists statedb.processed ( hash text not null primary key );
|]
execute_ conn [qc|
create table if not exists ancestors
( child text not null
, parent text not null
, ts DATE DEFAULT (datetime('now','localtime'))
, primary key (child,parent))
|]
execute_ conn [qc|
create table if not exists ancestors
( child text not null

View File

@ -80,6 +80,9 @@ peerConfigDefault = liftIO $
catchAny :: IO a -> (SomeException -> IO a) -> IO a
catchAny = Control.Exception.catch
peerStateDirDefault :: MonadIO m => m FilePath
peerStateDirDefault = liftIO $ getXdgDirectory XdgData "hbs2-peer"
defConfigData :: String
defConfigData = [qc|
@ -155,7 +158,7 @@ peerConfigRead mbfp = do
-- debug $ pretty confData
config <- transformBiM (canonicalizeConfPaths ["key", "storage", "download-log"] dir) confData
config <- transformBiM (canonicalizeConfPaths ["key", "storage", "download-log", "state-dir"] dir) confData
pure $ PeerConfig config
where

View File

@ -518,7 +518,7 @@ runPeer opts = Exception.handle myException $ do
messMcast <- async $ runMessagingUDP mcast
`catch` (\(e::SomeException) -> throwIO e )
brains <- newBasicBrains @e
brains <- newBasicBrains @e conf
brainsThread <- async $ runBasicBrains brains
@ -571,7 +571,12 @@ runPeer opts = Exception.handle myException $ do
let doDownload h = do
pro <- isReflogProcessed @e brains h
if pro then do
withPeerM penv $ withDownload denv (addDownload mzero h)
else do
withPeerM penv $ withDownload denv (processBlock h)
setReflogProcessed @e brains h
let doFetchRef puk = do
withPeerM penv $ do

View File

@ -290,7 +290,11 @@ reflogWorker conf adapter = do
case hr of
Left ha -> do
atomically $ modifyTVar missed (ha:)
Right (_ :: [HashRef]) -> pure ()
Right (hs :: [HashRef]) -> do
w <- mapM ( hasBlock sto . fromHashRef ) hs <&> fmap isJust
let mi = [ hx | (False,hx) <- zip w hs ]
for_ mi $ \hx -> liftIO $ atomically $ modifyTVar missed (fromHashRef hx:)
liftIO $ readTVarIO missed