File Coverage

blib/lib/AnyEvent/ProcessPool/Process.pm
Criterion Covered Total %
statement 69 77 89.6
branch 14 18 77.7
condition 7 8 87.5
subroutine 19 23 82.6
pod 0 7 0.0
total 109 133 81.9


line stmt bran cond sub pod time code
1             package AnyEvent::ProcessPool::Process;
2             # ABSTRACT: Manages an individual worker process
3             $AnyEvent::ProcessPool::Process::VERSION = '0.07';
4 3     3   172154 use common::sense;
  3         20  
  3         12  
5 3     3   131 use Config;
  3         4  
  3         98  
6 3     3   777 use AnyEvent;
  3         4243  
  3         57  
7 3     3   1218 use AnyEvent::Open3::Simple;
  3         61347  
  3         97  
8 3     3   1118 use AnyEvent::ProcessPool::Task;
  3         9  
  3         89  
9 3     3   1114 use AnyEvent::ProcessPool::Util 'next_id';
  3         10  
  3         223  
10 3     3   1307 use String::Escape 'backslash';
  3         13540  
  3         165  
11 3     3   36 use Try::Catch;
  3         5  
  3         2797  
12              
13             my $perl = $Config{perlpath};
14             my $ext = $Config{_exe};
15             $perl .= $ext if $^O ne 'VMS' && $perl !~ /$ext$/i;
16             my @inc = map { sprintf('-I%s', backslash($_)) } @_, @INC;
17             my $cmd = join ' ', @inc, q(-MAnyEvent::ProcessPool::Worker -e 'AnyEvent::ProcessPool::Worker::run()');
18              
19             sub new {
20 57     57 0 20139 my ($class, %param) = @_;
21 57   100     198 my $include = $param{include} || [];
22              
23             return bless {
24             id => next_id,
25             limit => $param{limit},
26 57         199 include => join(' ', map { sprintf('-I%s', backslash($_)) } @$include),
  1         78  
27             started => undef,
28             process => undef,
29             ps => undef,
30             pending => [],
31             }, $class;
32             }
33              
34             sub DESTROY {
35 40     40   1424 my $self = shift;
36 40 50       89 $self->{ps}->close if $self->{ps};
37 40 50       87 if (ref $self->{pending}) {
38 40         48 foreach my $cv (@{$self->{pending}}) {
  40         394  
39 0 0       0 if ($cv) {
40 0         0 $cv->croak('AnyEvent::ProcessPool::Process went out of scope with pending tasks');
41             }
42             }
43             }
44             }
45              
46             sub pid {
47 4     4 0 608 my $self = shift;
48 4 100       16 return $self->{ps}->pid if $self->is_running;
49             }
50              
51             sub is_running {
52 41     41 0 494 my $self = shift;
53             return defined $self->{started}
54 41   66     390 && $self->{started}->ready;
55             }
56              
57             sub await {
58 33     33 0 146 my $self = shift;
59 33 100       135 $self->start unless $self->is_running;
60 33         5982 $self->{started}->recv;
61             }
62              
63             sub stop {
64 56     56 0 87 my $self = shift;
65 56 100       147 if (defined $self->{process}) {
66 19         98 $self->{ps}->close;
67 19         579 undef $self->{started};
68 19         41 undef $self->{process};
69 19         207 undef $self->{ps};
70             }
71             }
72              
73             sub start {
74 25     25 0 50 my $self = shift;
75 25         597 $self->{started} = AE::cv;
76             $self->{process} = AnyEvent::Open3::Simple->new(
77             on_start => sub{
78 25     25   586 $self->{started}->send;
79             },
80             on_stdout => sub{
81 30     30   1921528 my ($ps, $line) = @_;
82 30         302 my $task = AnyEvent::ProcessPool::Task->decode($line);
83 30         76 my $cv = shift @{$self->{pending}};
  30         138  
84 30         207 $cv->send($task);
85              
86 30 100 100     813 if ($self->{limit} && $ps->user->{reqs} <= 0) {
87 6         103 $self->stop;
88             }
89             },
90             on_stderr => sub{
91 0     0   0 warn "AnyEvent::ProcessPool::Worker: $_[1]\n";
92             },
93             on_error => sub{
94 0     0   0 die "error launching worker process: $_[0]";
95             },
96             on_signal => sub{
97 0     0   0 warn "worker terminated in response to signal: $_[1]";
98 0         0 $self->stop;
99             },
100             on_fail => sub{
101 0     0   0 warn "worker terminated with non-zero exit status: $_[1]";
102 0         0 $self->stop;
103             },
104 25         4444 );
105              
106             $self->{process}->run("$perl $self->{include} $cmd", sub{
107 25     25   179899 my $ps = shift;
108 25 100       377 $ps->user({reqs => $self->{limit}}) if $self->{limit};
109 25         281 $self->{ps} = $ps;
110 25         1967 });
111             }
112              
113             sub run {
114 30     30 0 80 my ($self, $task) = @_;
115 30         162 $self->await;
116              
117 30         2438 my $cv = AE::cv;
118 30         391 push @{$self->{pending}}, $cv;
  30         265  
119              
120 30         419 $self->{ps}->say($task->encode);
121 30 100       1860 --$self->{ps}->user->{reqs} if $self->{limit};
122              
123 30         425 return $cv;
124             }
125              
126             1;
127              
128             __END__