aboutsummaryrefslogtreecommitdiff
path: root/bufferevent_async.c
diff options
context:
space:
mode:
Diffstat (limited to 'bufferevent_async.c')
-rw-r--r--bufferevent_async.c66
1 files changed, 43 insertions, 23 deletions
diff --git a/bufferevent_async.c b/bufferevent_async.c
index 6395e57a9f0c..40c7c5e8d0d3 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -46,6 +46,7 @@
#ifdef _WIN32
#include <winsock2.h>
+#include <winerror.h>
#include <ws2tcpip.h>
#endif
@@ -100,11 +101,32 @@ const struct bufferevent_ops bufferevent_ops_async = {
be_async_ctrl,
};
+static inline void
+be_async_run_eventcb(struct bufferevent *bev, short what, int options)
+{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
+
+static inline void
+be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
+{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
+
+static inline int
+fatal_error(int err)
+{
+ switch (err) {
+ /* We may have already associated this fd with a port.
+ * Let's hope it's this port, and that the error code
+ * for doing this neer changes. */
+ case ERROR_INVALID_PARAMETER:
+ return 0;
+ }
+ return 1;
+}
+
static inline struct bufferevent_async *
upcast(struct bufferevent *bev)
{
struct bufferevent_async *bev_a;
- if (bev->be_ops != &bufferevent_ops_async)
+ if (!BEV_IS_ASYNC(bev))
return NULL;
bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
return bev_a;
@@ -217,7 +239,7 @@ bev_async_consider_writing(struct bufferevent_async *beva)
&beva->write_overlapped)) {
bufferevent_decref_(bev);
beva->ok = 0;
- bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
+ be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
} else {
beva->write_in_progress = at_most;
bufferevent_decrement_write_buckets_(&beva->bev, at_most);
@@ -270,7 +292,7 @@ bev_async_consider_reading(struct bufferevent_async *beva)
bufferevent_incref_(bev);
if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
beva->ok = 0;
- bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
+ be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
bufferevent_decref_(bev);
} else {
beva->read_in_progress = at_most;
@@ -381,10 +403,10 @@ be_async_destruct(struct bufferevent *bev)
bev_async_del_write(bev_async);
fd = evbuffer_overlapped_get_fd_(bev->input);
- if (fd != (evutil_socket_t)INVALID_SOCKET &&
+ if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
evutil_closesocket(fd);
- evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
+ evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
}
}
@@ -428,8 +450,7 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
else
bev_async_set_wsa_error(bev, eo);
- bufferevent_run_eventcb_(bev,
- ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
+ be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
event_base_del_virtual_(bev->ev_base);
@@ -459,16 +480,16 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
- bufferevent_trigger_nolock_(bev, EV_READ, 0);
+ be_async_trigger_nolock(bev, EV_READ, 0);
bev_async_consider_reading(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
- bufferevent_run_eventcb_(bev, what, 0);
+ be_async_run_eventcb(bev, what, 0);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
- bufferevent_run_eventcb_(bev, what, 0);
+ be_async_run_eventcb(bev, what, 0);
}
}
@@ -502,16 +523,16 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
- bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
+ be_async_trigger_nolock(bev, EV_WRITE, 0);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
- bufferevent_run_eventcb_(bev, what, 0);
+ be_async_run_eventcb(bev, what, 0);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
- bufferevent_run_eventcb_(bev, what, 0);
+ be_async_run_eventcb(bev, what, 0);
}
}
@@ -532,11 +553,7 @@ bufferevent_async_new_(struct event_base *base,
return NULL;
if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
- int err = GetLastError();
- /* We may have alrady associated this fd with a port.
- * Let's hope it's this port, and that the error code
- * for doing this neer changes. */
- if (err != ERROR_INVALID_PARAMETER)
+ if (fatal_error(GetLastError()))
return NULL;
}
@@ -580,7 +597,6 @@ bufferevent_async_set_connected_(struct bufferevent *bev)
{
struct bufferevent_async *bev_async = upcast(bev);
bev_async->ok = 1;
- bufferevent_init_generic_timeout_cbs_(bev);
/* Now's a good time to consider reading/writing */
be_async_enable(bev, bev->enabled);
}
@@ -654,25 +670,29 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
data->fd = evbuffer_overlapped_get_fd_(bev->input);
return 0;
case BEV_CTRL_SET_FD: {
+ struct bufferevent_async *bev_a = upcast(bev);
struct event_iocp_port *iocp;
if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
return 0;
if (!(iocp = event_base_get_iocp_(bev->ev_base)))
return -1;
- if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
- return -1;
+ if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
+ if (fatal_error(GetLastError()))
+ return -1;
+ }
evbuffer_overlapped_set_fd_(bev->input, data->fd);
evbuffer_overlapped_set_fd_(bev->output, data->fd);
+ bev_a->ok = data->fd >= 0;
return 0;
}
case BEV_CTRL_CANCEL_ALL: {
struct bufferevent_async *bev_a = upcast(bev);
evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
- if (fd != (evutil_socket_t)INVALID_SOCKET &&
+ if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
(bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
closesocket(fd);
- evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
+ evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
}
bev_a->ok = 0;
return 0;