File Coverage

blib/lib/MCE/Core/Input/Request.pm
Criterion Covered Total %
statement 82 102 80.3
branch 30 58 51.7
condition 5 15 33.3
subroutine 4 8 50.0
pod n/a
total 121 183 66.1


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Array reference and Glob reference input reader.
4             ##
5             ## This package provides the request chunk method used internally by the worker
6             ## process. Distribution follows a bank-queuing model.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Input::Request;
13              
14 35     35   1209 use strict;
  35         71  
  35         1159  
15 35     35   207 use warnings;
  35         67  
  35         2134  
16              
17             our $VERSION = '1.888';
18              
19             ## Items below are folded into MCE.
20              
21             package # hide from rpm
22             MCE;
23              
24 35     35   221 no warnings qw( threads recursion uninitialized );
  35         92  
  35         37766  
25              
26             ###############################################################################
27             ## ----------------------------------------------------------------------------
28             ## Worker process -- Request chunk.
29             ##
30             ###############################################################################
31              
32             sub _worker_request_chunk {
33              
34 53     53   235 my ($self, $_proc_type) = @_;
35              
36 53         174 @_ = ();
37              
38             _croak('MCE::_worker_request_chunk: (user_func) is not specified')
39 53 50       233 unless (defined $self->{user_func});
40              
41 53 50       258 my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
42 53         133 my $_chn = $self->{_chn};
43 53         128 my $_DAT_LOCK = $self->{_dat_lock};
44 53         111 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
45 53         123 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
46 53         126 my $_lock_chn = $self->{_lock_chn};
47 53         122 my $_chunk_size = $self->{chunk_size};
48 53         188 my $_use_slurpio = $self->{use_slurpio};
49 53   33     541 my $_RS = $self->{RS} || $/;
50 53         139 my $_wuf = $self->{_wuf};
51              
52 53         132 my ($_dat_ex, $_dat_un, $_pid);
53              
54 53 50       161 if ($_lock_chn) {
55 0 0       0 $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
56              
57             # inlined for performance
58             $_dat_ex = sub {
59             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
60 0 0   0   0 if $_is_MSWin32;
61             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
62 0 0       0 unless $_DAT_LOCK->{ $_pid };
63 0         0 };
64             $_dat_un = sub {
65             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
66 0 0   0   0 if $_DAT_LOCK->{ $_pid };
67 0         0 };
68             }
69              
70 53         184 my ($_chunk_id, $_len, $_output_tag);
71 53         0 my ($_chop_len, $_chop_str, $_p);
72              
73 53 100       243 if ($_proc_type == REQUEST_ARRAY) {
    100          
74 36         201 $_output_tag = OUTPUT_A_REF;
75 36         97 $_chop_len = 0;
76             }
77             elsif ($_proc_type == REQUEST_HASH) {
78 3         8 $_output_tag = OUTPUT_H_REF;
79 3         15 $_chop_len = 0;
80             }
81             else {
82 14         39 $_output_tag = OUTPUT_G_REF;
83 14 50 33     84 if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") {
84 0         0 $_chop_str = substr($_RS, 1);
85 0         0 $_chop_len = length $_chop_str;
86             } else {
87 14         136 $_chop_str = '';
88 14         47 $_chop_len = 0;
89             }
90             }
91              
92             ## -------------------------------------------------------------------------
93              
94 53     0   591 $self->{_next_jmp} = sub { goto _WORKER_REQUEST_CHUNK__NEXT; };
  0         0  
95 53     0   477 $self->{_last_jmp} = sub { goto _WORKER_REQUEST_CHUNK__LAST; };
  0         0  
96              
97 53         139 local $_;
98              
99             _WORKER_REQUEST_CHUNK__NEXT:
100              
101 53         103 while (1) {
102 197 50       1483 undef $_ if (length > MAX_GC_SIZE);
103              
104 197         641 $_ = '';
105              
106             ## Obtain the next chunk of data.
107             {
108 197 50       392 local $\ = undef if (defined $\);
  197         851  
109 197 50       681 local $/ = $LF if ($/ ne $LF );
110              
111 197 50       465 $_dat_ex->() if $_lock_chn;
112 197         730 print {$_DAT_W_SOCK} $_output_tag . $LF . $_chn . $LF;
  197         3067  
113 197 50       873 MCE::Util::_sock_ready($_DAU_W_SOCK, -1) if $_is_MSWin32;
114 197         149560 chomp($_len = <$_DAU_W_SOCK>);
115              
116 197 100       1490 unless ($_len) {
117 53 50       255 $_dat_un->() if $_lock_chn;
118 53         475 return;
119             }
120              
121 144         672 chomp($_chunk_id = <$_DAU_W_SOCK>);
122              
123 144 50 66     1381 if ($_chunk_id > 1 && $_chop_len) {
124 0         0 $_p = $_chop_len; $_ = $_chop_str;
  0         0  
125             } else {
126 144         345 $_p = 0;
127             }
128              
129 144         805 read $_DAU_W_SOCK, $_, $_len, $_p;
130              
131 144 50       469 $_dat_un->() if $_lock_chn;
132             }
133              
134             ## Call user function.
135 144 100       493 if ($_proc_type == REQUEST_ARRAY) {
    100          
136 122         2157 my $_chunk_ref = $self->{thaw}($_); undef $_;
  122         395  
137 122 100       390 $_ = ($_chunk_size == 1) ? $_chunk_ref->[0] : $_chunk_ref;
138 122         814 $_wuf->($self, $_chunk_ref, $_chunk_id);
139             }
140             elsif ($_proc_type == REQUEST_HASH) {
141 15         34 my $_chunk_ref = { @{ $self->{thaw}($_) } }; undef $_;
  15         347  
  15         64  
142 15         33 $_ = $_chunk_ref;
143 15         82 $_wuf->($self, $_chunk_ref, $_chunk_id);
144             }
145             else {
146 7         24 $_ = ${ $self->{thaw}($_) };
  7         114  
147 7 100       56 if ($_use_slurpio) {
148 2 50 33     11 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
149 0         0 substr($_, -$_chop_len, $_chop_len, '');
150             }
151 2         9 local $_ = \$_;
152 2         10 $_wuf->($self, $_, $_chunk_id);
153             }
154             else {
155 5 50       35 if ($_chunk_size == 1) {
156 0 0 0     0 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
157 0         0 substr($_, -$_chop_len, $_chop_len, '');
158             }
159 0         0 $_wuf->($self, [ $_ ], $_chunk_id);
160             }
161             else {
162 5         17 my @_recs;
163             {
164 5 50       14 local $/ = $_RS if ($/ ne $_RS);
  5         31  
165 5         148 _sync_buffer_to_array(\$_, \@_recs, $_chop_str);
166 5         19 undef $_;
167             }
168 5 50       38 if ($_chop_len) {
169 0         0 for my $i (0 .. @_recs - 1) {
170 0 0       0 if (substr($_recs[$i], -$_chop_len) eq $_chop_str) {
171 0         0 substr($_recs[$i], -$_chop_len, $_chop_len, '');
172             }
173             }
174             }
175 5         27 local $_ = \@_recs;
176 5         28 $_wuf->($self, \@_recs, $_chunk_id);
177             }
178             }
179             }
180             }
181              
182             _WORKER_REQUEST_CHUNK__LAST:
183              
184 0           return;
185             }
186              
187             1;
188              
189             __END__