File Coverage

blib/lib/MCE/Shared/Server.pm
Criterion Covered Total %
statement 462 1008 45.8
branch 190 672 28.2
condition 37 259 14.2
subroutine 81 130 62.3
pod n/a
total 770 2069 37.2


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Server/Object packages for MCE::Shared.
4             ##
5             ###############################################################################
6              
7 43     43   246 use strict;
  43         89  
  43         1063  
8 43     43   166 use warnings;
  43         53  
  43         851  
9              
10 43     43   615 use 5.010001;
  43         143  
11              
12 43     43   203 no warnings qw( threads recursion uninitialized numeric once );
  43         78  
  43         4281  
13              
14             package MCE::Shared::Server;
15              
16             our $VERSION = '1.885';
17              
18             ## no critic (BuiltinFunctions::ProhibitStringyEval)
19             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
20             ## no critic (TestingAndDebugging::ProhibitNoStrict)
21             ## no critic (InputOutput::ProhibitTwoArgOpen)
22              
23 43     43   15325 use if $^O eq 'MSWin32', 'threads';
  43         351  
  43         240  
24 43     43   1706 use if $^O eq 'MSWin32', 'threads::shared';
  43         71  
  43         156  
25              
26 43     43   1234 no overloading;
  43         132  
  43         956  
27              
28 43     43   243 use Carp ();
  43         65  
  43         661  
29 43     43   22510 use Storable ();
  43         116773  
  43         6333  
30              
31             my ($_spawn_child, $_freeze, $_thaw);
32              
33             BEGIN {
34 43     43   140 local $@;
35              
36             eval 'use IO::FDPass ();'
37 43 50 33 43   3393 if ( ! $INC{'IO/FDPass.pm'} && $^O ne 'cygwin' );
  43         17719  
  43         11598  
  43         470  
38              
39 43 50       188 $_spawn_child = $INC{'threads.pm'} ? 0 : 1;
40              
41 43 50       155 if ( ! $INC{'PDL.pm'} ) {
42 43     43   2140 eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;';
  43     43   227  
  43         734  
  43         1635  
  43         202  
  43         556  
  43         941  
43 43 50       320 if ( ! $@ ) {
44 43         413 my $_encoder_ver = int( Sereal::Encoder->VERSION() );
45 43         399 my $_decoder_ver = int( Sereal::Decoder->VERSION() );
46 43 50       172 if ( $_encoder_ver - $_decoder_ver == 0 ) {
47 43         79 $_freeze = \&Sereal::Encoder::encode_sereal;
48 43         81 $_thaw = \&Sereal::Decoder::decode_sereal;
49             }
50             }
51             }
52              
53 43 50       1719 if ( ! defined $_freeze ) {
54 0         0 $_freeze = \&Storable::freeze;
55 0         0 $_thaw = \&Storable::thaw;
56             }
57             }
58              
59 0     0   0 sub _get_freeze { $_freeze; }
60 0     0   0 sub _get_thaw { $_thaw; }
61              
62 43     43   2384 use IO::Handle ();
  43         25743  
  43         785  
63 43     43   183 use Scalar::Util qw( blessed looks_like_number reftype weaken );
  43         79  
  43         2453  
64 43     43   2517 use Socket qw( SOL_SOCKET SO_RCVBUF );
  43         15131  
  43         2460  
65 43     43   1414 use Time::HiRes qw( alarm sleep time );
  43         3119  
  43         305  
66              
67 43     43   6190 use MCE::Signal 1.863 (); # requires 1.863 minimally
  43         9392  
  43         894  
68 43     43   2108 use MCE::Util ();
  43         22033  
  43         650  
69 43     43   217 use MCE::Mutex ();
  43         59  
  43         784  
70 43     43   2580 use bytes;
  43         147  
  43         215  
71              
72             ## The POSIX module has many symbols. Try not loading it simply
73             ## to have WNOHANG. The following covers most platforms.
74              
75             use constant {
76 43 50       4346 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
77             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
78 43     43   2233 };
  43         81  
79              
80             use constant {
81             # Max data channels. This cannot be greater than 8 on MSWin32.
82 43         289766 DATA_CHANNELS => 8,
83              
84             SHR_M_NEW => 'M~NEW', # New share
85             SHR_M_CID => 'M~CID', # ClientID request
86             SHR_M_DEE => 'M~DEE', # Deeply shared
87             SHR_M_INC => 'M~INC', # Increment count
88             SHR_M_OBJ => 'M~OBJ', # Object request
89             SHR_M_OB0 => 'M~OB0', # Object request - thaw'less
90             SHR_M_DES => 'M~DES', # Destroy request
91             SHR_M_EXP => 'M~EXP', # Export request
92             SHR_M_INX => 'M~INX', # Iterator next
93             SHR_M_IRW => 'M~IRW', # Iterator rewind
94             SHR_M_STP => 'M~STP', # Exit loop
95              
96             SHR_O_PDL => 'O~PDL', # PDL::ins inplace(this),what,coords
97             SHR_O_DAT => 'O~DAT', # Get MCE::Hobo data
98             SHR_O_CLR => 'O~CLR', # Clear
99             SHR_O_FCH => 'O~FCH', # Fetch
100             SHR_O_SZE => 'O~SZE', # Size
101              
102             WA_ARRAY => 1, # Wants list
103 43     43   256 };
  43         128  
104              
105             ###############################################################################
106             ## ----------------------------------------------------------------------------
107             ## Private functions.
108             ##
109             ###############################################################################
110              
111             my ($_SVR, $_stopped, %_all, %_obj, %_ob2, %_ob3, %_itr, %_new) = (undef);
112             my ($_next_id, $_is_client, $_init_pid, $_svr_pid) = (0, 1);
113             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
114             my %_export_nul;
115              
116             my @_db_modules = qw(
117             AnyDBM_File DB_File GDBM_File NDBM_File ODBM_File SDBM_File
118             BerkeleyDB::Btree BerkeleyDB::Hash BerkeleyDB::Queue
119             BerkeleyDB::Recno CDB_File KyotoCabinet::DB SQLite_File
120             TokyoCabinet::ADB TokyoCabinet::BDB TokyoCabinet::HDB
121             Tie::Array::DBD Tie::Hash::DBD
122             );
123              
124             my $_is_MSWin32 = ( $^O eq 'MSWin32') ? 1 : 0;
125             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
126             my $_oid = "$$.$_tid";
127              
128             sub _croak {
129 0     0   0 Carp::carp($_[0]);
130 0         0 MCE::Signal::stop_and_exit('INT');
131             }
132             sub CLONE {
133 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
134             }
135              
136             END {
137 43 0 33 43   20373 CORE::kill('KILL', $$)
138             if ($_is_MSWin32 && $MCE::Signal::KILLED);
139 43 100 66     1077 &_stop()
      66        
140             if ($_init_pid && $_init_pid eq "$$.$_tid" && $_is_client);
141             }
142              
143             sub _new {
144 158     158   497 my ($_class, $_deeply, %_hndls) = ($_[0]->{class}, $_[0]->{_DEEPLY_});
145 158         1041 my $_has_fh = ($_class =~ /^MCE::Shared::(?:Condvar|Queue)$/);
146              
147 158 100       452 if (!$_svr_pid) {
148             # Minimum support on platforms without IO::FDPass (not installed).
149             # Condvar and Queue must be shared first before others.
150             $_export_nul{ $_class } = undef, return _share(@_)
151 31 50 66     121 if $_has_fh && !$INC{'IO/FDPass.pm'};
152              
153 31         108 _start();
154             }
155              
156 158 100       879 if ($_has_fh) {
157             _croak("Sharing module '$_class' while the server is running\n".
158             "requires the 'IO::FDPass' module, missing in Perl")
159 29 50       161 if !$INC{'IO/FDPass.pm'};
160              
161 29         216 for my $_k (qw(
162             _qr_mutex _qw_sock _qr_sock _aw_sock _ar_sock _cw_sock _cr_sock
163             )) {
164 203 100       545 if ( defined $_[1]->{ $_k } ) {
165 84         816 $_hndls{ $_k } = delete $_[1]->{ $_k };
166 84         239 $_[1]->{ $_k } = undef;
167             }
168             }
169             }
170              
171 158         556 my $_chn = $_SVR->{_data_channels} + 1;
172 158         910 my $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
173 158         573 my $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
174 158         529 my $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
175              
176             ##
177             # Sereal cannot encode $DB_RECNO. Therefore, must encode using Storable.
178             # Error: DB_File::RECNOINFO does not define the method FIRSTKEY
179             #
180             # my $ob = tie my @db, 'MCE::Shared', { module => 'DB_File' }, $file,
181             # O_RDWR|O_CREAT, 0640, $DB_RECNO or die "open error '$file': $!";
182             ##
183              
184 158         1998 my $_buf = Storable::freeze(shift);
185 158         11703 my $_bu2 = Storable::freeze([ @_ ]);
186              
187 158 50       5975 local $\ = undef if (defined $\);
188 158 50       676 local $/ = $LF if ($/ ne $LF);
189              
190 158 50       1257 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_DAT_LOCK->lock();
191              
192 158         6188 print({$_DAT_W_SOCK} SHR_M_NEW.$LF . $_chn.$LF),
193 158 100       5729 print({$_DAU_W_SOCK} length($_buf).$LF, $_buf, length($_bu2).$LF, $_bu2,
  158         2717  
194             (keys %_hndls ? 1 : 0).$LF);
195              
196 158         194510 <$_DAU_W_SOCK>;
197              
198 158         1812 undef($_buf), undef($_bu2);
199              
200 158 100       904 if (keys %_hndls) {
201 29         101 for my $_k (qw( _qw_sock _qr_sock _aw_sock _cw_sock )) {
202 116 100       468 if (exists $_hndls{ $_k }) {
203 55         2287 IO::FDPass::send( fileno $_DAU_W_SOCK, fileno $_hndls{ $_k } );
204 55         47234 <$_DAU_W_SOCK>;
205             }
206             }
207             }
208              
209 158         53831 chomp(my $_id = <$_DAU_W_SOCK>),
210             chomp(my $_len = <$_DAU_W_SOCK>);
211              
212 158 50       1454 read($_DAU_W_SOCK, $_buf, $_len) if $_len;
213              
214 158 50       1352 $_DAT_LOCK->unlock() if !$_is_MSWin32;
215              
216 158 50       3310 $! = $_id, return '' unless $_len;
217              
218 158 100       586 if (keys %_hndls) {
219 29         244 $_all{ $_id } = $_class;
220 29         95 $_obj{ $_id } = \%_hndls;
221             }
222              
223 158 100       433 if (!$_deeply) {
224             # for auto-destroy
225 156 50       1198 $_new{ $_id } = $_tid ? $$ .'.'. $_tid : $$;
226             }
227              
228 158         4459 return $_thaw->($_buf);
229             }
230              
231             sub _incr_count {
232 0 0   0   0 return unless $_svr_pid;
233              
234 0         0 my $_chn = $_SVR->{_data_channels} + 1;
235 0         0 my $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
236 0         0 my $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
237 0         0 my $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
238              
239 0 0       0 local $\ = undef if (defined $\);
240 0 0       0 local $/ = $LF if ($/ ne $LF);
241 0         0 local $MCE::Signal::SIG;
242              
243             {
244 0         0 local $MCE::Signal::IPC = 1;
  0         0  
245 0 0       0 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_DAT_LOCK->lock();
246              
247 0         0 print({$_DAT_W_SOCK} SHR_M_INC.$LF . $_chn.$LF),
248 0         0 print({$_DAU_W_SOCK} $_[0].$LF);
  0         0  
249 0         0 <$_DAU_W_SOCK>;
250              
251 0 0       0 $_DAT_LOCK->unlock() if !$_is_MSWin32;
252             }
253              
254 0 0       0 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
255              
256 0         0 return;
257             }
258              
259             sub _share {
260 0     0   0 my ($_params, $_item) = (shift, shift);
261 0         0 my $_class = delete $_params->{'class'};
262 0         0 my $_id = ++$_next_id;
263              
264 0 0       0 if ($_class eq ':construct_module:') {
    0          
265 0         0 my ($_module, $_fcn) = ($_params->{module}, pop @{ $_item });
  0         0  
266 0 0       0 my $_has_args = @{ $_item } ? 1 : 0; local $@;
  0         0  
  0         0  
267              
268 0         0 ($_module) = $_module =~ /(.*)/; # remove tainted'ness
269 0         0 ($_fcn ) = $_fcn =~ /(.*)/;
270              
271 0 0       0 MCE::Shared::_use( $_class = $_module ) or _croak("$@\n");
272              
273 0 0       0 _croak("Can't locate object method \"$_fcn\" via package \"$_module\"")
274             unless eval qq{ $_module->can('$_fcn') };
275              
276 0 0       0 $! = 0; $_item = $_module->$_fcn(@{ $_item }) or return '';
  0         0  
  0         0  
277              
278 0 0       0 $_export_nul{ $_class } = undef if ($_class->isa('Graphics::Framebuffer'));
279 0 0       0 $_export_nul{ $_class } = undef if ($_fcn eq 'TIEHANDLE');
280              
281 0 0 0     0 return '' if (
      0        
282             $_has_args && $_fcn eq 'TIEHANDLE' && !defined(fileno $_item)
283             );
284             }
285             elsif ($_class eq ':construct_pdl:') {
286 0         0 local $@; local $SIG{__DIE__};
  0         0  
287              
288 0         0 $_class = 'PDL', $_item = eval q{
289             unless ($INC{'PDL.pm'}) {
290             use PDL;
291             # Disable PDL auto-threading.
292             eval q{ PDL::set_autopthread_targ(1) };
293             }
294              
295             my $_func = pop @{ $_item };
296              
297             if ($_func eq 'sbyte' ) { sbyte (@{ $_item }); }
298             elsif ($_func eq 'byte' ) { byte (@{ $_item }); }
299             elsif ($_func eq 'short' ) { short (@{ $_item }); }
300             elsif ($_func eq 'ushort' ) { ushort (@{ $_item }); }
301             elsif ($_func eq 'long' ) { long (@{ $_item }); }
302             elsif ($_func eq 'ulong' ) { ulong (@{ $_item }); }
303             elsif ($_func eq 'indx' ) { indx (@{ $_item }); }
304             elsif ($_func eq 'longlong' ) { longlong (@{ $_item }); }
305             elsif ($_func eq 'ulonglong') { ulonglong (@{ $_item }); }
306             elsif ($_func eq 'float' ) { float (@{ $_item }); }
307             elsif ($_func eq 'double' ) { double (@{ $_item }); }
308             elsif ($_func eq 'ldouble' ) { ldouble (@{ $_item }); }
309             elsif ($_func eq 'sequence' ) { sequence (@{ $_item }); }
310             elsif ($_func eq 'zeroes' ) { zeroes (@{ $_item }); }
311             elsif ($_func eq 'zeros' ) { zeros (@{ $_item }); }
312             elsif ($_func eq 'ones' ) { ones (@{ $_item }); }
313             elsif ($_func eq 'random' ) { random (@{ $_item }); }
314             elsif ($_func eq 'grandom' ) { grandom (@{ $_item }); }
315             else { pdl (@{ $_item }); }
316             };
317             }
318              
319 0         0 $_all{ $_id } = $_class;
320 0         0 $_ob3{"$_id:count"} = 1;
321              
322 0 0 0     0 if ($_class eq 'MCE::Shared::Handle' && reftype $_item eq 'ARRAY') {
323 0         0 $_obj{ $_id } = IO::Handle->new();
324 0         0 $_export_nul{ $_class } = undef;
325              
326 0         0 bless $_obj{ $_id }, $_class;
327             }
328             else {
329 0         0 $_obj{ $_id } = $_item;
330              
331 0 0 0     0 if ( reftype $_obj{ $_id } eq 'HASH' &&
332             reftype $_obj{ $_id }->{'fh'} eq 'GLOB' ) {
333              
334 0 0       0 if ( $_class->isa('Tie::File') ) {
335             # enable autoflush, enable raw layer
336 0         0 $_obj{ $_id }->{'fh'}->autoflush(1);
337 0         0 binmode($_obj{ $_id }->{'fh'}, ':raw');
338             }
339              
340 0         0 $_export_nul{ $_class } = undef;
341             }
342             }
343              
344 0         0 my $self = bless [ $_id, $_class ], 'MCE::Shared::Object';
345              
346 0         0 $_ob2{ $_id } = $_freeze->([ $self ]);
347              
348 0 0       0 if ( $_params->{tied} ) {
349             # set encoder/decoder upon receipt in MCE::Shared::_tie
350 0         0 for my $_module ( @_db_modules ) {
351 0 0       0 $self->[2] = 1, last if $_class->isa($_module);
352             }
353 0 0       0 $_export_nul{ $_class } = undef if $self->[2];
354             }
355              
356 0         0 return $self;
357             }
358              
359             sub _start {
360 91 100   91   368 return if $_svr_pid;
361              
362 43 50       152 if ($INC{'PDL.pm'}) { local $@;
  0         0  
363             # PDL::IO::Storable is required for serializing piddles.
364 0 0       0 eval 'use PDL::IO::Storable' unless $INC{'PDL/IO/Storable.pm'};
365             # PDL data should not be naively copied in new threads.
366 0         0 eval 'no warnings; sub PDL::CLONE_SKIP { 1 }';
367             # Disable PDL auto-threading.
368 0         0 eval q{ PDL::set_autopthread_targ(1) };
369             }
370              
371 43         68 local $_; $_init_pid = "$$.$_tid", $_stopped = undef;
  43         177  
372              
373 43 50       169 my $_data_channels = ($_init_pid eq $_oid) ? DATA_CHANNELS : 2;
374              
375 43         124 $_SVR = { _data_channels => $_data_channels };
376              
377             # Defaults to the misc channel used by _new, _get_hobo_data, and export.
378 43         220 MCE::Util::_sock_pair($_SVR, qw(_dat_r_sock _dat_w_sock), 0);
379             MCE::Util::_sock_pair($_SVR, qw(_dat_r_sock _dat_w_sock), $_, 1)
380 43         6424 for (1 .. $_data_channels + 1);
381              
382 43 50 33     57402 setsockopt($_SVR->{_dat_r_sock}[0], SOL_SOCKET, SO_RCVBUF, 4096)
383             if ($^O ne 'aix' && $^O ne 'linux');
384              
385 43 50       174 if ($_is_MSWin32) {
386 0         0 for (1 .. $_data_channels + 1) {
387 0         0 my $_mutex;
388 0         0 $_SVR->{'_mutex_'.$_} = threads::shared::share($_mutex);
389             }
390             }
391             else {
392             $_SVR->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' )
393 43         381 for (1 .. $_data_channels + 1);
394             }
395              
396 43         106453 MCE::Shared::Object::_start();
397              
398             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
399 43 50       1830 unless $_is_MSWin32;
400              
401 43 50       210 if ($_spawn_child) {
402 43         59316 $_svr_pid = fork();
403 43 50 33     4184 _loop() if (defined $_svr_pid && $_svr_pid == 0);
404             }
405             else {
406 0         0 $_svr_pid = threads->create(\&_loop);
407 0 0       0 $_svr_pid->detach() if defined $_svr_pid;
408             }
409              
410 43 50       1020 _croak("cannot start the shared-manager process: $!")
411             unless (defined $_svr_pid);
412              
413 43 50 33     1356 sleep 0.015 if (!$_spawn_child || $_is_MSWin32);
414              
415 43         6169 return;
416             }
417              
418             sub _stop {
419 15     15   71 $_stopped = 1;
420 15 50 33     293 return unless ($_is_client && $_init_pid && $_init_pid eq "$$.$_tid");
      33        
421              
422 15 50       83 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
423 15 100       88 MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'};
424              
425 15         317 local ($!, $?, $@);
426              
427 15 50       66 if (defined $_svr_pid) {
428 15         51 my $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
429              
430 15 50       56 if (ref $_svr_pid) {
431 0         0 eval { $_svr_pid->kill('KILL') };
  0         0  
432             }
433             else {
434             local $SIG{HUP} = local $SIG{QUIT} = local $SIG{PIPE} =
435 15     0   969 local $SIG{INT} = local $SIG{TERM} = sub {};
436              
437 15         151 my $_start = time;
438              
439 15         39 eval {
440 15 50       59 local $\ = undef if (defined $\);
441 15         31 print {$_DAT_W_SOCK} SHR_M_STP.$LF.'0'.$LF;
  15         2137  
442             };
443              
444 15         79 while () {
445 29 100       1672 last if waitpid($_svr_pid, _WNOHANG);
446 14 50       94 if ( time - $_start > 0.7 ) {
447 0         0 CORE::kill('USR2', $_svr_pid);
448 0         0 waitpid($_svr_pid, 0);
449 0         0 last;
450             }
451 14         631630 sleep 0.045;
452             }
453             }
454              
455 15         83 $_init_pid = $_svr_pid = undef;
456 15         105 %_all = (), %_obj = ();
457              
458 15         514 MCE::Util::_destroy_socks($_SVR, qw( _dat_w_sock _dat_r_sock ));
459              
460 15         25509 for my $_i (1 .. $_SVR->{_data_channels} + 1) {
461 135         7385 delete $_SVR->{'_mutex_'.$_i};
462             }
463              
464 15         131 MCE::Shared::Object::_stop();
465             }
466              
467 15         246 return;
468             }
469              
470             sub _pid {
471 0 0   0   0 return ref($_svr_pid) ? int($_init_pid) : $_svr_pid;
472             }
473              
474             sub _destroy {
475 0     0   0 my ($_lkup, $_item, $_id) = @_;
476              
477             # safety for circular references to not destroy dangerously
478 0 0 0     0 return if exists $_ob3{ "$_id:count" } && --$_ob3{ "$_id:count" } > 0;
479              
480             # safety for circular references to not loop endlessly
481 0 0       0 return if exists $_lkup->{ $_id };
482              
483 0         0 $_lkup->{ $_id } = undef;
484              
485 0 0       0 if (exists $_ob3{ "$_id:deeply" }) {
    0          
486 0         0 for my $_oid (keys %{ $_ob3{ "$_id:deeply" } }) {
  0         0  
487 0         0 _destroy($_lkup, $_obj{ $_oid }, $_oid);
488             }
489 0         0 delete $_ob3{ "$_id:deeply" };
490             }
491             elsif (exists $_obj{ $_id }) {
492 0 0 0     0 if ($_obj{ $_id }->isa('MCE::Shared::Scalar') ||
    0          
    0          
    0          
    0          
    0          
    0          
493             $_obj{ $_id }->isa('Tie::StdScalar')) {
494              
495 0 0       0 if (blessed($_item->FETCH())) {
496 0         0 my $_oid = $_item->FETCH()->SHARED_ID();
497 0         0 _destroy($_lkup, $_obj{ $_oid }, $_oid);
498             }
499              
500 0         0 undef ${ $_obj{ $_id } };
  0         0  
501             }
502 0         0 elsif ($_obj{ $_id }->isa('Tie::File')) { $_obj{ $_id }->flush(); }
503 0         0 elsif ($_obj{ $_id }->can('sync')) { $_obj{ $_id }->sync(); }
504 0         0 elsif ($_obj{ $_id }->can('db_sync')) { $_obj{ $_id }->db_sync(); }
505 0         0 elsif ($_obj{ $_id }->can('close')) { $_obj{ $_id }->close(); }
506 0         0 elsif ($_obj{ $_id }->can('DESTROY')) { delete $_obj{ $_id }; }
507             elsif (reftype $_obj{ $_id } eq 'GLOB') {
508 0 0       0 close $_obj{ $_id } if defined(fileno $_obj{ $_id });
509             }
510             }
511              
512 0 0       0 weaken( delete $_obj{ $_id } ) if exists($_obj{ $_id });
513 0 0       0 weaken( delete $_itr{ $_id } ) if exists($_itr{ $_id });
514              
515             delete($_itr{ "$_id:args" }), delete($_all{ $_id }),
516 0         0 delete($_ob3{ "$_id:count" }), delete($_ob2{ $_id });
517              
518 0         0 return;
519             }
520              
521             ###############################################################################
522             ## ----------------------------------------------------------------------------
523             ## Server loop.
524             ##
525             ###############################################################################
526              
527             sub _exit {
528 0 0   0   0 $SIG{__DIE__} = sub {} unless $_tid;
        0      
529 0     0   0 $SIG{__WARN__} = sub {};
530              
531             # Flush file handles.
532 0         0 for my $_id ( keys %_obj ) {
533 0         0 eval {
534 0 0       0 if ($_obj{ $_id }->isa('Tie::File')) { $_obj{ $_id }->flush(); }
  0 0       0  
    0          
    0          
    0          
    0          
535 0         0 elsif ($_obj{ $_id }->can('sync')) { $_obj{ $_id }->sync(); }
536 0         0 elsif ($_obj{ $_id }->can('db_sync')) { $_obj{ $_id }->db_sync(); }
537 0         0 elsif ($_obj{ $_id }->can('close')) { $_obj{ $_id }->close(); }
538 0         0 elsif ($_obj{ $_id }->can('DESTROY')) { delete $_obj{ $_id }; }
539             elsif (reftype $_obj{ $_id } eq 'GLOB') {
540 0 0       0 close $_obj{ $_id } if defined(fileno $_obj{ $_id });
541             }
542             };
543             }
544              
545             # Destroy non-exportable objects.
546 0         0 for my $_id ( keys %_all ) {
547 0         0 eval {
548             weaken( delete $_obj{ $_id } )
549 0 0       0 if ( exists $_export_nul{ $_all{ $_id } } );
550             };
551             }
552              
553             # Wait for the main thread to exit.
554 0 0 0     0 if ( !$_spawn_child && ($_is_MSWin32 || $INC{'Tk.pm'} || $INC{'Wx.pm'}) ) {
      0        
555 0         0 sleep 1.0;
556             }
557              
558 0 0 0     0 threads->exit(0) if ( !$_spawn_child || $_is_MSWin32 );
559              
560 0 0       0 CORE::kill('KILL', $$) unless $_is_MSWin32;
561 0         0 CORE::exit(0);
562             }
563              
564             sub _loop {
565 0     0   0 $_is_client = 0;
566              
567 0 0 0     0 $MCE::MCE = undef if ($MCE::MCE && $MCE::MCE->{_wid} == 0);
568              
569 0         0 local $\ = undef; local $/ = $LF; $| = 1;
  0         0  
  0         0  
570 0         0 my $_running_inside_eval = $^S;
571              
572       0     local $SIG{TERM} = local $SIG{QUIT} = local $SIG{INT} = local $SIG{HUP} = sub {}
573 0 0       0 if ($_init_pid eq $_oid);
574              
575 0     0   0 local $SIG{PIPE} = sub { $SIG{PIPE} = sub {}; CORE::kill('PIPE', getppid()); }
  0         0  
576 0 0 0     0 if ($_spawn_child && !$_is_MSWin32);
577              
578 0 0 0     0 local $SIG{USR2} = \&_exit if ($_init_pid eq $_oid && !$_is_MSWin32);
579 0 0 0     0 local $SIG{KILL} = \&_exit if ($_init_pid eq $_oid && !$_spawn_child);
580              
581             local $SIG{__DIE__} = sub {
582 0 0 0 0   0 if (!defined $^S || $^S) {
583 0 0 0     0 if ( ($INC{'threads.pm'} && threads->tid() != 0) ||
      0        
      0        
584             $ENV{'PERL_IPERL_RUNNING'} ||
585             $_running_inside_eval
586             ) {
587             # thread env or running inside IPerl, check stack trace
588 0         0 my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//;
  0         0  
589 0 0 0     0 CORE::die(@_)
590             if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / ||
591             $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ );
592             }
593             else {
594             # normal env, trust $^S
595 0         0 CORE::die(@_);
596             }
597             }
598              
599 0         0 $SIG{INT} = $SIG{__DIE__} = $SIG{__WARN__} = sub {};
600 0 0       0 print {*STDERR} defined $_[0] ? $_[0] : '';
  0         0  
601              
602 0 0 0     0 ( $_spawn_child && !$_is_MSWin32 )
603             ? CORE::kill('KILL', -getpgrp)
604             : CORE::exit($?);
605 0         0 };
606              
607 0         0 my ($_id, $_fcn, $_wa, $_len, $_func, $_var);
608 0         0 my ($_channel_id, $_done) = (0, 0);
609              
610 0         0 my $_channels = $_SVR->{_dat_r_sock};
611 0         0 my $_DAT_R_SOCK = $_SVR->{_dat_r_sock}[0];
612 0         0 my $_DAU_R_SOCK;
613              
614             my $_auto_reply = sub {
615 0 0   0   0 if ( $_wa == WA_ARRAY ) {
616 0         0 my @_ret = eval { $_var->$_fcn(@_) };
  0         0  
617 0         0 my $_buf = $_freeze->(\@_ret);
618 0         0 return print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
619             }
620              
621 0         0 my $_ret = eval { $_var->$_fcn(@_) };
  0         0  
622 0         0 my $_buf = $_freeze->([ $_ret ]);
623              
624 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
625 0         0 };
626              
627             my $_obj_keys = sub {
628 0     0   0 my ( $_obj, @_keys, $_cnt ) = ( shift );
629              
630 0 0       0 return keys %{ $_obj } if $_obj->isa('Tie::StdHash');
  0         0  
631 0 0       0 return (0 .. $_obj->FETCHSIZE - 1) unless $_obj->can('FIRSTKEY');
632              
633 0 0       0 if ( wantarray ) {
    0          
    0          
634 0         0 my $_key = $_obj->FIRSTKEY;
635 0 0       0 if ( defined $_key ) {
636 0         0 push @_keys, $_key;
637             # CDB_File expects the $_key argument
638 0         0 while ( defined( $_key = $_obj->NEXTKEY($_key) ) ) {
639 0         0 push @_keys, $_key;
640             }
641             }
642             }
643             elsif ( $_obj->isa('Tie::ExtraHash') ) {
644 0         0 $_cnt = keys %{ $_obj->[0] };
  0         0  
645             }
646             elsif ( $_obj->isa('Tie::IxHash') ) {
647 0         0 $_cnt = keys %{ $_obj->[2] };
  0         0  
648             }
649             else {
650 0         0 my $_key = $_obj->FIRSTKEY; $_cnt = 0;
  0         0  
651 0 0       0 if ( defined $_key ) {
652 0         0 $_cnt = 1;
653             # CDB_File expects the $_key argument
654 0         0 while ( defined( $_key = $_obj->NEXTKEY($_key) ) ) {
655 0         0 $_cnt++;
656             }
657             }
658             }
659              
660 0 0       0 wantarray ? @_keys : $_cnt;
661 0         0 };
662              
663             my $_iter = sub {
664 0 0   0   0 unless ( exists $_itr{ $_id } ) {
665              
666 0         0 my $pkg = $_all{ $_id };
667 0 0 0     0 my $flg = ($pkg->can('NEXTKEY') || $pkg->can('keys')) ? 1 : 0;
668 0 0       0 my $get = $pkg->can('FETCH') ? 'FETCH' : $pkg->can('get') ? 'get' : '';
    0          
669              
670 0 0 0     0 unless ( ($flg || $pkg->can('FETCHSIZE')) && $get ) {
      0        
671 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
672 0         0 return;
673             }
674              
675             # MCE::Shared::{ Array, Cache, Hash, Ordhash }, Hash::Ordered,
676             # or similar module.
677              
678 0 0       0 $get = 'peek' if $pkg->isa('MCE::Shared::Cache');
679              
680 0 0       0 if ( !exists $_itr{ "$_id:args" } ) {
681 0         0 @{ $_itr{ "$_id:args" } } = $pkg->can('keys')
682             ? $_obj{ $_id }->keys()
683 0 0       0 : $_obj_keys->( $_obj{ $_id } );
684             }
685             else {
686 0         0 my $_args = $_itr{ "$_id:args" };
687 0 0 0     0 if ( @{ $_args } == 1 &&
  0         0  
688             $_args->[0] =~ /^(?:key|val)[ ]+\S\S?[ ]+\S/ ) {
689              
690 0 0       0 @{ $_args } = $_obj{ $_id }->keys($_args->[0])
  0         0  
691             if $pkg->isa('MCE::Shared::Base::Common');
692             }
693             else {
694 0 0       0 $_obj{ $_id }->_prune_head()
695             if $pkg->isa('MCE::Shared::Cache');
696             }
697             }
698              
699             $_itr{ $_id } = sub {
700 0         0 my $_key = shift @{ $_itr{ "$_id:args" } };
  0         0  
701 0 0       0 print({$_DAU_R_SOCK} '-1'.$LF), return if !defined($_key);
  0         0  
702 0         0 my $_buf = $_freeze->([ $_key, $_obj{ $_id }->$get($_key) ]);
703 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
704 0         0 };
705             }
706              
707 0         0 $_itr{ $_id }->();
708              
709 0         0 return;
710 0         0 };
711              
712             my $_warn = sub {
713 0 0   0   0 if ( $_wa ) {
714 0         0 my $_buf = $_freeze->([ ]);
715 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
716             }
717 0         0 };
718              
719             # --------------------------------------------------------------------------
720              
721 0         0 my %_output_function; %_output_function = (
722              
723             SHR_M_NEW.$LF => sub { # New share
724 0     0   0 my ($_buf, $_params, $_class, $_args, $_fd, $_item);
725              
726 0         0 chomp($_len = <$_DAU_R_SOCK>),
727             read($_DAU_R_SOCK, $_buf, $_len);
728              
729 0         0 $_params = Storable::thaw($_buf);
730 0         0 $_class = $_params->{'class'};
731              
732 0   0     0 { local $@; MCE::Shared::_use($_params->{module} || $_class); }
  0         0  
  0         0  
733              
734             chomp($_len = <$_DAU_R_SOCK>), read($_DAU_R_SOCK, $_buf, $_len),
735 0         0 chomp($_len = <$_DAU_R_SOCK>), print({$_DAU_R_SOCK} $LF);
  0         0  
736              
737 0         0 $_args = Storable::thaw($_buf); undef $_buf;
  0         0  
738              
739 0 0       0 if ($_len) {
740 0         0 $_export_nul{ $_class } = undef;
741              
742 0         0 for my $_k (qw( _qw_sock _qr_sock _aw_sock _cw_sock )) {
743 0 0       0 if (exists $_args->[0]->{ $_k }) {
744 0         0 delete $_args->[0]->{ $_k };
745              
746 0 0       0 $_fd = IO::FDPass::recv(fileno $_DAU_R_SOCK); $_fd >= 0
  0         0  
747             or _croak("cannot receive file handle: $!");
748              
749 0 0       0 open $_args->[0]->{ $_k }, "+<&=$_fd"
750             or _croak("cannot convert file discriptor to handle: $!");
751              
752 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
753             }
754             }
755             }
756              
757 0 0       0 $_item = _share($_params, @{ $_args }) or do {
  0         0  
758 0         0 print {$_DAU_R_SOCK} int($!).$LF . '0'.$LF;
  0         0  
759 0         0 return;
760             };
761              
762 0         0 $_buf = $_freeze->($_item);
763              
764 0         0 print {$_DAU_R_SOCK} $_item->SHARED_ID().$LF .
  0         0  
765             length($_buf).$LF, $_buf;
766              
767 0 0       0 if ($_class eq 'MCE::Shared::Queue') {
    0          
    0          
768             MCE::Shared::Queue::_init_mgr(
769             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
770 0 0       0 ) if $INC{'MCE/Shared/Queue.pm'};
771             }
772             elsif (reftype $_obj{ $_item->[0] } eq 'GLOB') {
773             MCE::Shared::Handle::_init_mgr(
774             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
775 0 0       0 ) if $INC{'MCE/Shared/Handle.pm'};
776             }
777             elsif ($_class eq 'MCE::Shared::Condvar') {
778             MCE::Shared::Condvar::_init_mgr(
779             \$_DAU_R_SOCK, \%_obj, \%_output_function
780 0 0       0 ) if $INC{'MCE/Shared/Condvar.pm'};
781             }
782              
783 0         0 return;
784             },
785              
786             SHR_M_CID.$LF => sub { # ClientID request
787 0     0   0 print {$_DAU_R_SOCK} (++$_channel_id).$LF;
  0         0  
788 0 0       0 $_channel_id = 0 if ($_channel_id >= $_SVR->{_data_channels});
789              
790 0         0 return;
791             },
792              
793             SHR_M_DEE.$LF => sub { # Deeply shared
794 0     0   0 chomp(my $_id1 = <$_DAU_R_SOCK>),
795             chomp(my $_id2 = <$_DAU_R_SOCK>);
796              
797 0         0 $_ob3{ "$_id1:deeply" }->{ $_id2 } = undef;
798              
799 0         0 return;
800             },
801              
802             SHR_M_INC.$LF => sub { # Increment count
803 0     0   0 chomp($_id = <$_DAU_R_SOCK>);
804              
805 0         0 $_ob3{ "$_id:count" }++;
806 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
807              
808 0         0 return;
809             },
810              
811             SHR_M_OBJ.$LF => sub { # Object request
812 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
813             chomp($_fcn = <$_DAU_R_SOCK>),
814             chomp($_wa = <$_DAU_R_SOCK>),
815             chomp($_len = <$_DAU_R_SOCK>),
816              
817             read($_DAU_R_SOCK, my($_buf), $_len);
818              
819 0   0     0 $_var = $_obj{ $_id } || do { return $_warn->($_fcn) };
820              
821 0         0 $_wa ? $_auto_reply->(@{ $_thaw->($_buf) })
822 0 0       0 : eval { $_var->$_fcn(@{ $_thaw->($_buf) }) };
  0         0  
  0         0  
823              
824 0 0       0 warn $@ if $@;
825 0         0 return;
826             },
827              
828             SHR_M_OB0.$LF => sub { # Object request - thaw'less
829 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
830             chomp($_fcn = <$_DAU_R_SOCK>),
831             chomp($_wa = <$_DAU_R_SOCK>);
832              
833 0   0     0 $_var = $_obj{ $_id } || do { return $_warn->($_fcn) };
834              
835 0   0     0 my $_code = $_var->can($_fcn) || do {
836             if ( ($_fcn eq 'keys' || $_fcn eq 'SCALAR') &&
837             ($_var->can('NEXTKEY') || $_var->can('FETCHSIZE')) ) {
838             $_obj_keys;
839             }
840             else {
841             $_wa ? $_auto_reply->() : eval { $_var->$_fcn() };
842             warn $@ if $@;
843             return;
844             }
845             };
846              
847 0 0       0 if ( $_wa == WA_ARRAY ) {
    0          
848 0         0 my @_ret = eval { $_code->($_var) };
  0         0  
849 0         0 my $_buf = $_freeze->(\@_ret);
850 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
851             }
852             elsif ( $_wa ) {
853 0         0 my $_ret = eval { $_code->($_var) };
  0         0  
854 0         0 my $_buf = $_freeze->([ $_ret ]);
855 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
856             }
857             else {
858 0         0 eval { $_code->($_var) };
  0         0  
859             }
860              
861 0 0       0 warn $@ if $@;
862 0         0 return;
863             },
864              
865             SHR_M_DES.$LF => sub { # Destroy request
866 0     0   0 chomp($_id = <$_DAU_R_SOCK>);
867              
868 0         0 local $SIG{__DIE__};
869 0         0 local $SIG{__WARN__};
870              
871 0         0 $_var = undef; local $@;
  0         0  
872              
873 0         0 eval {
874 0 0       0 my $_ret = exists($_all{ $_id }) ? '1' : '0';
875 0 0       0 _destroy({}, $_obj{ $_id }, $_id) if $_ret;
876             };
877              
878 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
879              
880 0         0 return;
881             },
882              
883             SHR_M_EXP.$LF => sub { # Export request
884 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
885             chomp($_len = <$_DAU_R_SOCK>);
886              
887 0 0       0 read($_DAU_R_SOCK, my($_keys), $_len) if $_len;
888              
889 0 0       0 if (exists $_obj{ $_id }) {
890 0         0 my $_buf;
891              
892             # Do not export: e.g. objects with file handles
893 0 0       0 if ( exists $_export_nul{ $_all{ $_id } } ) {
894 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
895 0         0 return;
896             }
897              
898             # MCE::Shared::{ Array, Hash, Ordhash }, Hash::Ordered
899 0 0       0 if ($_obj{ $_id }->can('clone')) {
900             $_buf = ($_len)
901 0         0 ? Storable::freeze($_obj{ $_id }->clone(@{ $_thaw->($_keys) }))
902 0 0       0 : Storable::freeze($_obj{ $_id });
903             }
904             # Other
905             else {
906 0         0 $_buf = Storable::freeze($_obj{ $_id });
907             }
908              
909 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
910 0         0 undef $_buf;
911             }
912             else {
913 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
914             }
915              
916 0         0 return;
917             },
918              
919             SHR_M_INX.$LF => sub { # Iterator next
920 0     0   0 chomp($_id = <$_DAU_R_SOCK>);
921              
922 0         0 my $_var = $_obj{ $_id };
923              
924 0 0       0 if ( my $_code = $_var->can('next') ) {
925 0         0 my $_buf = $_freeze->([ $_code->($_var) ]);
926 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
927             }
928             else {
929 0         0 $_iter->();
930             }
931              
932 0         0 return;
933             },
934              
935             SHR_M_IRW.$LF => sub { # Iterator rewind
936 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
937             chomp($_len = <$_DAU_R_SOCK>),
938              
939             read($_DAU_R_SOCK, my($_buf), $_len);
940              
941 0         0 my $_var = $_obj{ $_id };
942              
943 0 0       0 if ( my $_code = $_var->can('rewind') ) {
944 0         0 $_code->($_var, @{ $_thaw->($_buf) });
  0         0  
945             }
946             else {
947 0 0       0 weaken( delete $_itr{ $_id } ) if ( exists $_itr{ $_id } );
948 0         0 my @_args = @{ $_thaw->($_buf) };
  0         0  
949 0 0       0 if ( @_args ) {
950 0         0 $_itr{ "$_id:args" } = \@_args;
951             } else {
952 0         0 delete $_itr{ "$_id:args" };
953             }
954             }
955              
956 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
957              
958 0         0 return;
959             },
960              
961             SHR_M_STP.$LF => sub { # Exit loop
962 0 0   0   0 $SIG{USR2} = sub {} unless $_is_MSWin32;
963              
964 0         0 $_done = 1;
965              
966 0         0 return;
967             },
968              
969             SHR_O_PDL.$LF => sub { # PDL::ins inplace(this),...
970 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
971             chomp($_len = <$_DAU_R_SOCK>),
972              
973             read($_DAU_R_SOCK, my($_buf), $_len);
974              
975 0 0       0 if ($_all{ $_id } eq 'PDL') {
976             # PDL ins( inplace($this), $what, @coords );
977 0         0 local @_ = @{ Storable::thaw($_buf) };
  0         0  
978              
979 0 0 0     0 if (@_ == 1) {
    0 0        
    0 0        
    0          
    0          
980 0         0 ins( inplace($_obj{ $_id }), @_, 0, 0 );
981             }
982             elsif (@_ == 2 && $_[0] =~ /^:,(\d+):(\d+)/ && ref($_[1])) {
983 0         0 my $_s = $2 - $1;
984 0         0 ins( inplace($_obj{ $_id }), $_[1]->slice(":,0:$_s"), 0, $1 );
985             }
986             elsif (!ref($_[0]) && $_[0] =~ /^(\d+)$/) {
987 0         0 $_obj{ $_id }->set(@_);
988             }
989             elsif (@_ == 2) {
990 0         0 $_[0] =~ /^:,(\d+)/;
991 0   0     0 ins( inplace($_obj{ $_id }), $_[1], 0, $1 // $_[0] );
992             }
993             elsif (@_ > 2) {
994 0         0 ins( inplace($_obj{ $_id }), @_ );
995             }
996             }
997              
998 0         0 return;
999             },
1000              
1001             SHR_O_DAT.$LF => sub { # Get MCE::Hobo data
1002 0     0   0 my $_key;
1003              
1004 0         0 chomp($_id = <$_DAU_R_SOCK>),
1005             chomp($_key = <$_DAU_R_SOCK>);
1006              
1007 0   0     0 my $error = delete $_obj{ $_id }{ 'S'.$_key } // '';
1008 0   0     0 my $result = delete $_obj{ $_id }{ 'R'.$_key } // '';
1009              
1010 0         0 print {$_DAU_R_SOCK}
  0         0  
1011             length($error).$LF . length($result).$LF . $error, $result;
1012              
1013 0         0 return;
1014             },
1015              
1016             SHR_O_CLR.$LF => sub { # Clear
1017 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
1018             chomp($_fcn = <$_DAU_R_SOCK>);
1019              
1020 0   0     0 my $_var = $_obj{ $_id } || do { return };
1021              
1022 0 0       0 if (exists $_ob3{ "$_id:deeply" }) {
1023 0         0 my $_keep = { $_id => 1 };
1024 0         0 for my $_oid (keys %{ $_ob3{ "$_id:deeply" } }) {
  0         0  
1025 0         0 _destroy($_keep, $_obj{ $_oid }, $_oid);
1026             }
1027 0         0 delete $_ob3{ "$_id:deeply" };
1028             }
1029              
1030 0         0 eval { $_var->$_fcn() };
  0         0  
1031              
1032 0 0       0 warn $@ if $@;
1033 0         0 return;
1034             },
1035              
1036             SHR_O_FCH.$LF => sub { # Fetch
1037 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
1038             chomp($_fcn = <$_DAU_R_SOCK>),
1039             chomp($_len = <$_DAU_R_SOCK>);
1040              
1041 0 0       0 read($_DAU_R_SOCK, my($_key), $_len) if $_len;
1042              
1043 0   0     0 my $_var = $_obj{ $_id } || do {
1044             return print {$_DAU_R_SOCK} '-1'.$LF;
1045             };
1046              
1047             my $_buf = $_len
1048 0 0       0 ? eval { $_var->$_fcn( chop $_key ? ${ $_thaw->($_key) } : $_key ) }
  0         0  
1049 0 0       0 : eval { $_var->$_fcn() };
  0         0  
1050              
1051 0 0       0 warn $@ if $@;
1052              
1053 0 0       0 return print {$_DAU_R_SOCK} '-1'.$LF if ( !defined $_buf );
  0         0  
1054              
1055             my $_ret = ( blessed($_buf) && $_buf->can('SHARED_ID') && $_ob2{ $_buf->[0] } )
1056 0 0 0     0 ? $_ob2{ $_buf->[0] }
1057             : $_freeze->([ $_buf ]);
1058              
1059 0         0 print {$_DAU_R_SOCK} length($_ret).$LF, $_ret;
  0         0  
1060              
1061 0         0 return;
1062             },
1063              
1064             SHR_O_SZE.$LF => sub { # Size
1065 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
1066             chomp($_fcn = <$_DAU_R_SOCK>);
1067              
1068 0   0     0 $_var = $_obj{ $_id } || do {
1069             print {$_DAU_R_SOCK} $LF if $_wa;
1070             return;
1071             };
1072              
1073 0   0     0 my $_code = $_var->can($_fcn) || do {
1074             if ( ($_fcn eq 'keys' || $_fcn eq 'SCALAR') &&
1075             ($_var->can('NEXTKEY') || $_var->can('FETCHSIZE')) ) {
1076             $_obj_keys;
1077             }
1078             else {
1079             $_len = eval { $_var->$_fcn() };
1080             print {$_DAU_R_SOCK} $_len.$LF;
1081              
1082             warn $@ if $@;
1083             return;
1084             }
1085             };
1086              
1087 0         0 $_len = eval { $_code->($_var) };
  0         0  
1088 0         0 print {$_DAU_R_SOCK} $_len.$LF;
  0         0  
1089              
1090 0 0       0 warn $@ if $@;
1091 0         0 return;
1092             },
1093              
1094 0         0 );
1095              
1096             MCE::Shared::Queue::_init_mgr(
1097             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
1098 0 0       0 ) if $INC{'MCE/Shared/Queue.pm'};
1099              
1100             MCE::Shared::Handle::_init_mgr(
1101             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
1102 0 0       0 ) if $INC{'MCE/Shared/Handle.pm'};
1103              
1104             MCE::Shared::Condvar::_init_mgr(
1105             \$_DAU_R_SOCK, \%_obj, \%_output_function
1106 0 0       0 ) if $INC{'MCE/Shared/Condvar.pm'};
1107              
1108             # --------------------------------------------------------------------------
1109              
1110             # Call on hash function.
1111              
1112 0 0       0 if ($_is_MSWin32) {
1113             # The normal loop hangs on Windows when processes/threads start/exit.
1114             # Using ioctl() properly, https://www.perlmonks.org/?node_id=780083
1115              
1116 0         0 my $_val_bytes = pack('L', 0);
1117 0         0 my ($_count, $_nbytes, $_start);
1118              
1119 0         0 while (!$_done) {
1120 0         0 $_start = time, $_count = 1;
1121              
1122             # MSWin32 FIONREAD
1123 0         0 IOCTL: ioctl($_DAT_R_SOCK, 0x4004667f, $_val_bytes);
1124              
1125 0 0       0 unless ($_nbytes = unpack('L', $_val_bytes)) {
1126 0 0       0 if ($_count) {
1127             # delay after a while to not consume a CPU core
1128 0 0 0     0 $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.030;
1129             } else {
1130 0         0 sleep 0.015;
1131             }
1132 0         0 goto IOCTL;
1133             }
1134              
1135 0         0 do {
1136 0         0 sysread($_DAT_R_SOCK, $_func, 8);
1137 0 0       0 $_done = 1, last() unless length($_func) == 8;
1138 0         0 $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];
1139              
1140 0         0 $_output_function{$_func}();
1141              
1142             } while (($_nbytes -= 8) >= 8);
1143             }
1144             }
1145             else {
1146 0         0 while (!$_done) {
1147 0         0 $_func = <$_DAT_R_SOCK>;
1148 0 0       0 last() unless length($_func) == 6;
1149 0         0 $_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
1150              
1151 0         0 $_output_function{$_func}();
1152             }
1153             }
1154              
1155 0         0 _exit();
1156             }
1157              
1158             ###############################################################################
1159             ## ----------------------------------------------------------------------------
1160             ## Object package.
1161             ##
1162             ###############################################################################
1163              
1164             package MCE::Shared::Object;
1165              
1166 43     43   363 use Scalar::Util qw( looks_like_number reftype );
  43         107  
  43         2373  
1167 43     43   18048 use MCE::Shared::Base ();
  43         103  
  43         879  
1168 43     43   245 use bytes;
  43         70  
  43         205  
1169              
1170             use constant {
1171 43 50       2682 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
1172             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
1173 43     43   1678 };
  43         215  
1174             use constant {
1175 43         3726 _ID => 0, _CLASS => 1, _ENCODE => 2, _DECODE => 3, # shared object
1176             _DREF => 4, _ITER => 5, _MUTEX => 6,
1177 43     43   223 };
  43         83  
1178             use constant {
1179 43         3749 _UNDEF => 0, _ARRAY => 1, _SCALAR => 2, # wantarray
1180 43     43   247 };
  43         72  
1181              
1182             ## Below, no circular reference to original, therefore no memory leaks.
1183              
1184             use overload (
1185             q("") => \&MCE::Shared::Base::_stringify,
1186             q(0+) => \&MCE::Shared::Base::_numify,
1187             q(@{}) => sub {
1188 43     43   240 no overloading;
  43         62  
  43         7292  
1189 7 50   7   1642 $_[0]->[_DREF] || do {
1190 7         30 local $@; my $c = $_[0]->[_CLASS];
  7         35  
1191 7         106 ($c) = $c =~ /(.*)/; # remove tainted'ness
1192 7 50       902 return $_[0] unless eval qq{ eval { require $c }; $c->can('TIEARRAY') };
1193 7         67 tie my @a, __PACKAGE__, bless([ @{ $_[0] }[ 0..3 ] ], __PACKAGE__);
  7         152  
1194 7         88 $_[0]->[_DREF] = \@a;
1195             };
1196             },
1197             q(%{}) => sub {
1198 43     43   268 no overloading;
  43         70  
  43         6843  
1199 9 50   9   3888 $_[0]->[_DREF] || do {
1200 9         33 local $@; my $c = $_[0]->[_CLASS];
  9         53  
1201 9         118 ($c) = $c =~ /(.*)/; # remove tainted'ness
1202 9 50       1481 return $_[0] unless eval qq{ eval { require $c }; $c->can('TIEHASH') };
1203 9         49 tie my %h, __PACKAGE__, bless([ @{ $_[0] }[ 0..3 ] ], __PACKAGE__);
  9         270  
1204 9         129 $_[0]->[_DREF] = \%h;
1205             };
1206             },
1207             q(${}) => sub {
1208 43     43   385 no overloading;
  43         85  
  43         6534  
1209 0 0   0   0 $_[0]->[_DREF] || do {
1210 0         0 local $@; my $c = $_[0]->[_CLASS];
  0         0  
1211 0         0 ($c) = $c =~ /(.*)/; # remove tainted'ness
1212 0 0       0 return $_[0] unless eval qq{ eval { require $c }; $c->can('TIESCALAR') };
1213 0         0 tie my $s, __PACKAGE__, bless([ @{ $_[0] }[ 0..3 ] ], __PACKAGE__);
  0         0  
1214 0         0 $_[0]->[_DREF] = \$s;
1215             };
1216             },
1217 43         492 fallback => 1
1218 43     43   27592 );
  43         22516  
1219              
1220 43     43   5402 no overloading;
  43         69  
  43         3392  
1221              
1222             my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_chn, $_dat_ex, $_dat_un);
1223              
1224             my $_blessed = \&Scalar::Util::blessed;
1225              
1226             BEGIN {
1227 0         0 $_dat_ex = sub { _croak (
1228             "\nPlease start the shared-manager process manually when ready.\n",
1229             "See section labeled \"Extra Functionality\" in MCE::Shared.\n\n"
1230 43     43   92296 ) };
1231             }
1232              
1233             # Hook for threads.
1234              
1235             sub CLONE {
1236 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
1237 0 0       0 &_init($_tid) if $_tid;
1238             }
1239              
1240             # Private functions.
1241              
1242             sub DESTROY {
1243 113 100   113   2904372 return if $_stopped;
1244 110 50 33     1154 return unless ( $_is_client && defined $_svr_pid && defined $_[0] );
      33        
1245              
1246 110 100 33     971 if ( $_spawn_child && $_init_pid && $_init_pid eq "$$.$_tid" ) {
      66        
1247 58         231 local ($!, $?);
1248 58 50 33     612 return if ( ! $_svr_pid || waitpid($_svr_pid, _WNOHANG) );
1249             }
1250              
1251 110         395 my $_id = $_[0]->[_ID];
1252              
1253 110 100       303 if ( exists $_new{ $_id } ) {
1254 48 50       142 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1255              
1256 48 50       200 if ($_new{ $_id } eq $_pid) {
1257 48 50       107 return if $MCE::Signal::KILLED;
1258              
1259             delete($_all{ $_id }), delete($_obj{ $_id }),
1260             delete($_new{ $_id }), delete($_ob2{ $_id }),
1261 48         386 delete($_ob3{"$_id:count"});
1262              
1263 48         453 _req1('M~DES', $_id.$LF);
1264             }
1265             }
1266              
1267 110         869 return;
1268             }
1269              
1270 0     0   0 sub _croak { goto &MCE::Shared::Base::_croak }
1271              
1272 2     2   10 sub SHARED_ID { $_[0]->[_ID] }
1273              
1274 9     9   36 sub TIEARRAY { $_[1] }
1275 3     3   16 sub TIEHANDLE { $_[1] }
1276 9     9   37 sub TIEHASH { $_[1] }
1277 0     0   0 sub TIESCALAR { $_[1] }
1278              
1279             sub _reset {
1280             MCE::Shared::Object::_init_condvar(
1281             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, \%_obj,
1282             $_freeze, $_thaw
1283 84 100   84   515 ) if $INC{'MCE/Shared/Condvar.pm'};
1284              
1285             MCE::Shared::Object::_init_handle(
1286             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, \%_obj,
1287             $_freeze, $_thaw
1288 84 100       308 ) if $INC{'MCE/Shared/Handle.pm'};
1289              
1290             MCE::Shared::Object::_init_queue(
1291             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, \%_obj,
1292             $_freeze, $_thaw
1293 84 100       482 ) if $INC{'MCE/Shared/Queue.pm'};
1294             }
1295              
1296             sub _start {
1297 43     43   117 $_chn = $_SVR->{_data_channels} + 1;
1298 43         147 $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
1299 43         142 $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
1300 43         93 $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
1301              
1302             # inlined for performance
1303             $_dat_ex = sub {
1304 2005 50   2005   4932 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1305             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
1306 2005 50       9762 unless $_DAT_LOCK->{ $_pid };
1307 43         349 };
1308             $_dat_un = sub {
1309 2005 50   2005   6477 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1310             CORE::syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
1311 2005 50       19850 if $_DAT_LOCK->{ $_pid };
1312 43         163 };
1313              
1314 43         147 _reset();
1315             }
1316              
1317             sub _stop {
1318 15     15   266 $_DAT_LOCK = $_DAT_W_SOCK = $_DAU_W_SOCK = $_chn = $_dat_un = undef;
1319              
1320 0     0   0 $_dat_ex = sub { _croak (
1321             "\nPlease start the shared-manager process manually when ready.\n",
1322             "See section labeled \"Extra Functionality\" in MCE::Shared.\n\n"
1323 15         114 ) };
1324              
1325 15         763 return;
1326             }
1327              
1328             sub _get_channel_id {
1329 28 50   28   542 local $\ = undef if (defined $\);
1330 28 50       411 local $/ = $LF if ($/ ne $LF);
1331 28         447 local $MCE::Signal::SIG;
1332              
1333 28         321 my $_ret;
1334              
1335             {
1336 28         145 local $MCE::Signal::IPC = 1;
  28         507  
1337 28 50       909 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1338              
1339 28         1206 print {$_DAT_W_SOCK} 'M~CID'.$LF . $_chn.$LF;
  28         2937  
1340 28         6961 chomp($_ret = <$_DAU_W_SOCK>);
1341              
1342 28 50       696 $_dat_un->() if !$_is_MSWin32;
1343             }
1344              
1345 28 50       330 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1346              
1347 28         245 return $_ret;
1348             }
1349              
1350             sub _init {
1351 28 50   28   829 return unless defined $_SVR;
1352              
1353 28   33     1904 my $_id = $_[0] // &_get_channel_id();
1354 28 50       1081 $_id = $$ if ( $_id !~ /\d+/ );
1355              
1356 28         485 $_chn = abs($_id) % $_SVR->{_data_channels} + 1;
1357 28         442 $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
1358 28         176 $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
1359              
1360 28         1222 %_new = (), _reset();
1361              
1362 28         173 return;
1363             }
1364              
1365             ###############################################################################
1366             ## ----------------------------------------------------------------------------
1367             ## Private routines.
1368             ##
1369             ###############################################################################
1370              
1371             # Called by AUTOLOAD, STORE, set, and keys.
1372              
1373             sub _auto {
1374 1228 100   1228   3392 my $_wa = !defined wantarray ? _UNDEF : wantarray ? _ARRAY : _SCALAR;
    100          
1375              
1376 1228 50       3277 local $\ = undef if (defined $\);
1377 1228         1984 local $MCE::Signal::SIG;
1378              
1379 1228         1541 my $_buf;
1380              
1381             {
1382 1228         1514 local $MCE::Signal::IPC = 1;
  1228         1639  
1383 1228 50       3831 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1384              
1385 1228 100       17696 if ( @_ == 2 ) {
1386 155         4064 print({$_DAT_W_SOCK} 'M~OB0'.$LF . $_chn.$LF),
1387 155         245 print({$_DAU_W_SOCK} $_[1]->[_ID].$LF . $_[0].$LF . $_wa.$LF);
  155         2219  
1388             }
1389             else {
1390 1073         11464 my ( $_fcn, $_id, $_buf ) = ( shift, shift()->[_ID], $_freeze->([ @_ ]) );
1391 1073         3540 my $_tmp = $_id.$LF . $_fcn.$LF . $_wa.$LF . length($_buf).$LF;
1392 1073         68978 print({$_DAT_W_SOCK} 'M~OBJ'.$LF . $_chn.$LF),
1393 1073         1533 print({$_DAU_W_SOCK} $_tmp, $_buf);
  1073         12862  
1394             }
1395              
1396 1228 100       4066 if ( $_wa ) {
1397 916 50       2647 local $/ = $LF if ($/ ne $LF);
1398 916         411093 chomp(my $_len = <$_DAU_W_SOCK>);
1399 916         5034 read($_DAU_W_SOCK, $_buf, $_len);
1400             }
1401              
1402 1228 50       4434 $_dat_un->() if !$_is_MSWin32;
1403             }
1404              
1405 1228 50       3397 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1406              
1407 1228 100       3631 return unless $_wa;
1408 916 100       9357 return ( $_wa != _ARRAY ) ? $_thaw->($_buf)[0] : @{ $_thaw->($_buf) };
  382         6648  
1409             }
1410              
1411             # Called by MCE::Hobo ( ->join, ->wait_one ).
1412              
1413             sub _get_hobo_data {
1414 49 50 33 49   735 if ( $_spawn_child && $_init_pid && $_init_pid eq "$$.$_tid" ) {
      33        
1415 49         411 local ($!, $?);
1416 49 50 33     581 return if ( ! $_svr_pid || waitpid($_svr_pid, _WNOHANG) );
1417             }
1418              
1419 49 50       209 local $\ = undef if (defined $\);
1420 49 50       158 local $/ = $LF if ($/ ne $LF);
1421 49         171 local $MCE::Signal::SIG;
1422              
1423 49         88 my ($_result, $_error);
1424              
1425             {
1426 49         65 local $MCE::Signal::IPC = 1;
  49         153  
1427 49 50       306 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1428              
1429 49         1968 print({$_DAT_W_SOCK} 'O~DAT'.$LF . $_chn.$LF),
1430 49         828 print({$_DAU_W_SOCK} $_[0]->[_ID].$LF . $_[1].$LF);
  49         770  
1431              
1432 49         7046 chomp(my $_le1 = <$_DAU_W_SOCK>),
1433             chomp(my $_le2 = <$_DAU_W_SOCK>);
1434              
1435 49 50       214 read($_DAU_W_SOCK, $_error, $_le1) if $_le1;
1436 49 100       213 read($_DAU_W_SOCK, $_result, $_le2) if $_le2;
1437              
1438 49 50       256 $_dat_un->() if !$_is_MSWin32;
1439             }
1440              
1441 49 50       169 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1442              
1443 49         287 return ($_result, $_error);
1444             }
1445              
1446             # Called by await, dequeue_timed, rewind, broadcast, signal, timedwait, and
1447             # wait. Including CLOSE, DESTROY, and destroy.
1448              
1449             sub _req1 {
1450 78 50   78   260 return unless defined $_DAU_W_SOCK; # (in cleanup)
1451              
1452 78 50       200 local $\ = undef if (defined $\);
1453 78 50       187 local $/ = $LF if ($/ ne $LF );
1454 78         139 local $MCE::Signal::SIG;
1455              
1456 78         131 my $_ret;
1457              
1458             {
1459 78         104 local $MCE::Signal::IPC = 1;
  78         188  
1460 78 50       289 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1461              
1462 78         2503 print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1463 78         1271 print({$_DAU_W_SOCK} $_[1]);
  78         1337  
1464 78         20513 chomp($_ret = <$_DAU_W_SOCK>);
1465              
1466 78 50       421 $_dat_un->() if !$_is_MSWin32;
1467             }
1468              
1469 78 50       303 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1470              
1471 78         176 $_ret;
1472             }
1473              
1474             # Called by PRINT, PRINTF, STORE, ins_inplace, and set.
1475              
1476             sub _req2 {
1477 224 50   224   421 local $\ = undef if (defined $\);
1478 224         243 local $MCE::Signal::SIG;
1479              
1480             {
1481 224         229 local $MCE::Signal::IPC = 1;
  224         247  
1482 224 50       455 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1483              
1484 224         5108 print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1485 224         2262 print({$_DAU_W_SOCK} $_[1], $_[2]);
  224         1810  
1486              
1487 224 50       761 $_dat_un->() if !$_is_MSWin32;
1488             }
1489              
1490 224 50       517 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1491              
1492 224         820 1;
1493             }
1494              
1495             # Called by CLEAR and clear.
1496              
1497             sub _req3 {
1498 40     40   365 my ( $_fcn, $self ) = @_;
1499              
1500 40 50       169 local $\ = undef if (defined $\);
1501 40 50       186 local $/ = $LF if ($/ ne $LF );
1502 40         96 local $MCE::Signal::SIG;
1503              
1504 40 50       156 delete $self->[_ITER] if defined $self->[_ITER];
1505              
1506             {
1507 40         68 local $MCE::Signal::IPC = 1;
  40         79  
1508 40 50       230 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1509              
1510 40         1336 print({$_DAT_W_SOCK} 'O~CLR'.$LF . $_chn.$LF),
1511 40         495 print({$_DAU_W_SOCK} $self->[_ID].$LF . $_fcn.$LF);
  40         636  
1512              
1513 40 50       253 $_dat_un->() if !$_is_MSWin32;
1514             }
1515              
1516 40 50       181 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1517              
1518 40         279 return;
1519             }
1520              
1521             # Called by FETCH and get.
1522              
1523             sub _req4 {
1524 157 50   157   559 local $\ = undef if (defined $\);
1525 157 50       440 local $/ = $LF if ($/ ne $LF );
1526 157         300 local $MCE::Signal::SIG;
1527              
1528 157         276 my ( $_key, $_len, $_buf );
1529              
1530 157 100       395 if ( @_ == 3 ) {
1531 104 50       1155 $_key = ref($_[2]) ? $_[2].'0' : $_freeze->(\$_[2]).'1';
1532             }
1533              
1534             {
1535 157         288 local $MCE::Signal::IPC = 1;
  157         272  
1536 157 50       678 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1537              
1538 157         5433 print({$_DAT_W_SOCK} 'O~FCH'.$LF . $_chn.$LF),
1539 157         2051 print({$_DAU_W_SOCK} $_[1]->[_ID].$LF . $_[0].$LF . length($_key).$LF, $_key);
  157         2678  
1540 157         31604 chomp($_len = <$_DAU_W_SOCK>);
1541              
1542 157 100       1054 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1543              
1544 157 50       642 $_dat_un->() if !$_is_MSWin32;
1545             }
1546              
1547 157 50       517 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1548              
1549 157 100       354 return undef if ($_len < 0);
1550              
1551 156 50 33     547 if ( $_[1]->[_DECODE] && $_[0] eq 'FETCH' ) {
1552 0         0 local $@; $_buf = $_thaw->($_buf)[0];
  0         0  
1553 0   0     0 return eval { $_[1]->[_DECODE]->($_buf) } || $_buf;
1554             }
1555              
1556 156         2843 $_thaw->($_buf)[0];
1557             }
1558              
1559             # Called by FETCHSIZE, SCALAR, keys, and pending.
1560              
1561             sub _size {
1562 4 50   4   27 local $\ = undef if (defined $\);
1563 4 50       18 local $/ = $LF if ($/ ne $LF );
1564 4         12 local $MCE::Signal::SIG;
1565              
1566 4         9 my $_size;
1567              
1568             {
1569 4         11 local $MCE::Signal::IPC = 1;
  4         8  
1570 4 50       33 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1571              
1572 4         131 print({$_DAT_W_SOCK} 'O~SZE'.$LF . $_chn.$LF),
1573 4         48 print({$_DAU_W_SOCK} $_[1]->[_ID].$LF . $_[0].$LF);
  4         52  
1574 4         446 chomp($_size = <$_DAU_W_SOCK>);
1575              
1576 4 50       33 $_dat_un->() if !$_is_MSWin32;
1577             }
1578              
1579 4 50       20 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1580              
1581 4 50       41 length($_size) ? int($_size) : undef;
1582             }
1583              
1584             ###############################################################################
1585             ## ----------------------------------------------------------------------------
1586             ## Common methods.
1587             ##
1588             ###############################################################################
1589              
1590             our $AUTOLOAD; # MCE::Shared::Object::
1591              
1592             sub AUTOLOAD {
1593 235     235   97052 my $_fcn = $AUTOLOAD; substr($_fcn, 0, rindex($_fcn,':') + 1, '');
  235         784  
1594              
1595             # save this method for future calls
1596 43     43   332 no strict 'refs';
  43         91  
  43         98429  
1597 235     976   1816 *{ $AUTOLOAD } = sub { _auto($_fcn, @_) };
  235         1602  
  976         362471  
1598              
1599 235         471 goto &{ $AUTOLOAD };
  235         1051  
1600             }
1601              
1602             # blessed ( )
1603              
1604             sub blessed {
1605 17     17   818 $_[0]->[_CLASS];
1606             }
1607              
1608             # decoder ( CODE )
1609             # decoder ( )
1610              
1611             sub decoder {
1612 0 0 0 0   0 $_[0]->[_DECODE] = $_[1] if (@_ == 2 && (ref $_[1] eq 'CODE' || !$_[1]));
      0        
1613 0         0 $_[0]->[_DECODE];
1614             }
1615              
1616             # encoder ( CODE )
1617             # encoder ( )
1618              
1619             sub encoder {
1620 0 0 0 0   0 $_[0]->[_ENCODE] = $_[1] if (@_ == 2 && (ref $_[1] eq 'CODE' || !$_[1]));
      0        
1621 0         0 $_[0]->[_ENCODE];
1622             }
1623              
1624             # destroy ( { unbless => 1 } )
1625             # destroy ( )
1626              
1627             sub destroy {
1628 0     0   0 my $_id = $_[0]->[_ID];
1629 0 0 0     0 my $_un = (ref $_[1] eq 'HASH' && $_[1]->{'unbless'}) ? 1 : 0;
1630 0 0       0 my $_item = (defined wantarray) ? $_[0]->export({ unbless => $_un }) : undef;
1631 0 0       0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1632              
1633 0         0 delete($_all{ $_id }), delete($_obj{ $_id });
1634              
1635 0 0 0     0 if (defined $_svr_pid && exists $_new{ $_id } && $_new{ $_id } eq $_pid) {
      0        
1636 0         0 delete($_new{ $_id }), _req1('M~DES', $_id.$LF);
1637             }
1638              
1639 0         0 $_[0] = undef;
1640 0         0 $_item;
1641             }
1642              
1643             # export ( { unbless => 1 }, key [, key, ... ] )
1644             # export ( key [, key, ... ] )
1645             # export ( )
1646              
1647             sub export {
1648 0     0   0 my $_ob = shift;
1649 0         0 my $_id = $_ob->[_ID];
1650 0 0       0 my $_lkup = ref($_[0]) eq 'HASH' ? shift : {};
1651              
1652             # safety for circular references to not loop endlessly
1653 0 0       0 return $_lkup->{ $_id } if exists $_lkup->{ $_id };
1654              
1655 0 0       0 my $_tmp = @_ ? $_freeze->([ @_ ]) : '';
1656 0         0 my $_buf = $_id.$LF . length($_tmp).$LF;
1657 0         0 my $_class = $_ob->[_CLASS];
1658 0         0 my $_item;
1659              
1660 0         0 { local $@; MCE::Shared::_use($_class); }
  0         0  
1661              
1662             {
1663 0 0       0 local $\ = undef if (defined $\);
  0         0  
  0         0  
1664 0 0       0 local $/ = $LF if ($/ ne $LF);
1665 0         0 local $MCE::Signal::SIG;
1666              
1667 0         0 my $_len;
1668              
1669             {
1670 0         0 local $MCE::Signal::IPC = 1;
  0         0  
1671 0 0       0 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1672              
1673 0         0 print({$_DAT_W_SOCK} 'M~EXP'.$LF . $_chn.$LF),
1674 0         0 print({$_DAU_W_SOCK} $_buf, $_tmp); undef $_buf;
  0         0  
  0         0  
1675 0         0 chomp($_len = <$_DAU_W_SOCK>);
1676              
1677 0 0       0 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1678              
1679 0 0       0 $_dat_un->() if !$_is_MSWin32;
1680             }
1681              
1682 0 0       0 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1683              
1684 0 0       0 return undef if ($_len < 0);
1685              
1686 0         0 $_item = $_lkup->{ $_id } = Storable::thaw($_buf);
1687 0         0 undef $_buf;
1688             }
1689              
1690 0         0 my $_data; local $_;
  0         0  
1691              
1692             ## no critic
1693 0 0 0     0 if ( $_class->isa('MCE::Shared::Array') || $_class->isa('Tie::StdArray') ) {
    0 0        
    0 0        
1694 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1695 0         0 } @{ $_item };
  0         0  
1696              
1697 0 0       0 return $_lkup->{ $_id } = [ @{ $_item } ] if $_lkup->{'unbless'};
  0         0  
1698             }
1699             elsif ( $_class->isa('MCE::Shared::Hash') || $_class->isa('Tie::StdHash') ) {
1700 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1701 0         0 } CORE::values %{ $_item };
  0         0  
1702              
1703 0 0       0 return $_lkup->{ $_id } = { %{ $_item } } if $_lkup->{'unbless'};
  0         0  
1704             }
1705             elsif ( $_class->isa('MCE::Shared::Scalar') || $_class->isa('Tie::StdScalar') ) {
1706 0 0 0     0 if ( $_blessed->(${ $_item }) && ${ $_item }->can('export') ) {
  0         0  
  0         0  
1707 0         0 ${ $_item } = ${ $_item }->export($_lkup);
  0         0  
  0         0  
1708             }
1709 0 0       0 return $_lkup->{ $_id } = \do { my $o = ${ $_item } } if $_lkup->{'unbless'};
  0         0  
  0         0  
1710             }
1711             else {
1712 0 0       0 if ( $_class->isa('MCE::Shared::Ordhash') ) { $_data = $_item->[0] }
  0 0       0  
    0          
    0          
    0          
1713 0         0 elsif ( $_class->isa('MCE::Shared::Cache') ) { $_data = $_item->[0] }
1714 0         0 elsif ( $_class->isa('Hash::Ordered') ) { $_data = $_item->[0] }
1715 0         0 elsif ( $_class->isa('Tie::ExtraHash') ) { $_data = $_item->[0] }
1716 0         0 elsif ( $_class->isa('Tie::IxHash') ) { $_data = $_item->[2] }
1717              
1718 0 0       0 if ( reftype $_data eq 'ARRAY' ) {
    0          
1719 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1720 0         0 } @{ $_data };
  0         0  
1721             }
1722             elsif ( reftype $_data eq 'HASH' ) {
1723 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1724 0         0 } values %{ $_data };
  0         0  
1725             }
1726             }
1727              
1728 0         0 $_item;
1729             }
1730              
1731             # iterator ( index [, index, ... ] ) # Array
1732             # iterator ( key [, key, ... ] ) # Cache, Hash, Ordhash
1733             # iterator ( "query string" ) # Cache, Hash, Ordhash, Array
1734             # iterator ( )
1735              
1736             sub iterator {
1737 2     2   8 my ( $self, @keys ) = @_;
1738              
1739 2         7 my $pkg = $self->blessed();
1740 2 50 33     29 my $flg = ($pkg->can('NEXTKEY') || $pkg->can('keys')) ? 1 : 0;
1741 2 0       18 my $get = $pkg->can('FETCH') ? 'FETCH' : $pkg->can('get') ? 'get' : '';
    50          
1742              
1743 2 50 33     20 unless ( ($flg || $pkg->can('FETCHSIZE')) && $get ) {
      33        
1744 0     0   0 return sub {};
1745             }
1746              
1747             # MCE::Shared::{ Array, Cache, Hash, Ordhash }, Hash::Ordered,
1748             # or similar module.
1749              
1750 2 50       16 $get = 'peek' if $pkg->isa('MCE::Shared::Cache');
1751              
1752 2 50 0     6 if ( ! @keys ) {
    0          
    0          
1753 2         7 @keys = $self->keys;
1754             }
1755             elsif ( @keys == 1 && $keys[0] =~ /^(?:key|val)[ ]+\S\S?[ ]+\S/ ) {
1756 0 0   0   0 return sub {} unless $pkg->isa('MCE::Shared::Base::Common');
1757 0         0 @keys = $self->keys($keys[0]);
1758             }
1759             elsif ( $pkg->isa('MCE::Shared::Cache') ) {
1760 0         0 $self->_prune_head();
1761             }
1762              
1763             return sub {
1764 10 100   10   62 return unless @keys;
1765 8         12 my $key = shift @keys;
1766 8         36 return ( $key => $self->$get($key) );
1767 2         23 };
1768             }
1769              
1770             # rewind ( index [, index, ... ] ) # Array
1771             # rewind ( key [, key, ... ] ) # Cache, Hash, Ordhash
1772             # rewind ( "query string" ) # Cache, Hash, Ordhash, Array
1773             # rewind ( begin, end [, step, format ] ) # Sequence
1774             # rewind ( )
1775              
1776             sub rewind {
1777 18     18   8082 my $_id = shift()->[_ID];
1778 18         137 my $_buf = $_freeze->([ @_ ]);
1779 18         109 _req1('M~IRW', $_id.$LF . length($_buf).$LF . $_buf);
1780              
1781 18         49 return;
1782             }
1783              
1784             # next ( )
1785              
1786             sub next {
1787 127 50   127   938 local $\ = undef if (defined $\);
1788 127 50       264 local $/ = $LF if ($/ ne $LF);
1789 127         162 local $MCE::Signal::SIG;
1790              
1791 127         169 my ( $_len, $_buf );
1792              
1793             {
1794 127         177 local $MCE::Signal::IPC = 1;
  127         168  
1795 127 50       324 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1796              
1797 127         3256 print({$_DAT_W_SOCK} 'M~INX'.$LF . $_chn.$LF),
1798 127         1443 print({$_DAU_W_SOCK} $_[0]->[_ID].$LF);
  127         2261  
1799 127         25208 chomp($_len = <$_DAU_W_SOCK>);
1800              
1801 127 100       820 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1802              
1803 127 50       386 $_dat_un->() if !$_is_MSWin32;
1804             }
1805              
1806 127 50       328 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1807              
1808 127 100       265 return if ($_len < 0);
1809              
1810 112 100       130 my $_b; return wantarray ? () : undef unless @{ $_b = $_thaw->($_buf) };
  112 100       132  
  112         972  
1811              
1812 105 50       325 if ( $_[0]->[_DECODE] ) {
1813 0   0     0 local $@; $_b->[-1] = eval { $_[0]->[_DECODE]->($_b->[-1]) } || $_b->[-1];
  0         0  
1814             }
1815              
1816             ( wantarray )
1817 105 100       333 ? @{ $_b } == 2 ? ( $_b->[0], delete $_b->[-1] ) : @{ $_b }
  24 100       95  
  12         93  
1818             : delete $_b->[-1];
1819             }
1820              
1821             ###############################################################################
1822             ## ----------------------------------------------------------------------------
1823             ## Methods optimized for:
1824             ## MCE::Shared::{ Array, Hash, Ordhash, Scalar } and similar.
1825             ##
1826             ###############################################################################
1827              
1828             sub ins_inplace {
1829 0     0   0 my $_id = shift()->[_ID];
1830              
1831 0 0       0 if ( @_ ) {
1832 0         0 my $_tmp = Storable::freeze([ @_ ]);
1833 0         0 my $_buf = $_id.$LF . length($_tmp).$LF;
1834 0         0 _req2('O~PDL', $_buf, $_tmp);
1835             }
1836              
1837 0         0 return;
1838             }
1839              
1840 4     4   1081 sub FETCHSIZE { _size('FETCHSIZE', @_) }
1841 0     0   0 sub SCALAR { _size('SCALAR' , @_) }
1842 6     6   252 sub CLEAR { _req3('CLEAR' , @_) }
1843 98     98   15607716 sub FETCH { _req4('FETCH' , @_) }
1844              
1845             sub clear {
1846 34 50   34   15009 @_ > 1 ? _auto('clear', @_) : _req3('clear', @_);
1847             }
1848             sub get {
1849 59 50   59   853 @_ > 2 ? _auto('get', @_) : _req4('get', @_);
1850             }
1851              
1852             sub FIRSTKEY {
1853 6     6   167 $_[0]->[_ITER] = [ $_[0]->keys ];
1854 6         19 shift @{ $_[0]->[_ITER] };
  6         129  
1855             }
1856             sub NEXTKEY {
1857 9     9   15 shift @{ $_[0]->[_ITER] };
  9         79  
1858             }
1859              
1860             sub STORE {
1861 58 50 33 58   1097 if ( @_ > 1 && $_[0]->[_ENCODE] ) {
    50 66        
    100 33        
1862 0 0       0 $_[-1] = $_[0]->[_ENCODE]->($_[-1]) if ref($_[-1]);
1863             }
1864             elsif ( @_ == 2 && $_blessed->($_[1]) && $_[1]->can('SHARED_ID') ) {
1865 0         0 _req2('M~DEE', $_[0]->[_ID].$LF, $_[1]->SHARED_ID().$LF);
1866 0         0 delete $_new{ $_[1]->SHARED_ID() };
1867             }
1868             elsif ( ref $_[2] ) {
1869 4 50 33     187 if ( $_blessed->($_[2]) && $_[2]->can('SHARED_ID') ) {
    100 100        
1870 0         0 _req2('M~DEE', $_[0]->[_ID].$LF, $_[2]->SHARED_ID().$LF);
1871 0         0 delete $_new{ $_[2]->SHARED_ID() };
1872             }
1873             elsif ( $_[0]->[1]->isa('MCE::Shared::Array') ||
1874             $_[0]->[1]->isa('MCE::Shared::Hash') ) {
1875 2         42 $_[2] = MCE::Shared::share({ _DEEPLY_ => 1 }, $_[2]);
1876 2         15 _req2('M~DEE', $_[0]->[_ID].$LF, $_[2]->SHARED_ID().$LF);
1877             }
1878             }
1879 58         192 _auto('STORE', @_); 1;
  58         199  
1880             }
1881              
1882             sub set {
1883 45 50   45   2714 if ( ref $_[2] ) {
1884 0 0 0     0 if ( $_blessed->($_[2]) && $_[2]->can('SHARED_ID') ) {
1885 0         0 _req2('M~DEE', $_[0]->[_ID].$LF, $_[2]->SHARED_ID().$LF);
1886 0         0 delete $_new{ $_[2]->SHARED_ID() };
1887             }
1888             }
1889 45         291 _auto('set', @_);
1890             }
1891              
1892             sub keys {
1893 145 50 66 145   839 ( @_ == 1 && !wantarray ) ? _size('keys', @_) : _auto('keys', @_);
1894             }
1895              
1896             sub lock {
1897 6     6   2002969 my ( $self ) = @_;
1898 6 50       68 Carp::croak( sprintf(
1899             "Mutex not enabled for the shared %s instance", $self->[_CLASS]
1900             )) unless $self->[_MUTEX];
1901              
1902 6         120 $self->[_MUTEX]->lock();
1903             }
1904              
1905             sub unlock {
1906 1     1   23 my ( $self ) = @_;
1907 1 50       5 Carp::croak( sprintf(
1908             "Mutex not enabled for the shared %s instance", $self->[_CLASS]
1909             )) unless $self->[_MUTEX];
1910              
1911 1         22 $self->[_MUTEX]->unlock();
1912             }
1913              
1914             {
1915 43     43   332 no strict 'refs'; *{ __PACKAGE__.'::store' } = \&STORE;
  43         98  
  43         3047  
1916             }
1917              
1918             1;
1919              
1920             __END__