File Coverage

blib/lib/PkgForge/Handler/Incoming.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package PkgForge::Handler::Incoming; # -*-perl-*-
2 2     2   24375 use strict;
  2         5  
  2         129  
3 2     2   11 use warnings;
  2         4  
  2         99  
4              
5             # $Id: Incoming.pm.in 17739 2011-06-30 04:46:31Z squinney@INF.ED.AC.UK $
6             # $Source:$
7             # $Revision: 17739 $
8             # $HeadURL: https://svn.lcfg.org/svn/source/tags/PkgForge-Server/PkgForge_Server_1_1_10/lib/PkgForge/Handler/Incoming.pm.in $
9             # $Date: 2011-06-30 05:46:31 +0100 (Thu, 30 Jun 2011) $
10              
11             our $VERSION = '1.1.10';
12              
13 2     2   848 use English qw(-no_match_vars);
  2         4496  
  2         12  
14 2     2   3393 use File::Temp ();
  2         25106  
  2         41  
15 2     2   106762 use PkgForge::Job ();
  0            
  0            
16             use PkgForge::Queue ();
17             use Try::Tiny;
18              
19             use Readonly;
20             Readonly my $SECONDS_IN_MINUTE => 60;
21             Readonly my $TMPDIR_PERMS => oct('0750');
22              
23             use Moose;
24             use MooseX::Types::Moose qw(Int);
25              
26             extends 'PkgForge::Handler';
27              
28             with 'PkgForge::Registry::Role';
29              
30             has 'wait_for_job' => (
31             is => 'ro',
32             isa => Int,
33             default => 5 * $SECONDS_IN_MINUTE, # 5 minutes in seconds
34             required => 1,
35             documentation => 'Time to wait in case job is still being submitted',
36             );
37              
38             has '+logconf' => (
39             default => '/etc/pkgforge/log-incoming.cfg',
40             );
41              
42             no Moose;
43             __PACKAGE__->meta->make_immutable;
44              
45             sub preflight {
46             my ($self) = @_;
47              
48             my $in_dir = $self->incoming;
49              
50             if ( !-d $in_dir ) {
51             $self->logger->log_and_die(
52             level => 'critical',
53             message => "Incoming jobs directory '$in_dir' does not exist",
54             );
55             }
56              
57             my $out_dir = $self->accepted;
58              
59             if ( !-d $out_dir ) {
60             $self->logger->log_and_die(
61             level => 'critical',
62             message => "Accepted jobs directory '$out_dir' does not exist",
63             );
64             }
65              
66             try {
67             my $tmp = File::Temp->new(TEMPLATE => 'pkgforge-XXXX',
68             UNLINK => 1,
69             DIR => $out_dir );
70             $tmp->print("test\n") or die "Failed to print to temp file: $OS_ERROR\n";
71             $tmp->close or die "Could not close temp file: $OS_ERROR\n";
72             } catch {
73             $self->logger->log_and_die(
74             level => 'critical',
75             message => "Accepted jobs directory '$out_dir' is not writable: $_",
76             );
77             };
78              
79             my $tmpdir = $self->tmpdir;
80              
81             $self->logger->debug("Temporary directory is $tmpdir");
82              
83             if ( !-d $tmpdir ) {
84             my $ok = eval { File::Path::mkpath( $tmpdir, 0, $TMPDIR_PERMS ) };
85             if ( !$ok || $EVAL_ERROR ) {
86             $self->logger->log_and_die(
87             level => 'critical',
88             message => "Could not create temporary directory '$tmpdir': $EVAL_ERROR"
89             );
90             }
91             }
92              
93             chmod $TMPDIR_PERMS, $tmpdir or
94             $self->logger->log_and_die(
95             level => 'critical',
96             message => "Could not set permissions on temporary directory '$tmpdir': $OS_ERROR"
97             );
98              
99             $ENV{TMPDIR} = $tmpdir;
100              
101             return 1;
102             }
103              
104             sub execute {
105             my ($self) = @_;
106              
107             my $queue = $self->load_queue;
108             if ( !defined $queue ) {
109             return;
110             }
111              
112             for my $qentry ( $queue->entries ) {
113              
114             my $job = $self->load_job($qentry);
115             if ( !defined $job ) {
116             next;
117             }
118              
119             my $ok = $self->validate_job($job);
120             if ( !$ok ) {
121             next;
122             }
123              
124             my $accepted_job = $self->transfer_job($job);
125             if ( !defined $accepted_job ) {
126             next;
127             }
128              
129             my $registered = $self->register_tasks($accepted_job);
130             if ( !$registered ) {
131             next;
132             }
133              
134             $self->remove_from_incoming($job);
135             }
136              
137             return;
138             }
139              
140             sub load_queue {
141             my ($self) = @_;
142              
143             my $in_dir = $self->incoming;
144              
145             # Load the queue of jobs
146              
147             my $queue = try {
148             PkgForge::Queue->new(
149             directory => $in_dir,
150             logger => $self->logger,
151             );
152             } catch {
153             $self->logger->error( "Could not load a job queue from incoming directory '$in_dir': $_" );
154             return;
155             };
156              
157             if ( defined $queue ) {
158             $queue->erase_cruft;
159              
160             if ( $self->debug ) {
161             my $count = $queue->count_entries;
162             $self->logger->debug("Found $count entries in the incoming queue");
163             }
164             }
165              
166             return $queue;
167             }
168              
169             sub remove_from_incoming {
170             my ( $self, $obj ) = @_;
171              
172             # This will take either a qentry or a job object, they can both do scrub()
173              
174             if ( $self->debug ) {
175             $self->logger->debug("Scrubbing $obj from incoming queue");
176             }
177              
178             my @errors;
179             $obj->scrub( { error => \@errors } );
180             if ( scalar @errors > 0 ) {
181             $self->logger->error("Failed to erase incoming queue entry: @errors");
182             }
183              
184             return;
185             }
186              
187             sub load_job {
188             my ( $self, $qentry ) = @_;
189              
190             $self->logger->notice("Processing $qentry");
191              
192             # We start with no failure set. It can be changed to either of:
193             # 1. 'soft' - only a failure if the job is found to be overdue
194             # 2. 'hard' - a complete failure
195              
196             my $fail = '';
197             my $error_message;
198              
199             my $job = eval { PkgForge::Job->new_from_qentry($qentry) };
200              
201             if ( !$job || $EVAL_ERROR ) {
202             $fail = 'soft';
203             $error_message = $EVAL_ERROR;
204             $self->log_problem("Failed to load job $qentry, will retry until timeout is reached");
205             }
206              
207             # Check the job is not in the registry. If it is then we will
208             # continue if the status is "incoming".
209              
210             my $exists_but_ok = 0;
211             if ( !$fail ) {
212              
213             if ( $self->registry->job_exists($job) ) {
214             my $status = eval { $self->registry->get_job_status($job) };
215             if ( $status ne 'incoming' ) {
216             $self->log_problem("A job with ID $job has been previously registered");
217             $fail = 'hard';
218             } else {
219             $exists_but_ok = 1;
220             }
221             }
222             }
223              
224             if ( !$fail && !$exists_but_ok ) {
225             my $ok = eval { $self->registry->register_job($job) };
226              
227             if ( !$ok || $EVAL_ERROR ) {
228             $self->log_problem( "Failed to add job $job to registry",
229             $EVAL_ERROR );
230             $fail = 'hard';
231             } else {
232             $self->logger->notice("Registered job $job");
233             }
234              
235             }
236              
237             if ($fail) {
238             if ( $fail eq 'soft' &&
239             $qentry->overdue( $self->wait_for_job ) ) {
240             $fail = 'hard';
241             }
242              
243             if ( $fail eq 'hard' ) {
244             $self->log_problem( "Failed to load job $qentry", $error_message );
245             $self->remove_from_incoming($qentry);
246             }
247              
248             undef $job;
249             }
250              
251             return $job;
252             }
253              
254             sub validate_job {
255             my ( $self, $job ) = @_;
256              
257             my $valid = eval { $job->validate() };
258              
259             if ( !$valid || $EVAL_ERROR ) {
260             $self->log_problem( "Invalid job $job", $EVAL_ERROR );
261             # Packages may still be in transit so not a full fail
262             if ( $job->overdue( $self->wait_for_job ) ) {
263             $self->update_job_status( $job, 'invalid' );
264             $self->remove_from_incoming($job);
265             }
266              
267             return 0;
268             }
269              
270             $self->update_job_status( $job, 'valid' );
271             $self->logger->notice("Validated job $job, will accept");
272              
273             return 1;
274             }
275              
276             sub transfer_job {
277             my ( $self, $job ) = @_;
278              
279             my $out_dir = $self->accepted;
280              
281             my $new_obj = eval { $job->transfer($out_dir) };
282             if ( !$new_obj || $EVAL_ERROR ) { # Full fail, no waiting about
283             $self->log_problem( "Failed to transfer job $job to accepted queue",
284             $EVAL_ERROR );
285              
286             $self->update_job_status( $job, 'failed' );
287             $self->remove_from_incoming($job);
288              
289             return;
290             } else {
291             $self->update_job_status( $job, 'accepted' );
292             $self->logger->notice("Successfully accepted job $job");
293             }
294              
295             return $new_obj;
296             }
297              
298             sub register_tasks {
299             my ( $self, $job ) = @_;
300              
301             my $ok = eval { $self->registry->register_tasks($job) };
302              
303             if ( !$ok || $EVAL_ERROR ) {
304             $self->log_problem( "Failed to add tasks for job $job to registry",
305             $EVAL_ERROR );
306              
307             $self->update_job_status( $job, 'failed' );
308              
309             return 0;
310             } else {
311             $self->logger->notice("Registered tasks for $job");
312             }
313              
314             return 1;
315             }
316              
317             sub update_job_status {
318             my ( $self, $job, $status ) = @_;
319              
320             my $ok = eval { $self->registry->update_job_status( $job, $status ) };
321              
322             if ( !$ok || $EVAL_ERROR ) {
323             $self->log_problem( "Failed to update status for job $job to '$status'",
324             $EVAL_ERROR );
325             $self->remove_from_incoming($job);
326              
327             return 0;
328             } elsif ( $self->debug ) {
329             $self->logger->debug("Updated status for $job to $status");
330             }
331              
332             return 1;
333              
334             }
335              
336             1;
337             __END__
338              
339             =head1 NAME
340              
341             PkgForge::Handler::Incoming - Package Forge handler for the incoming directory
342              
343             =head1 VERSION
344              
345             This documentation refers to PkgForge::Handler::Incoming version 1.1.10
346              
347             =head1 SYNOPSIS
348              
349             use PkgForge::Handler::Incoming;
350              
351             my $handler = PkgForge::Handler::Incoming->new();
352              
353             # or
354             my $handler = PkgForge::Handler::Incoming->new_with_options();
355              
356             # or
357             my $handler = PkgForge::Handler::Incoming->new_with_config();
358              
359             $handler->execute();
360              
361             =head1 DESCRIPTION
362              
363             This Package Forge handler handles the incoming jobs queue. The
364             incoming queue is represented with a L<PkgForge::Queue> and is
365             processed in order of submission time. Any entry which does not look
366             like a job will be erased immediately. Anything else will be loaded as
367             a L<PkgForge::Job> object and validated. If the job is valid the
368             handler will move the job to the accepted queue and register the job
369             for the relevant build daemons.
370              
371             =head1 ATTRIBUTES
372              
373             See L<PkgForge::Handler> for all the attributes inherited by
374             application of that role. This class also adds the following
375             attributes:
376              
377             =over
378              
379             =item wait_for_job
380              
381             The time (in seconds) to wait for a job to be considered fully
382             submitted, the default is 5 minutes. Submitted jobs are considered for
383             acceptance on every pass of the incoming queue made by this
384             handler. If the job appears to be incomplete for any reason this is
385             the length of time the handler will wait for further data to
386             appear. After this time has passed any incomplete job may be erased.
387              
388             =back
389              
390             =head1 SUBROUTINES/METHODS
391              
392             See L<PkgForge::Handler> for all methods inherited from that
393             class. This class implements the required C<execute> method.
394              
395             =over
396              
397             =item execute()
398              
399             =item preflight()
400              
401             Runs through a set of pre-flight checks which must be correct before
402             running the C<execute> method. This is separated out so that it can be
403             called after the handler object is created but before the execution is
404             begun. This is particularly useful for when running as a daemon.
405              
406             =item load_queue()
407              
408             This method scans the incoming build job directory and loads anything
409             found into a L<PkgForge::Queue> object. If it is not possible to scan
410             the directory then this method will die since such a failure renders
411             this handler useless. It will remove anything which does not appear to
412             be a valid build job entry. The queue object will be returned.
413              
414             =item load_job($qentry)
415              
416             This method takes a L<PkgForge::Queue::Entry> object and attempts to
417             convert it into a full L<PkgForge::Job> object. If successfully loaded
418             the job will be registered in the Package Forge registry with a status
419             of C<incoming>. If the loading fails then a period of grace is given
420             in case the files associated with the job are still in transit. If the
421             failure continues to occur after the end of the grace period then the
422             submitted job will be deleted. On success the L<PkgForge::Job> object
423             will be returned, on failure (either temporary or permanent) the undef
424             value be returned.
425              
426             =item validate_job($job)
427              
428             This method takes a L<PkgForge::Job> object and carries out
429             validation. Mostly this validation it to ensure that the submitted job
430             has not been corrupted during the submission process. If successful
431             the job will be marked in the Package Forge registry as C<valid>. If
432             the job is found to be invalid then, in a similar way to the
433             C<load_job> method, a grace period is permitted. If the validation
434             failure continues to occur after the end of the grace period then the
435             submitted job will be deleted and marked in the registry as
436             C<invalid>. On success a true value will be returned, otherwise it
437             will return a false value.
438              
439             Please note that this method does not do any authorization checks.
440              
441             =item transfer_job($job)
442              
443             This method takes a L<PkgForge::Job> object and attempts to transfer
444             the job to the C<accepted> directory. If successful then the job will
445             be marked as C<accepted> in the Package Forge registry and a new
446             L<PkgForge::Object> will be returned which represents the accepted
447             job. If the transfer fails then the submitted job will be deleted and
448             it will be marked in the registry as C<failed>, the method will then
449             return undef.
450              
451             =item register_tasks($job)
452              
453             A convenience wrapper for the method of the same name provided by
454             L<PkgForge::Registry>. Will log errors, returns false on failure and
455             true on success.
456              
457             =item remove_from_incoming($object)
458              
459             This method will take either a L<PkgForge::Queue::Entry> or a
460             L<PkgForge::Job> object. It erases the entire directory holding all
461             files associated with the job, it also kills the object as it no
462             longer has any physical meaning. In each case, the C<scrub> method is
463             called, see the specific documentation for further details.
464              
465             =item update_job_status( $job, $status_name )
466              
467             A convenience wrapper for the method of the same name provided by
468             L<PkgForge::Registry>. Will log errors, returns false on failure and
469             true on success.
470              
471             =back
472              
473             =head1 CONFIGURATION AND ENVIRONMENT
474              
475             By default Package Forge handlers can be configured via the
476             C</etc/pkgforge/handlers.yml> YAML file. This class will also examine
477             the file C</etc/pkgforge/incoming.yml>, if it exists, and settings in
478             that file will have precedence. You can override the path to the
479             configuration file via the C<configfile> attribute.
480              
481             By default, the logging system can be configured via
482             C</etc/pkgforge/incoming.log>. If the file does not exist then the
483             handler will log to stderr.
484              
485             =head1 DEPENDENCIES
486              
487             This module is powered by L<Moose> and also uses L<MooseX::Types>,
488             L<Readonly>.
489              
490             =head1 SEE ALSO
491              
492             L<PkgForge>, L<PkgForge::Handler>, L<PkgForge::Job>,
493             L<PkgForge::Queue>, L<PkgForge::Queue::Entry>
494              
495             =head1 PLATFORMS
496              
497             This is the list of platforms on which we have tested this
498             software. We expect this software to work on any Unix-like platform
499             which is supported by Perl.
500              
501             ScientificLinux5, Fedora13
502              
503             =head1 BUGS AND LIMITATIONS
504              
505             Please report any bugs or problems (or praise!) to bugs@lcfg.org,
506             feedback and patches are also always very welcome.
507              
508             =head1 AUTHOR
509              
510             Stephen Quinney <squinney@inf.ed.ac.uk>
511              
512             =head1 LICENSE AND COPYRIGHT
513              
514             Copyright (C) 201O University of Edinburgh. All rights reserved.
515              
516             This library is free software; you can redistribute it and/or modify
517             it under the terms of the GPL, version 2 or later.
518              
519             =cut