diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index ce6d4a2c..03d075ee 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -1042,8 +1042,7 @@ theDict = do tn <- getNumCapabilities - sourceQ <- newTBQueueIO (fromIntegral tn * 10) - + sourceQ <- newTBQueueIO (fromIntegral tn * 1024) let write sz_ fh ss = do LBS.hPutStr fh ss @@ -1090,14 +1089,25 @@ theDict = do progress_ <- newTVarIO 0 + -- (pool, gitCatBatchQ) <- lift $ limitedResourceWorkerRequestQ tn startGitCat stopProcess + + -- link pool + + gitCatBatchQ <- contWorkerPool 16 do + che <- ContT withGitCat + pure $ gitReadObjectMaybe che + + -- void $ ContT $ bracket (pure pool) cancel + workers <- lift $ forM (zip [0..] commitz) $ \(i,chunk) -> async $ flip runContT pure do - theReader <- ContT withGitCat + + -- let gitCatBatchQ commit = gitReadObjectMaybe theReader commit for_ chunk \commit -> do atomically $ modifyTVar progress_ succ - (_,self) <- gitReadObjectMaybe theReader commit + (_,self) <- lift (gitCatBatchQ commit) >>= orThrow (GitReadError (show $ pretty commit)) tree <- gitReadCommitTree self @@ -1109,7 +1119,7 @@ theDict = do for_ hashes $ \gh -> do atomically $ modifyTVar _already (HS.insert gh) -- debug $ "object" <+> pretty gh - (_t,lbs) <- gitReadObjectMaybe theReader gh + (_t,lbs) <- lift (gitCatBatchQ gh) >>= orThrow (GitReadError (show $ pretty gh)) let e = [ Builder.byteString (coerce gh) @@ -1189,11 +1199,32 @@ contWorkerPool' n contWorker = do -- возвращаем функцию, с помощью которой отправлять воркерам запрос -- и получать ответ pure \a -> do - tmv <- atomically newEmptyTMVar + tmv <- newEmptyTMVarIO atomically $ writeTQueue inQ (a, atomically . STM.putTMVar tmv) pure do either throwIO pure =<< atomically (readTMVar tmv) + +limitedResourceWorkerRequestQ :: MonadUnliftIO m + => Int + -> m r -- ^ create resource + -> ( r -> m () ) -- ^ destroy resource + -> m ( Async (), (r -> m b) -> m b ) + +limitedResourceWorkerRequestQ n create destroy = do + inQ <- newTQueueIO + ass <- async $ flip runContT pure do + replicateM_ n do + (link <=< ContT . withAsync) $ flip runContT pure do + r <- ContT $ bracket create destroy + (fix . (>>)) $ atomically (readTQueue inQ) >>= \(a,reply) -> do + lift (tryAny (a r)) >>= reply + + pure $ (ass, \fn -> do + tmv <- newEmptyTMVarIO + atomically $ writeTQueue inQ (fn, atomically . STM.putTMVar tmv) + atomically (readTMVar tmv) >>= either throwIO pure) + linearSearchLBS hash lbs = do found <- S.toList_ $ runConsumeLBS lbs $ flip fix 0 \go n -> do diff --git a/hbs2-git3/lib/HBS2/Git3/Git.hs b/hbs2-git3/lib/HBS2/Git3/Git.hs index 61b5940d..aa09aa4c 100644 --- a/hbs2-git3/lib/HBS2/Git3/Git.hs +++ b/hbs2-git3/lib/HBS2/Git3/Git.hs @@ -139,11 +139,15 @@ gitReadHEAD = runMaybeT do withGitCat :: (MonadIO m) => (Process Handle Handle () -> m a) -> m a withGitCat action = do + p <- startGitCat + action p + +startGitCat :: MonadIO m => m (Process Handle Handle ()) +startGitCat = do let cmd = "git" let args = ["cat-file", "--batch"] let config = setStdin createPipe $ setStdout createPipe $ setStderr closed $ proc cmd args - p <- startProcess config - action p + startProcess config withGitCatCheck :: (MonadIO m) => (Process Handle Handle () -> m a) -> m a withGitCatCheck action = do