File Coverage

Shared.xs
Criterion Covered Total %
statement 591 689 85.7
branch 604 1324 45.6
condition n/a
subroutine n/a
pod n/a
total 1195 2013 59.3


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "ppport.h"
7             #include "pubsub.h"
8              
9             #include "XSParseKeyword.h"
10              
11 16           static int build_kw_1arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
12             (void)nargs;
13 16           const char *func = (const char *)hookdata;
14 16           OP *q_op = args[0]->op;
15 16           OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
16 16           *out = op_convert_list(OP_ENTERSUB, OPf_STACKED,
17             op_append_elem(OP_LIST, q_op, cvref));
18 16           return KEYWORD_PLUGIN_EXPR;
19             }
20              
21 8           static int build_kw_2arg(pTHX_ OP **out, XSParseKeywordPiece *args[], size_t nargs, void *hookdata) {
22             (void)nargs;
23 8           const char *func = (const char *)hookdata;
24 8           OP *q_op = args[0]->op;
25 8           OP *val_op = args[1]->op;
26 8           OP *cvref = newCVREF(0, newGVOP(OP_GV, 0, gv_fetchpv(func, GV_ADD, SVt_PVCV)));
27 8           OP *arglist = op_append_elem(OP_LIST, q_op, val_op);
28 8           arglist = op_append_elem(OP_LIST, arglist, cvref);
29 8           *out = op_convert_list(OP_ENTERSUB, OPf_STACKED, arglist);
30 8           return KEYWORD_PLUGIN_EXPR;
31             }
32              
33             static const struct XSParseKeywordPieceType pieces_1expr[] = {
34             XPK_TERMEXPR, {0}
35             };
36              
37             static const struct XSParseKeywordPieceType pieces_2expr[] = {
38             XPK_TERMEXPR, XPK_COMMA, XPK_TERMEXPR, {0}
39             };
40              
41             #define DEFINE_PS_KW(variant, PKG, kw, nargs, builder) \
42             static const struct XSParseKeywordHooks hooks_ps_##variant##_##kw = { \
43             .flags = XPK_FLAG_EXPR, \
44             .permit_hintkey = "Data::PubSub::Shared::" PKG "/ps_" #variant "_" #kw, \
45             .pieces = pieces_##nargs##expr, \
46             .build = builder, \
47             };
48              
49             /* Publisher keywords */
50             DEFINE_PS_KW(int, "Int", publish, 2, build_kw_2arg)
51              
52             /* Subscriber keywords */
53             DEFINE_PS_KW(int, "Int", poll, 1, build_kw_1arg)
54             DEFINE_PS_KW(int, "Int", lag, 1, build_kw_1arg)
55              
56             /* Str publisher keywords */
57             DEFINE_PS_KW(str, "Str", publish, 2, build_kw_2arg)
58              
59             /* Int32 keywords */
60             DEFINE_PS_KW(int32, "Int32", publish, 2, build_kw_2arg)
61             DEFINE_PS_KW(int32, "Int32", poll, 1, build_kw_1arg)
62             DEFINE_PS_KW(int32, "Int32", lag, 1, build_kw_1arg)
63              
64             /* Int16 keywords */
65             DEFINE_PS_KW(int16, "Int16", publish, 2, build_kw_2arg)
66             DEFINE_PS_KW(int16, "Int16", poll, 1, build_kw_1arg)
67             DEFINE_PS_KW(int16, "Int16", lag, 1, build_kw_1arg)
68              
69             /* Str subscriber keywords */
70             DEFINE_PS_KW(str, "Str", poll, 1, build_kw_1arg)
71             DEFINE_PS_KW(str, "Str", lag, 1, build_kw_1arg)
72              
73             #define REGISTER_PS_KW(variant, kw, func_name) \
74             register_xs_parse_keyword("ps_" #variant "_" #kw, \
75             &hooks_ps_##variant##_##kw, (void*)func_name)
76              
77             #define EXTRACT_HANDLE(classname, sv) \
78             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
79             croak("Expected a %s object", classname); \
80             PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(sv))); \
81             if (!h) croak("Attempted to use a destroyed %s object", classname)
82              
83             #define MAKE_OBJ(class, ptr) \
84             SV *ref = newRV_noinc(newSViv(PTR2IV(ptr))); \
85             sv_bless(ref, gv_stashpv(class, GV_ADD)); \
86             RETVAL = ref
87              
88             #define EXTRACT_SUB(classname, sv) \
89             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
90             croak("Expected a %s object", classname); \
91             PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(sv))); \
92             if (!sub) croak("Attempted to use a destroyed %s object", classname)
93              
94             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int
95              
96             PROTOTYPES: DISABLE
97              
98             BOOT:
99 22           boot_xs_parse_keyword(0.40);
100 22           REGISTER_PS_KW(int, publish, "Data::PubSub::Shared::Int::publish");
101 22           REGISTER_PS_KW(int, poll, "Data::PubSub::Shared::Int::Sub::poll");
102 22           REGISTER_PS_KW(int, lag, "Data::PubSub::Shared::Int::Sub::lag");
103 22           REGISTER_PS_KW(int32, publish, "Data::PubSub::Shared::Int32::publish");
104 22           REGISTER_PS_KW(int32, poll, "Data::PubSub::Shared::Int32::Sub::poll");
105 22           REGISTER_PS_KW(int32, lag, "Data::PubSub::Shared::Int32::Sub::lag");
106 22           REGISTER_PS_KW(int16, publish, "Data::PubSub::Shared::Int16::publish");
107 22           REGISTER_PS_KW(int16, poll, "Data::PubSub::Shared::Int16::Sub::poll");
108 22           REGISTER_PS_KW(int16, lag, "Data::PubSub::Shared::Int16::Sub::lag");
109 22           REGISTER_PS_KW(str, publish, "Data::PubSub::Shared::Str::publish");
110 22           REGISTER_PS_KW(str, poll, "Data::PubSub::Shared::Str::Sub::poll");
111 22           REGISTER_PS_KW(str, lag, "Data::PubSub::Shared::Str::Sub::lag");
112              
113             SV *
114             new(class, path, capacity)
115             const char *class
116             SV *path
117             UV capacity
118             PREINIT:
119             char errbuf[PUBSUB_ERR_BUFLEN];
120             CODE:
121 66 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
122 66           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT, 0, errbuf);
123 66 100         if (!h) croak("Data::PubSub::Shared::Int->new: %s", errbuf);
124 65           MAKE_OBJ(class, h);
125             OUTPUT:
126             RETVAL
127              
128             SV *
129             new_memfd(class, name, capacity)
130             const char *class
131             const char *name
132             UV capacity
133             PREINIT:
134             char errbuf[PUBSUB_ERR_BUFLEN];
135             CODE:
136 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT, 0, errbuf);
137 1 50         if (!h) croak("Data::PubSub::Shared::Int->new_memfd: %s", errbuf);
138 1           MAKE_OBJ(class, h);
139             OUTPUT:
140             RETVAL
141              
142             SV *
143             new_from_fd(class, fd)
144             const char *class
145             int fd
146             PREINIT:
147             char errbuf[PUBSUB_ERR_BUFLEN];
148             CODE:
149 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_INT, errbuf);
150 1 50         if (!h) croak("Data::PubSub::Shared::Int->new_from_fd: %s", errbuf);
151 1           MAKE_OBJ(class, h);
152             OUTPUT:
153             RETVAL
154              
155             IV
156             memfd(self)
157             SV *self
158             PREINIT:
159 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
160             CODE:
161 1 50         RETVAL = h->backing_fd;
162             OUTPUT:
163             RETVAL
164              
165             void
166             DESTROY(self)
167             SV *self
168             CODE:
169 67 50         if (!SvROK(self)) return;
170 67           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self)));
171 67 50         if (!h) return;
172 67           sv_setiv(SvRV(self), 0);
173 67           pubsub_destroy(h);
174              
175             bool
176             publish(self, value)
177             SV *self
178             IV value
179             PREINIT:
180 468 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
181             CODE:
182 468 50         RETVAL = pubsub_int_publish(h, (int64_t)value);
183             OUTPUT:
184             RETVAL
185              
186             UV
187             publish_multi(self, ...)
188             SV *self
189             PREINIT:
190 5 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
191             CODE:
192 5           uint32_t count = items - 1;
193 5 100         if (count == 0) { RETVAL = 0; }
194             else {
195 4 50         if (count > 8192) croak("publish_multi: too many values (%u > 8192)", count);
196 4           int64_t *vals = (int64_t *)alloca(count * sizeof(int64_t));
197 212 100         for (uint32_t i = 0; i < count; i++)
198 208           vals[i] = (int64_t)SvIV(ST(i + 1));
199 4           RETVAL = pubsub_int_publish_multi(h, vals, count);
200             }
201             OUTPUT:
202             RETVAL
203              
204             void
205             publish_notify(self, value)
206             SV *self
207             IV value
208             PREINIT:
209 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
210             CODE:
211 1           pubsub_int_publish(h, (int64_t)value);
212 1           pubsub_notify(h);
213              
214             SV *
215             subscribe(self)
216             SV *self
217             PREINIT:
218 21 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
219             CODE:
220 21           PubSubSub *sub = pubsub_subscribe(h, 0);
221 21 50         if (!sub) croak("subscribe: out of memory");
222 21           sub->userdata = (void *)newSVsv(self);
223 21           MAKE_OBJ("Data::PubSub::Shared::Int::Sub", sub);
224             OUTPUT:
225             RETVAL
226              
227             SV *
228             subscribe_all(self)
229             SV *self
230             PREINIT:
231 26 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
232             CODE:
233 26           PubSubSub *sub = pubsub_subscribe(h, 1);
234 26 50         if (!sub) croak("subscribe_all: out of memory");
235 26           sub->userdata = (void *)newSVsv(self);
236 26           MAKE_OBJ("Data::PubSub::Shared::Int::Sub", sub);
237             OUTPUT:
238             RETVAL
239              
240             UV
241             capacity(self)
242             SV *self
243             PREINIT:
244 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
245             CODE:
246 1 50         RETVAL = h->capacity;
247             OUTPUT:
248             RETVAL
249              
250             UV
251             write_pos(self)
252             SV *self
253             PREINIT:
254 13 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
255             CODE:
256 13 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
257             OUTPUT:
258             RETVAL
259              
260             SV *
261             path(self)
262             SV *self
263             PREINIT:
264 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
265             CODE:
266 2 100         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
267             OUTPUT:
268             RETVAL
269              
270             SV *
271             stats(self)
272             SV *self
273             PREINIT:
274 5 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
275             CODE:
276 5           HV *hv = newHV();
277 5           PubSubHeader *hdr = h->hdr;
278 5           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
279 5           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
280 5           hv_store(hv, "write_pos", 9,
281             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
282 5           hv_store(hv, "publish_ok", 10,
283             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
284 5           hv_store(hv, "recoveries", 10,
285             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
286 5           hv_store(hv, "sub_waiters", 11,
287             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
288 5           RETVAL = newRV_noinc((SV *)hv);
289             OUTPUT:
290             RETVAL
291              
292             void
293             clear(self)
294             SV *self
295             PREINIT:
296 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
297             CODE:
298 4           pubsub_clear(h);
299              
300             void
301             sync(self)
302             SV *self
303             PREINIT:
304 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
305             CODE:
306 4 50         if (pubsub_sync(h) != 0)
307 0           croak("msync: %s", strerror(errno));
308              
309             void
310             unlink(self_or_class, ...)
311             SV *self_or_class
312             CODE:
313             const char *path;
314 1 50         if (sv_isobject(self_or_class)) {
315 1           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
316 1 50         if (!h) croak("Attempted to use a destroyed object");
317 1           path = h->path;
318             } else {
319 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Int->unlink($path)");
320 0           path = SvPV_nolen(ST(1));
321             }
322 1 50         if (!path) croak("cannot unlink anonymous or memfd pubsub");
323 1 50         if (unlink(path) != 0)
324 0           croak("unlink(%s): %s", path, strerror(errno));
325              
326             IV
327             eventfd(self)
328             SV *self
329             PREINIT:
330 9 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
331             CODE:
332 9           RETVAL = pubsub_eventfd_create(h);
333 9 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
334             OUTPUT:
335             RETVAL
336              
337             void
338             eventfd_set(self, fd)
339             SV *self
340             int fd
341             PREINIT:
342 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
343             CODE:
344 1           pubsub_eventfd_set(h, fd);
345              
346             IV
347             fileno(self)
348             SV *self
349             PREINIT:
350 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
351             CODE:
352 1 50         RETVAL = h->notify_fd;
353             OUTPUT:
354             RETVAL
355              
356             SV *
357             eventfd_consume(self)
358             SV *self
359             PREINIT:
360 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
361             CODE:
362 4           int64_t v = pubsub_eventfd_consume(h);
363 4 50         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
364             OUTPUT:
365             RETVAL
366              
367             void
368             notify(self)
369             SV *self
370             PREINIT:
371 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int", self);
    50          
    50          
372             CODE:
373 4           pubsub_notify(h);
374              
375             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int::Sub
376              
377             void
378             DESTROY(self)
379             SV *self
380             CODE:
381 47 50         if (!SvROK(self)) return;
382 47           PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(self)));
383 47 50         if (!sub) return;
384 47           sv_setiv(SvRV(self), 0);
385 47 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
386 47           pubsub_sub_destroy(sub);
387              
388             SV *
389             poll(self)
390             SV *self
391             PREINIT:
392 35 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
393             int64_t value;
394             CODE:
395 35           int r = pubsub_int_poll(sub, &value);
396 35 100         if (r == 1)
397 31           RETVAL = newSViv((IV)value);
398             else
399 4           RETVAL = &PL_sv_undef;
400             OUTPUT:
401             RETVAL
402              
403             void
404             poll_multi(self, count)
405             SV *self
406             UV count
407             PREINIT:
408 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
409             int64_t value;
410             PPCODE:
411 12 100         for (UV i = 0; i < count; i++) {
412 10           int r = pubsub_int_poll(sub, &value);
413 10 50         if (r != 1) break;
414 10 50         mXPUSHi((IV)value);
415             }
416              
417             SV *
418             poll_wait(self, ...)
419             SV *self
420             PREINIT:
421 9 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
422 9           double timeout = -1;
423             int64_t value;
424             CODE:
425 9 50         if (items > 1) timeout = SvNV(ST(1));
426 9           int r = pubsub_int_poll_wait(sub, &value, timeout);
427 9 100         if (r == 1)
428 8           RETVAL = newSViv((IV)value);
429             else
430 1           RETVAL = &PL_sv_undef;
431             OUTPUT:
432             RETVAL
433              
434             void
435             drain(self, ...)
436             SV *self
437             PREINIT:
438 8 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
439             int64_t value;
440             uint32_t max_count;
441             PPCODE:
442 8 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
443 1028 100         while (max_count-- > 0 && pubsub_int_poll(sub, &value))
    100          
444 1020 50         mXPUSHi((IV)value);
445              
446             void
447             poll_wait_multi(self, count, ...)
448             SV *self
449             UV count
450             PREINIT:
451 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
452 3           double timeout = -1;
453             int64_t value;
454             PPCODE:
455 3 100         if (count == 0) XSRETURN(0);
456 2 50         if (items > 2) timeout = SvNV(ST(2));
457 2 100         if (!pubsub_int_poll_wait(sub, &value, timeout)) XSRETURN(0);
458 1 50         mXPUSHi((IV)value);
459 5 100         for (UV i = 1; i < count; i++) {
460 4 50         if (!pubsub_int_poll(sub, &value)) break;
461 4 50         mXPUSHi((IV)value);
462             }
463              
464             UV
465             lag(self)
466             SV *self
467             PREINIT:
468 18 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
469             CODE:
470 18           RETVAL = (UV)pubsub_lag(sub);
471             OUTPUT:
472             RETVAL
473              
474             UV
475             overflow_count(self)
476             SV *self
477             PREINIT:
478 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
479             CODE:
480 2 50         RETVAL = (UV)sub->overflow_count;
481             OUTPUT:
482             RETVAL
483              
484             UV
485             write_pos(self)
486             SV *self
487             PREINIT:
488 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
489             CODE:
490 2 50         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
491             OUTPUT:
492             RETVAL
493              
494             bool
495             has_overflow(self)
496             SV *self
497             PREINIT:
498 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
499             CODE:
500 2           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
501 2 100         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    100          
502             OUTPUT:
503             RETVAL
504              
505             UV
506             cursor(self, ...)
507             SV *self
508             PREINIT:
509 6 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
510             CODE:
511 6 100         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
512 6 50         RETVAL = (UV)sub->cursor;
513             OUTPUT:
514             RETVAL
515              
516             void
517             reset(self)
518             SV *self
519             PREINIT:
520 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
521             CODE:
522 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
523              
524             void
525             reset_oldest(self)
526             SV *self
527             PREINIT:
528 2 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
529             CODE:
530 2           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
531 2 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
532              
533             UV
534             poll_cb(self, cb)
535             SV *self
536             SV *cb
537             PREINIT:
538 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
539             int64_t value;
540             CODE:
541 3           RETVAL = 0;
542 108 100         while (pubsub_int_poll(sub, &value)) {
543 105           dSP;
544 105           ENTER; SAVETMPS;
545 105 50         PUSHMARK(SP);
546 105 50         mXPUSHi((IV)value);
547 105           PUTBACK;
548 105           call_sv(cb, G_DISCARD);
549 105 50         FREETMPS; LEAVE;
550 105           RETVAL++;
551             }
552             OUTPUT:
553             RETVAL
554              
555             void
556             drain_notify(self, ...)
557             SV *self
558             PREINIT:
559 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
560             int64_t value;
561             uint32_t max_count;
562             PPCODE:
563 3           pubsub_sub_eventfd_consume(sub);
564 3 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
565 8 100         while (max_count-- > 0 && pubsub_int_poll(sub, &value))
    100          
566 5 50         mXPUSHi((IV)value);
567              
568             void
569             eventfd_set(self, fd)
570             SV *self
571             int fd
572             PREINIT:
573 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
574             CODE:
575 1           sub->notify_fd = fd;
576              
577             IV
578             fileno(self)
579             SV *self
580             PREINIT:
581 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int::Sub", self);
    50          
    50          
582             CODE:
583 3 50         RETVAL = sub->notify_fd;
584             OUTPUT:
585             RETVAL
586              
587             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Str
588              
589             SV *
590             new(class, path, capacity, ...)
591             const char *class
592             SV *path
593             UV capacity
594             PREINIT:
595             char errbuf[PUBSUB_ERR_BUFLEN];
596             uint32_t msg_size;
597             CODE:
598 49 100         msg_size = (items > 3) ? (uint32_t)SvUV(ST(3)) : 0;
599 49 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
600 49           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
601 49 100         if (!h) croak("Data::PubSub::Shared::Str->new: %s", errbuf);
602 48           MAKE_OBJ(class, h);
603             OUTPUT:
604             RETVAL
605              
606             SV *
607             new_memfd(class, name, capacity, ...)
608             const char *class
609             const char *name
610             UV capacity
611             PREINIT:
612             char errbuf[PUBSUB_ERR_BUFLEN];
613             uint32_t msg_size;
614             CODE:
615 1 50         msg_size = (items > 3) ? (uint32_t)SvUV(ST(3)) : 0;
616 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_STR, msg_size, errbuf);
617 1 50         if (!h) croak("Data::PubSub::Shared::Str->new_memfd: %s", errbuf);
618 1           MAKE_OBJ(class, h);
619             OUTPUT:
620             RETVAL
621              
622             SV *
623             new_from_fd(class, fd)
624             const char *class
625             int fd
626             PREINIT:
627             char errbuf[PUBSUB_ERR_BUFLEN];
628             CODE:
629 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_STR, errbuf);
630 1 50         if (!h) croak("Data::PubSub::Shared::Str->new_from_fd: %s", errbuf);
631 1           MAKE_OBJ(class, h);
632             OUTPUT:
633             RETVAL
634              
635             IV
636             memfd(self)
637             SV *self
638             PREINIT:
639 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
640             CODE:
641 1 50         RETVAL = h->backing_fd;
642             OUTPUT:
643             RETVAL
644              
645             void
646             DESTROY(self)
647             SV *self
648             CODE:
649 50 50         if (!SvROK(self)) return;
650 50           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self)));
651 50 50         if (!h) return;
652 50           sv_setiv(SvRV(self), 0);
653 50           pubsub_destroy(h);
654              
655             SV *
656             publish(self, value)
657             SV *self
658             SV *value
659             PREINIT:
660 185 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
661             CODE:
662             STRLEN len;
663 185           bool utf8 = SvUTF8(value) ? true : false;
664             const char *str;
665 185 100         if (utf8)
666 3           str = SvPVutf8(value, len);
667             else
668 182           str = SvPV(value, len);
669 185           int r = pubsub_str_publish(h, str, (uint32_t)len, utf8);
670 185 100         if (r == -1) croak("publish: message too long (%u > %u)", (unsigned)len, h->msg_size);
671 183           RETVAL = &PL_sv_yes;
672             OUTPUT:
673             RETVAL
674              
675             UV
676             publish_multi(self, ...)
677             SV *self
678             PREINIT:
679 5 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
680             CODE:
681 5           RETVAL = 0;
682 5           uint32_t count = items - 1;
683 5 100         if (count > 0) {
684 4           pubsub_mutex_lock(h->hdr);
685 16 100         for (uint32_t i = 0; i < count; i++) {
686 13           SV *val = ST(i + 1);
687             STRLEN len;
688 13           bool utf8 = SvUTF8(val) ? true : false;
689             const char *str;
690 13 50         if (utf8)
691 0           str = SvPVutf8(val, len);
692             else
693 13           str = SvPV(val, len);
694 13           int r = pubsub_str_publish_locked(h, str, (uint32_t)len, utf8);
695 13 100         if (r == -1) {
696 1           pubsub_mutex_unlock(h->hdr);
697 1           croak("publish_multi: message too long (%u > %u)", (unsigned)len, h->msg_size);
698             }
699 12           RETVAL++;
700             }
701 3           pubsub_mutex_unlock(h->hdr);
702 3           pubsub_wake_subscribers(h->hdr);
703             }
704             OUTPUT:
705             RETVAL
706              
707             void
708             publish_notify(self, value)
709             SV *self
710             SV *value
711             PREINIT:
712 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
713             CODE:
714             STRLEN len;
715 1           bool utf8 = SvUTF8(value) ? true : false;
716             const char *str;
717 1 50         if (utf8)
718 0           str = SvPVutf8(value, len);
719             else
720 1           str = SvPV(value, len);
721 1           int r = pubsub_str_publish(h, str, (uint32_t)len, utf8);
722 1 50         if (r == -1) croak("publish_notify: message too long (%u > %u)", (unsigned)len, h->msg_size);
723 1           pubsub_notify(h);
724              
725             SV *
726             subscribe(self)
727             SV *self
728             PREINIT:
729 11 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
730             CODE:
731 11           PubSubSub *sub = pubsub_subscribe(h, 0);
732 11 50         if (!sub) croak("subscribe: out of memory");
733 11           sub->userdata = (void *)newSVsv(self);
734 11           MAKE_OBJ("Data::PubSub::Shared::Str::Sub", sub);
735             OUTPUT:
736             RETVAL
737              
738             SV *
739             subscribe_all(self)
740             SV *self
741             PREINIT:
742 24 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
743             CODE:
744 24           PubSubSub *sub = pubsub_subscribe(h, 1);
745 24 50         if (!sub) croak("subscribe_all: out of memory");
746 24           sub->userdata = (void *)newSVsv(self);
747 24           MAKE_OBJ("Data::PubSub::Shared::Str::Sub", sub);
748             OUTPUT:
749             RETVAL
750              
751             UV
752             capacity(self)
753             SV *self
754             PREINIT:
755 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
756             CODE:
757 1 50         RETVAL = h->capacity;
758             OUTPUT:
759             RETVAL
760              
761             UV
762             msg_size(self)
763             SV *self
764             PREINIT:
765 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
766             CODE:
767 2 50         RETVAL = h->msg_size;
768             OUTPUT:
769             RETVAL
770              
771             UV
772             write_pos(self)
773             SV *self
774             PREINIT:
775 7 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
776             CODE:
777 7 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
778             OUTPUT:
779             RETVAL
780              
781             SV *
782             path(self)
783             SV *self
784             PREINIT:
785 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    0          
    0          
786             CODE:
787 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
788             OUTPUT:
789             RETVAL
790              
791             SV *
792             stats(self)
793             SV *self
794             PREINIT:
795 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
796             CODE:
797 2           HV *hv = newHV();
798 2           PubSubHeader *hdr = h->hdr;
799 2           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
800 2           hv_store(hv, "msg_size", 8, newSVuv(h->msg_size), 0);
801 2           hv_store(hv, "arena_cap", 9, newSVuv((UV)h->arena_cap), 0);
802 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
803 2           hv_store(hv, "write_pos", 9,
804             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
805 2           hv_store(hv, "publish_ok", 10,
806             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
807 2           hv_store(hv, "recoveries", 10,
808             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
809 2           hv_store(hv, "sub_waiters", 11,
810             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
811 2           RETVAL = newRV_noinc((SV *)hv);
812             OUTPUT:
813             RETVAL
814              
815             void
816             clear(self)
817             SV *self
818             PREINIT:
819 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
820             CODE:
821 2           pubsub_clear(h);
822              
823             void
824             sync(self)
825             SV *self
826             PREINIT:
827 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
828             CODE:
829 2 50         if (pubsub_sync(h) != 0)
830 0           croak("msync: %s", strerror(errno));
831              
832             void
833             unlink(self_or_class, ...)
834             SV *self_or_class
835             CODE:
836             const char *path;
837 1 50         if (sv_isobject(self_or_class)) {
838 1           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
839 1 50         if (!h) croak("Attempted to use a destroyed object");
840 1           path = h->path;
841             } else {
842 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Str->unlink($path)");
843 0           path = SvPV_nolen(ST(1));
844             }
845 1 50         if (!path) croak("cannot unlink anonymous or memfd pubsub");
846 1 50         if (unlink(path) != 0)
847 0           croak("unlink(%s): %s", path, strerror(errno));
848              
849             IV
850             eventfd(self)
851             SV *self
852             PREINIT:
853 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
854             CODE:
855 4           RETVAL = pubsub_eventfd_create(h);
856 4 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
857             OUTPUT:
858             RETVAL
859              
860             void
861             eventfd_set(self, fd)
862             SV *self
863             int fd
864             PREINIT:
865 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    0          
    0          
866             CODE:
867 0           pubsub_eventfd_set(h, fd);
868              
869             IV
870             fileno(self)
871             SV *self
872             PREINIT:
873 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    0          
    0          
874             CODE:
875 0 0         RETVAL = h->notify_fd;
876             OUTPUT:
877             RETVAL
878              
879             SV *
880             eventfd_consume(self)
881             SV *self
882             PREINIT:
883 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
884             CODE:
885 2           int64_t v = pubsub_eventfd_consume(h);
886 2 50         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
887             OUTPUT:
888             RETVAL
889              
890             void
891             notify(self)
892             SV *self
893             PREINIT:
894 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Str", self);
    50          
    50          
895             CODE:
896 2           pubsub_notify(h);
897              
898             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Str::Sub
899              
900             void
901             DESTROY(self)
902             SV *self
903             CODE:
904 35 50         if (!SvROK(self)) return;
905 35           PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(self)));
906 35 50         if (!sub) return;
907 35           sv_setiv(SvRV(self), 0);
908 35 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
909 35           pubsub_sub_destroy(sub);
910              
911             SV *
912             poll(self)
913             SV *self
914             PREINIT:
915 37 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
916             const char *str;
917             uint32_t len;
918             bool utf8;
919             CODE:
920 37           int r = pubsub_str_poll(sub, &str, &len, &utf8);
921 37 100         if (r == 1) {
922 34           RETVAL = newSVpvn(str, len);
923 34 100         if (utf8) SvUTF8_on(RETVAL);
924             } else {
925 3           RETVAL = &PL_sv_undef;
926             }
927             OUTPUT:
928             RETVAL
929              
930             void
931             poll_multi(self, count)
932             SV *self
933             UV count
934             PREINIT:
935 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
936             const char *str;
937             uint32_t len;
938             bool utf8;
939             PPCODE:
940 4 50         for (UV i = 0; i < count; i++) {
941 4           int r = pubsub_str_poll(sub, &str, &len, &utf8);
942 4 100         if (r != 1) break;
943 3           SV *sv = newSVpvn(str, len);
944 3 50         if (utf8) SvUTF8_on(sv);
945 3 50         mXPUSHs(sv);
946             }
947              
948             SV *
949             poll_wait(self, ...)
950             SV *self
951             PREINIT:
952 54 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
953 54           double timeout = -1;
954             const char *str;
955             uint32_t len;
956             bool utf8;
957             CODE:
958 54 50         if (items > 1) timeout = SvNV(ST(1));
959 54           int r = pubsub_str_poll_wait(sub, &str, &len, &utf8, timeout);
960 54 100         if (r == 1) {
961 53           RETVAL = newSVpvn(str, len);
962 53 50         if (utf8) SvUTF8_on(RETVAL);
963             } else {
964 1           RETVAL = &PL_sv_undef;
965             }
966             OUTPUT:
967             RETVAL
968              
969             void
970             drain(self, ...)
971             SV *self
972             PREINIT:
973 4 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
974             const char *str;
975             uint32_t len;
976             bool utf8;
977             uint32_t max_count;
978             PPCODE:
979 4 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
980 20 50         while (max_count-- > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
    100          
981 16           SV *sv = newSVpvn(str, len);
982 16 50         if (utf8) SvUTF8_on(sv);
983 16 50         mXPUSHs(sv);
984             }
985              
986             void
987             poll_wait_multi(self, count, ...)
988             SV *self
989             UV count
990             PREINIT:
991 2 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
992 2           double timeout = -1;
993             const char *str;
994             uint32_t len;
995             bool utf8;
996             PPCODE:
997 2 100         if (count == 0) XSRETURN(0);
998 1 50         if (items > 2) timeout = SvNV(ST(2));
999 1 50         if (pubsub_str_poll_wait(sub, &str, &len, &utf8, timeout) != 1) XSRETURN(0);
1000             {
1001 1           SV *sv = newSVpvn(str, len);
1002 1 50         if (utf8) SvUTF8_on(sv);
1003 1 50         mXPUSHs(sv);
1004             }
1005 3 100         for (UV i = 1; i < count; i++) {
1006 2 50         if (pubsub_str_poll(sub, &str, &len, &utf8) != 1) break;
1007 2           SV *sv = newSVpvn(str, len);
1008 2 50         if (utf8) SvUTF8_on(sv);
1009 2 50         mXPUSHs(sv);
1010             }
1011              
1012             UV
1013             lag(self)
1014             SV *self
1015             PREINIT:
1016 8 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1017             CODE:
1018 8           RETVAL = (UV)pubsub_lag(sub);
1019             OUTPUT:
1020             RETVAL
1021              
1022             UV
1023             overflow_count(self)
1024             SV *self
1025             PREINIT:
1026 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1027             CODE:
1028 1 50         RETVAL = (UV)sub->overflow_count;
1029             OUTPUT:
1030             RETVAL
1031              
1032             UV
1033             write_pos(self)
1034             SV *self
1035             PREINIT:
1036 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1037             CODE:
1038 1 50         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1039             OUTPUT:
1040             RETVAL
1041              
1042             bool
1043             has_overflow(self)
1044             SV *self
1045             PREINIT:
1046 2 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1047             CODE:
1048 2           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1049 2 100         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    100          
1050             OUTPUT:
1051             RETVAL
1052              
1053             UV
1054             cursor(self, ...)
1055             SV *self
1056             PREINIT:
1057 4 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1058             CODE:
1059 4 100         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
1060 4 50         RETVAL = (UV)sub->cursor;
1061             OUTPUT:
1062             RETVAL
1063              
1064             void
1065             reset(self)
1066             SV *self
1067             PREINIT:
1068 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1069             CODE:
1070 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1071              
1072             void
1073             reset_oldest(self)
1074             SV *self
1075             PREINIT:
1076 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1077             CODE:
1078 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1079 1 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
1080              
1081             UV
1082             poll_cb(self, cb)
1083             SV *self
1084             SV *cb
1085             PREINIT:
1086 2 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1087             const char *str;
1088             uint32_t len;
1089             bool utf8;
1090             CODE:
1091 2           RETVAL = 0;
1092 6 100         while (pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
1093 4           dSP;
1094 4           ENTER; SAVETMPS;
1095 4           SV *sv = newSVpvn(str, len);
1096 4 100         if (utf8) SvUTF8_on(sv);
1097 4 50         PUSHMARK(SP);
1098 4 50         mXPUSHs(sv);
1099 4           PUTBACK;
1100 4           call_sv(cb, G_DISCARD);
1101 4 50         FREETMPS; LEAVE;
1102 4           RETVAL++;
1103             }
1104             OUTPUT:
1105             RETVAL
1106              
1107             void
1108             drain_notify(self, ...)
1109             SV *self
1110             PREINIT:
1111 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1112             const char *str;
1113             uint32_t len;
1114             bool utf8;
1115             uint32_t max_count;
1116             PPCODE:
1117 1           pubsub_sub_eventfd_consume(sub);
1118 1 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1119 2 50         while (max_count-- > 0 && pubsub_str_poll(sub, &str, &len, &utf8) == 1) {
    100          
1120 1           SV *sv = newSVpvn(str, len);
1121 1 50         if (utf8) SvUTF8_on(sv);
1122 1 50         mXPUSHs(sv);
1123             }
1124              
1125             void
1126             eventfd_set(self, fd)
1127             SV *self
1128             int fd
1129             PREINIT:
1130 0 0         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    0          
    0          
1131             CODE:
1132 0           sub->notify_fd = fd;
1133              
1134             IV
1135             fileno(self)
1136             SV *self
1137             PREINIT:
1138 1 50         EXTRACT_SUB("Data::PubSub::Shared::Str::Sub", self);
    50          
    50          
1139             CODE:
1140 1 50         RETVAL = sub->notify_fd;
1141             OUTPUT:
1142             RETVAL
1143              
1144             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int32
1145              
1146             SV *
1147             new(class, path, capacity)
1148             const char *class
1149             SV *path
1150             UV capacity
1151             PREINIT:
1152             char errbuf[PUBSUB_ERR_BUFLEN];
1153             CODE:
1154 12 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
1155 12           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
1156 12 100         if (!h) croak("Data::PubSub::Shared::Int32->new: %s", errbuf);
1157 11           MAKE_OBJ(class, h);
1158             OUTPUT:
1159             RETVAL
1160              
1161             SV *
1162             new_memfd(class, name, capacity)
1163             const char *class
1164             const char *name
1165             UV capacity
1166             PREINIT:
1167             char errbuf[PUBSUB_ERR_BUFLEN];
1168             CODE:
1169 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT32, 0, errbuf);
1170 1 50         if (!h) croak("Data::PubSub::Shared::Int32->new_memfd: %s", errbuf);
1171 1           MAKE_OBJ(class, h);
1172             OUTPUT:
1173             RETVAL
1174              
1175             SV *
1176             new_from_fd(class, fd)
1177             const char *class
1178             int fd
1179             PREINIT:
1180             char errbuf[PUBSUB_ERR_BUFLEN];
1181             CODE:
1182 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_INT32, errbuf);
1183 1 50         if (!h) croak("Data::PubSub::Shared::Int32->new_from_fd: %s", errbuf);
1184 1           MAKE_OBJ(class, h);
1185             OUTPUT:
1186             RETVAL
1187              
1188             IV
1189             memfd(self)
1190             SV *self
1191             PREINIT:
1192 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1193             CODE:
1194 1 50         RETVAL = h->backing_fd;
1195             OUTPUT:
1196             RETVAL
1197              
1198             void
1199             DESTROY(self)
1200             SV *self
1201             CODE:
1202 13 50         if (!SvROK(self)) return;
1203 13           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self)));
1204 13 50         if (!h) return;
1205 13           sv_setiv(SvRV(self), 0);
1206 13           pubsub_destroy(h);
1207              
1208             bool
1209             publish(self, value)
1210             SV *self
1211             IV value
1212             PREINIT:
1213 50 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1214             CODE:
1215 50 50         RETVAL = pubsub_int32_publish(h, (int32_t)value);
1216             OUTPUT:
1217             RETVAL
1218              
1219             UV
1220             publish_multi(self, ...)
1221             SV *self
1222             PREINIT:
1223 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1224             CODE:
1225 1           uint32_t count = items - 1;
1226 1 50         if (count == 0) { RETVAL = 0; }
1227             else {
1228 1 50         if (count > 8192) croak("publish_multi: too many values (%u > 8192)", count);
1229 1           int32_t *vals = (int32_t *)alloca(count * sizeof(int32_t));
1230 4 100         for (uint32_t i = 0; i < count; i++)
1231 3           vals[i] = (int32_t)SvIV(ST(i + 1));
1232 1           RETVAL = pubsub_int32_publish_multi(h, vals, count);
1233             }
1234             OUTPUT:
1235             RETVAL
1236              
1237             void
1238             publish_notify(self, value)
1239             SV *self
1240             IV value
1241             PREINIT:
1242 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1243             CODE:
1244 0           pubsub_int32_publish(h, (int32_t)value);
1245 0           pubsub_notify(h);
1246              
1247             SV *
1248             subscribe(self)
1249             SV *self
1250             PREINIT:
1251 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1252             CODE:
1253 4           PubSubSub *sub = pubsub_subscribe(h, 0);
1254 4 50         if (!sub) croak("subscribe: out of memory");
1255 4           sub->userdata = (void *)newSVsv(self);
1256 4           MAKE_OBJ("Data::PubSub::Shared::Int32::Sub", sub);
1257             OUTPUT:
1258             RETVAL
1259              
1260             SV *
1261             subscribe_all(self)
1262             SV *self
1263             PREINIT:
1264 7 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1265             CODE:
1266 7           PubSubSub *sub = pubsub_subscribe(h, 1);
1267 7 50         if (!sub) croak("subscribe_all: out of memory");
1268 7           sub->userdata = (void *)newSVsv(self);
1269 7           MAKE_OBJ("Data::PubSub::Shared::Int32::Sub", sub);
1270             OUTPUT:
1271             RETVAL
1272              
1273             UV
1274             capacity(self)
1275             SV *self
1276             PREINIT:
1277 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1278             CODE:
1279 1 50         RETVAL = h->capacity;
1280             OUTPUT:
1281             RETVAL
1282              
1283             UV
1284             write_pos(self)
1285             SV *self
1286             PREINIT:
1287 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1288             CODE:
1289 4 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
1290             OUTPUT:
1291             RETVAL
1292              
1293             SV *
1294             path(self)
1295             SV *self
1296             PREINIT:
1297 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1298             CODE:
1299 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
1300             OUTPUT:
1301             RETVAL
1302              
1303             SV *
1304             stats(self)
1305             SV *self
1306             PREINIT:
1307 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1308             CODE:
1309 2           HV *hv = newHV();
1310 2           PubSubHeader *hdr = h->hdr;
1311 2           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
1312 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
1313 2           hv_store(hv, "write_pos", 9,
1314             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
1315 2           hv_store(hv, "publish_ok", 10,
1316             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
1317 2           hv_store(hv, "recoveries", 10,
1318             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
1319 2           hv_store(hv, "sub_waiters", 11,
1320             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
1321 2           RETVAL = newRV_noinc((SV *)hv);
1322             OUTPUT:
1323             RETVAL
1324              
1325             void
1326             clear(self)
1327             SV *self
1328             PREINIT:
1329 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1330             CODE:
1331 1           pubsub_clear(h);
1332              
1333             void
1334             sync(self)
1335             SV *self
1336             PREINIT:
1337 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    50          
    50          
1338             CODE:
1339 1 50         if (pubsub_sync(h) != 0)
1340 0           croak("msync: %s", strerror(errno));
1341              
1342             void
1343             unlink(self_or_class, ...)
1344             SV *self_or_class
1345             CODE:
1346             const char *path;
1347 0 0         if (sv_isobject(self_or_class)) {
1348 0           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
1349 0 0         if (!h) croak("Attempted to use a destroyed object");
1350 0           path = h->path;
1351             } else {
1352 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Int32->unlink($path)");
1353 0           path = SvPV_nolen(ST(1));
1354             }
1355 0 0         if (!path) croak("cannot unlink anonymous or memfd pubsub");
1356 0 0         if (unlink(path) != 0)
1357 0           croak("unlink(%s): %s", path, strerror(errno));
1358              
1359             IV
1360             eventfd(self)
1361             SV *self
1362             PREINIT:
1363 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1364             CODE:
1365 0           RETVAL = pubsub_eventfd_create(h);
1366 0 0         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1367             OUTPUT:
1368             RETVAL
1369              
1370             void
1371             eventfd_set(self, fd)
1372             SV *self
1373             int fd
1374             PREINIT:
1375 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1376             CODE:
1377 0           pubsub_eventfd_set(h, fd);
1378              
1379             IV
1380             fileno(self)
1381             SV *self
1382             PREINIT:
1383 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1384             CODE:
1385 0 0         RETVAL = h->notify_fd;
1386             OUTPUT:
1387             RETVAL
1388              
1389             SV *
1390             eventfd_consume(self)
1391             SV *self
1392             PREINIT:
1393 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1394             CODE:
1395 0           int64_t v = pubsub_eventfd_consume(h);
1396 0 0         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
1397             OUTPUT:
1398             RETVAL
1399              
1400             void
1401             notify(self)
1402             SV *self
1403             PREINIT:
1404 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int32", self);
    0          
    0          
1405             CODE:
1406 0           pubsub_notify(h);
1407              
1408             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int32::Sub
1409              
1410             void
1411             DESTROY(self)
1412             SV *self
1413             CODE:
1414 11 50         if (!SvROK(self)) return;
1415 11           PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(self)));
1416 11 50         if (!sub) return;
1417 11           sv_setiv(SvRV(self), 0);
1418 11 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
1419 11           pubsub_sub_destroy(sub);
1420              
1421             SV *
1422             poll(self)
1423             SV *self
1424             PREINIT:
1425 9 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1426             int32_t value;
1427             CODE:
1428 9           int r = pubsub_int32_poll(sub, &value);
1429 9 100         if (r == 1)
1430 8           RETVAL = newSViv((IV)value);
1431             else
1432 1           RETVAL = &PL_sv_undef;
1433             OUTPUT:
1434             RETVAL
1435              
1436             void
1437             poll_multi(self, count)
1438             SV *self
1439             UV count
1440             PREINIT:
1441 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1442             int32_t value;
1443             PPCODE:
1444 7 50         for (UV i = 0; i < count; i++) {
1445 7           int r = pubsub_int32_poll(sub, &value);
1446 7 100         if (r != 1) break;
1447 6 50         mXPUSHi((IV)value);
1448             }
1449              
1450             SV *
1451             poll_wait(self, ...)
1452             SV *self
1453             PREINIT:
1454 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1455 1           double timeout = -1;
1456             int32_t value;
1457             CODE:
1458 1 50         if (items > 1) timeout = SvNV(ST(1));
1459 1           int r = pubsub_int32_poll_wait(sub, &value, timeout);
1460 1 50         if (r == 1)
1461 0           RETVAL = newSViv((IV)value);
1462             else
1463 1           RETVAL = &PL_sv_undef;
1464             OUTPUT:
1465             RETVAL
1466              
1467             void
1468             drain(self, ...)
1469             SV *self
1470             PREINIT:
1471 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1472             int32_t value;
1473             uint32_t max_count;
1474             PPCODE:
1475 1 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1476 9 50         while (max_count-- > 0 && pubsub_int32_poll(sub, &value))
    100          
1477 8 50         mXPUSHi((IV)value);
1478              
1479             void
1480             poll_wait_multi(self, count, ...)
1481             SV *self
1482             UV count
1483             PREINIT:
1484 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1485 1           double timeout = -1;
1486             int32_t value;
1487             PPCODE:
1488 1 50         if (count == 0) XSRETURN(0);
1489 1 50         if (items > 2) timeout = SvNV(ST(2));
1490 1 50         if (!pubsub_int32_poll_wait(sub, &value, timeout)) XSRETURN(0);
1491 1 50         mXPUSHi((IV)value);
1492 3 100         for (UV i = 1; i < count; i++) {
1493 2 50         if (!pubsub_int32_poll(sub, &value)) break;
1494 2 50         mXPUSHi((IV)value);
1495             }
1496              
1497             UV
1498             lag(self)
1499             SV *self
1500             PREINIT:
1501 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1502             CODE:
1503 3           RETVAL = (UV)pubsub_lag(sub);
1504             OUTPUT:
1505             RETVAL
1506              
1507             UV
1508             overflow_count(self)
1509             SV *self
1510             PREINIT:
1511 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1512             CODE:
1513 1 50         RETVAL = (UV)sub->overflow_count;
1514             OUTPUT:
1515             RETVAL
1516              
1517             UV
1518             write_pos(self)
1519             SV *self
1520             PREINIT:
1521 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1522             CODE:
1523 0 0         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1524             OUTPUT:
1525             RETVAL
1526              
1527             bool
1528             has_overflow(self)
1529             SV *self
1530             PREINIT:
1531 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1532             CODE:
1533 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
1534 1 50         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    50          
1535             OUTPUT:
1536             RETVAL
1537              
1538             UV
1539             cursor(self, ...)
1540             SV *self
1541             PREINIT:
1542 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1543             CODE:
1544 1 50         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
1545 1 50         RETVAL = (UV)sub->cursor;
1546             OUTPUT:
1547             RETVAL
1548              
1549             void
1550             reset(self)
1551             SV *self
1552             PREINIT:
1553 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1554             CODE:
1555 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1556              
1557             void
1558             reset_oldest(self)
1559             SV *self
1560             PREINIT:
1561 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1562             CODE:
1563 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
1564 1 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
1565              
1566             UV
1567             poll_cb(self, cb)
1568             SV *self
1569             SV *cb
1570             PREINIT:
1571 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    50          
    50          
1572             int32_t value;
1573             CODE:
1574 1           RETVAL = 0;
1575 6 100         while (pubsub_int32_poll(sub, &value)) {
1576 5           dSP;
1577 5           ENTER; SAVETMPS;
1578 5 50         PUSHMARK(SP);
1579 5 50         mXPUSHi((IV)value);
1580 5           PUTBACK;
1581 5           call_sv(cb, G_DISCARD);
1582 5 50         FREETMPS; LEAVE;
1583 5           RETVAL++;
1584             }
1585             OUTPUT:
1586             RETVAL
1587              
1588             void
1589             drain_notify(self, ...)
1590             SV *self
1591             PREINIT:
1592 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1593             int32_t value;
1594             uint32_t max_count;
1595             PPCODE:
1596 0           pubsub_sub_eventfd_consume(sub);
1597 0 0         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1598 0 0         while (max_count-- > 0 && pubsub_int32_poll(sub, &value))
    0          
1599 0 0         mXPUSHi((IV)value);
1600              
1601             void
1602             eventfd_set(self, fd)
1603             SV *self
1604             int fd
1605             PREINIT:
1606 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1607             CODE:
1608 0           sub->notify_fd = fd;
1609              
1610             IV
1611             fileno(self)
1612             SV *self
1613             PREINIT:
1614 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int32::Sub", self);
    0          
    0          
1615             CODE:
1616 0 0         RETVAL = sub->notify_fd;
1617             OUTPUT:
1618             RETVAL
1619              
1620             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int16
1621              
1622             SV *
1623             new(class, path, capacity)
1624             const char *class
1625             SV *path
1626             UV capacity
1627             PREINIT:
1628             char errbuf[PUBSUB_ERR_BUFLEN];
1629             CODE:
1630 11 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
1631 11           PubSubHandle *h = pubsub_create(p, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
1632 11 100         if (!h) croak("Data::PubSub::Shared::Int16->new: %s", errbuf);
1633 10           MAKE_OBJ(class, h);
1634             OUTPUT:
1635             RETVAL
1636              
1637             SV *
1638             new_memfd(class, name, capacity)
1639             const char *class
1640             const char *name
1641             UV capacity
1642             PREINIT:
1643             char errbuf[PUBSUB_ERR_BUFLEN];
1644             CODE:
1645 1           PubSubHandle *h = pubsub_create_memfd(name, (uint32_t)capacity, PUBSUB_MODE_INT16, 0, errbuf);
1646 1 50         if (!h) croak("Data::PubSub::Shared::Int16->new_memfd: %s", errbuf);
1647 1           MAKE_OBJ(class, h);
1648             OUTPUT:
1649             RETVAL
1650              
1651             SV *
1652             new_from_fd(class, fd)
1653             const char *class
1654             int fd
1655             PREINIT:
1656             char errbuf[PUBSUB_ERR_BUFLEN];
1657             CODE:
1658 1           PubSubHandle *h = pubsub_open_fd(fd, PUBSUB_MODE_INT16, errbuf);
1659 1 50         if (!h) croak("Data::PubSub::Shared::Int16->new_from_fd: %s", errbuf);
1660 1           MAKE_OBJ(class, h);
1661             OUTPUT:
1662             RETVAL
1663              
1664             IV
1665             memfd(self)
1666             SV *self
1667             PREINIT:
1668 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1669             CODE:
1670 1 50         RETVAL = h->backing_fd;
1671             OUTPUT:
1672             RETVAL
1673              
1674             void
1675             DESTROY(self)
1676             SV *self
1677             CODE:
1678 12 50         if (!SvROK(self)) return;
1679 12           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self)));
1680 12 50         if (!h) return;
1681 12           sv_setiv(SvRV(self), 0);
1682 12           pubsub_destroy(h);
1683              
1684             bool
1685             publish(self, value)
1686             SV *self
1687             IV value
1688             PREINIT:
1689 49 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1690             CODE:
1691 49 50         RETVAL = pubsub_int16_publish(h, (int16_t)value);
1692             OUTPUT:
1693             RETVAL
1694              
1695             UV
1696             publish_multi(self, ...)
1697             SV *self
1698             PREINIT:
1699 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1700             CODE:
1701 1           uint32_t count = items - 1;
1702 1 50         if (count == 0) { RETVAL = 0; }
1703             else {
1704 1 50         if (count > 8192) croak("publish_multi: too many values (%u > 8192)", count);
1705 1           int16_t *vals = (int16_t *)alloca(count * sizeof(int16_t));
1706 4 100         for (uint32_t i = 0; i < count; i++)
1707 3           vals[i] = (int16_t)SvIV(ST(i + 1));
1708 1           RETVAL = pubsub_int16_publish_multi(h, vals, count);
1709             }
1710             OUTPUT:
1711             RETVAL
1712              
1713             void
1714             publish_notify(self, value)
1715             SV *self
1716             IV value
1717             PREINIT:
1718 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1719             CODE:
1720 0           pubsub_int16_publish(h, (int16_t)value);
1721 0           pubsub_notify(h);
1722              
1723             SV *
1724             subscribe(self)
1725             SV *self
1726             PREINIT:
1727 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1728             CODE:
1729 4           PubSubSub *sub = pubsub_subscribe(h, 0);
1730 4 50         if (!sub) croak("subscribe: out of memory");
1731 4           sub->userdata = (void *)newSVsv(self);
1732 4           MAKE_OBJ("Data::PubSub::Shared::Int16::Sub", sub);
1733             OUTPUT:
1734             RETVAL
1735              
1736             SV *
1737             subscribe_all(self)
1738             SV *self
1739             PREINIT:
1740 7 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1741             CODE:
1742 7           PubSubSub *sub = pubsub_subscribe(h, 1);
1743 7 50         if (!sub) croak("subscribe_all: out of memory");
1744 7           sub->userdata = (void *)newSVsv(self);
1745 7           MAKE_OBJ("Data::PubSub::Shared::Int16::Sub", sub);
1746             OUTPUT:
1747             RETVAL
1748              
1749             UV
1750             capacity(self)
1751             SV *self
1752             PREINIT:
1753 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1754             CODE:
1755 1 50         RETVAL = h->capacity;
1756             OUTPUT:
1757             RETVAL
1758              
1759             UV
1760             write_pos(self)
1761             SV *self
1762             PREINIT:
1763 4 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1764             CODE:
1765 4 50         RETVAL = (UV)__atomic_load_n(&h->hdr->write_pos, __ATOMIC_RELAXED);
1766             OUTPUT:
1767             RETVAL
1768              
1769             SV *
1770             path(self)
1771             SV *self
1772             PREINIT:
1773 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1774             CODE:
1775 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
1776             OUTPUT:
1777             RETVAL
1778              
1779             SV *
1780             stats(self)
1781             SV *self
1782             PREINIT:
1783 2 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1784             CODE:
1785 2           HV *hv = newHV();
1786 2           PubSubHeader *hdr = h->hdr;
1787 2           hv_store(hv, "capacity", 8, newSVuv(h->capacity), 0);
1788 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
1789 2           hv_store(hv, "write_pos", 9,
1790             newSVuv((UV)__atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED)), 0);
1791 2           hv_store(hv, "publish_ok", 10,
1792             newSVuv((UV)__atomic_load_n(&hdr->stat_publish_ok, __ATOMIC_RELAXED)), 0);
1793 2           hv_store(hv, "recoveries", 10,
1794             newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
1795 2           hv_store(hv, "sub_waiters", 11,
1796             newSVuv(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED)), 0);
1797 2           RETVAL = newRV_noinc((SV *)hv);
1798             OUTPUT:
1799             RETVAL
1800              
1801             void
1802             clear(self)
1803             SV *self
1804             PREINIT:
1805 1 50         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    50          
    50          
1806             CODE:
1807 1           pubsub_clear(h);
1808              
1809             void
1810             sync(self)
1811             SV *self
1812             PREINIT:
1813 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1814             CODE:
1815 0 0         if (pubsub_sync(h) != 0)
1816 0           croak("msync: %s", strerror(errno));
1817              
1818             void
1819             unlink(self_or_class, ...)
1820             SV *self_or_class
1821             CODE:
1822             const char *path;
1823 0 0         if (sv_isobject(self_or_class)) {
1824 0           PubSubHandle *h = INT2PTR(PubSubHandle*, SvIV(SvRV(self_or_class)));
1825 0 0         if (!h) croak("Attempted to use a destroyed object");
1826 0           path = h->path;
1827             } else {
1828 0 0         if (items < 2) croak("Usage: Data::PubSub::Shared::Int16->unlink($path)");
1829 0           path = SvPV_nolen(ST(1));
1830             }
1831 0 0         if (!path) croak("cannot unlink anonymous or memfd pubsub");
1832 0 0         if (unlink(path) != 0)
1833 0           croak("unlink(%s): %s", path, strerror(errno));
1834              
1835             IV
1836             eventfd(self)
1837             SV *self
1838             PREINIT:
1839 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1840             CODE:
1841 0           RETVAL = pubsub_eventfd_create(h);
1842 0 0         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1843             OUTPUT:
1844             RETVAL
1845              
1846             void
1847             eventfd_set(self, fd)
1848             SV *self
1849             int fd
1850             PREINIT:
1851 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1852             CODE:
1853 0           pubsub_eventfd_set(h, fd);
1854              
1855             IV
1856             fileno(self)
1857             SV *self
1858             PREINIT:
1859 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1860             CODE:
1861 0 0         RETVAL = h->notify_fd;
1862             OUTPUT:
1863             RETVAL
1864              
1865             SV *
1866             eventfd_consume(self)
1867             SV *self
1868             PREINIT:
1869 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1870             CODE:
1871 0           int64_t v = pubsub_eventfd_consume(h);
1872 0 0         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
1873             OUTPUT:
1874             RETVAL
1875              
1876             void
1877             notify(self)
1878             SV *self
1879             PREINIT:
1880 0 0         EXTRACT_HANDLE("Data::PubSub::Shared::Int16", self);
    0          
    0          
1881             CODE:
1882 0           pubsub_notify(h);
1883              
1884             MODULE = Data::PubSub::Shared PACKAGE = Data::PubSub::Shared::Int16::Sub
1885              
1886             void
1887             DESTROY(self)
1888             SV *self
1889             CODE:
1890 11 50         if (!SvROK(self)) return;
1891 11           PubSubSub *sub = INT2PTR(PubSubSub*, SvIV(SvRV(self)));
1892 11 50         if (!sub) return;
1893 11           sv_setiv(SvRV(self), 0);
1894 11 50         if (sub->userdata) SvREFCNT_dec((SV *)sub->userdata);
1895 11           pubsub_sub_destroy(sub);
1896              
1897             SV *
1898             poll(self)
1899             SV *self
1900             PREINIT:
1901 9 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1902             int16_t value;
1903             CODE:
1904 9           int r = pubsub_int16_poll(sub, &value);
1905 9 100         if (r == 1)
1906 8           RETVAL = newSViv((IV)value);
1907             else
1908 1           RETVAL = &PL_sv_undef;
1909             OUTPUT:
1910             RETVAL
1911              
1912             void
1913             poll_multi(self, count)
1914             SV *self
1915             UV count
1916             PREINIT:
1917 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1918             int16_t value;
1919             PPCODE:
1920 7 50         for (UV i = 0; i < count; i++) {
1921 7           int r = pubsub_int16_poll(sub, &value);
1922 7 100         if (r != 1) break;
1923 6 50         mXPUSHi((IV)value);
1924             }
1925              
1926             SV *
1927             poll_wait(self, ...)
1928             SV *self
1929             PREINIT:
1930 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1931 1           double timeout = -1;
1932             int16_t value;
1933             CODE:
1934 1 50         if (items > 1) timeout = SvNV(ST(1));
1935 1           int r = pubsub_int16_poll_wait(sub, &value, timeout);
1936 1 50         if (r == 1)
1937 0           RETVAL = newSViv((IV)value);
1938             else
1939 1           RETVAL = &PL_sv_undef;
1940             OUTPUT:
1941             RETVAL
1942              
1943             void
1944             drain(self, ...)
1945             SV *self
1946             PREINIT:
1947 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1948             int16_t value;
1949             uint32_t max_count;
1950             PPCODE:
1951 1 50         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
1952 9 50         while (max_count-- > 0 && pubsub_int16_poll(sub, &value))
    100          
1953 8 50         mXPUSHi((IV)value);
1954              
1955             void
1956             poll_wait_multi(self, count, ...)
1957             SV *self
1958             UV count
1959             PREINIT:
1960 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1961 1           double timeout = -1;
1962             int16_t value;
1963             PPCODE:
1964 1 50         if (count == 0) XSRETURN(0);
1965 1 50         if (items > 2) timeout = SvNV(ST(2));
1966 1 50         if (!pubsub_int16_poll_wait(sub, &value, timeout)) XSRETURN(0);
1967 1 50         mXPUSHi((IV)value);
1968 3 100         for (UV i = 1; i < count; i++) {
1969 2 50         if (!pubsub_int16_poll(sub, &value)) break;
1970 2 50         mXPUSHi((IV)value);
1971             }
1972              
1973             UV
1974             lag(self)
1975             SV *self
1976             PREINIT:
1977 3 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1978             CODE:
1979 3           RETVAL = (UV)pubsub_lag(sub);
1980             OUTPUT:
1981             RETVAL
1982              
1983             UV
1984             overflow_count(self)
1985             SV *self
1986             PREINIT:
1987 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
1988             CODE:
1989 1 50         RETVAL = (UV)sub->overflow_count;
1990             OUTPUT:
1991             RETVAL
1992              
1993             UV
1994             write_pos(self)
1995             SV *self
1996             PREINIT:
1997 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
1998             CODE:
1999 0 0         RETVAL = (UV)__atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
2000             OUTPUT:
2001             RETVAL
2002              
2003             bool
2004             has_overflow(self)
2005             SV *self
2006             PREINIT:
2007 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2008             CODE:
2009 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_RELAXED);
2010 1 50         RETVAL = (sub->cursor < wp && wp - sub->cursor > sub->capacity);
    50          
    50          
2011             OUTPUT:
2012             RETVAL
2013              
2014             UV
2015             cursor(self, ...)
2016             SV *self
2017             PREINIT:
2018 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2019             CODE:
2020 1 50         if (items > 1) sub->cursor = (uint64_t)SvUV(ST(1));
2021 1 50         RETVAL = (UV)sub->cursor;
2022             OUTPUT:
2023             RETVAL
2024              
2025             void
2026             reset(self)
2027             SV *self
2028             PREINIT:
2029 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2030             CODE:
2031 1           sub->cursor = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
2032              
2033             void
2034             reset_oldest(self)
2035             SV *self
2036             PREINIT:
2037 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2038             CODE:
2039 1           uint64_t wp = __atomic_load_n(&sub->hdr->write_pos, __ATOMIC_ACQUIRE);
2040 1 50         sub->cursor = (wp > sub->capacity) ? wp - sub->capacity : 0;
2041              
2042             UV
2043             poll_cb(self, cb)
2044             SV *self
2045             SV *cb
2046             PREINIT:
2047 1 50         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    50          
    50          
2048             int16_t value;
2049             CODE:
2050 1           RETVAL = 0;
2051 6 100         while (pubsub_int16_poll(sub, &value)) {
2052 5           dSP;
2053 5           ENTER; SAVETMPS;
2054 5 50         PUSHMARK(SP);
2055 5 50         mXPUSHi((IV)value);
2056 5           PUTBACK;
2057 5           call_sv(cb, G_DISCARD);
2058 5 50         FREETMPS; LEAVE;
2059 5           RETVAL++;
2060             }
2061             OUTPUT:
2062             RETVAL
2063              
2064             void
2065             drain_notify(self, ...)
2066             SV *self
2067             PREINIT:
2068 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2069             int16_t value;
2070             uint32_t max_count;
2071             PPCODE:
2072 0           pubsub_sub_eventfd_consume(sub);
2073 0 0         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
2074 0 0         while (max_count-- > 0 && pubsub_int16_poll(sub, &value))
    0          
2075 0 0         mXPUSHi((IV)value);
2076              
2077             void
2078             eventfd_set(self, fd)
2079             SV *self
2080             int fd
2081             PREINIT:
2082 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2083             CODE:
2084 0           sub->notify_fd = fd;
2085              
2086             IV
2087             fileno(self)
2088             SV *self
2089             PREINIT:
2090 0 0         EXTRACT_SUB("Data::PubSub::Shared::Int16::Sub", self);
    0          
    0          
2091             CODE:
2092 0 0         RETVAL = sub->notify_fd;
2093             OUTPUT:
2094             RETVAL