mirror of https://github.com/voidlizard/hbs2
490 lines
15 KiB
Haskell
490 lines
15 KiB
Haskell
{-# Language TemplateHaskell #-}
|
|
{-# Language ScopedTypeVariables #-}
|
|
{-# Language UndecidableInstances #-}
|
|
module HBS2.Storage.Simple
|
|
( module HBS2.Storage.Simple
|
|
, StoragePrefix(..)
|
|
, Storage(..)
|
|
) where
|
|
|
|
import HBS2.Clock
|
|
import HBS2.Hash
|
|
import HBS2.Prelude.Plated
|
|
import HBS2.Data.Types.Refs (refMetaData)
|
|
import HBS2.Storage
|
|
import HBS2.Base58
|
|
|
|
import HBS2.System.Logger.Simple
|
|
|
|
import Control.Concurrent.Async
|
|
import Control.Exception
|
|
import Control.Monad
|
|
import Control.Monad.Except
|
|
import Control.Monad.Trans.Maybe
|
|
import Data.ByteString.Lazy qualified as LBS
|
|
import Data.ByteString.Lazy.Char8 qualified as LBS8
|
|
import Data.ByteString qualified as BS
|
|
import Data.ByteString (ByteString)
|
|
import Data.Foldable
|
|
import Data.List qualified as L
|
|
import Data.Maybe
|
|
import Data.Map (Map)
|
|
import Data.Map qualified as Map
|
|
import Data.Either
|
|
import Lens.Micro.Platform
|
|
import Prettyprinter
|
|
import System.Directory
|
|
import System.FilePath.Posix
|
|
import System.IO
|
|
import System.IO.Error
|
|
import System.IO.Temp
|
|
import System.AtomicWrite.Writer.LazyByteString qualified as AwLBS
|
|
import System.AtomicWrite.Writer.ByteString qualified as AwBS
|
|
import Data.Time.Clock.POSIX (getPOSIXTime)
|
|
|
|
import Data.HashMap.Strict qualified as HashMap
|
|
import Data.HashMap.Strict (HashMap)
|
|
|
|
import System.IO.Posix.MMap ( unsafeMMapFile )
|
|
|
|
import Control.Concurrent.STM
|
|
import Control.Concurrent.STM.TBQueue qualified as TBQ
|
|
import Control.Concurrent.STM.TBMQueue qualified as TBMQ
|
|
import Control.Concurrent.STM.TBMQueue (TBMQueue)
|
|
import Control.Concurrent.STM.TVar qualified as TV
|
|
|
|
import Codec.Serialise
|
|
|
|
|
|
-- NOTE: random accessing files in a git-like storage
|
|
-- causes to file handles exhaust.
|
|
-- Therefore, those handles MUST be in something like
|
|
-- pool.
|
|
--
|
|
-- I.e. we're should use something like TBChan to queue
|
|
-- operations and wait in getBlock 'till it's completion
|
|
-- in order to make the disk access in this fashion safe
|
|
|
|
type IsSimpleStorageKey h = ( Eq (Key h)
|
|
, Hashable (Key h)
|
|
, IsKey h
|
|
, Key h ~ Hash h
|
|
, ToByteString (AsBase58 (Hash h))
|
|
, FromByteString (AsBase58 (Hash h))
|
|
)
|
|
|
|
newtype StorageQueueSize = StorageQueueSize { fromQueueSize :: Int }
|
|
deriving stock (Data,Show)
|
|
deriving newtype (Eq,Ord,Enum,Num,Integral,Real)
|
|
|
|
data SimpleStorage a =
|
|
SimpleStorage
|
|
{ _storageDir :: FilePath
|
|
, _storageOpQ :: TBMQueue ( IO () )
|
|
, _storageStopWriting :: TVar Bool
|
|
, _storageMMaped :: TVar (HashMap (Key a) ByteString)
|
|
, _storageMMapedLRU :: TVar (HashMap (Key a) TimeSpec)
|
|
}
|
|
|
|
makeLenses ''SimpleStorage
|
|
|
|
storageBlocks :: SimpleGetter (SimpleStorage h) FilePath
|
|
storageBlocks = to f
|
|
where
|
|
f b = _storageDir b </> "blocks"
|
|
|
|
storageTemp :: SimpleGetter (SimpleStorage h) FilePath
|
|
storageTemp = to f
|
|
where
|
|
f b = _storageDir b </> "temp"
|
|
|
|
storageRefs :: SimpleGetter (SimpleStorage h) FilePath
|
|
storageRefs = to f
|
|
where
|
|
f b = _storageDir b </> "refs"
|
|
|
|
touchForRead :: (MonadIO m, IsSimpleStorageKey h) => SimpleStorage h -> Key h -> m ByteString
|
|
touchForRead ss k = liftIO $ do
|
|
|
|
mbs <- readTVarIO mmaped <&> HashMap.lookup k
|
|
|
|
case mbs of
|
|
Just bs -> pure bs
|
|
Nothing -> do
|
|
|
|
bsmm <- unsafeMMapFile (simpleBlockFileName ss k)
|
|
|
|
tick <- getTimeCoarse
|
|
|
|
atomically $ do
|
|
modifyTVar' mmaped (HashMap.insert k bsmm)
|
|
modifyTVar' (ss ^. storageMMapedLRU) (HashMap.insert k tick)
|
|
pure bsmm
|
|
|
|
where
|
|
mmaped = ss ^. storageMMaped
|
|
|
|
|
|
simpleStorageInit :: forall h m opts . (MonadIO m, Data opts, IsSimpleStorageKey h)
|
|
=> opts -> m (SimpleStorage h)
|
|
|
|
simpleStorageInit opts = liftIO $ do
|
|
let prefix = uniLastDef "." opts :: StoragePrefix
|
|
let qSize = uniLastDef 2000 opts :: StorageQueueSize -- FIXME: defaults ?
|
|
|
|
stor <- SimpleStorage
|
|
<$> canonicalizePath (fromPrefix prefix)
|
|
<*> TBMQ.newTBMQueueIO (fromIntegral (fromQueueSize qSize))
|
|
<*> TV.newTVarIO False
|
|
<*> TV.newTVarIO mempty
|
|
<*> TV.newTVarIO mempty
|
|
|
|
createDirectoryIfMissing True (stor ^. storageBlocks)
|
|
createDirectoryIfMissing True (stor ^. storageTemp)
|
|
|
|
let alph = getAlphabet
|
|
|
|
for_ alph $ \a -> do
|
|
createDirectoryIfMissing True ( (stor ^. storageBlocks) </> L.singleton a )
|
|
createDirectoryIfMissing True ( (stor ^. storageRefs) </> L.singleton a )
|
|
|
|
pure stor
|
|
|
|
catchAny :: IO a -> (SomeException -> IO a) -> IO a
|
|
catchAny = Control.Exception.catch
|
|
|
|
simpleAddTask :: SimpleStorage h -> IO () -> IO ()
|
|
simpleAddTask s task = do
|
|
atomically $ TBMQ.writeTBMQueue (s ^. storageOpQ) task
|
|
|
|
simpleStorageStop :: SimpleStorage h -> IO ()
|
|
simpleStorageStop ss = do
|
|
atomically $ TV.writeTVar ( ss ^. storageStopWriting ) True
|
|
fix \next -> do
|
|
mt <- atomically $ TBMQ.isEmptyTBMQueue ( ss ^. storageOpQ )
|
|
if mt then do
|
|
atomically $ TBMQ.closeTBMQueue ( ss ^. storageOpQ )
|
|
pure ()
|
|
else
|
|
pause ( 0.01 :: Timeout 'Seconds ) >> next
|
|
|
|
simpleStorageWorker :: IsSimpleStorageKey h => SimpleStorage h -> IO ()
|
|
simpleStorageWorker ss = do
|
|
|
|
ops <- async $ fix \next -> do
|
|
s <- atomically $ do TBMQ.readTBMQueue ( ss ^. storageOpQ )
|
|
case s of
|
|
Nothing -> pure ()
|
|
Just a -> a >> next
|
|
|
|
killer <- async $ forever $ do
|
|
pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting
|
|
simpleAddTask ss $ do
|
|
|
|
atomically $ do
|
|
|
|
alive <- readTVar ( ss ^. storageMMapedLRU )
|
|
mmaped <- readTVar ( ss ^. storageMMaped )
|
|
|
|
let survived = mmaped `HashMap.intersection` alive
|
|
|
|
writeTVar ( ss ^. storageMMaped ) survived
|
|
|
|
killerLRU <- async $ forever $ do
|
|
pause ( 10 :: Timeout 'Seconds ) -- FIXME: setting
|
|
atomically $ writeTVar ( ss ^. storageMMapedLRU ) mempty
|
|
|
|
(_, e) <- waitAnyCatchCancel [ops,killer, killerLRU]
|
|
|
|
pure ()
|
|
|
|
simpleBlockFileName :: Pretty (Hash h) => SimpleStorage h -> Hash h -> FilePath
|
|
simpleBlockFileName ss h = path
|
|
where
|
|
(pref,suf) = splitAt 1 (show (pretty h))
|
|
path = view storageBlocks ss </> pref </> suf
|
|
{-# INLINE simpleBlockFileName #-}
|
|
|
|
simpleRefFileName :: Pretty (Hash h) => SimpleStorage h -> Hash h -> FilePath
|
|
simpleRefFileName ss h = path
|
|
where
|
|
(pref,suf) = splitAt 1 (show (pretty h))
|
|
path = view storageRefs ss </> pref </> suf
|
|
|
|
|
|
-- NOTE: reads a whole file into memory!
|
|
-- if file size is too big --- it will
|
|
-- cause consequences!
|
|
--
|
|
-- However, we can not hold file
|
|
-- handles in lazy bytestrings, because
|
|
-- here maybe too many open files
|
|
--
|
|
-- So, the block MUST be small
|
|
--
|
|
simpleGetBlockLazy :: (IsKey h, Pretty (Key h))
|
|
=> SimpleStorage h
|
|
-> Hash h
|
|
-> IO (Maybe LBS.ByteString)
|
|
|
|
simpleGetBlockLazy s key = do
|
|
resQ <- TBMQ.newTBMQueueIO 1 :: IO (TBMQueue (Maybe LBS.ByteString))
|
|
let fn = simpleBlockFileName s key
|
|
let action = do
|
|
|
|
r <- tryJust (guard . isDoesNotExistError)
|
|
(BS.readFile fn <&> LBS.fromStrict)
|
|
|
|
result <- case r of
|
|
Right bytes -> pure (Just bytes)
|
|
Left _ -> pure Nothing
|
|
|
|
void $ atomically $ TBMQ.writeTBMQueue resQ result
|
|
|
|
let onFail (_ :: IOError)= void $ atomically $ TBMQ.writeTBMQueue resQ Nothing
|
|
|
|
simpleAddTask s (catch action onFail)
|
|
|
|
atomically $ TBMQ.readTBMQueue resQ >>= maybe (pure Nothing) pure
|
|
|
|
simpleGetChunkLazy :: IsSimpleStorageKey h
|
|
=> SimpleStorage h
|
|
-> Hash h
|
|
-> Offset
|
|
-> Size
|
|
-> IO (Maybe LBS.ByteString)
|
|
|
|
simpleGetChunkLazy s key off size = do
|
|
resQ <- TBMQ.newTBMQueueIO 1 :: IO (TBMQueue (Maybe LBS.ByteString))
|
|
let action = do
|
|
let fn = simpleBlockFileName s key
|
|
|
|
bs <- (Just <$> touchForRead s key) `catchAny` \_ -> do
|
|
err $ "simpleGetChunkLazy" <+> pretty key <+> pretty off <+> pretty size
|
|
pure Nothing
|
|
|
|
let result = BS.take (fromIntegral size) . BS.drop (fromIntegral off) <$> bs
|
|
|
|
void $ atomically $ TBMQ.writeTBMQueue resQ (LBS.fromStrict <$> result)
|
|
|
|
let onFail (_ :: IOError)= void $ atomically $ TBMQ.writeTBMQueue resQ Nothing
|
|
|
|
simpleAddTask s (catch action onFail )
|
|
|
|
atomically $ TBMQ.readTBMQueue resQ >>= maybe (pure Nothing) pure
|
|
|
|
simplePutBlockLazy :: (IsKey h, Hashed h LBS.ByteString)
|
|
=> Bool -- | wait
|
|
-> SimpleStorage h
|
|
-> LBS.ByteString
|
|
-> IO (Maybe (Hash h))
|
|
|
|
simplePutBlockLazy doWait s lbs = do
|
|
|
|
let hash = hashObject lbs
|
|
|
|
stop <- atomically $ TV.readTVar ( s ^. storageStopWriting )
|
|
|
|
if stop then do
|
|
pure Nothing
|
|
|
|
else do
|
|
|
|
waits <- TBQ.newTBQueueIO 1 :: IO (TBQueue Bool)
|
|
|
|
let action = do
|
|
handle (\(_ :: IOError) -> atomically $ TBQ.writeTBQueue waits False)
|
|
do
|
|
let fn = simpleBlockFileName s hash
|
|
AwBS.atomicWriteFile fn (LBS.toStrict lbs)
|
|
atomically $ TBQ.writeTBQueue waits True
|
|
|
|
simpleAddTask s action
|
|
|
|
if doWait then do
|
|
ok <- atomically $ TBQ.readTBQueue waits
|
|
|
|
unless ok do
|
|
err $ "simplePutBlockLazy" <+> pretty hash
|
|
|
|
pure $! if ok then Just hash else Nothing
|
|
else
|
|
pure $ Just hash
|
|
|
|
-- TODO: should be async as well?
|
|
simpleBlockExists :: IsKey h
|
|
=> SimpleStorage h
|
|
-> Hash h
|
|
-> IO (Maybe Integer)
|
|
|
|
simpleBlockExists ss hash = runMaybeT $ do
|
|
let fn = simpleBlockFileName ss hash
|
|
exists <- liftIO $ doesFileExist fn
|
|
unless exists mzero
|
|
liftIO $ getFileSize fn
|
|
|
|
spawnAndWait :: SimpleStorage h -> IO a -> IO (Maybe a)
|
|
spawnAndWait s act = do
|
|
doneQ <- TBMQ.newTBMQueueIO 1
|
|
simpleAddTask s (act >>= \r -> atomically (TBMQ.writeTBMQueue doneQ r))
|
|
atomically $ TBMQ.readTBMQueue doneQ
|
|
|
|
|
|
simpleWriteLinkRaw :: forall h . ( IsSimpleStorageKey h
|
|
, Hashed h LBS.ByteString
|
|
, ToByteString (AsBase58 (Hash h))
|
|
)
|
|
=> SimpleStorage h
|
|
-> Hash h
|
|
-> LBS.ByteString
|
|
-> IO (Maybe (Hash h))
|
|
|
|
simpleWriteLinkRaw ss h lbs = do
|
|
let fnr = simpleRefFileName ss h
|
|
|
|
runMaybeT $ do
|
|
r <- MaybeT $ putBlock ss lbs
|
|
MaybeT $ liftIO $ spawnAndWait ss $ do
|
|
AwBS.atomicWriteFile fnr (toByteString (AsBase58 r))
|
|
`catchAny` \_ -> do
|
|
err $ "simpleWriteLinkRaw" <+> pretty h <+> pretty fnr
|
|
|
|
pure h
|
|
|
|
simpleWriteLinkRawRef :: forall h . ( IsSimpleStorageKey h
|
|
, Hashed h LBS.ByteString
|
|
, ToByteString (AsBase58 (Hash h))
|
|
)
|
|
=> SimpleStorage h
|
|
-> [(String, String)]
|
|
-> Hash h
|
|
-> Hash h
|
|
-> IO ()
|
|
|
|
simpleWriteLinkRawRef ss meta h ref = do
|
|
let fnr = simpleRefFileName ss h
|
|
void $ spawnAndWait ss $ do
|
|
AwBS.atomicWriteFile fnr (toByteString (AsBase58 ref))
|
|
`catchAny` \_ -> do
|
|
err $ "simpleWriteLinkRawRef" <+> pretty h <+> pretty ref <+> pretty fnr
|
|
|
|
unless (null meta) do
|
|
let metaname = fnr `addExtension` "metadata"
|
|
meta0 <- try @IOError (LBS.readFile metaname)
|
|
<&> fromRight mempty
|
|
<&> deserialiseOrFail @(Map String String)
|
|
<&> fromRight mempty
|
|
let meta1 = meta0 <> Map.fromList meta
|
|
r <- try @IOError $ AwBS.atomicWriteFile metaname (LBS.toStrict $ serialise meta1)
|
|
case r of
|
|
Right{} -> pure ()
|
|
Left e -> err $ "simpleWriteLinkRawRef" <+> viaShow e
|
|
|
|
simpleReadLinkRaw :: forall r h . ( IsKey h, Hashed h r, Pretty r)
|
|
=> SimpleStorage h
|
|
-> r
|
|
-> IO (Maybe LBS.ByteString)
|
|
|
|
simpleReadLinkRaw ss ref = do
|
|
let hash = hashObject @h ref
|
|
let fn = simpleRefFileName ss hash
|
|
rs <- spawnAndWait ss $ do
|
|
meta <- try @IOError (LBS.readFile (fn `addExtension` "metadata"))
|
|
>>= \case
|
|
Left{} -> pure mempty
|
|
Right sm -> pure $ deserialiseOrFail @(Map String String) sm & fromRight mempty
|
|
|
|
deleted <- runMaybeT do
|
|
ts <- toMPlus $ Map.lookup "expires" meta
|
|
t0 <- toMPlus $ readMay @Int ts
|
|
now <- liftIO $ getPOSIXTime <&> round
|
|
|
|
if now <= t0 then do
|
|
pure False
|
|
else do
|
|
liftIO $ simpleDelRef ss hash
|
|
pure True
|
|
|
|
if deleted == Just True then do
|
|
pure Nothing
|
|
else do
|
|
-- FIXME: log-this-situation
|
|
(Just <$> LBS.readFile fn) `catchAny` \e -> do
|
|
-- TODO: update-stats-instead-of-spamming
|
|
err $ "simpleReadLinkRaw" <+> pretty ref <+> pretty fn <+> viaShow e
|
|
pure Nothing
|
|
|
|
pure $ fromMaybe Nothing rs
|
|
|
|
|
|
simpleDelRef :: IsKey h => SimpleStorage h -> Hash h -> IO ()
|
|
simpleDelRef ss hash = do
|
|
let fn = simpleRefFileName ss hash
|
|
here <- doesFileExist fn
|
|
when here (removeFile fn)
|
|
let meta = fn `addExtension` "metadata"
|
|
mhere <- doesFileExist meta
|
|
when mhere (removeFile meta)
|
|
|
|
simpleReadLinkVal :: ( IsKey h
|
|
, IsSimpleStorageKey h
|
|
, Hashed h LBS.ByteString
|
|
, FromByteString (AsBase58 (Hash h))
|
|
)
|
|
=> SimpleStorage h
|
|
-> Hash h
|
|
-> IO (Maybe LBS.ByteString)
|
|
|
|
simpleReadLinkVal ss hash = do
|
|
let fn = simpleRefFileName ss hash
|
|
rs <- spawnAndWait ss $ do
|
|
(Just <$> BS.readFile fn) `catchAny` \e -> do
|
|
trace $ "simpleReadLinkVal" <+> pretty hash <+> pretty fn <+> viaShow e
|
|
pure Nothing
|
|
|
|
runMaybeT do
|
|
MaybeT . getBlock ss . unAsBase58 =<< MaybeT (pure (fromByteString =<< join rs))
|
|
|
|
instance ( MonadIO m, IsKey hash
|
|
, Hashed hash LBS.ByteString
|
|
, Key hash ~ Hash hash
|
|
, IsSimpleStorageKey hash
|
|
)
|
|
=> Storage (SimpleStorage hash) hash LBS.ByteString m where
|
|
|
|
putBlock s lbs = liftIO $ simplePutBlockLazy True s lbs
|
|
|
|
enqueueBlock s lbs = liftIO $ simplePutBlockLazy False s lbs
|
|
|
|
getBlock s key = liftIO $ simpleGetBlockLazy s key
|
|
|
|
getChunk s k off size = liftIO $ simpleGetChunkLazy s k off size
|
|
|
|
hasBlock s k = liftIO $ simpleBlockExists s k
|
|
|
|
updateRef ss ref v = do
|
|
let refHash = hashObject @hash ref
|
|
let meta = refMetaData ref
|
|
void $ liftIO $ simpleWriteLinkRawRef ss meta refHash v
|
|
|
|
getRef ss ref = do
|
|
runMaybeT do
|
|
-- debug $ "REF" <+> pretty (AsBase58 ref)
|
|
bs <- MaybeT $ liftIO $ simpleReadLinkRaw ss ref
|
|
let bss = LBS.toStrict bs
|
|
parsed <- MaybeT $ pure $ fromByteString bss
|
|
pure $ unAsBase58 parsed
|
|
|
|
delBlock ss h = do
|
|
let fn = simpleBlockFileName ss h
|
|
void $ liftIO $ spawnAndWait ss do
|
|
exists <- doesFileExist fn
|
|
when exists (removeFile fn)
|
|
|
|
delRef ss ref = do
|
|
let refHash = hashObject @hash ref
|
|
void $ liftIO $ spawnAndWait ss $ do
|
|
simpleDelRef ss refHash
|
|
|
|
|