高速收费站收费员:中断服务下半部之工作队列详解

来源:百度文库 编辑:中财网 时间:2024/05/09 00:13:06
1       工作队列概述

工作队列(work queue)是另外一种将工作推后执行的形式,它和我们前面讨论的所有其他形式都不相同。工作队列可以把工作推后,交由一个内核线程去执行—这个下半部分总是会在进程上下文执行,但由于是内核线程,其不能访问用户空间。最重要特点的就是工作队列允许重新调度甚至是睡眠。



通常,在工作队列和软中断/tasklet中作出选择非常容易。可使用以下规则:

2      如果推后执行的任务需要睡眠,那么只能选择工作队列;

2      如果推后执行的任务需要延时指定的时间再触发,那么使用工作队列,因为其可以利用timer延时;

2      如果推后执行的任务需要在一个tick之内处理,则使用软中断或tasklet,因为其可以抢占普通进程和内核线程;

2      如果推后执行的任务对延迟的时间没有任何要求,则使用工作队列,此时通常为无关紧要的任务。



另外如果你需要用一个可以重新调度的实体来执行你的下半部处理,你应该使用工作队列。它是惟一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在你需要获得大量的内存时、在你需要获取信号量时,在你需要执行阻塞式的I/O操作时,它都会非常有用。



实际上,工作队列的本质就是将工作交给内核线程处理,因此其可以用内核线程替换。但是内核线程的创建和销毁对编程者的要求较高,而工作队列实现了内核线程的封装,不易出错,所以我们也推荐使用工作队列。
2       工作队列的实现
2.1    工作者线程

工作队列子系统是一个用于创建内核线程的接口,通过它创建的进程负责执行由内核其他部分排到队列里的任务。它创建的这些内核线程被称作工作者线程(worker thread)。工作队列可以让你的驱动程序创建一个专门的工作者线程来处理需要推后的工作。不过,工作队列子系统提供了一个默认的工作者线程来处理这些工作。因此,工作队列最基本的表现形式就转变成了一个把需要推后执行的任务交给特定的通用线程这样一种接口。



默认的工作者线程叫做events/n,这里n是处理器的编号,每个处理器对应一个线程。比如,单处理器的系统只有events/0这样一个线程。而双处理器的系统就会多一个events/1线程。



默认的工作者线程会从多个地方得到被推后的工作。许多内核驱动程序都把它们的下半部交给默认的工作者线程去做。除非一个驱动程序或者子系统必须建立一个属于它自己的内核线程,否则最好使用默认线程。不过并不存在什么东西能够阻止代码创建属于自己的工作者线程。如果你需要在工作者线程中执行大量的处理操作,这样做或许会带来好处。处理器密集型和性能要求严格的任务会因为拥有自己的工作者线程而获得好处。


2.2    工作队列的组织结构
2.2.1      工作队列workqueue_struct

外部可见的工作队列抽象,用户接口,是由每个CPU的工作队列组成的链表

64struct workqueue_struct {

65        struct cpu_workqueue_struct *cpu_wq;

66        const char *name;

67        struct list_head list;  /* Empty if single thread */

68};

2      cpu_wq:本队列包含的工作者线程;

2      name:所有本队列包含的线程的公共名称部分,创建工作队列时的唯一用户标识;

2      list:链接本队列的各个工作线程。



在早期的版本中,cpu_wq是用数组维护的,即对每个工作队列,每个CPU包含一个此线程。改成链表的优势在于,创建工作队列的时候可以指定只创建一个内核线程,这样消耗的资源较少。



在该结构体里面,给每个线程分配一个cpu_workqueue_struct,因而也就是给每个处理器分配一个,因为每个处理器都有一个该类型的工作者线程。


2.2.2      工作者线程cpu_workqueue_struct

这个结构是针对每个CPU的,属于内核维护的结构,用户不可见。

43struct cpu_workqueue_struct {

44

45        spinlock_t lock;

46

47        long remove_sequence;   /* Least-recently added (next to run) */

48        long insert_sequence;   /* Next to add */

49

50        struct list_head worklist;

51        wait_queue_head_t more_work;

52        wait_queue_head_t work_done;

53

54        struct workqueue_struct *wq;

55        struct task_struct *thread;

56

57        int run_depth;          /* Detect run_workqueue() recursion depth */

58} ____cacheline_aligned;

2      lock:操作该数据结构的互斥锁

2      remove_sequence:下一个要执行的工作序号,用于flush

2      insert_sequence:下一个要插入工作的序号

2      worklist:待处理的工作的链表头

2      more_work:标识有工作待处理的等待队列,插入新工作后唤醒对应的内核线程

2      work_done:处理完的等待队列,没完成一个工作后,唤醒可能等待通知处理完成通知的线程

2      wq:所属的工作队列节点

2      thread:关联的内核线程指针

2      run_depth:run_workqueue()循环深度,多处可能调用此函数



所有的工作者线程都是用普通的内核线程实现的,它们都要执行worker thread()函数。在它初始化完以后,这个函数执行一个死循环并开始休眠。当有操作被插入到队列里的时候,线程就会被唤醒,以便执行这些操作。当没有剩余的操作时,它又会继续休眠。


2.2.3      工作work_struct

工作用work_struct结构体表示:

linux+v2.6.19/include/linux/workqueue.h

14struct work_struct {

15        unsigned long pending;

16        struct list_head entry;

17        void (*func)(void *);

18        void *data;

19        void *wq_data;

20        struct timer_list timer;

21};

2      Pending:这个工作是否正在等待处理标志,加入到工作队列后置此标志

2      Entry:该工作在链表中的入口点,连接所有工作

2      Func:该工作执行的回调函数

2      Data:传递给处理函数的参数

2      wq_data:本工作所挂接的cpu_workqueue_struct;若需要使用定时器,则其为工作队列传递给timer

2      timer:延迟的工作队列所用到的定时器,无需延迟是初始化为NULL


2.2.4      三者的关系


位于最高一层的是工作队列。系统允许有多种类型的工作队列存在。每一个工作队列具备一个workqueue_struct,而SMP机器上每个CPU都具备一个该类的工作者线程cpu_workqueue_struct,系统通过CPU号和workqueue_struct 的链表指针及第一个成员cpu_wq可以得到每个CPU的cpu_workqueue_struct结构。

而每个工作提交时,将链接在当前CPU的cpu_workqueue_struct结构的worklist链表中。通常情况下由当前所注册的CPU执行此工作,但在flush_work中可能由其他CPU来执行。或者CPU热插拔后也将进行工作的转移。



内核中有些部分可以根据需要来创建工作队列。而在默认情况下内核只有events这一种类型的工作队列。大部分驱动程序都使用的是现存的默认工作者线程。它们使用起来简单、方便。可是,在有些要求更严格的情况下,驱动程序需要自己的工作者线程。


2.3    工作队列执行的细节

工作结构体被连接成链表,对于某个工作队列,在每个处理器上都存在这样一个链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。



此为工作者线程的标准模板,所以工作者线程都使用此函数。对于用户自定义的内核线程可以参考此函数。



233static int worker_thread(void *__cwq)

234{

235        struct cpu_workqueue_struct *cwq = __cwq;

// 与该工作者线程关联的cpu_workqueue_struct结构

236        DECLARE_WAITQUEUE(wait, current);

// 声明一个等待节点,若无工作,则睡眠

237        struct k_sigaction sa;

238        sigset_t blocked;

239

240        current->flags |= PF_NOFREEZE;

241

242        set_user_nice(current, -5);

// 设定较低的进程优先级, 工作进程不是个很紧急的进程,不和其他进程抢占CPU,通常在系统空闲时运行

244        /* 禁止并清除所有信号 */

245        sigfillset(&blocked);

246        sigprocmask(SIG_BLOCK, &blocked, NULL);

247        flush_signals(current);

248

255        /* SIG_IGN makes children autoreap: see do_notify_parent(). */

// 允许SIGCHLD信号,并设置处理函数

256        sa.sa.sa_handler = SIG_IGN;

257        sa.sa.sa_flags = 0;

258        siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));

259        do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

260

261        set_current_state(TASK_INTERRUPTIBLE);

// 可被信号中断,适当的时刻可被杀死,若收到停止命令则退出返回,否则进程就一直运行,无工作可执行时,主动休眠

262        while (!kthread_should_stop()) {

// 为了便于remove_wait_queue的统一处理,将当前内核线程添加到cpu_workqueue_struct的more_work等待队列中,当有新work结构链入队列中时会激活此等待队列

263                add_wait_queue(&cwq->more_work, &wait);



// 判断是否有工作需要作,无则调度让出CPU等待唤醒

264                if (list_empty(&cwq->worklist))

265                        schedule();

266                else

267                        __set_current_state(TASK_RUNNING);

268                remove_wait_queue(&cwq->more_work, &wait);

// 至此,线程肯定处于TASK_RUNNING,从等待队列中移出



//需要再次判断是因为可能从schedule中被唤醒的。如果有工作做,则执行

270                if (!list_empty(&cwq->worklist))

271                        run_workqueue(cwq);



// 无工作或者全部执行完毕了,循环整个过程,接着一般会休眠

272                set_current_state(TASK_INTERRUPTIBLE);

273        }

274        __set_current_state(TASK_RUNNING);

275        return 0;

276}



该函数在死循环中完成了以下功能:

2      线程将自己设置为休眠状态TASK_INTERRUPTIBLE并把自己加人到等待队列上。

2      如果工作链表是空的,线程调用schedule()函数进入睡眠状态。

2      如果链表中有对象,线程不会睡眠。相反,它将自己设置成TASK_RUNNING,脱离等待队列。

2      如果链表非空,调用run_workqueue函数执行被推后的工作。



run_workqueue执行具体的工作,多处会调用此函数。在调用Flush_work时为防止死锁,主动调用run_workqueue,此时可能导致多层次递归。

196static void run_workqueue(struct cpu_workqueue_struct *cwq)

197{

198        unsigned long flags;

199

204        spin_lock_irqsave(&cwq->lock, flags);

// 统计已经递归调用了多少次了

205        cwq->run_depth++;

206        if (cwq->run_depth > 3) {

207                /* morton gets to eat his hat */

208                printk("%s: recursion depth exceeded: %d\n",

209                        __FUNCTION__, cwq->run_depth);

210                dump_stack();

211        }

212        while (!list_empty(&cwq->worklist)) {

213                struct work_struct *work = list_entry(cwq->worklist.next,

214                                                struct work_struct, entry);

215                void (*f) (void *) = work->func;

216                void *data = work->data;

217                        //将当前节点从链表中删除并初始化其entry

218                list_del_init(cwq->worklist.next);

219                spin_unlock_irqrestore(&cwq->lock, flags);

220

221                BUG_ON(work->wq_data != cwq);

222                clear_bit(0, &work->pending); //清除pengding位,标示已经执行

223                f(data);

224

225                spin_lock_irqsave(&cwq->lock, flags);

226                cwq->remove_sequence++;

// // 唤醒可能等待的进程,通知其工作已经执行完毕

227                wake_up(&cwq->work_done);

228        }

229        cwq->run_depth--;

230        spin_unlock_irqrestore(&cwq->lock, flags);

231}


3       工作队列的API
3.1    API列表



功能描述


对应API函数


附注

静态定义一个工作


DECLARE_WORK(n, f, d)




动态创建一个工作


INIT_WORK(_work, _func, _data)




工作原型


void work_handler(void *data)




将工作添加到指定的工作队列中


queue_work(struct workqueue_struct *wq, struct work_struct *work)




将工作添加到keventd_wq队列中


schedule_work(struct work_struct *work)




延迟delay个tick后将工作添加到指定的工作队列中


queue_delayed_work(struct workqueue_struct *wq,

struct work_struct *work, unsigned long delay)




延迟delay个tick后将工作添加到keventd_wq队列中


schedule_delayed_work(struct work_struct *work, unsigned long delay)





刷新等待指定队列中的所有工作完成


flush_workqueue(struct workqueue_struct *wq)




刷新等待keventd_wq中的所有工作完成


flush_scheduled_work(void)




取消指定队列中所有延迟工作


cancel_delayed_work(struct work_struct *work)




创建一个工作队列


create_workqueue(name)




创建一个单线程的工作队列


create_singlethread_workqueue(name)




销毁指定的工作队列


destroy_workqueue(struct workqueue_struct *wq)





3.2    如何创建工作

首先要做的是实际创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地创建该结构体:

27#define __WORK_INITIALIZER(n, f, d) {           \

28        .entry  = { &(n).entry, &(n).entry },         \

29        .func = (f),                              \

30        .data = (d),                              \

31        .timer = TIMER_INITIALIZER(NULL, 0, 0),   \

32        }

33

34#define DECLARE_WORK(n, f, d)                \

35        struct work_struct n = __WORK_INITIALIZER(n, f, d)

这样就会静态地创建一个名为name,处理函数为func,参数为data的work_struct结构体。



同样,也可以在运行时通过指针创建一个工作:

40#define PREPARE_WORK(_work, _func, _data)       \

41        do {                                    \

42                (_work)->func = _func;             \

43                (_work)->data = _data;            \

44        } while (0)

45



49#define INIT_WORK(_work, _func, _data)         \

50        do {                                        \

51                INIT_LIST_HEAD(&(_work)->entry);      \

52                (_work)->pending = 0;                      \

53                PREPARE_WORK((_work), (_func), (_data));   \

54                init_timer(&(_work)->timer);                 \

55        } while (0)

这会动态地初始化一个由work指向的工作,处理函数为func,参数为data。

无论是动态还是静态创建,默认定时器初始化为0,即不进行延时调度。


3.3    工作队列处理函数

工作队列处理函数的原型是:

void work_handler(void *data)

这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管操作处理函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。



在工作队列和内核其他部分之间使用锁机制就像在其他的进程上下文中使用锁机制一样方便。这使编写处理函数变得相对容易。


3.4    调度工作
3.4.1      queue_work

创建一个工作的时候无须考虑工作队列的类型。在创建之后,可以调用下面列举的函数。这些函数与schedule-work()以及schedule-delayed-Work()相近,惟一的区别就在于它们针对给定的工作队列而不是默认的event队列进行操作。



将工作添加到当前处理器对应的链表中,但并不能保证此工作由提交该工作的CPU执行。Flushwork时可能执行所有CPU上的工作或者CPU热插拔后将进行工作的转移

107int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)

108{

109        int ret = 0, cpu = get_cpu();

// 工作结构还没在队列, 设置pending标志表示把工作结构挂接到队列中

111        if (!test_and_set_bit(0, &work->pending)) {

112                if (unlikely(is_single_threaded(wq)))

113                        cpu = singlethread_cpu;

114                BUG_ON(!list_empty(&work->entry));

115                __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);

////////////////////////////////

84static void __queue_work(struct cpu_workqueue_struct *cwq,

85                         struct work_struct *work)

86{

87        unsigned long flags;

88

89        spin_lock_irqsave(&cwq->lock, flags);

//// 指向CPU工作队列

90        work->wq_data = cwq;

// 加到队列尾部

91        list_add_tail(&work->entry, &cwq->worklist);

92        cwq->insert_sequence++;

// 唤醒工作队列的内核处理线程

93        wake_up(&cwq->more_work);

94        spin_unlock_irqrestore(&cwq->lock, flags);

95}

////////////////////////////////////

116                ret = 1;

117        }

118        put_cpu();

119        return ret;

120}

121EXPORT_SYMBOL_GPL(queue_work);



一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。


3.4.2      schedule_work

在大多数情况下, 并不需要自己建立工作队列,而是只定义工作, 将工作结构挂接到内核预定义的事件工作队列中调度, 在kernel/workqueue.c中定义了一个静态全局量的工作队列static struct workqueue_struct *keventd_wq;



调度工作结构, 将工作结构添加到全局的事件工作队列keventd_wq,调用了queue_work通用模块。对外屏蔽了keventd_wq的接口,用户无需知道此参数,相当于使用了默认参数。keventd_wq由内核自己维护,创建,销毁。



455static struct workqueue_struct *keventd_wq;

463int fastcall schedule_work(struct work_struct *work)

464{

465        return queue_work(keventd_wq, work);

466}

467EXPORT_SYMBOL(schedule_work);


3.4.3      queue_delayed_work

有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,

同时也可以利用timer来进行延时调度,到期后才由默认的定时器回调函数进行工作注册。



延迟delay后,被定时器唤醒,将work添加到工作队列wq中。

143int fastcall queue_delayed_work(struct workqueue_struct *wq,

144                        struct work_struct *work, unsigned long delay)

145{

146        int ret = 0;

147        struct timer_list *timer = &work->timer;

148

149        if (!test_and_set_bit(0, &work->pending)) {

150                BUG_ON(timer_pending(timer));

151                BUG_ON(!list_empty(&work->entry));

152

153                /* This stores wq for the moment, for the timer_fn */

154                work->wq_data = wq;

155                timer->expires = jiffies + delay;

156                timer->data = (unsigned long)work;

157                timer->function = delayed_work_timer_fn;

////////////////////////////////////

定时器到期后执行的默认函数,其将某个work添加到一个工作队列中,需两个重要信息:

Work:__data定时器的唯一参数

待添加至的队列:由work->wq_data提供

123static void delayed_work_timer_fn(unsigned long __data)

124{

125        struct work_struct *work = (struct work_struct *)__data;

126        struct workqueue_struct *wq = work->wq_data;

127        int cpu = smp_processor_id();

128

129        if (unlikely(is_single_threaded(wq)))

130                cpu = singlethread_cpu;

131

132        __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);

133}

////////////////////////////////////

158                add_timer(timer);

159                ret = 1;

160        }

161        return ret;

162}

163EXPORT_SYMBOL_GPL(queue_delayed_work);


3.4.4      schedule_delayed_work

其利用queue_delayed_work实现了默认线程keventd_wq中工作的调度。



477int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)

478{

479        return queue_delayed_work(keventd_wq, work, delay);

480}

481EXPORT_SYMBOL(schedule_delayed_work);


3.5    刷新工作
3.5.1      flush_workqueue

排入队列的工作会在工作者线程下一次被唤醒的时候执行。有时,在继续下一步工作之前,你必须保证一些操作已经执行完毕了。这一点对模块来说就很重要,在卸载之前,它就有可能需要调用下面的函数。而在内核的其他部分,为了防止竟争条件的出现,也可能需要确保不再有待处理的工作。



出于以上目的,内核准备了一个用于刷新指定工作队列的函数flush_workqueue。其确保所有已经调度的工作已经完成了,否则阻塞直到其执行完毕,通常用于驱动模块的关闭处理。其检查已经每个CPU上执行完的序号是否大于此时已经待插入的序号。对于新的以后插入的工作,其不受影响。

320void fastcall flush_workqueue(struct workqueue_struct *wq)

321{

322        might_sleep();

323

324        if (is_single_threaded(wq)) {

325                /* Always use first cpu's area. */

326                flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));

327        } else {

328                int cpu;

// 被保护的代码可能休眠,故此处使用内核互斥锁而非自旋锁

330                mutex_lock(&workqueue_mutex);

// 将同时调度其他CPU上的工作,这说明了工作并非在其注册的CPU上执行

331                for_each_online_cpu(cpu)

332                        flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));

//////////////////////////

278static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)

279{

280        if (cwq->thread == current) {

// keventd本身需要刷新所有工作时,手动调用run_workqueue,否则将造成死锁。

285                run_workqueue(cwq);

286        } else {

287                DEFINE_WAIT(wait);

288                long sequence_needed;

289

290                spin_lock_irq(&cwq->lock);

// 保存队列中当前已有的工作所处的位置,不用等待新插入的工作执行完毕

291                sequence_needed = cwq->insert_sequence;

292

293                while (sequence_needed - cwq->remove_sequence > 0) {

// 如果队列中还有未执行完的工作,则休眠

294                        prepare_to_wait(&cwq->work_done, &wait,

295                                        TASK_UNINTERRUPTIBLE);

296                        spin_unlock_irq(&cwq->lock);

297                        schedule();

298                        spin_lock_irq(&cwq->lock);

299                }

300                finish_wait(&cwq->work_done, &wait);

301                spin_unlock_irq(&cwq->lock);

302        }

303}

//////////////////////////

333                mutex_unlock(&workqueue_mutex);

334        }

335}

336EXPORT_SYMBOL_GPL(flush_workqueue);



函数会一直等待,直到队列中所有对象都被执行以后才返回。在等待所有待处理的工作执行的时候,该函数会进入休眠状态,所以只能在进程上下文中使用它。



注意,该函数并不取消任何延迟执行的工作。就是说,任何通过schedule_delayed_work调度的工作,如果其延迟时间未结束,它并不会因为调用flush_scheduled_work()而被刷新掉。


3.5.2      flush_scheduled_work



刷新系统默认工作线程的函数为flush_scheduled_work,其调用了上面通用的函数

532void flush_scheduled_work(void)

533{

534        flush_workqueue(keventd_wq);

535}

536EXPORT_SYMBOL(flush_scheduled_work);


3.5.3      cancel_delayed_work

取消延迟执行的工作应该调用:

int cancel_delayed_work(struct work_struct *work);

这个函数可以取消任何与work_struct相关的挂起工作。