mirror of https://github.com/voidlizard/hbs2
wip, ncq fixed races
This commit is contained in:
parent
35e701c127
commit
90b9204e58
|
@ -34,6 +34,8 @@ import Data.HashPSQ qualified as HPSQ
|
|||
import Data.HashPSQ (HashPSQ)
|
||||
import Data.IntMap qualified as IntMap
|
||||
import Data.IntMap (IntMap)
|
||||
import Data.IntSet qualified as IntSet
|
||||
import Data.IntSet (IntSet)
|
||||
import Data.Sequence as Seq
|
||||
import Data.List qualified as List
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
|
@ -88,6 +90,8 @@ data NCQStorageException =
|
|||
NCQStorageAlreadyExist String
|
||||
| NCQStorageSeedMissed
|
||||
| NCQStorageTimeout
|
||||
| NCQStorageCurrentAlreadyOpen
|
||||
| NCQStorageCantOpenCurrent
|
||||
deriving stock (Show,Typeable)
|
||||
|
||||
instance Exception NCQStorageException
|
||||
|
@ -124,6 +128,10 @@ data WQItem =
|
|||
, wqData :: Maybe LBS.ByteString
|
||||
}
|
||||
|
||||
newtype RFd = RFd { unRfd :: Fd }
|
||||
|
||||
newtype WFd = WFd { unWfd :: Fd }
|
||||
|
||||
data NCQStorage =
|
||||
NCQStorage
|
||||
{ ncqRoot :: FilePath
|
||||
|
@ -134,14 +142,14 @@ data NCQStorage =
|
|||
, ncqMaxCached :: Int
|
||||
, ncqSalt :: HashRef
|
||||
, ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem)
|
||||
, ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64))
|
||||
, ncqStaged :: TVar (IntMap (HashPSQ HashRef TimeSpec (Word64,Word64)))
|
||||
, ncqIndexed :: TVar IntSet
|
||||
, ncqIndexNow :: TVar Int
|
||||
, ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry))
|
||||
, ncqCachedEntries :: TVar Int
|
||||
, ncqNotWritten :: TVar Word64
|
||||
, ncqLastWritten :: TVar TimeSpec
|
||||
, ncqCurrentHandleW :: TVar Fd
|
||||
, ncqCurrentHandleR :: TVar Fd
|
||||
, ncqCurrentFd :: TVar (Maybe (RFd,WFd))
|
||||
, ncqCurrentUsage :: TVar (IntMap Int)
|
||||
, ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString))
|
||||
, ncqFlushNow :: TVar [TQueue ()]
|
||||
|
@ -152,13 +160,13 @@ data NCQStorage =
|
|||
|
||||
data Location =
|
||||
InWriteQueue WQItem
|
||||
| InCurrent (Word64, Word64)
|
||||
| InCurrent (Fd,Word64, Word64)
|
||||
| InFossil CachedEntry (Word64, Word64)
|
||||
|
||||
instance Pretty Location where
|
||||
pretty = \case
|
||||
InWriteQueue{} -> "write-queue"
|
||||
InCurrent (o,l) -> pretty $ mkForm @C "current" [mkInt o, mkInt l]
|
||||
InCurrent (fd,o,l) -> pretty $ mkForm @C "current" [mkInt fd, mkInt o, mkInt l]
|
||||
InFossil _ (o,l) -> pretty $ mkForm @C "fossil " [mkInt o, mkInt l]
|
||||
|
||||
type IsHCQKey h = ( Eq (Key h)
|
||||
|
@ -311,8 +319,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
|||
indexQ <- newTQueueIO
|
||||
|
||||
ContT $ bracket none $ const $ liftIO do
|
||||
-- writeJournal syncData
|
||||
readTVarIO ncqCurrentHandleW >>= closeFd
|
||||
ncqFinalize ncq
|
||||
|
||||
debug "RUNNING STORAGE!"
|
||||
|
||||
|
@ -407,10 +414,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
|||
(key, _) <- ncqIndexFile ncq fn <&> over _2 HS.fromList
|
||||
|
||||
ncqAddTrackedFilesIO ncq [key]
|
||||
ncqLoadSomeIndexes ncq [fromString key]
|
||||
|
||||
atomically do
|
||||
modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd))
|
||||
|
||||
ncqLoadSomeIndexes ncq [fromString key]
|
||||
modifyTVar ncqIndexed (IntSet.insert (fromIntegral fd))
|
||||
|
||||
down <- atomically do
|
||||
writerDown <- pollSTM w <&> isJust
|
||||
|
@ -422,12 +430,10 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
|||
link indexer
|
||||
pure indexer
|
||||
|
||||
writeJournal indexQ syncData = liftIO do
|
||||
writeJournal indexQ syncData = ncqWithCurrent ncq $ \(RFd fdr, WFd fh) -> liftIO do
|
||||
|
||||
trace $ "writeJournal" <+> pretty syncData
|
||||
|
||||
fh <- readTVarIO ncqCurrentHandleW
|
||||
|
||||
fdSeek fh SeekFromEnd 0
|
||||
|
||||
initQ <- readTVarIO ncqWriteQueue
|
||||
|
@ -462,11 +468,12 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
|||
|
||||
fileSynchronise fh
|
||||
size <- fdSeek fh SeekFromEnd 0
|
||||
writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 (fromIntegral size))
|
||||
|
||||
now1 <- getTimeCoarse
|
||||
atomically do
|
||||
q0 <- readTVar ncqWriteQueue
|
||||
w0 <- readTVar ncqWaitIndex
|
||||
w0 <- readTVar ncqStaged <&> fromMaybe HPSQ.empty . IntMap.lookup (fromIntegral fdr)
|
||||
b0 <- readTVar ncqNotWritten
|
||||
|
||||
wbytes <- newTVar 0
|
||||
|
@ -479,24 +486,20 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
|||
next (HPSQ.delete h q, HPSQ.insert h now1 (o,l) w,xs)
|
||||
|
||||
writeTVar ncqWriteQueue rq
|
||||
writeTVar ncqWaitIndex rw
|
||||
modifyTVar ncqStaged (IntMap.insert (fromIntegral fdr) rw)
|
||||
bw <- readTVar wbytes
|
||||
writeTVar ncqNotWritten (max 0 (b0 - bw))
|
||||
|
||||
writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 (fromIntegral size))
|
||||
|
||||
indexNow <- readTVarIO ncqIndexNow
|
||||
|
||||
when (fromIntegral size >= ncqMinLog || indexNow > 0) do
|
||||
|
||||
fsize <- readTVarIO ncqCurrentHandleR
|
||||
>>= getFdStatus
|
||||
<&> PFS.fileSize
|
||||
fsize <- getFdStatus fdr <&> PFS.fileSize
|
||||
|
||||
unless (fsize == 0) do
|
||||
|
||||
(n,u) <- atomically do
|
||||
r <- readTVar ncqCurrentHandleR <&> fromIntegral
|
||||
let r = fromIntegral fdr
|
||||
u <- readTVar ncqCurrentUsage <&> fromMaybe 0 . IntMap.lookup r
|
||||
pure (fromIntegral @_ @Word32 r, u)
|
||||
|
||||
|
@ -510,37 +513,42 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
|||
|
||||
atomically do
|
||||
writeTVar ncqIndexNow 0
|
||||
r <- readTVar ncqCurrentHandleR
|
||||
-- NOTE: extra-use
|
||||
-- добавляем лишний 1 для индексации.
|
||||
-- исходный файл закрываем, только когда проиндексировано.
|
||||
-- то есть должны отнять 1 после индексации.
|
||||
modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral r) 1)
|
||||
writeTQueue indexQ (r, fossilized)
|
||||
modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fdr) 1)
|
||||
writeTQueue indexQ (fdr, fossilized)
|
||||
|
||||
let flags = defaultFileFlags { exclusive = True }
|
||||
|
||||
touch current
|
||||
closeFd fh
|
||||
writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0)
|
||||
|
||||
liftIO (PosixBase.openFd current Posix.ReadWrite flags)
|
||||
>>= atomically . writeTVar ncqCurrentHandleW
|
||||
|
||||
liftIO (PosixBase.openFd current Posix.ReadWrite flags)
|
||||
>>= atomically . writeTVar ncqCurrentHandleR
|
||||
ncqOpenCurrent ncq
|
||||
|
||||
debug $ "TRUNCATED, moved to" <+> pretty fossilized
|
||||
|
||||
toClose <- atomically do
|
||||
w <- readTVar ncqCurrentUsage <&> IntMap.toList
|
||||
let (alive,dead) = List.partition( (>0) . snd) w
|
||||
writeTVar ncqCurrentUsage (IntMap.fromList alive)
|
||||
pure dead
|
||||
|
||||
for_ toClose $ \(f,_) -> do
|
||||
when (f > 0) do
|
||||
toClose <- atomically do
|
||||
usage <- readTVar ncqCurrentUsage
|
||||
staged <- readTVar ncqStaged
|
||||
indexed <- readTVar ncqIndexed
|
||||
|
||||
let (alive, dead) = List.partition (\(_, u) -> u > 0) (IntMap.toList usage)
|
||||
|
||||
let closable = do
|
||||
(f, _) <- dead
|
||||
guard (IntSet.member f indexed)
|
||||
guard (maybe True HPSQ.null (IntMap.lookup f staged))
|
||||
pure f
|
||||
|
||||
writeTVar ncqCurrentUsage (IntMap.fromList alive)
|
||||
writeTVar ncqIndexed (indexed `IntSet.difference` IntSet.fromList closable)
|
||||
writeTVar ncqStaged (foldr IntMap.delete staged closable)
|
||||
|
||||
pure closable
|
||||
|
||||
for_ toClose $ \f -> do
|
||||
debug $ "CLOSE FD" <+> pretty f
|
||||
Posix.closeFd (fromIntegral f)
|
||||
closeFd (fromIntegral f)
|
||||
|
||||
--
|
||||
ncqStoragePut_ :: MonadUnliftIO m
|
||||
|
@ -614,7 +622,7 @@ ncqTombPrefix = "T;;\x00"
|
|||
ncqLocatedSize :: Location -> Integer
|
||||
ncqLocatedSize = \case
|
||||
InWriteQueue WQItem{..} -> fromIntegral $ maybe 0 LBS.length wqData
|
||||
InCurrent (_,s) -> fromIntegral s
|
||||
InCurrent (_,_,s) -> fromIntegral s
|
||||
InFossil _ (_,s) -> fromIntegral s
|
||||
|
||||
evictIfNeededSTM :: NCQStorage -> Maybe Int -> STM ()
|
||||
|
@ -646,18 +654,23 @@ evictIfNeededSTM NCQStorage{..} howMany = do
|
|||
|
||||
ncqLocate :: MonadIO m => NCQStorage -> HashRef -> m (Maybe Location)
|
||||
ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
|
||||
l1 <- atomically do
|
||||
|
||||
inQ <- readTVar ncqWriteQueue
|
||||
inQ <- atomically $ readTVar ncqWriteQueue
|
||||
<&> (fmap snd . HPSQ.lookup h)
|
||||
<&> \case
|
||||
Just wq -> Just (InWriteQueue wq)
|
||||
_ -> Nothing
|
||||
|
||||
inC <- readTVar ncqWaitIndex <&> (fmap snd . HPSQ.lookup h) <&> fmap InCurrent
|
||||
pure (inQ <|> inC)
|
||||
for_ inQ $ exit . Just
|
||||
|
||||
for_ l1 $ exit . Just
|
||||
inC <- atomically $ do
|
||||
s <- readTVar ncqStaged <&> IntMap.toList
|
||||
let found = lastMay $ catMaybes [ (fd,) <$> HPSQ.lookup h hpsq | (fd, hpsq) <- s ]
|
||||
case found of
|
||||
Just (f, (_,(off,size))) -> pure (Just (InCurrent (fromIntegral f,off,size)))
|
||||
Nothing -> pure Nothing
|
||||
|
||||
for_ inC $ exit . Just
|
||||
|
||||
now <- getTimeCoarse
|
||||
tracked <- readTVarIO ncqTrackedFiles
|
||||
|
@ -738,14 +751,13 @@ ncqStorageGet ncq h = runMaybeT do
|
|||
lift (ncqStorageGet_ ncq location) >>= toMPlus
|
||||
|
||||
ncqStorageGet_ :: MonadUnliftIO m => NCQStorage -> Location -> m (Maybe LBS.ByteString)
|
||||
ncqStorageGet_ NCQStorage{..} = \case
|
||||
ncqStorageGet_ ncq@NCQStorage{..} = \case
|
||||
InWriteQueue WQItem{ wqData = Just lbs } -> do
|
||||
pure $ Just lbs
|
||||
|
||||
InCurrent (o,l) -> do
|
||||
InCurrent (fd,o,l) -> do
|
||||
r <- atomically do
|
||||
a <- newEmptyTMVar
|
||||
fd <- readTVar ncqCurrentHandleR
|
||||
modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1)
|
||||
modifyTVar ncqCurrentReadReq (|> (fd, o, l, a))
|
||||
pure a
|
||||
|
@ -798,8 +810,8 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
|
|||
|
||||
ncqLocate ncq h >>= atomically . \case
|
||||
Just (InFossil _ _) -> writeTombstone (WQItem False Nothing)
|
||||
Just (InCurrent _) -> do
|
||||
modifyTVar ncqWaitIndex (HPSQ.delete h)
|
||||
Just (InCurrent (fd,_,_)) -> do
|
||||
modifyTVar ncqStaged (IntMap.adjust (HPSQ.delete h) (fromIntegral fd))
|
||||
writeTombstone (WQItem False Nothing)
|
||||
|
||||
Just (InWriteQueue _) -> writeTombstone (WQItem True Nothing)
|
||||
|
@ -877,7 +889,7 @@ ncqStorageOpen fp' = do
|
|||
|
||||
where
|
||||
|
||||
readCurrent ncq@NCQStorage{..} = do
|
||||
readCurrent ncq@NCQStorage{..} = ncqWithCurrent ncq \(RFd fd, _) -> do
|
||||
let fn = ncqGetCurrentName ncq
|
||||
-- liftIO $ print $ pretty "FILE" <+> pretty fn
|
||||
bs0 <- liftIO $ mmapFileByteString fn Nothing
|
||||
|
@ -902,11 +914,33 @@ ncqStorageOpen fp' = do
|
|||
|
||||
next (o+w+4, BS.drop (w+4) bs)
|
||||
|
||||
atomically $ writeTVar ncqWaitIndex (HPSQ.fromList items)
|
||||
atomically $ modifyTVar ncqStaged (IntMap.insert (fromIntegral fd) (HPSQ.fromList items))
|
||||
|
||||
ncqStorageInit :: MonadUnliftIO m => FilePath -> m NCQStorage
|
||||
ncqStorageInit = ncqStorageInit_ True
|
||||
|
||||
ncqOpenCurrent :: MonadUnliftIO m => NCQStorage -> m ()
|
||||
ncqOpenCurrent ncq@NCQStorage{..} = do
|
||||
let fp = ncqGetCurrentName ncq
|
||||
touch fp
|
||||
let flags = defaultFileFlags { exclusive = True }
|
||||
fdw <- liftIO (PosixBase.openFd fp Posix.ReadWrite flags) <&> WFd
|
||||
fdr <- liftIO (PosixBase.openFd fp Posix.ReadOnly flags) <&> RFd
|
||||
atomically $ writeTVar ncqCurrentFd (Just (fdr, fdw))
|
||||
|
||||
ncqWithCurrent :: MonadUnliftIO m => NCQStorage -> ((RFd, WFd) -> m a) -> m a
|
||||
ncqWithCurrent ncq@NCQStorage{..} action = do
|
||||
flip fix 2 $ \next i -> do
|
||||
readTVarIO ncqCurrentFd >>= \case
|
||||
|
||||
Just a -> action a
|
||||
|
||||
Nothing | i >= 0 -> do
|
||||
ncqOpenCurrent ncq
|
||||
next (pred i)
|
||||
|
||||
Nothing -> do
|
||||
throwIO NCQStorageCantOpenCurrent
|
||||
|
||||
ncqStorageInit_ :: MonadUnliftIO m => Bool -> FilePath -> m NCQStorage
|
||||
ncqStorageInit_ check path = do
|
||||
|
@ -956,7 +990,7 @@ ncqStorageInit_ check path = do
|
|||
|
||||
ncqNotWritten <- newTVarIO 0
|
||||
ncqLastWritten <- getTimeCoarse >>= newTVarIO
|
||||
ncqWaitIndex <- newTVarIO HPSQ.empty
|
||||
ncqStaged <- newTVarIO mempty
|
||||
|
||||
ncqFlushNow <- newTVarIO mempty
|
||||
ncqOpenDone <- newEmptyTMVarIO
|
||||
|
@ -966,6 +1000,8 @@ ncqStorageInit_ check path = do
|
|||
ncqTrackedFiles <- newTVarIO HPSQ.empty
|
||||
ncqCachedEntries <- newTVarIO 0
|
||||
ncqIndexNow <- newTVarIO 0
|
||||
ncqCurrentFd <- newTVarIO Nothing
|
||||
ncqIndexed <- newTVarIO mempty
|
||||
|
||||
let currentName = ncqGetCurrentName_ path ncqGen
|
||||
|
||||
|
@ -974,8 +1010,6 @@ ncqStorageInit_ check path = do
|
|||
hereCurrent <- doesPathExist currentName
|
||||
|
||||
when hereCurrent $ liftIO do
|
||||
let ncqCurrentHandleW = undefined
|
||||
let ncqCurrentHandleR = undefined
|
||||
let ncq0 = NCQStorage{..}
|
||||
|
||||
lastSz <- try @_ @IOException (BS.readFile currentSize)
|
||||
|
@ -994,19 +1028,10 @@ ncqStorageInit_ check path = do
|
|||
ncqWriteError ncq0 msg
|
||||
mv currentName fossilized
|
||||
|
||||
touch currentName
|
||||
|
||||
let flags = defaultFileFlags { exclusive = True }
|
||||
|
||||
ncqCurrentHandleW <- liftIO (PosixBase.openFd currentName Posix.ReadWrite flags)
|
||||
>>= newTVarIO
|
||||
|
||||
ncqCurrentHandleR <- liftIO (PosixBase.openFd currentName Posix.ReadOnly flags)
|
||||
>>= newTVarIO
|
||||
|
||||
debug $ "currentFileName" <+> pretty (ncqGetCurrentName_ path ncqGen)
|
||||
|
||||
let ncq = NCQStorage{..}
|
||||
ncqOpenCurrent ncq
|
||||
|
||||
pure ncq
|
||||
|
||||
|
@ -1016,6 +1041,19 @@ ncqStorageFlush = ncqStorageSync
|
|||
ncqIndexRightNow :: MonadUnliftIO m => NCQStorage -> m ()
|
||||
ncqIndexRightNow NCQStorage{..} = atomically $ modifyTVar ncqIndexNow succ
|
||||
|
||||
ncqFinalize :: MonadUnliftIO m => NCQStorage -> m ()
|
||||
ncqFinalize NCQStorage{..} = do
|
||||
|
||||
liftIO $ readTVarIO ncqStaged <&> IntMap.keys >>= mapM_ (closeFd . fromIntegral)
|
||||
atomically (writeTVar ncqStaged mempty)
|
||||
|
||||
readTVarIO ncqCurrentFd >>= \case
|
||||
Just (RFd _, WFd wfd) -> do
|
||||
liftIO (closeFd wfd)
|
||||
atomically (writeTVar ncqCurrentFd Nothing)
|
||||
|
||||
_ -> none
|
||||
|
||||
withNCQ :: forall m a . MonadUnliftIO m
|
||||
=> (NCQStorage -> NCQStorage)
|
||||
-> FilePath
|
||||
|
|
|
@ -1179,27 +1179,6 @@ executable test-scripts
|
|||
, zstd
|
||||
|
||||
|
||||
executable test-cq-storage
|
||||
import: shared-properties
|
||||
import: common-deps
|
||||
default-language: Haskell2010
|
||||
ghc-options:
|
||||
hs-source-dirs: test
|
||||
main-is: TestCQ.hs
|
||||
build-depends:
|
||||
base, hbs2-core, hbs2-log-structured, hbs2-storage-ncq
|
||||
, network
|
||||
, string-conversions
|
||||
, db-pipe
|
||||
, suckless-conf
|
||||
, network-byte-order
|
||||
, text
|
||||
, time
|
||||
, mmap
|
||||
, zstd
|
||||
, unix
|
||||
|
||||
|
||||
executable tcq
|
||||
import: shared-properties
|
||||
import: common-deps
|
||||
|
|
|
@ -1,556 +0,0 @@
|
|||
{-# Language AllowAmbiguousTypes #-}
|
||||
{-# Language UndecidableInstances #-}
|
||||
{-# Language MultiWayIf #-}
|
||||
{-# Language RecordWildCards #-}
|
||||
module Main where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
import HBS2.OrDie
|
||||
import HBS2.Hash
|
||||
import HBS2.Data.Types.Refs
|
||||
import HBS2.Clock
|
||||
import HBS2.Merkle
|
||||
|
||||
import HBS2.Storage
|
||||
import HBS2.Storage.Simple
|
||||
import HBS2.Storage.Operations.ByteString
|
||||
|
||||
import HBS2.System.Logger.Simple.ANSI
|
||||
|
||||
import HBS2.Storage.NCQ
|
||||
import HBS2.Data.Log.Structured.NCQ
|
||||
|
||||
|
||||
import Data.Config.Suckless.Syntax
|
||||
import Data.Config.Suckless.Script
|
||||
import Data.Config.Suckless.System
|
||||
|
||||
import DBPipe.SQLite hiding (field)
|
||||
|
||||
import Data.Bits
|
||||
import Data.ByteString (ByteString)
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.ByteString.Char8 qualified as BS8
|
||||
import Data.ByteString.Builder
|
||||
import Data.Maybe
|
||||
import Data.Word
|
||||
import Data.List qualified as List
|
||||
import Data.Vector qualified as V
|
||||
import Data.Vector ((!))
|
||||
import Control.Monad.Trans.Cont
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Network.ByteOrder qualified as N
|
||||
import Data.Coerce
|
||||
import Data.HashPSQ qualified as HPSQ
|
||||
import Data.HashSet qualified as HS
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import Data.IntMap qualified as IntMap
|
||||
import Data.IntMap (IntMap)
|
||||
import Data.Fixed
|
||||
import System.Environment
|
||||
import System.Directory
|
||||
import System.Posix.Fcntl
|
||||
import System.Posix.IO
|
||||
import System.IO.MMap
|
||||
import System.IO qualified as IO
|
||||
import System.Exit (exitSuccess, exitFailure)
|
||||
import System.Random
|
||||
import Safe
|
||||
import Lens.Micro.Platform
|
||||
import Control.Concurrent.STM qualified as STM
|
||||
|
||||
import UnliftIO
|
||||
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
|
||||
import Streaming.Prelude qualified as S
|
||||
import System.TimeIt
|
||||
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
|
||||
setupLogger :: MonadIO m => m ()
|
||||
setupLogger = do
|
||||
setLogging @DEBUG $ toStderr . logPrefix "[debug] "
|
||||
setLogging @ERROR $ toStderr . logPrefix "[error] "
|
||||
setLogging @WARN $ toStderr . logPrefix "[warn] "
|
||||
setLogging @NOTICE $ toStdout . logPrefix ""
|
||||
|
||||
flushLoggers :: MonadIO m => m ()
|
||||
flushLoggers = do
|
||||
silence
|
||||
|
||||
silence :: MonadIO m => m ()
|
||||
silence = do
|
||||
setLoggingOff @DEBUG
|
||||
setLoggingOff @ERROR
|
||||
setLoggingOff @WARN
|
||||
setLoggingOff @NOTICE
|
||||
setLoggingOff @TRACE
|
||||
|
||||
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
|
||||
let dict = makeDict @C do
|
||||
|
||||
entry $ bindMatch "--help" $ nil_ \case
|
||||
HelpEntryBound what -> helpEntry what
|
||||
[StringLike s] -> helpList False (Just s)
|
||||
_ -> helpList False Nothing
|
||||
|
||||
internalEntries
|
||||
|
||||
entry $ bindMatch "run" $ \case
|
||||
[ StringLike what ] -> do
|
||||
liftIO (readFile what)
|
||||
<&> parseTop
|
||||
>>= either (error.show) pure
|
||||
>>= evalTop
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:sqlite" $ nil_ $ \case
|
||||
[StringLike fn] -> liftIO do
|
||||
hashes <- readFile fn <&> mapMaybe (fromStringMay @HashRef) . lines
|
||||
|
||||
let dbname = "jopakita.db"
|
||||
rm dbname
|
||||
newDb <- newDBPipeEnv dbPipeOptsDef dbname
|
||||
|
||||
withDB newDb do
|
||||
ddl [qc|CREATE TABLE kv (k BLOB PRIMARY KEY, v int)|]
|
||||
|
||||
timeItNamed "sqlite -- test insert" do
|
||||
withDB newDb $ transactional do
|
||||
for_ hashes $ \h -> do
|
||||
let k = coerce @_ @ByteString h
|
||||
insert [qc|insert into kv (k,v) values(?,?)|] (k,0)
|
||||
|
||||
replicateM_ 5 do
|
||||
withDB newDb do
|
||||
timeItNamed "sqlite -- select test" do
|
||||
-- fn <- newTVarIO 0
|
||||
-- fns <- newTVarIO 0
|
||||
q <- newTQueueIO
|
||||
for_ hashes $ \h -> do
|
||||
let k = coerce @_ @ByteString h
|
||||
|
||||
founds <- select [qc|select k,v from kv where k = ?|] (Only k)
|
||||
|
||||
for_ founds $ \(s :: ByteString,n :: Int) -> do
|
||||
atomically $ writeTQueue q (s,n)
|
||||
|
||||
found <- atomically (STM.flushTQueue q) <&> List.length
|
||||
liftIO $ IO.hPrint stderr $ "FOUND" <+> pretty found
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:hashmap" $ nil_ $ \case
|
||||
[StringLike fn] -> liftIO do
|
||||
hashes <- readFile fn <&> mapMaybe (fromStringMay @HashRef) . lines
|
||||
let hma = HM.fromList [(h,()) | h <- hashes ]
|
||||
|
||||
replicateM_ 5 do
|
||||
timeItNamed (show $ "HashMap lookup test" <+> pretty (List.length hashes)) do
|
||||
q <- newTQueueIO
|
||||
for_ hashes $ \h -> do
|
||||
when (HM.member h hma) do
|
||||
atomically $ writeTQueue q h
|
||||
|
||||
n <- atomically ( STM.flushTQueue q) <&> List.length
|
||||
liftIO $ print $ "FOUND" <+> pretty n
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "test:nway:scan" $ nil_ $ \case
|
||||
[ StringLike fn ]-> liftIO do
|
||||
(mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn)
|
||||
let emptyKey = BS.replicate nwayKeySize 0
|
||||
nwayHashScanAll meta mmaped $ \o k v -> do
|
||||
unless (k == emptyKey) do
|
||||
liftIO $ print $ "scan:found" <+> fill 44 (pretty (coerce @_ @HashRef k)) <+> pretty o
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "test:nway:lookup" $ nil_ $ \case
|
||||
|
||||
[ StringLike fn ] -> liftIO do
|
||||
|
||||
hashes <- getContents <&> mapMaybe (fromStringMay @HashRef) . lines
|
||||
|
||||
(mmaped, nw) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn)
|
||||
|
||||
replicateM_ 5 do
|
||||
timeItNamed (show $ "lookup:nway" <+> pretty (List.length hashes)) do
|
||||
rQ <- newTQueueIO
|
||||
|
||||
for_ hashes $ \h -> do
|
||||
r <- nwayHashLookup nw mmaped (coerce @_ @ByteString h)
|
||||
when (isJust r) do
|
||||
atomically $ writeTQueue rQ (h,r)
|
||||
|
||||
found <- atomically $ STM.flushTQueue rQ
|
||||
liftIO $ print $ "FOUND" <+> pretty (List.length found)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:nway:stats" $ \case
|
||||
[StringLike fn] -> liftIO do
|
||||
|
||||
mt_ <- newTVarIO 0
|
||||
total_ <- newTVarIO 0
|
||||
|
||||
(mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn)
|
||||
|
||||
let emptyKey = BS.replicate nwayKeySize 0
|
||||
nwayHashScanAll meta mmaped $ \o k v -> do
|
||||
atomically do
|
||||
modifyTVar total_ succ
|
||||
when (k == emptyKey) do
|
||||
modifyTVar mt_ succ
|
||||
|
||||
mt <- readTVarIO mt_
|
||||
total <- readTVarIO total_
|
||||
let used = total - mt
|
||||
|
||||
let ratio = realToFrac @_ @(Fixed E3) (realToFrac used / realToFrac total)
|
||||
|
||||
let stats = mkForm @C "stats" [ mkForm "empty" [mkInt mt]
|
||||
, mkForm "used" [mkInt used]
|
||||
, mkForm "total" [mkInt total]
|
||||
, mkForm "ratio" [mkDouble ratio]
|
||||
]
|
||||
|
||||
pure $ mkList [mkForm "metadata" [mkSyntax meta], stats]
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:nway:metadata" $ \case
|
||||
[StringLike fn] -> liftIO do
|
||||
(_, nw) <- nwayHashMMapReadOnly fn >>= orThrowUser "can't mmape file"
|
||||
pure $ mkSyntax nw
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:nway:write" $ nil_ $ \case
|
||||
[StringLike fn] -> liftIO do
|
||||
hashes <- getContents <&> mapMaybe (fromStringMay @HashRef) . lines
|
||||
let items = [ (coerce @_ @ByteString x, N.bytestring64 0) | x <- hashes ]
|
||||
nwayWriteBatch (nwayAllocDef 1.10 32 8 8) "." fn items
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:index" $ \case
|
||||
[ StringLike p, StringLike fsrc ]-> lift $ flip runContT pure do
|
||||
|
||||
ncq <- lift $ ncqStorageOpen p
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
(fres,_) <- lift $ ncqIndexFile ncq fsrc
|
||||
|
||||
pure $ mkSym fres
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:list-tracked-files" $ nil_ \case
|
||||
[StringLike fn] -> lift $ withNCQ id fn $ \ncq -> do
|
||||
ncqListTrackedFiles ncq >>= mapM_ display_
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:get:stdout" $ nil_ \case
|
||||
|
||||
[StringLike fn, HashLike h] -> lift $ withNCQ id fn $ \ncq -> do
|
||||
w <- ncqStorageGet ncq h
|
||||
maybe1 w exitFailure LBS.putStr
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:has" $ \case
|
||||
|
||||
[StringLike fn, HashLike h] -> liftIO $ flip runContT pure do
|
||||
|
||||
ncq <- lift $ ncqStorageOpen fn
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
lift do
|
||||
ncqStorageHasBlock ncq h >>= \case
|
||||
Nothing -> pure nil
|
||||
Just x -> pure $ mkInt x
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:up" $ nil_ $ \case
|
||||
|
||||
[StringLike fn] -> liftIO $ flip runContT pure do
|
||||
|
||||
ncq@NCQStorage{..} <- lift $ ncqStorageOpen fn
|
||||
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
trf <- readTVarIO ncqTrackedFiles <&> HPSQ.keys
|
||||
|
||||
for_ trf $ \tf -> do
|
||||
notice $ "tracked" <+> pretty tf
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw" $ \case
|
||||
[StringLike fn] -> liftIO $ flip runContT pure do
|
||||
|
||||
debug "SHIT"
|
||||
|
||||
ncq <- lift $ ncqStorageOpen fn
|
||||
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
h <- lift $ ncqStoragePut ncq "JOPAKITA!"
|
||||
h2 <- lift $ ncqStoragePut ncq "PECHENTRESKI!"
|
||||
|
||||
liftIO $ ncqStorageStop ncq
|
||||
wait writer
|
||||
|
||||
pure $ mkList [mkSym (show $ pretty h), mkSym (show $ pretty h2)]
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:list" $ nil_ \case
|
||||
[StringLike p, StringLike f] -> liftIO $ flip runContT pure do
|
||||
|
||||
ncq <- lift $ ncqStorageOpen p
|
||||
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
lift $ ncqStorageScanDataFile ncq f $ \o _ k v -> do
|
||||
liftIO $ print $ pretty k -- <+> pretty o <+> pretty (BS.length v)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:find-some" $ nil_ \case
|
||||
[StringLike fn] -> liftIO $ flip runContT pure do
|
||||
hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines
|
||||
|
||||
ncq <- lift $ ncqStorageOpen fn
|
||||
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
liftIO $ for_ hashes $ \h -> runMaybeT do
|
||||
what <- liftIO (ncqStorageHasBlock ncq h) >>= toMPlus
|
||||
-- let h1 = hashObject @HbSync what
|
||||
-- liftIO $ print $ "block" <+> pretty h <+> pretty h1 <+> pretty (LBS.length what)
|
||||
liftIO $ print $ "block" <+> pretty h <+> pretty what -- (LBS.length what)
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:dump-some" $ nil_ \case
|
||||
[StringLike fn] -> liftIO $ flip runContT pure do
|
||||
hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines
|
||||
|
||||
xdg <- liftIO $ getXdgDirectory XdgData "hbs2" <&> fromString @StoragePrefix
|
||||
|
||||
s <- simpleStorageInit @HbSync (Just xdg)
|
||||
|
||||
w <- ContT $ withAsync $ simpleStorageWorker s
|
||||
link w
|
||||
|
||||
let sto = AnyStorage s
|
||||
|
||||
rm fn
|
||||
dump <- openFile fn WriteMode
|
||||
|
||||
for_ hashes $ \h -> runMaybeT do
|
||||
blk <- getBlock sto (coerce h) >>= toMPlus
|
||||
debug $ "read" <+> pretty (LBS.length blk)
|
||||
none
|
||||
-- liftIO $ LBS.hPut dump blk
|
||||
|
||||
hClose dump
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:locate:one" $ nil_ \case
|
||||
[StringLike fn, HashLike h] -> lift $ withNCQ id fn $ \ncq -> do
|
||||
ncqLocate ncq h >>= \case
|
||||
Nothing -> print $ pretty "not-found" <+> pretty h
|
||||
Just l -> print $ pretty "found" <+> pretty h <+> pretty l
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:put:stdin" $ \case
|
||||
[StringLike fn] -> lift $ withNCQ id fn $ \ncq -> do
|
||||
what <- liftIO BS.getContents
|
||||
href <- liftIO $ ncqStoragePut ncq (LBS.fromStrict what)
|
||||
pure $ maybe nil (mkSym . show . pretty) href
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:get" $ nil_ \case
|
||||
[StringLike fn, HashLike href] -> lift $ withNCQ id fn $ \ncq -> do
|
||||
mbs <- ncqStorageGet ncq href
|
||||
maybe1 mbs exitFailure LBS.putStr
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:merkle:write" $ nil_ \case
|
||||
[StringLike fn, StringLike what] -> liftIO $ flip runContT pure do
|
||||
|
||||
ncq <- lift $ ncqStorageOpen fn
|
||||
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
ContT $ bracket none $ const do
|
||||
none
|
||||
|
||||
lbs <- liftIO $ LBS.readFile what
|
||||
|
||||
ta <- getTimeCoarse
|
||||
|
||||
(t1,hashes) <- timeItT $ liftIO do
|
||||
chu <- S.toList_ (readChunkedBS lbs (256*1024))
|
||||
forConcurrently chu $ \chunk -> do
|
||||
ncqStoragePut ncq chunk >>= orThrowUser "can't save"
|
||||
|
||||
tb <- getTimeCoarse
|
||||
|
||||
notice $ "stored in" <+> pretty t1
|
||||
<+> pretty (realToFrac @_ @(Fixed E6) (realToFrac (toMicroSeconds (TimeoutTS (tb - ta))) / 1e6))
|
||||
|
||||
-- FIXME: handle-hardcode
|
||||
let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes -- FIXME: settings
|
||||
|
||||
m <- makeMerkle 0 pt $ \(_,_,bss) -> liftIO do
|
||||
void $ ncqStoragePut ncq bss >>= orThrowUser "can't save"
|
||||
|
||||
liftIO $ print $ pretty m
|
||||
|
||||
debug "stopping"
|
||||
liftIO $ ncqStorageStop ncq
|
||||
debug "stopping done"
|
||||
|
||||
wait writer
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:one-ref" $ nil_ $ \case
|
||||
[StringLike fn] -> liftIO $ flip runContT pure do
|
||||
|
||||
ncq <- lift $ ncqStorageOpen fn
|
||||
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
ContT $ bracket none $ const do
|
||||
none
|
||||
|
||||
none
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:write-some" $ nil_ \case
|
||||
[StringLike fn] -> liftIO $ flip runContT pure do
|
||||
|
||||
hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines
|
||||
|
||||
xdg <- liftIO $ getXdgDirectory XdgData "hbs2" <&> fromString @StoragePrefix
|
||||
|
||||
s <- simpleStorageInit @HbSync (Just xdg)
|
||||
|
||||
w <- ContT $ withAsync $ simpleStorageWorker s
|
||||
link w
|
||||
|
||||
let sto = AnyStorage s
|
||||
|
||||
ncq <- lift $ ncqStorageOpen fn
|
||||
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
ContT $ bracket none $ const do
|
||||
none
|
||||
|
||||
for_ hashes $ \h -> runMaybeT do
|
||||
already <- liftIO (ncqStorageHasBlock ncq h <&> isJust)
|
||||
guard (not already)
|
||||
-- debug $ "write" <+> pretty h
|
||||
blk <- getBlock sto (coerce h) >>= toMPlus
|
||||
liftIO do
|
||||
let l = LBS.length blk
|
||||
-- print $ pretty h <+> pretty l
|
||||
ncqStoragePut ncq blk
|
||||
|
||||
warn "about to stop storage!"
|
||||
liftIO $ ncqStorageStop ncq
|
||||
|
||||
wait writer
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:raw:del-some" $ nil_ \case
|
||||
[StringLike fn] -> liftIO $ flip runContT pure do
|
||||
|
||||
hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines
|
||||
|
||||
ncq <- lift $ ncqStorageOpen fn
|
||||
|
||||
writer <- ContT $ withAsync $ ncqStorageRun ncq
|
||||
link writer
|
||||
|
||||
ContT $ bracket none $ const do
|
||||
none
|
||||
|
||||
debug $ "TO DELETE" <+> pretty (length hashes)
|
||||
|
||||
for_ hashes $ \h -> runMaybeT do
|
||||
liftIO do
|
||||
-- print $ "delete" <+> pretty h
|
||||
ncqStorageDel ncq h
|
||||
|
||||
liftIO $ ncqStorageStop ncq
|
||||
|
||||
wait writer
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
entry $ bindMatch "test:ncq:index:now" $ nil_ \case
|
||||
[StringLike p] -> lift do
|
||||
withNCQ id p $ \ncq -> do
|
||||
display_ $ "test:ncq:index:now" <+> pretty p
|
||||
ncqIndexRightNow ncq
|
||||
pause @'Seconds 10
|
||||
display_ $ "test:ncq:index:now" <+> pretty p <+> "done"
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
entry $ bindMatch "test:ncq:run" $ nil_ \case
|
||||
[StringLike p] -> lift do
|
||||
withNCQ id p $ \_ -> do
|
||||
display_ $ "hello from ncq" <+> pretty p
|
||||
|
||||
e -> throwIO $ BadFormException @C (mkList e)
|
||||
|
||||
|
||||
setupLogger
|
||||
|
||||
argz <- liftIO getArgs
|
||||
|
||||
forms <- parseTop (unlines $ unwords <$> splitForms argz)
|
||||
& either (error.show) pure
|
||||
|
||||
tvd <- newTVarIO dict
|
||||
|
||||
(runEval tvd forms >>= eatNil display)
|
||||
`finally` flushLoggers
|
||||
|
Loading…
Reference in New Issue