wip, works betta

This commit is contained in:
Dmitry Zuikov 2024-03-28 17:44:16 +03:00
parent 48d17c6e26
commit af6d6db378
4 changed files with 75 additions and 34 deletions

View File

@ -20,6 +20,7 @@ import Data.Hashable
import Network.ByteOrder hiding (ByteString)
import System.IO.Unsafe (unsafePerformIO)
import System.Posix.IO
import UnliftIO
-- define new transport protocol type
@ -53,6 +54,7 @@ newMessagingPipe (pIn,pOut) = do
instance Hashable PipeAddr where
hashWithSalt salt (PipeAddr pip) = hashWithSalt salt ("pipe-addr", fd)
where
-- FIXME: ASAP-unsafePerformIO-is-really-unsafe
fd = unsafePerformIO (handleToFd pip <&> fromIntegral @_ @Word)
instance HasPeer PIPE where
@ -85,10 +87,14 @@ instance Messaging MessagingPipe PIPE ByteString where
runMessagingPipe :: MonadIO m => MessagingPipe -> m ()
runMessagingPipe bus = liftIO do
fix \next -> do
done <- hIsEOF who
unless done do
r <- try @_ @SomeException do
frame <- LBS.hGet who 4 <&> word32 . LBS.toStrict
piece <- LBS.hGet who (fromIntegral frame)
atomically (writeTQueue (inQ bus) piece)
next
either (const $ pure ()) (const next) r
where
who = pipeIn bus

View File

@ -30,6 +30,11 @@ newtype GitLwwSeq = GitLwwSeq Word64
deriving stock (Generic,Data)
deriving newtype (ToField)
newtype GitRepoHeadSeq = GitRepoHeadSeq Word64
deriving stock (Generic,Data)
deriving newtype (ToField)
newtype GitRefLog = GitRefLog (RefLogKey HBS2Basic)
deriving stock (Generic,Data)
deriving newtype (ToField)
@ -63,6 +68,7 @@ data GitRepoFacts =
, gitRefLog :: GitRefLog
, gitTx :: GitTx
, gitRepoHead :: GitRepoHeadRef
, gitRepoHeadSeq :: GitRepoHeadSeq
, gitName :: GitName
, gitBrief :: GitBrief
, gitEncrypted :: GitEncrypted
@ -80,6 +86,7 @@ instance Serialise GitName
instance Serialise GitBrief
instance Serialise GitRepoExtended
instance Serialise GitEncrypted
instance Serialise GitRepoHeadSeq
instance ToField HashRef where
toField = toField @String . show . pretty
@ -129,5 +136,6 @@ instance HasColumnName GitBrief where
instance HasColumnName GitEncrypted where
columnName = "gk"
instance HasColumnName GitRepoHeadSeq where
columnName = "repoheadseq"

View File

@ -41,6 +41,10 @@ import System.Process.Typed
import Text.InterpolatedString.Perl6 (qc)
import System.Environment (getProgName, getArgs)
import System.Posix.Signals
import System.Exit
{- HLINT ignore "Functor law" -}
runOracleIndex :: forall m . MonadUnliftIO m
@ -113,6 +117,7 @@ runOracleIndex auPk = do
(GitRefLog rk)
(GitTx tx)
(GitRepoHeadRef rhh)
(GitRepoHeadSeq (fromIntegral n))
(GitName (Just name))
(GitBrief (Just brief))
(GitEncrypted _repoHeadGK0)
@ -153,16 +158,17 @@ runDump pks = do
flip runContT pure do
p <- ContT $ withProcessWait cmd
-- p <- lift $ startProcess cmd
let ssin = getStdin p
let sout = getStdout p
client <- newMessagingPipe (sout,ssin) -- ,sout)
void $ ContT $ withAsync $ runMessagingPipe client
mess <- ContT $ bracket (async $ runMessagingPipe client) cancel
caller <- makeServiceCaller @BrowserPluginAPI @PIPE (localPeer client)
void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClient caller) client
broker <- ContT $ bracket (async $ liftIO $ runReaderT (runServiceClient caller) client) cancel
wtf <- callService @RpcChannelQuery caller ()
>>= orThrowUser "can't query rpc"
@ -173,6 +179,19 @@ runDump pks = do
liftIO $ LBS.putStr (A.encodePretty val)
hClose ssin
hClose sout
waitExitCode p
debug "CLIENT: WTF?"
-- stopProcess p
-- error "MOTHERFUCKER!"
-- void $ callService @RpcChannelQuery caller ()
-- >>= orThrowUser "can't query rpc"
-- liftIO $ exitSuccess
data RpcChannelQuery
-- API definition
@ -196,20 +215,17 @@ instance (MonadUnliftIO m, HasOracleEnv m) => HandleMethod m RpcChannelQuery whe
withOracleEnv env do
items <- withState $ select_ @_ @(HashVal, Text, Text) [qc|
SELECT
g.ref,
gn.name,
gb.brief
FROM
gitrepo AS g
INNER JOIN
gitreponame AS gn ON g.ref = gn.ref
INNER JOIN
gitrepoheadversion AS ghv ON gn.hash = ghv.hash
LEFT JOIN
gitrepobrief AS gb ON g.ref = gb.ref AND ghv.hash = gb.hash
GROUP BY
g.ref, gn.name
select lwwref, name, brief
from (
select
lwwref
, name
, brief
, max(lwwseq)
, max(repoheadseq)
from gitrepofact
group by lwwref,name,brief) as s0;
|]
let root = object [ "rows" .= items
@ -242,11 +258,13 @@ runPipe = do
chan <- asks _refchanId
debug "run pipe"
liftIO $ void $ installHandler sigPIPE Ignore Nothing
flip runContT pure do
server <- newMessagingPipe (stdin,stdout)
void $ ContT $ withAsync $ runMessagingPipe server
void $ ContT $ bracket (async $ runMessagingPipe server) cancel
void $ ContT $ withAsync $ forever do
debug $ yellow "updateState"
@ -254,12 +272,19 @@ runPipe = do
pause @'Seconds 60
-- make server protocol responder
-- void $ ContT $ withAsync $ flip
lift $ flip runReaderT server do
serv <- ContT $ withAsync $ flip runReaderT server do
runProto @PIPE
[ makeResponse (makeServer @BrowserPluginAPI)
]
fix \next -> do
-- debug $ red "YAYAYAYA"
done1 <- hIsClosed stdin
done2 <- hIsClosed stdout
done3 <- hIsEOF stdin
let done = done1 || done2 || done3
debug $ red "DONE:" <+> pretty done
unless done (pause @'Seconds 0.01 >> next)
updateState :: MonadUnliftIO m => Oracle m ()
updateState = do

View File

@ -46,6 +46,7 @@ gitRepoFactTable = do
, reflog text not null
, tx text not null
, repohead text not null
, repoheadseq integer not null
, name text null
, brief text null
, gk text null
@ -97,6 +98,7 @@ insertRepoFacts GitRepoFacts{..} = do
, gitRefLog
, gitTx
, gitRepoHead
, gitRepoHeadSeq
, gitName
, gitBrief
, gitEncrypted