天池比赛记录

赛题简单介绍

比赛地址:第四届全球数据库大赛赛道1:云原生共享内存数据库性能优化

赛题大致内容:
本地读写速度快,但空间小,远端读写速度慢,但空间大(通过eRDMA读写远端数据)
初赛时实现一个简化、高效的KV存储引擎,支持Write、Read接口,此时key-value皆为定值
复赛额外实现一个Delete接口和重建(rebuild)功能,此时value为变长值。
评测程序分为2个阶段:
1)程序正确性验证
验证KV操作的正确性(包括加密/解密过程),这部分的耗时不计入运行时间的统计。如果正确性测试不通过,则终止,评测失败。
2)性能评测
引擎使用的本地内存和远端内存限制在 8 GB 和 32 GB。 阶段1. 每个线程分别写入约 12 M个Key大小为 16 Bytes,Value大小为 80-1024 Bytes 的 KV对象,并选择性读取验证;阶段2. 每个线程会进行并发删除,每个线程删除 10 M个Key,删除操作耗时将计入运行时间;阶段3. 每个线程分别再次写入约 10 M个Key大小为 16 Bytes,Value大小为 80-256 Bytes 的 KV对象;接着会进行读写混合测试,开启16个线程以75%:25%的读写比例调用64M次。其中75%的读访问具有热点的特征,大部分的读访问集中在少量的Key上面。最后的分数为以上操作耗时的总和。

数据安排如下:本阶段保证任意时刻数据的value部分长度和不超过30G。纯写入的12M次操作中大约70%的操作Value长度在80-128Bytes之间;大约20%的操作Value长度在129-256Bytes之间;大约10%的操作Value长度在257-1024Bytes之间。读写混合的64M操作中,所有Set操作的Value长度均不超过128Bytes。

评测程序输出大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Start local encryption evaluation...Start evaluation.
Generating the ZipFian PDF......
Generate PDF Done.
Do new LocalEngine.
Start LocalEngine using start interface.
LocalEngine::start finsh

Starting Write-Read Testing.
##################### Start Write from index: 0
...
##################### End The Write-Read Test ##############################
Time for Write-Read Test 32.000000 seconds
LocalEngine::stop finsh
##################### Evaluation Success ##############################
Start local perf evaluation...Start evaluation.
Generating the ZipFian PDF......
Generate PDF Done.
Do new LocalEngine.
Start LocalEngine using start interface.
LocalEngine::start finsh

Starting Write-Read Testing.
##################### Start Write from index: 0
...
##################### End The Write-Read Test ##############################
Time for Write-Read Test 158.000000 seconds

Starting Deleting Testing.
##################### Start Delete from index: 0
...
##################### End The Delete Test ##############################
Time for Delete Test 47.000000 seconds

Starting HOT Data Testing.
##################### Start test from index: 0
...
##################### End The Hot Data Test ##############################
Time for Hot Data Test 46.000000 seconds
Hot:46
LocalEngine::stop finsh
Total:253
##################### Evaluation Success ##############################
Success evaluation, update score...37.2979146325685141834147

复赛排名第20名,正好是极客奖最后一名,嘻嘻。

比赛经历

在初赛时官方提供了一个简单的demo,将key和远端地址存于本地,value全部存于远端,初赛结束时我们的代码大致架构为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
key-value数据以页面的方式存储起来,本地存储key的元数据(key,(页号,索引))(5G),缓存少量页(2G),远端页的远端地址,远端存储大部分页数据(30G)。
写入时:
插入:将数据插入新申请的页中,写入元数据(key,(页号,页索引))
更新:若对应页面当前存于远端,则视作插入操作处理,并更新key元数据,
若对应页面存于本地,则直接更新value
更新LRU列表
读取时:
若页面存于远端,则读取远端数据,将该页加入本地缓存
若页面存于本地,则直接读取数据
更新LRU列表
淘汰:
开启一个后台线程,当本地缓存页大于阈值时,将最久未被访问的页写入远端,记录远端地址

哈希:
为减小锁争用,我们构建了许多个执行请求的实体,并通过对key进行哈希将请求分发至某一实体
(LocalEngine->LocalEngineEntity),而后对key进行第二次哈希,写入/读取key的元数据。
由于STL的map占用空间较大,官方提供了哈希表的简单实现,直接刚开始就申请足够的空间,然后使
用拉链法连接哈希值相同的数据,不需要动态扩容。


初赛结束时,我们只得了9分,最大的原因在于第一次哈希与第二次哈希使用同样的哈希函数(std::hash),导致LocalEngineEntity里的自定义哈希表中很大的一部分空间永远不会被访问(哈希值皆为LocalEngineEntity下标的整数),增大了哈希冲突的概率。
在初赛的基础上编写复赛代码,主要实现三个功能:value的加密,删除操作,重构操作(删除被标记为无效的数据,整理有效数据使其排列更紧凑)。
加密:value的加密根据IPP-Crypto的接口简单实现一个加密算法即可,没有几行代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
bool LocalEngine::set_aes() {
// Current algorithm is not supported, just for demonstration.
m_aes_.algo = CBC;
m_aes_.key_len = 16;
m_aes_.key = new Ipp8u[16]{0x60, 0x3d, 0xeb, 0x10, 0x15, 0xca, 0x71, 0xbe, 0x2b, 0x73, 0xae, 0xf0, 0x85, 0x7d, 0x77, 0x81};
if (m_aes_.key == nullptr) return false;
m_aes_.blk_size = 16;
m_aes_.piv_len = 16;
m_aes_.piv = new Ipp8u[16]{0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, 0x00};
if (m_aes_.piv == nullptr) return false;

int ctxSize; // AES context size
ippsAESGetSize(&ctxSize); // evaluating AES context size
// allocatting memory for AES context
m_aes_.ctx = (IppsAESSpec *)(new Ipp8u[ctxSize]);
// AES context initialization
ippsAESInit(m_aes_.key, m_aes_.key_len, m_aes_.ctx, ctxSize);
return true;
}
// 参考pdf实现简单加密算法
bool LocalEngine::encrypted(const std::string &value, std::string &encrypt_value) {
Ipp8u ciph[(value.size() + m_aes_.blk_size - 1) & ~(m_aes_.blk_size - 1)];
// encrypting plaintext
ippsAESEncryptCBC((Ipp8u *)value.c_str(), ciph, value.size(), m_aes_.ctx, m_aes_.piv);
std::string tmp(reinterpret_cast<const char *>(ciph), value.size());
encrypt_value = std::move(tmp);
return true;
}

删除: 代码的数据通路为:key——(page, index) ——本地缓存m_data_map——远端地址m_addr_map。故实现删除操作首先需要将key—>(page, index)的映射删除,这个只需要增加自定义哈希表的删除功能,注意将删除后的slot插入另一个链表中,以便复用该slot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
data_info_t hash_map_t::remove(const std::string &key, int index) {
hash_map_slot *cur = m_bucket_[index];
hash_map_slot *parent = nullptr;
if (cur == nullptr) {
return kNullInfo;
}
while (cur) {
if (memcmp(cur->key, key.c_str(), 16) == 0) {
// 在bucket中删除该slot
if (parent == nullptr) {
m_bucket_[index] = cur->next;
} else {
parent->next = cur->next;
}
// 加入后备链表
cur->next = m_slot_head_->next;
m_slot_head_->next = cur;
return cur->info;
}
parent = cur;
cur = cur->next;
}
return kNullInfo;
}

将该映射删除后,无法通过key访问相应的value,但value仍然占据存储空间,故需要标记该位置,表示该value已经被删除,在重构操作时不需要迁移该位置的数据。

1
std::bitset<kMaxIndex> m_bitmap_[kBitmapSize];  // 删除为1,正常为0

每一页增加一个位图,标记页中记录是否有效。其中kMaxIndex表示页中最大记录数,kBitmapSize表示运行过程中的最大页号。
与位图相关的另一个操作是更新操作,如果更新操作对应的数据当前在远端,若此时读取远端数据再进行本地更新效率太低;故将这个操作拆分为删除远端数据+插入新数据;这时也需要将远端数据标记为无效。

1
2
3
4
5
6
7
8
9
10
11
12
bool LocalEngineEntity::deleteK(const std::string &key) {
m_delete_envent_ = true;
int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
m_mutex_.lock();
data_info_t info = m_page_map_.remove(key, hash_index); // 删除对应key的元数据
m_mutex_.unlock();
m_bitmap_[info.page_id].set(info.index, true); // 将对应记录标记为删除
return true;
}
// 数据在远端的更新操作
m_bitmap_[slot->info.page_id].set(slot->info.index, true); // 将之前数据标记为删除
slot->info = info; // 更新元数据信息

重构:
本地缓存与远端数据交互的基本单位是页,程序运行过程中,无效记录会越来越多,故需定时读取所有页,将有效记录写入到新页中,删除旧页,类似于一种垃圾回收。
在读取远端页时,先读取其头部元数据,再依次读取有效记录,而不是读取整个页数据,这是因为rebuild时远端页有效记录占比较小,这样的读取方式可以减小读取量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
void LocalEngineEntity::rebuild_index() {
std::lock_guard<std::mutex> lk(m_mutex_);
std::string key;
std::string value;
data_info_t data_info;
std::shared_ptr<Page> page;
std::unordered_map<page_id_t, remote_info_t> tmp_addr_map; // 暂时存储页号与远端地址映射
uint32_t new_cache_size = 1;
std::vector<page_id_t> local_id = m_lru_list_.clear();
auto new_page = m_cur_page_;
page_id_t new_page_id = m_cur_page_id_;
// 处理本地缓存页
for (auto &page_id : local_id) {
auto &bitmap = m_bitmap_[page_id];
if (bitmap.none()) { // 不存在删除的记录,不进行操作
m_lru_list_.insert(page_id);
new_cache_size++;
} else if (!bitmap.all()) { //存在有效记录
auto page = m_data_map_[page_id];
int record_num = page->record_number();
for (int i = 0; i < record_num; i++) {
if (!bitmap.test(i)) { // 该记录未被删除
if (new_page->is_full()) { // 页满,写入本地
m_data_map_[new_page_id] = new_page;
new_cache_size++;
m_lru_list_.insert(new_page_id);
new_page = std::make_shared<Page>();
new_page_id++;
}
// 读出数据插入新页并更新元数据映射
key = page->read_key(i);
data_info.index = new_page->insert(key, page->read_value(i));
data_info.page_id = new_page_id;
m_page_map_.update(key, data_info);
}
}
m_data_map_[page_id] = nullptr; // 删除原先页
} else {
m_data_map_[page_id] = nullptr; // 删除原先页
}
}
page_id_t page_id;
remote_info_t info;
char head_data[kMaxIndex * 10];
char kv_data[2 * kMaxValueSize];
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(&head_data);
uint16_t head_length;
uint16_t offset, length;

// 处理远端内存页
for (auto &kv : m_addr_map_) {
page_id = kv.first;
info = kv.second;
auto &bitmap = m_bitmap_[page_id];
if (bitmap.none()) { // 不存在删除的记录,不进行操作
tmp_addr_map.insert({page_id, info});
} else if (bitmap.all()) { // 不存在有效记录,将后端地址加入地址列表
m_addr_list_.emplace(info);
} else {
bool avai_info = true; // 是否将该远端地址加入地址列表
head_length = (m_max_index_[page_id] + 5) * sizeof(uint16_t);
// 读取页头部数据
m_rdma_conn_->remote_read(head_data, head_length, info.remote_addr, info.rkey);
int record_num = m_max_index_[page_id];
for (int i = 0; i < record_num; i++) {
if (!bitmap.test(i)) {
if (new_page->is_full()) {
if (new_cache_size > kPageThreshold) { // 本地页满,写入远程
std::string &&page_data = new_page->to_string();
uint32_t len = page_data.length();
m_rdma_conn_->remote_write((void *)page_data.c_str(), len, info.remote_addr, info.rkey);
tmp_addr_map.insert({new_page_id, info}); // 暂时记录页号与远程地址映射
avai_info = false;
} else { // 写入本地缓存
m_data_map_[new_page_id] = new_page;
new_cache_size++;
m_lru_list_.insert(new_page_id);
}
new_page = std::make_shared<Page>();
new_page_id++;
}
// 读出数据插入新页并更新元数据映射
offset = u16_pointer[i + 2];
length = u16_pointer[i + 3] - u16_pointer[i + 2];
m_rdma_conn_->remote_read(kv_data, length, info.remote_addr + offset, info.rkey);
key = std::string(kv_data, kv_data + 16);
value = std::string(kv_data + 16, kv_data + length);
data_info.index = new_page->insert(key, std::move(value));
data_info.page_id = new_page_id;
m_page_map_.update(key, data_info);
}
}
if (avai_info) {
m_addr_list_.emplace(info);
}
}
}
m_addr_map_ = std::move(tmp_addr_map);
m_cur_page_id_ = new_page_id;
m_cur_page_ = new_page;
m_vicitm_id_ = kNullPage;
m_vicitm_page_ = nullptr;
m_cache_size_ = new_cache_size;
m_last_update_id_ = kNullPage;
m_data_map_[new_page_id] = new_page;
}

一些细节

变长字符编码方式演化

1
2
3
auto &&vicitm_page_data = victim_page->to_string();  // 记录淘汰页数据,开始写入远端内存
m_rdma_conn_->remote_write((void *)vicitm_page_data.c_str(), len, remote_info.remote_addr,
remote_info.rkey);

kv数据在本地缓存是以string数组(Page类)的形式存储的,当本地缓存达到阈值时,需将很久未访问的页写入到远端,此时是写一个大字符串;故需要将string数组转换为一个大字符串。由于之后有可能再访问该页,需要把各记录的大小也编码进字符串中。刚开始我将每个记录编码成记录大小 +’\0’ +记录内容的形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/*
页数据字符串形式的排列格式为:
页索引数(记录数)'\0'
页容量(字节数)'\0'
记录大小 '\0' 记录内容
记录大小 '\0' 记录内容
...
'\0'(页结束标志)
*/
class Page {
public:
Page(std::string &data); // 以字符串填充页
std::string to_string(); // 页数据转换成字符串
private:
int get_size(const std::string &data, int &start); // 字符串转数字
std::string get_string(int size); // 数字转字符串

std::vector<std::string> m_value_; // 以vector存储记录
std::vector<std::string> m_key_; // 以vector存储记录
uint32_t m_cur_size_; // 当前页数据大小
uint32_t m_cur_index_; // 当前索引
};

// 将start为起点的字符串转换成数字
int Page::get_size(const std::string &data, int &start) {
std::string size_str = "";
int value_size = 0;
while (true) {
if (data[start] != '\0') { // 未遇到结束标志,加入字符串
size_str.push_back(data[start]);
} else {
if (!size_str.empty()) { // 字符串不为空,转换成数字
value_size = std::stoi(size_str);
}
break;
}
start++;
}
start++; // 跳过当前的结束字符
return value_size; // 页末尾结束字符返回0
}

// 将数字转换成字符串并在其后填充结束字符
std::string Page::get_string(int size) {
auto res = std::to_string(size);
res.push_back('\0');
return res;
}

Page::Page(std::string &data) {
int start = 0;
int value_num = get_size(data, start); // 读取记录数
int page_size = get_size(data, start); // 读取页大小
m_key_.reserve(value_num);
m_value_.reserve(value_num);

m_cur_index_ = value_num;
m_cur_size_ = page_size;
std::string kv;
int value_size;
while (true) { // 循环读取记录
value_size = get_size(data, start);
if (value_size <= 0) { // 遇到页末尾结束字符则返回
break;
}
kv = data.substr(start, value_size);
start += value_size;
m_key_.emplace_back(kv.substr(0, 16));
m_value_.emplace_back(kv.substr(16));
}
}

std::string Page::to_string() {
assert(this != nullptr);
std::string data;
data.reserve(m_cur_size_ + m_cur_index_ * 6); // 提前预订字符串空间
data.append(get_string(m_cur_index_)); // 加入记录数
data.append(get_string(m_cur_size_)); // 加入页大小
assert(m_cur_index_ == m_key_.size());
assert(m_key_.size() == m_value_.size());
for (uint32_t i = 0; i < m_cur_index_; i++) {
data.append(get_string(16 + m_value_[i].length())); // 加入记录大小
data.append(m_key_[i]); // 加入记录数据
data.append(m_value_[i]);
}
data.push_back('\0'); // 页末尾填充结束字符
return data;
}

而后稍微改变一下编码方式,将记录大小全部放在头部,记录数据放在尾部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/*
页数据字符串形式的排列格式为:
页索引数(记录数)'\0'
页容量(字节数)'\0'
记录大小'\0'
记录大小'\0'
记录内容(key value)
记录内容(key value)

保存key值以便在rebuild时能够更改元数据信息
...
*/
class Page {
public:
Page();
Page(std::string &data); // 以字符串填充页
std::string to_string(); // 页数据转换成字符串
private:
std::string get_string(int size); // 数字转字符串

std::string m_key_[kMaxIndex];
std::string m_value_[kMaxIndex];
uint32_t m_cur_size_; // 当前页数据大小
uint32_t m_cur_index_; // 当前索引

// std::vector<std::string> m_value_; // 以vector存储key值,方便rebuild时更新元数据
// std::vector<std::string> m_key_; // 以vector存储记录
};

std::string Page::to_string() {
std::string data;
data.reserve(m_cur_size_ + m_cur_index_ * 4); // 提前预订字符串空间
data.append(get_string(m_cur_index_)); // 加入记录数
data.append(get_string(m_cur_size_)); // 加入页大小
for (uint32_t i = 0; i < m_cur_index_; i++) { // 加入各个记录大小
data.append(get_string(16 + m_value_[i].size()));
}
for (uint32_t i = 0; i < m_cur_index_; i++) {
data.append(m_key_[i]); // 加入记录数据
data.append(m_value_[i]);
}
return data;
}
Page::Page(std::string &str) {
std::string::size_type pos;
std::string::size_type size = str.size();
// 获取页大小与最大记录数
int start = 0;
pos = str.find('\0', start);
m_cur_index_ = std::stoi(str.substr(start, pos - start));
start = pos + 1;
pos = str.find('\0', start);
m_cur_size_ = std::stoi(str.substr(start, pos - start));
start = pos + 1;
// 获取各个记录长度
std::vector<int> record_lengths(m_cur_index_);
for (uint32_t i = 0; i < m_cur_index_; i++) {
pos = str.find('\0', start);
record_lengths[i] = std::stoi(str.substr(start, pos - start));
start = pos + 1;
}
// 获取各个记录内容
int record_num = record_lengths.size();
for (int i = 0; i < record_num; i++) {
m_key_[i] = str.substr(start, 16);
m_value_[i] = str.substr(start + 16, record_lengths[i] - 16);
start += record_lengths[i];
}
}

这时候编码解码比之前稍微简单一点(去掉了get_size函数),但将长度编码进字符串的方式还是觉得有点低效。后面一想,为什么不直接把大字符串视作一个整数数组,对数组元素赋值就可以了,也就是说 123没必要转换成’1’ ‘2’ ‘3’后存入数组(3个字节),而是直接对一个short类型(2字节)的数赋值。 这样就不再需要编码额外的结束字符作为标记了。

字符数组的头部存储各种元数据,尾部存储实际key-value数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/*
u16存储
记录数
页大小
各记录大小
char存储
各个记录(kv)
*/
std::string Page::to_string() {
std::string data;
data.resize(kAllocSize);
uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t); // 从该偏移开始存储实际的数据
char *char_pointer = const_cast<char *>(data.c_str()); // 移除常量性
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer); // 解释为u32指针

u16_pointer[0] = m_cur_index_;
u16_pointer[1] = m_cur_size_;
uint16_t offset = data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) { // 加入各个记录起始地址
u16_pointer[i + 2] = offset;
offset += 16 + m_value_[i].size();
}
u16_pointer[m_cur_index_ + 2] = offset; // 实际数据最终偏移
char *p = char_pointer + data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) {
memcpy(p, m_key_[i].c_str(), 16);
memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
p += 16 + m_value_[i].size();
}
return data;
}
Page::Page(std::string &data) {
char *char_pointer = const_cast<char *>(data.c_str()); // 移除常量性
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer); // 解释为u32指针
m_cur_index_ = u16_pointer[0];
m_cur_size_ = u16_pointer[1];
uint16_t offset, length;
for (uint16_t i = 0; i < m_cur_index_; i++) {
offset = u16_pointer[i + 2];
length = u16_pointer[i + 3] - u16_pointer[i + 2];
m_key_[i] = data.substr(offset, 16);
m_value_[i] = data.substr(offset + 16, length - 16);
}
m_is_dirty_ = false;
}

后面发现leveldb中用了以下这种编码形式
详解varint编码原理

#pragma pack使用错误

本地缓存总共8G空间,有5G是存储key的元数据。其结构体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
using page_id_t = uint16_t;
using index_t = uint16_t;

struct data_info_t { // 数据信息
page_id_t page_id; // 页号
index_t index; // 索引
};

/* One slot stores the key and the meta info of the value which
describles the remote addr, size, remote-key on remote end. */
struct hash_map_slot {
char key[16];
data_info_t info;
hash_map_slot *next;
};

#pragma pack(4)
struct hash_map_slot_test {
char key[16];
data_info_t info;
hash_map_slot *next;
};
// 没有#pragma pack()

可以看到struct data_info_t占4字节,但struct hash_map_slot因为对齐的原因占32字节。自然而然的,为了节省空间,可以强制结构体4字节对齐,这样就能节省4字节的空间,也就节省了1/8的空间。但由于不熟悉#pragma pack,#pragma pack(4)并没有以#pragma pack()结束。然后一运行程序就段错误,gdb调试时bt显示调用栈,f3时传递参数为32位,f2突然截断,参数变成了16位。我觉得这个错误过于诡异,因为就修改了字节对齐,故到比赛结束我都没再用#pragma pack,一般也不建议使用#pragma pack。
示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
using page_id_t = uint16_t;
using index_t = uint16_t;

struct data_info_t { // 数据信息
page_id_t page_id; // 页号
index_t index; // 索引
};

/* One slot stores the key and the meta info of the value which
describles the remote addr, size, remote-key on remote end. */
struct hash_map_slot {
char key[16];
data_info_t info;
hash_map_slot *next;
};

#pragma pack(4)
struct hash_map_slot_test {
char key[16];
data_info_t info;
hash_map_slot_test *next;
};
#pragma pack()

int main() {
cout << sizeof(hash_map_slot) << endl;
cout << sizeof(hash_map_slot_test) << endl;
hash_map_slot slot1;
hash_map_slot_test slot2;

printf("hash_map_slot layout:\n%p \n%p \n%p\n", &(slot1.key), &(slot1.info), &(slot1.next));
printf("hash_map_slot test layout:\n%p \n%p \n%p\n", &(slot2.key), &(slot2.info), &(slot2.next));
}

/*
输出:
32
28
hash_map_slot layout:
0x7ffef416d1f0
0x7ffef416d200
0x7ffef416d208
hash_map_slot test layout:
0x7ffef416d1d0
0x7ffef416d1e0
0x7ffef416d1e4
*/

相关博客:
C/C++中结构体内存对齐(边界对齐),#pragma pack设置
关于#pragma pack(n)引发的一系列问题

右值引用本身是左值

1
2
3
4
5
6
1: void func(Test&& t);
2: void func(Test& t);

Test t1
Test&& t2 = std::move(t1);
func(t2);

对于右值引用我一直有一个误区,认为右值引用是右值,例如以上代码片段,实参类型为Test&&,很容易认为此时调用的函数为第二个,但实际上此时调用的却是第一个。这是因为右值引用本身是左值,更为具体来说,右值引用类型既可以被当作左值也可以被当作右值,判断的标准是,如果它有名字,那就是左值,否则就是右值

示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <bits/stdc++.h>
using namespace std;

class Test {
public:
Test() = default;
Test(const Test &test) : str(test.str) { printf("enter copy construction\n"); }
Test(Test &&test) : str(std::move(test.str)) { printf("enter move construction\n"); }

Test &operator=(const Test &test) {
str = test.str;
printf("enter copy assign\n");
return *this;
}
Test &operator=(Test &&test) {
str = std::move(test.str);
printf("enter move assign\n");
return *this;
}

private:
string str;
};

int main() {
Test p1, p2, p3;
Test &&p4 = std::move(p3);

Test p5(p1); // 调用复制构造
Test p6(std::move(p2)); // 调用移动构造
Test p7(p4); // 调用复制构造

Test t1, t2, t3, t4;
Test &&t5 = std::move(t4);
t1 = t2; // 调用复制赋值
t1 = std::move(t3); // 调用移动赋值
t1 = t5; // 调用复制赋值(误区)
}
/*
输出:
enter copy construction
enter move construction
enter copy construction
enter copy assign
enter move assign
enter copy assign
*/

实际上使用vscode将鼠标点到函数调用的地方就能看到所调用的函数


注意move后不应该再使用变量值
我有一次错误就在于调用move函数后仍然使用对象的size方法,导致未定义的行为。

1
2
3
4
5
6
7
8
9
10
template<class T>
void swap(T& a, T& b)
{
T tmp(std::move(a));
a = std::move(b);
b = std::move(tmp);
}

X a, b;
swap(a, b);

参考博客:
详解C++右值引用

其余优化

读者互斥锁
当读请求未命中时,程序需读取远端的字符串数据,并将其解码成string数组(Page类),插入本地缓存后再读取value;该操作耗时较长,且没必要持有锁;但如果多个请求同时请求该远端页,如果让每一个请求都读取远端页,既浪费IO资源,也没啥实际用处;故增加一个读者互斥锁,保证每一页只有一个读者正在请求远端页,其余请求同一远端页的读者会阻塞互斥锁前;待读取远端页的读者读取页数据完成并将该页插入本地缓存中,其余读者发现本地缓存已存在该页,就不会重复读取远端页了。
相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (m_data_map_[info.page_id] == nullptr) {  // 数据在远端内存
m_mutex_.unlock();
m_same_reader_mutex_[info.page_id].lock(); // 读者互斥
if (m_data_map_[info.page_id] == nullptr) { // 双重检查
auto remote_info = m_addr_map_[info.page_id];
std::string &&page_data = std::string(remote_info.size, '0');
m_rdma_conn_->remote_read((void *)page_data.c_str(), remote_info.size, remote_info.remote_addr, remote_info.rkey);
auto new_page = std::make_shared<Page>(page_data); // 构建缓存页
m_mutex_.lock();
m_data_map_[info.page_id] = new_page;
m_cache_size_++;
m_lru_list_.insert(info.page_id);
need_update_lru = false;
m_addr_map_.erase(info.page_id); // 在远端地址映射中删除该项
m_addr_list_.emplace(remote_info); // 将该页加入地址列表
m_mutex_.unlock();
// 尝试唤醒后台进程
if (m_cache_size_ > kPageThreshold) {
m_cv_.notify_one();
}
}
m_same_reader_mutex_[info.page_id].unlock();
m_mutex_.lock();
}

时不时用sizeof看看占用空间,增加一个读者互斥锁相当于每页多了40字节,占比不大。

1
2
3
4
5
6
7
8
9
int main() {
cout << "mutex:" << sizeof(std::mutex) << endl;
cout << "shared_ptr:" << sizeof(std::shared_ptr<int>) << endl;
}
/*
输出:
mutex:40
shared_ptr:16
*/

LRU优化
不论是读操作还是写操作,末尾都需要更新LRU列表;为了减小冲突,LRU采用独立的锁,不在m_mutex_锁内更新;当前页(新申请的页)并不加入LRU列表,也不会被淘汰,待当前页写满后再插入到LRU列表中;上次LRU更新的页本次也不再更新(已在队首)。

1
2
3
4
if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_ && need_update_lru) {
m_lru_list_.update(info.page_id);
m_last_update_id_ = info.page_id;
}

map改成数组 vector改数组

1
2
3
由于最大页号确定,map<page_id,page>改成了page[max_page_id];
由于页最大记录数确定,std::vector<std::string> m_value_变成了std::string m_value_[kMaxIndex];
通过使用固定的数据结构,减少了扩容带来的开销。

程序说明

程序正确性预设
1:运行中页号不得超过kBitmapSize,页中记录数不得超过kMaxIndex,页导出字符串长度不得超过kAllocSize
2:程序经历连续的删除操作后才进入读写操作,使得rebuild最大化。在删除之前远端内存足容纳所有数据
3:即使每次插入时会检查当前页是否已满,但仍然无法阻止程序通过更新value值来增大页大小,所以页数据大小大于kAllocSize便会出现问题
4:kAllocSize小于65536,页成员就可以使用uint16存储,大于则需要使用uint32存储
5:mutex之外的操作耗时极短,可以在rebuild处理前完成

1
2
3
4
5
6
7
8
9
10
11
1: debug时系统高效充分打印所需的信息
2:架构越简洁。越不容易出错
3: 可编写简单的测试用例检验模块的正确性
4: 利用sizeof和system_clock简单估计内存占用与执行速度
5 右值引用变量在用于表达式时是左值,move后不应该再使用变量值
int &&x = 1;
f(x); // 调用 f(int& x)
f(std::move(x)); // 调用 f(int&& x)
6 编写代码时明确锁所维持的不变量是什么,而不是仅仅确保操作的原子性
m_mutex_:确保保护成员操作的原子性,并保证页数据仅存在本地(m_data_map_)与远端(m_addr_map_)中的一个位置
m_remote_read_lock_:淘汰时在数据真正写入前阻塞对应页的请求

相关代码

Engine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// kv_engine.h
namespace kv {

/* Encryption algorithm competitor can choose. */
enum aes_algorithm { CTR = 0, CBC, CBC_CS1, CBC_CS2, CBC_CS3, CFB, OFB };

/* Algorithm relate message. */
typedef struct crypto_message_t {
aes_algorithm algo;
Ipp8u *key;
Ipp32u key_len;
Ipp8u *counter;
Ipp32u counter_len;
Ipp8u *piv;
Ipp32u piv_len;
Ipp32u blk_size;
Ipp32u counter_bit;

IppsAESSpec *ctx;
} crypto_message_t;

/* Abstract base engine */
class Engine {
public:
virtual ~Engine(){};

virtual bool start(const std::string addr, const std::string port) = 0;

virtual void stop() = 0;

virtual bool alive() = 0;
};

class LocalEngineEntity; // 前置声明
/* Local-side engine */
class LocalEngine : public Engine {
public:
~LocalEngine();

bool start(const std::string addr, const std::string port) override;

void stop() override;

bool alive() override;

/* Init aes context message. */
bool set_aes();

bool encrypted(const std::string &value, std::string &encrypt_value);

/* Evaluation problem will call this function. */
crypto_message_t *get_aes() { return &m_aes_; }

bool write(const std::string &key, const std::string &value, bool use_aes = false);

bool read(const std::string &key, std::string &value);
/** The delete interface */
bool deleteK(const std::string &key);

/** Rebuild the hash index to recycle the unsed memory */
void rebuild_index();

private:
// static inline int get_index(const std::string &key) { return std::hash<std::string>()(key) & (kSharedNumber - 1); }
static inline int get_index(const std::string &key) { return CityHash16(key.c_str()) & (kSharedNumber - 1); }
kv::ConnectionManager *m_rdma_conn_;
// /* NOTE: should use some concurrent data structure, and also should take the
// * extra memory overhead into consideration */
// RDMAMemPool *m_rdma_mem_pool_;

crypto_message_t m_aes_;

LocalEngineEntity *m_entity_[kSharedNumber]; // 执行请求的实体
};

/* Remote-side engine */
class RemoteEngine : public Engine {
public:
struct WorkerInfo {
CmdMsgBlock *cmd_msg;
CmdMsgRespBlock *cmd_resp_msg;
struct ibv_mr *msg_mr;
struct ibv_mr *resp_mr;
rdma_cm_id *cm_id;
struct ibv_cq *cq;
};

~RemoteEngine(){};

bool start(const std::string addr, const std::string port) override;
void stop() override;
bool alive() override;

private:
void handle_connection();

int create_connection(struct rdma_cm_id *cm_id);

struct ibv_mr *rdma_register_memory(void *ptr, uint64_t size);

int remote_write(WorkerInfo *work_info, uint64_t local_addr, uint32_t lkey, uint32_t length, uint64_t remote_addr, uint32_t rkey);

int allocate_and_register_memory(uint64_t &addr, uint32_t &rkey, uint64_t size);

void worker(WorkerInfo *work_info, uint32_t num);

struct rdma_event_channel *m_cm_channel_;
struct rdma_cm_id *m_listen_id_;
struct ibv_pd *m_pd_;
struct ibv_context *m_context_;
bool m_stop_;
std::thread *m_conn_handler_;
WorkerInfo **m_worker_info_;
uint32_t m_worker_num_;
std::thread **m_worker_threads_;
};

} // namespace kv


// local_engine.cc
#include "assert.h"
#include "atomic"
#include "kv_engine.h"
#include "local_engine_entity.h"
#include <iostream>

namespace kv {

/**
* @description: start local engine service
* @param {string} addr the address string of RemoteEngine to connect
* @param {string} port the port of RemoteEngine to connect
* @return {bool} true for success
*/
bool LocalEngine::start(const std::string addr, const std::string port) {
m_rdma_conn_ = new ConnectionManager();
if (m_rdma_conn_ == nullptr) return false;
if (m_rdma_conn_->init(addr, port, 4, 72)) return false;
for (int i = 0; i < kSharedNumber; i++) {
m_entity_[i] = new LocalEngineEntity(this, m_rdma_conn_);
}
printf("LocalEngine::start finsh\n");
auto watcher = std::thread([]() {
sleep(60 * 9);
printf("timeout\n");
fflush(stdout);
abort();
});
watcher.detach();
return true;
}

/**
* @description: stop local engine service
* @return {void}
*/
void LocalEngine::stop() {
for (int i = 0; i < kSharedNumber; i++) {
delete m_entity_[i];
}
delete m_rdma_conn_;
printf("LocalEngine::stop finsh\n");
};

/**
* @description: get engine alive state
* @return {bool} true for alive
*/
bool LocalEngine::alive() { return true; }

/**
* @description: provide message about the aes_ecb mode
* @return {bool} true for success
*/
bool LocalEngine::set_aes() {
// Current algorithm is not supported, just for demonstration.
m_aes_.algo = CBC;
m_aes_.key_len = 16;
m_aes_.key = new Ipp8u[16]{0x60, 0x3d, 0xeb, 0x10, 0x15, 0xca, 0x71, 0xbe, 0x2b, 0x73, 0xae, 0xf0, 0x85, 0x7d, 0x77, 0x81};
if (m_aes_.key == nullptr) return false;
m_aes_.blk_size = 16;
m_aes_.piv_len = 16;
m_aes_.piv = new Ipp8u[16]{0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01, 0x00};
if (m_aes_.piv == nullptr) return false;

int ctxSize; // AES context size
ippsAESGetSize(&ctxSize); // evaluating AES context size
// allocatting memory for AES context
m_aes_.ctx = (IppsAESSpec *)(new Ipp8u[ctxSize]);
// AES context initialization
ippsAESInit(m_aes_.key, m_aes_.key_len, m_aes_.ctx, ctxSize);
return true;
}
// 参考pdf实现简单加密算法
bool LocalEngine::encrypted(const std::string &value, std::string &encrypt_value) {
Ipp8u ciph[(value.size() + m_aes_.blk_size - 1) & ~(m_aes_.blk_size - 1)];
// encrypting plaintext
ippsAESEncryptCBC((Ipp8u *)value.c_str(), ciph, value.size(), m_aes_.ctx, m_aes_.piv);
std::string tmp(reinterpret_cast<const char *>(ciph), value.size());
encrypt_value = std::move(tmp);
return true;
}

bool LocalEngine::write(const std::string &key, const std::string &value, bool use_aes) {
int index = get_index(key);
return m_entity_[index]->write(key, value, use_aes);
}
/**
* @description: read value from engine via key
* @param {string} key
* @param {string} &value
* @return {bool} true for success
*/

bool LocalEngine::read(const std::string &key, std::string &value) {
int index = get_index(key);
return m_entity_[index]->read(key, value);
}

bool LocalEngine::deleteK(const std::string &key) {
int index = get_index(key);
return m_entity_[index]->deleteK(key);
}
/* When we delete a number of KV pairs, we should rebuild the index to
* reallocate remote addr for each KV to recycle fragments. This will block
* front request processing. This solution should be optimized. */
void LocalEngine::rebuild_index() {
/** rebuild all the index to recycle the fragment */
/** step-1: block the database and not allowe any writes
* step-2: transverse the index to read each value
* step-3: read the value from the remote and write it to a new remote addr
* step-4: free all old addr
*/
printf("*********rebuild_index*********\n");
for (int i = 0; i < kSharedNumber; i++) {
m_entity_[i]->rebuild_index();
}
}

LocalEngine::~LocalEngine() {
if (nullptr != m_aes_.key) delete[] m_aes_.key;
if (nullptr != m_aes_.counter) delete[] m_aes_.counter;
if (nullptr != m_aes_.piv) delete[] m_aes_.piv;
if (nullptr != m_aes_.ctx) delete (Ipp8u *)m_aes_.ctx;
m_aes_.key = nullptr;
m_aes_.counter = nullptr;
m_aes_.piv = nullptr;
m_aes_.ctx = nullptr;
}
} // namespace kv

type

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
// type.h
namespace kv {
using page_id_t = uint16_t;
using index_t = uint16_t;
struct remote_info_t { // 远端内存信息
uint64_t remote_addr;
uint32_t rkey;
uint32_t size;
};

struct data_info_t { // 数据信息
page_id_t page_id; // 页号
index_t index; // 索引
};

/* One slot stores the key and the meta info of the value which
describles the remote addr, size, remote-key on remote end. */
struct hash_map_slot {
char key[16];
data_info_t info;
hash_map_slot *next;
};
const page_id_t kNullPage = 0;
const data_info_t kNullInfo = {0xffff, 0xffff}; // 无效的信息

const int kSharedNumber = 1 << 6; // 缓存实体数目
const uint32_t kBucketNum = 1 << 21; // hash中bucket数
const uint32_t kSlotSize = (1 << 22) * 1.2 * 32 / kSharedNumber; // hash中slot数
const uint64_t kMaxDataSize = (uint64_t)1 << 36; // 测试数据大小
const uint32_t kPageSize = 60 * 1 << 10; // 页容量
const uint32_t kMaxValueSize = 1024; // value最大值
const uint32_t kMinValueSize = 80; // value最小值
const uint32_t kMaxIndex = kPageSize / 96; // 页中最大记录数,96=80+16
const uint64_t kBitmapSize = 1.4 * kMaxDataSize / kSharedNumber / kPageSize; // 运行过程中最大页号
const uint32_t kAllocSize = 1 << 16; // 分配的远端内存大小
const uint32_t kPageThreshold = 1 << 8; // 本地存储页的阈值
const uint32_t kEvictNumber = kPageThreshold * 0.05; // 一次淘汰的页数
const uint32_t kPrintFreq = 1 << 24; // 输出信息频率
const uint64_t kRebuildThreshold = (uint64_t)(1 << 30) * 36 / kSharedNumber; // 重建阈值
const uint32_t kEralyRegisterNumber = (uint64_t)(1 << 30) * 30 / kSharedNumber / kAllocSize;

class hash_map_t {
public:
/* Initialize all the pointers to nullptr. */
hash_map_t();
// index为key对应hash值
/* Find the corresponding key. */
hash_map_slot *find(const std::string &key, int index);

/* Insert into the head of the list. */
void insert(const std::string &key, const data_info_t &info, int index);

data_info_t remove(const std::string &key, int index);
// 只在rebuild时用到
void update(const std::string &key, const data_info_t &info);

private:
hash_map_slot *m_bucket_[kBucketNum];
hash_map_slot *m_slot_head_; // 可用的slot链表头部,用于连接被删除元素的slot
hash_map_slot m_hash_slot_array_[kSlotSize]; // 哈希数组
uint32_t m_slot_cnt_;
};

class Page {
public:
Page(std::string &data); // 以字符串填充页

std::string to_string(); // 页数据转换成字符串

void to_string(char *ptr);
Page() : m_cur_size_{0}, m_cur_index_{0}, m_is_dirty_{true} {};

bool is_full() { return m_cur_size_ > kPageSize; } // 页是否满

uint16_t page_size() { return m_cur_size_; } // 返回当前页大小

index_t record_number() { return m_cur_index_; } // 返回记录数

index_t insert(const std::string &key, const std::string &value) {
m_cur_size_ += 16 + value.size(); // 加上key的长度
m_key_[m_cur_index_] = key;
m_value_[m_cur_index_] = value;
return m_cur_index_++; // 返回当前索引并加一
}

index_t insert(const std::string &key, std::string &&value) {
m_cur_size_ += 16 + value.size(); // 加上key的长度
m_key_[m_cur_index_] = key;
m_value_[m_cur_index_] = std::move(value);
return m_cur_index_++; // 返回当前索引并加一
}

void update(index_t index, const std::string &value) {
m_is_dirty_ = true;
m_cur_size_ -= m_value_[index].size();
m_cur_size_ += value.size(); // 更新页当前大小
m_value_[index] = value;
}

void update(index_t index, std::string &&value) {
m_is_dirty_ = true;
m_cur_size_ -= m_value_[index].size();
m_cur_size_ += value.size(); // 更新页当前大小
m_value_[index] = std::move(value);
}

std::string read_value(index_t index) { return m_value_[index]; }

std::string read_key(index_t index) { return m_key_[index]; }

bool is_dirty() { return m_is_dirty_; }

private:
std::string m_key_[kMaxIndex];
std::string m_value_[kMaxIndex];
uint16_t m_cur_size_; // 当前页数据大小
index_t m_cur_index_; // 当前索引
bool m_is_dirty_;

// std::vector<std::string> m_value_; // 以vector存储key值,方便rebuild时更新元数据
// std::vector<std::string> m_key_; // 以vector存储记录
};

class LRUList {
public:
LRUList() = default;
void insert(page_id_t hit_id) {
m_mutex_.lock();
m_list_.emplace_front(hit_id);
m_speed_map_.insert({hit_id, m_list_.begin()});
m_mutex_.unlock();
}
void update(page_id_t hit_id) {
m_mutex_.lock();
auto iter = m_speed_map_[hit_id];
if (iter != m_list_.begin()) { // 将该页移至队首
m_list_.erase(iter);
m_list_.emplace_front(hit_id);
m_speed_map_[hit_id] = m_list_.begin();
}
m_mutex_.unlock();
}
page_id_t evict() {
m_mutex_.lock();
page_id_t vicitm_page_id = m_list_.back();
m_list_.pop_back();
m_speed_map_.erase(vicitm_page_id);
m_mutex_.unlock();
return vicitm_page_id;
}
std::vector<page_id_t> clear() { // 清空LRU列表所有元素
m_mutex_.lock();
std::vector<page_id_t> res(m_list_.begin(), m_list_.end());
m_list_.clear();
m_speed_map_.clear();
m_mutex_.unlock();
return res;
}
int size() { return m_list_.size(); }

private:
std::mutex m_mutex_;
std::list<page_id_t> m_list_; // LRU列表
std::unordered_map<page_id_t, std::list<page_id_t>::iterator> m_speed_map_; // 加速LRU列表访问
};

} // namespace kv

// type.c
#include <cassert>
#include "type.h"
namespace kv {
hash_map_t::hash_map_t() {
memset(m_bucket_, 0, sizeof(m_bucket_));
m_slot_head_ = &m_hash_slot_array_[0];
m_slot_head_->next = nullptr;
m_slot_cnt_ = 1;
}

/* Find the corresponding key. */
hash_map_slot *hash_map_t::find(const std::string &key, int index) {
hash_map_slot *cur = m_bucket_[index];
if (cur == nullptr) {
return nullptr;
}
while (cur) {
if (memcmp(cur->key, key.c_str(), 16) == 0) {
return cur;
}
cur = cur->next;
}
return nullptr;
}

/* Insert into the head of the list. */
void hash_map_t::insert(const std::string &key, const data_info_t &info, int index) {
hash_map_slot *new_slot;
// 优先使用被删除数据的slot
if (m_slot_head_->next == nullptr) {
new_slot = &m_hash_slot_array_[m_slot_cnt_++];
assert(m_slot_cnt_ < kSlotSize);
} else {
new_slot = m_slot_head_->next;
m_slot_head_->next = new_slot->next;
}
new_slot->next = nullptr;

memcpy(new_slot->key, key.c_str(), 16);
new_slot->info = info;
if (!m_bucket_[index]) {
m_bucket_[index] = new_slot;
} else {
/* Insert into the head. */
hash_map_slot *tmp = m_bucket_[index];
m_bucket_[index] = new_slot;
new_slot->next = tmp;
}
}

// 只在rebuild时调用
void hash_map_t::update(const std::string &key, const data_info_t &info) {
int index = std::hash<std::string>()(key) & (kBucketNum - 1);
hash_map_slot *cur = m_bucket_[index];
while (cur) {
if (memcmp(cur->key, key.c_str(), 16) == 0) {
cur->info = info;
return;
}
cur = cur->next;
}
}

data_info_t hash_map_t::remove(const std::string &key, int index) {
hash_map_slot *cur = m_bucket_[index];
hash_map_slot *parent = nullptr;
if (cur == nullptr) {
return kNullInfo;
}
while (cur) {
if (memcmp(cur->key, key.c_str(), 16) == 0) {
// 在bucket中删除该slot
if (parent == nullptr) {
m_bucket_[index] = cur->next;
} else {
parent->next = cur->next;
}
// 加入后备链表
cur->next = m_slot_head_->next;
m_slot_head_->next = cur;
return cur->info;
}
parent = cur;
cur = cur->next;
}
return kNullInfo;
}
/*
u16存储
记录数
页大小
各记录大小
char存储
各个记录(kv)
*/
std::string Page::to_string() {
std::string data;
data.resize(kAllocSize);
uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t); // 从该偏移开始存储实际的数据
char *char_pointer = const_cast<char *>(data.c_str()); // 移除常量性
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer); // 解释为u32指针

u16_pointer[0] = m_cur_index_;
u16_pointer[1] = m_cur_size_;
uint16_t offset = data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) { // 加入各个记录起始地址
u16_pointer[i + 2] = offset;
offset += 16 + m_value_[i].size();
}
u16_pointer[m_cur_index_ + 2] = offset; // 实际数据最终偏移
char *p = char_pointer + data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) {
memcpy(p, m_key_[i].c_str(), 16);
memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
p += 16 + m_value_[i].size();
}
return data;
}
void Page::to_string(char *ptr) {
uint16_t data_start = (m_cur_index_ + 5) * sizeof(uint16_t); // 从该偏移开始存储实际的数据
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(ptr); // 解释为u32指针

u16_pointer[0] = m_cur_index_;
u16_pointer[1] = m_cur_size_;
uint16_t offset = data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) { // 加入各个记录起始地址
u16_pointer[i + 2] = offset;
offset += 16 + m_value_[i].size();
}
u16_pointer[m_cur_index_ + 2] = offset; // 实际数据最终偏移
char *p = ptr + data_start;
for (uint16_t i = 0; i < m_cur_index_; i++) {
memcpy(p, m_key_[i].c_str(), 16);
memcpy(p + 16, m_value_[i].c_str(), m_value_[i].size());
p += 16 + m_value_[i].size();
}
}
Page::Page(std::string &data) {
char *char_pointer = const_cast<char *>(data.c_str()); // 移除常量性
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(char_pointer); // 解释为u32指针
m_cur_index_ = u16_pointer[0];
m_cur_size_ = u16_pointer[1];
uint16_t offset, length;
for (uint16_t i = 0; i < m_cur_index_; i++) {
offset = u16_pointer[i + 2];
length = u16_pointer[i + 3] - u16_pointer[i + 2];
m_key_[i] = data.substr(offset, 16);
m_value_[i] = data.substr(offset + 16, length - 16);
}
m_is_dirty_ = false;
}

} // namespace kv

local_engine_entity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
// local_engine_entity.h
#pragma once
#include <condition_variable>
#include <bitset>
#include "type.h"
#include "rdma_conn_manager.h"
#include "rdma_mem_pool.h"
namespace kv {

class LocalEngine; // 前置声明
class LocalEngineEntity {
public:
LocalEngineEntity(LocalEngine *engine, ConnectionManager *rdma_conn);
~LocalEngineEntity();
bool write(const std::string &key, const std::string &value, bool use_aes = false);
bool read(const std::string &key, std::string &value);
bool deleteK(const std::string &key);
void rebuild_index();
std::vector<uint64_t> print_memory(); // 输出内存占用信息

private:
void start_thread(); // 启动后台线程
remote_info_t request_signle_info(); // 请求单个远端地址
void register_remote_memory(); // 提前注册远端内存

hash_map_t m_page_map_; // 数据与元数据映射
std::shared_ptr<Page> m_data_map_[kBitmapSize]; // 页号与页映射(数据存于本地)
uint32_t m_cache_size_; // 本地缓存大小
LRUList m_lru_list_; // LRU列表
std::unordered_map<page_id_t, remote_info_t> m_addr_map_; // 页号与远端信息映射(数据存于远端)
uint16_t m_max_index_[kBitmapSize]; // 各个远端页最大索引
std::queue<remote_info_t> m_addr_list_; // 未使用的远端内存
std::mutex m_mutex_; // 保护以上成员

// 注意最大页号不要大于kBitmapSize!
std::bitset<kMaxIndex> m_bitmap_[kBitmapSize]; // 删除为1,正常为0
std::mutex m_same_reader_mutex_[kBitmapSize]; // 提供同一页远程读互斥访问

page_id_t m_cur_page_id_; // 当前使用页号,不存在于LRU列表
std::shared_ptr<Page> m_cur_page_; // 当前使用页

page_id_t m_vicitm_id_; // 淘汰页号
std::shared_ptr<Page> m_vicitm_page_; // 淘汰页号
page_id_t m_last_update_id_; // 上次lru更新的页号

bool m_alive_; // 控制后台进程生命周期
std::condition_variable m_cv_; // 用于唤醒后台进程
std::thread *m_backup_thread_; // 后台进程
std::mutex m_useless_mutex_; // 只是方便调用API,没啥实际意义

LocalEngine *m_engine_;
ConnectionManager *m_rdma_conn_;
RDMAMemPool *m_rdma_mem_pool_;

// 统计数据
uint64_t m_alloc_memory_{0}; // 申请远端内存大小

bool m_rebuild_allow_{true};
bool m_delete_envent_{false};
bool m_rw_envent_after_delete_{false};
};
} // namespace kv

// local_engine_entity.cc
#include <set>
#include <future>
#include <functional>
#include "local_engine_entity.h"
#include "kv_engine.h"

namespace kv {
LocalEngineEntity::LocalEngineEntity(LocalEngine *engine, ConnectionManager *rdma_conn) : m_engine_(engine), m_rdma_conn_(rdma_conn) {
m_rdma_mem_pool_ = new RDMAMemPool(m_rdma_conn_);
if (m_rdma_mem_pool_ == nullptr) {
printf("alloc rdma_mem_pool failed");
}
auto page = std::make_shared<Page>(); // 申请第一页
m_cur_page_id_ = 1;
m_cur_page_ = page;

m_vicitm_id_ = kNullPage;
m_vicitm_page_ = nullptr;
m_last_update_id_ = kNullPage;

m_data_map_[m_cur_page_id_] = page;
m_cache_size_ = 1;

m_alive_ = true;
// 提前申请远端内存
auto requester = std::thread([&]() { register_remote_memory(); });
requester.detach();
// 启动后台线程
start_thread();
}
// 申请kEralyRegisterNumber个kAllocSize大小的远端内存
void LocalEngineEntity::register_remote_memory() {
std::queue<remote_info_t> list;
remote_info_t remote_info;
remote_info.size = kAllocSize;
for (uint32_t i = 0; i < kEralyRegisterNumber; i++) {
m_rdma_mem_pool_->get_mem(remote_info.size, remote_info.remote_addr, remote_info.rkey);
list.emplace(remote_info);
}
m_alloc_memory_ += kAllocSize * kEralyRegisterNumber;

m_mutex_.lock();
while (!m_addr_list_.empty()) {
list.emplace(m_addr_list_.front());
m_addr_list_.pop();
}
m_addr_list_ = std::move(list);
m_mutex_.unlock();
}

remote_info_t LocalEngineEntity::request_signle_info() {
remote_info_t remote_info;
// 先从后备地址列表选择远端地址
if (!m_addr_list_.empty()) {
remote_info = m_addr_list_.front();
m_addr_list_.pop();
return remote_info;
}
// 申请远端内存
remote_info.size = kAllocSize;
m_rdma_mem_pool_->get_mem(remote_info.size, remote_info.remote_addr, remote_info.rkey);
m_alloc_memory_ += remote_info.size;
return remote_info;
}

void LocalEngineEntity::start_thread() {
auto backup_func = [&]() {
uint64_t rebuild_threshold = kRebuildThreshold;
std::unique_lock<std::mutex> useless_lock(m_useless_mutex_); // 持有无用锁
while (m_alive_) {
while (m_cache_size_ < kPageThreshold && !(m_rebuild_allow_ && m_delete_envent_ && m_rw_envent_after_delete_)) { // 若当前大小小于阈值并且不需要重构则休眠
m_cv_.wait(useless_lock);
if (!m_alive_) { // 进程退出,该后台线程也应该退出
return;
}
}
if (m_rebuild_allow_ && m_delete_envent_ && m_rw_envent_after_delete_) {
rebuild_index();
m_rebuild_allow_ = false;
}
if (m_cache_size_ > kPageThreshold) {
for (uint32_t i = 0; i < kEvictNumber; i++) {
m_mutex_.lock();
page_id_t victim_id = m_lru_list_.evict();
std::shared_ptr<Page> victim_page = m_data_map_[victim_id];
remote_info_t info = request_signle_info();

// 设置淘汰页信息
m_data_map_[victim_id] = nullptr;
m_cache_size_--;
m_addr_map_[victim_id] = info;
m_max_index_[victim_id] = victim_page->record_number();
m_mutex_.unlock();

std::string &&vicitm_page_data = victim_page->to_string(); // 记录淘汰页数据,开始写入远端内存
// 将本地数据写入远端内存
m_rdma_conn_->remote_write((void *)vicitm_page_data.c_str(), kAllocSize, info.remote_addr, info.rkey);
}
}
}
};
m_backup_thread_ = new std::thread(backup_func);
}
LocalEngineEntity::~LocalEngineEntity() {
m_alive_ = false;
m_cv_.notify_one();
m_backup_thread_->join();
delete m_backup_thread_;
delete m_rdma_mem_pool_;
}

bool LocalEngineEntity::write(const std::string &key, const std::string &value, bool use_aes) {
if (m_rebuild_allow_ && m_delete_envent_) {
m_rw_envent_after_delete_ = true;
m_cv_.notify_one();
}

// 区分加密与非加密
std::string encrypt_value;
if (use_aes) {
m_engine_->encrypted(value, encrypt_value);
}
int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);

m_mutex_.lock();
auto slot = m_page_map_.find(key, hash_index);

// 第一次写入或该页正在写入远端或该页正在远端
if (slot == nullptr || m_data_map_[slot->info.page_id] == nullptr) {
if (m_cur_page_->is_full()) { // 该页已满不可用
m_lru_list_.insert(m_cur_page_id_); // 直到页满才插入LRU列表
m_cur_page_id_++;
m_data_map_[m_cur_page_id_] = std::make_shared<Page>();
m_cur_page_ = m_data_map_[m_cur_page_id_];
m_cache_size_++;
// 尝试唤醒后台进程
if (m_cache_size_ > kPageThreshold) {
m_cv_.notify_one();
}
}

index_t index;
if (use_aes) {
index = m_cur_page_->insert(key, std::move(encrypt_value));
} else {
index = m_cur_page_->insert(key, value);
}

data_info_t info = {m_cur_page_id_, index};
if (slot == nullptr) { // 第一次写入,插入元数据
m_page_map_.insert(key, info, hash_index); // 插入key与元数据映射
} else {
m_bitmap_[slot->info.page_id].set(slot->info.index, true); // 将之前数据标记为删除
slot->info = info; // 更新元数据信息
}
m_mutex_.unlock();
} else { // 该页在本地且未被淘汰,更新页数据
data_info_t info = slot->info;
if (use_aes) {
m_data_map_[info.page_id]->update(info.index, std::move(encrypt_value));
} else {
m_data_map_[info.page_id]->update(info.index, value);
}
m_mutex_.unlock();
// 更新LRU列表
if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_) {
m_lru_list_.update(info.page_id);
m_last_update_id_ = info.page_id;
}
}
return true;
}
bool LocalEngineEntity::read(const std::string &key, std::string &value) {
if (m_rebuild_allow_ && m_delete_envent_) {
m_rw_envent_after_delete_ = true;
m_cv_.notify_one();
}
int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
bool need_update_lru = true;
m_mutex_.lock();

auto slot = m_page_map_.find(key, hash_index);
if (slot == nullptr) { // 元数据不存在
m_mutex_.unlock();
return false;
}
data_info_t info = slot->info;
if (info.page_id == m_vicitm_id_) {
value = m_vicitm_page_->read_value(info.index);
m_mutex_.unlock();
return true;
}
if (m_data_map_[info.page_id] == nullptr) { // 数据在远端内存
m_mutex_.unlock();
m_same_reader_mutex_[info.page_id].lock(); // 读者互斥
if (m_data_map_[info.page_id] == nullptr) { // 双重检查
auto remote_info = m_addr_map_[info.page_id];
std::string &&page_data = std::string(remote_info.size, '0');
m_rdma_conn_->remote_read((void *)page_data.c_str(), remote_info.size, remote_info.remote_addr, remote_info.rkey);
auto new_page = std::make_shared<Page>(page_data); // 构建缓存页
m_mutex_.lock();
m_data_map_[info.page_id] = new_page;
m_cache_size_++;
m_lru_list_.insert(info.page_id);
need_update_lru = false;
m_addr_map_.erase(info.page_id); // 在远端地址映射中删除该项
m_addr_list_.emplace(remote_info); // 将该页加入地址列表
m_mutex_.unlock();
// 尝试唤醒后台进程
if (m_cache_size_ > kPageThreshold) {
m_cv_.notify_one();
}
}
m_same_reader_mutex_[info.page_id].unlock();
m_mutex_.lock();
}
value = m_data_map_[info.page_id]->read_value(info.index);
m_mutex_.unlock();
if (info.page_id != m_last_update_id_ && info.page_id != m_cur_page_id_ && need_update_lru) {
m_lru_list_.update(info.page_id);
m_last_update_id_ = info.page_id;
}
return true;
}

bool LocalEngineEntity::deleteK(const std::string &key) {
m_delete_envent_ = true;
int hash_index = std::hash<std::string>()(key) & (kBucketNum - 1);
m_mutex_.lock();
data_info_t info = m_page_map_.remove(key, hash_index); // 删除对应key的元数据
m_mutex_.unlock();
m_bitmap_[info.page_id].set(info.index, true); // 将对应记录标记为删除
return true;
}

void LocalEngineEntity::rebuild_index() {
std::lock_guard<std::mutex> lk(m_mutex_);
std::string key;
std::string value;
data_info_t data_info;
std::shared_ptr<Page> page;
std::unordered_map<page_id_t, remote_info_t> tmp_addr_map; // 暂时存储页号与远端地址映射
uint32_t new_cache_size = 1;
std::vector<page_id_t> local_id = m_lru_list_.clear();
auto new_page = m_cur_page_;
page_id_t new_page_id = m_cur_page_id_;
// 处理本地缓存页
for (auto &page_id : local_id) {
auto &bitmap = m_bitmap_[page_id];
if (bitmap.none()) { // 不存在删除的记录,不进行操作
m_lru_list_.insert(page_id);
new_cache_size++;
} else if (!bitmap.all()) { //存在有效记录
auto page = m_data_map_[page_id];
int record_num = page->record_number();
for (int i = 0; i < record_num; i++) {
if (!bitmap.test(i)) { // 该记录未被删除
if (new_page->is_full()) { // 页满,写入本地
m_data_map_[new_page_id] = new_page;
new_cache_size++;
m_lru_list_.insert(new_page_id);
new_page = std::make_shared<Page>();
new_page_id++;
}
// 读出数据插入新页并更新元数据映射
key = page->read_key(i);
data_info.index = new_page->insert(key, page->read_value(i));
data_info.page_id = new_page_id;
m_page_map_.update(key, data_info);
}
}
m_data_map_[page_id] = nullptr; // 删除原先页
} else {
m_data_map_[page_id] = nullptr; // 删除原先页
}
}
page_id_t page_id;
remote_info_t info;
char head_data[kMaxIndex * 10];
char kv_data[2 * kMaxValueSize];
uint16_t *u16_pointer = reinterpret_cast<uint16_t *>(&head_data);
uint16_t head_length;
uint16_t offset, length;

// 处理远端内存页
for (auto &kv : m_addr_map_) {
page_id = kv.first;
info = kv.second;
auto &bitmap = m_bitmap_[page_id];
if (bitmap.none()) { // 不存在删除的记录,不进行操作
tmp_addr_map.insert({page_id, info});
} else if (bitmap.all()) { // 不存在有效记录,将后端地址加入地址列表
m_addr_list_.emplace(info);
} else {
bool avai_info = true; // 是否将该远端地址加入地址列表
head_length = (m_max_index_[page_id] + 5) * sizeof(uint16_t);
// 读取页头部数据
m_rdma_conn_->remote_read(head_data, head_length, info.remote_addr, info.rkey);
int record_num = m_max_index_[page_id];
for (int i = 0; i < record_num; i++) {
if (!bitmap.test(i)) {
if (new_page->is_full()) {
if (new_cache_size > kPageThreshold) { // 本地页满,写入远程
std::string &&page_data = new_page->to_string();
uint32_t len = page_data.length();
m_rdma_conn_->remote_write((void *)page_data.c_str(), len, info.remote_addr, info.rkey);
tmp_addr_map.insert({new_page_id, info}); // 暂时记录页号与远程地址映射
avai_info = false;
} else { // 写入本地缓存
m_data_map_[new_page_id] = new_page;
new_cache_size++;
m_lru_list_.insert(new_page_id);
}
new_page = std::make_shared<Page>();
new_page_id++;
}
// 读出数据插入新页并更新元数据映射
offset = u16_pointer[i + 2];
length = u16_pointer[i + 3] - u16_pointer[i + 2];
m_rdma_conn_->remote_read(kv_data, length, info.remote_addr + offset, info.rkey);
key = std::string(kv_data, kv_data + 16);
value = std::string(kv_data + 16, kv_data + length);
data_info.index = new_page->insert(key, std::move(value));
data_info.page_id = new_page_id;
m_page_map_.update(key, data_info);
}
}
if (avai_info) {
m_addr_list_.emplace(info);
}
}
}
m_addr_map_ = std::move(tmp_addr_map);
m_cur_page_id_ = new_page_id;
m_cur_page_ = new_page;
m_vicitm_id_ = kNullPage;
m_vicitm_page_ = nullptr;
m_cache_size_ = new_cache_size;
m_last_update_id_ = kNullPage;
m_data_map_[new_page_id] = new_page;
}

std::vector<uint64_t> LocalEngineEntity::print_memory() {
// 输出一系列统计数据
uint64_t key_metadata = kSlotSize * 32 + kBucketNum * 8;
uint64_t page_id_size = m_cur_page_id_;
uint64_t page_metadata = page_id_size * 48 + kBitmapSize * 1.2 * kMaxIndex;
uint64_t page_size = m_cache_size_ * (kPageSize);
return {key_metadata, page_id_size, page_metadata, page_size, m_alloc_memory_};
}
} // namespace kv