File Coverage

Shared.xs
Criterion Covered Total %
statement 349 362 96.4
branch 344 588 58.5
condition n/a
subroutine n/a
pod n/a
total 693 950 72.9


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "ppport.h"
7             #include "queue.h"
8              
9             #ifdef HAVE_XS_PARSE_KEYWORD
10             #include "XSParseKeyword.h"
11              
12             /* ---- Keyword build functions (compile keywords to direct ENTERSUB ops) ---- */
13              
14             static int build_kw_1arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
15             (void)nargs;
16             const char *func = (const char *)hookdata;
17             OP *q_op = args[0]->op;
18             OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
19             *out = op_convert_list(OP_ENTERSUB, OPf_STACKED,
20             op_append_elem(OP_LIST, q_op, cvref));
21             return KEYWORD_PLUGIN_EXPR;
22             }
23              
24             static int build_kw_2arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
25             (void)nargs;
26             const char *func = (const char *)hookdata;
27             OP *q_op = args[0]->op;
28             OP *val_op = args[1]->op;
29             OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
30             OP *arglist = op_append_elem(OP_LIST, q_op, val_op);
31             arglist = op_append_elem(OP_LIST, arglist, cvref);
32             *out = op_convert_list(OP_ENTERSUB, OPf_STACKED, arglist);
33             return KEYWORD_PLUGIN_EXPR;
34             }
35              
36             static const struct XSParseKeywordPieceType pieces_1expr[] = {
37             XPK_TERMEXPR, {0}
38             };
39              
40             static const struct XSParseKeywordPieceType pieces_2expr[] = {
41             XPK_TERMEXPR, XPK_COMMA, XPK_TERMEXPR, {0}
42             };
43              
44             /* Hook definition macro: q__ */
45             #define DEFINE_Q_KW(variant, PKG, kw, nargs, builder) \
46             static const struct XSParseKeywordHooks hooks_q_##variant##_##kw = { \
47             .flags = XPK_FLAG_EXPR, \
48             .permit_hintkey = "Data::Queue::Shared::" PKG "/q_" #variant "_" #kw, \
49             .pieces = pieces_##nargs##expr, \
50             .build = builder, \
51             };
52              
53             /* Int keywords */
54             DEFINE_Q_KW(int, "Int", push, 2, build_kw_2arg)
55             DEFINE_Q_KW(int, "Int", pop, 1, build_kw_1arg)
56             DEFINE_Q_KW(int, "Int", peek, 1, build_kw_1arg)
57             DEFINE_Q_KW(int, "Int", size, 1, build_kw_1arg)
58              
59             /* Int32 keywords */
60             DEFINE_Q_KW(int32, "Int32", push, 2, build_kw_2arg)
61             DEFINE_Q_KW(int32, "Int32", pop, 1, build_kw_1arg)
62             DEFINE_Q_KW(int32, "Int32", peek, 1, build_kw_1arg)
63             DEFINE_Q_KW(int32, "Int32", size, 1, build_kw_1arg)
64              
65             /* Int16 keywords */
66             DEFINE_Q_KW(int16, "Int16", push, 2, build_kw_2arg)
67             DEFINE_Q_KW(int16, "Int16", pop, 1, build_kw_1arg)
68             DEFINE_Q_KW(int16, "Int16", peek, 1, build_kw_1arg)
69             DEFINE_Q_KW(int16, "Int16", size, 1, build_kw_1arg)
70              
71             /* Str keywords */
72             DEFINE_Q_KW(str, "Str", push, 2, build_kw_2arg)
73             DEFINE_Q_KW(str, "Str", pop, 1, build_kw_1arg)
74             DEFINE_Q_KW(str, "Str", peek, 1, build_kw_1arg)
75             DEFINE_Q_KW(str, "Str", size, 1, build_kw_1arg)
76              
77             #define REGISTER_Q_KW(variant, kw, func_name) \
78             register_xs_parse_keyword("q_" #variant "_" #kw, \
79             &hooks_q_##variant##_##kw, (void*)func_name)
80              
81             #endif /* HAVE_XS_PARSE_KEYWORD */
82              
83             #define EXTRACT_HANDLE(classname, sv) \
84             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
85             croak("Expected a %s object", classname); \
86             QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(sv))); \
87             if (!h) croak("Attempted to use a destroyed %s object", classname)
88              
89             #define MAKE_OBJ(class, ptr) \
90             SV *ref = newRV_noinc(newSViv(PTR2IV(ptr))); \
91             sv_bless(ref, gv_stashpv(class, GV_ADD)); \
92             RETVAL = ref
93              
94             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Int
95              
96             PROTOTYPES: DISABLE
97              
98             BOOT:
99             #ifdef HAVE_XS_PARSE_KEYWORD
100             boot_xs_parse_keyword(0.40);
101             sv_setiv(get_sv("Data::Queue::Shared::HAVE_KEYWORDS", GV_ADD), 1);
102             REGISTER_Q_KW(int, push, "Data::Queue::Shared::Int::push");
103             REGISTER_Q_KW(int, pop, "Data::Queue::Shared::Int::pop");
104             REGISTER_Q_KW(int, peek, "Data::Queue::Shared::Int::peek");
105             REGISTER_Q_KW(int, size, "Data::Queue::Shared::Int::size");
106             REGISTER_Q_KW(int32, push, "Data::Queue::Shared::Int32::push");
107             REGISTER_Q_KW(int32, pop, "Data::Queue::Shared::Int32::pop");
108             REGISTER_Q_KW(int32, peek, "Data::Queue::Shared::Int32::peek");
109             REGISTER_Q_KW(int32, size, "Data::Queue::Shared::Int32::size");
110             REGISTER_Q_KW(int16, push, "Data::Queue::Shared::Int16::push");
111             REGISTER_Q_KW(int16, pop, "Data::Queue::Shared::Int16::pop");
112             REGISTER_Q_KW(int16, peek, "Data::Queue::Shared::Int16::peek");
113             REGISTER_Q_KW(int16, size, "Data::Queue::Shared::Int16::size");
114             REGISTER_Q_KW(str, push, "Data::Queue::Shared::Str::push");
115             REGISTER_Q_KW(str, pop, "Data::Queue::Shared::Str::pop");
116             REGISTER_Q_KW(str, peek, "Data::Queue::Shared::Str::peek");
117             REGISTER_Q_KW(str, size, "Data::Queue::Shared::Str::size");
118             #endif
119             /* XSMARKER: end of BOOT — INCLUDE_COMMAND regex must strip BOOT from generated sections */
120              
121             SV *
122             new(class, path, capacity)
123             const char *class
124             SV *path
125             UV capacity
126             PREINIT:
127             char errbuf[QUEUE_ERR_BUFLEN];
128             CODE:
129 39 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
130 39           QueueHandle *h = queue_create(p, (uint32_t)capacity, QUEUE_MODE_INT, 0, errbuf);
131 39 100         if (!h) croak("Data::Queue::Shared::Int->new: %s", errbuf);
132 37           MAKE_OBJ(class, h);
133             OUTPUT:
134             RETVAL
135              
136             SV *
137             new_memfd(class, name, capacity)
138             const char *class
139             const char *name
140             UV capacity
141             PREINIT:
142             char errbuf[QUEUE_ERR_BUFLEN];
143             CODE:
144 6           QueueHandle *h = queue_create_memfd(name, (uint32_t)capacity, QUEUE_MODE_INT, 0, errbuf);
145 6 50         if (!h) croak("Data::Queue::Shared::Int->new_memfd: %s", errbuf);
146 6           MAKE_OBJ(class, h);
147             OUTPUT:
148             RETVAL
149              
150             SV *
151             new_from_fd(class, fd)
152             const char *class
153             int fd
154             PREINIT:
155             char errbuf[QUEUE_ERR_BUFLEN];
156             CODE:
157 1           QueueHandle *h = queue_open_fd(fd, QUEUE_MODE_INT, errbuf);
158 1 50         if (!h) croak("Data::Queue::Shared::Int->new_from_fd: %s", errbuf);
159 1           MAKE_OBJ(class, h);
160             OUTPUT:
161             RETVAL
162              
163             IV
164             memfd(self)
165             SV *self
166             PREINIT:
167 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
168             CODE:
169 2 50         RETVAL = h->backing_fd;
170             OUTPUT:
171             RETVAL
172              
173             void
174             DESTROY(self)
175             SV *self
176             CODE:
177 44 50         if (!SvROK(self)) return;
178 44           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self)));
179 44 50         if (!h) return;
180 44           sv_setiv(SvRV(self), 0);
181 44           queue_destroy(h);
182              
183             bool
184             push(self, value)
185             SV *self
186             IV value
187             PREINIT:
188 192 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
189             CODE:
190 192 100         RETVAL = queue_int_try_push(h, (int64_t)value);
191             OUTPUT:
192             RETVAL
193              
194             SV *
195             pop(self)
196             SV *self
197             PREINIT:
198 100 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
199             int64_t value;
200             CODE:
201 100 100         if (queue_int_try_pop(h, &value))
202 96           RETVAL = newSViv((IV)value);
203             else
204 4           RETVAL = &PL_sv_undef;
205             OUTPUT:
206             RETVAL
207              
208             bool
209             push_wait(self, value, ...)
210             SV *self
211             IV value
212             PREINIT:
213 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
214 2           double timeout = -1;
215             CODE:
216 2 50         if (items > 2) timeout = SvNV(ST(2));
217 2 100         RETVAL = queue_int_push_wait(h, (int64_t)value, timeout);
218             OUTPUT:
219             RETVAL
220              
221             SV *
222             pop_wait(self, ...)
223             SV *self
224             PREINIT:
225 104 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
226 104           double timeout = -1;
227             int64_t value;
228             CODE:
229 104 50         if (items > 1) timeout = SvNV(ST(1));
230 104 100         if (queue_int_pop_wait(h, &value, timeout))
231 103           RETVAL = newSViv((IV)value);
232             else
233 1           RETVAL = &PL_sv_undef;
234             OUTPUT:
235             RETVAL
236              
237             UV
238             push_multi(self, ...)
239             SV *self
240             PREINIT:
241 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
242             CODE:
243 3           uint32_t count = items - 1;
244 3           uint32_t pushed = 0;
245 13 100         for (uint32_t i = 0; i < count; i++) {
246 11 100         if (!queue_int_try_push(h, (int64_t)SvIV(ST(i + 1)))) break;
247 10           pushed++;
248             }
249              
250 3 50         RETVAL = pushed;
251             OUTPUT:
252             RETVAL
253              
254             void
255             pop_multi(self, count)
256             SV *self
257             UV count
258             PREINIT:
259 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
260             int64_t value;
261             PPCODE:
262 7 100         for (UV i = 0; i < count; i++) {
263 6 100         if (!queue_int_try_pop(h, &value)) break;
264 5 50         mXPUSHi((IV)value);
265             }
266              
267             UV
268             size(self)
269             SV *self
270             PREINIT:
271 14 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
272             CODE:
273 14           RETVAL = (UV)queue_int_size(h);
274             OUTPUT:
275             RETVAL
276              
277             UV
278             capacity(self)
279             SV *self
280             PREINIT:
281 7 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
282             CODE:
283 7 50         RETVAL = h->capacity;
284             OUTPUT:
285             RETVAL
286              
287             bool
288             is_empty(self)
289             SV *self
290             PREINIT:
291 5 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
292             CODE:
293 5 100         RETVAL = (queue_int_size(h) == 0);
294             OUTPUT:
295             RETVAL
296              
297             bool
298             is_full(self)
299             SV *self
300             PREINIT:
301 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
302             CODE:
303 3 100         RETVAL = (queue_int_size(h) >= h->capacity);
304             OUTPUT:
305             RETVAL
306              
307             void
308             clear(self)
309             SV *self
310             PREINIT:
311 5 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
312             CODE:
313 5           queue_int_clear(h);
314              
315             void
316             unlink(self_or_class, ...)
317             SV *self_or_class
318             CODE:
319             const char *path;
320 8 100         if (sv_isobject(self_or_class)) {
321 7           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self_or_class)));
322 7 50         if (!h) croak("Attempted to use a destroyed object");
323 7           path = h->path;
324             } else {
325 1 50         if (items < 2) croak("Usage: Data::Queue::Shared::Int->unlink($path)");
326 1           path = SvPV_nolen(ST(1));
327             }
328 8 100         if (!path) croak("cannot unlink anonymous or memfd queue");
329 6 50         if (unlink(path) != 0)
330 0           croak("unlink(%s): %s", path, strerror(errno));
331              
332             SV *
333             path(self)
334             SV *self
335             PREINIT:
336 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
337             CODE:
338 3 100         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
339             OUTPUT:
340             RETVAL
341              
342             SV *
343             stats(self)
344             SV *self
345             PREINIT:
346 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
347             CODE:
348 6           HV *hv = newHV();
349 6           QueueHeader *hdr = h->hdr;
350 6           hv_store(hv, "size", 4, newSVuv((UV)queue_int_size(h)), 0);
351 6           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
352 6           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
353 6           hv_store(hv, "push_ok", 7, newSVuv((UV)__atomic_load_n(&hdr->stat_push_ok, __ATOMIC_RELAXED)), 0);
354 6           hv_store(hv, "pop_ok", 6, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_ok, __ATOMIC_RELAXED)), 0);
355 6           hv_store(hv, "push_full", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_push_full, __ATOMIC_RELAXED)), 0);
356 6           hv_store(hv, "pop_empty", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_empty, __ATOMIC_RELAXED)), 0);
357 6           hv_store(hv, "recoveries", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
358 6           hv_store(hv, "push_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED)), 0);
359 6           hv_store(hv, "pop_waiters", 11, newSVuv((UV)__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED)), 0);
360 6           RETVAL = newRV_noinc((SV *)hv);
361             OUTPUT:
362             RETVAL
363              
364             SV *
365             peek(self)
366             SV *self
367             PREINIT:
368 9 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
369             int64_t value;
370             CODE:
371 9 100         if (queue_int_peek(h, &value))
372 6           RETVAL = newSViv((IV)value);
373             else
374 3           RETVAL = &PL_sv_undef;
375             OUTPUT:
376             RETVAL
377              
378             void
379             drain(self, ...)
380             SV *self
381             PREINIT:
382 12 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
383             int64_t value;
384             uint32_t max_count;
385             PPCODE:
386 12 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
387 66 100         while (max_count-- > 0 && queue_int_try_pop(h, &value))
    100          
388 54 50         mXPUSHi((IV)value);
389              
390             void
391             pop_wait_multi(self, count, ...)
392             SV *self
393             UV count
394             PREINIT:
395 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
396 3           double timeout = -1;
397             int64_t value;
398             PPCODE:
399 3 50         if (items > 2) timeout = SvNV(ST(2));
400             /* Block until at least 1 */
401 3 100         if (!queue_int_pop_wait(h, &value, timeout)) XSRETURN(0);
402 2 50         mXPUSHi((IV)value);
403             /* Grab up to count-1 more non-blocking */
404 8 50         for (UV i = 1; i < count; i++) {
405 8 100         if (!queue_int_try_pop(h, &value)) break;
406 6 50         mXPUSHi((IV)value);
407             }
408              
409             UV
410             push_wait_multi(self, timeout, ...)
411             SV *self
412             double timeout
413             PREINIT:
414 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
415             CODE:
416 3           uint32_t nvalues = items - 2;
417 3           RETVAL = 0;
418 13 100         for (uint32_t i = 0; i < nvalues; i++) {
419 11 100         if (!queue_int_push_wait(h, (int64_t)SvIV(ST(i + 2)), timeout)) break;
420 10           RETVAL++;
421             }
422             OUTPUT:
423             RETVAL
424              
425             void
426             sync(self)
427             SV *self
428             PREINIT:
429 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
430             CODE:
431 2 50         if (queue_sync(h) != 0)
432 0           croak("msync: %s", strerror(errno));
433              
434             IV
435             eventfd(self)
436             SV *self
437             PREINIT:
438 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
439             CODE:
440 8           RETVAL = queue_eventfd_create(h);
441 8 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
442             OUTPUT:
443             RETVAL
444              
445             void
446             eventfd_set(self, fd)
447             SV *self
448             int fd
449             PREINIT:
450 1 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
451             CODE:
452 1           queue_eventfd_set(h, fd);
453              
454             IV
455             fileno(self)
456             SV *self
457             PREINIT:
458 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
459             CODE:
460 3 50         RETVAL = h->notify_fd;
461             OUTPUT:
462             RETVAL
463              
464             SV *
465             eventfd_consume(self)
466             SV *self
467             PREINIT:
468 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
469             CODE:
470 6           int64_t v = queue_eventfd_consume(h);
471 6 50         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
472             OUTPUT:
473             RETVAL
474              
475             void
476             notify(self)
477             SV *self
478             PREINIT:
479 5 50         EXTRACT_HANDLE("Data::Queue::Shared::Int", self);
    50          
    50          
480             CODE:
481 5           queue_notify(h);
482              
483             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Str
484              
485             SV *
486             new(class, path, capacity, ...)
487             const char *class
488             SV *path
489             UV capacity
490             PREINIT:
491             char errbuf[QUEUE_ERR_BUFLEN];
492             uint64_t arena_cap;
493             CODE:
494 37 100         if (items > 3)
495 14           arena_cap = (uint64_t)SvUV(ST(3));
496             else
497 23           arena_cap = (uint64_t)capacity * 256;
498 37 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
499 37           QueueHandle *h = queue_create(p, (uint32_t)capacity, QUEUE_MODE_STR, arena_cap, errbuf);
500 37 100         if (!h) croak("Data::Queue::Shared::Str->new: %s", errbuf);
501 36           MAKE_OBJ(class, h);
502             OUTPUT:
503             RETVAL
504              
505             SV *
506             new_memfd(class, name, capacity, ...)
507             const char *class
508             const char *name
509             UV capacity
510             PREINIT:
511             char errbuf[QUEUE_ERR_BUFLEN];
512             uint64_t arena_cap;
513             CODE:
514 4 100         if (items > 3)
515 1           arena_cap = (uint64_t)SvUV(ST(3));
516             else
517 3           arena_cap = (uint64_t)capacity * 256;
518 4           QueueHandle *h = queue_create_memfd(name, (uint32_t)capacity, QUEUE_MODE_STR, arena_cap, errbuf);
519 4 50         if (!h) croak("Data::Queue::Shared::Str->new_memfd: %s", errbuf);
520 4           MAKE_OBJ(class, h);
521             OUTPUT:
522             RETVAL
523              
524             SV *
525             new_from_fd(class, fd)
526             const char *class
527             int fd
528             PREINIT:
529             char errbuf[QUEUE_ERR_BUFLEN];
530             CODE:
531 1           QueueHandle *h = queue_open_fd(fd, QUEUE_MODE_STR, errbuf);
532 1 50         if (!h) croak("Data::Queue::Shared::Str->new_from_fd: %s", errbuf);
533 1           MAKE_OBJ(class, h);
534             OUTPUT:
535             RETVAL
536              
537             IV
538             memfd(self)
539             SV *self
540             PREINIT:
541 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
542             CODE:
543 3 50         RETVAL = h->backing_fd;
544             OUTPUT:
545             RETVAL
546              
547             void
548             DESTROY(self)
549             SV *self
550             CODE:
551 41 50         if (!SvROK(self)) return;
552 41           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self)));
553 41 50         if (!h) return;
554 41           sv_setiv(SvRV(self), 0);
555 41           queue_destroy(h);
556              
557             bool
558             push(self, value)
559             SV *self
560             SV *value
561             PREINIT:
562 170 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
563             STRLEN len;
564             CODE:
565 170           const char *str = SvPV(value, len);
566 170           bool utf8 = SvUTF8(value) ? true : false;
567 170           int r = queue_str_try_push(h, str, (uint32_t)len, utf8);
568 170 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
569 170 100         RETVAL = (r == 1);
570             OUTPUT:
571             RETVAL
572              
573             SV *
574             pop(self)
575             SV *self
576             PREINIT:
577 84 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
578             const char *str;
579             uint32_t len;
580             bool utf8;
581             CODE:
582 84           int r = queue_str_try_pop(h, &str, &len, &utf8);
583 84 100         if (r == 1) {
584 81           RETVAL = newSVpvn(str, len);
585 81 100         if (utf8) SvUTF8_on(RETVAL);
586 3 50         } else if (r == -1) {
587 0           croak("Data::Queue::Shared::Str: out of memory");
588             } else {
589 3           RETVAL = &PL_sv_undef;
590             }
591             OUTPUT:
592             RETVAL
593              
594             bool
595             push_wait(self, value, ...)
596             SV *self
597             SV *value
598             PREINIT:
599 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
600 2           double timeout = -1;
601             STRLEN len;
602             CODE:
603 2 50         if (items > 2) timeout = SvNV(ST(2));
604 2           const char *str = SvPV(value, len);
605 2           bool utf8 = SvUTF8(value) ? true : false;
606 2           int r = queue_str_push_wait(h, str, (uint32_t)len, utf8, timeout);
607 2 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
608 2 100         RETVAL = (r == 1);
609             OUTPUT:
610             RETVAL
611              
612             SV *
613             pop_wait(self, ...)
614             SV *self
615             PREINIT:
616 54 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
617 54           double timeout = -1;
618             const char *str;
619             uint32_t len;
620             bool utf8;
621             CODE:
622 54 50         if (items > 1) timeout = SvNV(ST(1));
623 54           int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
624 54 100         if (r == 1) {
625 53           RETVAL = newSVpvn(str, len);
626 53 50         if (utf8) SvUTF8_on(RETVAL);
627 1 50         } else if (r == -1) {
628 0           croak("Data::Queue::Shared::Str: out of memory");
629             } else {
630 1           RETVAL = &PL_sv_undef;
631             }
632             OUTPUT:
633             RETVAL
634              
635             UV
636             push_multi(self, ...)
637             SV *self
638             PREINIT:
639 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
640             CODE:
641 2           uint32_t count = items - 1;
642 2           uint32_t pushed = 0;
643 2           queue_mutex_lock(h->hdr);
644 7 100         for (uint32_t i = 0; i < count; i++) {
645 6           SV *sv = ST(i + 1);
646             STRLEN slen;
647 6           const char *str = SvPV(sv, slen);
648 6           bool utf8 = SvUTF8(sv) ? true : false;
649 6           int r = queue_str_push_locked(h, str, (uint32_t)slen, utf8);
650 6 50         if (r == -2) { queue_mutex_unlock(h->hdr); croak("Data::Queue::Shared::Str: string too long (max 2GB)"); }
651 6 100         if (r != 1) break;
652 5           pushed++;
653             }
654 2           queue_mutex_unlock(h->hdr);
655 2 50         if (pushed) queue_wake_consumers(h->hdr);
656 2 50         RETVAL = pushed;
657             OUTPUT:
658             RETVAL
659              
660             void
661             pop_multi(self, count)
662             SV *self
663             UV count
664             PREINIT:
665 1 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
666             const char *str;
667             uint32_t len;
668             bool utf8;
669             PPCODE:
670             /* Hoist Perl SV construction out of the process-shared mutex:
671             * newSVpvn can longjmp on OOM and deadlock peers on the futex. */
672 1           struct { char *buf; uint32_t len; bool utf8; } *items_buf = NULL;
673 1           UV n = 0;
674 1           int last_r = 0;
675 1           int oom = 0;
676 1 50         if (count > 0) {
677 1           items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
678 1 50         if (!items_buf) croak("Data::Queue::Shared::Str: out of memory");
679             }
680 1           queue_mutex_lock(h->hdr);
681 3 100         for (UV i = 0; i < count; i++) {
682 2           last_r = queue_str_pop_locked(h, &str, &len, &utf8);
683 2 50         if (last_r <= 0) break;
684 2 50         char *c = (char *)malloc(len ? len : 1);
685 2 50         if (!c) { oom = 1; break; }
686 2 50         if (len) memcpy(c, str, len);
687 2           items_buf[n].buf = c;
688 2           items_buf[n].len = len;
689 2           items_buf[n].utf8 = utf8;
690 2           n++;
691             }
692 1           queue_mutex_unlock(h->hdr);
693 1           queue_wake_producers(h->hdr);
694 1 50         EXTEND(SP, (SSize_t)n);
    50          
695 3 100         for (UV j = 0; j < n; j++) {
696 2           SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
697 2 50         if (items_buf[j].utf8) SvUTF8_on(sv);
698 2           PUSHs(sv_2mortal(sv));
699 2           free(items_buf[j].buf);
700             }
701 1           free(items_buf);
702 1 50         if (last_r == -1 || oom) croak("Data::Queue::Shared::Str: out of memory");
    50          
703              
704             UV
705             size(self)
706             SV *self
707             PREINIT:
708 6 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
709             CODE:
710 6           RETVAL = (UV)queue_str_size(h);
711             OUTPUT:
712             RETVAL
713              
714             UV
715             capacity(self)
716             SV *self
717             PREINIT:
718 1 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
719             CODE:
720 1 50         RETVAL = h->capacity;
721             OUTPUT:
722             RETVAL
723              
724             bool
725             is_empty(self)
726             SV *self
727             PREINIT:
728 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
729             CODE:
730 8 100         RETVAL = (queue_str_size(h) == 0);
731             OUTPUT:
732             RETVAL
733              
734             bool
735             is_full(self)
736             SV *self
737             PREINIT:
738 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
739             CODE:
740 3 100         RETVAL = (queue_str_size(h) >= h->capacity);
741             OUTPUT:
742             RETVAL
743              
744             void
745             clear(self)
746             SV *self
747             PREINIT:
748 7 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
749             CODE:
750 7           queue_str_clear(h);
751              
752             void
753             unlink(self_or_class, ...)
754             SV *self_or_class
755             CODE:
756             const char *path;
757 6 100         if (sv_isobject(self_or_class)) {
758 5           QueueHandle *h = INT2PTR(QueueHandle*, SvIV(SvRV(self_or_class)));
759 5 50         if (!h) croak("Attempted to use a destroyed object");
760 5           path = h->path;
761             } else {
762 1 50         if (items < 2) croak("Usage: Data::Queue::Shared::Str->unlink($path)");
763 1           path = SvPV_nolen(ST(1));
764             }
765 6 50         if (!path) croak("cannot unlink anonymous or memfd queue");
766 6 50         if (unlink(path) != 0)
767 0           croak("unlink(%s): %s", path, strerror(errno));
768              
769             SV *
770             path(self)
771             SV *self
772             PREINIT:
773 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
774             CODE:
775 2 100         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
776             OUTPUT:
777             RETVAL
778              
779             SV *
780             stats(self)
781             SV *self
782             PREINIT:
783 9 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
784             CODE:
785 9           HV *hv = newHV();
786 9           QueueHeader *hdr = h->hdr;
787 9           hv_store(hv, "size", 4, newSVuv((UV)queue_str_size(h)), 0);
788 9           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
789 9           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
790 9           hv_store(hv, "arena_cap", 9, newSVuv((UV)h->arena_cap), 0);
791 9           hv_store(hv, "arena_used", 10, newSVuv(__atomic_load_n(&hdr->arena_used, __ATOMIC_RELAXED)), 0);
792 9           hv_store(hv, "push_ok", 7, newSVuv((UV)__atomic_load_n(&hdr->stat_push_ok, __ATOMIC_RELAXED)), 0);
793 9           hv_store(hv, "pop_ok", 6, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_ok, __ATOMIC_RELAXED)), 0);
794 9           hv_store(hv, "push_full", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_push_full, __ATOMIC_RELAXED)), 0);
795 9           hv_store(hv, "pop_empty", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_pop_empty, __ATOMIC_RELAXED)), 0);
796 9           hv_store(hv, "recoveries", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
797 9           hv_store(hv, "push_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED)), 0);
798 9           hv_store(hv, "pop_waiters", 11, newSVuv((UV)__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED)), 0);
799 9           RETVAL = newRV_noinc((SV *)hv);
800             OUTPUT:
801             RETVAL
802              
803             SV *
804             peek(self)
805             SV *self
806             PREINIT:
807 7 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
808             const char *str;
809             uint32_t len;
810             bool utf8;
811             CODE:
812 7           int r = queue_str_peek(h, &str, &len, &utf8);
813 7 100         if (r == 1) {
814 6           RETVAL = newSVpvn(str, len);
815 6 100         if (utf8) SvUTF8_on(RETVAL);
816 1 50         } else if (r == -1) {
817 0           croak("Data::Queue::Shared::Str: out of memory");
818             } else {
819 1           RETVAL = &PL_sv_undef;
820             }
821             OUTPUT:
822             RETVAL
823              
824             bool
825             push_front(self, value)
826             SV *self
827             SV *value
828             PREINIT:
829 14 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
830             STRLEN len;
831             CODE:
832 14           const char *str = SvPV(value, len);
833 14           bool utf8 = SvUTF8(value) ? true : false;
834 14           int r = queue_str_push_front(h, str, (uint32_t)len, utf8);
835 14 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
836 14 100         RETVAL = (r == 1);
837             OUTPUT:
838             RETVAL
839              
840             bool
841             push_front_wait(self, value, ...)
842             SV *self
843             SV *value
844             PREINIT:
845 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
846 3           double timeout = -1;
847             STRLEN len;
848             CODE:
849 3 100         if (items > 2) timeout = SvNV(ST(2));
850 3           const char *str = SvPV(value, len);
851 3           bool utf8 = SvUTF8(value) ? true : false;
852 3           int r = queue_str_push_front_wait(h, str, (uint32_t)len, utf8, timeout);
853 3 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
854 3 100         RETVAL = (r == 1);
855             OUTPUT:
856             RETVAL
857              
858             void
859             drain(self, ...)
860             SV *self
861             PREINIT:
862 8 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
863             const char *str;
864             uint32_t len;
865             bool utf8;
866             uint32_t max_count;
867             PPCODE:
868 8 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
869             /* Hoist SV construction out of the mutex (see pop_multi). */
870 8           struct drain_item { char *buf; uint32_t len; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
871 8           UV drained_n = 0;
872 8           int last_r = 0;
873 8           int oom = 0;
874 8           queue_mutex_lock(h->hdr);
875 30 100         while (max_count-- > 0) {
876 29           last_r = queue_str_pop_locked(h, &str, &len, &utf8);
877 29 100         if (last_r <= 0) break;
878 22           struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
879 22 50         char *c = (char *)malloc(len ? len : 1);
880 22 50         if (!it || !c) { free(it); free(c); oom = 1; break; }
    50          
881 22 50         if (len) memcpy(c, str, len);
882 22           it->buf = c; it->len = len; it->utf8 = utf8; it->next = NULL;
883 22 100         if (drained_tail) drained_tail->next = it; else drained_head = it;
884 22           drained_tail = it;
885 22           drained_n++;
886             }
887 8           queue_mutex_unlock(h->hdr);
888 8           queue_wake_producers(h->hdr);
889 8 50         EXTEND(SP, (SSize_t)drained_n);
    50          
890 30 100         while (drained_head) {
891 22           struct drain_item *it = drained_head; drained_head = it->next;
892 22           SV *sv = newSVpvn(it->buf, it->len);
893 22 50         if (it->utf8) SvUTF8_on(sv);
894 22           PUSHs(sv_2mortal(sv));
895 22           free(it->buf);
896 22           free(it);
897             }
898 8 50         if (last_r == -1 || oom) croak("Data::Queue::Shared::Str: out of memory");
    50          
899              
900             void
901             pop_wait_multi(self, count, ...)
902             SV *self
903             UV count
904             PREINIT:
905 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
906 3           double timeout = -1;
907             const char *str;
908             uint32_t len;
909             bool utf8;
910             PPCODE:
911 3 50         if (items > 2) timeout = SvNV(ST(2));
912             /* Block until at least 1 */
913             {
914 3           int r = queue_str_pop_wait(h, &str, &len, &utf8, timeout);
915 3 50         if (r == -1) croak("Data::Queue::Shared::Str: out of memory");
916 3 100         if (r != 1) XSRETURN(0);
917 2           SV *sv = newSVpvn(str, len);
918 2 50         if (utf8) SvUTF8_on(sv);
919 2 50         mXPUSHs(sv);
920             }
921             /* Grab up to count-1 more non-blocking */
922 6 50         for (UV i = 1; i < count; i++) {
923 6           int r = queue_str_try_pop(h, &str, &len, &utf8);
924 6 100         if (r <= 0) break;
925 4           SV *sv = newSVpvn(str, len);
926 4 50         if (utf8) SvUTF8_on(sv);
927 4 50         mXPUSHs(sv);
928             }
929              
930             UV
931             push_wait_multi(self, timeout, ...)
932             SV *self
933             double timeout
934             PREINIT:
935 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
936             CODE:
937 3           uint32_t nvalues = items - 2;
938 3           RETVAL = 0;
939 8 100         for (uint32_t i = 0; i < nvalues; i++) {
940 6           SV *sv = ST(i + 2);
941             STRLEN len;
942 6           const char *str = SvPV(sv, len);
943 6           bool utf8 = SvUTF8(sv) ? true : false;
944 6           int r = queue_str_push_wait(h, str, (uint32_t)len, utf8, timeout);
945 6 50         if (r == -2) croak("Data::Queue::Shared::Str: string too long (max 2GB)");
946 6 100         if (r != 1) break;
947 5           RETVAL++;
948             }
949             OUTPUT:
950             RETVAL
951              
952             SV *
953             pop_back(self)
954             SV *self
955             PREINIT:
956 19 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
957             const char *str;
958             uint32_t len;
959             bool utf8;
960             CODE:
961 19           int r = queue_str_pop_back(h, &str, &len, &utf8);
962 19 100         if (r == 1) {
963 17           RETVAL = newSVpvn(str, len);
964 17 100         if (utf8) SvUTF8_on(RETVAL);
965 2 50         } else if (r == -1) {
966 0           croak("Data::Queue::Shared::Str: out of memory");
967             } else {
968 2           RETVAL = &PL_sv_undef;
969             }
970             OUTPUT:
971             RETVAL
972              
973             SV *
974             pop_back_wait(self, ...)
975             SV *self
976             PREINIT:
977 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
978 3           double timeout = -1;
979             const char *str;
980             uint32_t len;
981             bool utf8;
982             CODE:
983 3 50         if (items > 1) timeout = SvNV(ST(1));
984 3           int r = queue_str_pop_back_wait(h, &str, &len, &utf8, timeout);
985 3 100         if (r == 1) {
986 2           RETVAL = newSVpvn(str, len);
987 2 50         if (utf8) SvUTF8_on(RETVAL);
988 1 50         } else if (r == -1) {
989 0           croak("Data::Queue::Shared::Str: out of memory");
990             } else {
991 1           RETVAL = &PL_sv_undef;
992             }
993             OUTPUT:
994             RETVAL
995              
996             void
997             sync(self)
998             SV *self
999             PREINIT:
1000 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1001             CODE:
1002 2 50         if (queue_sync(h) != 0)
1003 0           croak("msync: %s", strerror(errno));
1004              
1005             IV
1006             eventfd(self)
1007             SV *self
1008             PREINIT:
1009 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1010             CODE:
1011 2           RETVAL = queue_eventfd_create(h);
1012 2 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1013             OUTPUT:
1014             RETVAL
1015              
1016             void
1017             eventfd_set(self, fd)
1018             SV *self
1019             int fd
1020             PREINIT:
1021 0 0         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    0          
    0          
1022             CODE:
1023 0           queue_eventfd_set(h, fd);
1024              
1025             IV
1026             fileno(self)
1027             SV *self
1028             PREINIT:
1029 0 0         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    0          
    0          
1030             CODE:
1031 0 0         RETVAL = h->notify_fd;
1032             OUTPUT:
1033             RETVAL
1034              
1035             SV *
1036             eventfd_consume(self)
1037             SV *self
1038             PREINIT:
1039 3 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1040             CODE:
1041 3           int64_t v = queue_eventfd_consume(h);
1042 3 50         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
1043             OUTPUT:
1044             RETVAL
1045              
1046             void
1047             notify(self)
1048             SV *self
1049             PREINIT:
1050 2 50         EXTRACT_HANDLE("Data::Queue::Shared::Str", self);
    50          
    50          
1051             CODE:
1052 2           queue_notify(h);
1053              
1054             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Int32
1055              
1056             PROTOTYPES: DISABLE
1057              
1058             INCLUDE_COMMAND: $^X -e "use strict; my \$t = do{local \$/; open my \$f,'<','Shared.xs' or die; <\$f>}; my (\$int_section) = \$t =~ /^(MODULE.*?PACKAGE = Data::Queue::Shared::Int\n.*?)^MODULE/ms; \$int_section =~ s/Data::Queue::Shared::Int(?!\\d)/Data::Queue::Shared::Int32/g; \$int_section =~ s/queue_int_/queue_int32_/g; \$int_section =~ s/QUEUE_MODE_INT(?!\\d)/QUEUE_MODE_INT32/g; \$int_section =~ s/int64_t/int32_t/g; \$int_section =~ s/^MODULE.*\\n//m; \$int_section =~ s/^BOOT:\\n(?:.*\\n)*?\\/\\* XSMARKER.*\\n//m; print \$int_section"
1059              
1060             MODULE = Data::Queue::Shared PACKAGE = Data::Queue::Shared::Int16
1061              
1062             PROTOTYPES: DISABLE
1063              
1064             INCLUDE_COMMAND: $^X -e "use strict; my \$t = do{local \$/; open my \$f,'<','Shared.xs' or die; <\$f>}; my (\$int_section) = \$t =~ /^(MODULE.*?PACKAGE = Data::Queue::Shared::Int\n.*?)^MODULE/ms; \$int_section =~ s/Data::Queue::Shared::Int(?!\\d)/Data::Queue::Shared::Int16/g; \$int_section =~ s/queue_int_/queue_int16_/g; \$int_section =~ s/QUEUE_MODE_INT(?!\\d)/QUEUE_MODE_INT16/g; \$int_section =~ s/int64_t/int16_t/g; \$int_section =~ s/^MODULE.*\\n//m; \$int_section =~ s/^BOOT:\\n(?:.*\\n)*?\\/\\* XSMARKER.*\\n//m; print \$int_section"