aboutsummaryrefslogtreecommitdiff
path: root/bufferevent_async.c
diff options
context:
space:
mode:
Diffstat (limited to 'bufferevent_async.c')
-rw-r--r--bufferevent_async.c166
1 files changed, 85 insertions, 81 deletions
diff --git a/bufferevent_async.c b/bufferevent_async.c
index 6395e57..f34ca5f 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -27,9 +27,8 @@
*/
#include "event2/event-config.h"
-#include "evconfig-private.h"
-#ifdef EVENT__HAVE_SYS_TIME_H
+#ifdef _EVENT_HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
@@ -37,14 +36,14 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#ifdef EVENT__HAVE_STDARG_H
+#ifdef _EVENT_HAVE_STDARG_H
#include <stdarg.h>
#endif
-#ifdef EVENT__HAVE_UNISTD_H
+#ifdef _EVENT_HAVE_UNISTD_H
#include <unistd.h>
#endif
-#ifdef _WIN32
+#ifdef WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#endif
@@ -93,9 +92,8 @@ const struct bufferevent_ops bufferevent_ops_async = {
evutil_offsetof(struct bufferevent_async, bev.bev),
be_async_enable,
be_async_disable,
- NULL, /* Unlink */
be_async_destruct,
- bufferevent_generic_adj_timeouts_,
+ _bufferevent_generic_adj_timeouts,
be_async_flush,
be_async_ctrl,
};
@@ -144,7 +142,7 @@ bev_async_del_write(struct bufferevent_async *beva)
if (beva->write_added) {
beva->write_added = 0;
- event_base_del_virtual_(bev->ev_base);
+ event_base_del_virtual(bev->ev_base);
}
}
@@ -155,7 +153,7 @@ bev_async_del_read(struct bufferevent_async *beva)
if (beva->read_added) {
beva->read_added = 0;
- event_base_del_virtual_(bev->ev_base);
+ event_base_del_virtual(bev->ev_base);
}
}
@@ -166,7 +164,7 @@ bev_async_add_write(struct bufferevent_async *beva)
if (!beva->write_added) {
beva->write_added = 1;
- event_base_add_virtual_(bev->ev_base);
+ event_base_add_virtual(bev->ev_base);
}
}
@@ -177,7 +175,7 @@ bev_async_add_read(struct bufferevent_async *beva)
if (!beva->read_added) {
beva->read_added = 1;
- event_base_add_virtual_(bev->ev_base);
+ event_base_add_virtual(bev->ev_base);
}
}
@@ -202,7 +200,7 @@ bev_async_consider_writing(struct bufferevent_async *beva)
/* This is safe so long as bufferevent_get_write_max never returns
* more than INT_MAX. That's true for now. XXXX */
- limit = (int)bufferevent_get_write_max_(&beva->bev);
+ limit = (int)_bufferevent_get_write_max(&beva->bev);
if (at_most >= (size_t)limit && limit >= 0)
at_most = limit;
@@ -212,15 +210,15 @@ bev_async_consider_writing(struct bufferevent_async *beva)
}
/* XXXX doesn't respect low-water mark very well. */
- bufferevent_incref_(bev);
- if (evbuffer_launch_write_(bev->output, at_most,
+ bufferevent_incref(bev);
+ if (evbuffer_launch_write(bev->output, at_most,
&beva->write_overlapped)) {
- bufferevent_decref_(bev);
+ bufferevent_decref(bev);
beva->ok = 0;
- bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
+ _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
beva->write_in_progress = at_most;
- bufferevent_decrement_write_buckets_(&beva->bev, at_most);
+ _bufferevent_decrement_write_buckets(&beva->bev, at_most);
bev_async_add_write(beva);
}
}
@@ -257,8 +255,8 @@ bev_async_consider_reading(struct bufferevent_async *beva)
}
/* XXXX This over-commits. */
- /* XXXX see also not above on cast on bufferevent_get_write_max_() */
- limit = (int)bufferevent_get_read_max_(&beva->bev);
+ /* XXXX see also not above on cast on _bufferevent_get_write_max() */
+ limit = (int)_bufferevent_get_read_max(&beva->bev);
if (at_most >= (size_t)limit && limit >= 0)
at_most = limit;
@@ -267,14 +265,14 @@ bev_async_consider_reading(struct bufferevent_async *beva)
return;
}
- bufferevent_incref_(bev);
- if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
+ bufferevent_incref(bev);
+ if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
beva->ok = 0;
- bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
- bufferevent_decref_(bev);
+ _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
+ bufferevent_decref(bev);
} else {
beva->read_in_progress = at_most;
- bufferevent_decrement_read_buckets_(&beva->bev, at_most);
+ _bufferevent_decrement_read_buckets(&beva->bev, at_most);
bev_async_add_read(beva);
}
@@ -292,12 +290,12 @@ be_async_outbuf_callback(struct evbuffer *buf,
/* If we added data to the outbuf and were not writing before,
* we may want to write now. */
- bufferevent_incref_and_lock_(bev);
+ _bufferevent_incref_and_lock(bev);
if (cbinfo->n_added)
bev_async_consider_writing(bev_async);
- bufferevent_decref_and_unlock_(bev);
+ _bufferevent_decref_and_unlock(bev);
}
static void
@@ -311,12 +309,12 @@ be_async_inbuf_callback(struct evbuffer *buf,
/* If we drained data from the inbuf and were not reading before,
* we may want to read now */
- bufferevent_incref_and_lock_(bev);
+ _bufferevent_incref_and_lock(bev);
if (cbinfo->n_deleted)
bev_async_consider_reading(bev_async);
- bufferevent_decref_and_unlock_(bev);
+ _bufferevent_decref_and_unlock(bev);
}
static int
@@ -380,11 +378,15 @@ be_async_destruct(struct bufferevent *bev)
bev_async_del_read(bev_async);
bev_async_del_write(bev_async);
- fd = evbuffer_overlapped_get_fd_(bev->input);
- if (fd != (evutil_socket_t)INVALID_SOCKET &&
- (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
+ fd = _evbuffer_overlapped_get_fd(bev->input);
+ if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
+ /* XXXX possible double-close */
evutil_closesocket(fd);
- evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
+ }
+ /* delete this in case non-blocking connect was used */
+ if (event_initialized(&bev->ev_write)) {
+ event_del(&bev->ev_write);
+ _bufferevent_del_generic_timeout_cbs(bev);
}
}
@@ -396,7 +398,7 @@ bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
DWORD bytes, flags;
evutil_socket_t fd;
- fd = evbuffer_overlapped_get_fd_(bev->input);
+ fd = _evbuffer_overlapped_get_fd(bev->input);
WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}
@@ -419,21 +421,21 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
EVUTIL_ASSERT(bev_a->bev.connecting);
bev_a->bev.connecting = 0;
- sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
+ sock = _evbuffer_overlapped_get_fd(bev_a->bev.bev.input);
/* XXXX Handle error? */
setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
if (ok)
- bufferevent_async_set_connected_(bev);
+ bufferevent_async_set_connected(bev);
else
bev_async_set_wsa_error(bev, eo);
- bufferevent_run_eventcb_(bev,
- ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
+ _bufferevent_run_eventcb(bev,
+ ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
- event_base_del_virtual_(bev->ev_base);
+ event_base_del_virtual(bev->ev_base);
- bufferevent_decref_and_unlock_(bev);
+ _bufferevent_decref_and_unlock(bev);
}
static void
@@ -448,10 +450,10 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
EVUTIL_ASSERT(bev_a->read_in_progress);
amount_unread = bev_a->read_in_progress - nbytes;
- evbuffer_commit_read_(bev->input, nbytes);
+ evbuffer_commit_read(bev->input, nbytes);
bev_a->read_in_progress = 0;
if (amount_unread)
- bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
+ _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread);
if (!ok)
bev_async_set_wsa_error(bev, eo);
@@ -459,20 +461,21 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
- bufferevent_trigger_nolock_(bev, EV_READ, 0);
+ if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
+ _bufferevent_run_readcb(bev);
bev_async_consider_reading(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
- bufferevent_run_eventcb_(bev, what, 0);
+ _bufferevent_run_eventcb(bev, what);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
- bufferevent_run_eventcb_(bev, what, 0);
+ _bufferevent_run_eventcb(bev, what);
}
}
- bufferevent_decref_and_unlock_(bev);
+ _bufferevent_decref_and_unlock(bev);
}
static void
@@ -488,11 +491,11 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
EVUTIL_ASSERT(bev_a->write_in_progress);
amount_unwritten = bev_a->write_in_progress - nbytes;
- evbuffer_commit_write_(bev->output, nbytes);
+ evbuffer_commit_write(bev->output, nbytes);
bev_a->write_in_progress = 0;
if (amount_unwritten)
- bufferevent_decrement_write_buckets_(&bev_a->bev,
+ _bufferevent_decrement_write_buckets(&bev_a->bev,
-amount_unwritten);
@@ -502,24 +505,26 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
- bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
+ if (evbuffer_get_length(bev->output) <=
+ bev->wm_write.low)
+ _bufferevent_run_writecb(bev);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
- bufferevent_run_eventcb_(bev, what, 0);
+ _bufferevent_run_eventcb(bev, what);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
- bufferevent_run_eventcb_(bev, what, 0);
+ _bufferevent_run_eventcb(bev, what);
}
}
- bufferevent_decref_and_unlock_(bev);
+ _bufferevent_decref_and_unlock(bev);
}
struct bufferevent *
-bufferevent_async_new_(struct event_base *base,
+bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options)
{
struct bufferevent_async *bev_a;
@@ -528,10 +533,10 @@ bufferevent_async_new_(struct event_base *base,
options |= BEV_OPT_THREADSAFE;
- if (!(iocp = event_base_get_iocp_(base)))
+ if (!(iocp = event_base_get_iocp(base)))
return NULL;
- if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
+ if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) {
int err = GetLastError();
/* We may have alrady associated this fd with a port.
* Let's hope it's this port, and that the error code
@@ -544,30 +549,30 @@ bufferevent_async_new_(struct event_base *base,
return NULL;
bev = &bev_a->bev.bev;
- if (!(bev->input = evbuffer_overlapped_new_(fd))) {
+ if (!(bev->input = evbuffer_overlapped_new(fd))) {
mm_free(bev_a);
return NULL;
}
- if (!(bev->output = evbuffer_overlapped_new_(fd))) {
+ if (!(bev->output = evbuffer_overlapped_new(fd))) {
evbuffer_free(bev->input);
mm_free(bev_a);
return NULL;
}
- if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
+ if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async,
options)<0)
goto err;
evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
- event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
- event_overlapped_init_(&bev_a->read_overlapped, read_complete);
- event_overlapped_init_(&bev_a->write_overlapped, write_complete);
-
- bufferevent_init_generic_timeout_cbs_(bev);
+ event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
+ event_overlapped_init(&bev_a->read_overlapped, read_complete);
+ event_overlapped_init(&bev_a->write_overlapped, write_complete);
bev_a->ok = fd >= 0;
+ if (bev_a->ok)
+ _bufferevent_init_generic_timeout_cbs(bev);
return bev;
err:
@@ -576,23 +581,23 @@ err:
}
void
-bufferevent_async_set_connected_(struct bufferevent *bev)
+bufferevent_async_set_connected(struct bufferevent *bev)
{
struct bufferevent_async *bev_async = upcast(bev);
bev_async->ok = 1;
- bufferevent_init_generic_timeout_cbs_(bev);
+ _bufferevent_init_generic_timeout_cbs(bev);
/* Now's a good time to consider reading/writing */
be_async_enable(bev, bev->enabled);
}
int
-bufferevent_async_can_connect_(struct bufferevent *bev)
+bufferevent_async_can_connect(struct bufferevent *bev)
{
const struct win32_extension_fns *ext =
- event_get_win32_extension_fns_();
+ event_get_win32_extension_fns();
if (BEV_IS_ASYNC(bev) &&
- event_base_get_iocp_(bev->ev_base) &&
+ event_base_get_iocp(bev->ev_base) &&
ext && ext->ConnectEx)
return 1;
@@ -600,14 +605,14 @@ bufferevent_async_can_connect_(struct bufferevent *bev)
}
int
-bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
+bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
const struct sockaddr *sa, int socklen)
{
BOOL rc;
struct bufferevent_async *bev_async = upcast(bev);
struct sockaddr_storage ss;
const struct win32_extension_fns *ext =
- event_get_win32_extension_fns_();
+ event_get_win32_extension_fns();
EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
@@ -632,15 +637,15 @@ bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
WSAGetLastError() != WSAEINVAL)
return -1;
- event_base_add_virtual_(bev->ev_base);
- bufferevent_incref_(bev);
+ event_base_add_virtual(bev->ev_base);
+ bufferevent_incref(bev);
rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
&bev_async->connect_overlapped.overlapped);
if (rc || WSAGetLastError() == ERROR_IO_PENDING)
return 0;
- event_base_del_virtual_(bev->ev_base);
- bufferevent_decref_(bev);
+ event_base_del_virtual(bev->ev_base);
+ bufferevent_decref(bev);
return -1;
}
@@ -651,28 +656,27 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
{
switch (op) {
case BEV_CTRL_GET_FD:
- data->fd = evbuffer_overlapped_get_fd_(bev->input);
+ data->fd = _evbuffer_overlapped_get_fd(bev->input);
return 0;
case BEV_CTRL_SET_FD: {
struct event_iocp_port *iocp;
- if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
+ if (data->fd == _evbuffer_overlapped_get_fd(bev->input))
return 0;
- if (!(iocp = event_base_get_iocp_(bev->ev_base)))
+ if (!(iocp = event_base_get_iocp(bev->ev_base)))
return -1;
- if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
+ if (event_iocp_port_associate(iocp, data->fd, 1) < 0)
return -1;
- evbuffer_overlapped_set_fd_(bev->input, data->fd);
- evbuffer_overlapped_set_fd_(bev->output, data->fd);
+ _evbuffer_overlapped_set_fd(bev->input, data->fd);
+ _evbuffer_overlapped_set_fd(bev->output, data->fd);
return 0;
}
case BEV_CTRL_CANCEL_ALL: {
struct bufferevent_async *bev_a = upcast(bev);
- evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
+ evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);
if (fd != (evutil_socket_t)INVALID_SOCKET &&
(bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
closesocket(fd);
- evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
}
bev_a->ok = 0;
return 0;