diff --git a/docs/todo/hbs2-git-new-repo.txt b/docs/todo/hbs2-git-new-repo.txt index b3fa084d..2d06f874 100644 --- a/docs/todo/hbs2-git-new-repo.txt +++ b/docs/todo/hbs2-git-new-repo.txt @@ -155,6 +155,23 @@ TODO: hbs2-git-export-segment-params-to-export-ASAP от проекта, коммиты -- более менее стабильного размера и маленькие. -NOTE: test-1 -NOTE: test-2 +TODO: hbs2-git-http + Реализовать простой http сервер для отдачи объектов + по git протоколу, что бы локально работало клонирование + объектов и т.п. + + В самом последнем случае --- просто клонируем каталог + и отдаём файлы из него. + + В текущей реализации: + + - Время от времени синхронизируем репо (git fetch) ? + + 1. По запросу -- выясняем, в каком логе лежит объект + 2. Сканируем лог, сохраняем все объекты из него, запоминаем + 3. Отдаём сохраненные объекты (с tmp, sendfile-ом?) + 4. Время от времени (config) зачищаем файлы + + + diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index dd4c6e81..4cb28d6b 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -38,6 +38,7 @@ import Network.Socket hiding (listen,connect) -- import Network.Socket.ByteString.Lazy hiding (send,recv) import Streaming.Prelude qualified as S import System.Random hiding (next) +import Control.Monad.Trans.Resource import UnliftIO.Async import UnliftIO.STM @@ -205,88 +206,93 @@ spawnConnection :: forall m . MonadIO m spawnConnection tp env so sa = liftIO do - let myCookie = view tcpCookie env - let own = view tcpOwnPeer env - let newP = fromSockAddr @'TCP sa + runResourceT do - theirCookie <- handshake tp env so + let myCookie = view tcpCookie env + let own = view tcpOwnPeer env + let newP = fromSockAddr @'TCP sa - let connId = connectionId myCookie theirCookie + theirCookie <- handshake tp env so - when (tp == Client && theirCookie /= myCookie) do - pa <- toPeerAddr newP - liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection + let connId = connectionId myCookie theirCookie - traceCmd own - ( "spawnConnection " - <+> viaShow tp - <+> pretty myCookie - <+> pretty connId ) - newP + when (tp == Client && theirCookie /= myCookie) do + pa <- toPeerAddr newP + liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection - debug $ "handshake" <+> viaShow tp - <+> brackets (pretty (view tcpOwnPeer env)) - <+> pretty sa - <+> pretty theirCookie - <+> pretty connId + traceCmd own + ( "spawnConnection " + <+> viaShow tp + <+> pretty myCookie + <+> pretty connId ) + newP - used <- atomically $ do - modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1) - readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId + debug $ "handshake" <+> viaShow tp + <+> brackets (pretty (view tcpOwnPeer env)) + <+> pretty sa + <+> pretty theirCookie + <+> pretty connId - debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used - - when ( used <= 2 ) do - atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) - - when (used == 1) do - q <- getWriteQueue connId - updatePeer connId newP - - debug $ "NEW PEER" <+> brackets (pretty own) - <+> pretty connId - <+> pretty newP - <+> parens ("used:" <+> pretty used) - - rd <- async $ fix \next -> do - - spx <- readFromSocket so 4 <&> LBS.toStrict - ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг - let px = word32 spx -- & fromIntegral - let size = word32 ssize & fromIntegral + used <- atomically $ do + modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1) + readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId - bs <- readFromSocket so size + void $ allocate (pure connId) cleanupConn - memReqId newP px + debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used - pxes <- readTVarIO (view tcpPeerPx env) + when ( used <= 2 ) do + atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) - let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes) + when (used == 1) do + q <- getWriteQueue connId + updatePeer connId newP - -- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs) + debug $ "NEW PEER" <+> brackets (pretty own) + <+> pretty connId + <+> pretty newP + <+> parens ("used:" <+> pretty used) - atomically $ writeTQueue (view tcpRecv env) (orig, bs) + rd <- async $ fix \next -> do - next + spx <- readFromSocket so 4 <&> LBS.toStrict + ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг + let px = word32 spx -- & fromIntegral + let size = word32 ssize & fromIntegral - wr <- async $ fix \next -> do - (rcpt, bs) <- atomically $ readTQueue q - pq <- makeReqId rcpt - let qids = bytestring32 pq - let size = bytestring32 (fromIntegral $ LBS.length bs) + bs <- readFromSocket so size - let frame = LBS.fromStrict qids - <> LBS.fromStrict size -- req-size - <> bs -- payload + memReqId newP px + + pxes <- readTVarIO (view tcpPeerPx env) + + let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes) + + -- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs) + + atomically $ writeTQueue (view tcpRecv env) (orig, bs) - sendLazy so frame --(LBS.toStrict frame) next - void $ waitAnyCatchCancel [rd,wr] + wr <- async $ fix \next -> do + (rcpt, bs) <- atomically $ readTQueue q - cleanupConn connId + pq <- makeReqId rcpt + let qids = bytestring32 pq + let size = bytestring32 (fromIntegral $ LBS.length bs) + + let frame = LBS.fromStrict qids + <> LBS.fromStrict size -- req-size + <> bs -- payload + + sendLazy so frame --(LBS.toStrict frame) + next + + void $ waitAnyCatchCancel [rd,wr] + + -- cleanupConn connId -- gracefulClose so 1000 debug $ "spawnConnection exit" <+> pretty sa