File Coverage

src/future.c
Criterion Covered Total %
statement 13 889 1.4
branch 5 746 0.6
condition n/a
subroutine n/a
pod n/a
total 18 1635 1.1


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2              
3             #include "EXTERN.h"
4             #include "perl.h"
5             #include "XSUB.h"
6              
7             #include "future.h"
8              
9             #include "perl-backcompat.c.inc"
10              
11             #include "av-utils.c.inc"
12             #include "cv_set_anysv_refcounted.c.inc"
13              
14             #if !HAVE_PERL_VERSION(5, 16, 0)
15             # define false FALSE
16             # define true TRUE
17             #endif
18              
19             #ifdef HAVE_DMD_HELPER
20             # define WANT_DMD_API_044
21             # include "DMD_helper.h"
22             #endif
23              
24             #if !HAVE_PERL_VERSION(5, 16, 0)
25             # define XS_INTERNAL(name) static XS(name)
26             #endif
27              
28             #define mPUSHpvs(s) mPUSHp("" s "", sizeof(s)-1)
29              
30             static bool future_debug;
31             static bool capture_times;
32              
33             /* There's no reason these have to match those in Future.pm but for now we
34             * might as well just copy the same values
35             */
36             enum {
37             CB_DONE = (1<<0),
38             CB_FAIL = (1<<1),
39             CB_CANCEL = (1<<2),
40             CB_ALWAYS = CB_DONE|CB_FAIL|CB_CANCEL,
41              
42             CB_SELF = (1<<3),
43             CB_RESULT = (1<<4),
44              
45             CB_SEQ_READY = (1<<5),
46             CB_SEQ_CANCEL = (1<<6),
47             CB_SEQ_ANY = CB_SEQ_READY|CB_SEQ_CANCEL,
48              
49             CB_SEQ_IMDONE = (1<<7),
50             CB_SEQ_IMFAIL = (1<<8),
51              
52             CB_SEQ_STRICT = (1<<9),
53              
54             CB_IS_FUTURE = (1<<10),
55             };
56              
57             // TODO: Consider using different struct types to save memory? Or maybe it's
58             // so small a difference it doesn't matter
59             struct FutureXSCallback
60             {
61             unsigned int flags;
62             union {
63             SV *code; /* if !(flags & CB_SEQ_ANY) */
64             struct { /* if (flags & CB_SEQ_ANY) */
65             SV *thencode;
66             SV *elsecode;
67             HV *catches;
68             SV *f;
69             } seq;
70             };
71             };
72              
73             struct FutureXSRevocation
74             {
75             SV *precedent_f;
76             SV *toclear_sv_at;
77             };
78              
79             #define CB_NONSEQ_CODE(cb) \
80             ({ if((cb)->flags & CB_SEQ_ANY) croak("ARGH: CB_NONSEQ_CODE on SEQ"); (cb)->code;})
81              
82             struct FutureXS
83             {
84             unsigned int ready : 1;
85             unsigned int cancelled : 1;
86             unsigned int reported : 1;
87             SV *label;
88             AV *result; // implies done
89             AV *failure; // implies fail
90             AV *callbacks; // values are struct FutureXSCallback ptrs directly. TODO: custom ptr/fill/max
91             AV *on_cancel; // values are CVs directly
92             AV *revoke_when_ready; // values are struct FutureXSRevocation ptrs directly.
93             int empty_revocation_slots;
94              
95             HV *udata;
96              
97             struct timeval btime, rtime;
98             SV *constructed_at;
99              
100             /* For convergents
101             * TODO: consider making this an optional extra part of the body, only
102             * allocated when required
103             */
104             AV *subs;
105             Size_t pending_subs;
106              
107             /* For without_cancel, purely to keep a strongref */
108             SV *precedent_f;
109             };
110              
111             #ifdef USE_ITHREADS
112             static int future_dup(pTHX_ MAGIC *mg, CLONE_PARAMS *param);
113              
114             static MGVTBL vtbl = {
115             .svt_dup = &future_dup,
116             };
117             #endif
118              
119 0           bool Future_sv_is_future(pTHX_ SV *sv)
120             {
121 0 0         if(!SvROK(sv) || !SvOBJECT(SvRV(sv)))
    0          
122 0           return false;
123              
124 0 0         if(sv_derived_from(sv, "Future") || sv_derived_from(sv, "Future::XS"))
    0          
125 0           return true;
126              
127 0           return false;
128             }
129              
130             #define get_future(sv) S_get_future(aTHX_ sv, FALSE)
131             #define maybe_get_future(sv) S_get_future(aTHX_ sv, TRUE)
132 0           static struct FutureXS *S_get_future(pTHX_ SV *sv, bool nullok)
133             {
134             assert(sv);
135             assert(SvROK(sv) && SvOBJECT(SvRV(sv)));
136             // TODO: Add some safety checking about class
137 0 0         struct FutureXS *self = INT2PTR(struct FutureXS *, SvIV(SvRV(sv)));
138 0 0         if(self || nullok)
    0          
139 0           return self;
140 0           croak("Future::XS instance %" SVf " is not available in this thread", SVfARG(sv));
141             }
142              
143 0           SV *Future_new(pTHX_ const char *cls)
144             {
145 0 0         if(!cls)
146 0           cls = "Future::XS";
147              
148             struct FutureXS *self;
149 0           Newx(self, 1, struct FutureXS);
150              
151 0           self->ready = false;
152 0           self->cancelled = false;
153 0           self->reported = false;
154              
155 0           self->label = NULL;
156              
157 0 0         if(capture_times)
158 0           gettimeofday(&self->btime, NULL);
159             else
160 0           self->btime = (struct timeval){ 0 };
161              
162 0           self->rtime = (struct timeval){ 0 };
163              
164 0 0         if(future_debug)
165 0 0         self->constructed_at = newSVpvf("constructed at %s line %d", CopFILE(PL_curcop), CopLINE(PL_curcop));
166             else
167 0           self->constructed_at = NULL;
168              
169 0           self->result = NULL;
170 0           self->failure = NULL;
171              
172 0           self->callbacks = NULL;
173 0           self->on_cancel = NULL;
174 0           self->revoke_when_ready = NULL;
175 0           self->empty_revocation_slots = 0;
176              
177 0           self->udata = NULL;
178              
179 0           self->subs = NULL;
180              
181 0           self->precedent_f = NULL;
182              
183 0           SV *ret = newSV(0);
184 0           sv_setref_pv(ret, cls, self);
185              
186             #ifdef USE_ITHREADS
187             MAGIC *mg = sv_magicext(SvRV(ret), SvRV(ret), PERL_MAGIC_ext, &vtbl, NULL, 0);
188             mg->mg_flags |= MGf_DUP;
189             #endif
190              
191 0           return ret;
192             }
193              
194             #define future_new_proto(f1) Future_new_proto(aTHX_ f1)
195 0           SV *Future_new_proto(pTHX_ SV *f1)
196             {
197             assert(f1 && SvROK(f1) && SvRV(f1));
198             // TODO Shortcircuit in the common case that f1 is a Future instance
199             // return future_new(HvNAME(SvSTASH(SvRV(f1))));
200              
201 0           dSP;
202 0           ENTER;
203 0           SAVETMPS;
204              
205 0 0         EXTEND(SP, 1);
206 0 0         PUSHMARK(SP);
207 0           PUSHs(sv_mortalcopy(f1));
208 0           PUTBACK;
209              
210 0           call_method("new", G_SCALAR);
211              
212 0           SPAGAIN;
213              
214 0           SV *ret = SvREFCNT_inc(POPs);
215              
216 0           PUTBACK;
217 0 0         FREETMPS;
218 0           LEAVE;
219              
220 0           return ret;
221             }
222              
223             #ifdef USE_ITHREADS
224              
225             static int future_dup(pTHX_ MAGIC *mg, CLONE_PARAMS *param)
226             {
227             /* We don't currently support duplicating a Future instance across thread
228             * creation/return. For now just zero out the pointer and complain if anyone
229             * tries to access it.
230             * This at least means that incidental Future instances that happen to exist
231             * in main thread memory won't be disturbed when sidecar threads are joined.
232             */
233             sv_setiv(mg->mg_obj, 0);
234             }
235             #endif
236              
237             #define clear_callback(cb) S_clear_callback(aTHX_ cb)
238 0           static void S_clear_callback(pTHX_ struct FutureXSCallback *cb)
239             {
240 0           int flags = cb->flags;
241 0 0         if(flags & CB_SEQ_ANY) {
242 0           SvREFCNT_dec(cb->seq.thencode);
243 0           SvREFCNT_dec(cb->seq.elsecode);
244 0           SvREFCNT_dec(cb->seq.catches);
245 0           SvREFCNT_dec(cb->seq.f);
246             }
247             else {
248 0 0         SvREFCNT_dec(CB_NONSEQ_CODE(cb));
249             }
250 0           }
251              
252             #define destroy_callbacks(self) S_destroy_callbacks(aTHX_ self)
253 0           static void S_destroy_callbacks(pTHX_ struct FutureXS *self)
254             {
255 0           AV *callbacksav = self->callbacks;
256 0 0         while(callbacksav && AvFILLp(callbacksav) > -1) {
    0          
257 0           struct FutureXSCallback *cb = (struct FutureXSCallback *)AvARRAY(self->callbacks)[AvFILLp(callbacksav)--];
258 0           clear_callback(cb);
259 0           Safefree(cb);
260             }
261 0           }
262              
263 0           void Future_destroy(pTHX_ SV *f)
264             {
265             #ifdef DEBUGGING
266             // Every pointer in this function ought to have been uniquely held
267             # define UNREF(p) \
268             do { \
269             if(p) assert(SvREFCNT(p) == 1); \
270             SvREFCNT_dec((SV *)p); \
271             (p) = (void *)0xAA55AA55; \
272             } while(0)
273             #else
274             # define UNREF(p) SvREFCNT_dec((SV *)p)
275             #endif
276              
277             /* Defend against being run during global destruction */
278 0 0         if(!f || !SvROK(f))
    0          
279 0           return;
280 0           struct FutureXS *self = maybe_get_future(f);
281 0 0         if(!self)
282 0           return;
283              
284 0 0         if(future_debug &&
    0          
285 0 0         (!self->ready || (self->failure && !self->reported))) {
    0          
286 0 0         if(!self->ready)
287 0 0         warn("%" SVf " was %" SVf " and was lost near %s line %d before it was ready\n",
288 0           SVfARG(f), SVfARG(self->constructed_at),
289 0           CopFILE(PL_curcop), CopLINE(PL_curcop));
290             else {
291 0           SV *failure = AvARRAY(self->failure)[0];
292 0 0         warn("%" SVf " was %" SVf " and was lost near %s line %d with an unreported failure of: %" SVf "\n",
293 0           SVfARG(f), SVfARG(self->constructed_at),
294 0           CopFILE(PL_curcop), CopLINE(PL_curcop),
295             SVfARG(failure));
296             }
297             }
298              
299 0           UNREF(self->label);
300              
301 0           UNREF(self->result);
302              
303 0           UNREF(self->failure);
304              
305 0           destroy_callbacks(self);
306 0           UNREF(self->callbacks);
307              
308 0           UNREF(self->on_cancel);
309              
310 0           AV *revocationsav = self->revoke_when_ready;
311 0 0         while(revocationsav && AvFILLp(revocationsav) > -1) {
    0          
312 0           struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocationsav)[AvFILLp(revocationsav)--];
313 0           UNREF(rev->precedent_f);
314 0           UNREF(rev->toclear_sv_at);
315 0           Safefree(rev);
316             }
317 0           UNREF(self->revoke_when_ready);
318              
319 0           UNREF(self->udata);
320              
321 0           UNREF(self->constructed_at);
322              
323 0           UNREF(self->subs);
324              
325 0           UNREF(self->precedent_f);
326              
327 0           Safefree(self);
328              
329             #undef UNREF
330             }
331              
332 0           bool Future_is_ready(pTHX_ SV *f)
333             {
334 0           struct FutureXS *self = get_future(f);
335 0           return self->ready;
336             }
337              
338 0           bool Future_is_done(pTHX_ SV *f)
339             {
340 0           struct FutureXS *self = get_future(f);
341 0 0         return self->ready && !self->failure && !self->cancelled;
    0          
    0          
342             }
343              
344 0           bool Future_is_failed(pTHX_ SV *f)
345             {
346 0           struct FutureXS *self = get_future(f);
347 0 0         return self->ready && self->failure;
    0          
348             }
349              
350 0           bool Future_is_cancelled(pTHX_ SV *f)
351             {
352 0           struct FutureXS *self = get_future(f);
353 0           return self->cancelled;
354             }
355              
356             #define clear_on_cancel(self) S_clear_on_cancel(aTHX_ self)
357 0           static void S_clear_on_cancel(pTHX_ struct FutureXS *self)
358             {
359 0 0         if(!self->on_cancel)
360 0           return;
361              
362 0           AV *on_cancel = self->on_cancel;
363 0           self->on_cancel = NULL;
364              
365 0           SvREFCNT_dec(on_cancel);
366             }
367              
368             #define push_callback(self, cb) S_push_callback(aTHX_ self, cb)
369 0           static void S_push_callback(pTHX_ struct FutureXS *self, struct FutureXSCallback *cb)
370             {
371             struct FutureXSCallback *new;
372 0           Newx(new, 1, struct FutureXSCallback);
373              
374 0           new->flags = cb->flags;
375 0 0         if(cb->flags & CB_SEQ_ANY) {
376 0           new->seq.thencode = cb->seq.thencode;
377 0           new->seq.elsecode = cb->seq.elsecode;
378 0           new->seq.catches = cb->seq.catches;
379 0           new->seq.f = cb->seq.f;
380             }
381             else {
382 0 0         new->code = CB_NONSEQ_CODE(cb);
383             }
384              
385 0 0         if(!self->callbacks)
386 0           self->callbacks = newAV();
387              
388 0           av_push(self->callbacks, (SV *)new);
389 0           }
390              
391             #define wrap_cb(f, name, cv) S_wrap_cb(aTHX_ f, name, cv)
392 0           static SV *S_wrap_cb(pTHX_ SV *f, const char *name, SV *cv)
393             {
394             // TODO: This is quite the speed bump having to do this, in the common case
395             // that it isn't overridden
396 0           dSP;
397 0           ENTER;
398 0           SAVETMPS;
399              
400 0 0         EXTEND(SP, 3);
401 0 0         PUSHMARK(SP);
402 0           PUSHs(sv_mortalcopy(f));
403 0           mPUSHp(name, strlen(name));
404 0           PUSHs(sv_mortalcopy(cv));
405 0           PUTBACK;
406              
407 0           call_method("wrap_cb", G_SCALAR);
408              
409 0           SPAGAIN;
410 0           SV *ret = newSVsv(POPs);
411              
412 0           PUTBACK;
413 0 0         FREETMPS;
414 0           LEAVE;
415              
416 0           return ret;
417             }
418              
419             #define invoke_seq_callback(self, selfsv, cb) S_invoke_seq_callback(aTHX_ self, selfsv, cb)
420 0           static SV *S_invoke_seq_callback(pTHX_ struct FutureXS *self, SV *selfsv, struct FutureXSCallback *cb)
421             {
422 0           int flags = cb->flags;
423              
424 0           bool is_fail = cBOOL(self->failure);
425 0 0         bool is_done = !self->cancelled && !is_fail;
    0          
426              
427 0 0         AV *result = (is_done) ? self->result :
428 0 0         (is_fail) ? self->failure :
429             NULL;
430              
431 0 0         SV *code = (is_done) ? cb->seq.thencode :
432 0 0         (is_fail) ? cb->seq.elsecode :
433             NULL;
434              
435 0 0         if(is_fail && result && av_count(result) > 1 && cb->seq.catches) {
    0          
    0          
    0          
    0          
436 0           SV *category = AvARRAY(result)[1];
437 0 0         if(SvOK(category)) {
    0          
    0          
438 0           HE *he = hv_fetch_ent(cb->seq.catches, category, 0, 0);
439 0 0         if(he && HeVAL(he))
    0          
440 0           code = HeVAL(he);
441             }
442             }
443              
444 0 0         if(!code || !SvOK(code))
    0          
    0          
    0          
445 0           return newSVsv(selfsv);
446              
447 0           dSP;
448              
449 0           ENTER;
450 0           SAVETMPS;
451              
452 0 0         PUSHMARK(SP);
453 0 0         if(flags & CB_SELF)
454 0 0         XPUSHs(selfsv);
455 0 0         if(flags & CB_RESULT)
456 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
457 0           PUTBACK;
458              
459             assert(SvOK(code));
460 0           call_sv(code, G_SCALAR|G_EVAL);
461              
462 0           SPAGAIN;
463              
464 0 0         if(SvROK(ERRSV) || SvTRUE(ERRSV)) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
465 0           POPs;
466              
467 0           SV *fseq = cb->seq.f;
468              
469 0 0         if(!fseq)
470 0           fseq = future_new_proto(selfsv);
471              
472 0 0         future_failv(fseq, &ERRSV, 1);
473              
474 0 0         FREETMPS;
475 0           LEAVE;
476              
477 0           return fseq;
478             }
479              
480 0           SV *f2 = POPs;
481 0           SvREFCNT_inc(f2);
482              
483 0           PUTBACK;
484 0 0         FREETMPS;
485 0           LEAVE;
486              
487 0 0         if(!sv_is_future(f2)) {
488 0           SV *result = f2;
489              
490             // TODO: strictness check
491              
492 0           f2 = future_new_proto(selfsv);
493 0           future_donev(f2, &result, 1);
494             }
495              
496 0           return f2;
497             }
498              
499             #define invoke_callback(self, selfsv, cb) S_invoke_callback(aTHX_ self, selfsv, cb)
500 0           static void S_invoke_callback(pTHX_ struct FutureXS *self, SV *selfsv, struct FutureXSCallback *cb)
501             {
502 0           int flags = cb->flags;
503              
504 0           bool is_cancelled = self->cancelled;
505 0           bool is_fail = cBOOL(self->failure);
506 0 0         bool is_done = !is_cancelled && !is_fail;
    0          
507              
508 0 0         AV *result = (is_done) ? self->result :
509 0 0         (is_fail) ? self->failure :
510             NULL;
511              
512 0 0         if(is_done && !(flags & CB_DONE))
    0          
513 0           return;
514 0 0         if(is_fail && !(flags & CB_FAIL))
    0          
515 0           return;
516 0 0         if(is_cancelled && !(flags & CB_CANCEL))
    0          
517 0           return;
518              
519 0 0         if(flags & CB_IS_FUTURE) {
520 0           dSP;
521              
522 0           ENTER;
523 0           SAVETMPS;
524              
525 0 0         PUSHMARK(SP);
526 0 0         XPUSHs(CB_NONSEQ_CODE(cb)); // really a Future RV
    0          
527 0 0         if(result)
528 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
529              
530 0           PUTBACK;
531 0 0         if(is_done)
532 0           call_method("done", G_VOID);
533 0 0         else if(is_fail)
534 0           call_method("fail", G_VOID);
535             else
536 0           call_method("cancel", G_VOID);
537              
538 0 0         FREETMPS;
539 0           LEAVE;
540             }
541 0 0         else if(flags & CB_SEQ_ANY) {
542 0           SV *fseq = cb->seq.f;
543              
544 0 0         if(!SvOK(fseq)) {
    0          
    0          
545 0 0         if(self->constructed_at)
546 0           warn("%" SVf " (%" SVf ") lost a sequence Future",
547 0           SVfARG(selfsv), SVfARG(self->constructed_at));
548             else
549 0           warn("%" SVf " lost a sequence Future",
550             SVfARG(selfsv));
551 0           return;
552             }
553              
554 0           SV *f2 = invoke_seq_callback(self, selfsv, cb);
555 0 0         if(f2 == fseq)
556             /* immediate fail */
557 0           return;
558              
559 0           future_on_cancel(fseq, f2);
560              
561 0 0         if(future_is_ready(f2)) {
562 0 0         if(!future_is_cancelled(f2))
563 0           future_on_ready(f2, fseq);
564 0 0         else if(flags & CB_CANCEL)
565 0           future_cancel(fseq);
566              
567 0           SvREFCNT_dec(f2);
568             }
569             else {
570 0           struct FutureXS *f2self = get_future(f2);
571 0           struct FutureXSCallback cb2 = {
572             .flags = CB_DONE|CB_FAIL|CB_IS_FUTURE,
573 0           .code = sv_rvweaken(newSVsv(fseq)),
574             };
575 0           push_callback(f2self, &cb2);
576             }
577             }
578             else {
579 0 0         SV *code = CB_NONSEQ_CODE(cb);
580              
581 0           dSP;
582              
583 0           ENTER;
584 0           SAVETMPS;
585              
586 0 0         PUSHMARK(SP);
587 0 0         if(flags & CB_SELF)
588 0 0         XPUSHs(selfsv);
589 0 0         if((flags & CB_RESULT) && result)
    0          
590 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
591              
592 0           PUTBACK;
593             assert(SvOK(code));
594 0           call_sv(code, G_VOID);
595              
596 0 0         FREETMPS;
597 0           LEAVE;
598             }
599             }
600              
601             #define revoke_on_cancel(rev) S_revoke_on_cancel(aTHX_ rev)
602 0           static void S_revoke_on_cancel(pTHX_ struct FutureXSRevocation *rev)
603             {
604 0 0         if(rev->toclear_sv_at && SvROK(rev->toclear_sv_at)) {
    0          
605             assert(SvTYPE(rev->toclear_sv_at) <= SVt_PVMG);
606             assert(SvROK(rev->toclear_sv_at));
607 0           sv_set_undef(SvRV(rev->toclear_sv_at));
608 0           SvREFCNT_dec(rev->toclear_sv_at);
609 0           rev->toclear_sv_at = NULL;
610             }
611              
612 0 0         if(!SvOK(rev->precedent_f))
    0          
    0          
613 0           return;
614              
615 0           struct FutureXS *self = get_future(rev->precedent_f);
616              
617 0           self->empty_revocation_slots++;
618              
619 0           AV *on_cancel = self->on_cancel;
620 0 0         if(self->empty_revocation_slots >= 8 && on_cancel &&
    0          
    0          
621 0 0         self->empty_revocation_slots >= AvFILL(on_cancel)/2) {
622              
623             // Squash up the array to contain only defined values
624 0           SV **wrsv = AvARRAY(on_cancel),
625 0           **rdsv = AvARRAY(on_cancel),
626 0 0         **end = AvARRAY(on_cancel) + AvFILL(on_cancel);
627              
628 0 0         while(rdsv <= end) {
629 0 0         if(SvOK(*rdsv))
    0          
    0          
630             // Keep this one
631 0           *(wrsv++) = *rdsv;
632             else
633             // Free this one
634 0           SvREFCNT_dec(*rdsv);
635              
636 0           rdsv++;
637             }
638 0           AvFILLp(on_cancel) = wrsv - AvARRAY(on_cancel) - 1;
639              
640 0           self->empty_revocation_slots = 0;
641             }
642             }
643              
644             #define mark_ready(self, selfsv, state) S_mark_ready(aTHX_ self, selfsv, state)
645 0           static void S_mark_ready(pTHX_ struct FutureXS *self, SV *selfsv, const char *state)
646             {
647 0           self->ready = true;
648             // TODO: self->ready_at
649 0 0         if(capture_times)
650 0           gettimeofday(&self->rtime, NULL);
651              
652             /* Make sure self doesn't disappear during this function */
653 0           SvREFCNT_inc(SvRV(selfsv));
654 0           SAVEFREESV(SvRV(selfsv));
655              
656 0 0         if(self->precedent_f) {
657 0           SvREFCNT_dec(self->precedent_f);
658 0           self->precedent_f = NULL;
659             }
660              
661 0           clear_on_cancel(self);
662 0 0         if(self->revoke_when_ready) {
663 0           AV *revocations = self->revoke_when_ready;
664 0 0         for(size_t i = 0; i < av_count(revocations); i++) {
    0          
665 0           struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocations)[i];
666 0           revoke_on_cancel(rev);
667              
668 0           SvREFCNT_dec(rev->precedent_f);
669 0           Safefree(rev);
670             }
671 0           AvFILLp(revocations) = -1;
672 0           SvREFCNT_dec(revocations);
673              
674 0           self->revoke_when_ready = NULL;
675             }
676              
677 0 0         if(!self->callbacks)
678 0           return;
679              
680 0           AV *callbacks = self->callbacks;
681              
682 0           struct FutureXSCallback **cbs = (struct FutureXSCallback **)AvARRAY(callbacks);
683 0 0         size_t i, n = av_count(callbacks);
684 0 0         for(i = 0; i < n; i++) {
685 0           struct FutureXSCallback *cb = cbs[i];
686 0           invoke_callback(self, selfsv, cb);
687             }
688              
689 0           destroy_callbacks(self);
690             }
691              
692             #define make_sequence(f1, cb) S_make_sequence(aTHX_ f1, cb)
693 0           static SV *S_make_sequence(pTHX_ SV *f1, struct FutureXSCallback *cb)
694             {
695 0           struct FutureXS *self = get_future(f1);
696              
697 0           int flags = cb->flags;
698              
699 0 0         if(self->ready) {
700             // TODO: CB_SEQ_IM*
701              
702 0           SV *f2 = invoke_seq_callback(self, f1, cb);
703 0           clear_callback(cb);
704 0           return f2;
705             }
706              
707 0           SV *fseq = future_new_proto(f1);
708 0 0         if(cb->flags & CB_SEQ_CANCEL)
709 0           future_on_cancel(fseq, f1);
710              
711 0           cb->flags |= CB_DONE|CB_FAIL;
712 0 0         if(cb->seq.thencode)
713 0           cb->seq.thencode = wrap_cb(f1, "sequence", sv_2mortal(cb->seq.thencode));
714 0 0         if(cb->seq.elsecode)
715 0           cb->seq.elsecode = wrap_cb(f1, "sequence", sv_2mortal(cb->seq.elsecode));
716 0           cb->seq.f = sv_rvweaken(newSVsv(fseq));
717              
718 0           push_callback(self, cb);
719              
720 0           return fseq;
721             }
722              
723             // TODO: move to a hax/ file
724             #define CvNAME_FILE_LINE(cv) S_CvNAME_FILE_LINE(aTHX_ cv)
725 0           static SV *S_CvNAME_FILE_LINE(pTHX_ CV *cv)
726             {
727 0 0         if(!CvANON(cv)) {
728 0           SV *ret = newSVpvf("HvNAME::GvNAME");
729 0           return ret;
730             }
731              
732 0           OP *cop = CvSTART(cv);
733 0 0         while(cop && OP_CLASS(cop) != OA_COP)
    0          
    0          
    0          
734 0           cop = cop->op_next;
735              
736 0 0         if(!cop)
737 0           return newSVpvs("__ANON__");
738              
739 0 0         return newSVpvf("__ANON__(%s line %d)", CopFILE((COP *)cop), CopLINE((COP *)cop));
740             }
741              
742 0           void Future_donev(pTHX_ SV *f, SV **svp, size_t n)
743             {
744 0           struct FutureXS *self = get_future(f);
745              
746 0 0         if(self->cancelled)
747 0           return;
748              
749 0 0         if(self->ready)
750 0           croak("%" SVf " is already (STATE) and cannot be ->done",
751             SVfARG(f));
752             // TODO: test subs
753              
754 0           self->result = newAV_svn_dup(svp, n);
755 0           mark_ready(self, f, "done");
756             }
757              
758 0           void Future_failv(pTHX_ SV *f, SV **svp, size_t n)
759             {
760 0           struct FutureXS *self = get_future(f);
761              
762 0 0         if(self->cancelled)
763 0           return;
764              
765 0 0         if(self->ready)
766 0           croak("%" SVf " is already (STATE) and cannot be ->fail'ed",
767             SVfARG(f));
768              
769 0 0         if(n == 1 &&
    0          
770 0           SvROK(svp[0]) && SvOBJECT(SvRV(svp[0])) &&
771 0           sv_derived_from(svp[0], "Future::Exception")) {
772 0           SV *exception = svp[0];
773 0           AV *failure = self->failure = newAV();
774              
775 0           dSP;
776              
777             {
778 0           ENTER;
779 0           SAVETMPS;
780              
781 0 0         EXTEND(SP, 1);
782 0 0         PUSHMARK(SP);
783 0           PUSHs(sv_mortalcopy(exception));
784 0           PUTBACK;
785              
786 0           call_method("message", G_SCALAR);
787              
788 0           SPAGAIN;
789              
790 0           av_push(failure, SvREFCNT_inc(POPs));
791              
792 0           PUTBACK;
793 0 0         FREETMPS;
794 0           LEAVE;
795             }
796              
797             {
798 0           ENTER;
799 0           SAVETMPS;
800              
801 0 0         EXTEND(SP, 1);
802 0 0         PUSHMARK(SP);
803 0           PUSHs(sv_mortalcopy(exception));
804 0           PUTBACK;
805              
806 0           call_method("category", G_SCALAR);
807              
808 0           SPAGAIN;
809              
810 0           av_push(failure, SvREFCNT_inc(POPs));
811              
812 0           PUTBACK;
813 0 0         FREETMPS;
814 0           LEAVE;
815             }
816              
817             {
818 0           ENTER;
819 0           SAVETMPS;
820              
821 0 0         EXTEND(SP, 1);
822 0 0         PUSHMARK(SP);
823 0           PUSHs(sv_mortalcopy(exception));
824 0           PUTBACK;
825              
826 0           SSize_t count = call_method("details", G_LIST);
827              
828 0           SPAGAIN;
829              
830 0           SV **retp = SP - count + 1;
831              
832 0 0         for(SSize_t i = 0; i < count; i++)
833 0           av_push(failure, SvREFCNT_inc(retp[i]));
834 0           SP -= count;
835              
836 0           PUTBACK;
837 0 0         FREETMPS;
838 0           LEAVE;
839             }
840             }
841             else {
842 0           self->failure = newAV_svn_dup(svp, n);
843             }
844              
845 0           mark_ready(self, f, "failed");
846             }
847              
848             #define future_failp(f, s) Future_failp(aTHX_ f, s)
849 0           void Future_failp(pTHX_ SV *f, const char *s)
850             {
851 0           struct FutureXS *self = get_future(f);
852              
853 0 0         if(self->cancelled)
854 0           return;
855              
856 0 0         if(self->ready)
857 0           croak("%" SVf " is already (STATE) and cannot be ->fail'ed",
858             SVfARG(f));
859              
860 0           self->failure = newAV();
861 0           av_push(self->failure, newSVpv(s, strlen(s)));
862 0           mark_ready(self, f, "failed");
863             }
864              
865 0           void Future_on_cancel(pTHX_ SV *f, SV *code)
866             {
867 0           struct FutureXS *self = get_future(f);
868              
869 0 0         if(self->ready)
870 0           return;
871              
872 0           bool is_future = sv_is_future(code);
873             // TODO: is_future or callable(code) or croak
874              
875 0 0         if(!self->on_cancel)
876 0           self->on_cancel = newAV();
877              
878 0           SV *rv = newSVsv((SV *)code);
879 0           av_push(self->on_cancel, rv);
880              
881 0 0         if(is_future) {
882             struct FutureXSRevocation *rev;
883 0           Newx(rev, 1, struct FutureXSRevocation);
884              
885 0           rev->precedent_f = sv_rvweaken(newSVsv(f));
886 0           rev->toclear_sv_at = sv_rvweaken(newRV_inc(rv));
887              
888 0           struct FutureXS *codeself = get_future(code);
889 0 0         if(!codeself->revoke_when_ready)
890 0           codeself->revoke_when_ready = newAV();
891              
892 0           av_push(codeself->revoke_when_ready, (SV *)rev);
893             }
894             }
895              
896 0           void Future_on_ready(pTHX_ SV *f, SV *code)
897             {
898 0           struct FutureXS *self = get_future(f);
899              
900 0           bool is_future = sv_is_future(code);
901             // TODO: is_future or callable(code) or croak
902              
903 0           int flags = CB_ALWAYS|CB_SELF;
904 0 0         if(is_future)
905 0           flags |= CB_IS_FUTURE;
906              
907 0           struct FutureXSCallback cb = {
908             .flags = flags,
909             .code = code,
910             };
911              
912 0 0         if(self->ready)
913 0           invoke_callback(self, f, &cb);
914             else {
915 0           cb.code = wrap_cb(f, "on_ready", cb.code);
916 0           push_callback(self, &cb);
917             }
918 0           }
919              
920 0           void Future_on_done(pTHX_ SV *f, SV *code)
921             {
922 0           struct FutureXS *self = get_future(f);
923              
924 0           bool is_future = sv_is_future(code);
925             // TODO: is_future or callable(code) or croak
926              
927 0           int flags = CB_DONE|CB_RESULT;
928 0 0         if(is_future)
929 0           flags |= CB_IS_FUTURE;
930              
931 0           struct FutureXSCallback cb = {
932             .flags = flags,
933             .code = code,
934             };
935              
936 0 0         if(self->ready)
937 0           invoke_callback(self, f, &cb);
938             else {
939 0           cb.code = wrap_cb(f, "on_done", cb.code);
940 0           push_callback(self, &cb);
941             }
942 0           }
943              
944 0           void Future_on_fail(pTHX_ SV *f, SV *code)
945             {
946 0           struct FutureXS *self = get_future(f);
947              
948 0           bool is_future = sv_is_future(code);
949             // TODO: is_future or callable(code) or croak
950              
951 0           int flags = CB_FAIL|CB_RESULT;
952 0 0         if(is_future)
953 0           flags |= CB_IS_FUTURE;
954              
955 0           struct FutureXSCallback cb = {
956             .flags = flags,
957             .code = code,
958             };
959              
960 0 0         if(self->ready)
961 0           invoke_callback(self, f, &cb);
962             else {
963 0           cb.code = wrap_cb(f, "on_fail", cb.code);
964 0           push_callback(self, &cb);
965             }
966 0           }
967              
968             #define future_await(f) Future_await(aTHX_ f)
969 0           static void Future_await(pTHX_ SV *f)
970             {
971 0           dSP;
972              
973 0           ENTER;
974 0           SAVETMPS;
975              
976 0 0         PUSHMARK(SP);
977 0 0         mXPUSHs(newSVsv(f));
978 0           PUTBACK;
979              
980 0           call_method("await", G_VOID);
981              
982 0 0         FREETMPS;
983 0           LEAVE;
984 0           }
985              
986 0           AV *Future_get_result_av(pTHX_ SV *f, bool await)
987             {
988 0           struct FutureXS *self = get_future(f);
989              
990 0 0         if(await && !self->ready)
    0          
991 0           future_await(f);
992              
993 0 0         if(!self->ready)
994 0           croak("%" SVf " is not yet ready", SVfARG(f));
995              
996 0 0         if(self->failure) {
997 0           self->reported = true;
998              
999 0           SV *exception = AvARRAY(self->failure)[0];
1000 0 0         if(av_count(self->failure) > 1) {
    0          
1001 0           dSP;
1002 0           ENTER;
1003 0           SAVETMPS;
1004              
1005 0 0         PUSHMARK(SP);
1006 0 0         EXTEND(SP, 1 + av_count(self->failure));
    0          
    0          
    0          
    0          
1007 0           mPUSHpvs("Future::Exception");
1008 0 0         for(SSize_t i = 0; i < av_count(self->failure); i++)
    0          
1009 0           PUSHs(sv_mortalcopy(AvARRAY(self->failure)[i]));
1010 0           PUTBACK;
1011              
1012 0           call_method("new", G_SCALAR);
1013              
1014 0           SPAGAIN;
1015              
1016 0           exception = SvREFCNT_inc(POPs);
1017              
1018 0           PUTBACK;
1019 0 0         FREETMPS;
1020 0           LEAVE;
1021             }
1022              
1023 0 0         if(SvROK(exception) || SvPV_nolen(exception)[SvCUR(exception)-1] == '\n')
    0          
    0          
1024 0           die_sv(exception);
1025             else {
1026             /* We'd like to call Carp::croak to do the @CARP_NOT logic, but it gets
1027             * confused about a missing callframe first because this is XS. We'll
1028             * reïmplement the logic here
1029             */
1030             I32 cxix;
1031 0 0         for(cxix = cxstack_ix; cxix; cxix--) {
1032 0 0         if(CxTYPE(&cxstack[cxix]) != CXt_SUB)
1033 0           continue;
1034              
1035 0           const CV *cv = cxstack[cxix].blk_sub.cv;
1036 0 0         if(!cv)
1037 0           continue;
1038              
1039 0 0         const char *stashname = HvNAME(CvSTASH(cv));
    0          
    0          
    0          
    0          
    0          
1040 0 0         if(!stashname)
1041 0           continue;
1042              
1043             // The essence of the @CARP_NOT logic
1044 0 0         if(strEQ(stashname, "Future::_base"))
1045 0           continue;
1046              
1047 0 0         const COP *cop = cxix < cxstack_ix ? cxstack[cxix+1].blk_oldcop : PL_curcop;
1048              
1049 0 0         sv_catpvf(exception, " at %s line %d.\n", CopFILE(cop), CopLINE(cop));
1050 0           break;
1051             }
1052              
1053 0           die_sv(exception);
1054             }
1055             }
1056              
1057 0 0         if(self->cancelled)
1058 0           croak("%" SVf " was cancelled",
1059             SVfARG(f));
1060              
1061 0 0         if(!self->result)
1062 0           self->result = newAV();
1063              
1064 0           return self->result;
1065             }
1066              
1067 0           AV *Future_get_failure_av(pTHX_ SV *f)
1068             {
1069 0           struct FutureXS *self = get_future(f);
1070              
1071 0 0         if(!self->ready)
1072 0           future_await(f);
1073              
1074 0 0         if(!self->failure)
1075 0           return NULL;
1076              
1077 0           return self->failure;
1078             }
1079              
1080 0           void Future_cancel(pTHX_ SV *f)
1081             {
1082             /* Specifically don't make it an error to ->cancel a future instance not
1083             * available in this thread; as it often appears in defer / DESTROY / etc
1084             */
1085 0           struct FutureXS *self = maybe_get_future(f);
1086 0 0         if(!self)
1087 0           return;
1088              
1089 0 0         if(self->ready)
1090 0           return;
1091              
1092 0           self->cancelled = true;
1093 0           AV *on_cancel = self->on_cancel;
1094              
1095 0 0         if(self->subs) {
1096 0 0         for(Size_t i = 0; i < av_count(self->subs); i++)
    0          
1097 0           future_cancel(AvARRAY(self->subs)[i]);
1098             }
1099              
1100             // TODO: maybe we need to clear these out from self before we do this, in
1101             // case of recursion?
1102              
1103 0 0         for(int i = on_cancel ? AvFILL(on_cancel) : -1; i >= 0; i--) {
    0          
    0          
1104 0           SV *code = AvARRAY(on_cancel)[i];
1105 0 0         if(!SvOK(code))
    0          
    0          
1106 0           continue;
1107              
1108 0 0         if(sv_is_future(code)) {
1109 0           dSP;
1110              
1111 0           ENTER;
1112 0           SAVETMPS;
1113              
1114 0 0         PUSHMARK(SP);
1115 0           PUSHs(code);
1116 0           PUTBACK;
1117              
1118 0           call_method("cancel", G_VOID);
1119              
1120 0 0         FREETMPS;
1121 0           LEAVE;
1122             }
1123             else {
1124 0           dSP;
1125              
1126 0           ENTER;
1127 0           SAVETMPS;
1128              
1129 0 0         PUSHMARK(SP);
1130 0           PUSHs(f);
1131 0           PUTBACK;
1132              
1133             assert(SvOK(code));
1134 0           call_sv(code, G_VOID);
1135              
1136 0 0         FREETMPS;
1137 0           LEAVE;
1138             }
1139             }
1140              
1141 0           mark_ready(self, f, "cancel");
1142             }
1143              
1144 0           SV *Future_without_cancel(pTHX_ SV *f)
1145             {
1146 0           struct FutureXSCallback cb = {
1147             .flags = CB_SEQ_READY|CB_CANCEL, /* without CB_SEQ_CANCEL */
1148             /* no code */
1149             };
1150              
1151 0           SV *ret = make_sequence(f, &cb);
1152 0           struct FutureXS *self = get_future(ret);
1153              
1154 0           self->precedent_f = newSVsv(f);
1155              
1156 0           return ret;
1157             }
1158              
1159 0           SV *Future_then(pTHX_ SV *f, U32 flags, SV *thencode, SV *elsecode)
1160             {
1161 0           struct FutureXSCallback cb = {
1162             .flags = CB_SEQ_ANY|CB_RESULT,
1163             .seq.thencode = thencode,
1164             .seq.elsecode = elsecode,
1165             };
1166 0 0         if(flags & FUTURE_THEN_WITH_F)
1167 0           cb.flags |= CB_SELF;
1168              
1169 0           return make_sequence(f, &cb);
1170             }
1171              
1172 0           SV *Future_followed_by(pTHX_ SV *f, SV *code)
1173             {
1174 0           struct FutureXSCallback cb = {
1175             .flags = CB_SEQ_ANY|CB_SELF,
1176             .seq.thencode = code,
1177 0           .seq.elsecode = SvREFCNT_inc(code),
1178             };
1179              
1180 0           return make_sequence(f, &cb);
1181             }
1182              
1183 0           SV *Future_thencatch(pTHX_ SV *f, U32 flags, SV *thencode, HV *catches, SV *elsecode)
1184             {
1185 0           struct FutureXSCallback cb = {
1186             .flags = CB_SEQ_ANY|CB_RESULT,
1187             .seq.thencode = thencode,
1188             .seq.elsecode = elsecode,
1189             .seq.catches = catches,
1190             };
1191 0 0         if(flags & FUTURE_THEN_WITH_F)
1192 0           cb.flags |= CB_SELF;
1193              
1194 0           return make_sequence(f, &cb);
1195             }
1196              
1197             #define future_new_subsv(cls, subs, n) S_future_new_subsv(aTHX_ cls, subs, n)
1198 0           static SV *S_future_new_subsv(pTHX_ const char *cls, SV **subs, size_t n)
1199             {
1200 0           HV *future_stash = get_hv("Future::", 0);
1201             assert(future_stash);
1202              
1203             /* Find the best prototype; pick the first derived instance if there is
1204             * one */
1205 0           SV *proto = NULL;
1206 0 0         for(Size_t i = 0; i < n; i++) {
1207 0 0         if(!SvROK(subs[i]) || !SvOBJECT(SvRV(subs[i])))
    0          
1208 0           croak("Expected a Future, got %" SVf, SVfARG(subs[i]));
1209              
1210 0 0         if(SvSTASH(SvRV(subs[i])) != future_stash) {
1211 0           proto = subs[i];
1212 0           break;
1213             }
1214             }
1215              
1216 0 0         SV *f = proto ? future_new_proto(proto) : future_new(cls);
1217 0           struct FutureXS *self = get_future(f);
1218              
1219 0 0         if(!self->subs)
1220 0           self->subs = newAV();
1221              
1222 0 0         for(Size_t i = 0; i < n; i++)
1223 0           av_push(self->subs, newSVsv(subs[i]));
1224              
1225 0           return f;
1226             }
1227              
1228             #define copy_result(self, src) S_copy_result(aTHX_ self, src)
1229 0           static void S_copy_result(pTHX_ struct FutureXS *self, SV *src)
1230             {
1231             /* TODO: Handle non-Future::XS instances too */
1232 0           struct FutureXS *srcself = get_future(src);
1233              
1234             assert(srcself->ready);
1235             assert(!srcself->cancelled);
1236              
1237 0 0         if(srcself->failure) {
1238 0 0         self->failure = newAV_svn_dup(AvARRAY(srcself->failure), av_count(srcself->failure));
1239             }
1240             else {
1241             assert(srcself->result);
1242 0 0         self->result = newAV_svn_dup(AvARRAY(srcself->result), av_count(srcself->result));
1243             }
1244 0           }
1245              
1246             #define cancel_pending_subs(self) S_cancel_pending_subs(aTHX_ self)
1247 0           static void S_cancel_pending_subs(pTHX_ struct FutureXS *self)
1248             {
1249 0 0         if(!self->subs)
1250 0           return;
1251              
1252 0 0         for(Size_t i = 0; i < av_count(self->subs); i++) {
    0          
1253 0           SV *sub = AvARRAY(self->subs)[i];
1254 0 0         if(!future_is_ready(sub))
1255 0           future_cancel(sub);
1256             }
1257             }
1258              
1259 0           XS_INTERNAL(sub_on_ready_waitall)
1260             {
1261 0           dXSARGS;
1262              
1263 0           SV *f = XSANY_sv;
1264 0 0         if(!SvOK(f))
    0          
    0          
1265 0           return;
1266              
1267             /* Make sure self doesn't disappear during this function */
1268 0           SvREFCNT_inc(SvRV(f));
1269 0           SAVEFREESV(SvRV(f));
1270              
1271 0           struct FutureXS *self = get_future(f);
1272              
1273 0           self->pending_subs--;
1274              
1275 0 0         if(self->pending_subs)
1276 0           XSRETURN(0);
1277              
1278             /* TODO: This is really just newAVav() */
1279 0 0         self->result = newAV_svn_dup(AvARRAY(self->subs), av_count(self->subs));
1280 0           mark_ready(self, f, "wait_all");
1281             }
1282              
1283 0           SV *Future_new_waitallv(pTHX_ const char *cls, SV **subs, size_t n)
1284             {
1285 0           SV *f = future_new_subsv(cls, subs, n);
1286 0           struct FutureXS *self = get_future(f);
1287              
1288 0           self->pending_subs = 0;
1289 0 0         for(Size_t i = 0; i < n; i++) {
1290             /* TODO: This should probably use some API function to make it transparent */
1291 0 0         if(!future_is_ready(subs[i]))
1292 0           self->pending_subs++;
1293             }
1294              
1295 0 0         if(!self->pending_subs) {
1296 0           self->result = newAV_svn_dup(subs, n);
1297 0           mark_ready(self, f, "wait_all");
1298              
1299 0           return f;
1300             }
1301              
1302 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_waitall, __FILE__);
1303 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1304 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1305              
1306 0           GV *gv = gv_fetchpvs("Future::XS::(wait_all callback)", GV_ADDMULTI, SVt_PVCV);
1307 0           CvGV_set(sub_on_ready, gv);
1308 0           CvANON_off(sub_on_ready);
1309              
1310 0 0         for(Size_t i = 0; i < n; i++) {
1311 0 0         if(!future_is_ready(subs[i]))
1312 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1313             }
1314              
1315 0           SvREFCNT_dec(sub_on_ready);
1316              
1317 0           return f;
1318             }
1319              
1320 0           XS_INTERNAL(sub_on_ready_waitany)
1321             {
1322 0           dXSARGS;
1323 0           SV *thissub = ST(0);
1324              
1325 0           SV *f = XSANY_sv;
1326 0 0         if(!SvOK(f))
    0          
    0          
1327 0           return;
1328              
1329             /* Make sure self doesn't disappear during this function */
1330 0           SvREFCNT_inc(SvRV(f));
1331 0           SAVEFREESV(SvRV(f));
1332              
1333 0           struct FutureXS *self = get_future(f);
1334              
1335 0 0         if(self->result || self->failure)
    0          
1336 0           return;
1337              
1338 0           self->pending_subs--;
1339              
1340 0           bool this_cancelled = future_is_cancelled(thissub);
1341              
1342 0 0         if(self->pending_subs && this_cancelled)
    0          
1343 0           return;
1344              
1345 0 0         if(this_cancelled) {
1346 0           future_failp(f, "All component futures were cancelled");
1347 0           return;
1348             }
1349             else
1350 0           copy_result(self, thissub);
1351              
1352 0           cancel_pending_subs(self);
1353              
1354 0           mark_ready(self, f, "wait_any");
1355             }
1356              
1357 0           SV *Future_new_waitanyv(pTHX_ const char *cls, SV **subs, size_t n)
1358             {
1359 0           SV *f = future_new_subsv(cls, subs, n);
1360 0           struct FutureXS *self = get_future(f);
1361              
1362 0 0         if(!n) {
1363 0           future_failp(f, "Cannot ->wait_any with no subfutures");
1364 0           return f;
1365             }
1366              
1367 0           SV *immediate_ready = NULL;
1368 0 0         for(Size_t i = 0; i < n; i++) {
1369             /* TODO: This should probably use some API function to make it transparent */
1370 0 0         if(future_is_ready(subs[i]) && !future_is_cancelled(subs[i])) {
    0          
1371 0           immediate_ready = subs[i];
1372 0           break;
1373             }
1374             }
1375              
1376 0 0         if(immediate_ready) {
1377 0           copy_result(self, immediate_ready);
1378              
1379 0           cancel_pending_subs(self);
1380              
1381 0           mark_ready(self, f, "wait_any");
1382              
1383 0           return f;
1384             }
1385              
1386 0           self->pending_subs = 0;
1387              
1388 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_waitany, __FILE__);
1389 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1390 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1391              
1392 0           GV *gv = gv_fetchpvs("Future::XS::(wait_any callback)", GV_ADDMULTI, SVt_PVCV);
1393 0           CvGV_set(sub_on_ready, gv);
1394 0           CvANON_off(sub_on_ready);
1395              
1396 0 0         for(Size_t i = 0; i < n; i++) {
1397 0 0         if(future_is_cancelled(subs[i]))
1398 0           continue;
1399              
1400 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1401 0           self->pending_subs++;
1402             }
1403              
1404 0           SvREFCNT_dec(sub_on_ready);
1405              
1406 0           return f;
1407             }
1408              
1409             #define compose_needsall_result(self) S_compose_needsall_result(aTHX_ self)
1410 0           static void S_compose_needsall_result(pTHX_ struct FutureXS *self)
1411             {
1412 0           AV *result = self->result = newAV();
1413 0 0         for(Size_t i = 0; i < av_count(self->subs); i++) {
    0          
1414 0           SV *sub = AvARRAY(self->subs)[i];
1415 0           struct FutureXS *subself = get_future(sub);
1416             assert(subself->result);
1417 0 0         av_push_svn(result, AvARRAY(subself->result), av_count(subself->result));
1418             }
1419 0           }
1420              
1421 0           XS_INTERNAL(sub_on_ready_needsall)
1422             {
1423 0           dXSARGS;
1424 0           SV *thissub = ST(0);
1425              
1426 0           SV *f = XSANY_sv;
1427 0 0         if(!SvOK(f))
    0          
    0          
1428 0           return;
1429              
1430             /* Make sure self doesn't disappear during this function */
1431 0           SvREFCNT_inc(SvRV(f));
1432 0           SAVEFREESV(SvRV(f));
1433              
1434 0           struct FutureXS *self = get_future(f);
1435              
1436 0 0         if(self->result || self->failure)
    0          
1437 0           return;
1438              
1439 0 0         if(future_is_cancelled(thissub)) {
1440 0           future_failp(f, "A component future was cancelled");
1441 0           cancel_pending_subs(self);
1442 0           return;
1443             }
1444 0 0         else if(future_is_failed(thissub)) {
1445 0           copy_result(self, thissub);
1446 0           cancel_pending_subs(self);
1447 0           mark_ready(self, f, "needs_all");
1448             }
1449             else {
1450 0           self->pending_subs--;
1451 0 0         if(self->pending_subs)
1452 0           return;
1453 0           compose_needsall_result(self);
1454 0           mark_ready(self, f, "needs_all");
1455             }
1456             }
1457              
1458 0           SV *Future_new_needsallv(pTHX_ const char *cls, SV **subs, size_t n)
1459             {
1460 0           SV *f = future_new_subsv(cls, subs, n);
1461 0           struct FutureXS *self = get_future(f);
1462              
1463 0 0         if(!n) {
1464 0           future_donev(f, NULL, 0);
1465 0           return f;
1466             }
1467              
1468 0           SV *immediate_fail = NULL;
1469 0 0         for(Size_t i = 0; i < n; i++) {
1470 0 0         if(future_is_cancelled(subs[i])) {
1471 0           future_failp(f, "A component future was cancelled");
1472 0           cancel_pending_subs(self);
1473 0           return f;
1474             }
1475 0 0         if(future_is_failed(subs[i])) {
1476 0           immediate_fail = subs[i];
1477 0           break;
1478             }
1479             }
1480              
1481 0 0         if(immediate_fail) {
1482 0           copy_result(self, immediate_fail);
1483 0           cancel_pending_subs(self);
1484 0           mark_ready(self, f, "needs_all");
1485 0           return f;
1486             }
1487              
1488 0           self->pending_subs = 0;
1489              
1490 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_needsall, __FILE__);
1491 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1492 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1493              
1494 0           GV *gv = gv_fetchpvs("Future::XS::(needs_all callback)", GV_ADDMULTI, SVt_PVCV);
1495 0           CvGV_set(sub_on_ready, gv);
1496 0           CvANON_off(sub_on_ready);
1497              
1498 0 0         for(Size_t i = 0; i < n; i++) {
1499 0 0         if(future_is_ready(subs[i]))
1500 0           continue;
1501              
1502 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1503 0           self->pending_subs++;
1504             }
1505              
1506 0 0         if(!self->pending_subs) {
1507 0           compose_needsall_result(self);
1508 0           mark_ready(self, f, "needs_all");
1509             }
1510              
1511 0           SvREFCNT_dec(sub_on_ready);
1512              
1513 0           return f;
1514             }
1515              
1516 0           XS_INTERNAL(sub_on_ready_needsany)
1517             {
1518 0           dXSARGS;
1519 0           SV *thissub = ST(0);
1520              
1521 0           SV *f = XSANY_sv;
1522 0 0         if(!SvOK(f))
    0          
    0          
1523 0           return;
1524              
1525             /* Make sure self doesn't disappear during this function */
1526 0           SvREFCNT_inc(SvRV(f));
1527 0           SAVEFREESV(SvRV(f));
1528              
1529 0           struct FutureXS *self = get_future(f);
1530              
1531 0 0         if(self->result || self->failure)
    0          
1532 0           return;
1533              
1534 0           self->pending_subs--;
1535              
1536 0           bool this_cancelled = future_is_cancelled(thissub);
1537              
1538 0 0         if(self->pending_subs && this_cancelled)
    0          
1539 0           return;
1540              
1541 0 0         if(this_cancelled) {
1542 0           future_failp(f, "All component futures were cancelled");
1543             }
1544 0 0         else if(future_is_failed(thissub)) {
1545 0 0         if(self->pending_subs)
1546 0           return;
1547              
1548 0           copy_result(self, thissub);
1549 0           mark_ready(self, f, "needs_any");
1550             }
1551             else {
1552 0           copy_result(self, thissub);
1553 0           cancel_pending_subs(self);
1554 0           mark_ready(self, f, "needs_any");
1555             }
1556             }
1557              
1558 0           SV *Future_new_needsanyv(pTHX_ const char *cls, SV **subs, size_t n)
1559             {
1560 0           SV *f = future_new_subsv(cls, subs, n);
1561 0           struct FutureXS *self = get_future(f);
1562              
1563 0 0         if(!n) {
1564 0           future_failp(f, "Cannot ->needs_any with no subfutures");
1565 0           return f;
1566             }
1567              
1568 0           SV *immediate_done = NULL;
1569 0 0         for(Size_t i = 0; i < n; i++) {
1570 0 0         if(future_is_done(subs[i])) {
1571 0           immediate_done = subs[i];
1572 0           break;
1573             }
1574             }
1575              
1576 0 0         if(immediate_done) {
1577 0           copy_result(self, immediate_done);
1578 0           cancel_pending_subs(self);
1579 0           mark_ready(self, f, "needs_any");
1580 0           return f;
1581             }
1582              
1583 0           self->pending_subs = 0;
1584              
1585 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_needsany, __FILE__);
1586 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1587 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1588              
1589 0           GV *gv = gv_fetchpvs("Future::XS::(needs_any callback)", GV_ADDMULTI, SVt_PVCV);
1590 0           CvGV_set(sub_on_ready, gv);
1591 0           CvANON_off(sub_on_ready);
1592              
1593 0 0         for(Size_t i = 0; i < n; i++) {
1594 0 0         if(future_is_ready(subs[i]))
1595 0           continue;
1596              
1597 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1598 0           self->pending_subs++;
1599             }
1600              
1601 0 0         if(!self->pending_subs) {
1602 0           copy_result(self, subs[n-1]);
1603 0           mark_ready(self, f, "needs_any");
1604             }
1605              
1606 0           SvREFCNT_dec(sub_on_ready);
1607              
1608 0           return f;
1609             }
1610              
1611 0           Size_t Future_mPUSH_subs(pTHX_ SV *f, enum FutureSubFilter filter)
1612             {
1613 0           dSP;
1614              
1615 0           struct FutureXS *self = get_future(f);
1616              
1617 0           Size_t ret = 0;
1618 0 0         for(Size_t i = 0; self->subs && i < av_count(self->subs); i++) {
    0          
    0          
1619 0           SV *sub = AvARRAY(self->subs)[i];
1620              
1621             bool want;
1622 0           switch(filter) {
1623             case FUTURE_SUBS_PENDING:
1624 0           want = !future_is_ready(sub);
1625 0           break;
1626              
1627             case FUTURE_SUBS_READY:
1628 0           want = future_is_ready(sub);
1629 0           break;
1630              
1631             case FUTURE_SUBS_DONE:
1632 0           want = future_is_done(sub);
1633 0           break;
1634              
1635             case FUTURE_SUBS_FAILED:
1636 0           want = future_is_failed(sub);
1637 0           break;
1638              
1639             case FUTURE_SUBS_CANCELLED:
1640 0           want = future_is_cancelled(sub);
1641 0           break;
1642             }
1643              
1644 0 0         if(want) {
1645 0 0         XPUSHs(sv_mortalcopy(sub));
1646 0           ret++;
1647             }
1648             }
1649              
1650 0           PUTBACK;
1651 0           return ret;
1652             }
1653              
1654 0           struct timeval Future_get_btime(pTHX_ SV *f)
1655             {
1656 0           struct FutureXS *self = get_future(f);
1657 0           return self->btime;
1658             }
1659              
1660 0           struct timeval Future_get_rtime(pTHX_ SV *f)
1661             {
1662 0           struct FutureXS *self = get_future(f);
1663 0           return self->rtime;
1664             }
1665              
1666 0           void Future_set_label(pTHX_ SV *f, SV *label)
1667             {
1668 0           struct FutureXS *self = get_future(f);
1669              
1670 0 0         if(self->label)
1671 0           SvREFCNT_dec(label);
1672              
1673 0           self->label = newSVsv(label);
1674 0           }
1675              
1676 0           SV *Future_get_label(pTHX_ SV *f)
1677             {
1678 0           struct FutureXS *self = get_future(f);
1679              
1680 0           return self->label;
1681             }
1682              
1683 0           void Future_set_udata(pTHX_ SV *f, SV *key, SV *value)
1684             {
1685 0           struct FutureXS *self = get_future(f);
1686              
1687 0 0         if(!self->udata)
1688 0           self->udata = newHV();
1689              
1690 0           hv_store_ent(self->udata, key, newSVsv(value), 0);
1691 0           }
1692              
1693 0           SV *Future_get_udata(pTHX_ SV *f, SV *key)
1694             {
1695 0           struct FutureXS *self = get_future(f);
1696              
1697 0 0         if(!self->udata)
1698 0           return &PL_sv_undef;
1699              
1700 0           HE *he = hv_fetch_ent(self->udata, key, 0, 0);
1701 0 0         return he ? HeVAL(he) : &PL_sv_undef;
1702             }
1703              
1704             /* DMD_HELPER assistants */
1705              
1706             #ifdef HAVE_DMD_HELPER
1707             static int dumpstruct_callback(pTHX_ DMDContext *ctx, struct FutureXSCallback *cb)
1708             {
1709             if(!(cb->flags & CB_SEQ_ANY))
1710             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSCallback", cb, sizeof(struct FutureXSCallback),
1711             /* Some cheating here, to claim the "code" is either a CV or a Future,
1712             * depending on the CB_IS_FUTURE flag */
1713             3, ((const DMDNamedField []){
1714             {"flags", DMD_FIELD_UINT, .n = cb->flags},
1715             {"the code CV", DMD_FIELD_PTR, .ptr = (cb->flags & CB_IS_FUTURE) ? NULL : cb->code},
1716             {"the Future SV", DMD_FIELD_PTR, .ptr = (cb->flags & CB_IS_FUTURE) ? cb->code : NULL },
1717             })
1718             );
1719             else
1720             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSCallback(CB_SEQ)", cb, sizeof(struct FutureXSCallback),
1721             4, ((const DMDNamedField []){
1722             {"flags", DMD_FIELD_UINT, .n = cb->flags},
1723             {"the then code CV", DMD_FIELD_PTR, .ptr = cb->seq.thencode},
1724             {"the else code CV", DMD_FIELD_PTR, .ptr = cb->seq.elsecode},
1725             {"the sequence future SV", DMD_FIELD_PTR, .ptr = cb->seq.f},
1726             })
1727             );
1728              
1729             return 0;
1730             }
1731              
1732             static int dumpstruct_revocation(pTHX_ DMDContext *ctx, struct FutureXSRevocation *rev)
1733             {
1734             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSRevocation", rev, sizeof(struct FutureXSRevocation),
1735             2, ((const DMDNamedField []){
1736             {"the precedent future SV", DMD_FIELD_PTR, .ptr = rev->precedent_f},
1737             {"the SV to clear RV", DMD_FIELD_PTR, .ptr = rev->toclear_sv_at},
1738             })
1739             );
1740              
1741             return 0;
1742             }
1743              
1744             static int dumpstruct(pTHX_ DMDContext *ctx, const SV *sv)
1745             {
1746             int ret = 0;
1747              
1748             // TODO: Add some safety checking
1749             struct FutureXS *self = INT2PTR(struct FutureXS *, SvIV((SV *)sv));
1750              
1751             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXS", self, sizeof(struct FutureXS),
1752             12, ((const DMDNamedField []){
1753             {"ready", DMD_FIELD_BOOL, .b = self->ready},
1754             {"cancelled", DMD_FIELD_BOOL, .b = self->cancelled},
1755             {"the label SV", DMD_FIELD_PTR, .ptr = self->label},
1756             {"the result AV", DMD_FIELD_PTR, .ptr = self->result},
1757             {"the failure AV", DMD_FIELD_PTR, .ptr = self->failure},
1758             {"the callbacks AV", DMD_FIELD_PTR, .ptr = self->callbacks},
1759             {"the on_cancel AV", DMD_FIELD_PTR, .ptr = self->on_cancel},
1760             {"the revoke_when_ready AV", DMD_FIELD_PTR, .ptr = self->revoke_when_ready},
1761             {"the udata HV", DMD_FIELD_PTR, .ptr = self->udata},
1762             {"the constructed-at SV", DMD_FIELD_PTR, .ptr = self->constructed_at},
1763             {"the subs AV", DMD_FIELD_PTR, .ptr = self->subs},
1764             {"the pending sub count", DMD_FIELD_UINT, .n = self->pending_subs},
1765             })
1766             );
1767              
1768             for(size_t i = 0; self->callbacks && i < av_count(self->callbacks); i++) {
1769             struct FutureXSCallback *cb = (struct FutureXSCallback *)AvARRAY(self->callbacks)[i];
1770             ret += dumpstruct_callback(aTHX_ ctx, cb);
1771             }
1772              
1773             for(size_t i = 0; self->revoke_when_ready && i < av_count(self->revoke_when_ready); i++) {
1774             struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(self->revoke_when_ready)[i];
1775             ret += dumpstruct_revocation(aTHX_ ctx, rev);
1776             }
1777              
1778             ret += DMD_ANNOTATE_SV(sv, (SV *)self, "the FutureXS structure");
1779              
1780             return ret;
1781             }
1782             #endif
1783              
1784             #define getenv_bool(key) S_getenv_bool(aTHX_ key)
1785 6           static bool S_getenv_bool(pTHX_ const char *key)
1786             {
1787 6           const char *val = getenv(key);
1788 6 50         if(!val || !val[0])
    0          
1789 6           return false;
1790 0 0         if(val[0] == '0' && strlen(val) == 1)
    0          
1791 0           return false;
1792 0           return true;
1793             }
1794              
1795             #ifndef newSVbool
1796             # define newSVbool(b) newSVsv(b ? &PL_sv_yes : &PL_sv_no)
1797             #endif
1798              
1799 3           void Future_reread_environment(pTHX)
1800             {
1801 3           future_debug = getenv_bool("PERL_FUTURE_DEBUG");
1802              
1803 3 50         capture_times = future_debug || getenv_bool("PERL_FUTURE_TIMES");
    50          
1804 3 50         sv_setsv(get_sv("Future::TIMES", GV_ADDMULTI), capture_times ? &PL_sv_yes : &PL_sv_no);
1805 3           }
1806              
1807 3           void Future_boot(pTHX)
1808             {
1809             #ifdef HAVE_DMD_HELPER
1810             DMD_SET_PACKAGE_HELPER("Future::XS", dumpstruct);
1811             #endif
1812              
1813 3           Future_reread_environment(aTHX);
1814              
1815             // We can only do this once
1816 3 50         newCONSTSUB(gv_stashpvn("Future::XS", 10, TRUE), "DEBUG", newSVbool(future_debug));
1817 3           }