File Coverage

blib/lib/App/Manoc/Netwalker/Poller/Workers.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package App::Manoc::Netwalker::Poller::Workers;
2 1     1   18398 use Moose;
  1         2  
  1         7  
3              
4             our $VERSION = '2.99.2'; ##TRIAL VERSION
5              
6 1     1   6066 use namespace::autoclean;
  1         2  
  1         7  
7              
8             with 'App::Manoc::Netwalker::WorkersRole', 'App::Manoc::Logger::Role';
9              
10 1     1   78 use Try::Tiny;
  1         2  
  1         53  
11 1     1   150 use POE qw(Filter::Reference Filter::Line);
  0            
  0            
12             use App::Manoc::Netwalker::Poller::Scoreboard;
13             use App::Manoc::Netwalker::Poller::DeviceTask;
14             use App::Manoc::Netwalker::Poller::ServerTask;
15              
16             has scoreboard => (
17             is => 'ro',
18             isa => 'App::Manoc::Netwalker::Poller::Scoreboard',
19             default => sub { App::Manoc::Netwalker::Poller::Scoreboard->new() },
20             );
21              
22              
23             sub worker_stdout {
24             my ( $self, $task_info, $job_id ) = @_;
25             my $class = $task_info->{class};
26             my $id = $task_info->{id};
27             my $status = $task_info->{status};
28              
29             $self->log->debug("got feedback class=$class, id=$id status=$status");
30              
31             $class eq 'device' and
32             $self->scoreboard->set_device_info( $id, $status, $job_id );
33             $class eq 'server' and
34             $self->scoreboard->set_server_info( $id, $status, $job_id );
35              
36             if ( $status eq 'DONE' ) {
37             my $report = App::Manoc::Netwalker::Poller::TaskReport->thaw( $task_info->{report} );
38             my $host = $report->host;
39             # TODO check status
40             my $has_errors = $report->has_error();
41             $self->log->debug("$class $host $status $has_errors");
42             }
43             }
44              
45              
46             sub worker_error {
47             my ( $self, $job_id ) = @_;
48              
49             $self->log->warn("Worker error job $job_id");
50             $self->scoreboard->delete_job_info($job_id);
51             }
52              
53              
54             sub worker_finished {
55             my ( $self, $job_id ) = @_;
56              
57             $self->log->debug("Job $job_id finished");
58              
59             my $info = $self->scoreboard->get_job_info($job_id);
60             $info or return;
61             my $status =
62             $info->[0] eq 'device' ? $self->scoreboard->get_device_status( $info->[1] ) :
63             $self->scoreboard->get_server_status( $info->[1] );
64              
65             defined($status) && $status eq 'RUNNING' and
66             $self->log->warn("Job $job_id finished but status was still RUNNING");
67              
68             $self->scoreboard->delete_job_info($job_id);
69             }
70              
71              
72             sub on_tick {
73             my ( $self, $kernel ) = @_;
74              
75             $self->schedule_devices();
76             $self->schedule_servers();
77             }
78              
79              
80             sub schedule_devices {
81             my $self = shift;
82              
83             # TODO better check
84             my $now = time();
85              
86             my $decommissioned_devices =
87             $self->schema->resultset('Device')->search( { decommissioned => 1 } )->get_column('id');
88              
89             my @device_ids = $self->schema->resultset('DeviceNWInfo')->search(
90             {
91             scheduled_attempt => { '<=' => $now },
92             device_id => { -not_in => $decommissioned_devices->as_query }
93             }
94             )->get_column('device_id')->all();
95              
96             $self->log->debug( "on tick: devices to refresh: " . join( ',', @device_ids ) );
97             foreach my $id (@device_ids) {
98              
99             # check if it's already scheduled
100             my $status = $self->scoreboard->get_device_status($id);
101             if ( defined($status) && ( $status eq 'QUEUED' || $status eq 'RUNNING' ) ) {
102             $self->log->debug("Device $id is $status, skipping");
103             next;
104             }
105              
106             $self->enqueue_device($id);
107             }
108             }
109              
110              
111             sub enqueue_device {
112             my ( $self, $device_id ) = @_;
113              
114             $self->scoreboard->set_device_info( $device_id, 'QUEUED' );
115             $self->enqueue( sub { $self->visit_device($device_id) } );
116             $self->log->debug("Enqueued device $device_id");
117             }
118              
119              
120             sub visit_device {
121             my ( $self, $device_id ) = @_;
122              
123             my $task_info = {
124             class => 'device',
125             id => $device_id,
126             status => 'RUNNING',
127             };
128             print @{ POE::Filter::Reference->new->put( [$task_info] ) };
129              
130             try {
131             my $updater = App::Manoc::Netwalker::Poller::DeviceTask->new(
132             {
133             schema => $self->schema,
134             config => $self->config,
135             device_id => $device_id,
136             }
137             );
138             $updater->update;
139              
140             $task_info->{status} = 'DONE';
141             $task_info->{report} = $updater->task_report->freeze;
142              
143             undef $updater;
144             }
145             catch {
146             $self->log->error("caught error in device updater: $_");
147             $task_info->{status} = 'ERROR';
148             };
149             print @{ POE::Filter::Reference->new->put( [$task_info] ) };
150             $self->log->debug("device updater job for $device_id finished");
151             }
152              
153              
154             sub schedule_servers {
155             my $self = shift;
156              
157             my $now = time();
158              
159             my $decommissioned_servers =
160             $self->schema->resultset('Server')->search( { decommissioned => 1 } )->get_column('id');
161              
162             my @server_ids = $self->schema->resultset('ServerNWInfo')->search(
163             {
164             scheduled_attempt => { '<=' => $now },
165             server_id => { -not_in => $decommissioned_servers->as_query }
166             }
167             )->get_column('server_id')->all();
168              
169             $self->log->debug( "on tick: servers to refresh: " . join( ',', @server_ids ) );
170             foreach my $id (@server_ids) {
171              
172             # check if it's already scheduled
173             my $status = $self->scoreboard->get_server_status($id);
174             if ( defined($status) && ( $status eq 'QUEUED' || $status eq 'RUNNING' ) ) {
175             $self->log->debug("Server $id is $status, skipping");
176             next;
177             }
178              
179             $self->enqueue_server($id);
180             }
181             }
182              
183              
184             sub enqueue_server {
185             my ( $self, $server_id ) = @_;
186              
187             $self->scoreboard->set_server_info( $server_id, 'QUEUED' );
188             $self->enqueue( sub { $self->visit_server($server_id) } );
189             $self->log->debug("Enqueued server $server_id");
190             }
191              
192              
193             sub visit_server {
194             my ( $self, $server_id ) = @_;
195              
196             my $task_info = {
197             class => 'server',
198             id => $server_id,
199             status => 'RUNNING',
200             };
201             print @{ POE::Filter::Reference->new->put( [$task_info] ) };
202              
203             try {
204             my $updater = App::Manoc::Netwalker::Poller::ServerTask->new(
205             {
206             schema => $self->schema,
207             config => $self->config,
208             server_id => $server_id,
209             }
210             );
211             $updater->update;
212              
213             $task_info->{status} = 'DONE';
214             $task_info->{report} = $updater->task_report->freeze;
215              
216             undef $updater;
217             }
218             catch {
219             $self->log->error("caught error in server updater: $_");
220             $task_info->{status} = 'ERROR';
221             };
222             print @{ POE::Filter::Reference->new->put( [$task_info] ) };
223              
224             $self->log->debug("server updater job for $server_id finished");
225              
226             }
227              
228             no Moose;
229             __PACKAGE__->meta->make_immutable;
230              
231             # Local Variables:
232             # mode: cperl
233             # indent-tabs-mode: nil
234             # cperl-indent-level: 4
235             # cperl-indent-parens-as-block: t
236             # End:
237              
238             __END__
239              
240             =pod
241              
242             =head1 NAME
243              
244             App::Manoc::Netwalker::Poller::Workers
245              
246             =head1 VERSION
247              
248             version 2.99.2
249              
250             =head2 worker_stdout
251              
252             Called when a child prints to STDOUT. Used to get status updates from
253             workers processes.
254              
255             =head2 worker_error
256              
257             =head2 worker_finished
258              
259             =head2 on_tick
260              
261             Called by the scheduler.
262              
263             =head2 schedule_devices
264              
265             =head2 enqueue_device
266              
267             =head2 visit_device
268              
269             =head2 schedule_servers
270              
271             =head2 enqueue_server
272              
273             =head2 visit_server
274              
275             =head1 AUTHORS
276              
277             =over 4
278              
279             =item *
280              
281             Gabriele Mambrini <gmambro@cpan.org>
282              
283             =item *
284              
285             Enrico Liguori
286              
287             =back
288              
289             =head1 COPYRIGHT AND LICENSE
290              
291             This software is copyright (c) 2017 by Gabriele Mambrini.
292              
293             This is free software; you can redistribute it and/or modify it under
294             the same terms as the Perl 5 programming language system itself.
295              
296             =cut