From 4851f4b0ddd88c719a6adae6b6b33825f71d8987 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 2 Feb 2023 15:05:06 +0300 Subject: [PATCH] peer credentials --- .gitignore | 1 + cabal.project | 3 + docs/devlog.md | 15 + docs/issues/block-download-loop | 33 + docs/issues/memory-leak-on-download | 11 + .../no-retry-after-sleep-on-queue-exhaustion | 8 + docs/issues/no-retry-stalled-blocks | 5 + docs/issues/no-sweep-stalled-blocks | 19 + flake.lock | 28 +- flake.nix | 16 +- hbs2-core/hbs2-core.cabal | 80 ++- hbs2-core/lib/HBS2/Actors/ChunkWriter.hs | 13 +- hbs2-core/lib/HBS2/Actors/Peer.hs | 71 +- hbs2-core/lib/HBS2/Base58.hs | 19 + hbs2-core/lib/HBS2/Clock.hs | 8 +- hbs2-core/lib/HBS2/Data/Types.hs | 3 +- hbs2-core/lib/HBS2/Data/Types/Crypto.hs | 4 + hbs2-core/lib/HBS2/Defaults.hs | 41 +- hbs2-core/lib/HBS2/Hash.hs | 12 +- hbs2-core/lib/HBS2/Net/Auth/Credentials.hs | 82 +++ hbs2-core/lib/HBS2/Net/IP/Addr.hs | 90 +++ hbs2-core/lib/HBS2/Net/Messaging.hs | 6 +- hbs2-core/lib/HBS2/Net/Messaging/UDP.hs | 174 +++++ hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs | 37 +- hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs | 1 + hbs2-core/lib/HBS2/Net/Proto/Definition.hs | 92 +++ hbs2-core/lib/HBS2/Net/Proto/Peer.hs | 108 +++ hbs2-core/lib/HBS2/Net/Proto/Sessions.hs | 2 +- hbs2-core/lib/HBS2/Net/Proto/Types.hs | 49 +- hbs2-core/lib/HBS2/OrDie.hs | 21 + hbs2-core/lib/HBS2/Prelude.hs | 21 +- hbs2-core/lib/HBS2/Storage.hs | 2 +- hbs2-core/test/TestUniqProtoId.hs | 147 ---- hbs2-peer/CHANGELOG.md | 5 + hbs2-peer/LICENSE | 30 + hbs2-peer/app/BlockDownload.hs | 427 +++++++++++ hbs2-peer/app/Logger.hs | 11 + hbs2-peer/app/PeerInfo.hs | 39 + hbs2-peer/app/PeerMain.hs | 322 +++++++++ hbs2-peer/app/RPC.hs | 92 +++ hbs2-peer/hbs2-peer.cabal | 113 +++ .../lib/HBS2/Storage/Simple.hs | 19 +- hbs2-tests/hbs2-tests.cabal | 52 +- hbs2-tests/test/Peer2Main.hs | 673 ++++++++++-------- hbs2-tests/test/TestChunkWriter.hs | 1 - hbs2-tests/test/TestUDP.hs | 103 +++ hbs2/Main.hs | 44 +- hbs2/hbs2.cabal | 1 + hie.yaml | 15 - 49 files changed, 2584 insertions(+), 585 deletions(-) create mode 100644 docs/devlog.md create mode 100644 docs/issues/block-download-loop create mode 100644 docs/issues/memory-leak-on-download create mode 100644 docs/issues/no-retry-after-sleep-on-queue-exhaustion create mode 100644 docs/issues/no-retry-stalled-blocks create mode 100644 docs/issues/no-sweep-stalled-blocks create mode 100644 hbs2-core/lib/HBS2/Base58.hs create mode 100644 hbs2-core/lib/HBS2/Data/Types/Crypto.hs create mode 100644 hbs2-core/lib/HBS2/Net/Auth/Credentials.hs create mode 100644 hbs2-core/lib/HBS2/Net/IP/Addr.hs create mode 100644 hbs2-core/lib/HBS2/Net/Messaging/UDP.hs create mode 100644 hbs2-core/lib/HBS2/Net/Proto/Definition.hs create mode 100644 hbs2-core/lib/HBS2/Net/Proto/Peer.hs create mode 100644 hbs2-core/lib/HBS2/OrDie.hs delete mode 100644 hbs2-core/test/TestUniqProtoId.hs create mode 100644 hbs2-peer/CHANGELOG.md create mode 100644 hbs2-peer/LICENSE create mode 100644 hbs2-peer/app/BlockDownload.hs create mode 100644 hbs2-peer/app/Logger.hs create mode 100644 hbs2-peer/app/PeerInfo.hs create mode 100644 hbs2-peer/app/PeerMain.hs create mode 100644 hbs2-peer/app/RPC.hs create mode 100644 hbs2-peer/hbs2-peer.cabal create mode 100644 hbs2-tests/test/TestUDP.hs diff --git a/.gitignore b/.gitignore index 0753cd2d..f72ad380 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ dist-newstyle .direnv/ +hbs2.prof diff --git a/cabal.project b/cabal.project index c7bc5cde..4ad57f5d 100644 --- a/cabal.project +++ b/cabal.project @@ -1,4 +1,7 @@ packages: **/*.cabal + +-- allow-newer: all + -- executable-static: True -- profiling: True -- library-profiling: False diff --git a/docs/devlog.md b/docs/devlog.md new file mode 100644 index 00000000..e183a35d --- /dev/null +++ b/docs/devlog.md @@ -0,0 +1,15 @@ + +## 2023-02-01 + +Вести баги это слишком формально и накладно, даже в упрощенном виде. +Можно вести их в виде девлога. + +FIXME: Обработка ошибок в асинхронном приложении. + Async-и жрут исключения, даже, когда удаётся + их перехватить и пробросить дальше. + Например, если не удалось забиндиться на адрес, + исключение стреляет, но код ошибки при выходе + всё еще 0. + + + diff --git a/docs/issues/block-download-loop b/docs/issues/block-download-loop new file mode 100644 index 00000000..c12e89f7 --- /dev/null +++ b/docs/issues/block-download-loop @@ -0,0 +1,33 @@ +title: block-download-loop +status: open + + +Качать блоки по 500 штук за раз идея была +красивая, но работает плохо даже на localhost. + +Вероятно, нужно качать пачками по N штук +и перезапрашивать отдельные чанки, а не блок +целиком. + +Так же, может быть можно качать блок сразу +от нескольких пиров. + +Соотношение in/out нужно как-то регулировать. + +Либо же решает сам передатчик, по скольку кусков +за раз посылать. + +Возможно, нужно ввести явную очередь на отправку +и посылать по N пакетов за раз, что бы не переполнять +очереди сокетов. + +Возможно, с этого стоит начать. + +Стоит так же отметить, что сейчас у нас по одному +сокету на пира, через которых идёт вообще весь трафик. + +Надо попробовать буферизовать отправку ответов. + + + + diff --git a/docs/issues/memory-leak-on-download b/docs/issues/memory-leak-on-download new file mode 100644 index 00000000..1ba129a9 --- /dev/null +++ b/docs/issues/memory-leak-on-download @@ -0,0 +1,11 @@ +title: memory-leak-on-download +status: open + + +Выжирает огромное количество памяти при скачивании +и не отдаёт обратно. + +Возможно, это очереди. + +Возможно, накапливать чанки в памяти --- плохая +идея. diff --git a/docs/issues/no-retry-after-sleep-on-queue-exhaustion b/docs/issues/no-retry-after-sleep-on-queue-exhaustion new file mode 100644 index 00000000..bd46952d --- /dev/null +++ b/docs/issues/no-retry-after-sleep-on-queue-exhaustion @@ -0,0 +1,8 @@ +title: no-retry-after-sleep-on-queue-exhaustion +status: fixed + + +Когда в очереди слишком много блоков на скачивание и +мы уходим в sleep пока буфер не уменьшится, нужно +возвращать очередной блок обратно в очередь, иначе +не будет повторного скачивания. diff --git a/docs/issues/no-retry-stalled-blocks b/docs/issues/no-retry-stalled-blocks new file mode 100644 index 00000000..2790cf2d --- /dev/null +++ b/docs/issues/no-retry-stalled-blocks @@ -0,0 +1,5 @@ +title: no-retry-stalled-blocks +status: fixed + +При использовании UDP почему-то не запрашиваются повторно +повисшие блоки. diff --git a/docs/issues/no-sweep-stalled-blocks b/docs/issues/no-sweep-stalled-blocks new file mode 100644 index 00000000..7470a89d --- /dev/null +++ b/docs/issues/no-sweep-stalled-blocks @@ -0,0 +1,19 @@ +title: no-sweep-stalled-blocks +status: open + +В случае, если часть чанков не пришла или не была обработана +и блок завис в очереди --- этот блок никогда не убирается +из очереди. + +Нужен механизм наподобие LRU, когда в случае отсутствия активности +в течение времени блоки удаляются из всех очередей и отправляются +повторно выкачиваться. + +Вероятно, нужно убрать данные блока из СhunkWriter и держать +его в сессии (?). + +Вероятно, нужно добавить битовую карту пришедших блоков --- +это всего + ~ 68 байт при условии, что размер блока 256K. + + + diff --git a/flake.lock b/flake.lock index b4ea0fae..16d06a9f 100644 --- a/flake.lock +++ b/flake.lock @@ -44,8 +44,8 @@ }, "original": { "owner": "ivanovs-4", + "ref": "master", "repo": "haskell-flake-utils", - "rev": "896219e5bde6efac72198550454e9dd9b5ed9ac9", "type": "github" } }, @@ -91,11 +91,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1674407282, - "narHash": "sha256-2qwc8mrPINSFdWffPK+ji6nQ9aGnnZyHSItVcYDZDlk=", + "lastModified": 1675237434, + "narHash": "sha256-YoFR0vyEa1HXufLNIFgOGhIFMRnY6aZ0IepZF5cYemo=", "owner": "nixos", "repo": "nixpkgs", - "rev": "ab1254087f4cdf4af74b552d7fc95175d9bdbb49", + "rev": "285b3ff0660640575186a4086e1f8dc0df2874b5", "type": "github" }, "original": { @@ -109,7 +109,25 @@ "inputs": { "haskell-flake-utils": "haskell-flake-utils", "hspup": "hspup", - "nixpkgs": "nixpkgs" + "nixpkgs": "nixpkgs", + "saltine": "saltine" + } + }, + "saltine": { + "flake": false, + "locked": { + "lastModified": 1651348885, + "narHash": "sha256-0guvfkdOrofElDildQWE8QDwh+T/u2WY3HVYmOu4g3w=", + "owner": "tel", + "repo": "saltine", + "rev": "3d3a54cf46f78b71b4b55653482fb6f4cee6b77d", + "type": "github" + }, + "original": { + "owner": "tel", + "repo": "saltine", + "rev": "3d3a54cf46f78b71b4b55653482fb6f4cee6b77d", + "type": "github" } } }, diff --git a/flake.nix b/flake.nix index 619deb98..19302ad5 100644 --- a/flake.nix +++ b/flake.nix @@ -5,9 +5,15 @@ inputs = { nixpkgs.url = "github:nixos/nixpkgs/nixos-22.11"; # haskell-flake-utils.url = "github:ivanovs-4/haskell-flake-utils"; - haskell-flake-utils.url = "github:ivanovs-4/haskell-flake-utils/896219e5bde6efac72198550454e9dd9b5ed9ac9"; + haskell-flake-utils.url = "github:ivanovs-4/haskell-flake-utils/master"; hspup.url = "github:voidlizard/hspup"; hspup.inputs.nixpkgs.follows = "nixpkgs"; + + saltine = { + url = "github:tel/saltine/3d3a54cf46f78b71b4b55653482fb6f4cee6b77d"; + flake = false; + }; + }; outputs = { self, nixpkgs, haskell-flake-utils, ... }@inputs: @@ -17,6 +23,9 @@ outputs = { self, nixpkgs, haskell-flake-utils, ... }@inputs: systems = [ "x86_64-linux" ]; name = "hbs2"; + haskellFlakes = with inputs; [ + ]; + packageNames = [ "hbs2" "hbs2-core" @@ -29,8 +38,12 @@ outputs = { self, nixpkgs, haskell-flake-utils, ... }@inputs: "hbs2-tests" = "./hbs2-tests"; "hbs2-core" = "./hbs2-core"; "hbs2-storage-simple" = "./hbs2-storage-simple"; + "hbs2-peer" = "./hbs2-peer"; }; + hpPreOverrides = {pkgs, ...}: final: prev: with pkgs; { + saltine = prev.callCabal2nix "saltine" inputs.saltine { inherit (pkgs) libsodium; }; + }; packagePostOverrides = { pkgs }: with pkgs; with haskell.lib; [ disableExecutableProfiling @@ -49,6 +62,7 @@ outputs = { self, nixpkgs, haskell-flake-utils, ... }@inputs: shellExtBuildInputs = {pkgs}: with pkgs; [ haskellPackages.haskell-language-server + pkg-config inputs.hspup.packages.${pkgs.system}.default ]; diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index d84c1c5f..e8240993 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -67,24 +67,32 @@ library HBS2.Actors , HBS2.Actors.ChunkWriter , HBS2.Actors.Peer + , HBS2.Base58 , HBS2.Clock , HBS2.Data.Detect , HBS2.Data.Types , HBS2.Data.Types.Refs + , HBS2.Data.Types.Crypto , HBS2.Defaults , HBS2.Events , HBS2.Hash , HBS2.Merkle + , HBS2.Net.Auth.Credentials + , HBS2.Net.IP.Addr , HBS2.Net.Messaging , HBS2.Net.Messaging.Fake + , HBS2.Net.Messaging.UDP , HBS2.Net.PeerLocator , HBS2.Net.PeerLocator.Static , HBS2.Net.Proto + , HBS2.Net.Proto.BlockAnnounce , HBS2.Net.Proto.BlockChunks , HBS2.Net.Proto.BlockInfo - , HBS2.Net.Proto.BlockAnnounce + , HBS2.Net.Proto.Definition , HBS2.Net.Proto.Sessions + , HBS2.Net.Proto.Peer , HBS2.Net.Proto.Types + , HBS2.OrDie , HBS2.Prelude , HBS2.Prelude.Plated , HBS2.Storage @@ -93,37 +101,45 @@ library -- other-modules: -- other-extensions: build-depends: base ^>=4.15.1.0 - , aeson - , async - , base58-bytestring - , binary - , bytestring - , cache - , cborg - , clock - , containers - , cryptonite - , deepseq - , directory - , filepath - , filelock - , hashable - , interpolatedstring-perl6 - , memory - , microlens-platform - , mtl - , murmur-hash - , prettyprinter - , random - , safe - , serialise - , stm - , stm-chans - , text - , transformers - , temporary - , uniplate - , unordered-containers + , aeson + , async + , attoparsec + , base58-bytestring + , binary + , bytestring + , cache + , cborg + , clock + , containers + , cryptonite + , deepseq + , directory + , filelock + , filepath + , hashable + , interpolatedstring-perl6 + , memory + , microlens-platform + , mtl + , murmur-hash + , network + , network-multicast + , prettyprinter + , random + , random-shuffle + , safe + , saltine ^>=0.2.0.1 + , serialise + , sockaddr + , split + , stm + , stm-chans + , temporary + , text + , transformers + , uniplate + , unordered-containers + hs-source-dirs: lib default-language: Haskell2010 diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index 32f96534..99b29fb7 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -96,6 +96,7 @@ data ChunkWriter h m = forall a . ( MonadIO m } +-- FIXME: delete lost blocks! blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int blocksInProcess cw = do liftIO $ readTVarIO (perBlock cw) <&> HashMap.size @@ -152,10 +153,14 @@ newChunkWriterIO s _ = do } -delBlock :: (MonadIO m, Pretty (Hash h)) - => ChunkWriter h IO -> SKey -> m () +delBlock :: (MonadIO m, ChunkKey salt h, Pretty (Hash h)) + => ChunkWriter h IO + -> salt + -> Hash h + -> m () -delBlock w k = liftIO do +delBlock w salt h = liftIO do + let k = newSKey (salt, h) let cache = perBlock w liftIO $ atomically $ TV.modifyTVar' cache $ HashMap.delete k @@ -253,7 +258,7 @@ commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k case chunk of - Just (S s) -> void $ putBlock stor s >> delBlock w k + Just (S s) -> void $ putBlock stor s >> delBlock w salt h _ -> pure () -- FIXME: error diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index c26b6549..f0f60bd5 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -1,5 +1,6 @@ {-# Language TemplateHaskell #-} {-# Language UndecidableInstances #-} +{-# Language FunctionalDependencies #-} -- {-# Language AllowAmbiguousTypes #-} module HBS2.Actors.Peer where @@ -70,6 +71,20 @@ class Messaging (Fabriq e) e (AnyMessage (Encoded e) e) => PeerMessaging e instance Messaging (Fabriq e) e (AnyMessage (Encoded e) e) => PeerMessaging e +class ( Eq (SessionKey e a) + , Hashable (SessionKey e a) + , Typeable (SessionData e a) + , Typeable (SessionKey e a) + , Expires (SessionKey e a) + ) => PeerSessionKey e a + +instance ( Eq (SessionKey e a) + , Hashable (SessionKey e a) + , Typeable (SessionData e a) + , Typeable (SessionKey e a) + , Expires (SessionKey e a) + ) + => PeerSessionKey e a instance (HasPeer e, Encoded e ~ ByteString) => Messaging (Fabriq e) e (AnyMessage ByteString e) where sendTo (Fabriq bus) t f (AnyMessage n bs) = sendTo bus t f (serialise (n, bs)) @@ -79,7 +94,7 @@ instance (HasPeer e, Encoded e ~ ByteString) => Messaging (Fabriq e) e (AnyMessa r <- forM recv $ \(f, msg) -> case deserialiseOrFail msg of Right (n,bs) -> pure $ Just (f, AnyMessage n bs) - Left _ -> liftIO (print "FUCK!") >> pure Nothing -- FIXME what to do with undecoded messages? + Left _ -> pure Nothing -- FIXME what to do with undecoded messages? pure $ catMaybes r @@ -167,13 +182,17 @@ instance Monad m => HasFabriq e (PeerM e m) where instance Monad m => HasStorage (PeerM e m) where getStorage = asks (view envStorage) +-- instance Monad m => HasKeys 'Sign e (PeerM e m) where +-- getPrivateKey = asks (view (envCred . peerSignSk)) +-- getPublicKey = asks (view (envCred . peerSignPk)) instance ( MonadIO m - , HasProtocol e p + -- , HasProtocol e p , Eq (SessionKey e p) , Typeable (SessionKey e p) , Typeable (SessionData e p) , Hashable (SessionKey e p) + , Expires (SessionKey e p) ) => Sessions e p (PeerM e m) where @@ -192,16 +211,19 @@ instance ( MonadIO m r <- liftIO $ Cache.lookup se sk + let ts = expiresIn (Proxy @(SessionKey e p)) <&> toTimeSpec + case r of Just v -> pure $ fn $ fromMaybe de (fromDynamic @(SessionData e p) v ) Nothing -> do - when upd $ liftIO $ Cache.insert se sk ddef + when upd $ liftIO $ Cache.insert' se ts sk ddef pure (fn de) update de k f = do se <- asks (view envSessions) val <- fetch @e @p True de k id - liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val)) + let ts = expiresIn (Proxy @(SessionKey e p)) <&> toTimeSpec + liftIO $ Cache.insert' se ts (newSKey @(SessionKey e p) k) (toDyn (f val)) expire k = do se <- asks (view envSessions) @@ -210,9 +232,10 @@ instance ( MonadIO m instance ( MonadIO m , HasProtocol e p - , HasFabriq e (PeerM e m) - , Messaging (Fabriq e) e (AnyMessage (Encoded e) e) - ) => Request e p (PeerM e m) where + , HasFabriq e m -- (PeerM e m) + , HasOwnPeer e m + , PeerMessaging e + ) => Request e p m where request p msg = do let proto = protoId @e @p (Proxy @p) pipe <- getFabriq @e @@ -349,41 +372,48 @@ runProto hh = do forever $ do - messages <- receive pipe (To me) + messages <- receive @_ @e pipe (To me) for_ messages $ \(From pip, AnyMessage n msg :: AnyMessage (Encoded e) e) -> do case Map.lookup n disp of - Nothing -> pure () + Nothing -> liftIO $ print "SHIT!" >> pure () Just (AnyProtocol { protoDecode = decoder , handle = h }) -> maybe (pure ()) (runResponseM pip . h) (decoder msg) + +instance (Monad m, HasProtocol e p) => HasThatPeer e p (ResponseM e m) where + thatPeer _ = asks (view answTo) + +instance HasProtocol e p => HasDeferred e p (ResponseM e (PeerM e IO)) where + deferred _ action = do + who <- asks (view answTo) + pip <- lift $ asks (view envDeferred) + env <- lift ask + liftIO $ addJob pip $ withPeerM env (runResponseM who action) + -- void $ liftIO $ async $ withPeerM env (runResponseM who action) + instance ( HasProtocol e p , MonadTrans (ResponseM e) , HasStorage (PeerM e IO) , Pretty (Peer e) , PeerMessaging e - ) => Response e p (ResponseM e (PeerM e IO)) where - - thatPeer _ = asks (view answTo) - - deferred _ action = do - who <- asks (view answTo) - pip <- lift $ asks (view envDeferred) - env <- lift ask - liftIO $ addJob pip $ withPeerM env (runResponseM who action) + , HasOwnPeer e m + , HasFabriq e m + , MonadIO m + ) => Response e p (ResponseM e m) where response msg = do let proto = protoId @e @p (Proxy @p) - who <- asks (view answTo) + who <- thatPeer (Proxy @p) self <- lift $ ownPeer @e fab <- lift $ getFabriq @e sendTo fab (To who) (From self) (AnyMessage @(Encoded e) @e proto (encode msg)) instance ( MonadIO m - , HasProtocol e p + -- , HasProtocol e p , Sessions e p m , Eq (SessionKey e p) , Typeable (SessionKey e p) @@ -407,7 +437,6 @@ instance ( MonadIO m emit k d = lift $ emit k d - instance (Monad m, HasOwnPeer e m) => HasOwnPeer e (ResponseM e m) where ownPeer = lift ownPeer diff --git a/hbs2-core/lib/HBS2/Base58.hs b/hbs2-core/lib/HBS2/Base58.hs new file mode 100644 index 00000000..bcd5828e --- /dev/null +++ b/hbs2-core/lib/HBS2/Base58.hs @@ -0,0 +1,19 @@ +module HBS2.Base58 where + +import Data.ByteString.Base58 (encodeBase58, bitcoinAlphabet, decodeBase58,Alphabet(..)) +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Char8 (ByteString) + +alphabet :: Alphabet +alphabet = bitcoinAlphabet + +getAlphabet :: [Char] +getAlphabet = BS8.unpack (unAlphabet alphabet) + + +toBase58 :: ByteString -> ByteString +toBase58 = encodeBase58 bitcoinAlphabet + +fromBase58 :: ByteString -> Maybe ByteString +fromBase58 = decodeBase58 bitcoinAlphabet + diff --git a/hbs2-core/lib/HBS2/Clock.hs b/hbs2-core/lib/HBS2/Clock.hs index 8d3d2362..6454aa09 100644 --- a/hbs2-core/lib/HBS2/Clock.hs +++ b/hbs2-core/lib/HBS2/Clock.hs @@ -34,10 +34,10 @@ class IsTimeout a where toTimeSpec :: Timeout a -> TimeSpec toTimeSpec x = fromNanoSecs (fromIntegral (toNanoSeconds x)) -class IsTimeout a => MonadPause m a where +class IsTimeout a => MonadPause a m where pause :: Timeout a -> m () -instance (IsTimeout a, MonadIO m) => MonadPause m a where +instance (IsTimeout a, MonadIO m) => MonadPause a m where pause x = liftIO $ threadDelay (toMicroSeconds x) instance Pretty (Fixed E9) where @@ -68,4 +68,8 @@ instance IsTimeout 'Minutes where class Expires a where expiresIn :: Proxy a -> Maybe (Timeout 'Seconds) + -- FIXME: dangerous! + expiresIn _ = Nothing + + diff --git a/hbs2-core/lib/HBS2/Data/Types.hs b/hbs2-core/lib/HBS2/Data/Types.hs index 0b494977..a8d73005 100644 --- a/hbs2-core/lib/HBS2/Data/Types.hs +++ b/hbs2-core/lib/HBS2/Data/Types.hs @@ -1,11 +1,12 @@ module HBS2.Data.Types ( module HBS2.Hash , module HBS2.Data.Types.Refs + , module HBS2.Data.Types.Crypto ) where import HBS2.Hash import HBS2.Data.Types.Refs - +import HBS2.Data.Types.Crypto diff --git a/hbs2-core/lib/HBS2/Data/Types/Crypto.hs b/hbs2-core/lib/HBS2/Data/Types/Crypto.hs new file mode 100644 index 00000000..6bca1764 --- /dev/null +++ b/hbs2-core/lib/HBS2/Data/Types/Crypto.hs @@ -0,0 +1,4 @@ +module HBS2.Data.Types.Crypto where + +-- type SignPubKey = () +-- type EncryptPubKey = () diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 0d4e4f37..775177d9 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -3,44 +3,63 @@ module HBS2.Defaults where import HBS2.Clock import Data.String +defMaxDatagram :: Int +defMaxDatagram = 2048 + +defMaxDatagramRPC :: Int +defMaxDatagramRPC = 4096 + +defMessageQueueSize :: Integral a => a +defMessageQueueSize = 65536 + +defBurst :: Integral a => a +defBurst = 64 + -- defChunkSize :: Integer defChunkSize :: Integral a => a -defChunkSize = 500 +defChunkSize = 1024 defBlockSize :: Integer -defBlockSize = 256 * 1024 +defBlockSize = 256 * 1024 defStorePath :: IsString a => a defStorePath = "hbs2" defPipelineSize :: Int -defPipelineSize = 16000 +defPipelineSize = 16000*4 defChunkWriterQ :: Integral a => a -defChunkWriterQ = 16000 +defChunkWriterQ = 16000*4 defBlockDownloadQ :: Integral a => a -defBlockDownloadQ = 2000 +defBlockDownloadQ = 65536*4 defBlockDownloadThreshold :: Integral a => a defBlockDownloadThreshold = 2 -- typical block hash 530+ chunks * parallel wip blocks amount defProtoPipelineSize :: Int -defProtoPipelineSize = 2000 +defProtoPipelineSize = 65536*4 + +defCookieTimeoutSec :: Timeout 'Seconds +defCookieTimeoutSec = 120 defCookieTimeout :: TimeSpec -defCookieTimeout = toTimeSpec ( 300 :: Timeout 'Minutes) +defCookieTimeout = toTimeSpec defCookieTimeoutSec -defBlockInfoTimeout :: TimeSpec -defBlockInfoTimeout = toTimeSpec ( 300 :: Timeout 'Minutes) +defBlockInfoTimeout :: Timeout 'Seconds +defBlockInfoTimeout = 2 -- how much time wait for block from peer? defBlockWaitMax :: Timeout 'Seconds -defBlockWaitMax = 300 :: Timeout 'Seconds +defBlockWaitMax = 3 :: Timeout 'Seconds + +-- how much time wait for block from peer? +defChunkWaitMax :: Timeout 'Seconds +defChunkWaitMax = 1 :: Timeout 'Seconds defSweepTimeout :: Timeout 'Seconds -defSweepTimeout = 5 -- FIXME: only for debug! +defSweepTimeout = 30 -- FIXME: only for debug! diff --git a/hbs2-core/lib/HBS2/Hash.hs b/hbs2-core/lib/HBS2/Hash.hs index ce254588..609efabf 100644 --- a/hbs2-core/lib/HBS2/Hash.hs +++ b/hbs2-core/lib/HBS2/Hash.hs @@ -5,12 +5,13 @@ module HBS2.Hash ) where +import HBS2.Base58 + import Codec.Serialise import Crypto.Hash hiding (SHA1) import Data.Aeson(FromJSON(..),ToJSON(..),Value(..)) import Data.Binary (Binary(..)) import Data.ByteArray qualified as BA -import Data.ByteString.Base58 (encodeBase58, bitcoinAlphabet, decodeBase58,Alphabet(..)) import Data.ByteString (ByteString) import Data.ByteString.Char8 qualified as BS8 import Data.ByteString.Lazy qualified as LBS @@ -51,11 +52,6 @@ newtype Internal a = Internal a class Hashed t a where hashObject :: a -> Hash t -alphabet :: Alphabet -alphabet = bitcoinAlphabet - -getAlphabet :: [Char] -getAlphabet = BS8.unpack (unAlphabet alphabet) instance Hashed HbSync ByteString where @@ -71,10 +67,10 @@ instance Hashed HbSync LBS.ByteString where instance IsString (Hash HbSync) where fromString s = maybe (error ("invalid base58: " <> show s)) HbSyncHash doDecode where - doDecode = decodeBase58 alphabet (BS8.pack s) + doDecode = fromBase58 (BS8.pack s) instance Pretty (Hash HbSync) where - pretty (HbSyncHash s) = pretty @String [qc|{encodeBase58 bitcoinAlphabet s}|] + pretty (HbSyncHash s) = pretty @String [qc|{toBase58 s}|] instance FromJSON (Hash HbSync) where diff --git a/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs new file mode 100644 index 00000000..23ea6aea --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Auth/Credentials.hs @@ -0,0 +1,82 @@ +{-# Language UndecidableInstances #-} +module HBS2.Net.Auth.Credentials where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Types +import HBS2.Base58 +import HBS2.Net.Messaging.UDP (UDP) + +import Codec.Serialise +import Crypto.Saltine.Core.Sign (Keypair(..)) +import Crypto.Saltine.Core.Sign qualified as Sign +import Crypto.Saltine.Class qualified as Crypto +import Crypto.Saltine.Class (IsEncoding) +import Data.ByteString.Lazy.Char8 qualified as LBS +import Data.ByteString.Char8 qualified as B8 +import Data.ByteString.Char8 (ByteString) +import Data.Function +import Data.List.Split (chunksOf) +import Prettyprinter + +newtype AsBase58 a = AsBase58 a + +newtype AsCredFile a = AsCredFile a + +newCredentials :: forall e m . ( MonadIO m + , Signatures e + , PrivKey 'Sign e ~ Sign.SecretKey + , PubKey 'Sign e ~ Sign.PublicKey + ) => m (PeerCredentials e) +newCredentials = do + pair <- liftIO Sign.newKeypair + pure $ PeerCredentials @e (secretKey pair) (publicKey pair) + + +parseCredentials :: forall e . ( Signatures e + , PrivKey 'Sign e ~ Sign.SecretKey + , PubKey 'Sign e ~ Sign.PublicKey + ) + => AsCredFile ByteString -> Maybe (PeerCredentials e) + +parseCredentials (AsCredFile bs) = maybe1 b58_1 Nothing fromCbor + + where + fromCbor s = deserialiseOrFail @(ByteString, ByteString) s + & either (const Nothing) fromPair + + fromPair (s1,s2) = PeerCredentials <$> Crypto.decode s1 + <*> Crypto.decode s2 + + b58_1 = B8.lines bs & dropWhile hdr + & filter ( not . B8.null ) + & B8.concat + & fromBase58 + & fmap LBS.fromStrict + + hdr s = B8.isPrefixOf "#" s || B8.null s + +instance ( IsEncoding (PrivKey 'Sign e) + , IsEncoding (PubKey 'Sign e) + ) + + => Pretty (AsBase58 (PeerCredentials e)) where + pretty (AsBase58 (PeerCredentials s p)) = pretty $ B8.unpack (toBase58 bs) + where + sk = Crypto.encode s + pk = Crypto.encode p + bs = serialise (sk,pk) & LBS.toStrict + +instance Pretty (AsBase58 Sign.PublicKey) where + pretty (AsBase58 pk) = pretty $ B8.unpack $ toBase58 (Crypto.encode pk) + +instance Pretty (AsBase58 a) => Pretty (AsCredFile (AsBase58 a)) where + pretty (AsCredFile pc) = "# hbs2 credentials file" <> line + <> "# keep it private" <> line <> line + <> co + where + co = vcat $ fmap pretty + $ chunksOf 32 + $ show + $ pretty pc + + diff --git a/hbs2-core/lib/HBS2/Net/IP/Addr.hs b/hbs2-core/lib/HBS2/Net/IP/Addr.hs new file mode 100644 index 00000000..f554df1e --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/IP/Addr.hs @@ -0,0 +1,90 @@ +module HBS2.Net.IP.Addr (parseAddr, getHostPort, Pretty) where + +import HBS2.Prelude + +import Control.Applicative +import Control.Monad +import Control.Monad.Trans.Maybe +import Data.Attoparsec.Text as Atto +import Data.Char +import Data.Function +import Data.Functor +import Data.Maybe +import Data.Text qualified as Text +import Data.Text (Text) +import Network.Socket +import Network.SockAddr +import Prettyprinter + +instance Pretty SockAddr where + pretty sa = pretty (show sa) + +getHostPort :: Text -> Maybe (String, PortNumber) +getHostPort s = parseOnly p s & either (const Nothing) Just + where + p = do + (h, p) <- pAddr + pure (Text.unpack h, read (Text.unpack p)) + +parseAddr :: Text -> IO [AddrInfo] +parseAddr s = fromMaybe mempty <$> runMaybeT do + (host,port) <- MaybeT $ pure $ parseOnly pAddr s & either (const Nothing) Just + let hostS = Text.unpack host & Just + let portS = Text.unpack port & Just + MaybeT $ liftIO $ getAddrInfo (Just udp) hostS portS <&> Just + + where + udp = defaultHints { addrSocketType = Datagram } + +pAddr :: Parser (Text, Text) +pAddr = pIP6 <|> pIP4 <|> pHostName + +pIP6 :: Parser (Text, Text) +pIP6 = do + skipSpace + + hostAddr <- do + void $ char '[' + p <- Atto.takeWhile ( \c -> isHexDigit c || c == ':' ) + void $ char ']' + pure p + + port <- do + void $ char ':' + Atto.takeWhile isDigit + + skipSpace + endOfInput + + pure (hostAddr, port) + +pIP4 :: Parser (Text, Text) +pIP4 = do + skipSpace + + hostAddr0 <- replicateM 3 $ do + n <- Atto.takeWhile isDigit + dot <- string "." + pure ( n <> dot ) + + hostAddr1 <- Atto.takeWhile isDigit + + port <- do + void $ char ':' + Atto.takeWhile isDigit + + skipSpace + endOfInput + + pure (mconcat hostAddr0 <> hostAddr1, port) + +pHostName :: Parser (Text, Text) +pHostName = do + skipSpace + host' <- Atto.takeWhile (/= ':') + void $ char ':' + port <- decimal + let host = if Text.null host' then "localhost" else host' + pure (host, Text.pack (show port)) + + diff --git a/hbs2-core/lib/HBS2/Net/Messaging.hs b/hbs2-core/lib/HBS2/Net/Messaging.hs index 30853048..67bd56dd 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging.hs @@ -1,4 +1,5 @@ {-# Language FunctionalDependencies #-} +{-# Language AllowAmbiguousTypes #-} module HBS2.Net.Messaging where import HBS2.Net.Proto @@ -9,7 +10,10 @@ newtype From a = From (Peer a) newtype To a = To (Peer a) -class HasPeer proto => Messaging bus proto msg | bus -> proto, bus -> msg where +-- class Messaging bus e msg => MessagingHasPeer e where + +-- class HasPeer proto => Messaging bus proto msg | bus -> proto, bus -> msg where +class HasPeer proto => Messaging bus proto msg where sendTo :: MonadIO m => bus -> To proto -> From proto -> msg -> m () receive :: MonadIO m => bus -> To proto -> m [(From proto, msg)] diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs new file mode 100644 index 00000000..ffd69e54 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -0,0 +1,174 @@ +{-# Language TemplateHaskell #-} +module HBS2.Net.Messaging.UDP where + +import HBS2.Prelude +import HBS2.Clock +import HBS2.Defaults +import HBS2.Net.IP.Addr +import HBS2.Net.Messaging +import HBS2.Net.Proto +import HBS2.Prelude.Plated + +import Data.Foldable +import Data.Function +import Control.Exception +import Control.Monad.Trans.Maybe +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue qualified as Q +import Control.Concurrent.STM.TQueue qualified as Q0 +import Control.Monad +import Data.ByteString.Lazy (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as LBS +import Data.Functor +import Data.Hashable +import Data.List qualified as L +import Data.Maybe +-- import Data.Text (Text) +import Data.Text qualified as Text +import Lens.Micro.Platform +import Network.Socket +import Network.Socket.ByteString +import Network.Multicast +import Prettyprinter + +data UDP + +instance HasPeer UDP where + newtype instance Peer UDP = + PeerUDP + { _sockAddr :: SockAddr + } + deriving stock (Eq,Ord,Show,Generic) + +instance Hashable (Peer UDP) where + hashWithSalt salt p = case _sockAddr p of + SockAddrInet pn h -> hashWithSalt salt (4, fromIntegral pn, h) + SockAddrInet6 pn _ h _ -> hashWithSalt salt (6, fromIntegral pn, h) + SockAddrUnix s -> hashWithSalt salt ("unix", s) + +instance Pretty (Peer UDP) where + pretty p = pretty (_sockAddr p) + +makeLenses 'PeerUDP + +-- One address - one peer - one messaging +data MessagingUDP = + MessagingUDP + { listenAddr :: SockAddr + , sink :: TBQueue (From UDP, ByteString) + , inbox :: TQueue (To UDP, ByteString) + , sock :: TVar Socket + , mcast :: Bool + } + + +getOwnPeer :: MessagingUDP -> Peer UDP +getOwnPeer mess = PeerUDP (listenAddr mess) + +newMessagingUDPMulticast :: MonadIO m => String -> m (Maybe MessagingUDP) +newMessagingUDPMulticast s = runMaybeT $ do + + (host, port) <- MaybeT $ pure $ getHostPort (Text.pack s) + + so <- liftIO $ multicastReceiver host port + + liftIO $ setSocketOption so ReuseAddr 1 + + a <- liftIO $ getSocketName so + + liftIO $ MessagingUDP a <$> Q.newTBQueueIO defMessageQueueSize + <*> Q0.newTQueueIO + <*> newTVarIO so + <*> pure True + +newMessagingUDP :: MonadIO m => Bool -> Maybe String -> m (Maybe MessagingUDP) +newMessagingUDP reuse saddr = + case saddr of + Just s -> do + + runMaybeT $ do + l <- MaybeT $ liftIO $ parseAddr (Text.pack s) <&> listToMaybe . sorted + let a = addrAddress l + so <- liftIO $ socket (addrFamily l) (addrSocketType l) (addrProtocol l) + + when reuse $ do + liftIO $ setSocketOption so ReuseAddr 1 + + liftIO $ MessagingUDP a <$> Q.newTBQueueIO defMessageQueueSize + <*> Q0.newTQueueIO + <*> newTVarIO so + <*> pure False + + + Nothing -> do + so <- liftIO $ socket AF_INET Datagram defaultProtocol + sa <- liftIO $ getSocketName so + + liftIO $ Just <$> ( MessagingUDP sa <$> Q.newTBQueueIO defMessageQueueSize + <*> Q0.newTQueueIO + <*> newTVarIO so + <*> pure False + ) + + where + sorted = L.sortBy ( compare `on` proto) + proto x = case addrAddress x of + SockAddrInet{} -> 0 + SockAddrInet6{} -> 1 + SockAddrUnix{} -> 2 + + +udpWorker :: MessagingUDP -> TVar Socket -> IO () +udpWorker env tso = do + + so <- readTVarIO tso + + rcvLoop <- async $ forever $ do + -- so <- readTVarIO tso + pause ( 10 :: Timeout 'Seconds ) + -- (msg, from) <- recvFrom so defMaxDatagram + -- liftIO $ print $ "recv:" <+> pretty (BS.length msg) + -- atomically $ Q.writeTBQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg) + + sndLoop <- async $ forever $ do + pause ( 10 :: Timeout 'Seconds ) + -- (To whom, msg) <- atomically $ Q0.readTQueue (inbox env) + -- print "YAY!" + -- sendAllTo so (LBS.toStrict msg) (view sockAddr whom) + + -- (msg, from) <- recvFrom so defMaxDatagram + -- liftIO $ print $ "recv:" <+> pretty (BS.length msg) + -- atomically $ Q.writeTBQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg) + + mapM_ wait [rcvLoop,sndLoop] + +-- FIXME: stopping + +runMessagingUDP :: MonadIO m => MessagingUDP -> m () +runMessagingUDP udpMess = liftIO $ do + let addr = listenAddr udpMess + so <- readTVarIO (sock udpMess) + + unless (mcast udpMess) $ do + bind so addr + + w <- async $ udpWorker udpMess (sock udpMess) + waitCatch w >>= either throwIO (const $ pure ()) + +instance Messaging MessagingUDP UDP ByteString where + + sendTo bus (To whom) _ msg = liftIO do + -- atomically $ Q0.writeTQueue (inbox bus) (To whom, msg) + so <- readTVarIO (sock bus) + sendAllTo so (LBS.toStrict msg) (view sockAddr whom) + + receive bus _ = liftIO do + so <- readTVarIO (sock bus) + (msg, from) <- recvFrom so defMaxDatagram + pure [(From (PeerUDP from), LBS.fromStrict msg)] + + -- liftIO $ atomically + -- $ Q.readTBQueue (sink bus) <&> L.singleton + diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs index dd792689..aa11b256 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -3,6 +3,7 @@ module HBS2.Net.Proto.BlockChunks where import HBS2.Events import HBS2.Hash +import HBS2.Clock import HBS2.Net.Proto import HBS2.Prelude.Plated import HBS2.Storage @@ -13,6 +14,8 @@ import Prettyprinter import Data.ByteString.Lazy (ByteString) import Data.Foldable +import System.Random.Shuffle + newtype ChunkSize = ChunkSize Word16 deriving newtype (Num,Enum,Real,Integral,Pretty) deriving stock (Eq,Ord,Show,Data,Generic) @@ -47,6 +50,7 @@ data BlockChunks e = BlockChunks (Cookie e) (BlockChunksProto e) deriving stock (Generic) data BlockChunksProto e = BlockGetAllChunks (Hash HbSync) ChunkSize + | BlockGetChunks (Hash HbSync) ChunkSize Word32 Word32 | BlockNoChunks | BlockChunk ChunkNum ByteString | BlockLost @@ -69,12 +73,14 @@ newtype instance EventKey e (BlockChunks e) = deriving instance Hashable (EventKey e (BlockChunks e)) -newtype instance Event e (BlockChunks e) = - BlockReady (Hash HbSync) +data instance Event e (BlockChunks e) = + BlockReady (Hash HbSync) + | BlockChunksLost (Hash HbSync) deriving stock (Typeable) blockChunksProto :: forall e m . ( MonadIO m , Response e (BlockChunks e) m + , HasDeferred e (BlockChunks e) m , HasOwnPeer e m , Pretty (Peer e) ) @@ -84,6 +90,26 @@ blockChunksProto :: forall e m . ( MonadIO m blockChunksProto adapter (BlockChunks c p) = case p of + + BlockGetChunks h size n1 num -> do + + bsz' <- blkSize adapter h + + maybe1 bsz' (pure ()) $ \bsz -> do + + let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)] + let offsets = take (fromIntegral num) $ drop (fromIntegral n1) $ zip offsets' [0..] + + -- liftIO $ print $ "sending " <+> pretty (length offsets) + -- <+> "chunks for block" + -- <+> pretty h + + -- for_ offsets $ \((o,sz),i) -> deferred proto do + for_ offsets $ \((o,sz),i) -> deferred proto do + -- liftIO $ print $ "send chunk " <+> pretty i <+> pretty sz + chunk <- blkChunk adapter h o sz + maybe (pure ()) (response_ . BlockChunk @e i) chunk + BlockGetAllChunks h size -> do me <- ownPeer @e @@ -96,9 +122,12 @@ blockChunksProto adapter (BlockChunks c p) = let offsets' = calcChunks bsz (fromIntegral size) :: [(Offset, Size)] let offsets = zip offsets' [0..] + -- liftIO $ print $ "sending " <+> pretty (length offsets) + -- <+> "chunks for block" + -- <+> pretty h + for_ offsets $ \((o,sz),i) -> deferred proto do chunk <- blkChunk adapter h o sz - -- liftIO $ print $ "sending chunk for block" <+> pretty h maybe (pure ()) (response_ . BlockChunk @e i) chunk BlockChunk n bs -> deferred proto do @@ -114,6 +143,7 @@ blockChunksProto adapter (BlockChunks c p) = pure () BlockLost{} -> do + liftIO $ print "GOT BLOCK LOST MESSAGE - means IO ERROR" pure () where @@ -121,4 +151,3 @@ blockChunksProto adapter (BlockChunks c p) = response_ pt = response (BlockChunks c pt) - diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 733753bc..5b6c1a37 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -19,6 +19,7 @@ instance Serialise (BlockInfo e) blockSizeProto :: forall e m . ( MonadIO m , Response e (BlockInfo e) m + , HasDeferred e (BlockInfo e) m , EventEmitter e (BlockInfo e) m ) => GetBlockSize HbSync m diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs new file mode 100644 index 00000000..eac3b079 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -0,0 +1,92 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +module HBS2.Net.Proto.Definition + ( module HBS2.Net.Proto.BlockAnnounce + , module HBS2.Net.Proto.BlockChunks + , module HBS2.Net.Proto.BlockInfo + ) + where + +import HBS2.Prelude +import HBS2.Clock +import HBS2.Net.Messaging.UDP +import HBS2.Net.Proto +import HBS2.Net.Proto.BlockAnnounce +import HBS2.Net.Proto.BlockChunks +import HBS2.Net.Proto.BlockInfo +import HBS2.Net.Proto.Peer +import HBS2.Defaults + +import Data.Functor +import Data.ByteString.Lazy (ByteString) +import Data.ByteString qualified as BS +import Codec.Serialise (deserialiseOrFail,serialise,Serialise(..)) + +import Crypto.Saltine.Core.Box qualified as Crypto +import Crypto.Saltine.Class qualified as Crypto +import Crypto.Saltine.Core.Sign qualified as Sign + +type instance PubKey 'Sign e = Sign.PublicKey +type instance PrivKey 'Sign e = Sign.SecretKey + +instance Serialise Sign.PublicKey + +instance HasProtocol UDP (BlockInfo UDP) where + type instance ProtocolId (BlockInfo UDP) = 1 + type instance Encoded UDP = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +instance HasProtocol UDP (BlockChunks UDP) where + type instance ProtocolId (BlockChunks UDP) = 2 + type instance Encoded UDP = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +instance Expires (SessionKey UDP (BlockChunks UDP)) where + expiresIn _ = Just defCookieTimeoutSec + +instance HasProtocol UDP (BlockAnnounce UDP) where + type instance ProtocolId (BlockAnnounce UDP) = 3 + type instance Encoded UDP = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +instance HasProtocol UDP (PeerHandshake UDP) where + type instance ProtocolId (PeerHandshake UDP) = 4 + type instance Encoded UDP = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +instance Expires (SessionKey UDP (BlockInfo UDP)) where + expiresIn _ = Just defCookieTimeoutSec + +instance Expires (EventKey UDP (BlockInfo UDP)) where + expiresIn _ = Just 600 + +instance Expires (EventKey UDP (BlockChunks UDP)) where + expiresIn _ = Just 600 + +instance Expires (EventKey UDP (BlockAnnounce UDP)) where + expiresIn _ = Nothing + +instance Expires (SessionKey UDP (KnownPeer UDP)) where + expiresIn _ = Just 3600 + +instance Expires (SessionKey UDP (PeerHandshake UDP)) where + expiresIn _ = Just 10 + +instance MonadIO m => HasNonces (PeerHandshake UDP) m where + type instance Nonce (PeerHandshake UDP) = BS.ByteString + newNonce = do + n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) + pure $ BS.take 32 n + + +instance Serialise Sign.Signature + +instance Signatures UDP where + type Signature UDP = Sign.Signature + makeSign = Sign.signDetached + verifySign = Sign.signVerifyDetached + + diff --git a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs new file mode 100644 index 00000000..aa2b6f04 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs @@ -0,0 +1,108 @@ +{-# Language TemplateHaskell #-} +{-# Language UndecidableInstances #-} +module HBS2.Net.Proto.Peer where + +import HBS2.Data.Types +import HBS2.Net.Proto +import HBS2.Net.Proto.Sessions +import HBS2.Prelude.Plated + +import Data.ByteString.Lazy (ByteString) +import Data.ByteString qualified as BS +import Lens.Micro.Platform +import Codec.Serialise() + +type PingSign e = Signature e +type PingNonce = BS.ByteString + +newtype PeerData e = + PeerData + { _peerSignKey :: PubKey 'Sign e + } + deriving stock (Typeable,Generic) + +makeLenses 'PeerData + +newtype PeerAnnounce e = PeerAnnounce (PeerData e) + deriving stock (Generic) + +data PeerHandshake e = + PeerPing PingNonce + | PeerPong (PeerData e) (Signature e) + deriving stock (Generic) + +newtype KnownPeer e = KnownPeer (PeerData e) + deriving stock (Typeable,Generic) + +newtype instance SessionKey e (KnownPeer e) = + KnownPeerKey (Peer e) + deriving stock (Generic,Typeable) + +type instance SessionData e (KnownPeer e) = KnownPeer e + +newtype instance SessionKey e (PeerHandshake e) = + PeerHandshakeKey (Peer e) + deriving stock (Generic, Typeable) + +type instance SessionData e (PeerHandshake e) = (PingNonce, PeerData e) + +peerHandShakeProto :: forall e m . ( MonadIO m + , Response e (PeerHandshake e) m + , Sessions e (PeerHandshake e) m + , HasNonces (PeerHandshake e) m + , Nonce (PeerHandshake e) ~ PingNonce + , Signatures e + , HasCredentials e m + ) + => PeerHandshake e -> m () + +peerHandShakeProto = + \case + PeerPing nonce -> do + pip <- thatPeer proto + -- TODO: взять свои ключи + -- TODO: подписать нонс + -- TODO: отправить обратно вместе с публичным ключом + -- + pure () + -- TODO: sign nonce + -- se <- find @e (PeerHandshakeKey pip) id + -- let signed = undefined + -- TODO: answer + -- response (PeerPong @e signed) + + PeerPong d sign -> do + pure () + + -- se' <- find @e (PeerHandshakeKey pip) id + -- maybe1 se' (pure ()) $ \se -> do + + -- TODO: get peer data + -- TODO: check signature + + -- ok <- undefined signed + + -- when ok $ do + -- TODO: add peer to authorized peers + -- pure () + + where + proto = Proxy @(PeerHandshake e) + +deriving instance Eq (Peer e) => Eq (SessionKey e (KnownPeer e)) +instance Hashable (Peer e) => Hashable (SessionKey e (KnownPeer e)) + +deriving instance Eq (Peer e) => Eq (SessionKey e (PeerHandshake e)) +instance Hashable (Peer e) => Hashable (SessionKey e (PeerHandshake e)) + +instance ( Serialise (PubKey 'Sign e) + , Serialise (Signature e) ) + + => Serialise (PeerData e) + +instance ( Serialise (PubKey 'Sign e) + , Serialise (Signature e) + ) + + => Serialise (PeerHandshake e) + diff --git a/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs b/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs index 5d35f6b1..39a22cd1 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs @@ -35,7 +35,7 @@ type family SessionData e p :: Type class ( Monad m - , HasProtocol e p + -- , HasProtocol e p , Eq (SessionKey e p) , Hashable (SessionKey e p) , Typeable (SessionData e p) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 33503afc..a2f982c1 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -2,6 +2,7 @@ {-# Language FunctionalDependencies #-} {-# Language AllowAmbiguousTypes #-} {-# Language UndecidableInstances #-} +{-# Language TemplateHaskell #-} module HBS2.Net.Proto.Types ( module HBS2.Net.Proto.Types ) where @@ -13,6 +14,8 @@ import Data.Hashable import Control.Monad.IO.Class import System.Random qualified as Random import Data.Digest.Murmur32 +import Data.ByteString (ByteString) +import Lens.Micro.Platform -- e -> Transport (like, UDP or TChan) -- p -> L4 Protocol (like Ping/Pong) @@ -20,27 +23,61 @@ import Data.Digest.Murmur32 class Monad m => GenCookie e m where genCookie :: Hashable salt => salt -> m (Cookie e) +type family EncryptPubKey e :: Type + +class Monad m => HasNonces p m where + type family Nonce p :: Type + newNonce :: m (Nonce p) + +data CryptoAction = Sign | Encrypt + +type family PubKey ( a :: CryptoAction) e :: Type +type family PrivKey ( a :: CryptoAction) e :: Type + +class Signatures e where + type family Signature e :: Type + makeSign :: PrivKey 'Sign e -> ByteString -> Signature e + verifySign :: PubKey 'Sign e -> Signature e -> ByteString -> Bool + +class HasCredentials e m where + getCredentials :: m (PeerCredentials e) + class HasCookie e p | p -> e where type family Cookie e :: Type getCookie :: p -> Maybe (Cookie e) getCookie = const Nothing + +data PeerCredentials e = + PeerCredentials + { _peerSignSk :: PrivKey 'Sign e + , _peerSignPk :: PubKey 'Sign e + } + +makeLenses 'PeerCredentials + + data WithCookie e p = WithCookie (Cookie e) p class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where data family (Peer e) :: Type - -class (MonadIO m, HasProtocol e p) => Response e p m | p -> e where - response :: p -> m () - deferred :: Proxy p -> m () -> m () +class (Monad m, HasProtocol e p) => HasThatPeer e p (m :: Type -> Type) where thatPeer :: Proxy p -> m (Peer e) +class (MonadIO m, HasProtocol e p) => HasDeferred e p m | p -> e where + deferred :: Proxy p -> m () -> m () + +class ( MonadIO m + , HasProtocol e p + , HasThatPeer e p m + ) => Response e p m | p -> e where + + response :: p -> m () + class Request e p (m :: Type -> Type) | p -> e where request :: Peer e -> p -> m () - - class (KnownNat (ProtocolId p), HasPeer e) => HasProtocol e p | p -> e where type family ProtocolId p = (id :: Nat) | id -> p type family Encoded e :: Type diff --git a/hbs2-core/lib/HBS2/OrDie.hs b/hbs2-core/lib/HBS2/OrDie.hs new file mode 100644 index 00000000..5a01a3e7 --- /dev/null +++ b/hbs2-core/lib/HBS2/OrDie.hs @@ -0,0 +1,21 @@ +module HBS2.OrDie where + +import Data.Kind +import Control.Monad.IO.Class +import System.Exit + +class OrDie m a where + type family OrDieResult a :: Type + orDie :: m a -> String -> m (OrDieResult a) + +instance OrDie IO (Maybe a) where + type instance OrDieResult (Maybe a) = a + orDie mv err = mv >>= \case + Nothing -> die err + Just x -> pure x + +instance MonadIO m => OrDie m ExitCode where + type instance OrDieResult ExitCode = () + orDie mv err = mv >>= \case + ExitSuccess -> pure () + ExitFailure{} -> liftIO $ die err diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index a7353a86..924735b9 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -6,15 +6,34 @@ module HBS2.Prelude , maybe1 , Hashable , lift + , AsFileName(..) + , Pretty ) where import Data.String (IsString(..)) import Safe import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad (void,guard,when,unless) -import Data.Hashable (Hashable) import Control.Monad.Trans.Class (lift) +import Data.Function +import Data.Char qualified as Char +import Data.Text qualified as Text +import Data.Hashable +import Prettyprinter +import Data.Word + maybe1 :: Maybe a -> b -> (a -> b) -> b maybe1 mb n j = maybe n j mb + +newtype AsFileName a = AsFileName a + + +instance Pretty a => Pretty (AsFileName a) where + pretty (AsFileName f) = pretty x <> "@" <> uniq + where + uniq = pretty (fromIntegral $ hash (show (pretty f)) :: Word16) + x = show (pretty f) & Text.pack + & Text.filter (not . Char.isPunctuation) + diff --git a/hbs2-core/lib/HBS2/Storage.hs b/hbs2-core/lib/HBS2/Storage.hs index 5f05f3a6..60c4b830 100644 --- a/hbs2-core/lib/HBS2/Storage.hs +++ b/hbs2-core/lib/HBS2/Storage.hs @@ -18,7 +18,7 @@ instance Key HbSync ~ Hash HbSync => IsKey HbSync where newtype StoragePrefix = StoragePrefix { fromPrefix :: FilePath } deriving stock (Data,Show) - deriving newtype (IsString) + deriving newtype (IsString,Pretty) type family Block block :: Type diff --git a/hbs2-core/test/TestUniqProtoId.hs b/hbs2-core/test/TestUniqProtoId.hs deleted file mode 100644 index 14ec25f7..00000000 --- a/hbs2-core/test/TestUniqProtoId.hs +++ /dev/null @@ -1,147 +0,0 @@ -{-# Language TypeFamilyDependencies #-} -{-# Language UndecidableInstances #-} -module TestUniqProtoId where - -import HBS2.Prelude -import HBS2.Prelude.Plated -import HBS2.Clock - -import HasProtocol -import FakeMessaging - -import Test.Tasty.HUnit - -import Data.ByteString.Lazy (ByteString) -import Control.Concurrent.Async -import Codec.Serialise hiding (encode,decode) - -import System.IO - -import Control.Concurrent.STM.TQueue qualified as Q --- import Control.Concurrent.STM.TQueue () -import Control.Concurrent.STM - -import Prettyprinter hiding (pipe) - -debug :: (MonadIO m) => Doc ann -> m () -debug p = liftIO $ hPrint stderr p - - -dump :: MonadIO m => TQueue a -> a -> m () -dump q x = liftIO $ atomically $ Q.writeTQueue q x - -data PingPong e = Ping Int - | Pong Int - deriving stock (Eq,Generic,Show,Read) - -data PeekPoke e = Peek Int - | Poke Int - | Nop - deriving stock (Eq,Generic,Show,Read) - - -instance Serialise (PingPong e) - -instance Serialise (PeekPoke e) - -instance HasProtocol Fake (PingPong Fake) where - type instance ProtocolId (PingPong Fake) = 1 - type instance Encoded Fake = ByteString - decode = either (const Nothing) Just . deserialiseOrFail - encode = serialise - -instance HasProtocol Fake (PeekPoke Fake) where - type instance ProtocolId (PeekPoke Fake) = 2 - type instance Encoded Fake = ByteString - decode = either (const Nothing) Just . deserialiseOrFail - encode = serialise - -pingPongHandler :: forall e m . ( MonadIO m - , Response e (PingPong e) m - , HasProtocol e (PingPong e) - ) - => TQueue (PingPong e) - -> PingPong e - -> m () - -pingPongHandler q = - \case - - Ping c -> dump q (Ping c) >> response (Pong @e c) - - Pong c | c < 100 -> dump q (Pong c) >> response (Ping @e (succ c)) - | otherwise -> dump q (Pong c) - -peekPokeHandler :: forall e m . ( MonadIO m - , Response e (PeekPoke e) m - , HasProtocol e (PeekPoke e) - ) - => TQueue (PeekPoke e) - -> PeekPoke e - -> m () - -peekPokeHandler q = - \case - Peek c -> dump q (Peek c) >> response (Poke @e (succ c)) - Poke c -> dump q (Poke c) >> response (Nop @e) - Nop -> dump q Nop - -testUniqProtoId :: IO () -testUniqProtoId = do - - hSetBuffering stderr LineBuffering - - qpg0 <- Q.newTQueueIO :: IO (TQueue (PingPong Fake)) - qpp0 <- Q.newTQueueIO :: IO (TQueue (PeekPoke Fake)) - - qpg1 <- Q.newTQueueIO :: IO (TQueue (PingPong Fake)) - qpp1 <- Q.newTQueueIO :: IO (TQueue (PeekPoke Fake)) - - fake <- newFakeP2P True - - let peer0 = FakePeer 0 - let peer1 = FakePeer 1 - - env0 <- newEnv peer0 fake - env1 <- newEnv peer1 fake - - race (pause (0.25 :: Timeout 'Seconds)) $ do - - runEngineM env0 $ do - request peer1 (Ping @Fake 0) - - runEngineM env1 $ do - request peer0 (Peek @Fake 0) - - pip1 <- async $ - runPeer env0 - [ makeResponse (pingPongHandler qpg0) - , makeResponse (peekPokeHandler qpp0) - ] - - pip2 <- async $ - runPeer env1 - [ makeResponse (pingPongHandler qpg1) - , makeResponse (peekPokeHandler qpp1) - ] - - pause (0.10 :: Timeout 'Seconds) - - debug "stopping threads" - - mapM_ cancel [pip1, pip2] - - void $ waitAnyCatchCancel [pip1, pip2] - - ping0 <- atomically $ Q.flushTQueue qpg0 - ping1 <- atomically $ Q.flushTQueue qpg1 - p0 <- atomically $ Q.flushTQueue qpp0 - p1 <- atomically $ Q.flushTQueue qpp1 - - assertEqual "ping0" ping0 [ Pong i | i <- [0..100] ] - assertEqual "ping1" ping1 [ Ping i | i <- [0..100] ] - assertEqual "p0" p0 [ Peek 0, Nop ] - assertEqual "p1" p1 [ Poke 1 ] - - debug "we're done" - diff --git a/hbs2-peer/CHANGELOG.md b/hbs2-peer/CHANGELOG.md new file mode 100644 index 00000000..a55a8012 --- /dev/null +++ b/hbs2-peer/CHANGELOG.md @@ -0,0 +1,5 @@ +# Revision history for hbs2-peer + +## 0.1.0.0 -- YYYY-mm-dd + +* First version. Released on an unsuspecting world. diff --git a/hbs2-peer/LICENSE b/hbs2-peer/LICENSE new file mode 100644 index 00000000..3cbe915d --- /dev/null +++ b/hbs2-peer/LICENSE @@ -0,0 +1,30 @@ +Copyright (c) 2023, + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs new file mode 100644 index 00000000..c86a3389 --- /dev/null +++ b/hbs2-peer/app/BlockDownload.hs @@ -0,0 +1,427 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# Language TemplateHaskell #-} +{-# Language UndecidableInstances #-} +module BlockDownload where + +import HBS2.Actors.Peer +import HBS2.Clock +import HBS2.Data.Detect +import HBS2.Data.Types.Refs +import HBS2.Defaults +import HBS2.Events +import HBS2.Hash +import HBS2.Merkle +import HBS2.Net.PeerLocator +import HBS2.Net.Proto +import HBS2.Net.Proto.Definition +import HBS2.Net.Proto.Sessions +import HBS2.Prelude.Plated +import HBS2.Storage + +import PeerInfo +import Logger + +import Data.Foldable hiding (find) +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Monad.Reader +import Control.Monad.Trans.Maybe +import Data.ByteString.Lazy (ByteString) +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.IntMap (IntMap) +import Data.IntMap qualified as IntMap +import Data.IntSet qualified as IntSet +import Data.Maybe +import Lens.Micro.Platform +import Prettyprinter +import System.Random.Shuffle + + +calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)] +calcBursts bu pieces = go seed + where + seed = fmap (,1) pieces + + go ( (n1,s1) : (n2,s2) : xs ) + | (s1 + s2) <= bu = go ((n1, s1+s2) : xs) + | otherwise = (n1,s1) : go ( (n2,s2) : xs) + + go [x] = [x] + go [] = [] + + +data BlockDownload = + BlockDownload + { _sBlockHash :: Hash HbSync + , _sBlockSize :: Size + , _sBlockChunkSize :: ChunkSize + , _sBlockChunks :: TQueue (ChunkNum, ByteString) + } + deriving stock (Typeable) + +makeLenses 'BlockDownload + +newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload +newBlockDownload h = do + BlockDownload h 0 0 <$> liftIO newTQueueIO + + +type instance SessionData e (BlockChunks e) = BlockDownload + +newtype instance SessionKey e (BlockChunks e) = + DownloadSessionKey (Peer e, Cookie e) + deriving stock (Generic,Typeable) + + + +data DownloadEnv e = + DownloadEnv + { _downloadQ :: TQueue (Hash HbSync) + , _peerBusy :: TVar (HashMap (Peer e) ()) + } + +makeLenses 'DownloadEnv + +class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e +instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e + +newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e) +newDownloadEnv = liftIO do + DownloadEnv <$> newTQueueIO + <*> newTVarIO mempty + +newtype BlockDownloadM e m a = + BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader (DownloadEnv e) + , MonadTrans + ) + +runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a +runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv + +withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a +withDownload e m = runReaderT ( fromBlockDownloadM m ) e + +addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m () +addDownload h = do + q <- asks (view downloadQ) + liftIO $ atomically $ writeTQueue q h + -- debug $ "addDownload" <+> pretty h + -- pause ( 0.25 :: Timeout 'Seconds ) + +withFreePeer :: (MyPeer e, MonadIO m) + => Peer e + -> BlockDownloadM e m () + -> BlockDownloadM e m () + -> BlockDownloadM e m () + +withFreePeer p n m = do + busy <- asks (view peerBusy) + avail <- liftIO $ atomically + $ stateTVar busy $ + \s -> case HashMap.lookup p s of + Nothing -> (True, HashMap.insert p () s) + Just{} -> (False, s) + if not avail + then n + else do + r <- m + liftIO $ atomically $ modifyTVar busy $ HashMap.delete p + pure r + +getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) +getBlockForDownload = do + q <- asks (view downloadQ) + liftIO $ atomically $ readTQueue q + +processBlock :: forall e m . ( MonadIO m + , HasStorage m + , Block ByteString ~ ByteString + ) + => Hash HbSync + -> BlockDownloadM e m () + +processBlock h = do + + sto <- lift getStorage + + bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h) + + case bt of + Nothing -> addDownload h + + Just (AnnRef{}) -> pure () + + Just (Merkle{}) -> do + debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h + walkMerkle h (liftIO . getBlock sto) $ \(hr :: [HashRef]) -> do + + for_ hr $ \(HashRef blk) -> do + + -- debug $ pretty blk + + here <- liftIO (hasBlock sto blk) <&> isJust + + if here then do + debug $ "block" <+> pretty blk <+> "is already here" + pure () -- we don't need to recurse, cause walkMerkle is recursing for us + + else + addDownload blk + + Just (Blob{}) -> do + pure () + + +downloadFromWithPeer :: forall e m . ( MyPeer e + , MonadIO m + , Request e (BlockInfo e) m + , Request e (BlockChunks e) m + , MonadReader (PeerEnv e ) m + , PeerMessaging e + , HasProtocol e (BlockInfo e) + , EventListener e (BlockInfo e) m + , EventListener e (BlockChunks e) m + , Sessions e (BlockChunks e) m + , Sessions e (PeerInfo e) m + , Block ByteString ~ ByteString + , HasStorage m + ) + => Peer e + -> Hash HbSync + -> BlockDownloadM e m () +downloadFromWithPeer peer h = do + + + npi <- newPeerInfo + pinfo <- lift $ fetch True npi (PeerInfoKey peer) id + + waitSize <- liftIO $ newTBQueueIO 1 + + lift $ do + subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do + when ( p1 == peer ) $ do + liftIO $ atomically $ writeTBQueue waitSize s + + request @e peer (GetBlockSize @e h) + + esize <- liftIO $ race ( pause defBlockInfoTimeout ) do -- FIXME: block size wait time + atomically $ readTBQueue waitSize + + let mbSize = either (const Nothing) Just esize + + sto <- lift $ getStorage + + case mbSize of + Nothing -> void $ addDownload h + Just thisBkSize -> do + + coo <- genCookie (peer,h) + let key = DownloadSessionKey (peer, coo) + let chusz = defChunkSize + dnwld <- newBlockDownload h + let chuQ = view sBlockChunks dnwld + let new = set sBlockChunkSize chusz + . set sBlockSize (fromIntegral thisBkSize) + $ dnwld + + lift $ update @e new key id + + let burstSizeT = view peerBurst pinfo + + burstSize <- liftIO $ readTVarIO burstSizeT + + let offsets = calcChunks thisBkSize (fromIntegral chusz) :: [(Offset, Size)] + + let chunkNums = [ 0 .. pred (length offsets) ] + + let bursts = calcBursts burstSize chunkNums + + -- debug $ "bursts: " <+> pretty bursts + + r <- liftIO $ newTVarIO (mempty :: IntMap ByteString) + rq <- liftIO newTQueueIO + + for_ bursts $ liftIO . atomically . writeTQueue rq + + fix \next -> do + burst <- liftIO $ atomically $ tryReadTQueue rq + + case burst of + + Just (i,chunksN) -> do + let req = BlockGetChunks h chusz (fromIntegral i) (fromIntegral chunksN) + lift $ request peer (BlockChunks @e coo req) + + -- TODO: here wait for all requested chunks! + -- FIXME: it may blocks forever, so must be timeout and retry + + catched <- either id id <$> liftIO ( race ( pause defChunkWaitMax >> pure mempty ) + ( replicateM chunksN + $ atomically + $ readTQueue chuQ ) + + ) + when (null catched) $ do + + -- nerfing peer burst size. + -- FIXME: we need a thread that will be reset them again + + newBurst <- liftIO $ atomically + $ stateTVar burstSizeT $ \c -> let v = max 1 (c `div` 2) + in (v,v) + + let chuchu = calcBursts newBurst [ i + n | n <- [0 .. chunksN] ] + + debug $ "new burst: " <+> pretty newBurst + debug $ "missed chunks for request" <+> pretty (i,chunksN) + + for_ chuchu $ liftIO . atomically . writeTQueue rq + + for_ catched $ \(num,bs) -> do + liftIO $ atomically $ modifyTVar' r (IntMap.insert (fromIntegral num) bs) + + next + + Nothing -> do + + sz <- liftIO $ readTVarIO r <&> IntMap.size + + if sz == length offsets then do + pieces <- liftIO $ readTVarIO r <&> IntMap.elems + let block = mconcat pieces + let h1 = hashObject @HbSync block + + if h1 == h then do + -- debug "PROCESS BLOCK" + lift $ expire @e key + void $ liftIO $ putBlock sto block + void $ processBlock h + else do + debug "HASH NOT MATCH" + debug "MAYBE THAT PEER IS JERK" + + else do + debug "RETRY BLOCK DOWNLOADING / ASK FOR MISSED CHUNKS" + got <- liftIO $ readTVarIO r <&> IntMap.keysSet + let need = IntSet.fromList (fmap fromIntegral chunkNums) + + let missed = IntSet.toList $ need `IntSet.difference` got + + -- normally this should not happen + -- however, let's try do download the tails + -- by one chunk a time + for_ missed $ \n -> do + liftIO $ atomically $ writeTQueue rq (n,1) + + +instance HasPeerLocator e m => HasPeerLocator e (BlockDownloadM e m) where + getPeerLocator = lift getPeerLocator + +blockDownloadLoop :: forall e m . ( m ~ PeerM e IO + , MonadIO m + , Request e (BlockInfo e) m + , Request e (BlockAnnounce e) m + , HasProtocol e (BlockInfo e) + , HasProtocol e (BlockAnnounce e) + , HasProtocol e (BlockChunks e) + , EventListener e (BlockInfo e) m + , EventListener e (BlockChunks e) m + , EventListener e (BlockAnnounce e) m + , EventEmitter e (BlockChunks e) m + , Sessions e (BlockChunks e) m + , Sessions e (PeerInfo e) m + , PeerSessionKey e (PeerInfo e) + -- , Typeable (SessionKey e (BlockChunks e)) + -- , Typeable (SessionKey e (BlockInfo e)) + , HasStorage m + , Pretty (Peer e) + , Block ByteString ~ ByteString + , PeerMessaging e + ) + => m () +blockDownloadLoop = do + + e <- ask + stor <- getStorage + + let blks = mempty + + pl <- getPeerLocator @e + + -- TODO: peer info loop + void $ liftIO $ async $ forever $ withPeerM e $ do + pause @'Seconds 20 + pee <- knownPeers @e pl + + npi <- newPeerInfo + + for_ pee $ \p -> do + pinfo <- fetch True npi (PeerInfoKey p) id + burst <- liftIO $ readTVarIO (view peerBurst pinfo) + debug $ "peer" <+> pretty p <+> "burst: " <+> pretty burst + pure () + + runDownloadM @e $ do + + env <- ask + + let again h = do + debug $ "block fucked: " <+> pretty h + withPeerM e $ withDownload env (addDownload h) + + mapM_ processBlock blks + + fix \next -> do + + h <- getBlockForDownload + + here <- liftIO $ hasBlock stor h <&> isJust + + unless here do + + void $ runMaybeT $ do + p <- MaybeT $ knownPeers @e pl >>= liftIO . shuffleM <&> headMay + + liftIO $ race ( pause defBlockWaitMax >> again h ) do + withPeerM e $ withDownload env $ do -- NOTE: really crazy shit + withFreePeer p (addDownload h >> pause (0.1 :: Timeout 'Seconds)) do + downloadFromWithPeer p h + + next + + +-- NOTE: this is an adapter for a ResponseM monad +-- because response is working in ResponseM monad (ha!) +-- So don't be confused with types +-- +mkAdapter :: forall e m . ( m ~ PeerM e IO + , HasProtocol e (BlockChunks e) + , Hashable (SessionKey e (BlockChunks e)) + , Sessions e (BlockChunks e) (ResponseM e m) + , Typeable (SessionKey e (BlockChunks e)) + , EventEmitter e (BlockChunks e) m + , Pretty (Peer e) + , Block ByteString ~ ByteString + ) + => m (BlockChunksI e (ResponseM e m )) +mkAdapter = do + storage <- getStorage + pure $ + BlockChunksI + { blkSize = liftIO . hasBlock storage + , blkChunk = \h o s -> liftIO (getChunk storage h o s) + , blkGetHash = \c -> find (DownloadSessionKey @e c) (view sBlockHash) + + , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do + let cKey = DownloadSessionKey (p,c) + dwnld <- MaybeT $ find cKey (view sBlockChunks) + liftIO $ atomically $ writeTQueue dwnld (n, bs) + } + + diff --git a/hbs2-peer/app/Logger.hs b/hbs2-peer/app/Logger.hs new file mode 100644 index 00000000..0cc3efdc --- /dev/null +++ b/hbs2-peer/app/Logger.hs @@ -0,0 +1,11 @@ +module Logger where + +import HBS2.Prelude + +import System.IO +import Prettyprinter + +debug :: (MonadIO m) => Doc ann -> m () +debug p = liftIO $ hPrint stderr p + + diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs new file mode 100644 index 00000000..fc41f853 --- /dev/null +++ b/hbs2-peer/app/PeerInfo.hs @@ -0,0 +1,39 @@ +{-# Language TemplateHaskell #-} +module PeerInfo where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto.Sessions +import HBS2.Net.Messaging.UDP +import HBS2.Clock +import HBS2.Defaults + +import Lens.Micro.Platform +import Control.Concurrent.STM.TVar + + +newtype PeerInfo e = + PeerInfo + { _peerBurst :: TVar Int + } + deriving stock (Generic,Typeable) + +makeLenses 'PeerInfo + + +newPeerInfo :: MonadIO m => m (PeerInfo e) +newPeerInfo = liftIO do + PeerInfo <$> newTVarIO defBurst + + +type instance SessionData e (PeerInfo e) = PeerInfo e + +newtype instance SessionKey e (PeerInfo e) = + PeerInfoKey (Peer e) + +deriving newtype instance Hashable (SessionKey UDP (PeerInfo UDP)) +deriving stock instance Eq (SessionKey UDP (PeerInfo UDP)) + +instance Expires (SessionKey UDP (PeerInfo UDP)) where + expiresIn = const (Just 600) + + diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs new file mode 100644 index 00000000..d820ae14 --- /dev/null +++ b/hbs2-peer/app/PeerMain.hs @@ -0,0 +1,322 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# Language TemplateHaskell #-} +{-# Language AllowAmbiguousTypes #-} +module Main where + +import HBS2.Actors.Peer +import HBS2.Clock +import HBS2.Defaults +import HBS2.Events +import HBS2.Hash +import HBS2.Net.IP.Addr +import HBS2.Net.Messaging.UDP +import HBS2.Net.PeerLocator +import HBS2.Net.Proto +import HBS2.Net.Proto.Definition +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.OrDie +import HBS2.Prelude.Plated +import HBS2.Storage.Simple +import HBS2.Net.Auth.Credentials + +import RPC +import BlockDownload + +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Exception as Exception +import Control.Monad.Reader +import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy qualified as LBS +import Lens.Micro.Platform +import Options.Applicative +import Prettyprinter +import System.Directory +import System.Exit +import System.IO +import Network.Socket + +debug :: (MonadIO m) => Doc ann -> m () +debug p = liftIO $ hPrint stderr p + +defStorageThreads :: Integral a => a +defStorageThreads = 4 + +defListenUDP :: String +defListenUDP = "0.0.0.0:7351" + +defRpcUDP :: String +defRpcUDP = "localhost:13331" + +defLocalMulticast :: String +defLocalMulticast = "239.192.152.145:10153" + +data RPCCommand = + PING + | ANNOUNCE (Hash HbSync) + +data PeerOpts = + PeerOpts + { _storage :: Maybe StoragePrefix + , _listenOn :: String + , _listenRpc :: String + , _peerCredFile :: FilePath + } + deriving stock (Data) + +makeLenses 'PeerOpts + +deriving newtype instance Hashable (SessionKey UDP (BlockChunks UDP)) +deriving stock instance Eq (SessionKey UDP (BlockChunks UDP)) + +main :: IO () +main = join . customExecParser (prefs showHelpOnError) $ + info (helper <*> parser) + ( fullDesc + <> header "hbs2-peer daemon" + <> progDesc "serves HBS2 protocol" + ) + where + parser :: Parser (IO ()) + parser = hsubparser ( command "run" (info pRun (progDesc "run peer")) + <> command "ping" (info pPing (progDesc "ping peer via rpc")) + <> command "announce" (info pAnnounce (progDesc "announce block")) + ) + + common = do + pref <- optional $ strOption ( short 'p' <> long "prefix" + <> help "storage prefix" ) + + l <- strOption ( short 'l' <> long "listen" + <> help "addr:port" + <> value defListenUDP ) + + r <- strOption ( short 'r' <> long "rpc" + <> help "addr:port" + <> value defRpcUDP ) + + k <- strOption ( short 'k' <> long "key" + <> help "peer keys file" + ) + + + pure $ PeerOpts pref l r k + + pRun = do + runPeer <$> common + + pRpcCommon = do + strOption ( short 'r' <> long "rpc" + <> help "addr:port" + <> value defRpcUDP + ) + + pPing = do + rpc <- pRpcCommon + pure $ runRpcCommand rpc PING + + pAnnounce = do + rpc <- pRpcCommon + h <- strArgument ( metavar "HASH" ) + pure $ runRpcCommand rpc (ANNOUNCE h) + + +myException :: SomeException -> IO () +myException e = die ( show e ) >> exitFailure + + +newtype CredentialsM e m a = + CredentialsM { fromCredentials :: ReaderT (PeerCredentials e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader (PeerCredentials e) + , MonadTrans) + +withCredentials :: forall e m a . (HasOwnPeer e m, Monad m) + => PeerCredentials e + -> CredentialsM e m a -> m a + +withCredentials pc m = runReaderT (fromCredentials m) pc + +instance (HasOwnPeer e m) => HasOwnPeer e (CredentialsM e m) where + ownPeer = lift ownPeer + +instance (Monad m, HasFabriq e m) => HasFabriq e (CredentialsM e m) where + getFabriq = lift getFabriq + +instance (Sessions e p m ) => Sessions e p (CredentialsM e m) where + find k f = lift (find k f) + fetch i d k f = lift (fetch i d k f) + update d k f = lift (update d k f) + expire k = lift (expire k) + +instance Monad m => HasCredentials e (CredentialsM e m) where + getCredentials = ask + +instance Monad m => HasCredentials e (ResponseM e (CredentialsM e m)) where + getCredentials = lift getCredentials + +runPeer :: () => PeerOpts -> IO () +runPeer opts = Exception.handle myException $ do + + + rpcQ <- newTQueueIO @RPCCommand + + let ps = mempty + + pc' <- LBS.readFile (view peerCredFile opts) + <&> parseCredentials @UDP . AsCredFile + . LBS.toStrict + . LBS.take 4096 + + pc <- pure pc' `orDie` "can't parse credential file" + + debug $ "run peer" <+> pretty (AsBase58 (view peerSignPk pc)) + + xdg <- getXdgDirectory XdgData defStorePath <&> fromString + + let pref = uniLastDef xdg (view storage opts) :: StoragePrefix + + s <- simpleStorageInit @HbSync (Just pref) + let blk = liftIO . hasBlock s + + w <- replicateM defStorageThreads $ async $ simpleStorageWorker s + + localMulticast <- (headMay <$> parseAddr (fromString defLocalMulticast) + <&> fmap (PeerUDP . addrAddress)) + + `orDie` "assertion: localMulticastPeer not set" + + mess <- newMessagingUDP False (Just (view listenOn opts)) + `orDie` "unable listen on the given addr" + + udp <- async $ runMessagingUDP mess + `catch` (\(e::SomeException) -> throwIO e ) + + udp1 <- newMessagingUDP False (Just (view listenRpc opts)) + `orDie` "Can't start RPC listener" + + mrpc <- async $ runMessagingUDP udp1 + `catch` (\(e::SomeException) -> throwIO e ) + + mcast <- newMessagingUDPMulticast defLocalMulticast + `orDie` "Can't start RPC listener" + + messMcast <- async $ runMessagingUDP mcast + `catch` (\(e::SomeException) -> throwIO e ) + + loop <- async do + + runPeerM (AnyStorage s) (Fabriq mess) (getOwnPeer mess) $ do + adapter <- mkAdapter + env <- ask + + pl <- getPeerLocator @UDP + + addPeers @UDP pl ps + + as <- liftIO $ async $ withPeerM env blockDownloadLoop + + rpc <- liftIO $ async $ withPeerM env $ forever $ do + cmd <- liftIO $ atomically $ readTQueue rpcQ + case cmd of + PING -> debug "got ping" + + ANNOUNCE h -> do + debug $ "got announce rpc" <+> pretty h + sto <- getStorage + mbsize <- liftIO $ hasBlock sto h + + maybe1 mbsize (pure ()) $ \size -> do + let ann = BlockAnnounceInfo 0 NoBlockInfoMeta size h + request localMulticast (BlockAnnounce @UDP ann) + + me <- liftIO $ async $ withPeerM env $ do + runProto @UDP + [ makeResponse (blockSizeProto blk dontHandle) + , makeResponse (blockChunksProto adapter) + , makeResponse blockAnnounceProto + ] + + poo <- liftIO $ async $ withPeerM env $ withCredentials pc $ do + runProto @UDP + [ makeResponse peerHandShakeProto + ] + + void $ liftIO $ waitAnyCatchCancel [me,poo,as] + + let pingAction _ = do + liftIO $ atomically $ writeTQueue rpcQ PING + + let annAction h = do + liftIO $ atomically $ writeTQueue rpcQ (ANNOUNCE h) + + let arpc = RpcAdapter pingAction + dontHandle + annAction + + rpc <- async $ runRPC udp1 do + runProto @UDP + [ makeResponse (rpcHandler arpc) + ] + + ann <- async $ runPeerM (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) $ do + + self <- ownPeer @UDP + + subscribe @UDP BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi) -> do + unless (p == self) do + debug $ "announce" <+> pretty p + <+> pretty (view biHash bi) + + runProto @UDP + [ makeResponse blockAnnounceProto + ] + + void $ waitAnyCatchCancel $ w <> [udp,loop,rpc,mrpc,ann,messMcast] + + simpleStorageStop s + + +withRPC :: String -> RPC UDP -> IO () +withRPC saddr cmd = do + + rpc' <- headMay <$> parseAddr (fromString saddr) <&> fmap (PeerUDP . addrAddress) + + rpc <- pure rpc' `orDie` "Can't parse RPC endpoing" + + udp1 <- newMessagingUDP False Nothing `orDie` "Can't start RPC" + + mrpc <- async $ runMessagingUDP udp1 + + prpc <- async $ runRPC udp1 do + env <- ask + proto <- liftIO $ async $ continueWithRPC env $ do + runProto @UDP + [ makeResponse (rpcHandler adapter) + ] + + request rpc cmd + + case cmd of + RPCAnnounce{} -> pause @'Seconds 0.1 >> liftIO exitSuccess + + _ -> pure () + + void $ liftIO $ waitAnyCatchCancel [proto] + + void $ waitAnyCatchCancel [mrpc, prpc] + + where + adapter = RpcAdapter dontHandle + (const $ debug "pong" >> liftIO exitSuccess) + (const $ liftIO exitSuccess) + +runRpcCommand :: String -> RPCCommand -> IO () +runRpcCommand saddr = \case + PING -> withRPC saddr (RPCPing @UDP) + ANNOUNCE h -> withRPC saddr (RPCAnnounce @UDP h) + diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs new file mode 100644 index 00000000..2d51e0c0 --- /dev/null +++ b/hbs2-peer/app/RPC.hs @@ -0,0 +1,92 @@ +{-# Language TemplateHaskell #-} +module RPC where + + +import HBS2.Prelude.Plated +import HBS2.Net.Proto +import HBS2.Hash +import HBS2.Net.Messaging +import HBS2.Net.Messaging.UDP +import HBS2.Actors.Peer +import HBS2.Defaults + +import Logger + +import Control.Concurrent.Async +import Control.Monad.Reader +import Data.ByteString.Lazy (ByteString) +import Codec.Serialise (serialise, deserialiseOrFail,Serialise) +import Lens.Micro.Platform + +import Prettyprinter + +data RPC e = + RPCPing + | RPCPong + | RPCAnnounce (Hash HbSync) + deriving stock (Eq,Generic,Show) + + +instance Serialise (RPC e) + +instance HasProtocol UDP (RPC UDP) where + type instance ProtocolId (RPC UDP) = 0xFFFFFFE0 + type instance Encoded UDP = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + + +data RPCEnv = + RPCEnv + { _rpcSelf :: Peer UDP + , _rpcFab :: Fabriq UDP + } + +makeLenses 'RPCEnv + +data RpcAdapter e m = + RpcAdapter + { rpcOnPing :: RPC e -> m () + , rpcOnPong :: RPC e -> m () + , rpcOnAnnounce :: Hash HbSync -> m () + } + +newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader RPCEnv + , MonadTrans + ) + +runRPC :: ( MonadIO m + , PeerMessaging UDP + ) + => MessagingUDP -> RpcM m a -> m a + +runRPC udp m = runReaderT (fromRpcM m) (RPCEnv pip (Fabriq udp)) + where + pip = getOwnPeer udp + +continueWithRPC :: RPCEnv -> RpcM m a -> m a +continueWithRPC e m = runReaderT (fromRpcM m) e + +instance Monad m => HasFabriq UDP (RpcM m) where + getFabriq = asks (view rpcFab) + +instance Monad m => HasOwnPeer UDP (RpcM m) where + ownPeer = asks (view rpcSelf) + +rpcHandler :: forall e m . ( MonadIO m + , Response e (RPC e) m + , HasProtocol e (RPC e) + ) + => RpcAdapter e m -> RPC e -> m () + +rpcHandler adapter = \case + p@RPCPing{} -> rpcOnPing adapter p >> response (RPCPong @e) + p@RPCPong{} -> rpcOnPong adapter p + (RPCAnnounce h) -> rpcOnAnnounce adapter h + + diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal new file mode 100644 index 00000000..0b4b43b9 --- /dev/null +++ b/hbs2-peer/hbs2-peer.cabal @@ -0,0 +1,113 @@ +cabal-version: 3.0 +name: hbs2-peer +version: 0.1.0.0 +-- synopsis: +-- description: +license: BSD-3-Clause +license-file: LICENSE +-- author: +-- maintainer: +-- copyright: +category: Network +build-type: Simple +extra-doc-files: CHANGELOG.md +-- extra-source-files: + +common warnings + ghc-options: -Wall + +common common-deps + build-depends: + base ^>=4.15.1.0, hbs2-core, hbs2-storage-simple + , async + , bytestring + , cache + , containers + , data-default + , deepseq + , directory + , filepath + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-multicast + , optparse-applicative + , prettyprinter + , random + , random-shuffle + , safe + , serialise + , split + , stm + , streaming + , temporary + , text + , timeit + , transformers + , uniplate + , unordered-containers + , vector + +common shared-properties + ghc-options: + -Wall + -O2 + -fno-warn-type-defaults + -- -fno-warn-unused-matches + -- -fno-warn-unused-do-bind + -- -Werror=missing-methods + -- -Werror=incomplete-patterns + -- -fno-warn-unused-binds + -threaded + -rtsopts + "-with-rtsopts=-N4 -A256m -AL256m -I0" + + + default-language: Haskell2010 + + default-extensions: + ApplicativeDo + , BangPatterns + , BlockArguments + , ConstraintKinds + , DataKinds + , DeriveDataTypeable + , DeriveGeneric + , DerivingStrategies + , DerivingVia + , ExtendedDefaultRules + , FlexibleContexts + , FlexibleInstances + , GADTs + , GeneralizedNewtypeDeriving + , ImportQualifiedPost + , LambdaCase + , MultiParamTypeClasses + , OverloadedStrings + , QuasiQuotes + , ScopedTypeVariables + , StandaloneDeriving + , TupleSections + , TypeApplications + , TypeFamilies + + + +executable hbs2-peer + import: shared-properties + import: common-deps + main-is: PeerMain.hs + + other-modules: BlockDownload + , PeerInfo + , Logger + , RPC + + -- other-extensions: + build-depends: base ^>=4.15.1.0 + hs-source-dirs: app + default-language: Haskell2010 + + diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index c03756a1..798b9408 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -3,8 +3,17 @@ {-# Language UndecidableInstances #-} module HBS2.Storage.Simple ( module HBS2.Storage.Simple + , StoragePrefix(..) + , Storage(..) + , Block ) where +import HBS2.Clock +import HBS2.Hash +import HBS2.Prelude.Plated +import HBS2.Storage +import HBS2.Base58 + import Control.Concurrent.Async import Control.Exception import Control.Monad @@ -35,10 +44,6 @@ import Control.Concurrent.STM.TBMQueue (TBMQueue) import Control.Concurrent.STM.TVar qualified as TV -import HBS2.Clock -import HBS2.Hash -import HBS2.Prelude.Plated -import HBS2.Storage -- NOTE: random accessing files in a git-like storage -- causes to file handles exhaust. @@ -103,10 +108,12 @@ touchForRead ss k = liftIO $ do mmaped = ss ^. storageMMaped -simpleStorageInit :: (MonadIO m, Data opts, IsSimpleStorageKey h) => opts -> m (SimpleStorage h) +simpleStorageInit :: forall h m opts . (MonadIO m, Data opts, IsSimpleStorageKey h) + => opts -> m (SimpleStorage h) + simpleStorageInit opts = liftIO $ do let prefix = uniLastDef "." opts :: StoragePrefix - let qSize = uniLastDef 2000 opts :: StorageQueueSize + let qSize = uniLastDef 2000 opts :: StorageQueueSize -- FIXME: defaults ? stor <- SimpleStorage <$> canonicalizePath (fromPrefix prefix) diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 9c70e346..00be925e 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -24,9 +24,9 @@ common common-deps , cache , containers , data-default + , deepseq , directory , filepath - , deepseq , hashable , microlens-platform , mtl @@ -37,6 +37,7 @@ common common-deps , random-shuffle , safe , serialise + , split , stm , streaming , tasty @@ -164,3 +165,52 @@ executable test-peer-run , uniplate , vector +executable test-udp + import: shared-properties + import: common-deps + default-language: Haskell2010 + + ghc-options: + -- -prof + -- -fprof-auto + + other-modules: + + -- other-extensions: + + -- type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: TestUDP.hs + + build-depends: + base ^>=4.15.1.0, hbs2-core, hbs2-storage-simple + , async + , attoparsec + , bytestring + , cache + , clock + , containers + , data-default + , data-textual + , directory + , hashable + , microlens-platform + , mtl + , mwc-random + , network + , network-ip + , prettyprinter + , QuickCheck + , random + , safe + , serialise + , stm + , streaming + , tasty + , tasty-hunit + , text + , transformers + , uniplate + , vector + + diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index d666af9c..951c0d35 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -5,8 +5,10 @@ {-# LANGUAGE MultiWayIf #-} module Main where -import HBS2.Actors.ChunkWriter +import HBS2.Prelude +import HBS2.Hash import HBS2.Actors +import HBS2.Actors.ChunkWriter import HBS2.Actors.Peer import HBS2.Clock import HBS2.Data.Detect @@ -14,20 +16,22 @@ import HBS2.Data.Types import HBS2.Defaults import HBS2.Events import HBS2.Merkle -import HBS2.Net.Messaging.Fake +import HBS2.Net.Messaging.UDP import HBS2.Net.PeerLocator import HBS2.Net.Proto -import HBS2.Net.Messaging import HBS2.Net.Proto.BlockAnnounce import HBS2.Net.Proto.BlockChunks import HBS2.Net.Proto.BlockInfo +import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Sessions +import HBS2.OrDie import HBS2.Prelude.Plated import HBS2.Storage import HBS2.Storage.Simple import Test.Tasty.HUnit +import System.Random.Shuffle import Codec.Serialise hiding (encode,decode) import Control.Concurrent.Async import Control.Concurrent.STM @@ -51,22 +55,66 @@ import System.IO import Data.Hashable import Type.Reflection import Data.Fixed +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HashMap +import Data.List qualified as List +import Data.List.Split (chunksOf) + +import Data.IntMap (IntMap) +import Data.IntMap qualified as IntMap +import Data.IntSet qualified as IntSet import Data.Dynamic debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p -data Fake + +calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)] +calcBursts bu pieces = go seed + where + seed = fmap (,1) pieces + + go ( (n1,s1) : (n2,s2) : xs ) + | (s1 + s2) <= bu = go ((n1, s1+s2) : xs) + | otherwise = (n1,s1) : go ( (n2,s2) : xs) + + go [x] = [x] + go [] = [] + +type Fake = UDP + +newtype PeerInfo e = + PeerInfo + { _peerBurst :: TVar Int + } + deriving stock (Generic,Typeable) + +makeLenses 'PeerInfo + + +newPeerInfo :: MonadIO m => m (PeerInfo e) +newPeerInfo = liftIO do + PeerInfo <$> newTVarIO defBurst + + +type instance SessionData e (PeerInfo e) = PeerInfo e + +newtype instance SessionKey e (PeerInfo e) = + PeerInfoKey (Peer e) + +deriving newtype instance Hashable (SessionKey Fake (PeerInfo Fake)) +deriving stock instance Eq (SessionKey Fake (PeerInfo Fake)) + +instance Expires (SessionKey Fake (PeerInfo Fake)) where + expiresIn = const (Just 600) data BlockDownload = BlockDownload { _sBlockHash :: Hash HbSync , _sBlockSize :: Size , _sBlockChunkSize :: ChunkSize - , _sBlockOffset :: Offset - , _sBlockWritten :: Size - , _sBlockWrittenT :: TVar Size + , _sBlockChunks :: TQueue (ChunkNum, ByteString) } deriving stock (Typeable) @@ -74,46 +122,8 @@ makeLenses 'BlockDownload newBlockDownload :: MonadIO m => Hash HbSync -> m BlockDownload newBlockDownload h = do - t <- liftIO $ newTVarIO 0 - pure $ BlockDownload h 0 0 0 0 t + BlockDownload h 0 0 <$> liftIO newTQueueIO -instance HasPeer Fake where - newtype instance Peer Fake = FakePeer Word8 - deriving newtype (Hashable,Num,Enum,Real,Integral) - deriving stock (Eq,Ord,Show) - - -instance Pretty (Peer Fake) where - pretty (FakePeer n) = parens ("peer" <+> pretty n) - - -instance HasProtocol Fake (BlockInfo Fake) where - type instance ProtocolId (BlockInfo Fake) = 1 - type instance Encoded Fake = ByteString - decode = either (const Nothing) Just . deserialiseOrFail - encode = serialise - --- FIXME: 3 is for debug only! -instance Expires (EventKey Fake (BlockInfo Fake)) where - expiresIn _ = Just 600 - -instance Expires (EventKey Fake (BlockChunks Fake)) where - expiresIn _ = Just 600 - -instance Expires (EventKey Fake (BlockAnnounce Fake)) where - expiresIn _ = Nothing - -instance HasProtocol Fake (BlockChunks Fake) where - type instance ProtocolId (BlockChunks Fake) = 2 - type instance Encoded Fake = ByteString - decode = either (const Nothing) Just . deserialiseOrFail - encode = serialise - -instance HasProtocol Fake (BlockAnnounce Fake) where - type instance ProtocolId (BlockAnnounce Fake) = 3 - type instance Encoded Fake = ByteString - decode = either (const Nothing) Just . deserialiseOrFail - encode = serialise type instance SessionData e (BlockInfo e) = BlockSizeSession e type instance SessionData e (BlockChunks e) = BlockDownload @@ -137,31 +147,34 @@ deriving stock instance Show (BlockSizeSession Fake) deriving newtype instance Hashable (SessionKey Fake (BlockChunks Fake)) deriving stock instance Eq (SessionKey Fake (BlockChunks Fake)) -runTestPeer :: Peer Fake +runTestPeer :: (Key HbSync ~ Hash HbSync, Storage (SimpleStorage HbSync) HbSync ByteString (ResponseM Fake (PeerM Fake IO))) + => MessagingUDP + -> Peer Fake -> (SimpleStorage HbSync -> ChunkWriter HbSync IO -> IO ()) -> IO () -runTestPeer p zu = do +runTestPeer mess p zu = do - dir <- liftIO $ canonicalizePath ( ".peers" show (fromIntegral p :: Int)) + dir <- liftIO $ canonicalizePath ( ".peers" show (pretty (AsFileName p))) let chDir = dir "tmp-chunks" liftIO $ createDirectoryIfMissing True dir let opts = [ StoragePrefix dir ] + udp <- async $ runMessagingUDP mess stor <- simpleStorageInit opts cww <- newChunkWriterIO stor (Just chDir) sw <- liftIO $ replicateM 8 $ async $ simpleStorageWorker stor - cw <- liftIO $ replicateM 16 $ async $ runChunkWriter cww + cw <- liftIO $ replicateM 8 $ async $ runChunkWriter cww zu stor cww simpleStorageStop stor stopChunkWriter cww - mapM_ cancel $ sw <> cw + mapM_ cancel $ sw <> cw <> [udp] handleBlockInfo :: forall e m . ( MonadIO m @@ -180,7 +193,6 @@ handleBlockInfo (p, h, sz') = do let bsz = fromIntegral sz update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) -data DownloadTask e = DownloadTask (Hash HbSync) (Maybe (Peer e, Integer)) data Stats e = Stats @@ -212,16 +224,6 @@ instance Typeable (SessionKey e (Stats e)) => Hashable (SessionKey e (Stats e)) p = Proxy @(SessionKey e (Stats e)) --- FIXME: for some reason Session typeclass --- requires HasProtocol. --- It seems somehow logical. But not convenient - -instance HasProtocol Fake (Stats Fake) where - type instance ProtocolId (Stats Fake) = 0xFFFFFFFE - type instance Encoded Fake = ByteString - decode = either (const Nothing) Just . deserialiseOrFail - encode = serialise - newtype Speed = Speed (Fixed E1) deriving newtype (Ord, Eq, Num, Real, Fractional, Show) @@ -257,188 +259,328 @@ updateStats updTime blknum = do pure newStats +data DownloadEnv e = + DownloadEnv + { _downloadQ :: TQueue (Hash HbSync) + , _peerBusy :: TVar (HashMap (Peer e) ()) + } + +makeLenses 'DownloadEnv + +class (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e +instance (Eq (Peer e), Hashable (Peer e), Pretty (Peer e)) => MyPeer e + +newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e) +newDownloadEnv = liftIO do + DownloadEnv <$> newTQueueIO + <*> newTVarIO mempty + +newtype BlockDownloadM e m a = + BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader (DownloadEnv e) + , MonadTrans + ) + +runDownloadM :: (MyPeer e, MonadIO m) => BlockDownloadM e m a -> m a +runDownloadM m = runReaderT ( fromBlockDownloadM m ) =<< newDownloadEnv + +withDownload :: (MyPeer e, MonadIO m) => DownloadEnv e -> BlockDownloadM e m a -> m a +withDownload e m = runReaderT ( fromBlockDownloadM m ) e + +addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m () +addDownload h = do + q <- asks (view downloadQ) + liftIO $ atomically $ writeTQueue q h + -- debug $ "addDownload" <+> pretty h + -- pause ( 0.25 :: Timeout 'Seconds ) + +withFreePeer :: (MyPeer e, MonadIO m) + => Peer e + -> BlockDownloadM e m () + -> BlockDownloadM e m () + -> BlockDownloadM e m () + +withFreePeer p n m = do + busy <- asks (view peerBusy) + avail <- liftIO $ atomically + $ stateTVar busy $ + \s -> case HashMap.lookup p s of + Nothing -> (True, HashMap.insert p () s) + Just{} -> (False, s) + if not avail + then n + else do + r <- m + liftIO $ atomically $ modifyTVar busy $ HashMap.delete p + pure r + +getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync) +getBlockForDownload = do + q <- asks (view downloadQ) + liftIO $ atomically $ readTQueue q + +processBlock :: forall e m . ( MonadIO m + , HasStorage m + , Block ByteString ~ ByteString + ) + => Hash HbSync + -> BlockDownloadM e m () + +processBlock h = do + + sto <- lift getStorage + + bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h) + + case bt of + Nothing -> addDownload h + + Just (AnnRef{}) -> pure () + + Just (Merkle{}) -> do + debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h + walkMerkle h (liftIO . getBlock sto) $ \(hr :: [HashRef]) -> do + + for_ hr $ \(HashRef blk) -> do + + -- debug $ pretty blk + + here <- liftIO (hasBlock sto blk) <&> isJust + + if here then do + debug $ "block" <+> pretty blk <+> "is already here" + pure () -- we don't need to recurse, cause walkMerkle is recursing for us + + else + addDownload blk + + Just (Blob{}) -> do + pure () + + +downloadFromWithPeer :: forall e m . ( MyPeer e + , MonadIO m + , Request e (BlockInfo e) m + , Request e (BlockChunks e) m + , MonadReader (PeerEnv e ) m + , PeerMessaging e + , HasProtocol e (BlockInfo e) + , EventListener e (BlockInfo e) m + , EventListener e (BlockChunks e) m + , Sessions e (BlockChunks e) m + , Sessions e (PeerInfo e) m + , Block ByteString ~ ByteString + , HasStorage m + ) + => Peer e + -> Hash HbSync + -> BlockDownloadM e m () +downloadFromWithPeer peer h = do + + + npi <- newPeerInfo + pinfo <- lift $ fetch True npi (PeerInfoKey peer) id + + waitSize <- liftIO $ newTBQueueIO 1 + + lift $ do + subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do + when ( p1 == peer ) $ do + liftIO $ atomically $ writeTBQueue waitSize s + + request @e peer (GetBlockSize @e h) + + esize <- liftIO $ race ( pause defBlockInfoTimeout ) do -- FIXME: block size wait time + atomically $ readTBQueue waitSize + + let mbSize = either (const Nothing) Just esize + + sto <- lift $ getStorage + + case mbSize of + Nothing -> void $ addDownload h + Just thisBkSize -> do + + coo <- genCookie (peer,h) + let key = DownloadSessionKey (peer, coo) + let chusz = defChunkSize + dnwld <- newBlockDownload h + let chuQ = view sBlockChunks dnwld + let new = set sBlockChunkSize chusz + . set sBlockSize (fromIntegral thisBkSize) + $ dnwld + + lift $ update @e new key id + + let burstSizeT = view peerBurst pinfo + + burstSize <- liftIO $ readTVarIO burstSizeT + + let offsets = calcChunks thisBkSize (fromIntegral chusz) :: [(Offset, Size)] + + let chunkNums = [ 0 .. pred (length offsets) ] + + let bursts = calcBursts burstSize chunkNums + + -- debug $ "bursts: " <+> pretty bursts + + r <- liftIO $ newTVarIO (mempty :: IntMap ByteString) + rq <- liftIO newTQueueIO + + for_ bursts $ liftIO . atomically . writeTQueue rq + + fix \next -> do + burst <- liftIO $ atomically $ tryReadTQueue rq + + case burst of + + Just (i,chunksN) -> do + let req = BlockGetChunks h chusz (fromIntegral i) (fromIntegral chunksN) + lift $ request peer (BlockChunks @e coo req) + + -- TODO: here wait for all requested chunks! + -- FIXME: it may blocks forever, so must be timeout and retry + + catched <- either id id <$> liftIO ( race ( pause defChunkWaitMax >> pure mempty ) + ( replicateM chunksN + $ atomically + $ readTQueue chuQ ) + + ) + when (null catched) $ do + + -- nerfing peer burst size. + -- FIXME: we need a thread that will be reset them again + + newBurst <- liftIO $ atomically + $ stateTVar burstSizeT $ \c -> let v = max 1 (c `div` 2) + in (v,v) + + let chuchu = calcBursts newBurst [ i + n | n <- [0 .. chunksN] ] + + debug $ "new burst: " <+> pretty newBurst + debug $ "missed chunks for request" <+> pretty (i,chunksN) + + for_ chuchu $ liftIO . atomically . writeTQueue rq + + for_ catched $ \(num,bs) -> do + liftIO $ atomically $ modifyTVar' r (IntMap.insert (fromIntegral num) bs) + + next + + Nothing -> do + + sz <- liftIO $ readTVarIO r <&> IntMap.size + + if sz == length offsets then do + pieces <- liftIO $ readTVarIO r <&> IntMap.elems + let block = mconcat pieces + let h1 = hashObject @HbSync block + + if h1 == h then do + -- debug "PROCESS BLOCK" + lift $ expire @e key + void $ liftIO $ putBlock sto block + void $ processBlock h + else do + debug "HASH NOT MATCH" + debug "MAYBE THAT PEER IS JERK" + + else do + debug "RETRY BLOCK DOWNLOADING / ASK FOR MISSED CHUNKS" + got <- liftIO $ readTVarIO r <&> IntMap.keysSet + let need = IntSet.fromList (fmap fromIntegral chunkNums) + + let missed = IntSet.toList $ need `IntSet.difference` got + + -- normally this should not happen + -- however, let's try do download the tails + -- by one chunk a time + for_ missed $ \n -> do + liftIO $ atomically $ writeTQueue rq (n,1) + + +instance HasPeerLocator e m => HasPeerLocator e (BlockDownloadM e m) where + getPeerLocator = lift getPeerLocator blockDownloadLoop :: forall e m . ( m ~ PeerM e IO - , MonadIO m + , MonadIO m , Request e (BlockInfo e) m , Request e (BlockAnnounce e) m , HasProtocol e (BlockInfo e) , HasProtocol e (BlockAnnounce e) + , HasProtocol e (BlockChunks e) , EventListener e (BlockInfo e) m , EventListener e (BlockChunks e) m , EventListener e (BlockAnnounce e) m + , EventEmitter e (BlockChunks e) m , Sessions e (BlockInfo e) m , Sessions e (BlockChunks e) m - , Sessions e (Stats e) m + , Sessions e (PeerInfo e) m + , PeerSessionKey e (PeerInfo e) , Typeable (SessionKey e (BlockChunks e)) , Typeable (SessionKey e (BlockInfo e)) , HasStorage m - , Num (Peer e) , Pretty (Peer e) , Block ByteString ~ ByteString , PeerMessaging e ) - => ChunkWriter HbSync IO -> m () -blockDownloadLoop cw = do + => m () +blockDownloadLoop = do + e <- ask stor <- getStorage - stats0 <- newStatsIO - let blks = [ "GTtQp6QjK7G9Sh5Aq4koGSpMX398WRWn3DV28NUAYARg" - , "5LoU2EVq7JSpiT9FmLEakVHxpsE989XnX6jE4gaUcLFA" - , "CotHSTLrg8T5NrYxyhG1AeJrdz1s4A5PdtA95Fh96JX8" - , "ANHxB2dUcSFDB7W7JuuqkSjAUXWyekVKdQLqNBhFKGgj" , "ECWYwWXiLgNvCkN1EFpSYqsPcWfnL4bAQADsyZgy1Cbr" ] - blq <- liftIO $ Q.newTBQueueIO defBlockDownloadQ - for_ blks $ \b -> liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask b Nothing) + pl <- getPeerLocator @e - subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p ann) -> do - let h = view biHash ann - let s = view biSize ann + -- TODO: peer info loop + void $ liftIO $ async $ forever $ withPeerM e $ do + pause @'Seconds 20 + pee <- knownPeers @e pl - debug $ "BLOCK ANNOUNCE!" <+> pretty p - <+> pretty h - <+> pretty (view biSize ann) + npi <- newPeerInfo - liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask h (Just (p,s))) + for_ pee $ \p -> do + pinfo <- fetch True npi (PeerInfoKey p) id + burst <- liftIO $ readTVarIO (view peerBurst pinfo) + debug $ "peer" <+> pretty p <+> "burst: " <+> pretty burst + pure () - env <- ask + runDownloadM @e $ do - void $ liftIO $ async $ forever $ withPeerM env $ do - wip <- liftIO $ blocksInProcess cw + env <- ask - stats <- fetch @e True stats0 StatsKey id - t2 <- liftIO $ getTime Monotonic + let again h = do + debug $ "block fucked: " <+> pretty h + withPeerM e $ withDownload env (addDownload h) - let tdiff = realToFrac (toNanoSecs t2 - toNanoSecs (view timeLast stats)) / 1e9 - let blkdiff = realToFrac $ view blkNum stats - view blkNumLast stats - let speed = if tdiff > 0 then blkdiff / tdiff else 0 :: Speed - void $ updateStats @e True 0 - debug $ "I'm alive!:" <+> pretty wip <+> pretty speed - pause ( 5 :: Timeout 'Seconds ) + mapM_ processBlock blks - fix \next -> do + fix \next -> do - ejob <- liftIO $ race ( pause ( 5 :: Timeout 'Seconds) ) - ( atomically $ Q.readTBQueue blq ) + h <- getBlockForDownload - let job = either (const Nothing) Just ejob + here <- liftIO $ hasBlock stor h <&> isJust - wip <- liftIO $ blocksInProcess cw + unless here do - if wip > 200 then do - pause ( 1 :: Timeout 'Seconds ) - else do - case job of - Nothing -> pure () + void $ runMaybeT $ do + p <- MaybeT $ knownPeers @e pl >>= liftIO . shuffleM <&> headMay - Just (DownloadTask hx (Just (p,s))) -> do - initDownload True blq p hx s + liftIO $ race ( pause defBlockWaitMax >> again h ) do + withPeerM e $ withDownload env $ do -- NOTE: really crazy shit + withFreePeer p (addDownload h >> pause (0.1 :: Timeout 'Seconds)) do + downloadFromWithPeer p h - Just (DownloadTask h Nothing) -> do - - peers <- getPeerLocator @e >>= knownPeers @e - - for_ peers $ \peer -> do - subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p,hx,s)) -> do - liftIO $ atomically $ Q.writeTBQueue blq (DownloadTask hx (Just (p,s))) - - -- debug $ "requesting size for" <+> pretty h - request @e peer (GetBlockSize @e h) - - next - - where - - initDownload anyway q p h thisBkSize = do - - env <- ask - - -- debug $ "initDownload" <+> pretty h <+> pretty p <+> pretty thisBkSize - - sto <- getStorage - here <- liftIO $ hasBlock sto h <&> isJust - - if | not here -> do - - coo <- genCookie (p,h) - let key = DownloadSessionKey (p, coo) - let chusz = defChunkSize - dnwld <- newBlockDownload h - let new = set sBlockChunkSize chusz - . set sBlockSize (fromIntegral thisBkSize) - $ dnwld - - update @e new key id - - subscribe @e (BlockChunksEventKey (coo,h)) $ \(BlockReady _) -> do - processBlock q h - - liftIO $ async $ do - -- -- FIXME: block is not downloaded, return it to the Q - void $ withPeerM env $ do - pause defBlockWaitMax - w <- find @e key (view sBlockWrittenT) - maybe1 w (pure ()) \_ -> do - h1 <- liftIO $ getHash cw key h - if h1 == Just h then do - liftIO $ commitBlock cw key h - expire @e key - else do - debug $ "Block lost" <+> pretty (p,coo) <+> pretty h - liftIO $ atomically $ Q.writeTBQueue q (DownloadTask h Nothing) - - request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction - - | anyway -> processBlock q h - - | otherwise -> do - debug $ "already got " <+> pretty h <+> " so relax" - pure () - - processBlock q h = do - - env <- ask - pip <- asks (view envDeferred) - -- debug "process block!" - liftIO $ addJob pip $ withPeerM env $ do - - sto <- getStorage - -- liftIO $ async $ debug $ "GOT BLOCK!" <+> pretty h - bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h) - -- debug $ pretty (show bt) - - case bt of - Nothing -> do - liftIO $ atomically $ Q.writeTBQueue q (DownloadTask h Nothing) - -- debug $ "NO FUCKING BLOCK FOUND!" - pure () - - Just (AnnRef{}) -> do - pure () - - Just (Merkle{}) -> liftIO do - debug $ "GOT MERKLE. requesting nodes/leaves" <+> pretty h - walkMerkle h (getBlock sto) $ \(hr :: [HashRef]) -> do - - for_ hr $ \(HashRef blk) -> do - - here <- liftIO $ hasBlock sto blk <&> isJust - - if here then do - debug $ "block" <+> pretty blk <+> "is already here" - pure () -- we don't need to recurse, cause walkMerkle is recursing for us - - else do - -- if block is missed, then - -- block to download q - liftIO $ atomically $ Q.writeTBQueue q (DownloadTask blk Nothing) - - Just (Blob{}) -> do - pure () + next -- NOTE: this is an adapter for a ResponseM monad @@ -449,15 +591,14 @@ mkAdapter :: forall e m . ( m ~ PeerM e IO , HasProtocol e (BlockChunks e) , Hashable (SessionKey e (BlockChunks e)) , Sessions e (BlockChunks e) (ResponseM e m) - , Sessions e (Stats e) (ResponseM e m) , Typeable (SessionKey e (BlockChunks e)) , Default (SessionData e (Stats e)) , EventEmitter e (BlockChunks e) m , Pretty (Peer e) , Block ByteString ~ ByteString ) - => ChunkWriter HbSync IO -> m (BlockChunksI e (ResponseM e m )) -mkAdapter cww = do + => m (BlockChunksI e (ResponseM e m )) +mkAdapter = do storage <- getStorage pure $ BlockChunksI @@ -465,12 +606,6 @@ mkAdapter cww = do , blkChunk = \h o s -> liftIO (getChunk storage h o s) , blkGetHash = \c -> find (DownloadSessionKey @e c) (view sBlockHash) - -- КАК ТОЛЬКО ПРИНЯЛИ ВСЕ ЧАНКИ (ПРИШЁЛ ПОСЛЕДНИЙ ЧАНК): - -- СЧИТАЕМ ХЭШ ТОГО, ЧТО ПОЛУЧИЛОСЬ - -- ЕСЛИ ПОЛУЧИЛОСЬ ХОРОШО --- ТО: - -- ПЕРЕЗАПИСЫВАЕМ БЛОК В СТОРЕЙДЖ - -- ГОВОРИМ ОЖИДАЮЩЕЙ СТОРОНЕ, ЧТО БЛОК ПРИНЯТ? - -- УДАЛЯЕМ КУКУ? , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do -- debug "AAAA!" @@ -486,111 +621,32 @@ mkAdapter cww = do when (isNothing ddd) $ do debug "SESSION NOT FOUND!" - dwnld <- MaybeT $ find cKey id + dwnld <- MaybeT $ find cKey (view sBlockChunks) - -- dwnld <- maybe1 dwnld' (debug "AAAA") $ pure - - -- debug "session found!" - - let bslen = fromIntegral $ B8.length bs - - let mbChSize = view sBlockChunkSize dwnld - let mbSize = view sBlockSize dwnld - - let offset0 = fromIntegral n * fromIntegral mbChSize :: Offset - - liftIO $ do - writeChunk cww cKey h offset0 bs - - let written = view sBlockWritten dwnld + bslen - let maxOff = max offset0 (view sBlockOffset dwnld) - - lift $ update dwnld cKey ( over sBlockOffset (max maxOff) - . over sBlockWritten (+ bslen) - ) - - wrt <- MaybeT $ find cKey (view sBlockWrittenT) - - liftIO $ atomically $ modifyTVar wrt (+bslen) - - wrActually <- liftIO $ readTVarIO wrt - - let mbDone = wrActually >= mbSize - -- && (maxOffLast + fromIntegral mbChSize) > fromIntegral mbSize - - when mbDone $ lift do - - deferred (Proxy @(BlockChunks e)) $ do - h1 <- liftIO $ getHash cww cKey h - -- h1 <- pure h-- liftIO $ getHash cww cKey h - - -- ПОСЧИТАТЬ ХЭШ - -- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК - -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ - if | h1 == Just h -> do - liftIO $ commitBlock cww cKey h - - updateStats @e False 1 - - expire cKey - -- debug "hash matched!" - emit @e (BlockChunksEventKey (c,h)) (BlockReady h) - - | h1 /= Just h -> do - debug "chunk receiving failed" - - | otherwise -> pure () - - - - when (written > mbSize * defBlockDownloadThreshold) $ do - debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p - lift $ expire cKey - -- ЕСЛИ ТУТ ВИСЕТЬ ДОЛГО, ТО НАС МОЖНО ДИДОСИТЬ, - -- ПОСЫЛАЯ НЕ ВСЕ БЛОКИ ЧАНКА ИЛИ ПОСЫЛАЯ ОТДЕЛЬНЫЕ - -- ЧАНКИ ПО МНОГУ РАЗ. А МЫ БУДЕМ ХЭШИ СЧИТАТЬ. - -- ТАК НЕ ПОЙДЕТ - -- ТАК ЧТО ТУТ ЖДЁМ, ДОПУСТИМ 2*mbSize и отваливаемся + liftIO $ atomically $ writeTQueue dwnld (n, bs) } + main :: IO () main = do hSetBuffering stderr LineBuffering void $ race (pause (600 :: Timeout 'Seconds)) $ do - fake <- newFakeP2P True <&> Fabriq + -- fake <- newFakeP2P True <&> Fabriq - let (p0:ps) = [0..1] :: [Peer Fake] + udp0 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001" + udp1 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002" + + let (p0:ps) = [getOwnPeer udp0, getOwnPeer udp1] -- others - others <- forM ps $ \p -> asyncBound $ runTestPeer p $ \s cw -> do + others <- forM ps $ \p -> async $ runTestPeer udp1 p $ \s cw -> do let findBlk = hasBlock s - -- let size = 1024*1024*1 - -- let size = 1024*1024*30 - -- g <- initialize $ U.fromList [fromIntegral p, fromIntegral size] - - -- bytes <- replicateM size $ uniformM g :: IO [Char] - - -- let blk = B8.pack bytes - - -- root <- putAsMerkle s blk - - -- rootSz <- hasBlock s (fromMerkleHash root) - - -- debug $ "I'm" <+> pretty p <+> pretty root - - runPeerM (AnyStorage s) fake p $ do - adapter <- mkAdapter cw - -- env <- ask - -- liftIO $ async $ withPeerM env $ do - -- maybe1 rootSz (pure ()) $ \rsz -> do - -- pause ( 0.001 :: Timeout 'Seconds ) - -- let info = BlockAnnounceInfo 0 NoBlockInfoMeta rsz (fromMerkleHash root) - -- let ann = BlockAnnounce @Fake info - -- request @Fake p0 ann + runPeerM (AnyStorage s) (Fabriq udp1) p $ do + adapter <- mkAdapter runProto @Fake [ makeResponse (blockSizeProto findBlk dontHandle) @@ -598,7 +654,7 @@ main = do , makeResponse blockAnnounceProto ] - our <- async $ runTestPeer p0 $ \s cw -> do + our <- async $ runTestPeer udp0 p0 $ \s cw -> do let blk = hasBlock s -- void $ async $ forever $ do @@ -606,15 +662,15 @@ main = do -- wip <- blocksInProcess cw -- debug $ "blocks wip:" <+> pretty wip - runPeerM (AnyStorage s) fake p0 $ do - adapter <- mkAdapter cw + runPeerM (AnyStorage s) (Fabriq udp0) p0 $ do + adapter <- mkAdapter env <- ask pl <- getPeerLocator @Fake addPeers @Fake pl ps - as <- liftIO $ async $ withPeerM env (blockDownloadLoop cw) + as <- liftIO $ async $ withPeerM env blockDownloadLoop me <- liftIO $ replicateM 1 $ async $ liftIO $ withPeerM env $ do runProto @Fake @@ -641,4 +697,3 @@ main = do assertBool "failed" False - diff --git a/hbs2-tests/test/TestChunkWriter.hs b/hbs2-tests/test/TestChunkWriter.hs index fe6c7e28..f18fb157 100644 --- a/hbs2-tests/test/TestChunkWriter.hs +++ b/hbs2-tests/test/TestChunkWriter.hs @@ -85,4 +85,3 @@ main = do print $ "failed" <+> pretty (sum (mconcat failed)) - diff --git a/hbs2-tests/test/TestUDP.hs b/hbs2-tests/test/TestUDP.hs new file mode 100644 index 00000000..56c42ad6 --- /dev/null +++ b/hbs2-tests/test/TestUDP.hs @@ -0,0 +1,103 @@ +{-# Language TemplateHaskell #-} +module Main where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto +import HBS2.Net.Messaging.UDP +import HBS2.Actors.Peer +import HBS2.OrDie + +import Control.Monad.Reader +import Data.ByteString.Lazy (ByteString) +import Prettyprinter +import System.IO +import Lens.Micro.Platform + +import Codec.Serialise +import Control.Concurrent.Async + +debug :: (MonadIO m) => Doc ann -> m () +debug p = liftIO $ hPrint stderr p + + +data PingPong e = Ping Int + | Pong Int + deriving stock (Eq,Generic,Show,Read) + + +instance Serialise (PingPong e) + + +instance HasProtocol UDP (PingPong UDP) where + type instance ProtocolId (PingPong UDP) = 1 + type instance Encoded UDP = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + +pingPongHandler :: forall e m . ( MonadIO m + , Response e (PingPong e) m + , HasProtocol e (PingPong e) + ) + => PingPong e + -> m () + +pingPongHandler = \case + + Ping c -> debug ("Ping" <+> pretty c) >> response (Pong @e c) + + Pong c | c < 100000 -> debug ("Pong" <+> pretty c) >> response (Ping @e (succ c)) + | otherwise -> pure () + +data PPEnv = + PPEnv + { _ppSelf :: Peer UDP + , _ppFab :: Fabriq UDP + } + +makeLenses 'PPEnv + +newtype PingPongM m a = PingPongM { fromPingPong :: ReaderT PPEnv m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader PPEnv + , MonadTrans + ) + +runPingPong :: (MonadIO m, PeerMessaging UDP) => MessagingUDP -> PingPongM m a -> m a +runPingPong udp m = runReaderT (fromPingPong m) (PPEnv (getOwnPeer udp) (Fabriq udp)) + +instance Monad m => HasFabriq UDP (PingPongM m) where + getFabriq = asks (view ppFab) + +instance Monad m => HasOwnPeer UDP (PingPongM m) where + ownPeer = asks (view ppSelf) + + +main :: IO () +main = do + hSetBuffering stdout LineBuffering + hSetBuffering stderr LineBuffering + + udp1 <- newMessagingUDP False (Just "127.0.0.1:10001") `orDie` "Can't start listener on 10001" + udp2 <- newMessagingUDP False (Just "127.0.0.1:10002") `orDie` "Can't start listener on 10002" + + m1 <- async $ runMessagingUDP udp1 + m2 <- async $ runMessagingUDP udp2 + + p1 <- async $ runPingPong udp1 do + request (getOwnPeer udp2) (Ping @UDP (-10000)) + runProto @UDP + [ makeResponse pingPongHandler + ] + + p2 <- async $ runPingPong udp2 do + request (getOwnPeer udp1) (Ping @UDP 0) + runProto @UDP + [ makeResponse pingPongHandler + ] + + mapM_ wait [p1,p2,m1,m2] + + diff --git a/hbs2/Main.hs b/hbs2/Main.hs index b17b62a4..1f92e8a1 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -1,6 +1,5 @@ module Main where -import HBS2.Storage import HBS2.Storage.Simple import HBS2.Storage.Simple.Extra import HBS2.Prelude @@ -9,6 +8,10 @@ import HBS2.Merkle import HBS2.Data.Types import HBS2.Data.Detect import HBS2.Defaults +import HBS2.Net.Auth.Credentials +import HBS2.Net.Messaging.UDP (UDP) +import HBS2.Net.Proto.Definition() +import HBS2.Net.Proto.Types import Data.ByteString.Lazy (ByteString) @@ -17,6 +20,7 @@ import Control.Monad import Control.Monad.IO.Class import Control.Monad.Trans.Maybe import Data.ByteString.Lazy qualified as LBS +import Data.ByteString qualified as BS import Data.Either import Data.Function import Data.Functor @@ -26,6 +30,7 @@ import Options.Applicative import Prettyprinter import System.Directory import Data.Maybe +import Lens.Micro.Platform -- import System.FilePath.Posix import System.IO import System.Exit @@ -84,7 +89,9 @@ newtype NewRefOpts = runHash :: HashOpts -> SimpleStorage HbSync -> IO () runHash opts ss = do - pure () + withBinaryFile (hashFp opts) ReadMode $ \h -> do + LBS.hGetContents h >>= print . pretty . hashObject @HbSync + runCat :: Data opts => opts -> SimpleStorage HbSync -> IO () runCat opts ss = do @@ -123,7 +130,7 @@ runCat opts ss = do maybe (error "empty ref") walk mbHead -runStore ::(Data opts, Block ByteString ~ ByteString) => opts -> SimpleStorage HbSync -> IO () +runStore ::(Data opts) => opts -> SimpleStorage HbSync -> IO () runStore opts ss | justInit = do putStrLn "initialized" @@ -151,6 +158,20 @@ runNewRef opts mhash ss = do res <- simpleWriteLinkRaw ss uuid (serialise ref) print (pretty res) +runNewKey :: IO () +runNewKey = do + cred <- newCredentials @UDP + print $ pretty $ AsCredFile $ AsBase58 cred + +runShowPeerKey :: Maybe FilePath -> IO () +runShowPeerKey fp = do + handle <- maybe (pure stdin) (`openFile` ReadMode) fp + bs <- LBS.hGet handle 4096 <&> LBS.toStrict + let cred' = parseCredentials @UDP (AsCredFile bs) + + maybe1 cred' exitFailure $ \cred -> do + print $ pretty $ AsBase58 (view peerSignPk cred) + withStore :: Data opts => opts -> ( SimpleStorage HbSync -> IO () ) -> IO () withStore opts f = do xdg <- getXdgDirectory XdgData defStorePath <&> fromString @@ -177,10 +198,12 @@ main = join . customExecParser (prefs showHelpOnError) $ ) where parser :: Parser (IO ()) - parser = hsubparser ( command "store" (info pStore (progDesc "store block")) - <> command "new-ref" (info pNewRef (progDesc "creates reference")) - <> command "cat" (info pCat (progDesc "cat block")) - <> command "hash" (info pHash (progDesc "calculates hash")) + parser = hsubparser ( command "store" (info pStore (progDesc "store block")) + <> command "new-ref" (info pNewRef (progDesc "creates reference")) + <> command "cat" (info pCat (progDesc "cat block")) + <> command "hash" (info pHash (progDesc "calculates hash")) + <> command "new-key" (info pNewKey (progDesc "generates a new keypair")) + <> command "show-peer-key" (info pShowPeerKey (progDesc "show peer key from credential file")) ) common = do @@ -210,4 +233,11 @@ main = join . customExecParser (prefs showHelpOnError) $ hash <- strArgument ( metavar "HASH" ) pure $ withStore o $ runHash $ HashOpts hash + pNewKey = do + pure runNewKey + + pShowPeerKey = do + fp <- optional $ strArgument ( metavar "FILE" ) + pure $ runShowPeerKey fp + diff --git a/hbs2/hbs2.cabal b/hbs2/hbs2.cabal index 93fe6920..abac12b4 100644 --- a/hbs2/hbs2.cabal +++ b/hbs2/hbs2.cabal @@ -77,6 +77,7 @@ executable hbs2 , hashable , interpolatedstring-perl6 , memory + , microlens-platform , optparse-applicative , prettyprinter , safe diff --git a/hie.yaml b/hie.yaml index eca9b9bf..04cd2439 100644 --- a/hie.yaml +++ b/hie.yaml @@ -1,17 +1,2 @@ cradle: cabal: - - path: "hbs2-tests/test/Peer2Main.hs" - component: "hbs2-tests:exe:test-peer-run" - - - path: "hbs2-tests/test/TestSKey" - component: "hbs2-tests:test:test-skey" - - - path: "hbs2-tests/test/TestChunkWriter" - component: "hbs2-tests:test:test-cw" - - - path: "hbs2-core/lib" - component: "hbs2-core:lib:hbs2-core" - - - path: "hbs2-storage-simple/lib" - component: "hbs2-storage-simple:lib:hbs2-storage-simple" -