mirror of https://github.com/procxx/kepka.git
Allow cleaner to work after database is closed.
This commit is contained in:
parent
8210a51fdc
commit
2001d3c617
|
@ -39,6 +39,14 @@ void Database::close(FnMut<void()> &&done) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Database::waitForCleaner(FnMut<void()> &&done) {
|
||||||
|
_wrapped.with([
|
||||||
|
done = std::move(done)
|
||||||
|
](Implementation &unwrapped) mutable {
|
||||||
|
unwrapped.waitForCleaner(std::move(done));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
void Database::put(
|
void Database::put(
|
||||||
const Key &key,
|
const Key &key,
|
||||||
QByteArray &&value,
|
QByteArray &&value,
|
||||||
|
|
|
@ -68,6 +68,7 @@ public:
|
||||||
|
|
||||||
void clear(FnMut<void(Error)> &&done = nullptr);
|
void clear(FnMut<void(Error)> &&done = nullptr);
|
||||||
void clearByTag(uint8 tag, FnMut<void(Error)> &&done = nullptr);
|
void clearByTag(uint8 tag, FnMut<void(Error)> &&done = nullptr);
|
||||||
|
void waitForCleaner(FnMut<void()> &&done = nullptr);
|
||||||
|
|
||||||
~Database();
|
~Database();
|
||||||
|
|
||||||
|
|
|
@ -1167,6 +1167,7 @@ void DatabaseObject::createCleaner() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void DatabaseObject::cleanerDone(Error error) {
|
void DatabaseObject::cleanerDone(Error error) {
|
||||||
|
invokeCallback(_cleaner.done);
|
||||||
_cleaner = CleanerWrap();
|
_cleaner = CleanerWrap();
|
||||||
pushStatsDelayed();
|
pushStatsDelayed();
|
||||||
}
|
}
|
||||||
|
@ -1211,6 +1212,7 @@ void DatabaseObject::clear(FnMut<void(Error)> &&done) {
|
||||||
}
|
}
|
||||||
if (key.empty()) {
|
if (key.empty()) {
|
||||||
invokeCallback(done, Error::NoError());
|
invokeCallback(done, Error::NoError());
|
||||||
|
createCleaner();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
open(std::move(key), std::move(done));
|
open(std::move(key), std::move(done));
|
||||||
|
@ -1229,6 +1231,17 @@ void DatabaseObject::clearByTag(uint8 tag, FnMut<void(Error)> &&done) {
|
||||||
invokeCallback(done, Error::NoError());
|
invokeCallback(done, Error::NoError());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DatabaseObject::waitForCleaner(FnMut<void()> &&done) {
|
||||||
|
while (!_stale.empty()) {
|
||||||
|
clearStaleChunk();
|
||||||
|
}
|
||||||
|
if (_cleaner.object) {
|
||||||
|
_cleaner.done = std::move(done);
|
||||||
|
} else {
|
||||||
|
invokeCallback(done);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto DatabaseObject::getManyRaw(const std::vector<Key> &keys) const
|
auto DatabaseObject::getManyRaw(const std::vector<Key> &keys) const
|
||||||
-> std::vector<Raw> {
|
-> std::vector<Raw> {
|
||||||
auto result = std::vector<Raw>();
|
auto result = std::vector<Raw>();
|
||||||
|
|
|
@ -59,6 +59,7 @@ public:
|
||||||
|
|
||||||
void clear(FnMut<void(Error)> &&done);
|
void clear(FnMut<void(Error)> &&done);
|
||||||
void clearByTag(uint8 tag, FnMut<void(Error)> &&done);
|
void clearByTag(uint8 tag, FnMut<void(Error)> &&done);
|
||||||
|
void waitForCleaner(FnMut<void()> &&done);
|
||||||
|
|
||||||
static QString BinlogFilename();
|
static QString BinlogFilename();
|
||||||
static QString CompactReadyFilename();
|
static QString CompactReadyFilename();
|
||||||
|
@ -90,6 +91,7 @@ private:
|
||||||
struct CleanerWrap {
|
struct CleanerWrap {
|
||||||
std::unique_ptr<Cleaner> object;
|
std::unique_ptr<Cleaner> object;
|
||||||
base::binary_guard guard;
|
base::binary_guard guard;
|
||||||
|
FnMut<void()> done;
|
||||||
};
|
};
|
||||||
struct CompactorWrap {
|
struct CompactorWrap {
|
||||||
std::unique_ptr<Compactor> object;
|
std::unique_ptr<Compactor> object;
|
||||||
|
|
|
@ -90,7 +90,8 @@ void Databases::destroy(Cache::Database *database) {
|
||||||
Assert(!kept.destroying.alive());
|
Assert(!kept.destroying.alive());
|
||||||
auto [first, second] = base::make_binary_guard();
|
auto [first, second] = base::make_binary_guard();
|
||||||
kept.destroying = std::move(first);
|
kept.destroying = std::move(first);
|
||||||
database->close([=, guard = std::move(second)]() mutable {
|
database->close();
|
||||||
|
database->waitForCleaner([=, guard = std::move(second)]() mutable {
|
||||||
crl::on_main([=, guard = std::move(guard)]{
|
crl::on_main([=, guard = std::move(guard)]{
|
||||||
if (!guard.alive()) {
|
if (!guard.alive()) {
|
||||||
return;
|
return;
|
||||||
|
@ -102,4 +103,4 @@ void Databases::destroy(Cache::Database *database) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace Storage
|
} // namespace Storage
|
||||||
|
|
Loading…
Reference in New Issue