背景 由于在多处理器环境中某些资源的有限性,有时需要互斥访问(mutual exclusion),这时候就需要引入锁的概念,只有获取了锁的任务才能够对资源进行访问,由于多线程的核心是CPU的时间分片,所以同一时刻只能有一个任务获取到锁。 内核当发生访问资源冲突的时候,通常有两种处理方式:
Spinlock 是内核中提供的一种比较常见的锁机制,自旋锁是“原地等待”的方式解决资源冲突的。即,一个线程获取了一个自旋锁后,另外一个线程期望获取该自旋锁,获取不到,只能够原地“打转”(忙等待)。 由于自旋锁的这个忙等待的特性,注定了它使用场景上的限制 —— 自旋锁不应该被长时间的持有(消耗 CPU 资源)。 自旋锁的优点自旋锁不会使线程状态发生切换,一直处于用户态,即线程一直都是active的;不会使线程进入阻塞状态,减少了不必要的上下文切换,执行速度快。 非自旋锁在获取不到锁的时候会进入阻塞状态,从而进入内核态,当获取到锁的时候需要从内核态恢复,需要线程上下文切换。(线程被阻塞后便进入内核(Linux)调度状态,这个会导致系统在用户态与内核态之间来回切换,严重影响锁的性能)。 自旋锁的使用在linux kernel的实现中,经常会遇到这样的场景:共享数据被中断上下文和进程上下文访问,该如何保护呢? 如果只有进程上下文的访问,那么可以考虑使用semaphore或者mutex的锁机制,但是现在中断上下文也掺和进来,那些可以导致睡眠的lock就不能使用了,这时候,可以考虑使用spin lock。 在中断上下文,是不允许睡眠的,所以,这里需要的是一个不会导致睡眠的锁——spinlock。 换言之,中断上下文要用锁,首选 spinlock。 使用自旋锁,有两种方式定义一个锁: 动态的: spinlock_t lock;) I1 N/ a" c5 U6 }8 T9 Z* Cspin_lock_init (&lock);9 E7 c, \$ h5 ? r, U* s 静态的: DEFINE_SPINLOCK(lock);使用步骤 spinlock的使用很简单:
使用实例 static spinlock_t lock;* |/ `1 N D9 I' b/ ~static int flage = 1; 0 Q. {7 l" I% B* x" c7 B. M spin_lock_init(&lock); 5 g4 \# ]0 |. Q& [5 M- l static int hello_open (struct inode *inode, struct file *filep)( k2 x/ _' \- k7 k { spin_lock(&lock); if(flage !=1) { spin_unlock(&lock); return -EBUSY; }# d" y4 |; L; j. T/ g1 F; A8 }! Q2 c3 T" \ flage =0;9 Z c4 R% d4 z spin_unlock(&lock);: G! F ^/ x/ G+ s8 Z1 y+ L, _ return 0;. Z/ i) q, L( c( G( z }4 D* G O& \ x1 }$ w% U% g) S8 M static int hello_release (struct inode *inode, struct file *filep)8 K1 F$ b. T2 k p3 X { flage = 1;8 P0 u9 U3 `: f- t5 M return 0; ]" c8 Q. h( @" g$ d }# k1 W" {6 I8 Q7 s# \ 补充 中断上下文不能睡眠的原因是:
但在中断处理程序里,CPU寄存器的值肯定已经变化了吧(最重要的程序计数器PC、堆栈SP等),如果此时因为睡眠或阻塞操作调用了schedule(),则保存的进程上下文就不是当前的进程context了.所以不可以在中断处理程序中调用schedule()。
BUG(); 因此,强行调用schedule()的结果就是内核BUG。
自旋锁不可递归,自己等待自己已经获取的锁,会导致死锁。 自旋锁可以在中断上下文中使用,但是试想一个场景:一个线程获取了一个锁,但是被中断处理程序打断,中断处理程序也获取了这个锁(但是之前已经被锁住了,无法获取到,只能自旋),中断无法退出,导致线程中后面释放锁的代码无法被执行,导致死锁。(如果确认中断中不会访问和线程中同一个锁,其实无所谓)。 一、考虑下面的场景(内核抢占场景): (1)进程A在某个系统调用过程中访问了共享资源 R (2)进程B在某个系统调用过程中也访问了共享资源 R 会不会造成冲突呢? 假设在A访问共享资源R的过程中发生了中断,中断唤醒了沉睡中的,优先级更高的B,在中断返回现场的时候,发生进程切换,B启动执行,并通过系统调用访问了R,如果没有锁保护,则会出现两个thread进入临界区,导致程序执行不正确。OK,我们加上spin lock看看如何:A在进入临界区之前获取了spin lock,同样的,在A访问共享资源R的过程中发生了中断,中断唤醒了沉睡中的,优先级更高的B,B在访问临界区之前仍然会试图获取spin lock,这时候由于A进程持有spin lock而导致B进程进入了永久的spin……怎么破?linux的kernel很简单,在A进程获取spin lock的时候,禁止本CPU上的抢占(上面的永久spin的场合仅仅在本CPU的进程抢占本CPU的当前进程这样的场景中发生)。如果A和B运行在不同的CPU上,那么情况会简单一些:A进程虽然持有spin lock而导致B进程进入spin状态,不过由于运行在不同的CPU上,A进程会持续执行并会很快释放spin lock,解除B进程的spin状态。 二、再考虑下面的场景(中断上下文场景):
在这样的场景下,使用spin lock可以保护访问共享资源R的临界区吗? 我们假设CPU0上的进程A持有spin lock进入临界区,这时候,外设P发生了中断事件,并且调度到了CPU1上执行,看起来没有什么问题,执行在CPU1上的handler会稍微等待一会CPU0上的进程A,等它立刻临界区就会释放spin lock的,但是,如果外设P的中断事件被调度到了CPU0上执行会怎么样?CPU0上的进程A在持有spin lock的状态下被中断上下文抢占,而抢占它的CPU0上的handler在进入临界区之前仍然会试图获取spin lock,悲剧发生了,CPU0上的P外设的中断handler永远的进入spin状态,这时候,CPU1上的进程B也不可避免在试图持有spin lock的时候失败而导致进入spin状态。为了解决这样的问题,linux kernel采用了这样的办法:如果涉及到中断上下文的访问,spin lock需要和禁止本 CPU 上的中断联合使用。 三、再考虑下面的场景(底半部场景) linux kernel中提供了丰富的bottom half的机制,虽然同属中断上下文,不过还是稍有不同。我们可以把上面的场景简单修改一下:外设P不是中断handler中访问共享资源R,而是在的bottom half中访问。使用spin lock+禁止本地中断当然是可以达到保护共享资源的效果,但是使用牛刀来杀鸡似乎有点小题大做,这时候disable bottom half就可以了。 四、中断上下文之间的竞争 同一种中断handler之间在uni core和multi core上都不会并行执行,这是linux kernel的特性。如果不同中断handler需要使用spin lock保护共享资源,对于新的内核(不区分fast handler和slow handler),所有handler都是关闭中断的,因此使用spin lock不需要关闭中断的配合。bottom half又分成softirq和tasklet,同一种softirq会在不同的CPU上并发执行,因此如果某个驱动中的softirq的handler中会访问某个全局变量,对该全局变量是需要使用spin lock保护的,不用配合disable CPU中断或者bottom half。tasklet更简单,因为同一种tasklet不会多个CPU上并发。 自旋锁的实现原理数据结构 首先定义一个 spinlock_t 的数据类型,其本质上是一个整数值(对该数值的操作需要保证原子性),该数值表示spin lock是否可用。初始化的时候被设定为1。当thread想要持有锁的时候调用spin_lock函数,该函数将spin lock那个整数值减去1,然后进行判断,如果等于0,表示可以获取spin lock,如果是负数,则说明其他thread的持有该锁,本thread需要spin。 内核中的spinlock_t的数据类型定义如下: typedef struct spinlock {: W. l+ s% O1 m# wstruct raw_spinlock rlock; } spinlock_t;+ u$ `, i- }# K; ~/ V5 B- C1 B* E typedef struct raw_spinlock {# {+ h. l) w4 ~( k; l arch_spinlock_t raw_lock; } raw_spinlock_t; 通用(适用于各种arch)的spin lock使用spinlock_t这样的type name,各种arch定义自己的struct raw_spinlock。听起来不错的主意和命名方式,直到linux realtime tree(PREEMPT_RT)提出对spinlock的挑战。 spin lock的命名规范定义如下:
同样的,这里也只是选择一个典型的API来分析,其他的大家可以自行学习。我们选择的是 arch_spin_lock,其ARM32的代码如下: static inline void arch_spin_lock(arch_spinlock_t *lock)& |' e+ ~+ M+ a- l9 t5 P{4 N( U9 Y }$ G) ?1 e6 p unsigned long tmp;. e4 I* m0 Y% {. w$ | u32 newval;0 q6 B/ u( [$ M3 y" J( W6 { * p/ s, q7 a: ?) V; n. j arch_spinlock_t lockval; prefetchw(&lock->slock);---------(0)5 _- v/ ^6 f" i) X __asm__ __volatile__( "1: ldrex %0, [%3]\n"---------(1) " add %1, %0, %4\n" ----------(2) " strex %2, %1,[%3]\n"---------(3) " teq %2, #0\n"-------------(4)4 ~" ?7 i* N4 r " bne 1b" : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)0 G. y+ c$ |% N( F : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)' n7 u- g. [4 _0 T4 ?9 f : "cc"); while (lockval.tickets.next != lockval.tickets.owner) {----(5)' s) }1 ~- R; @- \4 R( ? wfe();------------(6) lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);----(7). _( ^1 m7 ~8 J2 L2 U3 C0 t/ Y+ e } ' ~& G9 Q$ X! h$ x8 G smp_mb();-----------(8)5 O3 Q* ?, y$ x } (0)和preloading cache相关的操作,主要是为了性能考虑6 _% M: c( ]( o0 r3 L (1)lockval = lock->slock (如果lock->slock没有被其他处理器独占,则标记当前执行处理器对lock->slock地址的独占访问;否则不影响) (2)newval = lockval + (1 << TICKET_SHIFT)- L9 M2 K$ A4 ^: I- f0 i, H& c (3)strex tmp, newval, [&lock->slock] (如果当前执行处理器没有独占lock->slock地址的访问,不进行存储,返回1给temp;如果当前处理器已经独占lock->slock内存访问,则对内存进行写,返回0给temp,清除独占标记) lock->tickets.next = lock->tickets.next + 1 (4)检查是否写入成功 lockval.tickets.next1 _. G- q" Q; r) L, n& o: N (5)初始化时lock->tickets.owner、lock->tickets.next都为0,假设第一次执行arch_spin_lock,lockval = *lock,lock->tickets.next++,lockval.tickets.next 等于 lockval.tickets.owner,获取到自旋锁;自旋锁未释放,第二次执行的时候,lock->tickets.owner = 0, lock->tickets.next = 1,拷贝到lockval后,lockval.tickets.next != lockval.tickets.owner,会执行wfe等待被自旋锁释放被唤醒,自旋锁释放时会执行 lock->tickets.owner++,lockval.tickets.owner重新赋值 (6)暂时中断挂起执行。如果当前spin lock的状态是locked,那么调用wfe进入等待状态。更具体的细节请参考ARM WFI和WFE指令中的描述。3 G+ E( Z) _: I! k) d (7)其他的CPU唤醒了本cpu的执行,说明owner发生了变化,该新的own赋给lockval,然后继续判断spin lock的状态,也就是回到step 5。5 {- u4 C% x! T9 c7 f+ I, _; d (8)memory barrier的操作,具体可以参考memory barrier中的描述。 释放锁static inline void arch_spin_unlock(arch_spinlock_t *lock) { smp_mb(); % I) J8 N$ z+ D7 r+ N' {- A lock->tickets.owner++; ---------------------- (0)+ N' f6 r7 u7 l dsb_sev(); ---------------------------------- (1)& g3 J% J5 f/ c% U: I }; G3 J$ F. \( I, W& L- P7 _, S0 h
1)拥有自旋锁的进程A在内核态阻塞了,内核调度B进程,碰巧B进程也要获得自旋锁,此时B只能自旋转。而此时抢占已经关闭,不会调度A进程了,B永远自旋,产生死锁。 2)进程A拥有自旋锁,中断到来,CPU执行中断函数,中断处理函数,中断处理函数需要获得自旋锁,访问共享资源,此时无法获得锁,只能自旋,产生死锁。 如何避免死锁
因为自旋锁持有时间非常短,没有直观的现象,下面举一个会导致死锁的实例。 运行条件
针对单CPU,拥有自旋锁的任务不应该调度会引起休眠的函数,否则会导致死锁。 步骤:
驱动代码如下: #include <linux/init.h>#include <linux/module.h> #include <linux/kdev_t.h>8 M/ v9 M1 K8 R- e& ? #include <linux/fs.h>; D( i8 v+ W$ S* c0 q( o, A #include <linux/cdev.h> #include <linux/device.h> #include <linux/spinlock.h>6 S9 j2 j# d/ g9 R5 N7 p# {- ~* V8 Q static int major = 250; static int minor = 0;4 G& u4 m: H, ?5 W static dev_t devno; static struct cdev cdev; static struct class *cls;2 j& S1 f0 y/ z3 o( K static struct device *test_device;4 ]# k; X9 H' N- F1 K2 K5 ]' p1 E) }6 A static spinlock_t lock;7 i8 Z/ o' o% W static int flage = 1; ( e, H" R& P! s9 s* d #define DEAD 1 static int hello_open (struct inode *inode, struct file *filep)8 v* ?7 Z9 @5 p- ^. S {' |6 l5 o% K- d) v2 h- \) S( ~ spin_lock(&lock);: w. D. j9 h; A" y" g& o! Q if(flage !=1)+ Z( M% `, s# m) [$ i0 s1 C" P { spin_unlock(&lock);: S6 t! {. o8 Z; t- n& g return -EBUSY; }' ~" d3 R1 A( W- B( ~. t flage =0;: K1 e% q! y) v #if DEAD #elif spin_unlock(&lock); #endif" `, s7 ^+ r- n- ?& [ return 0;. `8 Z7 O4 ~% r {9 o } N( u! K$ x# f; m: I$ y static int hello_release (struct inode *inode, struct file *filep) {2 G6 |$ z' C; u! V flage = 1; #if DEAD spin_unlock(&lock); #endif7 _2 ?0 ^5 Z7 U6 M9 X; [ return 0;# b" G! e. L" N6 Z; F% m+ q$ [ } static struct file_operations hello_ops =& d' j; ]0 n/ G* H {3 {9 I$ w* |+ u, X! M& _+ [ .open = hello_open,* r/ f3 y7 \+ l2 Z" \/ \8 B .release = hello_release, }; static int hello_init(void)2 E: r; d7 c. W! c) y { int result; int error; 3 q( C' X- W& T/ j printk("hello_init \n");# z$ a5 b8 x" t, C" P6 P3 I% [, }3 D0 G result = register_chrdev( major, "hello", &hello_ops);+ W) R( a' Z1 _8 ^( i; z if(result < 0) {" Z9 ]5 O x4 Y) U9 J2 J- C printk("register_chrdev fail \n"); return result; }/ w4 y# f/ L+ `3 K3 K. W devno = MKDEV(major,minor); cls = class_create(THIS_MODULE,"helloclass");0 n v' V; n' a' x; ~" T% x5 b7 J+ l if(IS_ERR(cls)) {$ e( Z2 x Y L F& V1 v3 ~6 n2 J unregister_chrdev(major,"hello");8 Z) V! L% S4 F e( F return result; }8 g, |7 e* F% B; R F |; H" [ test_device = device_create(cls,NULL,devno,NULL,"test");* Z) J" v8 z1 }2 s$ \, s if(IS_ERR(test_device )) {: F8 p) Z3 M: e3 p" l$ c/ S' ` class_destroy(cls); unregister_chrdev(major,"hello");% f i# C4 W* N% o b return result;( V i4 y, l3 J% Z9 h5 U }/ \ I2 d' q- g; a0 d& U* ~ spin_lock_init(&lock);) B- m4 O7 e3 H: n0 I2 V5 B return 0;+ v/ W( t. s! g& d; F+ |8 G* G }# Q! g) B* u" L4 r. h o static void hello_exit(void) _3 Q1 U: ?/ ]( \/ M* ] {5 j0 P2 {# Z; P printk("hello_exit \n"); device_destroy(cls,devno); class_destroy(cls);, V- n# p; }( X$ y! G unregister_chrdev(major,"hello");- r5 l; H- q, i* b) \- y return;5 ^. V( W! G; |$ ?+ E6 d$ X } module_init(hello_init);6 l7 G$ K) G0 F! ? module_exit(hello_exit); MODULE_LICENSE("GPL"); 测试程序如下: #include <stdio.h>#include <sys/types.h>( _3 g* R; W6 s; K #include <sys/stat.h>: z0 F4 @+ r3 v! @, y/ ] #include <fcntl.h>- e1 D3 m( B+ W! d5 K/ N main()2 W/ M" U/ B* X4 Z8 G% E {5 ]: ~3 ?. J4 A. }. O int fd; fd = open("/dev/test",O_RDWR); if(fd<0) { perror("open fail \n"); return;+ V* V. F+ A2 t' w4 U2 m$ D: g }7 n; C' z% I" E) H, [/ I sleep(20);0 D( P* N1 q( R4 E' R; @# n close(fd); printf("open ok \n ");. d4 [# [# M7 m0 T/ a p } 测试步骤:
insmod hello.ko) ^ ~/ ~4 x* q: q
./a , R3 x8 g+ F9 m1 l! x D
./b, o, `. `' d+ U2 V7 J 注意,一定要在进程A没有退出的时候运行进程B。 |