This commit is contained in:
Dmitry Zuikov 2023-01-18 17:42:17 +03:00
parent d3a40299d6
commit d338c5c37a
5 changed files with 98 additions and 43 deletions

View File

@ -65,6 +65,7 @@ library
exposed-modules:
HBS2.Actors
, HBS2.Actors.ChunkWriter
, HBS2.Clock
, HBS2.Data.Types
, HBS2.Data.Types.Refs
@ -98,6 +99,8 @@ library
, containers
, cryptonite
, deepseq
, directory
, filepath
, hashable
, interpolatedstring-perl6
, memory

View File

@ -16,6 +16,9 @@ defStorePath = "hbs2"
defPipelineSize :: Int
defPipelineSize = 100
defChunkWriterQ :: Integral a => a
defChunkWriterQ = 100
-- typical block hash 530+ chunks * parallel wip blocks amount
defProtoPipelineSize :: Int
defProtoPipelineSize = 65536
@ -23,3 +26,6 @@ defProtoPipelineSize = 65536
defCookieTimeout :: TimeSpec
defCookieTimeout = toTimeSpec ( 10 :: Timeout 'Minutes)
defBlockInfoTimeout :: TimeSpec
defBlockInfoTimeout = toTimeSpec ( 10 :: Timeout 'Minutes)

View File

@ -2,14 +2,16 @@ module HBS2.Prelude
( module Data.String
, module Safe
, MonadIO(..)
, void
, void, guard
, maybe1
, Hashable
) where
import Data.String (IsString(..))
import Safe
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad (void)
import Control.Monad (void,guard)
import Data.Hashable (Hashable)
maybe1 :: Maybe a -> b -> (a -> b) -> b

View File

@ -23,11 +23,11 @@ newtype StoragePrefix = StoragePrefix { fromPrefix :: FilePath }
type family Block block :: Type
newtype Offset = Offset Integer
deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable)
deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable,Pretty)
deriving stock (Show)
newtype Size = Size Integer
deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable)
deriving newtype (Eq,Ord,Enum,Num,Real,Integral,Hashable,Pretty)
deriving stock (Show)
class ( Monad m

View File

@ -14,6 +14,7 @@ import HBS2.Defaults
import HBS2.Storage.Simple
import HBS2.Storage.Simple.Extra
import HBS2.Actors.ChunkWriter
-- import Test.Tasty hiding (Timeout)
import Test.Tasty.HUnit
@ -35,6 +36,9 @@ import System.FilePath.Posix
import System.IO
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Map (Map)
import Data.Map qualified as Map
import Control.Monad.Trans.Maybe
debug :: (MonadIO m) => Doc ann -> m ()
debug p = liftIO $ hPrint stderr p
@ -49,9 +53,12 @@ newtype ChunkNum = ChunkNum Word16
deriving stock (Eq,Ord,Show,Data,Generic)
newtype Sessions e =
data Sessions e =
Sessions
{ _sBlockHash :: Cache (Cookie e) (Hash HbSync)
, _sBlockChunkSize :: Cache (Cookie e) ChunkSize
, _sBlockSizes :: Cache (Hash HbSync) (Map (Peer e) Size)
, _sBlockSize :: Cache (Hash HbSync) Size
}
makeLenses 'Sessions
@ -64,7 +71,7 @@ data BlockChunksI e m =
{ blkSize :: GetBlockSize HbSync m
, blkChunk :: GetBlockChunk HbSync m
, blkGetHash :: Cookie e -> m (Maybe (Hash HbSync))
, blkAcceptChunk :: (Hash HbSync, ChunkNum, ByteString) -> m ()
, blkAcceptChunk :: Response e (BlockChunks e) m => (Cookie e, Peer e, Hash HbSync, ChunkNum, ByteString) -> m ()
}
@ -107,15 +114,15 @@ blockChunksProto adapter (BlockChunks c p) =
let offsets = zip offsets' [0..]
for_ offsets $ \((o,sz),i) -> do
p <- thatPeer proto
chunk <- blkChunk adapter h o sz
maybe (pure ()) (response_ . BlockChunk @e i) chunk
BlockChunk n bs -> do
h <- blkGetHash adapter c
who <- thatPeer proto
maybe1 h (response_ (BlockLost @e)) $ \hh -> do
blkAcceptChunk adapter (hh, n, bs)
blkAcceptChunk adapter (c, who, hh, n, bs)
BlockNoChunks {} -> do
-- TODO: notification
@ -165,14 +172,11 @@ main = do
emptySessions :: IO (Sessions e)
emptySessions = do
bh <- Cache.newCache (Just defCookieTimeout)
pure $
Sessions
{ _sBlockHash = bh
}
emptySessions =
Sessions <$> Cache.newCache (Just defCookieTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
newSession :: (Eq k, Hashable k,MonadIO m)
=> s
@ -187,9 +191,16 @@ newSession se l k v = do
withNewSession se l k v m = newSession se l k v >> m
getSession se l k = do
getSession' se l k fn = do
let cache = view l se
liftIO $ Cache.lookup cache k
liftIO $ Cache.lookup cache k <&> fmap fn
getSession se l k = getSession' se l k id
updSession se def l k fn = liftIO do
let cache = view l se
v <- Cache.fetchWithCache cache k (const $ pure def)
Cache.insert cache k (fn v)
runFakePeer :: forall e . e ~ Fake => Sessions e -> EngineEnv e -> IO ()
runFakePeer se env = do
@ -198,6 +209,8 @@ runFakePeer se env = do
dir <- canonicalizePath ( ".peers" </> show pid)
let chDir = dir </> "tmp-chunks"
createDirectoryIfMissing True dir
let opts = [ StoragePrefix dir
@ -207,6 +220,10 @@ runFakePeer se env = do
w <- async $ simpleStorageWorker storage
cww <- newChunkWriterIO (Just chDir)
cw <- async $ runChunkWriter cww
let size = 1024*1024
let blk = B8.concat [ fromString (take 1 $ show x)
@ -217,19 +234,50 @@ runFakePeer se env = do
debug $ "I'm" <+> pretty pid <+> pretty root
let handleBlockInfo (p, h, sz) = do
debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz
let handleBlockInfo (p, h, sz') = do
maybe1 sz' (pure ()) $ \sz -> do
let bsz = fromIntegral sz
updSession se mempty sBlockSizes h (Map.insert p bsz)
updSession se bsz sBlockSize h (const bsz)
blkCookies <- Cache.newCache @(Cookie e) @(Hash HbSync) (Just defCookieTimeout)
debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz'
let adapter =
BlockChunksI
{ blkSize = hasBlock storage
, blkChunk = getChunk storage
, blkGetHash = getSession se sBlockHash
, blkAcceptChunk = \(h,n,bs) -> debug $ "got chunk" <+> pretty h
-- И ЧТО ТУТ ДЕЛАТЬ.
-- ЗАПИСАТЬ ЧАНК В ФАЙЛ КУДА-ТО НА TMP (КУДА?
-- КАК ТОЛЬКО ПРИНЯЛИ ВСЕ ЧАНКИ (ПРИШЁЛ ПОСЛЕДНИЙ ЧАНК):
-- СЧИТАЕМ ХЭШ ТОГО, ЧТО ПОЛУЧИЛОСЬ
-- ЕСЛИ ПОЛУЧИЛОСЬ ХОРОШО --- ТО:
-- ПЕРЕЗАПИСЫВАЕМ БЛОК В СТОРЕЙДЖ
-- ГОВОРИМ ОЖИДАЮЩЕЙ СТОРОНЕ, ЧТО БЛОК ПРИНЯТ?
-- УДАЛЯЕМ КУКУ?
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
-- TODO: log this situation
mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing
mbChSize <- MaybeT $ getSession se sBlockChunkSize c
let offset = fromIntegral n * fromIntegral mbChSize :: Offset
liftIO $ do
-- newBlock cww (p,c) h mbSize
writeChunk cww (p,c) h offset bs
-- ОТКУДА УЗНАТЬ РАЗМЕР БЛОКА?
-- ДОПУСТИМ, ОТ БЛОКИНФО?
-- ЕСЛИ НИЧЕГО НЕТ? => BLOCK_LOST
debug $ "got chunk" <+> pretty p
<+> pretty h
<+> pretty n
<+> parens ("off:" <+> pretty offset)
<+> pretty (B8.length bs)
-- <+> parens (pretty mbSize)
-- <+> braces ("chunkSize:" <+> pretty mbChSize)
}
runPeer env
@ -237,11 +285,13 @@ runFakePeer se env = do
, makeResponse (blockChunksProto adapter)
]
cancel w
simpleStorageStop storage
pure ()
stopChunkWriter cww
pause ( 0.25 :: Timeout 'Seconds)
mapM_ cancel [w,cw]
test1 :: IO ()
@ -258,7 +308,7 @@ test1 = do
mtS <- emptySessions @Fake
let ee = zip (repeat mtS) envs
void $ race (pause (2 :: Timeout 'Seconds)) $ do
void $ race (pause (5 :: Timeout 'Seconds)) $ do
peerz <- mapM (async . uncurry runFakePeer) ee
@ -271,11 +321,17 @@ test1 = do
let h = fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
-- TODO: generate unique cookie!!
let cookie = 0
let s0 = (fst . head) ee
-- getSession' se sBlockSizes h ???
withNewSession s0 sBlockHash cookie h $ do
request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h defChunkSize))
let chsz = defChunkSize
updSession s0 chsz sBlockChunkSize cookie (const chsz)
request p1 (BlockChunks @Fake cookie (BlockGetAllChunks h chsz))
pure ()
@ -289,20 +345,8 @@ test1 = do
-- НО ХЗ ГДЕ ДЕРЖАТЬ САМ КЭШ для конкретного подпротокола
-- request p1 (BlockGetAllChunks @Fake 0 (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
-- Q1: ЧТО ДЕЛАТЬ
-- Q1.1: КАК КУКА ПОПАДЁТ в то, где работает "adapter"
-- Q2: КАК ДЕЛАТЬ ЗАПРОСЫ
--
-- ОТСЮДА СЛЕДУЕТ: Cookie должны жить в Engine и быть там доступны
-- В монаде Response тоже должна быть кука
--
-- НУ есть кука и чо? какие данные с ней ассоциированы?
-- какого блеать типа?
--
-- Как быть с тем, что кука может не поддерживаться подпротоколом?
-- Требовать HasCookie у всех?
pause ( 1 :: Timeout 'Seconds)
pause ( 2 :: Timeout 'Seconds)
mapM_ cancel peerz