File Coverage

blib/lib/MCE/Channel.pm
Criterion Covered Total %
statement 65 71 91.5
branch 15 32 46.8
condition 10 21 47.6
subroutine 19 20 95.0
pod 2 2 100.0
total 111 146 76.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Queue-like and two-way communication capability.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel;
8              
9 18     18   441266 use strict;
  18         92  
  18         547  
10 18     18   93 use warnings;
  18         24  
  18         518  
11              
12 18     18   137 no warnings qw( uninitialized once );
  18         32  
  18         965  
13              
14             our $VERSION = '1.888';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (TestingAndDebugging::ProhibitNoStrict)
18              
19 18     18   12524 use if $^O eq 'MSWin32', 'threads';
  18         239  
  18         143  
20 18     18   1010 use if $^O eq 'MSWin32', 'threads::shared';
  18         26  
  18         68  
21              
22 18     18   495 use Carp ();
  18         34  
  18         3232  
23              
24             $Carp::Internal{ (__PACKAGE__) }++;
25              
26             my ( $freeze, $thaw );
27              
28             BEGIN {
29 18 50 33 18   233 if ( $] ge '5.008008' && ! $INC{'PDL.pm'} ) {
30 18         36 local $@;
31 18     18   1445 eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;';
  18     18   147  
  18         517  
  18         1052  
  18         130  
  18         350  
  18         559  
32 18 50       80 if ( ! $@ ) {
33 18         290 my $encoder_ver = int( Sereal::Encoder->VERSION() );
34 18         156 my $decoder_ver = int( Sereal::Decoder->VERSION() );
35 18 50       74 if ( $encoder_ver - $decoder_ver == 0 ) {
36 18         36 $freeze = \&Sereal::Encoder::encode_sereal;
37 18         68 $thaw = \&Sereal::Decoder::decode_sereal;
38             }
39             }
40             }
41              
42 18 50       430 if ( ! defined $freeze ) {
43 0         0 require Storable;
44 0         0 $freeze = \&Storable::freeze;
45 0         0 $thaw = \&Storable::thaw;
46             }
47             }
48              
49 18     18   9008 use MCE::Util ();
  18         51  
  18         3255  
50              
51             my $tid = $INC{'threads.pm'} ? threads->tid() : 0;
52              
53             sub new {
54 29     29 1 1990 my ( $class, %argv ) = @_;
55 29 50       247 my $impl = defined( $argv{impl} ) ? ucfirst( lc $argv{impl} ) : 'Mutex';
56              
57             # Replace 'fast' with 'Fast' in the implementation value.
58 29         134 $impl =~ s/fast$/Fast/;
59              
60 29 50 66     219 $impl = 'Threads' if ( $impl eq 'Mutex' && $^O eq 'MSWin32' );
61 29 50 66     155 $impl = 'ThreadsFast' if ( $impl eq 'MutexFast' && $^O eq 'MSWin32' );
62 29 50 33     145 $impl = 'Mutex' if ( $impl eq 'Threads' && $^O eq 'cygwin' );
63 29 50 33     118 $impl = 'MutexFast' if ( $impl eq 'ThreadsFast' && $^O eq 'cygwin' );
64              
65 29 50       2014 eval "require MCE::Channel::$impl; 1;" ||
66             Carp::croak("Could not load Channel implementation '$impl': $@");
67              
68 29         152 my $pkg = 'MCE::Channel::'.$impl;
69 18     18   143 no strict 'refs';
  18         37  
  18         8135  
70              
71 29         175 $pkg->new(%argv);
72             }
73              
74             sub CLONE {
75 0 0   0   0 $tid = threads->tid if $INC{'threads.pm'};
76             }
77              
78             sub DESTROY {
79 19 50   19   35851 my ( $pid, $self ) = ( $tid ? $$ .'.'. $tid : $$, @_ );
80              
81 19 100 66     280 if ( $self->{'init_pid'} && $self->{'init_pid'} eq $pid ) {
82 16         178 MCE::Util::_destroy_socks($self, qw(c_sock c2_sock p_sock p2_sock));
83 16         496 delete($self->{c_mutex}), delete($self->{p_mutex});
84             }
85              
86 19         1601 return;
87             }
88              
89             sub impl {
90 6 50   6 1 109 $_[0]->{'impl'} || 'Not defined';
91             }
92              
93 15     15   47 sub _get_freeze { $freeze; }
94 15     15   35 sub _get_thaw { $thaw; }
95              
96             sub _ended {
97 12     12   134 warn "WARNING: ($_[0]) called on a channel that has been 'end'ed\n";
98              
99 12         7435 return;
100             }
101              
102             sub _read {
103 120     120   293 my $bytes = MCE::Util::_sysread( $_[0], $_[1], my $len = $_[2] );
104 120         263 my $read = $bytes;
105              
106 120   33     473 while ( $bytes && $read != $len ) {
107 0         0 $bytes = MCE::Util::_sysread( $_[0], $_[1], $len - $read, length($_[1]) );
108 0 0       0 $read += $bytes if $bytes;
109             }
110              
111 120         259 return;
112             }
113              
114             sub _pid {
115 29 50   29   165 $tid ? $$ .'.'. $tid : $$;
116             }
117              
118             1;
119              
120             __END__