mirror of https://github.com/voidlizard/hbs2
511 lines
15 KiB
Haskell
511 lines
15 KiB
Haskell
{-# Language TemplateHaskell #-}
|
|
{-# Language AllowAmbiguousTypes #-}
|
|
{-# Language UndecidableInstances #-}
|
|
{-# Language MultiWayIf #-}
|
|
module HBS2.Net.Proto.QBLF where
|
|
|
|
import HBS2.Prelude.Plated
|
|
import HBS2.System.Logger.Simple
|
|
import HBS2.Clock
|
|
import HBS2.Hash
|
|
|
|
import Control.Applicative
|
|
import Control.Concurrent.STM (flushTQueue)
|
|
import Control.Monad
|
|
import Control.Monad.Trans.Maybe
|
|
import Data.Either
|
|
import Data.Function
|
|
import Data.HashMap.Strict (HashMap)
|
|
import Data.HashMap.Strict qualified as HashMap
|
|
import Data.HashSet (HashSet)
|
|
import Data.HashSet qualified as HashSet
|
|
import Data.Kind
|
|
import Data.List qualified as List
|
|
import Data.Map qualified as Map
|
|
import Data.Maybe
|
|
import Data.Ord
|
|
import Data.Tuple (swap)
|
|
import Lens.Micro.Platform
|
|
import System.Random (randomRIO)
|
|
import Data.Time.Clock.POSIX
|
|
import Codec.Serialise()
|
|
import Data.Fixed
|
|
import UnliftIO
|
|
|
|
{- HLINT ignore "Use newtype instead of data" -}
|
|
|
|
newtype QBLFTimeStamp =
|
|
QBLFTimeStamp { fromQBLFTimeStamp :: Fixed E12 }
|
|
deriving stock (Generic)
|
|
deriving newtype (Eq,Ord,Num,Real,Fractional,Show)
|
|
|
|
instance Serialise QBLFTimeStamp
|
|
|
|
data QBLFMessage w =
|
|
QBLFMsgAnn (QBLFActor w) (QBLFAnnounce w)
|
|
| QBLFMsgMerge (QBLFActor w ) (QBLFMerge w)
|
|
| QBLFMsgCommit (QBLFActor w) (QBLFCommit w)
|
|
| QBLFMsgHeartBeat (QBLFActor w) QBLFStateN (QBLFState w) QBLFTimeStamp
|
|
deriving stock Generic
|
|
|
|
data QBLFAnnounce w =
|
|
QBLFAnnounce (QBLFState w) (QBLFState w)
|
|
deriving stock (Generic)
|
|
|
|
|
|
data QBLFMerge w =
|
|
QBLFMerge (QBLFState w) (QBLFState w)
|
|
deriving stock Generic
|
|
|
|
data QBLFCommit w =
|
|
QBLFCommit (QBLFState w) (QBLFState w)
|
|
deriving stock Generic
|
|
|
|
|
|
data QBLFStateN =
|
|
QWait
|
|
| QAnnounce
|
|
| QMerge
|
|
| QCommit
|
|
deriving stock (Eq,Ord,Enum,Show,Generic)
|
|
|
|
instance Serialise QBLFStateN
|
|
|
|
type ForQBLF w = ( Hashed HbSync (QBLFState w)
|
|
, Hashable (QBLFState w)
|
|
, Hashable (QBLFTransaction w)
|
|
, Hashable (QBLFActor w)
|
|
, Hashable (QBLFAnnounce w)
|
|
, Hashable (QBLFMerge w)
|
|
, Eq (QBLFState w)
|
|
)
|
|
|
|
deriving instance ForQBLF w => Eq (QBLFAnnounce w)
|
|
deriving instance ForQBLF w => Eq (QBLFMerge w)
|
|
|
|
instance ForQBLF w => Hashable (QBLFAnnounce w)
|
|
|
|
instance ForQBLF w => Hashable (QBLFMerge w)
|
|
|
|
class (Monad m, ForQBLF w) => IsQBLF w m where
|
|
type QBLFActor w :: Type
|
|
type QBLFTransaction w :: Type
|
|
type QBLFState w :: Type
|
|
|
|
qblfNewState :: QBLFState w -> [QBLFTransaction w] -> m (QBLFState w)
|
|
qblfBroadCast :: QBLFMessage w -> m ()
|
|
qblfMerge :: QBLFState w -> [QBLFState w] -> m (QBLFState w)
|
|
qblfCommit :: QBLFState w -> QBLFState w -> m ()
|
|
|
|
qblfMoveForward :: QBLFState w -> QBLFState w -> m Bool
|
|
qblfMoveForward _ _ = pure True
|
|
|
|
data QBLF w =
|
|
QBLF
|
|
{ _qblfSelf :: QBLFActor w
|
|
, _qblfAllActors :: HashSet (QBLFActor w)
|
|
, _qblfState :: QBLFStateN
|
|
, _qblfCurrent :: QBLFState w
|
|
, _qblfWaitAnnounce :: Timeout 'Seconds
|
|
, _qblfCommitsFrom :: HashSet (QBLFActor w)
|
|
, _qblfTranQ :: TVar (HashSet (QBLFTransaction w))
|
|
, _qblfAlive :: TVar (HashMap (QBLFActor w) (QBLFStateN, QBLFState w, TimeSpec))
|
|
, _qblfStateTime :: TVar TimeSpec
|
|
, _qblfLastHeartBeat :: TVar TimeSpec
|
|
, _qblfAnnounces :: TVar (HashMap (QBLFActor w, QBLFAnnounce w) TimeSpec)
|
|
, _qblfMerges :: TVar (HashMap (QBLFActor w, QBLFMerge w) TimeSpec)
|
|
}
|
|
|
|
makeLenses ''QBLF
|
|
|
|
|
|
qblfGetActor :: (ForQBLF w) => QBLFMessage w -> QBLFActor w
|
|
qblfGetActor = \case
|
|
QBLFMsgAnn a _ -> a
|
|
QBLFMsgMerge a _ -> a
|
|
QBLFMsgCommit a _ -> a
|
|
QBLFMsgHeartBeat a _ _ _ -> a
|
|
|
|
qblfEnqueue :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFTransaction w -> m ()
|
|
qblfEnqueue me tran = do
|
|
-- synced <- qblfIsSynced me
|
|
-- when synced do
|
|
atomically $ modifyTVar (view qblfTranQ me) (HashSet.insert tran)
|
|
|
|
|
|
|
|
qblfAcceptMessage :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFMessage w -> m ()
|
|
qblfAcceptMessage me msg = do
|
|
|
|
-- FIXME: drop-premature-announces
|
|
|
|
let actor = qblfGetActor msg
|
|
|
|
when (actor `HashSet.member` view qblfAllActors me) do
|
|
now <- getTimeCoarse
|
|
|
|
let mine = view qblfCurrent me
|
|
let add = const True -- not $ HashSet.member x (view qblfIgnore me)
|
|
|
|
case msg of
|
|
QBLFMsgAnn a ann@(QBLFAnnounce s0 _) | add s0 -> do
|
|
atomically $ modifyTVar (view qblfAnnounces me) (HashMap.insert (a, ann) now)
|
|
|
|
QBLFMsgMerge a m@(QBLFMerge s0 _) | add s0 -> do
|
|
atomically $ modifyTVar (view qblfMerges me) (HashMap.insert (a, m) now)
|
|
|
|
QBLFMsgCommit a (QBLFCommit s0 s) | add s0 -> do
|
|
atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (QWait,s,now))
|
|
|
|
QBLFMsgHeartBeat a t s _ -> do
|
|
-- debug $ "heartbeat" <+> pretty (view qblfSelf me) <+> pretty (a, s)
|
|
atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (t,s,now))
|
|
|
|
_ -> pure ()
|
|
|
|
qblfQuorum :: forall w a m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m, Integral a)
|
|
=> QBLF w
|
|
-> m a
|
|
|
|
qblfQuorum me = do
|
|
-- n <- qblfLastAlive me
|
|
-- pure $ fromIntegral $ 1 + (n `div` 2)
|
|
let aliveSz = view qblfAllActors me & List.length
|
|
pure $ max 1 $ round $ realToFrac (aliveSz + 1) / 2
|
|
|
|
qblfLastAlive :: (ForQBLF w, IsQBLF w m, MonadUnliftIO m) => QBLF w -> m Int
|
|
qblfLastAlive me = pure 0
|
|
-- q <- qblfQuorum me
|
|
-- n <- atomically $ readTVar (view qblfAlive me) <&> HashMap.toList <&> length
|
|
-- if n > 0 then
|
|
-- pure n
|
|
-- else
|
|
-- pure q
|
|
|
|
|
|
qblfInit :: forall w m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m)
|
|
=> QBLFActor w -- ^ self
|
|
-> [QBLFActor w] -- ^ all actors
|
|
-> QBLFState w -- ^ initial state
|
|
-> Timeout 'Seconds -- ^ timeout
|
|
-> m (QBLF w)
|
|
|
|
qblfInit self actors s0 w =
|
|
QBLF self
|
|
(HashSet.fromList actors)
|
|
QWait
|
|
s0
|
|
w
|
|
mempty
|
|
<$> newTVarIO mempty
|
|
<*> newTVarIO mempty
|
|
<*> (newTVarIO =<< now)
|
|
<*> newTVarIO 0
|
|
<*> newTVarIO mempty
|
|
<*> newTVarIO mempty
|
|
|
|
where
|
|
now = getTimeCoarse
|
|
|
|
qblfNextCommitTime :: (MonadIO f, Real a) => a -> f TimeSpec
|
|
qblfNextCommitTime ww = do
|
|
let wt = realToFrac ww
|
|
t0 <- getTimeCoarse
|
|
dt <- liftIO $ randomRIO (wt/2, wt)
|
|
pure $ fromNanoSecs $ toNanoSecs t0 + round (realToFrac dt * 1e9)
|
|
|
|
-- qblfGetState :: (ForQBLF w, MonadUnliftIO m) => QBLF w -> m QBLFStateN
|
|
-- qblfGetState q = readTVarIO (view qblfState q)
|
|
|
|
|
|
|
|
qblfRun :: forall w m . ( Pretty (QBLFActor w)
|
|
, Pretty (QBLFState w)
|
|
, ForQBLF w
|
|
, IsQBLF w m
|
|
, MonadUnliftIO m
|
|
) => QBLF w -> m ()
|
|
qblfRun me = do
|
|
|
|
forever $ do
|
|
void $ qblfTo me QWait
|
|
warn "QUIT!!!"
|
|
|
|
-- mapM_ wait [a1,hb]
|
|
-- mapM_ wait [a1]
|
|
|
|
-- waitAnyCatchCancel []
|
|
|
|
where
|
|
|
|
tAlive = 5
|
|
|
|
minHeartBeat = round 5e9
|
|
|
|
sendHeartBeat :: IsQBLF w m => QBLF w -> m ()
|
|
sendHeartBeat s = do
|
|
ts <- liftIO getPOSIXTime <&> realToFrac
|
|
now <- getTimeCoarse
|
|
sent <- readTVarIO (_qblfLastHeartBeat s)
|
|
when (toNanoSecs (now - sent) > minHeartBeat) do
|
|
qblfBroadCast @w $ QBLFMsgHeartBeat (view qblfSelf s) (view qblfState s) (view qblfCurrent s) ts
|
|
atomically $ writeTVar (_qblfLastHeartBeat s) now
|
|
|
|
sendAnnounce :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
|
|
sendAnnounce s sx = do
|
|
qblfBroadCast @w (QBLFMsgAnn self (QBLFAnnounce current sx))
|
|
where
|
|
self = view qblfSelf s
|
|
current = view qblfCurrent s
|
|
|
|
sendMerge :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
|
|
sendMerge s sx = do
|
|
qblfBroadCast @w (QBLFMsgMerge self (QBLFMerge current sx))
|
|
where
|
|
self = view qblfSelf s
|
|
current = view qblfCurrent s
|
|
|
|
sendCommit :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
|
|
sendCommit s sx = do
|
|
qblfBroadCast @w (QBLFMsgCommit self (QBLFCommit current sx))
|
|
where
|
|
self = view qblfSelf s
|
|
current = view qblfCurrent s
|
|
|
|
|
|
nap = pause @'Seconds 0.25
|
|
|
|
getAlive :: IsQBLF w m => QBLF w -> m [(QBLFActor w, QBLFStateN, QBLFState w) ]
|
|
getAlive s = do
|
|
now <- getTimeCoarse
|
|
states <- readTVarIO (view qblfAlive s) <&> HashMap.toList
|
|
pure [ (a,n,sx) | (a, (n,sx,t)) <- states, toNanoSecs (now - t) < round (2 * tAlive * 1e9) ]
|
|
|
|
sweepMerges :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
|
|
sweepMerges qblf s = do
|
|
atomically do
|
|
old <- readTVar (view qblfMerges qblf) <&> HashMap.toList
|
|
let new = [ ((a, m), t) | ((a, m@(QBLFMerge s0 _)), t) <- old, s0 /= s ]
|
|
writeTVar (view qblfMerges me) (HashMap.fromList new)
|
|
|
|
sweepAnnounces :: IsQBLF w m => QBLF w -> m ()
|
|
sweepAnnounces qblf = do
|
|
-- FIXME: fix-magic-number
|
|
let wnano = fromIntegral $ toNanoSeconds $ max 300 $ 10 * view qblfWaitAnnounce qblf
|
|
|
|
now <- getTimeCoarse
|
|
|
|
swept <- atomically do
|
|
old <- readTVar (view qblfAnnounces qblf) <&> HashMap.toList
|
|
let new = [ ((a, m), t)
|
|
| ((a, m@(QBLFAnnounce _ _)), t) <- old
|
|
, toNanoSecs (now - t) < wnano
|
|
]
|
|
writeTVar (view qblfAnnounces me) (HashMap.fromList new)
|
|
pure $ length old - length new
|
|
|
|
when (swept > 0) do
|
|
debug $ "sweepAnnounces" <+> pretty swept
|
|
|
|
qblfTo s n = do
|
|
now <- getTimeCoarse
|
|
let ns = s { _qblfState = n }
|
|
atomically $ writeTVar (_qblfStateTime ns) now
|
|
case n of
|
|
QWait -> qblfToWait ns
|
|
QAnnounce -> qblfToAnnounce ns
|
|
QMerge -> qblfToMerge ns
|
|
QCommit -> qblfToCommit ns
|
|
|
|
qblfToWait s = do
|
|
let w = view qblfWaitAnnounce s
|
|
q <- qblfQuorum s
|
|
|
|
let trsh = max 2 (q `div` 2)
|
|
|
|
newAnn <- newTVarIO mempty
|
|
|
|
fix \next -> do
|
|
|
|
sendHeartBeat s
|
|
|
|
sweepAnnounces s
|
|
|
|
now <- getTimeCoarse
|
|
alive <- getAlive s
|
|
let wn = sum [ 1 | (_, QWait, _) <- alive ]
|
|
-- debug $ logActor s <+> "wait" <+> pretty wn
|
|
|
|
t0 <- readTVarIO (view qblfStateTime s)
|
|
let elapsed = toNanoSeconds $ TimeoutTS (now - t0)
|
|
|
|
their <- selectState s (fmap (view _3) alive)
|
|
let mine = view qblfCurrent s
|
|
|
|
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys
|
|
|
|
let alst = [ s1 | (_, QBLFAnnounce s0 s1) <- ann0, s0 == mine, s1 /= s0 ]
|
|
|
|
old <- readTVarIO newAnn
|
|
let an = HashSet.size $ HashSet.fromList alst `HashSet.difference` old
|
|
|
|
atomically $ writeTVar newAnn (HashSet.fromList alst)
|
|
|
|
let g = if | their /= Just mine -> do
|
|
|
|
case their of
|
|
Nothing -> pure $ nap >> next
|
|
Just th -> do
|
|
|
|
-- FIXME: what-if-failed
|
|
forwarded <- qblfMoveForward @w mine th
|
|
|
|
if forwarded then do
|
|
debug $ logActor s <+> "DO FAST-FORWARD" <+> pretty th
|
|
qblfCommit @w mine th
|
|
pure $ qblfTo (set qblfCurrent th s) QAnnounce
|
|
else do
|
|
pure $ nap >> next
|
|
|
|
| wn >= q && (elapsed > toNanoSeconds w || an >= trsh) && Just mine == their -> do
|
|
let el = elapsed > toNanoSeconds w
|
|
let aa = an >= trsh
|
|
debug $ logActor s <+> "ready" <+> pretty their <+> pretty el <+> pretty aa <+> pretty an
|
|
pure $ qblfTo s QAnnounce
|
|
|
|
| otherwise -> pure $ nap >> next
|
|
|
|
join g
|
|
|
|
qblfToAnnounce s = do
|
|
|
|
let mine = view qblfCurrent s
|
|
q <- qblfQuorum s
|
|
let wa = view qblfWaitAnnounce s
|
|
|
|
sweepMerges s mine
|
|
|
|
-- TODO: extract-method
|
|
txs <- atomically do
|
|
tx <- readTVar (view qblfTranQ s) <&> HashSet.toList
|
|
writeTVar (view qblfTranQ s) mempty
|
|
pure tx
|
|
|
|
hx <- qblfNewState @w mine txs
|
|
|
|
sendAnnounce s hx
|
|
|
|
pause (0.1 * wa)
|
|
|
|
g <- race ( pause wa ) do
|
|
|
|
fix \next -> do
|
|
|
|
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys
|
|
|
|
let ann = [ s1 | (_, QBLFAnnounce s0 s1) <- ann0, s0 == mine ]
|
|
|
|
if length ann >= q then do
|
|
debug $ logActor s <+> "announce/ready-to-merge" <+> pretty (view qblfCurrent s) <+> pretty (length ann)
|
|
pure $ qblfTo s QMerge
|
|
else do
|
|
trace $ logActor s <+> "announce/wait" <+> pretty (view qblfCurrent s) <+> pretty (length ann)
|
|
nap >> next
|
|
|
|
case g of
|
|
Left{} -> qblfTo s QWait
|
|
Right n -> n
|
|
|
|
qblfToMerge s = do
|
|
let mine = view qblfCurrent s
|
|
q <- qblfQuorum s
|
|
|
|
let wa = view qblfWaitAnnounce s
|
|
-- pause @'Seconds wa
|
|
-- debug $ logActor s <+> "merge"
|
|
|
|
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys
|
|
let aann = [ (a, s1) | (a, QBLFAnnounce s0 s1) <- ann0, s0 == mine ]
|
|
|
|
let ann = fmap snd aann
|
|
let actors = fmap fst aann & HashSet.fromList
|
|
|
|
let g = case ann of
|
|
[] -> do
|
|
debug $ "MERGE: not found SHIT!" <+> pretty (length aann)
|
|
pure $ qblfTo s QWait
|
|
|
|
(_:_) -> do
|
|
new <- qblfMerge @w mine ann
|
|
sendMerge s new
|
|
pure $ qblfTo (set qblfCommitsFrom actors s) QCommit
|
|
|
|
join g
|
|
|
|
qblfToCommit s = do
|
|
let mine = view qblfCurrent s
|
|
let authors = view qblfCommitsFrom s
|
|
|
|
-- pause @'Seconds 2
|
|
debug $ logActor s <+> "qblfToCommit"
|
|
|
|
-- FIXME: timeout-and-rollback-to-wait
|
|
|
|
let wa = view qblfWaitAnnounce s
|
|
|
|
r <- race ( pause wa ) do
|
|
fix \next -> do
|
|
|
|
merges0 <- atomically $ readTVar (view qblfMerges s) <&> HashMap.keys
|
|
let merges = [ (s1, a, m)
|
|
| (a, m@(QBLFMerge s0 s1)) <- merges0, s0 == mine
|
|
, a `HashSet.member` authors
|
|
]
|
|
|
|
mbNew <- selectState s (fmap (view _1) merges)
|
|
|
|
trace $ "#### COMMIT NEW:"
|
|
<+> pretty mine
|
|
<+> pretty mbNew
|
|
-- <+> pretty (fmap (view _1) merges)
|
|
|
|
case mbNew of
|
|
Just new -> do
|
|
when (new /= mine) do
|
|
debug $ logActor s <+> "commit: " <+> pretty new
|
|
sendCommit s new
|
|
qblfCommit @w mine new
|
|
-- sweepAnnounces s mine
|
|
|
|
pure $ qblfTo ( set qblfCurrent new s
|
|
) QWait
|
|
|
|
Nothing -> do
|
|
-- debug $ logActor s <+> "commit: " <+> "fail"
|
|
nap >> next
|
|
|
|
case r of
|
|
Left{} -> qblfTo s QWait
|
|
Right n -> n
|
|
|
|
selectState :: IsQBLF w m => QBLF w -> [QBLFState w] -> m (Maybe (QBLFState w))
|
|
selectState s sx = do
|
|
q <- qblfQuorum s
|
|
let mbs = fmap (,1) sx & HashMap.fromListWith (+)
|
|
& HashMap.toList
|
|
& fmap (over _2 List.singleton . swap)
|
|
& Map.fromListWith (<>)
|
|
& Map.toDescList
|
|
& headMay
|
|
runMaybeT do
|
|
ss <- MaybeT $ pure mbs
|
|
let sss = over _2 (List.sortOn (Down . hashObject @HbSync)) ss :: (Integer, [QBLFState w])
|
|
|
|
if fst sss >= q then do
|
|
MaybeT $ pure $ headMay (snd sss)
|
|
else
|
|
mzero
|
|
|
|
logActor s = "ACTOR" <> parens (pretty (view qblfSelf s))
|
|
|