File Coverage

EVx.xs
Criterion Covered Total %
statement 196 300 65.3
branch 85 196 43.3
condition n/a
subroutine n/a
pod n/a
total 281 496 56.6


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "wslay/wslay.h"
5             #include "EVAPI.h"
6            
7             //windows
8             #ifdef WIN32
9             #ifndef EWOULDBLOCK
10             #define EWOULDBLOCK WSAEWOULDBLOCK
11             #endif
12             #else
13             #ifndef EWOULDBLOCK
14             #define EWOULDBLOCK EAGAIN
15             #endif
16             #endif
17            
18             #define FRAGMENTED_EOF 0
19             #define FRAGMENTED_ERROR -1
20             #define FRAGMENTED_DATA 1
21            
22             typedef struct {
23             wslay_event_context_ptr ctx;
24             HV* perl_callbacks;
25             ev_io io;
26             SV* queue_wait_cb;
27             struct wslay_event_callbacks callbacks;
28             char read_stopped;
29             char write_stopped;
30             } websocket_object;
31            
32             static void wait_io_event(websocket_object* websock_object);
33            
34 5           static ssize_t recv_callback(wslay_event_context_ptr ctx, uint8_t* buf, size_t len, int flags, void* data) {
35 5           websocket_object* websock_object = (websocket_object*) data;
36             ssize_t r;
37 5 100         while ((r = recv(websock_object->io.fd, buf, len, 0)) == -1 && errno == EINTR);
    50          
38 5 100         if (r == -1) {
39 2 50         if (errno == EAGAIN || errno == EWOULDBLOCK) {
    0          
40 2           wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
41             } else {
42 2           wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
43             }
44 3 50         } else if (r == 0) { /* Unexpected EOF is also treated as an error */
45 0           wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
46 0           r = -1;
47             }
48 5           return r;
49             }
50            
51 6           static ssize_t send_callback(wslay_event_context_ptr ctx, const uint8_t* buf, size_t len, int flags, void* data) {
52 6           websocket_object* websock_object = (websocket_object*) data;
53             ssize_t r;
54 6           int sflags = 0;
55             #ifdef MSG_MORE
56 6 100         if(flags & WSLAY_MSG_MORE) { sflags |= MSG_MORE; }
57             #endif // MSG_MORE
58 6 50         while ((r = send(websock_object->io.fd, buf, len, sflags)) == -1 && errno == EINTR);
    0          
59 6 50         if (r == -1) {
60 0 0         if(errno == EAGAIN || errno == EWOULDBLOCK) {
    0          
61 0           wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
62             } else {
63 0           wslay_event_set_error(ctx, WSLAY_ERR_CALLBACK_FAILURE);
64             }
65             }
66 6           return r;
67             }
68            
69 8           static websocket_object* get_wslay_context (HV* hv) {
70             MAGIC* mg;
71 8 50         for (mg = SvMAGIC((SV*) hv); mg; mg = mg->mg_moremagic) {
72 8 50         if (mg->mg_type == PERL_MAGIC_ext && mg->mg_virtual == NULL) {
    50          
73 8           return (websocket_object*) mg->mg_ptr;
74             }
75             }
76 0           croak("Can't get ptr from object hash!\n");
77             }
78            
79 3           int genmask_callback(wslay_event_context_ptr ctx, uint8_t* buf, size_t len, void* data) {
80 3           websocket_object* websock_object = (websocket_object*) data;
81             SV** cb;
82 3 50         if (cb = hv_fetch(websock_object->perl_callbacks , "genmask", 7, 0)) {
83 0           dSP;
84 0           ENTER;
85 0           SAVETMPS;
86 0 0         PUSHMARK(SP);
87 0 0         XPUSHs(sv_2mortal(newSViv(len)));
88 0           PUTBACK;
89 0           int count = call_sv(*cb, G_SCALAR);
90 0           SPAGAIN;
91             int status;
92 0 0         if (count != 1) { croak("Wslay - genmask callback returned bad value!\n"); }
93 0           SV* data = POPs;
94             STRLEN souce_len;
95 0 0         char *source_buf = SvPV(data, souce_len);
96 0 0         if (souce_len) { memcpy(buf, source_buf, (souce_len < len ? souce_len : len)); }
97 0           PUTBACK;
98 0 0         FREETMPS;
99 0           LEAVE;
100 0           return 0;
101             };
102 3           int i = 0;
103 15 100         for(; i < len; i++){ buf[i] = (char) rand(); }
104 3           return 0;
105             }
106            
107 3           static void on_frame_recv_start_callback (wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_start_arg* frame, void* data) {
108             SV** cb;
109 3 50         if (!(cb = hv_fetch(((websocket_object*) data)->perl_callbacks, "on_frame_recv_start", 19, 0)) ) {
110 0           warn("Wslay - cant find on_frame_recv_start callback!\n");
111 0           return;
112             }
113 3           dSP;
114 3           ENTER;
115 3           SAVETMPS;
116 3 50         PUSHMARK(SP);
117 3 50         EXTEND(SP, 4);
118 3           PUSHs(sv_2mortal(newSViv(frame->fin)));
119 3           PUSHs(sv_2mortal(newSViv(frame->rsv)));
120 3           PUSHs(sv_2mortal(newSViv(frame->opcode)));
121 3           PUSHs(sv_2mortal(newSViv(frame->payload_length)));
122 3           PUTBACK;
123 3           call_sv(*cb, G_VOID);
124 3 50         FREETMPS;
125 3           LEAVE;
126             }
127            
128 3           static void on_frame_recv_chunk_callback (wslay_event_context_ptr ctx, const struct wslay_event_on_frame_recv_chunk_arg* chunk, void* data) {
129             SV** cb;
130 3 50         if (!(cb = hv_fetch(((websocket_object*) data)->perl_callbacks, "on_frame_recv_chunk", 19, 0))) {
131 0           warn("Wslay - cant find on_frame_recv_chunk callback!\n");
132 0           return;
133             }
134 3           dSP;
135 3           ENTER;
136 3           SAVETMPS;
137 3 50         PUSHMARK(SP);
138 3 50         EXTEND(SP, 1);
139 3           PUSHs(sv_2mortal(newSVpvn(chunk->data, chunk->data_length)));
140 3           PUTBACK;
141 3           call_sv(*cb, G_VOID);
142 3 50         FREETMPS;
143 3           LEAVE;
144             }
145            
146 3           static void on_frame_recv_end_callback(wslay_event_context_ptr ctx, void* data) {
147             SV** cb;
148 3 50         if (!(cb = hv_fetch(((websocket_object*) data)->perl_callbacks, "on_frame_recv_end", 17, 0))) {
149 0           warn("Wslay - cant find on_frame_recv_end callback!\n");
150 0           return;
151             }
152 3           dSP;
153 3 50         PUSHMARK(SP);
154 3           call_sv(*cb, G_DISCARD|G_NOARGS);
155             }
156            
157 2           static void on_msg_recv_callback(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg* msg, void* data) {
158 2 50         if (msg->opcode == 0x08) { return; } // on_close callback is for close messages
159             SV** cb;
160 2 50         if (!(cb = hv_fetch(((websocket_object*) data)->perl_callbacks, "on_msg_recv", 11, 0))) {
161 0           warn("Wslay - cant find on_msg_recv callback!\n");
162 0           return;
163             }
164 2           SV* msg_data = newSVpvn(msg->msg, msg->msg_length);
165 2 50         if (!(msg->rsv & WSLAY_RSV1_BIT) && msg->opcode == 1) { SvUTF8_on(msg_data); }
    100          
166 2           dSP;
167 2           ENTER;
168 2           SAVETMPS;
169 2 50         PUSHMARK(SP);
170 2 50         EXTEND(SP, 4);
171 2           PUSHs(sv_2mortal(newSViv(msg->rsv)));
172 2           PUSHs(sv_2mortal(newSViv(msg->opcode)));
173 2           PUSHs(sv_2mortal(msg_data));
174 2           PUSHs(sv_2mortal(newSViv(msg->status_code)));
175 2           PUTBACK;
176 2           call_sv(*cb, G_VOID);
177 1 50         FREETMPS;
178 1           LEAVE;
179             }
180            
181 3           ssize_t fragmented_msg_callback(wslay_event_context_ptr ctx, uint8_t* buf, size_t len, const union wslay_event_msg_source* source, int* eof, void* userdata) {
182 3           websocket_object* websock_object = (websocket_object*) userdata;
183 3           ssize_t bytes_written = 0;
184 3           dSP;
185 3           ENTER;
186 3           SAVETMPS;
187 3 50         PUSHMARK(SP);
188 3 50         XPUSHs(sv_2mortal(newSViv(len)));
189 3           PUTBACK;
190 3           int count = call_sv((SV*) source->data, G_ARRAY);
191 3           SPAGAIN;
192             SV* data;
193             int status;
194 3 100         if (count == 1) {
195 2           status = FRAGMENTED_DATA;
196 2           data = POPs;
197 1 50         } else if (count == 2) {
198 1 50         status = POPi;
199 1           data = POPs;
200             } else {
201 0           croak("Wslay - fragmented msg cb MUST return one or two elements! \n");
202             }
203             STRLEN souce_len;
204 3 50         char* source_buf = SvPV(data, souce_len);
205 3 100         if (souce_len) {
206 2           bytes_written = (souce_len < len ? souce_len : len );
207 2           memcpy(buf, source_buf, bytes_written);
208             }
209 3           PUTBACK;
210 3 50         FREETMPS;
211 3           LEAVE;
212 3 100         if (status == FRAGMENTED_EOF) {
213 1           *eof = 1;
214 1           SvREFCNT_dec((SV*) source->data);
215 2 50         } else if (status == FRAGMENTED_ERROR) {
216 0           bytes_written = -1;
217 0           wslay_event_set_error(websock_object->ctx, WSLAY_ERR_CALLBACK_FAILURE);
218 0           SvREFCNT_dec((SV*) source->data);
219             }
220             // else - FRAGMENTED_DATA
221 3           return bytes_written;
222             }
223            
224             //////////////////////
225 2           static void close_connection(websocket_object* websock_object) {
226 2           int status = wslay_event_get_status_code_received(websock_object->ctx);
227 2           wslay_event_context_free(websock_object->ctx);
228 2           websock_object->ctx = NULL;
229 2           close(websock_object->io.fd);
230 2           ev_io_stop(EV_DEFAULT, &(websock_object->io));
231             SV** cb;
232 2 50         if (cb = hv_fetch(websock_object->perl_callbacks, "on_close", 8, 0)) {
233 2           dSP;
234 2           ENTER;
235 2           SAVETMPS;
236 2 50         PUSHMARK(SP);
237 2 50         EXTEND(SP, 1);
238 2           PUSHs(sv_2mortal(newSViv(status)));
239 2           PUTBACK;
240 2           call_sv(*cb, G_VOID);
241 2 50         FREETMPS;
242 2           LEAVE;
243             };
244 2           }
245            
246 6           static void wslay_io_event (struct ev_loop* loop, struct ev_io* w, int revents) {
247 6           websocket_object* websock_object = (websocket_object*) w->data;
248 6 100         if (revents & EV_READ) {
249 3 50         if (wslay_event_recv(websock_object->ctx)) {
250 0           close_connection(websock_object);
251 0           return;
252             }
253             }
254 5 100         if (revents & EV_WRITE) {
255 3 50         if (wslay_event_send(websock_object->ctx)) {
256 0           close_connection(websock_object);
257 0           return;
258             }
259             }
260 5           wait_io_event(websock_object);
261             };
262            
263 11           static void wait_io_event(websocket_object* websock_object) {
264 11           ev_io_stop(EV_DEFAULT, &(websock_object->io));
265 11 50         if (websock_object->read_stopped && websock_object->write_stopped) { return; }
    0          
266 11           int events = 0;
267 11           char wanted_io = 0;
268 11 50         if (wslay_event_want_read(websock_object->ctx)) {
269 11 50         if (!websock_object->read_stopped) { events |= EV_READ; }
270 11           wanted_io = 1;
271             }
272 11 100         if (wslay_event_want_write(websock_object->ctx)) {
273 5 100         if (!websock_object->write_stopped) { events |= EV_WRITE; }
274 5           wanted_io = 1;
275 6 50         } else if (
276 0 0         websock_object->queue_wait_cb &&
277 0           !wslay_event_get_queued_msg_count(websock_object->ctx)
278             ) {
279 0           dSP;
280 0 0         PUSHMARK(SP);
281 0           call_sv(websock_object->queue_wait_cb, G_DISCARD|G_NOARGS);
282 0           SvREFCNT_dec(websock_object->queue_wait_cb);
283 0           websock_object->queue_wait_cb = NULL;
284             // recheck want write, because user might queue messages in perl callback
285 0 0         if (wslay_event_want_write(websock_object->ctx)) {
286 0 0         if (!websock_object->write_stopped) { events |= EV_WRITE; }
287 0           wanted_io = 1;
288             }
289             }
290            
291 11 50         if (events) {
292 11           ev_io_set(&(websock_object->io), websock_object->io.fd, events);
293 11           ev_io_start(EV_DEFAULT, &(websock_object->io));
294 0 0         } else if (!wanted_io) {
295 0           close_connection(websock_object);
296             }
297            
298             };
299            
300            
301             MODULE = Net::WebSocket::EVx PACKAGE = Net::WebSocket::EVx
302            
303            
304             BOOT:
305             {
306 1 50         I_EV_API("Net::WebSocket::EVx");
    50          
    50          
    50          
307             #ifdef WIN32
308             _setmaxstdio(2048);
309             #endif
310             }
311            
312             PROTOTYPES: DISABLE
313            
314             void _wslay_event_context_init(object, sock, is_server)
315             HV* object
316             int sock
317             int is_server
318             CODE:
319 2           websocket_object* websock_object = malloc(sizeof(websocket_object));
320 2           ev_io_init(&(websock_object->io), wslay_io_event, sock, EV_READ);
321 2           websock_object->io.data = (SV*) websock_object;
322 2           websock_object->queue_wait_cb = NULL;
323 2           websock_object->read_stopped = 0;
324 2           websock_object->write_stopped = 0;
325 2           sv_magicext((SV*) object, 0, PERL_MAGIC_ext, NULL, (const char *) websock_object, 0);
326 2           websock_object->perl_callbacks = object;
327 2           websock_object->callbacks.recv_callback = recv_callback;
328 2           websock_object->callbacks.send_callback = send_callback;
329 2           websock_object->callbacks.genmask_callback = genmask_callback;
330 2 50         websock_object->callbacks.on_frame_recv_start_callback = hv_exists(object, "on_frame_recv_start", strlen("on_frame_recv_start")) ? on_frame_recv_start_callback : NULL;
331 2 50         websock_object->callbacks.on_frame_recv_chunk_callback = hv_exists(object, "on_frame_recv_chunk", strlen("on_frame_recv_chunk")) ? on_frame_recv_chunk_callback : NULL;
332 2 50         websock_object->callbacks.on_frame_recv_end_callback = hv_exists(object, "on_frame_recv_end", strlen("on_frame_recv_end")) ? on_frame_recv_end_callback : NULL;
333 2 50         websock_object->callbacks.on_msg_recv_callback = hv_exists(object, "on_msg_recv", strlen("on_msg_recv")) ? on_msg_recv_callback : NULL;
334 4 100         if (is_server
    50          
335 1           ? wslay_event_context_server_init(&(websock_object->ctx), &(websock_object->callbacks), websock_object)
336 1           : wslay_event_context_client_init(&(websock_object->ctx), &(websock_object->callbacks), websock_object)
337             ) {
338 0           croak("Can't initialize! WSLAY_ERR_NOMEM \n");
339             } else {
340 2           wslay_event_config_set_allowed_rsv_bits(websock_object->ctx, WSLAY_RSV1_BIT);
341             }
342 2           wait_io_event(websock_object);
343            
344             void _wslay_event_config_set_no_buffering (object, buffering)
345             HV* object
346             int buffering
347             CODE:
348 2           websocket_object* websock_object = get_wslay_context(object);
349 2           wslay_event_config_set_no_buffering(websock_object->ctx, buffering);
350            
351             void _wslay_event_config_set_max_recv_msg_length(object, len)
352             HV* object
353             int len
354             CODE:
355 0           websocket_object* websock_object = get_wslay_context(object);
356 0           wslay_event_config_set_max_recv_msg_length(websock_object->ctx, len);
357            
358             void shutdown_read(object)
359             HV* object
360             CODE:
361 0           websocket_object* websock_object = get_wslay_context(object);
362 0           wslay_event_shutdown_read(websock_object->ctx);
363            
364             void shutdown_write(object)
365             HV* object
366             CODE:
367 0           websocket_object* websock_object = get_wslay_context(object);
368 0           wslay_event_shutdown_write(websock_object->ctx);
369            
370             void stop(object)
371             HV* object
372             CODE:
373 0           websocket_object* websock_object = get_wslay_context(object);
374 0           websock_object->read_stopped = 1;
375 0           websock_object->write_stopped = 1;
376 0           wait_io_event(websock_object);
377            
378             void stop_read(object)
379             HV* object
380             CODE:
381 0           websocket_object* websock_object = get_wslay_context(object);
382 0           websock_object->read_stopped = 1;
383 0           wait_io_event(websock_object);
384            
385             void stop_write(object)
386             HV* object
387             CODE:
388 1           websocket_object* websock_object = get_wslay_context(object);
389 1           websock_object->write_stopped = 1;
390 1           wait_io_event(websock_object);
391            
392             void start(object)
393             HV* object
394             CODE:
395 0           websocket_object* websock_object = get_wslay_context(object);
396 0           websock_object->read_stopped = 0;
397 0           websock_object->write_stopped = 0;
398 0           wait_io_event(websock_object);
399            
400             void start_read(object)
401             HV* object
402             CODE:
403 0           websocket_object* websock_object = get_wslay_context(object);
404 0           websock_object->read_stopped = 0;
405 0           wait_io_event(websock_object);
406            
407             void start_write(object)
408             HV* object
409             CODE:
410 1           websocket_object* websock_object = get_wslay_context(object);
411 1           websock_object->write_stopped = 0;
412 1           wait_io_event(websock_object);
413            
414             void _set_waiter(object, waiter)
415             HV* object
416             SV* waiter
417             CODE:
418 0           websocket_object* websock_object = get_wslay_context(object);
419 0 0         if (websock_object->queue_wait_cb) { SvREFCNT_dec(websock_object->queue_wait_cb); }
420 0           websock_object->queue_wait_cb = waiter;
421 0           SvREFCNT_inc(waiter);
422            
423             int queue_msg (object, data, opcode=1)
424             HV* object
425             SV* data
426             int opcode
427             CODE:
428 1           websocket_object* websock_object = get_wslay_context(object);
429             STRLEN len;
430             struct wslay_event_msg msg;
431 1 50         msg.msg = SvPV(data, len);
432 1           msg.msg_length = len;
433 1           msg.opcode = opcode;
434 1           int result = wslay_event_queue_msg(websock_object->ctx, &msg);
435 1 50         if (result == WSLAY_ERR_INVALID_ARGUMENT) { croak("Wslay queue_msg - WSLAY_ERR_INVALID_ARGUMENT"); }
436 1 50         if (result == WSLAY_ERR_NOMEM) { croak("Wslay queue_msg - WSLAY_ERR_NOMEM"); }
437 1           wait_io_event(websock_object);
438 1           RETVAL = result;
439             OUTPUT:
440             RETVAL
441            
442             int queue_msg_ex (object, data, opcode=1, rsv=WSLAY_RSV1_BIT)
443             HV* object
444             SV* data
445             int opcode
446             int rsv
447             CODE:
448 0           websocket_object* websock_object = get_wslay_context(object);
449             STRLEN len;
450             struct wslay_event_msg msg;
451 0 0         msg.msg = SvPV(data, len);
452 0           msg.msg_length = len;
453 0           msg.opcode = opcode;
454 0           int result = wslay_event_queue_msg_ex(websock_object->ctx, &msg, rsv);
455 0 0         if (result == WSLAY_ERR_INVALID_ARGUMENT) { croak("Wslay queue_msg - WSLAY_ERR_INVALID_ARGUMENT"); }
456 0 0         if (result == WSLAY_ERR_NOMEM) { croak("Wslay queue_msg - WSLAY_ERR_NOMEM"); }
457 0           wait_io_event(websock_object);
458 0           RETVAL = result;
459             OUTPUT:
460             RETVAL
461            
462             int queue_fragmented (object, cb, opcode=2)
463             HV* object
464             SV* cb
465             int opcode
466             CODE:
467 1           websocket_object* websock_object = get_wslay_context(object);
468             struct wslay_event_fragmented_msg msg;
469 1           msg.opcode = opcode;
470 1           msg.source.data = SvREFCNT_inc(cb);
471 1           msg.read_callback = fragmented_msg_callback;
472 1           int result = wslay_event_queue_fragmented_msg(websock_object->ctx, &msg);
473 1 50         if (result == WSLAY_ERR_INVALID_ARGUMENT) { croak("Wslay queue_fragmented - WSLAY_ERR_INVALID_ARGUMENT"); }
474 1 50         if (result == WSLAY_ERR_NOMEM) { croak("Wslay queue_fragmented - WSLAY_ERR_NOMEM"); }
475 1           wait_io_event(websock_object);
476 1           RETVAL = result;
477             OUTPUT:
478             RETVAL
479            
480             int queue_fragmented_ex (object, cb, opcode=2, rsv=WSLAY_RSV1_BIT)
481             HV* object
482             SV* cb
483             int opcode
484             int rsv
485             CODE:
486 0           websocket_object* websock_object = get_wslay_context(object);
487             struct wslay_event_fragmented_msg msg;
488 0           msg.opcode = opcode;
489 0           msg.source.data = SvREFCNT_inc(cb);
490 0           msg.read_callback = fragmented_msg_callback;
491 0           int result = wslay_event_queue_fragmented_msg_ex(websock_object->ctx, &msg, rsv);
492 0 0         if (result == WSLAY_ERR_INVALID_ARGUMENT) { croak("Wslay queue_fragmented - WSLAY_ERR_INVALID_ARGUMENT"); }
493 0 0         if (result == WSLAY_ERR_NOMEM) { croak("Wslay queue_fragmented - WSLAY_ERR_NOMEM"); }
494 0           wait_io_event(websock_object);
495 0           RETVAL = result;
496             OUTPUT:
497             RETVAL
498            
499             int close (object, status_code = 0, data = NULL)
500             HV* object
501             int status_code
502             SV* data
503             CODE:
504 0           websocket_object* websock_object = get_wslay_context(object);
505 0           STRLEN reason_length = 0;
506             char *reason;
507 0 0         if (data) { reason = SvPV(data, reason_length); }
    0          
508 0           int result = wslay_event_queue_close(websock_object->ctx, status_code, reason, reason_length);
509 0 0         if (result == WSLAY_ERR_INVALID_ARGUMENT) {croak("Wslay send - WSLAY_ERR_INVALID_ARGUMENT"); }
510 0 0         if (result == WSLAY_ERR_NOMEM) { croak("Wslay send - WSLAY_ERR_NOMEM"); }
511 0           wslay_event_shutdown_read(websock_object->ctx);
512 0           wait_io_event(websock_object);
513 0           RETVAL = result;
514             OUTPUT:
515             RETVAL
516            
517             int queued_count (object)
518             HV* object
519             CODE:
520 0           websocket_object* websock_object = get_wslay_context(object);
521 0           RETVAL = wslay_event_get_queued_msg_count(websock_object->ctx);
522             OUTPUT:
523             RETVAL
524            
525             void DESTROY (object)
526             HV* object
527             CODE:
528 2           websocket_object* websock_object = get_wslay_context(object);
529 2 50         if (websock_object->queue_wait_cb) { SvREFCNT_dec(websock_object->queue_wait_cb); }
530 2 50         if (websock_object->ctx) { close_connection(websock_object); }
531 2           free(websock_object);