File Coverage

blib/lib/IPC/QWorker.pm
Criterion Covered Total %
statement 81 85 95.2
branch 7 10 70.0
condition 1 3 33.3
subroutine 14 14 100.0
pod 0 6 0.0
total 103 118 87.2


line stmt bran cond sub pod time code
1             package IPC::QWorker;
2             # ABSTRACT: processing a queue in parallel
3              
4 12     12   89662 use 5.000;
  12         48  
  12         477  
5 12     12   83 use strict;
  12         25  
  12         409  
6 12     12   60 use warnings;
  12         34  
  12         371  
7 12     12   12336 use utf8;
  12         136  
  12         70  
8              
9             our $VERSION = '0.07'; # VERSION
10             our $DEBUG = 0;
11              
12 12     12   15612 use IO::Select;
  12         25756  
  12         744  
13              
14 12     12   6721 use IPC::QWorker::Worker;
  12         70  
  12         12071  
15              
16             sub new {
17 11     11 0 165 my $this = shift;
18 11   33     88 my $class = ref($this) || $this;
19 11         132 my $self = {
20             '_workers' => [],
21             '_queue' => [],
22             '_pids' => {},
23             '_ready_workers' => [],
24             '_io_select' => IO::Select->new(),
25             @_
26             };
27              
28 11         440 bless( $self, $class );
29 11         33 return ($self);
30             }
31              
32             sub create_workers {
33 11     11 0 5676 my $self = shift();
34 11         22 my $num_workers = shift();
35 11         22 my $worker;
36              
37 11         66 for ( my $i = 0 ; $i < $num_workers ; $i++ ) {
38             # create the worker
39 65         9449 $worker = IPC::QWorker::Worker->new(@_);
40             # add him to the list of workers
41 55         270 push( @{ $self->{'_workers'} }, $worker);
  55         1423  
42             # add him to the pid->workers index
43 55         1861 $self->{'_pids'}->{$worker->{'pid'}} = $worker;
44             # add him to IO::Select
45 55         1800 $self->{'_io_select'}->add( $worker->{'pipe'} );
46             }
47             }
48              
49             sub push_queue {
50 121     121 0 129 my $self = shift;
51              
52 121         114 push( @{ $self->{'_queue'} }, @_ );
  121         376  
53             }
54              
55             sub _get_ready_workers {
56 32     32   47 my $self = shift();
57 32         34 my $timeout = shift();
58 32         38 my @can_read_pipes;
59             my $i;
60 0         0 my $wpid;
61              
62             # if we have no ready workers find some
63 32         121 @can_read_pipes = $self->{'_io_select'}->can_read($timeout);
64 32 50       28246 if ($IPC::QWorker::DEBUG) {
65 0         0 print STDERR "found " . scalar(@can_read_pipes) . " ready workers!\n";
66             }
67 32         68 foreach $i (@can_read_pipes) {
68             # get pid from a msg like "12345 READY\n"
69 130         8420 ($wpid) = split(' ', readline($i));
70 130         370 $self->{'_pids'}->{$wpid}->{'ready'} = 1;
71 130         146 push(@{$self->{'_ready_workers'}}, $self->{'_pids'}->{$wpid});
  130         448  
72             }
73             }
74              
75             sub process_queue {
76 2     2 0 760 my $self = shift;
77 2         8 my $timeout = shift;
78 2         4 my $qentry;
79             my $worker;
80              
81 2 100       17 if(defined($timeout)) {
82             # if timeout is set wait for timeout till a worker is ready
83 1         35 $self->_get_ready_workers($timeout);
84 1         2 while($worker = shift(@{$self->{'_ready_workers'}})) {
  11         10087  
85 10         17 $worker->send_entry(shift(@{ $self->{'_queue'}}));
  10         49  
86             }
87             } else {
88             # loop over the Q till its empty
89             # will block while waiting for ready workers
90             # returns when the queue is empty
91 1         3 while($qentry = shift(@{ $self->{'_queue'}})) {
  111         15582  
92 110         264 while(!scalar(@{$self->{'_ready_workers'}})) {
  137         408  
93 27 50       662 if ($IPC::QWorker::DEBUG) {
94 0         0 print STDERR "no ready workers. wait for workers...\n";
95             }
96 27         69 $self->_get_ready_workers();
97             }
98              
99 110         520 $worker = shift(@{$self->{'_ready_workers'}});
  110         198  
100 110         359 $worker->send_entry($qentry);
101             }
102             }
103             }
104              
105             sub _get_busy_workers {
106 5     5   11 my $self = shift();
107 5         7 my @result;
108             my $worker;
109              
110 5         6 foreach $worker (@{$self->{'_workers'}}) {
  5         24  
111 50 100       113 if(!$worker->{'ready'}) {
112 19         32 push(@result, $worker);
113             }
114             }
115 5         104 return(@result);
116             }
117              
118             # will block till all workers are finished
119             sub flush_queue {
120 1     1 0 104 my $self = shift();
121 1         4 my @busy_workers;
122 1         50 my $select = IO::Select->new();
123              
124 1         36 while(scalar(@busy_workers = $self->_get_busy_workers())) {
125 4 50       12 if ($IPC::QWorker::DEBUG) {
126 0         0 print STDERR "still " . scalar(@busy_workers) . " busy workers...\n";
127             }
128 4         10 $self->_get_ready_workers();
129             }
130             }
131              
132             sub stop_workers {
133 1     1 0 139 my $self = shift;
134 1         2 my $worker;
135              
136             # may be we could also use signals here
137 1         3 foreach $worker ( @{ $self->{'_workers'} } ) {
  1         4  
138 10         99 $worker->exit_child();
139             }
140             }
141              
142             1;
143             __END__