This commit is contained in:
Dmitry Zuikov 2023-01-22 09:20:47 +03:00
parent 0f458134d3
commit f9748ed1fc
7 changed files with 100 additions and 23 deletions

View File

@ -119,6 +119,7 @@ library
, text , text
, transformers , transformers
, uniplate , uniplate
, unordered-containers
hs-source-dirs: lib hs-source-dirs: lib
default-language: Haskell2010 default-language: Haskell2010

View File

@ -5,6 +5,7 @@ module HBS2.Actors.Peer where
import HBS2.Prelude.Plated import HBS2.Prelude.Plated
import HBS2.Hash import HBS2.Hash
import HBS2.Clock
import HBS2.Actors import HBS2.Actors
import HBS2.Storage import HBS2.Storage
import HBS2.Net.Proto import HBS2.Net.Proto
@ -26,6 +27,13 @@ import Data.Map qualified as Map
import Data.Maybe import Data.Maybe
import GHC.TypeLits import GHC.TypeLits
import Lens.Micro.Platform import Lens.Micro.Platform
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM
import Data.Hashable (hash)
import Prettyprinter hiding (pipe)
data AnyStorage = forall zu . Storage zu HbSync ByteString IO => AnyStorage zu data AnyStorage = forall zu . Storage zu HbSync ByteString IO => AnyStorage zu
@ -82,12 +90,14 @@ makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p))
data PeerEnv e = data PeerEnv e =
PeerEnv PeerEnv
{ _envSelf :: Peer e { _envSelf :: Peer e
, _envFab :: Fabriq e , _envFab :: Fabriq e
, _envStorage :: AnyStorage , _envStorage :: AnyStorage
, _envDeferred :: Pipeline IO () , _envDeferred :: Pipeline IO ()
, _envSessions :: Cache SKey Dynamic , _envSessions :: Cache SKey Dynamic
, _envEvents :: Cache SKey Dynamic , _envEvents :: TVar (HashMap SKey [Dynamic])
, _envExpireTimes :: Cache SKey ()
, _envSweepers :: TVar (HashMap SKey [PeerM e IO ()])
} }
newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a }
@ -187,20 +197,51 @@ instance ( MonadIO m
let bs = serialise (AnyMessage @e proto (encode msg)) let bs = serialise (AnyMessage @e proto (encode msg))
sendTo pipe (To p) (From me) bs sendTo pipe (To p) (From me) bs
instance ( HasProtocol e p instance ( HasProtocol e p
, Typeable (EventHandler e p (PeerM e IO)) , Typeable (EventHandler e p (PeerM e IO))
, Typeable (EventKey e p) , Typeable (EventKey e p)
, Typeable (Event e p) , Typeable (Event e p)
, Hashable (EventKey e p) , Hashable (EventKey e p)
, Expires (EventKey e p)
, Eq (EventKey e p) , Eq (EventKey e p)
) => EventListener e p (PeerM e IO) where ) => EventListener e p (PeerM e IO) where
subscribe k h = do subscribe k h = do
ev <- asks (view envEvents) ev <- asks (view envEvents)
liftIO $ Cache.insert ev (newSKey @(EventKey e p) k) (toDyn h) let sk = newSKey @(EventKey e p) k
pure () let dyn = toDyn h
liftIO $ atomically $ modifyTVar' ev (HashMap.insertWith (<>) sk [dyn])
-- FIXME: add a sweeping routine or else everything will be fucked!
addSweeper (expiresIn (Proxy @(EventKey e p))) sk $ do
liftIO $ print $ "sweep smth with key" <+> pretty (hash sk)
liftIO $ atomically $ modifyTVar' ev (HashMap.delete sk)
addSweeper :: forall e . Timeout 'Seconds -> SKey -> PeerM e IO () -> PeerM e IO ()
addSweeper t k sweeper = do
liftIO $ print $ "adding sweeper for key" <+> pretty (hash k)
ex <- asks (view envExpireTimes)
sw <- asks (view envSweepers)
liftIO $ Cache.insert' ex (Just (toTimeSpec t)) k ()
liftIO $ atomically $ modifyTVar' sw (HashMap.insertWith (<>) k [sweeper])
sweep :: PeerM e IO ()
sweep = do
ex <- asks (view envExpireTimes)
sw <- asks (view envSweepers)
liftIO $ Cache.purgeExpired ex
toSweep <- HashMap.toList <$> liftIO (readTVarIO sw)
alive <- forM toSweep $ \(s, actions) -> do
here <- liftIO $ Cache.lookup' ex s <&> isJust
if here then
pure [(s, actions)]
else do
sequence_ actions
pure []
liftIO $ atomically $ modifyTVar' sw (<> HashMap.fromList (mconcat alive))
instance ( HasProtocol e p instance ( HasProtocol e p
, Typeable (EventKey e p) , Typeable (EventKey e p)
@ -215,20 +256,30 @@ instance ( HasProtocol e p
let sk = newSKey @(EventKey e p) k let sk = newSKey @(EventKey e p) k
void $ runMaybeT $ do void $ runMaybeT $ do
r <- MaybeT $ liftIO $ Cache.lookup se sk subs <- MaybeT $ liftIO $ atomically $ readTVar se <&> HashMap.lookup sk
ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r void $ liftIO $ atomically $ modifyTVar' se (HashMap.delete sk)
lift $ ev d for_ subs $ \r -> do
ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r
lift $ ev d
runPeerM :: (MonadIO m, Pretty (Peer e)) => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m ()
runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m ()
runPeerM s bus p f = do runPeerM s bus p f = do
env <- PeerEnv p bus s <$> newPipeline defProtoPipelineSize env <- PeerEnv p bus s <$> newPipeline defProtoPipelineSize
<*> liftIO (Cache.newCache (Just defCookieTimeout)) <*> liftIO (Cache.newCache (Just defCookieTimeout))
<*> liftIO (newTVarIO mempty)
<*> liftIO (Cache.newCache (Just defCookieTimeout)) <*> liftIO (Cache.newCache (Just defCookieTimeout))
<*> liftIO (newTVarIO mempty)
let de = view envDeferred env let de = view envDeferred env
as <- liftIO $ async $ runPipeline de as <- liftIO $ async $ runPipeline de
sw <- liftIO $ async $ forever $ withPeerM env $ do
pause defSweepTimeout
se <- asks (view envSessions)
liftIO $ Cache.purgeExpired se
sweep
void $ runReaderT (fromPeerM f) env void $ runReaderT (fromPeerM f) env
void $ liftIO $ stopPipeline de void $ liftIO $ stopPipeline de
liftIO $ cancel as liftIO $ cancel as
@ -276,6 +327,7 @@ instance ( HasProtocol e p
, Serialise (Encoded e) , Serialise (Encoded e)
, MonadTrans (ResponseM e) , MonadTrans (ResponseM e)
, HasStorage (PeerM e IO) , HasStorage (PeerM e IO)
, Pretty (Peer e)
) => Response e p (ResponseM e (PeerM e IO)) where ) => Response e p (ResponseM e (PeerM e IO)) where
thatPeer _ = asks (view answTo) thatPeer _ = asks (view answTo)

View File

@ -1,14 +1,16 @@
{-# Language FunctionalDependencies #-}
module HBS2.Clock module HBS2.Clock
( module HBS2.Clock ( module HBS2.Clock
, module System.Clock , module System.Clock
)where )where
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class import Control.Monad.IO.Class
import Data.Fixed import Data.Fixed
import Data.Int (Int64) import Data.Int (Int64)
import Data.Proxy
import Prettyprinter import Prettyprinter
import System.Clock import System.Clock
import Control.Concurrent (threadDelay)
data TimeoutKind = MilliSeconds | Seconds | Minutes data TimeoutKind = MilliSeconds | Seconds | Minutes
@ -63,3 +65,7 @@ instance IsTimeout 'Seconds where
instance IsTimeout 'Minutes where instance IsTimeout 'Minutes where
toNanoSeconds (TimeoutMin x) = round (x * 60 * 1e9) toNanoSeconds (TimeoutMin x) = round (x * 60 * 1e9)
class Expires a where
expiresIn :: Proxy a -> Timeout 'Seconds

View File

@ -7,3 +7,5 @@ module HBS2.Data.Types
import HBS2.Hash import HBS2.Hash
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs

View File

@ -35,5 +35,8 @@ defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes)
defBlockInfoTimeout :: TimeSpec defBlockInfoTimeout :: TimeSpec
defBlockInfoTimeout = toTimeSpec ( 10 :: Timeout 'Minutes) defBlockInfoTimeout = toTimeSpec ( 10 :: Timeout 'Minutes)
defSweepTimeout :: Timeout 'Seconds
defSweepTimeout = 5 -- FIXME: only for debug!

View File

@ -50,7 +50,7 @@ newtype instance SessionKey e (BlockSize e) =
newtype instance EventKey e (BlockSize e) = newtype instance EventKey e (BlockSize e) =
BlockSizeEventKey () BlockSizeEventKey (Hash HbSync)
deriving stock (Typeable, Eq) deriving stock (Typeable, Eq)
deriving newtype (Hashable) deriving newtype (Hashable)

View File

@ -27,6 +27,7 @@ import Control.Monad.Reader
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy.Char8 qualified as B8 import Data.ByteString.Lazy.Char8 qualified as B8
import Data.Default import Data.Default
import Data.Foldable
import Data.Map (Map) import Data.Map (Map)
import Data.Map qualified as Map import Data.Map qualified as Map
import Data.Word import Data.Word
@ -70,6 +71,10 @@ instance HasProtocol Fake (BlockSize Fake) where
decode = either (const Nothing) Just . deserialiseOrFail decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise encode = serialise
-- FIXME: 3 is for debug only!
instance Expires (EventKey Fake (BlockSize Fake)) where
expiresIn _ = 3
instance HasProtocol Fake (BlockChunks Fake) where instance HasProtocol Fake (BlockChunks Fake) where
type instance ProtocolId (BlockChunks Fake) = 2 type instance ProtocolId (BlockChunks Fake) = 2
type instance Encoded Fake = ByteString type instance Encoded Fake = ByteString
@ -142,7 +147,7 @@ handleBlockInfo (p, h, sz') = do
let bsz = fromIntegral sz let bsz = fromIntegral sz
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
liftIO $ debug $ "got block:" <+> pretty (p, h, sz) liftIO $ debug $ "got block:" <+> pretty (p, h, sz)
emit @e (BlockSizeEventKey ()) BlockSizeEvent emit @e (BlockSizeEventKey h) BlockSizeEvent
-- FIXME: turn back on event notification -- FIXME: turn back on event notification
-- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit -- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit
@ -154,11 +159,19 @@ blockDownloadLoop :: forall e . ( HasProtocol e (BlockSize e)
) => PeerM e IO () ) => PeerM e IO ()
blockDownloadLoop = do blockDownloadLoop = do
w <- subscribe @e @(BlockSize e) (BlockSizeEventKey ()) $ \_ -> do let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
debug "can't believe this shit works" , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
]
request 1 (GetBlockSize @e "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt") for_ blks $ \h -> do
request 1 (GetBlockSize @e "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")
debug $ "subscribing to" <+> pretty h
subscribe @e @(BlockSize e) (BlockSizeEventKey h) $ \_ -> do
debug $ "can't believe this shit works" <+> pretty h
request 1 (GetBlockSize @e h)
fix \next -> do fix \next -> do
liftIO $ print "piu!" liftIO $ print "piu!"
@ -210,7 +223,7 @@ main = do
liftIO $ cancel as liftIO $ cancel as
pause ( 5 :: Timeout 'Seconds) pause ( 8 :: Timeout 'Seconds)
mapM_ cancel (our:others) mapM_ cancel (our:others)