CVE漏洞中文网

0DayBank一个专门收集整理全球互联网漏洞的公开发布网站
  1. 首页
  2. 漏洞列表
  3. 正文

lock free

2019年4月3日 535点热度 0人点赞 0条评论

菜单腾讯云备案控制台
云+社区
专栏问答沙龙快讯团队主页开发者手册云学院TVP
找文章 / 找答案 / 找技术大牛
找文章 / 找答案 / 找技术大牛
写文章提问登录注册
Lock-Free 学习总结
写文章
Lock-Free 学习总结
gaccob发表于贾老师の博客
437
Lock-Free
Lock-Free:

如果一个方法是 Lock-Free 的, 它保证线程无限次调用这个方法都能够在有限步内完成.

相比于传统的基于 Mutex 锁机制, Lock-Free 有下面的优势:

Lock-Free 的速度更快
线程之间不会相互影响, 没有死锁
不受异步信号影响, 可重入
不会发生线程优先级反转
在通常使用 Mutex 互斥锁的场景, 有的线程抢占了锁, 其他线程则会被阻塞, 当获得锁的进程挂掉之后, 整个程序就 block 住了. 但在 Lock-Free 的程序中, 单个线程挂掉, 也不会影响其他线程, 因为线程之间不会相互影响.

但是, Lock-Free 也有不少缺陷:

只能利用有限的原子操作, 例如 CAS (操作的位数有限), 编码实现复杂
竞争会加剧, 优先级不好控制
测试时需要考虑各种软硬件环境, 很难做到尽善尽美
再引入一个 Wait-Free 概念:

假如一个方法是 Wait-Free 的, 那么它保证了每一次调用都可以在有限的步骤内结束.

一般来说: 阻塞 > Lock-Free > Wait-Free

本文的讨论范畴在 Lock-Free, 如果涉及 Wait-Free 会单独说明.

CAS 原语
CAS (compare and swap) 是 CPU 硬件同步原语(primitive), CAS(V, A, B) 操作可以用下面的代码来示意:

template
bool CAS(T* addr, T expect_val, T val) {
if (*addr == expect_val) {
*addr = val;
return true;
}
return false;
}
从 80486 开始, 所有的 Intel 处理器上, 通过一条汇编指令 CMPXCHG 即可实现 CAS 操作. CAS 的价值在于它是一个原子操作, 不会被 CPU中断或者其他方式打断, 因为在硬件层实现, 所以开销极小.

CAS 并不是一项新技术, 它的使用可以追溯到 70 年代, 早在 80 年代就有很多经典书籍中提到使用 CAS 来实现并行编程, 如 USC 大牛 Kai HWang 的 "Computer Architecture and Parallel Processing".

GCC 4.1+ 开始支持 CAS 的原子操作:

bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval)
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval)
通常将 CAS 用于同步的方式是从地址 V 读取值 A, 执行多步计算来获得新值 B, 然后使用 CAS 将 V 的值从 A 改为 B, 如果 V 处的值尚未同时更改, 则 CAS 操作成功.

与 CAS 类似的还有:

Fetch-And-Add, 对变量做 +1 的原子操作;
Test-And-Set, 对变量赋值并传回原值;
在一些 CPU 上还实现了 DCAS (double CAS) 原语, 比如 Motorola 68040, DCAS 的语义可以用下面的代码来表示:

template
bool DCAS(T1* p1, T2* p2, T1 e1, T2 e2, T1 v1, T2 v2) {
if (*p1 == e1 && *p2 == e2) {
*p1 = v1;
*p2 = v2;
return true;
}
return false;
}
还有一些 32 位 CPU 上实现了 64 位的 CAS, 也被成为 CAS2: 相比于 DCAS, CAS2 原语比较内存中两个相邻字长, 功能弱, 但是更通用.

Lock-Free FIFO
引用参考文献[3]中的一个基于链表的 FIFO 示例:

// 伪码说明: q^ 表示内存 q 处的值

// 链表示意
// dummy -> a -> b -> c
// | |
// V V
// head tail

// 从尾部出列
Enquque(x)
q <- new record q^.value <- x q^.next <- NULL // 通过 CAS 操作保证将新节点加入到链表尾部 repeat p <- tail succ <- CAS(p^.next, NULL, q) util succ = TRUE // 更新 tail, 不可能失败(当前线程对 p^.next 赋值, 相当于一把锁) CAS(tail, p, q) // 从头部入列 Dequeue() repeat p <- head if p^.next = NULL error "queue empty" until CAS(head, p, p^.next) return p^.next 上面的实现会存在一个问题, 线程之前会相互影响, 即并非 Lock-Free: 当多个线程并发时, 只有一个线程能完成 CAS(p^.next, NULL, q), 其他线程只能等待这个线程完成 CAS(tail, p, q) 操作之后才能继续. 更坏的情况, 如果这个线程在 CAS(tail, p, q) 之前出现异常, 那么所有的线程都被挂住了. 文献[3]中提出了两种改进方案: // 改进方案一: 严格保证 tail 总是指向链表尾部, 增加一步 CAS 操作 Enqueue(x) q <- new record q^.value <- x q^.next <- NULL // 通过 CAS 操作保证将新节点加入到链表尾部 repeat p <- tail succ <- CAS(p^.next, NULL, q) // 其他线程并发操作, 尝试更新 tail (失败忽略) if succ != TRUE CAS(tail, p, p^.next) util succ = TRUE // 尝试更新 tail (失败忽略) CAS(tail, p, q) // 改进方案二: tail 并不总是指向链表尾, 以一定的 CPU 开销来换取 Lock-Free // 如果有 p 个并发线程, 则 tail 最多与实际链表尾部相差 2p - 1 个位置, 时间开销是可控的 Enqueue(x) q <- new record q^.value <- x q^.next <- NULL p <- tail old_p <- p // p 不一定是链表尾, 通过遍历保证操作合法 repeat while p^.next != NULL p = p^.next until CAS(p^.next, NULL, q) // 尝试更新 tail (失败忽略) CAS(tail, old_p, q) Lock-Free MAP 上一节中给出的都是伪代码的示例, 落地到实际的编程语言中, 需要更具象. 引用文献[1]的一个 WRRMMap (Write Rarely Read Many) 示例: template
class WRRM {
Map* map_;

public:
// 查询无需加锁
V LookUp(const K& k) {
return (*map_)[k];
}
// 更新做一次全量拷贝, 并插入新数据
void Update(const K& k, const V& v) {
Map* new_ = 0;
Map* old_;
do {
old_ = map_;
if (new_) delete new_;
new_ = new Map(*old_);
new_->insert(make_pair(k, v));
} while (!CAS(&map_, old_, new_));
}
// 什么时候释放 old_ 资源?
};
上面例子中都存在一个问题, 即 MAP 的 Update() 操作中, 何时释放老的资源?

如果是有自动 GC 机制的语言, 那就不需要纠结, 直接交给语言去处理就好了.
如果需要手动回收资源, 例如 C++, 则需要周详考虑, 因为当准备 delete old 时, 可能还有其他线程在访问.
文献[1] 中对第 2 种情给出了两种解决方法:

等待一段时间, 在 Update() 完成拷贝之后, 其他线程再访问 LookUp() 会访问新数据, 老数据的访问会越来越少, 直到没有. 但这并不是一个完备的方案, 而且有可能带来优先级反转.
另外一个解决方法是对 map 的访问做引用计数, 并利用 DCAS (或者 CAS2) 保证同步安全, 并给出了一个 sample:
template
class WRRM {
// map <--> 访问的引用计数
// map 和 ref-count 的内存是连续的, 所以使用 CAS2 原语
typedef std::pair*, unsigned> Data;
Data data_;

public:
// CAS 操作保证引用计数正确增加
V LookUp(const K& k) {
Data old, fresh;
do {
old = data_;
fresh = old;
++ fresh.second; // 开始访问 引用计数自增
} while (!CAS2(&data_, old, fresh));
V result = (fresh.first)[k];
do {
old = data_;
fresh = old;
-- fresh.second; // 访问完毕 引用计数自减
} while (!CAS2(&data_, old, fresh));
return result;
}

// 原文献中 引用计数的 CAS 阈值设置为 1 (old.second = 1)
// 但是从代码理解与实现角度来看 0 会更合适一些
// 只有在 data 的引用计数 == 0 时, 才完成 update, 并释放原来的 map 资源
void Update(const K& k, const V& v) {
Data old, fresh;
old.second = 0;
fresh.first = NULL;
fresh.second = 0;
// 原文献这里还有一处临时变量的优化, 这里忽略掉了, 不影响示范
do {
old.first = data_.first;
if (fresh.first) delete fresh.first;
fresh.first = new Map(old.first);
fresh.first->insert(std::make_pair(k, v));
} while (!CAS2(&data_, old, fresh));
delete old.first;
}
};
上面的例子虽然解决了垃圾回收的问题, 但还存在一定的缺陷:

在 Update() 时, 如果一直有线程在 LookUp(), 引用计数一直大于 0, 那么 Update() 线程会一直处于等待的状态, 还有可能导致线程饿死. 所以, WRRMMap 成了一个 Write-Rarely-Read-Many-But-Not-Too-Many 的 map.

针对上述问题, 文献[1]的作者在文献[2]又中提出了一种更完善的解决方案: Hazard 链表.

全局维护一份 hazard 链表, LookUp() 过程中将 map 指针挂载上去, 访问完毕再移除.(一定程度上相当于做了引用计数).
每个线程维护一份待释放的 garbage 链表, 每次 Update() 之后将要释放的 old map 挂载上去.
per 线程的 garbage 链表在一定时机做回收检查, 如果数据在 hazard 链表中不存在, 即不再被访问, 即可以安全释放.
Hazard 链表能解决的问题在于:

LookUp() 和 Update() 操作都是真正的 Lock-Free, 读/写线程都不会受其他线程的影响, 保持各自独立的行为.
额外引入的数据量(全局的 Hazard 链表, 每个线程各自的 garbage 链表)不算大, 链表也可以被 O(1) 的哈希表代替, 效率比较高.
这篇文献中使用了 Map 来做示范, 实际上可以很容易的扩展到其他类型的数据结构.
如果要扩展到 Wait-Free, 可以更进一步考虑陷阱技术(trap), 这里不深入讨论.
代码示意: (为了方便理解, 稍微做了一些调整)

// 全局的 hazard 链表
class Hazard;
static Hazard* hazard_head;
static int hazard_len;

// 每个线程的 garbage 链表
__per_thread__ vector< Map* > garbage_list;
static const size_t GARBAGE_THRESHOLD = 4;

class Hazard {
Hazard* next_;
int active_;
public:
void* hazard_;

static Hazard* Acquire() {
Hazard* p = hazard_head;

// 从 hazard 链表出找到一个空闲的, 并将 active 置为 1
for (; p; p = p->next_) {
if (p->active_ || !CAS(&p->active_, 0, 1)) {
continue;
}
return p;
}

// 现有的找不到, 创建一个
// 先修改链表长度
int old_len;
do {
old_len = hazard_len;
} while (!CAS(&hazard_len, old_len, old_len + 1));

// 再创建后插入链表
p = new Hazard;
p->active_ = 1;
p->hazard_ = NULL;
Hazard* old;
do {
old = hazard_head;
p->next = old;
} while (!CAS(&hazard_head, old, p));

return p;
}

static void Release(Hazard* p) {
// 重置 active 标记
p->hazard_ = NULL;
p->active = 0;
}
};

// 垃圾回收
void GarbageCollect(Hazard* head) {
// 遍历 hazard 链表, 排序
std::vector hazards;
while (head) {
void* p = head->hazard_;
if (p) hazards.push_back(p);
head = head->next_;
}
std::sort(hazards.begin(), hazards.end(), std::less());

// 遍历当前线程的 garbage 链表 检索
std::vector*>::iterator it = garbage_list.begin();
while (it != garbage_list.end()) {
// 如果不在 hazard 链表中: 即不在被访问, 可以安全删除
if (!std::binary_search(hazards.begin(), hazards.end(), *it)) {
delete *it;
// 一个小trick, 与链表尾部做一下交换
if (&*it != &garbage_list.back()) {
*it = garbage_list.back();
}
garbage_list.pop_back();
} else {
++ it;
}
}
}

// Lock-Free 的数据结构 Write-Rarely-Read-Many-Map
template
class WRRM {
Map* map_;

public:
V LookUp(const K& k) {
Hazard* record = Hazard::Aquire();

// 保证挂载到 hazard 链表中的 与 正在访问的 map_ 指针一致
// 避免同步问题
Map* ptr;
do {
ptr = map_;
record->hazard_ = ptr;
} while (map_ != ptr);

V result = (\*ptr)[k];
Hazard::Release(record);
return result;
}

void Update(const K& k, const V& v) {
Map* old_map, new_map = 0;
do {
old_map = map_;
if (new_map) delete new_map;
new_map = new Map(*old_map);
new_map[k] = v;
} while (!CAS(&map_, old_map, old_map));

// 这里回收 old_map 资源
garbage_list.push_back(old_map);
if (garbage_list.size() >= GARBAGE_THRESHOLD) {
GarbageCollect(hazard_head);
}
}
};
Lock-Free Queue
CodeProject 上有一篇关于基于数组的 Lock-Free 循环队列的参考文章[4], 直接看示例代码(为了篇幅简洁和方便理解做了一些修改):

// 仅示意 不要在意语法细节
templaye
struct Queue {
// 循环数组
T* data_[SIZE];

// 三组游标控制
volatile uint32_t read_index_;
volatile uint32_t write_index_;
volatile uint32_t read_max_index_;
};

// 下标映射
template
inline uint32_t Queue::index(uint32_t count) {
return (count % SIZE);
}

// 插入数据
template
bool Queue::push(const T &data) {
uint32_t read_index, write_index;
do {
read_index = read_index_;
write_index = write_index_;
// 队列已满
if (index(write_index + 1) == index(read_index)) {
return false;
}
// 先占位, 修改 write_index_ 游标
} while (!CAS(&write_index_, write_index, (write_index + 1)));

data_[index(write_index)] = data;

// 修改 read_max_index_ 游标
// 这里有可能需要等待其他 producer 操作完成, 所以如果操作失败, 可以 yield 一段时间
while (!CAS(&read_max_index_, write_index, (write_index + 1))) {
sched_yield();
}
return true;
}

// pop 数据
template
bool Queue::pop(T &data) {
uint32_t read_max_index, read_index;
do {
read_index = read_index_;
read_max_index = read_max_index_;
// 队列为空
if (index(read_index) == index(read_max_index)) {
return false;
}

data = data_[index(read_index)];

// 修改 read_index_ 游标
if (CAS(&read_index_, read_index, (read_index + 1))) {
return true;
}
} while(1);
return false;
}
上面的循环队列是一个很实用的例子, 但很遗憾, 确并非一个符合"Lock-Free"标准的设计, 不同线程之间会相互影响:

线程 T1 执行 push(), 在 CAS(&writeindex, XX, XX) 之后给 data 赋值时挂了, 对其他生产者线程来说, CAS(&read_maxindex, XX, XX) 游标再也不可能成功, 进程会被卡住, 这已经违背了 Lock-Free 的定义.

幸运的是, 在只有单个生产者时, read_maxindex 退化为 writeindex, 会减少一次 CAS 操作, 这是 Queue 会进化为 Lock-Free 的完全体, 并提高效率. 退化后的代码参考如下:

// 插入数据
template
bool Queue::push(const T &data) {
uint32_t read_index, write_index;
do {
read_index = read_index_;
write_index = write_index_;
// 队列已满
if (index(write_index + 1) == index(read_index)) {
return false;
}
// 先占位, 修改 write_index_ 游标
} while (!CAS(&write_index_, write_index, (write_index + 1)));
data_[index(write_index)] = data;
return true;
}

// pop 数据
template
bool Queue::pop(T &data) {
uint32_t read_max_index, read_index;
do {
read_index = read_index_;
read_max_index = write_index_;
// 队列为空
if (index(read_index) == index(read_max_index)) {
return false;
}
data = data_[index(read_index)];
// 修改 read_index_ 游标
if (CAS(&read_index_, read_index, (read_index + 1))) {
return true;
}
} while(1);
return false;
}
此时, 即便生产者线程(仅有一个) push() 操作意外失败, 带来的影响也仅仅是那个 index 上的数据错误, 不影响整个进程的演进, 可以根据应用场景来决定处理策略: 恢复生产者, 或者告警退出.

CAS 的 ABA 问题
ABA 问题描述:

切换到线程 T1, 获取内存 V 的值 A
切换到线程 T2, 获取内存 V 的值 A, 修改成 B, 然后再修改成 A
切换到线程 T1, 获取内存 V 的值还是 A, 继续执行
coolshell 上有篇文章给出了一个生动的例子(From 维基百科):

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了.

更具有参考意义的是Hazard Pointer Wiki上提到的一个 Lock-Free 堆栈的例子:

当前栈元素 [A, B, C], 栈顶 head 指向 A
线程 T1 执行 pop() 准备 CAS(&head, B, A)
线程 T2 抢占, pop A, pop B, 然后 push A
线程 T1 恢复, CAS(&head, B, A) 成功, 则此时 head 指向一个被 pop 的元素 B
借鉴无锁 MAP 的设计, ABA 问题也可以通过引用计数来解决: 对内存 V 做一个引用计数 Ref, 当修改 V 的值时, Ref 自增, compare 会同时比较 V 和 Ref, 当 值都一致时才能做 swap.

引用计数的解决方法需要 DCAS 原语支持, 或者保证 V 和 Ref 在内存中连续的前提下 CAS2 原语支持, 不过这样也会带来编码的设计的复杂度.

DCAS不是无锁算法设计的银弹.

参考文章
Lock-Free Data Structures, Andrei Alexandrescu, 2004
Lock-Free Data Structures with Hazard Pointers, Andrei Alexandrescu, Maged Michael, 2004
Implementing Lock-Free Queues, John D, Valois, 1994
Yet another implementation of a lock-free circular array queue, Faustino Frechilla, 2011
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于 2018-08-02
其他
举报

贾老师の博客
74 篇文章29 人订阅
Linux coredump
浮点数存储格式
C/C++ 中的异常处理
【笔记】读卡马克《Parallel Implementations》
《企业IT架构转型之道》读书笔记
我来说两句
0 条评论
登录 后参与评论
上一篇:echarts - 特殊需求实现代码汇总之【饼图】篇
下一篇:【开源】小Z为DNSmasq写了一个WEB界面PHPDNS
相关文章
来自专栏非典型技术宅
Swift多线程之Operation:异步加载CollectionView图片1. Operation 设置依赖关系2. 前置知识点内容3. CollectionView中图片进行异步加载

1797
来自专栏Golang语言社区
Golang视角下的设计模式
这篇文章想聊聊Golang语言下的设计模式问题,我觉得这个话题还是比较有意思的。Golang没有像java那样对设计模式疯狂的迷恋,而是摆出了一份“看庭前花开花...

1092
来自专栏拭心的安卓进阶之路
Java 集合深入理解(10):Deque 双端队列
什么是 Deque ? Deque 是 Double ended queue (双端队列) 的缩写,读音和 deck 一样,蛋壳。 Deque 继承自 Queu...

2769
来自专栏小鄧子的技术博客专栏
算法与数据结构(3),并发结构
本来已经合上电脑了,躺在床上,翻来覆去睡不着,索性,不睡了,起床,听听歌,更新简书,这可能是这一系列的最后一篇,脚趾的伤也好的差不多了,下个礼拜就要全身心找工作...

1063
来自专栏铭毅天下
干货 | Elasticsearch Nested类型深入详解
本文通过一个例子将Nested类型适合解决的问题、应用场景、使用方法串起来, 文中所有的DSL都在Elasticsearch6.X+验证通过。

1663
来自专栏乐百川的学习频道
设计模式(二十四) 访问者模式
访问者模式提供了一种方法,将算法和数据结构分离。假设我们需要对一个数据结构进行不同的操作,就可以考虑使用访问者模式。访问者模式的要点在于,需要一个访问者接口,提...

2116
来自专栏流媒体
Json海量数据解析Json海量数据解析
​ 在android开发中,app和服务器进行数据传输时大多数会用到json。在解析json中通常会用到以下几种主流的解析库:jackson、gson、fa...

1152
来自专栏calmound
缺省参数是编译期间绑定的,而不是动态绑定
看一个程序 #include using namespace std; class A { public: virtual void ...

3216
来自专栏大内老A
WCF技术剖析之十四:泛型数据契约和集合数据契约(上篇)
在.NET Framework 2.0中,泛型第一次被引入。我们可以定义泛型接口、泛型类型、泛型委托和泛型方法。序列化依赖于真实具体的类型,而泛型则刻意模糊了具...

2218
来自专栏菩提树下的杨过
Unity Application Block 1.2 学习笔记
昨天花了一天时间,把IOC/DI的相关文章以及Unity相关的一些文章基本在园子里搜了个遍 先给出几篇不错的文章链接: Unity Application Bl...

19710

扫描二维码

扫码关注云+社区0daybank

标签: 暂无
最后更新:2019年4月3日

小助手

这个人很懒,什么都没留下

点赞
< 上一篇
下一篇 >

文章评论

您需要 登录 之后才可以评论

COPYRIGHT © 2024 www.pdr.cn CVE漏洞中文网. ALL RIGHTS RESERVED.

鲁ICP备2022031030号

联系邮箱:wpbgssyubnmsxxxkkk@proton.me