This commit is contained in:
Dmitry Zuikov 2023-01-26 08:31:23 +03:00
parent 25f857f889
commit e7ce36591a
4 changed files with 51 additions and 198 deletions

View File

@ -4,7 +4,6 @@ module HBS2.Actors.ChunkWriter
, newChunkWriterIO
, runChunkWriter
, stopChunkWriter
, newBlock
, delBlock
, commitBlock
, writeChunk
@ -50,6 +49,10 @@ import Control.Concurrent.STM.TSem (TSem)
import Control.Concurrent.STM.TQueue qualified as Q0
import Control.Concurrent
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
--
--
--TODO: cache file handles
@ -67,15 +70,16 @@ data ChunkWriter h m = forall a . ( MonadIO m
, pipeline :: Pipeline m ()
, dir :: FilePath
, storage :: a
, perBlock :: Cache FilePath (TQueue (IO ()))
, perBlock :: TVar (HashMap FilePath [IO ()])
, perBlockSem :: Cache FilePath TSem
}
blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int
blocksInProcess cw = liftIO $ Cache.purgeExpired cache >> Cache.size cache
where
cache = perBlock cw
blocksInProcess cw = undefined
-- liftIO $ Cache.purgeExpired cache >> Cache.size cache
-- where
-- cache = perBlock cw
runChunkWriter :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
@ -84,15 +88,6 @@ runChunkWriter :: forall h m . ( Eq (Hash h)
runChunkWriter = runChunkWriter2
runChunkWriter1 :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
, MonadIO m )
=> ChunkWriter h m -> m ()
runChunkWriter1 w = do
liftIO $ createDirectoryIfMissing True ( dir w )
runPipeline (pipeline w)
runChunkWriter2 :: forall h m . ( Eq (Hash h)
, Hashable (Hash h)
@ -101,43 +96,13 @@ runChunkWriter2 :: forall h m . ( Eq (Hash h)
runChunkWriter2 w = do
liftIO $ createDirectoryIfMissing True ( dir w )
let cache = perBlock w
fix \next -> do
-- kks <- liftIO $ take 1 <$> Cache.keys cache
-- liftIO $ for_ kks $ \h -> flush w h
-- pause ( 1 :: Timeout 'Seconds )
-- yield
-- next
-- liftIO $ print "runChunkWriter2"
stop <- liftIO $ readTVarIO (stopped w)
if stop then do
ks <- liftIO $ Cache.keys cache
liftIO $ for_ ks $ \k -> flush w k
else do
ks <- liftIO $ Cache.keys cache
amount <- for ks $ \k -> flush w k
if (sum amount == 0) then do
-- pure ()
pause ( 0.1 :: Timeout 'Seconds )
else do
liftIO $ print ("flushed:" <+> pretty (sum amount))
fix \next -> pause ( 1 :: Timeout 'Seconds) >> next
stopChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
stopChunkWriter w = do
liftIO $ atomically $ writeTVar (stopped w) True
stopChunkWriter1 :: MonadIO m => ChunkWriter h m -> m ()
stopChunkWriter1 w = do
let cache = perBlock w
stopPipeline ( pipeline w )
newChunkWriterIO :: forall h a m . ( Key h ~ Hash h, h ~ HbSync
, Storage a h ByteString m
, Block ByteString ~ ByteString
@ -153,7 +118,7 @@ newChunkWriterIO s tmp = do
def <- liftIO $ getXdgDirectory XdgData (defStorePath </> "temp-chunks")
let d = fromMaybe def tmp
mt <- liftIO $ Cache.newCache Nothing
mt <- liftIO $ newTVarIO mempty
mts <- liftIO $ Cache.newCache Nothing
running <- liftIO $ newTVarIO False
@ -173,21 +138,6 @@ makeFileName w salt h = dir w </> suff
where
suff = show $ pretty (fromIntegral (hash salt) :: Word32) <> "@" <> pretty h
-- TODO: check uniqueness
newBlock :: ( MonadIO m
, Hashable salt
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> Size -> m ()
newBlock w salt h size = liftIO do
withBinaryFile fn ReadWriteMode (`hSetFileSize` fromIntegral size)
where
fn = makeFileName w salt h
delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h))
=> ChunkWriter h m -> salt -> Hash h -> m ()
@ -241,26 +191,6 @@ commitBlock :: forall salt h m .
commitBlock = commitBlock2
writeChunk1 :: (Hashable salt, MonadIO m, Pretty (Hash h))
=> ChunkWriter h m
-> salt
-> Hash h
-> Offset
-> ByteString -> m ()
writeChunk1 w salt h o bs = addJob (pipeline w) $ liftIO do
-- writeChunk w salt h o bs = liftIO do
-- print $ "writeChunk:" <+> pretty fn
withBinaryFile fn ReadWriteMode $ \fh -> do
hSeek fh AbsoluteSeek (fromIntegral o)
B.hPutStr fh bs
hFlush fh
where
fn = makeFileName w salt h
writeChunk2 :: (Hashable salt, MonadIO m, Pretty (Hash h), Hashable (Hash h), Eq (Hash h))
=> ChunkWriter h m
-> salt
@ -272,46 +202,13 @@ writeChunk2 w salt h o bs = do
let cache = perBlock w
-- liftIO $ print $ "writeChunk" <+> pretty o <+> pretty (B.length bs) <+> pretty h
let action = do
withBinaryFile fn ReadWriteMode $ \fh -> do
hSeek fh AbsoluteSeek (fromIntegral o)
B.hPutStr fh bs
liftIO $ do
q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO
atomically $ Q0.writeTQueue q $ do
withBinaryFile fn ReadWriteMode $ \fh -> do
hSeek fh AbsoluteSeek (fromIntegral o)
B.hPutStr fh bs
-- hFlush fh
where
fn = makeFileName w salt h
-- Blocking!
-- we need to write last chunk before this will happen
-- FIXME: incremental calculation,
-- streaming, blah-blah
getHash1 :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, MonadIO m
, Block ByteString ~ ByteString
, Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m (Hash h)
getHash1 w salt h = liftIO do
q <- Q.newTBQueueIO 1
addJob (pipeline w) $ liftIO do
h1 <- hashObject @h <$> B.readFile fn
atomically $ Q.writeTBQueue q h1
atomically $ Q.readTBQueue q
atomically $ modifyTVar cache (HashMap.insertWith (<>) fn [action])
where
fn = makeFileName w salt h
@ -319,28 +216,10 @@ getHash1 w salt h = liftIO do
flush w fn = do
let cache = perBlock w
let scache = perBlockSem w
liftIO $ do
q <- Cache.fetchWithCache cache fn $ const Q0.newTQueueIO
s <- Cache.fetchWithCache scache fn $ const $ atomically $ Sem.newTSem 1
atomically $ Sem.waitTSem s
Cache.delete cache fn
flushed <- atomically (Q0.flushTQueue q)
liftIO $ do
-- withBinaryFile fn ReadWriteMode $ \fh -> do
-- withBinaryFile fn ReadWriteMode $ \fh -> do
for_ flushed id
atomically $ Sem.signalTSem s
pure (length flushed)
actions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v))
sequence_ (fromMaybe mempty actions)
-- Blocking!
-- we need to write last chunk before this will happen
@ -361,41 +240,12 @@ getHash2 :: forall salt h m .
getHash2 w salt h = do
flush w fn
h1 <- liftIO $ hashObject @h <$> B.readFile fn
pure h1
liftIO $ hashObject @h <$> B.readFile fn
where
fn = makeFileName w salt h
commitBlock1 :: forall salt h m .
( Hashable salt
, Hashed h ByteString
, Block ByteString ~ ByteString
, MonadIO m
, Pretty (Hash h)
)
=> ChunkWriter h m
-> salt
-> Hash h
-> m ()
commitBlock1 w@(ChunkWriter {storage = stor}) salt h = do
q <- liftIO $ Q.newTBQueueIO 1
addJob (pipeline w) (liftIO $ B.readFile fn >>= atomically . Q.writeTBQueue q)
s <- liftIO $ atomically $ Q.readTBQueue q
void $ putBlock stor s
delBlock w salt h
where
fn = makeFileName w salt h
commitBlock2 :: forall salt h m .
( Hashable salt
, Hashed h ByteString
@ -415,7 +265,6 @@ commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do
s <- liftIO $ B.readFile fn
void $ putBlock stor s
delBlock w salt h
liftIO $ Cache.delete cache fn
where
fn = makeFileName w salt h

View File

@ -1,11 +0,0 @@
cradle:
cabal:
- path: "test/Peer2Main.hs"
component: "hbs2-tests:exe:test-peer-run"
- path: "test/TestSKey"
component: "hbs2-tests:test:test-skey"
- path: "test/TestChunkWriter"
component: "hbs2-tests:test:test-cw"

View File

@ -31,6 +31,17 @@ main = do
withSystemTempDirectory "cww-test" $ \dir -> do
let opts = [ StoragePrefix (dir </> ".test-cww")
]
storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync)
w1 <- replicateM 1 $ async (simpleStorageWorker storage)
cw <- newChunkWriterIO storage (Just (dir </> ".qqq"))
w2 <- replicateM 1 $ async $ runChunkWriter cw
failed <- replicateM 100 $ do
bytes <- B8.pack <$> (replicateM size $ uniformM g)
@ -39,35 +50,24 @@ main = do
let psz = calcChunks (fromIntegral size) (fromIntegral chu)
let opts = [ StoragePrefix (dir </> ".test-cww")
]
storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync)
w1 <- replicateM 1 $ async (simpleStorageWorker storage)
cw <- newChunkWriterIO storage (Just (dir </> ".qqq"))
w2 <- replicateM 1 $ async $ runChunkWriter cw
psz' <- shuffleM psz
-- psz' <- pure psz
-- forConcurrently_ psz' $ \(o,s) -> do
forConcurrently_ psz' $ \(o,s) -> do
-- forM_ psz' $ \(o,s) -> do
let t = B8.take s $ B8.drop o bytes
writeChunk cw 1 hash (fromIntegral o) t
h2 <- getHash cw 1 hash
-- h3 <- getHash cw 1 hash
mapM_ cancel $ w1 <> w2
if hash /= h2 then do
pure [1]
else
else do
commitBlock cw 1 hash
pure mempty
mapM_ cancel $ w1 <> w2
print $ "failed" <+> pretty (sum (mconcat failed))
pure ()

View File

@ -1,2 +1,17 @@
cradle:
cabal:
- path: "hbs-tests/test/Peer2Main.hs"
component: "hbs2-tests:exe:test-peer-run"
- path: "hbs2-tests/test/TestSKey"
component: "hbs2-tests:test:test-skey"
- path: "hbs2-tests/test/TestChunkWriter"
component: "hbs2-tests:test:test-cw"
- path: "hbs2-core/lib"
component: "hbs2-core:lib:hbs2-core"
- path: "hbs2-storage-simple/lib"
component: "hbs2-storage-simple:lib:hbs2-storage-simple"