tcp minor fix + fixme

This commit is contained in:
Dmitry Zuikov 2023-07-01 06:17:12 +03:00
parent f04dedde31
commit 55cdf976da
2 changed files with 85 additions and 62 deletions

View File

@ -155,6 +155,23 @@ TODO: hbs2-git-export-segment-params-to-export-ASAP
от проекта, коммиты -- более менее стабильного размера и от проекта, коммиты -- более менее стабильного размера и
маленькие. маленькие.
NOTE: test-1 TODO: hbs2-git-http
NOTE: test-2 Реализовать простой http сервер для отдачи объектов
по git протоколу, что бы локально работало клонирование
объектов и т.п.
В самом последнем случае --- просто клонируем каталог
и отдаём файлы из него.
В текущей реализации:
- Время от времени синхронизируем репо (git fetch) ?
1. По запросу -- выясняем, в каком логе лежит объект
2. Сканируем лог, сохраняем все объекты из него, запоминаем
3. Отдаём сохраненные объекты (с tmp, sendfile-ом?)
4. Время от времени (config) зачищаем файлы

View File

@ -38,6 +38,7 @@ import Network.Socket hiding (listen,connect)
-- import Network.Socket.ByteString.Lazy hiding (send,recv) -- import Network.Socket.ByteString.Lazy hiding (send,recv)
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
import System.Random hiding (next) import System.Random hiding (next)
import Control.Monad.Trans.Resource
import UnliftIO.Async import UnliftIO.Async
import UnliftIO.STM import UnliftIO.STM
@ -205,88 +206,93 @@ spawnConnection :: forall m . MonadIO m
spawnConnection tp env so sa = liftIO do spawnConnection tp env so sa = liftIO do
let myCookie = view tcpCookie env runResourceT do
let own = view tcpOwnPeer env
let newP = fromSockAddr @'TCP sa
theirCookie <- handshake tp env so let myCookie = view tcpCookie env
let own = view tcpOwnPeer env
let newP = fromSockAddr @'TCP sa
let connId = connectionId myCookie theirCookie theirCookie <- handshake tp env so
when (tp == Client && theirCookie /= myCookie) do let connId = connectionId myCookie theirCookie
pa <- toPeerAddr newP
liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection
traceCmd own when (tp == Client && theirCookie /= myCookie) do
( "spawnConnection " pa <- toPeerAddr newP
<+> viaShow tp liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection
<+> pretty myCookie
<+> pretty connId )
newP
debug $ "handshake" <+> viaShow tp traceCmd own
<+> brackets (pretty (view tcpOwnPeer env)) ( "spawnConnection "
<+> pretty sa <+> viaShow tp
<+> pretty theirCookie <+> pretty myCookie
<+> pretty connId <+> pretty connId )
newP
used <- atomically $ do debug $ "handshake" <+> viaShow tp
modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1) <+> brackets (pretty (view tcpOwnPeer env))
readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId <+> pretty sa
<+> pretty theirCookie
<+> pretty connId
debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used used <- atomically $ do
modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1)
when ( used <= 2 ) do readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId
atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
when (used == 1) do
q <- getWriteQueue connId
updatePeer connId newP
debug $ "NEW PEER" <+> brackets (pretty own)
<+> pretty connId
<+> pretty newP
<+> parens ("used:" <+> pretty used)
rd <- async $ fix \next -> do
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size void $ allocate (pure connId) cleanupConn
memReqId newP px debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used
pxes <- readTVarIO (view tcpPeerPx env) when ( used <= 2 ) do
atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes) when (used == 1) do
q <- getWriteQueue connId
updatePeer connId newP
-- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs) debug $ "NEW PEER" <+> brackets (pretty own)
<+> pretty connId
<+> pretty newP
<+> parens ("used:" <+> pretty used)
atomically $ writeTQueue (view tcpRecv env) (orig, bs) rd <- async $ fix \next -> do
next spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
wr <- async $ fix \next -> do
(rcpt, bs) <- atomically $ readTQueue q
pq <- makeReqId rcpt bs <- readFromSocket so size
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids memReqId newP px
<> LBS.fromStrict size -- req-size
<> bs -- payload pxes <- readTVarIO (view tcpPeerPx env)
let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes)
-- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs)
atomically $ writeTQueue (view tcpRecv env) (orig, bs)
sendLazy so frame --(LBS.toStrict frame)
next next
void $ waitAnyCatchCancel [rd,wr] wr <- async $ fix \next -> do
(rcpt, bs) <- atomically $ readTQueue q
cleanupConn connId pq <- makeReqId rcpt
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
next
void $ waitAnyCatchCancel [rd,wr]
-- cleanupConn connId
-- gracefulClose so 1000 -- gracefulClose so 1000
debug $ "spawnConnection exit" <+> pretty sa debug $ "spawnConnection exit" <+> pretty sa