fucking fuck

This commit is contained in:
Dmitry Zuikov 2023-01-26 13:26:37 +03:00
parent e390cc8c5d
commit 8a2d153914
4 changed files with 73 additions and 39 deletions

View File

@ -18,6 +18,7 @@ import HBS2.Storage
import HBS2.Defaults
import HBS2.Clock
import Control.Monad.Trans.Maybe
import Data.List qualified as L
import Data.Functor
import Data.Function
@ -41,6 +42,7 @@ import System.FileLock
import Control.Concurrent.Async
import Control.Monad
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Control.Concurrent.STM
@ -186,7 +188,7 @@ getHash :: forall salt h m .
=> ChunkWriter h m
-> salt
-> Hash h
-> m (Hash h)
-> m (Maybe (Hash h))
getHash = getHash2
@ -231,7 +233,6 @@ writeChunk2 w salt h o bs = do
flush :: ChunkWriter h IO -> FilePath -> IO ()
flush w fn = do
let cache = perBlock w
let sems = perBlockSem w
let pip = pipeline w
liftIO $ do
@ -268,11 +269,16 @@ getHash2 :: forall salt h m .
=> ChunkWriter h IO
-> salt
-> Hash h
-> m (Hash h)
-> m (Maybe (Hash h))
getHash2 w salt h = do
flush w fn
liftIO $ hashObject @h <$> B.readFile fn
runMaybeT $ do
res <- liftIO $ tryJust (guard . isDoesNotExistError)
( B.readFile fn >>= \s -> pure $ hashObject @h s )
MaybeT $ pure $ either (const Nothing) Just res
where
fn = makeFileName w salt h
@ -292,9 +298,20 @@ commitBlock2 :: forall salt h m .
-> m ()
commitBlock2 w@(ChunkWriter {storage = stor}) salt h = do
print "FLUSHING"
flush w fn
s <- liftIO $ B.readFile fn
void $! putBlock stor s
print "FLUSHED"
res <- liftIO $ tryJust (guard . isDoesNotExistError)
( B.readFile fn )
case res of
Left _ -> pure ()
Right s -> do
void $ putBlock stor s
delBlock w salt h
where

View File

@ -37,10 +37,10 @@ defBlockInfoTimeout = toTimeSpec ( 60 :: Timeout 'Minutes)
-- how much time wait for block from peer?
defBlockWaitMax :: Timeout 'Seconds
defBlockWaitMax = 120 :: Timeout 'Seconds
defBlockWaitMax = 20 :: Timeout 'Seconds
defBlockWaitSleep :: Timeout 'Seconds
defBlockWaitSleep = 0.1 :: Timeout 'Seconds
defBlockWaitSleep = 1 :: Timeout 'Seconds
defSweepTimeout :: Timeout 'Seconds
defSweepTimeout = 5 -- FIXME: only for debug!

View File

@ -383,28 +383,33 @@ blockDownloadLoop cw = do
subscribe @e (BlockChunksEventKey (coo,h)) $ \(BlockReady _) -> do
processBlock q h
-- let blockWtf = do
-- debug $ "WTF!" <+> pretty (p,coo) <+> pretty h
let blockWtf = do
debug $ "WTF!" <+> pretty (p,coo) <+> pretty h
-- liftIO $ async $ do
-- -- FIXME: block is not downloaded, return it to the Q
-- void $ race (pause defBlockWaitMax >> blockWtf)
-- $ withPeerM env $ fix \next -> do
-- w <- find @e key (view sBlockWrittenT)
liftIO $ async $ do
-- FIXME: block is not downloaded, return it to the Q
void $ race (pause defBlockWaitMax >> blockWtf)
$ withPeerM env $ fix \next -> do
w <- find @e key (view sBlockWrittenT)
-- maybe1 w (pure ()) $ \z -> do
-- wrt <- liftIO $ readTVarIO z
maybe1 w (pure ()) $ \z -> do
wrt <- liftIO $ readTVarIO z
-- if fromIntegral wrt >= thisBkSize then do
-- -- debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h
-- h1 <- liftIO $ getHash cw key h
-- if h1 == h then do
-- liftIO $ commitBlock cw key h
-- -- expire @e key
-- else pause defBlockWaitSleep >> next
-- else do
-- pause defBlockWaitSleep
-- next
if fromIntegral wrt >= thisBkSize then do
h1 <- liftIO $ getHash cw key h
if | h1 == Just h -> do
liftIO $ commitBlock cw key h
expire @e key
| h1 /= Just h -> do
debug "block fucked"
pause defBlockWaitMax --
| otherwise -> pure ()
else do
pause defBlockWaitSleep
next
request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction
@ -538,17 +543,21 @@ mkAdapter cww = do
-- ПОСЧИТАТЬ ХЭШ
-- ЕСЛИ СОШЁЛСЯ - ФИНАЛИЗИРОВАТЬ БЛОК
-- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ
if ( h1 == h ) then do
if | h1 == Just h -> do
liftIO $ commitBlock cww cKey h
-- debug "GOT BLOCK!"
updateStats @e False 1
expire cKey
-- debug "hash matched!"
emit @e (BlockChunksEventKey (c,h)) (BlockReady h)
else do
debug $ "FUCK FUCK!" <+> pretty h
| h1 /= Just h -> do
debug "chunk receiving failed"
| otherwise -> pure ()
when (written > mbSize * defBlockDownloadThreshold) $ do
debug $ "SESSION DELETED BECAUSE THAT PEER IS JERK:" <+> pretty p

View File

@ -42,7 +42,7 @@ main = do
w1 <- replicateM 2 $ async (simpleStorageWorker storage)
cw <- newChunkWriterIO storage (Just (dir </> ".qqq"))
cw <- newChunkWriterIO @HbSync storage (Just (dir </> ".qqq"))
w2 <- replicateM 2 $ async $ runChunkWriter cw
@ -69,10 +69,18 @@ main = do
h2 <- getHash cw 1 hash
if hash /= h2 then do
-- commitBlock cw 1 hash
-- commitBlock cw 1 hash
print "JOPA"
commitBlock cw 1 hash
print "KITA"
if Just hash /= h2 then do
pure [1]
else do
print "YAY!"
commitBlock cw 1 hash
print "QQQ!"
pure mempty
mapM_ cancel $ w1 <> w2