diff --git a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs index 3af1848a..de6db0e8 100644 --- a/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs +++ b/hbs2-core/lib/HBS2/Actors/ChunkWriter.hs @@ -69,7 +69,7 @@ data ChunkWriter h m = forall a . ( MonadIO m ) => ChunkWriter { stopped :: TVar Bool - , pipeline :: Pipeline m () + , pipeline :: Pipeline IO () , dir :: FilePath , storage :: a , perBlock :: TVar (HashMap FilePath [Handle -> IO ()]) @@ -78,15 +78,13 @@ data ChunkWriter h m = forall a . ( MonadIO m blocksInProcess :: MonadIO m => ChunkWriter h m -> m Int -blocksInProcess cw = undefined - -- liftIO $ Cache.purgeExpired cache >> Cache.size cache - -- where - -- cache = perBlock cw +blocksInProcess cw = do + liftIO $ readTVarIO (perBlock cw) <&> HashMap.size runChunkWriter :: forall h m . ( Eq (Hash h) , Hashable (Hash h) , MonadIO m ) - => ChunkWriter h m -> m () + => ChunkWriter h IO -> m () runChunkWriter = runChunkWriter2 @@ -94,16 +92,17 @@ runChunkWriter = runChunkWriter2 runChunkWriter2 :: forall h m . ( Eq (Hash h) , Hashable (Hash h) , MonadIO m ) - => ChunkWriter h m -> m () + => ChunkWriter h IO -> m () runChunkWriter2 w = do liftIO $ createDirectoryIfMissing True ( dir w ) let tv = perBlock w - fix \next -> do - keys <- liftIO $ readTVarIO tv <&> (L.take 20 . HashMap.keys) - liftIO $ forConcurrently_ keys $ \f -> flush w f - pause ( 1.00 :: Timeout 'Seconds) - next + liftIO $ runPipeline (pipeline w) + -- fix \next -> do + -- keys <- liftIO $ readTVarIO tv <&> (L.take 20 . HashMap.keys) + -- liftIO $ forConcurrently_ keys $ \f -> flush w f + -- pause ( 1.00 :: Timeout 'Seconds) + -- next stopChunkWriter :: MonadIO m => ChunkWriter h m -> m () stopChunkWriter w = do @@ -145,7 +144,7 @@ makeFileName w salt h = dir w suff suff = show $ pretty (fromIntegral (hash salt) :: Word32) <> "@" <> pretty h delBlock :: (Hashable salt, MonadIO m, Pretty (Hash h)) - => ChunkWriter h m -> salt -> Hash h -> m () + => ChunkWriter h IO -> salt -> Hash h -> m () delBlock w salt h = liftIO do @@ -179,7 +178,7 @@ writeChunk = writeChunk2 getHash :: forall salt h m . ( Hashable salt , Hashed h ByteString - , MonadIO m + , m ~ IO , Block ByteString ~ ByteString , Pretty (Hash h) , Hashable (Hash h), Eq (Hash h) @@ -196,7 +195,7 @@ commitBlock :: forall salt h m . ( Hashable salt , Hashed h ByteString , Block ByteString ~ ByteString - , MonadIO m + , m ~ IO , Pretty (Hash h) , Hashable (Hash h), Eq (Hash h) ) @@ -229,29 +228,30 @@ writeChunk2 w salt h o bs = do where fn = makeFileName w salt h +flush :: ChunkWriter h IO -> FilePath -> IO () flush w fn = do let cache = perBlock w let sems = perBlockSem w + let pip = pipeline w liftIO $ do - nsem <- atomically $ Sem.newTSem 1 - actions <- atomically $ stateTVar cache (\v -> (HashMap.lookup fn v, HashMap.delete fn v)) + q <- liftIO $ Q.newTBQueueIO 1 - -- sem <- atomically $ stateTVar sems $ \hm -> let found = HashMap.lookup fn hm - -- in case found of - -- Nothing -> (nsem, HashMap.insert fn nsem hm) - -- Just s -> (s, hm) + addJob pip $ do + + as <- asyncBound $ do + withBinaryFile fn ReadWriteMode $ \h -> do + withFileLock fn Exclusive $ \_ -> do + for_ (fromMaybe mempty actions) $ \f -> f h + wait as + + void $ liftIO $ atomically $ Q.writeTBQueue q () + + liftIO $ atomically $ Q.readTBQueue q - -- atomically $ Sem.waitTSem sem - as <- asyncBound $ do - withBinaryFile fn ReadWriteMode $ \h -> do - withFileLock fn Exclusive $ \_ -> do - for_ (fromMaybe mempty actions) $ \f -> f h - wait as - -- atomically $ Sem.signalTSem sem -- Blocking! -- we need to write last chunk before this will happen @@ -260,12 +260,12 @@ flush w fn = do getHash2 :: forall salt h m . ( Hashable salt , Hashed h ByteString - , MonadIO m + , m ~ IO , Block ByteString ~ ByteString , Pretty (Hash h) , Hashable (Hash h), Eq (Hash h) ) - => ChunkWriter h m + => ChunkWriter h IO -> salt -> Hash h -> m (Hash h) @@ -282,7 +282,7 @@ commitBlock2 :: forall salt h m . ( Hashable salt , Hashed h ByteString , Block ByteString ~ ByteString - , MonadIO m + , m ~ IO , Pretty (Hash h) , Hashable (Hash h), Eq (Hash h) ) diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 6fc7b163..088d0259 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -37,7 +37,7 @@ defBlockInfoTimeout = toTimeSpec ( 60 :: Timeout 'Minutes) -- how much time wait for block from peer? defBlockWaitMax :: Timeout 'Seconds -defBlockWaitMax = 10 :: Timeout 'Seconds +defBlockWaitMax = 120 :: Timeout 'Seconds defBlockWaitSleep :: Timeout 'Seconds defBlockWaitSleep = 0.1 :: Timeout 'Seconds diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs index 5b3378a5..cdf9179e 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockChunks.hs @@ -64,7 +64,7 @@ instance Serialise (BlockChunks e) newtype instance EventKey e (BlockChunks e) = - BlockChunksEventKey (Hash HbSync) + BlockChunksEventKey (Cookie e, Hash HbSync) deriving stock (Typeable, Eq, Generic) deriving instance Hashable (EventKey e (BlockChunks e)) diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 75f288f7..1e36c87b 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -159,7 +159,7 @@ runTestPeer p zu = do cww <- newChunkWriterIO stor (Just chDir) sw <- liftIO $ replicateM 4 $ async $ simpleStorageWorker stor - cw <- liftIO $ replicateM 8 $ async $ runChunkWriter cww + cw <- liftIO $ replicateM 4 $ async $ runChunkWriter cww zu stor cww @@ -380,31 +380,31 @@ blockDownloadLoop cw = do update @e new key id - subscribe @e (BlockChunksEventKey h) $ \(BlockReady _) -> do + subscribe @e (BlockChunksEventKey (coo,h)) $ \(BlockReady _) -> do processBlock q h - let blockWtf = do - debug $ "WTF!" <+> pretty (p,coo) <+> pretty h + -- let blockWtf = do + -- debug $ "WTF!" <+> pretty (p,coo) <+> pretty h - liftIO $ async $ do - -- FIXME: block is not downloaded, return it to the Q - void $ race (pause defBlockWaitMax >> blockWtf) - $ withPeerM env $ fix \next -> do - w <- find @e key (view sBlockWrittenT) + -- liftIO $ async $ do + -- -- FIXME: block is not downloaded, return it to the Q + -- void $ race (pause defBlockWaitMax >> blockWtf) + -- $ withPeerM env $ fix \next -> do + -- w <- find @e key (view sBlockWrittenT) - maybe1 w (pure ()) $ \z -> do - wrt <- liftIO $ readTVarIO z + -- maybe1 w (pure ()) $ \z -> do + -- wrt <- liftIO $ readTVarIO z - if fromIntegral wrt >= thisBkSize then do - -- debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h - h1 <- liftIO $ getHash cw key h - if h1 == h then do - liftIO $ commitBlock cw key h - expire @e key - else pause defBlockWaitSleep >> next - else do - pause defBlockWaitSleep - next + -- if fromIntegral wrt >= thisBkSize then do + -- -- debug $ "THE BLOCK IS ABOUT TO BE READY" <+> pretty h + -- h1 <- liftIO $ getHash cw key h + -- if h1 == h then do + -- liftIO $ commitBlock cw key h + -- -- expire @e key + -- else pause defBlockWaitSleep >> next + -- else do + -- pause defBlockWaitSleep + -- next request @e p (BlockChunks @e coo (BlockGetAllChunks @e h chusz)) -- FIXME: nicer construction @@ -484,6 +484,8 @@ mkAdapter cww = do -- УДАЛЯЕМ КУКУ? , blkAcceptChunk = \(c,p,h,n,bs) -> void $ runMaybeT $ do + -- debug "AAAA!" + let cKey = DownloadSessionKey (p,c) -- check if there is a session @@ -495,7 +497,6 @@ mkAdapter cww = do when (isNothing ddd) $ do debug "SESSION NOT FOUND!" - dwnld <- MaybeT $ find cKey id -- dwnld <- maybe1 dwnld' (debug "AAAA") $ pure @@ -539,12 +540,13 @@ mkAdapter cww = do -- ЕСЛИ НЕ СОШЁЛСЯ - ТО ПОДОЖДАТЬ ЕЩЕ if ( h1 == h ) then do liftIO $ commitBlock cww cKey h + -- debug "GOT BLOCK!" updateStats @e False 1 expire cKey -- debug "hash matched!" - emit @e (BlockChunksEventKey h) (BlockReady h) + emit @e (BlockChunksEventKey (c,h)) (BlockReady h) else do debug $ "FUCK FUCK!" <+> pretty h diff --git a/hie.yaml b/hie.yaml index 9271e42b..eca9b9bf 100644 --- a/hie.yaml +++ b/hie.yaml @@ -1,6 +1,6 @@ cradle: cabal: - - path: "hbs-tests/test/Peer2Main.hs" + - path: "hbs2-tests/test/Peer2Main.hs" component: "hbs2-tests:exe:test-peer-run" - path: "hbs2-tests/test/TestSKey"