diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 029d3403..f4e59f2d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -92,7 +92,7 @@ data NCQStorage = , ncqMaxCachedIdx :: Int , ncqMaxCachedData :: Int , ncqRefsMem :: TVar (HashMap HashRef HashRef) - , ncqRefsDirty :: TVar Bool + , ncqRefsDirty :: TVar Int , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec LBS.ByteString) , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) , ncqTrackedFiles :: TVar (HashSet FileKey) @@ -104,7 +104,7 @@ data NCQStorage = , ncqCurrentHandleR :: TVar Fd , ncqCurrentUsage :: TVar (IntMap Int) , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) - , ncqFlushNow :: TQueue () + , ncqFlushNow :: TVar Int , ncqOpenDone :: TMVar Bool , ncqStopped :: TVar Bool } @@ -154,6 +154,9 @@ ncqGetNewFossilName n@NCQStorage{} = do let (p,tpl) = splitFileName fn liftIO $ emptyTempFile p tpl +ncqGetRefsDataFileName :: NCQStorage -> FilePath +ncqGetRefsDataFileName ncq = ncqGetFileName ncq "refs.data" + ncqGetIndexFileName :: NCQStorage -> FileKey -> FilePath ncqGetIndexFileName ncq fk = do ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".cq") @@ -242,8 +245,8 @@ ncqIndexFile n@NCQStorage{} fp' = do ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m () ncqStorageStop ncq@NCQStorage{..} = do - atomically $ writeTVar ncqStopped True ncqStorageSync ncq + atomically $ writeTVar ncqStopped True atomically $ fix \next -> do done <- readTVar ncqWriteQueue <&> HPSQ.null unless done STM.retry @@ -255,6 +258,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do let dumpData = 1024 ^ 2 let syncData = fromIntegral ncqSyncSize + let untilStopped m = fix \loop -> do + m >> readTVarIO ncqStopped >>= \case + False -> loop + _ -> debug "STOPPING THREAD" + ContT $ bracket none $ const $ liftIO do -- writeJournal syncData readTVarIO ncqCurrentHandleW >>= closeFd @@ -264,7 +272,30 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do -- cap <- (10*) <$> getNumCapabilities cap <- getNumCapabilities - reader <- ContT $ withAsync $ forever do + refsWriter <- ContT $ withAsync $ untilStopped do + -- FIXME: timeout-hardcode + + void $ race (pause @'Seconds 2) $ atomically do + readTVar ncqStopped `orElse` STM.retry + + dirty <- readTVarIO ncqRefsDirty + + when (dirty > 0) do + refs <- readTVarIO ncqRefsMem <&> HM.toList + withBinaryFileDurableAtomic (ncqGetRefsDataFileName ncq) WriteMode $ \fh -> do + for_ refs $ \(k,v) -> do + let ks = coerce @_ @ByteString k + let vs = coerce @_ @ByteString v + let w = 4 + BS.length ks + BS.length vs -- always 4+64, but okay + liftIO do + BS.hPutStr fh (N.bytestring32 (fromIntegral w)) + BS.hPutStr fh ks + BS.hPutStr fh vs + atomically $ writeTVar ncqRefsDirty 0 + + link refsWriter + + reader <- ContT $ withAsync $ untilStopped do reqs <- atomically do xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap) @@ -281,23 +312,39 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do indexQ <- newTQueueIO - indexer <- ContT $ withAsync $ forever do - (fd, fn) <- atomically (readTQueue indexQ) - key <- ncqIndexFile ncq fn <&> fromString @FileKey + indexer <- ContT $ withAsync $ untilStopped do - atomically do - ncqAddTrackedFilesSTM ncq [key] - modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) + what <- atomically do + e <- tryReadTQueue indexQ + stop <- readTVar ncqStopped - ncqLoadSomeIndexes ncq [key] + case e of + Just x -> pure (Just x) + Nothing | stop -> pure Nothing + | otherwise -> STM.retry + + for_ what $ \(fd,fn) -> do + + key <- ncqIndexFile ncq fn <&> fromString @FileKey + + atomically do + ncqAddTrackedFilesSTM ncq [key] + modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) + + ncqLoadSomeIndexes ncq [key] link indexer - fix \loop -> do + writer <- ContT $ withAsync $ untilStopped do flush <- liftIO $ race (pause @'Seconds ( realToFrac dumpTimeout / 4e6 )) $ atomically do - peekTQueue ncqFlushNow >> STM.flushTQueue ncqFlushNow - pure True + flush <- readTVar ncqFlushNow + stop <- readTVar ncqStopped + if flush > 0 || stop then do + writeTVar ncqFlushNow 0 + pure True + else do + STM.retry let flushNow = fromRight False flush @@ -307,16 +354,14 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do let dumpByTime = toMicroSeconds (TimeoutTS (now - lastW)) > dumpTimeout && bytes > 0 - when (dumpByTime || bytes >= dumpData || flushNow) do - -- debug "NCQStorage: dump data!" + stopped <- readTVarIO ncqStopped + + when (dumpByTime || bytes >= dumpData || flushNow || stopped) do + debug "NCQStorage: dump data!" liftIO $ writeJournal indexQ syncData - done <- atomically do - mt <- readTVar ncqWriteQueue <&> HPSQ.null - stop <- readTVar ncqStopped - pure (mt && stop) - - unless done loop + mapM_ waitCatch [writer,indexer,refsWriter] + mapM_ cancel [reader] where @@ -583,14 +628,27 @@ ncqStorageGet ncq@NCQStorage{..} h = do pure $ Just $ LBS.fromStrict $ BS.take (fromIntegral l) (BS.drop (fromIntegral o+4+32) mmaped) +ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef) +ncqStorageGetRef NCQStorage{..} ref = readTVarIO ncqRefsMem <&> HM.lookup ref + +ncqStorageSetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> HashRef -> m () +ncqStorageSetRef NCQStorage{..} ref val = atomically do + stopped <- readTVar ncqStopped + unless stopped do + modifyTVar ncqRefsMem (HM.insert ref val) + modifyTVar ncqRefsDirty succ + +ncqStorageDelRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m () +ncqStorageDelRef NCQStorage{..} ref = atomically do + modifyTVar ncqRefsMem (HM.delete ref) + ncqStorageDel :: MonadUnliftIO m => NCQStorage -> HashRef -> m NCQStorage ncqStorageDel sto h = do error "not implemented yet" ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync NCQStorage{..} = do - atomically $ writeTQueue ncqFlushNow () - + atomically $ modifyTVar ncqFlushNow succ ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m () ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do @@ -631,10 +689,21 @@ ncqStorageOpen fp = do ncqFixIndexes ncq ncqLoadIndexes ncq readCurrent ncq + readRefs ncq atomically $ putTMVar ncqOpenDone True pure ncq where + + readRefs ncq@NCQStorage{..} = do + mmaped <- liftIO $ mmapFileByteString (ncqGetRefsDataFileName ncq) Nothing + kvs <- S.toList_ do + scanBS mmaped $ \bs -> do + let k = BS.copy (BS.take 32 bs) & coerce @_ @HashRef + let v = BS.copy (BS.take 32 (BS.drop 32 bs)) & coerce @_ @HashRef + S.yield (k,v) + atomically $ writeTVar ncqRefsMem (HM.fromList kvs) + readCurrent ncq@NCQStorage{..} = do let fn = ncqGetCurrentName ncq -- liftIO $ print $ pretty "FILE" <+> pretty fn @@ -691,7 +760,7 @@ ncqStorageInit_ check path = do let ncqRoot = path ncqRefsMem <- newTVarIO mempty - ncqRefsDirty <- newTVarIO False + ncqRefsDirty <- newTVarIO 0 let ncqSyncSize = 32 * (1024 ^ 2) let ncqMinLog = 2 * (1024 ^ 3) @@ -706,7 +775,7 @@ ncqStorageInit_ check path = do ncqLastWritten <- getTimeCoarse >>= newTVarIO ncqWaitIndex <- newTVarIO HPSQ.empty - ncqFlushNow <- newTQueueIO + ncqFlushNow <- newTVarIO 0 ncqOpenDone <- newEmptyTMVarIO ncqCurrentReadReq <- newTVarIO mempty ncqCurrentUsage <- newTVarIO mempty @@ -725,6 +794,7 @@ ncqStorageInit_ check path = do let ncqCurrentHandleW = undefined let ncqCurrentHandleR = undefined let ncq0 = NCQStorage{..} + lastSz <- try @_ @IOException (BS.readFile currentSize) <&> either (const 0) N.word64 @@ -752,6 +822,10 @@ ncqStorageInit_ check path = do debug $ "currentFileName" <+> pretty (ncqGetCurrentName_ path ncqGen) - pure $ NCQStorage{..} + let ncq = NCQStorage{..} + + touch (ncqGetRefsDataFileName ncq) + + pure ncq diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index 72febc2a..b5fcc611 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -311,6 +311,7 @@ main = do h2 <- lift $ ncqStoragePut ncq "PECHENTRESKI!" liftIO $ ncqStorageStop ncq + wait writer pure $ mkList [mkSym (show $ pretty h), mkSym (show $ pretty h2)] @@ -402,6 +403,7 @@ main = do href <- liftIO $ ncqStoragePut ncq (LBS.fromStrict what) liftIO $ ncqStorageStop ncq + wait writer pure $ maybe nil (mkSym . show . pretty) href @@ -442,7 +444,26 @@ main = do liftIO $ print $ pretty m + debug "stopping" liftIO $ ncqStorageStop ncq + debug "stopping done" + + wait writer + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:one-ref" $ nil_ $ \case + [StringLike fn] -> liftIO $ flip runContT pure do + + ncq <- lift $ ncqStorageOpen fn + + writer <- ContT $ withAsync $ ncqStorageRun ncq + link writer + + ContT $ bracket none $ const do + none + + none e -> throwIO $ BadFormException @C (mkList e) @@ -477,34 +498,10 @@ main = do liftIO $ ncqStorageStop ncq + wait writer + e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:retry" $ nil_ $ const $ flip runContT pure do - - q <- newTQueueIO - w <- newTVarIO 0 - - p1 <- ContT $ withAsync $ forever do - pause @'Seconds 0.001 - x <- randomIO @Word64 - atomically do - writeTQueue q x - modifyTVar w succ - - p2 <- ContT $ withAsync $ do - atomically $ fix \next -> do - e <- readTQueue q - if (e == 0xDEADF00D) then none else next - - p3 <- ContT $ withAsync $ do - pause @'Seconds 10 - - waitAnyCatchCancel [p1,p2,p3] - - s <- atomically $ STM.flushTQueue q - n <- readTVarIO w - - liftIO $ print $ "so?" <+> pretty n <+> pretty (length s) setupLogger