mirror of https://github.com/voidlizard/hbs2
wip
parent 2525f1abb0
commit ab3d22747d
@@ -5,7 +5,7 @@ import Data.String

 -- defChunkSize :: Integer
 defChunkSize :: Integral a => a
-defChunkSize = 500
+defChunkSize = 1024

 defBlockSize :: Integer
 defBlockSize = 256 * 1024
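Note: only the value line changes in this hunk; the Integral-polymorphic signature stays as it was. A minimal standalone sketch (defChunkSize is copied from the hunk, the callers below are made up) of why such a constant is convenient: it can be used at any integral type without fromIntegral.

defChunkSize :: Integral a => a
defChunkSize = 1024

-- number of chunks needed for a payload of the given length, at Integer
chunksFor :: Integer -> Integer
chunksFor total = (total + defChunkSize - 1) `div` defChunkSize

main :: IO ()
main = do
  print (defChunkSize :: Int)   -- used at Int, no conversion needed
  print (chunksFor 100000)      -- 98 chunks of 1024 bytes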
@@ -38,7 +38,7 @@ type family HashType ( a :: Type) where
   HashType HbSync = Blake2b_256

 newtype instance Hash HbSync =
-  HbSyncHash ShortByteString
+  HbSyncHash ByteString
   deriving stock (Eq,Ord,Data,Generic)
   deriving newtype (Hashable,Show)

@@ -59,22 +59,22 @@ getAlphabet = BS8.unpack (unAlphabet alphabet)


 instance Hashed HbSync ByteString where
-  hashObject s = HbSyncHash $ force $ SB.toShort $ BA.convert digest
+  hashObject s = HbSyncHash $! BA.convert digest
     where
      digest = hash s :: Digest (HashType HbSync)

 instance Hashed HbSync LBS.ByteString where
-  hashObject s = HbSyncHash $ force $ SB.toShort $ BA.convert digest
+  hashObject s = HbSyncHash $! BA.convert digest
     where
      digest = hashlazy s :: Digest (HashType HbSync)

 instance IsString (Hash HbSync) where
   fromString s = maybe (error ("invalid base58: " <> show s)) HbSyncHash doDecode
     where
-     doDecode = SB.toShort <$> decodeBase58 alphabet (BS8.pack s)
+     doDecode = decodeBase58 alphabet (BS8.pack s)

 instance Pretty (Hash HbSync) where
-  pretty (HbSyncHash s) = pretty @String [qc|{encodeBase58 bitcoinAlphabet (SB.fromShort s)}|]
+  pretty (HbSyncHash s) = pretty @String [qc|{encodeBase58 bitcoinAlphabet s}|]


 instance FromJSON (Hash HbSync) where
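Note: the hash payload moves from ShortByteString to a plain strict ByteString, so force, SB.toShort and SB.fromShort drop out of hashObject, the base58 decoder and the Pretty printer. A self-contained sketch of what the ByteString-backed type amounts to, assuming cryptonite for Blake2b_256, memory for BA.convert and base58-bytestring for the encoding; the repo's alphabet is taken to be bitcoinAlphabet here, which is an assumption, and the repo's Hashed/Pretty machinery is replaced by plain functions.

-- Sketch only: mirrors the instances in the hunk above, outside the repo.
module Main where

import Crypto.Hash (Blake2b_256, Digest, hash)
import qualified Data.ByteArray as BA
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS8
import Data.ByteString.Base58 (bitcoinAlphabet, decodeBase58, encodeBase58)

newtype HbSyncHash = HbSyncHash ByteString
  deriving (Eq, Ord, Show)

-- strict application is enough once no ShortByteString conversion is involved
hashObject :: ByteString -> HbSyncHash
hashObject s = HbSyncHash $! BA.convert digest
  where
    digest = hash s :: Digest Blake2b_256

toBase58 :: HbSyncHash -> ByteString
toBase58 (HbSyncHash bs) = encodeBase58 bitcoinAlphabet bs

fromBase58 :: String -> Maybe HbSyncHash
fromBase58 s = HbSyncHash <$> decodeBase58 bitcoinAlphabet (BS8.pack s)

main :: IO ()
main = do
  let h = hashObject (BS8.pack "hello")
  BS8.putStrLn (toBase58 h)                              -- base58 rendering
  print (fromBase58 (BS8.unpack (toBase58 h)) == Just h) -- round trip: True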
@@ -59,7 +59,6 @@ data SimpleStorage a =
   SimpleStorage
   { _storageDir :: FilePath
   , _storageOpQ :: TBMQueue ( IO () )
-  , _storageChunksCache :: Cache (FilePath, Offset, Size) ByteString
   , _storageStopWriting :: TVar Bool
   }

@@ -88,12 +87,9 @@ simpleStorageInit opts = liftIO $ do
   tstop <- TV.newTVarIO False

-  hcache <- Cache.newCache (Just (toTimeSpec @'Seconds 30)) -- FIXME: real setting
-
   let stor = SimpleStorage
              { _storageDir = pdir
              , _storageOpQ = tbq
-             , _storageChunksCache = hcache
              , _storageStopWriting = tstop
              }

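Note: with the chunk cache gone from the record, simpleStorageInit no longer builds the 30-second TTL cache. For reference, a small sketch of the Data.Cache API (package cache) that the removed lines used; the key shape mirrors the removed field, everything else (file name, sizes) is made up, and a plain TimeSpec stands in for the repo's toTimeSpec @'Seconds helper.

import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS8
import qualified Data.Cache as Cache
import System.Clock (TimeSpec (..))

-- (FilePath, Offset, Size), as in the removed _storageChunksCache field
type Key = (FilePath, Integer, Integer)

main :: IO ()
main = do
  -- entries expire 30 seconds after insertion
  cache <- Cache.newCache (Just (TimeSpec 30 0)) :: IO (Cache.Cache Key ByteString)
  Cache.insert cache ("blocks/abc", 0, 1024) (BS8.pack "chunk bytes")
  hit <- Cache.lookup cache ("blocks/abc", 0, 1024)
  print hit                  -- Just "chunk bytes" while the entry is fresh
  Cache.purgeExpired cache   -- what the removed purge loop did periodically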
@@ -138,39 +134,11 @@ simpleStorageWorker ss = do

-  killer <- async $ forever $ do
-    pause ( 30 :: Timeout 'Seconds ) -- FIXME: setting
-    Cache.purgeExpired ( ss ^. storageChunksCache )

   (_, e) <- waitAnyCatchCancel [ops,killer]

   pure ()


-simpleChunkLookup :: IsKey h
-                  => SimpleStorage h
-                  -> Hash h
-                  -> Offset
-                  -> Size
-                  -> IO (Maybe LBS.ByteString)
-
-simpleChunkLookup s k off size = do
-  let fn = simpleBlockFileName s k
-  let cache = s ^. storageChunksCache
-  Cache.lookup cache (fn, off, size) <&> fmap LBS.fromStrict
-
-simpleChunkCache :: IsKey h
-                 => SimpleStorage h
-                 -> Hash h
-                 -> Offset
-                 -> Size
-                 -> LBS.ByteString
-                 -> IO ()
-
-simpleChunkCache s k off size bs = do
-  let fn = simpleBlockFileName s k
-  let cache = s ^. storageChunksCache
-  -- print ("caching!", fn, off, size)
-  Cache.insert cache (fn, off, size) (LBS.toStrict bs)
-
 simpleBlockFileName :: Pretty (Hash h) => SimpleStorage h -> Hash h -> FilePath
 simpleBlockFileName ss h = path
   where
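Note: dropping the cache also drops the background purge thread. A standalone sketch of the pattern the removed killer loop used (a forever loop in its own async, reaped together with the main loop by waitAnyCatchCancel); threadDelay stands in for the repo's pause/Timeout helpers, and the bodies of both loops are made up.

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, waitAnyCatchCancel)
import Control.Monad (forever)

main :: IO ()
main = do
  -- stand-in for the storage op-queue worker loop
  ops <- async $ do
    threadDelay 3000000
    putStrLn "ops loop finished"

  -- periodic housekeeping, like the removed purgeExpired loop
  killer <- async $ forever $ do
    threadDelay 1000000
    putStrLn "purge tick"

  -- whichever thread finishes first wins; the other is cancelled
  (_, e) <- waitAnyCatchCancel [ops, killer]
  print e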
@@ -231,43 +199,16 @@ simpleGetChunkLazy s key off size = do
   let action = do
         let fn = simpleBlockFileName s key

-        cached <- simpleChunkLookup s key off size
+        r <- tryJust (guard . isDoesNotExistError)
+               $ withBinaryFile fn ReadMode $ \ha -> do
+                   hSeek ha AbsoluteSeek ( fromIntegral off )
+                   LBS.hGet ha ( fromIntegral size )

-        case cached of
-          Just chunk -> do
-            void $ atomically $ TBMQ.writeTBMQueue resQ (Just chunk)
+        result <- case r of
+          Right bytes -> pure (Just bytes)
+          Left _ -> pure Nothing

-          Nothing -> do
-            r <- tryJust (guard . isDoesNotExistError)
-                   $ withBinaryFile fn ReadMode $ \handle -> do
-                       hSeek handle AbsoluteSeek ( fromIntegral off )
-                       bytes <- LBS.hGet handle ( fromIntegral size )
-
-                       let ahead = 16
-                       let bnum = off `div` fromIntegral size
-                       let doCache =
-                             ahead > 0
-                             && size > 0
-                             && size < 4096
-                             && (bnum `mod` ahead) == 0
-
-                       when doCache do -- FIXME:! setting
-                         chunks <- forM [ size .. size * fromIntegral ahead ] $ \i -> do
-                           let o = fromIntegral off + fromIntegral (i * size)
-                           hSeek handle AbsoluteSeek o
-                           fwd <- LBS.hGet handle (fromIntegral size)
-                           pure (fwd, fromIntegral o)
-
-                         let chunks' = takeWhile (not . LBS.null . fst) chunks
-                         mapM_ (\(c,o) -> simpleChunkCache s key o size c) chunks'
-
-                       pure bytes
-
-            result <- case r of
-              Right bytes -> pure (Just bytes)
-              Left _ -> pure Nothing
-
-            void $ atomically $ TBMQ.writeTBMQueue resQ result
+        void $ atomically $ TBMQ.writeTBMQueue resQ result

   let onFail (_ :: IOError)= void $ atomically $ TBMQ.writeTBMQueue resQ Nothing
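Note: once the read-ahead caching branch is gone, the chunk read collapses to seek, read, and map a missing file to Nothing. A standalone sketch of that access pattern with the same tryJust / withBinaryFile / hSeek / LBS.hGet combination; the function name, file path and sizes are made up.

import Control.Exception (tryJust)
import Control.Monad (guard)
import qualified Data.ByteString.Lazy as LBS
import System.IO (IOMode (ReadMode), SeekMode (AbsoluteSeek), hSeek, withBinaryFile)
import System.IO.Error (isDoesNotExistError)

-- Read `size` bytes at `off`; Nothing if the file does not exist.
readChunk :: FilePath -> Integer -> Int -> IO (Maybe LBS.ByteString)
readChunk fn off size = do
  r <- tryJust (guard . isDoesNotExistError)
         $ withBinaryFile fn ReadMode $ \ha -> do
             hSeek ha AbsoluteSeek off
             LBS.hGet ha size
  pure $ either (const Nothing) Just r

main :: IO ()
main = readChunk "blocks/abc" 0 1024 >>= print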
@@ -52,7 +52,7 @@ main = do
   storage <- simpleStorageInit opts :: IO (SimpleStorage HbSync)

-  w1 <- replicateM 128 $ async (simpleStorageWorker storage)
+  w1 <- replicateM 16 $ async (simpleStorageWorker storage)

   cw <- newChunkWriterIO @HbSync storage (Just (dir </> ".qqq"))

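Note: only the pool size changes here; read with the usual removed-before-added ordering it shrinks from 128 workers to 16. A minimal sketch of the shape that _storageOpQ :: TBMQueue (IO ()) and replicateM n (async ...) suggest: a pool of workers draining a bounded, closeable queue of IO actions. The draining loop below is hypothetical, not the repo's simpleStorageWorker.

import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue
       (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import Control.Monad (forM_, replicateM)

-- Run queued IO actions until the queue is closed and drained.
worker :: TBMQueue (IO ()) -> IO ()
worker q = do
  mop <- atomically (readTBMQueue q)
  case mop of
    Nothing -> pure ()            -- queue closed and empty
    Just op -> op >> worker q

main :: IO ()
main = do
  q <- newTBMQueueIO 1000
  workers <- replicateM 16 (async (worker q))   -- pool size, as in the new line
  forM_ [1 :: Int .. 100] $ \i ->
    atomically $ writeTBMQueue q (print i)
  atomically (closeTBMQueue q)
  mapM_ wait workers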
@@ -68,7 +68,6 @@ main = do
   failed <- replicateM times $ do

-
     forConcurrently_ psz' $ \(o,s) -> do
       let t = B8.take s $ B8.drop o bytes
       writeChunk cw 1 hash (fromIntegral o) t

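Note: this last hunk only drops a blank line; around it, the test writes every (offset, size) slice of a buffer concurrently. A standalone sketch of that slicing plus forConcurrently_ pattern; the buffer, the slice list and the print standing in for writeChunk are all made up.

import Control.Concurrent.Async (forConcurrently_)
import qualified Data.ByteString.Char8 as B8

main :: IO ()
main = do
  let bytes = B8.pack (concat (replicate 100 "0123456789"))  -- 1000-byte buffer
      size  = 64
      offs  = [0, size .. B8.length bytes - 1]
      psz   = [ (o, min size (B8.length bytes - o)) | o <- offs ]
  -- one concurrent task per (offset, size) slice, like the writeChunk loop
  forConcurrently_ psz $ \(o, s) -> do
    let t = B8.take s (B8.drop o bytes)
    putStrLn ("chunk at " <> show o <> ": " <> show (B8.length t) <> " bytes")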