mirror of https://github.com/voidlizard/hbs2
wip, hbs2-sync rc
This commit is contained in:
parent
7b7a89f13d
commit
573a9f3377
|
@ -5,24 +5,14 @@ module HBS2.Sync.Internal
|
|||
import HBS2.Sync.Prelude
|
||||
import HBS2.Sync.State
|
||||
|
||||
import HBS2.Merkle.MetaData
|
||||
import HBS2.System.Dir
|
||||
import HBS2.Net.Auth.GroupKeySymm
|
||||
import HBS2.Storage.Operations.Class
|
||||
import HBS2.Storage.Compact as Compact
|
||||
import HBS2.Peer.RPC.API.RefChan
|
||||
import HBS2.Peer.RPC.API.Storage
|
||||
import HBS2.Peer.RPC.Client.Unix (UNIX)
|
||||
import HBS2.Peer.RPC.Client
|
||||
import HBS2.Peer.RPC.Client.RefChan as Client
|
||||
|
||||
import HBS2.CLI.Run.Internal hiding (PeerNotConnectedException)
|
||||
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import Data.HashSet qualified as HS
|
||||
import Data.Time.Clock.POSIX
|
||||
|
||||
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.List qualified as L
|
||||
|
@ -34,7 +24,6 @@ import Control.Monad.Except
|
|||
import Data.Ord
|
||||
|
||||
import Streaming.Prelude qualified as S
|
||||
import UnliftIO.IO.File qualified as UIO
|
||||
|
||||
|
||||
syncEntries :: forall c m . ( MonadUnliftIO m
|
||||
|
@ -319,197 +308,4 @@ syncEntries = do
|
|||
liftIO $ getPOSIXTime <&> round >>= print
|
||||
|
||||
|
||||
runDirectory :: ( IsContext c
|
||||
, SyncAppPerks m
|
||||
, HasClientAPI RefChanAPI UNIX m
|
||||
, HasClientAPI StorageAPI UNIX m
|
||||
, HasStorage m
|
||||
, HasRunDir m
|
||||
, HasTombs m
|
||||
, HasCache m
|
||||
, Exception (BadFormException c)
|
||||
) => RunM c m ()
|
||||
runDirectory = do
|
||||
|
||||
path <- getRunDir
|
||||
|
||||
runDir
|
||||
`catch` \case
|
||||
RefChanNotSetException -> do
|
||||
err $ "no refchan set for" <+> pretty path
|
||||
RefChanHeadNotFoundException -> do
|
||||
err $ "no refchan head found for" <+> pretty path
|
||||
EncryptionKeysNotDefined -> do
|
||||
err $ "no readers defined in the refchan for " <+> pretty path
|
||||
SignKeyNotSet -> do
|
||||
err $ "sign key not set or not found " <+> pretty path
|
||||
DirNotSet -> do
|
||||
err $ "directory not set"
|
||||
|
||||
`catch` \case
|
||||
(e :: OperationError) -> do
|
||||
err $ viaShow e
|
||||
|
||||
`finally` do
|
||||
closeTombs
|
||||
closeCache
|
||||
|
||||
where
|
||||
|
||||
|
||||
writeEntry path e = do
|
||||
|
||||
let p = entryPath e
|
||||
let filePath = path </> p
|
||||
|
||||
sto <- getStorage
|
||||
|
||||
tombs <- getTombs
|
||||
|
||||
void $ runMaybeT do
|
||||
|
||||
dir <- getRunDir
|
||||
backup <- getRunDirEnv dir
|
||||
<&> fmap (view dirSyncBackup)
|
||||
<&> fromMaybe False
|
||||
|
||||
h <- getEntryHash e & toMPlus
|
||||
|
||||
unless backup do
|
||||
|
||||
notice $ green "write" <+> pretty h <+> pretty p
|
||||
|
||||
lbs <- lift (runExceptT (getTreeContents sto h))
|
||||
>>= toMPlus
|
||||
|
||||
mkdir (dropFileName filePath)
|
||||
|
||||
liftIO $ UIO.withBinaryFileAtomic filePath WriteMode $ \fh -> do
|
||||
LBS.hPutStr fh lbs >> hFlush fh
|
||||
|
||||
let ts = getEntryTimestamp e
|
||||
let timestamp = posixSecondsToUTCTime (fromIntegral ts)
|
||||
|
||||
liftIO $ setModificationTime (path </> p) timestamp
|
||||
|
||||
lift $ Compact.putVal tombs p (0 :: Integer)
|
||||
|
||||
runDir = do
|
||||
|
||||
sto <- getStorage
|
||||
|
||||
path <- getRunDir
|
||||
|
||||
env <- getRunDirEnv path >>= orThrow DirNotSet
|
||||
|
||||
refchan <- view dirSyncRefChan env & orThrow RefChanNotSetException
|
||||
|
||||
fetchRefChan @UNIX refchan
|
||||
|
||||
-- FIXME: multiple-directory-scans
|
||||
|
||||
local <- getStateFromDir0 True
|
||||
|
||||
let hasRemoteHash = [ (p, h) | (p, WithRemoteHash e h) <- local]
|
||||
|
||||
hasGK0 <- HM.fromList <$> S.toList_ do
|
||||
for_ hasRemoteHash $ \(p,h) -> do
|
||||
mgk0 <- lift $ loadGroupKeyForTree @HBS2Basic sto h
|
||||
for_ mgk0 $ \gk0 -> S.yield (p,gk0)
|
||||
|
||||
deleted <- findDeleted
|
||||
|
||||
merged <- mergeState deleted local
|
||||
|
||||
rch <- Client.getRefChanHead @UNIX refchan
|
||||
>>= orThrow RefChanHeadNotFoundException
|
||||
|
||||
let filesLast m = case mergedEntryType m of
|
||||
Tomb -> 0
|
||||
Dir -> 1
|
||||
File -> 2
|
||||
|
||||
for_ (L.sortOn filesLast merged) $ \w -> do
|
||||
case w of
|
||||
N (p,TombEntry e) -> do
|
||||
notice $ green "removed" <+> pretty p
|
||||
|
||||
D (p,e) _ -> do
|
||||
|
||||
tombs <- getTombs
|
||||
|
||||
n <- Compact.getValEither @Integer tombs p
|
||||
<&> fromRight (Just 0)
|
||||
|
||||
notice $ "deleted locally" <+> pretty n <+> pretty p
|
||||
|
||||
when (n < Just 1) do
|
||||
notice $ "post tomb tx" <+> pretty n <+> pretty p
|
||||
now <- liftIO $ getPOSIXTime <&> round <&> LBS.take 6 . serialise
|
||||
postEntryTx (Just now) (HM.lookup p hasGK0) refchan path e
|
||||
Compact.putVal tombs p (maybe 1 succ n)
|
||||
|
||||
N (p,_) -> do
|
||||
notice $ "?" <+> pretty p
|
||||
|
||||
M (f,t,e) -> do
|
||||
notice $ green "move" <+> pretty f <+> pretty t
|
||||
mv (path </> f) (path </> t)
|
||||
notice $ green "post renamed entry tx" <+> pretty f
|
||||
postEntryTx Nothing (HM.lookup f hasGK0) refchan path e
|
||||
|
||||
E (p,UpdatedFileEntry _ e) -> do
|
||||
let fullPath = path </> p
|
||||
here <- liftIO $ doesFileExist fullPath
|
||||
writeEntry path e
|
||||
notice $ red "updated" <+> pretty here <+> pretty p
|
||||
postEntryTx Nothing (HM.lookup p hasGK0) refchan path e
|
||||
|
||||
E (p,e@(FileEntry _)) -> do
|
||||
let fullPath = path </> p
|
||||
here <- liftIO $ doesFileExist fullPath
|
||||
d <- liftIO $ doesDirectoryExist fullPath
|
||||
|
||||
older <- if here then do
|
||||
s <- getFileTimestamp fullPath
|
||||
pure $ s < getEntryTimestamp e
|
||||
else
|
||||
pure False
|
||||
|
||||
when (not here || older) do
|
||||
writeEntry path e
|
||||
|
||||
void $ runMaybeT do
|
||||
gk0 <- HM.lookup p hasGK0 & toMPlus
|
||||
let rcpt = recipients gk0 & HM.keys
|
||||
let members = view refChanHeadReaders rch & HS.toList
|
||||
when (rcpt /= members) do
|
||||
notice $ red "update group key" <+> pretty p
|
||||
lift $ postEntryTx Nothing (Just gk0) refchan path e
|
||||
|
||||
E (p,TombEntry e) -> do
|
||||
let fullPath = path </> p
|
||||
here <- liftIO $ doesFileExist fullPath
|
||||
when here do
|
||||
|
||||
tombs <- getTombs
|
||||
|
||||
n <- Compact.getValEither @Integer tombs p
|
||||
<&> fromRight (Just 0)
|
||||
|
||||
|
||||
when (n < Just 1) do
|
||||
notice $ "post tomb tx" <+> pretty n <+> pretty p
|
||||
postEntryTx Nothing (HM.lookup p hasGK0) refchan path e
|
||||
|
||||
Compact.putVal tombs p (maybe 0 succ n)
|
||||
|
||||
b <- backupMode
|
||||
|
||||
unless b do
|
||||
notice $ red "deleted" <+> pretty p
|
||||
rm fullPath
|
||||
|
||||
E (p,_) -> do
|
||||
notice $ "skip entry" <+> pretty p
|
||||
|
||||
|
|
|
@ -41,43 +41,24 @@ import HBS2.KeyMan.Keys.Direct as Exported ( runKeymanClient
|
|||
|
||||
import Data.Config.Suckless as Exported
|
||||
import Data.Config.Suckless.Script as Exported
|
||||
import Data.Config.Suckless.Script.File
|
||||
|
||||
import DBPipe.SQLite
|
||||
|
||||
import Codec.Serialise as Exported
|
||||
import Control.Applicative
|
||||
import Control.Concurrent.STM (flushTQueue)
|
||||
import Control.Monad.Reader as Exported
|
||||
import Control.Monad.Trans.Cont as Exported
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Control.Monad.Except
|
||||
import Data.Ord
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.Coerce as Exported
|
||||
import Data.Either as Exported
|
||||
import Data.Fixed
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import Data.HashSet qualified as HS
|
||||
import Data.List qualified as L
|
||||
import Data.List (stripPrefix)
|
||||
import Data.Map (Map)
|
||||
import Data.Map qualified as Map
|
||||
import Data.Maybe as Exported
|
||||
import Data.Text qualified as Text
|
||||
import Data.Set qualified as Set
|
||||
import Data.Time.Clock.POSIX
|
||||
import Data.Word
|
||||
import Lens.Micro.Platform
|
||||
import Streaming.Prelude qualified as S
|
||||
import System.Directory (getModificationTime,setModificationTime,doesFileExist,listDirectory)
|
||||
import System.Directory (XdgDirectory(..),getXdgDirectory)
|
||||
import System.Exit qualified as Exit
|
||||
import System.TimeIt
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
|
||||
import UnliftIO.IO.File qualified as UIO
|
||||
import System.Directory
|
||||
import UnliftIO as Exported
|
||||
|
||||
{- HLINT ignore "Functor law" -}
|
||||
|
|
|
@ -28,6 +28,8 @@ import DBPipe.SQLite
|
|||
import Data.Config.Suckless.Script.File
|
||||
|
||||
import Control.Concurrent.STM (flushTQueue)
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.Fixed
|
||||
|
@ -36,16 +38,21 @@ import Data.HashSet qualified as HS
|
|||
import Data.List qualified as L
|
||||
import Data.Map (Map)
|
||||
import Data.Map qualified as Map
|
||||
import Data.Set qualified as Set
|
||||
import Data.Ord
|
||||
import Data.Set qualified as Set
|
||||
import Data.Text qualified as Text
|
||||
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
|
||||
import Data.Word
|
||||
import Lens.Micro.Platform
|
||||
import Streaming.Prelude qualified as S
|
||||
import System.TimeIt
|
||||
import System.Directory hiding (doesFileExist,doesDirectoryExist)
|
||||
import Text.InterpolatedString.Perl6 (qc)
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Control.Monad.Except
|
||||
import Lens.Micro.Platform
|
||||
import Data.Text qualified as Text
|
||||
import UnliftIO.IO.File qualified as UIO
|
||||
|
||||
|
||||
{- HLINT ignore "Functor law" -}
|
||||
{- HLINT ignore "Eta reduce" -}
|
||||
|
||||
data EntryType = File | Dir | Tomb
|
||||
deriving stock (Eq,Ord,Show,Data,Generic)
|
||||
|
@ -230,19 +237,6 @@ getStateFromDir seed path incl excl = do
|
|||
S.yield (p,e)
|
||||
|
||||
|
||||
|
||||
-- dbPath <- getStatePath
|
||||
-- env <- liftIO newAppEnv
|
||||
-- let db = appDb env
|
||||
-- flip runContT pure $ do
|
||||
-- void $ ContT $ bracket (async (runPipe db)) cancel
|
||||
|
||||
-- here <- doesPathExist dbPath
|
||||
|
||||
-- unless here do
|
||||
-- withDB db $ populateState
|
||||
|
||||
|
||||
getStateFromRefChan :: forall m . ( MonadUnliftIO m
|
||||
, HasClientAPI RefChanAPI UNIX m
|
||||
, HasClientAPI StorageAPI UNIX m
|
||||
|
@ -365,7 +359,9 @@ getStateFromRefChan rchan = do
|
|||
|
||||
seenTx <- atomically $ flushTQueue seen
|
||||
for_ seenTx $ \txh -> do
|
||||
insert [qc|insert into seen (txhash) values(?) on conflict do nothing|] (Only (show $ pretty $ txh))
|
||||
insert [qc| insert into seen (txhash)
|
||||
values(?) on conflict do nothing
|
||||
|] (Only (show $ pretty $ txh))
|
||||
|
||||
ess0 <- withDB db do
|
||||
select_ [qc|select s from entry|]
|
||||
|
@ -629,3 +625,198 @@ getTreeContents sto href = do
|
|||
_ -> throwError UnsupportedFormat
|
||||
|
||||
|
||||
runDirectory :: ( IsContext c
|
||||
, SyncAppPerks m
|
||||
, HasClientAPI RefChanAPI UNIX m
|
||||
, HasClientAPI StorageAPI UNIX m
|
||||
, HasStorage m
|
||||
, HasRunDir m
|
||||
, HasTombs m
|
||||
, HasCache m
|
||||
, Exception (BadFormException c)
|
||||
) => RunM c m ()
|
||||
runDirectory = do
|
||||
|
||||
path <- getRunDir
|
||||
|
||||
runDir
|
||||
`catch` \case
|
||||
RefChanNotSetException -> do
|
||||
err $ "no refchan set for" <+> pretty path
|
||||
RefChanHeadNotFoundException -> do
|
||||
err $ "no refchan head found for" <+> pretty path
|
||||
EncryptionKeysNotDefined -> do
|
||||
err $ "no readers defined in the refchan for " <+> pretty path
|
||||
SignKeyNotSet -> do
|
||||
err $ "sign key not set or not found " <+> pretty path
|
||||
DirNotSet -> do
|
||||
err $ "directory not set"
|
||||
|
||||
`catch` \case
|
||||
(e :: OperationError) -> do
|
||||
err $ viaShow e
|
||||
|
||||
`finally` do
|
||||
closeTombs
|
||||
closeCache
|
||||
|
||||
where
|
||||
|
||||
|
||||
writeEntry path e = do
|
||||
|
||||
let p = entryPath e
|
||||
let filePath = path </> p
|
||||
|
||||
sto <- getStorage
|
||||
|
||||
tombs <- getTombs
|
||||
|
||||
void $ runMaybeT do
|
||||
|
||||
dir <- getRunDir
|
||||
backup <- getRunDirEnv dir
|
||||
<&> fmap (view dirSyncBackup)
|
||||
<&> fromMaybe False
|
||||
|
||||
h <- getEntryHash e & toMPlus
|
||||
|
||||
unless backup do
|
||||
|
||||
notice $ green "write" <+> pretty h <+> pretty p
|
||||
|
||||
lbs <- lift (runExceptT (getTreeContents sto h))
|
||||
>>= toMPlus
|
||||
|
||||
mkdir (dropFileName filePath)
|
||||
|
||||
liftIO $ UIO.withBinaryFileAtomic filePath WriteMode $ \fh -> do
|
||||
LBS.hPutStr fh lbs >> hFlush fh
|
||||
|
||||
let ts = getEntryTimestamp e
|
||||
let timestamp = posixSecondsToUTCTime (fromIntegral ts)
|
||||
|
||||
liftIO $ setModificationTime (path </> p) timestamp
|
||||
|
||||
lift $ Compact.putVal tombs p (0 :: Integer)
|
||||
|
||||
runDir = do
|
||||
|
||||
sto <- getStorage
|
||||
|
||||
path <- getRunDir
|
||||
|
||||
env <- getRunDirEnv path >>= orThrow DirNotSet
|
||||
|
||||
refchan <- view dirSyncRefChan env & orThrow RefChanNotSetException
|
||||
|
||||
fetchRefChan @UNIX refchan
|
||||
|
||||
-- FIXME: multiple-directory-scans
|
||||
|
||||
local <- getStateFromDir0 True
|
||||
|
||||
let hasRemoteHash = [ (p, h) | (p, WithRemoteHash e h) <- local]
|
||||
|
||||
hasGK0 <- HM.fromList <$> S.toList_ do
|
||||
for_ hasRemoteHash $ \(p,h) -> do
|
||||
mgk0 <- lift $ loadGroupKeyForTree @HBS2Basic sto h
|
||||
for_ mgk0 $ \gk0 -> S.yield (p,gk0)
|
||||
|
||||
deleted <- findDeleted
|
||||
|
||||
merged <- mergeState deleted local
|
||||
|
||||
rch <- Client.getRefChanHead @UNIX refchan
|
||||
>>= orThrow RefChanHeadNotFoundException
|
||||
|
||||
let filesLast m = case mergedEntryType m of
|
||||
Tomb -> 0
|
||||
Dir -> 1
|
||||
File -> 2
|
||||
|
||||
for_ (L.sortOn filesLast merged) $ \w -> do
|
||||
case w of
|
||||
N (p,TombEntry e) -> do
|
||||
notice $ green "removed" <+> pretty p
|
||||
|
||||
D (p,e) _ -> do
|
||||
|
||||
tombs <- getTombs
|
||||
|
||||
n <- Compact.getValEither @Integer tombs p
|
||||
<&> fromRight (Just 0)
|
||||
|
||||
notice $ "deleted locally" <+> pretty n <+> pretty p
|
||||
|
||||
when (n < Just 1) do
|
||||
notice $ "post tomb tx" <+> pretty n <+> pretty p
|
||||
now <- liftIO $ getPOSIXTime <&> round <&> LBS.take 6 . serialise
|
||||
postEntryTx (Just now) (HM.lookup p hasGK0) refchan path e
|
||||
Compact.putVal tombs p (maybe 1 succ n)
|
||||
|
||||
N (p,_) -> do
|
||||
notice $ "?" <+> pretty p
|
||||
|
||||
M (f,t,e) -> do
|
||||
notice $ green "move" <+> pretty f <+> pretty t
|
||||
mv (path </> f) (path </> t)
|
||||
notice $ green "post renamed entry tx" <+> pretty f
|
||||
postEntryTx Nothing (HM.lookup f hasGK0) refchan path e
|
||||
|
||||
E (p,UpdatedFileEntry _ e) -> do
|
||||
let fullPath = path </> p
|
||||
here <- liftIO $ doesFileExist fullPath
|
||||
writeEntry path e
|
||||
notice $ red "updated" <+> pretty here <+> pretty p
|
||||
postEntryTx Nothing (HM.lookup p hasGK0) refchan path e
|
||||
|
||||
E (p,e@(FileEntry _)) -> do
|
||||
let fullPath = path </> p
|
||||
here <- liftIO $ doesFileExist fullPath
|
||||
d <- liftIO $ doesDirectoryExist fullPath
|
||||
|
||||
older <- if here then do
|
||||
s <- getFileTimestamp fullPath
|
||||
pure $ s < getEntryTimestamp e
|
||||
else
|
||||
pure False
|
||||
|
||||
when (not here || older) do
|
||||
writeEntry path e
|
||||
|
||||
void $ runMaybeT do
|
||||
gk0 <- HM.lookup p hasGK0 & toMPlus
|
||||
let rcpt = recipients gk0 & HM.keys
|
||||
let members = view refChanHeadReaders rch & HS.toList
|
||||
when (rcpt /= members) do
|
||||
notice $ red "update group key" <+> pretty p
|
||||
lift $ postEntryTx Nothing (Just gk0) refchan path e
|
||||
|
||||
E (p,TombEntry e) -> do
|
||||
let fullPath = path </> p
|
||||
here <- liftIO $ doesFileExist fullPath
|
||||
when here do
|
||||
|
||||
tombs <- getTombs
|
||||
|
||||
n <- Compact.getValEither @Integer tombs p
|
||||
<&> fromRight (Just 0)
|
||||
|
||||
|
||||
when (n < Just 1) do
|
||||
notice $ "post tomb tx" <+> pretty n <+> pretty p
|
||||
postEntryTx Nothing (HM.lookup p hasGK0) refchan path e
|
||||
|
||||
Compact.putVal tombs p (maybe 0 succ n)
|
||||
|
||||
b <- backupMode
|
||||
|
||||
unless b do
|
||||
notice $ red "deleted" <+> pretty p
|
||||
rm fullPath
|
||||
|
||||
E (p,_) -> do
|
||||
notice $ "skip entry" <+> pretty p
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue