This commit is contained in:
voidlizard 2024-12-28 09:13:30 +03:00
parent 2e4558d444
commit 06089dcffb
2 changed files with 43 additions and 8 deletions

View File

@ -1042,8 +1042,7 @@ theDict = do
tn <- getNumCapabilities
sourceQ <- newTBQueueIO (fromIntegral tn * 10)
sourceQ <- newTBQueueIO (fromIntegral tn * 1024)
let write sz_ fh ss = do
LBS.hPutStr fh ss
@ -1090,14 +1089,25 @@ theDict = do
progress_ <- newTVarIO 0
-- (pool, gitCatBatchQ) <- lift $ limitedResourceWorkerRequestQ tn startGitCat stopProcess
-- link pool
gitCatBatchQ <- contWorkerPool 16 do
che <- ContT withGitCat
pure $ gitReadObjectMaybe che
-- void $ ContT $ bracket (pure pool) cancel
workers <- lift $ forM (zip [0..] commitz) $ \(i,chunk) -> async $ flip runContT pure do
theReader <- ContT withGitCat
-- let gitCatBatchQ commit = gitReadObjectMaybe theReader commit
for_ chunk \commit -> do
atomically $ modifyTVar progress_ succ
(_,self) <- gitReadObjectMaybe theReader commit
(_,self) <- lift (gitCatBatchQ commit)
>>= orThrow (GitReadError (show $ pretty commit))
tree <- gitReadCommitTree self
@ -1109,7 +1119,7 @@ theDict = do
for_ hashes $ \gh -> do
atomically $ modifyTVar _already (HS.insert gh)
-- debug $ "object" <+> pretty gh
(_t,lbs) <- gitReadObjectMaybe theReader gh
(_t,lbs) <- lift (gitCatBatchQ gh)
>>= orThrow (GitReadError (show $ pretty gh))
let e = [ Builder.byteString (coerce gh)
@ -1189,11 +1199,32 @@ contWorkerPool' n contWorker = do
-- возвращаем функцию, с помощью которой отправлять воркерам запрос
-- и получать ответ
pure \a -> do
tmv <- atomically newEmptyTMVar
tmv <- newEmptyTMVarIO
atomically $ writeTQueue inQ (a, atomically . STM.putTMVar tmv)
pure do
either throwIO pure =<< atomically (readTMVar tmv)
limitedResourceWorkerRequestQ :: MonadUnliftIO m
=> Int
-> m r -- ^ create resource
-> ( r -> m () ) -- ^ destroy resource
-> m ( Async (), (r -> m b) -> m b )
limitedResourceWorkerRequestQ n create destroy = do
inQ <- newTQueueIO
ass <- async $ flip runContT pure do
replicateM_ n do
(link <=< ContT . withAsync) $ flip runContT pure do
r <- ContT $ bracket create destroy
(fix . (>>)) $ atomically (readTQueue inQ) >>= \(a,reply) -> do
lift (tryAny (a r)) >>= reply
pure $ (ass, \fn -> do
tmv <- newEmptyTMVarIO
atomically $ writeTQueue inQ (fn, atomically . STM.putTMVar tmv)
atomically (readTMVar tmv) >>= either throwIO pure)
linearSearchLBS hash lbs = do
found <- S.toList_ $ runConsumeLBS lbs $ flip fix 0 \go n -> do

View File

@ -139,11 +139,15 @@ gitReadHEAD = runMaybeT do
withGitCat :: (MonadIO m) => (Process Handle Handle () -> m a) -> m a
withGitCat action = do
p <- startGitCat
action p
startGitCat :: MonadIO m => m (Process Handle Handle ())
startGitCat = do
let cmd = "git"
let args = ["cat-file", "--batch"]
let config = setStdin createPipe $ setStdout createPipe $ setStderr closed $ proc cmd args
p <- startProcess config
action p
startProcess config
withGitCatCheck :: (MonadIO m) => (Process Handle Handle () -> m a) -> m a
withGitCatCheck action = do