diff --git a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs index 0bf120c0..8a2dda82 100644 --- a/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs +++ b/hbs2-storage-simple/lib/HBS2/Storage/Simple.hs @@ -65,6 +65,7 @@ data SimpleStorage a = { _storageDir :: FilePath , _storageOpQ :: TBMQueue ( IO () ) , _storageChunksCache :: Cache (FilePath, Offset, Size) ByteString + , _storageStopWriting :: TVar Bool } makeLenses ''SimpleStorage @@ -86,12 +87,15 @@ simpleStorageInit opts = liftIO $ do tbq <- TBMQ.newTBMQueueIO (fromIntegral (fromQueueSize qSize)) + tstop <- TV.newTVarIO False + hcache <- Cache.newCache (Just (toTimeSpec @'Seconds 1)) -- FIXME: real setting let stor = SimpleStorage { _storageDir = pdir , _storageOpQ = tbq , _storageChunksCache = hcache + , _storageStopWriting = tstop } createDirectoryIfMissing True (stor ^. storageBlocks) @@ -110,6 +114,16 @@ simpleAddTask :: SimpleStorage h -> IO () -> IO () simpleAddTask s task = do atomically $ TBMQ.writeTBMQueue (s ^. storageOpQ) task +simpleStorageStop :: SimpleStorage h -> IO () +simpleStorageStop ss = do + atomically $ TV.writeTVar ( ss ^. storageStopWriting ) True + fix \next -> do + mt <- atomically $ TBMQ.isEmptyTBMQueue ( ss ^. storageOpQ ) + if mt then + pure () + else + pause ( 0.01 :: Timeout 'Seconds ) >> next + simpleStorageWorker :: SimpleStorage h -> IO () simpleStorageWorker ss = do @@ -259,19 +273,26 @@ simplePutBlockLazy s lbs = do let hash = hashObject lbs :: Key (Raw LBS.ByteString) let fn = simpleBlockFileName s hash - waits <- TBQ.newTBQueueIO 1 :: IO (TBQueue Bool) + stop <- atomically $ TV.readTVar ( s ^. storageStopWriting ) - let action = do - catch (LBS.writeFile fn lbs) - (\(_ :: IOError) -> atomically $ TBQ.writeTBQueue waits False) + if stop then do + pure Nothing - atomically $ TBQ.writeTBQueue waits True + else do - simpleAddTask s action + waits <- TBQ.newTBQueueIO 1 :: IO (TBQueue Bool) - ok <- atomically $ TBQ.readTBQueue waits + let action = do + catch (LBS.writeFile fn lbs) + (\(_ :: IOError) -> atomically $ TBQ.writeTBQueue waits False) - pure $! if ok then Just hash else Nothing + atomically $ TBQ.writeTBQueue waits True + + simpleAddTask s action + + ok <- atomically $ TBQ.readTBQueue waits + + pure $! if ok then Just hash else Nothing simpleBlockExists :: SimpleStorage h diff --git a/hbs2-storage-simple/test/TestSimpleStorage.hs b/hbs2-storage-simple/test/TestSimpleStorage.hs index 83c1f91d..6d59ae3f 100644 --- a/hbs2-storage-simple/test/TestSimpleStorage.hs +++ b/hbs2-storage-simple/test/TestSimpleStorage.hs @@ -96,7 +96,7 @@ testSimpleStorageNoKeys = do let pieces = take 1000 $ shrink [0x00 .. 0xFF] :: [[Word8]] results' <- forConcurrently pieces $ \p -> do - let hash = hashObject (LBS.pack p) + let hash = hashObject @HbSync (LBS.pack p) s <- getBlock storage hash pure (LBS.length <$> s) diff --git a/hbs2/Main.hs b/hbs2/Main.hs index 5d354621..5a6f32ac 100644 --- a/hbs2/Main.hs +++ b/hbs2/Main.hs @@ -2,18 +2,20 @@ module Main where import Control.Monad import Control.Monad.IO.Class +import Control.Concurrent.Async import Data.ByteString (ByteString) import Data.ByteString qualified as B +import Data.ByteString.Lazy qualified as LBS import Data.Function import Data.Functor import Options.Applicative import Prettyprinter import System.Directory -import System.FilePath.Posix +-- import System.FilePath.Posix import System.IO import Streaming.Prelude qualified as S -import Streaming qualified as S +-- import Streaming qualified as S import HBS2.Storage import HBS2.Storage.Simple @@ -48,14 +50,15 @@ readChunked handle size = fuu next runStore :: Opts -> SimpleStorage HbSync -> IO () -runStore opts _ = do +runStore opts ss = do let fname = uniLastMay @OptInputFile opts handle <- maybe (pure stdin) (flip openFile ReadMode . unOptFile) fname hashes <- readChunked handle (fromIntegral defBlockSize) -- FIXME: to settings! - & S.map (hashObject . B.copy) + & S.mapM (\blk -> putBlock ss (LBS.fromStrict blk) >> pure blk) + & S.map hashObject & S.map HashRef & S.toList_ @@ -68,11 +71,20 @@ runStore opts _ = do withStore :: Data opts => opts -> ( SimpleStorage HbSync -> IO () ) -> IO () withStore opts f = do - xdg <- getXdgDirectory XdgData "hbs2" <&> ( defStorePath) + xdg <- getXdgDirectory XdgData defStorePath <&> fromString - let pref = uniLastDef defStorePath opts :: StoragePrefix - simpleStorageInit (Just pref) >>= f + let pref = uniLastDef xdg opts :: StoragePrefix + s <- simpleStorageInit (Just pref) + storage <- async $ simpleStorageWorker s + + f s + + simpleStorageStop s + + _ <- waitAnyCatch [storage] + + pure () main :: IO () main = join . customExecParser (prefs showHelpOnError) $ diff --git a/hbs2/hbs2.cabal b/hbs2/hbs2.cabal index 2a49645b..3e2bb6da 100644 --- a/hbs2/hbs2.cabal +++ b/hbs2/hbs2.cabal @@ -72,6 +72,8 @@ executable hbs2 , containers , cryptonite , deepseq + , directory + , filepath , hashable , interpolatedstring-perl6 , memory @@ -84,7 +86,6 @@ executable hbs2 , uniplate - hs-source-dirs: . default-language: Haskell2010