File Coverage

blib/lib/AnyEvent/Task/Server.pm
Criterion Covered Total %
statement 57 78 73.0
branch 10 20 50.0
condition 3 4 75.0
subroutine 12 16 75.0
pod 0 4 0.0
total 82 122 67.2


line stmt bran cond sub pod time code
1             package AnyEvent::Task::Server;
2              
3 29     29   517260 use common::sense;
  29         48  
  29         175  
4              
5 29     29   2945 use AnyEvent;
  29         7861  
  29         584  
6 29     29   1095 use AnyEvent::Util;
  29         18189  
  29         1393  
7 29     29   15121 use AnyEvent::Socket;
  29         312275  
  29         3100  
8              
9 29     29   12390 use AnyEvent::Task::Util;
  29         47  
  29         776  
10 29     29   10686 use AnyEvent::Task::Server::Worker;
  29         70  
  29         19669  
11              
12              
13             sub new {
14 14     14 0 239 my ($class, %arg) = @_;
15 14         45 my $self = {};
16 14         70 bless $self, $class;
17              
18              
19 14         371 $self->{all_done_cv} = AE::cv;
20 14         2208 $self->{children} = {};
21 14         77 $self->{curr_worker_id} = 0;
22              
23              
24 14         53 $self->{name} = $arg{name};
25 14   100 0   311 $self->{setup} = $arg{setup} || sub {};
26 14   50 0   268 $self->{checkout_done} = $arg{checkout_done} || sub {};
27 14 100       93 $self->{hung_worker_timeout} = exists $arg{hung_worker_timeout} ? $arg{hung_worker_timeout} : (60*5);
28              
29              
30 14 50       77 if (defined $self->{name}) {
31 0         0 $0 = "AET-Server:$self->{name}";
32             }
33              
34              
35 14 50       55 if ($arg{listen}) {
36 14         54 $self->{listen} = $arg{listen};
37              
38 14         64 my $host = $self->{listen}->[0];
39 14         42 my $service = $self->{listen}->[1];
40              
41             $self->{server_guard} = tcp_server $host, $service, sub {
42 33     33   3679150 my ($fh) = @_;
43 33         334 $self->handle_new_connection($fh);
44 14         236 };
45             } else {
46 0         0 die "unspecified listen path";
47             }
48              
49              
50 14 50       4309 if (exists $arg{interface}) {
51 14         34 my $interface = $arg{interface};
52              
53 14 100       66 if (ref $interface eq 'CODE') {
    50          
54 10         27 $self->{interface} = $interface;
55             } elsif (ref $interface eq 'HASH') {
56             $self->{interface} = sub {
57 0     0   0 my $method = shift;
58 0         0 $interface->{$method}->(@_);
59 4         27 };
60             } else {
61 0         0 die "interface must be a sub or a hash";
62             }
63             } else {
64 0         0 die "unspecified interface";
65             }
66              
67              
68 14         97 return $self;
69             }
70              
71              
72             sub fork_task_server {
73 28     28 0 392 my (@args) = @_;
74              
75 28 50       120 if (wantarray) {
76             return AnyEvent::Task::Util::fork_anyevent_subprocess(sub {
77 0     0   0 AnyEvent::Task::Server->new(@args)->run;
78 0         0 });
79             } else {
80             AnyEvent::Task::Util::fork_anyevent_subprocess(sub {
81 14     14   297 AnyEvent::Task::Server->new(@args)->run;
82 0         0 return undef;
83 28         226 });
84              
85 14         673 return undef;
86             }
87             }
88              
89              
90              
91              
92             sub handle_new_connection {
93 33     33 0 51 my ($self, $fh) = @_;
94              
95 33         372 my ($monitor_fh1, $monitor_fh2) = AnyEvent::Util::portable_socketpair;
96              
97 33         1372 $self->{curr_worker_id}++;
98              
99 33         22752 my $rv = fork;
100              
101 33 50       1153 if ($rv) {
    0          
102 33         863 close($fh);
103 33         434 close($monitor_fh2);
104              
105 33         3851 $self->{children}->{$rv} = {
106             monitor_fh => $monitor_fh1,
107             };
108             } elsif ($rv == 0) {
109 0         0 close($monitor_fh1);
110              
111             ## Don't want keep-alive pipes of other workers open in this worker
112 0         0 foreach my $child (keys %{$self->{children}}) {
  0         0  
113 0         0 close($self->{children}->{$child}->{monitor_fh});
114             }
115              
116 0 0       0 if (defined $self->{name}) {
117 0         0 $0 = "AET-Worker:$self->{name}($self->{curr_worker_id})";
118             }
119              
120 0         0 AnyEvent::Task::Server::Worker::handle_worker($self, $fh, $monitor_fh2);
121 0         0 die "handle_worker should never return";
122             } else {
123 0         0 close($fh);
124 0         0 close($monitor_fh1);
125 0         0 close($monitor_fh2);
126 0         0 die "fork failed: $!";
127             }
128             }
129              
130              
131             sub run {
132 14     14 0 28 my ($self) = @_;
133              
134 14         195 $self->{all_done_cv}->recv;
135             }
136              
137              
138             1;