This commit is contained in:
Dmitry Zuikov 2023-01-27 09:44:46 +03:00
parent 784ac22b0e
commit d1c8972b09
2 changed files with 16 additions and 57 deletions

View File

@ -16,49 +16,16 @@ import HBS2.Actors
import HBS2.Hash import HBS2.Hash
import HBS2.Storage import HBS2.Storage
import HBS2.Defaults import HBS2.Defaults
import HBS2.Clock
import HBS2.Net.Proto.Sessions import HBS2.Net.Proto.Sessions
import Control.Monad.Trans.Maybe
import Data.List qualified as L
import Data.Functor import Data.Functor
import Data.Function
import Control.Exception
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as B
import Data.ByteString qualified as BS
-- import Data.Cache (Cache)
-- import Data.Cache qualified as Cache
import Data.Foldable
import Data.Traversable
import Data.Hashable (hash)
import Data.Maybe
import Data.Word
import Prettyprinter import Prettyprinter
import System.Directory
import System.FilePath
import System.IO.Error
import System.IO
import System.IO.Temp
import System.FileLock
import Control.Concurrent.Async
import Control.Monad.Except
import Control.Monad
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TVar as TV import Control.Concurrent.STM.TVar as TV
import Control.Concurrent.STM.TBQueue qualified as Q
import Control.Concurrent.STM.TSem qualified as Sem
import Control.Concurrent.STM.TSem (TSem)
import Data.Typeable import Data.Typeable
import Control.Concurrent.MVar as MVar
import Control.Concurrent.STM.TQueue qualified as Q0
import Control.Concurrent
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap import Data.HashMap.Strict qualified as HashMap
@ -85,10 +52,8 @@ instance ( Hashable salt
, Hashed h ByteString , Hashed h ByteString
) => ChunkKey salt h ) => ChunkKey salt h
data Chunk h = P (IntMap ByteString) data Chunk h = P (IntMap ByteString)
| S (Hash h) ByteString | S ByteString
instance Hashed h ByteString => Monoid (Chunk h) where instance Hashed h ByteString => Monoid (Chunk h) where
mempty = P mempty mempty = P mempty
@ -96,32 +61,28 @@ instance Hashed h ByteString => Monoid (Chunk h) where
instance Hashed h ByteString => Semigroup (Chunk h) where instance Hashed h ByteString => Semigroup (Chunk h) where
(<>) (P a) (P b) = P ( a <> b ) (<>) (P a) (P b) = P ( a <> b )
(<>) (S _ s1) (S _ s2) = S h3 s3 (<>) (S s1) (S s2) = S s3
where where
s3 = s1 <> s2 s3 = s1 <> s2
h3 = hashObject s3
(<>) p@(P{}) (S _ s) = S h3 s3 (<>) p@(P{}) (S s) = S s3
where where
(S _ s1) = toS p (S s1) = toS p
s3 = s1 <> s s3 = s1 <> s
h3 = hashObject s3
(<>) (S _ s) p@(P{}) = S h3 s3 (<>) (S s) p@(P{}) = S s3
where where
(S _ s1) = toS p (S s1) = toS p
s3 = s <> s1 s3 = s <> s1
h3 = hashObject s3
mkP :: Offset -> ByteString -> Chunk h mkP :: Offset -> ByteString -> Chunk h
mkP o b = P (IntMap.singleton (fromIntegral o) b) mkP o b = P (IntMap.singleton (fromIntegral o) b)
toS :: Hashed h ByteString => Chunk h -> Chunk h toS :: Hashed h ByteString => Chunk h -> Chunk h
toS s@(S{}) = s toS s@(S{}) = s
toS (P xs) = S h s toS (P xs) = S s
where where
s = mconcat $ IntMap.elems xs s = mconcat $ IntMap.elems xs
h = hashObject s
data ChunkWriter h m = forall a . ( MonadIO m data ChunkWriter h m = forall a . ( MonadIO m
, Storage a h ByteString m , Storage a h ByteString m
@ -130,7 +91,6 @@ data ChunkWriter h m = forall a . ( MonadIO m
ChunkWriter ChunkWriter
{ stopped :: TVar Bool { stopped :: TVar Bool
, pipeline :: Pipeline IO () , pipeline :: Pipeline IO ()
, dir :: FilePath
, storage :: a , storage :: a
, perBlock :: !(TVar (HashMap SKey (Chunk h))) , perBlock :: !(TVar (HashMap SKey (Chunk h)))
} }
@ -154,7 +114,7 @@ runChunkWriter2 :: forall h m . ( Eq (Hash h)
=> ChunkWriter h IO -> m () => ChunkWriter h IO -> m ()
runChunkWriter2 w = do runChunkWriter2 w = do
liftIO $ createDirectoryIfMissing True ( dir w ) -- liftIO $ createDirectoryIfMissing True ( dir w )
let tv = perBlock w let tv = perBlock w
liftIO $ runPipeline (pipeline w) liftIO $ runPipeline (pipeline w)
-- fix \next -> do -- fix \next -> do
@ -176,12 +136,9 @@ newChunkWriterIO :: forall h a m . ( Key h ~ Hash h, h ~ HbSync
-> Maybe FilePath -> Maybe FilePath
-> m (ChunkWriter h m) -> m (ChunkWriter h m)
newChunkWriterIO s tmp = do newChunkWriterIO s _ = do
pip <- newPipeline defChunkWriterQ pip <- newPipeline defChunkWriterQ
def <- liftIO $ getXdgDirectory XdgData (defStorePath </> "temp-chunks")
let d = fromMaybe def tmp
mt <- liftIO $ newTVarIO mempty mt <- liftIO $ newTVarIO mempty
running <- liftIO $ newTVarIO False running <- liftIO $ newTVarIO False
@ -190,7 +147,6 @@ newChunkWriterIO s tmp = do
ChunkWriter ChunkWriter
{ stopped = running { stopped = running
, pipeline = pip , pipeline = pip
, dir = d
, storage = s , storage = s
, perBlock = mt , perBlock = mt
} }
@ -276,8 +232,8 @@ getHash2 w salt h = do
let k = newSKey (salt, h) let k = newSKey (salt, h)
chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k
case chunk of case chunk of
Just (S h1 _) -> pure (Just h1) Just (S s) -> pure (Just (hashObject s))
_ -> pure Nothing _ -> pure Nothing
commitBlock2 :: forall salt h m . commitBlock2 :: forall salt h m .
@ -297,7 +253,7 @@ commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do
chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k chunk <- readTVarIO (perBlock w) <&> fmap toS . HashMap.lookup k
case chunk of case chunk of
Just (S _ s) -> void $ putBlock stor s >> delBlock w k Just (S s) -> void $ putBlock stor s >> delBlock w k
_ -> pure () -- FIXME: error _ -> pure () -- FIXME: error

View File

@ -280,6 +280,8 @@ blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, Sessions e (BlockInfo e) m , Sessions e (BlockInfo e) m
, Sessions e (BlockChunks e) m , Sessions e (BlockChunks e) m
, Sessions e (Stats e) m , Sessions e (Stats e) m
, Typeable (SessionKey e (BlockChunks e))
, Typeable (SessionKey e (BlockInfo e))
, HasStorage m , HasStorage m
, Num (Peer e) , Num (Peer e)
, Pretty (Peer e) , Pretty (Peer e)
@ -471,6 +473,7 @@ mkAdapter :: forall e m . ( m ~ PeerM e IO
, Hashable (SessionKey e (BlockChunks e)) , Hashable (SessionKey e (BlockChunks e))
, Sessions e (BlockChunks e) (ResponseM e m) , Sessions e (BlockChunks e) (ResponseM e m)
, Sessions e (Stats e) (ResponseM e m) , Sessions e (Stats e) (ResponseM e m)
, Typeable (SessionKey e (BlockChunks e))
, Default (SessionData e (Stats e)) , Default (SessionData e (Stats e))
, EventEmitter e (BlockChunks e) m , EventEmitter e (BlockChunks e) m
, Pretty (Peer e) , Pretty (Peer e)