From c1a66b902b92db34e031d7f7224faed955b402fc Mon Sep 17 00:00:00 2001 From: ilixiaocui Date: Thu, 16 Feb 2023 17:18:30 +0800 Subject: [PATCH] curvefs/client: bug fix about memcache 1. When running the vdbech task, the memory usage of the mount point is very high 2. When running the vdbech task, the mount will coredump or the task will have data inconsistency 3. The memcache server fails over and then restarts and cannot be used by the client anymore Signed-off-by: ilixiaocui --- .../src/client/kvclient/kvclient_manager.cpp | 28 ++++++++----------- .../src/client/kvclient/kvclient_manager.h | 22 +++++++++------ curvefs/src/client/kvclient/memcache_client.h | 15 +++++++--- .../src/client/s3/client_s3_cache_manager.cpp | 11 +++++++- curvefs/test/client/client_memcache_test.cpp | 14 ++++++++-- 5 files changed, 57 insertions(+), 33 deletions(-) diff --git a/curvefs/src/client/kvclient/kvclient_manager.cpp b/curvefs/src/client/kvclient/kvclient_manager.cpp index a0850982c5..cac4b0b43b 100644 --- a/curvefs/src/client/kvclient/kvclient_manager.cpp +++ b/curvefs/src/client/kvclient/kvclient_manager.cpp @@ -34,13 +34,11 @@ namespace client { KVClientManager *g_kvClientManager = nullptr; KVClientMetric *g_kvClientMetric = nullptr; -#define ONRETURN(TYPE, RES, KEY, ERRORLOG) \ +#define ONRETURN(TYPE, RES) \ if (RES) { \ g_kvClientMetric->kvClient##TYPE.qps.count << 1; \ - VLOG(9) << "##TYPE key = " << KEY << " OK"; \ } else { \ g_kvClientMetric->kvClient##TYPE.eps.count << 1; \ - LOG(ERROR) << "##TYPE key = " << KEY << " error = " << ERRORLOG; \ } bool KVClientManager::Init(const KVClientManagerOpt &config, @@ -63,27 +61,23 @@ void KVClientManager::Set(std::shared_ptr task) { std::string error_log; auto res = client_->Set(task->key, task->value, task->length, &error_log); - ONRETURN(Set, res, task->key, error_log); + ONRETURN(Set, res); task->done(task); }); } -bool KVClientManager::Get(std::shared_ptr task) { - assert(nullptr != task->value); - return Get(task->key, task->value, task->offset, task->length); -} - +void KVClientManager::Get(std::shared_ptr task) { + threadPool_.Enqueue([task, this]() { + LatencyGuard guard(&g_kvClientMetric->kvClientGet.latency); -bool KVClientManager::Get(const std::string &key, char *value, uint64_t offset, - uint64_t length) { - LatencyGuard guard(&g_kvClientMetric->kvClientGet.latency); + std::string error_log; + task->res = client_->Get(task->key, task->value, task->offset, + task->length, &error_log); + ONRETURN(Get, task->res); - assert(nullptr != value); - std::string error_log; - auto res = client_->Get(key, value, offset, length, &error_log); - ONRETURN(Get, res, key, error_log); - return res; + task->done(task); + }); } } // namespace client diff --git a/curvefs/src/client/kvclient/kvclient_manager.h b/curvefs/src/client/kvclient/kvclient_manager.h index 68587919e6..19c08c3278 100644 --- a/curvefs/src/client/kvclient/kvclient_manager.h +++ b/curvefs/src/client/kvclient/kvclient_manager.h @@ -44,6 +44,7 @@ namespace client { class KVClientManager; class SetKVCacheTask; +class GetKVCacheTask; using curve::common::TaskThreadPool; using curvefs::client::common::KVClientManagerOpt; @@ -52,6 +53,8 @@ extern KVClientMetric *g_kvClientMetric; typedef std::function &)> SetKVCacheDone; +typedef std::function &)> + GetKVCacheDone; struct SetKVCacheTask { std::string key; @@ -60,16 +63,22 @@ struct SetKVCacheTask { SetKVCacheDone done; SetKVCacheTask() = default; SetKVCacheTask(const std::string &k, const char *val, const uint64_t len) - : key(k), value(val), length(len) {} + : key(k), value(val), length(len) { + done = [](const std::shared_ptr &) {}; + } }; -struct GetKvCacheContext { +struct GetKVCacheTask { const std::string &key; char *value; uint64_t offset; uint64_t length; - GetKvCacheContext(const std::string &k, char *v, uint64_t off, uint64_t len) - : key(k), value(v), offset(off), length(len) {} + bool res; + GetKVCacheDone done; + GetKVCacheTask(const std::string &k, char *v, uint64_t off, uint64_t len) + : key(k), value(v), offset(off), length(len), res(false) { + done = [](const std::shared_ptr &) {}; + } }; class KVClientManager { @@ -87,10 +96,7 @@ class KVClientManager { */ void Set(std::shared_ptr task); - bool Get(const std::string &key, char *value, uint64_t offset, - uint64_t length); - - bool Get(std::shared_ptr task); + void Get(std::shared_ptr task); private: void Uninit(); diff --git a/curvefs/src/client/kvclient/memcache_client.h b/curvefs/src/client/kvclient/memcache_client.h index c706f74bcb..e5c14ab6bb 100644 --- a/curvefs/src/client/kvclient/memcache_client.h +++ b/curvefs/src/client/kvclient/memcache_client.h @@ -92,12 +92,9 @@ class MemCachedClient : public KVClient { return false; } } - memcached_behavior_set(client_, MEMCACHED_BEHAVIOR_DISTRIBUTION, MEMCACHED_DISTRIBUTION_CONSISTENT); memcached_behavior_set(client_, MEMCACHED_BEHAVIOR_RETRY_TIMEOUT, 5); - memcached_behavior_set(client_, - MEMCACHED_BEHAVIOR_REMOVE_FAILED_SERVERS, 1); return PushServer(); } @@ -117,9 +114,11 @@ class MemCachedClient : public KVClient { auto res = memcached_set(tcli, key.c_str(), key.length(), value, value_len, 0, 0); if (MEMCACHED_SUCCESS == res) { + VLOG(9) << "Set key = " << key << " OK"; return true; } *errorlog = ResError(res); + LOG(ERROR) << "Set key = " << key << " error = " << *errorlog; return false; } @@ -135,13 +134,21 @@ class MemCachedClient : public KVClient { memcached_return_t ue; char *res = memcached_get(tcli, key.c_str(), key.length(), &value_length, &flags, &ue); - if (res != nullptr && value) { + if (MEMCACHED_SUCCESS == ue && res != nullptr && value && + value_length >= length) { + VLOG(9) << "Get key = " << key << " OK"; memcpy(value, res + offset, length); free(res); return true; } *errorlog = ResError(ue); + if (ue != MEMCACHED_NOTFOUND) { + LOG(ERROR) << "Get key = " << key << " error = " << *errorlog + << ", get_value_len = " << value_length + << ", expect_value_len = " << length; + } + return false; } diff --git a/curvefs/src/client/s3/client_s3_cache_manager.cpp b/curvefs/src/client/s3/client_s3_cache_manager.cpp index 2a98aa5606..35b3fe5d62 100644 --- a/curvefs/src/client/s3/client_s3_cache_manager.cpp +++ b/curvefs/src/client/s3/client_s3_cache_manager.cpp @@ -513,7 +513,16 @@ bool FileCacheManager::ReadKVRequestFromRemoteCache(const std::string &name, return false; } - return g_kvClientManager->Get(name, databuf, offset, length); + auto task = std::make_shared(name, databuf, offset, length); + CountDownEvent event(1); + task->done = [&](const std::shared_ptr &task) { + event.Signal(); + return; + }; + g_kvClientManager->Get(task); + event.Wait(); + + return task->res; } bool FileCacheManager::ReadKVRequestFromS3(const std::string &name, diff --git a/curvefs/test/client/client_memcache_test.cpp b/curvefs/test/client/client_memcache_test.cpp index 08899fb979..28e8cdca70 100644 --- a/curvefs/test/client/client_memcache_test.cpp +++ b/curvefs/test/client/client_memcache_test.cpp @@ -73,6 +73,7 @@ class MemCachedTest : public ::testing::Test { std::this_thread::sleep_for(std::chrono::milliseconds(100)); retry++; } while (1); + LOG(INFO) << "=============== memcache start ok"; } void TearDown() { @@ -116,11 +117,18 @@ TEST_F(MemCachedTest, MultiThreadTask) { // get for (int i = 0; i < 5; i++) { workers.emplace_back([&, i]() { + CountDownEvent taskEnvent(1); char *result = new char[4]; - auto context = std::make_shared(kvstr[i].first, - result, 0, 4); - ASSERT_EQ(true, manager_.Get(context)); + auto task = + std::make_shared(kvstr[i].first, result, 0, 4); + task->done = + [&taskEnvent](const std::shared_ptr &task) { + taskEnvent.Signal(); + }; + manager_.Get(task); + taskEnvent.Wait(); ASSERT_EQ(0, memcmp(result, kvstr[i].second.c_str(), 4)); + ASSERT_TRUE(task->res); }); } for (auto &iter : workers) {