File Coverage

Shared.xs
Criterion Covered Total %
statement 328 341 96.1
branch 325 556 58.4
condition n/a
subroutine n/a
pod n/a
total 653 897 72.8


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "ppport.h"
7             #include "queue.h"
8              
9             #ifdef HAVE_XS_PARSE_KEYWORD
10             #include "XSParseKeyword.h"
11              
12             /* ---- Keyword build functions (compile keywords to direct ENTERSUB ops) ---- */
13              
14             static int build_kw_1arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
15             (void)nargs;
16             const char *func = (const char *)hookdata;
17             OP *q_op = args[0]->op;
18             OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
19             *out = op_convert_list(OP_ENTERSUB, OPf_STACKED,
20             op_append_elem(OP_LIST, q_op, cvref));
21             return KEYWORD_PLUGIN_EXPR;
22             }
23              
24             static int build_kw_2arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
25             (void)nargs;
26             const char *func = (const char *)hookdata;
27             OP *q_op = args[0]->op;
28             OP *val_op = args[1]->op;
29             OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
30             OP *arglist = op_append_elem(OP_LIST, q_op, val_op);
31             arglist = op_append_elem(OP_LIST, arglist, cvref);
32             *out = op_convert_list(OP_ENTERSUB, OPf_STACKED, arglist);
33             return KEYWORD_PLUGIN_EXPR;
34             }
35              
36             static const struct XSParseKeywordPieceType pieces_1expr[] = {
37             XPK_TERMEXPR, {0}
38             };
39              
40             static const struct XSParseKeywordPieceType pieces_2expr[] = {
41             XPK_TERMEXPR, XPK_COMMA, XPK_TERMEXPR, {0}
42             };
43              
44             /* Hook definition macro: q__ */
45             #define DEFINE_Q_KW(variant, PKG, kw, nargs, builder) \
46             static const struct XSParseKeywordHooks hooks_q_##variant##_##kw = { \
47             .flags = XPK_FLAG_EXPR, \
48             .permit_hintkey = "Data::Queue::Shared::" PKG "/q_" #variant "_" #kw, \
49             .pieces = pieces_##nargs##expr, \
50             .build = builder, \
51             };
52              
53             /* Int keywords */
54             DEFINE_Q_KW(int, "Int", push, 2, build_kw_2arg)
55             DEFINE_Q_KW(int, "Int", pop, 1, build_kw_1arg)
56             DEFINE_Q_KW(int, "Int", peek, 1, build_kw_1arg)
57             DEFINE_Q_KW(int, "Int", size, 1, build_kw_1arg)
58              
59             /* Int32 keywords */
60             DEFINE_Q_KW(int32, "Int32", push, 2, build_kw_2arg)
61             DEFINE_Q_KW(int32, "Int32", pop, 1, build_kw_1arg)
62             DEFINE_Q_KW(int32, "Int32", peek, 1, build_kw_1arg)
63             DEFINE_Q_KW(int32, "Int32", size, 1, build_kw_1arg)
64              
65             /* Int16 keywords */
66             DEFINE_Q_KW(int16, "Int16", push, 2, build_kw_2arg)
67             DEFINE_Q_KW(int16, "Int16", pop, 1, build_kw_1arg)
68             DEFINE_Q_KW(int16, "Int16", peek, 1, build_kw_1arg)
69             DEFINE_Q_KW(int16, "Int16", size, 1, build_kw_1arg)
70              
71             /* Str keywords */
72             DEFINE_Q_KW(str, "Str", push, 2, build_kw_2arg)
73             DEFINE_Q_KW(str, "Str", pop, 1, build_kw_1arg)
74             DEFINE_Q_KW(str, "Str", peek, 1, build_kw_1arg)
75             DEFINE_Q_KW(str, "Str", size, 1, build_kw_1arg)
76              
77             #define REGISTER_Q_KW(variant, kw, func_name) \
78             register_xs_parse_keyword("q_" #variant "_" #kw, \
79             &hooks_q_##variant##_##kw, (void*)func_name)
80              
81             #endif /* HAVE_XS_PARSE_KEYWORD */
82              
83             #define EXTRACT_HANDLE(classname, sv) \
84             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
85             croak("Expected a %s object", classname); \
86             QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(sv))); \
87             if (!h) croak("Attempted to use a destroyed %s object", classname)
88              
89             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Int
90              
91             PROTOTYPES: DISABLE
92              
93             BOOT:
94             #ifdef HAVE_XS_PARSE_KEYWORD
95             boot_xs_parse_keyword(0.40);
96             sv_setiv(get_sv("Data::Queue::Shared::HAVE_KEYWORDS", GV_ADD), 1);
97             REGISTER_Q_KW(int, push, "Data::Queue::Shared::Int::push");
98             REGISTER_Q_KW(int, pop, "Data::Queue::Shared::Int::pop");
99             REGISTER_Q_KW(int, peek, "Data::Queue::Shared::Int::peek");
100             REGISTER_Q_KW(int, size, "Data::Queue::Shared::Int::size");
101             REGISTER_Q_KW(int32, push, "Data::Queue::Shared::Int32::push");
102             REGISTER_Q_KW(int32, pop, "Data::Queue::Shared::Int32::pop");
103             REGISTER_Q_KW(int32, peek, "Data::Queue::Shared::Int32::peek");
104             REGISTER_Q_KW(int32, size, "Data::Queue::Shared::Int32::size");
105             REGISTER_Q_KW(int16, push, "Data::Queue::Shared::Int16::push");
106             REGISTER_Q_KW(int16, pop, "Data::Queue::Shared::Int16::pop");
107             REGISTER_Q_KW(int16, peek, "Data::Queue::Shared::Int16::peek");
108             REGISTER_Q_KW(int16, size, "Data::Queue::Shared::Int16::size");
109             REGISTER_Q_KW(str, push, "Data::Queue::Shared::Str::push");
110             REGISTER_Q_KW(str, pop, "Data::Queue::Shared::Str::pop");
111             REGISTER_Q_KW(str, peek, "Data::Queue::Shared::Str::peek");
112             REGISTER_Q_KW(str, size, "Data::Queue::Shared::Str::size");
113             #endif
114             /* XSMARKER: end of BOOT — INCLUDE_COMMAND regex must strip BOOT from generated sections */
115              
116             SV *
117             new(class, path, capacity)
118             const char *class
119             SV *path
120             UV capacity
121             PREINIT:
122             char errbuf[QUEUE_ERR_BUFLEN];
123             CODE:
124 37 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
125 37           QueueHandle *h = queue_create(p, (uint32_t)capacity, QUEUE_MODE_INT, 0, errbuf);
126 37 100         if (!h) croak("Data::Queue::Shared::Int->new: %s", errbuf);
127 35           SV *obj = newSViv(PTR2IV(h));
128 35           SV *ref = newRV_noinc(obj);
129 35           sv_bless(ref, gv_stashpv(class, GV_ADD));
130 35           RETVAL = ref;
131             OUTPUT:
132             RETVAL
133              
134             SV *
135             new_memfd(class, name, capacity)
136             const char *class
137             const char *name
138             UV capacity
139             PREINIT:
140             char errbuf[QUEUE_ERR_BUFLEN];
141             CODE:
142 6           QueueHandle *h = queue_create_memfd(name, (uint32_t)capacity, QUEUE_MODE_INT, 0, errbuf);
143 6 50         if (!h) croak("Data::Queue::Shared::Int->new_memfd: %s", errbuf);
144 6           SV *obj = newSViv(PTR2IV(h));
145 6           SV *ref = newRV_noinc(obj);
146 6           sv_bless(ref, gv_stashpv(class, GV_ADD));
147 6           RETVAL = ref;
148             OUTPUT:
149             RETVAL
150              
151             SV *
152             new_from_fd(class, fd)
153             const char *class
154             int fd
155             PREINIT:
156             char errbuf[QUEUE_ERR_BUFLEN];
157             CODE:
158 1           QueueHandle *h = queue_open_fd(fd, QUEUE_MODE_INT, errbuf);
159 1 50         if (!h) croak("Data::Queue::Shared::Int->new_from_fd: %s", errbuf);
160 1           SV *obj = newSViv(PTR2IV(h));
161 1           SV *ref = newRV_noinc(obj);
162 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
163 1           RETVAL = ref;
164             OUTPUT:
165             RETVAL
166              
167             IV
168             memfd(self)
169             SV *self
170             PREINIT:
171 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
172             CODE:
173 2 50         RETVAL = h->backing_fd;
174             OUTPUT:
175             RETVAL
176              
177             void
178             DESTROY(self)
179             SV *self
180             PREINIT:
181 42 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
182             CODE:
183 42           sv_setiv(SvRV(self), 0);
184 42           queue_destroy(h);
185              
186             bool
187             push(self, value)
188             SV *self
189             IV value
190             PREINIT:
191 137 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
192             CODE:
193 137 100         RETVAL = queue_int_try_push(h, (int64_t)value);
194              
195             OUTPUT:
196             RETVAL
197              
198             SV *
199             pop(self)
200             SV *self
201             PREINIT:
202 45 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
203             int64_t value;
204             CODE:
205 45 100         if (queue_int_try_pop(h, &value))
206 42           RETVAL = newSViv((IV)value);
207             else
208 3           RETVAL = &PL_sv_undef;
209             OUTPUT:
210             RETVAL
211              
212             bool
213             push_wait(self, value, ...)
214             SV *self
215             IV value
216             PREINIT:
217 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
218 2           double timeout = -1;
219             CODE:
220 2 50         if (items > 2) timeout = SvNV(ST(2));
221 2 100         RETVAL = queue_int_push_wait(h, (int64_t)value, timeout);
222              
223             OUTPUT:
224             RETVAL
225              
226             SV *
227             pop_wait(self, ...)
228             SV *self
229             PREINIT:
230 104 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
231 104           double timeout = -1;
232             int64_t value;
233             CODE:
234 104 50         if (items > 1) timeout = SvNV(ST(1));
235 104 100         if (queue_int_pop_wait(h, &value, timeout))
236 103           RETVAL = newSViv((IV)value);
237             else
238 1           RETVAL = &PL_sv_undef;
239             OUTPUT:
240             RETVAL
241              
242             UV
243             push_multi(self, ...)
244             SV *self
245             PREINIT:
246 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
247             CODE:
248 3           uint32_t count = items - 1;
249 3           uint32_t pushed = 0;
250 13 100         for (uint32_t i = 0; i < count; i++) {
251 11 100         if (!queue_int_try_push(h, (int64_t)SvIV(ST(i + 1)))) break;
252 10           pushed++;
253             }
254              
255 3 50         RETVAL = pushed;
256             OUTPUT:
257             RETVAL
258              
259             void
260             pop_multi(self, count)
261             SV *self
262             UV count
263             PREINIT:
264 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
265             int64_t value;
266             PPCODE:
267 7 100         for (UV i = 0; i < count; i++) {
268 6 100         if (!queue_int_try_pop(h, &value)) break;
269 5 50         mXPUSHi((IV)value);
270             }
271              
272             UV
273             size(self)
274             SV *self
275             PREINIT:
276 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
277             CODE:
278 8           RETVAL = (UV)queue_int_size(h);
279             OUTPUT:
280             RETVAL
281              
282             UV
283             capacity(self)
284             SV *self
285             PREINIT:
286 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
287             CODE:
288 6 50         RETVAL = h->capacity;
289             OUTPUT:
290             RETVAL
291              
292             bool
293             is_empty(self)
294             SV *self
295             PREINIT:
296 5 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
297             CODE:
298 5 100         RETVAL = (queue_int_size(h) == 0);
299             OUTPUT:
300             RETVAL
301              
302             bool
303             is_full(self)
304             SV *self
305             PREINIT:
306 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
307             CODE:
308 3 100         RETVAL = (queue_int_size(h) >= h->capacity);
309             OUTPUT:
310             RETVAL
311              
312             void
313             clear(self)
314             SV *self
315             PREINIT:
316 5 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
317             CODE:
318 5           queue_int_clear(h);
319              
320             void
321             unlink(self_or_class, ...)
322             SV *self_or_class
323             CODE:
324             const char *path;
325 8 100         if (sv_isobject(self_or_class)) {
326 7           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self_or_class)));
327 7 50         if (!h) croak("Attempted to use a destroyed object");
328 7           path = h->path;
329             } else {
330 1 50         if (items < 2) croak("Usage: Data::Queue::Shared::Int->unlink($path)");
331 1           path = SvPV_nolen(ST(1));
332             }
333 8 100         if (!path) croak("cannot unlink anonymous or memfd queue");
334 6 50         if (unlink(path) != 0)
335 0           croak("unlink(%s): %s", path, strerror(errno));
336              
337             SV *
338             path(self)
339             SV *self
340             PREINIT:
341 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
342             CODE:
343 3 100         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
344             OUTPUT:
345             RETVAL
346              
347             SV *
348             stats(self)
349             SV *self
350             PREINIT:
351 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
352             CODE:
353 6           HV *hv = newHV();
354 6           QueueHeader *hdr = h->hdr;
355 6           hv_store(hv, "size", 4, newSVuv((UV)queue_int_size(h)), 0);
356 6           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
357 6           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
358 6           hv_store(hv, "push_ok", 7, newSVuv((UV)hdr->stat_push_ok), 0);
359 6           hv_store(hv, "pop_ok", 6, newSVuv((UV)hdr->stat_pop_ok), 0);
360 6           hv_store(hv, "push_full", 9, newSVuv((UV)hdr->stat_push_full), 0);
361 6           hv_store(hv, "pop_empty", 9, newSVuv((UV)hdr->stat_pop_empty), 0);
362 6           hv_store(hv, "recoveries", 10, newSVuv(hdr->stat_recoveries), 0);
363 6           hv_store(hv, "push_waiters", 12, newSVuv(hdr->push_waiters), 0);
364 6           hv_store(hv, "pop_waiters", 11, newSVuv(hdr->pop_waiters), 0);
365 6           RETVAL = newRV_noinc((SV *)hv);
366             OUTPUT:
367             RETVAL
368              
369             SV *
370             peek(self)
371             SV *self
372             PREINIT:
373 9 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
374             int64_t value;
375             CODE:
376 9 100         if (queue_int_peek(h, &value))
377 6           RETVAL = newSViv((IV)value);
378             else
379 3           RETVAL = &PL_sv_undef;
380             OUTPUT:
381             RETVAL
382              
383             void
384             drain(self, ...)
385             SV *self
386             PREINIT:
387 12 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
388             int64_t value;
389             uint32_t max_count;
390             PPCODE:
391 12 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
392 66 100         while (max_count-- > 0 && queue_int_try_pop(h, &value))
    100          
393 54 50         mXPUSHi((IV)value);
394              
395             void
396             pop_wait_multi(self, count, ...)
397             SV *self
398             UV count
399             PREINIT:
400 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
401 3           double timeout = -1;
402             int64_t value;
403             PPCODE:
404 3 50         if (items > 2) timeout = SvNV(ST(2));
405             /* Block until at least 1 */
406 3 100         if (!queue_int_pop_wait(h, &value, timeout)) XSRETURN(0);
407 2 50         mXPUSHi((IV)value);
408             /* Grab up to count-1 more non-blocking */
409 8 50         for (UV i = 1; i < count; i++) {
410 8 100         if (!queue_int_try_pop(h, &value)) break;
411 6 50         mXPUSHi((IV)value);
412             }
413              
414             UV
415             push_wait_multi(self, timeout, ...)
416             SV *self
417             double timeout
418             PREINIT:
419 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
420             CODE:
421 3           uint32_t nvalues = items - 2;
422 3           RETVAL = 0;
423 13 100         for (uint32_t i = 0; i < nvalues; i++) {
424 11 100         if (!queue_int_push_wait(h, (int64_t)SvIV(ST(i + 2)), timeout)) break;
425 10           RETVAL++;
426             }
427             OUTPUT:
428             RETVAL
429              
430             void
431             sync(self)
432             SV *self
433             PREINIT:
434 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
435             CODE:
436 2 50         if (queue_sync(h) != 0)
437 0           croak("msync: %s", strerror(errno));
438              
439             IV
440             eventfd(self)
441             SV *self
442             PREINIT:
443 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
444             CODE:
445 8           RETVAL = queue_eventfd_create(h);
446 8 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
447             OUTPUT:
448             RETVAL
449              
450             void
451             eventfd_set(self, fd)
452             SV *self
453             int fd
454             PREINIT:
455 1 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
456             CODE:
457 1           queue_eventfd_set(h, fd);
458              
459             IV
460             fileno(self)
461             SV *self
462             PREINIT:
463 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
464             CODE:
465 3 50         RETVAL = h->notify_fd;
466             OUTPUT:
467             RETVAL
468              
469             void
470             eventfd_consume(self)
471             SV *self
472             PREINIT:
473 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
474             CODE:
475 6           queue_eventfd_consume(h);
476              
477             void
478             notify(self)
479             SV *self
480             PREINIT:
481 5 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
482             CODE:
483 5           queue_notify(h);
484              
485             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Str
486              
487             SV *
488             new(class, path, capacity, ...)
489             const char *class
490             SV *path
491             UV capacity
492             PREINIT:
493             char errbuf[QUEUE_ERR_BUFLEN];
494             uint64_t arena_cap;
495             CODE:
496 36 100         if (items > 3)
497 13           arena_cap = (uint64_t)SvUV(ST(3));
498             else
499 23           arena_cap = (uint64_t)capacity * 256;
500 36 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
501 36           QueueHandle *h = queue_create(p, (uint32_t)capacity, QUEUE_MODE_STR, arena_cap, errbuf);
502 36 100         if (!h) croak("Data::Queue::Shared::Str->new: %s", errbuf);
503 35           SV *obj = newSViv(PTR2IV(h));
504 35           SV *ref = newRV_noinc(obj);
505 35           sv_bless(ref, gv_stashpv(class, GV_ADD));
506 35           RETVAL = ref;
507             OUTPUT:
508             RETVAL
509              
510             SV *
511             new_memfd(class, name, capacity, ...)
512             const char *class
513             const char *name
514             UV capacity
515             PREINIT:
516             char errbuf[QUEUE_ERR_BUFLEN];
517             uint64_t arena_cap;
518             CODE:
519 4 100         if (items > 3)
520 1           arena_cap = (uint64_t)SvUV(ST(3));
521             else
522 3           arena_cap = (uint64_t)capacity * 256;
523 4           QueueHandle *h = queue_create_memfd(name, (uint32_t)capacity, QUEUE_MODE_STR, arena_cap, errbuf);
524 4 50         if (!h) croak("Data::Queue::Shared::Str->new_memfd: %s", errbuf);
525 4           SV *obj = newSViv(PTR2IV(h));
526 4           SV *ref = newRV_noinc(obj);
527 4           sv_bless(ref, gv_stashpv(class, GV_ADD));
528 4           RETVAL = ref;
529             OUTPUT:
530             RETVAL
531              
532             SV *
533             new_from_fd(class, fd)
534             const char *class
535             int fd
536             PREINIT:
537             char errbuf[QUEUE_ERR_BUFLEN];
538             CODE:
539 1           QueueHandle *h = queue_open_fd(fd, QUEUE_MODE_STR, errbuf);
540 1 50         if (!h) croak("Data::Queue::Shared::Str->new_from_fd: %s", errbuf);
541 1           SV *obj = newSViv(PTR2IV(h));
542 1           SV *ref = newRV_noinc(obj);
543 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
544 1           RETVAL = ref;
545             OUTPUT:
546             RETVAL
547              
548             IV
549             memfd(self)
550             SV *self
551             PREINIT:
552 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
553             CODE:
554 3 50         RETVAL = h->backing_fd;
555             OUTPUT:
556             RETVAL
557              
558             void
559             DESTROY(self)
560             SV *self
561             PREINIT:
562 40 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
563             CODE:
564 40           sv_setiv(SvRV(self), 0);
565 40           queue_destroy(h);
566              
567             bool
568             push(self, value)
569             SV *self
570             SV *value
571             PREINIT:
572 167 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
573             STRLEN len;
574             CODE:
575 167           const char *str = SvPV(value, len);
576 167           bool utf8 = SvUTF8(value) ? true : false;
577 167           int r = queue_str_try_push(h, str, (uint32_t)len, utf8);
578 167 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
579 167 100         RETVAL = (r == 1);
580              
581             OUTPUT:
582             RETVAL
583              
584             SV *
585             pop(self)
586             SV *self
587             PREINIT:
588 81 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
589             const char *str;
590             uint32_t len;
591             bool utf8;
592             CODE:
593 81           int r = queue_str_try_pop(h, &str, &len, &utf8);
594 81 100         if (r == 1) {
595 78           RETVAL = newSVpvn(str, len);
596 78 100         if (utf8) SvUTF8_on(RETVAL);
597 3 50         } else if (r == -1) {
598 0           croak("Data::Queue::Shared::Str: out of memory");
599             } else {
600 3           RETVAL = &PL_sv_undef;
601             }
602             OUTPUT:
603             RETVAL
604              
605             bool
606             push_wait(self, value, ...)
607             SV *self
608             SV *value
609             PREINIT:
610 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
611 2           double timeout = -1;
612             STRLEN len;
613             CODE:
614 2 50         if (items > 2) timeout = SvNV(ST(2));
615 2           const char *str = SvPV(value, len);
616 2           bool utf8 = SvUTF8(value) ? true : false;
617 2           int r = queue_str_push_wait(h, str, (uint32_t)len, utf8, timeout);
618 2 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
619 2 100         RETVAL = (r == 1);
620              
621             OUTPUT:
622             RETVAL
623              
624             SV *
625             pop_wait(self, ...)
626             SV *self
627             PREINIT:
628 54 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
629 54           double timeout = -1;
630             const char *str;
631             uint32_t len;
632             bool utf8;
633             CODE:
634 54 50         if (items > 1) timeout = SvNV(ST(1));
635 54           int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
636 54 100         if (r == 1) {
637 53           RETVAL = newSVpvn(str, len);
638 53 50         if (utf8) SvUTF8_on(RETVAL);
639 1 50         } else if (r == -1) {
640 0           croak("Data::Queue::Shared::Str: out of memory");
641             } else {
642 1           RETVAL = &PL_sv_undef;
643             }
644             OUTPUT:
645             RETVAL
646              
647             UV
648             push_multi(self, ...)
649             SV *self
650             PREINIT:
651 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
652             CODE:
653 2           uint32_t count = items - 1;
654 2           uint32_t pushed = 0;
655 2           queue_mutex_lock(h->hdr);
656 7 100         for (uint32_t i = 0; i < count; i++) {
657 6           SV *sv = ST(i + 1);
658             STRLEN slen;
659 6           const char *str = SvPV(sv, slen);
660 6           bool utf8 = SvUTF8(sv) ? true : false;
661 6           int r = queue_str_push_locked(h, str, (uint32_t)slen, utf8);
662 6 50         if (r == -2) { queue_mutex_unlock(h->hdr); croak("Data::Queue::Shared::Str: string too long (max 2GB)"); }
663 6 100         if (r != 1) break;
664 5           pushed++;
665             }
666 2           queue_mutex_unlock(h->hdr);
667 2 50         if (pushed) queue_wake_consumers(h->hdr);
668 2 50         RETVAL = pushed;
669             OUTPUT:
670             RETVAL
671              
672             void
673             pop_multi(self, count)
674             SV *self
675             UV count
676             PREINIT:
677 1 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
678             const char *str;
679             uint32_t len;
680             bool utf8;
681             PPCODE:
682 1           int last_r = 0;
683 1           queue_mutex_lock(h->hdr);
684 3 100         for (UV i = 0; i < count; i++) {
685 2           last_r = queue_str_pop_locked(h, &str, &len, &utf8);
686 2 50         if (last_r <= 0) break;
687 2           SV *sv = newSVpvn(str, len);
688 2 50         if (utf8) SvUTF8_on(sv);
689 2 50         mXPUSHs(sv);
690             }
691 1           queue_mutex_unlock(h->hdr);
692 1           queue_wake_producers(h->hdr);
693 1 50         if (last_r == -1) croak("Data::Queue::Shared::Str: out of memory");
694              
695             UV
696             size(self)
697             SV *self
698             PREINIT:
699 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
700             CODE:
701 6           RETVAL = (UV)queue_str_size(h);
702             OUTPUT:
703             RETVAL
704              
705             UV
706             capacity(self)
707             SV *self
708             PREINIT:
709 1 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
710             CODE:
711 1 50         RETVAL = h->capacity;
712             OUTPUT:
713             RETVAL
714              
715             bool
716             is_empty(self)
717             SV *self
718             PREINIT:
719 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
720             CODE:
721 8 100         RETVAL = (queue_str_size(h) == 0);
722             OUTPUT:
723             RETVAL
724              
725             bool
726             is_full(self)
727             SV *self
728             PREINIT:
729 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
730             CODE:
731 3 100         RETVAL = (queue_str_size(h) >= h->capacity);
732             OUTPUT:
733             RETVAL
734              
735             void
736             clear(self)
737             SV *self
738             PREINIT:
739 7 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
740             CODE:
741 7           queue_str_clear(h);
742              
743             void
744             unlink(self_or_class, ...)
745             SV *self_or_class
746             CODE:
747             const char *path;
748 6 100         if (sv_isobject(self_or_class)) {
749 5           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self_or_class)));
750 5 50         if (!h) croak("Attempted to use a destroyed object");
751 5           path = h->path;
752             } else {
753 1 50         if (items < 2) croak("Usage: Data::Queue::Shared::Str->unlink($path)");
754 1           path = SvPV_nolen(ST(1));
755             }
756 6 50         if (!path) croak("cannot unlink anonymous or memfd queue");
757 6 50         if (unlink(path) != 0)
758 0           croak("unlink(%s): %s", path, strerror(errno));
759              
760             SV *
761             path(self)
762             SV *self
763             PREINIT:
764 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
765             CODE:
766 2 100         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
767             OUTPUT:
768             RETVAL
769              
770             SV *
771             stats(self)
772             SV *self
773             PREINIT:
774 9 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
775             CODE:
776 9           HV *hv = newHV();
777 9           QueueHeader *hdr = h->hdr;
778 9           hv_store(hv, "size", 4, newSVuv((UV)queue_str_size(h)), 0);
779 9           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
780 9           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
781 9           hv_store(hv, "arena_cap", 9, newSVuv((UV)h->arena_cap), 0);
782 9           hv_store(hv, "arena_used", 10, newSVuv(hdr->arena_used), 0);
783 9           hv_store(hv, "push_ok", 7, newSVuv((UV)hdr->stat_push_ok), 0);
784 9           hv_store(hv, "pop_ok", 6, newSVuv((UV)hdr->stat_pop_ok), 0);
785 9           hv_store(hv, "push_full", 9, newSVuv((UV)hdr->stat_push_full), 0);
786 9           hv_store(hv, "pop_empty", 9, newSVuv((UV)hdr->stat_pop_empty), 0);
787 9           hv_store(hv, "recoveries", 10, newSVuv(hdr->stat_recoveries), 0);
788 9           hv_store(hv, "push_waiters", 12, newSVuv(hdr->push_waiters), 0);
789 9           hv_store(hv, "pop_waiters", 11, newSVuv(hdr->pop_waiters), 0);
790 9           RETVAL = newRV_noinc((SV *)hv);
791             OUTPUT:
792             RETVAL
793              
794             SV *
795             peek(self)
796             SV *self
797             PREINIT:
798 7 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
799             const char *str;
800             uint32_t len;
801             bool utf8;
802             CODE:
803 7           int r = queue_str_peek(h, &str, &len, &utf8);
804 7 100         if (r == 1) {
805 6           RETVAL = newSVpvn(str, len);
806 6 100         if (utf8) SvUTF8_on(RETVAL);
807 1 50         } else if (r == -1) {
808 0           croak("Data::Queue::Shared::Str: out of memory");
809             } else {
810 1           RETVAL = &PL_sv_undef;
811             }
812             OUTPUT:
813             RETVAL
814              
815             bool
816             push_front(self, value)
817             SV *self
818             SV *value
819             PREINIT:
820 14 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
821             STRLEN len;
822             CODE:
823 14           const char *str = SvPV(value, len);
824 14           bool utf8 = SvUTF8(value) ? true : false;
825 14           int r = queue_str_push_front(h, str, (uint32_t)len, utf8);
826 14 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
827 14 100         RETVAL = (r == 1);
828             OUTPUT:
829             RETVAL
830              
831             bool
832             push_front_wait(self, value, ...)
833             SV *self
834             SV *value
835             PREINIT:
836 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
837 3           double timeout = -1;
838             STRLEN len;
839             CODE:
840 3 100         if (items > 2) timeout = SvNV(ST(2));
841 3           const char *str = SvPV(value, len);
842 3           bool utf8 = SvUTF8(value) ? true : false;
843 3           int r = queue_str_push_front_wait(h, str, (uint32_t)len, utf8, timeout);
844 3 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
845 3 100         RETVAL = (r == 1);
846             OUTPUT:
847             RETVAL
848              
849             void
850             drain(self, ...)
851             SV *self
852             PREINIT:
853 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
854             const char *str;
855             uint32_t len;
856             bool utf8;
857             uint32_t max_count;
858             PPCODE:
859 8 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
860 8           int last_r = 0;
861 8           queue_mutex_lock(h->hdr);
862 30 100         while (max_count-- > 0) {
863 29           last_r = queue_str_pop_locked(h, &str, &len, &utf8);
864 29 100         if (last_r <= 0) break;
865 22           SV *sv = newSVpvn(str, len);
866 22 50         if (utf8) SvUTF8_on(sv);
867 22 50         mXPUSHs(sv);
868             }
869 8           queue_mutex_unlock(h->hdr);
870 8           queue_wake_producers(h->hdr);
871 8 50         if (last_r == -1) croak("Data::Queue::Shared::Str: out of memory");
872              
873             void
874             pop_wait_multi(self, count, ...)
875             SV *self
876             UV count
877             PREINIT:
878 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
879 3           double timeout = -1;
880             const char *str;
881             uint32_t len;
882             bool utf8;
883             PPCODE:
884 3 50         if (items > 2) timeout = SvNV(ST(2));
885             /* Block until at least 1 */
886             {
887 3           int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
888 3 50         if (r == -1) croak("Data::Queue::Shared::Str: out of memory");
889 3 100         if (r != 1) XSRETURN(0);
890 2           SV *sv = newSVpvn(str, len);
891 2 50         if (utf8) SvUTF8_on(sv);
892 2 50         mXPUSHs(sv);
893             }
894             /* Grab up to count-1 more non-blocking */
895 6 50         for (UV i = 1; i < count; i++) {
896 6           int r = queue_str_try_pop(h, &str, &len, &utf8);
897 6 100         if (r <= 0) break;
898 4           SV *sv = newSVpvn(str, len);
899 4 50         if (utf8) SvUTF8_on(sv);
900 4 50         mXPUSHs(sv);
901             }
902              
903             UV
904             push_wait_multi(self, timeout, ...)
905             SV *self
906             double timeout
907             PREINIT:
908 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
909             CODE:
910 3           uint32_t nvalues = items - 2;
911 3           RETVAL = 0;
912 8 100         for (uint32_t i = 0; i < nvalues; i++) {
913 6           SV *sv = ST(i + 2);
914             STRLEN len;
915 6           const char *str = SvPV(sv, len);
916 6           bool utf8 = SvUTF8(sv) ? true : false;
917 6           int r = queue_str_push_wait(h, str, (uint32_t)len, utf8, timeout);
918 6 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
919 6 100         if (r != 1) break;
920 5           RETVAL++;
921             }
922             OUTPUT:
923             RETVAL
924              
925             SV *
926             pop_back(self)
927             SV *self
928             PREINIT:
929 19 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
930             const char *str;
931             uint32_t len;
932             bool utf8;
933             CODE:
934 19           int r = queue_str_pop_back(h, &str, &len, &utf8);
935 19 100         if (r == 1) {
936 17           RETVAL = newSVpvn(str, len);
937 17 100         if (utf8) SvUTF8_on(RETVAL);
938 2 50         } else if (r == -1) {
939 0           croak("Data::Queue::Shared::Str: out of memory");
940             } else {
941 2           RETVAL = &PL_sv_undef;
942             }
943             OUTPUT:
944             RETVAL
945              
946             SV *
947             pop_back_wait(self, ...)
948             SV *self
949             PREINIT:
950 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
951 3           double timeout = -1;
952             const char *str;
953             uint32_t len;
954             bool utf8;
955             CODE:
956 3 50         if (items > 1) timeout = SvNV(ST(1));
957 3           int r = queue_str_pop_back_wait(h, &str, &len, &utf8, timeout);
958 3 100         if (r == 1) {
959 2           RETVAL = newSVpvn(str, len);
960 2 50         if (utf8) SvUTF8_on(RETVAL);
961 1 50         } else if (r == -1) {
962 0           croak("Data::Queue::Shared::Str: out of memory");
963             } else {
964 1           RETVAL = &PL_sv_undef;
965             }
966             OUTPUT:
967             RETVAL
968              
969             void
970             sync(self)
971             SV *self
972             PREINIT:
973 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
974             CODE:
975 2 50         if (queue_sync(h) != 0)
976 0           croak("msync: %s", strerror(errno));
977              
978             IV
979             eventfd(self)
980             SV *self
981             PREINIT:
982 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
983             CODE:
984 2           RETVAL = queue_eventfd_create(h);
985 2 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
986             OUTPUT:
987             RETVAL
988              
989             void
990             eventfd_set(self, fd)
991             SV *self
992             int fd
993             PREINIT:
994 0 0         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    0          
    0          
995             CODE:
996 0           queue_eventfd_set(h, fd);
997              
998             IV
999             fileno(self)
1000             SV *self
1001             PREINIT:
1002 0 0         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    0          
    0          
1003             CODE:
1004 0 0         RETVAL = h->notify_fd;
1005             OUTPUT:
1006             RETVAL
1007              
1008             void
1009             eventfd_consume(self)
1010             SV *self
1011             PREINIT:
1012 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1013             CODE:
1014 3           queue_eventfd_consume(h);
1015              
1016             void
1017             notify(self)
1018             SV *self
1019             PREINIT:
1020 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1021             CODE:
1022 2           queue_notify(h);
1023              
1024             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Int32
1025              
1026             PROTOTYPES: DISABLE
1027              
1028             INCLUDE_COMMAND: $^X -e "use strict; my \$t = do{local \$/; open my \$f,'<','Shared.xs' or die; <\$f>}; my (\$int_section) = \$t =~ /^(MODULE.*?PACKAGE = Data::Queue::Shared::Int\n.*?)^MODULE/ms; \$int_section =~ s/Data::Queue::Shared::Int(?!\\d)/Data::Queue::Shared::Int32/g; \$int_section =~ s/queue_int_/queue_int32_/g; \$int_section =~ s/QUEUE_MODE_INT(?!\\d)/QUEUE_MODE_INT32/g; \$int_section =~ s/int64_t/int32_t/g; \$int_section =~ s/^MODULE.*\\n//m; \$int_section =~ s/^BOOT:\\n(?:.*\\n)*?\\/\\* XSMARKER.*\\n//m; print \$int_section"
1029              
1030             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Int16
1031              
1032             PROTOTYPES: DISABLE
1033              
1034             INCLUDE_COMMAND: $^X -e "use strict; my \$t = do{local \$/; open my \$f,'<','Shared.xs' or die; <\$f>}; my (\$int_section) = \$t =~ /^(MODULE.*?PACKAGE = Data::Queue::Shared::Int\n.*?)^MODULE/ms; \$int_section =~ s/Data::Queue::Shared::Int(?!\\d)/Data::Queue::Shared::Int16/g; \$int_section =~ s/queue_int_/queue_int16_/g; \$int_section =~ s/QUEUE_MODE_INT(?!\\d)/QUEUE_MODE_INT16/g; \$int_section =~ s/int64_t/int16_t/g; \$int_section =~ s/^MODULE.*\\n//m; \$int_section =~ s/^BOOT:\\n(?:.*\\n)*?\\/\\* XSMARKER.*\\n//m; print \$int_section"