File Coverage

blib/lib/PJob/Server.pm
Criterion Covered Total %
statement 74 207 35.7
branch 12 56 21.4
condition 3 12 25.0
subroutine 14 37 37.8
pod 4 7 57.1
total 107 319 33.5


line stmt bran cond sub pod time code
1             package PJob::Server;
2             our $VERSION = '0.41';
3              
4             our $ALIAS = "POE JOB SERVER, Version: $VERSION";
5              
6 2     2   28519 use Any::Moose;
  2         67385  
  2         13  
7 2     2   148565 use Data::Dumper;
  2         17309  
  2         152  
8 2     2   2113 use POSIX qw/strftime/;
  2         16972  
  2         14  
9 2     2   2440 use Scalar::Util qw/reftype/;
  2         6  
  2         190  
10 2     2   12 use List::Util qw/first/;
  2         5  
  2         205  
11 2     2   35348 use List::MoreUtils qw/uniq/;
  2         3533  
  2         270  
12 2     2   28401 use POE qw/Component::Server::TCP Wheel::Run/;
  2         147229  
  2         18  
13             #use Smart::Comments;
14              
15             use constant {
16 2         6156 OUTPUT => 'Out',
17             ERROR => 'Err',
18             NOSUCHJOB => 'No Such A Job',
19             NOMORECON => 'Sorry, no more connection on this server',
20             NOTALLOWD => 'Sorry, you are not allowed on this server',
21             NOCLIEJOB => 'Sorry, no job found for you on this server',
22 2     2   378149 };
  2         6  
23              
24             has 'jobs' => (
25             is => 'rw',
26             isa => 'HashRef',
27             default => sub { {} },
28             );
29              
30             has 'port' => (
31             is => 'rw',
32             isa => 'Int',
33             default => '32080',
34             );
35              
36             has 'logfile' => (
37             is => 'rw',
38             isa => 'Str',
39             );
40              
41             has 'log_commands' => (
42             is => 'rw',
43             isa => 'Bool',
44             default => 0,
45             );
46              
47             has 'job_table' => (
48             is => 'rw',
49             isa => 'HashRef',
50             default => sub { {} },
51             );
52              
53             has '_dispatched' => (
54             is => 'rw',
55             isa => 'Bool',
56             default => 0,
57             );
58              
59             has '_pid' => (
60             is => 'rw',
61             default => sub { {} },
62             );
63              
64             has 'allowed_hosts' => (
65             is => 'rw',
66             isa => 'ArrayRef',
67             default => sub { [] },
68             );
69              
70             has 'max_connections' => (
71             is => 'rw',
72             isa => 'Int',
73             default => '-1',
74             );
75              
76             has '_interactive' => (
77             is => 'rw',
78             isa => 'ArrayRef',
79             default => sub { [] },
80             );
81              
82             # add programs to the job server
83             sub add {
84 1     1 1 3622 my ($self, @programs) = @_;
85              
86 1         4 foreach my $p (@programs) {
87 3 100 66     27 if (reftype $p && reftype $p eq 'HASH') {
    50          
88 1         2 $self->jobs({%{$self->jobs}, %{$p}});
  1         7  
  1         12  
89 1         3 next;
90             }
91             elsif (!reftype $p) {
92 2         4 $self->jobs({%{$self->jobs}, $p => $p});
  2         19  
93             }
94             }
95 1         4 return $self;
96             }
97              
98             sub add_interactive {
99 0     0 1 0 my ($self, @programs) = @_;
100 0         0 foreach my $p (@programs) {
101 0 0 0     0 if (reftype $p && reftype $p eq 'HASH') {
    0          
102 0         0 $self->jobs({%{$self->jobs}, %{$p}});
  0         0  
  0         0  
103 0         0 push @{$self->_interactive}, keys %{$p};
  0         0  
  0         0  
104 0         0 next;
105             }
106             elsif (!reftype $p) {
107 0         0 $self->jobs({%{$self->jobs}, $p => $p});
  0         0  
108 0         0 push @{$self->_interactive}, $p;
  0         0  
109             }
110             }
111 0         0 return $self;
112             }
113              
114             # run the job server
115             sub run {
116 0     0 1 0 my $self = shift;
117              
118 0         0 $self->_check_jobs;
119 0         0 $self->_append_jobs;
120 0         0 $self->_log_redirect;
121 0         0 $self->{_clients} = 0;
122             $self->{_session} = POE::Component::Server::TCP->new(
123             Alias => $ALIAS,
124             Port => $self->port,
125 0     0   0 ClientInput => sub { $self->_spawn(@_) },
126 0     0   0 ClientConnected => sub { $self->_client_connect(@_) },
127 0     0   0 ClientDisconnected => sub { $self->_client_disconnected(@_) },
128             InlineStates => {
129 0     0   0 job_stdout => sub { $self->send_to_client(OUTPUT, @_) },
130 0     0   0 job_stderr => sub { $self->send_to_client(ERROR, @_) },
131 0     0   0 job_close => sub { $self->_close(@_) },
132 0     0   0 job_signal => sub { $self->_sigchld(@_) },
133 0     0   0 usage => sub { $self->_usage(@_) },
134             }
135 0         0 );
136 0         0 $self->log(*STDOUT, "Started $ALIAS at Port: " . $self->port . "\n");
137 0         0 POE::Kernel->run();
138 0         0 return $self;
139             }
140              
141             # print usage information
142             sub _usage {
143 0     0   0 my $self = shift;
144 0         0 my $client = $_[HEAP]->{client};
145 0         0 my $remote_ip = $_[HEAP]->{remote_ip};
146 0         0 my $allowed_jobs = $self->job_table->{$remote_ip}; # jobs for this ip
147              
148 0         0 my $usage_str;
149              
150             #dispatched ? fetch from dispatched table, or else fetch from defined jobs
151 0 0       0 if ($self->_dispatched) {
152 0 0       0 if (@{$allowed_jobs}) {
  0         0  
153 0         0 $usage_str = 'Usage: ' . join ' ', sort @{$allowed_jobs};
  0         0  
154             }
155             else {
156 0         0 $usage_str = ERROR . "\t" . NOCLIEJOB;
157 0         0 $client->put($usage_str);
158 0         0 $_[KERNEL]->yield("shutdown");
159             }
160             }
161             else {
162 0         0 $usage_str = 'Usage: ' . join ' ', sort keys %{$self->jobs};
  0         0  
163             }
164 0         0 $client->put($usage_str);
165 0         0 $client->put('.');
166             }
167              
168             # run the program
169             sub _spawn {
170 0     0   0 my $self = shift;
171 0         0 my ($heap, $input) = @_[HEAP, ARG0];
172 0         0 my $client = $heap->{client};
173 0         0 my $remote_ip = $heap->{remote_ip};
174 0         0 my $remote_port = $heap->{remote_port};
175              
176 0 0       0 if ($heap->{job}->{$client}) {
177 0         0 $heap->{job}->{$client}->put($input);
178 0         0 return;
179             }
180 0 0       0 if ($input =~ /^quit$/i) {
181 0         0 $client->put("B'bye!");
182 0         0 $_[KERNEL]->yield("shutdown");
183 0         0 return;
184             }
185              
186 0 0       0 if ($input =~ /^usage$/i) {
187 0         0 $_[KERNEL]->yield('usage');
188 0         0 return;
189             }
190              
191             #dispatched, fetch jobs from dispatched job table, or else from defined jobs
192 0         0 my $program;
193 0 0       0 if ($self->_dispatched) {
194 0     0   0 $program = first { $_ eq $input } $self->job_table->{$remote_ip};
  0         0  
195             }
196             else {
197 0         0 $program = $self->jobs->{$input};
198             }
199              
200 0 0       0 unless (defined $program) {
201 0         0 $client->put(ERROR . "\t" . NOSUCHJOB);
202 0         0 $_[KERNEL]->yield("usage");
203 0         0 return;
204             }
205              
206 0 0   0   0 my $interactive = 1 if first { $_ eq $input } @{$self->_interactive};
  0         0  
  0         0  
207              
208 0 0       0 $self->log(*STDOUT, "$remote_ip:$remote_port : $program \n")
209             if $self->log_commands;
210              
211 0 0       0 my $kid = POE::Wheel::Run->new(
212             Program => $program,
213             StdoutEvent => 'job_stdout',
214             StderrEvent => 'job_stderr',
215             CloseEvent => 'job_close',
216             Conduit => $interactive ? 'pty' : 'pipe',
217             );
218              
219 0         0 $heap->{job}->{$client} = $kid;
220              
221             #just the program is enough right now. Feature can be added if necessary
222 0         0 $self->_pid->{$kid->PID} = $program;
223 0         0 $_[KERNEL]->sig_child($kid->PID, "job_signal");
224 0         0 $client->put("Job $program :::" . $kid->PID . " started.");
225             }
226              
227             # send information to client
228             sub send_to_client {
229 0     0 0 0 my $self = shift;
230 0         0 my $mark = shift;
231              
232 0         0 $_[HEAP]->{client}->put($mark . "\t" . $_[ARG0]);
233             }
234              
235             # not sure if it is needed
236             sub error_event {
237 0     0 0 0 my $self = shift;
238              
239             # my($oper,$errno,$errmsg) = @_[ARG0,ARG1,ARG2];
240             # $_[HEAP]->{client}->put("Error: $oper failed, message-- $errmsg");
241             }
242              
243             # _sigchld, delete Wheels stored in the HEAP and object
244             sub _sigchld {
245 0     0   0 my $self = shift;
246 0         0 my ($pid, $exit) = @_[ARG1, ARG2];
247 0         0 my $program = $self->_pid->{$pid};
248              
249 0 0       0 if ($exit != 0) {
250 0         0 $exit >>= 8;
251             }
252 0         0 $_[HEAP]->{client}->put("Job $program :::$pid exited with status $exit");
253 0         0 $_[HEAP]->{client}->put('.');
254 0         0 delete $_[HEAP]->{job}->{$_[HEAP]->{client}};
255 0         0 delete $self->_pid->{$pid};
256             }
257              
258             # not sure we need this or not
259 0     0   0 sub _close {
260              
261             # my $self = shift;
262              
263             # delete $_[HEAP]->{job};
264             }
265              
266             # connected, check max connections, check allowed hosts, yield usage and log information
267             sub _client_connect {
268 0     0   0 my $self = shift;
269 0         0 my ($kernel, $heap) = @_[KERNEL, HEAP];
270              
271 0         0 my $remote_ip = $heap->{remote_ip};
272 0         0 my $remote_port = $heap->{remote_port};
273 0         0 my $allow_hosts = $self->allowed_hosts;
274              
275             # reached max connection
276 0 0       0 if ($self->max_connections > 0) {
277 0 0       0 if ($self->{_clients} >= $self->max_connections) {
278 0         0 $self->send_to_client(ERROR, NOMORECON);
279 0         0 $kernel->yield('shutdown');
280 0         0 return;
281             }
282             }
283 0         0 $self->{_clients}++;
284              
285             # not allowed on this server
286 0 0 0     0 if (@{$allow_hosts} || $self->_dispatched) {
  0         0  
287 0 0   0   0 if (!first { $remote_ip eq $_ } @{$allow_hosts},
  0         0  
  0         0  
  0         0  
288             keys %{$self->job_table})
289             {
290 0         0 $heap->{client}->put(ERROR . "\t" . NOTALLOWD);
291 0         0 $kernel->yield('shutdown');
292 0         0 return;
293             }
294             }
295              
296             # allowed server
297 0         0 $kernel->yield('usage');
298 0         0 $self->log(*STDOUT,
299             "CONNECTION FROM ${remote_ip}:${remote_port} ESTABLISHED\n");
300             }
301              
302             # when disconnected, log it and reduce clients number by 1
303             sub _client_disconnected {
304 0     0   0 my $self = shift;
305 0         0 my $remote_ip = $_[HEAP]->{remote_ip};
306 0         0 my $remote_port = $_[HEAP]->{remote_port};
307              
308 0         0 $self->log(*STDERR, "DISCONNECTED FORM ${remote_ip}:${remote_port} \n");
309 0         0 $self->{_clients}--;
310             }
311              
312             # open log file and redirect stdout/stdin to it
313             sub _log_redirect {
314 1     1   1165 my $self = shift;
315              
316 1 50       7 if ($self->logfile) {
317 1 50       181 open STDOUT, '>>', $self->logfile or die $!;
318 1 50       28 open STDERR, ">&STDOUT" or die $!;
319             }
320             }
321              
322             # simple log method
323             sub log {
324 1     1 0 3 my $self = shift;
325 1         4 my ($fh, $output) = @_;
326 1         2 chomp $output;
327              
328 1 50       4 return unless $output;
329 1         263 my $now = strftime "%y/%m/%d %H:%M:%S", localtime;
330 1         119 print $fh "$now\t$output\n";
331             }
332              
333             # disaptch jobs to job table
334             sub job_dispatch {
335 1     1 1 539 my ($self, %table) = @_;
336              
337 1         6 $self->_dispatched(1);
338 1         5 foreach my $host (keys %table) {
339 2         3 foreach (@{$table{$host}}) {
  2         26  
340 5 50 33     30 if (reftype $_ && reftype $_ eq 'HASH') {
    50          
341 0         0 $self->add($_);
342 0         0 push @{$self->job_table->{$host}}, keys %$_;
  0         0  
343 0         0 next;
344             }
345             elsif (!reftype $_) {
346 5 100       6 if (exists ${$self->jobs}{$_}) {
  5         18  
347 4         5 push @{$self->job_table->{$host}}, $_;
  4         18  
348             }
349             else {
350 1         8 $self->log(*STDERR, "no program '$_' found in the jobs");
351             }
352             }
353             }
354             }
355              
356 1         6 my $comm_jobs = $self->job_table->{'*'};
357 1 50       5 return unless $comm_jobs;
358              
359 1         1 foreach my $key (keys %{$self->job_table}) {
  1         7  
360 2         5 my @all = uniq @{$self->job_table->{$key}}, @{$comm_jobs};
  2         7  
  2         20  
361 2         15 $self->job_table->{$key} = [@all];
362             }
363             }
364              
365             # Called before start the server. Dispatch the jobs for $self->allowed_hosts
366             sub _append_jobs {
367 1     1   976 my $self = shift;
368              
369 1         5 my $comm_jobs = delete $self->job_table->{'*'};
370 1         3 foreach my $host (@{$self->allowed_hosts}) {
  1         15  
371 2         3 my @all = uniq @{$self->job_table->{$host}}, @{$comm_jobs};
  2         8  
  2         11  
372 2         14 $self->job_table->{$host} = [@all];
373             }
374             }
375              
376             # don't set up any jobs like 'usage/quit'
377             sub _check_jobs {
378 0     0     my $self = shift;
379              
380 0 0   0     if (my $c = first { $_ =~ /^usage|quit$/i } keys %{$self->jobs}) {
  0            
  0            
381 0           $self->log(*STDERR, "'$c' is defined by default, choose another one");
382 0           exit 1;
383             }
384             }
385              
386             #
387             #sub _ {
388             # my $output = shift;
389             # $output = '\.' if $output =~ /^\.$/;
390             # return $output;
391             #}
392              
393 2     2   19 no Any::Moose;
  2         4  
  2         28  
394             __PACKAGE__->meta->make_immutable;
395             1;
396             __END__