hbs2/hbs2-qblf/lib/HBS2/Net/Proto/QBLF.hs

511 lines
15 KiB
Haskell

{-# Language TemplateHaskell #-}
{-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-}
{-# Language MultiWayIf #-}
module HBS2.Net.Proto.QBLF where
import HBS2.Prelude.Plated
import HBS2.System.Logger.Simple
import HBS2.Clock
import HBS2.Hash
import Control.Applicative
import Control.Concurrent.STM (flushTQueue)
import Control.Monad
import Control.Monad.Trans.Maybe
import Data.Either
import Data.Function
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.HashSet (HashSet)
import Data.HashSet qualified as HashSet
import Data.Kind
import Data.List qualified as List
import Data.Map qualified as Map
import Data.Maybe
import Data.Ord
import Data.Tuple (swap)
import Lens.Micro.Platform
import System.Random (randomRIO)
import Data.Time.Clock.POSIX
import Codec.Serialise()
import Data.Fixed
import UnliftIO
{- HLINT ignore "Use newtype instead of data" -}
newtype QBLFTimeStamp =
QBLFTimeStamp { fromQBLFTimeStamp :: Fixed E12 }
deriving stock (Generic)
deriving newtype (Eq,Ord,Num,Real,Fractional,Show)
instance Serialise QBLFTimeStamp
data QBLFMessage w =
QBLFMsgAnn (QBLFActor w) (QBLFAnnounce w)
| QBLFMsgMerge (QBLFActor w ) (QBLFMerge w)
| QBLFMsgCommit (QBLFActor w) (QBLFCommit w)
| QBLFMsgHeartBeat (QBLFActor w) QBLFStateN (QBLFState w) QBLFTimeStamp
deriving stock Generic
data QBLFAnnounce w =
QBLFAnnounce (QBLFState w) (QBLFState w)
deriving stock (Generic)
data QBLFMerge w =
QBLFMerge (QBLFState w) (QBLFState w)
deriving stock Generic
data QBLFCommit w =
QBLFCommit (QBLFState w) (QBLFState w)
deriving stock Generic
data QBLFStateN =
QWait
| QAnnounce
| QMerge
| QCommit
deriving stock (Eq,Ord,Enum,Show,Generic)
instance Serialise QBLFStateN
type ForQBLF w = ( Hashed HbSync (QBLFState w)
, Hashable (QBLFState w)
, Hashable (QBLFTransaction w)
, Hashable (QBLFActor w)
, Hashable (QBLFAnnounce w)
, Hashable (QBLFMerge w)
, Eq (QBLFState w)
)
deriving instance ForQBLF w => Eq (QBLFAnnounce w)
deriving instance ForQBLF w => Eq (QBLFMerge w)
instance ForQBLF w => Hashable (QBLFAnnounce w)
instance ForQBLF w => Hashable (QBLFMerge w)
class (Monad m, ForQBLF w) => IsQBLF w m where
type QBLFActor w :: Type
type QBLFTransaction w :: Type
type QBLFState w :: Type
qblfNewState :: QBLFState w -> [QBLFTransaction w] -> m (QBLFState w)
qblfBroadCast :: QBLFMessage w -> m ()
qblfMerge :: QBLFState w -> [QBLFState w] -> m (QBLFState w)
qblfCommit :: QBLFState w -> QBLFState w -> m ()
qblfMoveForward :: QBLFState w -> QBLFState w -> m Bool
qblfMoveForward _ _ = pure True
data QBLF w =
QBLF
{ _qblfSelf :: QBLFActor w
, _qblfAllActors :: HashSet (QBLFActor w)
, _qblfState :: QBLFStateN
, _qblfCurrent :: QBLFState w
, _qblfWaitAnnounce :: Timeout 'Seconds
, _qblfCommitsFrom :: HashSet (QBLFActor w)
, _qblfTranQ :: TVar (HashSet (QBLFTransaction w))
, _qblfAlive :: TVar (HashMap (QBLFActor w) (QBLFStateN, QBLFState w, TimeSpec))
, _qblfStateTime :: TVar TimeSpec
, _qblfLastHeartBeat :: TVar TimeSpec
, _qblfAnnounces :: TVar (HashMap (QBLFActor w, QBLFAnnounce w) TimeSpec)
, _qblfMerges :: TVar (HashMap (QBLFActor w, QBLFMerge w) TimeSpec)
}
makeLenses ''QBLF
qblfGetActor :: (ForQBLF w) => QBLFMessage w -> QBLFActor w
qblfGetActor = \case
QBLFMsgAnn a _ -> a
QBLFMsgMerge a _ -> a
QBLFMsgCommit a _ -> a
QBLFMsgHeartBeat a _ _ _ -> a
qblfEnqueue :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFTransaction w -> m ()
qblfEnqueue me tran = do
-- synced <- qblfIsSynced me
-- when synced do
atomically $ modifyTVar (view qblfTranQ me) (HashSet.insert tran)
qblfAcceptMessage :: (ForQBLF w, MonadIO m) => QBLF w -> QBLFMessage w -> m ()
qblfAcceptMessage me msg = do
-- FIXME: drop-premature-announces
let actor = qblfGetActor msg
when (actor `HashSet.member` view qblfAllActors me) do
now <- getTimeCoarse
let mine = view qblfCurrent me
let add = const True -- not $ HashSet.member x (view qblfIgnore me)
case msg of
QBLFMsgAnn a ann@(QBLFAnnounce s0 _) | add s0 -> do
atomically $ modifyTVar (view qblfAnnounces me) (HashMap.insert (a, ann) now)
QBLFMsgMerge a m@(QBLFMerge s0 _) | add s0 -> do
atomically $ modifyTVar (view qblfMerges me) (HashMap.insert (a, m) now)
QBLFMsgCommit a (QBLFCommit s0 s) | add s0 -> do
atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (QWait,s,now))
QBLFMsgHeartBeat a t s _ -> do
-- debug $ "heartbeat" <+> pretty (view qblfSelf me) <+> pretty (a, s)
atomically $ modifyTVar (view qblfAlive me) (HashMap.insert a (t,s,now))
_ -> pure ()
qblfQuorum :: forall w a m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m, Integral a)
=> QBLF w
-> m a
qblfQuorum me = do
-- n <- qblfLastAlive me
-- pure $ fromIntegral $ 1 + (n `div` 2)
let aliveSz = view qblfAllActors me & List.length
pure $ max 1 $ round $ realToFrac (aliveSz + 1) / 2
qblfLastAlive :: (ForQBLF w, IsQBLF w m, MonadUnliftIO m) => QBLF w -> m Int
qblfLastAlive me = pure 0
-- q <- qblfQuorum me
-- n <- atomically $ readTVar (view qblfAlive me) <&> HashMap.toList <&> length
-- if n > 0 then
-- pure n
-- else
-- pure q
qblfInit :: forall w m . (ForQBLF w, IsQBLF w m, MonadUnliftIO m)
=> QBLFActor w -- ^ self
-> [QBLFActor w] -- ^ all actors
-> QBLFState w -- ^ initial state
-> Timeout 'Seconds -- ^ timeout
-> m (QBLF w)
qblfInit self actors s0 w =
QBLF self
(HashSet.fromList actors)
QWait
s0
w
mempty
<$> newTVarIO mempty
<*> newTVarIO mempty
<*> (newTVarIO =<< now)
<*> newTVarIO 0
<*> newTVarIO mempty
<*> newTVarIO mempty
where
now = getTimeCoarse
qblfNextCommitTime :: (MonadIO f, Real a) => a -> f TimeSpec
qblfNextCommitTime ww = do
let wt = realToFrac ww
t0 <- getTimeCoarse
dt <- liftIO $ randomRIO (wt/2, wt)
pure $ fromNanoSecs $ toNanoSecs t0 + round (realToFrac dt * 1e9)
-- qblfGetState :: (ForQBLF w, MonadUnliftIO m) => QBLF w -> m QBLFStateN
-- qblfGetState q = readTVarIO (view qblfState q)
qblfRun :: forall w m . ( Pretty (QBLFActor w)
, Pretty (QBLFState w)
, ForQBLF w
, IsQBLF w m
, MonadUnliftIO m
) => QBLF w -> m ()
qblfRun me = do
forever $ do
void $ qblfTo me QWait
warn "QUIT!!!"
-- mapM_ wait [a1,hb]
-- mapM_ wait [a1]
-- waitAnyCatchCancel []
where
tAlive = 5
minHeartBeat = round 5e9
sendHeartBeat :: IsQBLF w m => QBLF w -> m ()
sendHeartBeat s = do
ts <- liftIO getPOSIXTime <&> realToFrac
now <- getTimeCoarse
sent <- readTVarIO (_qblfLastHeartBeat s)
when (toNanoSecs (now - sent) > minHeartBeat) do
qblfBroadCast @w $ QBLFMsgHeartBeat (view qblfSelf s) (view qblfState s) (view qblfCurrent s) ts
atomically $ writeTVar (_qblfLastHeartBeat s) now
sendAnnounce :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
sendAnnounce s sx = do
qblfBroadCast @w (QBLFMsgAnn self (QBLFAnnounce current sx))
where
self = view qblfSelf s
current = view qblfCurrent s
sendMerge :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
sendMerge s sx = do
qblfBroadCast @w (QBLFMsgMerge self (QBLFMerge current sx))
where
self = view qblfSelf s
current = view qblfCurrent s
sendCommit :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
sendCommit s sx = do
qblfBroadCast @w (QBLFMsgCommit self (QBLFCommit current sx))
where
self = view qblfSelf s
current = view qblfCurrent s
nap = pause @'Seconds 0.25
getAlive :: IsQBLF w m => QBLF w -> m [(QBLFActor w, QBLFStateN, QBLFState w) ]
getAlive s = do
now <- getTimeCoarse
states <- readTVarIO (view qblfAlive s) <&> HashMap.toList
pure [ (a,n,sx) | (a, (n,sx,t)) <- states, toNanoSecs (now - t) < round (2 * tAlive * 1e9) ]
sweepMerges :: IsQBLF w m => QBLF w -> QBLFState w -> m ()
sweepMerges qblf s = do
atomically do
old <- readTVar (view qblfMerges qblf) <&> HashMap.toList
let new = [ ((a, m), t) | ((a, m@(QBLFMerge s0 _)), t) <- old, s0 /= s ]
writeTVar (view qblfMerges me) (HashMap.fromList new)
sweepAnnounces :: IsQBLF w m => QBLF w -> m ()
sweepAnnounces qblf = do
-- FIXME: fix-magic-number
let wnano = fromIntegral $ toNanoSeconds $ max 300 $ 10 * view qblfWaitAnnounce qblf
now <- getTimeCoarse
swept <- atomically do
old <- readTVar (view qblfAnnounces qblf) <&> HashMap.toList
let new = [ ((a, m), t)
| ((a, m@(QBLFAnnounce _ _)), t) <- old
, toNanoSecs (now - t) < wnano
]
writeTVar (view qblfAnnounces me) (HashMap.fromList new)
pure $ length old - length new
when (swept > 0) do
debug $ "sweepAnnounces" <+> pretty swept
qblfTo s n = do
now <- getTimeCoarse
let ns = s { _qblfState = n }
atomically $ writeTVar (_qblfStateTime ns) now
case n of
QWait -> qblfToWait ns
QAnnounce -> qblfToAnnounce ns
QMerge -> qblfToMerge ns
QCommit -> qblfToCommit ns
qblfToWait s = do
let w = view qblfWaitAnnounce s
q <- qblfQuorum s
let trsh = max 2 (q `div` 2)
newAnn <- newTVarIO mempty
fix \next -> do
sendHeartBeat s
sweepAnnounces s
now <- getTimeCoarse
alive <- getAlive s
let wn = sum [ 1 | (_, QWait, _) <- alive ]
-- debug $ logActor s <+> "wait" <+> pretty wn
t0 <- readTVarIO (view qblfStateTime s)
let elapsed = toNanoSeconds $ TimeoutTS (now - t0)
their <- selectState s (fmap (view _3) alive)
let mine = view qblfCurrent s
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys
let alst = [ s1 | (_, QBLFAnnounce s0 s1) <- ann0, s0 == mine, s1 /= s0 ]
old <- readTVarIO newAnn
let an = HashSet.size $ HashSet.fromList alst `HashSet.difference` old
atomically $ writeTVar newAnn (HashSet.fromList alst)
let g = if | their /= Just mine -> do
case their of
Nothing -> pure $ nap >> next
Just th -> do
-- FIXME: what-if-failed
forwarded <- qblfMoveForward @w mine th
if forwarded then do
debug $ logActor s <+> "DO FAST-FORWARD" <+> pretty th
qblfCommit @w mine th
pure $ qblfTo (set qblfCurrent th s) QAnnounce
else do
pure $ nap >> next
| wn >= q && (elapsed > toNanoSeconds w || an >= trsh) && Just mine == their -> do
let el = elapsed > toNanoSeconds w
let aa = an >= trsh
debug $ logActor s <+> "ready" <+> pretty their <+> pretty el <+> pretty aa <+> pretty an
pure $ qblfTo s QAnnounce
| otherwise -> pure $ nap >> next
join g
qblfToAnnounce s = do
let mine = view qblfCurrent s
q <- qblfQuorum s
let wa = view qblfWaitAnnounce s
sweepMerges s mine
-- TODO: extract-method
txs <- atomically do
tx <- readTVar (view qblfTranQ s) <&> HashSet.toList
writeTVar (view qblfTranQ s) mempty
pure tx
hx <- qblfNewState @w mine txs
sendAnnounce s hx
pause (0.1 * wa)
g <- race ( pause wa ) do
fix \next -> do
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys
let ann = [ s1 | (_, QBLFAnnounce s0 s1) <- ann0, s0 == mine ]
if length ann >= q then do
debug $ logActor s <+> "announce/ready-to-merge" <+> pretty (view qblfCurrent s) <+> pretty (length ann)
pure $ qblfTo s QMerge
else do
trace $ logActor s <+> "announce/wait" <+> pretty (view qblfCurrent s) <+> pretty (length ann)
nap >> next
case g of
Left{} -> qblfTo s QWait
Right n -> n
qblfToMerge s = do
let mine = view qblfCurrent s
q <- qblfQuorum s
let wa = view qblfWaitAnnounce s
-- pause @'Seconds wa
-- debug $ logActor s <+> "merge"
ann0 <- atomically $ readTVar (view qblfAnnounces s) <&> HashMap.keys
let aann = [ (a, s1) | (a, QBLFAnnounce s0 s1) <- ann0, s0 == mine ]
let ann = fmap snd aann
let actors = fmap fst aann & HashSet.fromList
let g = case ann of
[] -> do
debug $ "MERGE: not found SHIT!" <+> pretty (length aann)
pure $ qblfTo s QWait
(_:_) -> do
new <- qblfMerge @w mine ann
sendMerge s new
pure $ qblfTo (set qblfCommitsFrom actors s) QCommit
join g
qblfToCommit s = do
let mine = view qblfCurrent s
let authors = view qblfCommitsFrom s
-- pause @'Seconds 2
debug $ logActor s <+> "qblfToCommit"
-- FIXME: timeout-and-rollback-to-wait
let wa = view qblfWaitAnnounce s
r <- race ( pause wa ) do
fix \next -> do
merges0 <- atomically $ readTVar (view qblfMerges s) <&> HashMap.keys
let merges = [ (s1, a, m)
| (a, m@(QBLFMerge s0 s1)) <- merges0, s0 == mine
, a `HashSet.member` authors
]
mbNew <- selectState s (fmap (view _1) merges)
trace $ "#### COMMIT NEW:"
<+> pretty mine
<+> pretty mbNew
-- <+> pretty (fmap (view _1) merges)
case mbNew of
Just new -> do
when (new /= mine) do
debug $ logActor s <+> "commit: " <+> pretty new
sendCommit s new
qblfCommit @w mine new
-- sweepAnnounces s mine
pure $ qblfTo ( set qblfCurrent new s
) QWait
Nothing -> do
-- debug $ logActor s <+> "commit: " <+> "fail"
nap >> next
case r of
Left{} -> qblfTo s QWait
Right n -> n
selectState :: IsQBLF w m => QBLF w -> [QBLFState w] -> m (Maybe (QBLFState w))
selectState s sx = do
q <- qblfQuorum s
let mbs = fmap (,1) sx & HashMap.fromListWith (+)
& HashMap.toList
& fmap (over _2 List.singleton . swap)
& Map.fromListWith (<>)
& Map.toDescList
& headMay
runMaybeT do
ss <- MaybeT $ pure mbs
let sss = over _2 (List.sortOn (Down . hashObject @HbSync)) ss :: (Integer, [QBLFState w])
if fst sss >= q then do
MaybeT $ pure $ headMay (snd sss)
else
mzero
logActor s = "ACTOR" <> parens (pretty (view qblfSelf s))