diff options
author | Elliott Hughes <enh@google.com> | 2017-08-07 14:18:18 -0700 |
---|---|---|
committer | Elliott Hughes <enh@google.com> | 2017-08-07 16:58:18 -0700 |
commit | 2a572d125a91a4aafd3ad8ce87259fc640fa0763 (patch) | |
tree | bd05e575a4a2629cc420c5e2bc54f176fee611ab /bufferevent_filter.c | |
parent | 596447c7ff2881a67e7082c905112584c3e61a17 (diff) | |
download | libevent-2a572d125a91a4aafd3ad8ce87259fc640fa0763.tar.gz |
Upgrade to 2.1.8-stable (2017-01-22).
Bug: N/A
Test: builds
Change-Id: Idbbdc1db3d01984a4f4b60f8fdf455140b6b7ca6
Diffstat (limited to 'bufferevent_filter.c')
-rw-r--r-- | bufferevent_filter.c | 214 |
1 files changed, 163 insertions, 51 deletions
diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 557f8cc..d47f945 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -26,11 +26,13 @@ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "evconfig-private.h" + #include <sys/types.h> #include "event2/event-config.h" -#ifdef _EVENT_HAVE_SYS_TIME_H +#ifdef EVENT__HAVE_SYS_TIME_H #include <sys/time.h> #endif @@ -38,11 +40,11 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#ifdef _EVENT_HAVE_STDARG_H +#ifdef EVENT__HAVE_STDARG_H #include <stdarg.h> #endif -#ifdef WIN32 +#ifdef _WIN32 #include <winsock2.h> #endif @@ -59,6 +61,7 @@ /* prototypes */ static int be_filter_enable(struct bufferevent *, short); static int be_filter_disable(struct bufferevent *, short); +static void be_filter_unlink(struct bufferevent *); static void be_filter_destruct(struct bufferevent *); static void be_filter_readcb(struct bufferevent *, void *); @@ -68,6 +71,9 @@ static int be_filter_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode mode); static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); +static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf, + const struct evbuffer_cb_info *cbinfo, void *arg); + static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *info, void *arg); @@ -76,6 +82,8 @@ struct bufferevent_filtered { /** The bufferevent that we read/write filtered data from/to. */ struct bufferevent *underlying; + /** A callback on our inbuf to notice somebory removes data */ + struct evbuffer_cb_entry *inbuf_cb; /** A callback on our outbuf to notice when somebody adds data */ struct evbuffer_cb_entry *outbuf_cb; /** True iff we have received an EOF callback from the underlying @@ -97,8 +105,9 @@ const struct bufferevent_ops bufferevent_ops_filter = { evutil_offsetof(struct bufferevent_filtered, bev.bev), be_filter_enable, be_filter_disable, + be_filter_unlink, be_filter_destruct, - _bufferevent_generic_adj_timeouts, + bufferevent_generic_adj_timeouts_, be_filter_flush, be_filter_ctrl, }; @@ -180,13 +189,13 @@ bufferevent_filter_new(struct bufferevent *underlying, if (!bufev_f) return NULL; - if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base, + if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base, &bufferevent_ops_filter, tmp_options) < 0) { mm_free(bufev_f); return NULL; } if (options & BEV_OPT_THREADSAFE) { - bufferevent_enable_locking(downcast(bufev_f), NULL); + bufferevent_enable_locking_(downcast(bufev_f), NULL); } bufev_f->underlying = underlying; @@ -199,28 +208,31 @@ bufferevent_filter_new(struct bufferevent *underlying, bufferevent_setcb(bufev_f->underlying, be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f); + bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input, + bufferevent_filtered_inbuf_cb, bufev_f); + evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb, + EVBUFFER_CB_ENABLED); + bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, bufferevent_filtered_outbuf_cb, bufev_f); - _bufferevent_init_generic_timeout_cbs(downcast(bufev_f)); - bufferevent_incref(underlying); + bufferevent_init_generic_timeout_cbs_(downcast(bufev_f)); + bufferevent_incref_(underlying); bufferevent_enable(underlying, EV_READ|EV_WRITE); - bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ); + bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ); return downcast(bufev_f); } static void -be_filter_destruct(struct bufferevent *bev) +be_filter_unlink(struct bufferevent *bev) { struct bufferevent_filtered *bevf = upcast(bev); EVUTIL_ASSERT(bevf); - if (bevf->free_context) - bevf->free_context(bevf->context); if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) { - /* Yes, there is also a decref in bufferevent_decref. + /* Yes, there is also a decref in bufferevent_decref_. * That decref corresponds to the incref when we set * underlying for the first time. This decref is an * extra one to remove the last reference. @@ -236,12 +248,25 @@ be_filter_destruct(struct bufferevent *bev) if (bevf->underlying->errorcb == be_filter_eventcb) bufferevent_setcb(bevf->underlying, NULL, NULL, NULL, NULL); - bufferevent_unsuspend_read(bevf->underlying, + bufferevent_unsuspend_read_(bevf->underlying, BEV_SUSPEND_FILT_READ); } } +} + +static void +be_filter_destruct(struct bufferevent *bev) +{ + struct bufferevent_filtered *bevf = upcast(bev); + EVUTIL_ASSERT(bevf); + if (bevf->free_context) + bevf->free_context(bevf->context); - _bufferevent_del_generic_timeout_cbs(bev); + if (bevf->inbuf_cb) + evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb); + + if (bevf->outbuf_cb) + evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb); } static int @@ -253,7 +278,7 @@ be_filter_enable(struct bufferevent *bev, short event) if (event & EV_READ) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - bufferevent_unsuspend_read(bevf->underlying, + bufferevent_unsuspend_read_(bevf->underlying, BEV_SUSPEND_FILT_READ); } return 0; @@ -267,7 +292,7 @@ be_filter_disable(struct bufferevent *bev, short event) BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); if (event & EV_READ) { BEV_DEL_GENERIC_READ_TIMEOUT(bev); - bufferevent_suspend_read(bevf->underlying, + bufferevent_suspend_read_(bevf->underlying, BEV_SUSPEND_FILT_READ); } return 0; @@ -336,7 +361,8 @@ be_filter_process_output(struct bufferevent_filtered *bevf, /* disable the callback that calls this function when the user adds to the output buffer. */ - evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0); + evbuffer_cb_clear_flags(bufev->output, bevf->outbuf_cb, + EVBUFFER_CB_ENABLED); do { int processed = 0; @@ -367,10 +393,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf, /* Or if we have filled the underlying output buffer. */ !be_underlying_writebuf_full(bevf,state)); - if (processed && - evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { + if (processed) { /* call the write callback.*/ - _bufferevent_run_writecb(bufev); + bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); if (res == BEV_OK && (bufev->enabled & EV_WRITE) && @@ -403,68 +428,145 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf, int processed_any = 0; /* Somebody added more data to the output buffer. Try to * process it, if we should. */ - _bufferevent_incref_and_lock(bev); + bufferevent_incref_and_lock_(bev); be_filter_process_output(bevf, BEV_NORMAL, &processed_any); - _bufferevent_decref_and_unlock(bev); + bufferevent_decref_and_unlock_(bev); } } -/* Called when the underlying socket has read. */ static void -be_filter_readcb(struct bufferevent *underlying, void *_me) +be_filter_read_nolock_(struct bufferevent *underlying, void *me_) { - struct bufferevent_filtered *bevf = _me; + struct bufferevent_filtered *bevf = me_; enum bufferevent_filter_result res; enum bufferevent_flush_mode state; struct bufferevent *bufev = downcast(bevf); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); int processed_any = 0; - _bufferevent_incref_and_lock(bufev); + // It's possible our refcount is 0 at this point if another thread free'd our filterevent + EVUTIL_ASSERT(bufev_private->refcnt >= 0); + + // If our refcount is > 0 + if (bufev_private->refcnt > 0) { + + if (bevf->got_eof) + state = BEV_FINISHED; + else + state = BEV_NORMAL; + + /* XXXX use return value */ + res = be_filter_process_input(bevf, state, &processed_any); + (void)res; + + /* XXX This should be in process_input, not here. There are + * other places that can call process-input, and they should + * force readcb calls as needed. */ + if (processed_any) { + bufferevent_trigger_nolock_(bufev, EV_READ, 0); + if (evbuffer_get_length(underlying->input) > 0 && + be_readbuf_full(bevf, state)) { + /* data left in underlying buffer and filter input buffer + * hit its read high watermark. + * Schedule callback to avoid data gets stuck in underlying + * input buffer. + */ + evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb, + EVBUFFER_CB_ENABLED); + } + } + } +} + +/* Called when the size of our inbuf changes. */ +static void +bufferevent_filtered_inbuf_cb(struct evbuffer *buf, + const struct evbuffer_cb_info *cbinfo, void *arg) +{ + struct bufferevent_filtered *bevf = arg; + enum bufferevent_flush_mode state; + struct bufferevent *bev = downcast(bevf); + + BEV_LOCK(bev); if (bevf->got_eof) state = BEV_FINISHED; else state = BEV_NORMAL; - /* XXXX use return value */ - res = be_filter_process_input(bevf, state, &processed_any); - (void)res; - /* XXX This should be in process_input, not here. There are - * other places that can call process-input, and they should - * force readcb calls as needed. */ - if (processed_any && - evbuffer_get_length(bufev->input) >= bufev->wm_read.low) - _bufferevent_run_readcb(bufev); + if (!be_readbuf_full(bevf, state)) { + /* opportunity to read data which was left in underlying + * input buffer because filter input buffer hit read + * high watermark. + */ + evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb, + EVBUFFER_CB_ENABLED); + if (evbuffer_get_length(bevf->underlying->input) > 0) + be_filter_read_nolock_(bevf->underlying, bevf); + } - _bufferevent_decref_and_unlock(bufev); + BEV_UNLOCK(bev); +} + +/* Called when the underlying socket has read. */ +static void +be_filter_readcb(struct bufferevent *underlying, void *me_) +{ + struct bufferevent_filtered *bevf = me_; + struct bufferevent *bev = downcast(bevf); + + BEV_LOCK(bev); + + be_filter_read_nolock_(underlying, me_); + + BEV_UNLOCK(bev); } /* Called when the underlying socket has drained enough that we can write to it. */ static void -be_filter_writecb(struct bufferevent *underlying, void *_me) +be_filter_writecb(struct bufferevent *underlying, void *me_) { - struct bufferevent_filtered *bevf = _me; + struct bufferevent_filtered *bevf = me_; struct bufferevent *bev = downcast(bevf); + struct bufferevent_private *bufev_private = BEV_UPCAST(bev); int processed_any = 0; - _bufferevent_incref_and_lock(bev); - be_filter_process_output(bevf, BEV_NORMAL, &processed_any); - _bufferevent_decref_and_unlock(bev); + BEV_LOCK(bev); + + // It's possible our refcount is 0 at this point if another thread free'd our filterevent + EVUTIL_ASSERT(bufev_private->refcnt >= 0); + + // If our refcount is > 0 + if (bufev_private->refcnt > 0) { + be_filter_process_output(bevf, BEV_NORMAL, &processed_any); + } + + BEV_UNLOCK(bev); } /* Called when the underlying socket has given us an error */ static void -be_filter_eventcb(struct bufferevent *underlying, short what, void *_me) +be_filter_eventcb(struct bufferevent *underlying, short what, void *me_) { - struct bufferevent_filtered *bevf = _me; + struct bufferevent_filtered *bevf = me_; struct bufferevent *bev = downcast(bevf); + struct bufferevent_private *bufev_private = BEV_UPCAST(bev); + + BEV_LOCK(bev); + + // It's possible our refcount is 0 at this point if another thread free'd our filterevent + EVUTIL_ASSERT(bufev_private->refcnt >= 0); + + // If our refcount is > 0 + if (bufev_private->refcnt > 0) { + + /* All we can really to is tell our own eventcb. */ + bufferevent_run_eventcb_(bev, what, 0); + } - _bufferevent_incref_and_lock(bev); - /* All we can really to is tell our own eventcb. */ - _bufferevent_run_eventcb(bev, what); - _bufferevent_decref_and_unlock(bev); + BEV_UNLOCK(bev); } static int @@ -475,7 +577,7 @@ be_filter_flush(struct bufferevent *bufev, int processed_any = 0; EVUTIL_ASSERT(bevf); - _bufferevent_incref_and_lock(bufev); + bufferevent_incref_and_lock_(bufev); if (iotype & EV_READ) { be_filter_process_input(bevf, mode, &processed_any); @@ -487,7 +589,7 @@ be_filter_flush(struct bufferevent *bufev, /* XXX does this want to recursively call lower-level flushes? */ bufferevent_flush(bevf->underlying, iotype, mode); - _bufferevent_decref_and_unlock(bufev); + bufferevent_decref_and_unlock_(bufev); return processed_any; } @@ -502,10 +604,20 @@ be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, bevf = upcast(bev); data->ptr = bevf->underlying; return 0; - case BEV_CTRL_GET_FD: case BEV_CTRL_SET_FD: + bevf = upcast(bev); + + if (bevf->underlying && + bevf->underlying->be_ops && + bevf->underlying->be_ops->ctrl) { + return (bevf->underlying->be_ops->ctrl)(bevf->underlying, op, data); + } + + case BEV_CTRL_GET_FD: case BEV_CTRL_CANCEL_ALL: default: return -1; } + + return -1; } |