This commit is contained in:
Dmitry Zuikov 2023-01-20 06:48:00 +03:00
parent e752075eed
commit f1d9513ad5
5 changed files with 206 additions and 146 deletions

View File

@ -80,6 +80,7 @@ library
, HBS2.Net.Proto , HBS2.Net.Proto
, HBS2.Net.Proto.Types , HBS2.Net.Proto.Types
, HBS2.Net.Proto.BlockInfo , HBS2.Net.Proto.BlockInfo
, HBS2.Net.Proto.BlockChunks
, HBS2.Prelude , HBS2.Prelude
, HBS2.Prelude.Plated , HBS2.Prelude.Plated
, HBS2.Storage , HBS2.Storage

View File

@ -0,0 +1,105 @@
{-# Language RankNTypes #-}
module HBS2.Net.Proto.BlockChunks where
import HBS2.Prelude.Plated
import HBS2.Hash
import HBS2.Net.Proto
import HBS2.Storage
import Data.Word
import Prettyprinter
import Data.ByteString.Lazy (ByteString)
import Data.Foldable
newtype ChunkSize = ChunkSize Word16
deriving newtype (Num,Enum,Real,Integral,Pretty)
deriving stock (Eq,Ord,Show,Data,Generic)
newtype ChunkNum = ChunkNum Word16
deriving newtype (Num,Enum,Real,Integral,Pretty)
deriving stock (Eq,Ord,Show,Data,Generic)
type OnBlockReady h m = Hash h -> m ()
type GetBlockChunk h m = Hash h -> Offset -> Size -> m (Maybe ByteString)
type AcceptChunk h e m = Response e (BlockChunks e) m
=> ( Cookie e, Peer e, Hash HbSync, ChunkNum, ByteString ) -> m ()
type GetBlockHash h e m = (Peer e, Cookie e) -> m (Maybe (Hash h))
data BlockChunksI e m =
BlockChunksI
{ blkSize :: GetBlockSize HbSync m
, blkChunk :: GetBlockChunk HbSync m
, blkGetHash :: GetBlockHash HbSync e m
, blkAcceptChunk :: AcceptChunk HbSync e m
}
data BlockChunks e = BlockChunks (Cookie e) (BlockChunksProto e)
deriving stock (Generic)
data BlockChunksProto e = BlockGetAllChunks (Hash HbSync) ChunkSize
| BlockNoChunks
| BlockChunk ChunkNum ByteString
| BlockLost
deriving stock (Generic)
instance HasCookie e (BlockChunks e) where
type instance Cookie e = Word32
getCookie (BlockChunks c _) = Just c
instance Serialise ChunkSize
instance Serialise ChunkNum
instance Serialise (BlockChunksProto e)
instance Serialise (BlockChunks e)
blockChunksProto :: forall e m . ( MonadIO m
, Response e (BlockChunks e) m
, Pretty (Peer e)
)
=> BlockChunksI e m
-> BlockChunks e
-> m ()
blockChunksProto adapter (BlockChunks c p) =
case p of
BlockGetAllChunks h size -> deferred proto do
bsz' <- blkSize adapter h
maybe1 bsz' (pure ()) $ \bsz -> do
let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)]
let offsets = zip offsets' [0..]
for_ offsets $ \((o,sz),i) -> do
chunk <- blkChunk adapter h o sz
maybe (pure ()) (response_ . BlockChunk @e i) chunk
BlockChunk n bs -> do
who <- thatPeer proto
h <- blkGetHash adapter (who, c)
maybe1 h (response_ (BlockLost @e)) $ \hh -> do
void $ blkAcceptChunk adapter (c, who, hh, n, bs)
BlockNoChunks {} -> do
-- TODO: notification
pure ()
BlockLost{} -> do
pure ()
where
proto = Proxy @(BlockChunks e)
response_ pt = response (BlockChunks c pt)

View File

@ -2,7 +2,6 @@
{-# Language ScopedTypeVariables #-} {-# Language ScopedTypeVariables #-}
module HBS2.Storage.Simple module HBS2.Storage.Simple
( module HBS2.Storage.Simple ( module HBS2.Storage.Simple
, module HBS2.Storage
) where ) where
import Control.Concurrent.Async import Control.Concurrent.Async

View File

@ -4,6 +4,7 @@ module HBS2.Storage.Simple.Extra where
import HBS2.Merkle import HBS2.Merkle
import HBS2.Hash import HBS2.Hash
import HBS2.Prelude import HBS2.Prelude
import HBS2.Storage
import HBS2.Storage.Simple import HBS2.Storage.Simple
import HBS2.Data.Types.Refs import HBS2.Data.Types.Refs
import HBS2.Defaults import HBS2.Defaults

View File

@ -9,10 +9,13 @@ import HBS2.Hash
-- import HBS2.Net.Messaging -- import HBS2.Net.Messaging
import HBS2.Net.Proto import HBS2.Net.Proto
import HBS2.Net.Proto.BlockInfo import HBS2.Net.Proto.BlockInfo
import HBS2.Net.Proto.BlockChunks
import HBS2.Net.Messaging
import HBS2.Net.Messaging.Fake import HBS2.Net.Messaging.Fake
import HBS2.Actors.Peer import HBS2.Actors.Peer
import HBS2.Defaults import HBS2.Defaults
import HBS2.Storage
import HBS2.Storage.Simple import HBS2.Storage.Simple
import HBS2.Storage.Simple.Extra import HBS2.Storage.Simple.Extra
import HBS2.Actors.ChunkWriter import HBS2.Actors.ChunkWriter
@ -22,12 +25,17 @@ import Test.Tasty.HUnit
import Codec.Serialise import Codec.Serialise
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Monad
import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy.Char8 qualified as B8 import Data.ByteString.Lazy.Char8 qualified as B8
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Foldable import Data.Foldable
import Data.Hashable import Data.Hashable
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Maybe import Data.Maybe
import Data.Traversable
import Data.Word import Data.Word
import Lens.Micro.Platform import Lens.Micro.Platform
import Prettyprinter import Prettyprinter
@ -35,11 +43,7 @@ import System.Directory
import System.Exit import System.Exit
import System.FilePath.Posix import System.FilePath.Posix
import System.IO import System.IO
import Data.Cache (Cache) import Control.Concurrent
import Data.Cache qualified as Cache
import Data.Map (Map)
import Data.Map qualified as Map
import Control.Monad.Trans.Maybe
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue qualified as Q import Control.Concurrent.STM.TQueue qualified as Q
@ -47,21 +51,12 @@ import Control.Concurrent.STM.TQueue qualified as Q
debug :: (MonadIO m) => Doc ann -> m () debug :: (MonadIO m) => Doc ann -> m ()
debug p = liftIO $ hPrint stderr p debug p = liftIO $ hPrint stderr p
newtype ChunkSize = ChunkSize Word16
deriving newtype (Num,Enum,Real,Integral,Pretty)
deriving stock (Eq,Ord,Show,Data,Generic)
newtype ChunkNum = ChunkNum Word16
deriving newtype (Num,Enum,Real,Integral,Pretty)
deriving stock (Eq,Ord,Show,Data,Generic)
-- FIXME: peer should be a part of the key -- FIXME: peer should be a part of the key
-- therefore, key is ( p | cookie ) -- therefore, key is ( p | cookie )
-- but client's cookie in protocol should be just ( cookie :: Word32 ) -- but client's cookie in protocol should be just ( cookie :: Word32 )
type OnBlockReady h m = Hash h -> m ()
data BlockDownload m = data BlockDownload m =
BlockDownload BlockDownload
@ -90,81 +85,8 @@ newBlockDownload :: forall m . MonadIO m
newBlockDownload h = BlockDownload h 0 0 0 newBlockDownload h = BlockDownload h 0 0 0
type GetBlockChunk h m = Hash h -> Offset -> Size -> m (Maybe ByteString)
type AcceptChunk h e m = Response e (BlockChunks e) m
=> ( Cookie e, Peer e, Hash HbSync, ChunkNum, ByteString ) -> m ()
type GetBlockHash h e m = (Peer e, Cookie e) -> m (Maybe (Hash h))
data BlockChunksI e m =
BlockChunksI
{ blkSize :: GetBlockSize HbSync m
, blkChunk :: GetBlockChunk HbSync m
, blkGetHash :: GetBlockHash HbSync e m
, blkAcceptChunk :: AcceptChunk HbSync e m
}
instance HasCookie e (BlockChunks e) where
type instance Cookie e = Word32
getCookie (BlockChunks c _) = Just c
data BlockChunks e = BlockChunks (Cookie e) (BlockChunksProto e)
deriving stock (Generic)
data BlockChunksProto e = BlockGetAllChunks (Hash HbSync) ChunkSize
| BlockNoChunks
| BlockChunk ChunkNum ByteString
| BlockLost
deriving stock (Generic)
instance Serialise ChunkSize
instance Serialise ChunkNum
instance Serialise (BlockChunksProto e)
instance Serialise (BlockChunks e)
blockChunksProto :: forall e m . ( MonadIO m
, Response e (BlockChunks e) m
, Pretty (Peer e)
)
=> BlockChunksI e m
-> BlockChunks e
-> m ()
blockChunksProto adapter (BlockChunks c p) =
case p of
BlockGetAllChunks h size -> deferred proto do
bsz <- blkSize adapter h
let offsets' = calcChunks (fromJust bsz) (fromIntegral size) :: [(Offset, Size)]
let offsets = zip offsets' [0..]
for_ offsets $ \((o,sz),i) -> do
chunk <- blkChunk adapter h o sz
maybe (pure ()) (response_ . BlockChunk @e i) chunk
BlockChunk n bs -> do
who <- thatPeer proto
h <- blkGetHash adapter (who, c)
maybe1 h (response_ (BlockLost @e)) $ \hh -> do
void $ blkAcceptChunk adapter (c, who, hh, n, bs)
BlockNoChunks {} -> do
-- TODO: notification
pure ()
BlockLost{} -> do
pure ()
where
proto = Proxy @(BlockChunks e)
response_ pt = response (BlockChunks c pt)
data Fake data Fake
instance HasPeer Fake where instance HasPeer Fake where
@ -235,23 +157,40 @@ delSession se l k = liftIO do
expireSession se l = liftIO do expireSession se l = liftIO do
Cache.purgeExpired (view l se) Cache.purgeExpired (view l se)
runFakePeer :: forall e m . (e ~ Fake, m ~ IO) => Sessions e m -> EngineEnv e -> m ()
runFakePeer se env = do -- newtype FullPeerM m a = RealPeerM { fromRealPeerM :: ReaderT }
runFakePeer :: forall e b . ( e ~ Fake
-- , m ~ IO
, Messaging b e ByteString
-- , MonadIO m
-- , Response e p m
-- , EngineM e m
)
=> Peer e
-> b
-> EngineM e IO ()
-> IO ()
runFakePeer p bus work = do
env <- newEnv p bus
se <- emptySessions @e
let pid = fromIntegral (hash (env ^. self)) :: Word8 let pid = fromIntegral (hash (env ^. self)) :: Word8
dir <- canonicalizePath ( ".peers" </> show pid) dir <- liftIO $ canonicalizePath ( ".peers" </> show pid)
let chDir = dir </> "tmp-chunks" let chDir = dir </> "tmp-chunks"
createDirectoryIfMissing True dir liftIO $ createDirectoryIfMissing True dir
let opts = [ StoragePrefix dir let opts = [ StoragePrefix dir
] ]
storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync) storage <- simpleStorageInit opts -- :: IO (SimpleStorage HbSync)
w <- async $ simpleStorageWorker storage w <- liftIO $ async $ simpleStorageWorker storage
cww <- newChunkWriterIO storage (Just chDir) cww <- newChunkWriterIO storage (Just chDir)
@ -342,16 +281,18 @@ runFakePeer se env = do
-- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся -- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся
} }
runPeer env peer <- async $ runPeer env
[ makeResponse (blockSizeProto (hasBlock storage) handleBlockInfo) [ makeResponse (blockSizeProto (hasBlock storage) handleBlockInfo)
, makeResponse (blockChunksProto adapter) , makeResponse (blockChunksProto adapter)
] ]
runEngineM env work
simpleStorageStop storage simpleStorageStop storage
stopChunkWriter cww stopChunkWriter cww
mapM_ cancel [w,cw] mapM_ cancel [w,cw,peer]
test1 :: IO () test1 :: IO ()
@ -361,77 +302,90 @@ test1 = do
fake <- newFakeP2P True fake <- newFakeP2P True
let peers@[p0,p1] = [0..1] :: [Peer Fake]
envs@[e0,e1] <- forM peers $ \p -> newEnv p fake
mtS <- emptySessions @Fake
let ee = zip (repeat mtS) envs
void $ race (pause (2 :: Timeout 'Seconds)) $ do void $ race (pause (2 :: Timeout 'Seconds)) $ do
peerz <- mapM (async . uncurry runFakePeer) ee let p0 = 0 :: Peer Fake
let p1 = 1 :: Peer Fake
runEngineM e0 $ do p1Thread <- async $ runFakePeer p1 fake (liftIO $ forever yield)
let h = fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ" p0Thread <- async $ runFakePeer p0 fake $ do
-- TODO: #ASAP generate unique cookie!! request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
-- request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
-- FIXME: withAllCrap $ do ...
let s0 = (fst . head) ee
newCookie <- genCookie @Fake (p1, h) -- <<~~~ FIXME: generate a good session id! let h = fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
let cKey@(_, cookie) = (p1, newCookie) -- cookie <- newSession ???
let chsz = defChunkSize
debug $ "new cookie:" <+> pretty cookie -- newCookie <- genCookie @Fake (p1, h) -- <<~~~ FIXME: generate a good session id!
-- let cKey@(_, cookie) = (p1, newCookie)
qblk <- liftIO Q.newTQueueIO pure ()
let onBlockReady bh = do let peerz = p0Thread : [p1Thread]
liftIO $ atomically $ Q.writeTQueue qblk bh
let def = newBlockDownload h onBlockReady -- peerz <- mapM (async . uncurry runFakePeer) ee
-- create sessions before sequesting anything --runEngineM e0 $ do
updSession s0 def sBlockDownload cKey (set sBlockChunkSize chsz)
request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) -- -- TODO: #ASAP generate unique cookie!!
request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) -- --
-- -- FIXME: withAllCrap $ do ...
-- let s0 = (fst . head) ee
-- TODO: #ASAP block ready notification -- newCookie <- genCookie @Fake (p1, h) -- <<~~~ FIXME: generate a good session id!
debug $ "REQUEST BLOCK:" <+> pretty h <+> "from" <+> pretty p1 -- let cKey@(_, cookie) = (p1, newCookie)
-- let chsz = defChunkSize
request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h chsz)) -- debug $ "new cookie:" <+> pretty cookie
blk <- liftIO $ atomically $ Q.readTQueue qblk -- qblk <- liftIO Q.newTQueueIO
debug $ "BLOCK READY:" <+> pretty blk -- let onBlockReady bh = do
-- liftIO $ atomically $ Q.writeTQueue qblk bh
-- TODO: смотрим, что за блок -- let def = newBlockDownload h onBlockReady
-- если Merkle - то качаем рекурсивно
-- если ссылка - то смотрим, что за ссылка
-- проверяем пруфы
-- качаем рекурсивно
-- let mbLink = deserialiseOrFail @Merkle obj -- -- create sessions before sequesting anything
-- updSession s0 def sBlockDownload cKey (set sBlockChunkSize chsz)
pure () -- request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
-- request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
mapM_ cancel peerz -- request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
-- request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
(_, e) <- waitAnyCatchCancel peerz -- -- TODO: #ASAP block ready notification
debug (pretty $ show e) -- debug $ "REQUEST BLOCK:" <+> pretty h <+> "from" <+> pretty p1
debug "we're done"
assertBool "success" True -- request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h chsz))
exitSuccess
-- blk <- liftIO $ atomically $ Q.readTQueue qblk
-- debug $ "BLOCK READY:" <+> pretty blk
-- -- TODO: смотрим, что за блок
-- -- если Merkle - то качаем рекурсивно
-- -- если ссылка - то смотрим, что за ссылка
-- -- проверяем пруфы
-- -- качаем рекурсивно
-- -- let mbLink = deserialiseOrFail @Merkle obj
-- pure ()
mapM_ cancel peerz
(_, e) <- waitAnyCatchCancel peerz
debug (pretty $ show e)
debug "we're done"
assertBool "success" True
exitSuccess
assertBool "failed" False assertBool "failed" False