File Coverage

src/future.c
Criterion Covered Total %
statement 13 927 1.4
branch 4 626 0.6
condition n/a
subroutine n/a
pod n/a
total 17 1553 1.0


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2              
3             #include "EXTERN.h"
4             #include "perl.h"
5             #include "XSUB.h"
6              
7             #include "future.h"
8              
9             #include "perl-backcompat.c.inc"
10              
11             #include "av-utils.c.inc"
12             #include "cv_set_anysv_refcounted.c.inc"
13              
14             #if !HAVE_PERL_VERSION(5, 16, 0)
15             # define false FALSE
16             # define true TRUE
17             #endif
18              
19             #ifdef HAVE_DMD_HELPER
20             # define WANT_DMD_API_044
21             # include "DMD_helper.h"
22             #endif
23              
24             #if !HAVE_PERL_VERSION(5, 16, 0)
25             # define XS_INTERNAL(name) static XS(name)
26             #endif
27              
28             #define mPUSHpvs(s) mPUSHp("" s "", sizeof(s)-1)
29              
30             static bool future_debug;
31             static bool capture_times;
32              
33             /* There's no reason these have to match those in Future.pm but for now we
34             * might as well just copy the same values
35             */
36             enum {
37             CB_DONE = (1<<0),
38             CB_FAIL = (1<<1),
39             CB_CANCEL = (1<<2),
40             CB_ALWAYS = CB_DONE|CB_FAIL|CB_CANCEL,
41              
42             CB_SELF = (1<<3),
43             CB_RESULT = (1<<4),
44              
45             CB_SEQ_READY = (1<<5),
46             CB_SEQ_CANCEL = (1<<6),
47             CB_SEQ_ANY = CB_SEQ_READY|CB_SEQ_CANCEL,
48              
49             CB_SEQ_IMDONE = (1<<7),
50             CB_SEQ_IMFAIL = (1<<8),
51              
52             CB_SEQ_STRICT = (1<<9),
53              
54             CB_IS_FUTURE = (1<<10),
55             };
56              
57             // TODO: Consider using different struct types to save memory? Or maybe it's
58             // so small a difference it doesn't matter
59             struct FutureXSCallback
60             {
61             unsigned int flags;
62             union {
63             SV *code; /* if !(flags & CB_SEQ_ANY) */
64             struct { /* if (flags & CB_SEQ_ANY) */
65             SV *thencode;
66             SV *elsecode;
67             HV *catches;
68             SV *f;
69             } seq;
70             };
71             };
72              
73             struct FutureXSRevocation
74             {
75             SV *precedent_f;
76             SV *toclear_sv_at;
77             };
78              
79             #define CB_NONSEQ_CODE(cb) \
80             ({ if((cb)->flags & CB_SEQ_ANY) croak("ARGH: CB_NONSEQ_CODE on SEQ"); (cb)->code;})
81              
82             enum {
83             SUBFLAG_NO_CANCEL = (1<<0),
84             };
85              
86             struct FutureXS
87             {
88             unsigned int ready : 1;
89             unsigned int cancelled : 1;
90             unsigned int reported : 1;
91             SV *label;
92             AV *result; // implies done
93             AV *failure; // implies fail
94             AV *callbacks; // values are struct FutureXSCallback ptrs directly. TODO: custom ptr/fill/max
95             AV *on_cancel; // values are CVs directly
96             AV *revoke_when_ready; // values are struct FutureXSRevocation ptrs directly.
97             int empty_revocation_slots;
98              
99             HV *udata;
100              
101             struct timeval btime, rtime;
102             SV *constructed_at;
103              
104             /* For convergents
105             * TODO: consider making this an optional extra part of the body, only
106             * allocated when required
107             */
108             AV *subs;
109             U8 *subflags;
110             Size_t pending_subs;
111              
112             /* For without_cancel, purely to keep a strongref */
113             SV *precedent_f;
114             };
115              
116             #ifdef USE_ITHREADS
117             static int future_dup(pTHX_ MAGIC *mg, CLONE_PARAMS *param);
118              
119             static MGVTBL vtbl = {
120             .svt_dup = &future_dup,
121             };
122             #endif
123              
124 0           bool Future_sv_is_future(pTHX_ SV *sv)
125             {
126 0 0         if(!SvROK(sv) || !SvOBJECT(SvRV(sv)))
    0          
127 0           return false;
128              
129 0 0         if(sv_derived_from(sv, "Future") || sv_derived_from(sv, "Future::XS"))
    0          
130 0           return true;
131              
132 0           return false;
133             }
134              
135             #define get_future(sv) S_get_future(aTHX_ sv, FALSE)
136             #define maybe_get_future(sv) S_get_future(aTHX_ sv, TRUE)
137 0           static struct FutureXS *S_get_future(pTHX_ SV *sv, bool nullok)
138             {
139             assert(sv);
140             assert(SvROK(sv) && SvOBJECT(SvRV(sv)));
141             // TODO: Add some safety checking about class
142 0           struct FutureXS *self = INT2PTR(struct FutureXS *, SvIV(SvRV(sv)));
143 0 0         if(self || nullok)
    0          
144 0           return self;
145 0           croak("Future::XS instance %" SVf " is not available in this thread", SVfARG(sv));
146             }
147              
148 0           SV *Future_new(pTHX_ const char *cls)
149             {
150 0 0         if(!cls)
151 0           cls = "Future::XS";
152              
153             struct FutureXS *self;
154 0           Newx(self, 1, struct FutureXS);
155              
156 0           self->ready = false;
157 0           self->cancelled = false;
158 0           self->reported = false;
159              
160 0           self->label = NULL;
161              
162 0 0         if(capture_times)
163 0           gettimeofday(&self->btime, NULL);
164             else
165 0           self->btime = (struct timeval){ 0 };
166              
167 0           self->rtime = (struct timeval){ 0 };
168              
169 0 0         if(future_debug)
170 0 0         self->constructed_at = newSVpvf("constructed at %s line %d", CopFILE(PL_curcop), CopLINE(PL_curcop));
171             else
172 0           self->constructed_at = NULL;
173              
174 0           self->result = NULL;
175 0           self->failure = NULL;
176              
177 0           self->callbacks = NULL;
178 0           self->on_cancel = NULL;
179 0           self->revoke_when_ready = NULL;
180 0           self->empty_revocation_slots = 0;
181              
182 0           self->udata = NULL;
183              
184 0           self->subs = NULL;
185 0           self->subflags = NULL;
186              
187 0           self->precedent_f = NULL;
188              
189 0           SV *ret = newSV(0);
190 0           sv_setref_pv(ret, cls, self);
191              
192             #ifdef USE_ITHREADS
193             MAGIC *mg = sv_magicext(SvRV(ret), SvRV(ret), PERL_MAGIC_ext, &vtbl, NULL, 0);
194             mg->mg_flags |= MGf_DUP;
195             #endif
196              
197 0           return ret;
198             }
199              
200             #define future_new_proto(f1) Future_new_proto(aTHX_ f1)
201 0           SV *Future_new_proto(pTHX_ SV *f1)
202             {
203             assert(f1 && SvROK(f1) && SvRV(f1));
204             // TODO Shortcircuit in the common case that f1 is a Future instance
205             // return future_new(HvNAME(SvSTASH(SvRV(f1))));
206              
207 0           dSP;
208 0           ENTER;
209 0           SAVETMPS;
210              
211 0 0         EXTEND(SP, 1);
212 0 0         PUSHMARK(SP);
213 0           PUSHs(sv_mortalcopy(f1));
214 0           PUTBACK;
215              
216 0           call_method("new", G_SCALAR);
217              
218 0           SPAGAIN;
219              
220 0           SV *ret = SvREFCNT_inc(POPs);
221              
222 0           PUTBACK;
223 0 0         FREETMPS;
224 0           LEAVE;
225              
226 0           return ret;
227             }
228              
229             #ifdef USE_ITHREADS
230              
231             static int future_dup(pTHX_ MAGIC *mg, CLONE_PARAMS *param)
232             {
233             /* We don't currently support duplicating a Future instance across thread
234             * creation/return. For now just zero out the pointer and complain if anyone
235             * tries to access it.
236             * This at least means that incidental Future instances that happen to exist
237             * in main thread memory won't be disturbed when sidecar threads are joined.
238             */
239             sv_setiv(mg->mg_obj, 0);
240             }
241             #endif
242              
243             #define clear_callback(cb) S_clear_callback(aTHX_ cb)
244 0           static void S_clear_callback(pTHX_ struct FutureXSCallback *cb)
245             {
246 0           int flags = cb->flags;
247 0 0         if(flags & CB_SEQ_ANY) {
248 0           SvREFCNT_dec(cb->seq.thencode);
249 0           SvREFCNT_dec(cb->seq.elsecode);
250 0           SvREFCNT_dec(cb->seq.catches);
251 0           SvREFCNT_dec(cb->seq.f);
252             }
253             else {
254 0 0         SvREFCNT_dec(CB_NONSEQ_CODE(cb));
255             }
256 0           }
257              
258             #define destroy_callbacks(self) S_destroy_callbacks(aTHX_ self)
259 0           static void S_destroy_callbacks(pTHX_ struct FutureXS *self)
260             {
261 0           AV *callbacksav = self->callbacks;
262 0 0         while(callbacksav && AvFILLp(callbacksav) > -1) {
    0          
263 0           struct FutureXSCallback *cb = (struct FutureXSCallback *)AvARRAY(self->callbacks)[AvFILLp(callbacksav)--];
264 0           clear_callback(cb);
265 0           Safefree(cb);
266             }
267 0           }
268              
269             #define future_mortal_selfstr(f) Future_mortal_selfstr(aTHX_ f)
270 0           static SV *Future_mortal_selfstr(pTHX_ SV *f)
271             {
272 0           struct FutureXS *self = get_future(f);
273              
274 0           SV *ret = newSVpvf("%" SVf, SVfARG(f));
275 0 0         if(self->label)
276 0           sv_catpvf(ret, " (\"%" SVf "\")", SVfARG(self->label));
277 0 0         if(future_debug)
278 0           sv_catpvf(ret, " (%" SVf ")", SVfARG(self->constructed_at));
279 0           return sv_2mortal(ret);
280             }
281              
282 0           void Future_destroy(pTHX_ SV *f)
283             {
284             #ifdef DEBUGGING
285             // Every pointer in this function ought to have been uniquely held
286             # define UNREF(p) \
287             do { \
288             if(p) assert(SvREFCNT(p) == 1); \
289             SvREFCNT_dec((SV *)p); \
290             (p) = (void *)0xAA55AA55; \
291             } while(0)
292             #else
293             # define UNREF(p) SvREFCNT_dec((SV *)p)
294             #endif
295              
296             /* Defend against being run during global destruction */
297 0 0         if(!f || !SvROK(f))
    0          
298 0           return;
299 0           struct FutureXS *self = maybe_get_future(f);
300 0 0         if(!self)
301 0           return;
302              
303 0 0         if(future_debug &&
304 0 0         (!self->ready || (self->failure && !self->reported))) {
    0          
    0          
305 0 0         if(!self->ready)
306 0 0         warn("%" SVf " was lost near %s line %d before it was ready\n",
307             SVfARG(future_mortal_selfstr(f)),
308             CopFILE(PL_curcop), CopLINE(PL_curcop));
309             else {
310 0           SV *failure = AvARRAY(self->failure)[0];
311 0 0         warn("%" SVf " was lost near %s line %d with an unreported failure of: %" SVf "\n",
312             SVfARG(future_mortal_selfstr(f)),
313             CopFILE(PL_curcop), CopLINE(PL_curcop),
314             SVfARG(failure));
315             }
316             }
317              
318 0           UNREF(self->label);
319              
320 0           UNREF(self->result);
321              
322 0           UNREF(self->failure);
323              
324 0           destroy_callbacks(self);
325 0           UNREF(self->callbacks);
326              
327 0           UNREF(self->on_cancel);
328              
329 0           AV *revocationsav = self->revoke_when_ready;
330 0 0         while(revocationsav && AvFILLp(revocationsav) > -1) {
    0          
331 0           struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocationsav)[AvFILLp(revocationsav)--];
332 0           UNREF(rev->precedent_f);
333 0           UNREF(rev->toclear_sv_at);
334 0           Safefree(rev);
335             }
336 0           UNREF(self->revoke_when_ready);
337              
338 0           UNREF(self->udata);
339              
340 0           UNREF(self->constructed_at);
341              
342 0           UNREF(self->subs);
343 0           Safefree(self->subflags);
344              
345 0           UNREF(self->precedent_f);
346              
347 0           Safefree(self);
348              
349             #undef UNREF
350             }
351              
352 0           bool Future_is_ready(pTHX_ SV *f)
353             {
354 0           struct FutureXS *self = get_future(f);
355 0           return self->ready;
356             }
357              
358 0           bool Future_is_done(pTHX_ SV *f)
359             {
360 0           struct FutureXS *self = get_future(f);
361 0 0         return self->ready && !self->failure && !self->cancelled;
    0          
    0          
362             }
363              
364 0           bool Future_is_failed(pTHX_ SV *f)
365             {
366 0           struct FutureXS *self = get_future(f);
367 0 0         return self->ready && self->failure;
    0          
368             }
369              
370 0           bool Future_is_cancelled(pTHX_ SV *f)
371             {
372 0           struct FutureXS *self = get_future(f);
373 0           return self->cancelled;
374             }
375              
376             #define clear_on_cancel(self) S_clear_on_cancel(aTHX_ self)
377 0           static void S_clear_on_cancel(pTHX_ struct FutureXS *self)
378             {
379 0 0         if(!self->on_cancel)
380 0           return;
381              
382 0           AV *on_cancel = self->on_cancel;
383 0           self->on_cancel = NULL;
384              
385 0           SvREFCNT_dec(on_cancel);
386             }
387              
388             #define push_callback(self, cb) S_push_callback(aTHX_ self, cb)
389 0           static void S_push_callback(pTHX_ struct FutureXS *self, struct FutureXSCallback *cb)
390             {
391             struct FutureXSCallback *new;
392 0           Newx(new, 1, struct FutureXSCallback);
393              
394 0           new->flags = cb->flags;
395 0 0         if(cb->flags & CB_SEQ_ANY) {
396 0           new->seq.thencode = cb->seq.thencode;
397 0           new->seq.elsecode = cb->seq.elsecode;
398 0           new->seq.catches = cb->seq.catches;
399 0           new->seq.f = cb->seq.f;
400             }
401             else {
402 0 0         new->code = CB_NONSEQ_CODE(cb);
403             }
404              
405 0 0         if(!self->callbacks)
406 0           self->callbacks = newAV();
407              
408 0           av_push(self->callbacks, (SV *)new);
409 0           }
410              
411             #define wrap_cb(f, name, cv) S_wrap_cb(aTHX_ f, name, cv)
412 0           static SV *S_wrap_cb(pTHX_ SV *f, const char *name, SV *cv)
413             {
414             // TODO: This is quite the speed bump having to do this, in the common case
415             // that it isn't overridden
416 0           dSP;
417 0           ENTER;
418 0           SAVETMPS;
419              
420 0 0         EXTEND(SP, 3);
421 0 0         PUSHMARK(SP);
422 0           PUSHs(sv_mortalcopy(f));
423 0           mPUSHp(name, strlen(name));
424 0           PUSHs(sv_mortalcopy(cv));
425 0           PUTBACK;
426              
427 0           call_method("wrap_cb", G_SCALAR);
428              
429 0           SPAGAIN;
430 0           SV *ret = newSVsv(POPs);
431              
432 0           PUTBACK;
433 0 0         FREETMPS;
434 0           LEAVE;
435              
436 0           return ret;
437             }
438              
439             #define invoke_seq_callback(self, selfsv, cb) S_invoke_seq_callback(aTHX_ self, selfsv, cb)
440 0           static SV *S_invoke_seq_callback(pTHX_ struct FutureXS *self, SV *selfsv, struct FutureXSCallback *cb)
441             {
442 0           int flags = cb->flags;
443              
444 0           bool is_fail = cBOOL(self->failure);
445 0 0         bool is_done = !self->cancelled && !is_fail;
    0          
446              
447 0 0         AV *result = (is_done) ? self->result :
448 0 0         (is_fail) ? self->failure :
449             NULL;
450              
451 0 0         SV *code = (is_done) ? cb->seq.thencode :
452 0 0         (is_fail) ? cb->seq.elsecode :
453             NULL;
454              
455 0 0         if(is_fail && result && av_count(result) > 1 && cb->seq.catches) {
    0          
    0          
    0          
456 0           SV *category = AvARRAY(result)[1];
457 0 0         if(SvOK(category)) {
458 0           HE *he = hv_fetch_ent(cb->seq.catches, category, 0, 0);
459 0 0         if(he && HeVAL(he))
    0          
460 0           code = HeVAL(he);
461             }
462             }
463              
464 0 0         if(!code || !SvOK(code))
    0          
465 0           return newSVsv(selfsv);
466              
467 0           dSP;
468              
469 0           ENTER;
470 0           SAVETMPS;
471              
472 0 0         PUSHMARK(SP);
473 0 0         if(flags & CB_SELF)
474 0 0         XPUSHs(selfsv);
475 0 0         if(flags & CB_RESULT)
476 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
477 0           PUTBACK;
478              
479             assert(SvOK(code));
480 0           call_sv(code, G_SCALAR|G_EVAL);
481              
482 0           SPAGAIN;
483              
484 0 0         if(SvROK(ERRSV) || SvTRUE(ERRSV)) {
    0          
    0          
    0          
485 0           POPs;
486              
487 0           SV *fseq = cb->seq.f;
488              
489 0 0         if(!fseq)
490 0           fseq = future_new_proto(selfsv);
491              
492 0 0         future_failv(fseq, &ERRSV, 1);
493              
494 0 0         FREETMPS;
495 0           LEAVE;
496              
497 0           return fseq;
498             }
499              
500 0           SV *f2 = POPs;
501 0           SvREFCNT_inc(f2);
502              
503 0           PUTBACK;
504 0 0         FREETMPS;
505 0           LEAVE;
506              
507 0 0         if(!sv_is_future(f2)) {
508 0           SV *result = f2;
509              
510             // TODO: strictness check
511              
512 0           f2 = future_new_proto(selfsv);
513 0           future_donev(f2, &result, 1);
514             }
515              
516 0           return f2;
517             }
518              
519             #define invoke_callback(self, selfsv, cb) S_invoke_callback(aTHX_ self, selfsv, cb)
520 0           static void S_invoke_callback(pTHX_ struct FutureXS *self, SV *selfsv, struct FutureXSCallback *cb)
521             {
522 0           int flags = cb->flags;
523              
524 0           bool is_cancelled = self->cancelled;
525 0           bool is_fail = cBOOL(self->failure);
526 0 0         bool is_done = !is_cancelled && !is_fail;
    0          
527              
528 0 0         AV *result = (is_done) ? self->result :
529 0 0         (is_fail) ? self->failure :
530             NULL;
531              
532 0 0         if(is_done && !(flags & CB_DONE))
    0          
533 0           return;
534 0 0         if(is_fail && !(flags & CB_FAIL))
    0          
535 0           return;
536 0 0         if(is_cancelled && !(flags & CB_CANCEL))
    0          
537 0           return;
538              
539 0 0         if(flags & CB_IS_FUTURE) {
540 0           dSP;
541              
542 0           ENTER;
543 0           SAVETMPS;
544              
545 0 0         PUSHMARK(SP);
546 0 0         XPUSHs(CB_NONSEQ_CODE(cb)); // really a Future RV
    0          
547 0 0         if(result)
548 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
549              
550 0           PUTBACK;
551 0 0         if(is_done)
552 0           call_method("done", G_VOID);
553 0 0         else if(is_fail)
554 0           call_method("fail", G_VOID);
555             else
556 0           call_method("cancel", G_VOID);
557              
558 0 0         FREETMPS;
559 0           LEAVE;
560             }
561 0 0         else if(flags & CB_SEQ_ANY) {
562 0           SV *fseq = cb->seq.f;
563              
564 0 0         if(!SvOK(fseq)) {
565 0           warn("%" SVf " lost a sequence Future",
566             SVfARG(future_mortal_selfstr(selfsv)));
567 0           return;
568             }
569              
570 0           SV *f2 = invoke_seq_callback(self, selfsv, cb);
571 0 0         if(f2 == fseq)
572             /* immediate fail */
573 0           return;
574              
575 0           future_on_cancel(fseq, f2);
576              
577 0 0         if(future_is_ready(f2)) {
578 0 0         if(!future_is_cancelled(f2))
579 0           future_on_ready(f2, fseq);
580 0 0         else if(flags & CB_CANCEL)
581 0           future_cancel(fseq);
582             }
583             else {
584 0           struct FutureXS *f2self = get_future(f2);
585 0           struct FutureXSCallback cb2 = {
586             .flags = CB_DONE|CB_FAIL|CB_IS_FUTURE,
587 0           .code = sv_rvweaken(newSVsv(fseq)),
588             };
589 0           push_callback(f2self, &cb2);
590             }
591              
592             assert(SvREFCNT(f2) == 1);
593 0           SvREFCNT_dec(f2);
594             }
595             else {
596 0 0         SV *code = CB_NONSEQ_CODE(cb);
597              
598 0           dSP;
599              
600 0           ENTER;
601 0           SAVETMPS;
602              
603 0 0         PUSHMARK(SP);
604 0 0         if(flags & CB_SELF)
605 0 0         XPUSHs(selfsv);
606 0 0         if((flags & CB_RESULT) && result)
    0          
607 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
608              
609 0           PUTBACK;
610             assert(SvOK(code));
611 0           call_sv(code, G_VOID);
612              
613 0 0         FREETMPS;
614 0           LEAVE;
615             }
616             }
617              
618             #define revoke_on_cancel(rev) S_revoke_on_cancel(aTHX_ rev)
619 0           static void S_revoke_on_cancel(pTHX_ struct FutureXSRevocation *rev)
620             {
621 0 0         if(rev->toclear_sv_at && SvROK(rev->toclear_sv_at)) {
    0          
622             assert(SvTYPE(rev->toclear_sv_at) <= SVt_PVMG);
623             assert(SvROK(rev->toclear_sv_at));
624 0           sv_set_undef(SvRV(rev->toclear_sv_at));
625 0           SvREFCNT_dec(rev->toclear_sv_at);
626 0           rev->toclear_sv_at = NULL;
627             }
628              
629 0 0         if(!SvOK(rev->precedent_f))
630 0           return;
631              
632 0           struct FutureXS *self = get_future(rev->precedent_f);
633              
634 0           self->empty_revocation_slots++;
635              
636 0           AV *on_cancel = self->on_cancel;
637 0 0         if(self->empty_revocation_slots >= 8 && on_cancel &&
    0          
    0          
638 0 0         self->empty_revocation_slots >= AvFILL(on_cancel)/2) {
639              
640             // Squash up the array to contain only defined values
641 0           SV **wrsv = AvARRAY(on_cancel),
642 0           **rdsv = AvARRAY(on_cancel),
643 0 0         **end = AvARRAY(on_cancel) + AvFILL(on_cancel);
644              
645 0 0         while(rdsv <= end) {
646 0 0         if(SvOK(*rdsv))
647             // Keep this one
648 0           *(wrsv++) = *rdsv;
649             else
650             // Free this one
651 0           SvREFCNT_dec(*rdsv);
652              
653 0           rdsv++;
654             }
655 0           AvFILLp(on_cancel) = wrsv - AvARRAY(on_cancel) - 1;
656              
657 0           self->empty_revocation_slots = 0;
658             }
659             }
660              
661             #define mark_ready(self, selfsv, state) S_mark_ready(aTHX_ self, selfsv, state)
662 0           static void S_mark_ready(pTHX_ struct FutureXS *self, SV *selfsv, const char *state)
663             {
664 0           self->ready = true;
665             // TODO: self->ready_at
666 0 0         if(capture_times)
667 0           gettimeofday(&self->rtime, NULL);
668              
669             /* Make sure self doesn't disappear during this function */
670 0           SvREFCNT_inc(SvRV(selfsv));
671 0           SAVEFREESV(SvRV(selfsv));
672              
673 0 0         if(self->precedent_f) {
674 0           SvREFCNT_dec(self->precedent_f);
675 0           self->precedent_f = NULL;
676             }
677              
678 0           clear_on_cancel(self);
679 0 0         if(self->revoke_when_ready) {
680 0           AV *revocations = self->revoke_when_ready;
681 0 0         for(size_t i = 0; i < av_count(revocations); i++) {
682 0           struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocations)[i];
683 0           revoke_on_cancel(rev);
684              
685 0           SvREFCNT_dec(rev->precedent_f);
686 0           Safefree(rev);
687             }
688 0           AvFILLp(revocations) = -1;
689 0           SvREFCNT_dec(revocations);
690              
691 0           self->revoke_when_ready = NULL;
692             }
693              
694 0 0         if(!self->callbacks)
695 0           return;
696              
697 0           AV *callbacks = self->callbacks;
698              
699 0           struct FutureXSCallback **cbs = (struct FutureXSCallback **)AvARRAY(callbacks);
700 0           size_t i, n = av_count(callbacks);
701 0 0         for(i = 0; i < n; i++) {
702 0           struct FutureXSCallback *cb = cbs[i];
703 0           invoke_callback(self, selfsv, cb);
704             }
705              
706 0           destroy_callbacks(self);
707             }
708              
709             #define make_sequence(f1, cb) S_make_sequence(aTHX_ f1, cb)
710 0           static SV *S_make_sequence(pTHX_ SV *f1, struct FutureXSCallback *cb)
711             {
712 0           struct FutureXS *self = get_future(f1);
713              
714 0           int flags = cb->flags;
715              
716 0 0         if(self->ready) {
717             // TODO: CB_SEQ_IM*
718              
719 0           SV *f2 = invoke_seq_callback(self, f1, cb);
720 0           clear_callback(cb);
721 0           return f2;
722             }
723              
724 0           SV *fseq = future_new_proto(f1);
725 0 0         if(cb->flags & CB_SEQ_CANCEL)
726 0           future_on_cancel(fseq, f1);
727              
728 0           cb->flags |= CB_DONE|CB_FAIL;
729 0 0         if(cb->seq.thencode)
730 0           cb->seq.thencode = wrap_cb(f1, "sequence", sv_2mortal(cb->seq.thencode));
731 0 0         if(cb->seq.elsecode)
732 0           cb->seq.elsecode = wrap_cb(f1, "sequence", sv_2mortal(cb->seq.elsecode));
733 0           cb->seq.f = sv_rvweaken(newSVsv(fseq));
734              
735 0           push_callback(self, cb);
736              
737 0           return fseq;
738             }
739              
740             // TODO: move to a hax/ file
741             #define CvNAME_FILE_LINE(cv) S_CvNAME_FILE_LINE(aTHX_ cv)
742 0           static SV *S_CvNAME_FILE_LINE(pTHX_ CV *cv)
743             {
744 0 0         if(!CvANON(cv)) {
745 0           SV *ret = newSVpvf("HvNAME::GvNAME");
746 0           return ret;
747             }
748              
749 0           OP *cop = CvSTART(cv);
750 0 0         while(cop && OP_CLASS(cop) != OA_COP)
    0          
    0          
    0          
751 0           cop = cop->op_next;
752              
753 0 0         if(!cop)
754 0           return newSVpvs("__ANON__");
755              
756 0 0         return newSVpvf("__ANON__(%s line %d)", CopFILE((COP *)cop), CopLINE((COP *)cop));
757             }
758              
759 0           static const char *statestr(struct FutureXS *self)
760             {
761 0 0         if(!self->ready)
762 0           return "pending";
763 0 0         if(self->cancelled)
764 0           return "cancelled";
765 0 0         if(self->failure)
766 0           return "failed";
767              
768 0           return "done";
769             }
770              
771 0           void Future_donev(pTHX_ SV *f, SV **svp, size_t n)
772             {
773 0           struct FutureXS *self = get_future(f);
774              
775 0 0         if(self->cancelled)
776 0           return;
777              
778 0 0         if(self->ready)
779 0           croak("%" SVf " is already %s and cannot be ->done",
780             SVfARG(f), statestr(self));
781             // TODO: test subs
782              
783 0           self->result = newAV_svn_dup(svp, n);
784 0           mark_ready(self, f, "done");
785             }
786              
787 0           void Future_failv(pTHX_ SV *f, SV **svp, size_t n)
788             {
789 0           struct FutureXS *self = get_future(f);
790              
791 0 0         if(self->cancelled)
792 0           return;
793              
794 0 0         if(self->ready)
795 0           croak("%" SVf " is already %s and cannot be ->fail'ed",
796             SVfARG(f), statestr(self));
797              
798 0 0         if(n == 1 &&
799 0 0         SvROK(svp[0]) && SvOBJECT(SvRV(svp[0])) &&
800 0           sv_derived_from(svp[0], "Future::Exception")) {
801 0           SV *exception = svp[0];
802 0           AV *failure = self->failure = newAV();
803              
804 0           dSP;
805              
806             {
807 0           ENTER;
808 0           SAVETMPS;
809              
810 0 0         EXTEND(SP, 1);
811 0 0         PUSHMARK(SP);
812 0           PUSHs(sv_mortalcopy(exception));
813 0           PUTBACK;
814              
815 0           call_method("message", G_SCALAR);
816              
817 0           SPAGAIN;
818              
819 0           av_push(failure, SvREFCNT_inc(POPs));
820              
821 0           PUTBACK;
822 0 0         FREETMPS;
823 0           LEAVE;
824             }
825              
826             {
827 0           ENTER;
828 0           SAVETMPS;
829              
830 0 0         EXTEND(SP, 1);
831 0 0         PUSHMARK(SP);
832 0           PUSHs(sv_mortalcopy(exception));
833 0           PUTBACK;
834              
835 0           call_method("category", G_SCALAR);
836              
837 0           SPAGAIN;
838              
839 0           av_push(failure, SvREFCNT_inc(POPs));
840              
841 0           PUTBACK;
842 0 0         FREETMPS;
843 0           LEAVE;
844             }
845              
846             {
847 0           ENTER;
848 0           SAVETMPS;
849              
850 0 0         EXTEND(SP, 1);
851 0 0         PUSHMARK(SP);
852 0           PUSHs(sv_mortalcopy(exception));
853 0           PUTBACK;
854              
855 0           SSize_t count = call_method("details", G_LIST);
856              
857 0           SPAGAIN;
858              
859 0           SV **retp = SP - count + 1;
860              
861 0 0         for(SSize_t i = 0; i < count; i++)
862 0           av_push(failure, SvREFCNT_inc(retp[i]));
863 0           SP -= count;
864              
865 0           PUTBACK;
866 0 0         FREETMPS;
867 0           LEAVE;
868             }
869             }
870             else {
871 0           self->failure = newAV_svn_dup(svp, n);
872             }
873              
874 0           mark_ready(self, f, "failed");
875             }
876              
877             #define future_failp(f, s) Future_failp(aTHX_ f, s)
878 0           void Future_failp(pTHX_ SV *f, const char *s)
879             {
880 0           struct FutureXS *self = get_future(f);
881              
882 0 0         if(self->cancelled)
883 0           return;
884              
885 0 0         if(self->ready)
886 0           croak("%" SVf " is already %s and cannot be ->fail'ed",
887             SVfARG(f), statestr(self));
888              
889 0           self->failure = newAV();
890 0           av_push(self->failure, newSVpv(s, strlen(s)));
891 0           mark_ready(self, f, "failed");
892             }
893              
894 0           void Future_on_cancel(pTHX_ SV *f, SV *code)
895             {
896 0           struct FutureXS *self = get_future(f);
897              
898 0 0         if(self->ready)
899 0           return;
900              
901 0           bool is_future = sv_is_future(code);
902             // TODO: is_future or callable(code) or croak
903              
904 0 0         if(!self->on_cancel)
905 0           self->on_cancel = newAV();
906              
907 0           SV *rv = newSVsv((SV *)code);
908 0           av_push(self->on_cancel, rv);
909              
910 0 0         if(is_future) {
911             struct FutureXSRevocation *rev;
912 0           Newx(rev, 1, struct FutureXSRevocation);
913              
914 0           rev->precedent_f = sv_rvweaken(newSVsv(f));
915 0           rev->toclear_sv_at = sv_rvweaken(newRV_inc(rv));
916              
917 0           struct FutureXS *codeself = get_future(code);
918 0 0         if(!codeself->revoke_when_ready)
919 0           codeself->revoke_when_ready = newAV();
920              
921 0           av_push(codeself->revoke_when_ready, (SV *)rev);
922             }
923             }
924              
925 0           void Future_on_ready(pTHX_ SV *f, SV *code)
926             {
927 0           struct FutureXS *self = get_future(f);
928              
929 0           bool is_future = sv_is_future(code);
930             // TODO: is_future or callable(code) or croak
931              
932 0           int flags = CB_ALWAYS|CB_SELF;
933 0 0         if(is_future)
934 0           flags |= CB_IS_FUTURE;
935              
936 0           struct FutureXSCallback cb = {
937             .flags = flags,
938             .code = code,
939             };
940              
941 0 0         if(self->ready)
942 0           invoke_callback(self, f, &cb);
943             else {
944 0           cb.code = wrap_cb(f, "on_ready", cb.code);
945 0           push_callback(self, &cb);
946             }
947 0           }
948              
949 0           void Future_on_done(pTHX_ SV *f, SV *code)
950             {
951 0           struct FutureXS *self = get_future(f);
952              
953 0           bool is_future = sv_is_future(code);
954             // TODO: is_future or callable(code) or croak
955              
956 0           int flags = CB_DONE|CB_RESULT;
957 0 0         if(is_future)
958 0           flags |= CB_IS_FUTURE;
959              
960 0           struct FutureXSCallback cb = {
961             .flags = flags,
962             .code = code,
963             };
964              
965 0 0         if(self->ready)
966 0           invoke_callback(self, f, &cb);
967             else {
968 0           cb.code = wrap_cb(f, "on_done", cb.code);
969 0           push_callback(self, &cb);
970             }
971 0           }
972              
973 0           void Future_on_fail(pTHX_ SV *f, SV *code)
974             {
975 0           struct FutureXS *self = get_future(f);
976              
977 0           bool is_future = sv_is_future(code);
978             // TODO: is_future or callable(code) or croak
979              
980 0           int flags = CB_FAIL|CB_RESULT;
981 0 0         if(is_future)
982 0           flags |= CB_IS_FUTURE;
983              
984 0           struct FutureXSCallback cb = {
985             .flags = flags,
986             .code = code,
987             };
988              
989 0 0         if(self->ready)
990 0           invoke_callback(self, f, &cb);
991             else {
992 0           cb.code = wrap_cb(f, "on_fail", cb.code);
993 0           push_callback(self, &cb);
994             }
995 0           }
996              
997             #define future_await(f) Future_await(aTHX_ f)
998 0           static void Future_await(pTHX_ SV *f)
999             {
1000 0           dSP;
1001              
1002 0           ENTER;
1003 0           SAVETMPS;
1004              
1005 0 0         PUSHMARK(SP);
1006 0 0         mXPUSHs(newSVsv(f));
1007 0           PUTBACK;
1008              
1009 0           call_method("await", G_VOID);
1010              
1011 0 0         FREETMPS;
1012 0           LEAVE;
1013 0           }
1014              
1015 0           AV *Future_get_result_av(pTHX_ SV *f, bool await)
1016             {
1017 0           struct FutureXS *self = get_future(f);
1018              
1019 0 0         if(await && !self->ready)
    0          
1020 0           future_await(f);
1021              
1022 0 0         if(!self->ready)
1023 0           croak("%" SVf " is not yet ready", SVfARG(f));
1024              
1025 0 0         if(self->failure) {
1026 0           self->reported = true;
1027              
1028 0           SV *exception = AvARRAY(self->failure)[0];
1029 0 0         if(av_count(self->failure) > 1) {
1030 0           dSP;
1031 0           ENTER;
1032 0           SAVETMPS;
1033              
1034 0 0         PUSHMARK(SP);
1035 0 0         EXTEND(SP, 1 + av_count(self->failure));
1036 0           mPUSHpvs("Future::Exception");
1037 0 0         for(SSize_t i = 0; i < av_count(self->failure); i++)
1038 0           PUSHs(sv_mortalcopy(AvARRAY(self->failure)[i]));
1039 0           PUTBACK;
1040              
1041 0           call_method("new", G_SCALAR);
1042              
1043 0           SPAGAIN;
1044              
1045 0           exception = SvREFCNT_inc(POPs);
1046              
1047 0           PUTBACK;
1048 0 0         FREETMPS;
1049 0           LEAVE;
1050              
1051 0           sv_2mortal(exception);
1052             }
1053              
1054 0 0         if(SvROK(exception) || SvPV_nolen(exception)[SvCUR(exception)-1] == '\n')
    0          
1055 0           die_sv(exception);
1056             else {
1057             /* We'd like to call Carp::croak to do the @CARP_NOT logic, but it gets
1058             * confused about a missing callframe first because this is XS. We'll
1059             * reïmplement the logic here
1060             */
1061             I32 cxix;
1062 0 0         for(cxix = cxstack_ix; cxix; cxix--) {
1063 0 0         if(CxTYPE(&cxstack[cxix]) != CXt_SUB)
1064 0           continue;
1065              
1066 0           const CV *cv = cxstack[cxix].blk_sub.cv;
1067 0 0         if(!cv)
1068 0           continue;
1069              
1070 0 0         const char *stashname = HvNAME(CvSTASH(cv));
    0          
    0          
    0          
    0          
    0          
1071 0 0         if(!stashname)
1072 0           continue;
1073              
1074             // The essence of the @CARP_NOT logic
1075 0 0         if(strEQ(stashname, "Future::_base"))
1076 0           continue;
1077              
1078 0 0         const COP *cop = cxix < cxstack_ix ? cxstack[cxix+1].blk_oldcop : PL_curcop;
1079              
1080 0 0         sv_catpvf(exception, " at %s line %d.\n", CopFILE(cop), CopLINE(cop));
1081 0           break;
1082             }
1083              
1084 0           die_sv(exception);
1085             }
1086             }
1087              
1088 0 0         if(self->cancelled)
1089 0           croak("%" SVf " was cancelled",
1090             SVfARG(future_mortal_selfstr(f)));
1091              
1092 0 0         if(!self->result)
1093 0           self->result = newAV();
1094              
1095 0           return self->result;
1096             }
1097              
1098 0           AV *Future_get_failure_av(pTHX_ SV *f)
1099             {
1100 0           struct FutureXS *self = get_future(f);
1101              
1102 0 0         if(!self->ready)
1103 0           future_await(f);
1104              
1105 0 0         if(!self->failure)
1106 0           return NULL;
1107              
1108 0           return self->failure;
1109             }
1110              
1111 0           void Future_cancel(pTHX_ SV *f)
1112             {
1113             /* Specifically don't make it an error to ->cancel a future instance not
1114             * available in this thread; as it often appears in defer / DESTROY / etc
1115             */
1116 0           struct FutureXS *self = maybe_get_future(f);
1117 0 0         if(!self)
1118 0           return;
1119              
1120 0 0         if(self->ready)
1121 0           return;
1122              
1123 0           self->cancelled = true;
1124 0           AV *on_cancel = self->on_cancel;
1125              
1126 0 0         if(self->subs) {
1127 0 0         for(Size_t i = 0; i < av_count(self->subs); i++) {
1128 0           U8 flags = self->subflags[i];
1129 0 0         if(!(flags & SUBFLAG_NO_CANCEL))
1130 0           future_cancel(AvARRAY(self->subs)[i]);
1131             }
1132             }
1133              
1134             // TODO: maybe we need to clear these out from self before we do this, in
1135             // case of recursion?
1136              
1137 0 0         for(int i = on_cancel ? AvFILL(on_cancel) : -1; i >= 0; i--) {
    0          
    0          
1138 0           SV *code = AvARRAY(on_cancel)[i];
1139 0 0         if(!SvOK(code))
1140 0           continue;
1141              
1142 0 0         if(sv_is_future(code)) {
1143 0           dSP;
1144              
1145 0           ENTER;
1146 0           SAVETMPS;
1147              
1148 0 0         PUSHMARK(SP);
1149 0           PUSHs(code);
1150 0           PUTBACK;
1151              
1152 0           call_method("cancel", G_VOID);
1153              
1154 0 0         FREETMPS;
1155 0           LEAVE;
1156             }
1157             else {
1158 0           dSP;
1159              
1160 0           ENTER;
1161 0           SAVETMPS;
1162              
1163 0 0         PUSHMARK(SP);
1164 0           PUSHs(f);
1165 0           PUTBACK;
1166              
1167             assert(SvOK(code));
1168 0           call_sv(code, G_VOID);
1169              
1170 0 0         FREETMPS;
1171 0           LEAVE;
1172             }
1173             }
1174              
1175 0           mark_ready(self, f, "cancel");
1176             }
1177              
1178 0           SV *Future_without_cancel(pTHX_ SV *f)
1179             {
1180 0           struct FutureXSCallback cb = {
1181             .flags = CB_SEQ_READY|CB_CANCEL, /* without CB_SEQ_CANCEL */
1182             /* no code */
1183             };
1184              
1185 0           SV *ret = make_sequence(f, &cb);
1186 0           struct FutureXS *self = get_future(ret);
1187              
1188 0           self->precedent_f = newSVsv(f);
1189              
1190 0           return ret;
1191             }
1192              
1193 0           SV *Future_then(pTHX_ SV *f, U32 flags, SV *thencode, SV *elsecode)
1194             {
1195 0           struct FutureXSCallback cb = {
1196             .flags = CB_SEQ_ANY|CB_RESULT,
1197             .seq.thencode = thencode,
1198             .seq.elsecode = elsecode,
1199             };
1200 0 0         if(flags & FUTURE_THEN_WITH_F)
1201 0           cb.flags |= CB_SELF;
1202              
1203 0           return make_sequence(f, &cb);
1204             }
1205              
1206 0           SV *Future_followed_by(pTHX_ SV *f, SV *code)
1207             {
1208 0           struct FutureXSCallback cb = {
1209             .flags = CB_SEQ_ANY|CB_SELF,
1210             .seq.thencode = code,
1211 0           .seq.elsecode = SvREFCNT_inc(code),
1212             };
1213              
1214 0           return make_sequence(f, &cb);
1215             }
1216              
1217 0           SV *Future_thencatch(pTHX_ SV *f, U32 flags, SV *thencode, HV *catches, SV *elsecode)
1218             {
1219 0           struct FutureXSCallback cb = {
1220             .flags = CB_SEQ_ANY|CB_RESULT,
1221             .seq.thencode = thencode,
1222             .seq.elsecode = elsecode,
1223             .seq.catches = catches,
1224             };
1225 0 0         if(flags & FUTURE_THEN_WITH_F)
1226 0           cb.flags |= CB_SELF;
1227              
1228 0           return make_sequence(f, &cb);
1229             }
1230              
1231             #define future_new_subsv(cls, subs, n) S_future_new_subsv(aTHX_ cls, subs, n)
1232 0           static SV *S_future_new_subsv(pTHX_ const char *cls, SV **subs, size_t n)
1233             {
1234 0           HV *future_stash = get_hv("Future::", 0);
1235             assert(future_stash);
1236              
1237             /* Find the best prototype; pick the first derived instance if there is
1238             * one */
1239 0           SV *proto = NULL;
1240 0           size_t subcount = 0;
1241 0 0         for(Size_t i = 0; i < n; i++) {
1242 0 0         if(!SvROK(subs[i]) && SvPOK(subs[i]) && strEQ(SvPVX(subs[i]), "also"))
    0          
    0          
1243 0           i++;
1244              
1245 0 0         if(!SvROK(subs[i]) || !SvOBJECT(SvRV(subs[i])))
    0          
1246 0           croak("Expected a Future, got %" SVf, SVfARG(subs[i]));
1247              
1248 0           subcount++;
1249              
1250 0 0         if(!proto && SvSTASH(SvRV(subs[i])) != future_stash)
    0          
1251 0           proto = subs[i];
1252             }
1253              
1254 0 0         SV *f = proto ? future_new_proto(proto) : future_new(cls);
1255 0           struct FutureXS *self = get_future(f);
1256              
1257 0 0         if(!self->subs)
1258 0           self->subs = newAV();
1259 0           av_extend(self->subs, subcount);
1260 0 0         if(!self->subflags)
1261 0           Newx(self->subflags, subcount, U8);
1262              
1263 0 0         for(Size_t i = 0, subi = 0; i < n; i++, subi++) {
1264 0           U8 flags = 0;
1265 0 0         if(!SvROK(subs[i]) && SvPOK(subs[i]) && strEQ(SvPVX(subs[i]), "also"))
    0          
    0          
1266 0           flags |= SUBFLAG_NO_CANCEL, i++;
1267              
1268 0           av_store(self->subs, subi, newSVsv(subs[i]));
1269 0           self->subflags[subi] = flags;
1270             }
1271              
1272 0           return f;
1273             }
1274              
1275             #define copy_result(self, src) S_copy_result(aTHX_ self, src)
1276 0           static void S_copy_result(pTHX_ struct FutureXS *self, SV *src)
1277             {
1278             /* TODO: Handle non-Future::XS instances too */
1279 0           struct FutureXS *srcself = get_future(src);
1280              
1281             assert(srcself->ready);
1282             assert(!srcself->cancelled);
1283              
1284 0 0         if(srcself->failure) {
1285 0           self->failure = newAV_svn_dup(AvARRAY(srcself->failure), av_count(srcself->failure));
1286             }
1287             else {
1288             assert(srcself->result);
1289 0           self->result = newAV_svn_dup(AvARRAY(srcself->result), av_count(srcself->result));
1290             }
1291 0           }
1292              
1293             #define cancel_pending_subs(self) S_cancel_pending_subs(aTHX_ self)
1294 0           static void S_cancel_pending_subs(pTHX_ struct FutureXS *self)
1295             {
1296 0 0         if(!self->subs)
1297 0           return;
1298              
1299 0 0         for(Size_t i = 0; i < av_count(self->subs); i++) {
1300 0           SV *sub = AvARRAY(self->subs)[i];
1301 0           U8 flags = self->subflags[i];
1302              
1303 0 0         if(!(flags & SUBFLAG_NO_CANCEL) && !future_is_ready(sub))
    0          
1304 0           future_cancel(sub);
1305             }
1306             }
1307              
1308 0           XS_INTERNAL(sub_on_ready_waitall)
1309             {
1310 0           dXSARGS;
1311              
1312 0           SV *f = XSANY_sv;
1313 0 0         if(!SvOK(f))
1314 0           return;
1315              
1316             /* Make sure self doesn't disappear during this function */
1317 0           SvREFCNT_inc(SvRV(f));
1318 0           SAVEFREESV(SvRV(f));
1319              
1320 0           struct FutureXS *self = get_future(f);
1321              
1322 0           self->pending_subs--;
1323              
1324 0 0         if(self->pending_subs)
1325 0           XSRETURN(0);
1326              
1327             /* TODO: This is really just newAVav() */
1328 0           self->result = newAV_svn_dup(AvARRAY(self->subs), av_count(self->subs));
1329 0           mark_ready(self, f, "wait_all");
1330             }
1331              
1332 0           SV *Future_new_waitallv(pTHX_ const char *cls, SV **subs, size_t n)
1333             {
1334 0           SV *f = future_new_subsv(cls, subs, n);
1335 0           struct FutureXS *self = get_future(f);
1336              
1337             /* Reïnit subs + n */
1338 0           subs = AvARRAY(self->subs);
1339 0           n = av_count(self->subs);
1340              
1341 0           self->pending_subs = 0;
1342 0 0         for(Size_t i = 0; i < n; i++) {
1343             /* TODO: This should probably use some API function to make it transparent */
1344 0 0         if(!future_is_ready(subs[i]))
1345 0           self->pending_subs++;
1346             }
1347              
1348 0 0         if(!self->pending_subs) {
1349 0           self->result = newAV_svn_dup(subs, n);
1350 0           mark_ready(self, f, "wait_all");
1351              
1352 0           return f;
1353             }
1354              
1355 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_waitall, __FILE__);
1356 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1357 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1358              
1359 0           GV *gv = gv_fetchpvs("Future::XS::(wait_all callback)", GV_ADDMULTI, SVt_PVCV);
1360 0           CvGV_set(sub_on_ready, gv);
1361 0           CvANON_off(sub_on_ready);
1362              
1363 0 0         for(Size_t i = 0; i < n; i++) {
1364 0 0         if(!future_is_ready(subs[i]))
1365 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1366             }
1367              
1368 0           SvREFCNT_dec(sub_on_ready);
1369              
1370 0           return f;
1371             }
1372              
1373 0           XS_INTERNAL(sub_on_ready_waitany)
1374             {
1375 0           dXSARGS;
1376 0           SV *thissub = ST(0);
1377              
1378 0           SV *f = XSANY_sv;
1379 0 0         if(!SvOK(f))
1380 0           return;
1381              
1382             /* Make sure self doesn't disappear during this function */
1383 0           SvREFCNT_inc(SvRV(f));
1384 0           SAVEFREESV(SvRV(f));
1385              
1386 0           struct FutureXS *self = get_future(f);
1387              
1388 0 0         if(self->result || self->failure)
    0          
1389 0           return;
1390              
1391 0           self->pending_subs--;
1392              
1393 0           bool this_cancelled = future_is_cancelled(thissub);
1394              
1395 0 0         if(self->pending_subs && this_cancelled)
    0          
1396 0           return;
1397              
1398 0 0         if(this_cancelled) {
1399 0           future_failp(f, "All component futures were cancelled");
1400 0           return;
1401             }
1402             else
1403 0           copy_result(self, thissub);
1404              
1405 0           cancel_pending_subs(self);
1406              
1407 0           mark_ready(self, f, "wait_any");
1408             }
1409              
1410 0           SV *Future_new_waitanyv(pTHX_ const char *cls, SV **subs, size_t n)
1411             {
1412 0           SV *f = future_new_subsv(cls, subs, n);
1413 0           struct FutureXS *self = get_future(f);
1414              
1415             /* Reïnit subs + n */
1416 0           subs = AvARRAY(self->subs);
1417 0           n = av_count(self->subs);
1418              
1419 0 0         if(!n) {
1420 0           future_failp(f, "Cannot ->wait_any with no subfutures");
1421 0           return f;
1422             }
1423              
1424 0           SV *immediate_ready = NULL;
1425 0 0         for(Size_t i = 0; i < n; i++) {
1426             /* TODO: This should probably use some API function to make it transparent */
1427 0 0         if(future_is_ready(subs[i]) && !future_is_cancelled(subs[i])) {
    0          
1428 0           immediate_ready = subs[i];
1429 0           break;
1430             }
1431             }
1432              
1433 0 0         if(immediate_ready) {
1434 0           copy_result(self, immediate_ready);
1435              
1436 0           cancel_pending_subs(self);
1437              
1438 0           mark_ready(self, f, "wait_any");
1439              
1440 0           return f;
1441             }
1442              
1443 0           self->pending_subs = 0;
1444              
1445 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_waitany, __FILE__);
1446 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1447 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1448              
1449 0           GV *gv = gv_fetchpvs("Future::XS::(wait_any callback)", GV_ADDMULTI, SVt_PVCV);
1450 0           CvGV_set(sub_on_ready, gv);
1451 0           CvANON_off(sub_on_ready);
1452              
1453 0 0         for(Size_t i = 0; i < n; i++) {
1454 0 0         if(future_is_cancelled(subs[i]))
1455 0           continue;
1456              
1457 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1458 0           self->pending_subs++;
1459             }
1460              
1461 0           SvREFCNT_dec(sub_on_ready);
1462              
1463 0           return f;
1464             }
1465              
1466             #define compose_needsall_result(self) S_compose_needsall_result(aTHX_ self)
1467 0           static void S_compose_needsall_result(pTHX_ struct FutureXS *self)
1468             {
1469 0           AV *result = self->result = newAV();
1470 0 0         for(Size_t i = 0; i < av_count(self->subs); i++) {
1471 0           SV *sub = AvARRAY(self->subs)[i];
1472 0           struct FutureXS *subself = get_future(sub);
1473             assert(subself->result);
1474 0           av_push_svn(result, AvARRAY(subself->result), av_count(subself->result));
1475             }
1476 0           }
1477              
1478 0           XS_INTERNAL(sub_on_ready_needsall)
1479             {
1480 0           dXSARGS;
1481 0           SV *thissub = ST(0);
1482              
1483 0           SV *f = XSANY_sv;
1484 0 0         if(!SvOK(f))
1485 0           return;
1486              
1487             /* Make sure self doesn't disappear during this function */
1488 0           SvREFCNT_inc(SvRV(f));
1489 0           SAVEFREESV(SvRV(f));
1490              
1491 0           struct FutureXS *self = get_future(f);
1492              
1493 0 0         if(self->result || self->failure)
    0          
1494 0           return;
1495              
1496 0 0         if(future_is_cancelled(thissub)) {
1497 0           future_failp(f, "A component future was cancelled");
1498 0           cancel_pending_subs(self);
1499 0           return;
1500             }
1501 0 0         else if(future_is_failed(thissub)) {
1502 0           copy_result(self, thissub);
1503 0           cancel_pending_subs(self);
1504 0           mark_ready(self, f, "needs_all");
1505             }
1506             else {
1507 0           self->pending_subs--;
1508 0 0         if(self->pending_subs)
1509 0           return;
1510 0           compose_needsall_result(self);
1511 0           mark_ready(self, f, "needs_all");
1512             }
1513             }
1514              
1515 0           SV *Future_new_needsallv(pTHX_ const char *cls, SV **subs, size_t n)
1516             {
1517 0           SV *f = future_new_subsv(cls, subs, n);
1518 0           struct FutureXS *self = get_future(f);
1519              
1520             /* Reïnit subs + n */
1521 0           subs = AvARRAY(self->subs);
1522 0           n = av_count(self->subs);
1523              
1524 0 0         if(!n) {
1525 0           future_donev(f, NULL, 0);
1526 0           return f;
1527             }
1528              
1529 0           SV *immediate_fail = NULL;
1530 0 0         for(Size_t i = 0; i < n; i++) {
1531 0 0         if(future_is_cancelled(subs[i])) {
1532 0           future_failp(f, "A component future was cancelled");
1533 0           cancel_pending_subs(self);
1534 0           return f;
1535             }
1536 0 0         if(future_is_failed(subs[i])) {
1537 0           immediate_fail = subs[i];
1538 0           break;
1539             }
1540             }
1541              
1542 0 0         if(immediate_fail) {
1543 0           copy_result(self, immediate_fail);
1544 0           cancel_pending_subs(self);
1545 0           mark_ready(self, f, "needs_all");
1546 0           return f;
1547             }
1548              
1549 0           self->pending_subs = 0;
1550              
1551 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_needsall, __FILE__);
1552 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1553 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1554              
1555 0           GV *gv = gv_fetchpvs("Future::XS::(needs_all callback)", GV_ADDMULTI, SVt_PVCV);
1556 0           CvGV_set(sub_on_ready, gv);
1557 0           CvANON_off(sub_on_ready);
1558              
1559 0 0         for(Size_t i = 0; i < n; i++) {
1560 0 0         if(future_is_ready(subs[i]))
1561 0           continue;
1562              
1563 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1564 0           self->pending_subs++;
1565             }
1566              
1567 0 0         if(!self->pending_subs) {
1568 0           compose_needsall_result(self);
1569 0           mark_ready(self, f, "needs_all");
1570             }
1571              
1572 0           SvREFCNT_dec(sub_on_ready);
1573              
1574 0           return f;
1575             }
1576              
1577 0           XS_INTERNAL(sub_on_ready_needsany)
1578             {
1579 0           dXSARGS;
1580 0           SV *thissub = ST(0);
1581              
1582 0           SV *f = XSANY_sv;
1583 0 0         if(!SvOK(f))
1584 0           return;
1585              
1586             /* Make sure self doesn't disappear during this function */
1587 0           SvREFCNT_inc(SvRV(f));
1588 0           SAVEFREESV(SvRV(f));
1589              
1590 0           struct FutureXS *self = get_future(f);
1591              
1592 0 0         if(self->result || self->failure)
    0          
1593 0           return;
1594              
1595 0           self->pending_subs--;
1596              
1597 0           bool this_cancelled = future_is_cancelled(thissub);
1598              
1599 0 0         if(self->pending_subs && this_cancelled)
    0          
1600 0           return;
1601              
1602 0 0         if(this_cancelled) {
1603 0           future_failp(f, "All component futures were cancelled");
1604             }
1605 0 0         else if(future_is_failed(thissub)) {
1606 0 0         if(self->pending_subs)
1607 0           return;
1608              
1609 0           copy_result(self, thissub);
1610 0           mark_ready(self, f, "needs_any");
1611             }
1612             else {
1613 0           copy_result(self, thissub);
1614 0           cancel_pending_subs(self);
1615 0           mark_ready(self, f, "needs_any");
1616             }
1617             }
1618              
1619 0           SV *Future_new_needsanyv(pTHX_ const char *cls, SV **subs, size_t n)
1620             {
1621 0           SV *f = future_new_subsv(cls, subs, n);
1622 0           struct FutureXS *self = get_future(f);
1623              
1624             /* Reïnit subs + n */
1625 0           subs = AvARRAY(self->subs);
1626 0           n = av_count(self->subs);
1627              
1628 0 0         if(!n) {
1629 0           future_failp(f, "Cannot ->needs_any with no subfutures");
1630 0           return f;
1631             }
1632              
1633 0           SV *immediate_done = NULL;
1634 0 0         for(Size_t i = 0; i < n; i++) {
1635 0 0         if(future_is_done(subs[i])) {
1636 0           immediate_done = subs[i];
1637 0           break;
1638             }
1639             }
1640              
1641 0 0         if(immediate_done) {
1642 0           copy_result(self, immediate_done);
1643 0           cancel_pending_subs(self);
1644 0           mark_ready(self, f, "needs_any");
1645 0           return f;
1646             }
1647              
1648 0           self->pending_subs = 0;
1649              
1650 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_needsany, __FILE__);
1651 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1652 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1653              
1654 0           GV *gv = gv_fetchpvs("Future::XS::(needs_any callback)", GV_ADDMULTI, SVt_PVCV);
1655 0           CvGV_set(sub_on_ready, gv);
1656 0           CvANON_off(sub_on_ready);
1657              
1658 0 0         for(Size_t i = 0; i < n; i++) {
1659 0 0         if(future_is_ready(subs[i]))
1660 0           continue;
1661              
1662 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1663 0           self->pending_subs++;
1664             }
1665              
1666 0 0         if(!self->pending_subs) {
1667 0           copy_result(self, subs[n-1]);
1668 0           mark_ready(self, f, "needs_any");
1669             }
1670              
1671 0           SvREFCNT_dec(sub_on_ready);
1672              
1673 0           return f;
1674             }
1675              
1676 0           Size_t Future_mPUSH_subs(pTHX_ SV *f, enum FutureSubFilter filter)
1677             {
1678 0           dSP;
1679              
1680 0           struct FutureXS *self = get_future(f);
1681              
1682 0           Size_t ret = 0;
1683 0 0         for(Size_t i = 0; self->subs && i < av_count(self->subs); i++) {
    0          
1684 0           SV *sub = AvARRAY(self->subs)[i];
1685              
1686             bool want;
1687 0           switch(filter) {
1688 0           case FUTURE_SUBS_PENDING:
1689 0           want = !future_is_ready(sub);
1690 0           break;
1691              
1692 0           case FUTURE_SUBS_READY:
1693 0           want = future_is_ready(sub);
1694 0           break;
1695              
1696 0           case FUTURE_SUBS_DONE:
1697 0           want = future_is_done(sub);
1698 0           break;
1699              
1700 0           case FUTURE_SUBS_FAILED:
1701 0           want = future_is_failed(sub);
1702 0           break;
1703              
1704 0           case FUTURE_SUBS_CANCELLED:
1705 0           want = future_is_cancelled(sub);
1706 0           break;
1707             }
1708              
1709 0 0         if(want) {
1710 0 0         XPUSHs(sv_mortalcopy(sub));
1711 0           ret++;
1712             }
1713             }
1714              
1715 0           PUTBACK;
1716 0           return ret;
1717             }
1718              
1719 0           struct timeval Future_get_btime(pTHX_ SV *f)
1720             {
1721 0           struct FutureXS *self = get_future(f);
1722 0           return self->btime;
1723             }
1724              
1725 0           struct timeval Future_get_rtime(pTHX_ SV *f)
1726             {
1727 0           struct FutureXS *self = get_future(f);
1728 0           return self->rtime;
1729             }
1730              
1731 0           void Future_set_label(pTHX_ SV *f, SV *label)
1732             {
1733 0           struct FutureXS *self = get_future(f);
1734              
1735 0 0         if(self->label)
1736 0           SvREFCNT_dec(label);
1737              
1738 0           self->label = newSVsv(label);
1739 0           }
1740              
1741 0           SV *Future_get_label(pTHX_ SV *f)
1742             {
1743 0           struct FutureXS *self = get_future(f);
1744              
1745 0           return self->label;
1746             }
1747              
1748 0           void Future_set_udata(pTHX_ SV *f, SV *key, SV *value)
1749             {
1750 0           struct FutureXS *self = get_future(f);
1751              
1752 0 0         if(!self->udata)
1753 0           self->udata = newHV();
1754              
1755 0           hv_store_ent(self->udata, key, newSVsv(value), 0);
1756 0           }
1757              
1758 0           SV *Future_get_udata(pTHX_ SV *f, SV *key)
1759             {
1760 0           struct FutureXS *self = get_future(f);
1761              
1762 0 0         if(!self->udata)
1763 0           return &PL_sv_undef;
1764              
1765 0           HE *he = hv_fetch_ent(self->udata, key, 0, 0);
1766 0 0         return he ? HeVAL(he) : &PL_sv_undef;
1767             }
1768              
1769             /* DMD_HELPER assistants */
1770              
1771             #ifdef HAVE_DMD_HELPER
1772             static int dumpstruct_callback(pTHX_ DMDContext *ctx, struct FutureXSCallback *cb)
1773             {
1774             if(!(cb->flags & CB_SEQ_ANY))
1775             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSCallback", cb, sizeof(struct FutureXSCallback),
1776             /* Some cheating here, to claim the "code" is either a CV or a Future,
1777             * depending on the CB_IS_FUTURE flag */
1778             3, ((const DMDNamedField []){
1779             {"flags", DMD_FIELD_UINT, .n = cb->flags},
1780             {"the code CV", DMD_FIELD_PTR, .ptr = (cb->flags & CB_IS_FUTURE) ? NULL : cb->code},
1781             {"the Future SV", DMD_FIELD_PTR, .ptr = (cb->flags & CB_IS_FUTURE) ? cb->code : NULL },
1782             })
1783             );
1784             else
1785             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSCallback(CB_SEQ)", cb, sizeof(struct FutureXSCallback),
1786             4, ((const DMDNamedField []){
1787             {"flags", DMD_FIELD_UINT, .n = cb->flags},
1788             {"the then code CV", DMD_FIELD_PTR, .ptr = cb->seq.thencode},
1789             {"the else code CV", DMD_FIELD_PTR, .ptr = cb->seq.elsecode},
1790             {"the sequence future SV", DMD_FIELD_PTR, .ptr = cb->seq.f},
1791             })
1792             );
1793              
1794             return 0;
1795             }
1796              
1797             static int dumpstruct_revocation(pTHX_ DMDContext *ctx, struct FutureXSRevocation *rev)
1798             {
1799             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSRevocation", rev, sizeof(struct FutureXSRevocation),
1800             2, ((const DMDNamedField []){
1801             {"the precedent future SV", DMD_FIELD_PTR, .ptr = rev->precedent_f},
1802             {"the SV to clear RV", DMD_FIELD_PTR, .ptr = rev->toclear_sv_at},
1803             })
1804             );
1805              
1806             return 0;
1807             }
1808              
1809             static int dumpstruct(pTHX_ DMDContext *ctx, const SV *sv)
1810             {
1811             int ret = 0;
1812              
1813             // TODO: Add some safety checking
1814             struct FutureXS *self = INT2PTR(struct FutureXS *, SvIV((SV *)sv));
1815              
1816             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXS", self, sizeof(struct FutureXS),
1817             12, ((const DMDNamedField []){
1818             {"ready", DMD_FIELD_BOOL, .b = self->ready},
1819             {"cancelled", DMD_FIELD_BOOL, .b = self->cancelled},
1820             {"the label SV", DMD_FIELD_PTR, .ptr = self->label},
1821             {"the result AV", DMD_FIELD_PTR, .ptr = self->result},
1822             {"the failure AV", DMD_FIELD_PTR, .ptr = self->failure},
1823             {"the callbacks AV", DMD_FIELD_PTR, .ptr = self->callbacks},
1824             {"the on_cancel AV", DMD_FIELD_PTR, .ptr = self->on_cancel},
1825             {"the revoke_when_ready AV", DMD_FIELD_PTR, .ptr = self->revoke_when_ready},
1826             {"the udata HV", DMD_FIELD_PTR, .ptr = self->udata},
1827             {"the constructed-at SV", DMD_FIELD_PTR, .ptr = self->constructed_at},
1828             {"the subs AV", DMD_FIELD_PTR, .ptr = self->subs},
1829             {"the pending sub count", DMD_FIELD_UINT, .n = self->pending_subs},
1830             })
1831             );
1832              
1833             for(size_t i = 0; self->callbacks && i < av_count(self->callbacks); i++) {
1834             struct FutureXSCallback *cb = (struct FutureXSCallback *)AvARRAY(self->callbacks)[i];
1835             ret += dumpstruct_callback(aTHX_ ctx, cb);
1836             }
1837              
1838             for(size_t i = 0; self->revoke_when_ready && i < av_count(self->revoke_when_ready); i++) {
1839             struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(self->revoke_when_ready)[i];
1840             ret += dumpstruct_revocation(aTHX_ ctx, rev);
1841             }
1842              
1843             ret += DMD_ANNOTATE_SV(sv, (SV *)self, "the FutureXS structure");
1844              
1845             return ret;
1846             }
1847             #endif
1848              
1849             #define getenv_bool(key) S_getenv_bool(aTHX_ key)
1850 6           static bool S_getenv_bool(pTHX_ const char *key)
1851             {
1852 6           const char *val = getenv(key);
1853 6 50         if(!val || !val[0])
    0          
1854 6           return false;
1855 0 0         if(val[0] == '0' && strlen(val) == 1)
    0          
1856 0           return false;
1857 0           return true;
1858             }
1859              
1860             #ifndef newSVbool
1861             # define newSVbool(b) newSVsv(b ? &PL_sv_yes : &PL_sv_no)
1862             #endif
1863              
1864 3           void Future_reread_environment(pTHX)
1865             {
1866 3           future_debug = getenv_bool("PERL_FUTURE_DEBUG");
1867              
1868 3 50         capture_times = future_debug || getenv_bool("PERL_FUTURE_TIMES");
    50          
1869 3 50         sv_setsv(get_sv("Future::TIMES", GV_ADDMULTI), capture_times ? &PL_sv_yes : &PL_sv_no);
1870 3           }
1871              
1872 3           void Future_boot(pTHX)
1873             {
1874             #ifdef HAVE_DMD_HELPER
1875             DMD_SET_PACKAGE_HELPER("Future::XS", dumpstruct);
1876             #endif
1877              
1878 3           Future_reread_environment(aTHX);
1879              
1880             // We can only do this once
1881 3           newCONSTSUB(gv_stashpvn("Future::XS", 10, TRUE), "DEBUG", newSVbool(future_debug));
1882 3           }