使用 kqueue 在 FreeBSD 上开发高性能应用服务器
2010-05-21 00:00:00 来源:WEB开发网为了达到某种目的,甚至有人会通过 getsocketopt 来偷看 socket 读缓冲区的数据大小或写缓区
可用空间的大小。kqueue 开发人员考虑到这些现状,在 kevent 返回时,将读写缓冲区的可读字
节数或可写空间大小告诉应用程序。基于这个特性,使用 kqueue 的应用一般不使用非阻塞 IO。每次读时,根据 kevent 返回的可读字节大小,将接收缓冲区中的数据一次性读完;而发送数据时,也根据 kevent 返回的写缓冲区可写空间的大小,一次只发可写空间大小的数据。
结束语
本文介绍了 FreeBSD kqueue 这种多路复用 IO 模型的用法,重点介绍了 kqueue 对 Sockets IO 的控制和事件通知过程。有一定网络编程基础的程序员学习本文后,结合给出的例子就能开发出有一定性能保证的 FreeBSD 应用服务器了。
附:
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/event.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
const std::string IP = "192.168.79.18";
const int PORT = 4312;
const int MAX_EVENT_COUNT = 5000;
const int MAX_RECV_BUFF = 65535;
int listener_;
char buf_[MAX_RECV_BUFF];
int CreateListener();
bool Register(int kq, int fd);
void WaitEvent(int kq);
void HandleEvent(int kq, struct kevent* events, int nevents);
void Accept(int kq, int connSize);
void Receive(int sock, int availBytes);
void Enqueue(const char* buf, int bytes);
int main(int argc, char* argv[])
{
listener_ = CreateListener();
if (listener_ == -1)
return -1;
int kq = kqueue();
if (!Register(kq, listener_))
{
std::cerr << "Register listener to kq failed.\n";
return -1;
}
WaitEvent(kq);
return 0;
}
int CreateListener()
{
int sock = socket(PF_INET, SOCK_STREAM, 0);
if (sock == -1)
{
std::cerr << "socket() failed:" << errno << std::endl;
return -1;
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
addr.sin_addr.s_addr = inet_addr(IP.c_str());
if (bind(sock, (struct sockaddr*)&addr, sizeof(struct sockaddr)) == -1)
{
std::cerr << "bind() failed:" << errno << std::endl;
return -1;
}
if (listen(sock, 5) == -1)
{
std::cerr << "listen() failed:" << errno << std::endl;
return -1;
}
return sock;
}
bool Register(int kq, int fd)
{
struct kevent changes[1];
EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
int ret = kevent(kq, changes, 1, NULL, 0, NULL);
if (ret == -1)
return false;
return true;
}
void WaitEvent(int kq)
{
struct kevent events[MAX_EVENT_COUNT];
while (true)
{
int ret = kevent(kq, NULL, 0, events, MAX_EVENT_COUNT, NULL);
if (ret == -1)
{
std::cerr << "kevent failed!\n";
continue;
}
HandleEvent(kq, events, ret);
}
}
void HandleEvent(int kq, struct kevent* events, int nevents)
{
for (int i = 0; i < nevents; i++)
{
int sock = events[i].ident;
int data = events[i].data;
if (sock == listener_)
Accept(kq, data);
else
Receive(sock, data);
}
}
void Accept(int kq, int connSize)
{
for (int i = 0; i < connSize; i++)
{
int client = accept(listener_, NULL, NULL);
if (client == -1)
{
std::cerr << "Accept failed.\n";
continue;
}
if (!Register(kq, client))
{
std::cerr << "Register client failed.\n";
return;
}
}
}
void Receive(int sock, int availBytes)
{
int bytes = recv(sock, buf_, availBytes, 0);
if (bytes == 0 || bytes == -1)
{
close(sock);
std::cerr << "client close or recv failed.\n";
return;
}
// Write buf to the receive queue.
Enqueue(buf_, bytes);
}
void Enqueue(const char* buf, int bytes)
{
}
更多精彩
赞助商链接