在C++程序中添加逻辑流程控制
2008-03-08 21:38:26 来源:WEB开发网核心提示:问题的引出在计算机程序中,除了常见的执行流程控制,在C++程序中添加逻辑流程控制,还有逻辑流程控制;有时,执行流程即为逻辑流程,假如它存在,从图中删除它,但在大多数情况下还是有所区别的,例如
问题的引出
在计算机程序中,除了常见的执行流程控制,还有逻辑流程控制;有时,执行流程即为逻辑流程,但在大多数情况下还是有所区别的,例如,假定有一个Web服务器使用同步套接字读取HTTP请求,那么会编写如下的代码:
void read(HTTP_REQUEST& http_request)
{
read(http_request.header);
read(http_request.body, http_request.header);
}
void read(HTTP_REQUEST_HEADER& header)
{
string line = read_line();
parse_request_link(line, header.method, header.uri,
header.version);
while (TRUE)
{
line = read_line();
if (line.empty())
break;
parse_header_field(line, header);
}
}
void read(BYTE[]& body, HTTP_REQUEST_HEADER& header)
{
string transfer_encoding = header.fields['Transfer-Encoding'];
if (transfer_encoding != b.chunkedb.)
body = read_bytes(header.fields['Content-Length']);
else
{
while (TRUE)
{
string chunk_header = read_line();
DWord chunk_size = atoi(chunk_header);
if (chunk_size == 0)
break;
BYTE[] chunk_body = read_bytes(chunk_size);
body += chunk_body;
}
}
}
string read_line()
{
while (TRUE)
{
int n = strpos(read_buffer, b.\nb., read_buffer.size());
if (n > 0)
break;
read_buffer += socket.read();
}
return read_buffer.extract(n);
}
Byte[] read_bytes(int sz)
{
while (TRUE)
{
if (sz <= read_buffer.size())
break;
read_buffer += socket.read();
}
return read_buffer.extract(sz);
}
在这段代码中,执行流程与逻辑流程是一致的,然而,假如在那些被动接收事件的场合使用了异步套接字,就必须编写下面这样的代码了:
read()
{
read_buffer += socket.read();
if (state == read_request_line)
{
if (!read_line(line))
return;
parse_request_link(line, method, uri, version);
state = read_header_field;
}
while (state == read_request_line)
{
if (!read_line(line))
return;
if (line.empty())
{
transfer_encoding = header.fields['Transfer-Encoding'];
if (transfer_encoding != b.chunkedb.)
{
content_length = header.fields['Content-Length'];
state = read_body;
}
else
state = read_chunk_header;
}
else
parse_header_field(line, header, value);
}
if (state == read_body)
{
request_body += read_buffer;
read_buffer.clear();
if (request_body.size() >= content_length)
state = read_finished;
return;
}
if (state == read_chunk_header)
{
if (!read_line(line))
return;
chunk_size = atoi(line);
if (chunk_size == 0)
{
state = read_finished;
return;
}
state = read_body;
}
if (state == read_chunk_body)
{
request_body.append(read_buffer, chunk_size);
if (chunk_size == 0)
state = read_chunk_header;
return;
}
}
执行流程完全不同了,但逻辑流程却仍保持不变,因为只能一块一块地接收数据,还必须保存状态值及其他变量,以便在事件发生时进行相应的处理。以上只是一些示范性代码,并不能真正工作,在实际中要编写像这样的函数会更加复杂,也更加轻易出错。 更多内容请看C/C++进阶技术文档专题,或
解决方案
为减少麻烦,可在程序主流程之外再创建一个子过程,这个子过程用于执行某些虚拟逻辑,在需要满足某些条件之后才能继续执行时,它可以先停下来,直到主流程告之它再次进行检查。对于上面的示例,可以写成如下的代码:
class Connection
{
SOCKET socket;
Connection(SOCKET s) : socket(s)
{
FLOW_START conn.flow_start();
}
void flow_start()
{
while (TRUE)
{
HTTP_REQUEST http_request;
try {
read(&http_request);
}
catch (EXCEPTION e)
{
break;
}
FILE file;
fp = fopen(http_request.uri);
if (fp == NULL)
{
write(fp);
fclose(file);
}
else
write(504);
}
socket.close();
delete this;
}
void read(HTTP_REQUEST* http_request)
{
read(&http_request.header);
read(&http_request.body, &http_request.header);
}
void read(HTTP_REQUEST_HEADER* header)
{ …}
void read(BYTE[]& body, HTTP_REQUEST_HEADER& header)
{ …}
string read_line()
{
while (TRUE)
{
FLOW_WAIT (m_buffer += )
char* str = strchr(m_buffer, '\n');
if (!str)
continue;
}
string s(m_buffer, 0, str - m_buffer);
memcpy(m_buffer, str);
buf_avail -= str - m_buffer;
return s;
}
BYTE[] read_bytes(int sz)
{
while (TRUE)
{
WAIT (m_buffer += );
if (m_buffer.length < sz)
continue;
}
BYTE[] data = m_buffer.extract(0, sz);
return data;
}
void write(FILE* fp)
{
int filesize = fp.size();
string header;
header << "200 OK\r\nContent-Length: " << filesize << ";\r\n"
<<"\r\n";
write(header.c_str(), header.size());
int szBulk;
for (int i = 0; i < filesize; i += szBulk)
{
szBulk = min(filesize - i, 8192);
data = fread(fp, szBulk);
write(data, szBulk);
}
}
void write(WORD error_status)
{
string header;
header << error_status << " Error\r\n"
<<"\r\n";
write(header.c_str(), header.size());
}
void write(BYTE[] data, int len)
{
while (len > 0)
{
int ret = socket.write(data, len);
if (ret > 0)
{
data += ret;
len -= ret;
}
if (len)
{
WAIT (bWritable == TRUE);
}
}
}
void OnRead()
{
int avail = socket.avail();
m_buffer += socket.read(avail);
}
void OnWrite()
{
bWritable = TRUE;
}
void OnClose()
{
delete this;
}
};
main {
Socket listen_socket;
listen_socket.listen(http_port);
socket_add(listen_socket, NULL);
socket_loop(socket_callback);
}
void socket_callback(void* user_data, SOCKET s, int msg,
int lParam, void* pParam)
{
switch (msg)
{
case READ:
if (user_data == NULL)
{
SOCKET s2 = accept(s);
Connection conn = new Connection(socket);
socket_add(s2, conn);
break;
}
((Connection*)user_data)->OnRead();
break;
case WRITE:
((Connection*)user_data)->OnWrite();
break;
case EXCEPT:
((Connection*)user_data)->OnExcept();
break;
}
}
这涉及到两个新的原语:一个为FLOW_START,其创建了一个新的子过程;另一个为FLOW_WAIT,其告之系统何时将被调用以继续程序流程。例如,FLOW_WAIT(m_buffer += )意味着m_buffer的操作符+=被执行,FLOW_WAIT (bWritable = TRUE)意味着bWritable被设为TRUE。
当一个连接对象创建后,因为FLOW_START这条指令,一个子过程也会被创建,执行流程会沿着此过程执行下去,直至碰到FLOW_WAIT,然后,它会继续执行主流程;当它追加m_buffer或设置bWritable为TRUE时,它将继续执行子过程,直至碰到另一个FLOW_WAIT,此时再返回到主流程当中。 更多内容请看C/C++进阶技术文档专题,或
逻辑流程VS线程
逻辑流程看起来像是虚拟线程,但它实际上运行在创建它的线程空间之内。尽管两者都有独立的进程堆栈,但逻辑流程的开销要小一些,且不用处理流程间的同步问题。
逻辑流程也能用于异常处理。例如,可添加类似如下的代码:
START_FLOW {
FLOW_WAIT(read_err=);
…
}
START_FLOW {
FLOW_WAIT(current_tick & last_receive_tick >= RECEIVE_TIMEOUT);
…
}
示例对比
下面还有一个例子演示了流程的可伸缩性及威力,比如说要解析以下格式的URL:
[scheme://[user:pass@]host[:port]]/]uri[?param[#ankor]]
假如只想遍历URL字符串一次,可能会编写如下代码:
void URL::ParseString(const string &url)
{
string s;
s.reserve(url.length());
if (Original.empty())
Original = url;
OriginalLength = url.length();
const char *p = url.c_str();
//解析scheme [http:]
while (*p && (*p != '/') && (*p != ':') &&
(*p != ';') && (*p != '?') &&
(*p != '#')) s += *p++;
if (*p == ':')
{
Scheme = s;
p++;
s.resize(0);
while (*p && (*p != '/') && (*p != ';') &&
(*p != '?') && (*p != '#')) s += *p++;
}
// 解析 //[user[:pass]@]host[:port]/
// 解析端口)
if (*p && (*p == '/') && (*(p+1) == '/'))
{
p+=2;
s.resize(0);
while (*p && (*p != '/') && (*p != ':') &&
(*p != '@')) s += *p++;
Host = s;
if (*p == ':')
{
s.resize(0);
while (*p && (*p != '/') && (*p != '@')) s += *p++;
if (*p != '@') Port = ip_PORT(atol(&s[0]));
}
if (*p == '@')
{
p++;
if (Host.length() == 0)
{
User = s;
}
else
{
User = Host;
Password = s;
Host.resize(0);
}
s.resize(0);
while (*p && (*p != '/') && (*p != ':')) s += *p++;
Host = s;
if (*p == ':')
{
p++;
s.resize(0);
while (*p && *p != '/') s += *p++;
Port = IP_PORT(atol(&s[0]));
}
}
//重建NetLoc字符串
if (User.length())
{
NetLoc = User;
if (Password.length())
{
NetLoc += ":";
NetLoc += Password;
}
NetLoc += '@';
}
NetLoc += Host;
if (Port != 80)
{
char portstring[15];
NetLoc += ':';
sPRintf(portstring, "%d", Port);
NetLoc += portstring;
}
s.resize(0);
}
//解析路径[/a[/b[..]]/]与文件
//假如碰到'/'且s不为空,这是一个相对路径。
if (s.length() && (*p == '/'))
{
p++;
RelativePath = true;
Path.push_back(s);
s.resize(0);
while (*p && (*p != '/') && (*p != ';') &&
(*p != '?') && (*p != '#') && (*p != '&')) s += *p++;
}
else
{
//这是一个不带反斜线的纯文件名,或者它只是一个主机名。
if (*p != '/') RelativePath = Host.empty();
else {
p++;
while (*p && (*p != '/') && (*p != ';') &&
(*p != '?') && (*p != '#') && (*p != '&')) s += *p++;
}
}
//只要当前字后跟有反斜线,就把它追加到路径后。
while (*p == '/')
{
p++;
//if (s.length())
Path.push_back(s); // uri可为'...//...'
s.resize(0);
while (*p && (*p != '/') && (*p != ';') &&
(*p != '?') && (*p != '#') && (*p != '&')) s += *p++;
}
//现在当前字为文件名
File = s;
//
//获取文件类型
//
string::size_type pp = File.rfind('.');
if (pp != string::npos) {
FileType = File.substr(pp+1);
}
//寻找参数
if (*p == ';')
{
p++;
s.resize(0);
while (*p && (*p != '?') && (*p != '#') &&
(*p != '&')) s += *p++;
Params = s;
}
//寻找查询
//接受以'&'打头的查询
if (*p == '?' *p == '&')
{
s = *p; //保存前导查询字符
p++;
while (*p && (*p != '#')) s += *p++;
Query = s;
}
//寻找片断(fragment)
if (*p == '#')
{
p++;
s.resize(0);
while (*p) s += *p++;
Fragment = s;
}
}
假如使用流程的话,代码就会像下面这个样子:
class Url
{
string scheme, host, port, user, pass, uri, param, ankor;
string* head_token;
int last_pos, cur_pos;
char* url;
parse_url(char* param)
{
START_FLOW analyze_url();
url = param;
int len = strlen(url);
last_pos = 0;
cur_pos = 0;
head_token = NULL;
while (cur_pos < len) {
cur_pos++;
}
if (head_token)
*head_token = url + last_pos;
}
void analyze_url()
{
START_FLOW
{
read_to_tail(&scheme, "://");
START_FLOW
read_from_head(&host, "/");
START_FLOW
read_from_head(&port, ":");
START_FLOW
{
string tmp;
read_from_head(&tmp, "@");
user = host;
pass = port;
host.erase();
port.erase();
read_from_head(&port, ":");
host = tmp;
}
}
START_FLOW
{
read_from_head(&uri, "/"));
START_FLOW
read_from_head(¶m, "?");
START_FLOW
read_from_head(&anchor, "#");
}
}
void read_to_tail(string* token, char* end_str)
{
head_token = token;
while (TRUE)
{
WAIT (cur_pos=);
if (memcmp(url + cur_pos, end_str, strlen(end_str)) == 0)
break;
}
head_token->assign(url + last_pos, cur_pos - last_pos);
last_pos = cur_pos = cur_pos + strlen(end_str);
head_token = NULL;
}
void read_from_head(string* token, char* start_str)
{
while (TRUE)
{
WAIT (cur_pos=);
if (memcmp(url + cur_pos, end_str, strlen(end_str)) == 0)
break;
}
if (head_token)
head_token->assign(url + last_pos, cur_pos - last_pos);
head_token = token;
last_pos = cur_pos + 1;
}
};
代码短多了,也易于修改,面对更复杂的格式也更具可伸缩性。 更多内容请看C/C++进阶技术文档专题,或 使用线程来实现
不幸的是,没有任何编译器可以支持这两个原语,假如想使用它们,只能通过一个线程来实现,虽然会带来一些系统开销, 但是值得。为取代这两个原语,可以使用以下七个宏:
·VFLOW_EVENT_DECLARE(evt):声明一个事件变量。虚拟流程可使用事件来等待或发信号。
·VFLOW_EVENT_INITIALIZE(evt):初始化一个事件变量。这个宏可在C++中并入上一个宏。
·VFLOW_WAIT(evt):一个虚拟流程能调用它来等待一个事件。
·VFLOW_SIGNAL(evt):给一个事件发信号。所有等待事件的虚拟流程将会一个接一个地被激活。当被激活后,将继续之前的流程直至再碰到一个VFLOW_WAIT,此时它又被挂起,而在队列中等待的下一个虚拟流程将会被激活。调用VFLOW_SIGNAL的流程在所有等待的流程全部执行完毕后,才会继续执行。
·VFLOW_TERMINATE(evt):当它被调用时,所有等待事件的虚拟流程会立即退出。
·VFLOW_START(routine, param):要启动一个虚拟流程,需要调用routine(param)。当它碰到第一个VFLOW_WAIT时,它会将执行控制交回它的父流程。
·VFLOW_EXIT:用于虚拟流程的中途退出。
下面是修改后的代码,且在Windows与linux下都能运行:
//analyze [scheme://[user:pass@]host[:port]]/]uri[?param[#ankor]]
#include "vflow.h"
#include <stdio.h>
#include <string>
using namespace std;
class Url;
void flow_read_domain(void*);
void flow_read_host(void*);
void flow_read_port(void*);
void flow_read_host_port(void*);
void flow_read_query_string(void*);
void flow_read_param(void*);
void flow_read_anchor(void*);
class Url
{
public:
Url() {}
~Url() {}
string scheme, host, port, user, pass, uri, param, anchor;
string* head_token;
int last_pos, cur_pos;
char* url;
VFLOW_EVENT_DECLARE(cur_pos_changed);
void parse_url(char* param)
{
VFLOW_EVENT_INITIALIZE(cur_pos_changed);
url = param;
int len = strlen(url);
last_pos = 0;
set_pos(0);
head_token = NULL;
analyze_url();
while (cur_pos < len) {
set_pos(cur_pos+1);
}
if (head_token)
*head_token = url + last_pos;
VFLOW_TERMINATE(cur_pos_changed);
uri = "/" + uri;
}
void set_pos(int pos)
{
cur_pos = pos;
VFLOW_SIGNAL(cur_pos_changed);
}
void analyze_url()
{
VFLOW_START(::flow_read_domain, this);
VFLOW_START(::flow_read_query_string, this);
}
void flow_read_domain()
{
read_to_tail(&scheme, "://");
VFLOW_START(::flow_read_host, this);
VFLOW_START(::flow_read_port, this);
VFLOW_START(::flow_read_host_port, this);
}
void flow_read_host()
{
read_to_tail(&host, "/");
}
void flow_read_port()
{
read_from_head(&port, ":");
}
void flow_read_host_port()
{
string tmp;
read_from_head(&tmp, "@");
user = host;
pass = port;
host.erase();
port.erase();
read_from_head(&port, ":");
host = tmp;
}
void flow_read_query_string()
{
read_from_head(&uri, "/");
VFLOW_START(::flow_read_param, this);
VFLOW_START(::flow_read_anchor, this);
}
void flow_read_param()
{
read_from_head(¶m, "?");
}
void flow_read_anchor()
{
read_from_head(&anchor, "#");
}
void read_to_tail(string* token, char* end_str)
{
head_token = token;
while (1)
{
VFLOW_WAIT(cur_pos_changed);
if (memcmp(url + cur_pos, end_str, strlen(end_str)) == 0)
break;
}
head_token->assign(url + last_pos, cur_pos - last_pos);
last_pos = cur_pos = cur_pos + strlen(end_str);
head_token = NULL;
}
void read_from_head(string* token, char* start_str)
{
while (1)
{
VFLOW_WAIT(cur_pos_changed);
if (memcmp(url + cur_pos, start_str, strlen(start_str)) == 0)
break;
}
if (head_token)
head_token->assign(url + last_pos, cur_pos - last_pos);
head_token = token;
last_pos = cur_pos + 1;
}
};
void flow_read_domain(void* param)
{ ((Url*)param)->flow_read_domain(); }
void flow_read_host(void* param)
{ ((Url*)param)->flow_read_host(); }
void flow_read_port(void* param)
{ ((Url*)param)->flow_read_port(); }
void flow_read_host_port(void* param)
{ ((Url*)param)->flow_read_host_port(); }
void flow_read_query_string(void* param)
{ ((Url*)param)->flow_read_query_string(); }
void flow_read_param(void* param)
{ ((Url*)param)->flow_read_param(); }
void flow_read_anchor(void* param)
{ ((Url*)param)->flow_read_anchor(); }
int main(int argc, char* argv[])
{
Url url;
url.parse_url("http://user:pass@abc.com:80/abc/def/
ghi.php?jklmn=1234&opq=567#rstuvw");
printf("schema=%s\nuser=%s\npass=%s\nhost=%s\nport=%s\nuri=%s\
nparam=%s\nanchor=%s\n",
url.scheme.c_str(), url.user.c_str(), url.pass.c_str(),
url.host.c_str(), url.port.c_str(), url.uri.c_str(),
url.param.c_str(), url.anchor.c_str());
return 0;
}
//vflow.h
#ifndef _VFLOW_H_
#define _VFLOW_H_
#ifdef WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
typedef
#ifdef WIN32
DWORD
#else
pthread_t
#endif
VF_THREAD_ID;
typedef void (*LPVFLOW_START_ROUTINE)(void* param);
typedef strUCt STRU_VIRTUAL_FLOW {
VF_THREAD_ID thread_id;
struct STRU_VIRTUAL_FLOW* map_prev;
struct STRU_VIRTUAL_FLOW* map_next;
struct STRU_VIRTUAL_FLOW* evt_next;
unsigned short status; // 1 means exit
#ifdef WIN32
HANDLE evt;
#else
pthread_mutex_t mut;
pthread_cond_t cond;
#endif
LPVFLOW_START_ROUTINE routine;
void* param;
} VIRTUAL_FLOW;
typedef struct {
VIRTUAL_FLOW* first;
VIRTUAL_FLOW* last;
} VIRTUAL_FLOW_EVENT;
//声明一个流程事件
#define VFLOW_EVENT_DECLARE(evt) \
VIRTUAL_FLOW_EVENT vf_##evt;
#define VFLOW_EVENT_INITIALIZE(evt) \
vf_##evt.first = vf_##evt.last = NULL;
#define VFLOW_START vf_start
//添加到等待队列
#define VFLOW_WAIT(evt) \
vf_wait(&vf_##evt);
//给等待事件的流程发信号
#define VFLOW_SIGNAL(evt) \
vf_signal(&vf_##evt);
//结束等待某一事件的所有流程
#define VFLOW_TERMINATE(evt) \
vf_terminate(&vf_##evt);
#define VFLOW_EXIT vf_exit
void vf_start(LPVFLOW_START_ROUTINE routine, void* param);
void vf_wait(VIRTUAL_FLOW_EVENT* evt);
void vf_signal(VIRTUAL_FLOW_EVENT* evt);
void vf_terminate(VIRTUAL_FLOW_EVENT* evt);
void vf_exit();
#ifdef __cplusplus
}
#endif
#endif // _VFLOW_H_
//vflow.c
#include "vflow.h"
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <sys/types.h>
#include <linux/unistd.h>
#endif
#define VF_MAP_SIZE 17
int g_vf_init = 0;
VIRTUAL_FLOW* g_vf_map[VF_MAP_SIZE];
#ifdef WIN32
#define GetThreadId GetCurrentThreadId
#else
#define GetThreadId pthread_self
#endif
//基于线程ID,从g_vf_map中获取virtual_flow
//假如bCreate = 1,且它不存在,就创建一个。
//否则,假如它存在,从图中删除它。
VIRTUAL_FLOW* get_my_vf(unsigned int bCreate)
{
VF_THREAD_ID thread_id = GetThreadId();
int n = ((unsigned char)(thread_id >> 24) + (unsigned char)(thread_id >> 16) + (unsigned char)(thread_id >> 8) + (unsigned char)thread_id) / VF_MAP_SIZE;
VIRTUAL_FLOW** ppVF = g_vf_map + n;
VIRTUAL_FLOW* pVF, *pVF2;
if (*ppVF == NULL)
{
if (!bCreate)
return NULL;
pVF = (VIRTUAL_FLOW*)malloc(sizeof(VIRTUAL_FLOW));
pVF->map_prev = pVF->map_next = pVF->evt_next = NULL;
*ppVF = pVF;
}
else
{
pVF = *ppVF;
while (1)
{
if (pVF->thread_id == thread_id)
{
if (bCreate)
return pVF;
if (pVF == *ppVF)
{
*ppVF = pVF->map_next;
if (*ppVF)
(*ppVF)->map_prev = NULL;
}
else
{
pVF->map_prev->map_next = pVF->map_next;
if (pVF->map_next)
pVF->map_next->map_prev = pVF->map_prev;
}
#ifdef WIN32
CloseHandle(pVF->evt);
#else
pthread_cond_destroy(&pVF->cond);
#endif
free(pVF);
return NULL;
}
if (pVF->map_next == NULL)
break;
pVF = pVF->map_next;
}
if (!bCreate)
return NULL;
pVF2 = (VIRTUAL_FLOW*)malloc(sizeof(VIRTUAL_FLOW));
pVF2->map_prev = pVF;
pVF2->map_next = pVF2->evt_next = NULL;
pVF->map_next = pVF2;
pVF = pVF2;
}
pVF->thread_id = thread_id;
#ifdef WIN32
pVF->evt = CreateEvent(NULL, FALSE, FALSE, NULL);
#else
pthread_cond_init(&pVF->cond, NULL);
pthread_mutex_init(&pVF->mut, NULL);
#endif
pVF->status = 0;
return pVF;
}
void vf_flow_wait(VIRTUAL_FLOW* vf)
{
#ifdef WIN32
WaitForSingleObject(vf->evt, INFINITE);
#else
pthread_cond_wait(&vf->cond, &vf->mut);
pthread_mutex_unlock(&vf->mut);
#endif
if (vf->status > 0)
{
vf_exit();
#ifdef WIN32
ExitThread(0);
#else
pthread_exit(NULL);
#endif
}
}
void vf_flow_activate(VIRTUAL_FLOW* vf)
{
#ifdef WIN32
SetEvent(vf->evt);
#else
pthread_mutex_lock(&vf->mut);
pthread_cond_signal(&vf->cond);
pthread_mutex_unlock(&vf->mut);
#endif
}
#ifdef WIN32
DWORD WINAPI
#else
void*
#endif
vf_flow_routine(void* param)
{
VIRTUAL_FLOW* parent_vf = (VIRTUAL_FLOW*)param;
VIRTUAL_FLOW* vf = get_my_vf(1);
vf->evt_next = parent_vf;
parent_vf->routine(parent_vf->param);
vf_exit();
#ifdef WIN32
return 0;
#else
return NULL;
#endif
}
void vf_init()
{
if (g_vf_init)
return;
memset(g_vf_map, 0, sizeof(g_vf_map));
g_vf_init = 1;
}
void vf_start(LPVFLOW_START_ROUTINE routine, void* param)
{
VIRTUAL_FLOW* vf;
#ifndef WIN32
pthread_t thread;
#endif
vf_init();
vf = get_my_vf(1);
vf->routine = routine;
vf->param = param;
#ifdef WIN32
CreateThread(NULL, 0, vf_flow_routine, vf, 0, NULL);
#else
pthread_mutex_lock(&vf->mut);
pthread_create(&thread, NULL, vf_flow_routine, vf);
#endif
vf_flow_wait(vf);
}
void vf_wait(VIRTUAL_FLOW_EVENT* evt)
{
VIRTUAL_FLOW* vf, *vf_next;
vf_init();
vf = get_my_vf(1);
if (evt->first == NULL)
evt->first = evt->last = vf;
else
{
evt->last->evt_next = vf;
evt->last = vf;
}
#ifndef WIN32
pthread_mutex_lock(&vf->mut);
#endif
vf_next = vf->evt_next;
if (vf_next)
{
vf->evt_next = NULL;
vf_flow_activate(vf_next);
}
vf_flow_wait(vf);
}
void vf_signal(VIRTUAL_FLOW_EVENT* evt)
{
VIRTUAL_FLOW* vf, *vf_first;
vf_init();
if (!
(vf_first = evt->first))
return;
vf = get_my_vf(1);
#ifndef WIN32
pthread_mutex_lock(&vf->mut);
#endif
evt->last->evt_next = vf;
evt->first = evt->last = NULL;
vf_flow_activate(vf_first);
vf_flow_wait(vf);
}
void vf_terminate(VIRTUAL_FLOW_EVENT* evt)
{
VIRTUAL_FLOW* vf, *vf_first;
vf_init();
for (vf = evt->first; vf; vf = vf->evt_next)
vf->status = 1;
vf_first = evt->first;
evt->first = evt->last = NULL;
if (vf_first)
vf_flow_activate(vf_first);
}
void vf_exit()
{
VIRTUAL_FLOW* vf;
vf = get_my_vf(1);
if (vf->evt_next)
vf_flow_activate(vf->evt_next);
get_my_vf(0);
} 更多内容请看C/C++进阶技术文档专题,或
在计算机程序中,除了常见的执行流程控制,还有逻辑流程控制;有时,执行流程即为逻辑流程,但在大多数情况下还是有所区别的,例如,假定有一个Web服务器使用同步套接字读取HTTP请求,那么会编写如下的代码:
void read(HTTP_REQUEST& http_request)
{
read(http_request.header);
read(http_request.body, http_request.header);
}
void read(HTTP_REQUEST_HEADER& header)
{
string line = read_line();
parse_request_link(line, header.method, header.uri,
header.version);
while (TRUE)
{
line = read_line();
if (line.empty())
break;
parse_header_field(line, header);
}
}
void read(BYTE[]& body, HTTP_REQUEST_HEADER& header)
{
string transfer_encoding = header.fields['Transfer-Encoding'];
if (transfer_encoding != b.chunkedb.)
body = read_bytes(header.fields['Content-Length']);
else
{
while (TRUE)
{
string chunk_header = read_line();
DWord chunk_size = atoi(chunk_header);
if (chunk_size == 0)
break;
BYTE[] chunk_body = read_bytes(chunk_size);
body += chunk_body;
}
}
}
string read_line()
{
while (TRUE)
{
int n = strpos(read_buffer, b.\nb., read_buffer.size());
if (n > 0)
break;
read_buffer += socket.read();
}
return read_buffer.extract(n);
}
Byte[] read_bytes(int sz)
{
while (TRUE)
{
if (sz <= read_buffer.size())
break;
read_buffer += socket.read();
}
return read_buffer.extract(sz);
}
在这段代码中,执行流程与逻辑流程是一致的,然而,假如在那些被动接收事件的场合使用了异步套接字,就必须编写下面这样的代码了:
read()
{
read_buffer += socket.read();
if (state == read_request_line)
{
if (!read_line(line))
return;
parse_request_link(line, method, uri, version);
state = read_header_field;
}
while (state == read_request_line)
{
if (!read_line(line))
return;
if (line.empty())
{
transfer_encoding = header.fields['Transfer-Encoding'];
if (transfer_encoding != b.chunkedb.)
{
content_length = header.fields['Content-Length'];
state = read_body;
}
else
state = read_chunk_header;
}
else
parse_header_field(line, header, value);
}
if (state == read_body)
{
request_body += read_buffer;
read_buffer.clear();
if (request_body.size() >= content_length)
state = read_finished;
return;
}
if (state == read_chunk_header)
{
if (!read_line(line))
return;
chunk_size = atoi(line);
if (chunk_size == 0)
{
state = read_finished;
return;
}
state = read_body;
}
if (state == read_chunk_body)
{
request_body.append(read_buffer, chunk_size);
if (chunk_size == 0)
state = read_chunk_header;
return;
}
}
执行流程完全不同了,但逻辑流程却仍保持不变,因为只能一块一块地接收数据,还必须保存状态值及其他变量,以便在事件发生时进行相应的处理。以上只是一些示范性代码,并不能真正工作,在实际中要编写像这样的函数会更加复杂,也更加轻易出错。 更多内容请看C/C++进阶技术文档专题,或
解决方案
为减少麻烦,可在程序主流程之外再创建一个子过程,这个子过程用于执行某些虚拟逻辑,在需要满足某些条件之后才能继续执行时,它可以先停下来,直到主流程告之它再次进行检查。对于上面的示例,可以写成如下的代码:
class Connection
{
SOCKET socket;
Connection(SOCKET s) : socket(s)
{
FLOW_START conn.flow_start();
}
void flow_start()
{
while (TRUE)
{
HTTP_REQUEST http_request;
try {
read(&http_request);
}
catch (EXCEPTION e)
{
break;
}
FILE file;
fp = fopen(http_request.uri);
if (fp == NULL)
{
write(fp);
fclose(file);
}
else
write(504);
}
socket.close();
delete this;
}
void read(HTTP_REQUEST* http_request)
{
read(&http_request.header);
read(&http_request.body, &http_request.header);
}
void read(HTTP_REQUEST_HEADER* header)
{ …}
void read(BYTE[]& body, HTTP_REQUEST_HEADER& header)
{ …}
string read_line()
{
while (TRUE)
{
FLOW_WAIT (m_buffer += )
char* str = strchr(m_buffer, '\n');
if (!str)
continue;
}
string s(m_buffer, 0, str - m_buffer);
memcpy(m_buffer, str);
buf_avail -= str - m_buffer;
return s;
}
BYTE[] read_bytes(int sz)
{
while (TRUE)
{
WAIT (m_buffer += );
if (m_buffer.length < sz)
continue;
}
BYTE[] data = m_buffer.extract(0, sz);
return data;
}
void write(FILE* fp)
{
int filesize = fp.size();
string header;
header << "200 OK\r\nContent-Length: " << filesize << ";\r\n"
<<"\r\n";
write(header.c_str(), header.size());
int szBulk;
for (int i = 0; i < filesize; i += szBulk)
{
szBulk = min(filesize - i, 8192);
data = fread(fp, szBulk);
write(data, szBulk);
}
}
void write(WORD error_status)
{
string header;
header << error_status << " Error\r\n"
<<"\r\n";
write(header.c_str(), header.size());
}
void write(BYTE[] data, int len)
{
while (len > 0)
{
int ret = socket.write(data, len);
if (ret > 0)
{
data += ret;
len -= ret;
}
if (len)
{
WAIT (bWritable == TRUE);
}
}
}
void OnRead()
{
int avail = socket.avail();
m_buffer += socket.read(avail);
}
void OnWrite()
{
bWritable = TRUE;
}
void OnClose()
{
delete this;
}
};
main {
Socket listen_socket;
listen_socket.listen(http_port);
socket_add(listen_socket, NULL);
socket_loop(socket_callback);
}
void socket_callback(void* user_data, SOCKET s, int msg,
int lParam, void* pParam)
{
switch (msg)
{
case READ:
if (user_data == NULL)
{
SOCKET s2 = accept(s);
Connection conn = new Connection(socket);
socket_add(s2, conn);
break;
}
((Connection*)user_data)->OnRead();
break;
case WRITE:
((Connection*)user_data)->OnWrite();
break;
case EXCEPT:
((Connection*)user_data)->OnExcept();
break;
}
}
这涉及到两个新的原语:一个为FLOW_START,其创建了一个新的子过程;另一个为FLOW_WAIT,其告之系统何时将被调用以继续程序流程。例如,FLOW_WAIT(m_buffer += )意味着m_buffer的操作符+=被执行,FLOW_WAIT (bWritable = TRUE)意味着bWritable被设为TRUE。
当一个连接对象创建后,因为FLOW_START这条指令,一个子过程也会被创建,执行流程会沿着此过程执行下去,直至碰到FLOW_WAIT,然后,它会继续执行主流程;当它追加m_buffer或设置bWritable为TRUE时,它将继续执行子过程,直至碰到另一个FLOW_WAIT,此时再返回到主流程当中。 更多内容请看C/C++进阶技术文档专题,或
逻辑流程VS线程
逻辑流程看起来像是虚拟线程,但它实际上运行在创建它的线程空间之内。尽管两者都有独立的进程堆栈,但逻辑流程的开销要小一些,且不用处理流程间的同步问题。
逻辑流程也能用于异常处理。例如,可添加类似如下的代码:
START_FLOW {
FLOW_WAIT(read_err=);
…
}
START_FLOW {
FLOW_WAIT(current_tick & last_receive_tick >= RECEIVE_TIMEOUT);
…
}
示例对比
下面还有一个例子演示了流程的可伸缩性及威力,比如说要解析以下格式的URL:
[scheme://[user:pass@]host[:port]]/]uri[?param[#ankor]]
假如只想遍历URL字符串一次,可能会编写如下代码:
void URL::ParseString(const string &url)
{
string s;
s.reserve(url.length());
if (Original.empty())
Original = url;
OriginalLength = url.length();
const char *p = url.c_str();
//解析scheme [http:]
while (*p && (*p != '/') && (*p != ':') &&
(*p != ';') && (*p != '?') &&
(*p != '#')) s += *p++;
if (*p == ':')
{
Scheme = s;
p++;
s.resize(0);
while (*p && (*p != '/') && (*p != ';') &&
(*p != '?') && (*p != '#')) s += *p++;
}
// 解析 //[user[:pass]@]host[:port]/
// 解析端口)
if (*p && (*p == '/') && (*(p+1) == '/'))
{
p+=2;
s.resize(0);
while (*p && (*p != '/') && (*p != ':') &&
(*p != '@')) s += *p++;
Host = s;
if (*p == ':')
{
s.resize(0);
while (*p && (*p != '/') && (*p != '@')) s += *p++;
if (*p != '@') Port = ip_PORT(atol(&s[0]));
}
if (*p == '@')
{
p++;
if (Host.length() == 0)
{
User = s;
}
else
{
User = Host;
Password = s;
Host.resize(0);
}
s.resize(0);
while (*p && (*p != '/') && (*p != ':')) s += *p++;
Host = s;
if (*p == ':')
{
p++;
s.resize(0);
while (*p && *p != '/') s += *p++;
Port = IP_PORT(atol(&s[0]));
}
}
//重建NetLoc字符串
if (User.length())
{
NetLoc = User;
if (Password.length())
{
NetLoc += ":";
NetLoc += Password;
}
NetLoc += '@';
}
NetLoc += Host;
if (Port != 80)
{
char portstring[15];
NetLoc += ':';
sPRintf(portstring, "%d", Port);
NetLoc += portstring;
}
s.resize(0);
}
//解析路径[/a[/b[..]]/]与文件
//假如碰到'/'且s不为空,这是一个相对路径。
if (s.length() && (*p == '/'))
{
p++;
RelativePath = true;
Path.push_back(s);
s.resize(0);
while (*p && (*p != '/') && (*p != ';') &&
(*p != '?') && (*p != '#') && (*p != '&')) s += *p++;
}
else
{
//这是一个不带反斜线的纯文件名,或者它只是一个主机名。
if (*p != '/') RelativePath = Host.empty();
else {
p++;
while (*p && (*p != '/') && (*p != ';') &&
(*p != '?') && (*p != '#') && (*p != '&')) s += *p++;
}
}
//只要当前字后跟有反斜线,就把它追加到路径后。
while (*p == '/')
{
p++;
//if (s.length())
Path.push_back(s); // uri可为'...//...'
s.resize(0);
while (*p && (*p != '/') && (*p != ';') &&
(*p != '?') && (*p != '#') && (*p != '&')) s += *p++;
}
//现在当前字为文件名
File = s;
//
//获取文件类型
//
string::size_type pp = File.rfind('.');
if (pp != string::npos) {
FileType = File.substr(pp+1);
}
//寻找参数
if (*p == ';')
{
p++;
s.resize(0);
while (*p && (*p != '?') && (*p != '#') &&
(*p != '&')) s += *p++;
Params = s;
}
//寻找查询
//接受以'&'打头的查询
if (*p == '?' *p == '&')
{
s = *p; //保存前导查询字符
p++;
while (*p && (*p != '#')) s += *p++;
Query = s;
}
//寻找片断(fragment)
if (*p == '#')
{
p++;
s.resize(0);
while (*p) s += *p++;
Fragment = s;
}
}
假如使用流程的话,代码就会像下面这个样子:
class Url
{
string scheme, host, port, user, pass, uri, param, ankor;
string* head_token;
int last_pos, cur_pos;
char* url;
parse_url(char* param)
{
START_FLOW analyze_url();
url = param;
int len = strlen(url);
last_pos = 0;
cur_pos = 0;
head_token = NULL;
while (cur_pos < len) {
cur_pos++;
}
if (head_token)
*head_token = url + last_pos;
}
void analyze_url()
{
START_FLOW
{
read_to_tail(&scheme, "://");
START_FLOW
read_from_head(&host, "/");
START_FLOW
read_from_head(&port, ":");
START_FLOW
{
string tmp;
read_from_head(&tmp, "@");
user = host;
pass = port;
host.erase();
port.erase();
read_from_head(&port, ":");
host = tmp;
}
}
START_FLOW
{
read_from_head(&uri, "/"));
START_FLOW
read_from_head(¶m, "?");
START_FLOW
read_from_head(&anchor, "#");
}
}
void read_to_tail(string* token, char* end_str)
{
head_token = token;
while (TRUE)
{
WAIT (cur_pos=);
if (memcmp(url + cur_pos, end_str, strlen(end_str)) == 0)
break;
}
head_token->assign(url + last_pos, cur_pos - last_pos);
last_pos = cur_pos = cur_pos + strlen(end_str);
head_token = NULL;
}
void read_from_head(string* token, char* start_str)
{
while (TRUE)
{
WAIT (cur_pos=);
if (memcmp(url + cur_pos, end_str, strlen(end_str)) == 0)
break;
}
if (head_token)
head_token->assign(url + last_pos, cur_pos - last_pos);
head_token = token;
last_pos = cur_pos + 1;
}
};
代码短多了,也易于修改,面对更复杂的格式也更具可伸缩性。 更多内容请看C/C++进阶技术文档专题,或 使用线程来实现
不幸的是,没有任何编译器可以支持这两个原语,假如想使用它们,只能通过一个线程来实现,虽然会带来一些系统开销, 但是值得。为取代这两个原语,可以使用以下七个宏:
·VFLOW_EVENT_DECLARE(evt):声明一个事件变量。虚拟流程可使用事件来等待或发信号。
·VFLOW_EVENT_INITIALIZE(evt):初始化一个事件变量。这个宏可在C++中并入上一个宏。
·VFLOW_WAIT(evt):一个虚拟流程能调用它来等待一个事件。
·VFLOW_SIGNAL(evt):给一个事件发信号。所有等待事件的虚拟流程将会一个接一个地被激活。当被激活后,将继续之前的流程直至再碰到一个VFLOW_WAIT,此时它又被挂起,而在队列中等待的下一个虚拟流程将会被激活。调用VFLOW_SIGNAL的流程在所有等待的流程全部执行完毕后,才会继续执行。
·VFLOW_TERMINATE(evt):当它被调用时,所有等待事件的虚拟流程会立即退出。
·VFLOW_START(routine, param):要启动一个虚拟流程,需要调用routine(param)。当它碰到第一个VFLOW_WAIT时,它会将执行控制交回它的父流程。
·VFLOW_EXIT:用于虚拟流程的中途退出。
下面是修改后的代码,且在Windows与linux下都能运行:
//analyze [scheme://[user:pass@]host[:port]]/]uri[?param[#ankor]]
#include "vflow.h"
#include <stdio.h>
#include <string>
using namespace std;
class Url;
void flow_read_domain(void*);
void flow_read_host(void*);
void flow_read_port(void*);
void flow_read_host_port(void*);
void flow_read_query_string(void*);
void flow_read_param(void*);
void flow_read_anchor(void*);
class Url
{
public:
Url() {}
~Url() {}
string scheme, host, port, user, pass, uri, param, anchor;
string* head_token;
int last_pos, cur_pos;
char* url;
VFLOW_EVENT_DECLARE(cur_pos_changed);
void parse_url(char* param)
{
VFLOW_EVENT_INITIALIZE(cur_pos_changed);
url = param;
int len = strlen(url);
last_pos = 0;
set_pos(0);
head_token = NULL;
analyze_url();
while (cur_pos < len) {
set_pos(cur_pos+1);
}
if (head_token)
*head_token = url + last_pos;
VFLOW_TERMINATE(cur_pos_changed);
uri = "/" + uri;
}
void set_pos(int pos)
{
cur_pos = pos;
VFLOW_SIGNAL(cur_pos_changed);
}
void analyze_url()
{
VFLOW_START(::flow_read_domain, this);
VFLOW_START(::flow_read_query_string, this);
}
void flow_read_domain()
{
read_to_tail(&scheme, "://");
VFLOW_START(::flow_read_host, this);
VFLOW_START(::flow_read_port, this);
VFLOW_START(::flow_read_host_port, this);
}
void flow_read_host()
{
read_to_tail(&host, "/");
}
void flow_read_port()
{
read_from_head(&port, ":");
}
void flow_read_host_port()
{
string tmp;
read_from_head(&tmp, "@");
user = host;
pass = port;
host.erase();
port.erase();
read_from_head(&port, ":");
host = tmp;
}
void flow_read_query_string()
{
read_from_head(&uri, "/");
VFLOW_START(::flow_read_param, this);
VFLOW_START(::flow_read_anchor, this);
}
void flow_read_param()
{
read_from_head(¶m, "?");
}
void flow_read_anchor()
{
read_from_head(&anchor, "#");
}
void read_to_tail(string* token, char* end_str)
{
head_token = token;
while (1)
{
VFLOW_WAIT(cur_pos_changed);
if (memcmp(url + cur_pos, end_str, strlen(end_str)) == 0)
break;
}
head_token->assign(url + last_pos, cur_pos - last_pos);
last_pos = cur_pos = cur_pos + strlen(end_str);
head_token = NULL;
}
void read_from_head(string* token, char* start_str)
{
while (1)
{
VFLOW_WAIT(cur_pos_changed);
if (memcmp(url + cur_pos, start_str, strlen(start_str)) == 0)
break;
}
if (head_token)
head_token->assign(url + last_pos, cur_pos - last_pos);
head_token = token;
last_pos = cur_pos + 1;
}
};
void flow_read_domain(void* param)
{ ((Url*)param)->flow_read_domain(); }
void flow_read_host(void* param)
{ ((Url*)param)->flow_read_host(); }
void flow_read_port(void* param)
{ ((Url*)param)->flow_read_port(); }
void flow_read_host_port(void* param)
{ ((Url*)param)->flow_read_host_port(); }
void flow_read_query_string(void* param)
{ ((Url*)param)->flow_read_query_string(); }
void flow_read_param(void* param)
{ ((Url*)param)->flow_read_param(); }
void flow_read_anchor(void* param)
{ ((Url*)param)->flow_read_anchor(); }
int main(int argc, char* argv[])
{
Url url;
url.parse_url("http://user:pass@abc.com:80/abc/def/
ghi.php?jklmn=1234&opq=567#rstuvw");
printf("schema=%s\nuser=%s\npass=%s\nhost=%s\nport=%s\nuri=%s\
nparam=%s\nanchor=%s\n",
url.scheme.c_str(), url.user.c_str(), url.pass.c_str(),
url.host.c_str(), url.port.c_str(), url.uri.c_str(),
url.param.c_str(), url.anchor.c_str());
return 0;
}
//vflow.h
#ifndef _VFLOW_H_
#define _VFLOW_H_
#ifdef WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
typedef
#ifdef WIN32
DWORD
#else
pthread_t
#endif
VF_THREAD_ID;
typedef void (*LPVFLOW_START_ROUTINE)(void* param);
typedef strUCt STRU_VIRTUAL_FLOW {
VF_THREAD_ID thread_id;
struct STRU_VIRTUAL_FLOW* map_prev;
struct STRU_VIRTUAL_FLOW* map_next;
struct STRU_VIRTUAL_FLOW* evt_next;
unsigned short status; // 1 means exit
#ifdef WIN32
HANDLE evt;
#else
pthread_mutex_t mut;
pthread_cond_t cond;
#endif
LPVFLOW_START_ROUTINE routine;
void* param;
} VIRTUAL_FLOW;
typedef struct {
VIRTUAL_FLOW* first;
VIRTUAL_FLOW* last;
} VIRTUAL_FLOW_EVENT;
//声明一个流程事件
#define VFLOW_EVENT_DECLARE(evt) \
VIRTUAL_FLOW_EVENT vf_##evt;
#define VFLOW_EVENT_INITIALIZE(evt) \
vf_##evt.first = vf_##evt.last = NULL;
#define VFLOW_START vf_start
//添加到等待队列
#define VFLOW_WAIT(evt) \
vf_wait(&vf_##evt);
//给等待事件的流程发信号
#define VFLOW_SIGNAL(evt) \
vf_signal(&vf_##evt);
//结束等待某一事件的所有流程
#define VFLOW_TERMINATE(evt) \
vf_terminate(&vf_##evt);
#define VFLOW_EXIT vf_exit
void vf_start(LPVFLOW_START_ROUTINE routine, void* param);
void vf_wait(VIRTUAL_FLOW_EVENT* evt);
void vf_signal(VIRTUAL_FLOW_EVENT* evt);
void vf_terminate(VIRTUAL_FLOW_EVENT* evt);
void vf_exit();
#ifdef __cplusplus
}
#endif
#endif // _VFLOW_H_
//vflow.c
#include "vflow.h"
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <sys/types.h>
#include <linux/unistd.h>
#endif
#define VF_MAP_SIZE 17
int g_vf_init = 0;
VIRTUAL_FLOW* g_vf_map[VF_MAP_SIZE];
#ifdef WIN32
#define GetThreadId GetCurrentThreadId
#else
#define GetThreadId pthread_self
#endif
//基于线程ID,从g_vf_map中获取virtual_flow
//假如bCreate = 1,且它不存在,就创建一个。
//否则,假如它存在,从图中删除它。
VIRTUAL_FLOW* get_my_vf(unsigned int bCreate)
{
VF_THREAD_ID thread_id = GetThreadId();
int n = ((unsigned char)(thread_id >> 24) + (unsigned char)(thread_id >> 16) + (unsigned char)(thread_id >> 8) + (unsigned char)thread_id) / VF_MAP_SIZE;
VIRTUAL_FLOW** ppVF = g_vf_map + n;
VIRTUAL_FLOW* pVF, *pVF2;
if (*ppVF == NULL)
{
if (!bCreate)
return NULL;
pVF = (VIRTUAL_FLOW*)malloc(sizeof(VIRTUAL_FLOW));
pVF->map_prev = pVF->map_next = pVF->evt_next = NULL;
*ppVF = pVF;
}
else
{
pVF = *ppVF;
while (1)
{
if (pVF->thread_id == thread_id)
{
if (bCreate)
return pVF;
if (pVF == *ppVF)
{
*ppVF = pVF->map_next;
if (*ppVF)
(*ppVF)->map_prev = NULL;
}
else
{
pVF->map_prev->map_next = pVF->map_next;
if (pVF->map_next)
pVF->map_next->map_prev = pVF->map_prev;
}
#ifdef WIN32
CloseHandle(pVF->evt);
#else
pthread_cond_destroy(&pVF->cond);
#endif
free(pVF);
return NULL;
}
if (pVF->map_next == NULL)
break;
pVF = pVF->map_next;
}
if (!bCreate)
return NULL;
pVF2 = (VIRTUAL_FLOW*)malloc(sizeof(VIRTUAL_FLOW));
pVF2->map_prev = pVF;
pVF2->map_next = pVF2->evt_next = NULL;
pVF->map_next = pVF2;
pVF = pVF2;
}
pVF->thread_id = thread_id;
#ifdef WIN32
pVF->evt = CreateEvent(NULL, FALSE, FALSE, NULL);
#else
pthread_cond_init(&pVF->cond, NULL);
pthread_mutex_init(&pVF->mut, NULL);
#endif
pVF->status = 0;
return pVF;
}
void vf_flow_wait(VIRTUAL_FLOW* vf)
{
#ifdef WIN32
WaitForSingleObject(vf->evt, INFINITE);
#else
pthread_cond_wait(&vf->cond, &vf->mut);
pthread_mutex_unlock(&vf->mut);
#endif
if (vf->status > 0)
{
vf_exit();
#ifdef WIN32
ExitThread(0);
#else
pthread_exit(NULL);
#endif
}
}
void vf_flow_activate(VIRTUAL_FLOW* vf)
{
#ifdef WIN32
SetEvent(vf->evt);
#else
pthread_mutex_lock(&vf->mut);
pthread_cond_signal(&vf->cond);
pthread_mutex_unlock(&vf->mut);
#endif
}
#ifdef WIN32
DWORD WINAPI
#else
void*
#endif
vf_flow_routine(void* param)
{
VIRTUAL_FLOW* parent_vf = (VIRTUAL_FLOW*)param;
VIRTUAL_FLOW* vf = get_my_vf(1);
vf->evt_next = parent_vf;
parent_vf->routine(parent_vf->param);
vf_exit();
#ifdef WIN32
return 0;
#else
return NULL;
#endif
}
void vf_init()
{
if (g_vf_init)
return;
memset(g_vf_map, 0, sizeof(g_vf_map));
g_vf_init = 1;
}
void vf_start(LPVFLOW_START_ROUTINE routine, void* param)
{
VIRTUAL_FLOW* vf;
#ifndef WIN32
pthread_t thread;
#endif
vf_init();
vf = get_my_vf(1);
vf->routine = routine;
vf->param = param;
#ifdef WIN32
CreateThread(NULL, 0, vf_flow_routine, vf, 0, NULL);
#else
pthread_mutex_lock(&vf->mut);
pthread_create(&thread, NULL, vf_flow_routine, vf);
#endif
vf_flow_wait(vf);
}
void vf_wait(VIRTUAL_FLOW_EVENT* evt)
{
VIRTUAL_FLOW* vf, *vf_next;
vf_init();
vf = get_my_vf(1);
if (evt->first == NULL)
evt->first = evt->last = vf;
else
{
evt->last->evt_next = vf;
evt->last = vf;
}
#ifndef WIN32
pthread_mutex_lock(&vf->mut);
#endif
vf_next = vf->evt_next;
if (vf_next)
{
vf->evt_next = NULL;
vf_flow_activate(vf_next);
}
vf_flow_wait(vf);
}
void vf_signal(VIRTUAL_FLOW_EVENT* evt)
{
VIRTUAL_FLOW* vf, *vf_first;
vf_init();
if (!
(vf_first = evt->first))
return;
vf = get_my_vf(1);
#ifndef WIN32
pthread_mutex_lock(&vf->mut);
#endif
evt->last->evt_next = vf;
evt->first = evt->last = NULL;
vf_flow_activate(vf_first);
vf_flow_wait(vf);
}
void vf_terminate(VIRTUAL_FLOW_EVENT* evt)
{
VIRTUAL_FLOW* vf, *vf_first;
vf_init();
for (vf = evt->first; vf; vf = vf->evt_next)
vf->status = 1;
vf_first = evt->first;
evt->first = evt->last = NULL;
if (vf_first)
vf_flow_activate(vf_first);
}
void vf_exit()
{
VIRTUAL_FLOW* vf;
vf = get_my_vf(1);
if (vf->evt_next)
vf_flow_activate(vf->evt_next);
get_my_vf(0);
} 更多内容请看C/C++进阶技术文档专题,或
更多精彩
赞助商链接