linux服务器开发之网关服务器的实现

什么是网关服务器

初学linux服务器开发时,我们的服务器是很简单的,只需要一个程序完成与客户端的连接,接收客户端数据,数据处理,向客户端发送数据。
但是在处理量很大的情况下,一台机器不能满足我们的需求,此时我们应该怎么办。
我们可以将服务端的任务分摊到多台机器上完成,见下图
这里写图片描述

>

从图中可见,此时整个服务端主要分为了三部分。
网关服务器:负责连接客户端与逻辑服务器,在两者间完成数据转发,使用负载均衡算法保证每个逻辑服务器的工作量均衡,以及进行数据加密。
逻辑服务器:负责业务逻辑的处理,与网关服务器进行数据交互,同时与数据库服务器进行数据交互。
数据库服务器:数据存储与读取的具体执行者。

实现网关服务器需要考虑哪些问题

效率问题

当我们需要用到网关服务器来负载均衡时,我可以假定我们需要处理的客户端请求是很多的(当然,我这里只是为了学习,具体业务并不需要),也就是说我们需要高并发,
高效处理。
因为网关服务器在客户端和逻辑服务器间相当于纽带的作用,所有的数据包都要从此经过,所以我们的网关服务器必须要保证可以 高效的处理大量连接上的事件

安全问题

如上所说,如果网关服务器被恶意发起连接,一旦挂掉,我们的全部服务都会终止,因此我们必须要对这种情况进行处理。同时,还有与客户端交互时的数据加密,这个事也
是要交给网关服务器来进行的。逻辑服务器一般都会与网关服务器配置于同一个局域网,所以通常不需要考虑数据的加密。

对连接的标识

逻辑服务器和客户端都会连接在网关服务器上,而网关服务器需要对其sockfd进行标识,要知晓究竟谁是服务器,谁是客户端,而且要对客户端的连接加一条可检索属
性(比如用户名).
为什么呢?因为对于客户端发送过来的数据,我们无论转到哪个逻辑服务器上都可以,而逻辑服务器返回的数据,我们需要知道要将该数据返回给哪个客户端,逻辑服务器并不能
知道每个客户端的sockfd是多少。

下面我们着重聊聊效率问题:

多路复用

>
我们不会去为每个sockfd都分配一个线程去服务它,我们更需要有一个线程可以去监听所有的fd上的事件,如果发生,我们再去分配线程去处理他。这就是多路复用。
多路复用有select poll
epoll,几乎凡是知道多路复用的人都知道epoll的高效。因为其底层红黑树,以及回调机制,是我们最好的选择(在大量连接,活跃量不高的情况下)。
而epoll分两种工作模式,LT和ET,LT模式下,epoll只是一个高效的poll,ET模式下会更高效。事实上众多的第三方库都使用的是LT模式,说白了就是
性价比,LT已经很高效,而改用ET模式,除了效率会更高,也会给编写带来一些复杂性以及产生一些头疼的问题,而处理这些特殊情况也需要时间,处理方式不当的话反而还
不如LT,所以,总而言之,性价比不高。(本人为了学习,此处使用的et模式)。

非阻塞

每个连接的sockfd,我们都有两种操作其的方式,阻塞和非阻塞,阻塞意味着我们此刻必须对sockfd进行等待,就是说我们不能去干别的事,这显然不可以。因
此,在以高并发为目标的服务器程序里,非阻塞是我们唯一的选择。
并且,et模式下,必须非阻塞,不然会产生套接字饿死的情况。
非阻塞模式下,我们还需要一样东西,就是缓冲区,因为你并不能保证你接受到的数据就是完整的。

工作模式

这里使用的是多线程Reacter半同步半异步模式。
主线程负责监听以及接收新的连接,维护一个任务队列,其余线程从任务队列里获取任务并完成,同时也将新的任务添加进任务队列。

架构

总体分为以下部分

main.h

程序主线程:监听fd绑定、监听,epoll监听

Connection.h

客户端和逻辑服务器的连接的封装
实现对连接的操作:
HandleRead()读, HandleWrite()写, Worker()数据处理,
shutdown()连接关闭,getData()从用户缓冲区获取数据,puttData()将数据写入用户缓冲区

ThreadPool.h

线程池的封装

SyncQueue.h

任务队列的封装
实现队列的添加取出,以及同步加锁等处理

Buffer.h

用户缓存区的封装

BaseFunc.h

基本函数的封装:如 setNoBlocking(), addFd()…

Util.h

工具类

正确性测试结果

这里写图片描述

代码

main.cpp

//
//  GataMain.cpp
//  QuoridorServer
//
//  Created by shiyi on 2016/12/2.
//  Copyright © 2016年 shiyi. All rights reserved.
//

#include <stdio.h>
#include <iostream>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <assert.h>
#include <sys/wait.h>
#include <arpa/inet.h>
#include <functional>  
#include "Util.h"
#include "ThreadPool.h"
#include "Connection.h"
#include "BaseFunc.h"

static const char *IP = "10.105.44.34";
// static const char *IP = "127.0.0.1";
// static const char *IP = "182.254.243.29";
static const int PORT = 11111;

//处理的最大连接数
static const int USER_PROCESS = 655536;
//epoll能监听的最大事件
static const int MAX_EVENT_NUMBER = 10000;

//信号通信的管道
static int sigPipefd[2];

//信号回调函数
static void sigHandler(int sig)
{
    int saveErrno = errno;
    send(sigPipefd[1], (char*)&sig, 1, 0);
    errno = saveErrno;
}

//添加信号回调
static void addSig(int sig, void(handler)(int), bool restart = true)
{
    struct sigaction sa;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = handler;
    if(restart)
        sa.sa_flags |= SA_RESTART;
    sigfillset(&sa.sa_mask);
    if(-1 == sigaction(sig, &sa, NULL))
        Util::outError("sigaction");
}

static int setupSigPipe()
{
    //新建epoll监听表和事件管道
    int epollfd = epoll_create(USER_PROCESS);
    if(epollfd == -1)
        Util::outError("epoll_create");

    int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sigPipefd);
    assert(ret == 0);

    //将写设置为非阻塞
    setNoBlocking(sigPipefd[1]);
    addFd(epollfd, sigPipefd[0], EPOLLIN | EPOLLET);
    setNoBlocking(sigPipefd[0]);

    //设置信号处理函数
    addSig(SIGCHLD, sigHandler);
    addSig(SIGTERM, sigHandler);
    addSig(SIGINT, sigHandler);
    addSig(SIGPIPE, sigHandler);

    return epollfd;
}

int main()
{
    int ret;

    //构造协议地址结构
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = PF_INET;
    inet_pton(PF_INET, IP, &address.sin_addr);
    address.sin_port = htons(PORT);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert( listenfd >= 0 );

    int opt = 1;
    if(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(int)) < 0)
    {
        perror("setsockopt");
        exit(1);
    }

    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    if(ret == -1)
    {
        perror("bind");
    }

    if(listen(listenfd, 1000) < 0)
    {
        perror("listen");
        exit(1);
    }

    Connection *users = new Connection[USER_PROCESS];
    ThreadPool threadPool;

    //统一事件源
    int epollfd = setupSigPipe();

    epoll_event events[MAX_EVENT_NUMBER];
    // addFd(epollfd, listenfd, EPOLLIN | EPOLLET);
    addFd(epollfd, listenfd, EPOLLIN);
    // setNoBlocking(m_listenfd);

    bool isRunning = true;

    while(isRunning)
    {
        int num = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        //如果错误原因不是被中断,则循环退出
        if((num < 0) && (errno != EINTR))
        {
            Util::outError("epoll_wait failure");
            break;
        }

        for(int i=0; i<num; i++)
        {
            int sockfd = events[i].data.fd;
            //处理新的请求
            if(sockfd == listenfd)
            {
                //连接新的请求
                struct sockaddr_in clientAddr;
                socklen_t clientLen = sizeof(clientAddr);
                int connfd = accept(listenfd, (struct sockaddr*)&clientAddr, &clientLen);

                if(connfd < 0)
                {              
                    Util::outError("accept");
                    break;
                }

                Util::outMsg("accept a new client : %d %s\n", connfd, inet_ntoa(clientAddr.sin_addr));

                addFd(epollfd, connfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
                setNoBlocking(connfd);
                //初始化客户端链接
                users[connfd].init(epollfd, connfd, clientAddr);

            }
            //处理信号
            else if((sockfd == sigPipefd[0]) && (events[i].events & EPOLLIN)) 
            {
                char sigMsg[1024];
                int ret = recv(sockfd, sigMsg, sizeof(sigMsg), 0);
                if(ret <= 0)
                {
                    continue;
                }

                for(int j=0; j<ret; j++)
                {
                    //循环处理每个信号
                    switch(sigMsg[j])
                    {
                        case SIGCHLD:
                        {

                            break;
                        }
                        case SIGTERM:
                        case SIGINT:
                        {   
                            //退出
                            Util::outMsg("程序退出\n");
                            isRunning = false;
                            break;
                        }
                    }
                }
            }
            //处理读事件
            else if(events[i].events & EPOLLIN)
            {
                //向任务队列添加读任务
                threadPool.AddTask(std::bind(&Connection::HandleRead, users+sockfd));
            }
            //处理写事件
            else if(events[i].events & EPOLLOUT)
            {
                // cout<<"hello"<<sockfd<<endl;
                threadPool.AddTask(std::bind(&Connection::HandleWrite, users+sockfd));
            }
        }
    }

    delete[] users;

    close(sigPipefd[0]);
    close(sigPipefd[1]);
    close(epollfd);

    return 0;
}

Connection.h

//
//  Connection.h
//  QuoridorServer
//
//  Created by shiyi on 2016/12/2.
//  Copyright © 2016年 shiyi. All rights reserved.
//

#ifndef Connection_H
#define Connection_H

#include <stdio.h>
#include <iostream>
#include <atomic>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <assert.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <errno.h>
#include <map>
#include "Buffer.h"
#include "Util.h"
#include "BaseFunc.h"

#include "json/json.h"

const std::string serverIP[] = {
    "127.0.0.1",
    "182.254.243.29"
};

const size_t BUFFER_SIZE = 65535;

class Connection
{
public:

    static std::vector<Connection*> serverConnVt;
    static std::map<string, Connection*> clientConnMap;
    static int serverIdx;

    static Connection* getServerConn()
    {
        int size = serverConnVt.size();
        if(size == 0)
            return NULL;

        serverIdx = (serverIdx+1)%size;

        return serverConnVt[serverIdx];
    }

    Connection() : m_writeing(true), m_epollfd(-1), m_sockfd(-1)
    {}

    ~Connection(){}

    //初始化连接
    void init(int epollfd, int sockfd, const sockaddr_in& clientAddr)
    {
        //初始化读写缓冲区
        m_inBuff.init();
        m_outBuff.init();

        m_epollfd = epollfd;
        m_sockfd = sockfd;
        m_writeing = true;
        m_address = clientAddr;
        m_username = "";
        m_type = -1;

        std::string sip(inet_ntoa(clientAddr.sin_addr));

        for(auto& ip : serverIP)
        {
            if(ip.compare(0, sip.size(), sip) == 0)
            {
                m_type = 1;
                serverConnVt.push_back(this);
                cout<<sip<<"是服务端"<<endl;
                break;
            }
        }

        if(m_type != 1)
        {
            char t[10];
            sprintf(t, "%d", m_sockfd);
            m_username = t; 

            //存入客户端映射表
            clientConnMap.insert(pair<string, Connection*>(m_username, this));
        }
    }

    void HandleRead()
    {
        cout<<"read"<<endl;
        while(true)
        {
            char buf[BUFFER_SIZE];
            int ret = recv(m_sockfd, buf, BUFFER_SIZE, 0);
            if(ret < 0)
            {
                //缓冲区内容已读完
                if((errno == EAGAIN) || (errno == EWOULDBLOCK))
                {
                    modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
                    break;
                }
                //其他错误直接断开连接
                Util::outError("HandleRead");
                shutdown();
                return;
            }
            //断开连接
            else if(ret == 0)
            {
                shutdown();
                return;
            }
            else
            {
                //将读取的内容加入缓冲区
                m_inBuff.PutData(buf, ret);
            }

        }

        worker();
    }

    void HandleWrite()
    {
        cout<<"write"<<endl;
        //更改临界值
        if(!m_writeing)
        {            
            //休眠等待
            usleep(1000);
            //下次再来
            modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT);
            return;
        }
        m_writeing = false;

        //取出数据
        char buf[BUFFER_SIZE];

        int len = m_outBuff.GetDataAll(buf);

        int n = len;
        while (n > 0)
        {
            int ret = send(m_sockfd, buf+len-n, n, 0);
            if (ret < n)
            {
                if (ret == -1 && errno != EAGAIN)
                {
                    Util::outError("write error");
                }
                break;
            }
            n -= ret;
        }

        //n=0表示数据全部写完,删除写事件
        if(n == 0)
        {
            modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
        }
        else
        {
            modFd(m_epollfd, m_sockfd, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT);
        }

        //恢复临界值
        m_writeing = true;
    }

    void clientWork()
    {
        //解析
        //取出数据
        char buf[BUFFER_SIZE];
        int len = getData(buf);

        //解密buf
        printf("recv from %d :%s\n", m_sockfd, buf);

        std::string recvUser;
        Json::Reader reader;
        Json::Value inRoot;
        Json::Value outRoot;  
        if(reader.parse(buf, inRoot)) 
        {
            Json::Value data = inRoot["data"];
            outRoot["data"] = data;
            outRoot["user"] = m_username;
        }

        Connection* toConn = getServerConn();
        if(toConn->m_sockfd == -1)
        {
            printf("无可用逻辑服务器\n");
            return;
        }

        //生成json字符串
        std::string outStr = outRoot.toStyledString();
        len = outStr.size();

        printf("send to %d :%s\n", toConn->m_sockfd, outStr.c_str());

        memcpy(buf, &len, 4);
        memcpy(buf+4, outStr.c_str(), len);

        toConn->putData(buf, len+4);

        modFd(m_epollfd, toConn->m_sockfd, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT);
    }


    void serverWork()
    {
        //解析
        //取出数据
        char buf[BUFFER_SIZE];
        int len = getData(buf);

        //解密buf
        printf("recv from %d :%s\n", m_sockfd, buf);

        std::string toUser;
        Json::Reader reader;
        Json::Value inRoot;
        Json::Value outRoot;  
        if(reader.parse(buf, inRoot)) 
        {
            toUser = inRoot["user"].asString();
            Json::Value data = inRoot["data"];
            outRoot["data"] = data;
        }

        auto iter = clientConnMap.find(toUser);
        if(iter == clientConnMap.end())
        {
            printf("客户端%s不存在\n", toUser.c_str());
            return;
        }

        Connection* toConn = (*iter).second;

        //生成json字符串
        std::string outStr = outRoot.toStyledString();
        len = outStr.size();

        printf("send to %d :%s\n", toConn->m_sockfd, outStr.c_str());

        memcpy(buf, &len, 4);
        memcpy(buf+4, outStr.c_str(), len);

        toConn->putData(buf, len+4);

        modFd(m_epollfd, toConn->m_sockfd, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT);

    }

    int getData(char *buf)
    {
        return m_inBuff.GetData(buf);

    }

    void putData(char *buf, int len)
    {
        while(!m_writeing)
            usleep(1000);
        m_writeing = false;

        m_outBuff.PutData(buf, len);

        m_writeing = true;
    }

    void worker()
    {
        //serverWork();
        if(m_type == 1)
        {
            cout<<"workerServer"<<endl;
            serverWork();
        }
        else
        {         
            cout<<"workerClient"<<endl;  
            clientWork();
        }
    }

    void shutdown()
    {
        //等待写事件完成后关闭
        while(!m_writeing)
            usleep(1000);

        m_writeing = false;
        removeFd(m_epollfd, m_sockfd);
        m_writeing = true;

        //服务端
        if(m_type == 1)
        {
            for(auto i=serverConnVt.begin(); i!=serverConnVt.end(); i++)
            {
                if((*i)->m_sockfd == m_sockfd)
                {
                    //在vt中删除该连接
                    serverConnVt.erase(i);
                    cout<<"退出服务端"<<m_sockfd<<endl;
                    break;
                }
            }
        }
        //客户端
        else
        {
            //map删除
            auto iter = clientConnMap.find(m_username);
            if(iter != clientConnMap.end())
            {
                clientConnMap.erase(iter);
                printf("客户端%s退出\n", m_username.c_str());
            }
        }
    }

private:
    int m_epollfd;                   //epoll描述符
    int m_sockfd;                    //套接字描述符
    std::string m_username;            //连接唯一标识
    int m_type;                        //连接类型   -1为未知客户端   0为已知客户端   1为服务端
    sockaddr_in m_address;           //套接字地址
    Buffer m_inBuff;                 //读缓冲
    Buffer m_outBuff;                //写缓冲
    std::atomic_bool m_writeing;     //是否正在写
};

std::vector<Connection*> Connection::serverConnVt;
std::map<string, Connection*> Connection::clientConnMap;
int Connection::serverIdx = -1;

#endif /* Connection_H */

Buffer.h

//
//  Buffer.h
//  QuoridorServer
//
//  Created by shiyi on 2016/12/2.
//  Copyright © 2016年 shiyi. All rights reserved.
//

#ifndef Buffer_H
#define Buffer_H

#include <stdio.h>
#include <iostream>
#include <vector>

using namespace std;

class Buffer
{
public:
    Buffer() : m_widx(0), m_ridx(0)
    {}

    ~Buffer(){}

    void init()
    {
        m_widx = m_ridx = 0;
        m_buf.clear();
    }

    //增加内容
    void PutData(char *data, int len)
    {
        //如果调整空间后足够存放,则进行调整
        int capa = m_buf.capacity();
        if(capa < m_widx + len && capa > len + m_widx - m_ridx)
            adjust();

        for(int i = 0; i < len; i++)
            m_buf.push_back(data[i]);

        m_widx += len;
    }

    //返回获取的包的大小,数据不完整返回-1

    int GetData(char* data)
    {
        if(m_widx - m_ridx < 4)
            return -1;

        int len;
        char *t = (char*)&len;
        for(int i=0; i<4; i++)
        {
            t[i] = m_buf[m_ridx+i];
        }

        //printf("-=-=%d\n", len);

        if(len+4 > m_widx-m_ridx)
            return -1;

        m_ridx += 4;

        for(int i = 0; i < len; i++)
        {
            data[i] = m_buf[m_ridx++];
        }

        if(m_ridx >= m_widx)
        {
            m_ridx = m_widx = 0;
            m_buf.clear();
        }
        return len;
    }

    //返回Buffer内全部内容
    int GetDataAll(char* data)
    {
        int len = m_widx-m_ridx;

        for(int i = 0; i < len; i++)
        {
            if(m_ridx >= m_widx)
                break;
            data[i] = m_buf[m_ridx++];
        }

        if(m_ridx >= m_widx)
        {
            m_ridx = m_widx = 0;
            m_buf.clear();
        }

        return len;
    }

private:

    //将数据移至容器头部,充分利用空间
    void adjust()
    {
        vector<char> t(m_buf.begin()+m_ridx, m_buf.begin()+m_widx);
        m_widx -= m_ridx;
        m_ridx = 0;

        m_buf.clear();

        for(int i=0; i<m_widx; i++)
            m_buf.push_back(t[i]);
    }

private:

    int m_ridx;
    int m_widx;
    std::vector<char> m_buf;
};

#endif /* Buffer_H */

ThreadPool.h

//
//  ThreadPool.h
//  QuoridorServer
//
//  Created by shiyi on 2016/11/30.
//  Copyright © 2016年 shiyi. All rights reserved.
//

#ifndef ThreadPool_H
#define ThreadPool_H

#include <stdio.h>
#include <iostream>
#include <functional>
#include <thread>
#include <atomic>
#include "SyncQueue.h"

const int MaxTaskCount = 100;

class ThreadPool
{
public:
    using Task = std::function<void()>;

    ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
    {
        if(numThreads < 4)
            numThreads = 4;

        printf("线程池启动-%d线程\n", numThreads);

        Start(numThreads);
    }

    ~ThreadPool()
    {
        Stop();
    }

    void Stop()
    {
        std::call_once(m_flag, [this]{
            StopThreadGroup();
        });
    }

    void AddTask(Task&& task)
    {
        m_queue.Push(std::forward<Task>(task));
    }

    void AddTask(const Task& task)
    {
        m_queue.Push(task);
    }

private:
    void Start(int numThreads)
    {
        m_running = true;
        //创建线程组
        for(int i=0; i<numThreads; i++)
        {
            m_threadGroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
        }

    }

    void RunInThread()
    {
        while(m_running)
        {
            std::queue<Task> queue;
            m_queue.Take(queue);
            std::queue<int> a;

            while(!queue.empty())
            {
                if(!m_running)
                    return;
                auto task = queue.front();
                queue.pop();
                task();
            }
        }
    }

    void StopThreadGroup()
    {
        m_queue.Stop();
        m_running = false;

        for(auto thread : m_threadGroup)
        {
            thread->join();
        }

        m_threadGroup.clear();
    }

private:
    SyncQueue<Task> m_queue;                                    //同步队列
    std::vector<std::shared_ptr<std::thread>> m_threadGroup;    //处理任务的线程组
    atomic_bool m_running;                                      //是否停止
    std::once_flag m_flag;
};

#endif /* ThreadPool_H */

SyncQueue.h

//
//  SyncQueue.h
//  QuoridorServer
//
//  Created by shiyi on 2016/11/30.
//  Copyright © 2016年 shiyi. All rights reserved.
//

#ifndef SyncQueue_H
#define SyncQueue_H

#include <stdio.h>
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>
#include <condition_variable>

using namespace std;

template <typename T>
class SyncQueue
{
public:
    SyncQueue(int maxSize) : m_maxSize(maxSize), m_isStop(false)
    {
    }

    ~SyncQueue(){}

    void Push(const T& x)
    {
        Add(x);
    }

    void Push(T&& x)
    {
        Add(x);
    }

    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty(locker, [this]{
            return m_isStop || m_notEmpty();
        });

        if(m_isStop)
            return;
        t = m_queue.front();
        m_queue.pop();
        m_notFull.notify_one();
    }

    void Take(std::queue<T>& queue)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this]{
            return m_isStop || NotEmpty();
        });

        if(m_isStop)
            return;
        queue = std::move(m_queue);
        m_notFull.notify_one();
    }

    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_isStop = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() >= m_maxSize;
    }

private:

    bool NotFull()
    {
        bool full = m_queue.size() >= m_maxSize;
        if(full)
            cout<<"缓冲区满,需要等待..."<<this_thread::get_id()<<endl;
        return !full;
    }

    bool NotEmpty()
    {
        bool empty = m_queue.empty();
        if(empty)
            cout<<"缓冲区空,需要等待..."<<this_thread::get_id()<<endl;
        return !empty;
    }

    template<typename F>
    void Add(F&& x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this]{
            return m_isStop || NotFull();
        });
        if(m_isStop)
            return;
        m_queue.push(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

private:
    bool m_isStop;                     //是否停止
    int m_maxSize;                     //同步队列最大的长度
    std::queue<T> m_queue;                  //缓冲区
    std::mutex m_mutex;                     //互斥量
    std::condition_variable m_notEmpty;     //不为空的条件变量
    std::condition_variable m_notFull;      //不满的条件变量
};

#endif /* SyncQueue_H */
请赐予我钱进的动力吧~
0%