This commit is contained in:
Dmitry Zuikov 2023-01-22 10:14:27 +03:00
parent f9748ed1fc
commit 0c32275176
4 changed files with 114 additions and 27 deletions

View File

@ -35,9 +35,9 @@ import Data.Hashable (hash)
import Prettyprinter hiding (pipe) import Prettyprinter hiding (pipe)
data AnyStorage = forall zu . Storage zu HbSync ByteString IO => AnyStorage zu data AnyStorage = forall zu . (Block ByteString ~ ByteString, Storage zu HbSync ByteString IO) => AnyStorage zu
instance (IsKey HbSync, Key HbSync ~ Hash HbSync) => Storage AnyStorage HbSync ByteString IO where instance (IsKey HbSync, Key HbSync ~ Hash HbSync, Block ByteString ~ ByteString) => Storage AnyStorage HbSync ByteString IO where
putBlock (AnyStorage s) = putBlock s putBlock (AnyStorage s) = putBlock s
enqueueBlock (AnyStorage s) = enqueueBlock s enqueueBlock (AnyStorage s) = enqueueBlock s

View File

@ -20,6 +20,7 @@ instance Serialise (BlockSize e)
blockSizeProto :: forall e m . ( MonadIO m blockSizeProto :: forall e m . ( MonadIO m
, Response e (BlockSize e) m , Response e (BlockSize e) m
, EventEmitter e (BlockSize e) m
) )
=> GetBlockSize HbSync m => GetBlockSize HbSync m
-> HasBlockEvent HbSync e m -> HasBlockEvent HbSync e m
@ -40,9 +41,9 @@ blockSizeProto getBlockSize evHasBlock =
BlockSize h sz -> do BlockSize h sz -> do
that <- thatPeer (Proxy @(BlockSize e)) that <- thatPeer (Proxy @(BlockSize e))
emit @e (BlockSizeEventKey h) (BlockSizeEvent (that, h, sz))
evHasBlock ( that, h, Just sz ) evHasBlock ( that, h, Just sz )
newtype instance SessionKey e (BlockSize e) = newtype instance SessionKey e (BlockSize e) =
BlockSizeKey (Hash HbSync) BlockSizeKey (Hash HbSync)
deriving stock (Typeable,Eq,Show) deriving stock (Typeable,Eq,Show)
@ -54,7 +55,8 @@ newtype instance EventKey e (BlockSize e) =
deriving stock (Typeable, Eq) deriving stock (Typeable, Eq)
deriving newtype (Hashable) deriving newtype (Hashable)
data instance Event e (BlockSize e) = newtype instance Event e (BlockSize e) =
BlockSizeEvent BlockSizeEvent (Peer e, Hash HbSync, Integer)
deriving stock (Typeable) deriving stock (Typeable)

View File

@ -10,8 +10,6 @@ import HBS2.Data.Types.Refs
import HBS2.Defaults import HBS2.Defaults
import Data.Bifunctor import Data.Bifunctor
import Data.Functor
import Control.Monad
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as B import Data.ByteString.Lazy qualified as B
import Data.Function import Data.Function
@ -19,14 +17,13 @@ import Data.Function
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
import Streaming qualified as S import Streaming qualified as S
import Prettyprinter
import System.IO import System.IO
pieces :: Integral a => a pieces :: Integral a => a
pieces = 8192 pieces = 8192
class SimpleStorageExtra a where class SimpleStorageExtra a where
putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString,Block ByteString~ByteString) => SimpleStorage h -> a -> IO MerkleHash putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString, Block ByteString ~ ByteString) => SimpleStorage h -> a -> IO MerkleHash
readChunked :: MonadIO m => Handle -> Int -> S.Stream (S.Of ByteString) m () readChunked :: MonadIO m => Handle -> Int -> S.Stream (S.Of ByteString) m ()
readChunked handle size = fuu readChunked handle size = fuu
@ -52,7 +49,7 @@ instance SimpleStorageExtra Handle where
pure (MerkleHash root) pure (MerkleHash root)
instance SimpleStorageExtra ByteString where instance Block ByteString ~ ByteString => SimpleStorageExtra ByteString where
putAsMerkle ss bs = do putAsMerkle ss bs = do
hashes <- S.each (B.unpack bs) hashes <- S.each (B.unpack bs)

View File

@ -18,16 +18,18 @@ import HBS2.Prelude.Plated
import HBS2.Storage import HBS2.Storage
import HBS2.Storage.Simple import HBS2.Storage.Simple
import HBS2.Storage.Simple.Extra import HBS2.Storage.Simple.Extra
import HBS2.Defaults
import Test.Tasty.HUnit import Test.Tasty.HUnit
import Codec.Serialise hiding (encode,decode) import Codec.Serialise hiding (encode,decode)
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy.Char8 qualified as B8 import Data.ByteString.Lazy.Char8 qualified as B8
import Data.Default import Data.Default
import Data.Foldable import Data.Foldable hiding (find)
import Data.Map (Map) import Data.Map (Map)
import Data.Map qualified as Map import Data.Map qualified as Map
import Data.Word import Data.Word
@ -51,9 +53,12 @@ data BlockDownload =
, _sBlockOffset :: Offset , _sBlockOffset :: Offset
, _sBlockWritten :: Size , _sBlockWritten :: Size
} }
deriving stock (Typeable)
makeLenses 'BlockDownload makeLenses 'BlockDownload
newBlockDownload :: Hash HbSync -> BlockDownload
newBlockDownload h = BlockDownload h 0 0 0 0
instance HasPeer Fake where instance HasPeer Fake where
newtype instance Peer Fake = FakePeer Word8 newtype instance Peer Fake = FakePeer Word8
@ -83,12 +88,11 @@ instance HasProtocol Fake (BlockChunks Fake) where
type instance SessionData e (BlockSize e) = BlockSizeSession e type instance SessionData e (BlockSize e) = BlockSizeSession e
type instance SessionData Fake (BlockChunks Fake) = BlockDownload type instance SessionData e (BlockChunks e) = BlockDownload
newtype instance SessionKey Fake (BlockChunks Fake) = newtype instance SessionKey e (BlockChunks e) =
DownloadSessionKey (Peer Fake, Cookie Fake) DownloadSessionKey (Peer e, Cookie e)
deriving newtype (Eq, Hashable) deriving stock (Generic,Typeable)
deriving stock (Generic)
newtype BlockSizeSession e = newtype BlockSizeSession e =
BlockSizeSession BlockSizeSession
@ -102,7 +106,8 @@ instance Ord (Peer e) => Default (BlockSizeSession e) where
deriving stock instance Show (BlockSizeSession Fake) deriving stock instance Show (BlockSizeSession Fake)
deriving newtype instance Hashable (SessionKey Fake (BlockChunks Fake))
deriving stock instance Eq (SessionKey Fake (BlockChunks Fake))
runTestPeer :: Peer Fake runTestPeer :: Peer Fake
-> (SimpleStorage HbSync -> IO ()) -> (SimpleStorage HbSync -> IO ())
@ -136,7 +141,7 @@ handleBlockInfo :: forall e m . ( MonadIO m
, Default (SessionData e (BlockSize e)) , Default (SessionData e (BlockSize e))
, Ord (Peer e) , Ord (Peer e)
, Pretty (Peer e) , Pretty (Peer e)
, EventEmitter e (BlockSize e) m -- , EventEmitter e (BlockSize e) m
) )
=> (Peer e, Hash HbSync, Maybe Integer) => (Peer e, Hash HbSync, Maybe Integer)
@ -146,16 +151,18 @@ handleBlockInfo (p, h, sz') = do
maybe1 sz' (pure ()) $ \sz -> do maybe1 sz' (pure ()) $ \sz -> do
let bsz = fromIntegral sz let bsz = fromIntegral sz
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
liftIO $ debug $ "got block:" <+> pretty (p, h, sz)
emit @e (BlockSizeEventKey h) BlockSizeEvent
-- FIXME: turn back on event notification
-- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit
blockDownloadLoop :: forall e . ( HasProtocol e (BlockSize e) blockDownloadLoop :: forall e . ( HasProtocol e (BlockSize e)
, HasProtocol e (BlockChunks e)
, Request e (BlockSize e) (PeerM e IO) , Request e (BlockSize e) (PeerM e IO)
, Request e (BlockChunks e) (PeerM e IO)
, EventListener e (BlockSize e) (PeerM e IO) , EventListener e (BlockSize e) (PeerM e IO)
, Sessions e (BlockSize e) (PeerM e IO)
, Hashable (SessionKey e (BlockChunks e))
, Typeable (SessionKey e (BlockChunks e))
, Eq (SessionKey e (BlockChunks e))
, Num (Peer e) , Num (Peer e)
-- , Ord (Peer e)
) => PeerM e IO () ) => PeerM e IO ()
blockDownloadLoop = do blockDownloadLoop = do
@ -168,8 +175,17 @@ blockDownloadLoop = do
debug $ "subscribing to" <+> pretty h debug $ "subscribing to" <+> pretty h
subscribe @e @(BlockSize e) (BlockSizeEventKey h) $ \_ -> do subscribe @e @(BlockSize e) (BlockSizeEventKey h) $ \(BlockSizeEvent (p,h,s)) -> do
debug $ "can't believe this shit works" <+> pretty h debug $ "can't believe this shit works" <+> pretty h
coo <- genCookie (p,h)
let key = DownloadSessionKey (p, coo)
let chusz = defChunkSize
let new = set sBlockChunkSize chusz
. set sBlockSize (fromIntegral s)
$ newBlockDownload h
update @e new key id
request p (BlockChunks coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
request 1 (GetBlockSize @e h) request 1 (GetBlockSize @e h)
@ -179,6 +195,77 @@ blockDownloadLoop = do
pause ( 0.85 :: Timeout 'Seconds ) pause ( 0.85 :: Timeout 'Seconds )
next next
mkAdapter :: forall e m . ( m ~ PeerM e IO
, HasProtocol e (BlockChunks e)
) => m (BlockChunksI e (ResponseM e m ))
mkAdapter = do
-- storage <- asks (view _envS
storage <- getStorage
pure $
BlockChunksI
{ blkSize = hasBlock storage
, blkChunk = getChunk storage
, blkGetHash = \c -> find (DownloadSessionKey c) (view sBlockHash)
-- КАК ТОЛЬКО ПРИНЯЛИ ВСЕ ЧАНКИ (ПРИШЁЛ ПОСЛЕДНИЙ ЧАНК):
-- СЧИТАЕМ ХЭШ ТОГО, ЧТО ПОЛУЧИЛОСЬ
-- ЕСЛИ ПОЛУЧИЛОСЬ ХОРОШО --- ТО:
-- ПЕРЕЗАПИСЫВАЕМ БЛОК В СТОРЕЙДЖ
-- ГОВОРИМ ОЖИДАЮЩЕЙ СТОРОНЕ, ЧТО БЛОК ПРИНЯТ?
-- УДАЛЯЕМ КУКУ?
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
let cKey = DownloadSessionKey (p,c)
-- check if there is a session
-- FIXME:
-- TODO: log situation when no session
dwnld <- MaybeT $ find cKey id
let bslen = fromIntegral $ B8.length bs
let mbSize = view sBlockSize dwnld
let mbChSize = view sBlockChunkSize dwnld
let offset0 = fromIntegral n * fromIntegral mbChSize :: Offset
liftIO $ do
writeChunk cww cKey h offset0 bs
let written = view sBlockWritten dwnld + bslen
let maxOff = max offset0 (view sBlockOffset dwnld)
lift $ update dwnld cKey ( set sBlockOffset maxOff
. set sBlockWritten written
)
let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize
&& written >= mbSize
when mbDone $ lift do
deferred (Proxy @(BlockChunks e)) $ do
h1 <- liftIO $ getHash cww cKey h
-- ПОСЧИТАТЬ ХЭШ
-- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК
-- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
when ( h1 == h ) $ do
liftIO $ commitBlock cww cKey h
expire cKey
-- FIXME: return this event!
-- lift $ runEngineM env $ emitBlockReadyEvent ev h -- TODO: fix this crazy shit
when (written > mbSize * defBlockDownloadThreshold) $ do
debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p
lift $ expire cKey
-- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ,
-- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ
-- ЧАНКИ ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ.
-- ТАК НЕ ПОЙДЕТ
-- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся
}
main :: IO () main :: IO ()
main = do main = do
hSetBuffering stderr LineBuffering hSetBuffering stderr LineBuffering
@ -204,21 +291,22 @@ main = do
debug $ "I'm" <+> pretty p <+> pretty root debug $ "I'm" <+> pretty p <+> pretty root
runPeerM (AnyStorage s) fake p $ do runPeerM (AnyStorage s) fake p $ do
adapter <- mkAdapter
runProto @Fake runProto @Fake
[ makeResponse (blockSizeProto findBlk dontHandle) [ makeResponse (blockSizeProto findBlk dontHandle)
-- , makeResponse (blockChunksProto undefined) , makeResponse (blockChunksProto adapter)
] ]
our <- async $ runTestPeer p0 $ \s -> do our <- async $ runTestPeer p0 $ \s -> do
let blk = hasBlock s let blk = hasBlock s
runPeerM (AnyStorage s) fake p0 $ do runPeerM (AnyStorage s) fake p0 $ do
adapter <- mkAdapter
env <- ask env <- ask
as <- liftIO $ async $ withPeerM env blockDownloadLoop as <- liftIO $ async $ withPeerM env blockDownloadLoop
runProto @Fake runProto @Fake
[ makeResponse (blockSizeProto blk handleBlockInfo) [ makeResponse (blockSizeProto blk handleBlockInfo)
-- , makeResponse (blockChunksProto undefined) , makeResponse (blockChunksProto adapter)
] ]
liftIO $ cancel as liftIO $ cancel as