File Coverage

Shared.xs
Criterion Covered Total %
statement 597 693 86.1
branch 606 1324 45.7
condition n/a
subroutine n/a
pod n/a
total 1203 2017 59.6


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "ppport.h"
7             #include "pubsub.h"
8              
9             #ifdef HAVE_XS_PARSE_KEYWORD
10             #include "XSParseKeyword.h"
11              
12             static int build_kw_1arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
13             (void)nargs;
14             const char *func = (const char *)hookdata;
15             OP *q_op = args[0]->op;
16             OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
17             *out = op_convert_list(OP_ENTERSUB, OPf_STACKED,
18             op_append_elem(OP_LIST, q_op, cvref));
19             return KEYWORD_PLUGIN_EXPR;
20             }
21              
22             static int build_kw_2arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
23             (void)nargs;
24             const char *func = (const char *)hookdata;
25             OP *q_op = args[0]->op;
26             OP *val_op = args[1]->op;
27             OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
28             OP *arglist = op_append_elem(OP_LIST, q_op, val_op);
29             arglist = op_append_elem(OP_LIST, arglist, cvref);
30             *out = op_convert_list(OP_ENTERSUB, OPf_STACKED, arglist);
31             return KEYWORD_PLUGIN_EXPR;
32             }
33              
34             static const struct XSParseKeywordPieceType pieces_1expr[] = {
35             XPK_TERMEXPR, {0}
36             };
37              
38             static const struct XSParseKeywordPieceType pieces_2expr[] = {
39             XPK_TERMEXPR, XPK_COMMA, XPK_TERMEXPR, {0}
40             };
41              
42             #define DEFINE_PS_KW(variant, PKG, kw, nargs, builder) \
43             static const struct XSParseKeywordHooks hooks_ps_##variant##_##kw = { \
44             .flags = XPK_FLAG_EXPR, \
45             .permit_hintkey = "Data::PubSub::Shared::" PKG "/ps_" #variant "_" #kw, \
46             .pieces = pieces_##nargs##expr, \
47             .build = builder, \
48             };
49              
50             /* Publisher keywords */
51             DEFINE_PS_KW(int, "Int", publish, 2, build_kw_2arg)
52              
53             /* Subscriber keywords */
54             DEFINE_PS_KW(int, "Int", poll, 1, build_kw_1arg)
55             DEFINE_PS_KW(int, "Int", lag, 1, build_kw_1arg)
56              
57             /* Str publisher keywords */
58             DEFINE_PS_KW(str, "Str", publish, 2, build_kw_2arg)
59              
60             /* Int32 keywords */
61             DEFINE_PS_KW(int32, "Int32", publish, 2, build_kw_2arg)
62             DEFINE_PS_KW(int32, "Int32", poll, 1, build_kw_1arg)
63             DEFINE_PS_KW(int32, "Int32", lag, 1, build_kw_1arg)
64              
65             /* Int16 keywords */
66             DEFINE_PS_KW(int16, "Int16", publish, 2, build_kw_2arg)
67             DEFINE_PS_KW(int16, "Int16", poll, 1, build_kw_1arg)
68             DEFINE_PS_KW(int16, "Int16", lag, 1, build_kw_1arg)
69              
70             /* Str subscriber keywords */
71             DEFINE_PS_KW(str, "Str", poll, 1, build_kw_1arg)
72             DEFINE_PS_KW(str, "Str", lag, 1, build_kw_1arg)
73              
74             #define REGISTER_PS_KW(variant, kw, func_name) \
75             register_xs_parse_keyword("ps_" #variant "_" #kw, \
76             &hooks_ps_##variant##_##kw, (void*)func_name)
77              
78             #endif /* HAVE_XS_PARSE_KEYWORD */
79              
80             #define EXTRACT_HANDLE(classname, sv) \
81             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
82             croak("Expected a %s object", classname); \
83             PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(sv))); \
84             if (!h) croak("Attempted to use a destroyed %s object", classname)
85              
86             #define EXTRACT_SUB(classname, sv) \
87             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
88             croak("Expected a %s object", classname); \
89             PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(sv))); \
90             if (!sub) croak("Attempted to use a destroyed %s object", classname)
91              
92             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int
93              
94             PROTOTYPES: DISABLE
95              
96             BOOT:
97             #ifdef HAVE_XS_PARSE_KEYWORD
98             boot_xs_parse_keyword(0.40);
99             sv_setiv(get_sv("Data::PubSub::Shared::HAVE_KEYWORDS", GV_ADD), 1);
100             REGISTER_PS_KW(int, publish, "Data::PubSub::Shared::Int::publish");
101             REGISTER_PS_KW(int, poll, "Data::PubSub::Shared::Int::Sub::poll");
102             REGISTER_PS_KW(int, lag, "Data::PubSub::Shared::Int::Sub::lag");
103             REGISTER_PS_KW(int32, publish, "Data::PubSub::Shared::Int32::publish");
104             REGISTER_PS_KW(int32, poll, "Data::PubSub::Shared::Int32::Sub::poll");
105             REGISTER_PS_KW(int32, lag, "Data::PubSub::Shared::Int32::Sub::lag");
106             REGISTER_PS_KW(int16, publish, "Data::PubSub::Shared::Int16::publish");
107             REGISTER_PS_KW(int16, poll, "Data::PubSub::Shared::Int16::Sub::poll");
108             REGISTER_PS_KW(int16, lag, "Data::PubSub::Shared::Int16::Sub::lag");
109             REGISTER_PS_KW(str, publish, "Data::PubSub::Shared::Str::publish");
110             REGISTER_PS_KW(str, poll, "Data::PubSub::Shared::Str::Sub::poll");
111             REGISTER_PS_KW(str, lag, "Data::PubSub::Shared::Str::Sub::lag");
112             #endif
113              
114             SV *
115             new(class, path, capacity)
116             const char *class
117             SV *path
118             UV capacity
119             PREINIT:
120             char errbuf[PUBSUB_ERR_BUFLEN];
121             CODE:
122 64 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
123 64           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT, 0, errbuf);
124 64 100         if (!h) croak("Data::PubSub::Shared::Int->new: %s", errbuf);
125 63           SV *obj = newSViv(PTR2IV(h));
126 63           SV *ref = newRV_noinc(obj);
127 63           sv_bless(ref, gv_stashpv(class, GV_ADD));
128 63           RETVAL = ref;
129             OUTPUT:
130             RETVAL
131              
132             SV *
133             new_memfd(class, name, capacity)
134             const char *class
135             const char *name
136             UV capacity
137             PREINIT:
138             char errbuf[PUBSUB_ERR_BUFLEN];
139             CODE:
140 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT, 0, errbuf);
141 1 50         if (!h) croak("Data::PubSub::Shared::Int->new_memfd: %s", errbuf);
142 1           SV *obj = newSViv(PTR2IV(h));
143 1           SV *ref = newRV_noinc(obj);
144 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
145 1           RETVAL = ref;
146             OUTPUT:
147             RETVAL
148              
149             SV *
150             new_from_fd(class, fd)
151             const char *class
152             int fd
153             PREINIT:
154             char errbuf[PUBSUB_ERR_BUFLEN];
155             CODE:
156 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_INT, errbuf);
157 1 50         if (!h) croak("Data::PubSub::Shared::Int->new_from_fd: %s", errbuf);
158 1           SV *obj = newSViv(PTR2IV(h));
159 1           SV *ref = newRV_noinc(obj);
160 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
161 1           RETVAL = ref;
162             OUTPUT:
163             RETVAL
164              
165             IV
166             memfd(self)
167             SV *self
168             PREINIT:
169 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
170             CODE:
171 1 50         RETVAL = h->backing_fd;
172             OUTPUT:
173             RETVAL
174              
175             void
176             DESTROY(self)
177             SV *self
178             PREINIT:
179 65 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
180             CODE:
181 65           sv_setiv(SvRV(self), 0);
182 65           pubsub_destroy(h);
183              
184             bool
185             publish(self, value)
186             SV *self
187             IV value
188             PREINIT:
189 465 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
190             CODE:
191 465 50         RETVAL = pubsub_int_publish(h, (int64_t)value);
192             OUTPUT:
193             RETVAL
194              
195             UV
196             publish_multi(self, ...)
197             SV *self
198             PREINIT:
199 5 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
200             CODE:
201 5           uint32_t count = items - 1;
202 5 100         if (count == 0) { RETVAL = 0; }
203             else {
204 4 50         if (count > 8192) croak("publish_multi: too many values (%u > 8192)", count);
205 4           int64_t *vals = (int64_t *)alloca(count * sizeof(int64_t));
206 212 100         for (uint32_t i = 0; i < count; i++)
207 208           vals[i] = (int64_t)SvIV(ST(i + 1));
208 4           RETVAL = pubsub_int_publish_multi(h, vals, count);
209             }
210             OUTPUT:
211             RETVAL
212              
213             void
214             publish_notify(self, value)
215             SV *self
216             IV value
217             PREINIT:
218 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
219             CODE:
220 1           pubsub_int_publish(h, (int64_t)value);
221 1           pubsub_notify(h);
222              
223             SV *
224             subscribe(self)
225             SV *self
226             PREINIT:
227 19 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
228             CODE:
229 19           PubSubSub *sub = pubsub_subscribe(h, 0);
230 19 50         if (!sub) croak("subscribe: out of memory");
231 19           sub->userdata = (void *)newSVsv(self);
232 19           SV *obj = newSViv(PTR2IV(sub));
233 19           SV *ref = newRV_noinc(obj);
234 19           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int::Sub", GV_ADD));
235 19           RETVAL = ref;
236             OUTPUT:
237             RETVAL
238              
239             SV *
240             subscribe_all(self)
241             SV *self
242             PREINIT:
243 26 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
244             CODE:
245 26           PubSubSub *sub = pubsub_subscribe(h, 1);
246 26 50         if (!sub) croak("subscribe_all: out of memory");
247 26           sub->userdata = (void *)newSVsv(self);
248 26           SV *obj = newSViv(PTR2IV(sub));
249 26           SV *ref = newRV_noinc(obj);
250 26           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int::Sub", GV_ADD));
251 26           RETVAL = ref;
252             OUTPUT:
253             RETVAL
254              
255             UV
256             capacity(self)
257             SV *self
258             PREINIT:
259 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
260             CODE:
261 1 50         RETVAL = h->capacity;
262             OUTPUT:
263             RETVAL
264              
265             UV
266             write_pos(self)
267             SV *self
268             PREINIT:
269 12 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
270             CODE:
271 12 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
272             OUTPUT:
273             RETVAL
274              
275             SV *
276             path(self)
277             SV *self
278             PREINIT:
279 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
280             CODE:
281 2 100         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
282             OUTPUT:
283             RETVAL
284              
285             SV *
286             stats(self)
287             SV *self
288             PREINIT:
289 5 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
290             CODE:
291 5           HV *hv = newHV();
292 5           PubSubHeader *hdr = h->hdr;
293 5           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
294 5           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
295 5           hv_store(hv, "write_pos", 9,
296             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
297 5           hv_store(hv, "publish_ok", 10,
298             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
299 5           hv_store(hv, "recoveries", 10,
300             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
301 5           hv_store(hv, "sub_waiters", 11,
302             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
303 5           RETVAL = newRV_noinc((SV *)hv);
304             OUTPUT:
305             RETVAL
306              
307             void
308             clear(self)
309             SV *self
310             PREINIT:
311 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
312             CODE:
313 4           pubsub_clear(h);
314              
315             void
316             sync(self)
317             SV *self
318             PREINIT:
319 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
320             CODE:
321 4 50         if (pubsub_sync(h) != 0)
322 0           croak("msync: %s", strerror(errno));
323              
324             void
325             unlink(self_or_class, ...)
326             SV *self_or_class
327             CODE:
328             const char *path;
329 1 50         if (sv_isobject(self_or_class)) {
330 1           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
331 1 50         if (!h) croak("Attempted to use a destroyed object");
332 1           path = h->path;
333             } else {
334 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Int->unlink($path)");
335 0           path = SvPV_nolen(ST(1));
336             }
337 1 50         if (!path) croak("cannot unlink anonymous or memfd pubsub");
338 1 50         if (unlink(path) != 0)
339 0           croak("unlink(%s): %s", path, strerror(errno));
340              
341             IV
342             eventfd(self)
343             SV *self
344             PREINIT:
345 9 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
346             CODE:
347 9           RETVAL = pubsub_eventfd_create(h);
348 9 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
349             OUTPUT:
350             RETVAL
351              
352             void
353             eventfd_set(self, fd)
354             SV *self
355             int fd
356             PREINIT:
357 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
358             CODE:
359 1           pubsub_eventfd_set(h, fd);
360              
361             IV
362             fileno(self)
363             SV *self
364             PREINIT:
365 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
366             CODE:
367 1 50         RETVAL = h->notify_fd;
368             OUTPUT:
369             RETVAL
370              
371             void
372             eventfd_consume(self)
373             SV *self
374             PREINIT:
375 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
376             CODE:
377 4           pubsub_eventfd_consume(h);
378              
379             void
380             notify(self)
381             SV *self
382             PREINIT:
383 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
384             CODE:
385 4           pubsub_notify(h);
386              
387             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int::Sub
388              
389             void
390             DESTROY(self)
391             SV *self
392             PREINIT:
393 45 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
394             CODE:
395 45           sv_setiv(SvRV(self), 0);
396 45 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
397 45           pubsub_sub_destroy(sub);
398              
399             SV *
400             poll(self)
401             SV *self
402             PREINIT:
403 31 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
404             int64_t value;
405             CODE:
406 31           int r = pubsub_int_poll(sub, &value);
407 31 100         if (r == 1)
408 28           RETVAL = newSViv((IV)value);
409             else
410 3           RETVAL = &PL_sv_undef;
411             OUTPUT:
412             RETVAL
413              
414             void
415             poll_multi(self, count)
416             SV *self
417             UV count
418             PREINIT:
419 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
420             int64_t value;
421             PPCODE:
422 12 100         for (UV i = 0; i < count; i++) {
423 10           int r = pubsub_int_poll(sub, &value);
424 10 50         if (r != 1) break;
425 10 50         mXPUSHi((IV)value);
426             }
427              
428             SV *
429             poll_wait(self, ...)
430             SV *self
431             PREINIT:
432 9 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
433 9           double timeout = -1;
434             int64_t value;
435             CODE:
436 9 50         if (items > 1) timeout = SvNV(ST(1));
437 9           int r = pubsub_int_poll_wait(sub, &value, timeout);
438 9 100         if (r == 1)
439 8           RETVAL = newSViv((IV)value);
440             else
441 1           RETVAL = &PL_sv_undef;
442             OUTPUT:
443             RETVAL
444              
445             void
446             drain(self, ...)
447             SV *self
448             PREINIT:
449 8 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
450             int64_t value;
451             uint32_t max_count;
452             PPCODE:
453 8 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
454 1028 100         while (max_count-- > 0 && pubsub_int_poll(sub, &value))
    100          
455 1020 50         mXPUSHi((IV)value);
456              
457             void
458             poll_wait_multi(self, count, ...)
459             SV *self
460             UV count
461             PREINIT:
462 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
463 3           double timeout = -1;
464             int64_t value;
465             PPCODE:
466 3 100         if (count == 0) XSRETURN(0);
467 2 50         if (items > 2) timeout = SvNV(ST(2));
468 2 100         if (!pubsub_int_poll_wait(sub, &value, timeout)) XSRETURN(0);
469 1 50         mXPUSHi((IV)value);
470 5 100         for (UV i = 1; i < count; i++) {
471 4 50         if (!pubsub_int_poll(sub, &value)) break;
472 4 50         mXPUSHi((IV)value);
473             }
474              
475             UV
476             lag(self)
477             SV *self
478             PREINIT:
479 16 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
480             CODE:
481 16           RETVAL = (UV)pubsub_lag(sub);
482             OUTPUT:
483             RETVAL
484              
485             UV
486             overflow_count(self)
487             SV *self
488             PREINIT:
489 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
490             CODE:
491 2 50         RETVAL = (UV)sub->overflow_count;
492             OUTPUT:
493             RETVAL
494              
495             UV
496             write_pos(self)
497             SV *self
498             PREINIT:
499 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
500             CODE:
501 2 50         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
502             OUTPUT:
503             RETVAL
504              
505             bool
506             has_overflow(self)
507             SV *self
508             PREINIT:
509 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
510             CODE:
511 2           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
512 2 100         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    100          
513             OUTPUT:
514             RETVAL
515              
516             UV
517             cursor(self, ...)
518             SV *self
519             PREINIT:
520 6 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
521             CODE:
522 6 100         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
523 6 50         RETVAL = (UV)sub->cursor;
524             OUTPUT:
525             RETVAL
526              
527             void
528             reset(self)
529             SV *self
530             PREINIT:
531 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
532             CODE:
533 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
534              
535             void
536             reset_oldest(self)
537             SV *self
538             PREINIT:
539 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
540             CODE:
541 2           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
542 2 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
543              
544             UV
545             poll_cb(self, cb)
546             SV *self
547             SV *cb
548             PREINIT:
549 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
550             int64_t value;
551             CODE:
552 3           RETVAL = 0;
553 108 100         while (pubsub_int_poll(sub, &value)) {
554 105           dSP;
555 105 50         PUSHMARK(SP);
556 105 50         mXPUSHi((IV)value);
557 105           PUTBACK;
558 105           call_sv(cb, G_DISCARD);
559 105           RETVAL++;
560             }
561             OUTPUT:
562             RETVAL
563              
564             void
565             drain_notify(self, ...)
566             SV *self
567             PREINIT:
568 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
569             int64_t value;
570             uint32_t max_count;
571             PPCODE:
572 3           pubsub_sub_eventfd_consume(sub);
573 3 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
574 8 100         while (max_count-- > 0 && pubsub_int_poll(sub, &value))
    100          
575 5 50         mXPUSHi((IV)value);
576              
577             void
578             eventfd_set(self, fd)
579             SV *self
580             int fd
581             PREINIT:
582 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
583             CODE:
584 1           sub->notify_fd = fd;
585              
586             IV
587             fileno(self)
588             SV *self
589             PREINIT:
590 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
591             CODE:
592 3 50         RETVAL = sub->notify_fd;
593             OUTPUT:
594             RETVAL
595              
596             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Str
597              
598             SV *
599             new(class, path, capacity, ...)
600             const char *class
601             SV *path
602             UV capacity
603             PREINIT:
604             char errbuf[PUBSUB_ERR_BUFLEN];
605             uint32_t msg_size;
606             CODE:
607 47 100         msg_size = (items > 3) ? (uint32_t)SvUV(ST(3)) : 0;
608 47 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
609 47           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
610 47 100         if (!h) croak("Data::PubSub::Shared::Str->new: %s", errbuf);
611 46           SV *obj = newSViv(PTR2IV(h));
612 46           SV *ref = newRV_noinc(obj);
613 46           sv_bless(ref, gv_stashpv(class, GV_ADD));
614 46           RETVAL = ref;
615             OUTPUT:
616             RETVAL
617              
618             SV *
619             new_memfd(class, name, capacity, ...)
620             const char *class
621             const char *name
622             UV capacity
623             PREINIT:
624             char errbuf[PUBSUB_ERR_BUFLEN];
625             uint32_t msg_size;
626             CODE:
627 1 50         msg_size = (items > 3) ? (uint32_t)SvUV(ST(3)) : 0;
628 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
629 1 50         if (!h) croak("Data::PubSub::Shared::Str->new_memfd: %s", errbuf);
630 1           SV *obj = newSViv(PTR2IV(h));
631 1           SV *ref = newRV_noinc(obj);
632 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
633 1           RETVAL = ref;
634             OUTPUT:
635             RETVAL
636              
637             SV *
638             new_from_fd(class, fd)
639             const char *class
640             int fd
641             PREINIT:
642             char errbuf[PUBSUB_ERR_BUFLEN];
643             CODE:
644 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_STR, errbuf);
645 1 50         if (!h) croak("Data::PubSub::Shared::Str->new_from_fd: %s", errbuf);
646 1           SV *obj = newSViv(PTR2IV(h));
647 1           SV *ref = newRV_noinc(obj);
648 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
649 1           RETVAL = ref;
650             OUTPUT:
651             RETVAL
652              
653             IV
654             memfd(self)
655             SV *self
656             PREINIT:
657 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
658             CODE:
659 1 50         RETVAL = h->backing_fd;
660             OUTPUT:
661             RETVAL
662              
663             void
664             DESTROY(self)
665             SV *self
666             PREINIT:
667 48 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
668             CODE:
669 48           sv_setiv(SvRV(self), 0);
670 48           pubsub_destroy(h);
671              
672             SV *
673             publish(self, value)
674             SV *self
675             SV *value
676             PREINIT:
677 182 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
678             CODE:
679             STRLEN len;
680 182           bool utf8 = SvUTF8(value) ? true : false;
681             const char *str;
682 182 100         if (utf8)
683 2           str = SvPVutf8(value, len);
684             else
685 180           str = SvPV(value, len);
686 182           int r = pubsub_str_publish(h, str, (uint32_t)len, utf8);
687 182 100         if (r == -1) croak("publish: message too long (%u > %u)", (unsigned)len, h->msg_size);
688 180           RETVAL = &PL_sv_yes;
689             OUTPUT:
690             RETVAL
691              
692             UV
693             publish_multi(self, ...)
694             SV *self
695             PREINIT:
696 5 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
697             CODE:
698 5           RETVAL = 0;
699 5           uint32_t count = items - 1;
700 5 100         if (count > 0) {
701 4           pubsub_mutex_lock(h->hdr);
702 16 100         for (uint32_t i = 0; i < count; i++) {
703 13           SV *val = ST(i + 1);
704             STRLEN len;
705 13           bool utf8 = SvUTF8(val) ? true : false;
706             const char *str;
707 13 50         if (utf8)
708 0           str = SvPVutf8(val, len);
709             else
710 13           str = SvPV(val, len);
711 13           int r = pubsub_str_publish_locked(h, str, (uint32_t)len, utf8);
712 13 100         if (r == -1) {
713 1           pubsub_mutex_unlock(h->hdr);
714 1           croak("publish_multi: message too long (%u > %u)", (unsigned)len, h->msg_size);
715             }
716 12           RETVAL++;
717             }
718 3           pubsub_mutex_unlock(h->hdr);
719 3           pubsub_wake_subscribers(h->hdr);
720             }
721             OUTPUT:
722             RETVAL
723              
724             void
725             publish_notify(self, value)
726             SV *self
727             SV *value
728             PREINIT:
729 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
730             CODE:
731             STRLEN len;
732 1           bool utf8 = SvUTF8(value) ? true : false;
733             const char *str;
734 1 50         if (utf8)
735 0           str = SvPVutf8(value, len);
736             else
737 1           str = SvPV(value, len);
738 1           int r = pubsub_str_publish(h, str, (uint32_t)len, utf8);
739 1 50         if (r == -1) croak("publish_notify: message too long (%u > %u)", (unsigned)len, h->msg_size);
740 1           pubsub_notify(h);
741              
742             SV *
743             subscribe(self)
744             SV *self
745             PREINIT:
746 9 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
747             CODE:
748 9           PubSubSub *sub = pubsub_subscribe(h, 0);
749 9 50         if (!sub) croak("subscribe: out of memory");
750 9           sub->userdata = (void *)newSVsv(self);
751 9           SV *obj = newSViv(PTR2IV(sub));
752 9           SV *ref = newRV_noinc(obj);
753 9           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Str::Sub", GV_ADD));
754 9           RETVAL = ref;
755             OUTPUT:
756             RETVAL
757              
758             SV *
759             subscribe_all(self)
760             SV *self
761             PREINIT:
762 24 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
763             CODE:
764 24           PubSubSub *sub = pubsub_subscribe(h, 1);
765 24 50         if (!sub) croak("subscribe_all: out of memory");
766 24           sub->userdata = (void *)newSVsv(self);
767 24           SV *obj = newSViv(PTR2IV(sub));
768 24           SV *ref = newRV_noinc(obj);
769 24           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Str::Sub", GV_ADD));
770 24           RETVAL = ref;
771             OUTPUT:
772             RETVAL
773              
774             UV
775             capacity(self)
776             SV *self
777             PREINIT:
778 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
779             CODE:
780 1 50         RETVAL = h->capacity;
781             OUTPUT:
782             RETVAL
783              
784             UV
785             msg_size(self)
786             SV *self
787             PREINIT:
788 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
789             CODE:
790 2 50         RETVAL = h->msg_size;
791             OUTPUT:
792             RETVAL
793              
794             UV
795             write_pos(self)
796             SV *self
797             PREINIT:
798 6 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
799             CODE:
800 6 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
801             OUTPUT:
802             RETVAL
803              
804             SV *
805             path(self)
806             SV *self
807             PREINIT:
808 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    0          
    0          
809             CODE:
810 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
811             OUTPUT:
812             RETVAL
813              
814             SV *
815             stats(self)
816             SV *self
817             PREINIT:
818 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
819             CODE:
820 2           HV *hv = newHV();
821 2           PubSubHeader *hdr = h->hdr;
822 2           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
823 2           hv_store(hv, "msg_size", 8, newSVuv(h->msg_size), 0);
824 2           hv_store(hv, "arena_cap", 9, newSVuv((UV)h->arena_cap), 0);
825 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
826 2           hv_store(hv, "write_pos", 9,
827             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
828 2           hv_store(hv, "publish_ok", 10,
829             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
830 2           hv_store(hv, "recoveries", 10,
831             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
832 2           hv_store(hv, "sub_waiters", 11,
833             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
834 2           RETVAL = newRV_noinc((SV *)hv);
835             OUTPUT:
836             RETVAL
837              
838             void
839             clear(self)
840             SV *self
841             PREINIT:
842 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
843             CODE:
844 2           pubsub_clear(h);
845              
846             void
847             sync(self)
848             SV *self
849             PREINIT:
850 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
851             CODE:
852 2 50         if (pubsub_sync(h) != 0)
853 0           croak("msync: %s", strerror(errno));
854              
855             void
856             unlink(self_or_class, ...)
857             SV *self_or_class
858             CODE:
859             const char *path;
860 1 50         if (sv_isobject(self_or_class)) {
861 1           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
862 1 50         if (!h) croak("Attempted to use a destroyed object");
863 1           path = h->path;
864             } else {
865 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Str->unlink($path)");
866 0           path = SvPV_nolen(ST(1));
867             }
868 1 50         if (!path) croak("cannot unlink anonymous or memfd pubsub");
869 1 50         if (unlink(path) != 0)
870 0           croak("unlink(%s): %s", path, strerror(errno));
871              
872             IV
873             eventfd(self)
874             SV *self
875             PREINIT:
876 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
877             CODE:
878 4           RETVAL = pubsub_eventfd_create(h);
879 4 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
880             OUTPUT:
881             RETVAL
882              
883             void
884             eventfd_set(self, fd)
885             SV *self
886             int fd
887             PREINIT:
888 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    0          
    0          
889             CODE:
890 0           pubsub_eventfd_set(h, fd);
891              
892             IV
893             fileno(self)
894             SV *self
895             PREINIT:
896 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    0          
    0          
897             CODE:
898 0 0         RETVAL = h->notify_fd;
899             OUTPUT:
900             RETVAL
901              
902             void
903             eventfd_consume(self)
904             SV *self
905             PREINIT:
906 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
907             CODE:
908 2           pubsub_eventfd_consume(h);
909              
910             void
911             notify(self)
912             SV *self
913             PREINIT:
914 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
915             CODE:
916 2           pubsub_notify(h);
917              
918             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Str::Sub
919              
920             void
921             DESTROY(self)
922             SV *self
923             PREINIT:
924 33 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
925             CODE:
926 33           sv_setiv(SvRV(self), 0);
927 33 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
928 33           pubsub_sub_destroy(sub);
929              
930             SV *
931             poll(self)
932             SV *self
933             PREINIT:
934 33 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
935             const char *str;
936             uint32_t len;
937             bool utf8;
938             CODE:
939 33           int r = pubsub_str_poll(sub, &str, &len, &utf8);
940 33 100         if (r == 1) {
941 31           RETVAL = newSVpvn(str, len);
942 31 100         if (utf8) SvUTF8_on(RETVAL);
943             } else {
944 2           RETVAL = &PL_sv_undef;
945             }
946             OUTPUT:
947             RETVAL
948              
949             void
950             poll_multi(self, count)
951             SV *self
952             UV count
953             PREINIT:
954 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
955             const char *str;
956             uint32_t len;
957             bool utf8;
958             PPCODE:
959 4 50         for (UV i = 0; i < count; i++) {
960 4           int r = pubsub_str_poll(sub, &str, &len, &utf8);
961 4 100         if (r != 1) break;
962 3           SV *sv = newSVpvn(str, len);
963 3 50         if (utf8) SvUTF8_on(sv);
964 3 50         mXPUSHs(sv);
965             }
966              
967             SV *
968             poll_wait(self, ...)
969             SV *self
970             PREINIT:
971 54 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
972 54           double timeout = -1;
973             const char *str;
974             uint32_t len;
975             bool utf8;
976             CODE:
977 54 50         if (items > 1) timeout = SvNV(ST(1));
978 54           int r = pubsub_str_poll_wait(sub, &str, &len, &utf8, timeout);
979 54 100         if (r == 1) {
980 53           RETVAL = newSVpvn(str, len);
981 53 50         if (utf8) SvUTF8_on(RETVAL);
982             } else {
983 1           RETVAL = &PL_sv_undef;
984             }
985             OUTPUT:
986             RETVAL
987              
988             void
989             drain(self, ...)
990             SV *self
991             PREINIT:
992 4 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
993             const char *str;
994             uint32_t len;
995             bool utf8;
996             uint32_t max_count;
997             PPCODE:
998 4 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
999 20 50         while (max_count-- > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
    100          
1000 16           SV *sv = newSVpvn(str, len);
1001 16 50         if (utf8) SvUTF8_on(sv);
1002 16 50         mXPUSHs(sv);
1003             }
1004              
1005             void
1006             poll_wait_multi(self, count, ...)
1007             SV *self
1008             UV count
1009             PREINIT:
1010 2 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1011 2           double timeout = -1;
1012             const char *str;
1013             uint32_t len;
1014             bool utf8;
1015             PPCODE:
1016 2 100         if (count == 0) XSRETURN(0);
1017 1 50         if (items > 2) timeout = SvNV(ST(2));
1018 1 50         if (pubsub_str_poll_wait(sub, &str, &len, &utf8, timeout) != 1) XSRETURN(0);
1019             {
1020 1           SV *sv = newSVpvn(str, len);
1021 1 50         if (utf8) SvUTF8_on(sv);
1022 1 50         mXPUSHs(sv);
1023             }
1024 3 100         for (UV i = 1; i < count; i++) {
1025 2 50         if (pubsub_str_poll(sub, &str, &len, &utf8) != 1) break;
1026 2           SV *sv = newSVpvn(str, len);
1027 2 50         if (utf8) SvUTF8_on(sv);
1028 2 50         mXPUSHs(sv);
1029             }
1030              
1031             UV
1032             lag(self)
1033             SV *self
1034             PREINIT:
1035 6 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1036             CODE:
1037 6           RETVAL = (UV)pubsub_lag(sub);
1038             OUTPUT:
1039             RETVAL
1040              
1041             UV
1042             overflow_count(self)
1043             SV *self
1044             PREINIT:
1045 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1046             CODE:
1047 1 50         RETVAL = (UV)sub->overflow_count;
1048             OUTPUT:
1049             RETVAL
1050              
1051             UV
1052             write_pos(self)
1053             SV *self
1054             PREINIT:
1055 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1056             CODE:
1057 1 50         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1058             OUTPUT:
1059             RETVAL
1060              
1061             bool
1062             has_overflow(self)
1063             SV *self
1064             PREINIT:
1065 2 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1066             CODE:
1067 2           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1068 2 100         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    100          
1069             OUTPUT:
1070             RETVAL
1071              
1072             UV
1073             cursor(self, ...)
1074             SV *self
1075             PREINIT:
1076 4 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1077             CODE:
1078 4 100         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
1079 4 50         RETVAL = (UV)sub->cursor;
1080             OUTPUT:
1081             RETVAL
1082              
1083             void
1084             reset(self)
1085             SV *self
1086             PREINIT:
1087 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1088             CODE:
1089 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1090              
1091             void
1092             reset_oldest(self)
1093             SV *self
1094             PREINIT:
1095 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1096             CODE:
1097 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1098 1 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
1099              
1100             UV
1101             poll_cb(self, cb)
1102             SV *self
1103             SV *cb
1104             PREINIT:
1105 2 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1106             const char *str;
1107             uint32_t len;
1108             bool utf8;
1109             CODE:
1110 2           RETVAL = 0;
1111 6 100         while (pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
1112 4           dSP;
1113 4           SV *sv = newSVpvn(str, len);
1114 4 100         if (utf8) SvUTF8_on(sv);
1115 4 50         PUSHMARK(SP);
1116 4 50         mXPUSHs(sv);
1117 4           PUTBACK;
1118 4           call_sv(cb, G_DISCARD);
1119 4           RETVAL++;
1120             }
1121             OUTPUT:
1122             RETVAL
1123              
1124             void
1125             drain_notify(self, ...)
1126             SV *self
1127             PREINIT:
1128 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1129             const char *str;
1130             uint32_t len;
1131             bool utf8;
1132             uint32_t max_count;
1133             PPCODE:
1134 1           pubsub_sub_eventfd_consume(sub);
1135 1 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1136 2 50         while (max_count-- > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
    100          
1137 1           SV *sv = newSVpvn(str, len);
1138 1 50         if (utf8) SvUTF8_on(sv);
1139 1 50         mXPUSHs(sv);
1140             }
1141              
1142             void
1143             eventfd_set(self, fd)
1144             SV *self
1145             int fd
1146             PREINIT:
1147 0 0         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    0          
    0          
1148             CODE:
1149 0           sub->notify_fd = fd;
1150              
1151             IV
1152             fileno(self)
1153             SV *self
1154             PREINIT:
1155 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1156             CODE:
1157 1 50         RETVAL = sub->notify_fd;
1158             OUTPUT:
1159             RETVAL
1160              
1161             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int32
1162              
1163             SV *
1164             new(class, path, capacity)
1165             const char *class
1166             SV *path
1167             UV capacity
1168             PREINIT:
1169             char errbuf[PUBSUB_ERR_BUFLEN];
1170             CODE:
1171 11 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
1172 11           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
1173 11 100         if (!h) croak("Data::PubSub::Shared::Int32->new: %s", errbuf);
1174 10           SV *obj = newSViv(PTR2IV(h));
1175 10           SV *ref = newRV_noinc(obj);
1176 10           sv_bless(ref, gv_stashpv(class, GV_ADD));
1177 10           RETVAL = ref;
1178             OUTPUT:
1179             RETVAL
1180              
1181             SV *
1182             new_memfd(class, name, capacity)
1183             const char *class
1184             const char *name
1185             UV capacity
1186             PREINIT:
1187             char errbuf[PUBSUB_ERR_BUFLEN];
1188             CODE:
1189 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
1190 1 50         if (!h) croak("Data::PubSub::Shared::Int32->new_memfd: %s", errbuf);
1191 1           SV *obj = newSViv(PTR2IV(h));
1192 1           SV *ref = newRV_noinc(obj);
1193 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
1194 1           RETVAL = ref;
1195             OUTPUT:
1196             RETVAL
1197              
1198             SV *
1199             new_from_fd(class, fd)
1200             const char *class
1201             int fd
1202             PREINIT:
1203             char errbuf[PUBSUB_ERR_BUFLEN];
1204             CODE:
1205 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_INT32, errbuf);
1206 1 50         if (!h) croak("Data::PubSub::Shared::Int32->new_from_fd: %s", errbuf);
1207 1           SV *obj = newSViv(PTR2IV(h));
1208 1           SV *ref = newRV_noinc(obj);
1209 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
1210 1           RETVAL = ref;
1211             OUTPUT:
1212             RETVAL
1213              
1214             IV
1215             memfd(self)
1216             SV *self
1217             PREINIT:
1218 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1219             CODE:
1220 1 50         RETVAL = h->backing_fd;
1221             OUTPUT:
1222             RETVAL
1223              
1224             void
1225             DESTROY(self)
1226             SV *self
1227             PREINIT:
1228 12 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1229             CODE:
1230 12           sv_setiv(SvRV(self), 0);
1231 12           pubsub_destroy(h);
1232              
1233             bool
1234             publish(self, value)
1235             SV *self
1236             IV value
1237             PREINIT:
1238 49 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1239             CODE:
1240 49 50         RETVAL = pubsub_int32_publish(h, (int32_t)value);
1241             OUTPUT:
1242             RETVAL
1243              
1244             UV
1245             publish_multi(self, ...)
1246             SV *self
1247             PREINIT:
1248 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1249             CODE:
1250 1           uint32_t count = items - 1;
1251 1 50         if (count == 0) { RETVAL = 0; }
1252             else {
1253 1 50         if (count > 8192) croak("publish_multi: too many values (%u > 8192)", count);
1254 1           int32_t *vals = (int32_t *)alloca(count * sizeof(int32_t));
1255 4 100         for (uint32_t i = 0; i < count; i++)
1256 3           vals[i] = (int32_t)SvIV(ST(i + 1));
1257 1           RETVAL = pubsub_int32_publish_multi(h, vals, count);
1258             }
1259             OUTPUT:
1260             RETVAL
1261              
1262             void
1263             publish_notify(self, value)
1264             SV *self
1265             IV value
1266             PREINIT:
1267 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1268             CODE:
1269 0           pubsub_int32_publish(h, (int32_t)value);
1270 0           pubsub_notify(h);
1271              
1272             SV *
1273             subscribe(self)
1274             SV *self
1275             PREINIT:
1276 3 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1277             CODE:
1278 3           PubSubSub *sub = pubsub_subscribe(h, 0);
1279 3 50         if (!sub) croak("subscribe: out of memory");
1280 3           sub->userdata = (void *)newSVsv(self);
1281 3           SV *obj = newSViv(PTR2IV(sub));
1282 3           SV *ref = newRV_noinc(obj);
1283 3           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int32::Sub", GV_ADD));
1284 3           RETVAL = ref;
1285             OUTPUT:
1286             RETVAL
1287              
1288             SV *
1289             subscribe_all(self)
1290             SV *self
1291             PREINIT:
1292 7 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1293             CODE:
1294 7           PubSubSub *sub = pubsub_subscribe(h, 1);
1295 7 50         if (!sub) croak("subscribe_all: out of memory");
1296 7           sub->userdata = (void *)newSVsv(self);
1297 7           SV *obj = newSViv(PTR2IV(sub));
1298 7           SV *ref = newRV_noinc(obj);
1299 7           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int32::Sub", GV_ADD));
1300 7           RETVAL = ref;
1301             OUTPUT:
1302             RETVAL
1303              
1304             UV
1305             capacity(self)
1306             SV *self
1307             PREINIT:
1308 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1309             CODE:
1310 1 50         RETVAL = h->capacity;
1311             OUTPUT:
1312             RETVAL
1313              
1314             UV
1315             write_pos(self)
1316             SV *self
1317             PREINIT:
1318 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1319             CODE:
1320 4 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
1321             OUTPUT:
1322             RETVAL
1323              
1324             SV *
1325             path(self)
1326             SV *self
1327             PREINIT:
1328 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1329             CODE:
1330 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
1331             OUTPUT:
1332             RETVAL
1333              
1334             SV *
1335             stats(self)
1336             SV *self
1337             PREINIT:
1338 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1339             CODE:
1340 2           HV *hv = newHV();
1341 2           PubSubHeader *hdr = h->hdr;
1342 2           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
1343 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
1344 2           hv_store(hv, "write_pos", 9,
1345             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
1346 2           hv_store(hv, "publish_ok", 10,
1347             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
1348 2           hv_store(hv, "recoveries", 10,
1349             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
1350 2           hv_store(hv, "sub_waiters", 11,
1351             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
1352 2           RETVAL = newRV_noinc((SV *)hv);
1353             OUTPUT:
1354             RETVAL
1355              
1356             void
1357             clear(self)
1358             SV *self
1359             PREINIT:
1360 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1361             CODE:
1362 1           pubsub_clear(h);
1363              
1364             void
1365             sync(self)
1366             SV *self
1367             PREINIT:
1368 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1369             CODE:
1370 1 50         if (pubsub_sync(h) != 0)
1371 0           croak("msync: %s", strerror(errno));
1372              
1373             void
1374             unlink(self_or_class, ...)
1375             SV *self_or_class
1376             CODE:
1377             const char *path;
1378 0 0         if (sv_isobject(self_or_class)) {
1379 0           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
1380 0 0         if (!h) croak("Attempted to use a destroyed object");
1381 0           path = h->path;
1382             } else {
1383 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Int32->unlink($path)");
1384 0           path = SvPV_nolen(ST(1));
1385             }
1386 0 0         if (!path) croak("cannot unlink anonymous or memfd pubsub");
1387 0 0         if (unlink(path) != 0)
1388 0           croak("unlink(%s): %s", path, strerror(errno));
1389              
1390             IV
1391             eventfd(self)
1392             SV *self
1393             PREINIT:
1394 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1395             CODE:
1396 0           RETVAL = pubsub_eventfd_create(h);
1397 0 0         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1398             OUTPUT:
1399             RETVAL
1400              
1401             void
1402             eventfd_set(self, fd)
1403             SV *self
1404             int fd
1405             PREINIT:
1406 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1407             CODE:
1408 0           pubsub_eventfd_set(h, fd);
1409              
1410             IV
1411             fileno(self)
1412             SV *self
1413             PREINIT:
1414 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1415             CODE:
1416 0 0         RETVAL = h->notify_fd;
1417             OUTPUT:
1418             RETVAL
1419              
1420             void
1421             eventfd_consume(self)
1422             SV *self
1423             PREINIT:
1424 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1425             CODE:
1426 0           pubsub_eventfd_consume(h);
1427              
1428             void
1429             notify(self)
1430             SV *self
1431             PREINIT:
1432 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1433             CODE:
1434 0           pubsub_notify(h);
1435              
1436             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int32::Sub
1437              
1438             void
1439             DESTROY(self)
1440             SV *self
1441             PREINIT:
1442 10 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1443             CODE:
1444 10           sv_setiv(SvRV(self), 0);
1445 10 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
1446 10           pubsub_sub_destroy(sub);
1447              
1448             SV *
1449             poll(self)
1450             SV *self
1451             PREINIT:
1452 8 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1453             int32_t value;
1454             CODE:
1455 8           int r = pubsub_int32_poll(sub, &value);
1456 8 100         if (r == 1)
1457 7           RETVAL = newSViv((IV)value);
1458             else
1459 1           RETVAL = &PL_sv_undef;
1460             OUTPUT:
1461             RETVAL
1462              
1463             void
1464             poll_multi(self, count)
1465             SV *self
1466             UV count
1467             PREINIT:
1468 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1469             int32_t value;
1470             PPCODE:
1471 7 50         for (UV i = 0; i < count; i++) {
1472 7           int r = pubsub_int32_poll(sub, &value);
1473 7 100         if (r != 1) break;
1474 6 50         mXPUSHi((IV)value);
1475             }
1476              
1477             SV *
1478             poll_wait(self, ...)
1479             SV *self
1480             PREINIT:
1481 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1482 1           double timeout = -1;
1483             int32_t value;
1484             CODE:
1485 1 50         if (items > 1) timeout = SvNV(ST(1));
1486 1           int r = pubsub_int32_poll_wait(sub, &value, timeout);
1487 1 50         if (r == 1)
1488 0           RETVAL = newSViv((IV)value);
1489             else
1490 1           RETVAL = &PL_sv_undef;
1491             OUTPUT:
1492             RETVAL
1493              
1494             void
1495             drain(self, ...)
1496             SV *self
1497             PREINIT:
1498 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1499             int32_t value;
1500             uint32_t max_count;
1501             PPCODE:
1502 1 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1503 9 50         while (max_count-- > 0 && pubsub_int32_poll(sub, &value))
    100          
1504 8 50         mXPUSHi((IV)value);
1505              
1506             void
1507             poll_wait_multi(self, count, ...)
1508             SV *self
1509             UV count
1510             PREINIT:
1511 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1512 1           double timeout = -1;
1513             int32_t value;
1514             PPCODE:
1515 1 50         if (count == 0) XSRETURN(0);
1516 1 50         if (items > 2) timeout = SvNV(ST(2));
1517 1 50         if (!pubsub_int32_poll_wait(sub, &value, timeout)) XSRETURN(0);
1518 1 50         mXPUSHi((IV)value);
1519 3 100         for (UV i = 1; i < count; i++) {
1520 2 50         if (!pubsub_int32_poll(sub, &value)) break;
1521 2 50         mXPUSHi((IV)value);
1522             }
1523              
1524             UV
1525             lag(self)
1526             SV *self
1527             PREINIT:
1528 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1529             CODE:
1530 2           RETVAL = (UV)pubsub_lag(sub);
1531             OUTPUT:
1532             RETVAL
1533              
1534             UV
1535             overflow_count(self)
1536             SV *self
1537             PREINIT:
1538 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1539             CODE:
1540 1 50         RETVAL = (UV)sub->overflow_count;
1541             OUTPUT:
1542             RETVAL
1543              
1544             UV
1545             write_pos(self)
1546             SV *self
1547             PREINIT:
1548 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1549             CODE:
1550 0 0         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1551             OUTPUT:
1552             RETVAL
1553              
1554             bool
1555             has_overflow(self)
1556             SV *self
1557             PREINIT:
1558 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1559             CODE:
1560 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1561 1 50         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    50          
1562             OUTPUT:
1563             RETVAL
1564              
1565             UV
1566             cursor(self, ...)
1567             SV *self
1568             PREINIT:
1569 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1570             CODE:
1571 1 50         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
1572 1 50         RETVAL = (UV)sub->cursor;
1573             OUTPUT:
1574             RETVAL
1575              
1576             void
1577             reset(self)
1578             SV *self
1579             PREINIT:
1580 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1581             CODE:
1582 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1583              
1584             void
1585             reset_oldest(self)
1586             SV *self
1587             PREINIT:
1588 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1589             CODE:
1590 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1591 1 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
1592              
1593             UV
1594             poll_cb(self, cb)
1595             SV *self
1596             SV *cb
1597             PREINIT:
1598 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1599             int32_t value;
1600             CODE:
1601 1           RETVAL = 0;
1602 6 100         while (pubsub_int32_poll(sub, &value)) {
1603 5           dSP;
1604 5 50         PUSHMARK(SP);
1605 5 50         mXPUSHi((IV)value);
1606 5           PUTBACK;
1607 5           call_sv(cb, G_DISCARD);
1608 5           RETVAL++;
1609             }
1610             OUTPUT:
1611             RETVAL
1612              
1613             void
1614             drain_notify(self, ...)
1615             SV *self
1616             PREINIT:
1617 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1618             int32_t value;
1619             uint32_t max_count;
1620             PPCODE:
1621 0           pubsub_sub_eventfd_consume(sub);
1622 0 0         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1623 0 0         while (max_count-- > 0 && pubsub_int32_poll(sub, &value))
    0          
1624 0 0         mXPUSHi((IV)value);
1625              
1626             void
1627             eventfd_set(self, fd)
1628             SV *self
1629             int fd
1630             PREINIT:
1631 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1632             CODE:
1633 0           sub->notify_fd = fd;
1634              
1635             IV
1636             fileno(self)
1637             SV *self
1638             PREINIT:
1639 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1640             CODE:
1641 0 0         RETVAL = sub->notify_fd;
1642             OUTPUT:
1643             RETVAL
1644              
1645             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int16
1646              
1647             SV *
1648             new(class, path, capacity)
1649             const char *class
1650             SV *path
1651             UV capacity
1652             PREINIT:
1653             char errbuf[PUBSUB_ERR_BUFLEN];
1654             CODE:
1655 10 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
1656 10           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
1657 10 100         if (!h) croak("Data::PubSub::Shared::Int16->new: %s", errbuf);
1658 9           SV *obj = newSViv(PTR2IV(h));
1659 9           SV *ref = newRV_noinc(obj);
1660 9           sv_bless(ref, gv_stashpv(class, GV_ADD));
1661 9           RETVAL = ref;
1662             OUTPUT:
1663             RETVAL
1664              
1665             SV *
1666             new_memfd(class, name, capacity)
1667             const char *class
1668             const char *name
1669             UV capacity
1670             PREINIT:
1671             char errbuf[PUBSUB_ERR_BUFLEN];
1672             CODE:
1673 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
1674 1 50         if (!h) croak("Data::PubSub::Shared::Int16->new_memfd: %s", errbuf);
1675 1           SV *obj = newSViv(PTR2IV(h));
1676 1           SV *ref = newRV_noinc(obj);
1677 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
1678 1           RETVAL = ref;
1679             OUTPUT:
1680             RETVAL
1681              
1682             SV *
1683             new_from_fd(class, fd)
1684             const char *class
1685             int fd
1686             PREINIT:
1687             char errbuf[PUBSUB_ERR_BUFLEN];
1688             CODE:
1689 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_INT16, errbuf);
1690 1 50         if (!h) croak("Data::PubSub::Shared::Int16->new_from_fd: %s", errbuf);
1691 1           SV *obj = newSViv(PTR2IV(h));
1692 1           SV *ref = newRV_noinc(obj);
1693 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
1694 1           RETVAL = ref;
1695             OUTPUT:
1696             RETVAL
1697              
1698             IV
1699             memfd(self)
1700             SV *self
1701             PREINIT:
1702 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1703             CODE:
1704 1 50         RETVAL = h->backing_fd;
1705             OUTPUT:
1706             RETVAL
1707              
1708             void
1709             DESTROY(self)
1710             SV *self
1711             PREINIT:
1712 11 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1713             CODE:
1714 11           sv_setiv(SvRV(self), 0);
1715 11           pubsub_destroy(h);
1716              
1717             bool
1718             publish(self, value)
1719             SV *self
1720             IV value
1721             PREINIT:
1722 48 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1723             CODE:
1724 48 50         RETVAL = pubsub_int16_publish(h, (int16_t)value);
1725             OUTPUT:
1726             RETVAL
1727              
1728             UV
1729             publish_multi(self, ...)
1730             SV *self
1731             PREINIT:
1732 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1733             CODE:
1734 1           uint32_t count = items - 1;
1735 1 50         if (count == 0) { RETVAL = 0; }
1736             else {
1737 1 50         if (count > 8192) croak("publish_multi: too many values (%u > 8192)", count);
1738 1           int16_t *vals = (int16_t *)alloca(count * sizeof(int16_t));
1739 4 100         for (uint32_t i = 0; i < count; i++)
1740 3           vals[i] = (int16_t)SvIV(ST(i + 1));
1741 1           RETVAL = pubsub_int16_publish_multi(h, vals, count);
1742             }
1743             OUTPUT:
1744             RETVAL
1745              
1746             void
1747             publish_notify(self, value)
1748             SV *self
1749             IV value
1750             PREINIT:
1751 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1752             CODE:
1753 0           pubsub_int16_publish(h, (int16_t)value);
1754 0           pubsub_notify(h);
1755              
1756             SV *
1757             subscribe(self)
1758             SV *self
1759             PREINIT:
1760 3 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1761             CODE:
1762 3           PubSubSub *sub = pubsub_subscribe(h, 0);
1763 3 50         if (!sub) croak("subscribe: out of memory");
1764 3           sub->userdata = (void *)newSVsv(self);
1765 3           SV *obj = newSViv(PTR2IV(sub));
1766 3           SV *ref = newRV_noinc(obj);
1767 3           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int16::Sub", GV_ADD));
1768 3           RETVAL = ref;
1769             OUTPUT:
1770             RETVAL
1771              
1772             SV *
1773             subscribe_all(self)
1774             SV *self
1775             PREINIT:
1776 7 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1777             CODE:
1778 7           PubSubSub *sub = pubsub_subscribe(h, 1);
1779 7 50         if (!sub) croak("subscribe_all: out of memory");
1780 7           sub->userdata = (void *)newSVsv(self);
1781 7           SV *obj = newSViv(PTR2IV(sub));
1782 7           SV *ref = newRV_noinc(obj);
1783 7           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int16::Sub", GV_ADD));
1784 7           RETVAL = ref;
1785             OUTPUT:
1786             RETVAL
1787              
1788             UV
1789             capacity(self)
1790             SV *self
1791             PREINIT:
1792 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1793             CODE:
1794 1 50         RETVAL = h->capacity;
1795             OUTPUT:
1796             RETVAL
1797              
1798             UV
1799             write_pos(self)
1800             SV *self
1801             PREINIT:
1802 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1803             CODE:
1804 4 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
1805             OUTPUT:
1806             RETVAL
1807              
1808             SV *
1809             path(self)
1810             SV *self
1811             PREINIT:
1812 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1813             CODE:
1814 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
1815             OUTPUT:
1816             RETVAL
1817              
1818             SV *
1819             stats(self)
1820             SV *self
1821             PREINIT:
1822 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1823             CODE:
1824 2           HV *hv = newHV();
1825 2           PubSubHeader *hdr = h->hdr;
1826 2           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
1827 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
1828 2           hv_store(hv, "write_pos", 9,
1829             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
1830 2           hv_store(hv, "publish_ok", 10,
1831             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
1832 2           hv_store(hv, "recoveries", 10,
1833             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
1834 2           hv_store(hv, "sub_waiters", 11,
1835             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
1836 2           RETVAL = newRV_noinc((SV *)hv);
1837             OUTPUT:
1838             RETVAL
1839              
1840             void
1841             clear(self)
1842             SV *self
1843             PREINIT:
1844 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1845             CODE:
1846 1           pubsub_clear(h);
1847              
1848             void
1849             sync(self)
1850             SV *self
1851             PREINIT:
1852 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1853             CODE:
1854 0 0         if (pubsub_sync(h) != 0)
1855 0           croak("msync: %s", strerror(errno));
1856              
1857             void
1858             unlink(self_or_class, ...)
1859             SV *self_or_class
1860             CODE:
1861             const char *path;
1862 0 0         if (sv_isobject(self_or_class)) {
1863 0           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
1864 0 0         if (!h) croak("Attempted to use a destroyed object");
1865 0           path = h->path;
1866             } else {
1867 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Int16->unlink($path)");
1868 0           path = SvPV_nolen(ST(1));
1869             }
1870 0 0         if (!path) croak("cannot unlink anonymous or memfd pubsub");
1871 0 0         if (unlink(path) != 0)
1872 0           croak("unlink(%s): %s", path, strerror(errno));
1873              
1874             IV
1875             eventfd(self)
1876             SV *self
1877             PREINIT:
1878 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1879             CODE:
1880 0           RETVAL = pubsub_eventfd_create(h);
1881 0 0         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1882             OUTPUT:
1883             RETVAL
1884              
1885             void
1886             eventfd_set(self, fd)
1887             SV *self
1888             int fd
1889             PREINIT:
1890 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1891             CODE:
1892 0           pubsub_eventfd_set(h, fd);
1893              
1894             IV
1895             fileno(self)
1896             SV *self
1897             PREINIT:
1898 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1899             CODE:
1900 0 0         RETVAL = h->notify_fd;
1901             OUTPUT:
1902             RETVAL
1903              
1904             void
1905             eventfd_consume(self)
1906             SV *self
1907             PREINIT:
1908 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1909             CODE:
1910 0           pubsub_eventfd_consume(h);
1911              
1912             void
1913             notify(self)
1914             SV *self
1915             PREINIT:
1916 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1917             CODE:
1918 0           pubsub_notify(h);
1919              
1920             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int16::Sub
1921              
1922             void
1923             DESTROY(self)
1924             SV *self
1925             PREINIT:
1926 10 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1927             CODE:
1928 10           sv_setiv(SvRV(self), 0);
1929 10 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
1930 10           pubsub_sub_destroy(sub);
1931              
1932             SV *
1933             poll(self)
1934             SV *self
1935             PREINIT:
1936 8 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1937             int16_t value;
1938             CODE:
1939 8           int r = pubsub_int16_poll(sub, &value);
1940 8 100         if (r == 1)
1941 7           RETVAL = newSViv((IV)value);
1942             else
1943 1           RETVAL = &PL_sv_undef;
1944             OUTPUT:
1945             RETVAL
1946              
1947             void
1948             poll_multi(self, count)
1949             SV *self
1950             UV count
1951             PREINIT:
1952 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1953             int16_t value;
1954             PPCODE:
1955 7 50         for (UV i = 0; i < count; i++) {
1956 7           int r = pubsub_int16_poll(sub, &value);
1957 7 100         if (r != 1) break;
1958 6 50         mXPUSHi((IV)value);
1959             }
1960              
1961             SV *
1962             poll_wait(self, ...)
1963             SV *self
1964             PREINIT:
1965 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1966 1           double timeout = -1;
1967             int16_t value;
1968             CODE:
1969 1 50         if (items > 1) timeout = SvNV(ST(1));
1970 1           int r = pubsub_int16_poll_wait(sub, &value, timeout);
1971 1 50         if (r == 1)
1972 0           RETVAL = newSViv((IV)value);
1973             else
1974 1           RETVAL = &PL_sv_undef;
1975             OUTPUT:
1976             RETVAL
1977              
1978             void
1979             drain(self, ...)
1980             SV *self
1981             PREINIT:
1982 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1983             int16_t value;
1984             uint32_t max_count;
1985             PPCODE:
1986 1 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1987 9 50         while (max_count-- > 0 && pubsub_int16_poll(sub, &value))
    100          
1988 8 50         mXPUSHi((IV)value);
1989              
1990             void
1991             poll_wait_multi(self, count, ...)
1992             SV *self
1993             UV count
1994             PREINIT:
1995 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1996 1           double timeout = -1;
1997             int16_t value;
1998             PPCODE:
1999 1 50         if (count == 0) XSRETURN(0);
2000 1 50         if (items > 2) timeout = SvNV(ST(2));
2001 1 50         if (!pubsub_int16_poll_wait(sub, &value, timeout)) XSRETURN(0);
2002 1 50         mXPUSHi((IV)value);
2003 3 100         for (UV i = 1; i < count; i++) {
2004 2 50         if (!pubsub_int16_poll(sub, &value)) break;
2005 2 50         mXPUSHi((IV)value);
2006             }
2007              
2008             UV
2009             lag(self)
2010             SV *self
2011             PREINIT:
2012 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2013             CODE:
2014 2           RETVAL = (UV)pubsub_lag(sub);
2015             OUTPUT:
2016             RETVAL
2017              
2018             UV
2019             overflow_count(self)
2020             SV *self
2021             PREINIT:
2022 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2023             CODE:
2024 1 50         RETVAL = (UV)sub->overflow_count;
2025             OUTPUT:
2026             RETVAL
2027              
2028             UV
2029             write_pos(self)
2030             SV *self
2031             PREINIT:
2032 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2033             CODE:
2034 0 0         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
2035             OUTPUT:
2036             RETVAL
2037              
2038             bool
2039             has_overflow(self)
2040             SV *self
2041             PREINIT:
2042 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2043             CODE:
2044 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
2045 1 50         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    50          
2046             OUTPUT:
2047             RETVAL
2048              
2049             UV
2050             cursor(self, ...)
2051             SV *self
2052             PREINIT:
2053 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2054             CODE:
2055 1 50         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
2056 1 50         RETVAL = (UV)sub->cursor;
2057             OUTPUT:
2058             RETVAL
2059              
2060             void
2061             reset(self)
2062             SV *self
2063             PREINIT:
2064 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2065             CODE:
2066 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
2067              
2068             void
2069             reset_oldest(self)
2070             SV *self
2071             PREINIT:
2072 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2073             CODE:
2074 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
2075 1 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
2076              
2077             UV
2078             poll_cb(self, cb)
2079             SV *self
2080             SV *cb
2081             PREINIT:
2082 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2083             int16_t value;
2084             CODE:
2085 1           RETVAL = 0;
2086 6 100         while (pubsub_int16_poll(sub, &value)) {
2087 5           dSP;
2088 5 50         PUSHMARK(SP);
2089 5 50         mXPUSHi((IV)value);
2090 5           PUTBACK;
2091 5           call_sv(cb, G_DISCARD);
2092 5           RETVAL++;
2093             }
2094             OUTPUT:
2095             RETVAL
2096              
2097             void
2098             drain_notify(self, ...)
2099             SV *self
2100             PREINIT:
2101 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2102             int16_t value;
2103             uint32_t max_count;
2104             PPCODE:
2105 0           pubsub_sub_eventfd_consume(sub);
2106 0 0         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
2107 0 0         while (max_count-- > 0 && pubsub_int16_poll(sub, &value))
    0          
2108 0 0         mXPUSHi((IV)value);
2109              
2110             void
2111             eventfd_set(self, fd)
2112             SV *self
2113             int fd
2114             PREINIT:
2115 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2116             CODE:
2117 0           sub->notify_fd = fd;
2118              
2119             IV
2120             fileno(self)
2121             SV *self
2122             PREINIT:
2123 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2124             CODE:
2125 0 0         RETVAL = sub->notify_fd;
2126             OUTPUT:
2127             RETVAL