本章描述bufferevent的一些对通常使用不必要的高级特征。如果只想学习如何使用bufferevent,可以跳过这一章,直接阅读下一章。
有时候网络程序需要与自身通信。比如说,通过某些协议对用户连接进行隧道操作的程序,有时候也需要通过同样的协议对自身的连接进行隧道操作。当然,可以通过打开一个到自身监听端口的连接,让程序使用这个连接来达到这种目标。但是,通过网络栈来与自身通信比较浪费资源。
替代的解决方案是,创建一对成对的bufferevent。这样,写入到一个bufferevent的字节都被另一个接收(反过来也是),但是不需要使用套接字。
int
bufferevent_pair_new(struct event_base *base, int options,
struct bufferevent *pair[2])
{
struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
int tmp_options;
options |= BEV_OPT_DEFER_CALLBACKS;
tmp_options = options & ~BEV_OPT_THREADSAFE;
bufev1 = bufferevent_pair_elt_new(base, options);
if (!bufev1)
return -1;
bufev2 = bufferevent_pair_elt_new(base, tmp_options);
if (!bufev2) {
bufferevent_free(downcast(bufev1));
return -1;
}
if (options & BEV_OPT_THREADSAFE) {
/*XXXX check return */
bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock);
}
bufev1->partner = bufev2;
bufev2->partner = bufev1;
evbuffer_freeze(downcast(bufev1)->input, 0);
evbuffer_freeze(downcast(bufev1)->output, 1);
evbuffer_freeze(downcast(bufev2)->input, 0);
evbuffer_freeze(downcast(bufev2)->output, 1);
pair[0] = downcast(bufev1);
pair[1] = downcast(bufev2);
return 0;
}
调用bufferevent_pair_new()会设置pair[0]和pair[1]为一对相互连接的bufferevent。除了BEV_OPT_CLOSE_ON_FREE无效、BEV_OPT_DEFER_CALLBACKS总是打开的之外,所有通常的选项都是支持的。
为什么bufferevent对需要带延迟回调运行?通常某一方上的操作会调用一个通知另一方的回调,从而调用另一方的回调,如此这样进行很多步。如果不延迟回调,这种调用链常常会导致栈溢出或者饿死其他连接,而且还要求所有的回调是可重入的。
成对的bufferevent支持flush:设置模式参数为BEV_NORMAL或者BEV_FLUSH会强制要求所有相关数据从对中的一个bufferevent传输到另一个中,而忽略可能会限制传输的水位设置。增加BEV_FINISHED到模式参数中还会让对端的bufferevent产生EOF事件。
释放对中的任何一个成员不会自动释放另一个,也不会产生EOF事件。释放仅仅会使对中的另一个成员成为断开的。bufferevent一旦断开,就不能再成功读写数据或者产生任何事件了。
struct bufferevent *
bufferevent_pair_get_partner(struct bufferevent *bev)
{
struct bufferevent_pair *bev_p;
struct bufferevent *partner = NULL;
bev_p = upcast(bev);
if (! bev_p)
return NULL;
incref_and_lock(bev);
if (bev_p->partner)
partner = downcast(bev_p->partner);
decref_and_unlock(bev);
return partner;
}
有时候在给出了对的一个成员时,需要获取另一个成员,这时候可以使用bufferevent_pair_get_partner()。如果bev是对的成员,而且对的另一个成员仍然存在,函数将返回另一个成员;否则,函数返回NULL。
bufferevent对由2.0.1-alpha版本引入,而bufferevent_pair_get_partner()函数由2.0.6版本引入。
有时候需要转换传递给某bufferevent的所有数据,这可以通过添加一个压缩层,或者将协议包装到另一个协议中进行传输来实现。
/**
@name Filtering support
@{
*/
/**
Values that filters can return.
*/
enum bufferevent_filter_result {
/** everything is okay */
BEV_OK = 0,
/** the filter needs to read more data before output */
BEV_NEED_MORE = 1,
/** the filter encountered a critical error, no further data
can be processed. */
BEV_ERROR = 2
};
/** A callback function to implement a filter for a bufferevent.
@param src An evbuffer to drain data from.
@param dst An evbuffer to add data to.
@param limit A suggested upper bound of bytes to write to dst.
The filter may ignore this value, but doing so means that
it will overflow the high-water mark associated with dst.
-1 means "no limit".
@param mode Whether we should write data as may be convenient
(BEV_NORMAL), or flush as much data as we can (BEV_FLUSH),
or flush as much as we can, possibly including an end-of-stream
marker (BEV_FINISH).
@param ctx A user-supplied pointer.
@return BEV_OK if we wrote some data; BEV_NEED_MORE if we can't
produce any more output until we get some input; and BEV_ERROR
on an error.
*/
typedef enum bufferevent_filter_result (*bufferevent_filter_cb)(
struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
enum bufferevent_flush_mode mode, void *ctx);
/**
Allocate a new filtering bufferevent on top of an existing bufferevent.
@param underlying the underlying bufferevent.
@param input_filter The filter to apply to data we read from the underlying
bufferevent
@param output_filter The filer to apply to data we write to the underlying
bufferevent
@param options A bitfield of bufferevent options.
@param free_context A function to use to free the filter context when
this bufferevent is freed.
@param ctx A context pointer to pass to the filter functions.
*/
EVENT2_EXPORT_SYMBOL
struct bufferevent *
bufferevent_filter_new(struct bufferevent *underlying,
bufferevent_filter_cb input_filter,
bufferevent_filter_cb output_filter,
int options,
void (*free_context)(void *),
void *ctx);
/**@}*/
bufferevent_filter_new()函数创建一个封装现有的“底层”bufferevent的过滤bufferevent。所有通过底层bufferevent接收的数据在到达过滤bufferevent之前都会经过“输入”过滤器的转换;所有通过底层bufferevent发送的数据在被发送到底层bufferevent之前都会经过“输出”过滤器的转换。
向底层bufferevent添加过滤器将替换其回调函数。可以向底层bufferevent的evbuffer添加回调函数,但是如果想让过滤器正确工作,就不能再设置bufferevent本身的回调函数。
input_filter和output_filter函数将随后描述。options参数支持所有通常的选项。如果设置了BEV_OPT_CLOSE_ON_FREE,那么释放过滤bufferevent也会同时释放底层bufferevent。ctx参数是传递给过滤函数的任意指针;如果提供了free_context,则在释放ctx之前它会被调用。
底层输入缓冲区有数据可读时,输入过滤器函数会被调用;过滤器的输出缓冲区有新的数据待写入时,输出过滤器函数会被调用。两个过滤器函数都有一对evbuffer参数:从source读取数据;向destination写入数据,而dst_limit参数描述了可以写入destination的字节数上限。过滤器函数可以忽略这个参数,但是这样可能会违背高水位或者速率限制。如果dst_limit是-1,则没有限制。mode参数向过滤器描述了写入的方式。值BEV_NORMAL表示应该在方便转换的基础上写入尽可能多的数据;而BEV_FLUSH表示写入尽可能多的数据;BEV_FINISHED表示过滤器函数应该在流的末尾执行额外的清理操作。最后,过滤器函数的ctx参数就是传递给bufferevent_filter_new()函数的指针(ctx参数)。
如果成功向目标缓冲区写入了任何数据,过滤器函数应该返回BEV_OK;如果不获得更多的输入,或者不使用不同的清空(flush)模式,就不能向目标缓冲区写入更多的数据,则应该返回BEV_NEED_MORE;如果过滤器上发生了不可恢复的错误,则应该返回BEV_ERROR。
创建过滤器将启用底层bufferevent的读取和写入。随后就不需要自己管理读取和写入了:过滤器在不想读取的时候会自动挂起底层bufferevent的读取。从2.0.8-rc版本开始,可以在过滤器之外独立地启用/禁用底层bufferevent的读取和写入。然而,这样可能会让过滤器不能成功取得所需要的数据。
不需要同时指定输入和输出过滤器:没有给定的过滤器将被一个不进行数据转换的过滤器取代。
某些程序需要限制单个或者一组bufferevent使用的带宽。2.0.4-alpha和2.0.5-alpha版本添加了为单个或者一组bufferevent设置速率限制的基本功能。
libevent的速率限制使用**记号存储器(token bucket)**算法确定在某时刻可以写入或者读取多少字节。每个速率限制对象在任何给定时刻都有一个“读存储器(read bucket)”和一个“写存储器(write bucket)”,其大小决定了对象可以立即读取或者写入多少字节。每个存储器有一个填充速率,一个最大突发尺寸,和一个时间单位,或者说“滴答(tick)”。一个时间单位流逝后,存储器被填充一些字节(决定于填充速率)——但是如果超过其突发尺寸,则超出的字节会丢失。
因此,填充速率决定了对象发送或者接收字节的最大平均速率,而突发尺寸决定了在单次突发中可以发送或者接收的最大字节数;时间单位则确定了传输的平滑程度。
#define EV_RATE_LIMIT_MAX_ EV_SSIZE_MAX
struct ev_token_bucket_cfg;
struct ev_token_bucket_cfg * ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
size_t write_rate, size_t write_burst,const struct timeval *tick_len);
void ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg){
mm_free(cfg);
}
int bufferevent_set_rate_limit(struct bufferevent *bev,
struct ev_token_bucket_cfg *cfg);
ev_token_bucket_cfg结构体代表用于限制单个或者一组bufferevent的一对记号存储器的配置值。要创建ev_token_bucket_cfg,调用ev_token_bucket_cfg_new函数,提供最大平均读取速率、最大突发读取量、最大平均写入速率、最大突发写入量,以及一个滴答的长度。如果tick_len参数为NULL,则默认的滴答长度为一秒。如果发生错误,函数会返回NULL。
注意:read_rate和write_rate参数的单位是字节每滴答。也就是说,如果滴答长度是十分之一秒,read_rate是300,则最大平均读取速率是3000字节每秒。此外,不支持大于EV_RATE_LIMIT_MAX的速率或者突发量。
要限制bufferevent的传输速率,使用一个ev_token_bucket_cfg,对其调用bufferevent_set_rate_limit()。成功时函数返回0,失败时返回-1。可以对任意数量的bufferevent使用相同的ev_token_bucket_cfg。要移除速率限制,可以调用bufferevent_set_rate_limit(),传递NULL作为cfg参数值。
调用ev_token_bucket_cfg_free()可以释放ev_token_bucket_cfg。注意:当前在没有任何bufferevent使用ev_token_bucket_cfg之前进行释放是不安全的。
如果要限制一组bufferevent总的带宽使用,可以将它们分配到一个速率限制组中。
struct bufferevent_rate_limit_group;
struct bufferevent_rate_limit_group *bufferevent_rate_limit_group_new(struct event_base *base,
const struct ev_token_bucket_cfg *cfg);
int bufferevent_rate_limit_group_set_cfg(struct bufferevent_rate_limit_group *g,
const struct ev_token_bucket_cfg *cfg);
void bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g);
int bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
struct bufferevent_rate_limit_group *g);
int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev);
要创建速率限制组,使用一个event_base和一个已经初始化的ev_token_bucket_cfg作为参数调用bufferevent_rate_limit_group_new函数。使用bufferevent_add_to_rate_limit_group将bufferevent添加到组中;使用bufferevent_remove_from_rate_limit_group从组中删除bufferevent。这些函数成功时返回0,失败时返回-1。
单个bufferevent在某时刻只能是一个速率限制组的成员。bufferevent可以同时有单独的速率限制(通过bufferevent_set_rate_limit设置)和组速率限制。设置了这两个限制时,对每个bufferevent,较低的限制将被应用。
调用bufferevent_rate_limit_group_set_cfg修改组的速率限制。函数成功时返回0,失败时返回-1。bufferevent_rate_limit_group_free函数释放速率限制组,移除所有成员。
在2.0版本中,组速率限制试图实现总体的公平,但是具体实现可能在小的时间范围内并不公平。如果你强烈关注调度的公平性,请帮助提供未来版本的补丁。
有时候需要得知应用到给定bufferevent或者组的速率限制,为此,libevent提供了函数:
bufferevent_get_read_limit bufferevent_get_write_limit bufferevent_rate_limit_group_get_write_limit bufferevent_rate_limit_group_get_read_limit
ev_ssize_t bufferevent_get_read_limit(struct bufferevent *bev);
ev_ssize_t bufferevent_get_write_limit(struct bufferevent *bev);
ev_ssize_t bufferevent_rate_limit_group_get_read_limit(struct bufferevent_rate_limit_group *grp);
ev_ssize_t bufferevent_rate_limit_group_get_write_limit(struct bufferevent_rate_limit_group *grp);
上述函数返回以字节为单位的bufferevent或者组的读写记号存储器大小。注意:如果bufferevent已经被强制超过其配置(清空(flush)操作就会这样),则这些值可能是负数。
ev_ssize_t bufferevent_get_max_to_read(struct bufferevent *bev);
ev_ssize_t bufferevent_get_max_to_write(struct bufferevent *bev);
这些函数返回在考虑了应用到bufferevent或者组(如果有)的速率限制,以及一次最大读写数据量的情况下,现在可以读或者写的字节数。
void bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
ev_uint64_t *total_read_out, ev_uint64_t *total_written_out);
void bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp);
每个bufferevent_rate_limit_group跟踪经过其发送的总的字节数,这可用于跟踪组中所有bufferevent总的使用情况。对一个组调用bufferevent_rate_limit_group_get_totals会分别设置total_read_out和total_written_out为组的总读取和写入字节数。组创建的时候这些计数从0开始,调用bufferevent_rate_limit_group_reset_totals会复位计数为0。
对于有复杂需求的程序,可能需要调整记号存储器的当前值。比如说,如果程序不通过使用bufferevent的方式产生一些通信量时。
bufferevent_decrement_read_limit bufferevent_decrement_write_limit bufferevent_rate_limit_group_decrement_read bufferevent_rate_limit_group_decrement_write
int bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr);
int bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr);
int bufferevent_rate_limit_group_decrement_read(struct bufferevent_rate_limit_group *grp, ev_ssize_t decr);
int bufferevent_rate_limit_group_decrement_write(struct bufferevent_rate_limit_group *grp, ev_ssize_t decr);
这些函数减小某个bufferevent或者速率限制组的当前读或者写存储器。注意:减小是有符号的。如果要增加存储器,就传入负值。
-
int bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr);
struct bufferevent *bev
: 这是一个指向bufferevent
结构体的指针,表示需要调整读限制的缓冲事件对象。ev_ssize_t decr
: 这是一个ev_ssize_t
类型的值,表示要减少的读限制量。
-
int bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr);
struct bufferevent *bev
: 这是一个指向bufferevent
结构体的指针,表示需要调整写限制的缓冲事件对象。ev_ssize_t decr
: 这是一个ev_ssize_t
类型的值,表示要减少的写限制量。
-
int bufferevent_rate_limit_group_decrement_read(struct bufferevent_rate_limit_group *grp, ev_ssize_t decr);
struct bufferevent_rate_limit_group *grp
: 这是一个指向bufferevent_rate_limit_group
结构体的指针,表示需要调整读速率限制的速率限制组。ev_ssize_t decr
: 这是一个ev_ssize_t
类型的值,表示要减少的读速率限制量。
-
int bufferevent_rate_limit_group_decrement_write(struct bufferevent_rate_limit_group *grp, ev_ssize_t decr);
struct bufferevent_rate_limit_group *grp
: 这是一个指向bufferevent_rate_limit_group
结构体的指针,表示需要调整写速率限制的速率限制组。ev_ssize_t decr
: 这是一个ev_ssize_t
类型的值,表示要减少的写速率限制量
#include <stdio.h>
#include <stdlib.h>
#include "event2/bufferevent.h" // 假设你使用的是 libevent 库
#include "event2/rate_limit.h" // 假设这个库包含了速率限制相关的定义
// 模拟初始化和设置函数
struct bufferevent* initialize_bufferevent() {
// 在实际程序中,这里应该包含实际的 bufferevent 初始化代码
return NULL;
}
struct bufferevent_rate_limit_group* initialize_rate_limit_group() {
// 在实际程序中,这里应该包含实际的速率限制组初始化代码
return NULL;
}
int main() {
// 初始化 bufferevent 和速率限制组对象
struct bufferevent *bev = initialize_bufferevent();
struct bufferevent_rate_limit_group *grp = initialize_rate_limit_group();
// 检查对象是否正确初始化
if (!bev || !grp) {
fprintf(stderr, "Failed to initialize bufferevent or rate limit group.\n");
return 1;
}
// 示例:减少 bufferevent 对象的读限制
ev_ssize_t read_decrease_amount = 1024; // 例如:减少 1024 字节
int read_result = bufferevent_decrement_read_limit(bev, read_decrease_amount);
if (read_result == 0) {
printf("Successfully decreased read limit by %ld bytes.\n", read_decrease_amount);
} else {
fprintf(stderr, "Failed to decrease read limit.\n");
}
// 示例:减少 bufferevent 对象的写限制
ev_ssize_t write_decrease_amount = 2048; // 例如:减少 2048 字节
int write_result = bufferevent_decrement_write_limit(bev, write_decrease_amount);
if (write_result == 0) {
printf("Successfully decreased write limit by %ld bytes.\n", write_decrease_amount);
} else {
fprintf(stderr, "Failed to decrease write limit.\n");
}
// 示例:减少速率限制组的读速率限制
ev_ssize_t rate_limit_read_decrease = 512; // 例如:减少 512 字节/秒
int rate_limit_read_result = bufferevent_rate_limit_group_decrement_read(grp, rate_limit_read_decrease);
if (rate_limit_read_result == 0) {
printf("Successfully decreased read rate limit by %ld bytes per second.\n", rate_limit_read_decrease);
} else {
fprintf(stderr, "Failed to decrease read rate limit.\n");
}
// 示例:减少速率限制组的写速率限制
ev_ssize_t rate_limit_write_decrease = 1024; // 例如:减少 1024 字节/秒
int rate_limit_write_result = bufferevent_rate_limit_group_decrement_write(grp, rate_limit_write_decrease);
if (rate_limit_write_result == 0) {
printf("Successfully decreased write rate limit by %ld bytes per second.\n", rate_limit_write_decrease);
} else {
fprintf(stderr, "Failed to decrease write rate limit.\n");
}
// 清理资源
// 这里应该包含释放 bufferevent 和速率限制组对象的代码
return 0;
}
通常,不希望在每个滴答中为速率限制组中的所有bufferevent平等地分配可用的字节。比如说,有一个含有10000个活动bufferevent的速率限制组,它在每个滴答中可以写入10000字节,那么,因为系统调用和TCP头部的开销,让每个bufferevent在每个滴答中仅写入1字节是低效的。
为解决此问题,速率限制组有一个“最小共享(minimum share)”的概念。在上述情况下,不是允许每个bufferevent在每个滴答中写入1字节,而是在每个滴答中允许某个bufferevent写入一些(最小共享)字节,而其余的bufferevent将不允许写入。允许哪个bufferevent写入将在每个滴答中随机选择。
默认的最小共享值具有较好的性能,当前(2.0.6-rc版本)其值为64。可以通过这个函数调整最小共享值:
int bufferevent_rate_limit_group_set_min_share(
struct bufferevent_rate_limit_group *g,size_t share);
2.0版本的libevent的速率限制具有一些实现上的限制:
-
不是每种bufferevent类型都良好地或者说完整地支持速率限制。
-
bufferevent速率限制组不能嵌套,一个bufferevent在某时刻只能属于一个速率限制组。
-
速率限制实现仅计算TCP分组传输的数据,不包括TCP头部。
-
读速率限制实现依赖于TCP栈通知应用程序仅仅以某速率消费数据,并且在其缓冲区满的时候将数据推送到TCP连接的另一端。
-
某些bufferevent实现(特别是Windows中的IOCP实现)可能调拨过度。
-
存储器开始于一个滴答的通信量。这意味着bufferevent可以立即开始读取或者写入,而不用等待一个滴答的时间。但是这也意味着速率被限制为N.1个滴答的bufferevent可能传输N+1个滴答的通信量。
-
滴答不能小于1毫秒,毫秒的小数部分都被忽略。