works-but-slow

This commit is contained in:
Dmitry Zuikov 2023-01-26 11:41:00 +03:00
parent 6fee1ef8c1
commit 5c46d7cec0
5 changed files with 59 additions and 57 deletions

View File

@ -69,7 +69,7 @@ data ChunkWriter h m = forall a . ( MonadIO m
) => ) =>
ChunkWriter ChunkWriter
{ stopped :: TVar Bool { stopped :: TVar Bool
, pipeline :: Pipeline m () , pipeline :: Pipeline IO ()
, dir :: FilePath , dir :: FilePath
, storage :: a , storage :: a
, perBlock :: TVar (HashMap FilePath [Handle -> IO ()]) , perBlock :: TVar (HashMap FilePath [Handle -> IO ()])
@ -78,15 +78,13 @@ data ChunkWriter h m = forall a . ( MonadIO m
blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int
blocksInProcess cw = undefined blocksInProcess cw = do
-- liftIO $ Cache.purgeExpired cache >> Cache.size cache liftIO $ readTVarIO (perBlock cw) <&> HashMap.size
-- where
-- cache = perBlock cw
runChunkWriter :: forall h m . ( Eq (Hash h) runChunkWriter :: forall h m . ( Eq (Hash h)
, Hashable (Hash h) , Hashable (Hash h)
, MonadIO m ) , MonadIO m )
=> ChunkWriter h m -> m () => ChunkWriter h IO -> m ()
runChunkWriter = runChunkWriter2 runChunkWriter = runChunkWriter2
@ -94,16 +92,17 @@ runChunkWriter = runChunkWriter2
runChunkWriter2 :: forall h m . ( Eq (Hash h) runChunkWriter2 :: forall h m . ( Eq (Hash h)
, Hashable (Hash h) , Hashable (Hash h)
, MonadIO m ) , MonadIO m )
=> ChunkWriter h m -> m () => ChunkWriter h IO -> m ()
runChunkWriter2 w = do runChunkWriter2 w = do
liftIO $ createDirectoryIfMissing True ( dir w ) liftIO $ createDirectoryIfMissing True ( dir w )
let tv = perBlock w let tv = perBlock w
fix \next -> do liftIO $ runPipeline (pipeline w)
keys <- liftIO $ readTVarIO tv <&> (L.take 20 . HashMap.keys) -- fix \next -> do
liftIO $ forConcurrently_ keys $ \f -> flush w f -- keys <- liftIO $ readTVarIO tv <&> (L.take 20 . HashMap.keys)
pause ( 1.00 :: Timeout 'Seconds) -- liftIO $ forConcurrently_ keys $ \f -> flush w f
next -- pause ( 1.00 :: Timeout 'Seconds)
-- next
stopChunkWriter :: MonadIO m => ChunkWriter h m -> m () stopChunkWriter :: MonadIO m => ChunkWriter h m -> m ()
stopChunkWriter w = do stopChunkWriter w = do
@ -145,7 +144,7 @@ makeFileName w salt h = dir w </> suff
suff = show $ pretty (fromIntegral (hash salt) :: Word32) <> "@" <> pretty h suff = show $ pretty (fromIntegral (hash salt) :: Word32) <> "@" <> pretty h
delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h)) delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h))
=> ChunkWriter h m -> salt -> Hash h -> m () => ChunkWriter h IO -> salt -> Hash h -> m ()
delBlock w salt h = liftIO do delBlock w salt h = liftIO do
@ -179,7 +178,7 @@ writeChunk = writeChunk2
getHash :: forall salt h m . getHash :: forall salt h m .
( Hashable salt ( Hashable salt
, Hashed h ByteString , Hashed h ByteString
, MonadIO m , m ~ IO
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
, Pretty (Hash h) , Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h) , Hashable (Hash h), Eq (Hash h)
@ -196,7 +195,7 @@ commitBlock :: forall salt h m .
( Hashable salt ( Hashable salt
, Hashed h ByteString , Hashed h ByteString
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
, MonadIO m , m ~ IO
, Pretty (Hash h) , Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h) , Hashable (Hash h), Eq (Hash h)
) )
@ -229,29 +228,30 @@ writeChunk2 w salt h o bs = do
where where
fn = makeFileName w salt h fn = makeFileName w salt h
flush :: ChunkWriter h IO -> FilePath -> IO ()
flush w fn = do flush w fn = do
let cache = perBlock w let cache = perBlock w
let sems = perBlockSem w let sems = perBlockSem w
let pip = pipeline w
liftIO $ do liftIO $ do
nsem <- atomically $ Sem.newTSem 1
actions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v)) actions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v))
q <- liftIO $ Q.newTBQueueIO 1
-- sem <- atomically $ stateTVar sems $ \hm -> let found = HashMap.lookup fn hm addJob pip $ do
-- in case found of
-- Nothing -> (nsem, HashMap.insert fn nsem hm) as <- asyncBound $ do
-- Just s -> (s, hm) withBinaryFile fn ReadWriteMode $ \h -> do
withFileLock fn Exclusive $ \_ -> do
for_ (fromMaybe mempty actions) $ \f -> f h
wait as
void $ liftIO $ atomically $ Q.writeTBQueue q ()
liftIO $ atomically $ Q.readTBQueue q
-- atomically $ Sem.waitTSem sem
as <- asyncBound $ do
withBinaryFile fn ReadWriteMode $ \h -> do
withFileLock fn Exclusive $ \_ -> do
for_ (fromMaybe mempty actions) $ \f -> f h
wait as
-- atomically $ Sem.signalTSem sem
-- Blocking! -- Blocking!
-- we need to write last chunk before this will happen -- we need to write last chunk before this will happen
@ -260,12 +260,12 @@ flush w fn = do
getHash2 :: forall salt h m . getHash2 :: forall salt h m .
( Hashable salt ( Hashable salt
, Hashed h ByteString , Hashed h ByteString
, MonadIO m , m ~ IO
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
, Pretty (Hash h) , Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h) , Hashable (Hash h), Eq (Hash h)
) )
=> ChunkWriter h m => ChunkWriter h IO
-> salt -> salt
-> Hash h -> Hash h
-> m (Hash h) -> m (Hash h)
@ -282,7 +282,7 @@ commitBlock2 :: forall salt h m .
( Hashable salt ( Hashable salt
, Hashed h ByteString , Hashed h ByteString
, Block ByteString ~ ByteString , Block ByteString ~ ByteString
, MonadIO m , m ~ IO
, Pretty (Hash h) , Pretty (Hash h)
, Hashable (Hash h), Eq (Hash h) , Hashable (Hash h), Eq (Hash h)
) )

View File

@ -37,7 +37,7 @@ defBlockInfoTimeout = toTimeSpec ( 60 :: Timeout 'Minutes)
-- how much time wait for block from peer? -- how much time wait for block from peer?
defBlockWaitMax :: Timeout 'Seconds defBlockWaitMax :: Timeout 'Seconds
defBlockWaitMax = 10 :: Timeout 'Seconds defBlockWaitMax = 120 :: Timeout 'Seconds
defBlockWaitSleep :: Timeout 'Seconds defBlockWaitSleep :: Timeout 'Seconds
defBlockWaitSleep = 0.1 :: Timeout 'Seconds defBlockWaitSleep = 0.1 :: Timeout 'Seconds

View File

@ -64,7 +64,7 @@ instance Serialise (BlockChunks e)
newtype instance EventKey e (BlockChunks e) = newtype instance EventKey e (BlockChunks e) =
BlockChunksEventKey (Hash HbSync) BlockChunksEventKey (Cookie e, Hash HbSync)
deriving stock (Typeable, Eq, Generic) deriving stock (Typeable, Eq, Generic)
deriving instance Hashable (EventKey e (BlockChunks e)) deriving instance Hashable (EventKey e (BlockChunks e))

View File

@ -159,7 +159,7 @@ runTestPeer p zu = do
cww <- newChunkWriterIO stor (Just chDir) cww <- newChunkWriterIO stor (Just chDir)
sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor
cw <- liftIO $ replicateM 8 $ async $ runChunkWriter cww cw <- liftIO $ replicateM 4 $ async $ runChunkWriter cww
zu stor cww zu stor cww
@ -380,31 +380,31 @@ blockDownloadLoop cw = do
update @e new key id update @e new key id
subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do subscribe @e (BlockChunksEventKey (coo,h)) $ \(BlockReady _) -> do
processBlock q h processBlock q h
let blockWtf = do -- let blockWtf = do
debug $ "WTF!" <+> pretty (p,coo) <+> pretty h -- debug $ "WTF!" <+> pretty (p,coo) <+> pretty h
liftIO $ async $ do -- liftIO $ async $ do
-- FIXME: block is not downloaded, return it to the Q -- -- FIXME: block is not downloaded, return it to the Q
void $ race (pause defBlockWaitMax >> blockWtf) -- void $ race (pause defBlockWaitMax >> blockWtf)
$ withPeerM env $ fix \next -> do -- $ withPeerM env $ fix \next -> do
w <- find @e key (view sBlockWrittenT) -- w <- find @e key (view sBlockWrittenT)
maybe1 w (pure ()) $ \z -> do -- maybe1 w (pure ()) $ \z -> do
wrt <- liftIO $ readTVarIO z -- wrt <- liftIO $ readTVarIO z
if fromIntegral wrt >= thisBkSize then do -- if fromIntegral wrt >= thisBkSize then do
-- debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h -- -- debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h
h1 <- liftIO $ getHash cw key h -- h1 <- liftIO $ getHash cw key h
if h1 == h then do -- if h1 == h then do
liftIO $ commitBlock cw key h -- liftIO $ commitBlock cw key h
expire @e key -- -- expire @e key
else pause defBlockWaitSleep >> next -- else pause defBlockWaitSleep >> next
else do -- else do
pause defBlockWaitSleep -- pause defBlockWaitSleep
next -- next
request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
@ -484,6 +484,8 @@ mkAdapter cww = do
-- УДАЛЯЕМ КУКУ? -- УДАЛЯЕМ КУКУ?
, blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do
-- debug "AAAA!"
let cKey = DownloadSessionKey (p,c) let cKey = DownloadSessionKey (p,c)
-- check if there is a session -- check if there is a session
@ -495,7 +497,6 @@ mkAdapter cww = do
when (isNothing ddd) $ do when (isNothing ddd) $ do
debug "SESSION NOT FOUND!" debug "SESSION NOT FOUND!"
dwnld <- MaybeT $ find cKey id dwnld <- MaybeT $ find cKey id
-- dwnld <- maybe1 dwnld' (debug "AAAA") $ pure -- dwnld <- maybe1 dwnld' (debug "AAAA") $ pure
@ -539,12 +540,13 @@ mkAdapter cww = do
-- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
if ( h1 == h ) then do if ( h1 == h ) then do
liftIO $ commitBlock cww cKey h liftIO $ commitBlock cww cKey h
-- debug "GOT BLOCK!"
updateStats @e False 1 updateStats @e False 1
expire cKey expire cKey
-- debug "hash matched!" -- debug "hash matched!"
emit @e (BlockChunksEventKey h) (BlockReady h) emit @e (BlockChunksEventKey (c,h)) (BlockReady h)
else do else do
debug $ "FUCK FUCK!" <+> pretty h debug $ "FUCK FUCK!" <+> pretty h

View File

@ -1,6 +1,6 @@
cradle: cradle:
cabal: cabal:
- path: "hbs-tests/test/Peer2Main.hs" - path: "hbs2-tests/test/Peer2Main.hs"
component: "hbs2-tests:exe:test-peer-run" component: "hbs2-tests:exe:test-peer-run"
- path: "hbs2-tests/test/TestSKey" - path: "hbs2-tests/test/TestSKey"