mirror of https://github.com/voidlizard/hbs2
wip
This commit is contained in:
parent
717f24deab
commit
7e4914b73e
|
@ -1,252 +0,0 @@
|
||||||
{-# Language TemplateHaskell #-}
|
|
||||||
{-# Language UndecidableInstances #-}
|
|
||||||
module HBS2.Actors.OldPeer where
|
|
||||||
|
|
||||||
import HBS2.Prelude
|
|
||||||
import HBS2.Prelude.Plated
|
|
||||||
import HBS2.Net.Proto
|
|
||||||
import HBS2.Net.Messaging
|
|
||||||
import HBS2.Clock
|
|
||||||
import HBS2.Actors
|
|
||||||
import HBS2.Defaults
|
|
||||||
|
|
||||||
|
|
||||||
import Control.Concurrent.Async
|
|
||||||
import Control.Monad.Reader
|
|
||||||
import Control.Monad.Trans.Maybe
|
|
||||||
import Data.ByteString.Lazy ( ByteString )
|
|
||||||
import Data.Digest.Murmur32
|
|
||||||
import Data.Foldable hiding (find)
|
|
||||||
import Data.Hashable
|
|
||||||
import Data.Kind
|
|
||||||
import Data.Map qualified as Map
|
|
||||||
import GHC.TypeLits
|
|
||||||
import Lens.Micro.Platform
|
|
||||||
import System.Random qualified as Random
|
|
||||||
import Data.Cache (Cache)
|
|
||||||
import Data.Cache qualified as Cache
|
|
||||||
import Data.Dynamic
|
|
||||||
import Data.Maybe
|
|
||||||
|
|
||||||
import Codec.Serialise hiding (encode,decode)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
data AnyMessage e = AnyMessage Integer (Encoded e)
|
|
||||||
deriving stock (Generic)
|
|
||||||
|
|
||||||
instance Serialise (Encoded e) => Serialise (AnyMessage e)
|
|
||||||
|
|
||||||
|
|
||||||
data EngineEnv e = forall bus . ( Messaging bus e ByteString
|
|
||||||
, Serialise (Encoded e)
|
|
||||||
) =>
|
|
||||||
EngineEnv
|
|
||||||
{ _peer :: Maybe (Peer e)
|
|
||||||
, _self :: Peer e
|
|
||||||
, _sessions :: Cache SKey Dynamic
|
|
||||||
, bus :: bus
|
|
||||||
, defer :: Pipeline IO ()
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a }
|
|
||||||
deriving newtype ( Functor
|
|
||||||
, Applicative
|
|
||||||
, Monad
|
|
||||||
, MonadIO
|
|
||||||
, MonadReader (EngineEnv e)
|
|
||||||
, MonadTrans
|
|
||||||
)
|
|
||||||
|
|
||||||
data ResponseEnv e =
|
|
||||||
ResponseEnv
|
|
||||||
{ _engine :: EngineEnv e
|
|
||||||
, _respPeer :: Peer e
|
|
||||||
}
|
|
||||||
|
|
||||||
newtype ResponseM e m a = ResponseM { fromResponse :: ReaderT (ResponseEnv e) m a }
|
|
||||||
deriving newtype ( Functor
|
|
||||||
, Applicative
|
|
||||||
, Monad
|
|
||||||
, MonadIO
|
|
||||||
, MonadReader (ResponseEnv e)
|
|
||||||
, MonadTrans
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
makeLenses 'EngineEnv
|
|
||||||
|
|
||||||
makeLenses 'ResponseEnv
|
|
||||||
|
|
||||||
data AnyProtocol e m = forall p . ( HasProtocol e p
|
|
||||||
, Response e p m
|
|
||||||
) =>
|
|
||||||
AnyProtocol
|
|
||||||
{ myProtoId :: Integer
|
|
||||||
, protoDecode :: Encoded e -> Maybe p
|
|
||||||
, protoEncode :: p -> Encoded e
|
|
||||||
, handle :: p -> m ()
|
|
||||||
}
|
|
||||||
|
|
||||||
makeResponse :: forall e p m . ( MonadIO m
|
|
||||||
, Response e p m
|
|
||||||
, HasProtocol e p
|
|
||||||
)
|
|
||||||
=> (p -> m ()) -> AnyProtocol e m
|
|
||||||
|
|
||||||
makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p))
|
|
||||||
, protoDecode = decode
|
|
||||||
, protoEncode = encode
|
|
||||||
, handle = h
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
runEngineM :: EngineEnv e -> EngineM e m a -> m a
|
|
||||||
runEngineM e f = runReaderT (fromEngine f) e
|
|
||||||
|
|
||||||
runResponseM :: Monad m => EngineEnv e -> Peer e -> ResponseM e m a -> EngineM e m a
|
|
||||||
runResponseM eng p f = lift $ runReaderT (fromResponse f) (ResponseEnv eng p)
|
|
||||||
|
|
||||||
instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where
|
|
||||||
request p msg = do
|
|
||||||
let proto = protoId @e @p (Proxy @p)
|
|
||||||
ask >>= \case
|
|
||||||
EngineEnv { _self = s, bus = b} -> do
|
|
||||||
let bs = serialise (AnyMessage @e proto (encode msg))
|
|
||||||
liftIO $ sendTo b (To p) (From s) bs
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
instance ( MonadIO m
|
|
||||||
, HasProtocol e p
|
|
||||||
, Eq (SessionKey e p)
|
|
||||||
, Typeable (SessionKey e p)
|
|
||||||
, Typeable (SessionData e p)
|
|
||||||
, Hashable (SessionKey e p)
|
|
||||||
) => Sessions e p (ResponseM e m) where
|
|
||||||
|
|
||||||
find k f = flip runEngineM (find k f) =<< asks (view engine)
|
|
||||||
|
|
||||||
fetch i d k f = flip runEngineM (fetch i d k f) =<< asks (view engine)
|
|
||||||
|
|
||||||
update d k f = flip runEngineM (update d k f) =<< asks (view engine)
|
|
||||||
|
|
||||||
expire k = flip runEngineM (expire k) =<< asks (view engine)
|
|
||||||
|
|
||||||
|
|
||||||
instance ( MonadIO m
|
|
||||||
, HasProtocol e p
|
|
||||||
, Eq (SessionKey e p)
|
|
||||||
, Typeable (SessionKey e p)
|
|
||||||
, Typeable (SessionData e p)
|
|
||||||
, Hashable (SessionKey e p)
|
|
||||||
) => Sessions e p (EngineM e m) where
|
|
||||||
|
|
||||||
|
|
||||||
find k f = do
|
|
||||||
se <- asks (view sessions)
|
|
||||||
let sk = newSKey @(SessionKey e p) k
|
|
||||||
r <- liftIO $ Cache.lookup se sk
|
|
||||||
case fromDynamic @(SessionData e p) <$> r of
|
|
||||||
Just v -> pure $ f <$> v
|
|
||||||
Nothing -> pure Nothing
|
|
||||||
|
|
||||||
fetch upd def k fn = do
|
|
||||||
se <- asks (view sessions)
|
|
||||||
let sk = newSKey @(SessionKey e p) k
|
|
||||||
let ddef = toDyn def
|
|
||||||
|
|
||||||
r <- liftIO $ Cache.lookup se sk
|
|
||||||
|
|
||||||
case r of
|
|
||||||
Just v -> pure $ fn $ fromMaybe def (fromDynamic @(SessionData e p) v )
|
|
||||||
Nothing -> do
|
|
||||||
when upd $ liftIO $ Cache.insert se sk ddef
|
|
||||||
pure (fn def)
|
|
||||||
|
|
||||||
update def k f = do
|
|
||||||
se <- asks (view sessions)
|
|
||||||
val <- fetch @e @p True def k id
|
|
||||||
liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val))
|
|
||||||
|
|
||||||
expire k = do
|
|
||||||
se <- asks (view sessions)
|
|
||||||
liftIO $ Cache.delete se (newSKey @(SessionKey e p) k)
|
|
||||||
|
|
||||||
instance (HasProtocol e p, Serialise (Encoded e)) => Response e p (ResponseM e IO) where
|
|
||||||
|
|
||||||
thatPeer _ = asks (view respPeer)
|
|
||||||
|
|
||||||
deferred _ m = do
|
|
||||||
e@(EngineEnv { defer = d }) <- asks (view engine)
|
|
||||||
p <- asks (view respPeer)
|
|
||||||
addJob d (runEngineM e (runResponseM e p m))
|
|
||||||
|
|
||||||
response resp = do
|
|
||||||
env <- ask
|
|
||||||
let p = env ^. respPeer
|
|
||||||
let s = env ^. (engine . self)
|
|
||||||
let proto = protoId @e @p (Proxy @p)
|
|
||||||
let bs = serialise (AnyMessage @e proto (encode resp))
|
|
||||||
|
|
||||||
-- TODO: wrap somehow
|
|
||||||
case env ^. engine of
|
|
||||||
EngineEnv { bus = b } -> liftIO $ sendTo b (To p) (From s) bs
|
|
||||||
|
|
||||||
newEnv :: forall e bus m . ( Monad m
|
|
||||||
, MonadIO m
|
|
||||||
, Messaging bus e ByteString
|
|
||||||
, Serialise (Encoded e)
|
|
||||||
)
|
|
||||||
=> Peer e
|
|
||||||
-> bus
|
|
||||||
-> m (EngineEnv e)
|
|
||||||
|
|
||||||
newEnv p pipe = do
|
|
||||||
de <- liftIO $ newPipeline defProtoPipelineSize
|
|
||||||
se <- liftIO $ Cache.newCache (Just defCookieTimeout) -- FIXME: some more clever for timeout, i.e. typeclass
|
|
||||||
pure $ EngineEnv Nothing p se pipe de
|
|
||||||
|
|
||||||
runPeer :: forall e m a . ( MonadIO m
|
|
||||||
)
|
|
||||||
=> EngineEnv e
|
|
||||||
-> [AnyProtocol e (ResponseM e m)]
|
|
||||||
-> m a
|
|
||||||
|
|
||||||
runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do
|
|
||||||
|
|
||||||
let me = env ^. self
|
|
||||||
|
|
||||||
let resp = [ (pid, a) | a@AnyProtocol { myProtoId = pid } <- hh ]
|
|
||||||
|
|
||||||
let disp = Map.fromList resp
|
|
||||||
|
|
||||||
void $ liftIO $ async $ runPipeline d
|
|
||||||
|
|
||||||
runEngineM env $ do
|
|
||||||
|
|
||||||
forever $ do
|
|
||||||
messages <- receive pipe (To me)
|
|
||||||
|
|
||||||
for_ messages $ \(From pip, bs) -> do
|
|
||||||
|
|
||||||
case deserialiseOrFail @(AnyMessage e) bs of
|
|
||||||
|
|
||||||
Left _-> pure () -- liftIO $ print "failed to deserialise"
|
|
||||||
|
|
||||||
Right (AnyMessage n msg) -> do
|
|
||||||
|
|
||||||
local (set peer (Just pip)) do
|
|
||||||
|
|
||||||
ee <- ask
|
|
||||||
|
|
||||||
case Map.lookup n disp of
|
|
||||||
Nothing -> pure ()
|
|
||||||
|
|
||||||
Just (AnyProtocol { protoDecode = decoder
|
|
||||||
, handle = h
|
|
||||||
}) -> maybe (pure ()) (runResponseM ee pip . h) (decoder msg)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,527 +0,0 @@
|
||||||
{-# Language RankNTypes #-}
|
|
||||||
{-# Language TemplateHaskell #-}
|
|
||||||
{-# Language AllowAmbiguousTypes #-}
|
|
||||||
{-# Language UndecidableInstances #-}
|
|
||||||
module Main where
|
|
||||||
|
|
||||||
import HBS2.Prelude.Plated
|
|
||||||
import HBS2.Clock
|
|
||||||
import HBS2.Hash
|
|
||||||
import HBS2.Actors
|
|
||||||
-- import HBS2.Net.Messaging
|
|
||||||
import HBS2.Net.Proto
|
|
||||||
import HBS2.Net.Proto.BlockInfo
|
|
||||||
import HBS2.Net.Proto.BlockChunks
|
|
||||||
import HBS2.Net.Messaging
|
|
||||||
import HBS2.Net.Messaging.Fake
|
|
||||||
import HBS2.Actors.Peer
|
|
||||||
import HBS2.Defaults
|
|
||||||
|
|
||||||
import HBS2.Data.Types.Refs
|
|
||||||
import HBS2.Storage
|
|
||||||
import HBS2.Storage.Simple
|
|
||||||
import HBS2.Storage.Simple.Extra
|
|
||||||
import HBS2.Actors.ChunkWriter
|
|
||||||
|
|
||||||
-- import Test.Tasty hiding (Timeout)
|
|
||||||
import Test.Tasty.HUnit
|
|
||||||
|
|
||||||
import Codec.Serialise
|
|
||||||
import Control.Concurrent.Async
|
|
||||||
import Control.Monad
|
|
||||||
import Control.Monad.Trans.Maybe
|
|
||||||
import Data.ByteString.Lazy (ByteString)
|
|
||||||
import Data.ByteString.Lazy.Char8 qualified as B8
|
|
||||||
import Data.Cache (Cache)
|
|
||||||
import Data.Cache qualified as Cache
|
|
||||||
import Data.Foldable hiding (find)
|
|
||||||
import Data.Hashable
|
|
||||||
import Data.Map (Map)
|
|
||||||
import Data.Map qualified as Map
|
|
||||||
import Data.Maybe
|
|
||||||
import Data.Word
|
|
||||||
import Lens.Micro.Platform
|
|
||||||
import Prettyprinter
|
|
||||||
import System.Directory
|
|
||||||
import System.Exit
|
|
||||||
import System.FilePath.Posix
|
|
||||||
import System.IO
|
|
||||||
import Control.Concurrent
|
|
||||||
import Data.Default
|
|
||||||
import Control.Monad.Reader
|
|
||||||
import Data.Dynamic
|
|
||||||
import Data.Kind
|
|
||||||
|
|
||||||
import Control.Concurrent.STM
|
|
||||||
import Control.Concurrent.STM.TQueue qualified as Q
|
|
||||||
import Control.Concurrent.STM.TBQueue qualified as TBQ
|
|
||||||
import Control.Concurrent.STM.TBQueue (TBQueue)
|
|
||||||
|
|
||||||
debug :: (MonadIO m) => Doc ann -> m ()
|
|
||||||
debug p = liftIO $ hPrint stderr p
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- FIXME: peer should be a part of the key
|
|
||||||
-- therefore, key is ( p | cookie )
|
|
||||||
-- but client's cookie in protocol should be just ( cookie :: Word32 )
|
|
||||||
|
|
||||||
|
|
||||||
data BlockDownload =
|
|
||||||
BlockDownload
|
|
||||||
{ _sBlockHash :: Hash HbSync
|
|
||||||
, _sBlockSize :: Size
|
|
||||||
, _sBlockChunkSize :: ChunkSize
|
|
||||||
, _sBlockOffset :: Offset
|
|
||||||
, _sBlockWritten :: Size
|
|
||||||
}
|
|
||||||
|
|
||||||
makeLenses 'BlockDownload
|
|
||||||
|
|
||||||
newBlockDownload :: Hash HbSync -> BlockDownload
|
|
||||||
newBlockDownload h = BlockDownload h 0 0 0 0
|
|
||||||
|
|
||||||
data Fake
|
|
||||||
|
|
||||||
instance HasPeer Fake where
|
|
||||||
newtype instance Peer Fake = FakePeer Word8
|
|
||||||
deriving newtype (Hashable,Num,Enum,Real,Integral)
|
|
||||||
deriving stock (Eq,Ord,Show)
|
|
||||||
|
|
||||||
|
|
||||||
instance Pretty (Peer Fake) where
|
|
||||||
pretty (FakePeer n) = parens ("peer" <+> pretty n)
|
|
||||||
|
|
||||||
|
|
||||||
instance HasProtocol Fake (BlockSize Fake) where
|
|
||||||
type instance ProtocolId (BlockSize Fake) = 1
|
|
||||||
type instance Encoded Fake = ByteString
|
|
||||||
decode = either (const Nothing) Just . deserialiseOrFail
|
|
||||||
encode = serialise
|
|
||||||
|
|
||||||
instance HasProtocol Fake (BlockChunks Fake) where
|
|
||||||
type instance ProtocolId (BlockChunks Fake) = 2
|
|
||||||
type instance Encoded Fake = ByteString
|
|
||||||
decode = either (const Nothing) Just . deserialiseOrFail
|
|
||||||
encode = serialise
|
|
||||||
|
|
||||||
|
|
||||||
type instance SessionData Fake (BlockSize Fake) = BlockSizeSession Fake
|
|
||||||
type instance SessionData Fake (BlockChunks Fake) = BlockDownload
|
|
||||||
|
|
||||||
newtype instance SessionKey Fake (BlockChunks Fake) =
|
|
||||||
DownloadSessionKey (Peer Fake, Cookie Fake)
|
|
||||||
deriving newtype (Eq, Hashable)
|
|
||||||
deriving stock (Generic)
|
|
||||||
|
|
||||||
newtype BlockSizeSession e =
|
|
||||||
BlockSizeSession
|
|
||||||
{ _bsBlockSizes :: Map (Peer e) Size
|
|
||||||
}
|
|
||||||
|
|
||||||
makeLenses 'BlockSizeSession
|
|
||||||
|
|
||||||
instance Ord (Peer e) => Default (BlockSizeSession e) where
|
|
||||||
def = BlockSizeSession mempty
|
|
||||||
|
|
||||||
deriving stock instance Show (BlockSizeSession Fake)
|
|
||||||
|
|
||||||
|
|
||||||
main :: IO ()
|
|
||||||
main = do
|
|
||||||
hSetBuffering stderr LineBuffering
|
|
||||||
test1
|
|
||||||
|
|
||||||
-- defaultMain $
|
|
||||||
-- testGroup "root"
|
|
||||||
-- [
|
|
||||||
-- testCase "test1" test1
|
|
||||||
-- ]
|
|
||||||
|
|
||||||
|
|
||||||
-- TODO: абстрактные нотификации, т.к это всё типизируется
|
|
||||||
-- по ключу-значению
|
|
||||||
|
|
||||||
data PeerEvents e (m :: Type -> Type) =
|
|
||||||
PeerEvents
|
|
||||||
{ onBlockSize :: TVar (Map (Hash HbSync) [HasBlockEvent HbSync e m])
|
|
||||||
, onBlockReady :: TVar (Map (Hash HbSync) [OnBlockReady HbSync m])
|
|
||||||
}
|
|
||||||
|
|
||||||
newPeerEventsIO :: forall e m . MonadIO m => IO (PeerEvents e m)
|
|
||||||
newPeerEventsIO = PeerEvents <$> newTVarIO mempty
|
|
||||||
<*> newTVarIO mempty
|
|
||||||
|
|
||||||
addBlockSizeEventNotify :: forall e m . (MonadIO m)
|
|
||||||
=> PeerEvents e m
|
|
||||||
-> Hash HbSync
|
|
||||||
-> HasBlockEvent HbSync e m
|
|
||||||
-> m ()
|
|
||||||
|
|
||||||
addBlockSizeEventNotify pe h e = do
|
|
||||||
void $ liftIO $ atomically $ modifyTVar' (onBlockSize pe) (Map.insertWith (<>) h [e])
|
|
||||||
|
|
||||||
addBlockReadyEventNotify :: forall e m . (MonadIO m)
|
|
||||||
=> PeerEvents e m
|
|
||||||
-> Hash HbSync
|
|
||||||
-> OnBlockReady HbSync m
|
|
||||||
-> m ()
|
|
||||||
|
|
||||||
addBlockReadyEventNotify pe h e = do
|
|
||||||
void $ liftIO $ atomically $ modifyTVar' (onBlockReady pe) (Map.insertWith (<>) h [e])
|
|
||||||
|
|
||||||
emitBlockSizeEvent :: forall e m . MonadIO m
|
|
||||||
=> PeerEvents e m
|
|
||||||
-> Hash HbSync
|
|
||||||
-> (Peer e, Hash HbSync, Maybe Integer)
|
|
||||||
-> m ()
|
|
||||||
|
|
||||||
emitBlockSizeEvent pe h event = do
|
|
||||||
ev <- liftIO $ atomically $ stateTVar (onBlockSize pe) alter
|
|
||||||
for_ ev $ \e -> e event
|
|
||||||
|
|
||||||
where
|
|
||||||
alter m =
|
|
||||||
let ev = Map.lookup h m
|
|
||||||
in (mconcat (maybeToList ev), Map.delete h m)
|
|
||||||
|
|
||||||
|
|
||||||
emitBlockReadyEvent :: forall e m . MonadIO m
|
|
||||||
=> PeerEvents e m
|
|
||||||
-> Hash HbSync
|
|
||||||
-> m ()
|
|
||||||
|
|
||||||
emitBlockReadyEvent pe h = do
|
|
||||||
ev <- liftIO $ atomically $ stateTVar (onBlockReady pe) alter
|
|
||||||
for_ ev $ \e -> e h
|
|
||||||
|
|
||||||
where
|
|
||||||
alter m =
|
|
||||||
let ev = Map.lookup h m
|
|
||||||
in (mconcat (maybeToList ev), Map.delete h m)
|
|
||||||
|
|
||||||
|
|
||||||
-- Бежал ридер по ридеру, видит ридер сидит на ридере
|
|
||||||
-- ридер. Схватил ридер ридера за хуй да и выкинул нахуй.
|
|
||||||
--
|
|
||||||
newtype PeerM e m a = PeerM { fromPeerM :: ReaderT () (EngineM e m) a }
|
|
||||||
deriving newtype ( Functor
|
|
||||||
, Applicative
|
|
||||||
, Monad
|
|
||||||
, MonadIO
|
|
||||||
, MonadReader ()
|
|
||||||
)
|
|
||||||
|
|
||||||
-- instance MonadTrans (PeerM e) where
|
|
||||||
-- lift = lift . lift
|
|
||||||
|
|
||||||
|
|
||||||
runPeerM p0 bus f = do
|
|
||||||
env <- newEnv p0 bus
|
|
||||||
runEngineM env (runReaderT (fromPeerM f) ())
|
|
||||||
|
|
||||||
|
|
||||||
instance Request Fake (BlockSize Fake) (PeerM e IO) where
|
|
||||||
request p proto = undefined
|
|
||||||
|
|
||||||
|
|
||||||
runFakePeer :: forall e b . ( e ~ Fake
|
|
||||||
-- , MonadIO m
|
|
||||||
, Messaging b e ByteString
|
|
||||||
-- , Monad m
|
|
||||||
-- , Sessions Fake (BlockSize Fake)
|
|
||||||
-- , m ~ ResponseM Fake IO
|
|
||||||
-- , MonadIO m
|
|
||||||
-- , Response e p m
|
|
||||||
-- , EngineM e m
|
|
||||||
)
|
|
||||||
=> PeerEvents e (EngineM e IO)
|
|
||||||
-> Peer e
|
|
||||||
-> b
|
|
||||||
-> PeerM e IO ()
|
|
||||||
-> IO ()
|
|
||||||
|
|
||||||
runFakePeer ev p0 bus work = do
|
|
||||||
|
|
||||||
env <- newEnv p0 bus
|
|
||||||
|
|
||||||
let pid = fromIntegral (hash (env ^. self)) :: Word8
|
|
||||||
|
|
||||||
dir <- liftIO $ canonicalizePath ( ".peers" </> show pid)
|
|
||||||
|
|
||||||
let chDir = dir </> "tmp-chunks"
|
|
||||||
|
|
||||||
liftIO $ createDirectoryIfMissing True dir
|
|
||||||
|
|
||||||
let opts = [ StoragePrefix dir
|
|
||||||
]
|
|
||||||
|
|
||||||
storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync)
|
|
||||||
|
|
||||||
w <- liftIO $ async $ simpleStorageWorker storage
|
|
||||||
|
|
||||||
cww <- newChunkWriterIO storage (Just chDir)
|
|
||||||
|
|
||||||
cw <- async $ runChunkWriter cww
|
|
||||||
|
|
||||||
let size = 1024*1024
|
|
||||||
|
|
||||||
let blk = B8.concat [ fromString (take 1 $ show x)
|
|
||||||
| x <- replicate size (fromIntegral pid :: Int)
|
|
||||||
]
|
|
||||||
|
|
||||||
root <- putAsMerkle storage blk
|
|
||||||
|
|
||||||
debug $ "I'm" <+> pretty pid <+> pretty root
|
|
||||||
|
|
||||||
let handleBlockInfo (p, h, sz') = do
|
|
||||||
maybe1 sz' (pure ()) $ \sz -> do
|
|
||||||
let bsz = fromIntegral sz
|
|
||||||
|
|
||||||
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
|
|
||||||
lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit
|
|
||||||
|
|
||||||
let adapter =
|
|
||||||
BlockChunksI
|
|
||||||
{ blkSize = hasBlock storage
|
|
||||||
, blkChunk = getChunk storage
|
|
||||||
, blkGetHash = \c -> find (DownloadSessionKey c) (view sBlockHash)
|
|
||||||
|
|
||||||
-- КАК ТОЛЬКО ПРИНЯЛИ ВСЕ ЧАНКИ (ПРИШЁЛ ПОСЛЕДНИЙ ЧАНК):
|
|
||||||
-- СЧИТАЕМ ХЭШ ТОГО, ЧТО ПОЛУЧИЛОСЬ
|
|
||||||
-- ЕСЛИ ПОЛУЧИЛОСЬ ХОРОШО --- ТО:
|
|
||||||
-- ПЕРЕЗАПИСЫВАЕМ БЛОК В СТОРЕЙДЖ
|
|
||||||
-- ГОВОРИМ ОЖИДАЮЩЕЙ СТОРОНЕ, ЧТО БЛОК ПРИНЯТ?
|
|
||||||
-- УДАЛЯЕМ КУКУ?
|
|
||||||
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
|
|
||||||
|
|
||||||
let cKey = DownloadSessionKey (p,c)
|
|
||||||
|
|
||||||
-- check if there is a session
|
|
||||||
-- FIXME:
|
|
||||||
-- TODO: log situation when no session
|
|
||||||
dwnld <- MaybeT $ find cKey id
|
|
||||||
|
|
||||||
let bslen = fromIntegral $ B8.length bs
|
|
||||||
|
|
||||||
let mbSize = view sBlockSize dwnld
|
|
||||||
let mbChSize = view sBlockChunkSize dwnld
|
|
||||||
|
|
||||||
let offset0 = fromIntegral n * fromIntegral mbChSize :: Offset
|
|
||||||
|
|
||||||
liftIO $ do
|
|
||||||
writeChunk cww cKey h offset0 bs
|
|
||||||
|
|
||||||
let written = view sBlockWritten dwnld + bslen
|
|
||||||
let maxOff = max offset0 (view sBlockOffset dwnld)
|
|
||||||
|
|
||||||
lift $ update dwnld cKey ( set sBlockOffset maxOff
|
|
||||||
. set sBlockWritten written
|
|
||||||
)
|
|
||||||
|
|
||||||
let mbDone = (maxOff + fromIntegral mbChSize) > fromIntegral mbSize
|
|
||||||
&& written >= mbSize
|
|
||||||
|
|
||||||
when mbDone $ lift do
|
|
||||||
deferred (Proxy @(BlockChunks e)) $ do
|
|
||||||
h1 <- liftIO $ getHash cww cKey h
|
|
||||||
|
|
||||||
-- ПОСЧИТАТЬ ХЭШ
|
|
||||||
-- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК
|
|
||||||
-- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
|
|
||||||
when ( h1 == h ) $ do
|
|
||||||
liftIO $ commitBlock cww cKey h
|
|
||||||
expire cKey
|
|
||||||
lift $ runEngineM env $ emitBlockReadyEvent ev h -- TODO: fix this crazy shit
|
|
||||||
|
|
||||||
when (written > mbSize * defBlockDownloadThreshold) $ do
|
|
||||||
debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p
|
|
||||||
lift $ expire cKey
|
|
||||||
-- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ,
|
|
||||||
-- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ
|
|
||||||
-- ЧАНКИ ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ.
|
|
||||||
-- ТАК НЕ ПОЙДЕТ
|
|
||||||
-- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся
|
|
||||||
}
|
|
||||||
|
|
||||||
peer <- async $ runPeer env
|
|
||||||
[ makeResponse (blockSizeProto (hasBlock storage) handleBlockInfo)
|
|
||||||
, makeResponse (blockChunksProto adapter)
|
|
||||||
]
|
|
||||||
|
|
||||||
runPeerM p0 bus work
|
|
||||||
|
|
||||||
simpleStorageStop storage
|
|
||||||
|
|
||||||
stopChunkWriter cww
|
|
||||||
|
|
||||||
mapM_ cancel [w,cw,peer]
|
|
||||||
|
|
||||||
|
|
||||||
-- TODO: замутить мап/кэш со статистикой по блоку:
|
|
||||||
-- сколько блок там маринуется и т.п.
|
|
||||||
-- Если блок в этом кэше и еще не скачан, то
|
|
||||||
-- ... пробуем качать повторно?
|
|
||||||
-- ... увеличиваем время
|
|
||||||
-- ... если не появилось новых пиров
|
|
||||||
-- ... запоминать, у какого пира уже спрашивали и стараться
|
|
||||||
-- ... спрашивать у других?
|
|
||||||
-- ... для каждого блока - вести список, у кого лучше спрашивать?
|
|
||||||
-- ... и там whilelist, blacklist
|
|
||||||
-- ... не дохрена ли это будет занимать?
|
|
||||||
--
|
|
||||||
-- ... и тут, короче, еще кэш WiP
|
|
||||||
-- ... и еще один поток, который это всё хэндлит, например:
|
|
||||||
-- ... берём статистику блоков, берём wip
|
|
||||||
-- ... если блок не wip и до сих пор в мапе --- то то добавляем
|
|
||||||
-- ... в очередь.
|
|
||||||
--
|
|
||||||
-- ... блоку пишем, у каких пиров уже спрашивали (Set)
|
|
||||||
-- ... блоку пишем, когда стартовал процесс
|
|
||||||
--
|
|
||||||
--
|
|
||||||
|
|
||||||
|
|
||||||
blockDownloadLoop :: forall e . PeerM e IO ()
|
|
||||||
blockDownloadLoop = do
|
|
||||||
|
|
||||||
let who = FakePeer 1
|
|
||||||
let blkHash = ""
|
|
||||||
request who (GetBlockSize @Fake blkHash)
|
|
||||||
|
|
||||||
pure ()
|
|
||||||
|
|
||||||
|
|
||||||
-- blockDownloadLoop :: PeerEvents Fake m -> Peer e -> PeerM () m ()
|
|
||||||
blockDownloadLoop1 ev0 p1 = do
|
|
||||||
|
|
||||||
|
|
||||||
let ini = [ "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"
|
|
||||||
, "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"
|
|
||||||
]
|
|
||||||
|
|
||||||
blkQ <- liftIO $ do
|
|
||||||
b <- newTBQueueIO defBlockDownloadQ
|
|
||||||
traverse_ (atomically . TBQ.writeTBQueue b) ini
|
|
||||||
pure b
|
|
||||||
|
|
||||||
|
|
||||||
-- TODO: random shuffle and take X
|
|
||||||
-- подтягиваем новых пиров откуда можем
|
|
||||||
-- для каждого блока решаем, откуда брать:
|
|
||||||
-- shuffle (white-list) <> shuffle (black-list)
|
|
||||||
--
|
|
||||||
--
|
|
||||||
let knownPeers = [p1]
|
|
||||||
|
|
||||||
fix \next -> do
|
|
||||||
|
|
||||||
-- Вечно ждём. Это и правильно и неправильно
|
|
||||||
|
|
||||||
blkHash <- liftIO $ atomically $ TBQ.readTBQueue blkQ
|
|
||||||
|
|
||||||
-- TODO: check is this block is already here
|
|
||||||
-- maybe emit event to continue -> parse/seek for content
|
|
||||||
|
|
||||||
-- TODO: убивать нотификации, если блок скачан или что-то с ним еще
|
|
||||||
-- приключилось
|
|
||||||
--
|
|
||||||
-- добавляем сюда экшоны на почистить:
|
|
||||||
-- добавили нотификацию --- экшон на
|
|
||||||
-- почистить нотификацию
|
|
||||||
--
|
|
||||||
-- добавили еще какую парашу -- экшон на
|
|
||||||
-- её очистку
|
|
||||||
--
|
|
||||||
-- у каждого экшона - дедлайн
|
|
||||||
-- и там процесс, который берёт тех, у кого дедлайн
|
|
||||||
-- истёк и вызывает их
|
|
||||||
-- ?
|
|
||||||
|
|
||||||
addBlockReadyEventNotify ev0 blkHash $ \hash -> do
|
|
||||||
debug $ "DOWNLOADED BLOCK" <+> pretty hash <+> "NOW WHAT?"
|
|
||||||
|
|
||||||
-- ВЫКОВЫРЯТЬ СТОРЕЙДЖ (как?)
|
|
||||||
-- ЗАСУНУТЫЙ В READER?
|
|
||||||
|
|
||||||
obj <- undefined -- getBlock ss hash
|
|
||||||
|
|
||||||
let mbLink = deserialiseOrFail @AnnotatedHashRef obj
|
|
||||||
|
|
||||||
pure ()
|
|
||||||
|
|
||||||
|
|
||||||
-- -- TODO: смотрим, что за блок
|
|
||||||
-- -- если Merkle - то качаем рекурсивно
|
|
||||||
-- -- если ссылка - то смотрим, что за ссылка
|
|
||||||
-- -- проверяем пруфы
|
|
||||||
-- -- качаем рекурсивно
|
|
||||||
|
|
||||||
-- TODO: надо трекать, может блок-то и найден
|
|
||||||
-- либо по всем пирам спросить
|
|
||||||
|
|
||||||
addBlockSizeEventNotify ev0 blkHash $ \case
|
|
||||||
(p, h, Just size) -> do
|
|
||||||
coo <- genCookie (p,blkHash)
|
|
||||||
let key = DownloadSessionKey (p, coo)
|
|
||||||
let chusz = defChunkSize
|
|
||||||
|
|
||||||
let new = set sBlockChunkSize chusz
|
|
||||||
. set sBlockSize (fromIntegral size)
|
|
||||||
$ newBlockDownload blkHash
|
|
||||||
|
|
||||||
update @Fake new key id
|
|
||||||
request p (BlockChunks coo (BlockGetAllChunks @Fake blkHash chusz)) -- FIXME: nice construction
|
|
||||||
|
|
||||||
_ -> pure ()
|
|
||||||
|
|
||||||
-- TODO: смотрим, может у нас уже есть block-size
|
|
||||||
-- тогда ловим случайного пира, у которого оно есть
|
|
||||||
-- и ставим на закачку
|
|
||||||
|
|
||||||
-- КТО ПЕРВЫЙ ВСТАЛ ТОГО И ТАПКИ
|
|
||||||
for_ knownPeers $ \who ->
|
|
||||||
request who (GetBlockSize @Fake blkHash)
|
|
||||||
|
|
||||||
next
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
test1 :: IO ()
|
|
||||||
test1 = do
|
|
||||||
|
|
||||||
hSetBuffering stderr LineBuffering
|
|
||||||
|
|
||||||
fake <- newFakeP2P True
|
|
||||||
|
|
||||||
void $ race (pause (10 :: Timeout 'Seconds)) $ do
|
|
||||||
|
|
||||||
let p0 = 0 :: Peer Fake
|
|
||||||
let p1 = 1 :: Peer Fake
|
|
||||||
|
|
||||||
ev1 <- newPeerEventsIO
|
|
||||||
ev0 <- newPeerEventsIO
|
|
||||||
|
|
||||||
p1Thread <- async $ runFakePeer ev1 p1 fake $ forever $ liftIO yield
|
|
||||||
|
|
||||||
p0Thread <- async $ runFakePeer ev0 p0 fake $ do
|
|
||||||
blockDownloadLoop
|
|
||||||
|
|
||||||
-- blockDownloadLoop ev0 p1
|
|
||||||
|
|
||||||
let peerz = p0Thread : [p1Thread]
|
|
||||||
|
|
||||||
|
|
||||||
pause ( 5 :: Timeout 'Seconds)
|
|
||||||
|
|
||||||
mapM_ cancel peerz
|
|
||||||
|
|
||||||
(_, e) <- waitAnyCatchCancel peerz
|
|
||||||
|
|
||||||
debug (pretty $ show e)
|
|
||||||
debug "we're done"
|
|
||||||
assertBool "success" True
|
|
||||||
exitSuccess
|
|
||||||
|
|
||||||
assertBool "failed" False
|
|
||||||
|
|
Loading…
Reference in New Issue