File Coverage

src/future.c
Criterion Covered Total %
statement 13 877 1.4
branch 5 734 0.6
condition n/a
subroutine n/a
pod n/a
total 18 1611 1.1


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2              
3             #include "EXTERN.h"
4             #include "perl.h"
5             #include "XSUB.h"
6              
7             #include "future.h"
8              
9             #include "perl-backcompat.c.inc"
10              
11             #include "av-utils.c.inc"
12             #include "cv_set_anysv_refcounted.c.inc"
13              
14             #if !HAVE_PERL_VERSION(5, 16, 0)
15             # define false FALSE
16             # define true TRUE
17             #endif
18              
19             #ifdef HAVE_DMD_HELPER
20             # define WANT_DMD_API_044
21             # include "DMD_helper.h"
22             #endif
23              
24             #if !HAVE_PERL_VERSION(5, 16, 0)
25             # define XS_INTERNAL(name) static XS(name)
26             #endif
27              
28             #define mPUSHpvs(s) mPUSHp("" s "", sizeof(s)-1)
29              
30             static bool future_debug;
31             static bool capture_times;
32              
33             /* There's no reason these have to match those in Future.pm but for now we
34             * might as well just copy the same values
35             */
36             enum {
37             CB_DONE = (1<<0),
38             CB_FAIL = (1<<1),
39             CB_CANCEL = (1<<2),
40             CB_ALWAYS = CB_DONE|CB_FAIL|CB_CANCEL,
41              
42             CB_SELF = (1<<3),
43             CB_RESULT = (1<<4),
44              
45             CB_SEQ_READY = (1<<5),
46             CB_SEQ_CANCEL = (1<<6),
47             CB_SEQ_ANY = CB_SEQ_READY|CB_SEQ_CANCEL,
48              
49             CB_SEQ_IMDONE = (1<<7),
50             CB_SEQ_IMFAIL = (1<<8),
51              
52             CB_SEQ_STRICT = (1<<9),
53              
54             CB_IS_FUTURE = (1<<10),
55             };
56              
57             // TODO: Consider using different struct types to save memory? Or maybe it's
58             // so small a difference it doesn't matter
59             struct FutureXSCallback
60             {
61             unsigned int flags;
62             union {
63             SV *code; /* if !(flags & CB_SEQ_ANY) */
64             struct { /* if (flags & CB_SEQ_ANY) */
65             SV *thencode;
66             SV *elsecode;
67             HV *catches;
68             SV *f;
69             } seq;
70             };
71             };
72              
73             struct FutureXSRevocation
74             {
75             SV *precedent_f;
76             SV *toclear_sv_at;
77             };
78              
79             #define CB_NONSEQ_CODE(cb) \
80             ({ if((cb)->flags & CB_SEQ_ANY) croak("ARGH: CB_NONSEQ_CODE on SEQ"); (cb)->code;})
81              
82             struct FutureXS
83             {
84             unsigned int ready : 1;
85             unsigned int cancelled : 1;
86             unsigned int reported : 1;
87             SV *label;
88             AV *result; // implies done
89             AV *failure; // implies fail
90             AV *callbacks; // values are struct FutureXSCallback ptrs directly. TODO: custom ptr/fill/max
91             AV *on_cancel; // values are CVs directly
92             AV *revoke_when_ready; // values are struct FutureXSRevocation ptrs directly.
93             int empty_revocation_slots;
94              
95             HV *udata;
96              
97             struct timeval btime, rtime;
98             SV *constructed_at;
99              
100             /* For convergents
101             * TODO: consider making this an optional extra part of the body, only
102             * allocated when required
103             */
104             AV *subs;
105             Size_t pending_subs;
106              
107             /* For without_cancel, purely to keep a strongref */
108             SV *precedent_f;
109             };
110              
111 0           bool Future_sv_is_future(pTHX_ SV *sv)
112             {
113 0 0         if(!SvROK(sv) || !SvOBJECT(SvRV(sv)))
    0          
114 0           return false;
115              
116 0 0         if(sv_derived_from(sv, "Future") || sv_derived_from(sv, "Future::XS"))
    0          
117 0           return true;
118              
119 0           return false;
120             }
121              
122             #define get_future(sv) S_get_future(aTHX_ sv)
123 0           static struct FutureXS *S_get_future(pTHX_ SV *sv)
124             {
125             assert(sv);
126             assert(SvROK(sv) && SvOBJECT(SvRV(sv)));
127             // TODO: Add some safety checking about class
128 0 0         struct FutureXS *self = INT2PTR(struct FutureXS *, SvIV(SvRV(sv)));
129             assert(self);
130 0           return self;
131             }
132              
133 0           SV *Future_new(pTHX_ const char *cls)
134             {
135 0 0         if(!cls)
136 0           cls = "Future::XS";
137              
138             struct FutureXS *self;
139 0           Newx(self, 1, struct FutureXS);
140              
141 0           self->ready = false;
142 0           self->cancelled = false;
143 0           self->reported = false;
144              
145 0           self->label = NULL;
146              
147 0 0         if(capture_times)
148 0           gettimeofday(&self->btime, NULL);
149             else
150 0           self->btime = (struct timeval){ 0 };
151              
152 0           self->rtime = (struct timeval){ 0 };
153              
154 0 0         if(future_debug)
155 0 0         self->constructed_at = newSVpvf("constructed at %s line %d", CopFILE(PL_curcop), CopLINE(PL_curcop));
156             else
157 0           self->constructed_at = NULL;
158              
159 0           self->result = NULL;
160 0           self->failure = NULL;
161              
162 0           self->callbacks = NULL;
163 0           self->on_cancel = NULL;
164 0           self->revoke_when_ready = NULL;
165 0           self->empty_revocation_slots = 0;
166              
167 0           self->udata = NULL;
168              
169 0           self->subs = NULL;
170              
171 0           self->precedent_f = NULL;
172              
173 0           SV *ret = newSV(0);
174 0           sv_setref_pv(ret, cls, self);
175              
176 0           return ret;
177             }
178              
179             #define future_new_proto(f1) Future_new_proto(aTHX_ f1)
180 0           SV *Future_new_proto(pTHX_ SV *f1)
181             {
182             assert(f1 && SvROK(f1) && SvRV(f1));
183             // TODO Shortcircuit in the common case that f1 is a Future instance
184             // return future_new(HvNAME(SvSTASH(SvRV(f1))));
185              
186 0           dSP;
187 0           ENTER;
188 0           SAVETMPS;
189              
190 0 0         EXTEND(SP, 1);
191 0 0         PUSHMARK(SP);
192 0           PUSHs(sv_mortalcopy(f1));
193 0           PUTBACK;
194              
195 0           call_method("new", G_SCALAR);
196              
197 0           SPAGAIN;
198              
199 0           SV *ret = SvREFCNT_inc(POPs);
200              
201 0           PUTBACK;
202 0 0         FREETMPS;
203 0           LEAVE;
204              
205 0           return ret;
206             }
207              
208             #define destroy_callbacks(self) S_destroy_callbacks(aTHX_ self)
209 0           static void S_destroy_callbacks(pTHX_ struct FutureXS *self)
210             {
211 0           AV *callbacksav = self->callbacks;
212 0 0         while(callbacksav && AvFILLp(callbacksav) > -1) {
    0          
213 0           struct FutureXSCallback *cb = (struct FutureXSCallback *)AvARRAY(self->callbacks)[AvFILLp(callbacksav)--];
214              
215 0           int flags = cb->flags;
216 0 0         if(flags & CB_SEQ_ANY) {
217 0           SvREFCNT_dec(cb->seq.thencode);
218 0           SvREFCNT_dec(cb->seq.elsecode);
219 0           SvREFCNT_dec(cb->seq.catches);
220 0           SvREFCNT_dec(cb->seq.f);
221             }
222             else {
223 0 0         SvREFCNT_dec(CB_NONSEQ_CODE(cb));
224             }
225              
226 0           Safefree(cb);
227             }
228 0           }
229              
230 0           void Future_destroy(pTHX_ SV *f)
231             {
232             #ifdef DEBUGGING
233             // Every pointer in this function ought to have been uniquely held
234             # define UNREF(p) \
235             do { \
236             if(p) assert(SvREFCNT(p) == 1); \
237             SvREFCNT_dec((SV *)p); \
238             (p) = (void *)0xAA55AA55; \
239             } while(0)
240             #else
241             # define UNREF(p) SvREFCNT_dec((SV *)p)
242             #endif
243              
244 0           struct FutureXS *self = get_future(f);
245              
246 0 0         if(future_debug &&
    0          
247 0 0         (!self->ready || (self->failure && !self->reported))) {
    0          
248 0 0         if(!self->ready)
249 0 0         warn("%" SVf " was %" SVf " and was lost near %s line %d before it was ready\n",
250 0           SVfARG(f), SVfARG(self->constructed_at),
251 0           CopFILE(PL_curcop), CopLINE(PL_curcop));
252             else {
253 0           SV *failure = AvARRAY(self->failure)[0];
254 0 0         warn("%" SVf " was %" SVf " and was lost near %s line %d with an unreported failure of: %" SVf "\n",
255 0           SVfARG(f), SVfARG(self->constructed_at),
256 0           CopFILE(PL_curcop), CopLINE(PL_curcop),
257             SVfARG(failure));
258             }
259             }
260              
261 0           UNREF(self->label);
262              
263 0           UNREF(self->result);
264              
265 0           UNREF(self->failure);
266              
267 0           destroy_callbacks(self);
268 0           UNREF(self->callbacks);
269              
270 0           UNREF(self->on_cancel);
271              
272 0           AV *revocationsav = self->revoke_when_ready;
273 0 0         while(revocationsav && AvFILLp(revocationsav) > -1) {
    0          
274 0           struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocationsav)[AvFILLp(revocationsav)--];
275 0           UNREF(rev->precedent_f);
276 0           UNREF(rev->toclear_sv_at);
277 0           Safefree(rev);
278             }
279 0           UNREF(self->revoke_when_ready);
280              
281 0           UNREF(self->udata);
282              
283 0           UNREF(self->constructed_at);
284              
285 0           UNREF(self->subs);
286              
287 0           UNREF(self->precedent_f);
288              
289 0           Safefree(self);
290              
291             #undef UNREF
292 0           }
293              
294 0           bool Future_is_ready(pTHX_ SV *f)
295             {
296 0           struct FutureXS *self = get_future(f);
297 0           return self->ready;
298             }
299              
300 0           bool Future_is_done(pTHX_ SV *f)
301             {
302 0           struct FutureXS *self = get_future(f);
303 0 0         return self->ready && !self->failure && !self->cancelled;
    0          
    0          
304             }
305              
306 0           bool Future_is_failed(pTHX_ SV *f)
307             {
308 0           struct FutureXS *self = get_future(f);
309 0 0         return self->ready && self->failure;
    0          
310             }
311              
312 0           bool Future_is_cancelled(pTHX_ SV *f)
313             {
314 0           struct FutureXS *self = get_future(f);
315 0           return self->cancelled;
316             }
317              
318             #define clear_on_cancel(self) S_clear_on_cancel(aTHX_ self)
319 0           static void S_clear_on_cancel(pTHX_ struct FutureXS *self)
320             {
321 0 0         if(!self->on_cancel)
322 0           return;
323              
324 0           AV *on_cancel = self->on_cancel;
325 0           self->on_cancel = NULL;
326              
327 0           SvREFCNT_dec(on_cancel);
328             }
329              
330             #define push_callback(self, cb) S_push_callback(aTHX_ self, cb)
331 0           static void S_push_callback(pTHX_ struct FutureXS *self, struct FutureXSCallback *cb)
332             {
333             struct FutureXSCallback *new;
334 0           Newx(new, 1, struct FutureXSCallback);
335              
336 0           new->flags = cb->flags;
337 0 0         if(cb->flags & CB_SEQ_ANY) {
338 0           new->seq.thencode = cb->seq.thencode;
339 0           new->seq.elsecode = cb->seq.elsecode;
340 0           new->seq.catches = cb->seq.catches;
341 0           new->seq.f = cb->seq.f;
342             }
343             else {
344 0 0         new->code = CB_NONSEQ_CODE(cb);
345             }
346              
347 0 0         if(!self->callbacks)
348 0           self->callbacks = newAV();
349              
350 0           av_push(self->callbacks, (SV *)new);
351 0           }
352              
353             #define wrap_cb(f, name, cv) S_wrap_cb(aTHX_ f, name, cv)
354 0           static SV *S_wrap_cb(pTHX_ SV *f, const char *name, SV *cv)
355             {
356             // TODO: This is quite the speed bump having to do this, in the common case
357             // that it isn't overridden
358 0           dSP;
359 0           ENTER;
360 0           SAVETMPS;
361              
362 0 0         EXTEND(SP, 3);
363 0 0         PUSHMARK(SP);
364 0           PUSHs(sv_mortalcopy(f));
365 0           mPUSHp(name, strlen(name));
366 0           PUSHs(sv_mortalcopy(cv));
367 0           PUTBACK;
368              
369 0           call_method("wrap_cb", G_SCALAR);
370              
371 0           SPAGAIN;
372 0           SV *ret = newSVsv(POPs);
373              
374 0           PUTBACK;
375 0 0         FREETMPS;
376 0           LEAVE;
377              
378 0           return ret;
379             }
380              
381             #define invoke_seq_callback(self, selfsv, cb) S_invoke_seq_callback(aTHX_ self, selfsv, cb)
382 0           static SV *S_invoke_seq_callback(pTHX_ struct FutureXS *self, SV *selfsv, struct FutureXSCallback *cb)
383             {
384 0           int flags = cb->flags;
385              
386 0           bool is_fail = cBOOL(self->failure);
387 0 0         bool is_done = !self->cancelled && !is_fail;
    0          
388              
389 0 0         AV *result = (is_done) ? self->result :
390 0 0         (is_fail) ? self->failure :
391             NULL;
392              
393 0 0         SV *code = (is_done) ? cb->seq.thencode :
394 0 0         (is_fail) ? cb->seq.elsecode :
395             NULL;
396              
397 0 0         if(is_fail && result && av_count(result) > 1 && cb->seq.catches) {
    0          
    0          
    0          
    0          
398 0           SV *category = AvARRAY(result)[1];
399 0 0         if(SvOK(category)) {
    0          
    0          
400 0           HE *he = hv_fetch_ent(cb->seq.catches, category, 0, 0);
401 0 0         if(he && HeVAL(he))
    0          
402 0           code = HeVAL(he);
403             }
404             }
405              
406 0 0         if(!code || !SvOK(code))
    0          
    0          
    0          
407 0           return newSVsv(selfsv);
408              
409 0           dSP;
410              
411 0           ENTER;
412 0           SAVETMPS;
413              
414 0 0         PUSHMARK(SP);
415 0 0         if(flags & CB_SELF)
416 0 0         XPUSHs(selfsv);
417 0 0         if(flags & CB_RESULT)
418 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
419 0           PUTBACK;
420              
421             assert(SvOK(code));
422 0           call_sv(code, G_SCALAR|G_EVAL);
423              
424 0           SPAGAIN;
425              
426 0 0         if(SvROK(ERRSV) || SvTRUE(ERRSV)) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
427 0           POPs;
428              
429 0           SV *fseq = cb->seq.f;
430              
431 0 0         if(!fseq)
432 0           fseq = future_new_proto(selfsv);
433              
434 0 0         future_failv(fseq, &ERRSV, 1);
435              
436 0 0         FREETMPS;
437 0           LEAVE;
438              
439 0           return fseq;
440             }
441              
442 0           SV *f2 = POPs;
443 0           SvREFCNT_inc(f2);
444              
445 0           PUTBACK;
446 0 0         FREETMPS;
447 0           LEAVE;
448              
449 0 0         if(!sv_is_future(f2)) {
450 0           SV *result = f2;
451              
452             // TODO: strictness check
453              
454 0           f2 = future_new_proto(selfsv);
455 0           future_donev(f2, &result, 1);
456             }
457              
458 0           return f2;
459             }
460              
461             #define invoke_callback(self, selfsv, cb) S_invoke_callback(aTHX_ self, selfsv, cb)
462 0           static void S_invoke_callback(pTHX_ struct FutureXS *self, SV *selfsv, struct FutureXSCallback *cb)
463             {
464 0           int flags = cb->flags;
465              
466 0           bool is_cancelled = self->cancelled;
467 0           bool is_fail = cBOOL(self->failure);
468 0 0         bool is_done = !is_cancelled && !is_fail;
    0          
469              
470 0 0         AV *result = (is_done) ? self->result :
471 0 0         (is_fail) ? self->failure :
472             NULL;
473              
474 0 0         if(is_done && !(flags & CB_DONE))
    0          
475 0           return;
476 0 0         if(is_fail && !(flags & CB_FAIL))
    0          
477 0           return;
478 0 0         if(is_cancelled && !(flags & CB_CANCEL))
    0          
479 0           return;
480              
481 0 0         if(flags & CB_IS_FUTURE) {
482 0           dSP;
483              
484 0           ENTER;
485 0           SAVETMPS;
486              
487 0 0         PUSHMARK(SP);
488 0 0         XPUSHs(CB_NONSEQ_CODE(cb)); // really a Future RV
    0          
489 0 0         if(result)
490 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
491              
492 0           PUTBACK;
493 0 0         if(is_done)
494 0           call_method("done", G_VOID);
495 0 0         else if(is_fail)
496 0           call_method("fail", G_VOID);
497             else
498 0           call_method("cancel", G_VOID);
499              
500 0 0         FREETMPS;
501 0           LEAVE;
502             }
503 0 0         else if(flags & CB_SEQ_ANY) {
504 0           SV *fseq = cb->seq.f;
505              
506 0 0         if(!SvOK(fseq)) {
    0          
    0          
507 0 0         if(self->constructed_at)
508 0           warn("%" SVf " (%" SVf ") lost a sequence Future",
509 0           SVfARG(selfsv), SVfARG(self->constructed_at));
510             else
511 0           warn("%" SVf " lost a sequence Future",
512             SVfARG(selfsv));
513 0           return;
514             }
515              
516 0           SV *f2 = invoke_seq_callback(self, selfsv, cb);
517 0 0         if(f2 == fseq)
518             /* immediate fail */
519 0           return;
520              
521 0           future_on_cancel(fseq, f2);
522              
523 0 0         if(future_is_ready(f2)) {
524 0 0         if(!future_is_cancelled(f2))
525 0           future_on_ready(f2, fseq);
526 0 0         else if(flags & CB_CANCEL)
527 0           future_cancel(fseq);
528              
529 0           SvREFCNT_dec(f2);
530             }
531             else {
532 0           struct FutureXS *f2self = get_future(f2);
533 0           struct FutureXSCallback cb2 = {
534             .flags = CB_DONE|CB_FAIL|CB_IS_FUTURE,
535 0           .code = sv_rvweaken(newSVsv(fseq)),
536             };
537 0           push_callback(f2self, &cb2);
538             }
539             }
540             else {
541 0 0         SV *code = CB_NONSEQ_CODE(cb);
542              
543 0           dSP;
544              
545 0           ENTER;
546 0           SAVETMPS;
547              
548 0 0         PUSHMARK(SP);
549 0 0         if(flags & CB_SELF)
550 0 0         XPUSHs(selfsv);
551 0 0         if((flags & CB_RESULT) && result)
    0          
552 0 0         XPUSHs_from_AV(result);
    0          
    0          
    0          
    0          
    0          
    0          
553              
554 0           PUTBACK;
555             assert(SvOK(code));
556 0           call_sv(code, G_VOID);
557              
558 0 0         FREETMPS;
559 0           LEAVE;
560             }
561             }
562              
563             #define revoke_on_cancel(rev) S_revoke_on_cancel(aTHX_ rev)
564 0           static void S_revoke_on_cancel(pTHX_ struct FutureXSRevocation *rev)
565             {
566 0 0         if(rev->toclear_sv_at && SvROK(rev->toclear_sv_at)) {
    0          
567             assert(SvTYPE(rev->toclear_sv_at) <= SVt_PVMG);
568             assert(SvROK(rev->toclear_sv_at));
569 0           sv_set_undef(SvRV(rev->toclear_sv_at));
570 0           SvREFCNT_dec(rev->toclear_sv_at);
571 0           rev->toclear_sv_at = NULL;
572             }
573              
574 0 0         if(!SvOK(rev->precedent_f))
    0          
    0          
575 0           return;
576              
577 0           struct FutureXS *self = get_future(rev->precedent_f);
578              
579 0           self->empty_revocation_slots++;
580              
581 0           AV *on_cancel = self->on_cancel;
582 0 0         if(self->empty_revocation_slots >= 8 && on_cancel &&
    0          
    0          
583 0 0         self->empty_revocation_slots >= AvFILL(on_cancel)/2) {
584              
585             // Squash up the array to contain only defined values
586 0           SV **wrsv = AvARRAY(on_cancel),
587 0           **rdsv = AvARRAY(on_cancel),
588 0 0         **end = AvARRAY(on_cancel) + AvFILL(on_cancel);
589              
590 0 0         while(rdsv <= end) {
591 0 0         if(SvOK(*rdsv))
    0          
    0          
592             // Keep this one
593 0           *(wrsv++) = *rdsv;
594             else
595             // Free this one
596 0           SvREFCNT_dec(*rdsv);
597              
598 0           rdsv++;
599             }
600 0           AvFILLp(on_cancel) = wrsv - AvARRAY(on_cancel) - 1;
601              
602 0           self->empty_revocation_slots = 0;
603             }
604             }
605              
606             #define mark_ready(self, selfsv, state) S_mark_ready(aTHX_ self, selfsv, state)
607 0           static void S_mark_ready(pTHX_ struct FutureXS *self, SV *selfsv, const char *state)
608             {
609 0           self->ready = true;
610             // TODO: self->ready_at
611 0 0         if(capture_times)
612 0           gettimeofday(&self->rtime, NULL);
613              
614             /* Make sure self doesn't disappear during this function */
615 0           SvREFCNT_inc(SvRV(selfsv));
616 0           SAVEFREESV(SvRV(selfsv));
617              
618 0 0         if(self->precedent_f) {
619 0           SvREFCNT_dec(self->precedent_f);
620 0           self->precedent_f = NULL;
621             }
622              
623 0           clear_on_cancel(self);
624 0 0         if(self->revoke_when_ready) {
625 0           AV *revocations = self->revoke_when_ready;
626 0 0         for(size_t i = 0; i < av_count(revocations); i++) {
    0          
627 0           struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(revocations)[i];
628 0           revoke_on_cancel(rev);
629              
630 0           SvREFCNT_dec(rev->precedent_f);
631 0           Safefree(rev);
632             }
633 0           AvFILLp(revocations) = -1;
634 0           SvREFCNT_dec(revocations);
635              
636 0           self->revoke_when_ready = NULL;
637             }
638              
639 0 0         if(!self->callbacks)
640 0           return;
641              
642 0           AV *callbacks = self->callbacks;
643              
644 0           struct FutureXSCallback **cbs = (struct FutureXSCallback **)AvARRAY(callbacks);
645 0 0         size_t i, n = av_count(callbacks);
646 0 0         for(i = 0; i < n; i++) {
647 0           struct FutureXSCallback *cb = cbs[i];
648 0           invoke_callback(self, selfsv, cb);
649             }
650              
651 0           destroy_callbacks(self);
652             }
653              
654             #define make_sequence(f1, cb) S_make_sequence(aTHX_ f1, cb)
655 0           static SV *S_make_sequence(pTHX_ SV *f1, struct FutureXSCallback *cb)
656             {
657 0           struct FutureXS *self = get_future(f1);
658              
659 0           int flags = cb->flags;
660              
661 0 0         if(self->ready) {
662             // TODO: CB_SEQ_IM*
663              
664 0           SV *f2 = invoke_seq_callback(self, f1, cb);
665 0           return f2;
666             }
667              
668 0           SV *fseq = future_new_proto(f1);
669 0 0         if(cb->flags & CB_SEQ_CANCEL)
670 0           future_on_cancel(fseq, f1);
671              
672 0           cb->flags |= CB_DONE|CB_FAIL;
673 0 0         if(cb->seq.thencode)
674 0           cb->seq.thencode = wrap_cb(f1, "sequence", cb->seq.thencode);
675 0 0         if(cb->seq.elsecode)
676 0           cb->seq.elsecode = wrap_cb(f1, "sequence", cb->seq.elsecode);
677 0           cb->seq.f = sv_rvweaken(newSVsv(fseq));
678              
679 0           push_callback(self, cb);
680              
681 0           return fseq;
682             }
683              
684             // TODO: move to a hax/ file
685             #define CvNAME_FILE_LINE(cv) S_CvNAME_FILE_LINE(aTHX_ cv)
686 0           static SV *S_CvNAME_FILE_LINE(pTHX_ CV *cv)
687             {
688 0 0         if(!CvANON(cv)) {
689 0           SV *ret = newSVpvf("HvNAME::GvNAME");
690 0           return ret;
691             }
692              
693 0           OP *cop = CvSTART(cv);
694 0 0         while(cop && OP_CLASS(cop) != OA_COP)
    0          
    0          
    0          
695 0           cop = cop->op_next;
696              
697 0 0         if(!cop)
698 0           return newSVpvs("__ANON__");
699              
700 0 0         return newSVpvf("__ANON__(%s line %d)", CopFILE((COP *)cop), CopLINE((COP *)cop));
701             }
702              
703 0           void Future_donev(pTHX_ SV *f, SV **svp, size_t n)
704             {
705 0           struct FutureXS *self = get_future(f);
706              
707 0 0         if(self->cancelled)
708 0           return;
709              
710 0 0         if(self->ready)
711 0           croak("%" SVf " is already (STATE) and cannot be ->done",
712             SVfARG(f));
713             // TODO: test subs
714              
715 0           self->result = newAV_svn_dup(svp, n);
716 0           mark_ready(self, f, "done");
717             }
718              
719 0           void Future_failv(pTHX_ SV *f, SV **svp, size_t n)
720             {
721 0           struct FutureXS *self = get_future(f);
722              
723 0 0         if(self->cancelled)
724 0           return;
725              
726 0 0         if(self->ready)
727 0           croak("%" SVf " is already (STATE) and cannot be ->fail'ed",
728             SVfARG(f));
729              
730 0 0         if(n == 1 &&
    0          
731 0           SvROK(svp[0]) && SvOBJECT(SvRV(svp[0])) &&
732 0           sv_derived_from(svp[0], "Future::Exception")) {
733 0           SV *exception = svp[0];
734 0           AV *failure = self->failure = newAV();
735              
736 0           dSP;
737              
738             {
739 0           ENTER;
740 0           SAVETMPS;
741              
742 0 0         EXTEND(SP, 1);
743 0 0         PUSHMARK(SP);
744 0           PUSHs(sv_mortalcopy(exception));
745 0           PUTBACK;
746              
747 0           call_method("message", G_SCALAR);
748              
749 0           SPAGAIN;
750              
751 0           av_push(failure, SvREFCNT_inc(POPs));
752              
753 0           PUTBACK;
754 0 0         FREETMPS;
755 0           LEAVE;
756             }
757              
758             {
759 0           ENTER;
760 0           SAVETMPS;
761              
762 0 0         EXTEND(SP, 1);
763 0 0         PUSHMARK(SP);
764 0           PUSHs(sv_mortalcopy(exception));
765 0           PUTBACK;
766              
767 0           call_method("category", G_SCALAR);
768              
769 0           SPAGAIN;
770              
771 0           av_push(failure, SvREFCNT_inc(POPs));
772              
773 0           PUTBACK;
774 0 0         FREETMPS;
775 0           LEAVE;
776             }
777              
778             {
779 0           ENTER;
780 0           SAVETMPS;
781              
782 0 0         EXTEND(SP, 1);
783 0 0         PUSHMARK(SP);
784 0           PUSHs(sv_mortalcopy(exception));
785 0           PUTBACK;
786              
787 0           SSize_t count = call_method("details", G_LIST);
788              
789 0           SPAGAIN;
790              
791 0           SV **retp = SP - count + 1;
792              
793 0 0         for(SSize_t i = 0; i < count; i++)
794 0           av_push(failure, SvREFCNT_inc(retp[i]));
795 0           SP -= count;
796              
797 0           PUTBACK;
798 0 0         FREETMPS;
799 0           LEAVE;
800             }
801             }
802             else {
803 0           self->failure = newAV_svn_dup(svp, n);
804             }
805              
806 0           mark_ready(self, f, "failed");
807             }
808              
809             #define future_failp(f, s) Future_failp(aTHX_ f, s)
810 0           void Future_failp(pTHX_ SV *f, const char *s)
811             {
812 0           struct FutureXS *self = get_future(f);
813              
814 0 0         if(self->cancelled)
815 0           return;
816              
817 0 0         if(self->ready)
818 0           croak("%" SVf " is already (STATE) and cannot be ->fail'ed",
819             SVfARG(f));
820              
821 0           self->failure = newAV();
822 0           av_push(self->failure, newSVpv(s, strlen(s)));
823 0           mark_ready(self, f, "failed");
824             }
825              
826 0           void Future_on_cancel(pTHX_ SV *f, SV *code)
827             {
828 0           struct FutureXS *self = get_future(f);
829              
830 0 0         if(self->ready)
831 0           return;
832              
833 0           bool is_future = sv_is_future(code);
834             // TODO: is_future or callable(code) or croak
835              
836 0 0         if(!self->on_cancel)
837 0           self->on_cancel = newAV();
838              
839 0           SV *rv = newSVsv((SV *)code);
840 0           av_push(self->on_cancel, rv);
841              
842 0 0         if(is_future) {
843             struct FutureXSRevocation *rev;
844 0           Newx(rev, 1, struct FutureXSRevocation);
845              
846 0           rev->precedent_f = sv_rvweaken(newSVsv(f));
847 0           rev->toclear_sv_at = sv_rvweaken(newRV_inc(rv));
848              
849 0           struct FutureXS *codeself = get_future(code);
850 0 0         if(!codeself->revoke_when_ready)
851 0           codeself->revoke_when_ready = newAV();
852              
853 0           av_push(codeself->revoke_when_ready, (SV *)rev);
854             }
855             }
856              
857 0           void Future_on_ready(pTHX_ SV *f, SV *code)
858             {
859 0           struct FutureXS *self = get_future(f);
860              
861 0           bool is_future = sv_is_future(code);
862             // TODO: is_future or callable(code) or croak
863              
864 0           int flags = CB_ALWAYS|CB_SELF;
865 0 0         if(is_future)
866 0           flags |= CB_IS_FUTURE;
867              
868 0           struct FutureXSCallback cb = {
869             .flags = flags,
870             .code = code,
871             };
872              
873 0 0         if(self->ready)
874 0           invoke_callback(self, f, &cb);
875             else {
876 0           cb.code = wrap_cb(f, "on_ready", cb.code);
877 0           push_callback(self, &cb);
878             }
879 0           }
880              
881 0           void Future_on_done(pTHX_ SV *f, SV *code)
882             {
883 0           struct FutureXS *self = get_future(f);
884              
885 0           bool is_future = sv_is_future(code);
886             // TODO: is_future or callable(code) or croak
887              
888 0           int flags = CB_DONE|CB_RESULT;
889 0 0         if(is_future)
890 0           flags |= CB_IS_FUTURE;
891              
892 0           struct FutureXSCallback cb = {
893             .flags = flags,
894             .code = code,
895             };
896              
897 0 0         if(self->ready)
898 0           invoke_callback(self, f, &cb);
899             else {
900 0           cb.code = wrap_cb(f, "on_done", cb.code);
901 0           push_callback(self, &cb);
902             }
903 0           }
904              
905 0           void Future_on_fail(pTHX_ SV *f, SV *code)
906             {
907 0           struct FutureXS *self = get_future(f);
908              
909 0           bool is_future = sv_is_future(code);
910             // TODO: is_future or callable(code) or croak
911              
912 0           int flags = CB_FAIL|CB_RESULT;
913 0 0         if(is_future)
914 0           flags |= CB_IS_FUTURE;
915              
916 0           struct FutureXSCallback cb = {
917             .flags = flags,
918             .code = code,
919             };
920              
921 0 0         if(self->ready)
922 0           invoke_callback(self, f, &cb);
923             else {
924 0           cb.code = wrap_cb(f, "on_fail", cb.code);
925 0           push_callback(self, &cb);
926             }
927 0           }
928              
929             #define future_await(f) Future_await(aTHX_ f)
930 0           static void Future_await(pTHX_ SV *f)
931             {
932 0           dSP;
933              
934 0           ENTER;
935 0           SAVETMPS;
936              
937 0 0         PUSHMARK(SP);
938 0 0         mXPUSHs(newSVsv(f));
939 0           PUTBACK;
940              
941 0           call_method("await", G_VOID);
942              
943 0 0         FREETMPS;
944 0           LEAVE;
945 0           }
946              
947 0           AV *Future_get_result_av(pTHX_ SV *f, bool await)
948             {
949 0           struct FutureXS *self = get_future(f);
950              
951 0 0         if(await && !self->ready)
    0          
952 0           future_await(f);
953              
954 0 0         if(!self->ready)
955 0           croak("%" SVf " is not yet ready", SVfARG(f));
956              
957 0 0         if(self->failure) {
958 0           self->reported = true;
959              
960 0           SV *exception = AvARRAY(self->failure)[0];
961 0 0         if(av_count(self->failure) > 1) {
    0          
962 0           dSP;
963 0           ENTER;
964 0           SAVETMPS;
965              
966 0 0         PUSHMARK(SP);
967 0 0         EXTEND(SP, 1 + av_count(self->failure));
    0          
    0          
    0          
    0          
968 0           mPUSHpvs("Future::Exception");
969 0 0         for(SSize_t i = 0; i < av_count(self->failure); i++)
    0          
970 0           PUSHs(sv_mortalcopy(AvARRAY(self->failure)[i]));
971 0           PUTBACK;
972              
973 0           call_method("new", G_SCALAR);
974              
975 0           SPAGAIN;
976              
977 0           exception = SvREFCNT_inc(POPs);
978              
979 0           PUTBACK;
980 0 0         FREETMPS;
981 0           LEAVE;
982             }
983              
984 0 0         if(SvROK(exception) || SvPV_nolen(exception)[SvCUR(exception)-1] == '\n')
    0          
    0          
985 0           die_sv(exception);
986             else {
987             /* We'd like to call Carp::croak to do the @CARP_NOT logic, but it gets
988             * confused about a missing callframe first because this is XS. We'll
989             * reïmplement the logic here
990             */
991             I32 cxix;
992 0 0         for(cxix = cxstack_ix; cxix; cxix--) {
993 0 0         if(CxTYPE(&cxstack[cxix]) != CXt_SUB)
994 0           continue;
995              
996 0           const CV *cv = cxstack[cxix].blk_sub.cv;
997 0 0         if(!cv)
998 0           continue;
999              
1000 0 0         const char *stashname = HvNAME(CvSTASH(cv));
    0          
    0          
    0          
    0          
    0          
1001 0 0         if(!stashname)
1002 0           continue;
1003              
1004             // The essence of the @CARP_NOT logic
1005 0 0         if(strEQ(stashname, "Future::_base"))
1006 0           continue;
1007              
1008 0 0         const COP *cop = cxix < cxstack_ix ? cxstack[cxix+1].blk_oldcop : PL_curcop;
1009              
1010 0 0         sv_catpvf(exception, " at %s line %d.\n", CopFILE(cop), CopLINE(cop));
1011 0           break;
1012             }
1013              
1014 0           die_sv(exception);
1015             }
1016             }
1017              
1018 0 0         if(self->cancelled)
1019 0           croak("%" SVf " was cancelled",
1020             SVfARG(f));
1021              
1022 0 0         if(!self->result)
1023 0           self->result = newAV();
1024              
1025 0           return self->result;
1026             }
1027              
1028 0           AV *Future_get_failure_av(pTHX_ SV *f)
1029             {
1030 0           struct FutureXS *self = get_future(f);
1031              
1032 0 0         if(!self->ready)
1033 0           future_await(f);
1034              
1035 0 0         if(!self->failure)
1036 0           return NULL;
1037              
1038 0           return self->failure;
1039             }
1040              
1041 0           void Future_cancel(pTHX_ SV *f)
1042             {
1043 0           struct FutureXS *self = get_future(f);
1044              
1045 0 0         if(self->ready)
1046 0           return;
1047              
1048 0           self->cancelled = true;
1049 0           AV *on_cancel = self->on_cancel;
1050              
1051 0 0         if(self->subs) {
1052 0 0         for(Size_t i = 0; i < av_count(self->subs); i++)
    0          
1053 0           future_cancel(AvARRAY(self->subs)[i]);
1054             }
1055              
1056             // TODO: maybe we need to clear these out from self before we do this, in
1057             // case of recursion?
1058              
1059 0 0         for(int i = on_cancel ? AvFILL(on_cancel) : -1; i >= 0; i--) {
    0          
    0          
1060 0           SV *code = AvARRAY(on_cancel)[i];
1061 0 0         if(!SvOK(code))
    0          
    0          
1062 0           continue;
1063              
1064 0 0         if(sv_is_future(code)) {
1065 0           dSP;
1066              
1067 0           ENTER;
1068 0           SAVETMPS;
1069              
1070 0 0         PUSHMARK(SP);
1071 0           PUSHs(code);
1072 0           PUTBACK;
1073              
1074 0           call_method("cancel", G_VOID);
1075              
1076 0 0         FREETMPS;
1077 0           LEAVE;
1078             }
1079             else {
1080 0           dSP;
1081              
1082 0           ENTER;
1083 0           SAVETMPS;
1084              
1085 0 0         PUSHMARK(SP);
1086 0           PUSHs(f);
1087 0           PUTBACK;
1088              
1089             assert(SvOK(code));
1090 0           call_sv(code, G_VOID);
1091              
1092 0 0         FREETMPS;
1093 0           LEAVE;
1094             }
1095             }
1096              
1097 0           mark_ready(self, f, "cancel");
1098             }
1099              
1100 0           SV *Future_without_cancel(pTHX_ SV *f)
1101             {
1102 0           struct FutureXSCallback cb = {
1103             .flags = CB_SEQ_READY|CB_CANCEL, /* without CB_SEQ_CANCEL */
1104             /* no code */
1105             };
1106              
1107 0           SV *ret = make_sequence(f, &cb);
1108 0           struct FutureXS *self = get_future(ret);
1109              
1110 0           self->precedent_f = newSVsv(f);
1111              
1112 0           return ret;
1113             }
1114              
1115 0           SV *Future_then(pTHX_ SV *f, U32 flags, SV *thencode, SV *elsecode)
1116             {
1117 0           struct FutureXSCallback cb = {
1118             .flags = CB_SEQ_ANY|CB_RESULT,
1119             .seq.thencode = thencode,
1120             .seq.elsecode = elsecode,
1121             };
1122 0 0         if(flags & FUTURE_THEN_WITH_F)
1123 0           cb.flags |= CB_SELF;
1124              
1125 0           return make_sequence(f, &cb);
1126             }
1127              
1128 0           SV *Future_followed_by(pTHX_ SV *f, SV *code)
1129             {
1130 0           struct FutureXSCallback cb = {
1131             .flags = CB_SEQ_ANY|CB_SELF,
1132             .seq.thencode = code,
1133             .seq.elsecode = code,
1134             };
1135              
1136 0           return make_sequence(f, &cb);
1137             }
1138              
1139 0           SV *Future_thencatch(pTHX_ SV *f, U32 flags, SV *thencode, HV *catches, SV *elsecode)
1140             {
1141 0           struct FutureXSCallback cb = {
1142             .flags = CB_SEQ_ANY|CB_RESULT,
1143             .seq.thencode = thencode,
1144             .seq.elsecode = elsecode,
1145             .seq.catches = catches,
1146             };
1147 0 0         if(flags & FUTURE_THEN_WITH_F)
1148 0           cb.flags |= CB_SELF;
1149              
1150 0           return make_sequence(f, &cb);
1151             }
1152              
1153             #define future_new_subsv(cls, subs, n) S_future_new_subsv(aTHX_ cls, subs, n)
1154 0           static SV *S_future_new_subsv(pTHX_ const char *cls, SV **subs, size_t n)
1155             {
1156 0           HV *future_stash = get_hv("Future::", 0);
1157             assert(future_stash);
1158              
1159             /* Find the best prototype; pick the first derived instance if there is
1160             * one */
1161 0           SV *proto = NULL;
1162 0 0         for(Size_t i = 0; i < n; i++) {
1163 0 0         if(!SvROK(subs[i]) || !SvOBJECT(SvRV(subs[i])))
    0          
1164 0           croak("Expected a Future, got %" SVf, SVfARG(subs[i]));
1165              
1166 0 0         if(SvSTASH(SvRV(subs[i])) != future_stash) {
1167 0           proto = subs[i];
1168 0           break;
1169             }
1170             }
1171              
1172 0 0         SV *f = proto ? future_new_proto(proto) : future_new(cls);
1173 0           struct FutureXS *self = get_future(f);
1174              
1175 0 0         if(!self->subs)
1176 0           self->subs = newAV();
1177              
1178 0 0         for(Size_t i = 0; i < n; i++)
1179 0           av_push(self->subs, newSVsv(subs[i]));
1180              
1181 0           return f;
1182             }
1183              
1184             #define copy_result(self, src) S_copy_result(aTHX_ self, src)
1185 0           static void S_copy_result(pTHX_ struct FutureXS *self, SV *src)
1186             {
1187             /* TODO: Handle non-Future::XS instances too */
1188 0           struct FutureXS *srcself = get_future(src);
1189              
1190             assert(srcself->ready);
1191             assert(!srcself->cancelled);
1192              
1193 0 0         if(srcself->failure) {
1194 0 0         self->failure = newAV_svn_dup(AvARRAY(srcself->failure), av_count(srcself->failure));
1195             }
1196             else {
1197             assert(srcself->result);
1198 0 0         self->result = newAV_svn_dup(AvARRAY(srcself->result), av_count(srcself->result));
1199             }
1200 0           }
1201              
1202             #define cancel_pending_subs(self) S_cancel_pending_subs(aTHX_ self)
1203 0           static void S_cancel_pending_subs(pTHX_ struct FutureXS *self)
1204             {
1205 0 0         if(!self->subs)
1206 0           return;
1207              
1208 0 0         for(Size_t i = 0; i < av_count(self->subs); i++) {
    0          
1209 0           SV *sub = AvARRAY(self->subs)[i];
1210 0 0         if(!future_is_ready(sub))
1211 0           future_cancel(sub);
1212             }
1213             }
1214              
1215 0           XS_INTERNAL(sub_on_ready_waitall)
1216             {
1217 0           dXSARGS;
1218              
1219 0           SV *f = XSANY_sv;
1220 0 0         if(!SvOK(f))
    0          
    0          
1221 0           return;
1222              
1223             /* Make sure self doesn't disappear during this function */
1224 0           SvREFCNT_inc(SvRV(f));
1225 0           SAVEFREESV(SvRV(f));
1226              
1227 0           struct FutureXS *self = get_future(f);
1228              
1229 0           self->pending_subs--;
1230              
1231 0 0         if(self->pending_subs)
1232 0           XSRETURN(0);
1233              
1234             /* TODO: This is really just newAVav() */
1235 0 0         self->result = newAV_svn_dup(AvARRAY(self->subs), av_count(self->subs));
1236 0           mark_ready(self, f, "wait_all");
1237             }
1238              
1239 0           SV *Future_new_waitallv(pTHX_ const char *cls, SV **subs, size_t n)
1240             {
1241 0           SV *f = future_new_subsv(cls, subs, n);
1242 0           struct FutureXS *self = get_future(f);
1243              
1244 0           self->pending_subs = 0;
1245 0 0         for(Size_t i = 0; i < n; i++) {
1246             /* TODO: This should probably use some API function to make it transparent */
1247 0 0         if(!future_is_ready(subs[i]))
1248 0           self->pending_subs++;
1249             }
1250              
1251 0 0         if(!self->pending_subs) {
1252 0           self->result = newAV_svn_dup(subs, n);
1253 0           mark_ready(self, f, "wait_all");
1254              
1255 0           return f;
1256             }
1257              
1258 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_waitall, __FILE__);
1259 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1260 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1261              
1262 0           GV *gv = gv_fetchpvs("Future::XS::(wait_all callback)", GV_ADDMULTI, SVt_PVCV);
1263 0           CvGV_set(sub_on_ready, gv);
1264 0           CvANON_off(sub_on_ready);
1265              
1266 0 0         for(Size_t i = 0; i < n; i++) {
1267 0 0         if(!future_is_ready(subs[i]))
1268 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1269             }
1270              
1271 0           SvREFCNT_dec(sub_on_ready);
1272              
1273 0           return f;
1274             }
1275              
1276 0           XS_INTERNAL(sub_on_ready_waitany)
1277             {
1278 0           dXSARGS;
1279 0           SV *thissub = ST(0);
1280              
1281 0           SV *f = XSANY_sv;
1282 0 0         if(!SvOK(f))
    0          
    0          
1283 0           return;
1284              
1285             /* Make sure self doesn't disappear during this function */
1286 0           SvREFCNT_inc(SvRV(f));
1287 0           SAVEFREESV(SvRV(f));
1288              
1289 0           struct FutureXS *self = get_future(f);
1290              
1291 0 0         if(self->result || self->failure)
    0          
1292 0           return;
1293              
1294 0           self->pending_subs--;
1295              
1296 0           bool this_cancelled = future_is_cancelled(thissub);
1297              
1298 0 0         if(self->pending_subs && this_cancelled)
    0          
1299 0           return;
1300              
1301 0 0         if(this_cancelled) {
1302 0           future_failp(f, "All component futures were cancelled");
1303 0           return;
1304             }
1305             else
1306 0           copy_result(self, thissub);
1307              
1308 0           cancel_pending_subs(self);
1309              
1310 0           mark_ready(self, f, "wait_any");
1311             }
1312              
1313 0           SV *Future_new_waitanyv(pTHX_ const char *cls, SV **subs, size_t n)
1314             {
1315 0           SV *f = future_new_subsv(cls, subs, n);
1316 0           struct FutureXS *self = get_future(f);
1317              
1318 0 0         if(!n) {
1319 0           future_failp(f, "Cannot ->wait_any with no subfutures");
1320 0           return f;
1321             }
1322              
1323 0           SV *immediate_ready = NULL;
1324 0 0         for(Size_t i = 0; i < n; i++) {
1325             /* TODO: This should probably use some API function to make it transparent */
1326 0 0         if(future_is_ready(subs[i]) && !future_is_cancelled(subs[i])) {
    0          
1327 0           immediate_ready = subs[i];
1328 0           break;
1329             }
1330             }
1331              
1332 0 0         if(immediate_ready) {
1333 0           copy_result(self, immediate_ready);
1334              
1335 0           cancel_pending_subs(self);
1336              
1337 0           mark_ready(self, f, "wait_any");
1338              
1339 0           return f;
1340             }
1341              
1342 0           self->pending_subs = 0;
1343              
1344 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_waitany, __FILE__);
1345 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1346 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1347              
1348 0           GV *gv = gv_fetchpvs("Future::XS::(wait_any callback)", GV_ADDMULTI, SVt_PVCV);
1349 0           CvGV_set(sub_on_ready, gv);
1350 0           CvANON_off(sub_on_ready);
1351              
1352 0 0         for(Size_t i = 0; i < n; i++) {
1353 0 0         if(future_is_cancelled(subs[i]))
1354 0           continue;
1355              
1356 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1357 0           self->pending_subs++;
1358             }
1359              
1360 0           SvREFCNT_dec(sub_on_ready);
1361              
1362 0           return f;
1363             }
1364              
1365             #define compose_needsall_result(self) S_compose_needsall_result(aTHX_ self)
1366 0           static void S_compose_needsall_result(pTHX_ struct FutureXS *self)
1367             {
1368 0           AV *result = self->result = newAV();
1369 0 0         for(Size_t i = 0; i < av_count(self->subs); i++) {
    0          
1370 0           SV *sub = AvARRAY(self->subs)[i];
1371 0           struct FutureXS *subself = get_future(sub);
1372             assert(subself->result);
1373 0 0         av_push_svn(result, AvARRAY(subself->result), av_count(subself->result));
1374             }
1375 0           }
1376              
1377 0           XS_INTERNAL(sub_on_ready_needsall)
1378             {
1379 0           dXSARGS;
1380 0           SV *thissub = ST(0);
1381              
1382 0           SV *f = XSANY_sv;
1383 0 0         if(!SvOK(f))
    0          
    0          
1384 0           return;
1385              
1386             /* Make sure self doesn't disappear during this function */
1387 0           SvREFCNT_inc(SvRV(f));
1388 0           SAVEFREESV(SvRV(f));
1389              
1390 0           struct FutureXS *self = get_future(f);
1391              
1392 0 0         if(self->result || self->failure)
    0          
1393 0           return;
1394              
1395 0 0         if(future_is_cancelled(thissub)) {
1396 0           future_failp(f, "A component future was cancelled");
1397 0           cancel_pending_subs(self);
1398 0           return;
1399             }
1400 0 0         else if(future_is_failed(thissub)) {
1401 0           copy_result(self, thissub);
1402 0           cancel_pending_subs(self);
1403 0           mark_ready(self, f, "needs_all");
1404             }
1405             else {
1406 0           self->pending_subs--;
1407 0 0         if(self->pending_subs)
1408 0           return;
1409 0           compose_needsall_result(self);
1410 0           mark_ready(self, f, "needs_all");
1411             }
1412             }
1413              
1414 0           SV *Future_new_needsallv(pTHX_ const char *cls, SV **subs, size_t n)
1415             {
1416 0           SV *f = future_new_subsv(cls, subs, n);
1417 0           struct FutureXS *self = get_future(f);
1418              
1419 0 0         if(!n) {
1420 0           future_donev(f, NULL, 0);
1421 0           return f;
1422             }
1423              
1424 0           SV *immediate_fail = NULL;
1425 0 0         for(Size_t i = 0; i < n; i++) {
1426 0 0         if(future_is_cancelled(subs[i])) {
1427 0           future_failp(f, "A component future was cancelled");
1428 0           cancel_pending_subs(self);
1429 0           return f;
1430             }
1431 0 0         if(future_is_failed(subs[i])) {
1432 0           immediate_fail = subs[i];
1433 0           break;
1434             }
1435             }
1436              
1437 0 0         if(immediate_fail) {
1438 0           copy_result(self, immediate_fail);
1439 0           cancel_pending_subs(self);
1440 0           mark_ready(self, f, "needs_all");
1441 0           return f;
1442             }
1443              
1444 0           self->pending_subs = 0;
1445              
1446 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_needsall, __FILE__);
1447 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1448 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1449              
1450 0           GV *gv = gv_fetchpvs("Future::XS::(needs_all callback)", GV_ADDMULTI, SVt_PVCV);
1451 0           CvGV_set(sub_on_ready, gv);
1452 0           CvANON_off(sub_on_ready);
1453              
1454 0 0         for(Size_t i = 0; i < n; i++) {
1455 0 0         if(future_is_ready(subs[i]))
1456 0           continue;
1457              
1458 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1459 0           self->pending_subs++;
1460             }
1461              
1462 0 0         if(!self->pending_subs) {
1463 0           compose_needsall_result(self);
1464 0           mark_ready(self, f, "needs_all");
1465             }
1466              
1467 0           SvREFCNT_dec(sub_on_ready);
1468              
1469 0           return f;
1470             }
1471              
1472 0           XS_INTERNAL(sub_on_ready_needsany)
1473             {
1474 0           dXSARGS;
1475 0           SV *thissub = ST(0);
1476              
1477 0           SV *f = XSANY_sv;
1478 0 0         if(!SvOK(f))
    0          
    0          
1479 0           return;
1480              
1481             /* Make sure self doesn't disappear during this function */
1482 0           SvREFCNT_inc(SvRV(f));
1483 0           SAVEFREESV(SvRV(f));
1484              
1485 0           struct FutureXS *self = get_future(f);
1486              
1487 0 0         if(self->result || self->failure)
    0          
1488 0           return;
1489              
1490 0           self->pending_subs--;
1491              
1492 0           bool this_cancelled = future_is_cancelled(thissub);
1493              
1494 0 0         if(self->pending_subs && this_cancelled)
    0          
1495 0           return;
1496              
1497 0 0         if(this_cancelled) {
1498 0           future_failp(f, "All component futures were cancelled");
1499             }
1500 0 0         else if(future_is_failed(thissub)) {
1501 0 0         if(self->pending_subs)
1502 0           return;
1503              
1504 0           copy_result(self, thissub);
1505 0           mark_ready(self, f, "needs_any");
1506             }
1507             else {
1508 0           copy_result(self, thissub);
1509 0           cancel_pending_subs(self);
1510 0           mark_ready(self, f, "needs_any");
1511             }
1512             }
1513              
1514 0           SV *Future_new_needsanyv(pTHX_ const char *cls, SV **subs, size_t n)
1515             {
1516 0           SV *f = future_new_subsv(cls, subs, n);
1517 0           struct FutureXS *self = get_future(f);
1518              
1519 0 0         if(!n) {
1520 0           future_failp(f, "Cannot ->needs_any with no subfutures");
1521 0           return f;
1522             }
1523              
1524 0           SV *immediate_done = NULL;
1525 0 0         for(Size_t i = 0; i < n; i++) {
1526 0 0         if(future_is_done(subs[i])) {
1527 0           immediate_done = subs[i];
1528 0           break;
1529             }
1530             }
1531              
1532 0 0         if(immediate_done) {
1533 0           copy_result(self, immediate_done);
1534 0           cancel_pending_subs(self);
1535 0           mark_ready(self, f, "needs_any");
1536 0           return f;
1537             }
1538              
1539 0           self->pending_subs = 0;
1540              
1541 0           CV *sub_on_ready = newXS(NULL, sub_on_ready_needsany, __FILE__);
1542 0           cv_set_anysv_refcounted(sub_on_ready, newSVsv(f));
1543 0           sv_rvweaken(CvXSUBANY_sv(sub_on_ready));
1544              
1545 0           GV *gv = gv_fetchpvs("Future::XS::(needs_any callback)", GV_ADDMULTI, SVt_PVCV);
1546 0           CvGV_set(sub_on_ready, gv);
1547 0           CvANON_off(sub_on_ready);
1548              
1549 0 0         for(Size_t i = 0; i < n; i++) {
1550 0 0         if(future_is_ready(subs[i]))
1551 0           continue;
1552              
1553 0           future_on_ready(subs[i], sv_2mortal(newRV_inc((SV *)sub_on_ready)));
1554 0           self->pending_subs++;
1555             }
1556              
1557 0 0         if(!self->pending_subs) {
1558 0           copy_result(self, subs[n-1]);
1559 0           mark_ready(self, f, "needs_any");
1560             }
1561              
1562 0           SvREFCNT_dec(sub_on_ready);
1563              
1564 0           return f;
1565             }
1566              
1567 0           Size_t Future_mPUSH_subs(pTHX_ SV *f, enum FutureSubFilter filter)
1568             {
1569 0           dSP;
1570              
1571 0           struct FutureXS *self = get_future(f);
1572              
1573 0           Size_t ret = 0;
1574 0 0         for(Size_t i = 0; self->subs && i < av_count(self->subs); i++) {
    0          
    0          
1575 0           SV *sub = AvARRAY(self->subs)[i];
1576              
1577             bool want;
1578 0           switch(filter) {
1579             case FUTURE_SUBS_PENDING:
1580 0           want = !future_is_ready(sub);
1581 0           break;
1582              
1583             case FUTURE_SUBS_READY:
1584 0           want = future_is_ready(sub);
1585 0           break;
1586              
1587             case FUTURE_SUBS_DONE:
1588 0           want = future_is_done(sub);
1589 0           break;
1590              
1591             case FUTURE_SUBS_FAILED:
1592 0           want = future_is_failed(sub);
1593 0           break;
1594              
1595             case FUTURE_SUBS_CANCELLED:
1596 0           want = future_is_cancelled(sub);
1597 0           break;
1598             }
1599              
1600 0 0         if(want) {
1601 0 0         XPUSHs(sv_mortalcopy(sub));
1602 0           ret++;
1603             }
1604             }
1605              
1606 0           PUTBACK;
1607 0           return ret;
1608             }
1609              
1610 0           struct timeval Future_get_btime(pTHX_ SV *f)
1611             {
1612 0           struct FutureXS *self = get_future(f);
1613 0           return self->btime;
1614             }
1615              
1616 0           struct timeval Future_get_rtime(pTHX_ SV *f)
1617             {
1618 0           struct FutureXS *self = get_future(f);
1619 0           return self->rtime;
1620             }
1621              
1622 0           void Future_set_label(pTHX_ SV *f, SV *label)
1623             {
1624 0           struct FutureXS *self = get_future(f);
1625              
1626 0 0         if(self->label)
1627 0           SvREFCNT_dec(label);
1628              
1629 0           self->label = newSVsv(label);
1630 0           }
1631              
1632 0           SV *Future_get_label(pTHX_ SV *f)
1633             {
1634 0           struct FutureXS *self = get_future(f);
1635              
1636 0           return self->label;
1637             }
1638              
1639 0           void Future_set_udata(pTHX_ SV *f, SV *key, SV *value)
1640             {
1641 0           struct FutureXS *self = get_future(f);
1642              
1643 0 0         if(!self->udata)
1644 0           self->udata = newHV();
1645              
1646 0           hv_store_ent(self->udata, key, newSVsv(value), 0);
1647 0           }
1648              
1649 0           SV *Future_get_udata(pTHX_ SV *f, SV *key)
1650             {
1651 0           struct FutureXS *self = get_future(f);
1652              
1653 0 0         if(!self->udata)
1654 0           return &PL_sv_undef;
1655              
1656 0           HE *he = hv_fetch_ent(self->udata, key, 0, 0);
1657 0 0         return he ? HeVAL(he) : &PL_sv_undef;
1658             }
1659              
1660             /* DMD_HELPER assistants */
1661              
1662             #ifdef HAVE_DMD_HELPER
1663             static int dumpstruct_callback(pTHX_ DMDContext *ctx, struct FutureXSCallback *cb)
1664             {
1665             if(!(cb->flags & CB_SEQ_ANY))
1666             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSCallback", cb, sizeof(struct FutureXSCallback),
1667             /* Some cheating here, to claim the "code" is either a CV or a Future,
1668             * depending on the CB_IS_FUTURE flag */
1669             3, ((const DMDNamedField []){
1670             {"flags", DMD_FIELD_UINT, .n = cb->flags},
1671             {"the code CV", DMD_FIELD_PTR, .ptr = (cb->flags & CB_IS_FUTURE) ? NULL : cb->code},
1672             {"the Future SV", DMD_FIELD_PTR, .ptr = (cb->flags & CB_IS_FUTURE) ? cb->code : NULL },
1673             })
1674             );
1675             else
1676             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSCallback(CB_SEQ)", cb, sizeof(struct FutureXSCallback),
1677             4, ((const DMDNamedField []){
1678             {"flags", DMD_FIELD_UINT, .n = cb->flags},
1679             {"the then code CV", DMD_FIELD_PTR, .ptr = cb->seq.thencode},
1680             {"the else code CV", DMD_FIELD_PTR, .ptr = cb->seq.elsecode},
1681             {"the sequence future SV", DMD_FIELD_PTR, .ptr = cb->seq.f},
1682             })
1683             );
1684              
1685             return 0;
1686             }
1687              
1688             static int dumpstruct_revocation(pTHX_ DMDContext *ctx, struct FutureXSRevocation *rev)
1689             {
1690             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXSRevocation", rev, sizeof(struct FutureXSRevocation),
1691             2, ((const DMDNamedField []){
1692             {"the precedent future SV", DMD_FIELD_PTR, .ptr = rev->precedent_f},
1693             {"the SV to clear RV", DMD_FIELD_PTR, .ptr = rev->toclear_sv_at},
1694             })
1695             );
1696              
1697             return 0;
1698             }
1699              
1700             static int dumpstruct(pTHX_ DMDContext *ctx, const SV *sv)
1701             {
1702             int ret = 0;
1703              
1704             // TODO: Add some safety checking
1705             struct FutureXS *self = INT2PTR(struct FutureXS *, SvIV((SV *)sv));
1706              
1707             DMD_DUMP_STRUCT(ctx, "Future::XS/FutureXS", self, sizeof(struct FutureXS),
1708             12, ((const DMDNamedField []){
1709             {"ready", DMD_FIELD_BOOL, .b = self->ready},
1710             {"cancelled", DMD_FIELD_BOOL, .b = self->cancelled},
1711             {"the label SV", DMD_FIELD_PTR, .ptr = self->label},
1712             {"the result AV", DMD_FIELD_PTR, .ptr = self->result},
1713             {"the failure AV", DMD_FIELD_PTR, .ptr = self->failure},
1714             {"the callbacks AV", DMD_FIELD_PTR, .ptr = self->callbacks},
1715             {"the on_cancel AV", DMD_FIELD_PTR, .ptr = self->on_cancel},
1716             {"the revoke_when_ready AV", DMD_FIELD_PTR, .ptr = self->revoke_when_ready},
1717             {"the udata HV", DMD_FIELD_PTR, .ptr = self->udata},
1718             {"the constructed-at SV", DMD_FIELD_PTR, .ptr = self->constructed_at},
1719             {"the subs AV", DMD_FIELD_PTR, .ptr = self->subs},
1720             {"the pending sub count", DMD_FIELD_UINT, .n = self->pending_subs},
1721             })
1722             );
1723              
1724             for(size_t i = 0; self->callbacks && i < av_count(self->callbacks); i++) {
1725             struct FutureXSCallback *cb = (struct FutureXSCallback *)AvARRAY(self->callbacks)[i];
1726             ret += dumpstruct_callback(aTHX_ ctx, cb);
1727             }
1728              
1729             for(size_t i = 0; self->revoke_when_ready && i < av_count(self->revoke_when_ready); i++) {
1730             struct FutureXSRevocation *rev = (struct FutureXSRevocation *)AvARRAY(self->revoke_when_ready)[i];
1731             ret += dumpstruct_revocation(aTHX_ ctx, rev);
1732             }
1733              
1734             ret += DMD_ANNOTATE_SV(sv, (SV *)self, "the FutureXS structure");
1735              
1736             return ret;
1737             }
1738             #endif
1739              
1740             #define getenv_bool(key) S_getenv_bool(aTHX_ key)
1741 4           static bool S_getenv_bool(pTHX_ const char *key)
1742             {
1743 4           const char *val = getenv(key);
1744 4 50         if(!val || !val[0])
    0          
1745 4           return false;
1746 0 0         if(val[0] == '0' && strlen(val) == 1)
    0          
1747 0           return false;
1748 0           return true;
1749             }
1750              
1751             #ifndef newSVbool
1752             # define newSVbool(b) newSVsv(b ? &PL_sv_yes : &PL_sv_no)
1753             #endif
1754              
1755 2           void Future_reread_environment(pTHX)
1756             {
1757 2           future_debug = getenv_bool("PERL_FUTURE_DEBUG");
1758              
1759 2 50         capture_times = future_debug || getenv_bool("PERL_FUTURE_TIMES");
    50          
1760 2 50         sv_setsv(get_sv("Future::TIMES", GV_ADDMULTI), capture_times ? &PL_sv_yes : &PL_sv_no);
1761 2           }
1762              
1763 2           void Future_boot(pTHX)
1764             {
1765             #ifdef HAVE_DMD_HELPER
1766             DMD_SET_PACKAGE_HELPER("Future::XS", dumpstruct);
1767             #endif
1768              
1769 2           Future_reread_environment(aTHX);
1770              
1771             // We can only do this once
1772 2 50         newCONSTSUB(gv_stashpvn("Future::XS", 10, TRUE), "DEBUG", newSVbool(future_debug));
1773 2           }