tcp minor fix + fixme

This commit is contained in:
Dmitry Zuikov 2023-07-01 06:17:12 +03:00
parent f04dedde31
commit 55cdf976da
2 changed files with 85 additions and 62 deletions

View File

@ -155,6 +155,23 @@ TODO: hbs2-git-export-segment-params-to-export-ASAP
от проекта, коммиты -- более менее стабильного размера и
маленькие.
NOTE: test-1
NOTE: test-2
TODO: hbs2-git-http
Реализовать простой http сервер для отдачи объектов
по git протоколу, что бы локально работало клонирование
объектов и т.п.
В самом последнем случае --- просто клонируем каталог
и отдаём файлы из него.
В текущей реализации:
- Время от времени синхронизируем репо (git fetch) ?
1. По запросу -- выясняем, в каком логе лежит объект
2. Сканируем лог, сохраняем все объекты из него, запоминаем
3. Отдаём сохраненные объекты (с tmp, sendfile-ом?)
4. Время от времени (config) зачищаем файлы

View File

@ -38,6 +38,7 @@ import Network.Socket hiding (listen,connect)
-- import Network.Socket.ByteString.Lazy hiding (send,recv)
import Streaming.Prelude qualified as S
import System.Random hiding (next)
import Control.Monad.Trans.Resource
import UnliftIO.Async
import UnliftIO.STM
@ -205,88 +206,93 @@ spawnConnection :: forall m . MonadIO m
spawnConnection tp env so sa = liftIO do
let myCookie = view tcpCookie env
let own = view tcpOwnPeer env
let newP = fromSockAddr @'TCP sa
runResourceT do
theirCookie <- handshake tp env so
let myCookie = view tcpCookie env
let own = view tcpOwnPeer env
let newP = fromSockAddr @'TCP sa
let connId = connectionId myCookie theirCookie
theirCookie <- handshake tp env so
when (tp == Client && theirCookie /= myCookie) do
pa <- toPeerAddr newP
liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection
let connId = connectionId myCookie theirCookie
traceCmd own
( "spawnConnection "
<+> viaShow tp
<+> pretty myCookie
<+> pretty connId )
newP
when (tp == Client && theirCookie /= myCookie) do
pa <- toPeerAddr newP
liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection
debug $ "handshake" <+> viaShow tp
<+> brackets (pretty (view tcpOwnPeer env))
<+> pretty sa
<+> pretty theirCookie
<+> pretty connId
traceCmd own
( "spawnConnection "
<+> viaShow tp
<+> pretty myCookie
<+> pretty connId )
newP
used <- atomically $ do
modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1)
readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId
debug $ "handshake" <+> viaShow tp
<+> brackets (pretty (view tcpOwnPeer env))
<+> pretty sa
<+> pretty theirCookie
<+> pretty connId
debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used
when ( used <= 2 ) do
atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
when (used == 1) do
q <- getWriteQueue connId
updatePeer connId newP
debug $ "NEW PEER" <+> brackets (pretty own)
<+> pretty connId
<+> pretty newP
<+> parens ("used:" <+> pretty used)
rd <- async $ fix \next -> do
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
used <- atomically $ do
modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1)
readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId
bs <- readFromSocket so size
void $ allocate (pure connId) cleanupConn
memReqId newP px
debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used
pxes <- readTVarIO (view tcpPeerPx env)
when ( used <= 2 ) do
atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes)
when (used == 1) do
q <- getWriteQueue connId
updatePeer connId newP
-- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs)
debug $ "NEW PEER" <+> brackets (pretty own)
<+> pretty connId
<+> pretty newP
<+> parens ("used:" <+> pretty used)
atomically $ writeTQueue (view tcpRecv env) (orig, bs)
rd <- async $ fix \next -> do
next
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
wr <- async $ fix \next -> do
(rcpt, bs) <- atomically $ readTQueue q
pq <- makeReqId rcpt
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
bs <- readFromSocket so size
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
memReqId newP px
pxes <- readTVarIO (view tcpPeerPx env)
let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes)
-- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs)
atomically $ writeTQueue (view tcpRecv env) (orig, bs)
sendLazy so frame --(LBS.toStrict frame)
next
void $ waitAnyCatchCancel [rd,wr]
wr <- async $ fix \next -> do
(rcpt, bs) <- atomically $ readTQueue q
cleanupConn connId
pq <- makeReqId rcpt
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
next
void $ waitAnyCatchCancel [rd,wr]
-- cleanupConn connId
-- gracefulClose so 1000
debug $ "spawnConnection exit" <+> pretty sa