File Coverage

blib/lib/MCE/Core/Worker.pm
Criterion Covered Total %
statement 215 324 66.3
branch 101 228 44.3
condition 10 66 15.1
subroutine 14 21 66.6
pod n/a
total 340 639 53.2


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Core methods for the worker process.
4             ##
5             ## This package provides main, loop, and relevant methods used internally by
6             ## the worker process.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Worker;
13              
14 85     85   794 use strict;
  85         200  
  85         2677  
15 85     85   427 use warnings;
  85         163  
  85         8731  
16              
17             our $VERSION = '1.888';
18              
19             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
20              
21             sub CLONE {
22 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
23             }
24              
25             ## Items below are folded into MCE.
26              
27             package # hide from rpm
28             MCE;
29              
30 85     85   585 no warnings qw( bareword threads recursion uninitialized );
  85         309  
  85         122162  
31              
32             ###############################################################################
33             ## ----------------------------------------------------------------------------
34             ## Internal do, gather and send related functions for serializing data to
35             ## destination. User functions for handling gather, queue or void.
36             ##
37             ###############################################################################
38              
39             {
40             my (
41             $_dest, $_len, $_tag, $_task_id, $_user_func, $_val, $_wa,
42             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_chn, $_lock_chn,
43             $_dat_ex, $_dat_un
44             );
45              
46             my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
47              
48             ## Create array structure containing various send functions.
49             my @_dest_function = ();
50              
51             $_dest_function[SENDTO_FILEV2] = sub { ## Content >> File
52              
53             return unless (defined $_val);
54             local $\ = undef if (defined $\);
55              
56             if (length ${ $_[1] }) {
57             my $_buf = $_[0]->{freeze}([ $_val, ${ $_[1] } ]);
58             $_dat_ex->() if $_lock_chn;
59             print({$_DAT_W_SOCK} OUTPUT_F_SND.$LF . $_chn.$LF),
60             print({$_DAU_W_SOCK} length($_buf).$LF, $_buf);
61             $_dat_un->() if $_lock_chn;
62             }
63              
64             return;
65             };
66              
67             $_dest_function[SENDTO_FD] = sub { ## Content >> File descriptor
68              
69             return unless (defined $_val);
70             local $\ = undef if (defined $\);
71              
72             if (length ${ $_[1] }) {
73             my $_buf = $_[0]->{freeze}([ $_val, ${ $_[1] } ]);
74             $_dat_ex->() if $_lock_chn;
75             print({$_DAT_W_SOCK} OUTPUT_D_SND.$LF . $_chn.$LF),
76             print({$_DAU_W_SOCK} length($_buf).$LF, $_buf);
77             $_dat_un->() if $_lock_chn;
78             }
79              
80             return;
81             };
82              
83             $_dest_function[SENDTO_STDOUT] = sub { ## Content >> STDOUT
84              
85             local $\ = undef if (defined $\);
86              
87             if (length ${ $_[1] }) {
88             my $_buf = $_[0]->{freeze}($_[1]);
89             $_dat_ex->() if $_lock_chn;
90             print({$_DAT_W_SOCK} OUTPUT_O_SND.$LF . $_chn.$LF),
91             print({$_DAU_W_SOCK} length($_buf).$LF, $_buf);
92             $_dat_un->() if $_lock_chn;
93             }
94              
95             return;
96             };
97              
98             $_dest_function[SENDTO_STDERR] = sub { ## Content >> STDERR
99              
100             local $\ = undef if (defined $\);
101              
102             if (length ${ $_[1] }) {
103             my $_buf = $_[0]->{freeze}($_[1]);
104             $_dat_ex->() if $_lock_chn;
105             print({$_DAT_W_SOCK} OUTPUT_E_SND.$LF . $_chn.$LF),
106             print({$_DAU_W_SOCK} length($_buf).$LF, $_buf);
107             $_dat_un->() if $_lock_chn;
108             }
109              
110             return;
111             };
112              
113             ## -------------------------------------------------------------------------
114              
115             sub _do_callback {
116              
117 133     133   300 my ($self, $_buf, $_aref);
118              
119 133         385 ($self, $_val, $_aref) = @_;
120              
121 133 100       1456 unless (defined wantarray) {
    100          
122 126         279 $_wa = WANTS_UNDEF;
123 0         0 } elsif (wantarray) {
124 2         7 $_wa = WANTS_ARRAY;
125             } else {
126 5         20 $_wa = WANTS_SCALAR;
127             }
128              
129 133 50       470 local $\ = undef if (defined $\);
130              
131             ## Crossover: Send arguments
132              
133 133 100       209 if ( ! @{ $_aref } ) {
  133         360  
134 2 50       8 $_dat_ex->() if $_lock_chn;
135 2         149 print({$_DAT_W_SOCK} OUTPUT_N_CBK.$LF . $_chn.$LF),
136 2         29 print({$_DAU_W_SOCK} $_wa.$LF . $_val.$LF);
  2         88  
137             }
138             else {
139 131         1630 $_buf = $self->{freeze}($_aref);
140 131         302 $_len = length $_buf;
141              
142 131 50       358 $_dat_ex->() if $_lock_chn;
143 131         4565 print({$_DAT_W_SOCK} OUTPUT_A_CBK.$LF . $_chn.$LF),
144 131         191 print({$_DAU_W_SOCK} $_wa.$LF . $_val.$LF . $_len.$LF, $_buf);
  131         2092  
145             }
146              
147             ## Crossover: Receive value
148              
149 133 100       544 if ( $_wa ) {
150 7 50       46 local $/ = $LF if ($/ ne $LF);
151 7         4926 chomp(my $_len = <$_DAU_W_SOCK>);
152              
153 7         105 read $_DAU_W_SOCK, my($_buf), $_len;
154 7 50       142 $_dat_un->() if $_lock_chn;
155              
156             return ( $_wa != WANTS_ARRAY )
157             ? ($self->{thaw}($_buf))->[0]
158 7 100       221 : @{ $self->{thaw}($_buf) };
  2         153  
159             }
160              
161 126 50       715 $_dat_un->() if $_lock_chn;
162             }
163              
164             ## -------------------------------------------------------------------------
165              
166             sub _do_gather {
167              
168 357     357   752 my $_buf; my ($self, $_aref) = @_;
  357         981  
169              
170 357 50       615 return unless (scalar @{ $_aref });
  357         1021  
171              
172 357         1274 $_tag = OUTPUT_A_GTR;
173 357         4251 $_buf = $self->{freeze}($_aref);
174 357         903 $_len = length $_buf;
175              
176 357 50       1308 local $\ = undef if (defined $\);
177              
178 357 50       921 $_dat_ex->() if $_lock_chn;
179 357         10655 print({$_DAT_W_SOCK} $_tag.$LF . $_chn.$LF),
180 357         651 print({$_DAU_W_SOCK} $_task_id.$LF . $_len.$LF, $_buf);
  357         50619  
181 357 50       1721 $_dat_un->() if $_lock_chn;
182              
183 357         3061 return;
184             }
185              
186             ## -------------------------------------------------------------------------
187              
188             sub _do_send {
189              
190 0     0   0 my $_data_ref; my $self = shift;
  0         0  
191              
192 0         0 $_dest = shift; $_val = shift;
  0         0  
193              
194 0 0       0 if (scalar @_ > 1) {
    0          
195 0         0 $_data_ref = \join('', @_);
196             }
197             elsif (my $_ref = ref $_[0]) {
198 0 0       0 if ($_ref eq 'SCALAR') {
    0          
    0          
199 0         0 $_data_ref = $_[0];
200             }
201             elsif ($_ref eq 'ARRAY') {
202 0         0 $_data_ref = \join('', @{ $_[0] });
  0         0  
203             }
204             elsif ($_ref eq 'HASH') {
205 0         0 $_data_ref = \join('', %{ $_[0] });
  0         0  
206             }
207             else {
208 0         0 $_data_ref = \join('', @_);
209             }
210             }
211             else {
212 0         0 $_data_ref = \(''.$_[0]);
213             }
214              
215 0         0 $_dest_function[$_dest]($self, $_data_ref);
216              
217 0         0 return 1;
218             }
219              
220             sub _do_send_glob {
221              
222 0     0   0 my ($self, $_glob, $_fd, $_data_ref) = @_;
223              
224 0 0       0 if ($self->{_wid} > 0) {
225 0 0       0 if ($_fd == 1) {
    0          
226 0         0 _do_send($self, SENDTO_STDOUT, undef, $_data_ref);
227             }
228             elsif ($_fd == 2) {
229 0         0 _do_send($self, SENDTO_STDERR, undef, $_data_ref);
230             }
231             else {
232 0         0 _do_send($self, SENDTO_FD, $_fd, $_data_ref);
233             }
234             }
235             else {
236 85     85   836 use bytes;
  85         195  
  85         662  
237 0         0 my $_fh = _sendto_fhs_get($self, $_fd);
238 0 0       0 local $\ = undef if (defined $\);
239              
240 0         0 print {$_fh} ${ $_data_ref };
  0         0  
  0         0  
241             }
242              
243 0         0 return 1;
244             }
245              
246             ## -------------------------------------------------------------------------
247              
248             sub _do_send_init {
249              
250 65     65   444 my ($self) = @_;
251              
252 65         606 $_chn = $self->{_chn};
253 65         360 $_DAT_LOCK = $self->{_dat_lock};
254 65         505 $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
255 65         551 $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
256 65         355 $_lock_chn = $self->{_lock_chn};
257 65         391 $_task_id = $self->{_task_id};
258              
259 65 50       563 if ($_lock_chn) {
260             # inlined for performance
261             $_dat_ex = sub {
262 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
263             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
264 0 0       0 if $_is_MSWin32;
265             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
266 0 0       0 unless $_DAT_LOCK->{ $_pid };
267 0         0 };
268             $_dat_un = sub {
269 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
270             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
271 0 0       0 if $_DAT_LOCK->{ $_pid };
272 0         0 };
273             }
274              
275             {
276 65         518 local $!;
  65         3004  
277 65 50       4880 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
278 65 50       19536 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
279             }
280              
281 65         3447 return;
282             }
283              
284             sub _do_send_clear {
285              
286 65     65   317 my ($self) = @_;
287              
288 65         327 $_dest = $_len = $_task_id = $_user_func = $_val = $_wa = undef;
289 65         213 $_DAT_LOCK = $_DAT_W_SOCK = $_DAU_W_SOCK = $_chn = $_lock_chn = undef;
290 65         205 $_dat_ex = $_dat_un = $_tag = undef;
291              
292 65         171 return;
293             }
294              
295             ## -------------------------------------------------------------------------
296              
297             sub _do_user_func {
298              
299 246     246   958 my ($self, $_chunk, $_chunk_id) = @_;
300 246         430 my $_size = 0;
301              
302 246         536 delete $self->{_relayed};
303 246         575 $self->{_chunk_id} = $_chunk_id;
304              
305 246 50 33     752 if ($self->{progress} && $self->{_task_id} == 0) {
306             # use_slurpio
307 0 0 0     0 if (ref $_chunk eq 'SCALAR') {
    0          
    0          
308 0         0 $_size += length ${ $_chunk };
  0         0  
309             }
310             # sequence and bounds_only
311             elsif ($self->{sequence} && $self->{bounds_only}) {
312 0         0 my $_seq = $self->{sequence};
313 0 0       0 my $_step = (ref $_seq eq 'ARRAY') ? $_seq->[2] : $_seq->{step};
314 0         0 $_size += int(abs($_chunk->[0] - $_chunk->[1]) / abs($_step)) + 1;
315             }
316             # workers clear {input_data} to conserve memory when array ref
317             # otherwise, /path/to/infile or scalar reference
318             elsif ($self->{input_data}) {
319 0         0 map { $_size += length } @{ $_chunk };
  0         0  
  0         0  
320             }
321             # array or sequence
322             else {
323 0 0       0 $_size += (ref $_chunk eq 'ARRAY') ? @{ $_chunk } : 1;
  0         0  
324             }
325             }
326              
327 246 50       591 if ($self->{max_retries}) {
328 0         0 $self->{_retry} = [ $_chunk, $_chunk_id, $self->{max_retries} ];
329             }
330              
331 246 0 33     780 if ($self->{loop_timeout} && $self->{_task_id} == 0 &&
      33        
      0        
      0        
332             defined $self->{init_relay} && !$self->{_is_thread} && !$_is_MSWin32) {
333              
334 0 0       0 local $\ = undef if (defined $\);
335              
336 0 0       0 $_dat_ex->() if $_lock_chn;
337 0         0 print({$_DAT_W_SOCK} OUTPUT_C_NFY.$LF . $_chn.$LF),
338 0         0 print({$_DAU_W_SOCK} "$$:$_chunk_id".$LF);
  0         0  
339 0 0       0 $_dat_un->() if $_lock_chn;
340             }
341              
342 246         1402 $_user_func->($self, $_chunk, $_chunk_id);
343              
344 246 50 33     1248 if ($self->{progress} && $self->{_task_id} == 0) {
345 0 0       0 local $\ = undef if (defined $\);
346              
347 0 0       0 $_dat_ex->() if $_lock_chn;
348 0         0 print({$_DAT_W_SOCK} OUTPUT_P_NFY.$LF . $_chn.$LF),
349 0         0 print({$_DAU_W_SOCK} $_size.$LF);
  0         0  
350 0 0       0 $_dat_un->() if $_lock_chn;
351             }
352              
353 246         1055 return;
354             }
355              
356             sub _do_user_func_init {
357              
358 163     163   876 my ($self) = @_;
359              
360 163         535 $_user_func = $self->{user_func};
361              
362 163         10077 return;
363             }
364             }
365              
366             ###############################################################################
367             ## ----------------------------------------------------------------------------
368             ## Worker process -- Do.
369             ##
370             ###############################################################################
371              
372             sub MCE::Core::Worker::_guard::DESTROY {
373              
374 163     163   354 my ($mce, $id) = @{ $_[0] };
  163         617  
375              
376 163 50 33     782 if (defined $mce && $id eq "$$.$_tid") {
377 0         0 @{ $_[0] } = ();
  0         0  
378 0         0 warn "MCE worker $id exited prematurely.\n";
379 0         0 $mce->exit(2);
380             }
381              
382 163         617 return;
383             };
384              
385             sub _worker_do {
386              
387 163     163   960 my ($self, $_params_ref) = @_;
388              
389 163         624 @_ = ();
390              
391             ## Set options.
392 163         753 $self->{_abort_msg} = $_params_ref->{_abort_msg};
393 163         597 $self->{_run_mode} = $_params_ref->{_run_mode};
394 163         1045 $self->{use_slurpio} = $_params_ref->{_use_slurpio};
395 163         512 $self->{parallel_io} = $_params_ref->{_parallel_io};
396 163         742 $self->{progress} = $_params_ref->{_progress};
397 163         1540 $self->{max_retries} = $_params_ref->{_max_retries};
398 163         490 $self->{RS} = $_params_ref->{_RS};
399              
400 163         1075 _do_user_func_init($self);
401              
402             ## Init local vars.
403 163         385 my $_chn = $self->{_chn};
404 163         638 my $_DAT_LOCK = $self->{_dat_lock};
405 163         511 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
406 163         384 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
407 163         611 my $_lock_chn = $self->{_lock_chn};
408 163         500 my $_run_mode = $self->{_run_mode};
409 163         455 my $_task_id = $self->{_task_id};
410 163         932 my $_task_name = $self->{task_name};
411              
412             ## Do not override params if defined in user_tasks during instantiation.
413 163         873 for my $_p (qw(bounds_only chunk_size sequence user_args)) {
414 652 100       2590 if (defined $_params_ref->{"_${_p}"}) {
415             $self->{$_p} = $_params_ref->{"_${_p}"}
416 208 50       1410 unless (defined $self->{_task}->{$_p});
417             }
418             }
419              
420             {
421 163         381 my $_guard = bless([ $self, "$$.$_tid" ], MCE::Core::Worker::_guard::);
  163         3619  
422 163         2350 weaken( $self->{_guard} = $_guard );
423              
424             ## Assign user function.
425 163         891 $self->{_wuf} = \&_do_user_func;
426              
427             ## Call user_begin if defined.
428 163 50       832 if (defined $self->{user_begin}) {
429 0         0 $self->{_chunk_id} = 0;
430 0         0 $self->{user_begin}($self, $_task_id, $_task_name);
431 0 0 0     0 if ($_task_id == 0 && defined $self->{init_relay} && !$self->{_retry}) {
      0        
432 0         0 $self->sync();
433             }
434             }
435              
436             ## Retry chunk if previous attempt died.
437 163 50       21138 if ($self->{_retry}) {
438 0         0 $self->{_chunk_id} = $self->{_retry}->[1];
439 0         0 $self->{user_func}->($self, $self->{_retry}->[0], $self->{_retry}->[1]);
440 0         0 delete $self->{_retry};
441             }
442              
443             ## Call worker function.
444 163 100       12780 if ($_run_mode eq 'sequence') {
    50          
    100          
    100          
    100          
    50          
    100          
    50          
    50          
445             require MCE::Core::Input::Sequence
446 14 100       7489 unless $INC{'MCE/Core/Input/Sequence.pm'};
447 14         93 _worker_sequence_queue($self);
448             }
449             elsif (defined $self->{_task}->{sequence}) {
450             require MCE::Core::Input::Generator
451 0 0       0 unless $INC{'MCE/Core/Input/Generator.pm'};
452 0         0 _worker_sequence_generator($self);
453             }
454             elsif ($_run_mode eq 'array') {
455             require MCE::Core::Input::Request
456 36 50       190 unless $INC{'MCE/Core/Input/Request.pm'};
457 36         498 _worker_request_chunk($self, REQUEST_ARRAY);
458             }
459             elsif ($_run_mode eq 'glob') {
460             require MCE::Core::Input::Request
461 14 50       106 unless $INC{'MCE/Core/Input/Request.pm'};
462 14         75 _worker_request_chunk($self, REQUEST_GLOB);
463             }
464             elsif ($_run_mode eq 'hash') {
465             require MCE::Core::Input::Request
466 3 50       184 unless $INC{'MCE/Core/Input/Request.pm'};
467 3         103 _worker_request_chunk($self, REQUEST_HASH);
468             }
469             elsif ($_run_mode eq 'iterator') {
470             require MCE::Core::Input::Iterator
471 0 0       0 unless $INC{'MCE/Core/Input/Iterator.pm'};
472 0         0 _worker_user_iterator($self);
473             }
474             elsif ($_run_mode eq 'file') {
475             require MCE::Core::Input::Handle
476 14 100       21513 unless $INC{'MCE/Core/Input/Handle.pm'};
477 14         134 _worker_read_handle($self, READ_FILE, $_params_ref->{_input_file});
478             }
479             elsif ($_run_mode eq 'memory') {
480             require MCE::Core::Input::Handle
481 0 0       0 unless $INC{'MCE/Core/Input/Handle.pm'};
482 0         0 _worker_read_handle($self, READ_MEMORY, $self->{input_data});
483             }
484             elsif (defined $self->{user_func}) {
485 82 50       374 if ($self->{max_retries}) {
486 0         0 $self->{_retry} = [ undef, 0, $self->{max_retries} ];
487             }
488 82         339 $self->{_chunk_id} = 0;
489 82         983 $self->{user_func}->($self);
490             }
491              
492 163 100       1752 undef $self->{_next_jmp} if (defined $self->{_next_jmp});
493 163 100       1014 undef $self->{_last_jmp} if (defined $self->{_last_jmp});
494 163 50       710 undef $self->{user_data} if (defined $self->{user_data});
495              
496             ## Call user_end if defined.
497 163 50       516 if (defined $self->{user_end}) {
498 0         0 $self->{_chunk_id} = 0;
499 0 0 0     0 $self->sync() if ($_task_id == 0 && defined $self->{init_relay});
500 0         0 $self->{user_end}($self, $_task_id, $_task_name);
501             }
502              
503 163         311 @{ $_guard } = ();
  163         1987  
504 163         613 delete $self->{_guard};
505 163         1237 delete $self->{_wuf};
506             }
507              
508             ## Check for nested workers not yet joined.
509 163 50       701 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
510 163 50       526 MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'};
511              
512             ## Notify the main process a worker has completed.
513 163 50       747 local $\ = undef if (defined $\);
514              
515 163 50       12664 $_DAT_LOCK->lock() if $_lock_chn;
516              
517 163         5235 print({$_DAT_W_SOCK} OUTPUT_W_DNE.$LF . $_chn.$LF),
518 163         334 print({$_DAU_W_SOCK} $_task_id.$LF);
  163         2324  
519              
520 163 50       880 $_DAT_LOCK->unlock() if $_lock_chn;
521              
522 163 50       1064 if ($^O eq 'MSWin32') {
523 0         0 lock $self->{_run_lock};
524             }
525              
526 163         656 return;
527             }
528              
529             ###############################################################################
530             ## ----------------------------------------------------------------------------
531             ## Worker process -- Loop.
532             ##
533             ###############################################################################
534              
535             sub _worker_loop {
536              
537 65     65   727 my ($self) = @_;
538              
539 65         364 @_ = ();
540              
541 65         334 my ($_response, $_len, $_buf, $_params_ref);
542              
543 65         318 my $_COM_LOCK = $self->{_com_lock};
544 65         539 my $_COM_W_SOCK = $self->{_com_w_sock};
545 65         284 my $_job_delay = $self->{job_delay};
546              
547 65 50       2391 if ($^O eq 'MSWin32') { lock $MCE::_WIN_LOCK; }
  0         0  
548              
549 65         371 while (1) {
550             {
551 228         539 local $\ = undef; local $/ = $LF;
  228         1876  
  228         3281  
552 228         3028 $_COM_LOCK->lock();
553              
554             ## Wait for the next job request.
555 228         270697 $_response = <$_COM_W_SOCK>, print {$_COM_W_SOCK} $self->{_wid}.$LF;
  228         15224  
556              
557             ## Return if instructed to exit.
558 228 100       2591 $_COM_LOCK->unlock(), return if ($_response eq "_exit\n");
559              
560             ## Process send request.
561 163 50       5380 if ($_response eq "_data\n") {
    50          
562 0         0 chomp($_len = <$_COM_W_SOCK>), read($_COM_W_SOCK, $_buf, $_len);
563 0         0 print({$_COM_W_SOCK} $LF), $_COM_LOCK->unlock();
  0         0  
564 0         0 $self->{user_data} = $self->{thaw}($_buf), undef $_buf;
565              
566             sleep $_job_delay * $self->{_wid}
567 0 0 0     0 if defined($_job_delay) && $_job_delay > 0.0;
568             }
569              
570             ## Process normal request.
571             elsif ($_response =~ /\d+/) {
572 163         21831 chomp($_len = <$_COM_W_SOCK>), read($_COM_W_SOCK, $_buf, $_len);
573 163         1158 print({$_COM_W_SOCK} $LF), $_COM_LOCK->unlock();
  163         10221  
574 163         9286 $_params_ref = $self->{thaw}($_buf), undef $_buf;
575             }
576              
577             ## Leave loop if invalid response.
578             else {
579 0         0 last;
580             }
581             }
582              
583             ## Send request.
584 163 50       1081 _worker_do($self, {}), next if ($_response eq "_data\n");
585              
586             ## Wait here until MCE completes job submission to all workers.
587 163         20016 MCE::Util::_sysread($self->{_bsb_w_sock}, my($_b), 1);
588              
589             ## Normal request.
590             sleep $_job_delay * $self->{_wid}
591 163 50 33     1787 if defined($_job_delay) && $_job_delay > 0.0;
592              
593 163         1375 _worker_do($self, $_params_ref); undef $_params_ref;
  163         990  
594             }
595              
596             ## Notify the main process a worker has ended. The following is executed
597             ## when an invalid reply was received above (not likely to occur).
598              
599 0         0 $_COM_LOCK->unlock();
600              
601 0         0 die "Error: worker $self->{_wid} has ended prematurely";
602             }
603              
604             ###############################################################################
605             ## ----------------------------------------------------------------------------
606             ## Worker process -- Main.
607             ##
608             ###############################################################################
609              
610             sub _worker_main {
611              
612 65     65   837 my ( $self, $_wid, $_task, $_task_id, $_task_wid, $_params,
613             $_plugin_worker_init ) = @_;
614              
615 65         562 @_ = ();
616              
617 65 100       1377 if (exists $self->{input_data}) {
618 27         989 my $_ref = ref $self->{input_data};
619 27 50 33     1640 delete $self->{input_data} if ($_ref && $_ref ne 'SCALAR');
620             }
621              
622 65 100       1404 $self->{_task_id} = (defined $_task_id ) ? $_task_id : 0;
623 65 100       1146 $self->{_task_wid} = (defined $_task_wid) ? $_task_wid : $_wid;
624 65         675 $self->{_task} = $_task;
625 65         634 $self->{_wid} = $_wid;
626              
627             ## Define exit pid and DIE handler.
628             my $_use_threads = (defined $_task->{use_threads})
629 65 100       2186 ? $_task->{use_threads} : $self->{use_threads};
630              
631 65 50 33     1336 if ($INC{'threads.pm'} && $_use_threads) {
632 0         0 $self->{_exit_pid} = 'TID_' . $_tid;
633             } else {
634 65         3369 $self->{_exit_pid} = 'PID_' . $$;
635             }
636              
637 65         2362 my $_running_inside_eval = $^S;
638              
639 65     0   9354 local $SIG{SEGV} = sub { $self->exit(11) };
  0         0  
640              
641             local $SIG{__DIE__} = sub {
642 0 0 0 0   0 if (!defined $^S || $^S) {
643 0 0 0     0 if ( ($INC{'threads.pm'} && $_tid != 0) ||
      0        
      0        
644             $ENV{'PERL_IPERL_RUNNING'} ||
645             $_running_inside_eval
646             ) {
647             # thread env or running inside IPerl, check stack trace
648 0         0 my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//;
  0         0  
649 0 0 0     0 if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / ||
650             $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ )
651             {
652 0         0 CORE::die(@_);
653             }
654             }
655             else {
656             # normal env, trust $^S
657 0         0 CORE::die(@_);
658             }
659             }
660              
661 0         0 $SIG{__DIE__} = $SIG{__WARN__} = sub {};
662 0 0       0 my $_die_msg = (defined $_[0]) ? $_[0] : '';
663 0         0 $_die_msg =~ s/, <__ANONIO__> line \d+//;
664 0         0 local $\ = undef; print {*STDERR} $_die_msg;
  0         0  
  0         0  
665              
666 0         0 $self->exit(255, $_die_msg, $self->{_chunk_id});
667 65         4474 };
668              
669             ## Use options from user_tasks if defined.
670 65 100       3151 $self->{max_workers} = $_task->{max_workers} if ($_task->{max_workers});
671 65 50       701 $self->{chunk_size} = $_task->{chunk_size} if ($_task->{chunk_size});
672 65 100       723 $self->{gather} = $_task->{gather} if ($_task->{gather});
673 65 50       1249 $self->{sequence} = $_task->{sequence} if ($_task->{sequence});
674 65 50       724 $self->{bounds_only} = $_task->{bounds_only} if ($_task->{bounds_only});
675 65 100       792 $self->{task_name} = $_task->{task_name} if ($_task->{task_name});
676 65 50       736 $self->{user_args} = $_task->{user_args} if ($_task->{user_args});
677 65 50       549 $self->{user_begin} = $_task->{user_begin} if ($_task->{user_begin});
678 65 100       1293 $self->{user_func} = $_task->{user_func} if ($_task->{user_func});
679 65 50       621 $self->{user_end} = $_task->{user_end} if ($_task->{user_end});
680              
681             ## Init runtime vars. Obtain handle to lock files.
682 65         330 my $_chn;
683              
684 65 50 33     1164 if (defined $_params && exists $_params->{_chn}) {
685 0         0 $_chn = $self->{_chn} = delete $_params->{_chn}; # worker restarted
686             } else {
687 65         1036 $_chn = $self->{_chn} = $_wid % $self->{_data_channels} + 1; # default
688             }
689              
690             ## Choose locks for DATA channels.
691 65         1125 $self->{_com_lock} = $self->{'_mutex_0'};
692 65 50       1314 $self->{_dat_lock} = $self->{'_mutex_'.$_chn} if ($self->{_lock_chn});
693              
694             ## Delete attributes no longer required after being spawned.
695 65         428 delete @{ $self }{ qw(
  65         2456  
696             flush_file flush_stderr flush_stdout stderr_file stdout_file
697             on_post_exit on_post_run user_data user_error user_output
698             _pids _state _status _thrs _tids
699             ) };
700              
701             ## Call MCE::Shared's init routine if present; enables parallel IPC.
702             ## For threads, init is called automatically via the CLONE feature.
703 65 50 33     1661 MCE::Shared::init() if (!$_use_threads && $INC{'MCE/Shared.pm'});
704              
705 65         1910 _do_send_init($self);
706              
707             ## Call module's worker_init routine for modules plugged into MCE.
708 65         286 for my $_p (@{ $_plugin_worker_init }) { $_p->($self); }
  65         1587  
  28         1184  
709              
710             ## Begin processing if worker was added during processing.
711 65 50       878 _worker_do($self, $_params), undef $_params if defined($_params);
712              
713             ## Enter worker loop.
714 65         1660 _worker_loop($self);
715              
716             ## Clear worker session.
717 65         983 _do_send_clear($self);
718              
719 65         242 $self->{_com_lock} = undef;
720 65         350 $self->{_dat_lock} = undef;
721              
722 65         3394 return;
723             }
724              
725             1;
726              
727             __END__