File Coverage

blib/lib/MCE/Core/Input/Request.pm
Criterion Covered Total %
statement 82 102 80.3
branch 30 58 51.7
condition 5 15 33.3
subroutine 4 8 50.0
pod n/a
total 121 183 66.1


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Array reference and Glob reference input reader.
4             ##
5             ## This package provides the request chunk method used internally by the worker
6             ## process. Distribution follows a bank-queuing model.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Input::Request;
13              
14 35     35   1069 use strict;
  35         70  
  35         1003  
15 35     35   204 use warnings;
  35         58  
  35         1860  
16              
17             our $VERSION = '1.887';
18              
19             ## Items below are folded into MCE.
20              
21             package # hide from rpm
22             MCE;
23              
24 35     35   186 no warnings qw( threads recursion uninitialized );
  35         46  
  35         28645  
25              
26             ###############################################################################
27             ## ----------------------------------------------------------------------------
28             ## Worker process -- Request chunk.
29             ##
30             ###############################################################################
31              
32             sub _worker_request_chunk {
33              
34 53     53   222 my ($self, $_proc_type) = @_;
35              
36 53         106 @_ = ();
37              
38             _croak('MCE::_worker_request_chunk: (user_func) is not specified')
39 53 50       224 unless (defined $self->{user_func});
40              
41 53 50       292 my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
42 53         157 my $_chn = $self->{_chn};
43 53         109 my $_DAT_LOCK = $self->{_dat_lock};
44 53         137 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
45 53         119 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
46 53         116 my $_lock_chn = $self->{_lock_chn};
47 53         123 my $_chunk_size = $self->{chunk_size};
48 53         155 my $_use_slurpio = $self->{use_slurpio};
49 53   33     384 my $_RS = $self->{RS} || $/;
50 53         108 my $_wuf = $self->{_wuf};
51              
52 53         110 my ($_dat_ex, $_dat_un, $_pid);
53              
54 53 50       150 if ($_lock_chn) {
55 0 0       0 $_pid = $INC{'threads.pm'} ? $$ .'.'. threads->tid() : $$;
56              
57             # inlined for performance
58             $_dat_ex = sub {
59             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
60 0 0   0   0 if $_is_MSWin32;
61             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
62 0 0       0 unless $_DAT_LOCK->{ $_pid };
63 0         0 };
64             $_dat_un = sub {
65             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
66 0 0   0   0 if $_DAT_LOCK->{ $_pid };
67 0         0 };
68             }
69              
70 53         267 my ($_chunk_id, $_len, $_output_tag);
71 53         0 my ($_chop_len, $_chop_str, $_p);
72              
73 53 100       185 if ($_proc_type == REQUEST_ARRAY) {
    100          
74 36         181 $_output_tag = OUTPUT_A_REF;
75 36         82 $_chop_len = 0;
76             }
77             elsif ($_proc_type == REQUEST_HASH) {
78 3         29 $_output_tag = OUTPUT_H_REF;
79 3         8 $_chop_len = 0;
80             }
81             else {
82 14         81 $_output_tag = OUTPUT_G_REF;
83 14 50 33     105 if (length $_RS > 1 && substr($_RS, 0, 1) eq "\n") {
84 0         0 $_chop_str = substr($_RS, 1);
85 0         0 $_chop_len = length $_chop_str;
86             } else {
87 14         61 $_chop_str = '';
88 14         42 $_chop_len = 0;
89             }
90             }
91              
92             ## -------------------------------------------------------------------------
93              
94 53     0   575 $self->{_next_jmp} = sub { goto _WORKER_REQUEST_CHUNK__NEXT; };
  0         0  
95 53     0   465 $self->{_last_jmp} = sub { goto _WORKER_REQUEST_CHUNK__LAST; };
  0         0  
96              
97 53         121 local $_;
98              
99             _WORKER_REQUEST_CHUNK__NEXT:
100              
101 53         107 while (1) {
102 197 50       1524 undef $_ if (length > MAX_GC_SIZE);
103              
104 197         454 $_ = '';
105              
106             ## Obtain the next chunk of data.
107             {
108 197 50       466 local $\ = undef if (defined $\);
  197         630  
109 197 50       580 local $/ = $LF if ($/ ne $LF );
110              
111 197 50       376 $_dat_ex->() if $_lock_chn;
112 197         698 print {$_DAT_W_SOCK} $_output_tag . $LF . $_chn . $LF;
  197         2335  
113 197 50       596 MCE::Util::_sock_ready($_DAU_W_SOCK, -1) if $_is_MSWin32;
114 197         129147 chomp($_len = <$_DAU_W_SOCK>);
115              
116 197 100       998 unless ($_len) {
117 53 50       185 $_dat_un->() if $_lock_chn;
118 53         349 return;
119             }
120              
121 144         606 chomp($_chunk_id = <$_DAU_W_SOCK>);
122              
123 144 50 66     941 if ($_chunk_id > 1 && $_chop_len) {
124 0         0 $_p = $_chop_len; $_ = $_chop_str;
  0         0  
125             } else {
126 144         255 $_p = 0;
127             }
128              
129 144         673 read $_DAU_W_SOCK, $_, $_len, $_p;
130              
131 144 50       340 $_dat_un->() if $_lock_chn;
132             }
133              
134             ## Call user function.
135 144 100       316 if ($_proc_type == REQUEST_ARRAY) {
    100          
136 122         1574 my $_chunk_ref = $self->{thaw}($_); undef $_;
  122         271  
137 122 100       312 $_ = ($_chunk_size == 1) ? $_chunk_ref->[0] : $_chunk_ref;
138 122         503 $_wuf->($self, $_chunk_ref, $_chunk_id);
139             }
140             elsif ($_proc_type == REQUEST_HASH) {
141 15         18 my $_chunk_ref = { @{ $self->{thaw}($_) } }; undef $_;
  15         226  
  15         42  
142 15         22 $_ = $_chunk_ref;
143 15         45 $_wuf->($self, $_chunk_ref, $_chunk_id);
144             }
145             else {
146 7         18 $_ = ${ $self->{thaw}($_) };
  7         1019  
147 7 100       100 if ($_use_slurpio) {
148 2 50 33     11 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
149 0         0 substr($_, -$_chop_len, $_chop_len, '');
150             }
151 2         6 local $_ = \$_;
152 2         12 $_wuf->($self, $_, $_chunk_id);
153             }
154             else {
155 5 50       23 if ($_chunk_size == 1) {
156 0 0 0     0 if ($_chop_len && substr($_, -$_chop_len) eq $_chop_str) {
157 0         0 substr($_, -$_chop_len, $_chop_len, '');
158             }
159 0         0 $_wuf->($self, [ $_ ], $_chunk_id);
160             }
161             else {
162 5         24 my @_recs;
163             {
164 5 50       11 local $/ = $_RS if ($/ ne $_RS);
  5         27  
165 5         65 _sync_buffer_to_array(\$_, \@_recs, $_chop_str);
166 5         11 undef $_;
167             }
168 5 50       15 if ($_chop_len) {
169 0         0 for my $i (0 .. @_recs - 1) {
170 0 0       0 if (substr($_recs[$i], -$_chop_len) eq $_chop_str) {
171 0         0 substr($_recs[$i], -$_chop_len, $_chop_len, '');
172             }
173             }
174             }
175 5         12 local $_ = \@_recs;
176 5         27 $_wuf->($self, \@_recs, $_chunk_id);
177             }
178             }
179             }
180             }
181              
182             _WORKER_REQUEST_CHUNK__LAST:
183              
184 0           return;
185             }
186              
187             1;
188              
189             __END__
190              
191             ###############################################################################
192             ## ----------------------------------------------------------------------------
193             ## Module usage.
194             ##
195             ###############################################################################
196              
197             =head1 NAME
198              
199             MCE::Core::Input::Request - Array reference and Glob reference input reader
200              
201             =head1 VERSION
202              
203             This document describes MCE::Core::Input::Request version 1.887
204              
205             =head1 DESCRIPTION
206              
207             This package provides the request chunk method used internally by the worker
208             process. Distribution follows a bank-queuing model.
209              
210             There is no public API.
211              
212             =head1 SEE ALSO
213              
214             The syntax for the C<input_data> option is described in L<MCE::Core>.
215              
216             =head1 AUTHOR
217              
218             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
219              
220             =cut
221