diff options
Diffstat (limited to 'bufferevent_async.c')
-rw-r--r-- | bufferevent_async.c | 166 |
1 files changed, 85 insertions, 81 deletions
diff --git a/bufferevent_async.c b/bufferevent_async.c index 6395e57..f34ca5f 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -27,9 +27,8 @@ */ #include "event2/event-config.h" -#include "evconfig-private.h" -#ifdef EVENT__HAVE_SYS_TIME_H +#ifdef _EVENT_HAVE_SYS_TIME_H #include <sys/time.h> #endif @@ -37,14 +36,14 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#ifdef EVENT__HAVE_STDARG_H +#ifdef _EVENT_HAVE_STDARG_H #include <stdarg.h> #endif -#ifdef EVENT__HAVE_UNISTD_H +#ifdef _EVENT_HAVE_UNISTD_H #include <unistd.h> #endif -#ifdef _WIN32 +#ifdef WIN32 #include <winsock2.h> #include <ws2tcpip.h> #endif @@ -93,9 +92,8 @@ const struct bufferevent_ops bufferevent_ops_async = { evutil_offsetof(struct bufferevent_async, bev.bev), be_async_enable, be_async_disable, - NULL, /* Unlink */ be_async_destruct, - bufferevent_generic_adj_timeouts_, + _bufferevent_generic_adj_timeouts, be_async_flush, be_async_ctrl, }; @@ -144,7 +142,7 @@ bev_async_del_write(struct bufferevent_async *beva) if (beva->write_added) { beva->write_added = 0; - event_base_del_virtual_(bev->ev_base); + event_base_del_virtual(bev->ev_base); } } @@ -155,7 +153,7 @@ bev_async_del_read(struct bufferevent_async *beva) if (beva->read_added) { beva->read_added = 0; - event_base_del_virtual_(bev->ev_base); + event_base_del_virtual(bev->ev_base); } } @@ -166,7 +164,7 @@ bev_async_add_write(struct bufferevent_async *beva) if (!beva->write_added) { beva->write_added = 1; - event_base_add_virtual_(bev->ev_base); + event_base_add_virtual(bev->ev_base); } } @@ -177,7 +175,7 @@ bev_async_add_read(struct bufferevent_async *beva) if (!beva->read_added) { beva->read_added = 1; - event_base_add_virtual_(bev->ev_base); + event_base_add_virtual(bev->ev_base); } } @@ -202,7 +200,7 @@ bev_async_consider_writing(struct bufferevent_async *beva) /* This is safe so long as bufferevent_get_write_max never returns * more than INT_MAX. That's true for now. XXXX */ - limit = (int)bufferevent_get_write_max_(&beva->bev); + limit = (int)_bufferevent_get_write_max(&beva->bev); if (at_most >= (size_t)limit && limit >= 0) at_most = limit; @@ -212,15 +210,15 @@ bev_async_consider_writing(struct bufferevent_async *beva) } /* XXXX doesn't respect low-water mark very well. */ - bufferevent_incref_(bev); - if (evbuffer_launch_write_(bev->output, at_most, + bufferevent_incref(bev); + if (evbuffer_launch_write(bev->output, at_most, &beva->write_overlapped)) { - bufferevent_decref_(bev); + bufferevent_decref(bev); beva->ok = 0; - bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); + _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); } else { beva->write_in_progress = at_most; - bufferevent_decrement_write_buckets_(&beva->bev, at_most); + _bufferevent_decrement_write_buckets(&beva->bev, at_most); bev_async_add_write(beva); } } @@ -257,8 +255,8 @@ bev_async_consider_reading(struct bufferevent_async *beva) } /* XXXX This over-commits. */ - /* XXXX see also not above on cast on bufferevent_get_write_max_() */ - limit = (int)bufferevent_get_read_max_(&beva->bev); + /* XXXX see also not above on cast on _bufferevent_get_write_max() */ + limit = (int)_bufferevent_get_read_max(&beva->bev); if (at_most >= (size_t)limit && limit >= 0) at_most = limit; @@ -267,14 +265,14 @@ bev_async_consider_reading(struct bufferevent_async *beva) return; } - bufferevent_incref_(bev); - if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { + bufferevent_incref(bev); + if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) { beva->ok = 0; - bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); - bufferevent_decref_(bev); + _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); + bufferevent_decref(bev); } else { beva->read_in_progress = at_most; - bufferevent_decrement_read_buckets_(&beva->bev, at_most); + _bufferevent_decrement_read_buckets(&beva->bev, at_most); bev_async_add_read(beva); } @@ -292,12 +290,12 @@ be_async_outbuf_callback(struct evbuffer *buf, /* If we added data to the outbuf and were not writing before, * we may want to write now. */ - bufferevent_incref_and_lock_(bev); + _bufferevent_incref_and_lock(bev); if (cbinfo->n_added) bev_async_consider_writing(bev_async); - bufferevent_decref_and_unlock_(bev); + _bufferevent_decref_and_unlock(bev); } static void @@ -311,12 +309,12 @@ be_async_inbuf_callback(struct evbuffer *buf, /* If we drained data from the inbuf and were not reading before, * we may want to read now */ - bufferevent_incref_and_lock_(bev); + _bufferevent_incref_and_lock(bev); if (cbinfo->n_deleted) bev_async_consider_reading(bev_async); - bufferevent_decref_and_unlock_(bev); + _bufferevent_decref_and_unlock(bev); } static int @@ -380,11 +378,15 @@ be_async_destruct(struct bufferevent *bev) bev_async_del_read(bev_async); bev_async_del_write(bev_async); - fd = evbuffer_overlapped_get_fd_(bev->input); - if (fd != (evutil_socket_t)INVALID_SOCKET && - (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { + fd = _evbuffer_overlapped_get_fd(bev->input); + if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) { + /* XXXX possible double-close */ evutil_closesocket(fd); - evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); + } + /* delete this in case non-blocking connect was used */ + if (event_initialized(&bev->ev_write)) { + event_del(&bev->ev_write); + _bufferevent_del_generic_timeout_cbs(bev); } } @@ -396,7 +398,7 @@ bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) DWORD bytes, flags; evutil_socket_t fd; - fd = evbuffer_overlapped_get_fd_(bev->input); + fd = _evbuffer_overlapped_get_fd(bev->input); WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); } @@ -419,21 +421,21 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key, EVUTIL_ASSERT(bev_a->bev.connecting); bev_a->bev.connecting = 0; - sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); + sock = _evbuffer_overlapped_get_fd(bev_a->bev.bev.input); /* XXXX Handle error? */ setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); if (ok) - bufferevent_async_set_connected_(bev); + bufferevent_async_set_connected(bev); else bev_async_set_wsa_error(bev, eo); - bufferevent_run_eventcb_(bev, - ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); + _bufferevent_run_eventcb(bev, + ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR); - event_base_del_virtual_(bev->ev_base); + event_base_del_virtual(bev->ev_base); - bufferevent_decref_and_unlock_(bev); + _bufferevent_decref_and_unlock(bev); } static void @@ -448,10 +450,10 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, EVUTIL_ASSERT(bev_a->read_in_progress); amount_unread = bev_a->read_in_progress - nbytes; - evbuffer_commit_read_(bev->input, nbytes); + evbuffer_commit_read(bev->input, nbytes); bev_a->read_in_progress = 0; if (amount_unread) - bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); + _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread); if (!ok) bev_async_set_wsa_error(bev, eo); @@ -459,20 +461,21 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - bufferevent_trigger_nolock_(bev, EV_READ, 0); + if (evbuffer_get_length(bev->input) >= bev->wm_read.low) + _bufferevent_run_readcb(bev); bev_async_consider_reading(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what, 0); + _bufferevent_run_eventcb(bev, what); } else if (!nbytes) { what |= BEV_EVENT_EOF; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what, 0); + _bufferevent_run_eventcb(bev, what); } } - bufferevent_decref_and_unlock_(bev); + _bufferevent_decref_and_unlock(bev); } static void @@ -488,11 +491,11 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, EVUTIL_ASSERT(bev_a->write_in_progress); amount_unwritten = bev_a->write_in_progress - nbytes; - evbuffer_commit_write_(bev->output, nbytes); + evbuffer_commit_write(bev->output, nbytes); bev_a->write_in_progress = 0; if (amount_unwritten) - bufferevent_decrement_write_buckets_(&bev_a->bev, + _bufferevent_decrement_write_buckets(&bev_a->bev, -amount_unwritten); @@ -502,24 +505,26 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - bufferevent_trigger_nolock_(bev, EV_WRITE, 0); + if (evbuffer_get_length(bev->output) <= + bev->wm_write.low) + _bufferevent_run_writecb(bev); bev_async_consider_writing(bev_a); } else if (!ok) { what |= BEV_EVENT_ERROR; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what, 0); + _bufferevent_run_eventcb(bev, what); } else if (!nbytes) { what |= BEV_EVENT_EOF; bev_a->ok = 0; - bufferevent_run_eventcb_(bev, what, 0); + _bufferevent_run_eventcb(bev, what); } } - bufferevent_decref_and_unlock_(bev); + _bufferevent_decref_and_unlock(bev); } struct bufferevent * -bufferevent_async_new_(struct event_base *base, +bufferevent_async_new(struct event_base *base, evutil_socket_t fd, int options) { struct bufferevent_async *bev_a; @@ -528,10 +533,10 @@ bufferevent_async_new_(struct event_base *base, options |= BEV_OPT_THREADSAFE; - if (!(iocp = event_base_get_iocp_(base))) + if (!(iocp = event_base_get_iocp(base))) return NULL; - if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { + if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) { int err = GetLastError(); /* We may have alrady associated this fd with a port. * Let's hope it's this port, and that the error code @@ -544,30 +549,30 @@ bufferevent_async_new_(struct event_base *base, return NULL; bev = &bev_a->bev.bev; - if (!(bev->input = evbuffer_overlapped_new_(fd))) { + if (!(bev->input = evbuffer_overlapped_new(fd))) { mm_free(bev_a); return NULL; } - if (!(bev->output = evbuffer_overlapped_new_(fd))) { + if (!(bev->output = evbuffer_overlapped_new(fd))) { evbuffer_free(bev->input); mm_free(bev_a); return NULL; } - if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, + if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async, options)<0) goto err; evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); - event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); - event_overlapped_init_(&bev_a->read_overlapped, read_complete); - event_overlapped_init_(&bev_a->write_overlapped, write_complete); - - bufferevent_init_generic_timeout_cbs_(bev); + event_overlapped_init(&bev_a->connect_overlapped, connect_complete); + event_overlapped_init(&bev_a->read_overlapped, read_complete); + event_overlapped_init(&bev_a->write_overlapped, write_complete); bev_a->ok = fd >= 0; + if (bev_a->ok) + _bufferevent_init_generic_timeout_cbs(bev); return bev; err: @@ -576,23 +581,23 @@ err: } void -bufferevent_async_set_connected_(struct bufferevent *bev) +bufferevent_async_set_connected(struct bufferevent *bev) { struct bufferevent_async *bev_async = upcast(bev); bev_async->ok = 1; - bufferevent_init_generic_timeout_cbs_(bev); + _bufferevent_init_generic_timeout_cbs(bev); /* Now's a good time to consider reading/writing */ be_async_enable(bev, bev->enabled); } int -bufferevent_async_can_connect_(struct bufferevent *bev) +bufferevent_async_can_connect(struct bufferevent *bev) { const struct win32_extension_fns *ext = - event_get_win32_extension_fns_(); + event_get_win32_extension_fns(); if (BEV_IS_ASYNC(bev) && - event_base_get_iocp_(bev->ev_base) && + event_base_get_iocp(bev->ev_base) && ext && ext->ConnectEx) return 1; @@ -600,14 +605,14 @@ bufferevent_async_can_connect_(struct bufferevent *bev) } int -bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, +bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd, const struct sockaddr *sa, int socklen) { BOOL rc; struct bufferevent_async *bev_async = upcast(bev); struct sockaddr_storage ss; const struct win32_extension_fns *ext = - event_get_win32_extension_fns_(); + event_get_win32_extension_fns(); EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); @@ -632,15 +637,15 @@ bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, WSAGetLastError() != WSAEINVAL) return -1; - event_base_add_virtual_(bev->ev_base); - bufferevent_incref_(bev); + event_base_add_virtual(bev->ev_base); + bufferevent_incref(bev); rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, &bev_async->connect_overlapped.overlapped); if (rc || WSAGetLastError() == ERROR_IO_PENDING) return 0; - event_base_del_virtual_(bev->ev_base); - bufferevent_decref_(bev); + event_base_del_virtual(bev->ev_base); + bufferevent_decref(bev); return -1; } @@ -651,28 +656,27 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, { switch (op) { case BEV_CTRL_GET_FD: - data->fd = evbuffer_overlapped_get_fd_(bev->input); + data->fd = _evbuffer_overlapped_get_fd(bev->input); return 0; case BEV_CTRL_SET_FD: { struct event_iocp_port *iocp; - if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) + if (data->fd == _evbuffer_overlapped_get_fd(bev->input)) return 0; - if (!(iocp = event_base_get_iocp_(bev->ev_base))) + if (!(iocp = event_base_get_iocp(bev->ev_base))) return -1; - if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) + if (event_iocp_port_associate(iocp, data->fd, 1) < 0) return -1; - evbuffer_overlapped_set_fd_(bev->input, data->fd); - evbuffer_overlapped_set_fd_(bev->output, data->fd); + _evbuffer_overlapped_set_fd(bev->input, data->fd); + _evbuffer_overlapped_set_fd(bev->output, data->fd); return 0; } case BEV_CTRL_CANCEL_ALL: { struct bufferevent_async *bev_a = upcast(bev); - evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input); + evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input); if (fd != (evutil_socket_t)INVALID_SOCKET && (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { closesocket(fd); - evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); } bev_a->ok = 0; return 0; |