File Coverage

blib/lib/Poe/Wheel/Spawner.pm
Criterion Covered Total %
statement 17 62 27.4
branch 0 22 0.0
condition 0 9 0.0
subroutine 6 15 40.0
pod 3 3 100.0
total 26 111 23.4


line stmt bran cond sub pod time code
1             package Poe::Wheel::Spawner;
2              
3 1     1   13695 use 5.006;
  1         3  
4 1     1   3 use strict;
  1         2  
  1         15  
5 1     1   2 use warnings;
  1         8  
  1         27  
6              
7 1         3 use fields qw/
8             pool_size
9             stop_if_done
10             workload
11             _workers_sig_count
12 1     1   545 /;
  1         998  
13              
14 1         5 use POE qw/
15             Wheel::Run
16             Filter::Reference
17 1     1   557 /;
  1         31973  
18              
19 1     1   85136 use version;
  1         2091  
  1         7  
20              
21             =head1 NAME
22              
23             Poe::Wheel::Spawner - A simple subprocess spawner
24              
25             =head1 DESCRIPTION
26              
27             Poe::Wheel::Spawner starts on L only one subprocess for the L. Until number of subprocesses does not exceed L each subprocess can L another one.
28              
29             =head1 VERSION
30              
31             Version v0.021.1
32              
33             =cut
34              
35             $Poe::Wheel::Spawner::VERSION = qv("v0.021.1");
36              
37             =head1 SYNOPSIS
38              
39             use M43::POE::Wheel::Spawner;
40              
41             my $foo = M43::POE::Wheel::Spawner->new(
42             pool_size => 2,
43             stop_if_done => 1,
44             workload => sub { _workload() }
45             );
46              
47             $foo->run();
48              
49             sub _workload {
50              
51             # request for a new sibling
52             $foo->spawn($$);
53              
54             # ...
55             }
56              
57             =head1 SUBROUTINES/METHODS
58              
59             =head2 new(%arg)
60              
61             Arguments:
62              
63             =over
64              
65             =item
66              
67             pool_size
68              
69             the number of maximal parallel executed C
70              
71             =item
72              
73             stop_if_done
74              
75             stop after all C pid's are exited.
76              
77             run endless if !C
78              
79             =item
80              
81             workload
82              
83             CODE reference to be executed
84              
85             =back
86              
87             =cut
88              
89             sub new {
90 0     0 1   my Poe::Wheel::Spawner $self = shift;
91 0           my (%opts) = @_;
92 0 0         unless (ref($self)) {
93 0           $self = fields::new($self);
94             }
95              
96 0 0         if (defined($opts{pool_size})) {
97 0 0         $opts{pool_size} =~ /^\d+$/
98             || die "'pool_size' property expects a positive integer value";
99             }
100              
101 0   0       $self->{pool_size} = int(delete($opts{pool_size}) || 0);
102              
103 0           $self->{stop_if_done} = delete($opts{stop_if_done});
104 0           $self->{workload} = delete($opts{workload});
105 0           $self->{_workers_sig_count} = 0;
106              
107 0 0         %opts && warn sprintf("ignore unsupported properties '%s'", keys(%opts));
108              
109 0           return $self;
110             } ## end sub new
111              
112             =head2 run(%arg)
113              
114             optional C<%arg> arguments for L:
115              
116             =over
117              
118             =item
119              
120             debug
121              
122             default 0
123              
124             =item
125              
126             trace
127              
128             default 0
129              
130             =back
131              
132             create a L
133              
134             run L
135              
136             =cut
137              
138             sub run {
139 0     0 1   my ($self, %arg) = @_;
140              
141 0 0         ref($self->{workload}) eq 'CODE'
142             || die "work_method is not a code reference";
143              
144             POE::Session->create(
145 0   0       options => { debug => $arg{debug} || 0, trace => $arg{trace} || 0 },
      0        
146             object_states => [
147             $self => {
148             _start => '_handle_start',
149             _next => '_handle_start',
150             _sig_child => '_handle_sig_child',
151             _done => '_handle_done',
152             _stderr => '_handle_stderr',
153             _stdout => '_handle_stdout',
154             }
155             ]
156             );
157              
158 0           POE::Kernel->run();
159             } ## end sub run
160              
161             =head2 spawn($pid)
162              
163             request to spawn
164              
165             =cut
166              
167             sub spawn {
168 0     0 1   my ($self, $pid) = @_;
169 0           my $filter = POE::Filter::Reference->new();
170 0           my $output = $filter->put([{ busy_worker_pid => $pid }]);
171              
172 0           print @$output;
173             } ## end sub spawn
174              
175             #=head2 _handle_start
176             #
177             #handle C<_start> and C<_next> events defined in POE::Session, which is initialized in C.
178             #
179             #start execution of C by C parallel running pids
180             #
181             #=cut
182              
183             sub _handle_start {
184 0     0     my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];
185              
186 0           my $pids_count = scalar(keys(%{ $heap->{worker_by_pid} }));
  0            
187 0 0         ($pids_count >= $self->{pool_size}) && return;
188              
189             my $w = POE::Wheel::Run->new(
190 0     0     Program => sub { &{ $self->{workload} } },
  0            
191 0           StdoutFilter => POE::Filter::Reference->new(),
192             StdoutEvent => "_stdout",
193             StderrEvent => "_stderr",
194             CloseEvent => "_done",
195             );
196              
197 0           $heap->{worker_by_pid}->{ $w->PID } = $w;
198 0           $kernel->sig_child($w->PID, "_sig_child");
199             } ## end sub _handle_start
200              
201             #=head2 _handle_sig_child
202             #
203             #Clear heap. Trigger '_next' if !stop_if_done and currently no child is busy
204             #
205             #=cut
206              
207             sub _handle_sig_child {
208 0     0     my ($self, $kernel, $heap, $pid, $exit_val)
209             = @_[OBJECT, KERNEL, HEAP, ARG1, ARG2];
210              
211 0           ++$self->{_workers_sig_count};
212              
213 0           my $child = delete $heap->{worker_by_pid}{$pid};
214 0 0         unless ($child) {
215 0           POE::Kernel::_die("no child pid: $pid");
216             }
217              
218 0           delete $heap->{busy_worker_pid}->{$pid};
219              
220 0 0         if ($self->{stop_if_done}) {
221 0 0         ($self->{_workers_sig_count} >= $self->{pool_size}) && return;
222             }
223             else {
224 0 0         (scalar(keys(%{ $heap->{busy_worker_pid} })))
  0            
225             || $kernel->yield("_next");
226             }
227             } ## end sub _handle_sig_child
228              
229             #=head2 _handle_done
230             #
231             #is not implemented yet
232             #
233             #=cut
234              
235       0     sub _handle_done { }
236              
237             #=head2 _handle_stderr
238             #
239             #provide STDERR to POE::Kernel::_warn
240             #
241             #=cut
242              
243             sub _handle_stderr {
244 0     0     my ($self, $input, $wheel_id) = @_[OBJECT, ARG0, ARG1];
245 0           POE::Kernel::_warn("wheel $wheel_id STDERR: $input");
246             }
247              
248             #=head2 _handle_stdout
249             #
250             #evaluate from child to stdout printed result.
251             #
252             #trigger _next event if child asks - by using busy_worker_pid printed to stdout - for a sibling
253             #
254             #=cut
255              
256             sub _handle_stdout {
257 0     0     my ($self, $kernel, $heap, $result) = @_[OBJECT, KERNEL, HEAP, ARG0];
258 0 0 0       if (ref($result) eq 'HASH' && $result->{busy_worker_pid}) {
259 0           $heap->{busy_worker_pid}->{ $result->{busy_worker_pid} } = 1;
260 0           $kernel->yield("_next");
261             }
262             } ## end sub _handle_stdout
263              
264             1; # End of Poe::Wheel::Spawner
265              
266             =head1 AUTHOR
267              
268             Alexei Pastuchov Epalik at cpan.orgE.
269              
270             =head1 REPOSITORY
271              
272             L
273              
274             =head1 LICENSE AND COPYRIGHT
275              
276              
277             Copyright 2014-2016 by Alexei Pastuchov Epalik at cpan.orgE.
278              
279             This library is free software; you can redistribute it and/or modify
280             it under the same terms as Perl itself.
281              
282             =cut