2.9 消息队列(queue)
2.9.1 概念
在多任务系统中,任务间互相同步等待共享资源,我们一般会使用信号量,如果需要互斥,则使用互斥量。而任务间互相收发消息则可以使用消息队列。消息队列使用类似信号量的机制进行任务间的同步,并使用环形缓冲池来进行消息的队列缓冲管理,以达到任务间收发消息的阻塞和通知管理。
实现消息队列的目的在于任务间互相收发消息。一般,如果有信号量机制,用户就可以自己实现一套任务间的阻塞和通知收发功能,其本质在于接收方通过信号量的获取来开始接收消息,发送方通过信号量的释放来通知接收方处理。接收任务在无消息时被阻塞,消息到来时被唤醒处理。消息队列就是基于这样一种类信号量机制来进行消息的收发。在此基础上,进一步使用环形缓冲池的缓冲机制来缓存任务间的消息队列,就组合成了本章的消息队列,其既包含消息的缓冲队列,又包含消息的通知机制。
2.9.2 消息队列的创建与删除
(1)静态创建:kstat_t krhino_queue_create(kqueue_t *queue, const name_t *name, void **start, size_t msg_num)。
(2)动态创建:kstat_t krhino_queue_dyn_create(kqueue_t **queue, const name_t *name, size_t msg_num)。
queue的创建主要是建立三种数据结构:queue的阻塞队列blk_obj,用于管理等待该queue的任务列表;queue的消息管理msg_q结构,用来管理当前消息队列数量;queue的环形缓冲池ringbuf,用来缓存消息的指针队列。动态接口和静态接口的差异在于,krhino_queue_dyn_create在函数内部动态申请了queue以及ring buffer需要的内存。
(3)静态删除:kstat_t krhino_queue_del(kqueue_t *queue)。
(4)动态删除:kstat_t krhino_queue_dyn_del(kqueue_t *queue)。
动态删除需要释放queue的ring buffer队列queue->ringbuf,以及queue本身内存。
2.9.3 消息队列的发送和接收
消息队列从消息发送的通道上来说分为两种,消息(msg)往ring buffer发送以及消息直接往任务发送并唤醒任务接收。当queue中没有任务阻塞,即没有任务在等待接收消息时,此消息会放入ring buffer进行缓存;当queue中有任务在等待接收数据时,则消息不再往ring buffer缓存,而是直接唤醒阻塞任务,并将消息直接送给任务的消息数据结构task->msg。具体处理流程如图2-13所示。
图2-13 queue_send的处理流程
queue的接收和发送相对应,其函数原型:kstat_t krhino_queue_recv(kqueue_t*queue, tick_t ticks, void **msg),其处理流程如下:
(1)先判断ring buffer中有无缓冲队列需要处理,如果有,则从ringbuf的head处取出一个消息指针msg。
(2)如果当前缓冲区没有消息缓存,则当前任务需要等待,此时处理类似信号量的任务等待,会通过krhino_queue_recv传入的延时参数ticks来进行非等待、永久等待和延时等待的处理。永久等待和延时等待会将任务加入queue的阻塞列表,延时等待还会加入tick的处理队列,以便超时唤醒。
(3)当阻塞任务被唤醒时,需要判断是否被唤醒成功,如果是,则获取task->msg作为获取的消息队列,如果是被超时等恢复,则返回失败。
2.9.4 消息队列维测信息获取
queue模块提供了三个维测接口来获取和维护queue信息:
kstat_t krhino_queue_is_full(kqueue_t *queue)
该接口通过消息队列内的ring buffer缓存数目,来判断缓存区是否已满。如果缓冲队列满,后续的queue_send会失败。
kstat_t krhino_queue_info_get(kqueue_t *queue, msg_info_t *info)
该接口调用会返回queue内部buffer的基础信息,包括缓冲队列起始地址、目前数目、总大小等。
kstat_t krhino_queue_flush(kqueue_t *queue)
该接口用于将queue内的ring buffer缓冲清除,并将缓冲数目清0。
2.9.5 消息队列使用示例
#define QUEUE_BUF_SIZE 32 #define QUEUE_TEST_TIMES 10 static char *queue_buf[QUEUE_BUF_SIZE]; static kqueue_t g_queue; static int send_msg[QUEUE_TEST_TIMES]={1 ,2 ,3 ,4 ,5 ,6,7 ,8,9 ,10}; void test_queue(void) { kstat_t ret=RHINO_SUCCESS; int *p_recv_msg=NULL; msg_info_t msg_info; int i=0; /* create a queue */ ret=krhino_queue_create(&g_queue, "g_queue", (void**)&queue_buf, QUEUE_ BUF_SIZE); if (ret ! =RHINO_SUCCESS){ printf("queue create failed ! /n"); return; } /* send message */ for(i=0 ; i < QUEUE_TEST_TIMES; i++) { ret=krhino_queue_back_send(&g_queue, &send_msg[i]); if (ret ! =RHINO_SUCCESS){ printf("queue send failed ! /n"); return; } } /* check the msg_info */ ret=krhino_queue_info_get(&g_queue, &msg_info); if((msg_info.msg_q.size ! =QUEUE_BUF_SIZE)||(msg_info.msg_q.cur_num ! = QUEUE_TEST_TIMES)) { printf("queue info get failed ! /n"); return; } /* receive message */ for(i=0 ; i < QUEUE_TEST_TIMES; i++) { ret=krhino_queue_recv(&g_queue,0xFFFF, (void**)&p_recv_msg); if((ret ! =RHINO_SUCCESS)||(*p_recv_msg ! =send_msg[i])) { printf("queue receive failed ! /n"); return; } } printf("queue test success ! /n"); }