pthread_mutex_timedlock,pthread_mutex_trylock

  pthread_mutex_timedlock,pthread_mutex_trylock

  锁是多线程编程中最常用的同步机制,用于保护多线程共享的临界区。

  Pthreads提供了多种锁定机制,常见的有:

  1)互斥体:pthread_mutex_***

  2)自旋锁:pthread_spin_***

  3)条件变量:pthread_con_***

  4)读/写锁:pthread_rwlock_***

  在多线程编程中,根据不同的应用场合,选择合适的锁进行同步对多线程程序的性能有很大的影响。本文主要比较了pthread_mutex和pthread_spinlock两种锁机,并讨论了它们的适用场合。

  1个Pthread互斥体

  互斥体是一个睡眠等待锁。自2.6.x系列稳定内核以来,Linux的互斥体是futex (Fast-Usermode-muTEX)锁。

  Futex(快速用户区互斥的简称)是在Linux上实现锁和构建信号量、POSIX互斥等高级抽象锁的基础工具。它们最早出现在内核开发的2.5.7版本中;它的语义在2.5.40中被修复,之后出现在2.6.x系列稳定内核中。

  Futex由Hubertus Franke (IBM托马斯J沃森研究中心)、马修柯克伍德丹尼尔、Ingo Molnar(红帽)和Rusty Russell(IBM Linux技术中心)创建。

  Futex由用户空间中的对齐整数变量和附属于它的内核空间中的等待队列组成。多数情况下,多进程或多线程在用户空间操作Futex的整数变量(汇编语言调用CPU提供的原子操作指令进行增减),而在其他情况下,则需要通过代价高昂的系统调用在内核空间操作等待队列(如唤醒等待进程/线程或将当前进程/线程放入等待队列)。除了少数情况下多个线程同时竞争锁,基于futex的锁操作不需要昂贵的系统调用。

  该机制的核心思想是在大多数情况下将锁竞争操作放在用户空间,而不是昂贵的内核系统调用,从而提高效率。

  Pthreads提供的与互斥锁操作相关的API主要包括:

  1、pthread _ mutex _ lock(pthread _ mutex _ t * mutex);

  2、pthread _ mutex _ try lock(pthread _ mutex _ t * mutex);

  3、pthread _ mutex _ unlock(pthread _ mutex _ t *互斥);

  因为源代码比较长,这里就不摘录了。你可以参考:

  glibc-2 . 12 . 2/nptl/pthread _ mutex _ lock . c

  2个线程旋转锁

  自旋锁,也称为自旋锁,是一种忙等待锁。在多处理器环境中,自旋锁最多只能被一个可执行线程持有。如果一个可执行线程试图获取一个竞争的(已经持有的)旋转锁,那么该线程将一直忙于等待,旋转,也就是说,空闲,等待锁再次可用。如果锁没有被竞争,请求锁的线程将立即得到它并继续执行。

  争用的旋转锁使请求它的线程在等待锁再次可用时旋转,这浪费了CPU时间,因此旋转锁不应该持有很长时间。其实这才是自旋锁的设计初衷,可以在短时间内轻锁。

  内核中的自旋锁不能在可能导致睡眠的环境中使用。例如,线程A获得自旋锁L;此时,中断发生。在相应的中断处理程序函数B中,试图获得自旋锁L,这将中断处理程序进行自旋。但是,原来的锁持有者只是在中断处理程序结束后才趁机释放自旋锁,这就导致了死锁。

  因为涉及到多个处理器,所以自旋锁的效率非常重要。因为等待自旋锁,处理器只是不停的检查循环,不执行其他指令。即便如此,一般来说,spinlock的开销比上下文切换的开销要小得多。这就是为什么自旋锁在多处理器环境中被广泛使用的原因。

  Pthreads提供的与自旋锁操作相关的API主要包括:

  pthread _ spin _ lock(pthread _ spin lock _ t * lock);

  pthread _ spin _ try lock(pthread _ spin lock _ t * lock);

  pthread _ spin _ unlock(pthread _ spin lock _ t * lock);

  让我们来看看pthread中spinlock的实现:

  在每个情况中,从线程数依次从一个线程增加到15个线程,

  并重复执行10 次以保证测试结果不受意外情况的影响。

  3.1案例1:

  #包含标准视频

  #include pthread.h

  #包含stdint.h

  #包括unistd.h

  #include sys/syscall.h

  #包含错误号h

  #包含系统/时间。h

  #包含sched.h

  #包含linux/unistd.h

  #包含列表

  #包含"时间助手. h "

  #定义最大数组数量10000000

  使用命名空间标准

  工会联盟32

  uint32 _ t _ member

  char _ align[64];//用于多核的假共享

  结构统计

  uint32 _ t _ times

  uint32 _ t _ id

  联合联盟

  统计项目_项目

  char _ align[64];//用于多核的假共享

  align int 32g _ Array[10000000];

  volatile uint 32 _ t g _ Index=0;

  #ifdef USE_SPINLOCK

  pthread _ spinlock _ t spinlock

  #否则

  线程互斥锁互斥体;

  #endif

  PID _ t gettid(){ return syscall(_ _ NR _ gettid);}

  无效*消费者(无效*参数)

  align stat * pItem=(align stat *)arg;

  while (1)

  #ifdef USE_SPINLOCK

  pthread _ spin _ lock(自旋锁);

  #否则

  pthread_mutex_lock(互斥);

  #endif

  if (g_Index=MAX_ARRAY_NUM)

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  打破;

  (pItem- _item ._ times);

  数组[索引]._ member=g _ Index

  g指数

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  返回空

  int main(int argc,char *argv[])

  uint64 _ t t1

  uint 64 _ t nTimeSum=0;

  uint 32 _ t nThreadNum=0;

  #ifdef USE_SPINLOCK

  pthread_spin_init( spinlock,0);

  fprintf(stderr, case for spin lock:);

  #否则

  pthread_mutex_init( mutex,NULL);

  fprintf(stderr,互斥的案例:“);

  #endif

  int 32 _ t nCpuNum=(int)sysconf(_ SC _ n processors _ ONLN)* 2;

  fprintf(stderr, cpu_num=%dn ,nCpuNum/2);

  for(int 32 _ t j=1;nCpuNumj)

  nTimeSum=0;

  nThreadNum=j;

  align stat * pStatArray=new align stat[nThreadNum];

  memset(pStatArray,0x0,nThreadNum * sizeof(align stat));

  for(uint 32 _ t n loop=10;nLoop nLoop -)

  g _ Index=0;

  pthread _ t * pThreadArray=new pthread _ t[nThreadNum];

  //启动线程前测量时间.

  t1=时间助手:现在时间();

  for(uint 32 _ t I=0;我的线程数我)

  pStatArray[i]._item ._ id=I;

  if(pthread _ create(pThreadArray[I],NULL,consumer,(void *)( pStatArray[i]))

  perror( error:pthread _ create );

  nThreadNum=I;

  打破;

  for(uint 32 _ t I=0;我的线程数我)

  pthread_join(pThreadArray[i],NULL);

  //线程完成后测量时间.

  T2=时间助手:现在时间();

  nTimeSum=T2-t1;

  delete[]pThreadArray;

  fprintf(stderr, RepeatTimes=%d,ThreadNum=%d,UsedTime=%.6lf sn ,10,nThreadNum,(double(nTimeSum))/1000000);

  for(uint 32 _ t I=0;我的线程数我)

  fprintf(stderr, thread_id=%ut times=%un ,pStatArray[i]._item ._id,pStatArray[i]._item ._ times);

  删除[]pStatArray;

  #ifdef USE_SPINLOCK

  pthread _ spin _ destroy(自旋锁);

  #否则

  pthread_mutex_destroy(互斥);

  #endif

  返回0;

  3.2案例2

  无效*消费者(无效*参数)

  align stat * pItem=(align stat *)arg;

  while (1)

  #ifdef USE_SPINLOCK

  pthread _ spin _ lock(自旋锁);

  #否则

  pthread_mutex_lock(互斥);

  #endif

  if (g_Index=MAX_ARRAY_NUM)

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  打破;

  (pItem- _item ._ times);

  数组[索引]._ member=g _ Index

  g指数

  //添加临界段的长度

  列表uint32 _ t tmpList

  for(uint 32 _ t I=0;我我)

  tmplist。push _ back(一);

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  返回空

  3.3案例3

  无效*消费者(无效*参数)

  align stat * pItem=(align stat *)arg;

  while (1)

  #ifdef USE_SPINLOCK

  pthread _ spin _ lock(自旋锁);

  #否则

  pthread_mutex_lock(互斥);

  #endif

  if (g_Index=MAX_ARRAY_NUM)

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  打破;

  (pItem- _item ._ times);

  数组[索引]._ member=g _ Index

  g指数

  //添加临界段的长度

  列表uint32 _ t tmpList

  for(uint 32 _ t I=0;我我)

  tmplist。push _ back(一);

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  返回空

  3.4案例四

  无效*消费者(无效*参数)

  align stat * pItem=(align stat *)arg;

  while (1)

  #ifdef USE_SPINLOCK

  pthread _ spin _ lock(自旋锁);

  #否则

  pthread_mutex_lock(互斥);

  #endif

  if (g_Index=MAX_ARRAY_NUM)

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  打破;

  (pItem- _item ._ times);

  数组[索引]._ member=g _ Index

  g指数

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  //做同样的工作

  //align int 32 tmp array[10000000];

  //uint 32 _ t tmp数组[10000000];

  列表uint32 _ t tmpList

  for(uint 32 _ t I=0;我我)

  tmplist。push _ back(一);

  //tmpArray[i]._ member=I;

  //tmp array[I]=I;

  返回空

  四不同线程数下旋转锁分析

  以下数据是用英特尔编译器采集得到

  4.1 源代码

  #包含标准视频

  #include pthread.h

  #包含stdint.h

  #包括unistd.h

  #include sys/syscall.h

  #包含错误号h

  #包含系统/时间。h

  #包含sched.h

  #包含linux/unistd.h

  #包含列表

  #包含"时间助手. h "

  #定义最大数组数量10000000

  使用命名空间标准

  结构统计

  uint32 _ t _ times

  uint32 _ t _ id

  联合联盟

  统计项目_项目

  char _ align[64];//用于多核的假共享

  volatile uint 32 _ t g _ Index=0;

  #ifdef USE_SPINLOCK

  pthread _ spinlock _ t spinlock

  #否则

  线程互斥锁互斥体;

  #endif

  PID _ t gettid(){ return syscall(_ _ NR _ gettid);}

  无效*消费者(无效*参数)

  align stat * pItem=(align stat *)arg;

  while (1)

  #ifdef USE_SPINLOCK

  pthread _ spin _ lock(自旋锁);

  #否则

  pthread_mutex_lock(互斥);

  #endif

  if (g_Index=MAX_ARRAY_NUM)

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  打破;

  (pItem- _item ._ times);

  g指数

  #ifdef USE_SPINLOCK

  pthread _ spin _ unlock(自旋锁);

  #否则

  pthread _ mutex _解锁(互斥);

  #endif

  返回空

  int main(int argc,char *argv[])

  if(argc 2)

  fprintf(stderr, %s thread_numn ,argv[0]);

  退出(-1);

  uint64 _ t t1

  uint 64 _ t nTimeSum=0;

  uint 32 _ t nThreadNum=(uint 32 _ t)atoi(argv[1]);

  #ifdef USE_SPINLOCK

  pthread_spin_init( spinlock,0);

  fprintf(stderr,自旋锁的情况: n );

  #否则

  pthread_mutex_init( mutex,NULL);

  fprintf(stderr,互斥体的大小写: n’);

  #endif

  nTimeSum=0;

  align stat * pStatArray=new align stat[nThreadNum];

  memset(pStatArray,0x0,nThreadNum * sizeof(align stat));

  for(uint 32 _ t n loop=10;nLoop nLoop -)

  g _ Index=0;

  pthread _ t * pThreadArray=new pthread _ t[nThreadNum];

  //启动线程前测量时间.

  t1=时间助手:现在时间();

  for(uint 32 _ t I=0;我的线程数我)

  pStatArray[i]._item ._ id=I;

  if(pthread _ create(pThreadArray[I],NULL,consumer,(void *)( pStatArray[i]))

  perror( error:pthread _ create );

  nThreadNum=I;

  打破;

  for(uint 32 _ t I=0;我的线程数我)

  pthread_join(pThreadArray[i],NULL);

  //线程完成后测量时间.

  T2=时间助手:现在时间();

  nTimeSum=T2-t1;

  delete[]pThreadArray;

  fprintf(stderr, RepeatTimes=%d,ThreadNum=%d,UsedTime=%.6lf sn ,10,nThreadNum,(double(nTimeSum))/1000000);

  for(uint 32 _ t I=0;我的线程数我)

  fprintf(stderr, thread_id=%ut times=%un ,pStatArray[i]._item ._id,pStatArray[i]._item ._ times);

  删除[]pStatArray;

  #ifdef USE_SPINLOCK

  pthread _ spin _ destroy(自旋锁);

  #否则

  pthread_mutex_destroy(互斥);

  #endif

  返回0;

  编译:

  g-g-O2-Wall-I ./-DUSE _自旋锁-LP线程t _自旋锁_线程。CPP-o测试线程

  g-g-O2-Wall-I ./-LP螺纹t _ spin lock _ thread。互斥体线程

  4.2旋转锁在不同线程数下锁总线统计

  4 .2 .1t _自旋_线程_1

  4 .2 .2t _自旋_线程_2

  4 .2 .3t _自旋_线程_3

  t _自旋_线程_4

  4 .2 .5t _自旋_线程_5

  4 .2 .6t _自旋_线程_6

  4 .2 .7t _自旋_线程_7

  4 .2 .8t _自旋_线程_8

  5测试结果分析

  1) 现象1: 在性能对比测试案例1、案例2、案例3中,旋转锁版本程序的运行时间基本上是随线程数的增加而递增的?

  在对锁竞争激烈案例1、案例2、案例3情况下,因为除了临界区,线程不用执行其他任务,任务实际是串行执行的。

  Spinlock需要在循环试验期间锁定总线。随着线程的增加,每个线程获得锁的概率会更小,等待轮循试验的概率会更大,锁总线的操作会更频繁(参见4.2Spinlock对不同线程数下锁总线的统计),导致关键区域任务的执行时间更长。而且当线程数超过cpu核数时,可能会出现线程切换。

  2)现象2:在情况1、情况2和情况3的性能对比测试中,互斥版本程序的运行时间先随着线程数从1增加到3而增加,然后随着线程数的进一步增加而减少,直到达到8个线程后趋于稳定。

  在这种现象的前半段,随着线程数从1增加到3,执行时间的增加是可以理解的,因为在案例1、案例2、案例3中,由于线程除了临界区之外都不用执行其他任务,此时任务实际上是串行执行的,所以1线程的执行时间是最少的,因为没有其他线程争用锁。Futex可以在用户状态下获取锁。当线程数从一个逐渐增加到三个时,在用户态获取锁的概率更低,需要进入比在用户态获取锁开销大得多的内核系统调用,所以执行时间也相应增加。

  在这个现象的后半段,随着线程数从4个增加到15个,执行时间开始逐渐减少,逐渐稳定在8个线程。这很难理解。

  我的猜测是因为临界段比较短,线程得到锁后很快就会释放锁,所以在3-4个线程中,线程因为得不到锁而进行系统调用进入等待代码。

  我是这样理解的。消费者线索可以分为3个部分:

  ?Pthread_mutex_lock为Task1,执行时间为T1,可以细分为两种类型:

  ?在用户状态下直接获得锁的时间T11

  ?锁T12是通过在等待系统调用的锁之后唤醒而获得的。

  ?关键区域是Task2,执行时间T2,

  ?Pthread_mutex_unlock为Task3,执行时间为T3。

  ?在用户状态下锁定被直接释放的时间T31

  ?通过系统调用释放锁T32

  临界段的代码是串行执行的,但是pthread_mutex_lock和pthread_mutex_unlock是每个线程并发执行的。因为临界截面很短,T12和T32比T2大得多。所以在并发度不够高的情况下,Task1和Task3的重叠部分比较小。因此,T12和T32占总执行时间的比例很大。随着线程数量的增加,Task1和Task3的重叠部分相对增加,其占总执行时间的比例开始下降。当线程数达到cpu核心时,基本稳定。

  同时,T12和T32与T2相比越大,下降越明显。这可以解释临界区长度从情形1增加到情形3后,总执行时间的减少过程也相应变慢。

  3)现象3:在案例1、案例2和案例3的性能对比测试中,当线程数相对较少时(案例1为5个线程,案例3为8个线程),自旋锁版本的执行时间比互斥锁版本的执行时间短,但之后则相反。

  这是我对这种现象的理解。在锁竞争激烈的情况1、情况2、情况3下,由于线程除了临界段之外不用执行其他任务,任务实际上是串行执行的。此时,在线程数量较少的情况下,spinlock的性能优于mutex。随着线程数量的增加,spinlock的性能比mutex差,因为随着线程数量的增加,spinlock循环等待的代价逐渐大于mutex对上下文切换的系统调用。

  4)现象4:在案例1、案例2和案例3的性能对比测试中,从案例1到案例3,临界区逐渐变长,spinlock的性能优于mutex,从案例1的5个线程逐渐增加到案例3的8个线程。

  在自旋锁机制的情况下,随着临界区的长度,自旋锁循环等待过程中的总线锁数量随着临界区代码的长度相应增加,所以case1~case3的执行时间基本上是成比例增加的。

  正如现象所解释的

  5)现象五:性能测试中,除了临界段代码,还有其他代码要执行。当线程数从1到11时,自旋锁版本的执行时间优于互斥版本,尤其是当线程数为6时,自旋锁版本的执行时间仅为互斥版本的26%左右。

  这种情况主要模拟锁竞争不激烈,临界区较短的情况;在这种情况下,当线程数从1个增加到6个左右时,spinlock基本上需要在循环中等待的概率很小,每个线程分担临界段的任务,所以在6个左右线程时spinlock版本的执行时间最短。

  6)现象六:在测试过程中,使用spinlock的cpu时间会远远高于互斥量,因为spinlock是循环空等待。

  6.1测试数据

  6.2参考:

  http://en.wikipedia.org/wiki/Futex

  http://www.alexonlinux.com/pthread-mutex-vs-pthread-spinlock

pthread_mutex_timedlock,pthread_mutex_trylock