File Coverage

blib/lib/AnyEvent/ProcessPool/Process.pm
Criterion Covered Total %
statement 72 80 90.0
branch 14 18 77.7
condition 7 8 87.5
subroutine 20 24 83.3
pod 0 7 0.0
total 113 137 82.4


line stmt bran cond sub pod time code
1             package AnyEvent::ProcessPool::Process;
2             # ABSTRACT: Manages an individual worker process
3             $AnyEvent::ProcessPool::Process::VERSION = '0.06';
4 3     3   144266 use strict;
  3         11  
  3         72  
5 3     3   14 use warnings;
  3         7  
  3         62  
6              
7 3     3   13 use Config;
  3         5  
  3         100  
8 3     3   700 use AnyEvent;
  3         3841  
  3         62  
9 3     3   793 use AnyEvent::Open3::Simple;
  3         54360  
  3         100  
10 3     3   801 use AnyEvent::ProcessPool::Task;
  3         23  
  3         92  
11 3     3   817 use AnyEvent::ProcessPool::Util 'next_id';
  3         9  
  3         212  
12 3     3   854 use String::Escape 'backslash';
  3         12761  
  3         190  
13 3     3   19 use Try::Catch;
  3         6  
  3         2061  
14              
15             my $perl = $Config{perlpath};
16             my $ext = $Config{_exe};
17             $perl .= $ext if $^O ne 'VMS' && $perl !~ /$ext$/i;
18             my @inc = map { sprintf('-I%s', backslash($_)) } @_, @INC;
19             my $cmd = join ' ', @inc, q(-MAnyEvent::ProcessPool::Worker -e 'AnyEvent::ProcessPool::Worker::run()');
20              
21             sub new {
22 57     57 0 19397 my ($class, %param) = @_;
23 57   100     214 my $include = $param{include} || [];
24              
25             return bless {
26             id => next_id,
27             limit => $param{limit},
28 57         184 include => join(' ', map { sprintf('-I%s', backslash($_)) } @$include),
  1         43  
29             started => undef,
30             process => undef,
31             ps => undef,
32             pending => [],
33             }, $class;
34             }
35              
36             sub DESTROY {
37 40     40   1067 my $self = shift;
38 40 50       98 $self->{ps}->close if $self->{ps};
39 40 50       91 if (ref $self->{pending}) {
40 40         58 foreach my $cv (@{$self->{pending}}) {
  40         243  
41 0 0       0 if ($cv) {
42 0         0 $cv->croak('AnyEvent::ProcessPool::Process went out of scope with pending tasks');
43             }
44             }
45             }
46             }
47              
48             sub pid {
49 4     4 0 287 my $self = shift;
50 4 100       9 return $self->{ps}->pid if $self->is_running;
51             }
52              
53             sub is_running {
54 41     41 0 424 my $self = shift;
55             return defined $self->{started}
56 41   66     327 && $self->{started}->ready;
57             }
58              
59             sub await {
60 33     33 0 135 my $self = shift;
61 33 100       108 $self->start unless $self->is_running;
62 33         3887 $self->{started}->recv;
63             }
64              
65             sub stop {
66 56     56 0 95 my $self = shift;
67 56 100       152 if (defined $self->{process}) {
68 19         84 $self->{ps}->close;
69 19         470 undef $self->{started};
70 19         46 undef $self->{process};
71 19         154 undef $self->{ps};
72             }
73             }
74              
75             sub start {
76 25     25 0 49 my $self = shift;
77 25         656 $self->{started} = AE::cv;
78             $self->{process} = AnyEvent::Open3::Simple->new(
79             on_start => sub{
80 25     25   377 $self->{started}->send;
81             },
82             on_stdout => sub{
83 30     30   1548941 my ($ps, $line) = @_;
84 30         313 my $task = AnyEvent::ProcessPool::Task->decode($line);
85 30         81 my $cv = shift @{$self->{pending}};
  30         156  
86 30         231 $cv->send($task);
87              
88 30 100 100     819 if ($self->{limit} && $ps->user->{reqs} <= 0) {
89 6         84 $self->stop;
90             }
91             },
92             on_stderr => sub{
93 0     0   0 warn "AnyEvent::ProcessPool::Worker: $_[1]\n";
94             },
95             on_error => sub{
96 0     0   0 die "error launching worker process: $_[0]";
97             },
98             on_signal => sub{
99 0     0   0 warn "worker terminated in response to signal: $_[1]";
100 0         0 $self->stop;
101             },
102             on_fail => sub{
103 0     0   0 warn "worker terminated with non-zero exit status: $_[1]";
104 0         0 $self->stop;
105             },
106 25         3861 );
107              
108             $self->{process}->run("$perl $self->{include} $cmd", sub{
109 25     25   90013 my $ps = shift;
110 25 100       229 $ps->user({reqs => $self->{limit}}) if $self->{limit};
111 25         190 $self->{ps} = $ps;
112 25         1785 });
113             }
114              
115             sub run {
116 30     30 0 81 my ($self, $task) = @_;
117 30         112 $self->await;
118              
119 30         1660 my $cv = AE::cv;
120 30         322 push @{$self->{pending}}, $cv;
  30         146  
121              
122 30         237 $self->{ps}->say($task->encode);
123 30 100       1997 --$self->{ps}->user->{reqs} if $self->{limit};
124              
125 30         293 return $cv;
126             }
127              
128             1;
129              
130             __END__