File Coverage

Future.xs
Criterion Covered Total %
statement 546 553 98.7
branch 341 476 71.6
condition n/a
subroutine n/a
pod n/a
total 887 1029 86.2


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "EVAPI.h"
7              
8             #define IS_PVCV(sv) (sv && SvROK(sv) && SvTYPE(SvRV(sv)) == SVt_PVCV)
9              
10             typedef struct {
11             AV *tasks;
12             SV *final_cb;
13             I32 remaining;
14             CV **cvs;
15             I32 num_cvs;
16             CV *shared_cv; /* for unsafe mode */
17             int *is_freed_ptr;
18             } parallel_ctx;
19              
20             /* Guard registered with SAVEDESTRUCTOR_X so that, on any unwind (including
21             uncaught croak from unsafe-mode tasks), we clear ctx->is_freed_ptr and free
22             the heap-allocated is_freed flag. Prevents writes to a dead stack slot when
23             async tasks complete after the XSUB has unwound. */
24             typedef struct {
25             int **target_field; /* &ctx->is_freed_ptr */
26             int *is_freed;
27             } ifp_guard;
28              
29 1101           static void ifp_guard_destroy(pTHX_ void *p) {
30 1101           ifp_guard *g = (ifp_guard *)p;
31 1101 100         if (!*(g->is_freed)) {
32             /* cleanup didn't run — ctx still alive, safe to clear field */
33 21           *(g->target_field) = NULL;
34             }
35 1101           Safefree(g->is_freed);
36 1101           Safefree(g);
37 1101           }
38              
39             typedef struct {
40             AV *tasks;
41             SV *final_cb;
42             I32 current_idx;
43             int running;
44             int delayed;
45             I32 total_tasks;
46             int unsafe;
47             CV *current_cv;
48             int *is_freed_ptr;
49             } series_ctx;
50              
51             typedef struct {
52             AV *tasks;
53             SV *final_cb;
54             I32 remaining;
55             I32 current_idx;
56             I32 total_tasks;
57             I32 limit;
58             I32 active;
59             int unsafe;
60             int running;
61             int delayed;
62             CV **cvs;
63             I32 num_cvs;
64             CV *shared_cv;
65             int *is_freed_ptr;
66             } plimit_ctx;
67              
68             typedef struct {
69             AV *tasks;
70             SV *final_cb;
71             int settled;
72             CV **cvs;
73             I32 num_cvs;
74             CV *shared_cv;
75             int *is_freed_ptr;
76             } race_ctx;
77              
78 24           static void parallel_cleanup(pTHX_ parallel_ctx **ctx_ptr) {
79 24 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
80 24           parallel_ctx *ctx = *ctx_ptr;
81 24           *ctx_ptr = NULL;
82              
83 24 100         if (ctx->is_freed_ptr) {
84 20           *(ctx->is_freed_ptr) = 1;
85             }
86              
87 24 100         if (ctx->shared_cv) {
88 4           CvXSUBANY(ctx->shared_cv).any_ptr = NULL;
89 4           SvREFCNT_dec((SV*)ctx->shared_cv);
90             }
91 24 100         if (ctx->cvs) {
92             I32 i;
93 6053 100         for (i = 0; i < ctx->num_cvs; i++) {
94 6033 100         if (ctx->cvs[i]) {
95 6023           CvXSUBANY(ctx->cvs[i]).any_ptr = NULL;
96 6023           SvREFCNT_dec((SV*)ctx->cvs[i]);
97             }
98             }
99 20           Safefree(ctx->cvs);
100             }
101 24 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
102 24 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
103 24           Safefree(ctx);
104             }
105              
106 6127           static void parallel_task_done(pTHX_ CV *cv) {
107 6127           dXSARGS;
108 6127 50         SvREFCNT_inc_simple_void(cv);
109 6127           sv_2mortal((SV*)cv);
110 6127           parallel_ctx *ctx = (parallel_ctx *)CvXSUBANY(cv).any_ptr;
111 6127 100         if (!ctx) {
112 2           XSRETURN_EMPTY;
113             }
114              
115 6125 100         if (ctx->cvs) {
116 6019           CvXSUBANY(cv).any_ptr = NULL;
117             }
118              
119 6125 100         if (--ctx->remaining <= 0) {
120 17           SV *cb = ctx->final_cb;
121 32 50         if (IS_PVCV(cb)) {
    100          
    50          
122 16           dSP;
123 16           ENTER;
124 16           SAVETMPS;
125 16           SvREFCNT_inc(cb);
126 16           sv_2mortal(cb);
127 16 50         PUSHMARK(SP);
128 16           PUTBACK;
129              
130 16           parallel_cleanup(aTHX_ &ctx);
131              
132 16           call_sv(cb, G_DISCARD | G_VOID);
133              
134 15 50         FREETMPS;
135 15           LEAVE;
136             } else {
137 1           parallel_cleanup(aTHX_ &ctx);
138             }
139             }
140 6124           XSRETURN_EMPTY;
141             }
142              
143 31           static void series_cleanup(pTHX_ series_ctx **ctx_ptr) {
144 31 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
145 31           series_ctx *ctx = *ctx_ptr;
146 31           *ctx_ptr = NULL;
147              
148 31 100         if (ctx->is_freed_ptr) {
149 26           *(ctx->is_freed_ptr) = 1;
150             }
151              
152 31 100         if (ctx->current_cv) {
153 8           CvXSUBANY(ctx->current_cv).any_ptr = NULL;
154 8           SvREFCNT_dec((SV*)ctx->current_cv);
155             }
156 31 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
157 31 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
158 31           Safefree(ctx);
159             }
160              
161             static void _series_next(pTHX_ series_ctx **ctx_ptr);
162              
163 5040           static void series_next_cb(pTHX_ CV *cv) {
164 5040           dXSARGS;
165 5040 50         SvREFCNT_inc_simple_void(cv);
166 5040           sv_2mortal((SV*)cv);
167 5040           series_ctx *ctx = (series_ctx *)CvXSUBANY(cv).any_ptr;
168 5040 100         if (!ctx) {
169 3           XSRETURN_EMPTY;
170             }
171              
172 5037 100         if (!ctx->unsafe) {
173 5031           CvXSUBANY(cv).any_ptr = NULL;
174 5031 50         if (ctx->current_cv == cv) {
175 5031           ctx->current_cv = NULL;
176 5031           SvREFCNT_dec((SV*)cv);
177             }
178             }
179              
180 5037 100         if (items > 0 && SvTRUE(ST(0))) {
    50          
181 3           ctx->current_idx = ctx->total_tasks;
182             }
183 5037           _series_next(aTHX_ &ctx);
184 5037           XSRETURN_EMPTY;
185             }
186              
187 5070           static void _series_next(pTHX_ series_ctx **ctx_ptr) {
188 5070 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
189 5070           series_ctx *ctx = *ctx_ptr;
190              
191 5070 100         if (ctx->running) {
192 5028           ctx->delayed = 1;
193 5028           return;
194             }
195              
196 42           ctx->running = 1;
197 42           ctx->delayed = 1;
198              
199 5090 100         while (ctx->delayed) {
200 5080           ctx->delayed = 0;
201 5080 50         SV **task_ary = (AvREAL(ctx->tasks) && !SvMAGICAL(ctx->tasks)) ? AvARRAY(ctx->tasks) : NULL;
    100          
202 5080 100         if (ctx->current_idx >= ctx->total_tasks) {
203 26           SV *cb = ctx->final_cb;
204 49 50         if (IS_PVCV(cb)) {
    100          
    50          
205 25           dSP;
206 25           ENTER;
207 25           SAVETMPS;
208 25           SvREFCNT_inc(cb);
209 25           sv_2mortal(cb);
210 25 50         PUSHMARK(SP);
211 25           PUTBACK;
212 25           series_cleanup(aTHX_ ctx_ptr);
213 25           call_sv(cb, G_DISCARD | G_VOID);
214 23 50         FREETMPS;
215 23           LEAVE;
216             } else {
217 1           series_cleanup(aTHX_ ctx_ptr);
218             }
219 24           return;
220             }
221              
222 5051 50         SV **fetch_ptr = (task_ary && ctx->current_idx <= AvFILL(ctx->tasks))
    50          
223 5051           ? &task_ary[ctx->current_idx]
224 10105 100         : av_fetch(ctx->tasks, ctx->current_idx, 0);
225 5054 50         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
226              
227 5054           ctx->current_idx++;
228              
229 10091 100         if (IS_PVCV(task_sv)) {
    100          
    50          
230 5043 100         if (!ctx->unsafe) {
231 5037 50         if (ctx->current_cv) {
232 0           CvXSUBANY(ctx->current_cv).any_ptr = NULL;
233 0           SvREFCNT_dec((SV*)ctx->current_cv);
234             }
235 5037           CV *cv = newXS(NULL, series_next_cb, __FILE__);
236 5037           CvXSUBANY(cv).any_ptr = ctx;
237 5037           ctx->current_cv = (CV*)SvREFCNT_inc((SV*)cv);
238 6 100         } else if (!ctx->current_cv) {
239 4           ctx->current_cv = newXS(NULL, series_next_cb, __FILE__);
240 4           CvXSUBANY(ctx->current_cv).any_ptr = ctx;
241             }
242              
243 5043           SV *next_rv = NULL;
244 5043           dSP;
245 5043           ENTER;
246 5043           SAVETMPS;
247 5043 100         if (!ctx->unsafe) {
248 5037           next_rv = sv_2mortal(newRV_noinc((SV*)ctx->current_cv));
249             } else {
250 6           next_rv = sv_2mortal(newRV_inc((SV*)ctx->current_cv));
251             }
252 5043 50         PUSHMARK(SP);
253 5043 50         XPUSHs(next_rv);
254 5043           PUTBACK;
255            
256 5043 100         U32 flags = G_DISCARD | (ctx->unsafe ? 0 : G_EVAL);
257 5043           call_sv(task_sv, flags);
258              
259 5042 100         if (!ctx->unsafe) {
260 5037           SPAGAIN;
261 5037 50         if (SvTRUE(ERRSV)) {
    100          
262 5 50         SV *err = sv_mortalcopy(ERRSV);
263 5           series_cleanup(aTHX_ ctx_ptr);
264 5           croak_sv(err);
265             }
266             }
267              
268 5037 50         FREETMPS;
269 5037           LEAVE;
270 5037 50         if (!*ctx_ptr) return;
271             } else {
272 11           ctx->delayed = 1;
273             }
274             }
275              
276 10 50         if (*ctx_ptr) (*ctx_ptr)->running = 0;
277             }
278              
279 25           static void plimit_cleanup(pTHX_ plimit_ctx **ctx_ptr) {
280 25 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
281 25           plimit_ctx *ctx = *ctx_ptr;
282 25           *ctx_ptr = NULL;
283              
284 25 100         if (ctx->is_freed_ptr) {
285 21           *(ctx->is_freed_ptr) = 1;
286             }
287              
288 25 100         if (ctx->shared_cv) {
289 4           CvXSUBANY(ctx->shared_cv).any_ptr = NULL;
290 4           SvREFCNT_dec((SV*)ctx->shared_cv);
291             }
292 25 100         if (ctx->cvs) {
293             I32 i;
294 1070 100         for (i = 0; i < ctx->num_cvs; i++) {
295 1049 100         if (ctx->cvs[i]) {
296 1037           CvXSUBANY(ctx->cvs[i]).any_ptr = NULL;
297 1037           SvREFCNT_dec((SV*)ctx->cvs[i]);
298             }
299             }
300 21           Safefree(ctx->cvs);
301             }
302 25 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
303 25 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
304 25           Safefree(ctx);
305             }
306              
307             static void _plimit_dispatch(pTHX_ plimit_ctx **ctx_ptr);
308              
309 1142           static void plimit_task_done(pTHX_ CV *cv) {
310 1142           dXSARGS;
311 1142 50         SvREFCNT_inc_simple_void(cv);
312 1142           sv_2mortal((SV*)cv);
313 1142           plimit_ctx *ctx = (plimit_ctx *)CvXSUBANY(cv).any_ptr;
314 1142 100         if (!ctx) {
315 2           XSRETURN_EMPTY;
316             }
317              
318 1140 100         if (ctx->cvs) {
319 1033           CvXSUBANY(cv).any_ptr = NULL;
320             }
321              
322 1140           ctx->active--;
323              
324 1140 100         if (--ctx->remaining <= 0) {
325 17           SV *cb = ctx->final_cb;
326 32 50         if (IS_PVCV(cb)) {
    100          
    50          
327 16           dSP;
328 16           ENTER;
329 16           SAVETMPS;
330 16           SvREFCNT_inc(cb);
331 16           sv_2mortal(cb);
332 16 50         PUSHMARK(SP);
333 16           PUTBACK;
334              
335 16           plimit_cleanup(aTHX_ &ctx);
336              
337 16           call_sv(cb, G_DISCARD | G_VOID);
338              
339 15 50         FREETMPS;
340 15           LEAVE;
341             } else {
342 1           plimit_cleanup(aTHX_ &ctx);
343             }
344             } else {
345 1123           _plimit_dispatch(aTHX_ &ctx);
346             }
347 1139           XSRETURN_EMPTY;
348             }
349              
350 1148           static void _plimit_dispatch(pTHX_ plimit_ctx **ctx_ptr) {
351 2259 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
352 1148           plimit_ctx *ctx = *ctx_ptr;
353              
354 1148 100         if (ctx->running) {
355 1111           ctx->delayed = 1;
356 1111           return;
357             }
358              
359 37           int is_freed = 0;
360 37           int *old_is_freed_ptr = ctx->is_freed_ptr;
361 37           ctx->is_freed_ptr = &is_freed;
362              
363 37           ctx->running = 1;
364 37           ctx->delayed = 1;
365              
366 52 100         while (ctx->delayed) {
367 37           ctx->delayed = 0;
368              
369 1171 50         while (!is_freed && ctx->active < ctx->limit && ctx->current_idx < ctx->total_tasks) {
    100          
    100          
370 1156 50         SV **task_ary = (AvREAL(ctx->tasks) && !SvMAGICAL(ctx->tasks)) ? AvARRAY(ctx->tasks) : NULL;
    100          
371 1153 50         SV **fetch_ptr = (task_ary && ctx->current_idx <= AvFILL(ctx->tasks))
    50          
372 1153           ? &task_ary[ctx->current_idx]
373 2309 100         : av_fetch(ctx->tasks, ctx->current_idx, 0);
374 1156 50         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
375              
376 1156           ctx->current_idx++;
377              
378 2282 100         if (IS_PVCV(task_sv)) {
    100          
    50          
379 1144           SV *done_rv = NULL;
380 1144           CV *cv = NULL;
381 1144           dSP;
382              
383 1144 100         if (!ctx->unsafe) {
384 1037           cv = newXS(NULL, plimit_task_done, __FILE__);
385 1037           CvXSUBANY(cv).any_ptr = ctx;
386 1037           ctx->cvs[ctx->current_idx - 1] = (CV*)SvREFCNT_inc((SV*)cv);
387             }
388              
389 1144           ctx->active++;
390 1144           int task_unsafe = ctx->unsafe;
391              
392 1144           ENTER;
393 1144           SAVETMPS;
394 1144 100         if (!ctx->unsafe) {
395 1037           done_rv = sv_2mortal(newRV_noinc((SV*)cv));
396             } else {
397 107           done_rv = sv_2mortal(newRV_inc((SV*)ctx->shared_cv));
398             }
399 1144 50         PUSHMARK(SP);
400 1144 50         XPUSHs(done_rv);
401 1144           PUTBACK;
402              
403 1144 100         U32 flags = G_DISCARD | (task_unsafe ? 0 : G_EVAL);
404 1144           call_sv(task_sv, flags);
405              
406 1143 100         if (!task_unsafe) {
407 1037           SPAGAIN;
408 1037 50         if (SvTRUE(ERRSV)) {
    100          
409 5 50         SV *err = sv_mortalcopy(ERRSV);
410 5 100         if (!is_freed) {
411 4           plimit_cleanup(aTHX_ ctx_ptr);
412             }
413 5 50         if (old_is_freed_ptr) *old_is_freed_ptr = 1;
414 5           croak_sv(err);
415             }
416             }
417              
418 1138 50         FREETMPS;
419 1138           LEAVE;
420 1138 100         if (is_freed) goto done;
421             } else {
422 12 100         if (--ctx->remaining <= 0) {
423 4           SV *cb = ctx->final_cb;
424 7 50         if (IS_PVCV(cb)) {
    50          
    50          
425 4           dSP;
426 4           ENTER;
427 4           SAVETMPS;
428 4           SvREFCNT_inc(cb);
429 4           sv_2mortal(cb);
430 4 50         PUSHMARK(SP);
431 4           PUTBACK;
432 4           plimit_cleanup(aTHX_ ctx_ptr);
433 4 50         if (old_is_freed_ptr) *old_is_freed_ptr = 1;
434 4           call_sv(cb, G_DISCARD | G_VOID);
435 3 50         FREETMPS;
436 3           LEAVE;
437             } else {
438 0           plimit_cleanup(aTHX_ ctx_ptr);
439 0 0         if (old_is_freed_ptr) *old_is_freed_ptr = 1;
440             }
441 3           goto done;
442             }
443 8           ctx->delayed = 1;
444             }
445             }
446             }
447              
448 15 50         if (!is_freed) {
449 15           ctx->is_freed_ptr = old_is_freed_ptr;
450 15           ctx->running = 0;
451             }
452 0           done:
453 30 100         if (is_freed && old_is_freed_ptr) {
    50          
454 15           *old_is_freed_ptr = 1;
455             }
456             }
457              
458 1018           static void race_cleanup(pTHX_ race_ctx **ctx_ptr) {
459 1018 50         if (!ctx_ptr || !*ctx_ptr) return;
    50          
460 1018           race_ctx *ctx = *ctx_ptr;
461 1018           *ctx_ptr = NULL;
462              
463 1018 100         if (ctx->is_freed_ptr) {
464 1013           *(ctx->is_freed_ptr) = 1;
465             }
466              
467 1018 100         if (ctx->shared_cv) {
468 4           CvXSUBANY(ctx->shared_cv).any_ptr = NULL;
469 4           SvREFCNT_dec((SV*)ctx->shared_cv);
470             }
471 1018 100         if (ctx->cvs) {
472             I32 i;
473 3040 100         for (i = 0; i < ctx->num_cvs; i++) {
474 2026 100         if (ctx->cvs[i]) {
475 1017           CvXSUBANY(ctx->cvs[i]).any_ptr = NULL;
476 1017           SvREFCNT_dec((SV*)ctx->cvs[i]);
477             }
478             }
479 1014           Safefree(ctx->cvs);
480             }
481 1018 50         if (ctx->tasks) SvREFCNT_dec((SV*)ctx->tasks);
482 1018 50         if (ctx->final_cb) SvREFCNT_dec(ctx->final_cb);
483 1018           Safefree(ctx);
484             }
485              
486 1019           static void race_task_done(pTHX_ CV *cv) {
487 1019           dXSARGS;
488 1019 50         SvREFCNT_inc_simple_void(cv);
489 1019           sv_2mortal((SV*)cv);
490 1019           race_ctx *ctx = (race_ctx *)CvXSUBANY(cv).any_ptr;
491 1019 100         if (!ctx || ctx->settled) {
    50          
492 5           XSRETURN_EMPTY;
493             }
494 1014           ctx->settled = 1;
495              
496 1014 100         if (ctx->cvs) {
497 1010           CvXSUBANY(cv).any_ptr = NULL;
498             }
499              
500 1014           SV *cb = ctx->final_cb;
501 2026 50         if (IS_PVCV(cb)) {
    100          
    50          
502 1013           dSP;
503 1013           ENTER;
504 1013           SAVETMPS;
505 1013           SvREFCNT_inc(cb);
506 1013           sv_2mortal(cb);
507              
508 1013 50         PUSHMARK(SP);
509 2026 100         for (I32 i = 0; i < items; i++) {
510 1013 50         XPUSHs(sv_mortalcopy(ST(i)));
511             }
512 1013           PUTBACK;
513              
514 1013           race_cleanup(aTHX_ &ctx);
515              
516 1013           call_sv(cb, G_DISCARD | G_VOID);
517              
518 1012 50         FREETMPS;
519 1012           LEAVE;
520             } else {
521 1           race_cleanup(aTHX_ &ctx);
522             }
523 1013           XSRETURN_EMPTY;
524             }
525              
526             MODULE = EV::Future PACKAGE = EV::Future
527              
528             PROTOTYPES: DISABLE
529              
530             BOOT:
531 8 50         I_EV_API ("EV::Future");
    50          
    50          
532              
533             void
534             parallel(tasks, final_cb, ...)
535             AV *tasks
536             SV *final_cb
537             CODE:
538 26           int unsafe = 0;
539 26 100         if (items > 2 && SvTRUE(ST(2))) unsafe = 1;
    50          
540              
541 26           I32 len = av_len(tasks) + 1;
542 26 100         if (len <= 0) {
543 1 50         if (IS_PVCV(final_cb)) {
    50          
    50          
544 1           dSP;
545 1           ENTER;
546 1           SAVETMPS;
547 1 50         PUSHMARK(SP);
548 1           PUTBACK;
549 1           call_sv(final_cb, G_DISCARD | G_VOID);
550 1 50         FREETMPS;
551 1           LEAVE;
552             }
553 1           return;
554             }
555              
556 25           ENTER;
557              
558             int *is_freed;
559 25           Newxz(is_freed, 1, int);
560              
561             parallel_ctx *ctx;
562 25           Newx(ctx, 1, parallel_ctx);
563 25           ctx->is_freed_ptr = is_freed;
564 25           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
565 25           ctx->final_cb = SvREFCNT_inc(final_cb);
566 25           ctx->remaining = len;
567 25           ctx->cvs = NULL;
568 25           ctx->num_cvs = 0;
569 25           ctx->shared_cv = NULL;
570              
571             ifp_guard *guard;
572 25           Newx(guard, 1, ifp_guard);
573 25           guard->target_field = &ctx->is_freed_ptr;
574 25           guard->is_freed = is_freed;
575 25           SAVEDESTRUCTOR_X(ifp_guard_destroy, guard);
576              
577 25           SV *done_rv = NULL;
578 25 100         if (unsafe) {
579 4           ctx->shared_cv = newXS(NULL, parallel_task_done, __FILE__);
580 4           CvXSUBANY(ctx->shared_cv).any_ptr = ctx;
581 4           done_rv = sv_2mortal(newRV_inc((SV*)ctx->shared_cv));
582             } else {
583 21           ctx->num_cvs = len;
584 21           Newxz(ctx->cvs, len, CV*);
585             }
586              
587 25           dSP;
588              
589             I32 i;
590 25 100         U32 flags = G_DISCARD | (unsafe ? 0 : G_EVAL);
591              
592 6157 100         for (i = 0; i < len; i++) {
593 6141 50         if (*is_freed) break;
594              
595 6141 50         SV **task_ary = (AvREAL(tasks) && !SvMAGICAL(tasks)) ? AvARRAY(tasks) : NULL;
    100          
596 6138 50         SV **fetch_ptr = (task_ary && i <= AvFILL(tasks))
    100          
597 6137           ? &task_ary[i]
598 12279 100         : av_fetch(tasks, i, 0);
599 6141 100         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
600              
601 12266 100         if (IS_PVCV(task_sv)) {
    100          
    50          
602 6130           CV *cv = NULL;
603 6130 100         if (!unsafe) {
604 6024           cv = newXS(NULL, parallel_task_done, __FILE__);
605 6024           CvXSUBANY(cv).any_ptr = ctx;
606 6024           ctx->cvs[i] = (CV*)SvREFCNT_inc((SV*)cv);
607             }
608              
609 6130           ENTER;
610 6130           SAVETMPS;
611 6130 100         if (!unsafe) {
612 6024           done_rv = sv_2mortal(newRV_noinc((SV*)cv));
613             }
614 6130 50         PUSHMARK(SP);
615 6130 50         XPUSHs(done_rv);
616 6130           PUTBACK;
617              
618 6130           call_sv(task_sv, flags);
619 6129           SPAGAIN;
620 6129 100         if (!unsafe) {
621 6024 50         if (SvTRUE(ERRSV)) {
    100          
622 4 50         SV *err = sv_mortalcopy(ERRSV);
623 4 100         if (!*is_freed) {
624 3           parallel_cleanup(aTHX_ &ctx);
625             }
626 4           croak_sv(err);
627             }
628             }
629              
630 6125 100         FREETMPS;
631 6125           LEAVE;
632             } else {
633 11 100         if (--ctx->remaining <= 0) {
634 4           SV *cb = ctx->final_cb;
635 4 50         if (IS_PVCV(cb)) {
    50          
    50          
636 4           ENTER;
637 4           SAVETMPS;
638 4           SvREFCNT_inc(cb);
639 4           sv_2mortal(cb);
640 4 50         PUSHMARK(SP);
641 4           PUTBACK;
642 4           parallel_cleanup(aTHX_ &ctx);
643 4           call_sv(cb, G_DISCARD | G_VOID);
644 3 50         FREETMPS;
645 3           LEAVE;
646             } else {
647 0           parallel_cleanup(aTHX_ &ctx);
648             }
649 3           break;
650             }
651             }
652             }
653 19           LEAVE;
654              
655             void
656             series(tasks, final_cb, ...)
657             AV *tasks
658             SV *final_cb
659             CODE:
660 34           int unsafe = 0;
661 34 100         if (items > 2 && SvTRUE(ST(2))) unsafe = 1;
    50          
662              
663 34           I32 len = av_len(tasks) + 1;
664 34 100         if (len <= 0) {
665 1 50         if (IS_PVCV(final_cb)) {
    50          
    50          
666 1           dSP;
667 1           ENTER;
668 1           SAVETMPS;
669 1 50         PUSHMARK(SP);
670 1           PUTBACK;
671 1           call_sv(final_cb, G_DISCARD | G_VOID);
672 1 50         FREETMPS;
673 1           LEAVE;
674             }
675 1           return;
676             }
677              
678 33           ENTER;
679              
680             int *is_freed;
681 33           Newxz(is_freed, 1, int);
682              
683             series_ctx *ctx;
684 33           Newx(ctx, 1, series_ctx);
685 33           ctx->is_freed_ptr = is_freed;
686 33           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
687 33           ctx->final_cb = SvREFCNT_inc(final_cb);
688 33           ctx->current_idx = 0;
689 33           ctx->running = 0;
690 33           ctx->delayed = 0;
691 33           ctx->total_tasks = len;
692 33           ctx->unsafe = unsafe;
693 33           ctx->current_cv = NULL;
694              
695             ifp_guard *guard;
696 33           Newx(guard, 1, ifp_guard);
697 33           guard->target_field = &ctx->is_freed_ptr;
698 33           guard->is_freed = is_freed;
699 33           SAVEDESTRUCTOR_X(ifp_guard_destroy, guard);
700              
701 33           _series_next(aTHX_ &ctx);
702              
703 25           LEAVE;
704              
705             void
706             parallel_limit(tasks, limit, final_cb, ...)
707             AV *tasks
708             I32 limit
709             SV *final_cb
710             CODE:
711 26           int unsafe = 0;
712 26 100         if (items > 3 && SvTRUE(ST(3))) unsafe = 1;
    50          
713              
714 26           I32 len = av_len(tasks) + 1;
715 26 100         if (len <= 0) {
716 1 50         if (IS_PVCV(final_cb)) {
    50          
    50          
717 1           dSP;
718 1           ENTER;
719 1           SAVETMPS;
720 1 50         PUSHMARK(SP);
721 1           PUTBACK;
722 1           call_sv(final_cb, G_DISCARD | G_VOID);
723 1 50         FREETMPS;
724 1           LEAVE;
725             }
726 1           return;
727             }
728              
729 25 100         if (limit < 1) limit = 1;
730 25 100         if (limit > len) limit = len;
731              
732 25           ENTER;
733              
734             int *is_freed;
735 25           Newxz(is_freed, 1, int);
736              
737             plimit_ctx *ctx;
738 25           Newx(ctx, 1, plimit_ctx);
739 25           ctx->is_freed_ptr = is_freed;
740 25           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
741 25           ctx->final_cb = SvREFCNT_inc(final_cb);
742 25           ctx->remaining = len;
743 25           ctx->current_idx = 0;
744 25           ctx->total_tasks = len;
745 25           ctx->limit = limit;
746 25           ctx->active = 0;
747 25           ctx->unsafe = unsafe;
748 25           ctx->running = 0;
749 25           ctx->delayed = 0;
750 25           ctx->cvs = NULL;
751 25           ctx->num_cvs = 0;
752 25           ctx->shared_cv = NULL;
753              
754             ifp_guard *guard;
755 25           Newx(guard, 1, ifp_guard);
756 25           guard->target_field = &ctx->is_freed_ptr;
757 25           guard->is_freed = is_freed;
758 25           SAVEDESTRUCTOR_X(ifp_guard_destroy, guard);
759              
760 25 100         if (unsafe) {
761 4           ctx->shared_cv = newXS(NULL, plimit_task_done, __FILE__);
762 4           CvXSUBANY(ctx->shared_cv).any_ptr = ctx;
763             } else {
764 21           ctx->num_cvs = len;
765 21           Newxz(ctx->cvs, len, CV*);
766             }
767              
768 25           _plimit_dispatch(aTHX_ &ctx);
769              
770 18           LEAVE;
771              
772             void
773             race(tasks, final_cb, ...)
774             AV *tasks
775             SV *final_cb
776             CODE:
777 1020           int unsafe = 0;
778 1020 100         if (items > 2 && SvTRUE(ST(2))) unsafe = 1;
    50          
779              
780 1020           I32 len = av_len(tasks) + 1;
781 1020 100         if (len <= 0) {
782 2 50         if (IS_PVCV(final_cb)) {
    100          
    50          
783 1           dSP;
784 1           ENTER;
785 1           SAVETMPS;
786 1 50         PUSHMARK(SP);
787 1           PUTBACK;
788 1           call_sv(final_cb, G_DISCARD | G_VOID);
789 1 50         FREETMPS;
790 1           LEAVE;
791             }
792 2           return;
793             }
794              
795 1018           ENTER;
796              
797             int *is_freed;
798 1018           Newxz(is_freed, 1, int);
799              
800             race_ctx *ctx;
801 1018           Newx(ctx, 1, race_ctx);
802 1018           ctx->is_freed_ptr = is_freed;
803 1018           ctx->tasks = (AV*)SvREFCNT_inc((SV*)tasks);
804 1018           ctx->final_cb = SvREFCNT_inc(final_cb);
805 1018           ctx->settled = 0;
806 1018           ctx->cvs = NULL;
807 1018           ctx->num_cvs = 0;
808 1018           ctx->shared_cv = NULL;
809              
810             ifp_guard *guard;
811 1018           Newx(guard, 1, ifp_guard);
812 1018           guard->target_field = &ctx->is_freed_ptr;
813 1018           guard->is_freed = is_freed;
814 1018           SAVEDESTRUCTOR_X(ifp_guard_destroy, guard);
815              
816 1018           SV *done_rv = NULL;
817 1018 100         if (unsafe) {
818 4           ctx->shared_cv = newXS(NULL, race_task_done, __FILE__);
819 4           CvXSUBANY(ctx->shared_cv).any_ptr = ctx;
820 4           done_rv = sv_2mortal(newRV_inc((SV*)ctx->shared_cv));
821             } else {
822 1014           ctx->num_cvs = len;
823 1014           Newxz(ctx->cvs, len, CV*);
824             }
825              
826 1018           dSP;
827              
828             I32 i;
829 1018 100         U32 flags = G_DISCARD | (unsafe ? 0 : G_EVAL);
830              
831 2037 100         for (i = 0; i < len; i++) {
832 2029 100         if (*is_freed || ctx->settled) break;
    50          
833              
834 1025 50         SV **task_ary = (AvREAL(tasks) && !SvMAGICAL(tasks)) ? AvARRAY(tasks) : NULL;
    50          
835 1025 50         SV **fetch_ptr = (task_ary && i <= AvFILL(tasks))
    50          
836 1025           ? &task_ary[i]
837 2050 50         : av_fetch(tasks, i, 0);
838 1025 50         SV *task_sv = fetch_ptr ? *fetch_ptr : NULL;
839              
840 2044 100         if (IS_PVCV(task_sv)) {
    100          
    50          
841 1023           CV *cv = NULL;
842 1023 100         if (!unsafe) {
843 1017           cv = newXS(NULL, race_task_done, __FILE__);
844 1017           CvXSUBANY(cv).any_ptr = ctx;
845 1017           ctx->cvs[i] = (CV*)SvREFCNT_inc((SV*)cv);
846             }
847              
848 1023           ENTER;
849 1023           SAVETMPS;
850 1023 100         if (!unsafe) {
851 1017           done_rv = sv_2mortal(newRV_noinc((SV*)cv));
852             }
853 1023 50         PUSHMARK(SP);
854 1023 50         XPUSHs(done_rv);
855 1023           PUTBACK;
856              
857 1023           call_sv(task_sv, flags);
858 1022           SPAGAIN;
859 1022 100         if (!unsafe) {
860 1017 50         if (SvTRUE(ERRSV)) {
    100          
861 3 50         SV *err = sv_mortalcopy(ERRSV);
862 3 100         if (!*is_freed) {
863 2           race_cleanup(aTHX_ &ctx);
864             }
865 3           croak_sv(err);
866             }
867             }
868              
869 1019 100         FREETMPS;
870 1019           LEAVE;
871             } else {
872 2           ctx->settled = 1;
873 2           SV *cb = ctx->final_cb;
874 2 50         if (IS_PVCV(cb)) {
    50          
    50          
875 2           ENTER;
876 2           SAVETMPS;
877 2           SvREFCNT_inc(cb);
878 2           sv_2mortal(cb);
879 2 50         PUSHMARK(SP);
880 2           PUTBACK;
881 2           race_cleanup(aTHX_ &ctx);
882 2           call_sv(cb, G_DISCARD | G_VOID);
883 2 50         FREETMPS;
884 2           LEAVE;
885             } else {
886 0           race_cleanup(aTHX_ &ctx);
887             }
888 2           break;
889             }
890             }
891 1014           LEAVE;