File Coverage

blib/lib/MCE/Shared/Server.pm
Criterion Covered Total %
statement 463 1009 45.8
branch 191 674 28.3
condition 37 259 14.2
subroutine 81 130 62.3
pod n/a
total 772 2072 37.2


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Server/Object packages for MCE::Shared.
4             ##
5             ###############################################################################
6              
7 46     46   319 use strict;
  46         95  
  46         1297  
8 46     46   218 use warnings;
  46         107  
  46         1055  
9              
10 46     46   798 use 5.010001;
  46         238  
11              
12 46     46   367 no warnings qw( threads recursion uninitialized numeric once );
  46         106  
  46         3462  
13              
14             package MCE::Shared::Server;
15              
16             our $VERSION = '1.881';
17              
18             ## no critic (BuiltinFunctions::ProhibitStringyEval)
19             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
20             ## no critic (TestingAndDebugging::ProhibitNoStrict)
21             ## no critic (InputOutput::ProhibitTwoArgOpen)
22              
23 46     46   18350 use if $^O eq 'MSWin32', 'threads';
  46         495  
  46         313  
24 46     46   2093 use if $^O eq 'MSWin32', 'threads::shared';
  46         127  
  46         169  
25              
26 46     46   1593 no overloading;
  46         96  
  46         1200  
27              
28 46     46   353 use Carp ();
  46         117  
  46         1002  
29 46     46   30720 use Storable ();
  46         150496  
  46         8199  
30              
31             my ($_spawn_child, $_freeze, $_thaw);
32              
33             BEGIN {
34 46     46   180 local $@;
35              
36             eval 'use IO::FDPass ();'
37 46 50 33 46   4250 if ( ! $INC{'IO/FDPass.pm'} && $^O ne 'cygwin' );
  46         23381  
  46         15391  
  46         680  
38              
39 46 50       263 $_spawn_child = $INC{'threads.pm'} ? 0 : 1;
40              
41 46 50       176 if ( ! $INC{'PDL.pm'} ) {
42 46     46   2779 eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;';
  46     46   314  
  46         960  
  46         2582  
  46         304  
  46         763  
  46         1232  
43 46 50       377 if ( ! $@ ) {
44 46         543 my $_encoder_ver = int( Sereal::Encoder->VERSION() );
45 46         498 my $_decoder_ver = int( Sereal::Decoder->VERSION() );
46 46 50       224 if ( $_encoder_ver - $_decoder_ver == 0 ) {
47 46         101 $_freeze = \&Sereal::Encoder::encode_sereal;
48 46         106 $_thaw = \&Sereal::Decoder::decode_sereal;
49             }
50             }
51             }
52              
53 46 50       2295 if ( ! defined $_freeze ) {
54 0         0 $_freeze = \&Storable::freeze;
55 0         0 $_thaw = \&Storable::thaw;
56             }
57             }
58              
59 0     0   0 sub _get_freeze { $_freeze; }
60 0     0   0 sub _get_thaw { $_thaw; }
61              
62 46     46   2981 use IO::Handle ();
  46         30909  
  46         976  
63 46     46   213 use Scalar::Util qw( blessed looks_like_number reftype weaken );
  46         109  
  46         3038  
64 46     46   3084 use Socket qw( SOL_SOCKET SO_RCVBUF );
  46         18247  
  46         2910  
65 46     46   1932 use Time::HiRes qw( alarm sleep time );
  46         3810  
  46         442  
66              
67 46     46   7701 use MCE::Signal 1.863 (); # requires 1.863 minimally
  46         11693  
  46         1309  
68 46     46   2533 use MCE::Util ();
  46         27078  
  46         850  
69 46     46   213 use MCE::Mutex ();
  46         80  
  46         1073  
70 46     46   3222 use bytes;
  46         137  
  46         297  
71              
72             ## The POSIX module has many symbols. Try not loading it simply
73             ## to have WNOHANG. The following covers most platforms.
74              
75             use constant {
76 46 50       5115 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
77             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
78 46     46   2994 };
  46         80  
79              
80             use constant {
81             # Max data channels. This cannot be greater than 8 on MSWin32.
82 46         378600 DATA_CHANNELS => 8,
83              
84             SHR_M_NEW => 'M~NEW', # New share
85             SHR_M_CID => 'M~CID', # ClientID request
86             SHR_M_DEE => 'M~DEE', # Deeply shared
87             SHR_M_INC => 'M~INC', # Increment count
88             SHR_M_OBJ => 'M~OBJ', # Object request
89             SHR_M_OB0 => 'M~OB0', # Object request - thaw'less
90             SHR_M_DES => 'M~DES', # Destroy request
91             SHR_M_EXP => 'M~EXP', # Export request
92             SHR_M_INX => 'M~INX', # Iterator next
93             SHR_M_IRW => 'M~IRW', # Iterator rewind
94             SHR_M_STP => 'M~STP', # Exit loop
95              
96             SHR_O_PDL => 'O~PDL', # PDL::ins inplace(this),what,coords
97             SHR_O_DAT => 'O~DAT', # Get MCE::Hobo data
98             SHR_O_CLR => 'O~CLR', # Clear
99             SHR_O_FCH => 'O~FCH', # Fetch
100             SHR_O_SZE => 'O~SZE', # Size
101              
102             WA_ARRAY => 1, # Wants list
103 46     46   315 };
  46         106  
104              
105             ###############################################################################
106             ## ----------------------------------------------------------------------------
107             ## Private functions.
108             ##
109             ###############################################################################
110              
111             my ($_SVR, $_stopped, %_all, %_obj, %_ob2, %_ob3, %_itr, %_new) = (undef);
112             my ($_next_id, $_is_client, $_init_pid, $_svr_pid) = (0, 1);
113             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
114             my %_export_nul;
115              
116             my @_db_modules = qw(
117             AnyDBM_File DB_File GDBM_File NDBM_File ODBM_File SDBM_File
118             BerkeleyDB::Btree BerkeleyDB::Hash BerkeleyDB::Queue
119             BerkeleyDB::Recno CDB_File KyotoCabinet::DB SQLite_File
120             TokyoCabinet::ADB TokyoCabinet::BDB TokyoCabinet::HDB
121             Tie::Array::DBD Tie::Hash::DBD
122             );
123              
124             my $_is_MSWin32 = ( $^O eq 'MSWin32') ? 1 : 0;
125             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
126             my $_oid = "$$.$_tid";
127              
128             sub _croak {
129 0     0   0 Carp::carp($_[0]);
130 0         0 MCE::Signal::stop_and_exit('INT');
131             }
132             sub CLONE {
133 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
134             }
135              
136             END {
137 46 0 33 46   30009 CORE::kill('KILL', $$)
138             if ($_is_MSWin32 && $MCE::Signal::KILLED);
139 46 100 66     1653 &_stop()
      66        
140             if ($_init_pid && $_init_pid eq "$$.$_tid" && $_is_client);
141             }
142              
143             sub _new {
144 164     164   663 my ($_class, $_deeply, %_hndls) = ($_[0]->{class}, $_[0]->{_DEEPLY_});
145 164         1268 my $_has_fh = ($_class =~ /^MCE::Shared::(?:Condvar|Queue)$/);
146              
147 164 100       557 if (!$_svr_pid) {
148             # Minimum support on platforms without IO::FDPass (not installed).
149             # Condvar and Queue must be shared first before others.
150             $_export_nul{ $_class } = undef, return _share(@_)
151 34 50 66     162 if $_has_fh && !$INC{'IO/FDPass.pm'};
152              
153 34         120 _start();
154             }
155              
156 164 100       1068 if ($_has_fh) {
157             _croak("Sharing module '$_class' while the server is running\n".
158             "requires the 'IO::FDPass' module, missing in Perl")
159 32 50       269 if !$INC{'IO/FDPass.pm'};
160              
161 32         335 for my $_k (qw(
162             _qw_sock _qr_sock _aw_sock _ar_sock _cw_sock _cr_sock
163             _mutex_0 _mutex_1 _mutex_2 _mutex_3 _mutex_4 _mutex_5
164             )) {
165 384 100       962 if ( defined $_[1]->{ $_k } ) {
166 64         779 $_hndls{ $_k } = delete $_[1]->{ $_k };
167 64         255 $_[1]->{ $_k } = undef;
168             }
169             }
170             }
171              
172 164         883 my $_chn = $_SVR->{_data_channels} + 1;
173 164         1158 my $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
174 164         636 my $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
175 164         780 my $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
176              
177             ##
178             # Sereal cannot encode $DB_RECNO. Therefore, must encode using Storable.
179             # Error: DB_File::RECNOINFO does not define the method FIRSTKEY
180             #
181             # my $ob = tie my @db, 'MCE::Shared', { module => 'DB_File' }, $file,
182             # O_RDWR|O_CREAT, 0640, $DB_RECNO or die "open error '$file': $!";
183             ##
184              
185 164         2739 my $_buf = Storable::freeze(shift);
186 164         15749 my $_bu2 = Storable::freeze([ @_ ]);
187              
188 164 50       7451 local $\ = undef if (defined $\);
189 164 50       726 local $/ = $LF if ($/ ne $LF);
190              
191 164 50       1531 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_DAT_LOCK->lock();
192              
193 164         8114 print({$_DAT_W_SOCK} SHR_M_NEW.$LF . $_chn.$LF),
194 164 100       8148 print({$_DAU_W_SOCK} length($_buf).$LF, $_buf, length($_bu2).$LF, $_bu2,
  164         5814  
195             (keys %_hndls ? 1 : 0).$LF);
196              
197 164         289937 <$_DAU_W_SOCK>;
198              
199 164         1385 undef($_buf), undef($_bu2);
200              
201 164 100       1091 if (keys %_hndls) {
202 32         119 for my $_k (qw( _qw_sock _qr_sock _aw_sock _cw_sock )) {
203 128 100       785 if (exists $_hndls{ $_k }) {
204 58         2772 IO::FDPass::send( fileno $_DAU_W_SOCK, fileno $_hndls{ $_k } );
205 58         15529 <$_DAU_W_SOCK>;
206             }
207             }
208             }
209              
210 164         84908 chomp(my $_id = <$_DAU_W_SOCK>),
211             chomp(my $_len = <$_DAU_W_SOCK>);
212              
213 164 50       2036 read($_DAU_W_SOCK, $_buf, $_len) if $_len;
214              
215 164 50       1496 $_DAT_LOCK->unlock() if !$_is_MSWin32;
216              
217 164 50       4631 $! = $_id, return '' unless $_len;
218              
219 164 100       761 if (keys %_hndls) {
220 32         375 $_all{ $_id } = $_class;
221 32         124 $_obj{ $_id } = \%_hndls;
222             }
223              
224 164 100       630 if (!$_deeply) {
225             # for auto-destroy
226 162 50       1408 $_new{ $_id } = $_tid ? $$ .'.'. $_tid : $$;
227             }
228              
229 164         5991 return $_thaw->($_buf);
230             }
231              
232             sub _incr_count {
233 0 0   0   0 return unless $_svr_pid;
234              
235 0         0 my $_chn = $_SVR->{_data_channels} + 1;
236 0         0 my $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
237 0         0 my $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
238 0         0 my $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
239              
240 0 0       0 local $\ = undef if (defined $\);
241 0 0       0 local $/ = $LF if ($/ ne $LF);
242 0         0 local $MCE::Signal::SIG;
243              
244             {
245 0         0 local $MCE::Signal::IPC = 1;
  0         0  
246 0 0       0 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_DAT_LOCK->lock();
247              
248 0         0 print({$_DAT_W_SOCK} SHR_M_INC.$LF . $_chn.$LF),
249 0         0 print({$_DAU_W_SOCK} $_[0].$LF);
  0         0  
250 0         0 <$_DAU_W_SOCK>;
251              
252 0 0       0 $_DAT_LOCK->unlock() if !$_is_MSWin32;
253             }
254              
255 0 0       0 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
256              
257 0         0 return;
258             }
259              
260             sub _share {
261 0     0   0 my ($_params, $_item) = (shift, shift);
262 0         0 my $_class = delete $_params->{'class'};
263 0         0 my $_id = ++$_next_id;
264              
265 0 0       0 if ($_class eq ':construct_module:') {
    0          
266 0         0 my ($_module, $_fcn) = ($_params->{module}, pop @{ $_item });
  0         0  
267 0 0       0 my $_has_args = @{ $_item } ? 1 : 0; local $@;
  0         0  
  0         0  
268              
269 0         0 ($_module) = $_module =~ /(.*)/; # remove tainted'ness
270 0         0 ($_fcn ) = $_fcn =~ /(.*)/;
271              
272 0 0       0 MCE::Shared::_use( $_class = $_module ) or _croak("$@\n");
273              
274 0 0       0 _croak("Can't locate object method \"$_fcn\" via package \"$_module\"")
275             unless eval qq{ $_module->can('$_fcn') };
276              
277 0 0       0 $! = 0; $_item = $_module->$_fcn(@{ $_item }) or return '';
  0         0  
  0         0  
278              
279 0 0       0 $_export_nul{ $_class } = undef if ($_class->isa('Graphics::Framebuffer'));
280 0 0       0 $_export_nul{ $_class } = undef if ($_fcn eq 'TIEHANDLE');
281              
282 0 0 0     0 return '' if (
      0        
283             $_has_args && $_fcn eq 'TIEHANDLE' && !defined(fileno $_item)
284             );
285             }
286             elsif ($_class eq ':construct_pdl:') {
287 0         0 local $@; local $SIG{__DIE__};
  0         0  
288              
289 0         0 $_class = 'PDL', $_item = eval q{
290             unless ($INC{'PDL.pm'}) {
291             use PDL;
292             # Disable PDL auto-threading.
293             eval q{ PDL::set_autopthread_targ(1) };
294             }
295              
296             my $_func = pop @{ $_item };
297              
298             if ($_func eq 'sbyte' ) { sbyte (@{ $_item }); }
299             elsif ($_func eq 'byte' ) { byte (@{ $_item }); }
300             elsif ($_func eq 'short' ) { short (@{ $_item }); }
301             elsif ($_func eq 'ushort' ) { ushort (@{ $_item }); }
302             elsif ($_func eq 'long' ) { long (@{ $_item }); }
303             elsif ($_func eq 'ulong' ) { ulong (@{ $_item }); }
304             elsif ($_func eq 'indx' ) { indx (@{ $_item }); }
305             elsif ($_func eq 'longlong' ) { longlong (@{ $_item }); }
306             elsif ($_func eq 'ulonglong') { ulonglong (@{ $_item }); }
307             elsif ($_func eq 'float' ) { float (@{ $_item }); }
308             elsif ($_func eq 'double' ) { double (@{ $_item }); }
309             elsif ($_func eq 'ldouble' ) { ldouble (@{ $_item }); }
310             elsif ($_func eq 'sequence' ) { sequence (@{ $_item }); }
311             elsif ($_func eq 'zeroes' ) { zeroes (@{ $_item }); }
312             elsif ($_func eq 'zeros' ) { zeros (@{ $_item }); }
313             elsif ($_func eq 'ones' ) { ones (@{ $_item }); }
314             elsif ($_func eq 'random' ) { random (@{ $_item }); }
315             elsif ($_func eq 'grandom' ) { grandom (@{ $_item }); }
316             else { pdl (@{ $_item }); }
317             };
318             }
319              
320 0         0 $_all{ $_id } = $_class;
321 0         0 $_ob3{"$_id:count"} = 1;
322              
323 0 0 0     0 if ($_class eq 'MCE::Shared::Handle' && reftype $_item eq 'ARRAY') {
324 0         0 $_obj{ $_id } = IO::Handle->new();
325 0         0 $_export_nul{ $_class } = undef;
326              
327 0         0 bless $_obj{ $_id }, $_class;
328             }
329             else {
330 0         0 $_obj{ $_id } = $_item;
331              
332 0 0 0     0 if ( reftype $_obj{ $_id } eq 'HASH' &&
333             reftype $_obj{ $_id }->{'fh'} eq 'GLOB' ) {
334              
335 0 0       0 if ( $_class->isa('Tie::File') ) {
336             # enable autoflush, enable raw layer
337 0         0 $_obj{ $_id }->{'fh'}->autoflush(1);
338 0         0 binmode($_obj{ $_id }->{'fh'}, ':raw');
339             }
340              
341 0         0 $_export_nul{ $_class } = undef;
342             }
343             }
344              
345 0         0 my $self = bless [ $_id, $_class ], 'MCE::Shared::Object';
346              
347 0         0 $_ob2{ $_id } = $_freeze->([ $self ]);
348              
349 0 0       0 if ( $_params->{tied} ) {
350             # set encoder/decoder upon receipt in MCE::Shared::_tie
351 0         0 for my $_module ( @_db_modules ) {
352 0 0       0 $self->[2] = 1, last if $_class->isa($_module);
353             }
354 0 0       0 $_export_nul{ $_class } = undef if $self->[2];
355             }
356              
357 0         0 return $self;
358             }
359              
360             sub _start {
361 97 100   97   416 return if $_svr_pid;
362              
363 46 50       187 if ($INC{'PDL.pm'}) { local $@;
  0         0  
364             # PDL::IO::Storable is required for serializing piddles.
365 0 0       0 eval 'use PDL::IO::Storable' unless $INC{'PDL/IO/Storable.pm'};
366             # PDL data should not be naively copied in new threads.
367 0         0 eval 'no warnings; sub PDL::CLONE_SKIP { 1 }';
368             # Disable PDL auto-threading.
369 0         0 eval q{ PDL::set_autopthread_targ(1) };
370             }
371              
372 46         93 local $_; $_init_pid = "$$.$_tid", $_stopped = undef;
  46         221  
373              
374 46 50       252 my $_data_channels = ($_init_pid eq $_oid) ? DATA_CHANNELS : 2;
375              
376 46         162 $_SVR = { _data_channels => $_data_channels };
377              
378             # Defaults to the misc channel used by _new, _get_hobo_data, and export.
379 46         233 MCE::Util::_sock_pair($_SVR, qw(_dat_r_sock _dat_w_sock), 0);
380             MCE::Util::_sock_pair($_SVR, qw(_dat_r_sock _dat_w_sock), $_, 1)
381 46         8433 for (1 .. $_data_channels + 1);
382              
383 46 50 33     75524 setsockopt($_SVR->{_dat_r_sock}[0], SOL_SOCKET, SO_RCVBUF, 4096)
384             if ($^O ne 'aix' && $^O ne 'linux');
385              
386 46 50       235 if ($_is_MSWin32) {
387 0         0 for (1 .. $_data_channels + 1) {
388 0         0 my $_mutex;
389 0         0 $_SVR->{'_mutex_'.$_} = threads::shared::share($_mutex);
390             }
391             }
392             else {
393             $_SVR->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' )
394 46         448 for (1 .. $_data_channels + 1);
395             }
396              
397 46         156574 MCE::Shared::Object::_start();
398              
399             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
400 46 50       2597 unless $_is_MSWin32;
401              
402 46 50       261 if ($_spawn_child) {
403 46         55717 $_svr_pid = fork();
404 46 50 33     5393 _loop() if (defined $_svr_pid && $_svr_pid == 0);
405             }
406             else {
407 0         0 $_svr_pid = threads->create(\&_loop);
408 0 0       0 $_svr_pid->detach() if defined $_svr_pid;
409             }
410              
411 46 50       1275 _croak("cannot start the shared-manager process: $!")
412             unless (defined $_svr_pid);
413              
414 46 50 33     1784 sleep 0.015 if (!$_spawn_child || $_is_MSWin32);
415              
416 46         8350 return;
417             }
418              
419             sub _stop {
420 15     15   85 $_stopped = 1;
421 15 50 33     404 return unless ($_is_client && $_init_pid && $_init_pid eq "$$.$_tid");
      33        
422              
423 15 50       118 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
424 15 100       143 MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'};
425              
426 15         373 local ($!, $?, $@);
427              
428 15 50       110 if (defined $_svr_pid) {
429 15         74 my $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
430              
431 15 50       75 if (ref $_svr_pid) {
432 0         0 eval { $_svr_pid->kill('KILL') };
  0         0  
433             }
434             else {
435             local $SIG{HUP} = local $SIG{QUIT} = local $SIG{PIPE} =
436 15     0   1427 local $SIG{INT} = local $SIG{TERM} = sub {};
437              
438 15         258 my $_start = time;
439              
440 15         55 eval {
441 15 50       150 local $\ = undef if (defined $\);
442 15         53 print {$_DAT_W_SOCK} SHR_M_STP.$LF.'0'.$LF;
  15         1606  
443             };
444              
445 15         95 while () {
446 30 100       2578 last if waitpid($_svr_pid, _WNOHANG);
447 15 50       155 if ( time - $_start > 0.7 ) {
448 0         0 CORE::kill('USR2', $_svr_pid);
449 0         0 waitpid($_svr_pid, 0);
450 0         0 last;
451             }
452 15         677479 sleep 0.045;
453             }
454             }
455              
456 15         192 $_init_pid = $_svr_pid = undef;
457 15         245 %_all = (), %_obj = ();
458              
459 15         243 MCE::Util::_destroy_socks($_SVR, qw( _dat_w_sock _dat_r_sock ));
460              
461 15         45749 for my $_i (1 .. $_SVR->{_data_channels} + 1) {
462 135         12915 delete $_SVR->{'_mutex_'.$_i};
463             }
464              
465 15         271 MCE::Shared::Object::_stop();
466             }
467              
468 15         321 return;
469             }
470              
471             sub _pid {
472 0 0   0   0 return ref($_svr_pid) ? int($_init_pid) : $_svr_pid;
473             }
474              
475             sub _destroy {
476 0     0   0 my ($_lkup, $_item, $_id) = @_;
477              
478             # safety for circular references to not destroy dangerously
479 0 0 0     0 return if exists $_ob3{ "$_id:count" } && --$_ob3{ "$_id:count" } > 0;
480              
481             # safety for circular references to not loop endlessly
482 0 0       0 return if exists $_lkup->{ $_id };
483              
484 0         0 $_lkup->{ $_id } = undef;
485              
486 0 0       0 if (exists $_ob3{ "$_id:deeply" }) {
    0          
487 0         0 for my $_oid (keys %{ $_ob3{ "$_id:deeply" } }) {
  0         0  
488 0         0 _destroy($_lkup, $_obj{ $_oid }, $_oid);
489             }
490 0         0 delete $_ob3{ "$_id:deeply" };
491             }
492             elsif (exists $_obj{ $_id }) {
493 0 0 0     0 if ($_obj{ $_id }->isa('MCE::Shared::Scalar') ||
    0          
    0          
    0          
    0          
    0          
    0          
494             $_obj{ $_id }->isa('Tie::StdScalar')) {
495              
496 0 0       0 if (blessed($_item->FETCH())) {
497 0         0 my $_oid = $_item->FETCH()->SHARED_ID();
498 0         0 _destroy($_lkup, $_obj{ $_oid }, $_oid);
499             }
500              
501 0         0 undef ${ $_obj{ $_id } };
  0         0  
502             }
503 0         0 elsif ($_obj{ $_id }->isa('Tie::File')) { $_obj{ $_id }->flush(); }
504 0         0 elsif ($_obj{ $_id }->can('sync')) { $_obj{ $_id }->sync(); }
505 0         0 elsif ($_obj{ $_id }->can('db_sync')) { $_obj{ $_id }->db_sync(); }
506 0         0 elsif ($_obj{ $_id }->can('close')) { $_obj{ $_id }->close(); }
507 0         0 elsif ($_obj{ $_id }->can('DESTROY')) { delete $_obj{ $_id }; }
508             elsif (reftype $_obj{ $_id } eq 'GLOB') {
509 0 0       0 close $_obj{ $_id } if defined(fileno $_obj{ $_id });
510             }
511             }
512              
513 0 0       0 weaken( delete $_obj{ $_id } ) if exists($_obj{ $_id });
514 0 0       0 weaken( delete $_itr{ $_id } ) if exists($_itr{ $_id });
515              
516             delete($_itr{ "$_id:args" }), delete($_all{ $_id }),
517 0         0 delete($_ob3{ "$_id:count" }), delete($_ob2{ $_id });
518              
519 0         0 return;
520             }
521              
522             ###############################################################################
523             ## ----------------------------------------------------------------------------
524             ## Server loop.
525             ##
526             ###############################################################################
527              
528             sub _exit {
529 0 0   0   0 $SIG{__DIE__} = sub {} unless $_tid;
        0      
530 0     0   0 $SIG{__WARN__} = sub {};
531              
532             # Flush file handles.
533 0         0 for my $_id ( keys %_obj ) {
534 0         0 eval {
535 0 0       0 if ($_obj{ $_id }->isa('Tie::File')) { $_obj{ $_id }->flush(); }
  0 0       0  
    0          
    0          
    0          
    0          
536 0         0 elsif ($_obj{ $_id }->can('sync')) { $_obj{ $_id }->sync(); }
537 0         0 elsif ($_obj{ $_id }->can('db_sync')) { $_obj{ $_id }->db_sync(); }
538 0         0 elsif ($_obj{ $_id }->can('close')) { $_obj{ $_id }->close(); }
539 0         0 elsif ($_obj{ $_id }->can('DESTROY')) { delete $_obj{ $_id }; }
540             elsif (reftype $_obj{ $_id } eq 'GLOB') {
541 0 0       0 close $_obj{ $_id } if defined(fileno $_obj{ $_id });
542             }
543             };
544             }
545              
546             # Destroy non-exportable objects.
547 0         0 for my $_id ( keys %_all ) {
548 0         0 eval {
549             weaken( delete $_obj{ $_id } )
550 0 0       0 if ( exists $_export_nul{ $_all{ $_id } } );
551             };
552             }
553              
554             # Wait for the main thread to exit.
555 0 0 0     0 if ( !$_spawn_child && ($_is_MSWin32 || $INC{'Tk.pm'} || $INC{'Wx.pm'}) ) {
      0        
556 0         0 sleep 1.0;
557             }
558              
559 0 0 0     0 threads->exit(0) if ( !$_spawn_child || $_is_MSWin32 );
560              
561 0 0       0 CORE::kill('KILL', $$) unless $_is_MSWin32;
562 0         0 CORE::exit(0);
563             }
564              
565             sub _loop {
566 0     0   0 $_is_client = 0;
567              
568 0 0 0     0 $MCE::MCE = undef if ($MCE::MCE && $MCE::MCE->{_wid} == 0);
569              
570 0         0 local $\ = undef; local $/ = $LF; $| = 1;
  0         0  
  0         0  
571 0         0 my $_running_inside_eval = $^S;
572              
573       0     local $SIG{TERM} = local $SIG{QUIT} = local $SIG{INT} = local $SIG{HUP} = sub {}
574 0 0       0 if ($_init_pid eq $_oid);
575              
576 0     0   0 local $SIG{PIPE} = sub { $SIG{PIPE} = sub {}; CORE::kill('PIPE', getppid()); }
  0         0  
577 0 0 0     0 if ($_spawn_child && !$_is_MSWin32);
578              
579 0 0 0     0 local $SIG{USR2} = \&_exit if ($_init_pid eq $_oid && !$_is_MSWin32);
580 0 0 0     0 local $SIG{KILL} = \&_exit if ($_init_pid eq $_oid && !$_spawn_child);
581              
582             local $SIG{__DIE__} = sub {
583 0 0 0 0   0 if (!defined $^S || $^S) {
584 0 0 0     0 if ( ($INC{'threads.pm'} && threads->tid() != 0) ||
      0        
      0        
585             $ENV{'PERL_IPERL_RUNNING'} ||
586             $_running_inside_eval
587             ) {
588             # thread env or running inside IPerl, check stack trace
589 0         0 my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//;
  0         0  
590 0 0 0     0 CORE::die(@_)
591             if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / ||
592             $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ );
593             }
594             else {
595             # normal env, trust $^S
596 0         0 CORE::die(@_);
597             }
598             }
599              
600 0         0 $SIG{INT} = $SIG{__DIE__} = $SIG{__WARN__} = sub {};
601 0 0       0 print {*STDERR} defined $_[0] ? $_[0] : '';
  0         0  
602              
603 0 0 0     0 ( $_spawn_child && !$_is_MSWin32 )
604             ? CORE::kill('KILL', -getpgrp)
605             : CORE::exit($?);
606 0         0 };
607              
608 0         0 my ($_id, $_fcn, $_wa, $_len, $_func, $_var);
609 0         0 my ($_channel_id, $_done) = (0, 0);
610              
611 0         0 my $_channels = $_SVR->{_dat_r_sock};
612 0         0 my $_DAT_R_SOCK = $_SVR->{_dat_r_sock}[0];
613 0         0 my $_DAU_R_SOCK;
614              
615             my $_auto_reply = sub {
616 0 0   0   0 if ( $_wa == WA_ARRAY ) {
617 0         0 my @_ret = eval { $_var->$_fcn(@_) };
  0         0  
618 0         0 my $_buf = $_freeze->(\@_ret);
619 0         0 return print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
620             }
621              
622 0         0 my $_ret = eval { $_var->$_fcn(@_) };
  0         0  
623 0         0 my $_buf = $_freeze->([ $_ret ]);
624              
625 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
626 0         0 };
627              
628             my $_obj_keys = sub {
629 0     0   0 my ( $_obj, @_keys, $_cnt ) = ( shift );
630              
631 0 0       0 return keys %{ $_obj } if $_obj->isa('Tie::StdHash');
  0         0  
632 0 0       0 return (0 .. $_obj->FETCHSIZE - 1) unless $_obj->can('FIRSTKEY');
633              
634 0 0       0 if ( wantarray ) {
    0          
    0          
635 0         0 my $_key = $_obj->FIRSTKEY;
636 0 0       0 if ( defined $_key ) {
637 0         0 push @_keys, $_key;
638             # CDB_File expects the $_key argument
639 0         0 while ( defined( $_key = $_obj->NEXTKEY($_key) ) ) {
640 0         0 push @_keys, $_key;
641             }
642             }
643             }
644             elsif ( $_obj->isa('Tie::ExtraHash') ) {
645 0         0 $_cnt = keys %{ $_obj->[0] };
  0         0  
646             }
647             elsif ( $_obj->isa('Tie::IxHash') ) {
648 0         0 $_cnt = keys %{ $_obj->[2] };
  0         0  
649             }
650             else {
651 0         0 my $_key = $_obj->FIRSTKEY; $_cnt = 0;
  0         0  
652 0 0       0 if ( defined $_key ) {
653 0         0 $_cnt = 1;
654             # CDB_File expects the $_key argument
655 0         0 while ( defined( $_key = $_obj->NEXTKEY($_key) ) ) {
656 0         0 $_cnt++;
657             }
658             }
659             }
660              
661 0 0       0 wantarray ? @_keys : $_cnt;
662 0         0 };
663              
664             my $_iter = sub {
665 0 0   0   0 unless ( exists $_itr{ $_id } ) {
666              
667 0         0 my $pkg = $_all{ $_id };
668 0 0 0     0 my $flg = ($pkg->can('NEXTKEY') || $pkg->can('keys')) ? 1 : 0;
669 0 0       0 my $get = $pkg->can('FETCH') ? 'FETCH' : $pkg->can('get') ? 'get' : '';
    0          
670              
671 0 0 0     0 unless ( ($flg || $pkg->can('FETCHSIZE')) && $get ) {
      0        
672 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
673 0         0 return;
674             }
675              
676             # MCE::Shared::{ Array, Cache, Hash, Ordhash }, Hash::Ordered,
677             # or similar module.
678              
679 0 0       0 $get = 'peek' if $pkg->isa('MCE::Shared::Cache');
680              
681 0 0       0 if ( !exists $_itr{ "$_id:args" } ) {
682 0         0 @{ $_itr{ "$_id:args" } } = $pkg->can('keys')
683             ? $_obj{ $_id }->keys()
684 0 0       0 : $_obj_keys->( $_obj{ $_id } );
685             }
686             else {
687 0         0 my $_args = $_itr{ "$_id:args" };
688 0 0 0     0 if ( @{ $_args } == 1 &&
  0         0  
689             $_args->[0] =~ /^(?:key|val)[ ]+\S\S?[ ]+\S/ ) {
690              
691 0 0       0 @{ $_args } = $_obj{ $_id }->keys($_args->[0])
  0         0  
692             if $pkg->isa('MCE::Shared::Base::Common');
693             }
694             else {
695 0 0       0 $_obj{ $_id }->_prune_head()
696             if $pkg->isa('MCE::Shared::Cache');
697             }
698             }
699              
700             $_itr{ $_id } = sub {
701 0         0 my $_key = shift @{ $_itr{ "$_id:args" } };
  0         0  
702 0 0       0 print({$_DAU_R_SOCK} '-1'.$LF), return if !defined($_key);
  0         0  
703 0         0 my $_buf = $_freeze->([ $_key, $_obj{ $_id }->$get($_key) ]);
704 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
705 0         0 };
706             }
707              
708 0         0 $_itr{ $_id }->();
709              
710 0         0 return;
711 0         0 };
712              
713             my $_warn = sub {
714 0 0   0   0 if ( $_wa ) {
715 0         0 my $_buf = $_freeze->([ ]);
716 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
717             }
718 0         0 };
719              
720             # --------------------------------------------------------------------------
721              
722 0         0 my %_output_function; %_output_function = (
723              
724             SHR_M_NEW.$LF => sub { # New share
725 0     0   0 my ($_buf, $_params, $_class, $_args, $_fd, $_item);
726              
727 0         0 chomp($_len = <$_DAU_R_SOCK>),
728             read($_DAU_R_SOCK, $_buf, $_len);
729              
730 0         0 $_params = Storable::thaw($_buf);
731 0         0 $_class = $_params->{'class'};
732              
733 0   0     0 { local $@; MCE::Shared::_use($_params->{module} || $_class); }
  0         0  
  0         0  
734              
735             chomp($_len = <$_DAU_R_SOCK>), read($_DAU_R_SOCK, $_buf, $_len),
736 0         0 chomp($_len = <$_DAU_R_SOCK>), print({$_DAU_R_SOCK} $LF);
  0         0  
737              
738 0         0 $_args = Storable::thaw($_buf); undef $_buf;
  0         0  
739              
740 0 0       0 if ($_len) {
741 0         0 $_export_nul{ $_class } = undef;
742              
743 0         0 for my $_k (qw( _qw_sock _qr_sock _aw_sock _cw_sock )) {
744 0 0       0 if (exists $_args->[0]->{ $_k }) {
745 0         0 delete $_args->[0]->{ $_k };
746              
747 0 0       0 $_fd = IO::FDPass::recv(fileno $_DAU_R_SOCK); $_fd >= 0
  0         0  
748             or _croak("cannot receive file handle: $!");
749              
750 0 0       0 open $_args->[0]->{ $_k }, "+<&=$_fd"
751             or _croak("cannot convert file discriptor to handle: $!");
752              
753 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
754             }
755             }
756             }
757              
758 0 0       0 $_item = _share($_params, @{ $_args }) or do {
  0         0  
759 0         0 print {$_DAU_R_SOCK} int($!).$LF . '0'.$LF;
  0         0  
760 0         0 return;
761             };
762              
763 0         0 $_buf = $_freeze->($_item);
764              
765 0         0 print {$_DAU_R_SOCK} $_item->SHARED_ID().$LF .
  0         0  
766             length($_buf).$LF, $_buf;
767              
768 0 0       0 if ($_class eq 'MCE::Shared::Queue') {
    0          
    0          
769             MCE::Shared::Queue::_init_mgr(
770             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
771 0 0       0 ) if $INC{'MCE/Shared/Queue.pm'};
772             }
773             elsif (reftype $_obj{ $_item->[0] } eq 'GLOB') {
774             MCE::Shared::Handle::_init_mgr(
775             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
776 0 0       0 ) if $INC{'MCE/Shared/Handle.pm'};
777             }
778             elsif ($_class eq 'MCE::Shared::Condvar') {
779             MCE::Shared::Condvar::_init_mgr(
780             \$_DAU_R_SOCK, \%_obj, \%_output_function
781 0 0       0 ) if $INC{'MCE/Shared/Condvar.pm'};
782             }
783              
784 0         0 return;
785             },
786              
787             SHR_M_CID.$LF => sub { # ClientID request
788 0     0   0 print {$_DAU_R_SOCK} (++$_channel_id).$LF;
  0         0  
789 0 0       0 $_channel_id = 0 if ($_channel_id >= $_SVR->{_data_channels});
790              
791 0         0 return;
792             },
793              
794             SHR_M_DEE.$LF => sub { # Deeply shared
795 0     0   0 chomp(my $_id1 = <$_DAU_R_SOCK>),
796             chomp(my $_id2 = <$_DAU_R_SOCK>);
797              
798 0         0 $_ob3{ "$_id1:deeply" }->{ $_id2 } = undef;
799              
800 0         0 return;
801             },
802              
803             SHR_M_INC.$LF => sub { # Increment count
804 0     0   0 chomp($_id = <$_DAU_R_SOCK>);
805              
806 0         0 $_ob3{ "$_id:count" }++;
807 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
808              
809 0         0 return;
810             },
811              
812             SHR_M_OBJ.$LF => sub { # Object request
813 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
814             chomp($_fcn = <$_DAU_R_SOCK>),
815             chomp($_wa = <$_DAU_R_SOCK>),
816             chomp($_len = <$_DAU_R_SOCK>),
817              
818             read($_DAU_R_SOCK, my($_buf), $_len);
819              
820 0   0     0 $_var = $_obj{ $_id } || do { return $_warn->($_fcn) };
821              
822 0         0 $_wa ? $_auto_reply->(@{ $_thaw->($_buf) })
823 0 0       0 : eval { $_var->$_fcn(@{ $_thaw->($_buf) }) };
  0         0  
  0         0  
824              
825 0 0       0 warn $@ if $@;
826 0         0 return;
827             },
828              
829             SHR_M_OB0.$LF => sub { # Object request - thaw'less
830 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
831             chomp($_fcn = <$_DAU_R_SOCK>),
832             chomp($_wa = <$_DAU_R_SOCK>);
833              
834 0   0     0 $_var = $_obj{ $_id } || do { return $_warn->($_fcn) };
835              
836 0   0     0 my $_code = $_var->can($_fcn) || do {
837             if ( ($_fcn eq 'keys' || $_fcn eq 'SCALAR') &&
838             ($_var->can('NEXTKEY') || $_var->can('FETCHSIZE')) ) {
839             $_obj_keys;
840             }
841             else {
842             $_wa ? $_auto_reply->() : eval { $_var->$_fcn() };
843             warn $@ if $@;
844             return;
845             }
846             };
847              
848 0 0       0 if ( $_wa == WA_ARRAY ) {
    0          
849 0         0 my @_ret = eval { $_code->($_var) };
  0         0  
850 0         0 my $_buf = $_freeze->(\@_ret);
851 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
852             }
853             elsif ( $_wa ) {
854 0         0 my $_ret = eval { $_code->($_var) };
  0         0  
855 0         0 my $_buf = $_freeze->([ $_ret ]);
856 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
857             }
858             else {
859 0         0 eval { $_code->($_var) };
  0         0  
860             }
861              
862 0 0       0 warn $@ if $@;
863 0         0 return;
864             },
865              
866             SHR_M_DES.$LF => sub { # Destroy request
867 0     0   0 chomp($_id = <$_DAU_R_SOCK>);
868              
869 0         0 local $SIG{__DIE__};
870 0         0 local $SIG{__WARN__};
871              
872 0         0 $_var = undef; local $@;
  0         0  
873              
874 0         0 eval {
875 0 0       0 my $_ret = exists($_all{ $_id }) ? '1' : '0';
876 0 0       0 _destroy({}, $_obj{ $_id }, $_id) if $_ret;
877             };
878              
879 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
880              
881 0         0 return;
882             },
883              
884             SHR_M_EXP.$LF => sub { # Export request
885 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
886             chomp($_len = <$_DAU_R_SOCK>);
887              
888 0 0       0 read($_DAU_R_SOCK, my($_keys), $_len) if $_len;
889              
890 0 0       0 if (exists $_obj{ $_id }) {
891 0         0 my $_buf;
892              
893             # Do not export: e.g. objects with file handles
894 0 0       0 if ( exists $_export_nul{ $_all{ $_id } } ) {
895 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
896 0         0 return;
897             }
898              
899             # MCE::Shared::{ Array, Hash, Ordhash }, Hash::Ordered
900 0 0       0 if ($_obj{ $_id }->can('clone')) {
901             $_buf = ($_len)
902 0         0 ? Storable::freeze($_obj{ $_id }->clone(@{ $_thaw->($_keys) }))
903 0 0       0 : Storable::freeze($_obj{ $_id });
904             }
905             # Other
906             else {
907 0         0 $_buf = Storable::freeze($_obj{ $_id });
908             }
909              
910 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
911 0         0 undef $_buf;
912             }
913             else {
914 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
915             }
916              
917 0         0 return;
918             },
919              
920             SHR_M_INX.$LF => sub { # Iterator next
921 0     0   0 chomp($_id = <$_DAU_R_SOCK>);
922              
923 0         0 my $_var = $_obj{ $_id };
924              
925 0 0       0 if ( my $_code = $_var->can('next') ) {
926 0         0 my $_buf = $_freeze->([ $_code->($_var) ]);
927 0         0 print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  0         0  
928             }
929             else {
930 0         0 $_iter->();
931             }
932              
933 0         0 return;
934             },
935              
936             SHR_M_IRW.$LF => sub { # Iterator rewind
937 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
938             chomp($_len = <$_DAU_R_SOCK>),
939              
940             read($_DAU_R_SOCK, my($_buf), $_len);
941              
942 0         0 my $_var = $_obj{ $_id };
943              
944 0 0       0 if ( my $_code = $_var->can('rewind') ) {
945 0         0 $_code->($_var, @{ $_thaw->($_buf) });
  0         0  
946             }
947             else {
948 0 0       0 weaken( delete $_itr{ $_id } ) if ( exists $_itr{ $_id } );
949 0         0 my @_args = @{ $_thaw->($_buf) };
  0         0  
950 0 0       0 if ( @_args ) {
951 0         0 $_itr{ "$_id:args" } = \@_args;
952             } else {
953 0         0 delete $_itr{ "$_id:args" };
954             }
955             }
956              
957 0         0 print {$_DAU_R_SOCK} $LF;
  0         0  
958              
959 0         0 return;
960             },
961              
962             SHR_M_STP.$LF => sub { # Exit loop
963 0 0   0   0 $SIG{USR2} = sub {} unless $_is_MSWin32;
964              
965 0         0 $_done = 1;
966              
967 0         0 return;
968             },
969              
970             SHR_O_PDL.$LF => sub { # PDL::ins inplace(this),...
971 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
972             chomp($_len = <$_DAU_R_SOCK>),
973              
974             read($_DAU_R_SOCK, my($_buf), $_len);
975              
976 0 0       0 if ($_all{ $_id } eq 'PDL') {
977             # PDL ins( inplace($this), $what, @coords );
978 0         0 local @_ = @{ Storable::thaw($_buf) };
  0         0  
979              
980 0 0 0     0 if (@_ == 1) {
    0 0        
    0 0        
    0          
    0          
981 0         0 ins( inplace($_obj{ $_id }), @_, 0, 0 );
982             }
983             elsif (@_ == 2 && $_[0] =~ /^:,(\d+):(\d+)/ && ref($_[1])) {
984 0         0 my $_s = $2 - $1;
985 0         0 ins( inplace($_obj{ $_id }), $_[1]->slice(":,0:$_s"), 0, $1 );
986             }
987             elsif (!ref($_[0]) && $_[0] =~ /^(\d+)$/) {
988 0         0 $_obj{ $_id }->set(@_);
989             }
990             elsif (@_ == 2) {
991 0         0 $_[0] =~ /^:,(\d+)/;
992 0   0     0 ins( inplace($_obj{ $_id }), $_[1], 0, $1 // $_[0] );
993             }
994             elsif (@_ > 2) {
995 0         0 ins( inplace($_obj{ $_id }), @_ );
996             }
997             }
998              
999 0         0 return;
1000             },
1001              
1002             SHR_O_DAT.$LF => sub { # Get MCE::Hobo data
1003 0     0   0 my $_key;
1004              
1005 0         0 chomp($_id = <$_DAU_R_SOCK>),
1006             chomp($_key = <$_DAU_R_SOCK>);
1007              
1008 0   0     0 my $error = delete $_obj{ $_id }{ 'S'.$_key } // '';
1009 0   0     0 my $result = delete $_obj{ $_id }{ 'R'.$_key } // '';
1010              
1011 0         0 print {$_DAU_R_SOCK}
  0         0  
1012             length($error).$LF . length($result).$LF . $error, $result;
1013              
1014 0         0 return;
1015             },
1016              
1017             SHR_O_CLR.$LF => sub { # Clear
1018 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
1019             chomp($_fcn = <$_DAU_R_SOCK>);
1020              
1021 0   0     0 my $_var = $_obj{ $_id } || do { return };
1022              
1023 0 0       0 if (exists $_ob3{ "$_id:deeply" }) {
1024 0         0 my $_keep = { $_id => 1 };
1025 0         0 for my $_oid (keys %{ $_ob3{ "$_id:deeply" } }) {
  0         0  
1026 0         0 _destroy($_keep, $_obj{ $_oid }, $_oid);
1027             }
1028 0         0 delete $_ob3{ "$_id:deeply" };
1029             }
1030              
1031 0         0 eval { $_var->$_fcn() };
  0         0  
1032              
1033 0 0       0 warn $@ if $@;
1034 0         0 return;
1035             },
1036              
1037             SHR_O_FCH.$LF => sub { # Fetch
1038 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
1039             chomp($_fcn = <$_DAU_R_SOCK>),
1040             chomp($_len = <$_DAU_R_SOCK>);
1041              
1042 0 0       0 read($_DAU_R_SOCK, my($_key), $_len) if $_len;
1043              
1044 0   0     0 my $_var = $_obj{ $_id } || do {
1045             return print {$_DAU_R_SOCK} '-1'.$LF;
1046             };
1047              
1048             my $_buf = $_len
1049 0 0       0 ? eval { $_var->$_fcn( chop $_key ? ${ $_thaw->($_key) } : $_key ) }
  0         0  
1050 0 0       0 : eval { $_var->$_fcn() };
  0         0  
1051              
1052 0 0       0 warn $@ if $@;
1053              
1054 0 0       0 return print {$_DAU_R_SOCK} '-1'.$LF if ( !defined $_buf );
  0         0  
1055              
1056             my $_ret = ( blessed($_buf) && $_buf->can('SHARED_ID') && $_ob2{ $_buf->[0] } )
1057 0 0 0     0 ? $_ob2{ $_buf->[0] }
1058             : $_freeze->([ $_buf ]);
1059              
1060 0         0 print {$_DAU_R_SOCK} length($_ret).$LF, $_ret;
  0         0  
1061              
1062 0         0 return;
1063             },
1064              
1065             SHR_O_SZE.$LF => sub { # Size
1066 0     0   0 chomp($_id = <$_DAU_R_SOCK>),
1067             chomp($_fcn = <$_DAU_R_SOCK>);
1068              
1069 0   0     0 $_var = $_obj{ $_id } || do {
1070             print {$_DAU_R_SOCK} $LF if $_wa;
1071             return;
1072             };
1073              
1074 0   0     0 my $_code = $_var->can($_fcn) || do {
1075             if ( ($_fcn eq 'keys' || $_fcn eq 'SCALAR') &&
1076             ($_var->can('NEXTKEY') || $_var->can('FETCHSIZE')) ) {
1077             $_obj_keys;
1078             }
1079             else {
1080             $_len = eval { $_var->$_fcn() };
1081             print {$_DAU_R_SOCK} $_len.$LF;
1082              
1083             warn $@ if $@;
1084             return;
1085             }
1086             };
1087              
1088 0         0 $_len = eval { $_code->($_var) };
  0         0  
1089 0         0 print {$_DAU_R_SOCK} $_len.$LF;
  0         0  
1090              
1091 0 0       0 warn $@ if $@;
1092 0         0 return;
1093             },
1094              
1095 0         0 );
1096              
1097             MCE::Shared::Queue::_init_mgr(
1098             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
1099 0 0       0 ) if $INC{'MCE/Shared/Queue.pm'};
1100              
1101             MCE::Shared::Handle::_init_mgr(
1102             \$_DAU_R_SOCK, \%_obj, \%_output_function, $_freeze, $_thaw
1103 0 0       0 ) if $INC{'MCE/Shared/Handle.pm'};
1104              
1105             MCE::Shared::Condvar::_init_mgr(
1106             \$_DAU_R_SOCK, \%_obj, \%_output_function
1107 0 0       0 ) if $INC{'MCE/Shared/Condvar.pm'};
1108              
1109             # --------------------------------------------------------------------------
1110              
1111             # Call on hash function.
1112              
1113 0 0       0 if ($_is_MSWin32) {
1114             # The normal loop hangs on Windows when processes/threads start/exit.
1115             # Using ioctl() properly, https://www.perlmonks.org/?node_id=780083
1116              
1117 0         0 my $_val_bytes = pack('L', 0);
1118 0         0 my ($_count, $_nbytes, $_start);
1119              
1120 0         0 while (!$_done) {
1121 0         0 $_start = time, $_count = 1;
1122              
1123             # MSWin32 FIONREAD
1124 0         0 IOCTL: ioctl($_DAT_R_SOCK, 0x4004667f, $_val_bytes);
1125              
1126 0 0       0 unless ($_nbytes = unpack('L', $_val_bytes)) {
1127 0 0       0 if ($_count) {
1128             # delay after a while to not consume a CPU core
1129 0 0 0     0 $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.030;
1130             } else {
1131 0         0 sleep 0.015;
1132             }
1133 0         0 goto IOCTL;
1134             }
1135              
1136 0         0 do {
1137 0         0 sysread($_DAT_R_SOCK, $_func, 8);
1138 0 0       0 $_done = 1, last() unless length($_func) == 8;
1139 0         0 $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];
1140              
1141 0         0 $_output_function{$_func}();
1142              
1143             } while (($_nbytes -= 8) >= 8);
1144             }
1145             }
1146             else {
1147 0         0 while (!$_done) {
1148 0         0 $_func = <$_DAT_R_SOCK>;
1149 0 0       0 last() unless length($_func) == 6;
1150 0         0 $_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
1151              
1152 0         0 $_output_function{$_func}();
1153             }
1154             }
1155              
1156 0         0 _exit();
1157             }
1158              
1159             ###############################################################################
1160             ## ----------------------------------------------------------------------------
1161             ## Object package.
1162             ##
1163             ###############################################################################
1164              
1165             package MCE::Shared::Object;
1166              
1167 46     46   527 use Scalar::Util qw( looks_like_number reftype );
  46         102  
  46         2913  
1168 46     46   23511 use MCE::Shared::Base ();
  46         108  
  46         1117  
1169 46     46   281 use bytes;
  46         90  
  46         257  
1170              
1171             use constant {
1172 46 50       3326 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
1173             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
1174 46     46   2462 };
  46         114  
1175             use constant {
1176 46         4748 _ID => 0, _CLASS => 1, _ENCODE => 2, _DECODE => 3, # shared object
1177             _DREF => 4, _ITER => 5, _MUTEX => 6,
1178 46     46   298 };
  46         85  
1179             use constant {
1180 46         4814 _UNDEF => 0, _ARRAY => 1, _SCALAR => 2, # wantarray
1181 46     46   316 };
  46         118  
1182              
1183             ## Below, no circular reference to original, therefore no memory leaks.
1184              
1185             use overload (
1186             q("") => \&MCE::Shared::Base::_stringify,
1187             q(0+) => \&MCE::Shared::Base::_numify,
1188             q(@{}) => sub {
1189 46     46   389 no overloading;
  46         86  
  46         9517  
1190 7 50   7   2103 $_[0]->[_DREF] || do {
1191 7         37 local $@; my $c = $_[0]->[_CLASS];
  7         65  
1192 7         144 ($c) = $c =~ /(.*)/; # remove tainted'ness
1193 7 50       1309 return $_[0] unless eval qq{ eval { require $c }; $c->can('TIEARRAY') };
1194 7         76 tie my @a, __PACKAGE__, bless([ @{ $_[0] }[ 0..3 ] ], __PACKAGE__);
  7         199  
1195 7         114 $_[0]->[_DREF] = \@a;
1196             };
1197             },
1198             q(%{}) => sub {
1199 46     46   470 no overloading;
  46         103  
  46         8961  
1200 9 50   9   5923 $_[0]->[_DREF] || do {
1201 9         118 local $@; my $c = $_[0]->[_CLASS];
  9         95  
1202 9         181 ($c) = $c =~ /(.*)/; # remove tainted'ness
1203 9 50       2386 return $_[0] unless eval qq{ eval { require $c }; $c->can('TIEHASH') };
1204 9         92 tie my %h, __PACKAGE__, bless([ @{ $_[0] }[ 0..3 ] ], __PACKAGE__);
  9         433  
1205 9         190 $_[0]->[_DREF] = \%h;
1206             };
1207             },
1208             q(${}) => sub {
1209 46     46   553 no overloading;
  46         113  
  46         8109  
1210 0 0   0   0 $_[0]->[_DREF] || do {
1211 0         0 local $@; my $c = $_[0]->[_CLASS];
  0         0  
1212 0         0 ($c) = $c =~ /(.*)/; # remove tainted'ness
1213 0 0       0 return $_[0] unless eval qq{ eval { require $c }; $c->can('TIESCALAR') };
1214 0         0 tie my $s, __PACKAGE__, bless([ @{ $_[0] }[ 0..3 ] ], __PACKAGE__);
  0         0  
1215 0         0 $_[0]->[_DREF] = \$s;
1216             };
1217             },
1218 46         622 fallback => 1
1219 46     46   34199 );
  46         26993  
1220              
1221 46     46   5980 no overloading;
  46         96  
  46         4164  
1222              
1223             my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_chn, $_dat_ex, $_dat_un);
1224              
1225             my $_blessed = \&Scalar::Util::blessed;
1226              
1227             BEGIN {
1228 0         0 $_dat_ex = sub { _croak (
1229             "\nPlease start the shared-manager process manually when ready.\n",
1230             "See section labeled \"Extra Functionality\" in MCE::Shared.\n\n"
1231 46     46   124987 ) };
1232             }
1233              
1234             # Hook for threads.
1235              
1236             sub CLONE {
1237 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
1238 0 0       0 &_init($_tid) if $_tid;
1239             }
1240              
1241             # Private functions.
1242              
1243             sub DESTROY {
1244 116 100   116   3130600 return if $_stopped;
1245 113 50 33     1649 return unless ( $_is_client && defined $_svr_pid && defined $_[0] );
      33        
1246              
1247 113 100 33     1519 if ( $_spawn_child && $_init_pid && $_init_pid eq "$$.$_tid" ) {
      66        
1248 58         312 local ($!, $?);
1249 58 50 33     1010 return if ( ! $_svr_pid || waitpid($_svr_pid, _WNOHANG) );
1250             }
1251              
1252 113         528 my $_id = $_[0]->[_ID];
1253              
1254 113 100       507 if ( exists $_new{ $_id } ) {
1255 48 50       290 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1256              
1257 48 50       229 if ($_new{ $_id } eq $_pid) {
1258 48 50       187 return if $MCE::Signal::KILLED;
1259              
1260             delete($_all{ $_id }), delete($_obj{ $_id }),
1261             delete($_new{ $_id }), delete($_ob2{ $_id }),
1262 48         537 delete($_ob3{"$_id:count"});
1263              
1264 48         283 _req1('M~DES', $_id.$LF);
1265             }
1266             }
1267              
1268 113         1473 return;
1269             }
1270              
1271 0     0   0 sub _croak { goto &MCE::Shared::Base::_croak }
1272              
1273 2     2   22 sub SHARED_ID { $_[0]->[_ID] }
1274              
1275 9     9   46 sub TIEARRAY { $_[1] }
1276 3     3   26 sub TIEHANDLE { $_[1] }
1277 9     9   82 sub TIEHASH { $_[1] }
1278 0     0   0 sub TIESCALAR { $_[1] }
1279              
1280             sub _reset {
1281             MCE::Shared::Object::_init_condvar(
1282             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, \%_obj,
1283             $_freeze, $_thaw
1284 93 100   93   831 ) if $INC{'MCE/Shared/Condvar.pm'};
1285              
1286             MCE::Shared::Object::_init_handle(
1287             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, \%_obj,
1288             $_freeze, $_thaw
1289 93 100       537 ) if $INC{'MCE/Shared/Handle.pm'};
1290              
1291             MCE::Shared::Object::_init_queue(
1292             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, \%_obj,
1293             $_freeze, $_thaw
1294 93 100       718 ) if $INC{'MCE/Shared/Queue.pm'};
1295             }
1296              
1297             sub _start {
1298 46     46   194 $_chn = $_SVR->{_data_channels} + 1;
1299 46         218 $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
1300 46         137 $_DAT_W_SOCK = $_SVR->{_dat_w_sock}[0];
1301 46         111 $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
1302              
1303             # inlined for performance
1304             $_dat_ex = sub {
1305 2025 50   2025   6248 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1306 2025 50       4938 MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock}) if $_is_MSWin32;
1307             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
1308 2025 50       12179 unless $_DAT_LOCK->{ $_pid };
1309 46         429 };
1310             $_dat_un = sub {
1311 2025 50   2025   8999 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1312             CORE::syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
1313 2025 50       28038 if $_DAT_LOCK->{ $_pid };
1314 46         255 };
1315              
1316 46         168 _reset();
1317             }
1318              
1319             sub _stop {
1320 15     15   416 $_DAT_LOCK = $_DAT_W_SOCK = $_DAU_W_SOCK = $_chn = $_dat_un = undef;
1321              
1322 0     0   0 $_dat_ex = sub { _croak (
1323             "\nPlease start the shared-manager process manually when ready.\n",
1324             "See section labeled \"Extra Functionality\" in MCE::Shared.\n\n"
1325 15         173 ) };
1326              
1327 15         1435 return;
1328             }
1329              
1330             sub _get_channel_id {
1331 31 50   31   996 local $\ = undef if (defined $\);
1332 31 50       889 local $/ = $LF if ($/ ne $LF);
1333 31         814 local $MCE::Signal::SIG;
1334              
1335 31         271 my $_ret;
1336              
1337             {
1338 31         204 local $MCE::Signal::IPC = 1;
  31         878  
1339 31 50       1585 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1340              
1341 31         2411 print {$_DAT_W_SOCK} 'M~CID'.$LF . $_chn.$LF;
  31         3787  
1342 31         5190 chomp($_ret = <$_DAU_W_SOCK>);
1343              
1344 31 50       1127 $_dat_un->() if !$_is_MSWin32;
1345             }
1346              
1347 31 50       446 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1348              
1349 31         366 return $_ret;
1350             }
1351              
1352             sub _init {
1353 31 50   31   1216 return unless defined $_SVR;
1354              
1355 31   33     2602 my $_id = $_[0] // &_get_channel_id();
1356 31 50       1793 $_id = $$ if ( $_id !~ /\d+/ );
1357              
1358 31         587 $_chn = abs($_id) % $_SVR->{_data_channels} + 1;
1359 31         622 $_DAT_LOCK = $_SVR->{'_mutex_'.$_chn};
1360 31         359 $_DAU_W_SOCK = $_SVR->{_dat_w_sock}[$_chn];
1361              
1362 31         2053 %_new = (), _reset();
1363              
1364 31         276 return;
1365             }
1366              
1367             ###############################################################################
1368             ## ----------------------------------------------------------------------------
1369             ## Private routines.
1370             ##
1371             ###############################################################################
1372              
1373             # Called by AUTOLOAD, STORE, set, and keys.
1374              
1375             sub _auto {
1376 1233 100   1233   4618 my $_wa = !defined wantarray ? _UNDEF : wantarray ? _ARRAY : _SCALAR;
    100          
1377              
1378 1233 50       4324 local $\ = undef if (defined $\);
1379 1233         2712 local $MCE::Signal::SIG;
1380              
1381 1233         2118 my $_buf;
1382              
1383             {
1384 1233         2020 local $MCE::Signal::IPC = 1;
  1233         2411  
1385 1233 50       4668 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1386              
1387 1233 100       22260 if ( @_ == 2 ) {
1388 155         5366 print({$_DAT_W_SOCK} 'M~OB0'.$LF . $_chn.$LF),
1389 155         385 print({$_DAU_W_SOCK} $_[1]->[_ID].$LF . $_[0].$LF . $_wa.$LF);
  155         3743  
1390             }
1391             else {
1392 1078         14794 my ( $_fcn, $_id, $_buf ) = ( shift, shift()->[_ID], $_freeze->([ @_ ]) );
1393 1078         5012 my $_tmp = $_id.$LF . $_fcn.$LF . $_wa.$LF . length($_buf).$LF;
1394 1078         43760 print({$_DAT_W_SOCK} 'M~OBJ'.$LF . $_chn.$LF),
1395 1078         1791 print({$_DAU_W_SOCK} $_tmp, $_buf);
  1078         29044  
1396             }
1397              
1398 1233 100       4966 if ( $_wa ) {
1399 916 50       3192 local $/ = $LF if ($/ ne $LF);
1400 916         592587 chomp(my $_len = <$_DAU_W_SOCK>);
1401 916         7315 read($_DAU_W_SOCK, $_buf, $_len);
1402             }
1403              
1404 1233 50       5899 $_dat_un->() if !$_is_MSWin32;
1405             }
1406              
1407 1233 50       4549 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1408              
1409 1233 100       3500 return unless $_wa;
1410 916 100       13400 return ( $_wa != _ARRAY ) ? $_thaw->($_buf)[0] : @{ $_thaw->($_buf) };
  382         10229  
1411             }
1412              
1413             # Called by MCE::Hobo ( ->join, ->wait_one ).
1414              
1415             sub _get_hobo_data {
1416 58 50 33 58   1359 if ( $_spawn_child && $_init_pid && $_init_pid eq "$$.$_tid" ) {
      33        
1417 58         872 local ($!, $?);
1418 58 50 33     1144 return if ( ! $_svr_pid || waitpid($_svr_pid, _WNOHANG) );
1419             }
1420              
1421 58 50       327 local $\ = undef if (defined $\);
1422 58 50       341 local $/ = $LF if ($/ ne $LF);
1423 58         345 local $MCE::Signal::SIG;
1424              
1425 58         293 my ($_result, $_error);
1426              
1427             {
1428 58         128 local $MCE::Signal::IPC = 1;
  58         284  
1429 58 50       488 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1430              
1431 58         4735 print({$_DAT_W_SOCK} 'O~DAT'.$LF . $_chn.$LF),
1432 58         1617 print({$_DAU_W_SOCK} $_[0]->[_ID].$LF . $_[1].$LF);
  58         2649  
1433              
1434 58         11171 chomp(my $_le1 = <$_DAU_W_SOCK>),
1435             chomp(my $_le2 = <$_DAU_W_SOCK>);
1436              
1437 58 50       455 read($_DAU_W_SOCK, $_error, $_le1) if $_le1;
1438 58 100       436 read($_DAU_W_SOCK, $_result, $_le2) if $_le2;
1439              
1440 58 50       598 $_dat_un->() if !$_is_MSWin32;
1441             }
1442              
1443 58 50       308 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1444              
1445 58         791 return ($_result, $_error);
1446             }
1447              
1448             # Called by await, rewind, broadcast, signal, timedwait, and wait.
1449             # Including CLOSE, DESTROY, and destroy.
1450              
1451             sub _req1 {
1452 85 50   85   601 return unless defined $_DAU_W_SOCK; # (in cleanup)
1453              
1454 85 50       397 local $\ = undef if (defined $\);
1455 85 50       327 local $/ = $LF if ($/ ne $LF );
1456 85         244 local $MCE::Signal::SIG;
1457              
1458 85         173 my $_ret;
1459              
1460             {
1461 85         213 local $MCE::Signal::IPC = 1;
  85         236  
1462 85 50       481 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1463              
1464 85         4125 print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1465 85         1512 print({$_DAU_W_SOCK} $_[1]);
  85         1523  
1466 85         23792 chomp($_ret = <$_DAU_W_SOCK>);
1467              
1468 85 50       780 $_dat_un->() if !$_is_MSWin32;
1469             }
1470              
1471 85 50       385 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1472              
1473 85         296 $_ret;
1474             }
1475              
1476             # Called by PRINT, PRINTF, STORE, ins_inplace, and set.
1477              
1478             sub _req2 {
1479 224 50   224   506 local $\ = undef if (defined $\);
1480 224         363 local $MCE::Signal::SIG;
1481              
1482             {
1483 224         286 local $MCE::Signal::IPC = 1;
  224         309  
1484 224 50       557 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1485              
1486 224         5962 print({$_DAT_W_SOCK} $_[0].$LF . $_chn.$LF),
1487 224         2542 print({$_DAU_W_SOCK} $_[1], $_[2]);
  224         2149  
1488              
1489 224 50       891 $_dat_un->() if !$_is_MSWin32;
1490             }
1491              
1492 224 50       573 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1493              
1494 224         924 1;
1495             }
1496              
1497             # Called by CLEAR and clear.
1498              
1499             sub _req3 {
1500 40     40   362 my ( $_fcn, $self ) = @_;
1501              
1502 40 50       212 local $\ = undef if (defined $\);
1503 40 50       176 local $/ = $LF if ($/ ne $LF );
1504 40         108 local $MCE::Signal::SIG;
1505              
1506 40 50       200 delete $self->[_ITER] if defined $self->[_ITER];
1507              
1508             {
1509 40         97 local $MCE::Signal::IPC = 1;
  40         95  
1510 40 50       329 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1511              
1512 40         1885 print({$_DAT_W_SOCK} 'O~CLR'.$LF . $_chn.$LF),
1513 40         700 print({$_DAU_W_SOCK} $self->[_ID].$LF . $_fcn.$LF);
  40         1609  
1514              
1515 40 50       332 $_dat_un->() if !$_is_MSWin32;
1516             }
1517              
1518 40 50       241 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1519              
1520 40         340 return;
1521             }
1522              
1523             # Called by FETCH and get.
1524              
1525             sub _req4 {
1526 157 50   157   762 local $\ = undef if (defined $\);
1527 157 50       745 local $/ = $LF if ($/ ne $LF );
1528 157         466 local $MCE::Signal::SIG;
1529              
1530 157         362 my ( $_key, $_len, $_buf );
1531              
1532 157 100       601 if ( @_ == 3 ) {
1533 104 50       1565 $_key = ref($_[2]) ? $_[2].'0' : $_freeze->(\$_[2]).'1';
1534             }
1535              
1536             {
1537 157         377 local $MCE::Signal::IPC = 1;
  157         412  
1538 157 50       981 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1539              
1540 157         6864 print({$_DAT_W_SOCK} 'O~FCH'.$LF . $_chn.$LF),
1541 157         2972 print({$_DAU_W_SOCK} $_[1]->[_ID].$LF . $_[0].$LF . length($_key).$LF, $_key);
  157         4059  
1542 157         42722 chomp($_len = <$_DAU_W_SOCK>);
1543              
1544 157 100       1784 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1545              
1546 157 50       947 $_dat_un->() if !$_is_MSWin32;
1547             }
1548              
1549 157 50       847 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1550              
1551 157 100       517 return undef if ($_len < 0);
1552              
1553 156 50 33     796 if ( $_[1]->[_DECODE] && $_[0] eq 'FETCH' ) {
1554 0         0 local $@; $_buf = $_thaw->($_buf)[0];
  0         0  
1555 0   0     0 return eval { $_[1]->[_DECODE]->($_buf) } || $_buf;
1556             }
1557              
1558 156         4396 $_thaw->($_buf)[0];
1559             }
1560              
1561             # Called by FETCHSIZE, SCALAR, keys, and pending.
1562              
1563             sub _size {
1564 4 50   4   30 local $\ = undef if (defined $\);
1565 4 50       28 local $/ = $LF if ($/ ne $LF );
1566 4         21 local $MCE::Signal::SIG;
1567              
1568 4         18 my $_size;
1569              
1570             {
1571 4         11 local $MCE::Signal::IPC = 1;
  4         10  
1572 4 50       21 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1573              
1574 4         167 print({$_DAT_W_SOCK} 'O~SZE'.$LF . $_chn.$LF),
1575 4         58 print({$_DAU_W_SOCK} $_[1]->[_ID].$LF . $_[0].$LF);
  4         157  
1576 4         522 chomp($_size = <$_DAU_W_SOCK>);
1577              
1578 4 50       65 $_dat_un->() if !$_is_MSWin32;
1579             }
1580              
1581 4 50       34 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1582              
1583 4 50       72 length($_size) ? int($_size) : undef;
1584             }
1585              
1586             ###############################################################################
1587             ## ----------------------------------------------------------------------------
1588             ## Common methods.
1589             ##
1590             ###############################################################################
1591              
1592             our $AUTOLOAD; # MCE::Shared::Object::
1593              
1594             sub AUTOLOAD {
1595 235     235   82440 my $_fcn = $AUTOLOAD; substr($_fcn, 0, rindex($_fcn,':') + 1, '');
  235         1031  
1596              
1597             # save this method for future calls
1598 46     46   380 no strict 'refs';
  46         103  
  46         129271  
1599 235     972   2490 *{ $AUTOLOAD } = sub { _auto($_fcn, @_) };
  235         2182  
  972         390746  
1600              
1601 235         511 goto &{ $AUTOLOAD };
  235         1348  
1602             }
1603              
1604             # blessed ( )
1605              
1606             sub blessed {
1607 17     17   1001 $_[0]->[_CLASS];
1608             }
1609              
1610             # decoder ( CODE )
1611             # decoder ( )
1612              
1613             sub decoder {
1614 0 0 0 0   0 $_[0]->[_DECODE] = $_[1] if (@_ == 2 && (ref $_[1] eq 'CODE' || !$_[1]));
      0        
1615 0         0 $_[0]->[_DECODE];
1616             }
1617              
1618             # encoder ( CODE )
1619             # encoder ( )
1620              
1621             sub encoder {
1622 0 0 0 0   0 $_[0]->[_ENCODE] = $_[1] if (@_ == 2 && (ref $_[1] eq 'CODE' || !$_[1]));
      0        
1623 0         0 $_[0]->[_ENCODE];
1624             }
1625              
1626             # destroy ( { unbless => 1 } )
1627             # destroy ( )
1628              
1629             sub destroy {
1630 0     0   0 my $_id = $_[0]->[_ID];
1631 0 0 0     0 my $_un = (ref $_[1] eq 'HASH' && $_[1]->{'unbless'}) ? 1 : 0;
1632 0 0       0 my $_item = (defined wantarray) ? $_[0]->export({ unbless => $_un }) : undef;
1633 0 0       0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
1634              
1635 0         0 delete($_all{ $_id }), delete($_obj{ $_id });
1636              
1637 0 0 0     0 if (defined $_svr_pid && exists $_new{ $_id } && $_new{ $_id } eq $_pid) {
      0        
1638 0         0 delete($_new{ $_id }), _req1('M~DES', $_id.$LF);
1639             }
1640              
1641 0         0 $_[0] = undef;
1642 0         0 $_item;
1643             }
1644              
1645             # export ( { unbless => 1 }, key [, key, ... ] )
1646             # export ( key [, key, ... ] )
1647             # export ( )
1648              
1649             sub export {
1650 0     0   0 my $_ob = shift;
1651 0         0 my $_id = $_ob->[_ID];
1652 0 0       0 my $_lkup = ref($_[0]) eq 'HASH' ? shift : {};
1653              
1654             # safety for circular references to not loop endlessly
1655 0 0       0 return $_lkup->{ $_id } if exists $_lkup->{ $_id };
1656              
1657 0 0       0 my $_tmp = @_ ? $_freeze->([ @_ ]) : '';
1658 0         0 my $_buf = $_id.$LF . length($_tmp).$LF;
1659 0         0 my $_class = $_ob->[_CLASS];
1660 0         0 my $_item;
1661              
1662 0         0 { local $@; MCE::Shared::_use($_class); }
  0         0  
1663              
1664             {
1665 0 0       0 local $\ = undef if (defined $\);
  0         0  
  0         0  
1666 0 0       0 local $/ = $LF if ($/ ne $LF);
1667 0         0 local $MCE::Signal::SIG;
1668              
1669 0         0 my $_len;
1670              
1671             {
1672 0         0 local $MCE::Signal::IPC = 1;
  0         0  
1673 0 0       0 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1674              
1675 0         0 print({$_DAT_W_SOCK} 'M~EXP'.$LF . $_chn.$LF),
1676 0         0 print({$_DAU_W_SOCK} $_buf, $_tmp); undef $_buf;
  0         0  
  0         0  
1677 0         0 chomp($_len = <$_DAU_W_SOCK>);
1678              
1679 0 0       0 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1680              
1681 0 0       0 $_dat_un->() if !$_is_MSWin32;
1682             }
1683              
1684 0 0       0 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1685              
1686 0 0       0 return undef if ($_len < 0);
1687              
1688 0         0 $_item = $_lkup->{ $_id } = Storable::thaw($_buf);
1689 0         0 undef $_buf;
1690             }
1691              
1692 0         0 my $_data; local $_;
  0         0  
1693              
1694             ## no critic
1695 0 0 0     0 if ( $_class->isa('MCE::Shared::Array') || $_class->isa('Tie::StdArray') ) {
    0 0        
    0 0        
1696 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1697 0         0 } @{ $_item };
  0         0  
1698              
1699 0 0       0 return $_lkup->{ $_id } = [ @{ $_item } ] if $_lkup->{'unbless'};
  0         0  
1700             }
1701             elsif ( $_class->isa('MCE::Shared::Hash') || $_class->isa('Tie::StdHash') ) {
1702 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1703 0         0 } CORE::values %{ $_item };
  0         0  
1704              
1705 0 0       0 return $_lkup->{ $_id } = { %{ $_item } } if $_lkup->{'unbless'};
  0         0  
1706             }
1707             elsif ( $_class->isa('MCE::Shared::Scalar') || $_class->isa('Tie::StdScalar') ) {
1708 0 0 0     0 if ( $_blessed->(${ $_item }) && ${ $_item }->can('export') ) {
  0         0  
  0         0  
1709 0         0 ${ $_item } = ${ $_item }->export($_lkup);
  0         0  
  0         0  
1710             }
1711 0 0       0 return $_lkup->{ $_id } = \do { my $o = ${ $_item } } if $_lkup->{'unbless'};
  0         0  
  0         0  
1712             }
1713             else {
1714 0 0       0 if ( $_class->isa('MCE::Shared::Ordhash') ) { $_data = $_item->[0] }
  0 0       0  
    0          
    0          
    0          
1715 0         0 elsif ( $_class->isa('MCE::Shared::Cache') ) { $_data = $_item->[0] }
1716 0         0 elsif ( $_class->isa('Hash::Ordered') ) { $_data = $_item->[0] }
1717 0         0 elsif ( $_class->isa('Tie::ExtraHash') ) { $_data = $_item->[0] }
1718 0         0 elsif ( $_class->isa('Tie::IxHash') ) { $_data = $_item->[2] }
1719              
1720 0 0       0 if ( reftype $_data eq 'ARRAY' ) {
    0          
1721 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1722 0         0 } @{ $_data };
  0         0  
1723             }
1724             elsif ( reftype $_data eq 'HASH' ) {
1725 0 0 0     0 map { $_ = $_->export($_lkup) if $_blessed->($_) && $_->can('export')
1726 0         0 } values %{ $_data };
  0         0  
1727             }
1728             }
1729              
1730 0         0 $_item;
1731             }
1732              
1733             # iterator ( index [, index, ... ] ) # Array
1734             # iterator ( key [, key, ... ] ) # Cache, Hash, Ordhash
1735             # iterator ( "query string" ) # Cache, Hash, Ordhash, Array
1736             # iterator ( )
1737              
1738             sub iterator {
1739 2     2   11 my ( $self, @keys ) = @_;
1740              
1741 2         11 my $pkg = $self->blessed();
1742 2 50 33     34 my $flg = ($pkg->can('NEXTKEY') || $pkg->can('keys')) ? 1 : 0;
1743 2 0       24 my $get = $pkg->can('FETCH') ? 'FETCH' : $pkg->can('get') ? 'get' : '';
    50          
1744              
1745 2 50 33     23 unless ( ($flg || $pkg->can('FETCHSIZE')) && $get ) {
      33        
1746 0     0   0 return sub {};
1747             }
1748              
1749             # MCE::Shared::{ Array, Cache, Hash, Ordhash }, Hash::Ordered,
1750             # or similar module.
1751              
1752 2 50       19 $get = 'peek' if $pkg->isa('MCE::Shared::Cache');
1753              
1754 2 50 0     6 if ( ! @keys ) {
    0          
    0          
1755 2         6 @keys = $self->keys;
1756             }
1757             elsif ( @keys == 1 && $keys[0] =~ /^(?:key|val)[ ]+\S\S?[ ]+\S/ ) {
1758 0 0   0   0 return sub {} unless $pkg->isa('MCE::Shared::Base::Common');
1759 0         0 @keys = $self->keys($keys[0]);
1760             }
1761             elsif ( $pkg->isa('MCE::Shared::Cache') ) {
1762 0         0 $self->_prune_head();
1763             }
1764              
1765             return sub {
1766 10 100   10   76 return unless @keys;
1767 8         14 my $key = shift @keys;
1768 8         42 return ( $key => $self->$get($key) );
1769 2         34 };
1770             }
1771              
1772             # rewind ( index [, index, ... ] ) # Array
1773             # rewind ( key [, key, ... ] ) # Cache, Hash, Ordhash
1774             # rewind ( "query string" ) # Cache, Hash, Ordhash, Array
1775             # rewind ( begin, end [, step, format ] ) # Sequence
1776             # rewind ( )
1777              
1778             sub rewind {
1779 18     18   8576 my $_id = shift()->[_ID];
1780 18         184 my $_buf = $_freeze->([ @_ ]);
1781 18         131 _req1('M~IRW', $_id.$LF . length($_buf).$LF . $_buf);
1782              
1783 18         65 return;
1784             }
1785              
1786             # next ( )
1787              
1788             sub next {
1789 127 50   127   1274 local $\ = undef if (defined $\);
1790 127 50       325 local $/ = $LF if ($/ ne $LF);
1791 127         198 local $MCE::Signal::SIG;
1792              
1793 127         221 my ( $_len, $_buf );
1794              
1795             {
1796 127         189 local $MCE::Signal::IPC = 1;
  127         218  
1797 127 50       410 $_is_MSWin32 ? CORE::lock $_DAT_LOCK : $_dat_ex->();
1798              
1799 127         4790 print({$_DAT_W_SOCK} 'M~INX'.$LF . $_chn.$LF),
1800 127         1666 print({$_DAU_W_SOCK} $_[0]->[_ID].$LF);
  127         3740  
1801 127         41928 chomp($_len = <$_DAU_W_SOCK>);
1802              
1803 127 100       891 read($_DAU_W_SOCK, $_buf, $_len) if ($_len >= 0);
1804              
1805 127 50       457 $_dat_un->() if !$_is_MSWin32;
1806             }
1807              
1808 127 50       416 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
1809              
1810 127 100       313 return if ($_len < 0);
1811              
1812 112 100       149 my $_b; return wantarray ? () : undef unless @{ $_b = $_thaw->($_buf) };
  112 100       164  
  112         1206  
1813              
1814 105 50       403 if ( $_[0]->[_DECODE] ) {
1815 0   0     0 local $@; $_b->[-1] = eval { $_[0]->[_DECODE]->($_b->[-1]) } || $_b->[-1];
  0         0  
1816             }
1817              
1818             ( wantarray )
1819 105 100       491 ? @{ $_b } == 2 ? ( $_b->[0], delete $_b->[-1] ) : @{ $_b }
  24 100       133  
  12         54  
1820             : delete $_b->[-1];
1821             }
1822              
1823             ###############################################################################
1824             ## ----------------------------------------------------------------------------
1825             ## Methods optimized for:
1826             ## MCE::Shared::{ Array, Hash, Ordhash, Scalar } and similar.
1827             ##
1828             ###############################################################################
1829              
1830             sub ins_inplace {
1831 0     0   0 my $_id = shift()->[_ID];
1832              
1833 0 0       0 if ( @_ ) {
1834 0         0 my $_tmp = Storable::freeze([ @_ ]);
1835 0         0 my $_buf = $_id.$LF . length($_tmp).$LF;
1836 0         0 _req2('O~PDL', $_buf, $_tmp);
1837             }
1838              
1839 0         0 return;
1840             }
1841              
1842 4     4   1513 sub FETCHSIZE { _size('FETCHSIZE', @_) }
1843 0     0   0 sub SCALAR { _size('SCALAR' , @_) }
1844 6     6   424 sub CLEAR { _req3('CLEAR' , @_) }
1845 98     98   18574770 sub FETCH { _req4('FETCH' , @_) }
1846              
1847             sub clear {
1848 34 50   34   13711 @_ > 1 ? _auto('clear', @_) : _req3('clear', @_);
1849             }
1850             sub get {
1851 59 50   59   1217 @_ > 2 ? _auto('get', @_) : _req4('get', @_);
1852             }
1853              
1854             sub FIRSTKEY {
1855 6     6   309 $_[0]->[_ITER] = [ $_[0]->keys ];
1856 6         33 shift @{ $_[0]->[_ITER] };
  6         123  
1857             }
1858             sub NEXTKEY {
1859 9     9   26 shift @{ $_[0]->[_ITER] };
  9         125  
1860             }
1861              
1862             sub STORE {
1863 58 50 33 58   1465 if ( @_ > 1 && $_[0]->[_ENCODE] ) {
    50 66        
    100 33        
1864 0 0       0 $_[-1] = $_[0]->[_ENCODE]->($_[-1]) if ref($_[-1]);
1865             }
1866             elsif ( @_ == 2 && $_blessed->($_[1]) && $_[1]->can('SHARED_ID') ) {
1867 0         0 _req2('M~DEE', $_[0]->[_ID].$LF, $_[1]->SHARED_ID().$LF);
1868 0         0 delete $_new{ $_[1]->SHARED_ID() };
1869             }
1870             elsif ( ref $_[2] ) {
1871 4 50 33     290 if ( $_blessed->($_[2]) && $_[2]->can('SHARED_ID') ) {
    100 100        
1872 0         0 _req2('M~DEE', $_[0]->[_ID].$LF, $_[2]->SHARED_ID().$LF);
1873 0         0 delete $_new{ $_[2]->SHARED_ID() };
1874             }
1875             elsif ( $_[0]->[1]->isa('MCE::Shared::Array') ||
1876             $_[0]->[1]->isa('MCE::Shared::Hash') ) {
1877 2         66 $_[2] = MCE::Shared::share({ _DEEPLY_ => 1 }, $_[2]);
1878 2         30 _req2('M~DEE', $_[0]->[_ID].$LF, $_[2]->SHARED_ID().$LF);
1879             }
1880             }
1881 58         290 _auto('STORE', @_); 1;
  58         284  
1882             }
1883              
1884             sub set {
1885 54 50   54   3055 if ( ref $_[2] ) {
1886 0 0 0     0 if ( $_blessed->($_[2]) && $_[2]->can('SHARED_ID') ) {
1887 0         0 _req2('M~DEE', $_[0]->[_ID].$LF, $_[2]->SHARED_ID().$LF);
1888 0         0 delete $_new{ $_[2]->SHARED_ID() };
1889             }
1890             }
1891 54         469 _auto('set', @_);
1892             }
1893              
1894             sub keys {
1895 145 50 66 145   1031 ( @_ == 1 && !wantarray ) ? _size('keys', @_) : _auto('keys', @_);
1896             }
1897              
1898             sub lock {
1899 12     12   2008050 my ( $self ) = @_;
1900 12 50       125 Carp::croak( sprintf(
1901             "Mutex not enabled for the shared %s instance", $self->[_CLASS]
1902             )) unless $self->[_MUTEX];
1903              
1904 12         200 $self->[_MUTEX]->lock();
1905             }
1906              
1907             sub unlock {
1908 4     4   60 my ( $self ) = @_;
1909 4 50       32 Carp::croak( sprintf(
1910             "Mutex not enabled for the shared %s instance", $self->[_CLASS]
1911             )) unless $self->[_MUTEX];
1912              
1913 4         116 $self->[_MUTEX]->unlock();
1914             }
1915              
1916             {
1917 46     46   476 no strict 'refs'; *{ __PACKAGE__.'::store' } = \&STORE;
  46         156  
  46         3787  
1918             }
1919              
1920             1;
1921              
1922             __END__