File Coverage

Shared.xs
Criterion Covered Total %
statement 444 524 84.7
branch 392 950 41.2
condition n/a
subroutine n/a
pod n/a
total 836 1474 56.7


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "ppport.h"
7             #include "reqrep.h"
8              
9             #define EXTRACT_HANDLE(classname, sv) \
10             if (!sv_isobject(sv) || !sv_derived_from(sv, classname)) \
11             croak("Expected a %s object", classname); \
12             ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(sv))); \
13             if (!h) croak("Attempted to use a destroyed %s object", classname)
14              
15             #define MAKE_OBJ(class, ptr) \
16             SV *ref = newRV_noinc(newSViv(PTR2IV(ptr))); \
17             sv_bless(ref, gv_stashpv(class, GV_ADD)); \
18             RETVAL = ref
19              
20             MODULE = Data::ReqRep::Shared PACKAGE = Data::ReqRep::Shared
21              
22             PROTOTYPES: DISABLE
23              
24             SV *
25             new(class, path, req_cap, resp_slots, resp_size, ...)
26             const char *class
27             SV *path
28             UV req_cap
29             UV resp_slots
30             UV resp_size
31             PREINIT:
32             char errbuf[REQREP_ERR_BUFLEN];
33             uint64_t arena_cap;
34             CODE:
35 48 100         arena_cap = (items > 5) ? (uint64_t)SvUV(ST(5)) : 0;
36 48 100         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
37 48           ReqRepHandle *h = reqrep_create(p, (uint32_t)req_cap, (uint32_t)resp_slots,
38             (uint32_t)resp_size, arena_cap, errbuf);
39 48 50         if (!h) croak("Data::ReqRep::Shared->new: %s", errbuf);
40 48           MAKE_OBJ(class, h);
41             OUTPUT:
42             RETVAL
43              
44             SV *
45             new_memfd(class, name, req_cap, resp_slots, resp_size, ...)
46             const char *class
47             const char *name
48             UV req_cap
49             UV resp_slots
50             UV resp_size
51             PREINIT:
52             char errbuf[REQREP_ERR_BUFLEN];
53             uint64_t arena_cap;
54             CODE:
55 3 50         arena_cap = (items > 5) ? (uint64_t)SvUV(ST(5)) : 0;
56 3           ReqRepHandle *h = reqrep_create_memfd(name, (uint32_t)req_cap, (uint32_t)resp_slots,
57             (uint32_t)resp_size, arena_cap, errbuf);
58 3 50         if (!h) croak("Data::ReqRep::Shared->new_memfd: %s", errbuf);
59 3           MAKE_OBJ(class, h);
60             OUTPUT:
61             RETVAL
62              
63             SV *
64             new_from_fd(class, fd)
65             const char *class
66             int fd
67             PREINIT:
68             char errbuf[REQREP_ERR_BUFLEN];
69             CODE:
70 1           ReqRepHandle *h = reqrep_open_fd(fd, REQREP_MODE_STR, errbuf);
71 1 50         if (!h) croak("Data::ReqRep::Shared->new_from_fd: %s", errbuf);
72 1           MAKE_OBJ(class, h);
73             OUTPUT:
74             RETVAL
75              
76             IV
77             memfd(self)
78             SV *self
79             PREINIT:
80 3 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
81             CODE:
82 3 50         RETVAL = h->backing_fd;
83             OUTPUT:
84             RETVAL
85              
86             void
87             DESTROY(self)
88             SV *self
89             CODE:
90 52 50         if (!SvROK(self)) return;
91 52           ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self)));
92 52 50         if (!h) return;
93 52           sv_setiv(SvRV(self), 0);
94 52           reqrep_destroy(h);
95              
96             void
97             recv(self)
98             SV *self
99             PREINIT:
100 91 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
101             const char *str;
102             uint32_t len;
103             uint64_t id;
104             bool utf8;
105             PPCODE:
106 91           int r = reqrep_try_recv(h, &str, &len, &utf8, &id);
107 91 50         if (r == -1) croak("Data::ReqRep::Shared: out of memory");
108 91 100         if (r == 1) {
109 81           SV *sv = newSVpvn(str, len);
110 81 100         if (utf8) SvUTF8_on(sv);
111 81 50         mXPUSHs(sv);
112 81 50         mXPUSHu((UV)id);
113             }
114              
115             void
116             recv_wait(self, ...)
117             SV *self
118             PREINIT:
119 2152 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
120 2152           double timeout = -1;
121             const char *str;
122             uint32_t len;
123             uint64_t id;
124             bool utf8;
125             PPCODE:
126 2152 50         if (items > 1) timeout = SvNV(ST(1));
127 2152           int r = reqrep_recv_wait(h, &str, &len, &utf8, &id, timeout);
128 2152 50         if (r == -1) croak("Data::ReqRep::Shared: out of memory");
129 2152 100         if (r == 1) {
130 2148           SV *sv = newSVpvn(str, len);
131 2148 50         if (utf8) SvUTF8_on(sv);
132 2148 50         mXPUSHs(sv);
133 2148 50         mXPUSHu((UV)id);
134             }
135              
136             void
137             recv_multi(self, count)
138             SV *self
139             UV count
140             PREINIT:
141 6 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
142             const char *str;
143             uint32_t len;
144             uint64_t id;
145             bool utf8;
146             PPCODE:
147             /* Hoist Perl SV construction out of process-shared futex mutex. */
148 6           struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
149 6           UV n = 0;
150 6           int last_r = 0;
151 6           int oom = 0;
152 6 50         if (count > 0) {
153 6           items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
154 6 50         if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
155             }
156 6           reqrep_mutex_lock(h->hdr);
157 16 100         for (UV i = 0; i < count; i++) {
158 14           last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
159 14 100         if (last_r <= 0) break;
160 10 50         char *c = (char *)malloc(len ? len : 1);
161 10 50         if (!c) { oom = 1; break; }
162 10 50         if (len) memcpy(c, str, len);
163 10           items_buf[n].buf = c;
164 10           items_buf[n].len = len;
165 10           items_buf[n].id = id;
166 10           items_buf[n].utf8 = utf8;
167 10           n++;
168             }
169 6           reqrep_mutex_unlock(h->hdr);
170 6           reqrep_wake_producers(h->hdr);
171 6 50         EXTEND(SP, (SSize_t)(2 * n));
    50          
172 16 100         for (UV j = 0; j < n; j++) {
173 10           SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
174 10 50         if (items_buf[j].utf8) SvUTF8_on(sv);
175 10           PUSHs(sv_2mortal(sv));
176 10           PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
177 10           free(items_buf[j].buf);
178             }
179 6           free(items_buf);
180 6 50         if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");
    50          
181              
182             void
183             recv_wait_multi(self, count, ...)
184             SV *self
185             UV count
186             PREINIT:
187 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
188 2           double timeout = -1;
189             const char *str;
190             uint32_t len;
191             uint64_t id;
192             bool utf8;
193             PPCODE:
194 2 50         if (items > 2) timeout = SvNV(ST(2));
195             /* Block until at least 1 */
196 2           int r = reqrep_recv_wait(h, &str, &len, &utf8, &id, timeout);
197 2 50         if (r == -1) croak("Data::ReqRep::Shared: out of memory");
198 2 100         if (r != 1) XSRETURN(0);
199             {
200 1           SV *sv = newSVpvn(str, len);
201 1 50         if (utf8) SvUTF8_on(sv);
202 1 50         mXPUSHs(sv);
203 1 50         mXPUSHu((UV)id);
204             }
205             /* Grab up to count-1 more non-blocking — hoist SV construction out of lock. */
206 1           struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
207 1           UV n = 0;
208 1           int last_r2 = 0;
209 1           int oom = 0;
210 1 50         if (count > 1) {
211 1           items_buf = (void *)malloc((size_t)(count - 1) * sizeof(*items_buf));
212 1 50         if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
213             }
214 1           reqrep_mutex_lock(h->hdr);
215 4 50         for (UV i = 1; i < count; i++) {
216 4           last_r2 = reqrep_recv_locked(h, &str, &len, &utf8, &id);
217 4 100         if (last_r2 <= 0) break;
218 3 50         char *c = (char *)malloc(len ? len : 1);
219 3 50         if (!c) { oom = 1; break; }
220 3 50         if (len) memcpy(c, str, len);
221 3           items_buf[n].buf = c;
222 3           items_buf[n].len = len;
223 3           items_buf[n].id = id;
224 3           items_buf[n].utf8 = utf8;
225 3           n++;
226             }
227 1           reqrep_mutex_unlock(h->hdr);
228 1           reqrep_wake_producers(h->hdr);
229 1 50         EXTEND(SP, (SSize_t)(2 * n));
    50          
230 4 100         for (UV j = 0; j < n; j++) {
231 3           SV *sv = newSVpvn(items_buf[j].buf, items_buf[j].len);
232 3 50         if (items_buf[j].utf8) SvUTF8_on(sv);
233 3           PUSHs(sv_2mortal(sv));
234 3           PUSHs(sv_2mortal(newSVuv((UV)items_buf[j].id)));
235 3           free(items_buf[j].buf);
236             }
237 1           free(items_buf);
238 1 50         if (last_r2 == -1 || oom) croak("Data::ReqRep::Shared: out of memory");
    50          
239              
240             void
241             drain(self, ...)
242             SV *self
243             PREINIT:
244 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
245             const char *str;
246             uint32_t len;
247             uint64_t id;
248             bool utf8;
249             uint32_t max_count;
250             PPCODE:
251 2 100         max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
252             /* Hoist SV construction out of the mutex (see recv_multi). */
253 2           struct drain_item { char *buf; uint32_t len; uint64_t id; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
254 2           UV drained_n = 0;
255 2           int last_r = 0;
256 2           int oom = 0;
257 2           reqrep_mutex_lock(h->hdr);
258 11 100         while (max_count-- > 0) {
259 10           last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
260 10 100         if (last_r <= 0) break;
261 9           struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
262 9 50         char *c = (char *)malloc(len ? len : 1);
263 9 50         if (!it || !c) { free(it); free(c); oom = 1; break; }
    50          
264 9 50         if (len) memcpy(c, str, len);
265 9           it->buf = c; it->len = len; it->id = id; it->utf8 = utf8; it->next = NULL;
266 9 100         if (drained_tail) drained_tail->next = it; else drained_head = it;
267 9           drained_tail = it;
268 9           drained_n++;
269             }
270 2           reqrep_mutex_unlock(h->hdr);
271 2           reqrep_wake_producers(h->hdr);
272 2 50         EXTEND(SP, (SSize_t)(2 * drained_n));
    50          
273 11 100         while (drained_head) {
274 9           struct drain_item *it = drained_head; drained_head = it->next;
275 9           SV *sv = newSVpvn(it->buf, it->len);
276 9 50         if (it->utf8) SvUTF8_on(sv);
277 9           PUSHs(sv_2mortal(sv));
278 9           PUSHs(sv_2mortal(newSVuv((UV)it->id)));
279 9           free(it->buf);
280 9           free(it);
281             }
282 2 50         if (last_r == -1 || oom) croak("Data::ReqRep::Shared: out of memory");
    50          
283              
284             bool
285             reply(self, id, value)
286             SV *self
287             UV id
288             SV *value
289             PREINIT:
290 2249 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
291             STRLEN len;
292             CODE:
293 2249           const char *str = SvPV(value, len);
294 2249           bool utf8 = SvUTF8(value) ? true : false;
295 2249           int r = reqrep_reply(h, (uint64_t)id, str, (uint32_t)len, utf8);
296 2249 50         if (r == -1) croak("Data::ReqRep::Shared: invalid slot index");
297 2249 100         if (r == -3) croak("Data::ReqRep::Shared: response too long (max %u bytes)", h->resp_data_max);
298 2248 100         RETVAL = (r == 1);
299             OUTPUT:
300             RETVAL
301              
302             UV
303             size(self)
304             SV *self
305             PREINIT:
306 23 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
307             CODE:
308 23           RETVAL = (UV)reqrep_size(h);
309             OUTPUT:
310             RETVAL
311              
312             UV
313             capacity(self)
314             SV *self
315             PREINIT:
316 4 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
317             CODE:
318 4 50         RETVAL = h->req_cap;
319             OUTPUT:
320             RETVAL
321              
322             UV
323             resp_slots(self)
324             SV *self
325             PREINIT:
326 4 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
327             CODE:
328 4 50         RETVAL = h->resp_slots;
329             OUTPUT:
330             RETVAL
331              
332             UV
333             resp_size(self)
334             SV *self
335             PREINIT:
336 4 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
337             CODE:
338 4 50         RETVAL = h->resp_data_max;
339             OUTPUT:
340             RETVAL
341              
342             bool
343             is_empty(self)
344             SV *self
345             PREINIT:
346 4 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
347             CODE:
348 4 50         RETVAL = (reqrep_size(h) == 0);
349             OUTPUT:
350             RETVAL
351              
352             void
353             clear(self)
354             SV *self
355             PREINIT:
356 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
357             CODE:
358 1           reqrep_clear(h);
359              
360             void
361             unlink(self_or_class, ...)
362             SV *self_or_class
363             CODE:
364             const char *path;
365 29 50         if (sv_isobject(self_or_class)) {
366 29           ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self_or_class)));
367 29 50         if (!h) croak("Attempted to use a destroyed object");
368 29           path = h->path;
369             } else {
370 0 0         if (items < 2) croak("Usage: Data::ReqRep::Shared->unlink($path)");
371 0           path = SvPV_nolen(ST(1));
372             }
373 29 50         if (!path) croak("cannot unlink anonymous or memfd channel");
374 29 50         if (unlink(path) != 0)
375 0           croak("unlink(%s): %s", path, strerror(errno));
376              
377             SV *
378             path(self)
379             SV *self
380             PREINIT:
381 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    0          
    0          
382             CODE:
383 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
384             OUTPUT:
385             RETVAL
386              
387             SV *
388             stats(self)
389             SV *self
390             PREINIT:
391 3 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
392             CODE:
393 3           HV *hv = newHV();
394 3           ReqRepHeader *hdr = h->hdr;
395 3           hv_store(hv, "size", 4, newSVuv((UV)reqrep_size(h)), 0);
396 3           hv_store(hv, "capacity", 8, newSVuv(h->req_cap), 0);
397 3           hv_store(hv, "resp_slots", 10, newSVuv(h->resp_slots), 0);
398 3           hv_store(hv, "resp_data_max", 13, newSVuv(h->resp_data_max), 0);
399 3           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
400 3           hv_store(hv, "arena_cap", 9, newSVuv(h->req_arena_cap), 0);
401 3           hv_store(hv, "arena_used", 10, newSVuv((UV)__atomic_load_n(&hdr->arena_used, __ATOMIC_RELAXED)), 0);
402 3           hv_store(hv, "requests", 8, newSVuv((UV)__atomic_load_n(&hdr->stat_requests, __ATOMIC_RELAXED)), 0);
403 3           hv_store(hv, "replies", 7, newSVuv((UV)__atomic_load_n(&hdr->stat_replies, __ATOMIC_RELAXED)), 0);
404 3           hv_store(hv, "send_full", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_send_full, __ATOMIC_RELAXED)), 0);
405 3           hv_store(hv, "recv_empty", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recv_empty, __ATOMIC_RELAXED)), 0);
406 3           hv_store(hv, "recoveries", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
407 3           hv_store(hv, "recv_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->recv_waiters, __ATOMIC_RELAXED)), 0);
408 3           hv_store(hv, "send_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->send_waiters, __ATOMIC_RELAXED)), 0);
409 3           hv_store(hv, "slot_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->slot_waiters, __ATOMIC_RELAXED)), 0);
410 3           RETVAL = newRV_noinc((SV *)hv);
411             OUTPUT:
412             RETVAL
413              
414             void
415             sync(self)
416             SV *self
417             PREINIT:
418 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    0          
    0          
419             CODE:
420 0 0         if (reqrep_sync(h) != 0)
421 0           croak("msync: %s", strerror(errno));
422              
423             IV
424             eventfd(self)
425             SV *self
426             PREINIT:
427 5 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
428             CODE:
429 5           RETVAL = reqrep_eventfd_create(h);
430 5 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
431             OUTPUT:
432             RETVAL
433              
434             void
435             eventfd_set(self, fd)
436             SV *self
437             int fd
438             PREINIT:
439 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    0          
    0          
440             CODE:
441 0           reqrep_eventfd_set(h, fd);
442              
443             IV
444             fileno(self)
445             SV *self
446             PREINIT:
447 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
448             CODE:
449 2 50         RETVAL = h->notify_fd;
450             OUTPUT:
451             RETVAL
452              
453             SV *
454             eventfd_consume(self)
455             SV *self
456             PREINIT:
457 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
458             CODE:
459 2           int64_t v = reqrep_eventfd_consume(h);
460 2 50         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
461             OUTPUT:
462             RETVAL
463              
464             void
465             notify(self)
466             SV *self
467             PREINIT:
468 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    0          
    0          
469             CODE:
470 0           reqrep_notify(h);
471              
472             IV
473             reply_eventfd(self)
474             SV *self
475             PREINIT:
476 5 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
477             CODE:
478 5           RETVAL = reqrep_reply_eventfd_create(h);
479 5 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
480             OUTPUT:
481             RETVAL
482              
483             void
484             reply_eventfd_set(self, fd)
485             SV *self
486             int fd
487             PREINIT:
488 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    0          
    0          
489             CODE:
490 0           reqrep_reply_eventfd_set(h, fd);
491              
492             IV
493             reply_fileno(self)
494             SV *self
495             PREINIT:
496 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
497             CODE:
498 2 50         RETVAL = h->reply_fd;
499             OUTPUT:
500             RETVAL
501              
502             SV *
503             reply_eventfd_consume(self)
504             SV *self
505             PREINIT:
506 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
507             CODE:
508 1           int64_t v = reqrep_reply_eventfd_consume(h);
509 1 50         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
510             OUTPUT:
511             RETVAL
512              
513             void
514             reply_notify(self)
515             SV *self
516             PREINIT:
517 4 50         EXTRACT_HANDLE("Data::ReqRep::Shared", self);
    50          
    50          
518             CODE:
519 4           reqrep_reply_notify(h);
520              
521              
522             MODULE = Data::ReqRep::Shared PACKAGE = Data::ReqRep::Shared::Client
523              
524             SV *
525             new(class, path)
526             const char *class
527             SV *path
528             PREINIT:
529             char errbuf[REQREP_ERR_BUFLEN];
530             CODE:
531 42           const char *p = SvPV_nolen(path);
532 42           ReqRepHandle *h = reqrep_open(p, REQREP_MODE_STR, errbuf);
533 42 50         if (!h) croak("Data::ReqRep::Shared::Client->new: %s", errbuf);
534 42           MAKE_OBJ(class, h);
535             OUTPUT:
536             RETVAL
537              
538             SV *
539             new_from_fd(class, fd)
540             const char *class
541             int fd
542             PREINIT:
543             char errbuf[REQREP_ERR_BUFLEN];
544             CODE:
545 1           ReqRepHandle *h = reqrep_open_fd(fd, REQREP_MODE_STR, errbuf);
546 1 50         if (!h) croak("Data::ReqRep::Shared::Client->new_from_fd: %s", errbuf);
547 1           MAKE_OBJ(class, h);
548             OUTPUT:
549             RETVAL
550              
551             IV
552             memfd(self)
553             SV *self
554             PREINIT:
555 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    0          
    0          
556             CODE:
557 0 0         RETVAL = h->backing_fd;
558             OUTPUT:
559             RETVAL
560              
561             void
562             DESTROY(self)
563             SV *self
564             CODE:
565 43 50         if (!SvROK(self)) return;
566 43           ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self)));
567 43 50         if (!h) return;
568 43           sv_setiv(SvRV(self), 0);
569 43           reqrep_destroy(h);
570              
571             SV *
572             send(self, value)
573             SV *self
574             SV *value
575             PREINIT:
576 96 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
577             STRLEN len;
578             uint64_t id;
579             CODE:
580 96           const char *str = SvPV(value, len);
581 96           bool utf8 = SvUTF8(value) ? true : false;
582 96           int r = reqrep_try_send(h, str, (uint32_t)len, utf8, &id);
583 96 50         if (r == -2) croak("Data::ReqRep::Shared::Client: request too long (exceeds arena capacity or 2GB mask)");
584 96 100         RETVAL = (r == 1) ? newSVuv((UV)id) : &PL_sv_undef;
585             OUTPUT:
586             RETVAL
587              
588             SV *
589             send_wait(self, value, ...)
590             SV *self
591             SV *value
592             PREINIT:
593 216 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
594 216           double timeout = -1;
595             STRLEN len;
596             uint64_t id;
597             CODE:
598 216 50         if (items > 2) timeout = SvNV(ST(2));
599 216           const char *str = SvPV(value, len);
600 216           bool utf8 = SvUTF8(value) ? true : false;
601 216           int r = reqrep_send_wait(h, str, (uint32_t)len, utf8, &id, timeout);
602 216 50         if (r == -2) croak("Data::ReqRep::Shared::Client: request too long (exceeds arena capacity or 2GB mask)");
603 216 100         RETVAL = (r == 1) ? newSVuv((UV)id) : &PL_sv_undef;
604             OUTPUT:
605             RETVAL
606              
607             SV *
608             send_notify(self, value)
609             SV *self
610             SV *value
611             PREINIT:
612 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
613             STRLEN len;
614             uint64_t id;
615             CODE:
616 1           const char *str = SvPV(value, len);
617 1           bool utf8 = SvUTF8(value) ? true : false;
618 1           int r = reqrep_try_send(h, str, (uint32_t)len, utf8, &id);
619 1 50         if (r == -2) croak("Data::ReqRep::Shared::Client: request too long (exceeds arena capacity or 2GB mask)");
620 1 50         if (r == 1) {
621 1           reqrep_notify(h);
622 1           RETVAL = newSVuv((UV)id);
623             } else {
624 0           RETVAL = &PL_sv_undef;
625             }
626             OUTPUT:
627             RETVAL
628              
629             SV *
630             send_wait_notify(self, value, ...)
631             SV *self
632             SV *value
633             PREINIT:
634 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
635 1           double timeout = -1;
636             STRLEN len;
637             uint64_t id;
638             CODE:
639 1 50         if (items > 2) timeout = SvNV(ST(2));
640 1           const char *str = SvPV(value, len);
641 1           bool utf8 = SvUTF8(value) ? true : false;
642 1           int r = reqrep_send_wait(h, str, (uint32_t)len, utf8, &id, timeout);
643 1 50         if (r == -2) croak("Data::ReqRep::Shared::Client: request too long (exceeds arena capacity or 2GB mask)");
644 1 50         if (r == 1) {
645 1           reqrep_notify(h);
646 1           RETVAL = newSVuv((UV)id);
647             } else {
648 0           RETVAL = &PL_sv_undef;
649             }
650             OUTPUT:
651             RETVAL
652              
653             SV *
654             get(self, id)
655             SV *self
656             UV id
657             PREINIT:
658 90 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
659             const char *str;
660             uint32_t len;
661             bool utf8;
662             CODE:
663 90           int r = reqrep_try_get(h, (uint64_t)id, &str, &len, &utf8);
664 90 50         if (r == -1) croak("Data::ReqRep::Shared::Client: invalid slot index");
665 90 50         if (r == -2) croak("Data::ReqRep::Shared::Client: out of memory");
666 90 100         if (r == 1) {
667 89           RETVAL = newSVpvn(str, len);
668 89 100         if (utf8) SvUTF8_on(RETVAL);
669             } else {
670 1           RETVAL = &PL_sv_undef;
671             }
672             OUTPUT:
673             RETVAL
674              
675             SV *
676             get_wait(self, id, ...)
677             SV *self
678             UV id
679             PREINIT:
680 6 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
681 6           double timeout = -1;
682             const char *str;
683             uint32_t len;
684             bool utf8;
685             CODE:
686 6 50         if (items > 2) timeout = SvNV(ST(2));
687 6           int r = reqrep_get_wait(h, (uint64_t)id, &str, &len, &utf8, timeout);
688 6 50         if (r == -1) croak("Data::ReqRep::Shared::Client: invalid slot index");
689 6 50         if (r == -2) croak("Data::ReqRep::Shared::Client: out of memory");
690 6 100         if (r == 1) {
691 3           RETVAL = newSVpvn(str, len);
692 3 50         if (utf8) SvUTF8_on(RETVAL);
693             } else {
694 3           RETVAL = &PL_sv_undef;
695             }
696             OUTPUT:
697             RETVAL
698              
699             SV *
700             req(self, value)
701             SV *self
702             SV *value
703             PREINIT:
704 747 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
705             STRLEN len;
706             const char *out_str;
707             uint32_t out_len;
708             bool out_utf8;
709             CODE:
710 747           const char *str = SvPV(value, len);
711 747           bool utf8 = SvUTF8(value) ? true : false;
712 747           int r = reqrep_request(h, str, (uint32_t)len, utf8, &out_str, &out_len, &out_utf8, -1);
713 747 50         if (r == -2) croak("Data::ReqRep::Shared::Client: request too long (exceeds arena capacity or 2GB mask)");
714 747 50         if (r == 1) {
715 747           RETVAL = newSVpvn(out_str, out_len);
716 747 50         if (out_utf8) SvUTF8_on(RETVAL);
717             } else {
718 0           RETVAL = &PL_sv_undef;
719             }
720             OUTPUT:
721             RETVAL
722              
723             SV *
724             req_wait(self, value, timeout)
725             SV *self
726             SV *value
727             double timeout
728             PREINIT:
729 1801 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
730             STRLEN len;
731             const char *out_str;
732             uint32_t out_len;
733             bool out_utf8;
734             CODE:
735 1801           const char *str = SvPV(value, len);
736 1801           bool utf8 = SvUTF8(value) ? true : false;
737 1801           int r = reqrep_request(h, str, (uint32_t)len, utf8, &out_str, &out_len, &out_utf8, timeout);
738 1801 50         if (r == -2) croak("Data::ReqRep::Shared::Client: request too long (exceeds arena capacity or 2GB mask)");
739 1801 100         if (r == 1) {
740 1800           RETVAL = newSVpvn(out_str, out_len);
741 1800 50         if (out_utf8) SvUTF8_on(RETVAL);
742             } else {
743 1           RETVAL = &PL_sv_undef;
744             }
745             OUTPUT:
746             RETVAL
747              
748             void
749             cancel(self, id)
750             SV *self
751             UV id
752             PREINIT:
753 213 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
754             CODE:
755 213           reqrep_cancel(h, (uint64_t)id);
756              
757             UV
758             pending(self)
759             SV *self
760             PREINIT:
761 7 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
762             CODE:
763 7 50         RETVAL = (UV)reqrep_pending(h);
764             OUTPUT:
765             RETVAL
766              
767             SV *
768             path(self)
769             SV *self
770             PREINIT:
771 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    0          
    0          
772             CODE:
773 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
774             OUTPUT:
775             RETVAL
776              
777             SV *
778             stats(self)
779             SV *self
780             PREINIT:
781 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
782             CODE:
783 2           HV *hv = newHV();
784 2           ReqRepHeader *hdr = h->hdr;
785 2           hv_store(hv, "size", 4, newSVuv((UV)reqrep_size(h)), 0);
786 2           hv_store(hv, "capacity", 8, newSVuv(h->req_cap), 0);
787 2           hv_store(hv, "resp_slots", 10, newSVuv(h->resp_slots), 0);
788 2           hv_store(hv, "resp_data_max", 13, newSVuv(h->resp_data_max), 0);
789 2           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
790 2           hv_store(hv, "arena_cap", 9, newSVuv(h->req_arena_cap), 0);
791 2           hv_store(hv, "arena_used", 10, newSVuv((UV)__atomic_load_n(&hdr->arena_used, __ATOMIC_RELAXED)), 0);
792 2           hv_store(hv, "requests", 8, newSVuv((UV)__atomic_load_n(&hdr->stat_requests, __ATOMIC_RELAXED)), 0);
793 2           hv_store(hv, "replies", 7, newSVuv((UV)__atomic_load_n(&hdr->stat_replies, __ATOMIC_RELAXED)), 0);
794 2           hv_store(hv, "send_full", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_send_full, __ATOMIC_RELAXED)), 0);
795 2           hv_store(hv, "recv_empty", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recv_empty, __ATOMIC_RELAXED)), 0);
796 2           hv_store(hv, "recoveries", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
797 2           hv_store(hv, "recv_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->recv_waiters, __ATOMIC_RELAXED)), 0);
798 2           hv_store(hv, "send_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->send_waiters, __ATOMIC_RELAXED)), 0);
799 2           hv_store(hv, "slot_waiters", 12, newSVuv((UV)__atomic_load_n(&hdr->slot_waiters, __ATOMIC_RELAXED)), 0);
800 2           RETVAL = newRV_noinc((SV *)hv);
801             OUTPUT:
802             RETVAL
803              
804             UV
805             size(self)
806             SV *self
807             PREINIT:
808 4 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
809             CODE:
810 4           RETVAL = (UV)reqrep_size(h);
811             OUTPUT:
812             RETVAL
813              
814             UV
815             capacity(self)
816             SV *self
817             PREINIT:
818 4 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
819             CODE:
820 4 50         RETVAL = h->req_cap;
821             OUTPUT:
822             RETVAL
823              
824             bool
825             is_empty(self)
826             SV *self
827             PREINIT:
828 4 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
829             CODE:
830 4 50         RETVAL = (reqrep_size(h) == 0);
831             OUTPUT:
832             RETVAL
833              
834             UV
835             resp_slots(self)
836             SV *self
837             PREINIT:
838 4 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
839             CODE:
840 4 50         RETVAL = h->resp_slots;
841             OUTPUT:
842             RETVAL
843              
844             UV
845             resp_size(self)
846             SV *self
847             PREINIT:
848 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    0          
    0          
849             CODE:
850 0 0         RETVAL = h->resp_data_max;
851             OUTPUT:
852             RETVAL
853              
854             IV
855             eventfd(self)
856             SV *self
857             PREINIT:
858 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
859             CODE:
860 2           RETVAL = reqrep_reply_eventfd_create(h);
861 2 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
862             OUTPUT:
863             RETVAL
864              
865             void
866             eventfd_set(self, fd)
867             SV *self
868             int fd
869             PREINIT:
870 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
871             CODE:
872 1           reqrep_reply_eventfd_set(h, fd);
873              
874             IV
875             fileno(self)
876             SV *self
877             PREINIT:
878 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
879             CODE:
880 2 50         RETVAL = h->reply_fd;
881             OUTPUT:
882             RETVAL
883              
884             SV *
885             eventfd_consume(self)
886             SV *self
887             PREINIT:
888 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
889             CODE:
890 1           int64_t v = reqrep_reply_eventfd_consume(h);
891 1 50         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
892             OUTPUT:
893             RETVAL
894              
895             void
896             notify(self)
897             SV *self
898             PREINIT:
899 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
900             CODE:
901 1           reqrep_notify(h);
902              
903             void
904             req_eventfd_set(self, fd)
905             SV *self
906             int fd
907             PREINIT:
908 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
909             CODE:
910 2           reqrep_eventfd_set(h, fd);
911              
912             IV
913             req_fileno(self)
914             SV *self
915             PREINIT:
916 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Client", self);
    50          
    50          
917             CODE:
918 1 50         RETVAL = h->notify_fd;
919             OUTPUT:
920             RETVAL
921              
922              
923             MODULE = Data::ReqRep::Shared PACKAGE = Data::ReqRep::Shared::Int
924              
925             SV *
926             new(class, path, req_cap, resp_slots)
927             const char *class
928             SV *path
929             UV req_cap
930             UV resp_slots
931             PREINIT:
932             char errbuf[REQREP_ERR_BUFLEN];
933             CODE:
934 7 50         const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
935 7           ReqRepHandle *h = reqrep_create_int(p, (uint32_t)req_cap, (uint32_t)resp_slots, errbuf);
936 7 50         if (!h) croak("Data::ReqRep::Shared::Int->new: %s", errbuf);
937 7           MAKE_OBJ(class, h);
938             OUTPUT:
939             RETVAL
940              
941             SV *
942             new_memfd(class, name, req_cap, resp_slots)
943             const char *class
944             const char *name
945             UV req_cap
946             UV resp_slots
947             PREINIT:
948             char errbuf[REQREP_ERR_BUFLEN];
949             CODE:
950 1           ReqRepHandle *h = reqrep_create_int_memfd(name, (uint32_t)req_cap, (uint32_t)resp_slots, errbuf);
951 1 50         if (!h) croak("Data::ReqRep::Shared::Int->new_memfd: %s", errbuf);
952 1           MAKE_OBJ(class, h);
953             OUTPUT:
954             RETVAL
955              
956             SV *
957             new_from_fd(class, fd)
958             const char *class
959             int fd
960             PREINIT:
961             char errbuf[REQREP_ERR_BUFLEN];
962             CODE:
963 1           ReqRepHandle *h = reqrep_open_fd(fd, REQREP_MODE_INT, errbuf);
964 1 50         if (!h) croak("Data::ReqRep::Shared::Int->new_from_fd: %s", errbuf);
965 1           MAKE_OBJ(class, h);
966             OUTPUT:
967             RETVAL
968              
969             IV
970             memfd(self)
971             SV *self
972             PREINIT:
973 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
974             CODE:
975 1 50         RETVAL = h->backing_fd;
976             OUTPUT:
977             RETVAL
978              
979             void
980             DESTROY(self)
981             SV *self
982             CODE:
983 9 50         if (!SvROK(self)) return;
984 9           ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self)));
985 9 50         if (!h) return;
986 9           sv_setiv(SvRV(self), 0);
987 9           reqrep_destroy(h);
988              
989             void
990             recv(self)
991             SV *self
992             PREINIT:
993 31 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
994             int64_t value;
995             uint64_t id;
996             PPCODE:
997 31 100         if (reqrep_int_try_recv(h, &value, &id)) {
998 29 50         mXPUSHi((IV)value);
999 29 50         mXPUSHu((UV)id);
1000             }
1001              
1002             void
1003             recv_wait(self, ...)
1004             SV *self
1005             PREINIT:
1006 7 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1007 7           double timeout = -1;
1008             int64_t value;
1009             uint64_t id;
1010             PPCODE:
1011 7 50         if (items > 1) timeout = SvNV(ST(1));
1012 7 50         if (reqrep_int_recv_wait(h, &value, &id, timeout)) {
1013 7 50         mXPUSHi((IV)value);
1014 7 50         mXPUSHu((UV)id);
1015             }
1016              
1017             bool
1018             reply(self, id, value)
1019             SV *self
1020             UV id
1021             IV value
1022             PREINIT:
1023 35 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1024             CODE:
1025 35           int r = reqrep_int_reply(h, (uint64_t)id, (int64_t)value);
1026 35 50         if (r == -1) croak("Data::ReqRep::Shared::Int: invalid slot index");
1027 35 100         RETVAL = (r == 1);
1028             OUTPUT:
1029             RETVAL
1030              
1031             UV
1032             size(self)
1033             SV *self
1034             PREINIT:
1035 3 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1036             CODE:
1037 3           RETVAL = (UV)reqrep_int_size(h);
1038             OUTPUT:
1039             RETVAL
1040              
1041             UV
1042             capacity(self)
1043             SV *self
1044             PREINIT:
1045 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1046             CODE:
1047 2 50         RETVAL = h->req_cap;
1048             OUTPUT:
1049             RETVAL
1050              
1051             UV
1052             resp_slots(self)
1053             SV *self
1054             PREINIT:
1055 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1056             CODE:
1057 2 50         RETVAL = h->resp_slots;
1058             OUTPUT:
1059             RETVAL
1060              
1061             SV *
1062             path(self)
1063             SV *self
1064             PREINIT:
1065 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1066             CODE:
1067 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
1068             OUTPUT:
1069             RETVAL
1070              
1071             bool
1072             is_empty(self)
1073             SV *self
1074             PREINIT:
1075 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1076             CODE:
1077 1 50         RETVAL = (reqrep_int_size(h) == 0);
1078             OUTPUT:
1079             RETVAL
1080              
1081             UV
1082             resp_size(self)
1083             SV *self
1084             PREINIT:
1085 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1086             CODE:
1087 0 0         RETVAL = h->resp_data_max;
1088             OUTPUT:
1089             RETVAL
1090              
1091             SV *
1092             stats(self)
1093             SV *self
1094             PREINIT:
1095 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1096             CODE:
1097 1           HV *hv = newHV();
1098 1           ReqRepHeader *hdr = h->hdr;
1099 1           hv_store(hv, "size", 4, newSVuv((UV)reqrep_int_size(h)), 0);
1100 1           hv_store(hv, "capacity", 8, newSVuv(h->req_cap), 0);
1101 1           hv_store(hv, "resp_slots", 10, newSVuv(h->resp_slots), 0);
1102 1           hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
1103 1           hv_store(hv, "requests", 8, newSVuv((UV)__atomic_load_n(&hdr->stat_requests, __ATOMIC_RELAXED)), 0);
1104 1           hv_store(hv, "replies", 7, newSVuv((UV)__atomic_load_n(&hdr->stat_replies, __ATOMIC_RELAXED)), 0);
1105 1           hv_store(hv, "send_full", 9, newSVuv((UV)__atomic_load_n(&hdr->stat_send_full, __ATOMIC_RELAXED)), 0);
1106 1           hv_store(hv, "recv_empty", 10, newSVuv(__atomic_load_n(&hdr->stat_recv_empty, __ATOMIC_RELAXED)), 0);
1107 1           hv_store(hv, "recoveries", 10, newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
1108 1           hv_store(hv, "send_waiters", 12, newSVuv(__atomic_load_n(&hdr->send_waiters, __ATOMIC_RELAXED)), 0);
1109 1           hv_store(hv, "recv_waiters", 12, newSVuv(__atomic_load_n(&hdr->recv_waiters, __ATOMIC_RELAXED)), 0);
1110 1           hv_store(hv, "slot_waiters", 12, newSVuv(__atomic_load_n(&hdr->slot_waiters, __ATOMIC_RELAXED)), 0);
1111 1           RETVAL = newRV_noinc((SV *)hv);
1112             OUTPUT:
1113             RETVAL
1114              
1115             void
1116             clear(self)
1117             SV *self
1118             PREINIT:
1119 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1120             CODE:
1121 1           reqrep_int_clear(h);
1122              
1123             void
1124             sync(self)
1125             SV *self
1126             PREINIT:
1127 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1128             CODE:
1129 0 0         if (reqrep_sync(h) != 0) croak("msync: %s", strerror(errno));
1130              
1131             IV
1132             eventfd(self)
1133             SV *self
1134             PREINIT:
1135 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1136             CODE:
1137 1           RETVAL = reqrep_eventfd_create(h);
1138 1 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1139             OUTPUT:
1140             RETVAL
1141              
1142             void
1143             eventfd_set(self, fd)
1144             SV *self
1145             int fd
1146             PREINIT:
1147 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1148             CODE:
1149 0           reqrep_eventfd_set(h, fd);
1150              
1151             IV
1152             fileno(self)
1153             SV *self
1154             PREINIT:
1155 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1156             CODE:
1157 1 50         RETVAL = h->notify_fd;
1158             OUTPUT:
1159             RETVAL
1160              
1161             SV *
1162             eventfd_consume(self)
1163             SV *self
1164             PREINIT:
1165 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1166             CODE:
1167 0           int64_t v = reqrep_eventfd_consume(h);
1168 0 0         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
1169             OUTPUT:
1170             RETVAL
1171              
1172             void
1173             notify(self)
1174             SV *self
1175             PREINIT:
1176 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1177             CODE:
1178 0           reqrep_notify(h);
1179              
1180             IV
1181             reply_eventfd(self)
1182             SV *self
1183             PREINIT:
1184 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    50          
    50          
1185             CODE:
1186 1           RETVAL = reqrep_reply_eventfd_create(h);
1187 1 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1188             OUTPUT:
1189             RETVAL
1190              
1191             void
1192             reply_eventfd_set(self, fd)
1193             SV *self
1194             int fd
1195             PREINIT:
1196 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1197             CODE:
1198 0           reqrep_reply_eventfd_set(h, fd);
1199              
1200             IV
1201             reply_fileno(self)
1202             SV *self
1203             PREINIT:
1204 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1205             CODE:
1206 0 0         RETVAL = h->reply_fd;
1207             OUTPUT:
1208             RETVAL
1209              
1210             SV *
1211             reply_eventfd_consume(self)
1212             SV *self
1213             PREINIT:
1214 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1215             CODE:
1216 0           int64_t v = reqrep_reply_eventfd_consume(h);
1217 0 0         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
1218             OUTPUT:
1219             RETVAL
1220              
1221             void
1222             reply_notify(self)
1223             SV *self
1224             PREINIT:
1225 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int", self);
    0          
    0          
1226             CODE:
1227 0           reqrep_reply_notify(h);
1228              
1229             void
1230             unlink(self_or_class, ...)
1231             SV *self_or_class
1232             CODE:
1233             const char *path;
1234 5 50         if (sv_isobject(self_or_class)) {
1235 5           ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self_or_class)));
1236 5 50         if (!h) croak("Attempted to use a destroyed object");
1237 5           path = h->path;
1238             } else {
1239 0 0         if (items < 2) croak("Usage: ...->unlink($path)");
1240 0           path = SvPV_nolen(ST(1));
1241             }
1242 5 50         if (!path) croak("cannot unlink anonymous or memfd channel");
1243 5 50         if (unlink(path) != 0) croak("unlink(%s): %s", path, strerror(errno));
1244              
1245              
1246             MODULE = Data::ReqRep::Shared PACKAGE = Data::ReqRep::Shared::Int::Client
1247              
1248             SV *
1249             new(class, path)
1250             const char *class
1251             SV *path
1252             PREINIT:
1253             char errbuf[REQREP_ERR_BUFLEN];
1254             CODE:
1255 7           const char *p = SvPV_nolen(path);
1256 7           ReqRepHandle *h = reqrep_open(p, REQREP_MODE_INT, errbuf);
1257 7 50         if (!h) croak("Data::ReqRep::Shared::Int::Client->new: %s", errbuf);
1258 7           MAKE_OBJ(class, h);
1259             OUTPUT:
1260             RETVAL
1261              
1262             SV *
1263             new_from_fd(class, fd)
1264             const char *class
1265             int fd
1266             PREINIT:
1267             char errbuf[REQREP_ERR_BUFLEN];
1268             CODE:
1269 1           ReqRepHandle *h = reqrep_open_fd(fd, REQREP_MODE_INT, errbuf);
1270 1 50         if (!h) croak("Data::ReqRep::Shared::Int::Client->new_from_fd: %s", errbuf);
1271 1           MAKE_OBJ(class, h);
1272             OUTPUT:
1273             RETVAL
1274              
1275             IV
1276             memfd(self)
1277             SV *self
1278             PREINIT:
1279 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1280             CODE:
1281 0 0         RETVAL = h->backing_fd;
1282             OUTPUT:
1283             RETVAL
1284              
1285             void
1286             DESTROY(self)
1287             SV *self
1288             CODE:
1289 8 50         if (!SvROK(self)) return;
1290 8           ReqRepHandle *h = INT2PTR(ReqRepHandle*, SvIV(SvRV(self)));
1291 8 50         if (!h) return;
1292 8           sv_setiv(SvRV(self), 0);
1293 8           reqrep_destroy(h);
1294              
1295             SV *
1296             send(self, value)
1297             SV *self
1298             IV value
1299             PREINIT:
1300 32 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1301             uint64_t id;
1302             CODE:
1303 32           int r = reqrep_int_try_send(h, (int64_t)value, &id);
1304 32 100         RETVAL = (r == 1) ? newSVuv((UV)id) : &PL_sv_undef;
1305             OUTPUT:
1306             RETVAL
1307              
1308             SV *
1309             send_wait(self, value, ...)
1310             SV *self
1311             IV value
1312             PREINIT:
1313 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1314 2           double timeout = -1;
1315             uint64_t id;
1316             CODE:
1317 2 50         if (items > 2) timeout = SvNV(ST(2));
1318 2           int r = reqrep_int_send_wait(h, (int64_t)value, &id, timeout);
1319 2 50         RETVAL = (r == 1) ? newSVuv((UV)id) : &PL_sv_undef;
1320             OUTPUT:
1321             RETVAL
1322              
1323             SV *
1324             get(self, id)
1325             SV *self
1326             UV id
1327             PREINIT:
1328 25 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1329             int64_t value;
1330             CODE:
1331 25           int r = reqrep_int_try_get(h, (uint64_t)id, &value);
1332 25 50         if (r == -1) croak("Data::ReqRep::Shared::Int::Client: invalid slot index");
1333 25 50         RETVAL = (r == 1) ? newSViv((IV)value) : &PL_sv_undef;
1334             OUTPUT:
1335             RETVAL
1336              
1337             SV *
1338             get_wait(self, id, ...)
1339             SV *self
1340             UV id
1341             PREINIT:
1342 2 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1343 2           double timeout = -1;
1344             int64_t value;
1345             CODE:
1346 2 50         if (items > 2) timeout = SvNV(ST(2));
1347 2           int r = reqrep_int_get_wait(h, (uint64_t)id, &value, timeout);
1348 2 50         if (r == -1) croak("Data::ReqRep::Shared::Int::Client: invalid slot index");
1349 2 50         RETVAL = (r == 1) ? newSViv((IV)value) : &PL_sv_undef;
1350             OUTPUT:
1351             RETVAL
1352              
1353             SV *
1354             req(self, value)
1355             SV *self
1356             IV value
1357             PREINIT:
1358 5 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1359             int64_t out;
1360             CODE:
1361 5           int r = reqrep_int_request(h, (int64_t)value, &out, -1);
1362 5 50         RETVAL = (r == 1) ? newSViv((IV)out) : &PL_sv_undef;
1363             OUTPUT:
1364             RETVAL
1365              
1366             SV *
1367             req_wait(self, value, timeout)
1368             SV *self
1369             IV value
1370             double timeout
1371             PREINIT:
1372 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1373             int64_t out;
1374             CODE:
1375 1           int r = reqrep_int_request(h, (int64_t)value, &out, timeout);
1376 1 50         RETVAL = (r == 1) ? newSViv((IV)out) : &PL_sv_undef;
1377             OUTPUT:
1378             RETVAL
1379              
1380             void
1381             cancel(self, id)
1382             SV *self
1383             UV id
1384             PREINIT:
1385 3 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1386             CODE:
1387 3           reqrep_cancel(h, (uint64_t)id);
1388              
1389             UV
1390             pending(self)
1391             SV *self
1392             PREINIT:
1393 3 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1394             CODE:
1395 3 50         RETVAL = (UV)reqrep_pending(h);
1396             OUTPUT:
1397             RETVAL
1398              
1399             UV
1400             size(self)
1401             SV *self
1402             PREINIT:
1403 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1404             CODE:
1405 0           RETVAL = (UV)reqrep_int_size(h);
1406             OUTPUT:
1407             RETVAL
1408              
1409             UV
1410             capacity(self)
1411             SV *self
1412             PREINIT:
1413 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1414             CODE:
1415 1 50         RETVAL = h->req_cap;
1416             OUTPUT:
1417             RETVAL
1418              
1419             bool
1420             is_empty(self)
1421             SV *self
1422             PREINIT:
1423 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1424             CODE:
1425 1 50         RETVAL = (reqrep_int_size(h) == 0);
1426             OUTPUT:
1427             RETVAL
1428              
1429             UV
1430             resp_slots(self)
1431             SV *self
1432             PREINIT:
1433 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1434             CODE:
1435 0 0         RETVAL = h->resp_slots;
1436             OUTPUT:
1437             RETVAL
1438              
1439             UV
1440             resp_size(self)
1441             SV *self
1442             PREINIT:
1443 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1444             CODE:
1445 0 0         RETVAL = h->resp_data_max;
1446             OUTPUT:
1447             RETVAL
1448              
1449             SV *
1450             stats(self)
1451             SV *self
1452             PREINIT:
1453 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1454             CODE:
1455 0           HV *hv = newHV();
1456 0           ReqRepHeader *hdr = h->hdr;
1457 0           hv_store(hv, "size", 4, newSVuv((UV)reqrep_int_size(h)), 0);
1458 0           hv_store(hv, "capacity", 8, newSVuv(h->req_cap), 0);
1459 0           hv_store(hv, "resp_slots", 10, newSVuv(h->resp_slots), 0);
1460 0           hv_store(hv, "requests", 8, newSVuv((UV)__atomic_load_n(&hdr->stat_requests, __ATOMIC_RELAXED)), 0);
1461 0           hv_store(hv, "replies", 7, newSVuv((UV)__atomic_load_n(&hdr->stat_replies, __ATOMIC_RELAXED)), 0);
1462 0           hv_store(hv, "recoveries", 10, newSVuv(__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
1463 0           RETVAL = newRV_noinc((SV *)hv);
1464             OUTPUT:
1465             RETVAL
1466              
1467             SV *
1468             path(self)
1469             SV *self
1470             PREINIT:
1471 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1472             CODE:
1473 0 0         RETVAL = h->path ? newSVpv(h->path, 0) : &PL_sv_undef;
1474             OUTPUT:
1475             RETVAL
1476              
1477             IV
1478             eventfd(self)
1479             SV *self
1480             PREINIT:
1481 1 50         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    50          
    50          
1482             CODE:
1483 1           RETVAL = reqrep_reply_eventfd_create(h);
1484 1 50         if (RETVAL < 0) croak("eventfd: %s", strerror(errno));
1485             OUTPUT:
1486             RETVAL
1487              
1488             void
1489             eventfd_set(self, fd)
1490             SV *self
1491             int fd
1492             PREINIT:
1493 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1494             CODE:
1495 0           reqrep_reply_eventfd_set(h, fd);
1496              
1497             IV
1498             fileno(self)
1499             SV *self
1500             PREINIT:
1501 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1502             CODE:
1503 0 0         RETVAL = h->reply_fd;
1504             OUTPUT:
1505             RETVAL
1506              
1507             SV *
1508             eventfd_consume(self)
1509             SV *self
1510             PREINIT:
1511 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1512             CODE:
1513 0           int64_t v = reqrep_reply_eventfd_consume(h);
1514 0 0         RETVAL = (v >= 0) ? newSViv((IV)v) : &PL_sv_undef;
1515             OUTPUT:
1516             RETVAL
1517              
1518             void
1519             notify(self)
1520             SV *self
1521             PREINIT:
1522 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1523             CODE:
1524 0           reqrep_notify(h);
1525              
1526             void
1527             req_eventfd_set(self, fd)
1528             SV *self
1529             int fd
1530             PREINIT:
1531 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1532             CODE:
1533 0           reqrep_eventfd_set(h, fd);
1534              
1535             IV
1536             req_fileno(self)
1537             SV *self
1538             PREINIT:
1539 0 0         EXTRACT_HANDLE("Data::ReqRep::Shared::Int::Client", self);
    0          
    0          
1540             CODE:
1541 0 0         RETVAL = h->notify_fd;
1542             OUTPUT:
1543             RETVAL