File Coverage

blib/lib/Acme/Parataxis.pm
Criterion Covered Total %
statement 174 217 80.1
branch 48 96 50.0
condition 41 90 45.5
subroutine 27 34 79.4
pod 16 18 88.8
total 306 455 67.2


line stmt bran cond sub pod time code
1 15     15   3029981 use v5.40;
  15         79  
2 15     15   9149 use experimental qw[class try];
  15         68908  
  15         108  
3              
4             package Acme::Parataxis v0.0.10 {
5 15     15   14541 use Affix;
  15         959367  
  15         4547  
6 15     15   158 use Config;
  15         49  
  15         690  
7 15     15   97 use File::Spec;
  15         56  
  15         494  
8 15     15   89 use File::Basename qw[dirname];
  15         26  
  15         981  
9 15     15   838 use Time::HiRes qw[usleep];
  15         1822  
  15         172  
10 15     15   1939 use Exporter qw[import];
  15         49  
  15         588  
11 15     15   82 use Carp qw[croak];
  15         30  
  15         19707  
12             our %EXPORT_TAGS = (
13             all => [
14             our @EXPORT_OK
15             = qw[
16             run spawn yield await stop async fiber
17             await_sleep await_read await_write await_core_id
18             current_fid tid root maybe_yield
19             set_max_threads max_threads
20             ]
21             ]
22             );
23             #
24             our @IPC_BUFFER;
25             my $lib;
26             my @SCHEDULER_QUEUE;
27             my $IS_RUNNING = 0;
28              
29 15     15   97 sub _bind_functions ($l) {
  15         47  
  15         39  
30 15         76 affix $l, 'init_system', [], Int;
31 15         7302 affix $l, 'create_fiber', [ Pointer [SV], Pointer [SV] ], Int;
32 15         4296 affix $l, 'coro_call', [ Int, Pointer [SV] ], Pointer [SV];
33 15         3081 affix $l, 'coro_transfer', [ Int, Pointer [SV] ], Pointer [SV];
34 15         3106 affix $l, 'coro_yield', [ Pointer [SV] ], Pointer [SV];
35 15         2587 affix $l, 'is_finished', [Int], Int;
36 15         2399 affix $l, 'destroy_coro', [Int], Void;
37 15         2393 affix $l, 'force_depth_zero', [ Pointer [SV] ], Void;
38 15         2716 affix $l, 'cleanup', [], Void;
39 15         2084 affix $l, 'get_os_thread_id_export', [], Int;
40 15         2187 affix $l, 'get_current_parataxis_id', [], Int;
41 15         2182 affix $l, 'submit_c_job', [ Int, LongLong, Int ], Int;
42 15         2874 affix $l, 'check_for_completion', [], Int;
43 15         2110 affix $l, 'get_job_result', [Int], Pointer [SV];
44 15         2642 affix $l, 'get_job_coro_id', [Int], Int;
45 15         2509 affix $l, 'free_job_slot', [Int], Void;
46 15         2574 affix $l, 'get_thread_pool_size', [], Int;
47 15         2174 affix $l, 'get_max_thread_pool_size', [], Int;
48 15         2161 affix $l, 'set_max_threads', [Int], Void;
49 15         2602 affix $l, 'set_preempt_threshold', [LongLong], Void;
50 15         3486 affix $l, [ 'maybe_yield' => '_maybe_yield' ], [], Pointer [SV];
51 15         2629 affix $l, 'get_preempt_count', [], LongLong;
52              
53             # Capture the main interpreter context eagerly
54 15         9036 init_system();
55 15 50       45907 if ( $^O eq 'MSWin32' ) {
56 0         0 my $perl_dll = $Config{libperl};
57 0         0 $perl_dll =~ s/^lib//;
58 0         0 $perl_dll =~ s/\.a$//;
59 0         0 $perl_dll .= '.' . $Config{so};
60 0         0 my $p = Affix::load_library($perl_dll);
61 0         0 affix $p, 'win32_get_osfhandle', [Int], LongLong;
62             }
63             }
64              
65             BEGIN {
66 15 50   15   291 my $lib_name = ( $^O eq 'MSWin32' ? '' : 'lib' ) . 'parataxis.' . $Config{so};
67 15         49 my @paths;
68 15         1738 push @paths, File::Spec->catfile( dirname(__FILE__), $lib_name );
69 15         751 push @paths, File::Spec->catfile( dirname(__FILE__), '..', 'arch', 'auto', 'Acme', 'Parataxis', $lib_name );
70 15         703 push @paths, File::Spec->catfile( dirname(__FILE__), '..', '..', 'arch', 'auto', 'Acme', 'Parataxis', $lib_name );
71 15         677 push @paths, File::Spec->catfile( dirname(__FILE__), 'auto', 'Acme', 'Parataxis', $lib_name );
72              
73             # XXX - Local dir check (This is temporary)
74 15         174 push @paths, File::Spec->catfile( '.', $lib_name );
75 15         56 for my $inc (@INC) {
76 180 50       392 next if ref $inc;
77 180         1285 push @paths, File::Spec->catfile( $inc, 'auto', 'Acme', 'Parataxis', $lib_name );
78             }
79 15         36 for my $path (@paths) {
80 45 100       1000 if ( -e $path ) {
81 15         4585 $lib = Affix::load_library($path);
82 15 50       715 last if $lib;
83             }
84             }
85 15 50       85 die 'Could not find or load ' . $lib_name unless $lib;
86 15         84 _bind_functions($lib);
87             }
88              
89             # API aliases and wrappers
90 0     0 1 0 sub fiber : prototype(&) ($code) { spawn( 'Acme::Parataxis', $code ) }
  0         0  
  0         0  
  0         0  
91              
92 0     0 1 0 sub async : prototype(&) ($code) {
  0         0  
  0         0  
93 0         0 my $ret = run($code);
94 0         0 stop();
95 0         0 return $ret;
96             }
97              
98             sub await {
99 0     0 1 0 my $thing = shift;
100 0 0       0 if ( builtin::blessed($thing) ) {
101 0 0       0 return $thing->await if $thing->can('await');
102 0 0       0 return $thing->wait if $thing->can('wait');
103             }
104 0         0 croak 'await() requires a Future or Fiber object';
105             }
106              
107             sub yield {
108 64     64 1 2263 my $invocant = shift;
109 64 100 33     710 if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
  47   66     661  
      66        
110 47 50       216 unshift @_, $invocant if defined $invocant;
111 47         123 $invocant = 'Acme::Parataxis';
112             }
113 64         892 my $result = coro_yield( \@_ );
114 62 50       274 return unless defined $result;
115 62 50       503 return ( ref $result eq 'ARRAY' ) ? ( wantarray ? @$result : $result->[-1] ) : $result;
    50          
116             }
117              
118             sub spawn {
119 28     28 1 44903 my ( $class, $code ) = @_;
120 28 50       156 if ( ref $class eq 'CODE' ) {
121 0         0 $code = $class;
122 0         0 $class = 'Acme::Parataxis';
123             }
124 28         206 my $future = Acme::Parataxis::Future->new();
125 28         221 my $fiber = Acme::Parataxis->new( code => $code, future => $future );
126 28         77 push @SCHEDULER_QUEUE, $fiber;
127 28         123 return $future;
128             }
129              
130             sub await_sleep {
131 3     3 1 9 my $invocant = shift;
132 3 50 33     32 if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
  0   33     0  
      33        
133 0 0       0 unshift @_, $invocant if defined $invocant;
134             }
135 3   50     11 my $ms = shift // 0;
136 3 50       120 return 'Queue Full' if submit_c_job( 0, $ms, 0 ) < 0;
137 3         46 return yield('WAITING');
138             }
139              
140             sub await_core_id {
141 0     0 1 0 my $invocant = shift;
142 0 0 0     0 if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
  0   0     0  
      0        
143 0 0       0 unshift @_, $invocant if defined $invocant;
144             }
145 0 0       0 return 'Queue Full' if submit_c_job( 1, 0, 0 ) < 0;
146 0         0 return yield('WAITING');
147             }
148              
149             sub await_read {
150 32     32 1 1374001 my $invocant = shift;
151 32 50 33     346 if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
  0   33     0  
      33        
152 0 0       0 unshift @_, $invocant if defined $invocant;
153             }
154 32         111 my ( $fh, $timeout ) = @_;
155 32   100     120 $timeout //= 5000;
156 32         2725 my $fileno = fileno($fh);
157 32 50       157 die 'Not a valid filehandle' unless defined $fileno;
158 32 50       132 my $handle = $^O eq 'MSWin32' ? win32_get_osfhandle($fileno) : $fileno;
159 32 50       6349 return 'Queue Full' if submit_c_job( 2, $handle, $timeout ) < 0;
160 32         178 return yield('WAITING');
161             }
162              
163             sub await_write {
164 12     12 1 3991 my $invocant = shift;
165 12 50 33     95 if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
  0   33     0  
      33        
166 0 0       0 unshift @_, $invocant if defined $invocant;
167             }
168 12         28 my ( $fh, $timeout ) = @_;
169 12   50     30 $timeout //= 5000;
170 12         24 my $fileno = fileno($fh);
171 12 50       29 die 'Not a valid filehandle' unless defined $fileno;
172 12 50       51 my $handle = $^O eq 'MSWin32' ? win32_get_osfhandle($fileno) : $fileno;
173 12 50       318 return 'Queue Full' if submit_c_job( 3, $handle, $timeout ) < 0;
174 12         38 return yield('WAITING');
175             }
176              
177             sub maybe_yield {
178 20     20 1 116 my $invocant = shift;
179 20 50 33     113 if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
  0   33     0  
      33        
180 0 0       0 unshift @_, $invocant if defined $invocant;
181             }
182 20         106 my $result = Acme::Parataxis::_maybe_yield();
183 18 100       59 return unless defined $result;
184 2 50       12 return wantarray ? @$result : $result->[-1];
185             }
186 0     0 1 0 sub tid { get_os_thread_id_export() }
187 25     25 1 294 sub current_fid { get_current_parataxis_id() }
188 0   0 0 1 0 sub root { state $root //= Acme::Parataxis::Root->new() }
189 0     0 1 0 sub max_threads () { Acme::Parataxis::get_max_thread_pool_size() }
  0         0  
  0         0  
190              
191             # Scheduler internals
192 11     11   23 sub _scheduler_enqueue_by_id ($fid) {
  11         22  
  11         21  
193 11 50       50 if ( my $fiber = Acme::Parataxis->by_id($fid) ) {
194 11         200 push @SCHEDULER_QUEUE, $fiber;
195             }
196             }
197              
198             sub poll_io {
199 1620     1620 0 3447 my @ready;
200 1620         2453 while (1) {
201 1665         17617 my $job_idx = check_for_completion();
202 1665 100       5877 last if $job_idx == -1;
203 45         239 my $fid = get_job_coro_id($job_idx);
204 45         212 my $res = get_job_result($job_idx);
205 45         184 push @ready, [ $fid, $res ];
206 45         172 free_job_slot($job_idx);
207             }
208 1620         4398 return @ready;
209             }
210              
211 10     10 1 3098212 sub run ($code) {
  10         29  
  10         24  
212 10         33 @SCHEDULER_QUEUE = ();
213 10         26 $IS_RUNNING = 1;
214 10         200 my $main_fiber = Acme::Parataxis->new( code => $code );
215 10         40 push @SCHEDULER_QUEUE, $main_fiber;
216 10         59 while ($IS_RUNNING) {
217 1620         9461 my @ready = poll_io();
218 1620         4616 for my $ready (@ready) {
219 45         235 my ( $fid, $res ) = @$ready;
220 45         207 my $fiber = Acme::Parataxis->by_id($fid);
221 45 50       156 if ($fiber) {
222 45         151 my $yield_val = $fiber->call($res);
223 45 100 66     211 if ( defined $fiber && !$fiber->is_done ) {
224 25 50 33     126 if ( defined $yield_val && $yield_val eq 'WAITING' ) { }
225             else {
226 0         0 push @SCHEDULER_QUEUE, $fiber;
227             }
228             }
229             }
230             }
231 1620 100       4065 if (@SCHEDULER_QUEUE) {
232 50         109 my $current = shift @SCHEDULER_QUEUE;
233 50 50       241 next unless $current;
234 50 50       169 next if $current->is_done;
235 50         218 my $res = $current->call();
236 50 100 66     255 if ( defined $current && !$current->is_done ) {
237 34 100 66     226 if ( defined $res && $res eq 'WAITING' ) { }
238             else {
239 1         2 push @SCHEDULER_QUEUE, $current;
240             }
241             }
242             }
243 1620         3551 my $active_count = scalar keys %Acme::Parataxis::REGISTRY;
244 1620 100 66     9360 if ( defined $main_fiber && $main_fiber->is_done && $active_count == 0 && !@SCHEDULER_QUEUE ) {
      100        
      66        
245 8         19 $IS_RUNNING = 0;
246             }
247 1620 100 100     9613 if ( $IS_RUNNING && !@SCHEDULER_QUEUE && !@ready ) {
      100        
248 1559         1903205 usleep(1000);
249             }
250             }
251             }
252 7     7 1 62662 sub stop () { $IS_RUNNING = 0 }
  7         16  
  7         511  
253             class #
254             Acme::Parataxis {
255 15     15   179 use Carp qw[croak];
  15         32  
  15         27471  
256             field $code : reader : param;
257             field $is_done = 0;
258             field $error : reader;
259             field $result : reader;
260             field $fid : reader;
261             field $future : param = undef;
262              
263             method set_result ($val) {
264             $result = $val;
265             $future->set_result($val) if $future;
266             }
267              
268             method set_error ($err) {
269             $error = $err;
270             $future->set_error($err) if $future;
271             }
272              
273             method _clear_result () {
274             $result = undef;
275             $error = undef;
276             }
277             our %REGISTRY;
278             ADJUST {
279             Acme::Parataxis::force_depth_zero($code);
280             $fid = Acme::Parataxis::create_fiber( $code, $self );
281             $REGISTRY{$fid} = $self;
282             builtin::weaken $REGISTRY{$fid};
283             }
284              
285             method call (@args) {
286             croak 'Cannot call a finished fiber' if $is_done;
287             my $rv = Acme::Parataxis::coro_call( $fid, \@args );
288             return unless defined $self;
289             if ( $self->is_done ) {
290             my $err = $error;
291             die $err if defined $err;
292             }
293             return unless defined $rv;
294             return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
295             }
296              
297             method transfer (@args) {
298             croak 'Cannot transfer to a finished fiber' if $self->is_done;
299             my $rv = Acme::Parataxis::coro_transfer( $fid, \@args );
300             if ( $self->is_done ) {
301             my $err = $error;
302             die $err if defined $err;
303             }
304             return unless defined $rv;
305             return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
306             }
307              
308             method is_done () {
309             return 1 if $is_done;
310             if ( defined $fid && $fid >= 0 && Acme::Parataxis::is_finished($fid) ) {
311             $is_done = 1;
312             my $old_fid = $fid;
313             $fid = -1;
314             delete $REGISTRY{$old_fid};
315             Acme::Parataxis::destroy_coro($old_fid);
316             return 1;
317             }
318             return 0;
319             }
320              
321             method wait () {
322             while ( !$self->is_done ) {
323             Acme::Parataxis->yield('WAITING_FOR_CHILD');
324             }
325             return $self->result;
326             }
327              
328             method DESTROY {
329             return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
330             if ( defined $fid && $fid >= 0 ) {
331             delete $REGISTRY{$fid};
332             Acme::Parataxis::destroy_coro($fid);
333             $fid = -1;
334             }
335             }
336 56     56 0 95 sub by_id ( $class, $fid ) { $REGISTRY{$fid} }
  56         133  
  56         93  
  56         79  
  56         250  
337             }
338             class #
339             Acme::Parataxis::Root {
340              
341             method transfer (@args) {
342             my $rv = Acme::Parataxis::coro_transfer( -1, \@args );
343             return unless defined $rv;
344             return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
345             }
346             method fid () {-1}
347             }
348             class #
349             Acme::Parataxis::Future {
350 15     15   136 use Carp qw[croak];
  15         30  
  15         13497  
351             field $is_ready : reader = 0;
352             field $result;
353             field $error;
354             field @callbacks;
355              
356             method result () {
357             croak 'Future not ready' unless $is_ready;
358             return $result;
359             }
360              
361             method set_result ($val) {
362             die 'Future already ready' if $is_ready;
363             $result = $val;
364             $is_ready = 1;
365             $_->($self) for @callbacks;
366             }
367              
368             method set_error ($err) {
369             die 'Future already ready' if $is_ready;
370             $error = $err;
371             $is_ready = 1;
372             $_->($self) for @callbacks;
373             }
374              
375             method clear_result () {
376             $result = undef;
377             $error = undef;
378             }
379              
380             method on_ready ($cb) {
381             if ($is_ready) { $cb->($self) }
382             else { push @callbacks, $cb }
383             }
384              
385             method await () {
386             return $self->result if $is_ready;
387             my $fid = Acme::Parataxis->current_fid;
388 11         24 $self->on_ready(
389 11     11   25 sub ($f) {
  11         18  
390 11         51 Acme::Parataxis::_scheduler_enqueue_by_id($fid);
391             }
392             );
393             Acme::Parataxis->yield('WAITING');
394             $self->result;
395             }
396             }
397 15 50   15   201073 END { cleanup() unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }
398             }
399             1;