Line data Source code
1 : /* event.c
2 : ** strophe XMPP client library -- event loop and management
3 : **
4 : ** Copyright (C) 2005-2009 Collecta, Inc.
5 : **
6 : ** This software is provided AS-IS with no warranty, either express
7 : ** or implied.
8 : **
9 : ** This program is dual licensed under the MIT and GPLv3 licenses.
10 : */
11 :
12 : /** @file
13 : * Event loop and management.
14 : */
15 :
16 : /** @defgroup EventLoop Event loop
17 : * These functions manage the Strophe event loop.
18 : *
19 : * Simple tools can use xmpp_run() and xmpp_stop() to manage the life
20 : * cycle of the program. A common idiom is to set up a few initial
21 : * event handlers, call xmpp_run(), and then respond and react to
22 : * events as they come in. At some point, one of the handlers will
23 : * call xmpp_stop() to quit the event loop which leads to the program
24 : * terminating.
25 : *
26 : * More complex programs will have their own event loops, and should
27 : * ensure that xmpp_run_once() is called regularly from there. For
28 : * example, a GUI program will already include an event loop to
29 : * process UI events from users, and xmpp_run_once() would be called
30 : * from an idle function.
31 : */
32 :
33 : #include <stdio.h>
34 : #include <stdlib.h>
35 : #include <string.h>
36 :
37 : #ifndef _WIN32
38 : #include <sys/select.h>
39 : #include <errno.h>
40 : #include <unistd.h>
41 : #define _sleep(x) usleep((x)*1000)
42 : #else
43 : #include <winsock2.h>
44 : #ifndef ETIMEDOUT
45 : #define ETIMEDOUT WSAETIMEDOUT
46 : #endif
47 : #ifndef ECONNRESET
48 : #define ECONNRESET WSAECONNRESET
49 : #endif
50 : #ifndef ECONNABORTED
51 : #define ECONNABORTED WSAECONNABORTED
52 : #endif
53 : #define _sleep(x) Sleep(x)
54 : #endif
55 :
56 : #include "strophe.h"
57 : #include "common.h"
58 : #include "parser.h"
59 :
60 : /** Max buffer size for receiving messages. */
61 : #define STROPE_MESSAGE_BUFFER_SIZE 4096
62 :
63 0 : static int _connect_next(xmpp_conn_t *conn)
64 : {
65 0 : sock_close(conn->sock);
66 0 : conn->sock = sock_connect(conn->xsock);
67 0 : if (conn->sock == INVALID_SOCKET)
68 : return -1;
69 :
70 0 : conn->timeout_stamp = time_stamp();
71 :
72 0 : return 0;
73 : }
74 :
75 : /** Run the event loop once.
76 : * This function will run send any data that has been queued by
77 : * xmpp_send and related functions and run through the Strophe even
78 : * loop a single time, and will not wait more than timeout
79 : * milliseconds for events. This is provided to support integration
80 : * with event loops outside the library, and if used, should be
81 : * called regularly to achieve low latency event handling.
82 : *
83 : * @param ctx a Strophe context object
84 : * @param timeout time to wait for events in milliseconds
85 : *
86 : * @ingroup EventLoop
87 : */
88 0 : void xmpp_run_once(xmpp_ctx_t *ctx, unsigned long timeout)
89 : {
90 0 : xmpp_connlist_t *connitem;
91 0 : xmpp_conn_t *conn;
92 0 : fd_set rfds, wfds;
93 0 : sock_t max = 0;
94 0 : int ret;
95 0 : struct timeval tv;
96 0 : xmpp_send_queue_t *sq, *tsq;
97 0 : int towrite;
98 0 : char buf[STROPE_MESSAGE_BUFFER_SIZE];
99 0 : uint64_t next;
100 0 : uint64_t usec;
101 0 : int tls_read_bytes = 0;
102 :
103 0 : if (ctx->loop_status == XMPP_LOOP_QUIT)
104 0 : return;
105 :
106 : /* send queued data */
107 0 : connitem = ctx->connlist;
108 0 : while (connitem) {
109 0 : conn = connitem->conn;
110 0 : if (conn->state != XMPP_STATE_CONNECTED) {
111 0 : connitem = connitem->next;
112 0 : continue;
113 : }
114 :
115 : /* if we're running tls, there may be some remaining data waiting to
116 : * be sent, so push that out */
117 0 : if (conn->tls) {
118 0 : ret = tls_clear_pending_write(conn->tls);
119 :
120 0 : if (ret < 0 && !tls_is_recoverable(tls_error(conn->tls))) {
121 : /* an error occurred */
122 0 : strophe_debug(ctx, "xmpp",
123 : "Send error occurred, disconnecting.");
124 0 : conn->error = ECONNABORTED;
125 0 : conn_disconnect(conn);
126 0 : goto next_item;
127 : }
128 : }
129 :
130 : /* write all data from the send queue to the socket */
131 0 : sq = conn->send_queue_head;
132 0 : while (sq) {
133 0 : towrite = sq->len - sq->written;
134 :
135 0 : if (conn->tls) {
136 0 : ret = tls_write(conn->tls, &sq->data[sq->written], towrite);
137 0 : if (ret < 0 && !tls_is_recoverable(tls_error(conn->tls)))
138 0 : conn->error = tls_error(conn->tls);
139 : } else {
140 0 : ret = sock_write(conn->sock, &sq->data[sq->written], towrite);
141 0 : if (ret < 0 && !sock_is_recoverable(sock_error()))
142 0 : conn->error = sock_error();
143 : }
144 0 : if (ret > 0 && ret < towrite)
145 0 : sq->written += ret; /* not all data could be sent now */
146 0 : sq->wip = 1;
147 0 : if (ret != towrite)
148 : break; /* partial write or an error */
149 :
150 : /* all data for this queue item written, delete and move on */
151 0 : strophe_debug(conn->ctx, "conn", "SENT: %s", sq->data);
152 0 : strophe_debug_verbose(1, ctx, "xmpp", "Q_SENT: %p", sq);
153 0 : tsq = sq;
154 0 : sq = sq->next;
155 0 : conn->send_queue_len--;
156 0 : if (tsq->owner & XMPP_QUEUE_USER)
157 0 : conn->send_queue_user_len--;
158 0 : if (!(tsq->owner & XMPP_QUEUE_SM) && conn->sm_state->sm_enabled) {
159 0 : tsq->sm_h = conn->sm_state->sm_sent_nr;
160 0 : conn->sm_state->sm_sent_nr++;
161 0 : strophe_debug_verbose(1, ctx, "xmpp", "SM_Q_MOVE: %p", tsq);
162 0 : add_queue_back(&conn->sm_state->sm_queue, tsq);
163 : tsq = NULL;
164 : }
165 : if (tsq) {
166 0 : strophe_debug_verbose(2, ctx, "xmpp", "Q_FREE: %p", tsq);
167 0 : strophe_debug_verbose(3, ctx, "conn", "Q_CONTENT: %s",
168 : tsq->data);
169 0 : strophe_free(ctx, tsq->data);
170 0 : strophe_free(ctx, tsq);
171 : }
172 :
173 : /* pop the top item */
174 0 : conn->send_queue_head = sq;
175 : /* if we've sent everything update the tail */
176 0 : if (!sq)
177 0 : conn->send_queue_tail = NULL;
178 : }
179 :
180 : /* tear down connection on error */
181 0 : if (conn->error) {
182 : /* FIXME: need to tear down send queues and random other things
183 : * maybe this should be abstracted */
184 0 : strophe_debug(ctx, "xmpp", "Send error occurred, disconnecting.");
185 0 : conn->error = ECONNABORTED;
186 0 : conn_disconnect(conn);
187 : }
188 0 : next_item:
189 0 : connitem = connitem->next;
190 : }
191 :
192 : /* reset parsers if needed */
193 0 : for (connitem = ctx->connlist; connitem; connitem = connitem->next) {
194 0 : if (connitem->conn->reset_parser)
195 0 : conn_parser_reset(connitem->conn);
196 : }
197 :
198 : /* fire any ready timed handlers, then make sure we don't wait past
199 : the time when timed handlers need to be called */
200 0 : next = handler_fire_timed(ctx);
201 :
202 0 : usec = ((next < timeout) ? next : timeout) * 1000;
203 0 : tv.tv_sec = (long)(usec / 1000000);
204 0 : tv.tv_usec = (long)(usec % 1000000);
205 :
206 0 : FD_ZERO(&rfds);
207 0 : FD_ZERO(&wfds);
208 :
209 : /* find events to watch */
210 0 : connitem = ctx->connlist;
211 0 : while (connitem) {
212 0 : conn = connitem->conn;
213 :
214 0 : switch (conn->state) {
215 0 : case XMPP_STATE_CONNECTING:
216 : /* connect has been called and we're waiting for it to complete */
217 : /* connection will give us write or error events */
218 :
219 : /* make sure the timeout hasn't expired */
220 0 : if (time_elapsed(conn->timeout_stamp, time_stamp()) <=
221 0 : conn->connect_timeout)
222 0 : FD_SET(conn->sock, &wfds);
223 : else {
224 0 : strophe_info(ctx, "xmpp", "Connection attempt timed out.");
225 0 : ret = _connect_next(conn);
226 0 : if (ret != 0) {
227 0 : conn->error = ETIMEDOUT;
228 0 : conn_disconnect(conn);
229 : } else {
230 0 : FD_SET(conn->sock, &wfds);
231 : }
232 : }
233 : break;
234 0 : case XMPP_STATE_CONNECTED:
235 0 : FD_SET(conn->sock, &rfds);
236 0 : if (conn->send_queue_len > 0)
237 0 : FD_SET(conn->sock, &wfds);
238 : break;
239 : case XMPP_STATE_DISCONNECTED:
240 : /* do nothing */
241 : default:
242 : break;
243 : }
244 :
245 : /* Check if there is something in the SSL buffer. */
246 0 : if (conn->tls)
247 0 : tls_read_bytes += tls_pending(conn->tls);
248 :
249 0 : if (conn->state != XMPP_STATE_DISCONNECTED && conn->sock > max)
250 0 : max = conn->sock;
251 :
252 0 : connitem = connitem->next;
253 : }
254 :
255 : /* check for events */
256 0 : if (max > 0)
257 0 : ret = select(max + 1, &rfds, &wfds, NULL, &tv);
258 : else {
259 0 : if (timeout > 0)
260 0 : _sleep(timeout);
261 0 : return;
262 : }
263 :
264 : /* select errored */
265 0 : if (ret < 0) {
266 0 : if (!sock_is_recoverable(sock_error()))
267 0 : strophe_error(ctx, "xmpp", "event watcher internal error %d",
268 : sock_error());
269 0 : return;
270 : }
271 :
272 : /* no events happened */
273 0 : if (ret == 0 && tls_read_bytes == 0)
274 : return;
275 :
276 : /* process events */
277 0 : connitem = ctx->connlist;
278 0 : while (connitem) {
279 0 : conn = connitem->conn;
280 :
281 0 : switch (conn->state) {
282 0 : case XMPP_STATE_CONNECTING:
283 0 : if (FD_ISSET(conn->sock, &wfds)) {
284 : /* connection complete */
285 :
286 : /* check for error */
287 0 : ret = sock_connect_error(conn->sock);
288 0 : if (ret != 0) {
289 : /* connection failed */
290 0 : strophe_debug(ctx, "xmpp", "connection failed, error %d",
291 : ret);
292 0 : ret = _connect_next(conn);
293 0 : if (ret != 0) {
294 0 : conn->error = ret;
295 0 : conn_disconnect(conn);
296 : }
297 : break;
298 : }
299 :
300 0 : conn->state = XMPP_STATE_CONNECTED;
301 0 : strophe_debug(ctx, "xmpp", "connection successful");
302 0 : conn_established(conn);
303 : }
304 :
305 : break;
306 0 : case XMPP_STATE_CONNECTED:
307 0 : if (FD_ISSET(conn->sock, &rfds) ||
308 0 : (conn->tls && tls_pending(conn->tls))) {
309 0 : if (conn->tls) {
310 0 : ret = tls_read(conn->tls, buf, STROPE_MESSAGE_BUFFER_SIZE);
311 : } else {
312 0 : ret =
313 0 : sock_read(conn->sock, buf, STROPE_MESSAGE_BUFFER_SIZE);
314 : }
315 :
316 0 : if (ret > 0) {
317 0 : ret = parser_feed(conn->parser, buf, ret);
318 0 : if (!ret) {
319 0 : strophe_debug(ctx, "xmpp", "parse error [%s]", buf);
320 0 : xmpp_send_error(conn, XMPP_SE_INVALID_XML,
321 : "parse error");
322 : }
323 : } else {
324 0 : if (conn->tls) {
325 0 : if (!tls_is_recoverable(tls_error(conn->tls))) {
326 0 : strophe_debug(ctx, "xmpp",
327 : "Unrecoverable TLS error, %d.",
328 : tls_error(conn->tls));
329 0 : conn->error = tls_error(conn->tls);
330 0 : conn_disconnect(conn);
331 : }
332 : } else {
333 : /* return of 0 means socket closed by server */
334 0 : strophe_debug(ctx, "xmpp",
335 : "Socket closed by remote host.");
336 0 : conn->error = ECONNRESET;
337 0 : conn_disconnect(conn);
338 : }
339 : }
340 : }
341 :
342 : break;
343 : case XMPP_STATE_DISCONNECTED:
344 : /* do nothing */
345 : default:
346 : break;
347 : }
348 :
349 0 : connitem = connitem->next;
350 : }
351 :
352 : /* fire any ready handlers */
353 0 : handler_fire_timed(ctx);
354 : }
355 :
356 : /** Start the event loop.
357 : * This function continuously calls xmpp_run_once and does not return
358 : * until xmpp_stop has been called.
359 : *
360 : * @param ctx a Strophe context object
361 : *
362 : * @ingroup EventLoop
363 : */
364 0 : void xmpp_run(xmpp_ctx_t *ctx)
365 : {
366 0 : if (ctx->loop_status != XMPP_LOOP_NOTSTARTED)
367 : return;
368 :
369 0 : ctx->loop_status = XMPP_LOOP_RUNNING;
370 0 : while (ctx->loop_status == XMPP_LOOP_RUNNING) {
371 0 : xmpp_run_once(ctx, ctx->timeout);
372 : }
373 :
374 : /* make it possible to start event loop again */
375 0 : ctx->loop_status = XMPP_LOOP_NOTSTARTED;
376 :
377 0 : strophe_debug(ctx, "event", "Event loop completed.");
378 : }
379 :
380 : /** Stop the event loop.
381 : * This will stop the event loop after the current iteration and cause
382 : * xmpp_run to exit.
383 : *
384 : * @param ctx a Strophe context object
385 : *
386 : * @ingroup EventLoop
387 : */
388 0 : void xmpp_stop(xmpp_ctx_t *ctx)
389 : {
390 0 : strophe_debug(ctx, "event", "Stopping event loop.");
391 :
392 0 : if (ctx->loop_status == XMPP_LOOP_RUNNING)
393 0 : ctx->loop_status = XMPP_LOOP_QUIT;
394 0 : }
395 :
396 : /** Set the timeout to use when calling xmpp_run().
397 : *
398 : * @param ctx a Strophe context object
399 : * @param timeout the time to wait for events in milliseconds
400 : *
401 : * @ingroup EventLoop
402 : */
403 0 : void xmpp_ctx_set_timeout(xmpp_ctx_t *ctx, unsigned long timeout)
404 : {
405 0 : ctx->timeout = timeout;
406 0 : }
|