File Coverage

Shared.xs
Criterion Covered Total %
statement 350 363 96.4
branch 348 594 58.5
condition n/a
subroutine n/a
pod n/a
total 698 957 72.9


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "ppport.h"
7             #include "queue.h"
8              
9             #ifdef HAVE_XS_PARSE_KEYWORD
10             #include "XSParseKeyword.h"
11              
12             /* ---- Keyword build functions (compile keywords to direct ENTERSUB ops) ---- */
13              
14             static int build_kw_1arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
15             (void)nargs;
16             const char *func = (const char *)hookdata;
17             OP *q_op = args[0]->op;
18             OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
19             *out = op_convert_list(OP_ENTERSUB, OPf_STACKED,
20             op_append_elem(OP_LIST, q_op, cvref));
21             return KEYWORD_PLUGIN_EXPR;
22             }
23              
24             static int build_kw_2arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
25             (void)nargs;
26             const char *func = (const char *)hookdata;
27             OP *q_op = args[0]->op;
28             OP *val_op = args[1]->op;
29             OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
30             OP *arglist = op_append_elem(OP_LIST, q_op, val_op);
31             arglist = op_append_elem(OP_LIST, arglist, cvref);
32             *out = op_convert_list(OP_ENTERSUB, OPf_STACKED, arglist);
33             return KEYWORD_PLUGIN_EXPR;
34             }
35              
36             static const struct XSParseKeywordPieceType pieces_1expr[] = {
37             XPK_TERMEXPR, {0}
38             };
39              
40             static const struct XSParseKeywordPieceType pieces_2expr[] = {
41             XPK_TERMEXPR, XPK_COMMA, XPK_TERMEXPR, {0}
42             };
43              
44             /* Hook definition macro: q__ */
45             #define DEFINE_Q_KW(variant, PKG, kw, nargs, builder) \
46             static const struct XSParseKeywordHooks hooks_q_##variant##_##kw = { \
47             .flags = XPK_FLAG_EXPR, \
48             .permit_hintkey = "Data::Queue::Shared::" PKG "/q_" #variant "_" #kw, \
49             .pieces = pieces_##nargs##expr, \
50             .build = builder, \
51             };
52              
53             /* Int keywords */
54             DEFINE_Q_KW(int, "Int", push, 2, build_kw_2arg)
55             DEFINE_Q_KW(int, "Int", pop, 1, build_kw_1arg)
56             DEFINE_Q_KW(int, "Int", peek, 1, build_kw_1arg)
57             DEFINE_Q_KW(int, "Int", size, 1, build_kw_1arg)
58              
59             /* Int32 keywords */
60             DEFINE_Q_KW(int32, "Int32", push, 2, build_kw_2arg)
61             DEFINE_Q_KW(int32, "Int32", pop, 1, build_kw_1arg)
62             DEFINE_Q_KW(int32, "Int32", peek, 1, build_kw_1arg)
63             DEFINE_Q_KW(int32, "Int32", size, 1, build_kw_1arg)
64              
65             /* Int16 keywords */
66             DEFINE_Q_KW(int16, "Int16", push, 2, build_kw_2arg)
67             DEFINE_Q_KW(int16, "Int16", pop, 1, build_kw_1arg)
68             DEFINE_Q_KW(int16, "Int16", peek, 1, build_kw_1arg)
69             DEFINE_Q_KW(int16, "Int16", size, 1, build_kw_1arg)
70              
71             /* Str keywords */
72             DEFINE_Q_KW(str, "Str", push, 2, build_kw_2arg)
73             DEFINE_Q_KW(str, "Str", pop, 1, build_kw_1arg)
74             DEFINE_Q_KW(str, "Str", peek, 1, build_kw_1arg)
75             DEFINE_Q_KW(str, "Str", size, 1, build_kw_1arg)
76              
77             #define REGISTER_Q_KW(variant, kw, func_name) \
78             register_xs_parse_keyword("q_" #variant "_" #kw, \
79             &hooks_q_##variant##_##kw, (void*)func_name)
80              
81             #endif /* HAVE_XS_PARSE_KEYWORD */
82              
83             #define EXTRACT_HANDLE(classname, sv) \
84             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
85             croak("Expected a %s object", classname); \
86             QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(sv))); \
87             if (!h) croak("Attempted to use a destroyed %s object", classname)
88              
89             #define MAKE_OBJ(class, ptr) \
90             SV *ref = newRV_noinc(newSViv(PTR2IV(ptr))); \
91             sv_bless(ref, gv_stashpv(class, GV_ADD)); \
92             RETVAL = ref
93              
94             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Int
95              
96             PROTOTYPES: DISABLE
97              
98             BOOT:
99             #ifdef HAVE_XS_PARSE_KEYWORD
100             boot_xs_parse_keyword(0.40);
101             sv_setiv(get_sv("Data::Queue::Shared::HAVE_KEYWORDS", GV_ADD), 1);
102             REGISTER_Q_KW(int, push, "Data::Queue::Shared::Int::push");
103             REGISTER_Q_KW(int, pop, "Data::Queue::Shared::Int::pop");
104             REGISTER_Q_KW(int, peek, "Data::Queue::Shared::Int::peek");
105             REGISTER_Q_KW(int, size, "Data::Queue::Shared::Int::size");
106             REGISTER_Q_KW(int32, push, "Data::Queue::Shared::Int32::push");
107             REGISTER_Q_KW(int32, pop, "Data::Queue::Shared::Int32::pop");
108             REGISTER_Q_KW(int32, peek, "Data::Queue::Shared::Int32::peek");
109             REGISTER_Q_KW(int32, size, "Data::Queue::Shared::Int32::size");
110             REGISTER_Q_KW(int16, push, "Data::Queue::Shared::Int16::push");
111             REGISTER_Q_KW(int16, pop, "Data::Queue::Shared::Int16::pop");
112             REGISTER_Q_KW(int16, peek, "Data::Queue::Shared::Int16::peek");
113             REGISTER_Q_KW(int16, size, "Data::Queue::Shared::Int16::size");
114             REGISTER_Q_KW(str, push, "Data::Queue::Shared::Str::push");
115             REGISTER_Q_KW(str, pop, "Data::Queue::Shared::Str::pop");
116             REGISTER_Q_KW(str, peek, "Data::Queue::Shared::Str::peek");
117             REGISTER_Q_KW(str, size, "Data::Queue::Shared::Str::size");
118             #endif
119             /* XSMARKER: end of BOOT — INCLUDE_COMMAND regex must strip BOOT from generated sections */
120              
121             SV *
122             new(class, path, capacity)
123             const char *class
124             SV *path
125             UV capacity
126             PREINIT:
127             char errbuf[QUEUE_ERR_BUFLEN];
128             CODE:
129 39 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
130 39           QueueHandle *h = queue_create(p, (uint32_t)capacity, QUEUE_MODE_INT, 0, errbuf);
131 39 100         if (!h) croak("Data::Queue::Shared::Int->new: %s", errbuf);
132 37           MAKE_OBJ(class, h);
133             OUTPUT:
134             RETVAL
135              
136             SV *
137             new_memfd(class, name, capacity)
138             const char *class
139             const char *name
140             UV capacity
141             PREINIT:
142             char errbuf[QUEUE_ERR_BUFLEN];
143             CODE:
144 6           QueueHandle *h = queue_create_memfd(name, (uint32_t)capacity, QUEUE_MODE_INT, 0, errbuf);
145 6 50         if (!h) croak("Data::Queue::Shared::Int->new_memfd: %s", errbuf);
146 6           MAKE_OBJ(class, h);
147             OUTPUT:
148             RETVAL
149              
150             SV *
151             new_from_fd(class, fd)
152             const char *class
153             int fd
154             PREINIT:
155             char errbuf[QUEUE_ERR_BUFLEN];
156             CODE:
157 1           QueueHandle *h = queue_open_fd(fd, QUEUE_MODE_INT, errbuf);
158 1 50         if (!h) croak("Data::Queue::Shared::Int->new_from_fd: %s", errbuf);
159 1           MAKE_OBJ(class, h);
160             OUTPUT:
161             RETVAL
162              
163             IV
164             memfd(self)
165             SV *self
166             PREINIT:
167 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
168             CODE:
169 2 50         RETVAL = h->backing_fd;
170             OUTPUT:
171             RETVAL
172              
173             void
174             DESTROY(self)
175             SV *self
176             CODE:
177 44 50         if (!SvROK(self)) return;
178 44           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self)));
179 44 50         if (!h) return;
180 44           sv_setiv(SvRV(self), 0);
181 44           queue_destroy(h);
182              
183             bool
184             push(self, value)
185             SV *self
186             IV value
187             PREINIT:
188 192 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
189             CODE:
190 192 100         RETVAL = queue_int_try_push(h, (int64_t)value);
191             OUTPUT:
192             RETVAL
193              
194             SV *
195             pop(self)
196             SV *self
197             PREINIT:
198 100 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
199             int64_t value;
200             CODE:
201 100 100         if (queue_int_try_pop(h, &value))
202 96           RETVAL = newSViv((IV)value);
203             else
204 4           RETVAL = &PL_sv_undef;
205             OUTPUT:
206             RETVAL
207              
208             bool
209             push_wait(self, value, ...)
210             SV *self
211             IV value
212             PREINIT:
213 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
214 2           double timeout = -1;
215             CODE:
216 2 50         if (items > 2) timeout = SvNV(ST(2));
217 2 100         RETVAL = queue_int_push_wait(h, (int64_t)value, timeout);
218             OUTPUT:
219             RETVAL
220              
221             SV *
222             pop_wait(self, ...)
223             SV *self
224             PREINIT:
225 104 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
226 104           double timeout = -1;
227             int64_t value;
228             CODE:
229 104 50         if (items > 1) timeout = SvNV(ST(1));
230 104 100         if (queue_int_pop_wait(h, &value, timeout))
231 103           RETVAL = newSViv((IV)value);
232             else
233 1           RETVAL = &PL_sv_undef;
234             OUTPUT:
235             RETVAL
236              
237             UV
238             push_multi(self, ...)
239             SV *self
240             PREINIT:
241 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
242             CODE:
243 3           uint32_t count = items - 1;
244 3           uint32_t pushed = 0;
245 13 100         for (uint32_t i = 0; i < count; i++) {
246 11 100         if (!queue_int_try_push(h, (int64_t)SvIV(ST(i + 1)))) break;
247 10           pushed++;
248             }
249              
250 3 50         RETVAL = pushed;
251             OUTPUT:
252             RETVAL
253              
254             void
255             pop_multi(self, count)
256             SV *self
257             UV count
258             PREINIT:
259 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
260             int64_t value;
261             PPCODE:
262 7 100         for (UV i = 0; i < count; i++) {
263 6 100         if (!queue_int_try_pop(h, &value)) break;
264 5 50         mXPUSHi((IV)value);
265             }
266              
267             UV
268             size(self)
269             SV *self
270             PREINIT:
271 14 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
272             CODE:
273 14           RETVAL = (UV)queue_int_size(h);
274             OUTPUT:
275             RETVAL
276              
277             UV
278             capacity(self)
279             SV *self
280             PREINIT:
281 7 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
282             CODE:
283 7 50         RETVAL = h->capacity;
284             OUTPUT:
285             RETVAL
286              
287             bool
288             is_empty(self)
289             SV *self
290             PREINIT:
291 5 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
292             CODE:
293 5 100         RETVAL = (queue_int_size(h) == 0);
294             OUTPUT:
295             RETVAL
296              
297             bool
298             is_full(self)
299             SV *self
300             PREINIT:
301 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
302             CODE:
303 3 100         RETVAL = (queue_int_size(h) >= h->capacity);
304             OUTPUT:
305             RETVAL
306              
307             void
308             clear(self)
309             SV *self
310             PREINIT:
311 5 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
312             CODE:
313 5           queue_int_clear(h);
314              
315             void
316             unlink(self_or_class, ...)
317             SV *self_or_class
318             CODE:
319             const char *path;
320 8 100         if (sv_isobject(self_or_class)) {
321 7           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self_or_class)));
322 7 50         if (!h) croak("Attempted to use a destroyed object");
323 7           path = h->path;
324             } else {
325 1 50         if (items < 2) croak("Usage: Data::Queue::Shared::Int->unlink($path)");
326 1           path = SvPV_nolen(ST(1));
327             }
328 8 100         if (!path) croak("cannot unlink anonymous or memfd queue");
329 6 50         if (unlink(path) != 0)
330 0           croak("unlink(%s): %s", path, strerror(errno));
331              
332             SV *
333             path(self)
334             SV *self
335             PREINIT:
336 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
337             CODE:
338 3 100         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
339             OUTPUT:
340             RETVAL
341              
342             SV *
343             stats(self)
344             SV *self
345             PREINIT:
346 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
347             CODE:
348 6           HV *hv = newHV();
349 6           QueueHeader *hdr = h->hdr;
350 6           hv_store(hv, "size", 4, newSVuv((UV)queue_int_size(h)), 0);
351 6           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
352 6           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
353 6           hv_store(hv, "push_ok", 7, newSVuv((UV)__atomic_load_n(&hdr->stat_push_ok, __ATOMIC_RELAXED)), 0);
354 6           hv_store(hv, "pop_ok", 6, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_ok, __ATOMIC_RELAXED)), 0);
355 6           hv_store(hv, "push_full", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_push_full, __ATOMIC_RELAXED)), 0);
356 6           hv_store(hv, "pop_empty", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_empty, __ATOMIC_RELAXED)), 0);
357 6           hv_store(hv, "recoveries", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
358 6           hv_store(hv, "push_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED)), 0);
359 6           hv_store(hv, "pop_waiters", 11, newSVuv((UV)__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED)), 0);
360 6           RETVAL = newRV_noinc((SV *)hv);
361             OUTPUT:
362             RETVAL
363              
364             SV *
365             peek(self)
366             SV *self
367             PREINIT:
368 9 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
369             int64_t value;
370             CODE:
371 9 100         if (queue_int_peek(h, &value))
372 6           RETVAL = newSViv((IV)value);
373             else
374 3           RETVAL = &PL_sv_undef;
375             OUTPUT:
376             RETVAL
377              
378             void
379             drain(self, ...)
380             SV *self
381             PREINIT:
382 12 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
383             int64_t value;
384             uint32_t max_count;
385             PPCODE:
386 12 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
387 66 100         while (max_count-- > 0 && queue_int_try_pop(h, &value))
    100          
388 54 50         mXPUSHi((IV)value);
389              
390             void
391             pop_wait_multi(self, count, ...)
392             SV *self
393             UV count
394             PREINIT:
395 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
396 3           double timeout = -1;
397             int64_t value;
398             PPCODE:
399 3 50         if (items > 2) timeout = SvNV(ST(2));
400             /* Block until at least 1 */
401 3 100         if (!queue_int_pop_wait(h, &value, timeout)) XSRETURN(0);
402 2 50         mXPUSHi((IV)value);
403             /* Grab up to count-1 more non-blocking */
404 8 50         for (UV i = 1; i < count; i++) {
405 8 100         if (!queue_int_try_pop(h, &value)) break;
406 6 50         mXPUSHi((IV)value);
407             }
408              
409             UV
410             push_wait_multi(self, timeout, ...)
411             SV *self
412             double timeout
413             PREINIT:
414 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
415             CODE:
416 3           uint32_t nvalues = items - 2;
417 3           RETVAL = 0;
418 13 100         for (uint32_t i = 0; i < nvalues; i++) {
419 11 100         if (!queue_int_push_wait(h, (int64_t)SvIV(ST(i + 2)), timeout)) break;
420 10           RETVAL++;
421             }
422             OUTPUT:
423             RETVAL
424              
425             void
426             sync(self)
427             SV *self
428             PREINIT:
429 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
430             CODE:
431 2 50         if (queue_sync(h) != 0)
432 0           croak("msync: %s", strerror(errno));
433              
434             IV
435             eventfd(self)
436             SV *self
437             PREINIT:
438 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
439             CODE:
440 8           RETVAL = queue_eventfd_create(h);
441 8 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
442             OUTPUT:
443             RETVAL
444              
445             void
446             eventfd_set(self, fd)
447             SV *self
448             int fd
449             PREINIT:
450 1 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
451             CODE:
452 1           queue_eventfd_set(h, fd);
453              
454             IV
455             fileno(self)
456             SV *self
457             PREINIT:
458 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
459             CODE:
460 3 50         RETVAL = h->notify_fd;
461             OUTPUT:
462             RETVAL
463              
464             SV *
465             eventfd_consume(self)
466             SV *self
467             PREINIT:
468 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
469             CODE:
470 6           int64_t v = queue_eventfd_consume(h);
471 6 50         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
472             OUTPUT:
473             RETVAL
474              
475             void
476             notify(self)
477             SV *self
478             PREINIT:
479 5 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
480             CODE:
481 5           queue_notify(h);
482              
483             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Str
484              
485             SV *
486             new(class, path, capacity, ...)
487             const char *class
488             SV *path
489             UV capacity
490             PREINIT:
491             char errbuf[QUEUE_ERR_BUFLEN];
492             uint64_t arena_cap;
493             CODE:
494 37 100         if (items > 3)
495 14           arena_cap = (uint64_t)SvUV(ST(3));
496             else
497 23           arena_cap = (uint64_t)capacity * 256;
498 37 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
499 37           QueueHandle *h = queue_create(p, (uint32_t)capacity, QUEUE_MODE_STR, arena_cap, errbuf);
500 37 100         if (!h) croak("Data::Queue::Shared::Str->new: %s", errbuf);
501 36           MAKE_OBJ(class, h);
502             OUTPUT:
503             RETVAL
504              
505             SV *
506             new_memfd(class, name, capacity, ...)
507             const char *class
508             const char *name
509             UV capacity
510             PREINIT:
511             char errbuf[QUEUE_ERR_BUFLEN];
512             uint64_t arena_cap;
513             CODE:
514 4 100         if (items > 3)
515 1           arena_cap = (uint64_t)SvUV(ST(3));
516             else
517 3           arena_cap = (uint64_t)capacity * 256;
518 4           QueueHandle *h = queue_create_memfd(name, (uint32_t)capacity, QUEUE_MODE_STR, arena_cap, errbuf);
519 4 50         if (!h) croak("Data::Queue::Shared::Str->new_memfd: %s", errbuf);
520 4           MAKE_OBJ(class, h);
521             OUTPUT:
522             RETVAL
523              
524             SV *
525             new_from_fd(class, fd)
526             const char *class
527             int fd
528             PREINIT:
529             char errbuf[QUEUE_ERR_BUFLEN];
530             CODE:
531 1           QueueHandle *h = queue_open_fd(fd, QUEUE_MODE_STR, errbuf);
532 1 50         if (!h) croak("Data::Queue::Shared::Str->new_from_fd: %s", errbuf);
533 1           MAKE_OBJ(class, h);
534             OUTPUT:
535             RETVAL
536              
537             IV
538             memfd(self)
539             SV *self
540             PREINIT:
541 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
542             CODE:
543 3 50         RETVAL = h->backing_fd;
544             OUTPUT:
545             RETVAL
546              
547             void
548             DESTROY(self)
549             SV *self
550             CODE:
551 41 50         if (!SvROK(self)) return;
552 41           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self)));
553 41 50         if (!h) return;
554 41           sv_setiv(SvRV(self), 0);
555 41           queue_destroy(h);
556              
557             bool
558             push(self, value)
559             SV *self
560             SV *value
561             PREINIT:
562 170 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
563             STRLEN len;
564             CODE:
565 170           const char *str = SvPV(value, len);
566 170           bool utf8 = SvUTF8(value) ? true : false;
567 170           int r = queue_str_try_push(h, str, (uint32_t)len, utf8);
568 170 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
569 170 100         RETVAL = (r == 1);
570             OUTPUT:
571             RETVAL
572              
573             SV *
574             pop(self)
575             SV *self
576             PREINIT:
577 84 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
578             const char *str;
579             uint32_t len;
580             bool utf8;
581             CODE:
582 84           int r = queue_str_try_pop(h, &str, &len, &utf8);
583 84 100         if (r == 1) {
584 81           RETVAL = newSVpvn(str, len);
585 81 100         if (utf8) SvUTF8_on(RETVAL);
586 3 50         } else if (r == -1) {
587 0           croak("Data::Queue::Shared::Str: out of memory");
588             } else {
589 3           RETVAL = &PL_sv_undef;
590             }
591             OUTPUT:
592             RETVAL
593              
594             bool
595             push_wait(self, value, ...)
596             SV *self
597             SV *value
598             PREINIT:
599 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
600 2           double timeout = -1;
601             STRLEN len;
602             CODE:
603 2 50         if (items > 2) timeout = SvNV(ST(2));
604 2           const char *str = SvPV(value, len);
605 2           bool utf8 = SvUTF8(value) ? true : false;
606 2           int r = queue_str_push_wait(h, str, (uint32_t)len, utf8, timeout);
607 2 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
608 2 100         RETVAL = (r == 1);
609             OUTPUT:
610             RETVAL
611              
612             SV *
613             pop_wait(self, ...)
614             SV *self
615             PREINIT:
616 54 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
617 54           double timeout = -1;
618             const char *str;
619             uint32_t len;
620             bool utf8;
621             CODE:
622 54 50         if (items > 1) timeout = SvNV(ST(1));
623 54           int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
624 54 100         if (r == 1) {
625 53           RETVAL = newSVpvn(str, len);
626 53 50         if (utf8) SvUTF8_on(RETVAL);
627 1 50         } else if (r == -1) {
628 0           croak("Data::Queue::Shared::Str: out of memory");
629             } else {
630 1           RETVAL = &PL_sv_undef;
631             }
632             OUTPUT:
633             RETVAL
634              
635             UV
636             push_multi(self, ...)
637             SV *self
638             PREINIT:
639 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
640             CODE:
641 2           uint32_t count = items - 1;
642 2           uint32_t pushed = 0;
643 2           queue_mutex_lock(h->hdr);
644 7 100         for (uint32_t i = 0; i < count; i++) {
645 6           SV *sv = ST(i + 1);
646             STRLEN slen;
647 6           const char *str = SvPV(sv, slen);
648 6           bool utf8 = SvUTF8(sv) ? true : false;
649 6           int r = queue_str_push_locked(h, str, (uint32_t)slen, utf8);
650 6 50         if (r == -2) { queue_mutex_unlock(h->hdr); croak("Data::Queue::Shared::Str: string too long (max 2GB)"); }
651 6 100         if (r != 1) break;
652 5           pushed++;
653             }
654 2           queue_mutex_unlock(h->hdr);
655 2 50         if (pushed) queue_wake_consumers_n(h->hdr, pushed);
656 2 50         RETVAL = pushed;
657             OUTPUT:
658             RETVAL
659              
660             void
661             pop_multi(self, count)
662             SV *self
663             UV count
664             PREINIT:
665 1 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
666             const char *str;
667             uint32_t len;
668             bool utf8;
669             PPCODE:
670             /* Hoist Perl SV construction out of the process-shared mutex:
671             * newSVpvn can longjmp on OOM and deadlock peers on the futex. */
672 1           struct { char *buf; uint32_t len; bool utf8; } *items_buf = NULL;
673 1           UV n = 0;
674 1           int last_r = 0;
675 1           int oom = 0;
676             /* Cap count at capacity: the queue can't hold more than capacity items,
677             * so a single pop_multi can't return more than that. This also prevents
678             * size_t overflow in the items_buf allocation for adversarial inputs. */
679 1 50         if (count > h->capacity) count = h->capacity;
680 1 50         if (count > 0) {
681 1           items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
682 1 50         if (!items_buf) croak("Data::Queue::Shared::Str: out of memory");
683             }
684 1           queue_mutex_lock(h->hdr);
685 3 100         for (UV i = 0; i < count; i++) {
686 2           last_r = queue_str_pop_locked(h, &str, &len, &utf8);
687 2 50         if (last_r <= 0) break;
688 2 50         char *c = (char *)malloc(len ? len : 1);
689 2 50         if (!c) { oom = 1; break; }
690 2 50         if (len) memcpy(c, str, len);
691 2           items_buf[n].buf = c;
692 2           items_buf[n].len = len;
693 2           items_buf[n].utf8 = utf8;
694 2           n++;
695             }
696 1           queue_mutex_unlock(h->hdr);
697 1 50         if (n) queue_wake_producers_n(h->hdr, (uint32_t)n);
698 1 50         EXTEND(SP, (SSize_t)n);
    50          
699 3 100         for (UV j = 0; j < n; j++) {
700 2           SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
701 2 50         if (items_buf[j].utf8) SvUTF8_on(sv);
702 2           PUSHs(sv_2mortal(sv));
703 2           free(items_buf[j].buf);
704             }
705 1           free(items_buf);
706 1 50         if (last_r == -1 || oom) croak("Data::Queue::Shared::Str: out of memory");
    50          
707              
708             UV
709             size(self)
710             SV *self
711             PREINIT:
712 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
713             CODE:
714 6           RETVAL = (UV)queue_str_size(h);
715             OUTPUT:
716             RETVAL
717              
718             UV
719             capacity(self)
720             SV *self
721             PREINIT:
722 1 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
723             CODE:
724 1 50         RETVAL = h->capacity;
725             OUTPUT:
726             RETVAL
727              
728             bool
729             is_empty(self)
730             SV *self
731             PREINIT:
732 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
733             CODE:
734 8 100         RETVAL = (queue_str_size(h) == 0);
735             OUTPUT:
736             RETVAL
737              
738             bool
739             is_full(self)
740             SV *self
741             PREINIT:
742 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
743             CODE:
744 3 100         RETVAL = (queue_str_size(h) >= h->capacity);
745             OUTPUT:
746             RETVAL
747              
748             void
749             clear(self)
750             SV *self
751             PREINIT:
752 7 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
753             CODE:
754 7           queue_str_clear(h);
755              
756             void
757             unlink(self_or_class, ...)
758             SV *self_or_class
759             CODE:
760             const char *path;
761 6 100         if (sv_isobject(self_or_class)) {
762 5           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self_or_class)));
763 5 50         if (!h) croak("Attempted to use a destroyed object");
764 5           path = h->path;
765             } else {
766 1 50         if (items < 2) croak("Usage: Data::Queue::Shared::Str->unlink($path)");
767 1           path = SvPV_nolen(ST(1));
768             }
769 6 50         if (!path) croak("cannot unlink anonymous or memfd queue");
770 6 50         if (unlink(path) != 0)
771 0           croak("unlink(%s): %s", path, strerror(errno));
772              
773             SV *
774             path(self)
775             SV *self
776             PREINIT:
777 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
778             CODE:
779 2 100         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
780             OUTPUT:
781             RETVAL
782              
783             SV *
784             stats(self)
785             SV *self
786             PREINIT:
787 9 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
788             CODE:
789 9           HV *hv = newHV();
790 9           QueueHeader *hdr = h->hdr;
791 9           hv_store(hv, "size", 4, newSVuv((UV)queue_str_size(h)), 0);
792 9           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
793 9           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
794 9           hv_store(hv, "arena_cap", 9, newSVuv((UV)h->arena_cap), 0);
795 9           hv_store(hv, "arena_used", 10, newSVuv(__atomic_load_n(&hdr->arena_used, __ATOMIC_RELAXED)), 0);
796 9           hv_store(hv, "push_ok", 7, newSVuv((UV)__atomic_load_n(&hdr->stat_push_ok, __ATOMIC_RELAXED)), 0);
797 9           hv_store(hv, "pop_ok", 6, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_ok, __ATOMIC_RELAXED)), 0);
798 9           hv_store(hv, "push_full", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_push_full, __ATOMIC_RELAXED)), 0);
799 9           hv_store(hv, "pop_empty", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_empty, __ATOMIC_RELAXED)), 0);
800 9           hv_store(hv, "recoveries", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
801 9           hv_store(hv, "push_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED)), 0);
802 9           hv_store(hv, "pop_waiters", 11, newSVuv((UV)__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED)), 0);
803 9           RETVAL = newRV_noinc((SV *)hv);
804             OUTPUT:
805             RETVAL
806              
807             SV *
808             peek(self)
809             SV *self
810             PREINIT:
811 7 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
812             const char *str;
813             uint32_t len;
814             bool utf8;
815             CODE:
816 7           int r = queue_str_peek(h, &str, &len, &utf8);
817 7 100         if (r == 1) {
818 6           RETVAL = newSVpvn(str, len);
819 6 100         if (utf8) SvUTF8_on(RETVAL);
820 1 50         } else if (r == -1) {
821 0           croak("Data::Queue::Shared::Str: out of memory");
822             } else {
823 1           RETVAL = &PL_sv_undef;
824             }
825             OUTPUT:
826             RETVAL
827              
828             bool
829             push_front(self, value)
830             SV *self
831             SV *value
832             PREINIT:
833 14 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
834             STRLEN len;
835             CODE:
836 14           const char *str = SvPV(value, len);
837 14           bool utf8 = SvUTF8(value) ? true : false;
838 14           int r = queue_str_push_front(h, str, (uint32_t)len, utf8);
839 14 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
840 14 100         RETVAL = (r == 1);
841             OUTPUT:
842             RETVAL
843              
844             bool
845             push_front_wait(self, value, ...)
846             SV *self
847             SV *value
848             PREINIT:
849 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
850 3           double timeout = -1;
851             STRLEN len;
852             CODE:
853 3 100         if (items > 2) timeout = SvNV(ST(2));
854 3           const char *str = SvPV(value, len);
855 3           bool utf8 = SvUTF8(value) ? true : false;
856 3           int r = queue_str_push_front_wait(h, str, (uint32_t)len, utf8, timeout);
857 3 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
858 3 100         RETVAL = (r == 1);
859             OUTPUT:
860             RETVAL
861              
862             void
863             drain(self, ...)
864             SV *self
865             PREINIT:
866 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
867             const char *str;
868             uint32_t len;
869             bool utf8;
870             uint32_t max_count;
871             PPCODE:
872 8 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
873             /* Hoist SV construction out of the mutex (see pop_multi). */
874 8           struct drain_item { char *buf; uint32_t len; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
875 8           UV drained_n = 0;
876 8           int last_r = 0;
877 8           int oom = 0;
878 8           queue_mutex_lock(h->hdr);
879 30 100         while (max_count-- > 0) {
880 29           last_r = queue_str_pop_locked(h, &str, &len, &utf8);
881 29 100         if (last_r <= 0) break;
882 22           struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
883 22 50         char *c = (char *)malloc(len ? len : 1);
884 22 50         if (!it || !c) { free(it); free(c); oom = 1; break; }
    50          
885 22 50         if (len) memcpy(c, str, len);
886 22           it->buf = c; it->len = len; it->utf8 = utf8; it->next = NULL;
887 22 100         if (drained_tail) drained_tail->next = it; else drained_head = it;
888 22           drained_tail = it;
889 22           drained_n++;
890             }
891 8           queue_mutex_unlock(h->hdr);
892 8 100         if (drained_n) queue_wake_producers_n(h->hdr, (uint32_t)drained_n);
893 8 50         EXTEND(SP, (SSize_t)drained_n);
    50          
894 30 100         while (drained_head) {
895 22           struct drain_item *it = drained_head; drained_head = it->next;
896 22           SV *sv = newSVpvn(it->buf, it->len);
897 22 50         if (it->utf8) SvUTF8_on(sv);
898 22           PUSHs(sv_2mortal(sv));
899 22           free(it->buf);
900 22           free(it);
901             }
902 8 50         if (last_r == -1 || oom) croak("Data::Queue::Shared::Str: out of memory");
    50          
903              
904             void
905             pop_wait_multi(self, count, ...)
906             SV *self
907             UV count
908             PREINIT:
909 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
910 3           double timeout = -1;
911             const char *str;
912             uint32_t len;
913             bool utf8;
914             PPCODE:
915 3 50         if (items > 2) timeout = SvNV(ST(2));
916             /* Block until at least 1 */
917             {
918 3           int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
919 3 50         if (r == -1) croak("Data::Queue::Shared::Str: out of memory");
920 3 100         if (r != 1) XSRETURN(0);
921 2           SV *sv = newSVpvn(str, len);
922 2 50         if (utf8) SvUTF8_on(sv);
923 2 50         mXPUSHs(sv);
924             }
925             /* Grab up to count-1 more non-blocking */
926 6 50         for (UV i = 1; i < count; i++) {
927 6           int r = queue_str_try_pop(h, &str, &len, &utf8);
928 6 100         if (r <= 0) break;
929 4           SV *sv = newSVpvn(str, len);
930 4 50         if (utf8) SvUTF8_on(sv);
931 4 50         mXPUSHs(sv);
932             }
933              
934             UV
935             push_wait_multi(self, timeout, ...)
936             SV *self
937             double timeout
938             PREINIT:
939 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
940             CODE:
941 3           uint32_t nvalues = items - 2;
942 3           RETVAL = 0;
943 8 100         for (uint32_t i = 0; i < nvalues; i++) {
944 6           SV *sv = ST(i + 2);
945             STRLEN len;
946 6           const char *str = SvPV(sv, len);
947 6           bool utf8 = SvUTF8(sv) ? true : false;
948 6           int r = queue_str_push_wait(h, str, (uint32_t)len, utf8, timeout);
949 6 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
950 6 100         if (r != 1) break;
951 5           RETVAL++;
952             }
953             OUTPUT:
954             RETVAL
955              
956             SV *
957             pop_back(self)
958             SV *self
959             PREINIT:
960 19 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
961             const char *str;
962             uint32_t len;
963             bool utf8;
964             CODE:
965 19           int r = queue_str_pop_back(h, &str, &len, &utf8);
966 19 100         if (r == 1) {
967 17           RETVAL = newSVpvn(str, len);
968 17 100         if (utf8) SvUTF8_on(RETVAL);
969 2 50         } else if (r == -1) {
970 0           croak("Data::Queue::Shared::Str: out of memory");
971             } else {
972 2           RETVAL = &PL_sv_undef;
973             }
974             OUTPUT:
975             RETVAL
976              
977             SV *
978             pop_back_wait(self, ...)
979             SV *self
980             PREINIT:
981 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
982 3           double timeout = -1;
983             const char *str;
984             uint32_t len;
985             bool utf8;
986             CODE:
987 3 50         if (items > 1) timeout = SvNV(ST(1));
988 3           int r = queue_str_pop_back_wait(h, &str, &len, &utf8, timeout);
989 3 100         if (r == 1) {
990 2           RETVAL = newSVpvn(str, len);
991 2 50         if (utf8) SvUTF8_on(RETVAL);
992 1 50         } else if (r == -1) {
993 0           croak("Data::Queue::Shared::Str: out of memory");
994             } else {
995 1           RETVAL = &PL_sv_undef;
996             }
997             OUTPUT:
998             RETVAL
999              
1000             void
1001             sync(self)
1002             SV *self
1003             PREINIT:
1004 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1005             CODE:
1006 2 50         if (queue_sync(h) != 0)
1007 0           croak("msync: %s", strerror(errno));
1008              
1009             IV
1010             eventfd(self)
1011             SV *self
1012             PREINIT:
1013 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1014             CODE:
1015 2           RETVAL = queue_eventfd_create(h);
1016 2 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1017             OUTPUT:
1018             RETVAL
1019              
1020             void
1021             eventfd_set(self, fd)
1022             SV *self
1023             int fd
1024             PREINIT:
1025 0 0         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    0          
    0          
1026             CODE:
1027 0           queue_eventfd_set(h, fd);
1028              
1029             IV
1030             fileno(self)
1031             SV *self
1032             PREINIT:
1033 0 0         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    0          
    0          
1034             CODE:
1035 0 0         RETVAL = h->notify_fd;
1036             OUTPUT:
1037             RETVAL
1038              
1039             SV *
1040             eventfd_consume(self)
1041             SV *self
1042             PREINIT:
1043 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1044             CODE:
1045 3           int64_t v = queue_eventfd_consume(h);
1046 3 50         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
1047             OUTPUT:
1048             RETVAL
1049              
1050             void
1051             notify(self)
1052             SV *self
1053             PREINIT:
1054 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1055             CODE:
1056 2           queue_notify(h);
1057              
1058             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Int32
1059              
1060             PROTOTYPES: DISABLE
1061              
1062             INCLUDE_COMMAND: $^X -e "use strict; my \$t = do{local \$/; open my \$f,'<','Shared.xs' or die; <\$f>}; my (\$int_section) = \$t =~ /^(MODULE.*?PACKAGE = Data::Queue::Shared::Int\n.*?)^MODULE/ms; \$int_section =~ s/Data::Queue::Shared::Int(?!\\d)/Data::Queue::Shared::Int32/g; \$int_section =~ s/queue_int_/queue_int32_/g; \$int_section =~ s/QUEUE_MODE_INT(?!\\d)/QUEUE_MODE_INT32/g; \$int_section =~ s/int64_t/int32_t/g; \$int_section =~ s/^MODULE.*\\n//m; \$int_section =~ s/^BOOT:\\n(?:.*\\n)*?\\/\\* XSMARKER.*\\n//m; print \$int_section"
1063              
1064             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Int16
1065              
1066             PROTOTYPES: DISABLE
1067              
1068             INCLUDE_COMMAND: $^X -e "use strict; my \$t = do{local \$/; open my \$f,'<','Shared.xs' or die; <\$f>}; my (\$int_section) = \$t =~ /^(MODULE.*?PACKAGE = Data::Queue::Shared::Int\n.*?)^MODULE/ms; \$int_section =~ s/Data::Queue::Shared::Int(?!\\d)/Data::Queue::Shared::Int16/g; \$int_section =~ s/queue_int_/queue_int16_/g; \$int_section =~ s/QUEUE_MODE_INT(?!\\d)/QUEUE_MODE_INT16/g; \$int_section =~ s/int64_t/int16_t/g; \$int_section =~ s/^MODULE.*\\n//m; \$int_section =~ s/^BOOT:\\n(?:.*\\n)*?\\/\\* XSMARKER.*\\n//m; print \$int_section"