gonna-fuckup

This commit is contained in:
Dmitry Zuikov 2023-01-20 19:09:11 +03:00
parent ec417fe3ef
commit 5d3d60778d
5 changed files with 138 additions and 144 deletions

View File

@ -24,7 +24,7 @@ data Pipeline m a =
, toQueue :: TBMQueue ( m a )
}
newPipeline :: forall a (m :: Type -> Type) . MonadIO m => Int -> m (Pipeline m a)
newPipeline :: forall a (m1 :: Type -> Type) (m :: Type -> Type) . (MonadIO m, MonadIO m1) => Int -> m (Pipeline m1 a)
newPipeline size = do
tv <- liftIO $ TVar.newTVarIO False
liftIO $ TBMQ.newTBMQueueIO size <&> Pipeline tv

View File

@ -39,7 +39,7 @@ class Typeable a => Unkey a where
instance Typeable a => Unkey a where
unfuck _ = fromDynamic @a
newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a, Show a) => a -> SKey
newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a) => a -> SKey
newSKey s = SKey (Proxy @a) (toDyn s)
@ -143,8 +143,6 @@ instance ( MonadIO m
, Typeable (SessionKey e p)
, Typeable (SessionData e p)
, Hashable (SessionKey e p)
, Show (SessionData e p)
, Show (SessionKey e p)
) => Sessions e p (ResponseM e m) where
fetch i d k f = flip runEngineM (fetch i d k f) =<< asks (view engine)
@ -160,8 +158,6 @@ instance ( MonadIO m
, Typeable (SessionKey e p)
, Typeable (SessionData e p)
, Hashable (SessionKey e p)
, Show (SessionData e p)
, Show (SessionKey e p)
) => Sessions e p (EngineM e m) where
@ -170,8 +166,6 @@ instance ( MonadIO m
let sk = newSKey @(SessionKey e p) k
let ddef = toDyn def
liftIO $ print ("fetch!", show k)
r <- liftIO $ Cache.lookup se sk
case r of
@ -183,10 +177,7 @@ instance ( MonadIO m
update def k f = do
se <- asks (view sessions)
val <- fetch @e @p True def k id
liftIO $ print "UPDATE !!!!"
liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val))
z <- liftIO $ Cache.lookup se (newSKey k)
liftIO $ print $ ("INSERTED SHIT", z)
expire k = do
se <- asks (view sessions)
@ -266,10 +257,5 @@ runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do
, handle = h
}) -> maybe (pure ()) (runResponseM ee pip . h) (decoder msg)
-- FIXME: slow and dumb
instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e (EngineM e m) where
genCookie salt = do
r <- liftIO $ Random.randomIO @Int
pure $ fromInteger $ fromIntegral $ asWord32 $ hash32 (hash salt + r)

View File

@ -19,6 +19,9 @@ defPipelineSize = 100
defChunkWriterQ :: Integral a => a
defChunkWriterQ = 100
defBlockDownloadQ :: Integral a => a
defBlockDownloadQ = 100
defBlockDownloadThreshold :: Integral a => a
defBlockDownloadThreshold = 2

View File

@ -1,6 +1,7 @@
{-# Language TypeFamilyDependencies #-}
{-# Language FunctionalDependencies #-}
{-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-}
module HBS2.Net.Proto.Types
( module HBS2.Net.Proto.Types
) where
@ -11,6 +12,8 @@ import Data.Proxy
import Data.Hashable
import Control.Monad.IO.Class
import Data.Typeable
import System.Random qualified as Random
import Data.Digest.Murmur32
-- e -> Transport (like, UDP or TChan)
-- p -> L4 Protocol (like Ping/Pong)
@ -93,3 +96,10 @@ class (KnownNat (ProtocolId p), HasPeer e) => HasProtocol e p | p -> e where
decode :: Encoded e -> Maybe p
encode :: p -> Encoded e
-- FIXME: slow and dumb
instance {-# OVERLAPPABLE #-} (MonadIO m, Num (Cookie e)) => GenCookie e m where
genCookie salt = do
r <- liftIO $ Random.randomIO @Int
pure $ fromInteger $ fromIntegral $ asWord32 $ hash32 (hash salt + r)

View File

@ -7,6 +7,7 @@ module Main where
import HBS2.Prelude.Plated
import HBS2.Clock
import HBS2.Hash
import HBS2.Actors
-- import HBS2.Net.Messaging
import HBS2.Net.Proto
import HBS2.Net.Proto.BlockInfo
@ -48,9 +49,12 @@ import Control.Concurrent
import Data.Default
import Control.Monad.Reader
import Data.Dynamic
import Data.Kind
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue qualified as Q
import Control.Concurrent.STM.TBQueue qualified as TBQ
import Control.Concurrent.STM.TBQueue (TBQueue)
debug :: (MonadIO m) => Doc ann -> m ()
debug p = liftIO $ hPrint stderr p
@ -62,35 +66,19 @@ debug p = liftIO $ hPrint stderr p
-- but client's cookie in protocol should be just ( cookie :: Word32 )
data BlockDownload m =
data BlockDownload =
BlockDownload
{ _sBlockHash :: Hash HbSync
, _sBlockChunkSize :: ChunkSize
, _sBlockOffset :: Offset
, _sBlockWritten :: Size
, _sOnBlockReady :: OnBlockReady HbSync m
}
data MySessions e m =
Sessions
{ _sBlockDownload :: Cache (Peer e, Cookie e) (BlockDownload m)
, _sBlockSizes :: Cache (Hash HbSync) (Map (Peer e) Size)
, _sBlockSize :: Cache (Hash HbSync) Size
}
makeLenses 'Sessions
makeLenses 'BlockDownload
newBlockDownload :: forall m . MonadIO m
=> Hash HbSync
-> OnBlockReady HbSync m
-> BlockDownload m
newBlockDownload :: Hash HbSync -> BlockDownload
newBlockDownload h = BlockDownload h 0 0 0
data Fake
instance HasPeer Fake where
@ -117,6 +105,12 @@ instance HasProtocol Fake (BlockChunks Fake) where
type instance SessionData Fake (BlockSize Fake) = BlockSizeSession Fake
type instance SessionData Fake (BlockChunks Fake) = BlockDownload
newtype instance SessionKey Fake (BlockChunks Fake) =
DownloadSessionKey (Peer Fake, Cookie Fake)
deriving newtype (Eq, Hashable)
deriving stock (Generic)
newtype BlockSizeSession e =
BlockSizeSession
@ -143,63 +137,56 @@ main = do
-- ]
emptySessions :: forall e m . MonadIO m => m (MySessions e m)
emptySessions = liftIO $
Sessions <$> Cache.newCache (Just defCookieTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
<*> Cache.newCache (Just defBlockInfoTimeout)
newtype PeerEvents e (m :: Type -> Type) =
PeerEvents
{ onBlockSize :: TVar (Map (Hash HbSync) [HasBlockEvent HbSync e m])
}
newSession :: (Eq k, Hashable k,MonadIO m)
=> s
-> Getting (Cache k v) s (Cache k v)
-> k
-> v
-> m ()
newPeerEventsIO :: forall e m . MonadIO m => IO (PeerEvents e m)
newPeerEventsIO = PeerEvents <$> newTVarIO mempty
newSession se l k v = do
let cache = view l se
liftIO $ Cache.insert cache k v
addBlockSizeEventNotify :: forall e m . (MonadIO m)
=> PeerEvents e m
-> Hash HbSync
-> HasBlockEvent HbSync e m
-> m ()
getSession' se l k fn = do
let cache = view l se
liftIO $ Cache.lookup cache k <&> fmap fn
addBlockSizeEventNotify pe h e = do
void $ liftIO $ atomically $ modifyTVar' (onBlockSize pe) (Map.insertWith (<>) h [e])
getSession se l k = getSession' se l k id
emitBlockSizeEvent :: MonadIO m
=> PeerEvents e m
-> Hash HbSync
-> (Peer e, Hash HbSync, Maybe Integer)
-> m ()
updSession se def l k fn = liftIO do
let cache = view l se
v <- Cache.fetchWithCache cache k (const $ pure def)
Cache.insert cache k (fn v)
emitBlockSizeEvent pe h event = do
ev <- liftIO $ atomically $ stateTVar (onBlockSize pe) alter
for_ ev $ \e -> e event
delSession se l k = liftIO do
Cache.delete (view l se) k
expireSession se l = liftIO do
Cache.purgeExpired (view l se)
-- A questionable FIX to avoid "orphans" complains
data Adapted e = Adapted
where
alter m =
let ev = Map.lookup h m
in (mconcat (maybeToList ev), Map.delete h m)
-- newtype FullPeerM m a = RealPeerM { fromRealPeerM :: ReaderT }
runFakePeer :: forall e b . ( e ~ Fake
-- , m ~ IO
, Messaging b e ByteString
-- , MonadIO m
-- , Response e p m
-- , EngineM e m
)
=> Peer e
runFakePeer :: forall e b m . ( e ~ Fake
, MonadIO m
, Messaging b e ByteString
-- , MonadIO m
-- , Response e p m
-- , EngineM e m
)
=> PeerEvents e m
-> Peer e
-> b
-> EngineM e IO ()
-> IO ()
runFakePeer p0 bus work = do
-> EngineM e m ()
-> IO ()
runFakePeer ev p0 bus work = do
env <- newEnv p0 bus
se <- emptySessions @e
let pid = fromIntegral (hash (env ^. self)) :: Word8
dir <- liftIO $ canonicalizePath ( ".peers" </> show pid)
@ -233,25 +220,14 @@ runFakePeer p0 bus work = do
maybe1 sz' (pure ()) $ \sz -> do
let bsz = fromIntegral sz
z <- fetch @e False def (BlockSizeKey h) id
liftIO $ print ("QQQQQ", pretty p0, pretty p, z)
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
-- here we cache block size information
updSession se mempty sBlockSizes h (Map.insert p bsz)
updSession se bsz sBlockSize h (const bsz)
debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz'
z <- fetch @e False def (BlockSizeKey h) id
liftIO $ print ("BEBEBE", pretty p0, pretty p, z)
emitBlockSizeEvent ev h (p, h, Just sz)
let adapter =
BlockChunksI
{ blkSize = hasBlock storage
, blkChunk = getChunk storage
, blkGetHash = \c -> getSession' se sBlockDownload c (view sBlockHash)
, blkGetHash = error "FUCK" -- FIXME! \c -> getSession' se sBlockDownload c (view sBlockHash)
-- КАК ТОЛЬКО ПРИНЯЛИ ВСЕ ЧАНКИ (ПРИШЁЛ ПОСЛЕДНИЙ ЧАНК):
-- СЧИТАЕМ ХЭШ ТОГО, ЧТО ПОЛУЧИЛОСЬ
@ -261,55 +237,58 @@ runFakePeer p0 bus work = do
-- УДАЛЯЕМ КУКУ?
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
let cKey = (p,c)
let cKey = DownloadSessionKey (p,c)
-- check if there is a session
void $ MaybeT $ getSession' se sBlockDownload cKey id
-- FIXME
-- void $ MaybeT $ getSession' se sBlockDownload cKey id
let def = newBlockDownload h dontHandle
let de = newBlockDownload h
let bslen = fromIntegral $ B8.length bs
-- TODO: log this situation
mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing
mbChSize <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockChunkSize)
-- FIXME
-- mbSize <- MaybeT $ getSession' se sBlockSizes h (Map.lookup p) <&> fromMaybe Nothing
-- mbChSize <- MaybeT $ getSession' se sBlockDownload cKey (view sBlockChunkSize)
let offset = fromIntegral n * fromIntegral mbChSize :: Offset
-- let offset = fromIntegral n * fromIntegral mbChSize :: Offset
updSession se def sBlockDownload cKey (over sBlockOffset (max offset))
-- updSession se de sBlockDownload cKey (over sBlockOffset (max offset))
liftIO $ do
writeChunk cww cKey h offset bs
updSession se def sBlockDownload cKey (over sBlockWritten (+bslen))
-- liftIO $ do
-- writeChunk cww cKey h offset bs
-- updSession se de sBlockDownload cKey (over sBlockWritten (+bslen))
dwnld <- MaybeT $ getSession' se sBlockDownload cKey id
-- dwnld <- MaybeT $ getSession' se sBlockDownload cKey id
let maxOff = view sBlockOffset dwnld
let written = view sBlockWritten dwnld
let notify = view sOnBlockReady dwnld
-- let maxOff = view sBlockOffset dwnld
-- let written = view sBlockWritten dwnld
-- let notify = view sOnBlockReady dwnld
let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize
&& written >= mbSize
-- let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize
-- && written >= mbSize
when mbDone $ lift do
deferred (Proxy @(BlockChunks e)) $ do
h1 <- liftIO $ getHash cww cKey h
-- when mbDone $ lift do
-- deferred (Proxy @(BlockChunks e)) $ do
-- h1 <- liftIO $ getHash cww cKey h
-- ПОСЧИТАТЬ ХЭШ
-- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК
-- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
when ( h1 == h ) $ do
lift $ commitBlock cww cKey h
lift $ notify h
delSession se sBlockDownload cKey
-- -- ПОСЧИТАТЬ ХЭШ
-- -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК
-- -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
-- when ( h1 == h ) $ do
-- lift $ commitBlock cww cKey h
-- lift $ notify h
-- delSession se sBlockDownload cKey
when (written > mbSize * defBlockDownloadThreshold) $ do
debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p
delSession se sBlockDownload cKey
-- when (written > mbSize * defBlockDownloadThreshold) $ do
-- debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p
-- delSession se sBlockDownload cKey
-- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ,
-- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ
-- ЧАНКИ ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ.
-- ТАК НЕ ПОЙДЕТ
-- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся
pure ()
}
peer <- async $ runPeer env
@ -333,44 +312,60 @@ test1 = do
fake <- newFakeP2P True
void $ race (pause (10 :: Timeout 'Seconds)) $ do
void $ race (pause (2 :: Timeout 'Seconds)) $ do
let p0 = 0 :: Peer Fake
let p1 = 1 :: Peer Fake
p1Thread <- async $ runFakePeer p1 fake (liftIO $ forever yield)
ev1 <- liftIO newPeerEventsIO
ev0 <- liftIO newPeerEventsIO
p0Thread <- async $ runFakePeer p0 fake $ do
p1Thread <- async $ runFakePeer ev1 p1 fake (liftIO $ forever yield)
let h1 = "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
let h0 = "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
let ini = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
, "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
]
-- fetch @Fake @(BlockSize Fake) True def h id
-- update @Fake @(BlockSize Fake) def (fromString h1) (over bsBlockSizes (Map.insert p1 111))
update @Fake @(BlockSize Fake) def (BlockSizeKey (fromString h0)) (over bsBlockSizes (Map.insert p0 100))
blkQ <- liftIO $ do
b <- newTBQueueIO defBlockDownloadQ
traverse_ (atomically . TBQ.writeTBQueue b) ini
pure b
-- request p1 (GetBlockSize @Fake (fromString h1))
request p0 (GetBlockSize @Fake (fromString h0))
p0Thread <- async $ runFakePeer ev0 p0 fake $ do
se1 <- fetch @Fake @(BlockSize Fake) False def (BlockSizeKey (fromString h0)) id
-- se2 <- fetch @Fake @(BlockSize Fake) False def (fromString h1) id
let knownPeers = [p1]
jopa <- asks (view sessions)
fix \next -> do
wtf <- liftIO $ Cache.lookup jopa (newSKey @(SessionKey Fake (BlockSize Fake)) (BlockSizeKey (fromString h0)))
-- НА САМОМ ДЕЛЕ НАМ НЕ НАДО ЖДАТЬ БЛОКИНФЫ.
-- НАМ НАДО ОТПРАВЛЯТЬ КАЧАТЬ БЛОК, КАК ТОЛЬКО
-- ПО НЕМУ ПОЯВИЛАСЬ ИНФА
pause ( 2 :: Timeout 'Seconds)
blkHash <- liftIO $ atomically $ TBQ.readTBQueue blkQ
liftIO $ print $ (p0, "AAAAAA", se1, fromDynamic @(SessionData Fake (BlockSize Fake)) (fromJust wtf))
-- TODO: надо трекать, может блок-то и найден
-- либо по всем пирам спросить
-- updateSession cookie (id)
-- se <- getSession cookie (lens)
-- cookie <- newSession ???
addBlockSizeEventNotify ev0 blkHash $ \case
(p, h, Just _) -> do
-- coo <- genCookie (p,blkHash)
-- let key = DownloadSessionKey (p, coo)
-- let new = newBlockDownload blkHash
-- update @Fake new key id
-- (over bsBlockSizes (Map.insert p bsz))
request p (GetBlockSize @Fake blkHash)
-- liftIO $ print $ "DAVAI BLOCK!" <+> pretty h
-- update
-- let q = pure ()
pure ()
-- newCookie <- genCookie @Fake (p1, h) -- <<~~~ FIXME: generate a good session id!
-- let cKey@(_, cookie) = (p1, newCookie)
_ -> pure ()
pure ()
-- КТО ПЕРВЫЙ ВСТАЛ ТОГО И ТАПКИ
for_ knownPeers $ \who ->
request who (GetBlockSize @Fake blkHash)
next
let peerz = p0Thread : [p1Thread]
@ -427,7 +422,7 @@ test1 = do
-- pure ()
pause ( 5 :: Timeout 'Seconds)
pause ( 1 :: Timeout 'Seconds)
mapM_ cancel peerz