proto block announces. it works!

This commit is contained in:
Dmitry Zuikov 2023-01-22 18:25:04 +03:00
parent 264314e255
commit 76e977327f
9 changed files with 201 additions and 60 deletions

View File

@ -81,6 +81,7 @@ library
, HBS2.Net.Proto
, HBS2.Net.Proto.BlockChunks
, HBS2.Net.Proto.BlockInfo
, HBS2.Net.Proto.BlockAnnounce
, HBS2.Net.Proto.Sessions
, HBS2.Net.Proto.Types
, HBS2.Prelude

View File

@ -225,12 +225,12 @@ instance ( HasProtocol e p
liftIO $ print $ "sweep smth with key" <+> pretty (hash sk)
liftIO $ atomically $ modifyTVar' ev (HashMap.delete sk)
addSweeper :: forall e . Timeout 'Seconds -> SKey -> PeerM e IO () -> PeerM e IO ()
addSweeper :: forall e . Maybe (Timeout 'Seconds) -> SKey -> PeerM e IO () -> PeerM e IO ()
addSweeper t k sweeper = do
liftIO $ print $ "adding sweeper for key" <+> pretty (hash k)
ex <- asks (view envExpireTimes)
sw <- asks (view envSweepers)
liftIO $ Cache.insert' ex (Just (toTimeSpec t)) k ()
liftIO $ Cache.insert' ex (toTimeSpec <$> t) k ()
liftIO $ atomically $ modifyTVar' sw (HashMap.insertWith (<>) k [sweeper])
sweep :: PeerM e IO ()
@ -258,20 +258,26 @@ instance ( HasProtocol e p
, Hashable (EventKey e p)
, Eq (EventKey e p)
, Typeable (EventHandler e p (PeerM e IO))
, EventType (Event e p)
, Pretty (Peer e)
) => EventEmitter e p (PeerM e IO) where
emit k d = do
me <- ownPeer @e
se <- asks (view envEvents)
let sk = newSKey @(EventKey e p) k
void $ runMaybeT $ do
subs <- MaybeT $ liftIO $ atomically $ readTVar se <&> HashMap.lookup sk
void $ liftIO $ atomically $ modifyTVar' se (HashMap.delete sk)
for_ subs $ \r -> do
ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r
lift $ ev d
pers <- forM subs $ \r -> do
ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r
lift $ ev d
if isPersistent @(Event e p) then
pure [r]
else
pure []
void $ liftIO $ atomically $ modifyTVar' se (HashMap.insert sk (mconcat pers))
runPeerM :: forall e m . (MonadIO m, HasPeer e, Ord (Peer e), Pretty (Peer e))
=> AnyStorage
@ -393,4 +399,6 @@ instance ( MonadIO m
emit k d = lift $ emit k d
instance (Monad m, HasOwnPeer e m) => HasOwnPeer e (ResponseM e m) where
ownPeer = lift ownPeer

View File

@ -66,6 +66,6 @@ instance IsTimeout 'Minutes where
toNanoSeconds (TimeoutMin x) = round (x * 60 * 1e9)
class Expires a where
expiresIn :: Proxy a -> Timeout 'Seconds
expiresIn :: Proxy a -> Maybe (Timeout 'Seconds)

View File

@ -16,7 +16,6 @@ newtype HashRef = HashRef (Hash HbSync)
deriving stock (Data,Generic,Show)
data HashRefObject = HashRefObject HashRef (Maybe HashRefMetadata)
deriving stock (Data,Generic)

View File

@ -1,4 +1,5 @@
{-# Language FunctionalDependencies #-}
{-# Language AllowAmbiguousTypes #-}
module HBS2.Events where
import Data.Kind
@ -38,5 +39,10 @@ class Monad m => EventListener e a m | a -> e where
class Monad m => EventEmitter e a m | a -> e where
emit :: EventKey e a -> Event e a -> m ()
class EventType a where
isPersistent :: Bool
isPersistent = False
instance {-# OVERLAPPABLE #-} EventType any where
isPersistent = False

View File

@ -0,0 +1,77 @@
{-# Language TemplateHaskell #-}
module HBS2.Net.Proto.BlockAnnounce where
import HBS2.Prelude.Plated
import HBS2.Net.Proto
import HBS2.Events
import HBS2.Hash
import Lens.Micro.Platform
import Type.Reflection (someTypeRep)
import Data.Hashable
import Data.ByteString.Lazy (ByteString)
import Data.Word
import Codec.Serialise()
data BlockInfoMeta = NoBlockInfoMeta
| BlockInfoMetaShort ByteString
| BlockInfoMetaRef (Hash HbSync)
deriving stock (Eq,Generic,Show)
instance Serialise BlockInfoMeta
data BlockAnnounceInfo e =
BlockAnnounceInfo
{ _biNonce :: BlockInfoNonce
, _biMeta :: BlockInfoMeta
, _biSize :: Integer
, _biHash :: Hash HbSync
}
deriving stock (Eq,Generic,Show)
newtype BlockInfoNonce = BlockInfoNonce Word64
deriving newtype (Num,Enum,Real,Integral)
deriving stock (Ord,Eq,Generic,Show)
instance Serialise BlockInfoNonce
instance Serialise (BlockAnnounceInfo e)
newtype BlockAnnounce e = BlockAnnounce (BlockAnnounceInfo e)
deriving stock (Eq,Generic,Show)
instance Serialise (BlockAnnounce e)
makeLenses ''BlockAnnounceInfo
blockAnnounceProto :: forall e m . ( MonadIO m
, EventEmitter e (BlockAnnounce e) m
, Response e (BlockAnnounce e) m
) => BlockAnnounce e -> m ()
blockAnnounceProto =
\case
BlockAnnounce info -> do
that <- thatPeer (Proxy @(BlockAnnounce e))
emit @e BlockAnnounceInfoKey (BlockAnnounceEvent that info)
data instance EventKey e (BlockAnnounce e) =
BlockAnnounceInfoKey
deriving stock (Typeable, Eq,Generic)
data instance Event e (BlockAnnounce e) =
BlockAnnounceEvent (Peer e) (BlockAnnounceInfo e)
deriving stock (Typeable)
instance Typeable (BlockAnnounceInfo e) => Hashable (EventKey e (BlockAnnounce e)) where
hashWithSalt salt _ = hashWithSalt salt (someTypeRep p)
where
p = Proxy @(BlockAnnounceInfo e)
instance EventType ( Event e ( BlockAnnounce e) ) where
isPersistent = True

View File

@ -6,6 +6,7 @@ import HBS2.Hash
import HBS2.Net.Proto
import HBS2.Prelude.Plated
import HBS2.Storage
import HBS2.Actors.Peer
import Data.Word
import Prettyprinter
@ -74,6 +75,7 @@ newtype instance Event e (BlockChunks e) =
blockChunksProto :: forall e m . ( MonadIO m
, Response e (BlockChunks e) m
, HasOwnPeer e m
, Pretty (Peer e)
)
=> BlockChunksI e m
@ -83,6 +85,10 @@ blockChunksProto :: forall e m . ( MonadIO m
blockChunksProto adapter (BlockChunks c p) =
case p of
BlockGetAllChunks h size -> deferred proto do
me <- ownPeer @e
who <- thatPeer proto
bsz' <- blkSize adapter h
maybe1 bsz' (pure ()) $ \bsz -> do
@ -96,6 +102,7 @@ blockChunksProto adapter (BlockChunks c p) =
BlockChunk n bs -> do
who <- thatPeer proto
me <- ownPeer @e
h <- blkGetHash adapter (who, c)
maybe1 h (response_ (BlockLost @e)) $ \hh -> do

View File

@ -6,9 +6,8 @@ import HBS2.Net.Proto.Sessions
import HBS2.Events
import HBS2.Hash
import Codec.Serialise ()
data BlockSize e = GetBlockSize (Hash HbSync)
data BlockInfo e = GetBlockSize (Hash HbSync)
| NoBlock (Hash HbSync)
| BlockSize (Hash HbSync) Integer
deriving stock (Eq,Generic,Show)
@ -16,48 +15,48 @@ data BlockSize e = GetBlockSize (Hash HbSync)
type HasBlockEvent h e m = (Peer e, Hash h, Maybe Integer) -> m ()
instance Serialise (BlockSize e)
instance Serialise (BlockInfo e)
blockSizeProto :: forall e m . ( MonadIO m
, Response e (BlockSize e) m
, EventEmitter e (BlockSize e) m
, Response e (BlockInfo e) m
, EventEmitter e (BlockInfo e) m
)
=> GetBlockSize HbSync m
-> HasBlockEvent HbSync e m
-> BlockSize e
-> BlockInfo e
-> m ()
blockSizeProto getBlockSize evHasBlock =
\case
GetBlockSize h -> do
deferred (Proxy @(BlockSize e))$ do
deferred (Proxy @(BlockInfo e))$ do
getBlockSize h >>= \case
Just size -> response (BlockSize @e h size)
Nothing -> response (NoBlock @e h)
NoBlock h -> do
that <- thatPeer (Proxy @(BlockSize e))
that <- thatPeer (Proxy @(BlockInfo e))
evHasBlock ( that, h, Nothing )
BlockSize h sz -> do
that <- thatPeer (Proxy @(BlockSize e))
that <- thatPeer (Proxy @(BlockInfo e))
emit @e (BlockSizeEventKey h) (BlockSizeEvent (that, h, sz))
evHasBlock ( that, h, Just sz )
newtype instance SessionKey e (BlockSize e) =
newtype instance SessionKey e (BlockInfo e) =
BlockSizeKey (Hash HbSync)
deriving stock (Typeable,Eq,Show)
deriving newtype (Hashable,IsString)
newtype instance EventKey e (BlockSize e) =
newtype instance EventKey e (BlockInfo e) =
BlockSizeEventKey (Hash HbSync)
deriving stock (Typeable, Eq,Generic)
deriving instance Hashable (EventKey e (BlockSize e))
deriving instance Hashable (EventKey e (BlockInfo e))
newtype instance Event e (BlockSize e) =
newtype instance Event e (BlockInfo e) =
BlockSizeEvent (Peer e, Hash HbSync, Integer)
deriving stock (Typeable)

View File

@ -7,20 +7,21 @@ module Main where
import HBS2.Actors.ChunkWriter
import HBS2.Actors.Peer
import HBS2.Clock
import HBS2.Defaults
import HBS2.Events
import HBS2.Hash
import HBS2.Merkle
import HBS2.Net.Messaging.Fake
import HBS2.Net.PeerLocator
import HBS2.Net.Proto
import HBS2.Net.Proto.Sessions
import HBS2.Net.Proto.BlockAnnounce
import HBS2.Net.Proto.BlockChunks
import HBS2.Net.Proto.BlockInfo
import HBS2.Net.PeerLocator
import HBS2.Net.PeerLocator.Static
import HBS2.Net.Proto.Sessions
import HBS2.Prelude.Plated
import HBS2.Storage
import HBS2.Storage.Simple
import HBS2.Storage.Simple.Extra
import HBS2.Defaults
import Test.Tasty.HUnit
@ -41,6 +42,8 @@ import System.Directory
import System.Exit
import System.FilePath.Posix
import System.IO
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue qualified as Q
debug :: (MonadIO m) => Doc ann -> m ()
debug p = liftIO $ hPrint stderr p
@ -72,18 +75,21 @@ instance Pretty (Peer Fake) where
pretty (FakePeer n) = parens ("peer" <+> pretty n)
instance HasProtocol Fake (BlockSize Fake) where
type instance ProtocolId (BlockSize Fake) = 1
instance HasProtocol Fake (BlockInfo Fake) where
type instance ProtocolId (BlockInfo Fake) = 1
type instance Encoded Fake = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
-- FIXME: 3 is for debug only!
instance Expires (EventKey Fake (BlockSize Fake)) where
expiresIn _ = 3
instance Expires (EventKey Fake (BlockInfo Fake)) where
expiresIn _ = Just 3
instance Expires (EventKey Fake (BlockChunks Fake)) where
expiresIn _ = 10
expiresIn _ = Just 10
instance Expires (EventKey Fake (BlockAnnounce Fake)) where
expiresIn _ = Nothing
instance HasProtocol Fake (BlockChunks Fake) where
type instance ProtocolId (BlockChunks Fake) = 2
@ -91,8 +97,14 @@ instance HasProtocol Fake (BlockChunks Fake) where
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
instance HasProtocol Fake (BlockAnnounce Fake) where
type instance ProtocolId (BlockAnnounce Fake) = 3
type instance Encoded Fake = ByteString
decode = either (const Nothing) Just . deserialiseOrFail
encode = serialise
type instance SessionData e (BlockSize e) = BlockSizeSession e
type instance SessionData e (BlockInfo e) = BlockSizeSession e
type instance SessionData e (BlockChunks e) = BlockDownload
newtype instance SessionKey e (BlockChunks e) =
@ -120,7 +132,7 @@ runTestPeer :: Peer Fake
runTestPeer p zu = do
dir <- liftIO $ canonicalizePath ( ".peers" </> show p)
dir <- liftIO $ canonicalizePath ( ".peers" </> show (fromIntegral p :: Int))
let chDir = dir </> "tmp-chunks"
liftIO $ createDirectoryIfMissing True dir
@ -142,8 +154,8 @@ runTestPeer p zu = do
handleBlockInfo :: forall e m . ( MonadIO m
, Sessions e (BlockSize e) m
, Default (SessionData e (BlockSize e))
, Sessions e (BlockInfo e) m
, Default (SessionData e (BlockInfo e))
, Ord (Peer e)
, Pretty (Peer e)
-- , EventEmitter e (BlockSize e) m
@ -158,34 +170,59 @@ handleBlockInfo (p, h, sz') = do
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, HasProtocol e (BlockSize e)
, HasProtocol e (BlockInfo e)
, HasProtocol e (BlockChunks e)
, Request e (BlockSize e) m
, Request e (BlockInfo e) m
, Request e (BlockChunks e) m
, EventListener e (BlockSize e) m
, EventListener e (BlockInfo e) m
, EventListener e (BlockChunks e) m
, EventListener e (BlockAnnounce e) m
, EventEmitter e (BlockChunks e) m
, Sessions e (BlockSize e) m
, EventEmitter e (BlockInfo e) m
, Sessions e (BlockInfo e) m
, Sessions e (BlockChunks e) m
, Num (Peer e)
, Pretty (Peer e)
) => PeerM e IO ()
blockDownloadLoop = do
let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
, "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
]
let blks = []
-- let blks = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
-- , "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
-- , "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
-- ]
for_ blks $ \h -> do
blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ
for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq b
debug $ "subscribing to" <+> pretty h
subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p ann) -> do
let h = view biHash ann
let s = view biSize ann
debug $ "BLOCK ANNOUNCE!" <+> pretty p
<+> pretty h
<+> pretty (view biSize ann)
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do
debug $ "GOT BLOCK!" <+> pretty h
pure ()
initDownload p h s -- FIXME: don't trust everybody
fix \next -> do
h <- liftIO $ atomically $ Q.readTBQueue blq
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do
initDownload p h s
peers <- getPeerLocator @e >>= knownPeers @e
for_ peers $ \p -> do
debug $ "requesting block" <+> pretty h <+> "from" <+> pretty p
request p (GetBlockSize @e h)
liftIO $ print "piu!"
next
where
initDownload p h s = do
coo <- genCookie (p,h)
let key = DownloadSessionKey (p, coo)
let chusz = defChunkSize
@ -194,19 +231,13 @@ blockDownloadLoop = do
$ newBlockDownload h
update @e new key id
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do
debug $ "GOT BLOCK!" <+> pretty h
pure ()
request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
peers <- getPeerLocator @e >>= knownPeers @e
for_ peers $ \p -> do
debug $ "WTF?" <+> pretty p
request p (GetBlockSize @e h)
fix \next -> do
liftIO $ print "piu!"
pause ( 0.85 :: Timeout 'Seconds )
next
-- NOTE: this is an adapter for a ResponseM monad
-- because response is working in ResponseM monad (ha!)
@ -296,7 +327,7 @@ main = do
fake <- newFakeP2P True <&> Fabriq
let (p0:ps) = [0..1] :: [Peer Fake]
let (p0:ps) = [0..4] :: [Peer Fake]
-- others
others <- forM ps $ \p -> async $ runTestPeer p $ \s cw -> do
@ -310,13 +341,25 @@ main = do
root <- putAsMerkle s blk
rootSz <- hasBlock s (fromMerkleHash root)
debug $ "I'm" <+> pretty p <+> pretty root
runPeerM (AnyStorage s) fake p $ do
adapter <- mkAdapter cw
env <- ask
liftIO $ async $ withPeerM env $ do
maybe1 rootSz (pure ()) $ \rsz -> do
pause ( 0.01 :: Timeout 'Seconds )
let info = BlockAnnounceInfo 0 NoBlockInfoMeta rsz (fromMerkleHash root)
let ann = BlockAnnounce @Fake info
request @Fake p0 ann
runProto @Fake
[ makeResponse (blockSizeProto findBlk dontHandle)
, makeResponse (blockChunksProto adapter)
, makeResponse blockAnnounceProto
]
our <- async $ runTestPeer p0 $ \s cw -> do
@ -334,6 +377,7 @@ main = do
runProto @Fake
[ makeResponse (blockSizeProto blk handleBlockInfo)
, makeResponse (blockChunksProto adapter)
, makeResponse blockAnnounceProto
]
liftIO $ cancel as