OpenClaw 是一个并发数据访问工具。其"无冲突"版本通过优化算法减少线程间的资源竞争,从而提高并发性能。以下是一个优化实现:

核心实现
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <numeric>
#include <shared_mutex>
#include <thread>
#include <vector>
template<typename T>
class ConflictFreeOpenClaw {
private:
    // Singly-linked list node. `next` is atomic so readers can walk the chain
    // while writers concurrently prepend new nodes in front of it.
    struct Node {
        T data;
        std::atomic<Node*> next;
        explicit Node(const T& val) : data(val), next(nullptr) {}
    };

    // Multiple independent per-slot stacks spread insert traffic across
    // separate atomics, reducing CAS contention between writer threads.
    static constexpr size_t SLOT_COUNT = 16;

    // heads[i] is the top of slot i's stack (Treiber stack: insert prepends).
    // NOTE(review): the previous version CAS'd a separate `tails` array and
    // then blindly stored `heads` afterwards; two racing inserts could store
    // heads out of order, making one node unreachable (lost data + leak).
    // A single atomic head per slot removes that race entirely.
    std::vector<std::atomic<Node*>> heads;

    // Coordinates structural mutation: inserts and traversals take this in
    // shared mode (they may all run concurrently); remove_if()/clear() take
    // it exclusively so nodes are never freed while another thread walks them.
    mutable std::shared_mutex read_mutex;

    // Lazily-assigned slot for the calling thread; size_t(-1) == unassigned.
    static thread_local size_t thread_slot;

    // Lock-free Treiber-stack push of `new_node` onto slot `slot_idx`.
    // Caller must hold read_mutex at least in shared mode.
    void push_unlocked(Node* new_node, size_t slot_idx) {
        Node* expected = heads[slot_idx].load(std::memory_order_relaxed);
        do {
            new_node->next.store(expected, std::memory_order_relaxed);
            // On success, release publishes the node's contents to readers;
            // on failure, `expected` is refreshed and the loop retries.
        } while (!heads[slot_idx].compare_exchange_weak(
            expected, new_node,
            std::memory_order_release,
            std::memory_order_relaxed));
    }

    // Round-robin slot assignment per thread so different threads usually
    // push onto different atomics (fewer CAS retries).
    size_t get_slot_index() const {
        static std::atomic<size_t> counter{0};
        if (thread_slot == static_cast<size_t>(-1)) {
            thread_slot = counter.fetch_add(1, std::memory_order_relaxed) % SLOT_COUNT;
        }
        return thread_slot;
    }

public:
    ConflictFreeOpenClaw() : heads(SLOT_COUNT) {
        for (auto& head : heads) {
            head.store(nullptr, std::memory_order_relaxed);
        }
    }

    // Atomics + mutex members make the object non-copyable; state it explicitly.
    ConflictFreeOpenClaw(const ConflictFreeOpenClaw&) = delete;
    ConflictFreeOpenClaw& operator=(const ConflictFreeOpenClaw&) = delete;

    ~ConflictFreeOpenClaw() {
        clear();
    }

    // Inserts a value into this thread's slot. Inserting threads run
    // concurrently under the shared lock; only remove_if()/clear() block them.
    void insert(const T& value) {
        Node* new_node = new Node(value);
        std::shared_lock lock(read_mutex);
        push_unlocked(new_node, get_slot_index());
    }

    // Bulk insert: acquires the shared lock once for the whole range instead
    // of once per element. (The old version locked and then called insert();
    // now that insert() also locks, that would recursively acquire the
    // shared_mutex on the same thread, which is undefined behavior.)
    template<typename Iterator>
    void bulk_insert(Iterator begin, Iterator end) {
        std::shared_lock lock(read_mutex);
        const size_t slot_idx = get_slot_index();
        for (auto it = begin; it != end; ++it) {
            push_unlocked(new Node(*it), slot_idx);
        }
    }

    // Returns true if any element matches `value` under `comparator`.
    // Takes the shared lock so remove_if() cannot free a node mid-traversal
    // (the old lock-free walk could read a node already scheduled for delete).
    bool try_find(const T& value, std::function<bool(const T&, const T&)> comparator) const {
        std::shared_lock lock(read_mutex);
        for (size_t i = 0; i < SLOT_COUNT; ++i) {
            for (Node* cur = heads[i].load(std::memory_order_acquire);
                 cur != nullptr;
                 cur = cur->next.load(std::memory_order_acquire)) {
                if (comparator(cur->data, value)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Applies `processor` to every element under the shared lock; safe to run
    // concurrently with inserts and other readers.
    void for_each_safe(std::function<void(const T&)> processor) const {
        std::shared_lock lock(read_mutex);
        for (size_t i = 0; i < SLOT_COUNT; ++i) {
            for (Node* cur = heads[i].load(std::memory_order_acquire);
                 cur != nullptr;
                 cur = cur->next.load(std::memory_order_acquire)) {
                processor(cur->data);
            }
        }
    }

    // Removes every element satisfying `predicate`. Holds the lock
    // exclusively, so no insert or traversal can observe a node while it is
    // unlinked and freed. This replaces the old detached "sleep 1 ms then
    // delete" thread, which could still free a node under a concurrent
    // lock-free reader, and whose un-retried CAS could delete a node that
    // was never actually unlinked (use-after-free).
    void remove_if(std::function<bool(const T&)> predicate) {
        std::unique_lock lock(read_mutex);
        for (size_t i = 0; i < SLOT_COUNT; ++i) {
            Node* current = heads[i].load(std::memory_order_relaxed);
            Node* prev = nullptr;
            while (current != nullptr) {
                Node* next = current->next.load(std::memory_order_relaxed);
                if (predicate(current->data)) {
                    if (prev == nullptr) {
                        heads[i].store(next, std::memory_order_relaxed);
                    } else {
                        prev->next.store(next, std::memory_order_relaxed);
                    }
                    delete current;  // safe: exclusive lock excludes all readers
                } else {
                    prev = current;
                }
                current = next;
            }
        }
    }

    // Processes all slots with `thread_count` workers, each owning a disjoint
    // range of slots. The shared lock is held for the whole pass so
    // remove_if()/clear() cannot free nodes underneath the workers.
    template<typename Func>
    void parallel_process(Func processor, size_t thread_count = 4) {
        // Guard against 0 (division by zero) and more threads than slots.
        if (thread_count == 0) thread_count = 1;
        if (thread_count > SLOT_COUNT) thread_count = SLOT_COUNT;
        std::shared_lock lock(read_mutex);
        std::vector<std::thread> workers;
        workers.reserve(thread_count);
        const size_t items_per_thread = SLOT_COUNT / thread_count;
        for (size_t t = 0; t < thread_count; ++t) {
            // NOTE: thread_count must be captured — the old lambda referenced
            // it without capturing, which does not compile.
            workers.emplace_back([this, t, thread_count, items_per_thread, &processor]() {
                const size_t start = t * items_per_thread;
                const size_t end = (t == thread_count - 1) ? SLOT_COUNT
                                                           : start + items_per_thread;
                for (size_t i = start; i < end; ++i) {
                    for (Node* cur = heads[i].load(std::memory_order_acquire);
                         cur != nullptr;
                         cur = cur->next.load(std::memory_order_acquire)) {
                        processor(cur->data);
                    }
                }
            });
        }
        for (auto& worker : workers) {
            worker.join();
        }
    }

    // Frees every node. Exclusive lock: nothing may traverse concurrently.
    void clear() {
        std::unique_lock lock(read_mutex);
        for (size_t i = 0; i < SLOT_COUNT; ++i) {
            Node* current = heads[i].exchange(nullptr, std::memory_order_relaxed);
            while (current != nullptr) {
                Node* next = current->next.load(std::memory_order_relaxed);
                delete current;
                current = next;
            }
        }
    }

    // Counts all elements. "Approximate" because inserts may complete
    // concurrently (shared lock) while the count is being taken.
    size_t approximate_size() const {
        std::shared_lock lock(read_mutex);
        size_t count = 0;
        for (size_t i = 0; i < SLOT_COUNT; ++i) {
            for (Node* cur = heads[i].load(std::memory_order_acquire);
                 cur != nullptr;
                 cur = cur->next.load(std::memory_order_acquire)) {
                ++count;
            }
        }
        return count;
    }
};

// Out-of-class definition of the per-thread slot; size_t(-1) marks "unassigned".
template<typename T>
thread_local size_t ConflictFreeOpenClaw<T>::thread_slot = static_cast<size_t>(-1);
使用示例
int main() {
    ConflictFreeOpenClaw<int> claw;
    // Concurrent writers: 10 threads x 1000 inserts each.
    std::vector<std::thread> writers;
    for (int i = 0; i < 10; ++i) {
        writers.emplace_back([&claw, i]() {
            for (int j = 0; j < 1000; ++j) {
                claw.insert(i * 1000 + j);
            }
        });
    }
    // Concurrent readers traversing while the writers run.
    std::vector<std::thread> readers;
    for (int i = 0; i < 5; ++i) {
        readers.emplace_back([&claw]() {
            claw.for_each_safe([](int) {
                // Simulate per-element processing cost.
                std::this_thread::sleep_for(std::chrono::microseconds(10));
            });
        });
    }
    // Wait for all worker threads to finish.
    for (auto& t : writers) t.join();
    for (auto& t : readers) t.join();
    // Bulk insertion (std::iota requires <numeric>).
    std::vector<int> bulk_data(1000);
    std::iota(bulk_data.begin(), bulk_data.end(), 10000);
    claw.bulk_insert(bulk_data.begin(), bulk_data.end());
    // Parallel aggregation across all slots.
    std::atomic<int> sum{0};
    claw.parallel_process([&sum](int value) {
        sum += value;
    }, 4);
    std::cout << "Total items: " << claw.approximate_size() << std::endl;
    std::cout << "Sum: " << sum.load() << std::endl;
    // Conditional removal: drop every even value.
    claw.remove_if([](int value) {
        return value % 2 == 0;
    });
    return 0;
}
性能优化策略
- 多槽位哈希:使用多个头尾指针减少竞争
- 线程本地存储:为每个线程分配专用槽位
- CAS操作:使用比较并交换实现无锁插入
- 延迟删除:分离删除操作减少阻塞
- 批量操作:减少锁获取次数
- 乐观读取:无锁读取提高并发性
- 并行处理:多线程同时处理不同槽位
编译选项(GCC/Clang)
g++ -std=c++17 -O3 -pthread -march=native openclaw.cpp -o openclaw
这个实现通过多种技术减少冲突,特别适合高并发场景下的数据管理。
版权声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。