line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
######################################## |
2
|
|
|
|
|
|
|
# |
3
|
|
|
|
|
|
|
# Author: David Spadea |
4
|
|
|
|
|
|
|
# Web: http://www.spadea.net |
5
|
|
|
|
|
|
|
# |
6
|
|
|
|
|
|
|
# This code is released under the same terms |
7
|
|
|
|
|
|
|
# as the PERL interpreter. |
8
|
|
|
|
|
|
|
# |
9
|
|
|
|
|
|
|
######################################## |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
package MultiThread; |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
our $VERSION = '0.9'; |
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
package MultiThread::Base; |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
require 5.008; |
18
|
|
|
|
|
|
|
|
19
|
1
|
|
|
1
|
|
36680
|
use strict; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
40
|
|
20
|
1
|
|
|
1
|
|
6
|
use warnings; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
29
|
|
21
|
|
|
|
|
|
|
|
22
|
1
|
|
|
1
|
|
5369
|
use threads; |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
use threads::shared; |
24
|
|
|
|
|
|
|
use Thread::Queue; |
25
|
|
|
|
|
|
|
use Data::Dumper; |
26
|
|
|
|
|
|
|
use Sys::CPU; |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
use Storable qw(freeze thaw); |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
# Build the state common to every MultiThread object.
#
# The ProcessingCount, Shutdown and Responses slots are marked as shared,
# ProcessingCount starts at 0, and Threads starts as an empty list. The
# subclass constructors (Pipeline, WorkerPool) add the queues and start the
# worker threads afterwards.
#
# Returns the new object, blessed into the invoking class.
sub new
{
    my ($class) = @_;

    my $self = { Threads => [] };

    # Mark each slot as shared before anything is stored in it.
    share( $self->{ProcessingCount} );
    $self->{ProcessingCount} = 0;

    share( $self->{Shutdown} );
    share( $self->{Responses} );

    return bless( $self, $class );
}
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
# Internal accessor: returns whatever is stored in the object's Requests
# slot, i.e. the queue that send_request enqueues onto.
sub _request_queue
{
    my ($self) = @_;

    return $self->{Requests};
}
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
# Internal accessor: returns whatever is stored in the object's Responses
# slot, i.e. the queue that get_response dequeues from.
sub _response_queue
{
    my ($self) = @_;

    return $self->{Responses};
}
63
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
# Flag the object as shutting down and wait for its worker threads to exit.
#
# Workers only look at the Shutdown flag when their input queue is empty, so
# requests that are already queued are still processed before the threads
# return.
#
# Each thread is removed from the Threads list as it is joined. That makes a
# repeated call a no-op instead of an attempt to join a thread a second time
# (threads croaks on that), e.g. when a Pipeline has already shut down a
# WorkerPool that the caller then shuts down again.
#
# Returns 1.
sub shutdown
{
    my $self = shift;
    $$self{Shutdown} = 1;

    my $threads = $$self{Threads} || [];

    while ( @{$threads} )
    {
        my $thread = shift @{$threads};

        # tid 0 is the main thread, which must not be joined.
        $thread->join if ($thread->tid);
    }

    return 1;
}
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
# Thread body shared by WorkerPool and Pipeline workers.
#
# Arguments:
#   $self      - the owning MultiThread object; only its Shutdown flag is read
#   $workersub - coderef called once per ticket with the ticket's Request list
#   $inputq    - queue of frozen tickets to consume (polled with dequeue_nb)
#   $outputq   - queue that receives the frozen, completed tickets
#
# For every ticket the sub's return list is stored as Response, and also as
# Request so the ticket can be fed straight into a downstream step. A die()
# inside the sub is caught and recorded as Exception. A ticket that already
# carries an Exception from an upstream step is forwarded untouched, so the
# original error reaches the caller instead of being overwritten by a step
# that would only have seen an empty request.
#
# Returns 0 once the input queue is empty and Shutdown has been set.
sub worker
{
    my ($self, $workersub, $inputq, $outputq) = @_;

    while (1)
    {
        my $frozen = $inputq->dequeue_nb;

        if ( !defined $frozen )
        {
            # Only shut down if all work has been processed (no requests).
            return 0 if $$self{Shutdown};

            sleep 1;
            next;
        }

        my $ticket = thaw($frozen);

        if ( defined $$ticket{Exception} )
        {
            # Failed upstream: pass it along as-is without running this step.
            $outputq->enqueue($frozen);
            next;
        }

        # The trailing 1 lets us tell "died" apart from "returned nothing"
        # without depending on the truthiness of $@.
        my @resp;
        my $ok = eval { @resp = $workersub->( @{ $$ticket{Request} } ); 1 };
        my $exception = $ok ? undef : ( $@ || 'Unknown error' );

        $$ticket{Response}  = \@resp;
        $$ticket{Request}   = \@resp;    # in case we're sending this downstream
        $$ticket{Exception} = $exception;

        $outputq->enqueue( freeze($ticket) );
    }
}
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
# Report whether any work is still outstanding.
#
# Returns 1 while ProcessingCount plus the number of items waiting on the
# Responses queue is greater than zero, and 0 otherwise.
sub pending_responses
{
    my ($self) = @_;

    my $outstanding = $self->{ProcessingCount} + $self->{Responses}->pending;

    return 1 if $outstanding > 0;
    return 0;
}
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
# Queue one request for processing.
#
# All arguments after the object are the request itself; they are passed to
# the worker sub as its argument list. The ticket is frozen with Storable
# before it is enqueued on the Requests queue, and ProcessingCount is
# incremented.
#
# Returns the ticket number assigned to the request.
sub send_request
{
    my ($self, @request) = @_;

    # no need to lock. Only modified in main thread.
    my $number = ++$$self{TicketNumber};

    # OriginalRequest is set here and never modified. It should be sent back
    # to the caller in the response queue.
    my %ticket = (
        TicketNumber    => $number,
        Request         => \@request,
        OriginalRequest => \@request,
    );

    $$self{Requests}->enqueue( freeze( \%ticket ) );
    $$self{ProcessingCount}++;

    return $number;
}
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
# Collect one finished ticket from the Responses queue.
#
# Options (passed as a key/value list):
#   NoWait - when true, use a non-blocking read; otherwise block until a
#            response is available.
#
# Returns the thawed ticket hashref with its Request key removed, or
# undef (empty list in list context) when NoWait is set and nothing is
# waiting. ProcessingCount is decremented only when a ticket is actually
# returned.
sub get_response
{
    my ($self, %opts) = @_;

    my $queue  = $$self{Responses};
    my $frozen = $opts{NoWait} ? $queue->dequeue_nb : $queue->dequeue;

    # Nothing was dequeued. Return before touching the ticket: dereferencing
    # an undefined $resp in the delete below would autovivify it into an
    # empty hash, which would then be counted and returned as a response.
    return unless defined $frozen;

    my $resp = thaw($frozen);

    delete $$resp{Request};    # Was probably modified. Remove to eliminate confusion.

    $$self{ProcessingCount}--;
    return $resp;
}
168
|
|
|
|
|
|
|
|
169
|
|
|
|
|
|
|
|
170
|
|
|
|
|
|
|
package MultiThread::Pipeline; |
171
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
=head1 MultiThread::Pipeline |
173
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
use MultiThread; |
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
my $pool = MultiThread::WorkerPool->new( EntryPoint => \&add_one ); |
177
|
|
|
|
|
|
|
my $pipeline = MultiThread::Pipeline->new( Pipeline => [ $pool, \&add_two ] ); |
178
|
|
|
|
|
|
|
|
179
|
|
|
|
|
|
|
# Push 10 requests into the queue for processing. |
180
|
|
|
|
|
|
|
# Worker processing will begin immediately. |
181
|
|
|
|
|
|
|
map { |
182
|
|
|
|
|
|
|
$ticketnum = $pipeline->send_request( $_ ); |
183
|
|
|
|
|
|
|
} ( 1..10 ); |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
# Gather responses back from the response queue. They may |
186
|
|
|
|
|
|
|
# not be in the original order. Use the TicketNumber or OriginalRequest |
187
|
|
|
|
|
|
|
# attributes of the ticket to identify the work unit. TicketNumber will |
188
|
|
|
|
|
|
|
# correspond to the ticket number returned by $pipeline->send_request(). |
189
|
|
|
|
|
|
|
# |
190
|
|
|
|
|
|
|
# DO NOT count on TicketNumber being an integer. It may be necessary |
191
|
|
|
|
|
|
|
# to use alphanumeric at some point to avoid numeric overflows for large |
192
|
|
|
|
|
|
|
# workloads or long-running processes. Simply compare TicketNumbers |
193
|
|
|
|
|
|
|
# as strings, and you'll be safe. |
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
while ( $pipeline->pending_responses ) |
196
|
|
|
|
|
|
|
{ |
197
|
|
|
|
|
|
|
# get_response has a NoWait => 1 option for non-blocking reads |
198
|
|
|
|
|
|
|
# if you'd rather write a polling loop instead. |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
my $ticket = $pipeline->get_response; # or get_response( NoWait => 1) |
201
|
|
|
|
|
|
|
printf "Answer was %s\n", $$ticket{Response}->[0]; |
202
|
|
|
|
|
|
|
} |
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
$pipeline->shutdown; |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
sub add_one { |
207
|
|
|
|
|
|
|
my $input = shift; |
208
|
|
|
|
|
|
|
return $input + 1; |
209
|
|
|
|
|
|
|
} |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
sub add_two { |
212
|
|
|
|
|
|
|
my $input = shift; |
213
|
|
|
|
|
|
|
return $input + 2; |
214
|
|
|
|
|
|
|
} |
215
|
|
|
|
|
|
|
|
216
|
|
|
|
|
|
|
=cut |
217
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
=head1 PURPOSE |
219
|
|
|
|
|
|
|
|
220
|
|
|
|
|
|
|
This module implements a Pipeline multithreading model. Several concurrent |
221
|
|
|
|
|
|
|
threads are started -- one for each subroutine in the pipeline. The subs and |
222
|
|
|
|
|
|
|
other MultiThread objects are daisy-chained together by queues. The output queue |
223
|
|
|
|
|
|
|
of one step in the pipeline is the input queue of the following step. |
224
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
In the contrived example above, add_one is run by a WorkerPool object, and the |
226
|
|
|
|
|
|
|
WorkerPool object is placed first in the pipeline. It takes the request |
227
|
|
|
|
|
|
|
and adds one to it, returning the result. The result of add_one is fed as a request |
228
|
|
|
|
|
|
|
directly into add_two, which adds two and returns the result. Because add_two is the final |
229
|
|
|
|
|
|
|
step in the chain, its output will be returned to the user via the get_response method. |
230
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
MultiThread::Pipeline is great when you have multiple steps that take different times to complete. |
232
|
|
|
|
|
|
|
MultiThread::Pipeline handles the inter-step queuing for you, so you don't need to worry about |
233
|
|
|
|
|
|
|
what happens when one step outruns another. Each step simply processes asynchronously |
234
|
|
|
|
|
|
|
as quickly as it can. |
235
|
|
|
|
|
|
|
|
236
|
|
|
|
|
|
|
One major consideration with MultiThread::Pipeline versus MultiThread::WorkerPool is that |
237
|
|
|
|
|
|
|
MultiThread::Pipeline starts one thread for every sub in the pipeline, without regard for the |
238
|
|
|
|
|
|
|
number of CPUs on the system. |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
=cut |
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
=head1 METHODS |
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
=cut |
246
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
require 5.008; |
248
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
use strict; |
250
|
|
|
|
|
|
|
use warnings; |
251
|
|
|
|
|
|
|
|
252
|
|
|
|
|
|
|
use base qw( MultiThread::Base ); |
253
|
|
|
|
|
|
|
use Thread::Queue; |
254
|
|
|
|
|
|
|
use Data::Dumper; |
255
|
|
|
|
|
|
|
|
256
|
|
|
|
|
|
|
use Storable qw(freeze thaw); |
257
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
=head2 new |
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
Create a new MultiThread::Pipeline object. |
261
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
MultiThread::Pipeline->new( %opts); |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
=head3 Pipeline |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
This required parameter takes an arrayref of coderefs or other MultiThread objects |
267
|
|
|
|
|
|
|
which represent the pipeline. |
268
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
A single thread will be started for each coderef, and they will be daisychained together |
270
|
|
|
|
|
|
|
in the order given in the array. The first sub will consume the original request, |
271
|
|
|
|
|
|
|
and the last sub in the chain will return its results to the caller. |
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
You can also mix in other pre-instantiated MultiThread objects, and they will |
274
|
|
|
|
|
|
|
function as expected. In the synopsis example, the first step in the Pipeline is a |
275
|
|
|
|
|
|
|
WorkerPool, the results of which are fed into the &add_two sub. You can theoretically |
276
|
|
|
|
|
|
|
use as many MultiThread objects as you want in a Pipeline and they should all play nice |
277
|
|
|
|
|
|
|
together. |
278
|
|
|
|
|
|
|
|
279
|
|
|
|
|
|
|
=cut |
280
|
|
|
|
|
|
|
|
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
# Create a MultiThread::Pipeline and start its worker threads.
#
# Options (key/value list):
#   Pipeline - required; a non-empty arrayref whose elements are coderefs
#              and/or MultiThread objects, in processing order.
#
# Every step is checked up front, because start_pipeline only knows how to
# wire up coderefs and MultiThread objects and would otherwise skip an
# unknown step silently or leave the object without a request queue.
#
# Returns the new object, or undef after printing a message when the
# Pipeline option is missing or unusable.
sub new
{
    my ($class, %opts) = @_;

    my $pipeline = $opts{Pipeline};

    my $usable = ( ref($pipeline) eq 'ARRAY' && @{$pipeline} > 0 ) ? 1 : 0;

    if ($usable)
    {
        foreach my $step ( @{$pipeline} )
        {
            next if ref($step) eq 'CODE' || ref($step) =~ /^MultiThread/;

            $usable = 0;
            last;
        }
    }

    unless ($usable)
    {
        print "You must supply a arrayref of coderefs using the Pipeline parameter!\n";
        return undef;
    }

    # SUPER::new already blesses the object into $class.
    my $self = $class->SUPER::new;
    $$self{Pipeline} = $pipeline;

    # Allow MultiThread objects to lead the PipeLine. This should work
    # whether the object is a WorkerPool or another PipeLine: requests sent to
    # this Pipeline then go straight onto the leading object's request queue.
    if ( ref( $$pipeline[0] ) =~ /^MultiThread/ )
    {
        $$self{Requests} = $$pipeline[0]->_request_queue;
    }
    else
    {
        $$self{Requests} = Thread::Queue->new();
    }

    $self->start_pipeline($pipeline);

    return $self;
}
327
|
|
|
|
|
|
|
|
328
|
|
|
|
|
|
|
# Pass-through step that connects two adjacent MultiThread::* objects.
# Both objects are already constructed and own their queues, so
# start_pipeline inserts this sub between them. Run in its own worker
# thread, it hands each request on unchanged, which moves tickets from the
# first object's response queue to the second object's request queue.
sub _bridge_queues
{
    #print "Bridging values: " . Dumper(\@_);
    my @values = @_;

    return @values;
}
338
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
# Wire the pipeline steps together and start one worker thread per coderef.
#
# Arguments:
#   $entrypoints - arrayref of steps (coderefs and/or MultiThread objects)
#
# The steps are first copied into a working list, with a _bridge_queues
# coderef inserted after any MultiThread object that is directly followed by
# another one. The working list is then walked in order: a coderef gets a
# thread reading from the current input queue and writing either to the next
# MultiThread object's request queue or to a fresh queue; a MultiThread
# object contributes its own response queue. The last output queue becomes
# this object's Responses queue.
#
# Returns 1.
sub start_pipeline
{
    my ($self, $entrypoints) = @_;

    # Pass 1: build the working list, bridging back-to-back MultiThread objects.
    my @chain;
    my $peek = 0;    # index of the step that follows the current one

    foreach my $step ( @{$entrypoints} )
    {
        $peek++;
        push @chain, $step;

        next unless ref($step) =~ /^MultiThread/;
        next unless ref( $entrypoints->[$peek] ) =~ /^MultiThread/;

        #print "Detected back-to-back MultiThread objects. Inserting bridge...\n";
        push @chain, \&_bridge_queues;
    }

    #print "New pipeline: " . Dumper(\@chain);

    # Pass 2: connect the queues and start the threads.
    my $inputq = $$self{Requests};
    my $outputq;

    foreach my $pos ( 0 .. $#chain )
    {
        my $worker    = $chain[$pos];
        my $following = $chain[ $pos + 1 ];

        #printf "Worker is a %s\n", ref($worker);
        if ( ref($worker) eq 'CODE' )
        {
            # Look ahead: if the next step already owns an input queue, this
            # step must write to it; otherwise it gets a queue of its own.
            $outputq = ( defined $following && ref($following) =~ /^MultiThread/ )
                     ? $following->_request_queue
                     : Thread::Queue->new;

            my $thread = threads->create( \&MultiThread::Base::worker, $self, $worker, $inputq, $outputq );
            push @{ $$self{Threads} }, $thread if $thread;
        }
        elsif ( ref($worker) =~ /^MultiThread/ )
        {
            # The next step is never a MultiThread object here, because pass 1
            # separated adjacent ones with a _bridge_queues coderef.
            $outputq = $worker->_response_queue;
        }

        $inputq = $outputq;
    }

    $$self{Responses} = $outputq;

    return 1;
}
406
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
# Shut down a Pipeline.
#
# A Pipeline may contain other MultiThread::* objects. Those are shut down
# first, then the Pipeline's own threads via the base class; otherwise the
# nested objects' threads would be leaked.
#
# Returns whatever the base-class shutdown returns.
sub shutdown
{
    my ($self) = @_;

    foreach my $step ( @{ $self->{Pipeline} } )
    {
        next unless ref($step) =~ /^MultiThread/;

        $step->shutdown;
    }

    return $self->SUPER::shutdown;
}
425
|
|
|
|
|
|
|
|
426
|
|
|
|
|
|
|
|
427
|
|
|
|
|
|
|
|
428
|
|
|
|
|
|
|
package MultiThread::WorkerPool; |
429
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
=head1 MultiThread::WorkerPool |
431
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
use MultiThread; |
433
|
|
|
|
|
|
|
|
434
|
|
|
|
|
|
|
my $workerpool = MultiThread::WorkerPool->new( EntryPoint => \&add_one ); |
435
|
|
|
|
|
|
|
|
436
|
|
|
|
|
|
|
# Push 10 requests into the queue for processing. |
437
|
|
|
|
|
|
|
# Worker processing will begin immediately. |
438
|
|
|
|
|
|
|
map { |
439
|
|
|
|
|
|
|
$ticketnum = $workerpool->send_request( $_ ); |
440
|
|
|
|
|
|
|
} ( 1..10 ); |
441
|
|
|
|
|
|
|
|
442
|
|
|
|
|
|
|
# Gather responses back from the response queue. They may |
443
|
|
|
|
|
|
|
# not be in the original order. Use the TicketNumber or OriginalRequest |
444
|
|
|
|
|
|
|
# attributes of the ticket to identify the work unit. TicketNumber will |
445
|
|
|
|
|
|
|
# correspond to the ticket number returned by $workerpool->send_request(). |
446
|
|
|
|
|
|
|
# |
447
|
|
|
|
|
|
|
# DO NOT count on TicketNumber being an integer. It may be necessary |
448
|
|
|
|
|
|
|
# to use alphanumeric at some point to avoid numeric overflows for large |
449
|
|
|
|
|
|
|
# workloads or long-running processes. Simply compare TicketNumbers |
450
|
|
|
|
|
|
|
# as strings, and you'll be safe. |
451
|
|
|
|
|
|
|
|
452
|
|
|
|
|
|
|
while ( $workerpool->pending_responses ) |
453
|
|
|
|
|
|
|
{ |
454
|
|
|
|
|
|
|
# get_response has a NoWait => 1 option for non-blocking reads |
455
|
|
|
|
|
|
|
# if you'd rather write a polling loop instead. |
456
|
|
|
|
|
|
|
|
457
|
|
|
|
|
|
|
my $ticket = $workerpool->get_response; # or get_response( NoWait => 1) |
458
|
|
|
|
|
|
|
printf "Answer was %s\n", $$ticket{Response}->[0]; |
459
|
|
|
|
|
|
|
} |
460
|
|
|
|
|
|
|
|
461
|
|
|
|
|
|
|
$workerpool->shutdown; |
462
|
|
|
|
|
|
|
|
463
|
|
|
|
|
|
|
sub add_one { |
464
|
|
|
|
|
|
|
my $input = shift; |
465
|
|
|
|
|
|
|
return $input + 1; |
466
|
|
|
|
|
|
|
} |
467
|
|
|
|
|
|
|
|
468
|
|
|
|
|
|
|
|
469
|
|
|
|
|
|
|
=cut |
470
|
|
|
|
|
|
|
|
471
|
|
|
|
|
|
|
=head1 PURPOSE |
472
|
|
|
|
|
|
|
|
473
|
|
|
|
|
|
|
This module implements a WorkerPool multithreading model. Several concurrent |
474
|
|
|
|
|
|
|
threads are started using a single sub for processing. All requests are serviced |
475
|
|
|
|
|
|
|
in parallel using the sub provided. |
476
|
|
|
|
|
|
|
|
477
|
|
|
|
|
|
|
MultiThread::WorkerPool is ideal when you have many items that must all be processed |
478
|
|
|
|
|
|
|
similarly, as quickly as possible. Simply write the sub that will handle the processing |
479
|
|
|
|
|
|
|
and hand it off to MultiThread::WorkerPool to run several instances of your sub |
480
|
|
|
|
|
|
|
to process your work items. |
481
|
|
|
|
|
|
|
|
482
|
|
|
|
|
|
|
All items are put onto a single work queue, and the first available thread will |
483
|
|
|
|
|
|
|
consume and process it. All threads in a Worker Pool are identical. Compare this |
484
|
|
|
|
|
|
|
to a MultiThread::Pipeline, where each thread runs a different subroutine. |
485
|
|
|
|
|
|
|
|
486
|
|
|
|
|
|
|
=cut |
487
|
|
|
|
|
|
|
|
488
|
|
|
|
|
|
|
=head1 METHODS |
489
|
|
|
|
|
|
|
|
490
|
|
|
|
|
|
|
=cut |
491
|
|
|
|
|
|
|
|
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
require 5.008; |
494
|
|
|
|
|
|
|
|
495
|
|
|
|
|
|
|
use strict; |
496
|
|
|
|
|
|
|
use warnings; |
497
|
|
|
|
|
|
|
|
498
|
|
|
|
|
|
|
# This has to be before "use Thread::Queue"! |
499
|
|
|
|
|
|
|
use base qw(MultiThread::Base); |
500
|
|
|
|
|
|
|
|
501
|
|
|
|
|
|
|
use threads::shared; |
502
|
|
|
|
|
|
|
use Thread::Queue; |
503
|
|
|
|
|
|
|
use Data::Dumper; |
504
|
|
|
|
|
|
|
|
505
|
|
|
|
|
|
|
=head2 new |
506
|
|
|
|
|
|
|
|
507
|
|
|
|
|
|
|
Create a new MultiThread::WorkerPool object. |
508
|
|
|
|
|
|
|
|
509
|
|
|
|
|
|
|
MultiThread::WorkerPool->new( %opts ); |
510
|
|
|
|
|
|
|
|
511
|
|
|
|
|
|
|
=head3 MaxWorkers |
512
|
|
|
|
|
|
|
|
513
|
|
|
|
|
|
|
The MaxWorkers parameter overrides automatic detection of CPU count. Normally, |
514
|
|
|
|
|
|
|
the WorkerPool will figure out how many CPUs are on the host machine, and will |
515
|
|
|
|
|
|
|
start an equal number of workers. If it incorrectly detects CPU count for your machine, |
516
|
|
|
|
|
|
|
or if you know it's safe to start more or less, you can use this parameter |
517
|
|
|
|
|
|
|
to do so. |
518
|
|
|
|
|
|
|
|
519
|
|
|
|
|
|
|
=head3 EntryPoint |
520
|
|
|
|
|
|
|
|
521
|
|
|
|
|
|
|
Pass in a sub reference to the initial sub to be called in each thread. This sub will |
522
|
|
|
|
|
|
|
be called for each item on the queue, and will run in parallel with itself. |
523
|
|
|
|
|
|
|
|
524
|
|
|
|
|
|
|
=cut |
525
|
|
|
|
|
|
|
|
526
|
|
|
|
|
|
|
# Create a MultiThread::WorkerPool and start its worker threads.
#
# Options (key/value list):
#   EntryPoint - required; the coderef each worker thread calls per request.
#   MaxWorkers - optional; number of worker threads. When not given, the
#                value of get_CPU_count() is used.
#
# Returns the new object, or undef after printing a message when EntryPoint
# is not a code reference.
sub new
{
    my ($class, %opts) = @_;

    my $entrypoint = $opts{EntryPoint};

    # Check that this really is a code reference (plain or blessed), not just
    # a true value; anything else could only fail later, once per request,
    # inside the worker threads.
    unless ( ref($entrypoint) && UNIVERSAL::isa( $entrypoint, 'CODE' ) )
    {
        print "You must supply a coderef using the EntryPoint parameter!\n";
        return undef;
    }

    # SUPER::new already blesses the object into $class.
    my $self = $class->SUPER::new;

    $$self{EntryPoint} = $entrypoint;

    # Only probe the CPU count when the caller did not choose a worker count.
    $$self{MaxWorkers} = defined $opts{MaxWorkers} ? $opts{MaxWorkers} : get_CPU_count();

    #printf "Starting %d worker threads.\n", $$self{MaxWorkers};

    $self->start_pool;

    return $self;
}
561
|
|
|
|
|
|
|
|
562
|
|
|
|
|
|
|
=head2 worker_count |
563
|
|
|
|
|
|
|
|
564
|
|
|
|
|
|
|
Returns the number of worker threads in this WorkerPool instance. |
565
|
|
|
|
|
|
|
|
566
|
|
|
|
|
|
|
=cut |
567
|
|
|
|
|
|
|
|
568
|
|
|
|
|
|
|
# Accessor: returns the value stored in the object's MaxWorkers slot, which
# is the number of worker threads start_pool tries to create.
sub worker_count
{
    my ($self) = @_;

    return $$self{MaxWorkers};
}
573
|
|
|
|
|
|
|
|
574
|
|
|
|
|
|
|
# I think this can be combined with MultiThread::Pipeline::start_pipeline and moved to MultiThread::Base. |
575
|
|
|
|
|
|
|
# They're very similar. |
576
|
|
|
|
|
|
|
# Create the pool's request and response queues and start the workers.
#
# Both queues are stored on the object (Requests / Responses). Worker threads
# running MultiThread::Base::worker with the object's EntryPoint are then
# created one at a time for as long as the number started is below
# MaxWorkers; each thread that was created is recorded in Threads.
#
# Returns 1.
sub start_pool
{
    my ($self) = @_;

    my ( $requests, $responses ) = ( Thread::Queue->new, Thread::Queue->new );

    $$self{Requests}  = $requests;
    $$self{Responses} = $responses;

    share($requests);

    my $code    = $$self{EntryPoint};
    my $started = 0;

    while ( $started < $$self{MaxWorkers} )
    {
        my $thread = threads->create( \&MultiThread::Base::worker, $self, $code, $requests, $responses );
        push @{ $$self{Threads} }, $thread if $thread;

        $started++;
    }

    return 1;
}
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
# Default worker count: the value reported by Sys::CPU::cpu_count(), or 1
# when that call returns 0 or undef.
sub get_CPU_count
{
    return Sys::CPU::cpu_count() || 1;
}
606
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
=head1 COMMON INSTANCE METHODS |
608
|
|
|
|
|
|
|
|
609
|
|
|
|
|
|
|
Both MultiThread::WorkerPool and MultiThread::Pipeline derive from MultiThread::Base, |
610
|
|
|
|
|
|
|
so they share a number of methods. |
611
|
|
|
|
|
|
|
|
612
|
|
|
|
|
|
|
=head2 pending_responses |
613
|
|
|
|
|
|
|
|
614
|
|
|
|
|
|
|
Returns a boolean signifying whether there are still outstanding requests to be |
615
|
|
|
|
|
|
|
processed. This will return true until the last response has been collected. |
616
|
|
|
|
|
|
|
|
617
|
|
|
|
|
|
|
This method takes no arguments. |
618
|
|
|
|
|
|
|
|
619
|
|
|
|
|
|
|
|
620
|
|
|
|
|
|
|
=head2 get_response |
621
|
|
|
|
|
|
|
|
622
|
|
|
|
|
|
|
When worker subs finish their work, their return values are put back onto a response queue |
623
|
|
|
|
|
|
|
for collection. Call this method on your WorkerPool or Pipeline object to retrieve the |
624
|
|
|
|
|
|
|
return tickets, one at a time. |
625
|
|
|
|
|
|
|
|
626
|
|
|
|
|
|
|
The value returned is not only the return value of the worker thread. It is a hash containing |
627
|
|
|
|
|
|
|
TicketNumber, OriginalRequest, Response, and Exception. |
628
|
|
|
|
|
|
|
|
629
|
|
|
|
|
|
|
=head3 Exception |
630
|
|
|
|
|
|
|
|
631
|
|
|
|
|
|
|
If a worker sub die()s while processing a request, this will be set to the die() message. This is so that |
632
|
|
|
|
|
|
|
a single request's problem does not prevent other requests from running to completion. |
633
|
|
|
|
|
|
|
|
634
|
|
|
|
|
|
|
I do not provide a facility for allowing one request to kill the whole program because there's |
635
|
|
|
|
|
|
|
no way of knowing the state of each request at the time the program died. If you really want |
636
|
|
|
|
|
|
|
that behavior, do something like this in your get_response loop: |
637
|
|
|
|
|
|
|
|
638
|
|
|
|
|
|
|
die($$ticket{Exception}) if ($$ticket{Exception}); |
639
|
|
|
|
|
|
|
|
640
|
|
|
|
|
|
|
|
641
|
|
|
|
|
|
|
=head3 OriginalRequest |
642
|
|
|
|
|
|
|
|
643
|
|
|
|
|
|
|
The original request as given to send_request. This is necessary because the Request |
644
|
|
|
|
|
|
|
is set to the return value of each sub for use in Pipelining. |
645
|
|
|
|
|
|
|
|
646
|
|
|
|
|
|
|
=head3 Response |
647
|
|
|
|
|
|
|
|
648
|
|
|
|
|
|
|
The return values of the final sub in a Pipeline or the Worker in a WorkerPool. This will be an |
649
|
|
|
|
|
|
|
ARRAYREF because PERL subs can return arrays. You'll need to dereference it. Versions of MultiThread |
650
|
|
|
|
|
|
|
prior to 0.9 only handled a single return value, which is why dereferencing was not necessary prior to |
651
|
|
|
|
|
|
|
that version. |
652
|
|
|
|
|
|
|
|
653
|
|
|
|
|
|
|
=head3 TicketNumber |
654
|
|
|
|
|
|
|
|
655
|
|
|
|
|
|
|
The request number assigned to the request and returned to the caller of send_request. This |
656
|
|
|
|
|
|
|
number persists throughout the process for the purpose of matching up the response with the |
657
|
|
|
|
|
|
|
request. |
658
|
|
|
|
|
|
|
|
659
|
|
|
|
|
|
|
=head2 send_request |
660
|
|
|
|
|
|
|
|
661
|
|
|
|
|
|
|
This enqueues a request for processing. It takes no arguments of its own; all arguments |
662
|
|
|
|
|
|
|
given will be passed directly to the subs you provide as @_. Call this exactly as you |
663
|
|
|
|
|
|
|
would call your worker/pipeline methods directly. Just remember that any arguments |
664
|
|
|
|
|
|
|
given must be serializable by the Storable module. |
665
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
This sub returns a unique ticket number (unique within the scope of the current instance). |
667
|
|
|
|
|
|
|
This ticket number will be present in the response as well, so you can match up the |
668
|
|
|
|
|
|
|
request with the response ticket if you need to. |
669
|
|
|
|
|
|
|
|
670
|
|
|
|
|
|
|
=head2 shutdown |
671
|
|
|
|
|
|
|
|
672
|
|
|
|
|
|
|
Tell the WorkerPool or Pipeline that it should finish its pending processing, stop the worker |
673
|
|
|
|
|
|
|
processes, and exit. Shutdown will wait for all threads to exit before returning to the caller. |
674
|
|
|
|
|
|
|
In cases of nested objects, e.g. Pipelines containing WorkerPools, the parent object will call |
675
|
|
|
|
|
|
|
shutdown() on its child MultiThread objects as well. |
676
|
|
|
|
|
|
|
|
677
|
|
|
|
|
|
|
=cut |
678
|
|
|
|
|
|
|
|
679
|
|
|
|
|
|
|
|
680
|
|
|
|
|
|
|
=head1 BUGS |
681
|
|
|
|
|
|
|
|
682
|
|
|
|
|
|
|
Be careful that you're passing serializable data types that can be freeze()'d and thaw()'d. |
683
|
|
|
|
|
|
|
These modules make extensive use of Thread::Queue, which requires all structures |
684
|
|
|
|
|
|
|
be serialized before being passed onto the queues. |
685
|
|
|
|
|
|
|
|
686
|
|
|
|
|
|
|
|
687
|
|
|
|
|
|
|
=head1 AUTHOR |
688
|
|
|
|
|
|
|
|
689
|
|
|
|
|
|
|
David Spadea |
690
|
|
|
|
|
|
|
http://www.spadea.net |
691
|
|
|
|
|
|
|
|
692
|
|
|
|
|
|
|
=cut |
693
|
|
|
|
|
|
|
|
694
|
|
|
|
|
|
|
=head1 COPYRIGHT & LICENSE |
695
|
|
|
|
|
|
|
|
696
|
|
|
|
|
|
|
Copyright 2008 David Spadea, all rights reserved. |
697
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
This program is free software; you can redistribute it and/or modify it |
699
|
|
|
|
|
|
|
under the same terms as Perl itself. |
700
|
|
|
|
|
|
|
|
701
|
|
|
|
|
|
|
|
702
|
|
|
|
|
|
|
=cut |
703
|
|
|
|
|
|
|
|
704
|
|
|
|
|
|
|
1; |
705
|
|
|
|
|
|
|
|