File Coverage

Shared.xs
Criterion Covered Total %
statement 625 721 86.6
branch 606 1324 45.7
condition n/a
subroutine n/a
pod n/a
total 1231 2045 60.2


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "ppport.h"
7             #include "pubsub.h"
8              
9             #include "XSParseKeyword.h"
10              
11 16           static int build_kw_1arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
12             (void)nargs;
13 16           const char *func = (const char *)hookdata;
14 16           OP *q_op = args[0]->op;
15 16           OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
16 16           *out = op_convert_list(OP_ENTERSUB, OPf_STACKED,
17             op_append_elem(OP_LIST, q_op, cvref));
18 16           return KEYWORD_PLUGIN_EXPR;
19             }
20              
21 8           static int build_kw_2arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
22             (void)nargs;
23 8           const char *func = (const char *)hookdata;
24 8           OP *q_op = args[0]->op;
25 8           OP *val_op = args[1]->op;
26 8           OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
27 8           OP *arglist = op_append_elem(OP_LIST, q_op, val_op);
28 8           arglist = op_append_elem(OP_LIST, arglist, cvref);
29 8           *out = op_convert_list(OP_ENTERSUB, OPf_STACKED, arglist);
30 8           return KEYWORD_PLUGIN_EXPR;
31             }
32              
33             static const struct XSParseKeywordPieceType pieces_1expr[] = {
34             XPK_TERMEXPR, {0}
35             };
36              
37             static const struct XSParseKeywordPieceType pieces_2expr[] = {
38             XPK_TERMEXPR, XPK_COMMA, XPK_TERMEXPR, {0}
39             };
40              
41             #define DEFINE_PS_KW(variant, PKG, kw, nargs, builder) \
42             static const struct XSParseKeywordHooks hooks_ps_##variant##_##kw = { \
43             .flags = XPK_FLAG_EXPR, \
44             .permit_hintkey = "Data::PubSub::Shared::" PKG "/ps_" #variant "_" #kw, \
45             .pieces = pieces_##nargs##expr, \
46             .build = builder, \
47             };
48              
49             /* Publisher keywords */
50             DEFINE_PS_KW(int, "Int", publish, 2, build_kw_2arg)
51              
52             /* Subscriber keywords */
53             DEFINE_PS_KW(int, "Int", poll, 1, build_kw_1arg)
54             DEFINE_PS_KW(int, "Int", lag, 1, build_kw_1arg)
55              
56             /* Str publisher keywords */
57             DEFINE_PS_KW(str, "Str", publish, 2, build_kw_2arg)
58              
59             /* Int32 keywords */
60             DEFINE_PS_KW(int32, "Int32", publish, 2, build_kw_2arg)
61             DEFINE_PS_KW(int32, "Int32", poll, 1, build_kw_1arg)
62             DEFINE_PS_KW(int32, "Int32", lag, 1, build_kw_1arg)
63              
64             /* Int16 keywords */
65             DEFINE_PS_KW(int16, "Int16", publish, 2, build_kw_2arg)
66             DEFINE_PS_KW(int16, "Int16", poll, 1, build_kw_1arg)
67             DEFINE_PS_KW(int16, "Int16", lag, 1, build_kw_1arg)
68              
69             /* Str subscriber keywords */
70             DEFINE_PS_KW(str, "Str", poll, 1, build_kw_1arg)
71             DEFINE_PS_KW(str, "Str", lag, 1, build_kw_1arg)
72              
73             #define REGISTER_PS_KW(variant, kw, func_name) \
74             register_xs_parse_keyword("ps_" #variant "_" #kw, \
75             &hooks_ps_##variant##_##kw, (void*)func_name)
76              
77             #define EXTRACT_HANDLE(classname, sv) \
78             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
79             croak("Expected a %s object", classname); \
80             PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(sv))); \
81             if (!h) croak("Attempted to use a destroyed %s object", classname)
82              
83             #define EXTRACT_SUB(classname, sv) \
84             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
85             croak("Expected a %s object", classname); \
86             PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(sv))); \
87             if (!sub) croak("Attempted to use a destroyed %s object", classname)
88              
89             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int
90              
91             PROTOTYPES: DISABLE
92              
93             BOOT:
94 20           boot_xs_parse_keyword(0.40);
95 20           REGISTER_PS_KW(int, publish, "Data::PubSub::Shared::Int::publish");
96 20           REGISTER_PS_KW(int, poll, "Data::PubSub::Shared::Int::Sub::poll");
97 20           REGISTER_PS_KW(int, lag, "Data::PubSub::Shared::Int::Sub::lag");
98 20           REGISTER_PS_KW(int32, publish, "Data::PubSub::Shared::Int32::publish");
99 20           REGISTER_PS_KW(int32, poll, "Data::PubSub::Shared::Int32::Sub::poll");
100 20           REGISTER_PS_KW(int32, lag, "Data::PubSub::Shared::Int32::Sub::lag");
101 20           REGISTER_PS_KW(int16, publish, "Data::PubSub::Shared::Int16::publish");
102 20           REGISTER_PS_KW(int16, poll, "Data::PubSub::Shared::Int16::Sub::poll");
103 20           REGISTER_PS_KW(int16, lag, "Data::PubSub::Shared::Int16::Sub::lag");
104 20           REGISTER_PS_KW(str, publish, "Data::PubSub::Shared::Str::publish");
105 20           REGISTER_PS_KW(str, poll, "Data::PubSub::Shared::Str::Sub::poll");
106 20           REGISTER_PS_KW(str, lag, "Data::PubSub::Shared::Str::Sub::lag");
107              
108             SV *
109             new(class, path, capacity)
110             const char *class
111             SV *path
112             UV capacity
113             PREINIT:
114             char errbuf[PUBSUB_ERR_BUFLEN];
115             CODE:
116 66 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
117 66           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT, 0, errbuf);
118 66 100         if (!h) croak("Data::PubSub::Shared::Int->new: %s", errbuf);
119 65           SV *obj = newSViv(PTR2IV(h));
120 65           SV *ref = newRV_noinc(obj);
121 65           sv_bless(ref, gv_stashpv(class, GV_ADD));
122 65           RETVAL = ref;
123             OUTPUT:
124             RETVAL
125              
126             SV *
127             new_memfd(class, name, capacity)
128             const char *class
129             const char *name
130             UV capacity
131             PREINIT:
132             char errbuf[PUBSUB_ERR_BUFLEN];
133             CODE:
134 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT, 0, errbuf);
135 1 50         if (!h) croak("Data::PubSub::Shared::Int->new_memfd: %s", errbuf);
136 1           SV *obj = newSViv(PTR2IV(h));
137 1           SV *ref = newRV_noinc(obj);
138 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
139 1           RETVAL = ref;
140             OUTPUT:
141             RETVAL
142              
143             SV *
144             new_from_fd(class, fd)
145             const char *class
146             int fd
147             PREINIT:
148             char errbuf[PUBSUB_ERR_BUFLEN];
149             CODE:
150 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_INT, errbuf);
151 1 50         if (!h) croak("Data::PubSub::Shared::Int->new_from_fd: %s", errbuf);
152 1           SV *obj = newSViv(PTR2IV(h));
153 1           SV *ref = newRV_noinc(obj);
154 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
155 1           RETVAL = ref;
156             OUTPUT:
157             RETVAL
158              
159             IV
160             memfd(self)
161             SV *self
162             PREINIT:
163 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
164             CODE:
165 1 50         RETVAL = h->backing_fd;
166             OUTPUT:
167             RETVAL
168              
169             void
170             DESTROY(self)
171             SV *self
172             PREINIT:
173 67 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
174             CODE:
175 67           sv_setiv(SvRV(self), 0);
176 67           pubsub_destroy(h);
177              
178             bool
179             publish(self, value)
180             SV *self
181             IV value
182             PREINIT:
183 468 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
184             CODE:
185 468 50         RETVAL = pubsub_int_publish(h, (int64_t)value);
186             OUTPUT:
187             RETVAL
188              
189             UV
190             publish_multi(self, ...)
191             SV *self
192             PREINIT:
193 5 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
194             CODE:
195 5           uint32_t count = items - 1;
196 5 100         if (count == 0) { RETVAL = 0; }
197             else {
198 4 50         if (count > 8192) croak("publish_multi: too many values (%u > 8192)", count);
199 4           int64_t *vals = (int64_t *)alloca(count * sizeof(int64_t));
200 212 100         for (uint32_t i = 0; i < count; i++)
201 208           vals[i] = (int64_t)SvIV(ST(i + 1));
202 4           RETVAL = pubsub_int_publish_multi(h, vals, count);
203             }
204             OUTPUT:
205             RETVAL
206              
207             void
208             publish_notify(self, value)
209             SV *self
210             IV value
211             PREINIT:
212 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
213             CODE:
214 1           pubsub_int_publish(h, (int64_t)value);
215 1           pubsub_notify(h);
216              
217             SV *
218             subscribe(self)
219             SV *self
220             PREINIT:
221 21 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
222             CODE:
223 21           PubSubSub *sub = pubsub_subscribe(h, 0);
224 21 50         if (!sub) croak("subscribe: out of memory");
225 21           sub->userdata = (void *)newSVsv(self);
226 21           SV *obj = newSViv(PTR2IV(sub));
227 21           SV *ref = newRV_noinc(obj);
228 21           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int::Sub", GV_ADD));
229 21           RETVAL = ref;
230             OUTPUT:
231             RETVAL
232              
233             SV *
234             subscribe_all(self)
235             SV *self
236             PREINIT:
237 26 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
238             CODE:
239 26           PubSubSub *sub = pubsub_subscribe(h, 1);
240 26 50         if (!sub) croak("subscribe_all: out of memory");
241 26           sub->userdata = (void *)newSVsv(self);
242 26           SV *obj = newSViv(PTR2IV(sub));
243 26           SV *ref = newRV_noinc(obj);
244 26           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int::Sub", GV_ADD));
245 26           RETVAL = ref;
246             OUTPUT:
247             RETVAL
248              
249             UV
250             capacity(self)
251             SV *self
252             PREINIT:
253 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
254             CODE:
255 1 50         RETVAL = h->capacity;
256             OUTPUT:
257             RETVAL
258              
259             UV
260             write_pos(self)
261             SV *self
262             PREINIT:
263 13 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
264             CODE:
265 13 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
266             OUTPUT:
267             RETVAL
268              
269             SV *
270             path(self)
271             SV *self
272             PREINIT:
273 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
274             CODE:
275 2 100         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
276             OUTPUT:
277             RETVAL
278              
279             SV *
280             stats(self)
281             SV *self
282             PREINIT:
283 5 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
284             CODE:
285 5           HV *hv = newHV();
286 5           PubSubHeader *hdr = h->hdr;
287 5           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
288 5           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
289 5           hv_store(hv, "write_pos", 9,
290             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
291 5           hv_store(hv, "publish_ok", 10,
292             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
293 5           hv_store(hv, "recoveries", 10,
294             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
295 5           hv_store(hv, "sub_waiters", 11,
296             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
297 5           RETVAL = newRV_noinc((SV *)hv);
298             OUTPUT:
299             RETVAL
300              
301             void
302             clear(self)
303             SV *self
304             PREINIT:
305 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
306             CODE:
307 4           pubsub_clear(h);
308              
309             void
310             sync(self)
311             SV *self
312             PREINIT:
313 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
314             CODE:
315 4 50         if (pubsub_sync(h) != 0)
316 0           croak("msync: %s", strerror(errno));
317              
318             void
319             unlink(self_or_class, ...)
320             SV *self_or_class
321             CODE:
322             const char *path;
323 1 50         if (sv_isobject(self_or_class)) {
324 1           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
325 1 50         if (!h) croak("Attempted to use a destroyed object");
326 1           path = h->path;
327             } else {
328 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Int->unlink($path)");
329 0           path = SvPV_nolen(ST(1));
330             }
331 1 50         if (!path) croak("cannot unlink anonymous or memfd pubsub");
332 1 50         if (unlink(path) != 0)
333 0           croak("unlink(%s): %s", path, strerror(errno));
334              
335             IV
336             eventfd(self)
337             SV *self
338             PREINIT:
339 9 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
340             CODE:
341 9           RETVAL = pubsub_eventfd_create(h);
342 9 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
343             OUTPUT:
344             RETVAL
345              
346             void
347             eventfd_set(self, fd)
348             SV *self
349             int fd
350             PREINIT:
351 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
352             CODE:
353 1           pubsub_eventfd_set(h, fd);
354              
355             IV
356             fileno(self)
357             SV *self
358             PREINIT:
359 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
360             CODE:
361 1 50         RETVAL = h->notify_fd;
362             OUTPUT:
363             RETVAL
364              
365             void
366             eventfd_consume(self)
367             SV *self
368             PREINIT:
369 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
370             CODE:
371 4           pubsub_eventfd_consume(h);
372              
373             void
374             notify(self)
375             SV *self
376             PREINIT:
377 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
378             CODE:
379 4           pubsub_notify(h);
380              
381             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int::Sub
382              
383             void
384             DESTROY(self)
385             SV *self
386             PREINIT:
387 47 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
388             CODE:
389 47           sv_setiv(SvRV(self), 0);
390 47 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
391 47           pubsub_sub_destroy(sub);
392              
393             SV *
394             poll(self)
395             SV *self
396             PREINIT:
397 35 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
398             int64_t value;
399             CODE:
400 35           int r = pubsub_int_poll(sub, &value);
401 35 100         if (r == 1)
402 31           RETVAL = newSViv((IV)value);
403             else
404 4           RETVAL = &PL_sv_undef;
405             OUTPUT:
406             RETVAL
407              
408             void
409             poll_multi(self, count)
410             SV *self
411             UV count
412             PREINIT:
413 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
414             int64_t value;
415             PPCODE:
416 12 100         for (UV i = 0; i < count; i++) {
417 10           int r = pubsub_int_poll(sub, &value);
418 10 50         if (r != 1) break;
419 10 50         mXPUSHi((IV)value);
420             }
421              
422             SV *
423             poll_wait(self, ...)
424             SV *self
425             PREINIT:
426 9 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
427 9           double timeout = -1;
428             int64_t value;
429             CODE:
430 9 50         if (items > 1) timeout = SvNV(ST(1));
431 9           int r = pubsub_int_poll_wait(sub, &value, timeout);
432 9 100         if (r == 1)
433 8           RETVAL = newSViv((IV)value);
434             else
435 1           RETVAL = &PL_sv_undef;
436             OUTPUT:
437             RETVAL
438              
439             void
440             drain(self, ...)
441             SV *self
442             PREINIT:
443 8 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
444             int64_t value;
445             uint32_t max_count;
446             PPCODE:
447 8 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
448 1028 100         while (max_count-- > 0 && pubsub_int_poll(sub, &value))
    100          
449 1020 50         mXPUSHi((IV)value);
450              
451             void
452             poll_wait_multi(self, count, ...)
453             SV *self
454             UV count
455             PREINIT:
456 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
457 3           double timeout = -1;
458             int64_t value;
459             PPCODE:
460 3 100         if (count == 0) XSRETURN(0);
461 2 50         if (items > 2) timeout = SvNV(ST(2));
462 2 100         if (!pubsub_int_poll_wait(sub, &value, timeout)) XSRETURN(0);
463 1 50         mXPUSHi((IV)value);
464 5 100         for (UV i = 1; i < count; i++) {
465 4 50         if (!pubsub_int_poll(sub, &value)) break;
466 4 50         mXPUSHi((IV)value);
467             }
468              
469             UV
470             lag(self)
471             SV *self
472             PREINIT:
473 18 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
474             CODE:
475 18           RETVAL = (UV)pubsub_lag(sub);
476             OUTPUT:
477             RETVAL
478              
479             UV
480             overflow_count(self)
481             SV *self
482             PREINIT:
483 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
484             CODE:
485 2 50         RETVAL = (UV)sub->overflow_count;
486             OUTPUT:
487             RETVAL
488              
489             UV
490             write_pos(self)
491             SV *self
492             PREINIT:
493 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
494             CODE:
495 2 50         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
496             OUTPUT:
497             RETVAL
498              
499             bool
500             has_overflow(self)
501             SV *self
502             PREINIT:
503 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
504             CODE:
505 2           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
506 2 100         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    100          
507             OUTPUT:
508             RETVAL
509              
510             UV
511             cursor(self, ...)
512             SV *self
513             PREINIT:
514 6 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
515             CODE:
516 6 100         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
517 6 50         RETVAL = (UV)sub->cursor;
518             OUTPUT:
519             RETVAL
520              
521             void
522             reset(self)
523             SV *self
524             PREINIT:
525 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
526             CODE:
527 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
528              
529             void
530             reset_oldest(self)
531             SV *self
532             PREINIT:
533 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
534             CODE:
535 2           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
536 2 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
537              
538             UV
539             poll_cb(self, cb)
540             SV *self
541             SV *cb
542             PREINIT:
543 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
544             int64_t value;
545             CODE:
546 3           RETVAL = 0;
547 108 100         while (pubsub_int_poll(sub, &value)) {
548 105           dSP;
549 105 50         PUSHMARK(SP);
550 105 50         mXPUSHi((IV)value);
551 105           PUTBACK;
552 105           call_sv(cb, G_DISCARD);
553 105           RETVAL++;
554             }
555             OUTPUT:
556             RETVAL
557              
558             void
559             drain_notify(self, ...)
560             SV *self
561             PREINIT:
562 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
563             int64_t value;
564             uint32_t max_count;
565             PPCODE:
566 3           pubsub_sub_eventfd_consume(sub);
567 3 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
568 8 100         while (max_count-- > 0 && pubsub_int_poll(sub, &value))
    100          
569 5 50         mXPUSHi((IV)value);
570              
571             void
572             eventfd_set(self, fd)
573             SV *self
574             int fd
575             PREINIT:
576 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
577             CODE:
578 1           sub->notify_fd = fd;
579              
580             IV
581             fileno(self)
582             SV *self
583             PREINIT:
584 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
585             CODE:
586 3 50         RETVAL = sub->notify_fd;
587             OUTPUT:
588             RETVAL
589              
590             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Str
591              
592             SV *
593             new(class, path, capacity, ...)
594             const char *class
595             SV *path
596             UV capacity
597             PREINIT:
598             char errbuf[PUBSUB_ERR_BUFLEN];
599             uint32_t msg_size;
600             CODE:
601 49 100         msg_size = (items > 3) ? (uint32_t)SvUV(ST(3)) : 0;
602 49 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
603 49           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
604 49 100         if (!h) croak("Data::PubSub::Shared::Str->new: %s", errbuf);
605 48           SV *obj = newSViv(PTR2IV(h));
606 48           SV *ref = newRV_noinc(obj);
607 48           sv_bless(ref, gv_stashpv(class, GV_ADD));
608 48           RETVAL = ref;
609             OUTPUT:
610             RETVAL
611              
612             SV *
613             new_memfd(class, name, capacity, ...)
614             const char *class
615             const char *name
616             UV capacity
617             PREINIT:
618             char errbuf[PUBSUB_ERR_BUFLEN];
619             uint32_t msg_size;
620             CODE:
621 1 50         msg_size = (items > 3) ? (uint32_t)SvUV(ST(3)) : 0;
622 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
623 1 50         if (!h) croak("Data::PubSub::Shared::Str->new_memfd: %s", errbuf);
624 1           SV *obj = newSViv(PTR2IV(h));
625 1           SV *ref = newRV_noinc(obj);
626 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
627 1           RETVAL = ref;
628             OUTPUT:
629             RETVAL
630              
631             SV *
632             new_from_fd(class, fd)
633             const char *class
634             int fd
635             PREINIT:
636             char errbuf[PUBSUB_ERR_BUFLEN];
637             CODE:
638 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_STR, errbuf);
639 1 50         if (!h) croak("Data::PubSub::Shared::Str->new_from_fd: %s", errbuf);
640 1           SV *obj = newSViv(PTR2IV(h));
641 1           SV *ref = newRV_noinc(obj);
642 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
643 1           RETVAL = ref;
644             OUTPUT:
645             RETVAL
646              
647             IV
648             memfd(self)
649             SV *self
650             PREINIT:
651 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
652             CODE:
653 1 50         RETVAL = h->backing_fd;
654             OUTPUT:
655             RETVAL
656              
657             void
658             DESTROY(self)
659             SV *self
660             PREINIT:
661 50 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
662             CODE:
663 50           sv_setiv(SvRV(self), 0);
664 50           pubsub_destroy(h);
665              
666             SV *
667             publish(self, value)
668             SV *self
669             SV *value
670             PREINIT:
671 185 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
672             CODE:
673             STRLEN len;
674 185           bool utf8 = SvUTF8(value) ? true : false;
675             const char *str;
676 185 100         if (utf8)
677 3           str = SvPVutf8(value, len);
678             else
679 182           str = SvPV(value, len);
680 185           int r = pubsub_str_publish(h, str, (uint32_t)len, utf8);
681 185 100         if (r == -1) croak("publish: message too long (%u > %u)", (unsigned)len, h->msg_size);
682 183           RETVAL = &PL_sv_yes;
683             OUTPUT:
684             RETVAL
685              
686             UV
687             publish_multi(self, ...)
688             SV *self
689             PREINIT:
690 5 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
691             CODE:
692 5           RETVAL = 0;
693 5           uint32_t count = items - 1;
694 5 100         if (count > 0) {
695 4           pubsub_mutex_lock(h->hdr);
696 16 100         for (uint32_t i = 0; i < count; i++) {
697 13           SV *val = ST(i + 1);
698             STRLEN len;
699 13           bool utf8 = SvUTF8(val) ? true : false;
700             const char *str;
701 13 50         if (utf8)
702 0           str = SvPVutf8(val, len);
703             else
704 13           str = SvPV(val, len);
705 13           int r = pubsub_str_publish_locked(h, str, (uint32_t)len, utf8);
706 13 100         if (r == -1) {
707 1           pubsub_mutex_unlock(h->hdr);
708 1           croak("publish_multi: message too long (%u > %u)", (unsigned)len, h->msg_size);
709             }
710 12           RETVAL++;
711             }
712 3           pubsub_mutex_unlock(h->hdr);
713 3           pubsub_wake_subscribers(h->hdr);
714             }
715             OUTPUT:
716             RETVAL
717              
718             void
719             publish_notify(self, value)
720             SV *self
721             SV *value
722             PREINIT:
723 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
724             CODE:
725             STRLEN len;
726 1           bool utf8 = SvUTF8(value) ? true : false;
727             const char *str;
728 1 50         if (utf8)
729 0           str = SvPVutf8(value, len);
730             else
731 1           str = SvPV(value, len);
732 1           int r = pubsub_str_publish(h, str, (uint32_t)len, utf8);
733 1 50         if (r == -1) croak("publish_notify: message too long (%u > %u)", (unsigned)len, h->msg_size);
734 1           pubsub_notify(h);
735              
736             SV *
737             subscribe(self)
738             SV *self
739             PREINIT:
740 11 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
741             CODE:
742 11           PubSubSub *sub = pubsub_subscribe(h, 0);
743 11 50         if (!sub) croak("subscribe: out of memory");
744 11           sub->userdata = (void *)newSVsv(self);
745 11           SV *obj = newSViv(PTR2IV(sub));
746 11           SV *ref = newRV_noinc(obj);
747 11           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Str::Sub", GV_ADD));
748 11           RETVAL = ref;
749             OUTPUT:
750             RETVAL
751              
752             SV *
753             subscribe_all(self)
754             SV *self
755             PREINIT:
756 24 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
757             CODE:
758 24           PubSubSub *sub = pubsub_subscribe(h, 1);
759 24 50         if (!sub) croak("subscribe_all: out of memory");
760 24           sub->userdata = (void *)newSVsv(self);
761 24           SV *obj = newSViv(PTR2IV(sub));
762 24           SV *ref = newRV_noinc(obj);
763 24           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Str::Sub", GV_ADD));
764 24           RETVAL = ref;
765             OUTPUT:
766             RETVAL
767              
768             UV
769             capacity(self)
770             SV *self
771             PREINIT:
772 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
773             CODE:
774 1 50         RETVAL = h->capacity;
775             OUTPUT:
776             RETVAL
777              
778             UV
779             msg_size(self)
780             SV *self
781             PREINIT:
782 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
783             CODE:
784 2 50         RETVAL = h->msg_size;
785             OUTPUT:
786             RETVAL
787              
788             UV
789             write_pos(self)
790             SV *self
791             PREINIT:
792 7 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
793             CODE:
794 7 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
795             OUTPUT:
796             RETVAL
797              
798             SV *
799             path(self)
800             SV *self
801             PREINIT:
802 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    0          
    0          
803             CODE:
804 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
805             OUTPUT:
806             RETVAL
807              
808             SV *
809             stats(self)
810             SV *self
811             PREINIT:
812 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
813             CODE:
814 2           HV *hv = newHV();
815 2           PubSubHeader *hdr = h->hdr;
816 2           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
817 2           hv_store(hv, "msg_size", 8, newSVuv(h->msg_size), 0);
818 2           hv_store(hv, "arena_cap", 9, newSVuv((UV)h->arena_cap), 0);
819 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
820 2           hv_store(hv, "write_pos", 9,
821             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
822 2           hv_store(hv, "publish_ok", 10,
823             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
824 2           hv_store(hv, "recoveries", 10,
825             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
826 2           hv_store(hv, "sub_waiters", 11,
827             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
828 2           RETVAL = newRV_noinc((SV *)hv);
829             OUTPUT:
830             RETVAL
831              
832             void
833             clear(self)
834             SV *self
835             PREINIT:
836 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
837             CODE:
838 2           pubsub_clear(h);
839              
840             void
841             sync(self)
842             SV *self
843             PREINIT:
844 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
845             CODE:
846 2 50         if (pubsub_sync(h) != 0)
847 0           croak("msync: %s", strerror(errno));
848              
849             void
850             unlink(self_or_class, ...)
851             SV *self_or_class
852             CODE:
853             const char *path;
854 1 50         if (sv_isobject(self_or_class)) {
855 1           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
856 1 50         if (!h) croak("Attempted to use a destroyed object");
857 1           path = h->path;
858             } else {
859 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Str->unlink($path)");
860 0           path = SvPV_nolen(ST(1));
861             }
862 1 50         if (!path) croak("cannot unlink anonymous or memfd pubsub");
863 1 50         if (unlink(path) != 0)
864 0           croak("unlink(%s): %s", path, strerror(errno));
865              
866             IV
867             eventfd(self)
868             SV *self
869             PREINIT:
870 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
871             CODE:
872 4           RETVAL = pubsub_eventfd_create(h);
873 4 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
874             OUTPUT:
875             RETVAL
876              
877             void
878             eventfd_set(self, fd)
879             SV *self
880             int fd
881             PREINIT:
882 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    0          
    0          
883             CODE:
884 0           pubsub_eventfd_set(h, fd);
885              
886             IV
887             fileno(self)
888             SV *self
889             PREINIT:
890 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    0          
    0          
891             CODE:
892 0 0         RETVAL = h->notify_fd;
893             OUTPUT:
894             RETVAL
895              
896             void
897             eventfd_consume(self)
898             SV *self
899             PREINIT:
900 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
901             CODE:
902 2           pubsub_eventfd_consume(h);
903              
904             void
905             notify(self)
906             SV *self
907             PREINIT:
908 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
909             CODE:
910 2           pubsub_notify(h);
911              
912             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Str::Sub
913              
914             void
915             DESTROY(self)
916             SV *self
917             PREINIT:
918 35 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
919             CODE:
920 35           sv_setiv(SvRV(self), 0);
921 35 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
922 35           pubsub_sub_destroy(sub);
923              
924             SV *
925             poll(self)
926             SV *self
927             PREINIT:
928 37 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
929             const char *str;
930             uint32_t len;
931             bool utf8;
932             CODE:
933 37           int r = pubsub_str_poll(sub, &str, &len, &utf8);
934 37 100         if (r == 1) {
935 34           RETVAL = newSVpvn(str, len);
936 34 100         if (utf8) SvUTF8_on(RETVAL);
937             } else {
938 3           RETVAL = &PL_sv_undef;
939             }
940             OUTPUT:
941             RETVAL
942              
943             void
944             poll_multi(self, count)
945             SV *self
946             UV count
947             PREINIT:
948 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
949             const char *str;
950             uint32_t len;
951             bool utf8;
952             PPCODE:
953 4 50         for (UV i = 0; i < count; i++) {
954 4           int r = pubsub_str_poll(sub, &str, &len, &utf8);
955 4 100         if (r != 1) break;
956 3           SV *sv = newSVpvn(str, len);
957 3 50         if (utf8) SvUTF8_on(sv);
958 3 50         mXPUSHs(sv);
959             }
960              
961             SV *
962             poll_wait(self, ...)
963             SV *self
964             PREINIT:
965 54 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
966 54           double timeout = -1;
967             const char *str;
968             uint32_t len;
969             bool utf8;
970             CODE:
971 54 50         if (items > 1) timeout = SvNV(ST(1));
972 54           int r = pubsub_str_poll_wait(sub, &str, &len, &utf8, timeout);
973 54 100         if (r == 1) {
974 53           RETVAL = newSVpvn(str, len);
975 53 50         if (utf8) SvUTF8_on(RETVAL);
976             } else {
977 1           RETVAL = &PL_sv_undef;
978             }
979             OUTPUT:
980             RETVAL
981              
982             void
983             drain(self, ...)
984             SV *self
985             PREINIT:
986 4 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
987             const char *str;
988             uint32_t len;
989             bool utf8;
990             uint32_t max_count;
991             PPCODE:
992 4 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
993 20 50         while (max_count-- > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
    100          
994 16           SV *sv = newSVpvn(str, len);
995 16 50         if (utf8) SvUTF8_on(sv);
996 16 50         mXPUSHs(sv);
997             }
998              
999             void
1000             poll_wait_multi(self, count, ...)
1001             SV *self
1002             UV count
1003             PREINIT:
1004 2 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1005 2           double timeout = -1;
1006             const char *str;
1007             uint32_t len;
1008             bool utf8;
1009             PPCODE:
1010 2 100         if (count == 0) XSRETURN(0);
1011 1 50         if (items > 2) timeout = SvNV(ST(2));
1012 1 50         if (pubsub_str_poll_wait(sub, &str, &len, &utf8, timeout) != 1) XSRETURN(0);
1013             {
1014 1           SV *sv = newSVpvn(str, len);
1015 1 50         if (utf8) SvUTF8_on(sv);
1016 1 50         mXPUSHs(sv);
1017             }
1018 3 100         for (UV i = 1; i < count; i++) {
1019 2 50         if (pubsub_str_poll(sub, &str, &len, &utf8) != 1) break;
1020 2           SV *sv = newSVpvn(str, len);
1021 2 50         if (utf8) SvUTF8_on(sv);
1022 2 50         mXPUSHs(sv);
1023             }
1024              
1025             UV
1026             lag(self)
1027             SV *self
1028             PREINIT:
1029 8 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1030             CODE:
1031 8           RETVAL = (UV)pubsub_lag(sub);
1032             OUTPUT:
1033             RETVAL
1034              
1035             UV
1036             overflow_count(self)
1037             SV *self
1038             PREINIT:
1039 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1040             CODE:
1041 1 50         RETVAL = (UV)sub->overflow_count;
1042             OUTPUT:
1043             RETVAL
1044              
1045             UV
1046             write_pos(self)
1047             SV *self
1048             PREINIT:
1049 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1050             CODE:
1051 1 50         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1052             OUTPUT:
1053             RETVAL
1054              
1055             bool
1056             has_overflow(self)
1057             SV *self
1058             PREINIT:
1059 2 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1060             CODE:
1061 2           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1062 2 100         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    100          
1063             OUTPUT:
1064             RETVAL
1065              
1066             UV
1067             cursor(self, ...)
1068             SV *self
1069             PREINIT:
1070 4 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1071             CODE:
1072 4 100         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
1073 4 50         RETVAL = (UV)sub->cursor;
1074             OUTPUT:
1075             RETVAL
1076              
1077             void
1078             reset(self)
1079             SV *self
1080             PREINIT:
1081 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1082             CODE:
1083 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1084              
1085             void
1086             reset_oldest(self)
1087             SV *self
1088             PREINIT:
1089 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1090             CODE:
1091 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1092 1 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
1093              
1094             UV
1095             poll_cb(self, cb)
1096             SV *self
1097             SV *cb
1098             PREINIT:
1099 2 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1100             const char *str;
1101             uint32_t len;
1102             bool utf8;
1103             CODE:
1104 2           RETVAL = 0;
1105 6 100         while (pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
1106 4           dSP;
1107 4           SV *sv = newSVpvn(str, len);
1108 4 100         if (utf8) SvUTF8_on(sv);
1109 4 50         PUSHMARK(SP);
1110 4 50         mXPUSHs(sv);
1111 4           PUTBACK;
1112 4           call_sv(cb, G_DISCARD);
1113 4           RETVAL++;
1114             }
1115             OUTPUT:
1116             RETVAL
1117              
1118             void
1119             drain_notify(self, ...)
1120             SV *self
1121             PREINIT:
1122 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1123             const char *str;
1124             uint32_t len;
1125             bool utf8;
1126             uint32_t max_count;
1127             PPCODE:
1128 1           pubsub_sub_eventfd_consume(sub);
1129 1 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1130 2 50         while (max_count-- > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
    100          
1131 1           SV *sv = newSVpvn(str, len);
1132 1 50         if (utf8) SvUTF8_on(sv);
1133 1 50         mXPUSHs(sv);
1134             }
1135              
1136             void
1137             eventfd_set(self, fd)
1138             SV *self
1139             int fd
1140             PREINIT:
1141 0 0         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    0          
    0          
1142             CODE:
1143 0           sub->notify_fd = fd;
1144              
1145             IV
1146             fileno(self)
1147             SV *self
1148             PREINIT:
1149 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1150             CODE:
1151 1 50         RETVAL = sub->notify_fd;
1152             OUTPUT:
1153             RETVAL
1154              
1155             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int32
1156              
1157             SV *
1158             new(class, path, capacity)
1159             const char *class
1160             SV *path
1161             UV capacity
1162             PREINIT:
1163             char errbuf[PUBSUB_ERR_BUFLEN];
1164             CODE:
1165 12 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
1166 12           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
1167 12 100         if (!h) croak("Data::PubSub::Shared::Int32->new: %s", errbuf);
1168 11           SV *obj = newSViv(PTR2IV(h));
1169 11           SV *ref = newRV_noinc(obj);
1170 11           sv_bless(ref, gv_stashpv(class, GV_ADD));
1171 11           RETVAL = ref;
1172             OUTPUT:
1173             RETVAL
1174              
1175             SV *
1176             new_memfd(class, name, capacity)
1177             const char *class
1178             const char *name
1179             UV capacity
1180             PREINIT:
1181             char errbuf[PUBSUB_ERR_BUFLEN];
1182             CODE:
1183 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
1184 1 50         if (!h) croak("Data::PubSub::Shared::Int32->new_memfd: %s", errbuf);
1185 1           SV *obj = newSViv(PTR2IV(h));
1186 1           SV *ref = newRV_noinc(obj);
1187 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
1188 1           RETVAL = ref;
1189             OUTPUT:
1190             RETVAL
1191              
1192             SV *
1193             new_from_fd(class, fd)
1194             const char *class
1195             int fd
1196             PREINIT:
1197             char errbuf[PUBSUB_ERR_BUFLEN];
1198             CODE:
1199 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_INT32, errbuf);
1200 1 50         if (!h) croak("Data::PubSub::Shared::Int32->new_from_fd: %s", errbuf);
1201 1           SV *obj = newSViv(PTR2IV(h));
1202 1           SV *ref = newRV_noinc(obj);
1203 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
1204 1           RETVAL = ref;
1205             OUTPUT:
1206             RETVAL
1207              
1208             IV
1209             memfd(self)
1210             SV *self
1211             PREINIT:
1212 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1213             CODE:
1214 1 50         RETVAL = h->backing_fd;
1215             OUTPUT:
1216             RETVAL
1217              
1218             void
1219             DESTROY(self)
1220             SV *self
1221             PREINIT:
1222 13 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1223             CODE:
1224 13           sv_setiv(SvRV(self), 0);
1225 13           pubsub_destroy(h);
1226              
1227             bool
1228             publish(self, value)
1229             SV *self
1230             IV value
1231             PREINIT:
1232 50 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1233             CODE:
1234 50 50         RETVAL = pubsub_int32_publish(h, (int32_t)value);
1235             OUTPUT:
1236             RETVAL
1237              
1238             UV
1239             publish_multi(self, ...)
1240             SV *self
1241             PREINIT:
1242 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1243             CODE:
1244 1           uint32_t count = items - 1;
1245 1 50         if (count == 0) { RETVAL = 0; }
1246             else {
1247 1 50         if (count > 8192) croak("publish_multi: too many values (%u > 8192)", count);
1248 1           int32_t *vals = (int32_t *)alloca(count * sizeof(int32_t));
1249 4 100         for (uint32_t i = 0; i < count; i++)
1250 3           vals[i] = (int32_t)SvIV(ST(i + 1));
1251 1           RETVAL = pubsub_int32_publish_multi(h, vals, count);
1252             }
1253             OUTPUT:
1254             RETVAL
1255              
1256             void
1257             publish_notify(self, value)
1258             SV *self
1259             IV value
1260             PREINIT:
1261 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1262             CODE:
1263 0           pubsub_int32_publish(h, (int32_t)value);
1264 0           pubsub_notify(h);
1265              
1266             SV *
1267             subscribe(self)
1268             SV *self
1269             PREINIT:
1270 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1271             CODE:
1272 4           PubSubSub *sub = pubsub_subscribe(h, 0);
1273 4 50         if (!sub) croak("subscribe: out of memory");
1274 4           sub->userdata = (void *)newSVsv(self);
1275 4           SV *obj = newSViv(PTR2IV(sub));
1276 4           SV *ref = newRV_noinc(obj);
1277 4           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int32::Sub", GV_ADD));
1278 4           RETVAL = ref;
1279             OUTPUT:
1280             RETVAL
1281              
1282             SV *
1283             subscribe_all(self)
1284             SV *self
1285             PREINIT:
1286 7 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1287             CODE:
1288 7           PubSubSub *sub = pubsub_subscribe(h, 1);
1289 7 50         if (!sub) croak("subscribe_all: out of memory");
1290 7           sub->userdata = (void *)newSVsv(self);
1291 7           SV *obj = newSViv(PTR2IV(sub));
1292 7           SV *ref = newRV_noinc(obj);
1293 7           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int32::Sub", GV_ADD));
1294 7           RETVAL = ref;
1295             OUTPUT:
1296             RETVAL
1297              
1298             UV
1299             capacity(self)
1300             SV *self
1301             PREINIT:
1302 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1303             CODE:
1304 1 50         RETVAL = h->capacity;
1305             OUTPUT:
1306             RETVAL
1307              
1308             UV
1309             write_pos(self)
1310             SV *self
1311             PREINIT:
1312 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1313             CODE:
1314 4 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
1315             OUTPUT:
1316             RETVAL
1317              
1318             SV *
1319             path(self)
1320             SV *self
1321             PREINIT:
1322 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1323             CODE:
1324 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
1325             OUTPUT:
1326             RETVAL
1327              
1328             SV *
1329             stats(self)
1330             SV *self
1331             PREINIT:
1332 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1333             CODE:
1334 2           HV *hv = newHV();
1335 2           PubSubHeader *hdr = h->hdr;
1336 2           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
1337 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
1338 2           hv_store(hv, "write_pos", 9,
1339             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
1340 2           hv_store(hv, "publish_ok", 10,
1341             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
1342 2           hv_store(hv, "recoveries", 10,
1343             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
1344 2           hv_store(hv, "sub_waiters", 11,
1345             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
1346 2           RETVAL = newRV_noinc((SV *)hv);
1347             OUTPUT:
1348             RETVAL
1349              
1350             void
1351             clear(self)
1352             SV *self
1353             PREINIT:
1354 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1355             CODE:
1356 1           pubsub_clear(h);
1357              
1358             void
1359             sync(self)
1360             SV *self
1361             PREINIT:
1362 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1363             CODE:
1364 1 50         if (pubsub_sync(h) != 0)
1365 0           croak("msync: %s", strerror(errno));
1366              
1367             void
1368             unlink(self_or_class, ...)
1369             SV *self_or_class
1370             CODE:
1371             const char *path;
1372 0 0         if (sv_isobject(self_or_class)) {
1373 0           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
1374 0 0         if (!h) croak("Attempted to use a destroyed object");
1375 0           path = h->path;
1376             } else {
1377 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Int32->unlink($path)");
1378 0           path = SvPV_nolen(ST(1));
1379             }
1380 0 0         if (!path) croak("cannot unlink anonymous or memfd pubsub");
1381 0 0         if (unlink(path) != 0)
1382 0           croak("unlink(%s): %s", path, strerror(errno));
1383              
1384             IV
1385             eventfd(self)
1386             SV *self
1387             PREINIT:
1388 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1389             CODE:
1390 0           RETVAL = pubsub_eventfd_create(h);
1391 0 0         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1392             OUTPUT:
1393             RETVAL
1394              
1395             void
1396             eventfd_set(self, fd)
1397             SV *self
1398             int fd
1399             PREINIT:
1400 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1401             CODE:
1402 0           pubsub_eventfd_set(h, fd);
1403              
1404             IV
1405             fileno(self)
1406             SV *self
1407             PREINIT:
1408 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1409             CODE:
1410 0 0         RETVAL = h->notify_fd;
1411             OUTPUT:
1412             RETVAL
1413              
1414             void
1415             eventfd_consume(self)
1416             SV *self
1417             PREINIT:
1418 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1419             CODE:
1420 0           pubsub_eventfd_consume(h);
1421              
1422             void
1423             notify(self)
1424             SV *self
1425             PREINIT:
1426 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1427             CODE:
1428 0           pubsub_notify(h);
1429              
1430             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int32::Sub
1431              
1432             void
1433             DESTROY(self)
1434             SV *self
1435             PREINIT:
1436 11 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1437             CODE:
1438 11           sv_setiv(SvRV(self), 0);
1439 11 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
1440 11           pubsub_sub_destroy(sub);
1441              
1442             SV *
1443             poll(self)
1444             SV *self
1445             PREINIT:
1446 9 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1447             int32_t value;
1448             CODE:
1449 9           int r = pubsub_int32_poll(sub, &value);
1450 9 100         if (r == 1)
1451 8           RETVAL = newSViv((IV)value);
1452             else
1453 1           RETVAL = &PL_sv_undef;
1454             OUTPUT:
1455             RETVAL
1456              
1457             void
1458             poll_multi(self, count)
1459             SV *self
1460             UV count
1461             PREINIT:
1462 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1463             int32_t value;
1464             PPCODE:
1465 7 50         for (UV i = 0; i < count; i++) {
1466 7           int r = pubsub_int32_poll(sub, &value);
1467 7 100         if (r != 1) break;
1468 6 50         mXPUSHi((IV)value);
1469             }
1470              
1471             SV *
1472             poll_wait(self, ...)
1473             SV *self
1474             PREINIT:
1475 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1476 1           double timeout = -1;
1477             int32_t value;
1478             CODE:
1479 1 50         if (items > 1) timeout = SvNV(ST(1));
1480 1           int r = pubsub_int32_poll_wait(sub, &value, timeout);
1481 1 50         if (r == 1)
1482 0           RETVAL = newSViv((IV)value);
1483             else
1484 1           RETVAL = &PL_sv_undef;
1485             OUTPUT:
1486             RETVAL
1487              
1488             void
1489             drain(self, ...)
1490             SV *self
1491             PREINIT:
1492 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1493             int32_t value;
1494             uint32_t max_count;
1495             PPCODE:
1496 1 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1497 9 50         while (max_count-- > 0 && pubsub_int32_poll(sub, &value))
    100          
1498 8 50         mXPUSHi((IV)value);
1499              
1500             void
1501             poll_wait_multi(self, count, ...)
1502             SV *self
1503             UV count
1504             PREINIT:
1505 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1506 1           double timeout = -1;
1507             int32_t value;
1508             PPCODE:
1509 1 50         if (count == 0) XSRETURN(0);
1510 1 50         if (items > 2) timeout = SvNV(ST(2));
1511 1 50         if (!pubsub_int32_poll_wait(sub, &value, timeout)) XSRETURN(0);
1512 1 50         mXPUSHi((IV)value);
1513 3 100         for (UV i = 1; i < count; i++) {
1514 2 50         if (!pubsub_int32_poll(sub, &value)) break;
1515 2 50         mXPUSHi((IV)value);
1516             }
1517              
1518             UV
1519             lag(self)
1520             SV *self
1521             PREINIT:
1522 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1523             CODE:
1524 3           RETVAL = (UV)pubsub_lag(sub);
1525             OUTPUT:
1526             RETVAL
1527              
1528             UV
1529             overflow_count(self)
1530             SV *self
1531             PREINIT:
1532 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1533             CODE:
1534 1 50         RETVAL = (UV)sub->overflow_count;
1535             OUTPUT:
1536             RETVAL
1537              
1538             UV
1539             write_pos(self)
1540             SV *self
1541             PREINIT:
1542 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1543             CODE:
1544 0 0         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1545             OUTPUT:
1546             RETVAL
1547              
1548             bool
1549             has_overflow(self)
1550             SV *self
1551             PREINIT:
1552 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1553             CODE:
1554 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1555 1 50         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    50          
1556             OUTPUT:
1557             RETVAL
1558              
1559             UV
1560             cursor(self, ...)
1561             SV *self
1562             PREINIT:
1563 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1564             CODE:
1565 1 50         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
1566 1 50         RETVAL = (UV)sub->cursor;
1567             OUTPUT:
1568             RETVAL
1569              
1570             void
1571             reset(self)
1572             SV *self
1573             PREINIT:
1574 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1575             CODE:
1576 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1577              
1578             void
1579             reset_oldest(self)
1580             SV *self
1581             PREINIT:
1582 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1583             CODE:
1584 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1585 1 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
1586              
1587             UV
1588             poll_cb(self, cb)
1589             SV *self
1590             SV *cb
1591             PREINIT:
1592 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1593             int32_t value;
1594             CODE:
1595 1           RETVAL = 0;
1596 6 100         while (pubsub_int32_poll(sub, &value)) {
1597 5           dSP;
1598 5 50         PUSHMARK(SP);
1599 5 50         mXPUSHi((IV)value);
1600 5           PUTBACK;
1601 5           call_sv(cb, G_DISCARD);
1602 5           RETVAL++;
1603             }
1604             OUTPUT:
1605             RETVAL
1606              
1607             void
1608             drain_notify(self, ...)
1609             SV *self
1610             PREINIT:
1611 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1612             int32_t value;
1613             uint32_t max_count;
1614             PPCODE:
1615 0           pubsub_sub_eventfd_consume(sub);
1616 0 0         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1617 0 0         while (max_count-- > 0 && pubsub_int32_poll(sub, &value))
    0          
1618 0 0         mXPUSHi((IV)value);
1619              
1620             void
1621             eventfd_set(self, fd)
1622             SV *self
1623             int fd
1624             PREINIT:
1625 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1626             CODE:
1627 0           sub->notify_fd = fd;
1628              
1629             IV
1630             fileno(self)
1631             SV *self
1632             PREINIT:
1633 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1634             CODE:
1635 0 0         RETVAL = sub->notify_fd;
1636             OUTPUT:
1637             RETVAL
1638              
1639             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int16
1640              
1641             SV *
1642             new(class, path, capacity)
1643             const char *class
1644             SV *path
1645             UV capacity
1646             PREINIT:
1647             char errbuf[PUBSUB_ERR_BUFLEN];
1648             CODE:
1649 11 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
1650 11           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
1651 11 100         if (!h) croak("Data::PubSub::Shared::Int16->new: %s", errbuf);
1652 10           SV *obj = newSViv(PTR2IV(h));
1653 10           SV *ref = newRV_noinc(obj);
1654 10           sv_bless(ref, gv_stashpv(class, GV_ADD));
1655 10           RETVAL = ref;
1656             OUTPUT:
1657             RETVAL
1658              
1659             SV *
1660             new_memfd(class, name, capacity)
1661             const char *class
1662             const char *name
1663             UV capacity
1664             PREINIT:
1665             char errbuf[PUBSUB_ERR_BUFLEN];
1666             CODE:
1667 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
1668 1 50         if (!h) croak("Data::PubSub::Shared::Int16->new_memfd: %s", errbuf);
1669 1           SV *obj = newSViv(PTR2IV(h));
1670 1           SV *ref = newRV_noinc(obj);
1671 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
1672 1           RETVAL = ref;
1673             OUTPUT:
1674             RETVAL
1675              
1676             SV *
1677             new_from_fd(class, fd)
1678             const char *class
1679             int fd
1680             PREINIT:
1681             char errbuf[PUBSUB_ERR_BUFLEN];
1682             CODE:
1683 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_INT16, errbuf);
1684 1 50         if (!h) croak("Data::PubSub::Shared::Int16->new_from_fd: %s", errbuf);
1685 1           SV *obj = newSViv(PTR2IV(h));
1686 1           SV *ref = newRV_noinc(obj);
1687 1           sv_bless(ref, gv_stashpv(class, GV_ADD));
1688 1           RETVAL = ref;
1689             OUTPUT:
1690             RETVAL
1691              
1692             IV
1693             memfd(self)
1694             SV *self
1695             PREINIT:
1696 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1697             CODE:
1698 1 50         RETVAL = h->backing_fd;
1699             OUTPUT:
1700             RETVAL
1701              
1702             void
1703             DESTROY(self)
1704             SV *self
1705             PREINIT:
1706 12 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1707             CODE:
1708 12           sv_setiv(SvRV(self), 0);
1709 12           pubsub_destroy(h);
1710              
1711             bool
1712             publish(self, value)
1713             SV *self
1714             IV value
1715             PREINIT:
1716 49 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1717             CODE:
1718 49 50         RETVAL = pubsub_int16_publish(h, (int16_t)value);
1719             OUTPUT:
1720             RETVAL
1721              
1722             UV
1723             publish_multi(self, ...)
1724             SV *self
1725             PREINIT:
1726 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1727             CODE:
1728 1           uint32_t count = items - 1;
1729 1 50         if (count == 0) { RETVAL = 0; }
1730             else {
1731 1 50         if (count > 8192) croak("publish_multi: too many values (%u > 8192)", count);
1732 1           int16_t *vals = (int16_t *)alloca(count * sizeof(int16_t));
1733 4 100         for (uint32_t i = 0; i < count; i++)
1734 3           vals[i] = (int16_t)SvIV(ST(i + 1));
1735 1           RETVAL = pubsub_int16_publish_multi(h, vals, count);
1736             }
1737             OUTPUT:
1738             RETVAL
1739              
1740             void
1741             publish_notify(self, value)
1742             SV *self
1743             IV value
1744             PREINIT:
1745 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1746             CODE:
1747 0           pubsub_int16_publish(h, (int16_t)value);
1748 0           pubsub_notify(h);
1749              
1750             SV *
1751             subscribe(self)
1752             SV *self
1753             PREINIT:
1754 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1755             CODE:
1756 4           PubSubSub *sub = pubsub_subscribe(h, 0);
1757 4 50         if (!sub) croak("subscribe: out of memory");
1758 4           sub->userdata = (void *)newSVsv(self);
1759 4           SV *obj = newSViv(PTR2IV(sub));
1760 4           SV *ref = newRV_noinc(obj);
1761 4           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int16::Sub", GV_ADD));
1762 4           RETVAL = ref;
1763             OUTPUT:
1764             RETVAL
1765              
1766             SV *
1767             subscribe_all(self)
1768             SV *self
1769             PREINIT:
1770 7 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1771             CODE:
1772 7           PubSubSub *sub = pubsub_subscribe(h, 1);
1773 7 50         if (!sub) croak("subscribe_all: out of memory");
1774 7           sub->userdata = (void *)newSVsv(self);
1775 7           SV *obj = newSViv(PTR2IV(sub));
1776 7           SV *ref = newRV_noinc(obj);
1777 7           sv_bless(ref, gv_stashpv("Data::PubSub::Shared::Int16::Sub", GV_ADD));
1778 7           RETVAL = ref;
1779             OUTPUT:
1780             RETVAL
1781              
1782             UV
1783             capacity(self)
1784             SV *self
1785             PREINIT:
1786 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1787             CODE:
1788 1 50         RETVAL = h->capacity;
1789             OUTPUT:
1790             RETVAL
1791              
1792             UV
1793             write_pos(self)
1794             SV *self
1795             PREINIT:
1796 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1797             CODE:
1798 4 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
1799             OUTPUT:
1800             RETVAL
1801              
1802             SV *
1803             path(self)
1804             SV *self
1805             PREINIT:
1806 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1807             CODE:
1808 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
1809             OUTPUT:
1810             RETVAL
1811              
1812             SV *
1813             stats(self)
1814             SV *self
1815             PREINIT:
1816 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1817             CODE:
1818 2           HV *hv = newHV();
1819 2           PubSubHeader *hdr = h->hdr;
1820 2           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
1821 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
1822 2           hv_store(hv, "write_pos", 9,
1823             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
1824 2           hv_store(hv, "publish_ok", 10,
1825             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
1826 2           hv_store(hv, "recoveries", 10,
1827             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
1828 2           hv_store(hv, "sub_waiters", 11,
1829             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
1830 2           RETVAL = newRV_noinc((SV *)hv);
1831             OUTPUT:
1832             RETVAL
1833              
1834             void
1835             clear(self)
1836             SV *self
1837             PREINIT:
1838 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1839             CODE:
1840 1           pubsub_clear(h);
1841              
1842             void
1843             sync(self)
1844             SV *self
1845             PREINIT:
1846 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1847             CODE:
1848 0 0         if (pubsub_sync(h) != 0)
1849 0           croak("msync: %s", strerror(errno));
1850              
1851             void
1852             unlink(self_or_class, ...)
1853             SV *self_or_class
1854             CODE:
1855             const char *path;
1856 0 0         if (sv_isobject(self_or_class)) {
1857 0           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
1858 0 0         if (!h) croak("Attempted to use a destroyed object");
1859 0           path = h->path;
1860             } else {
1861 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Int16->unlink($path)");
1862 0           path = SvPV_nolen(ST(1));
1863             }
1864 0 0         if (!path) croak("cannot unlink anonymous or memfd pubsub");
1865 0 0         if (unlink(path) != 0)
1866 0           croak("unlink(%s): %s", path, strerror(errno));
1867              
1868             IV
1869             eventfd(self)
1870             SV *self
1871             PREINIT:
1872 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1873             CODE:
1874 0           RETVAL = pubsub_eventfd_create(h);
1875 0 0         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1876             OUTPUT:
1877             RETVAL
1878              
1879             void
1880             eventfd_set(self, fd)
1881             SV *self
1882             int fd
1883             PREINIT:
1884 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1885             CODE:
1886 0           pubsub_eventfd_set(h, fd);
1887              
1888             IV
1889             fileno(self)
1890             SV *self
1891             PREINIT:
1892 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1893             CODE:
1894 0 0         RETVAL = h->notify_fd;
1895             OUTPUT:
1896             RETVAL
1897              
1898             void
1899             eventfd_consume(self)
1900             SV *self
1901             PREINIT:
1902 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1903             CODE:
1904 0           pubsub_eventfd_consume(h);
1905              
1906             void
1907             notify(self)
1908             SV *self
1909             PREINIT:
1910 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1911             CODE:
1912 0           pubsub_notify(h);
1913              
1914             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int16::Sub
1915              
1916             void
1917             DESTROY(self)
1918             SV *self
1919             PREINIT:
1920 11 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1921             CODE:
1922 11           sv_setiv(SvRV(self), 0);
1923 11 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
1924 11           pubsub_sub_destroy(sub);
1925              
1926             SV *
1927             poll(self)
1928             SV *self
1929             PREINIT:
1930 9 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1931             int16_t value;
1932             CODE:
1933 9           int r = pubsub_int16_poll(sub, &value);
1934 9 100         if (r == 1)
1935 8           RETVAL = newSViv((IV)value);
1936             else
1937 1           RETVAL = &PL_sv_undef;
1938             OUTPUT:
1939             RETVAL
1940              
1941             void
1942             poll_multi(self, count)
1943             SV *self
1944             UV count
1945             PREINIT:
1946 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1947             int16_t value;
1948             PPCODE:
1949 7 50         for (UV i = 0; i < count; i++) {
1950 7           int r = pubsub_int16_poll(sub, &value);
1951 7 100         if (r != 1) break;
1952 6 50         mXPUSHi((IV)value);
1953             }
1954              
1955             SV *
1956             poll_wait(self, ...)
1957             SV *self
1958             PREINIT:
1959 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1960 1           double timeout = -1;
1961             int16_t value;
1962             CODE:
1963 1 50         if (items > 1) timeout = SvNV(ST(1));
1964 1           int r = pubsub_int16_poll_wait(sub, &value, timeout);
1965 1 50         if (r == 1)
1966 0           RETVAL = newSViv((IV)value);
1967             else
1968 1           RETVAL = &PL_sv_undef;
1969             OUTPUT:
1970             RETVAL
1971              
1972             void
1973             drain(self, ...)
1974             SV *self
1975             PREINIT:
1976 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1977             int16_t value;
1978             uint32_t max_count;
1979             PPCODE:
1980 1 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1981 9 50         while (max_count-- > 0 && pubsub_int16_poll(sub, &value))
    100          
1982 8 50         mXPUSHi((IV)value);
1983              
1984             void
1985             poll_wait_multi(self, count, ...)
1986             SV *self
1987             UV count
1988             PREINIT:
1989 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1990 1           double timeout = -1;
1991             int16_t value;
1992             PPCODE:
1993 1 50         if (count == 0) XSRETURN(0);
1994 1 50         if (items > 2) timeout = SvNV(ST(2));
1995 1 50         if (!pubsub_int16_poll_wait(sub, &value, timeout)) XSRETURN(0);
1996 1 50         mXPUSHi((IV)value);
1997 3 100         for (UV i = 1; i < count; i++) {
1998 2 50         if (!pubsub_int16_poll(sub, &value)) break;
1999 2 50         mXPUSHi((IV)value);
2000             }
2001              
2002             UV
2003             lag(self)
2004             SV *self
2005             PREINIT:
2006 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2007             CODE:
2008 3           RETVAL = (UV)pubsub_lag(sub);
2009             OUTPUT:
2010             RETVAL
2011              
2012             UV
2013             overflow_count(self)
2014             SV *self
2015             PREINIT:
2016 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2017             CODE:
2018 1 50         RETVAL = (UV)sub->overflow_count;
2019             OUTPUT:
2020             RETVAL
2021              
2022             UV
2023             write_pos(self)
2024             SV *self
2025             PREINIT:
2026 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2027             CODE:
2028 0 0         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
2029             OUTPUT:
2030             RETVAL
2031              
2032             bool
2033             has_overflow(self)
2034             SV *self
2035             PREINIT:
2036 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2037             CODE:
2038 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
2039 1 50         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    50          
2040             OUTPUT:
2041             RETVAL
2042              
2043             UV
2044             cursor(self, ...)
2045             SV *self
2046             PREINIT:
2047 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2048             CODE:
2049 1 50         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
2050 1 50         RETVAL = (UV)sub->cursor;
2051             OUTPUT:
2052             RETVAL
2053              
2054             void
2055             reset(self)
2056             SV *self
2057             PREINIT:
2058 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2059             CODE:
2060 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
2061              
2062             void
2063             reset_oldest(self)
2064             SV *self
2065             PREINIT:
2066 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2067             CODE:
2068 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
2069 1 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
2070              
2071             UV
2072             poll_cb(self, cb)
2073             SV *self
2074             SV *cb
2075             PREINIT:
2076 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2077             int16_t value;
2078             CODE:
2079 1           RETVAL = 0;
2080 6 100         while (pubsub_int16_poll(sub, &value)) {
2081 5           dSP;
2082 5 50         PUSHMARK(SP);
2083 5 50         mXPUSHi((IV)value);
2084 5           PUTBACK;
2085 5           call_sv(cb, G_DISCARD);
2086 5           RETVAL++;
2087             }
2088             OUTPUT:
2089             RETVAL
2090              
2091             void
2092             drain_notify(self, ...)
2093             SV *self
2094             PREINIT:
2095 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2096             int16_t value;
2097             uint32_t max_count;
2098             PPCODE:
2099 0           pubsub_sub_eventfd_consume(sub);
2100 0 0         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
2101 0 0         while (max_count-- > 0 && pubsub_int16_poll(sub, &value))
    0          
2102 0 0         mXPUSHi((IV)value);
2103              
2104             void
2105             eventfd_set(self, fd)
2106             SV *self
2107             int fd
2108             PREINIT:
2109 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2110             CODE:
2111 0           sub->notify_fd = fd;
2112              
2113             IV
2114             fileno(self)
2115             SV *self
2116             PREINIT:
2117 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2118             CODE:
2119 0 0         RETVAL = sub->notify_fd;
2120             OUTPUT:
2121             RETVAL