File Coverage

blib/lib/AnyEvent/ProcessPool/Process.pm
Criterion Covered Total %
statement 61 73 83.5
branch 10 14 71.4
condition 2 6 33.3
subroutine 17 21 80.9
pod 0 6 0.0
total 90 120 75.0


line stmt bran cond sub pod time code
1             package AnyEvent::ProcessPool::Process;
2             # ABSTRACT: Supervisor for a single, forked process
3             $AnyEvent::ProcessPool::Process::VERSION = '0.06_001'; # TRIAL
4              
5 3     3   157939 $AnyEvent::ProcessPool::Process::VERSION = '0.06001';use common::sense;
  3         19  
  3         19  
6              
7 3     3   1080 use Moo;
  3         23001  
  3         14  
8 3     3   3062 use Carp;
  3         6  
  3         134  
9 3     3   622 use AnyEvent;
  3         3738  
  3         64  
10 3     3   1357 use AnyEvent::Handle;
  3         24612  
  3         112  
11 3     3   24 use AnyEvent::Util qw(fork_call portable_socketpair fh_nonblocking);
  3         6  
  3         146  
12 3     3   927 use AnyEvent::ProcessPool::Task;
  3         9  
  3         98  
13 3     3   993 use AnyEvent::ProcessPool::Util qw(next_id cpu_count);
  3         12  
  3         271  
14 3     3   20 use Try::Catch;
  3         7  
  3         2099  
15              
16             has limit => (is => 'ro');
17             has handle => (is => 'rw', clearer => 1, predicate => 'is_running');
18             has count => (is => 'rw', default => sub{ 0 });
19             has stopped => (is => 'rw', default => sub{ 0 });
20              
21             sub DEMOLISH {
22 55     55 0 12743 my $self = shift;
23 55 100       630 $self->stop if $self->is_running;
24             }
25              
26             sub stop {
27 69     69 0 105 my $self = shift;
28 69         157 $self->stopped(1);
29 69 100       221 $self->handle->push_shutdown if $self->handle;
30             }
31              
32             sub has_limit {
33 21     21 0 45 my $self = shift;
34 21         194 return defined $self->limit;
35             }
36              
37             sub has_capacity {
38 38     38 0 89 my $self = shift;
39 38   66     214 return $self->is_running && (!$self->has_limit || $self->count < $self->limit);
40             }
41              
42             sub run {
43 38     38 0 101 my ($self, $task) = @_;
44 38 50       154 return if $self->stopped;
45              
46 38 100       142 if (!$self->has_capacity) {
47 25 100       157 if (my $handle = $self->handle) {
48 8         259 $self->clear_handle;
49 8     0   114 $handle->on_eof(sub{ undef $handle });
  0         0  
50 8         86 $handle->push_shutdown;
51             }
52              
53 25         332 $self->spawn;
54             }
55              
56 38         2761 my $cv = AE::cv;
57              
58 38         1029 $self->count($self->count + 1);
59 38         434 $self->handle->push_write($task->encode . "\n");
60             $self->handle->push_read(line => "\n", sub{
61 38     38   23361 my ($handle, $line, $eol) = @_;
62 38         140 my $task = AnyEvent::ProcessPool::Task->decode($line);
63 38         192 $cv->send($task);
64 38         5398 });
65              
66 38         4008 return $cv;
67             }
68              
69             sub spawn {
70 27     27 0 57 my $self = shift;
71 27 50       80 return if $self->stopped;
72              
73 27         113 my ($child, $parent) = portable_socketpair;
74              
75 27         1561 fh_nonblocking $child, 1;
76             my $handle = AnyEvent::Handle->new(
77             fh => $child,
78 0     0   0 on_eol => sub{ warn "$$ EOL: @_" },
79 0     0   0 on_error => sub{ warn "$$ ERROR: @_" },
80 27         775 );
81              
82             my $forked = fork_call {
83 0     0   0 close $child;
84 0         0 local $| = 1;
85              
86 0         0 my $count = 0;
87              
88 0         0 while (defined(my $line = <$parent>)) {
89 0         0 my $task = AnyEvent::ProcessPool::Task->decode($line);
90 0         0 $task->execute;
91              
92 0         0 syswrite $parent, $task->encode . "\n";
93              
94 0 0 0     0 if ($self->has_limit && ++$count >= $self->limit) {
95 0         0 break;
96             }
97             }
98             }
99       14     sub {
100 27         5478 };
101              
102 27         36571 close $parent;
103              
104 27         436 $self->handle($handle);
105 27         609 $self->count(0);
106             }
107              
108             1;
109              
110             __END__