This commit is contained in:
voidlizard 2025-08-14 07:46:43 +03:00
parent 09ec309ade
commit 6293fdebf2
1 changed files with 48 additions and 9 deletions

View File

@ -87,34 +87,74 @@ ncqWithStorage fp action = flip runContT pure do
wait w
pure r
ncqPutBlock :: MonadUnliftIO m
=> NCQStorage
-> LBS.ByteString
-> m (Maybe HashRef)
-- FIXME: Nothing-on-exception
ncqPutBlock sto lbs =
ncqPutBlock sto lbs = ncqPutBlock0 sto lbs True
{-# INLINE ncqPutBlock #-}
ncqTossBlock :: MonadUnliftIO m
=> NCQStorage
-> LBS.ByteString
-> m (Maybe HashRef)
ncqTossBlock sto lbs = ncqPutBlock0 sto lbs False
{-# INLINE ncqTossBlock #-}
ncqPutBlock0 :: MonadUnliftIO m
=> NCQStorage
-> LBS.ByteString
-> Bool
-> m (Maybe HashRef)
ncqPutBlock0 sto lbs wait =
ncqLocate sto ohash >>= \case
Nothing -> Just <$> ncqPutBS sto (Just B) (Just ohash) bs
Nothing -> Just <$> work sto (Just B) (Just ohash) bs
_ -> pure (Just ohash)
where
bs = LBS.toStrict lbs
ohash = HashRef $ hashObject @HbSync bs
{-# INLINE ncqPutBlock #-}
-- FIXME: maybe-on-storage-closed
work | wait = ncqPutBS
| otherwise = ncqTossBS
{-# INLINE ncqPutBlock0 #-}
ncqPutBS :: MonadUnliftIO m
=> NCQStorage
-> Maybe NCQSectionType
-> Maybe HashRef
-> ByteString
-> m HashRef
ncqPutBS ncq@NCQStorage{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe hash0 mhref) do
ncqPutBS = ncqPutBS0 True
{-# INLINE ncqPutBS #-}
ncqTossBS :: MonadUnliftIO m
=> NCQStorage
-> Maybe NCQSectionType
-> Maybe HashRef
-> ByteString
-> m HashRef
ncqTossBS = ncqPutBS0 False
{-# INLINE ncqTossBS #-}
-- FIXME: maybe-on-storage-closed
ncqPutBS0 :: MonadUnliftIO m
=> Bool
-> NCQStorage
-> Maybe NCQSectionType
-> Maybe HashRef
-> ByteString
-> m HashRef
ncqPutBS0 wait ncq@NCQStorage{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe hash0 mhref) do
waiter <- newEmptyTMVarIO
let h = fromMaybe (HashRef (hashObject @HbSync bs')) mhref
let work = do
let h = fromMaybe (HashRef (hashObject @HbSync bs')) mhref
let bs = ncqMakeSectionBS mtp h bs'
let shard = ncqGetShard ncq h
zero <- newTVarIO Nothing
@ -135,11 +175,10 @@ ncqPutBS ncq@NCQStorage{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe h
nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps)
writeTQueue (ncqWriteOps ! nw) work
atomically $ takeTMVar waiter
if not wait then pure h else atomically (takeTMVar waiter)
where hash0 = HashRef (hashObject @HbSync bs')
ncqTryLoadState :: forall m. MonadUnliftIO m
=> NCQStorage
-> m ()