File Coverage

blib/lib/AnyEvent/Task/Server/Worker.pm
Criterion Covered Total %
statement 21 87 24.1
branch 0 36 0.0
condition n/a
subroutine 7 13 53.8
pod 0 4 0.0
total 28 140 20.0


line stmt bran cond sub pod time code
1             package AnyEvent::Task::Server::Worker;
2              
3 29     29   97 use common::sense;
  29         33  
  29         152  
4              
5 29     29   1262 use AnyEvent::Util;
  29         35  
  29         1498  
6 29     29   103 use Guard;
  29         33  
  29         987  
7              
8 29     29   11295 use POSIX; ## POSIX::_exit is used so we don't unlink the unix socket file created by our parent before the fork
  29         109274  
  29         139  
9 29     29   70506 use IO::Select;
  29         34870  
  29         1175  
10 29     29   18793 use JSON::XS;
  29         151972  
  29         1817  
11 29     29   162 use Scalar::Util qw/blessed/;
  29         41  
  29         24241  
12              
13              
14             my $setup_has_been_run;
15             my $json;
16             my $sel;
17              
18              
19              
20             sub handle_worker {
21 0     0 0   eval {
22 0           handle_worker_wrapped(@_);
23             };
24              
25 0           POSIX::_exit(1);
26             }
27              
28              
29             sub handle_worker_wrapped {
30 0     0 0   my ($server, $fh, $monitor_fh) = @_;
31              
32 0           AnyEvent::Util::fh_nonblocking $fh, 0;
33 0           AnyEvent::Util::fh_nonblocking $monitor_fh, 0;
34              
35 0           $json = JSON::XS->new->utf8;
36              
37 0           $sel = IO::Select->new;
38 0           $sel->add($fh, $monitor_fh);
39              
40 0           while(1) {
41 0           my @all_ready = $sel->can_read;
42              
43 0           foreach my $ready (@all_ready) {
44 0 0         if ($ready == $monitor_fh) {
    0          
45             ## Lost connection to server
46 0           $sel->remove($monitor_fh);
47             } elsif ($ready == $fh) {
48 0           process_data($server, $fh);
49             }
50             }
51             }
52             }
53              
54              
55              
56             sub process_data {
57 0     0 0   my ($server, $fh) = @_;
58              
59 0     0     scope_guard { alarm 0 };
  0            
60 0     0     local $SIG{ALRM} = sub { print STDERR "Killing hung worker ($$)\n"; POSIX::_exit(1); };
  0            
  0            
61 0 0         alarm $server->{hung_worker_timeout} if $server->{hung_worker_timeout};
62              
63 0           my $read_rv = sysread $fh, my $buf, 4096;
64              
65 0 0         if (!defined $read_rv) {
    0          
66 0 0         return if $!{EINTR};
67 0           POSIX::_exit(1);
68             } elsif ($read_rv == 0) {
69 0           POSIX::_exit(1);
70             }
71              
72 0           for my $input ($json->incr_parse($buf)) {
73 0           my $output;
74 0           my $output_meta = {};
75              
76 0           my $cmd = shift @$input;
77 0           my $input_meta = shift @$input;
78              
79 0 0         if ($cmd eq 'do') {
    0          
80 0           my $val;
81              
82 0           local $AnyEvent::Task::Logger::log_defer_object;
83              
84 0           eval {
85 0 0         if (!$setup_has_been_run) {
86 0           $server->{setup}->();
87 0           $setup_has_been_run = 1;
88             }
89              
90 0           $val = scalar $server->{interface}->(@$input);
91             };
92              
93 0           my $err = $@;
94              
95             $output_meta->{ld} = $AnyEvent::Task::Logger::log_defer_object->{msg}
96 0 0         if defined $AnyEvent::Task::Logger::log_defer_object;
97              
98 0 0         if ($err) {
99 0 0         $err = "$err" if blessed $err;
100              
101 0 0         $err = "setup exception: $err" if !$setup_has_been_run;
102              
103 0           $output = ['er', $output_meta, $err,];
104             } else {
105 0 0         if (blessed $val) {
106 0           $val = "interface returned object: " . ref($val) . "=($val)";
107 0           $output = ['er', $output_meta, $val,];
108             } else {
109 0           $output = ['ok', $output_meta, $val,];
110             }
111             }
112              
113 0           my $output_json = eval { encode_json($output); };
  0            
114              
115 0 0         if ($@) {
116 0           $output = ['er', $output_meta, "error JSON encoding interface output: $@",];
117 0           $output_json = encode_json($output);
118             }
119              
120 0           my_syswrite($fh, $output_json);
121             } elsif ($cmd eq 'dn') {
122 0           $server->{checkout_done}->();
123             } else {
124 0           die "unknown command: $cmd";
125             }
126             }
127             }
128              
129              
130             sub my_syswrite {
131 0     0 0   my ($fh, $output) = @_;
132              
133 0           while(1) {
134 0           my $rv = syswrite $fh, $output;
135              
136 0 0         if (!defined $rv) {
137 0 0         next if $!{EINTR};
138 0           POSIX::_exit(1); ## probably parent died and we're getting broken pipe
139             }
140              
141 0 0         return if $rv == length($output);
142              
143 0           POSIX::_exit(1); ## partial write: probably the socket is set nonblocking
144             }
145             }
146              
147             1;