diff options
author | Josh Gao <jmgao@google.com> | 2017-08-10 12:30:25 -0700 |
---|---|---|
committer | Josh Gao <jmgao@google.com> | 2017-08-10 12:30:47 -0700 |
commit | 83a0c9c65a60a92d3ea5542596b3ba56db492c37 (patch) | |
tree | 64bb0c050c4db9ad9d721e05e3e199bfd02e60b0 /bufferevent_filter.c | |
parent | af241a5c8c2cde0fab7d7d563276c1095b00e3b4 (diff) | |
download | libevent-83a0c9c65a60a92d3ea5542596b3ba56db492c37.tar.gz |
Revert "Upgrade to 2.1.8-stable (2017-01-22)." and "Probably Mac build fix?"android-o-iot-preview-5o-iot-preview-5
This reverts commits 2a572d125a91a4aafd3ad8ce87259fc640fa0763 and
af241a5c8c2cde0fab7d7d563276c1095b00e3b4, which break tombstoned.
Bug: http://b/64543673
Test: manual + treehugger
Diffstat (limited to 'bufferevent_filter.c')
-rw-r--r-- | bufferevent_filter.c | 214 |
1 files changed, 51 insertions, 163 deletions
diff --git a/bufferevent_filter.c b/bufferevent_filter.c index d47f945..557f8cc 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -26,13 +26,11 @@ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#include "evconfig-private.h" - #include <sys/types.h> #include "event2/event-config.h" -#ifdef EVENT__HAVE_SYS_TIME_H +#ifdef _EVENT_HAVE_SYS_TIME_H #include <sys/time.h> #endif @@ -40,11 +38,11 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#ifdef EVENT__HAVE_STDARG_H +#ifdef _EVENT_HAVE_STDARG_H #include <stdarg.h> #endif -#ifdef _WIN32 +#ifdef WIN32 #include <winsock2.h> #endif @@ -61,7 +59,6 @@ /* prototypes */ static int be_filter_enable(struct bufferevent *, short); static int be_filter_disable(struct bufferevent *, short); -static void be_filter_unlink(struct bufferevent *); static void be_filter_destruct(struct bufferevent *); static void be_filter_readcb(struct bufferevent *, void *); @@ -71,9 +68,6 @@ static int be_filter_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode mode); static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); -static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf, - const struct evbuffer_cb_info *cbinfo, void *arg); - static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *info, void *arg); @@ -82,8 +76,6 @@ struct bufferevent_filtered { /** The bufferevent that we read/write filtered data from/to. */ struct bufferevent *underlying; - /** A callback on our inbuf to notice somebory removes data */ - struct evbuffer_cb_entry *inbuf_cb; /** A callback on our outbuf to notice when somebody adds data */ struct evbuffer_cb_entry *outbuf_cb; /** True iff we have received an EOF callback from the underlying @@ -105,9 +97,8 @@ const struct bufferevent_ops bufferevent_ops_filter = { evutil_offsetof(struct bufferevent_filtered, bev.bev), be_filter_enable, be_filter_disable, - be_filter_unlink, be_filter_destruct, - bufferevent_generic_adj_timeouts_, + _bufferevent_generic_adj_timeouts, be_filter_flush, be_filter_ctrl, }; @@ -189,13 +180,13 @@ bufferevent_filter_new(struct bufferevent *underlying, if (!bufev_f) return NULL; - if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base, + if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base, &bufferevent_ops_filter, tmp_options) < 0) { mm_free(bufev_f); return NULL; } if (options & BEV_OPT_THREADSAFE) { - bufferevent_enable_locking_(downcast(bufev_f), NULL); + bufferevent_enable_locking(downcast(bufev_f), NULL); } bufev_f->underlying = underlying; @@ -208,31 +199,28 @@ bufferevent_filter_new(struct bufferevent *underlying, bufferevent_setcb(bufev_f->underlying, be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f); - bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input, - bufferevent_filtered_inbuf_cb, bufev_f); - evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb, - EVBUFFER_CB_ENABLED); - bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, bufferevent_filtered_outbuf_cb, bufev_f); - bufferevent_init_generic_timeout_cbs_(downcast(bufev_f)); - bufferevent_incref_(underlying); + _bufferevent_init_generic_timeout_cbs(downcast(bufev_f)); + bufferevent_incref(underlying); bufferevent_enable(underlying, EV_READ|EV_WRITE); - bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ); + bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ); return downcast(bufev_f); } static void -be_filter_unlink(struct bufferevent *bev) +be_filter_destruct(struct bufferevent *bev) { struct bufferevent_filtered *bevf = upcast(bev); EVUTIL_ASSERT(bevf); + if (bevf->free_context) + bevf->free_context(bevf->context); if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) { - /* Yes, there is also a decref in bufferevent_decref_. + /* Yes, there is also a decref in bufferevent_decref. * That decref corresponds to the incref when we set * underlying for the first time. This decref is an * extra one to remove the last reference. @@ -248,25 +236,12 @@ be_filter_unlink(struct bufferevent *bev) if (bevf->underlying->errorcb == be_filter_eventcb) bufferevent_setcb(bevf->underlying, NULL, NULL, NULL, NULL); - bufferevent_unsuspend_read_(bevf->underlying, + bufferevent_unsuspend_read(bevf->underlying, BEV_SUSPEND_FILT_READ); } } -} - -static void -be_filter_destruct(struct bufferevent *bev) -{ - struct bufferevent_filtered *bevf = upcast(bev); - EVUTIL_ASSERT(bevf); - if (bevf->free_context) - bevf->free_context(bevf->context); - if (bevf->inbuf_cb) - evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb); - - if (bevf->outbuf_cb) - evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb); + _bufferevent_del_generic_timeout_cbs(bev); } static int @@ -278,7 +253,7 @@ be_filter_enable(struct bufferevent *bev, short event) if (event & EV_READ) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - bufferevent_unsuspend_read_(bevf->underlying, + bufferevent_unsuspend_read(bevf->underlying, BEV_SUSPEND_FILT_READ); } return 0; @@ -292,7 +267,7 @@ be_filter_disable(struct bufferevent *bev, short event) BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); if (event & EV_READ) { BEV_DEL_GENERIC_READ_TIMEOUT(bev); - bufferevent_suspend_read_(bevf->underlying, + bufferevent_suspend_read(bevf->underlying, BEV_SUSPEND_FILT_READ); } return 0; @@ -361,8 +336,7 @@ be_filter_process_output(struct bufferevent_filtered *bevf, /* disable the callback that calls this function when the user adds to the output buffer. */ - evbuffer_cb_clear_flags(bufev->output, bevf->outbuf_cb, - EVBUFFER_CB_ENABLED); + evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0); do { int processed = 0; @@ -393,9 +367,10 @@ be_filter_process_output(struct bufferevent_filtered *bevf, /* Or if we have filled the underlying output buffer. */ !be_underlying_writebuf_full(bevf,state)); - if (processed) { + if (processed && + evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { /* call the write callback.*/ - bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); + _bufferevent_run_writecb(bufev); if (res == BEV_OK && (bufev->enabled & EV_WRITE) && @@ -428,145 +403,68 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf, int processed_any = 0; /* Somebody added more data to the output buffer. Try to * process it, if we should. */ - bufferevent_incref_and_lock_(bev); + _bufferevent_incref_and_lock(bev); be_filter_process_output(bevf, BEV_NORMAL, &processed_any); - bufferevent_decref_and_unlock_(bev); + _bufferevent_decref_and_unlock(bev); } } +/* Called when the underlying socket has read. */ static void -be_filter_read_nolock_(struct bufferevent *underlying, void *me_) +be_filter_readcb(struct bufferevent *underlying, void *_me) { - struct bufferevent_filtered *bevf = me_; + struct bufferevent_filtered *bevf = _me; enum bufferevent_filter_result res; enum bufferevent_flush_mode state; struct bufferevent *bufev = downcast(bevf); - struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); int processed_any = 0; - // It's possible our refcount is 0 at this point if another thread free'd our filterevent - EVUTIL_ASSERT(bufev_private->refcnt >= 0); - - // If our refcount is > 0 - if (bufev_private->refcnt > 0) { - - if (bevf->got_eof) - state = BEV_FINISHED; - else - state = BEV_NORMAL; - - /* XXXX use return value */ - res = be_filter_process_input(bevf, state, &processed_any); - (void)res; - - /* XXX This should be in process_input, not here. There are - * other places that can call process-input, and they should - * force readcb calls as needed. */ - if (processed_any) { - bufferevent_trigger_nolock_(bufev, EV_READ, 0); - if (evbuffer_get_length(underlying->input) > 0 && - be_readbuf_full(bevf, state)) { - /* data left in underlying buffer and filter input buffer - * hit its read high watermark. - * Schedule callback to avoid data gets stuck in underlying - * input buffer. - */ - evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb, - EVBUFFER_CB_ENABLED); - } - } - } -} - -/* Called when the size of our inbuf changes. */ -static void -bufferevent_filtered_inbuf_cb(struct evbuffer *buf, - const struct evbuffer_cb_info *cbinfo, void *arg) -{ - struct bufferevent_filtered *bevf = arg; - enum bufferevent_flush_mode state; - struct bufferevent *bev = downcast(bevf); - - BEV_LOCK(bev); + _bufferevent_incref_and_lock(bufev); if (bevf->got_eof) state = BEV_FINISHED; else state = BEV_NORMAL; + /* XXXX use return value */ + res = be_filter_process_input(bevf, state, &processed_any); + (void)res; - if (!be_readbuf_full(bevf, state)) { - /* opportunity to read data which was left in underlying - * input buffer because filter input buffer hit read - * high watermark. - */ - evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb, - EVBUFFER_CB_ENABLED); - if (evbuffer_get_length(bevf->underlying->input) > 0) - be_filter_read_nolock_(bevf->underlying, bevf); - } + /* XXX This should be in process_input, not here. There are + * other places that can call process-input, and they should + * force readcb calls as needed. */ + if (processed_any && + evbuffer_get_length(bufev->input) >= bufev->wm_read.low) + _bufferevent_run_readcb(bufev); - BEV_UNLOCK(bev); -} - -/* Called when the underlying socket has read. */ -static void -be_filter_readcb(struct bufferevent *underlying, void *me_) -{ - struct bufferevent_filtered *bevf = me_; - struct bufferevent *bev = downcast(bevf); - - BEV_LOCK(bev); - - be_filter_read_nolock_(underlying, me_); - - BEV_UNLOCK(bev); + _bufferevent_decref_and_unlock(bufev); } /* Called when the underlying socket has drained enough that we can write to it. */ static void -be_filter_writecb(struct bufferevent *underlying, void *me_) +be_filter_writecb(struct bufferevent *underlying, void *_me) { - struct bufferevent_filtered *bevf = me_; + struct bufferevent_filtered *bevf = _me; struct bufferevent *bev = downcast(bevf); - struct bufferevent_private *bufev_private = BEV_UPCAST(bev); int processed_any = 0; - BEV_LOCK(bev); - - // It's possible our refcount is 0 at this point if another thread free'd our filterevent - EVUTIL_ASSERT(bufev_private->refcnt >= 0); - - // If our refcount is > 0 - if (bufev_private->refcnt > 0) { - be_filter_process_output(bevf, BEV_NORMAL, &processed_any); - } - - BEV_UNLOCK(bev); + _bufferevent_incref_and_lock(bev); + be_filter_process_output(bevf, BEV_NORMAL, &processed_any); + _bufferevent_decref_and_unlock(bev); } /* Called when the underlying socket has given us an error */ static void -be_filter_eventcb(struct bufferevent *underlying, short what, void *me_) +be_filter_eventcb(struct bufferevent *underlying, short what, void *_me) { - struct bufferevent_filtered *bevf = me_; + struct bufferevent_filtered *bevf = _me; struct bufferevent *bev = downcast(bevf); - struct bufferevent_private *bufev_private = BEV_UPCAST(bev); - - BEV_LOCK(bev); - - // It's possible our refcount is 0 at this point if another thread free'd our filterevent - EVUTIL_ASSERT(bufev_private->refcnt >= 0); - - // If our refcount is > 0 - if (bufev_private->refcnt > 0) { - - /* All we can really to is tell our own eventcb. */ - bufferevent_run_eventcb_(bev, what, 0); - } - BEV_UNLOCK(bev); + _bufferevent_incref_and_lock(bev); + /* All we can really to is tell our own eventcb. */ + _bufferevent_run_eventcb(bev, what); + _bufferevent_decref_and_unlock(bev); } static int @@ -577,7 +475,7 @@ be_filter_flush(struct bufferevent *bufev, int processed_any = 0; EVUTIL_ASSERT(bevf); - bufferevent_incref_and_lock_(bufev); + _bufferevent_incref_and_lock(bufev); if (iotype & EV_READ) { be_filter_process_input(bevf, mode, &processed_any); @@ -589,7 +487,7 @@ be_filter_flush(struct bufferevent *bufev, /* XXX does this want to recursively call lower-level flushes? */ bufferevent_flush(bevf->underlying, iotype, mode); - bufferevent_decref_and_unlock_(bufev); + _bufferevent_decref_and_unlock(bufev); return processed_any; } @@ -604,20 +502,10 @@ be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, bevf = upcast(bev); data->ptr = bevf->underlying; return 0; - case BEV_CTRL_SET_FD: - bevf = upcast(bev); - - if (bevf->underlying && - bevf->underlying->be_ops && - bevf->underlying->be_ops->ctrl) { - return (bevf->underlying->be_ops->ctrl)(bevf->underlying, op, data); - } - case BEV_CTRL_GET_FD: + case BEV_CTRL_SET_FD: case BEV_CTRL_CANCEL_ALL: default: return -1; } - - return -1; } |