This commit is contained in:
voidlizard 2025-03-14 07:17:44 +03:00 committed by Dmitry Zuykov
parent 569b55d401
commit 7597ed2822
2 changed files with 124 additions and 53 deletions

View File

@ -92,7 +92,7 @@ data NCQStorage =
, ncqMaxCachedIdx :: Int
, ncqMaxCachedData :: Int
, ncqRefsMem :: TVar (HashMap HashRef HashRef)
, ncqRefsDirty :: TVar Bool
, ncqRefsDirty :: TVar Int
, ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString)
, ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64))
, ncqTrackedFiles :: TVar (HashSet FileKey)
@ -104,7 +104,7 @@ data NCQStorage =
, ncqCurrentHandleR :: TVar Fd
, ncqCurrentUsage :: TVar (IntMap Int)
, ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString))
, ncqFlushNow :: TQueue ()
, ncqFlushNow :: TVar Int
, ncqOpenDone :: TMVar Bool
, ncqStopped :: TVar Bool
}
@ -154,6 +154,9 @@ ncqGetNewFossilName n@NCQStorage{} = do
let (p,tpl) = splitFileName fn
liftIO $ emptyTempFile p tpl
ncqGetRefsDataFileName :: NCQStorage -> FilePath
ncqGetRefsDataFileName ncq = ncqGetFileName ncq "refs.data"
ncqGetIndexFileName :: NCQStorage -> FileKey -> FilePath
ncqGetIndexFileName ncq fk = do
ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".cq")
@ -242,8 +245,8 @@ ncqIndexFile n@NCQStorage{} fp' = do
ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageStop ncq@NCQStorage{..} = do
atomically $ writeTVar ncqStopped True
ncqStorageSync ncq
atomically $ writeTVar ncqStopped True
atomically $ fix \next -> do
done <- readTVar ncqWriteQueue <&> HPSQ.null
unless done STM.retry
@ -255,6 +258,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
let dumpData = 1024 ^ 2
let syncData = fromIntegral ncqSyncSize
let untilStopped m = fix \loop -> do
m >> readTVarIO ncqStopped >>= \case
False -> loop
_ -> debug "STOPPING THREAD"
ContT $ bracket none $ const $ liftIO do
-- writeJournal syncData
readTVarIO ncqCurrentHandleW >>= closeFd
@ -264,7 +272,30 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
-- cap <- (10*) <$> getNumCapabilities
cap <- getNumCapabilities
reader <- ContT $ withAsync $ forever do
refsWriter <- ContT $ withAsync $ untilStopped do
-- FIXME: timeout-hardcode
void $ race (pause @'Seconds 2) $ atomically do
readTVar ncqStopped `orElse` STM.retry
dirty <- readTVarIO ncqRefsDirty
when (dirty > 0) do
refs <- readTVarIO ncqRefsMem <&> HM.toList
withBinaryFileDurableAtomic (ncqGetRefsDataFileName ncq) WriteMode $ \fh -> do
for_ refs $ \(k,v) -> do
let ks = coerce @_ @ByteString k
let vs = coerce @_ @ByteString v
let w = 4 + BS.length ks + BS.length vs -- always 4+64, but okay
liftIO do
BS.hPutStr fh (N.bytestring32 (fromIntegral w))
BS.hPutStr fh ks
BS.hPutStr fh vs
atomically $ writeTVar ncqRefsDirty 0
link refsWriter
reader <- ContT $ withAsync $ untilStopped do
reqs <- atomically do
xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap)
@ -281,23 +312,39 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
indexQ <- newTQueueIO
indexer <- ContT $ withAsync $ forever do
(fd, fn) <- atomically (readTQueue indexQ)
key <- ncqIndexFile ncq fn <&> fromString @FileKey
indexer <- ContT $ withAsync $ untilStopped do
atomically do
ncqAddTrackedFilesSTM ncq [key]
modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd))
what <- atomically do
e <- tryReadTQueue indexQ
stop <- readTVar ncqStopped
ncqLoadSomeIndexes ncq [key]
case e of
Just x -> pure (Just x)
Nothing | stop -> pure Nothing
| otherwise -> STM.retry
for_ what $ \(fd,fn) -> do
key <- ncqIndexFile ncq fn <&> fromString @FileKey
atomically do
ncqAddTrackedFilesSTM ncq [key]
modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd))
ncqLoadSomeIndexes ncq [key]
link indexer
fix \loop -> do
writer <- ContT $ withAsync $ untilStopped do
flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do
peekTQueue ncqFlushNow >> STM.flushTQueue ncqFlushNow
pure True
flush <- readTVar ncqFlushNow
stop <- readTVar ncqStopped
if flush > 0 || stop then do
writeTVar ncqFlushNow 0
pure True
else do
STM.retry
let flushNow = fromRight False flush
@ -307,16 +354,14 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0
when (dumpByTime || bytes >= dumpData || flushNow) do
-- debug "NCQStorage: dump data!"
stopped <- readTVarIO ncqStopped
when (dumpByTime || bytes >= dumpData || flushNow || stopped) do
debug "NCQStorage: dump data!"
liftIO $ writeJournal indexQ syncData
done <- atomically do
mt <- readTVar ncqWriteQueue <&> HPSQ.null
stop <- readTVar ncqStopped
pure (mt && stop)
unless done loop
mapM_ waitCatch [writer,indexer,refsWriter]
mapM_ cancel [reader]
where
@ -583,14 +628,27 @@ ncqStorageGet ncq@NCQStorage{..} h = do
pure $ Just $ LBS.fromStrict $ BS.take (fromIntegral l) (BS.drop (fromIntegral o+4+32) mmaped)
ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef)
ncqStorageGetRef NCQStorage{..} ref = readTVarIO ncqRefsMem <&> HM.lookup ref
ncqStorageSetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> HashRef -> m ()
ncqStorageSetRef NCQStorage{..} ref val = atomically do
stopped <- readTVar ncqStopped
unless stopped do
modifyTVar ncqRefsMem (HM.insert ref val)
modifyTVar ncqRefsDirty succ
ncqStorageDelRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m ()
ncqStorageDelRef NCQStorage{..} ref = atomically do
modifyTVar ncqRefsMem (HM.delete ref)
ncqStorageDel :: MonadUnliftIO m => NCQStorage -> HashRef -> m NCQStorage
ncqStorageDel sto h = do
error "not implemented yet"
ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m ()
ncqStorageSync NCQStorage{..} = do
atomically $ writeTQueue ncqFlushNow ()
atomically $ modifyTVar ncqFlushNow succ
ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m ()
ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do
@ -631,10 +689,21 @@ ncqStorageOpen fp = do
ncqFixIndexes ncq
ncqLoadIndexes ncq
readCurrent ncq
readRefs ncq
atomically $ putTMVar ncqOpenDone True
pure ncq
where
readRefs ncq@NCQStorage{..} = do
mmaped <- liftIO $ mmapFileByteString (ncqGetRefsDataFileName ncq) Nothing
kvs <- S.toList_ do
scanBS mmaped $ \bs -> do
let k = BS.copy (BS.take 32 bs) & coerce @_ @HashRef
let v = BS.copy (BS.take 32 (BS.drop 32 bs)) & coerce @_ @HashRef
S.yield (k,v)
atomically $ writeTVar ncqRefsMem (HM.fromList kvs)
readCurrent ncq@NCQStorage{..} = do
let fn = ncqGetCurrentName ncq
-- liftIO $ print $ pretty "FILE" <+> pretty fn
@ -691,7 +760,7 @@ ncqStorageInit_ check path = do
let ncqRoot = path
ncqRefsMem <- newTVarIO mempty
ncqRefsDirty <- newTVarIO False
ncqRefsDirty <- newTVarIO 0
let ncqSyncSize = 32 * (1024 ^ 2)
let ncqMinLog = 2 * (1024 ^ 3)
@ -706,7 +775,7 @@ ncqStorageInit_ check path = do
ncqLastWritten <- getTimeCoarse >>= newTVarIO
ncqWaitIndex <- newTVarIO HPSQ.empty
ncqFlushNow <- newTQueueIO
ncqFlushNow <- newTVarIO 0
ncqOpenDone <- newEmptyTMVarIO
ncqCurrentReadReq <- newTVarIO mempty
ncqCurrentUsage <- newTVarIO mempty
@ -725,6 +794,7 @@ ncqStorageInit_ check path = do
let ncqCurrentHandleW = undefined
let ncqCurrentHandleR = undefined
let ncq0 = NCQStorage{..}
lastSz <- try @_ @IOException (BS.readFile currentSize)
<&> either (const 0) N.word64
@ -752,6 +822,10 @@ ncqStorageInit_ check path = do
debug $ "currentFileName" <+> pretty (ncqGetCurrentName_ path ncqGen)
pure $ NCQStorage{..}
let ncq = NCQStorage{..}
touch (ncqGetRefsDataFileName ncq)
pure ncq

View File

@ -311,6 +311,7 @@ main = do
h2 <- lift $ ncqStoragePut ncq "PECHENTRESKI!"
liftIO $ ncqStorageStop ncq
wait writer
pure $ mkList [mkSym (show $ pretty h), mkSym (show $ pretty h2)]
@ -402,6 +403,7 @@ main = do
href <- liftIO $ ncqStoragePut ncq (LBS.fromStrict what)
liftIO $ ncqStorageStop ncq
wait writer
pure $ maybe nil (mkSym . show . pretty) href
@ -442,7 +444,26 @@ main = do
liftIO $ print $ pretty m
debug "stopping"
liftIO $ ncqStorageStop ncq
debug "stopping done"
wait writer
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:one-ref" $ nil_ $ \case
[StringLike fn] -> liftIO $ flip runContT pure do
ncq <- lift $ ncqStorageOpen fn
writer <- ContT $ withAsync $ ncqStorageRun ncq
link writer
ContT $ bracket none $ const do
none
none
e -> throwIO $ BadFormException @C (mkList e)
@ -477,34 +498,10 @@ main = do
liftIO $ ncqStorageStop ncq
wait writer
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:retry" $ nil_ $ const $ flip runContT pure do
q <- newTQueueIO
w <- newTVarIO 0
p1 <- ContT $ withAsync $ forever do
pause @'Seconds 0.001
x <- randomIO @Word64
atomically do
writeTQueue q x
modifyTVar w succ
p2 <- ContT $ withAsync $ do
atomically $ fix \next -> do
e <- readTQueue q
if (e == 0xDEADF00D) then none else next
p3 <- ContT $ withAsync $ do
pause @'Seconds 10
waitAnyCatchCancel [p1,p2,p3]
s <- atomically $ STM.flushTQueue q
n <- readTVarIO w
liftIO $ print $ "so?" <+> pretty n <+> pretty (length s)
setupLogger