diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 6da2184c..17eaef51 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -81,6 +81,7 @@ library , HBS2.Net.Proto , HBS2.Net.Proto.BlockChunks , HBS2.Net.Proto.BlockInfo + , HBS2.Net.Proto.BlockAnnounce , HBS2.Net.Proto.Sessions , HBS2.Net.Proto.Types , HBS2.Prelude diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index d2363124..87d440cb 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -225,12 +225,12 @@ instance ( HasProtocol e p liftIO $ print $ "sweep smth with key" <+> pretty (hash sk) liftIO $ atomically $ modifyTVar' ev (HashMap.delete sk) -addSweeper :: forall e . Timeout 'Seconds -> SKey -> PeerM e IO () -> PeerM e IO () +addSweeper :: forall e . Maybe (Timeout 'Seconds) -> SKey -> PeerM e IO () -> PeerM e IO () addSweeper t k sweeper = do liftIO $ print $ "adding sweeper for key" <+> pretty (hash k) ex <- asks (view envExpireTimes) sw <- asks (view envSweepers) - liftIO $ Cache.insert' ex (Just (toTimeSpec t)) k () + liftIO $ Cache.insert' ex (toTimeSpec <$> t) k () liftIO $ atomically $ modifyTVar' sw (HashMap.insertWith (<>) k [sweeper]) sweep :: PeerM e IO () @@ -258,20 +258,26 @@ instance ( HasProtocol e p , Hashable (EventKey e p) , Eq (EventKey e p) , Typeable (EventHandler e p (PeerM e IO)) + , EventType (Event e p) , Pretty (Peer e) ) => EventEmitter e p (PeerM e IO) where emit k d = do - me <- ownPeer @e se <- asks (view envEvents) let sk = newSKey @(EventKey e p) k void $ runMaybeT $ do subs <- MaybeT $ liftIO $ atomically $ readTVar se <&> HashMap.lookup sk void $ liftIO $ atomically $ modifyTVar' se (HashMap.delete sk) - for_ subs $ \r -> do - ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r - lift $ ev d + pers <- forM subs $ \r -> do + ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r + lift $ ev d + if isPersistent @(Event e p) then + pure [r] + else + pure [] + + void $ liftIO $ atomically $ modifyTVar' se (HashMap.insert sk (mconcat pers)) runPeerM :: forall e m . (MonadIO m, HasPeer e, Ord (Peer e), Pretty (Peer e)) => AnyStorage @@ -393,4 +399,6 @@ instance ( MonadIO m emit k d = lift $ emit k d +instance (Monad m, HasOwnPeer e m) => HasOwnPeer e (ResponseM e m) where + ownPeer = lift ownPeer diff --git a/hbs2-core/lib/HBS2/Clock.hs b/hbs2-core/lib/HBS2/Clock.hs index a29b6d27..8d3d2362 100644 --- a/hbs2-core/lib/HBS2/Clock.hs +++ b/hbs2-core/lib/HBS2/Clock.hs @@ -66,6 +66,6 @@ instance IsTimeout 'Minutes where toNanoSeconds (TimeoutMin x) = round (x * 60 * 1e9) class Expires a where - expiresIn :: Proxy a -> Timeout 'Seconds + expiresIn :: Proxy a -> Maybe (Timeout 'Seconds) diff --git a/hbs2-core/lib/HBS2/Data/Types/Refs.hs b/hbs2-core/lib/HBS2/Data/Types/Refs.hs index 119f1e84..9387bd13 100644 --- a/hbs2-core/lib/HBS2/Data/Types/Refs.hs +++ b/hbs2-core/lib/HBS2/Data/Types/Refs.hs @@ -16,7 +16,6 @@ newtype HashRef = HashRef (Hash HbSync) deriving stock (Data,Generic,Show) - data HashRefObject = HashRefObject HashRef (Maybe HashRefMetadata) deriving stock (Data,Generic) diff --git a/hbs2-core/lib/HBS2/Events.hs b/hbs2-core/lib/HBS2/Events.hs index 05a49946..6d7c678c 100644 --- a/hbs2-core/lib/HBS2/Events.hs +++ b/hbs2-core/lib/HBS2/Events.hs @@ -1,4 +1,5 @@ {-# Language FunctionalDependencies #-} +{-# Language AllowAmbiguousTypes #-} module HBS2.Events where import Data.Kind @@ -38,5 +39,10 @@ class Monad m => EventListener e a m | a -> e where class Monad m => EventEmitter e a m | a -> e where emit :: EventKey e a -> Event e a -> m () +class EventType a where + isPersistent :: Bool + isPersistent = False +instance {-# OVERLAPPABLE #-} EventType any where + isPersistent = False diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs new file mode 100644 index 00000000..e00b024e --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockAnnounce.hs @@ -0,0 +1,77 @@ +{-# Language TemplateHaskell #-} +module HBS2.Net.Proto.BlockAnnounce where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto +import HBS2.Events +import HBS2.Hash + +import Lens.Micro.Platform +import Type.Reflection (someTypeRep) +import Data.Hashable +import Data.ByteString.Lazy (ByteString) +import Data.Word +import Codec.Serialise() + + +data BlockInfoMeta = NoBlockInfoMeta + | BlockInfoMetaShort ByteString + | BlockInfoMetaRef (Hash HbSync) + deriving stock (Eq,Generic,Show) + +instance Serialise BlockInfoMeta + +data BlockAnnounceInfo e = + BlockAnnounceInfo + { _biNonce :: BlockInfoNonce + , _biMeta :: BlockInfoMeta + , _biSize :: Integer + , _biHash :: Hash HbSync + } + deriving stock (Eq,Generic,Show) + +newtype BlockInfoNonce = BlockInfoNonce Word64 + deriving newtype (Num,Enum,Real,Integral) + deriving stock (Ord,Eq,Generic,Show) + +instance Serialise BlockInfoNonce +instance Serialise (BlockAnnounceInfo e) + + +newtype BlockAnnounce e = BlockAnnounce (BlockAnnounceInfo e) + deriving stock (Eq,Generic,Show) + +instance Serialise (BlockAnnounce e) + + +makeLenses ''BlockAnnounceInfo + + +blockAnnounceProto :: forall e m . ( MonadIO m + , EventEmitter e (BlockAnnounce e) m + , Response e (BlockAnnounce e) m + ) => BlockAnnounce e -> m () +blockAnnounceProto = + \case + BlockAnnounce info -> do + that <- thatPeer (Proxy @(BlockAnnounce e)) + emit @e BlockAnnounceInfoKey (BlockAnnounceEvent that info) + +data instance EventKey e (BlockAnnounce e) = + BlockAnnounceInfoKey + deriving stock (Typeable, Eq,Generic) + +data instance Event e (BlockAnnounce e) = + BlockAnnounceEvent (Peer e) (BlockAnnounceInfo e) + deriving stock (Typeable) + +instance Typeable (BlockAnnounceInfo e) => Hashable (EventKey e (BlockAnnounce e)) where + hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) + where + p = Proxy @(BlockAnnounceInfo e) + +instance EventType ( Event e ( BlockAnnounce e) ) where + isPersistent = True + + + diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs index e8e560c9..0fdd4218 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -6,6 +6,7 @@ import HBS2.Hash import HBS2.Net.Proto import HBS2.Prelude.Plated import HBS2.Storage +import HBS2.Actors.Peer import Data.Word import Prettyprinter @@ -74,6 +75,7 @@ newtype instance Event e (BlockChunks e) = blockChunksProto :: forall e m . ( MonadIO m , Response e (BlockChunks e) m + , HasOwnPeer e m , Pretty (Peer e) ) => BlockChunksI e m @@ -83,6 +85,10 @@ blockChunksProto :: forall e m . ( MonadIO m blockChunksProto adapter (BlockChunks c p) = case p of BlockGetAllChunks h size -> deferred proto do + + me <- ownPeer @e + who <- thatPeer proto + bsz' <- blkSize adapter h maybe1 bsz' (pure ()) $ \bsz -> do @@ -96,6 +102,7 @@ blockChunksProto adapter (BlockChunks c p) = BlockChunk n bs -> do who <- thatPeer proto + me <- ownPeer @e h <- blkGetHash adapter (who, c) maybe1 h (response_ (BlockLost @e)) $ \hh -> do diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index e87d876a..733753bc 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -6,9 +6,8 @@ import HBS2.Net.Proto.Sessions import HBS2.Events import HBS2.Hash -import Codec.Serialise () -data BlockSize e = GetBlockSize (Hash HbSync) +data BlockInfo e = GetBlockSize (Hash HbSync) | NoBlock (Hash HbSync) | BlockSize (Hash HbSync) Integer deriving stock (Eq,Generic,Show) @@ -16,48 +15,48 @@ data BlockSize e = GetBlockSize (Hash HbSync) type HasBlockEvent h e m = (Peer e, Hash h, Maybe Integer) -> m () -instance Serialise (BlockSize e) +instance Serialise (BlockInfo e) blockSizeProto :: forall e m . ( MonadIO m - , Response e (BlockSize e) m - , EventEmitter e (BlockSize e) m + , Response e (BlockInfo e) m + , EventEmitter e (BlockInfo e) m ) => GetBlockSize HbSync m -> HasBlockEvent HbSync e m - -> BlockSize e + -> BlockInfo e -> m () blockSizeProto getBlockSize evHasBlock = \case GetBlockSize h -> do - deferred (Proxy @(BlockSize e))$ do + deferred (Proxy @(BlockInfo e))$ do getBlockSize h >>= \case Just size -> response (BlockSize @e h size) Nothing -> response (NoBlock @e h) NoBlock h -> do - that <- thatPeer (Proxy @(BlockSize e)) + that <- thatPeer (Proxy @(BlockInfo e)) evHasBlock ( that, h, Nothing ) BlockSize h sz -> do - that <- thatPeer (Proxy @(BlockSize e)) + that <- thatPeer (Proxy @(BlockInfo e)) emit @e (BlockSizeEventKey h) (BlockSizeEvent (that, h, sz)) evHasBlock ( that, h, Just sz ) -newtype instance SessionKey e (BlockSize e) = + +newtype instance SessionKey e (BlockInfo e) = BlockSizeKey (Hash HbSync) deriving stock (Typeable,Eq,Show) deriving newtype (Hashable,IsString) -newtype instance EventKey e (BlockSize e) = +newtype instance EventKey e (BlockInfo e) = BlockSizeEventKey (Hash HbSync) deriving stock (Typeable, Eq,Generic) -deriving instance Hashable (EventKey e (BlockSize e)) +deriving instance Hashable (EventKey e (BlockInfo e)) -newtype instance Event e (BlockSize e) = +newtype instance Event e (BlockInfo e) = BlockSizeEvent (Peer e, Hash HbSync, Integer) deriving stock (Typeable) - diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 6ec86200..7cc9e599 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -7,20 +7,21 @@ module Main where import HBS2.Actors.ChunkWriter import HBS2.Actors.Peer import HBS2.Clock +import HBS2.Defaults import HBS2.Events import HBS2.Hash +import HBS2.Merkle import HBS2.Net.Messaging.Fake +import HBS2.Net.PeerLocator import HBS2.Net.Proto -import HBS2.Net.Proto.Sessions +import HBS2.Net.Proto.BlockAnnounce import HBS2.Net.Proto.BlockChunks import HBS2.Net.Proto.BlockInfo -import HBS2.Net.PeerLocator -import HBS2.Net.PeerLocator.Static +import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated import HBS2.Storage import HBS2.Storage.Simple import HBS2.Storage.Simple.Extra -import HBS2.Defaults import Test.Tasty.HUnit @@ -41,6 +42,8 @@ import System.Directory import System.Exit import System.FilePath.Posix import System.IO +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue qualified as Q debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p @@ -72,18 +75,21 @@ instance Pretty (Peer Fake) where pretty (FakePeer n) = parens ("peer" <+> pretty n) -instance HasProtocol Fake (BlockSize Fake) where - type instance ProtocolId (BlockSize Fake) = 1 +instance HasProtocol Fake (BlockInfo Fake) where + type instance ProtocolId (BlockInfo Fake) = 1 type instance Encoded Fake = ByteString decode = either (const Nothing) Just . deserialiseOrFail encode = serialise -- FIXME: 3 is for debug only! -instance Expires (EventKey Fake (BlockSize Fake)) where - expiresIn _ = 3 +instance Expires (EventKey Fake (BlockInfo Fake)) where + expiresIn _ = Just 3 instance Expires (EventKey Fake (BlockChunks Fake)) where - expiresIn _ = 10 + expiresIn _ = Just 10 + +instance Expires (EventKey Fake (BlockAnnounce Fake)) where + expiresIn _ = Nothing instance HasProtocol Fake (BlockChunks Fake) where type instance ProtocolId (BlockChunks Fake) = 2 @@ -91,8 +97,14 @@ instance HasProtocol Fake (BlockChunks Fake) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise +instance HasProtocol Fake (BlockAnnounce Fake) where + type instance ProtocolId (BlockAnnounce Fake) = 3 + type instance Encoded Fake = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise -type instance SessionData e (BlockSize e) = BlockSizeSession e + +type instance SessionData e (BlockInfo e) = BlockSizeSession e type instance SessionData e (BlockChunks e) = BlockDownload newtype instance SessionKey e (BlockChunks e) = @@ -120,7 +132,7 @@ runTestPeer :: Peer Fake runTestPeer p zu = do - dir <- liftIO $ canonicalizePath ( ".peers" show p) + dir <- liftIO $ canonicalizePath ( ".peers" show (fromIntegral p :: Int)) let chDir = dir "tmp-chunks" liftIO $ createDirectoryIfMissing True dir @@ -142,8 +154,8 @@ runTestPeer p zu = do handleBlockInfo :: forall e m . ( MonadIO m - , Sessions e (BlockSize e) m - , Default (SessionData e (BlockSize e)) + , Sessions e (BlockInfo e) m + , Default (SessionData e (BlockInfo e)) , Ord (Peer e) , Pretty (Peer e) -- , EventEmitter e (BlockSize e) m @@ -158,34 +170,59 @@ handleBlockInfo (p, h, sz') = do update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) blockDownloadLoop :: forall e m . ( m ~ PeerM e IO - , HasProtocol e (BlockSize e) + , HasProtocol e (BlockInfo e) , HasProtocol e (BlockChunks e) - , Request e (BlockSize e) m + , Request e (BlockInfo e) m , Request e (BlockChunks e) m - , EventListener e (BlockSize e) m + , EventListener e (BlockInfo e) m , EventListener e (BlockChunks e) m + , EventListener e (BlockAnnounce e) m , EventEmitter e (BlockChunks e) m - , Sessions e (BlockSize e) m + , EventEmitter e (BlockInfo e) m + , Sessions e (BlockInfo e) m , Sessions e (BlockChunks e) m , Num (Peer e) , Pretty (Peer e) ) => PeerM e IO () blockDownloadLoop = do - let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" - , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" - , "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" - ] + let blks = [] + -- let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt" + -- , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" + -- , "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + -- ] - for_ blks $ \h -> do + blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ + for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq b - debug $ "subscribing to" <+> pretty h + subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p ann) -> do + let h = view biHash ann + let s = view biSize ann + debug $ "BLOCK ANNOUNCE!" <+> pretty p + <+> pretty h + <+> pretty (view biSize ann) - subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do - debug $ "GOT BLOCK!" <+> pretty h - pure () + initDownload p h s -- FIXME: don't trust everybody + + fix \next -> do + + h <- liftIO $ atomically $ Q.readTBQueue blq subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do + initDownload p h s + + peers <- getPeerLocator @e >>= knownPeers @e + + for_ peers $ \p -> do + debug $ "requesting block" <+> pretty h <+> "from" <+> pretty p + request p (GetBlockSize @e h) + + liftIO $ print "piu!" + + next + + where + initDownload p h s = do coo <- genCookie (p,h) let key = DownloadSessionKey (p, coo) let chusz = defChunkSize @@ -194,19 +231,13 @@ blockDownloadLoop = do $ newBlockDownload h update @e new key id + + subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do + debug $ "GOT BLOCK!" <+> pretty h + pure () + request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction - peers <- getPeerLocator @e >>= knownPeers @e - - for_ peers $ \p -> do - debug $ "WTF?" <+> pretty p - request p (GetBlockSize @e h) - - fix \next -> do - liftIO $ print "piu!" - - pause ( 0.85 :: Timeout 'Seconds ) - next -- NOTE: this is an adapter for a ResponseM monad -- because response is working in ResponseM monad (ha!) @@ -296,7 +327,7 @@ main = do fake <- newFakeP2P True <&> Fabriq - let (p0:ps) = [0..1] :: [Peer Fake] + let (p0:ps) = [0..4] :: [Peer Fake] -- others others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do @@ -310,13 +341,25 @@ main = do root <- putAsMerkle s blk + rootSz <- hasBlock s (fromMerkleHash root) + debug $ "I'm" <+> pretty p <+> pretty root runPeerM (AnyStorage s) fake p $ do adapter <- mkAdapter cw + + env <- ask + liftIO $ async $ withPeerM env $ do + maybe1 rootSz (pure ()) $ \rsz -> do + pause ( 0.01 :: Timeout 'Seconds ) + let info = BlockAnnounceInfo 0 NoBlockInfoMeta rsz (fromMerkleHash root) + let ann = BlockAnnounce @Fake info + request @Fake p0 ann + runProto @Fake [ makeResponse (blockSizeProto findBlk dontHandle) , makeResponse (blockChunksProto adapter) + , makeResponse blockAnnounceProto ] our <- async $ runTestPeer p0 $ \s cw -> do @@ -334,6 +377,7 @@ main = do runProto @Fake [ makeResponse (blockSizeProto blk handleBlockInfo) , makeResponse (blockChunksProto adapter) + , makeResponse blockAnnounceProto ] liftIO $ cancel as