基于条件变量的消息队列

     条件变量是线程之前同步的另一种机制。条件变量给多线程提供了一种会和的场所。当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生。这样大大减少了锁竞争引起的线程调度和线程等待。

     消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错。博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍!!基于三缓冲的队列,虽然最大限度上解除了线程竞争,但是在玩家很少,消息很小的时候,需要添加一些buff去填充数据,这大概也是其一个缺陷吧!

     消息队列在服务器开发过程中主要用于什么对象呢?

     1: 我想大概就是通信层和逻辑层之间的交互,通信层接受到的网络数据,验证封包之后,通过消息队列传递给逻辑层,逻辑层将处理结果封包再传递给通信层!

     2:逻辑线程和数据库IO线程的分离;数据库IO线程负责对数据库的读写更新,逻辑层对数据库的操作,封装成消息去请求数据库IO线程,数据库IO线程处理完之后,再交回给逻辑层。

     3:日志;处理模式与方式2 类似。不过日志大概是不需要返回的!

给出源代码:

   BlockingQueue.h文件

/*
 * BlockingQueue.h
 *
 *  Created on: Apr 19, 2013
 *      Author: archy_yu
 */

#ifndef BLOCKINGQUEUE_H_
#define BLOCKINGQUEUE_H_

#include <queue>
#include <pthread.h>

typedef void* CommonItem;

class BlockingQueue
{
public:
    BlockingQueue();

    virtual ~BlockingQueue();

    int peek(CommonItem &item);

    int append(CommonItem item);

private:

    pthread_mutex_t _mutex;

    pthread_cond_t _cond;

    std::queue<CommonItem> _read_queue;

    std::queue<CommonItem> _write_queue;

};

#endif /* BLOCKINGQUEUE_H_ */

  BlockingQueue.cpp 文件代码

/*
 * BlockingQueue.cpp
 *
 *  Created on: Apr 19, 2013
 *      Author: archy_yu
 */

#include "BlockingQueue.h"

BlockingQueue::BlockingQueue()
{
    pthread_mutex_init(&this->_mutex,NULL);
    pthread_cond_init(&this->_cond,NULL);
}

BlockingQueue::~BlockingQueue()
{
    pthread_mutex_destroy(&this->_mutex);
    pthread_cond_destroy(&this->_cond);
}

int BlockingQueue::peek(CommonItem &item)
{

    if( !this->_read_queue.empty() )
    {
        item = this->_read_queue.front();
        this->_read_queue.pop();
    }
    else
    {
        pthread_mutex_lock(&this->_mutex);

        while(this->_write_queue.empty())
        {
            pthread_cond_wait(&this->_cond,&this->_mutex);
        }

        while(!this->_write_queue.empty())
        {
            this->_read_queue.push(this->_write_queue.front());
            this->_write_queue.pop();
        }

        pthread_mutex_unlock(&this->_mutex);
    }

    return 0;
}

int BlockingQueue::append(CommonItem item)
{
    pthread_mutex_lock(&this->_mutex);
    this->_write_queue.push(item);
    pthread_cond_signal(&this->_cond);
    pthread_mutex_unlock(&this->_mutex);
    return 0;
}

  测试代码:

BlockingQueue _queue;

void* process(void* arg)
{

    int i=0;
    while(true)
    {
        int *j = new int();
        *j = i;
        _queue.append((void *)j);
        i ++;
    }
    return NULL;
}

int main(int argc,char** argv)
{
    pthread_t pid;
    pthread_create(&pid,0,process,0);

    long long int start = get_os_system_time();
    int i = 0;
    while(true)
    {
        int* j = NULL;
        _queue.peek((void* &)j);

        i ++;

        if(j != NULL && (*j) == 100000)
        {
            long long int end = get_os_system_time();
            printf("consume %d\n",end - start);
            break;
        }
    }

    return 0;
}

欢迎拍砖!!!

时间: 2016-04-20

基于条件变量的消息队列的相关文章

基于条件变量的消息队列 说明介绍_C 语言

条件变量是线程之前同步的另一种机制.条件变量给多线程提供了一种会和的场所.当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生.这样大大减少了锁竞争引起的线程调度和线程等待.      消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错.博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲.互斥锁和条件变量的消息队列:这个大概也参考了一下java的blocking

KiteQ —— 基于 go + protobuff 的消息队列

KiteQ 是一个基于 go + protobuff 实现的多种持久化方案的 mq 框架(消息队列). 特性: * 基于zk维护发送方.订阅方.broker订阅发送关系.支持水平.垂直方面的扩展 * 基于与topic以及第二级messageType订阅消息 * 基于mysql.文件存储方式多重持久层消息存储 * 保证可靠异步投递 * 支持两阶段提交分布式事务 工程结构: kiteq/ ├── README.md ├── binding 订阅关系管理处理跟ZK的交互 ├── build.sh 安装

POSIX和SYSTEM的消息队列应该注意的问题

  首先看看POSIX的代码: 1.posix_mq_server.c #include <mqueue.h>#include <sys/stat.h>#include <string.h>#include <stdio.h>#define MQ_FILE "/mq_test"#define BUF_LEN 128 int main(){     mqd_t mqd;    char buf[BUF_LEN];    int  por =

System V 消息队列

1.概述 消息队列可以认为是一个消息链表,System V 消息队列使用消息队列标识符标识.具有足够特权的任何进程都可以往一个队列放置一个消息,具有足够特权的任何进程都可以从一个给定队列读出一个消息.在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达.System V 消息队列是随内核持续的,只有在内核重起或者显示删除一个消息队列时,该消息队列才会真正被删除.可以将内核中的某个特定的消息队列画为一个消息链表,如下图所示: 对于系统中没个消息队列,内核维护一个msqid

基于TableStore构建简易海量Topic消息队列

前言 消息队列,通常有两种场景,一种是发布者订阅模式,一种是生产者消费者模式.发布者订阅模式,即发布者生产消息放入队列,多个监听的消费者都会收到同一份消息,也就是每个消费者收到的消息是一样的.生产者消费者模式,生产者生产消息放入队列,多个消费者同时监听队列,谁先抢到消息就会从队列中取走消息,最终每个消息只会有一个消费者拥有. 在大数据时代,传统的生产者消费者队列模式中的Topic数目可能从少量的几个变为海量topic.例如要实现一个全网爬虫抓取任务调度系统,每个大型的门户,SNS都会成为一个to

线程-linux下消息队列, 如何在满足某种条件下将队列清空?

问题描述 linux下消息队列, 如何在满足某种条件下将队列清空? 操作系统课上一个作业, 要求是用消息队列来实现某些功能 已知消息队列的特性 : 可以多个进程接受相同消息, 可知队列中的消息是不会消失的 目前所想的是用3个进程, 每个进程都有2个线程 1: 将消息发送到消息队列, 用while(1)不断等待读入 2: 从消息队列中接收消息, 用while(1)不断读取消息队列 如此类似一个群聊的功能 问题是 : 当某个进程送入消息, 3个进程都读取完毕后, 如何将队列中的消息清空? 而不是持续

基于HBase的消息队列:HQueue

1. HQueue简介 HQueue是一淘搜索网页抓取离线系统团队基于HBase开发的一套分布式.持久化消息队列.它利用HTable存储消息数据,借助HBase Coprocessor将原始的KeyValue数据封装成消息数据格式进行存储,并基于HBase Client API封装了HQueue Client API用于消息存取. HQueue可以有效使用在需要存储时间序列数据.作为MapReduce Job和iStream等输入.输出供上下游共享数据等场合. 2. HQueue特性 由于HQu

几种常见的微服务架构方案——ZeroC IceGrid、Spring Cloud、基于消息队列、Docker Swarm

微服务架构是当前很热门的一个概念,它不是凭空产生的,是技术发展的必然结果.虽然微服务架构没有公认的技术标准和规范草案,但业界已经有一些很有影响力的开源微服务架构平台,架构师可以根据公司的技术实力并结合项目的特点来选择某个合适的微服务架构平台,以此稳妥地实施项目的微服务化改造或开发进程. 本文选自<架构解密:从分布式到微服务>. 本文盘点了四种常用的微服务架构方案,分别是ZeroC IceGrid.Spring Cloud.基于消息队列与Docker Swarm. ZeroC IceGrid微服

云队列:一个基于Hadoop的大规模消息基础平台

云队列:一个基于Hadoop的大规模消息基础平台 东华大学 史冬冬 本文基于从当前分布式系统消息通信的需求出发,并结合Hadoop云计算平台,提出了一个面向Internet规模的.高性能和高可靠的消息队列服务平台--云队列(Cloudqueue).通过云队列平台所提供的消息服务实现异步通讯.存储转发.可靠传输.跨互联网等特性,云队列可以在云计算时代为应用程序提供高性能.高可靠.大容量的面向互联网的消息传递服务. 文章从现实需求出发,在分析了消息中间件的相关基础知识和Hadoop关键技术的基础之上