This commit is contained in:
Dmitry Zuykov 2025-05-13 11:41:41 +03:00
parent 9722fa7c01
commit 77a0052ffb
2 changed files with 98 additions and 72 deletions

View File

@ -53,7 +53,8 @@ import System.Posix.IO as PosixBase
import System.Posix.Types as Posix
import System.Posix.IO.ByteString as Posix
import System.Posix.Unistd
import System.Posix.Files (getFileStatus, modificationTimeHiRes)
import System.Posix.Files (getFileStatus, modificationTimeHiRes, getFdStatus, FileStatus(..))
import System.Posix.Files qualified as PFS
import System.IO.Error (catchIOError)
import System.IO.MMap as MMap
import System.IO.Temp (emptyTempFile)
@ -145,7 +146,7 @@ instance Pretty Location where
pretty = \case
InWriteQueue{} -> "write-queue"
InCurrent (o,l) -> pretty $ mkForm @C "current" [mkInt o, mkInt l]
InFossil _ (o,l) -> pretty $ mkForm @C "fossil " [mkList [mkInt o, mkInt l]]
InFossil _ (o,l) -> pretty $ mkForm @C "fossil " [mkInt o, mkInt l]
type IsHCQKey h = ( Eq (Key h)
, Hashable (Key h)
@ -307,8 +308,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
refsWriter <- makeRefsWriter
reader <- makeReader
indexer <- makeIndexer indexQ
writer <- makeWriter indexQ
indexer <- makeIndexer writer indexQ
mapM_ waitCatch [writer,indexer,refsWriter]
-- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter]
@ -361,7 +362,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
flush <- isEmptyTQueue myFlushQ <&> not
stop <- readTVar ncqStopped
bytes <- readTVar ncqNotWritten
if bytes > dumpData || flush || stop then none else STM.retry
now <- readTVar ncqIndexNow <&> (>0)
if bytes > dumpData || flush || now || stop then none else STM.retry
void $ atomically (STM.flushTQueue myFlushQ)
@ -410,18 +412,16 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
pure refsWriter
makeIndexer indexQ = do
indexer <- ContT $ withAsync $ untilStopped do
debug $ "STARTED INDEXER"
makeIndexer w indexQ = do
indexer <- ContT $ withAsync $ fix \next -> do
what' <- race (pause @'Seconds 1) $ atomically do
stop <- readTVar ncqStopped
q <- tryPeekTQueue indexQ
if not (stop || isJust q) then
STM.retry
else do
STM.flushTQueue indexQ
stop <- readTVar ncqStopped
q <- tryPeekTQueue indexQ
if not ( stop || isJust q) then
STM.retry
else do
STM.flushTQueue indexQ
let what = fromRight mempty what'
@ -431,17 +431,19 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
(key, _) <- ncqIndexFile ncq fn <&> over _2 HS.fromList
-- atomically do
-- r <- readTVar ncqWaitIndex <&> HPSQ.toList
-- let new = [(k,p,v) | (k,p,v) <- r, not (k `HS.member` added)]
-- writeTVar ncqWaitIndex (HPSQ.fromList new)
ncqAddTrackedFilesIO ncq [key]
atomically do
modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd))
ncqLoadSomeIndexes ncq [fromString key]
down <- atomically do
writerDown <- pollSTM w <&> isJust
stopped <- readTVar ncqStopped
pure (stopped && writerDown)
unless down next
link indexer
pure indexer
@ -456,29 +458,29 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
initQ <- readTVarIO ncqWriteQueue
wResult <- flip fix (0,initQ) \next (written,q) -> case HPSQ.minView q of
Nothing -> pure mempty
Just (h,_,WQItem{..},rest) -> do
Nothing -> pure mempty
Just (h,_,WQItem{..},rest) -> do
off <- fdSeek fh SeekFromEnd 0
let b = byteString (coerce @_ @ByteString h)
<> lazyByteString (fromMaybe mempty wqData)
let wbs = toLazyByteString b
let len = LBS.length wbs
let ws = N.bytestring32 (fromIntegral len)
let w = 4 + len
off <- fdSeek fh SeekFromEnd 0
let b = byteString (coerce @_ @ByteString h)
<> lazyByteString (fromMaybe mempty wqData)
let wbs = toLazyByteString b
let len = LBS.length wbs
let ws = N.bytestring32 (fromIntegral len)
let w = 4 + len
if isNothing wqData && wqNew then
pure ()
else void do
liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs))
if isNothing wqData && wqNew then
pure ()
else void do
liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs))
written' <- if written < syncData then do
pure (written + w)
else do
fileSynchronise fh
pure 0
written' <- if written < syncData then do
pure (written + w)
else do
fileSynchronise fh
pure 0
((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest)
((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest)
fileSynchronise fh
size <- fdSeek fh SeekFromEnd 0
@ -509,52 +511,58 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
when (fromIntegral size >= ncqMinLog || indexNow > 0) do
(n,u) <- atomically do
r <- readTVar ncqCurrentHandleR <&> fromIntegral
u <- readTVar ncqCurrentUsage <&> fromMaybe 0 . IntMap.lookup r
pure (fromIntegral @_ @Word32 r, u)
fsize <- readTVarIO ncqCurrentHandleR
>>= getFdStatus
<&> PFS.fileSize
let current = ncqGetCurrentName ncq
unless (fsize == 0) do
fossilized <- ncqGetNewFossilName ncq
(n,u) <- atomically do
r <- readTVar ncqCurrentHandleR <&> fromIntegral
u <- readTVar ncqCurrentUsage <&> fromMaybe 0 . IntMap.lookup r
pure (fromIntegral @_ @Word32 r, u)
warn $ "NEED TRUNCATE" <+> pretty current <+> viaShow size <+> pretty n <+> pretty u
let current = ncqGetCurrentName ncq
mv current fossilized
fossilized <- ncqGetNewFossilName ncq
atomically do
writeTVar ncqIndexNow 0
r <- readTVar ncqCurrentHandleR
-- NOTE: extra-use
-- добавляем лишний 1 для индексации.
-- исходный файл закрываем, только когда проиндексировано.
-- то есть должны отнять 1 после индексации.
modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral r) 1)
writeTQueue indexQ (r, fossilized)
debug $ "NEED TRUNCATE" <+> pretty current <+> viaShow size <+> pretty n <+> pretty u
let flags = defaultFileFlags { exclusive = True }
mv current fossilized
touch current
writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0)
atomically do
writeTVar ncqIndexNow 0
r <- readTVar ncqCurrentHandleR
-- NOTE: extra-use
-- добавляем лишний 1 для индексации.
-- исходный файл закрываем, только когда проиндексировано.
-- то есть должны отнять 1 после индексации.
modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral r) 1)
writeTQueue indexQ (r, fossilized)
liftIO (PosixBase.openFd current Posix.ReadWrite flags)
>>= atomically . writeTVar ncqCurrentHandleW
let flags = defaultFileFlags { exclusive = True }
liftIO (PosixBase.openFd current Posix.ReadWrite flags)
>>= atomically . writeTVar ncqCurrentHandleR
touch current
writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0)
debug $ "TRUNCATED, moved to" <+> pretty fossilized
liftIO (PosixBase.openFd current Posix.ReadWrite flags)
>>= atomically . writeTVar ncqCurrentHandleW
toClose <- atomically do
w <- readTVar ncqCurrentUsage <&> IntMap.toList
let (alive,dead) = List.partition( (>0) . snd) w
writeTVar ncqCurrentUsage (IntMap.fromList alive)
pure dead
liftIO (PosixBase.openFd current Posix.ReadWrite flags)
>>= atomically . writeTVar ncqCurrentHandleR
for_ toClose $ \(f,_) -> do
when (f > 0) do
debug $ "CLOSE FD" <+> pretty f
Posix.closeFd (fromIntegral f)
debug $ "TRUNCATED, moved to" <+> pretty fossilized
toClose <- atomically do
w <- readTVar ncqCurrentUsage <&> IntMap.toList
let (alive,dead) = List.partition( (>0) . snd) w
writeTVar ncqCurrentUsage (IntMap.fromList alive)
pure dead
for_ toClose $ \(f,_) -> do
when (f > 0) do
debug $ "CLOSE FD" <+> pretty f
Posix.closeFd (fromIntegral f)
ncqStoragePut_ :: MonadUnliftIO m => Bool -> NCQStorage -> LBS.ByteString -> m (Maybe HashRef)
ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do

View File

@ -182,6 +182,24 @@ main = do
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:fossilize" $ nil_ \case
[ isOpaqueOf @TCQ -> Just tcq ] -> lift do
ncq <- getNCQ tcq
ncqIndexRightNow ncq
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:locate" $ \case
[ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do
ncq <- getNCQ tcq
ncqLocate ncq hash >>= \case
Just x -> do
parseSyntax (show $ pretty x) & either (error.show) pure
_ -> pure nil
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:get" $ \case
[ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do
ncq <- getNCQ tcq