File Coverage

blib/lib/Helios/Panoptes.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             package Helios::Panoptes;
2            
3 1     1   23408 use 5.008000;
  1         3  
  1         41  
4 1     1   5 use strict;
  1         2  
  1         36  
5 1     1   5 use warnings;
  1         6  
  1         38  
6 1     1   5 use base qw(CGI::Application);
  1         1  
  1         1349  
7 1     1   22068 use Data::Dumper;
  1         16916  
  1         88  
8            
9 1     1   1143 use CGI::Application::Plugin::DBH qw(dbh_config dbh);
  1         1982  
  1         77  
10 1     1   1023 use Error qw(:try);
  1         4138  
  1         6  
11            
12 1     1   702 use Helios::Service;
  0            
  0            
13            
14             our $VERSION = '1.51_3070';
15             our $CONF_PARAMS;
16            
17             =head1 NAME
18            
19             Helios::Panoptes - CGI::Application providing web admin interface to Helios distributed job
20             processing system
21            
22             =head1 DESCRIPTION
23            
24             Helios::Panoptes is the web interface to the Helios distributed job processing system. It
25             provides a central point of control for all of the services and jobs in a Helios collective. This
26             web interface can be used to track jobs through the system and manage workloads on a per service
27             and per host basis. Available workers may be increased or decreased as necessary, tuned to
28             match available resources. Job processing can be held, or Helios daemons can be HALTed, from this
29             interface.
30            
31             (Why I? Check your Wikipedia. Or better yet, Greek dictionary.)
32            
33             =head1 CGI::Application SETUP METHODS
34            
35             =head2 setup()
36            
37             The setup() method defines the available run modes, which include:
38            
39             =over 4
40            
41             =item ctrl_panel
42            
43             The Ctrl Panel is the web interface to the central Helios configuration parameter repository (the
44             helios_params_tb table in the Helios database).
45            
46             =item ctrl_panel_mod
47            
48             Used by the Ctrl Panel and Worker Admin run modes to change configuration parameters in
49             helios_params_tb.
50            
51             =item job_queue_view
52            
53             The Job Queue view provides views of waiting, running, and completed jobs in the Helios collective.
54            
55             =item collective
56            
57             The Collective Admin provides a simple, dashboard-style view of the current services running in
58             the Helios collective, broken down by host. The service class version is displayed, as well
59             as the daemons' uptime. Job processing can be held and unheld here, and the daemons' run modes
60             can be shifted between Normal and Overdrive mode. The maximum workers for each service can be
61             managed, and services can be shut down here via the HALT button.
62            
63             =item job_submit
64            
65             The Submit Job view provides a simple interface to submit a test job to the Helios collective for
66             debugging purposes.
67            
68             =back
69            
70             =cut
71            
72             sub setup {
73             my $self = shift;
74             $self->start_mode('job_queue_view');
75             $self->mode_param('rm');
76             $self->run_modes(
77             ctrl_panel => 'ctrl_panel',
78             ctrl_panel_mod => 'ctrl_panel_mod',
79             job_queue_view => 'job_queue_view',
80             job_detail => 'job_detail',
81             job_submit => 'job_submit',
82             collective => 'collective',
83             );
84            
85             my $inifile;
86             if (defined($ENV{HELIOS_INI}) ) {
87             $inifile = $ENV{HELIOS_INI};
88             } else {
89             $inifile = './helios.ini';
90             }
91             $self->{service} = Helios::Service->new();
92             $self->{service}->prep();
93             my $config = $self->{service}->getConfig();
94             $CONF_PARAMS = $config;
95            
96             # connect to db
97             $self->dbh_config($config->{dsn},$config->{user},$config->{password});
98             }
99            
100             =head2 teardown()
101            
102             The only thing that currently happens in teardown() is the database is disconnected.
103            
104             =cut
105            
106             sub teardown {
107             my $self = shift;
108            
109             $self->dbh->disconnect();
110             }
111            
112             =head1 RUN MODE METHODS
113            
114             These methods define code that back the particular application pages.
115            
116             =head2 ctrl_panel()
117            
118             This method controls the rendering of the Ctrl Panel view, used to display Helios configuration
119             parameters. The view also allows the user to change config parameters, although the actual config
120             modifications are handled by the ctrl_panel_mod() run mode.
121            
122             =cut
123            
124             sub ctrl_panel {
125             my $self = shift;
126             my $dbh = $self->dbh();
127             my $q = $self->query();
128            
129             my $output;
130            
131             my $sql = <
132             SELECT worker_class,
133             host,
134             param,
135             value
136             FROM helios_params_tb
137             ORDER BY worker_class, host, param
138             PNLSQL
139            
140             my $sth = $dbh->prepare($sql);
141             unless($sth) { throw Error::Simple($dbh->errstr); }
142            
143             $sth->execute() or throw Error::Simple($dbh->errstr());
144            
145             my $classes;
146             my $hosts;
147             my $params = [];
148             my $last_host;
149             my $last_class;
150             my $current_host;
151             my $current_class;
152             my $first_result = 1;
153             while (my $result = $sth->fetchrow_hashref() ) {
154             if ($first_result) {
155             $last_class = $result->{worker_class};
156             $last_host = $result->{host};
157             $first_result = 0;
158             }
159             if ($result->{worker_class} ne $last_class) {
160             $current_host->{PARAMS} = $params;
161             $current_host->{HOST} = $last_host;
162             $current_host->{WORKER_CLASS} = $last_class;
163             push(@$hosts, $current_host);
164             undef $params;
165             undef $current_host;
166             $last_host = $result->{host};
167            
168             $current_class->{HOSTS} = $hosts;
169             $current_class->{WORKER_CLASS} = $last_class;
170             push(@$classes, $current_class);
171             undef $hosts;
172             undef $current_class;
173             $last_class = $result->{worker_class};
174             }
175             if ($result->{host} ne $last_host) {
176             $current_host->{PARAMS} = $params;
177             $current_host->{HOST} = $last_host;
178             $current_host->{WORKER_CLASS} = $last_class;
179             push(@$hosts, $current_host);
180             undef $params;
181             undef $current_host;
182             $last_host = $result->{host};
183             }
184            
185             push(@$params, $result);
186             }
187            
188             $current_host->{PARAMS} = $params;
189             $current_host->{HOST} = $last_host;
190             $current_host->{WORKER_CLASS} = $last_class;
191             push(@$hosts, $current_host);
192            
193             $current_class->{HOSTS} = $hosts;
194             $current_class->{WORKER_CLASS} = $last_class;
195             push(@$classes, $current_class);
196            
197             my $tmpl = $self->load_tmpl(undef, die_on_bad_params => 0);
198             $tmpl->param(TITLE => "Helios - Control Panel");
199            
200             # only fill in parameters if we actually have some
201             if ( $classes->[0]->{HOSTS}->[0]->{HOST} ) {
202             $tmpl->param(CLASSES => $classes);
203             }
204             return $tmpl->output();
205             }
206            
207            
208             =head2 ctrl_panel_mod()
209            
210             Run mode used to modify Helios config parameters. Used by ctrl_panel() and collective().
211            
212             The ctrl_panel_mod run mode uses the following parameters:
213            
214             =over 4
215            
216             =item worker_class
217            
218             The worker (service) class of the changed parameter
219            
220             =item host
221            
222             The host of the changed parameter (* for all hosts)
223            
224             =item param
225            
226             THe name of the parameter
227            
228             =item value
229            
230             The value the parameter should be changed to
231            
232             =item action
233            
234             The action (add, modify, delete) to perform. A delete action will delete the param for the worker
235             class and host in question (obviously), add will add it, and modify will replace any existing
236             values of the parameter with the new value.
237            
238             =back
239            
240             =cut
241            
242             sub ctrl_panel_mod {
243             my $self = shift;
244             my $dbh = $self->dbh();
245             my $q = $self->query();
246             my $return_to = $q->param('return_to');
247            
248             my $sql;
249            
250             my $worker_class = $q->param('worker_class');
251             my $host = $q->param('host');
252             my $param = $q->param('param');
253             my $value = $q->param('value');
254             my $action = $q->param('action');
255            
256             unless ($worker_class && $host && $param && $action) {
257             throw Error::Simple("Worker class ($worker_class), host ($host), param ($param), and action ($action) required");
258             }
259            
260             $self->modParam($action, $worker_class, $host, $param, $value);
261            
262             if (defined($return_to)) {
263             if ( defined($q->param('groupby')) ) {
264             print $q->redirect("./panoptes.pl?rm=$return_to&groupby=".$q->param('groupby'));
265             }
266             print $q->redirect("./panoptes.pl?rm=$return_to");
267             } else {
268             print $q->redirect('./panoptes.pl?rm=ctrl_panel');
269             }
270             return 1;
271             }
272            
273            
274             =head2 job_queue_view()
275            
276             The job_queue_view() run mode handles the display of the lists of running, waiting, and completed
277             jobs. Note that although all Job Queue lists are dispatched to here, lists of completed jobs are
278             actually redirected to _job_queue_view_completed().
279            
280             =cut
281            
282             sub job_queue_view {
283             my $self = shift;
284             my $q = $self->query();
285             my $job_status = $q->param('status');
286             my $job_detail = $q->param('job_detail');
287             # "job_queue_view" is actually a basket of views, all sharing the same template
288             if ( defined($job_status) && $job_status eq 'done' && $job_detail) { return $self->_job_queue_view_done($q); }
289             if ( defined($job_status) && $job_status eq 'done' && !$job_detail) { return $self->_job_queue_count_done($q); }
290             if ( !$job_detail ) { return $self->_job_queue_count($q); }
291            
292             my $dbh = $self->dbh();
293             my $now = time();
294             my $funcmap = $self->loadFuncMap();
295             my $output;
296             my $sql;
297             my @where_clauses;
298            
299            
300             # defaults
301             my $time_horizon = 3600;
302             $job_status = 'run';
303            
304             $sql = <
305             SELECT funcid,
306             jobid,
307             arg,
308             uniqkey,
309             insert_time,
310             run_after,
311             grabbed_until,
312             priority,
313             coalesce
314             FROM
315             job j
316             ACTIVEJOBSQL
317            
318             # form values
319             if ( defined($q->param('time')) ) { $time_horizon = $q->param('time'); }
320             if ( defined($q->param('status')) ) { $job_status = $q->param('status'); }
321            
322             SWITCH: {
323             if ($job_status eq 'run') {
324             push(@where_clauses,"grabbed_until != 0");
325             $time_horizon = 'all';
326             last SWITCH;
327             }
328             if ($job_status eq 'wait') {
329             push(@where_clauses,"run_after < $now");
330             push(@where_clauses,"grabbed_until < $now");
331             last SWITCH;
332             }
333             #[] default
334             $time_horizon = 'all';
335             }
336            
337             # time horizon filter
338             if ( defined($time_horizon) && ($time_horizon ne '') && ($time_horizon ne 'all') ) {
339             push(@where_clauses, "run_after > ".($now - $time_horizon) );
340             }
341            
342             # complete WHERE
343             if (scalar(@where_clauses)) {
344             $sql .= " WHERE ". join(' AND ',@where_clauses);
345             }
346            
347             # ORDER BY
348             $sql .= " ORDER BY funcid asc, run_after desc";
349            
350             #t print $q->header();
351             #t print $sql;
352            
353             my $sth = $dbh->prepare($sql);
354             unless($sth) { throw Error::Simple($dbh->errstr); }
355            
356             $sth->execute() or throw Error::Simple($dbh->errstr());
357            
358             my @job_types;
359             my $job_count = 0;
360             my @dbresult;
361             my $current_job_class;
362             my $first_result = 1;
363             my $last_class = undef;
364             while ( my $result = $sth->fetchrow_arrayref() ) {
365             if ($first_result) {
366             $last_class = $result->[0];
367             $first_result = 0;
368             }
369             if ($result->[0] ne $last_class) {
370             push(@job_types, $current_job_class);
371             undef $current_job_class;
372             $last_class = $result->[0];
373             $job_count = 0;
374             }
375            
376             my $date_parts = $self->splitEpochDate($result->[5]);
377             my $grabbed_until = $self->splitEpochDate($result->[6]);
378             $current_job_class->{JOB_CLASS} = $funcmap->{$result->[0]};
379             $current_job_class->{JOB_COUNT} = ++$job_count;
380             push(@{ $current_job_class->{JOBS} },
381             { JOBID => $result->[1],
382             # ARG => $result->[2],
383             UNIQKEY => $result->[3],
384             INSERT_TIME => $result->[4],
385             RUN_AFTER => $date_parts->{YYYY}.'-'.$date_parts->{MM}.'-'.$date_parts->{DD}.' '.$date_parts->{HH24}.':'.$date_parts->{MI}.':'.$date_parts->{SS},
386             GRABBED_UNTIL => $grabbed_until->{YYYY}.'-'.$grabbed_until->{MM}.'-'.$grabbed_until->{DD}.' '.$grabbed_until->{HH24}.':'.$grabbed_until->{MI}.':'.$grabbed_until->{SS},
387             PRIORITY => $result->[7],
388             COALESCE => $result->[8]
389             });
390             }
391             push(@job_types, $current_job_class);
392            
393             my $tmpl = $self->load_tmpl(undef, die_on_bad_params => 0);
394             $tmpl->param(TITLE => "Helios - Job Queue");
395             $tmpl->param("STATUS_".$job_status, 1);
396             $tmpl->param("TIME_".$time_horizon, 1);
397             $tmpl->param("JOB_DETAIL_CHECKED" => 1);
398             $tmpl->param(JOB_CLASSES => \@job_types);
399             return $tmpl->output();
400             }
401            
402            
403             =head2 _job_queue_view_done()
404            
405             This method is called from job_queue_view() to deal with displaying completed jobs, which pulls
406             completed job data from helios_job_history_tb instead of the job table.
407            
408             =cut
409            
410             sub _job_queue_view_done {
411             my $self = shift;
412             my $q = shift;
413             my $dbh = $self->dbh();
414             my $now = time();
415             my $funcmap = $self->loadFuncMap();
416             my $job_status;
417             my $output;
418             my $sql;
419             my @where_clauses;
420            
421             # defaults
422             my $time_horizon = 3600;
423            
424             $sql = '
425             select *
426             from (select
427             if (@jid = jobid,
428             if (@time = complete_time,
429             @rnk := @rnk + least(0, @inc := @inc + 1),
430             @rnk := @rnk + greatest(@inc, @inc := 1)
431             + least(0, @time := complete_time)
432             ),
433             @rnk := 1 + least(0, @jid := jobid)
434             + least(0, @time := complete_time)
435             + least(0, @inc := 1)
436             ) rank,
437             jobid,
438             funcid,
439             run_after,
440             grabbed_until,
441             exitstatus,
442             complete_time
443             from helios_job_history_tb,
444             (select (@jid := 0)) as x
445             where complete_time >= ?
446             order by jobid, complete_time desc
447             ) as y
448             where rank < 2
449             order by funcid asc, complete_time desc
450             ';
451            
452             # form values
453             if ( defined($q->param('time')) ) { $time_horizon = $q->param('time'); }
454             if ( defined($q->param('time')) && ($q->param('time') eq '') ) { $time_horizon = 3600; }
455             if ( defined($q->param('status')) ) { $job_status = $q->param('status'); }
456            
457             #t print $q->header();
458             #t print $sql,"
\n";
459             #t print $now - $time_horizon,"
\n";
460            
461             my $sth = $dbh->prepare($sql);
462             unless($sth) { throw Error::Simple($dbh->errstr); }
463            
464             $sth->execute($now - $time_horizon) or throw Error::Simple($dbh->errstr());
465            
466             my @job_types;
467             my $job_count = 0;
468             my $job_count_failed = 0;
469             my @dbresult;
470             my $current_job_class;
471             my $first_result = 1;
472             my $last_class = undef;
473             my $last_jobid = 0;
474             while ( my $result = $sth->fetchrow_hashref() ) {
475             # print join("|",($result->{rank},$result->{funcid},$result->{jobid},$result->{complete_time},$result->{run_after},$result->{exitstatus},$result->{run_after},$result->{grabbed_until})),"
\n"; #p
476             if ($first_result) {
477             $last_class = $result->{funcid};
478             $first_result = 0;
479             }
480             if ($result->{funcid} ne $last_class) {
481             push(@job_types, $current_job_class);
482             undef $current_job_class;
483             $last_class = $result->{funcid};
484             $job_count = 0;
485             $job_count_failed = 0;
486             }
487            
488             my $date_parts = $self->splitEpochDate($result->{run_after});
489             my $complete_time = $self->splitEpochDate($result->{complete_time});
490             $current_job_class->{JOB_CLASS} = $funcmap->{$result->{funcid}};
491             $current_job_class->{JOB_COUNT} = ++$job_count;
492             if ( $result->{exitstatus} != 0 ) { $current_job_class->{JOB_COUNT_FAILED} = ++$job_count_failed; }
493             # if this jobid is the same as the last, that means it was a failure that was retried
494             # dump it, because we've already added the final completion of the job (whether success or fail)
495             # because we sorted "jobid, complete_time desc"
496             push(@{ $current_job_class->{JOBS} },
497             { JOBID => $result->{jobid},
498             # ARG => $result->[2],
499             RUN_AFTER => $date_parts->{YYYY}.'-'.$date_parts->{MM}.'-'.$date_parts->{DD}.' '.$date_parts->{HH24}.':'.$date_parts->{MI}.':'.$date_parts->{SS},
500             GRABBED_UNTIL => $result->{grabbed_until},
501             COMPLETE_TIME => $complete_time->{YYYY}.'-'.$complete_time->{MM}.'-'.$complete_time->{DD}.' '.$complete_time->{HH24}.':'.$complete_time->{MI}.':'.$complete_time->{SS},
502             EXITSTATUS => $result->{exitstatus}
503             });
504             $last_jobid = $result->{jobid};
505             }
506             push(@job_types, $current_job_class);
507             @job_types = sort { $a->{JOB_CLASS} cmp $b->{JOB_CLASS} } @job_types;
508             my $tmpl = $self->load_tmpl('job_queue_view.html', die_on_bad_params => 0);
509             $tmpl->param(TITLE => 'Helios - Job Queue');
510             $tmpl->param("STATUS_".$job_status, 1);
511             $tmpl->param("TIME_".$time_horizon, 1);
512             $tmpl->param("JOB_DETAIL_CHECKED" => 1);
513             $tmpl->param(JOB_CLASSES => \@job_types);
514             return $tmpl->output();
515             }
516            
517            
518             =head2 job_queue_count()
519            
520             This method will handle a job queue view that displays only counts.
521            
522             =cut
523            
524             sub _job_queue_count {
525             my $self = shift;
526             my $q = $self->query();
527             my $job_status = $q->param('status');
528            
529             my $dbh = $self->dbh();
530             my $now = time();
531             my $funcmap = $self->loadFuncMap();
532             my $output;
533             my $sql;
534             my @where_clauses;
535            
536             # defaults
537             my $time_horizon = 3600;
538             $job_status = 'run';
539            
540             # form values
541             if ( defined($q->param('time')) ) { $time_horizon = $q->param('time'); }
542             if ( defined($q->param('status')) ) { $job_status = $q->param('status'); }
543            
544             $sql = "SELECT funcid, count(*) FROM job ";
545            
546             SWITCH: {
547             if ($job_status eq 'run') {
548             push(@where_clauses,"grabbed_until != 0");
549             $time_horizon = 'all';
550             last SWITCH;
551             }
552             if ($job_status eq 'wait') {
553             push(@where_clauses,"run_after < $now");
554             push(@where_clauses,"grabbed_until < $now");
555             last SWITCH;
556             }
557             # default
558             $time_horizon = 'all';
559             }
560            
561             # time horizon filter
562             if ( defined($time_horizon) && ($time_horizon ne '') && ($time_horizon ne 'all') ) {
563             push(@where_clauses, "run_after > ".($now - $time_horizon) );
564             }
565            
566             # complete WHERE
567             if (scalar(@where_clauses)) {
568             $sql .= " WHERE ". join(' AND ',@where_clauses);
569             }
570            
571             # GROUP BY
572             $sql .= " GROUP BY funcid";
573            
574             # ORDER BY
575             $sql .= " ORDER BY funcid asc";
576            
577             my $sth = $dbh->prepare($sql);
578             unless($sth) { throw Error::Simple($dbh->errstr); }
579            
580             $sth->execute() or throw Error::Simple($dbh->errstr());
581            
582             my @job_types;
583             my $job_count = 0;
584             my @dbresult;
585             my $current_job_class;
586             my $first_result = 1;
587             my $last_class = undef;
588             while ( my $result = $sth->fetchrow_arrayref() ) {
589             if ($first_result) {
590             $last_class = $result->[0];
591             $first_result = 0;
592             }
593             if ($result->[0] ne $last_class) {
594             push(@job_types, $current_job_class);
595             undef $current_job_class;
596             $last_class = $result->[0];
597             $job_count = 0;
598             }
599            
600             $current_job_class->{JOB_CLASS} = $funcmap->{$result->[0]};
601             $current_job_class->{JOB_COUNT} = $result->[1];
602             }
603             push(@job_types, $current_job_class);
604            
605             my $tmpl = $self->load_tmpl('job_queue_view.html', die_on_bad_params => 0);
606             $tmpl->param(TITLE => "Helios - Job Queue");
607             $tmpl->param("STATUS_".$job_status, 1);
608             $tmpl->param("TIME_".$time_horizon, 1);
609             $tmpl->param("JOB_DETAIL_CHECKED" => 0);
610             $tmpl->param(JOB_CLASSES => \@job_types);
611             return $tmpl->output();
612             }
613            
614            
615             sub _job_queue_count_done {
616             my $self = shift;
617             my $q = shift;
618             my $dbh = $self->dbh();
619             my $now = time();
620             my $funcmap = $self->loadFuncMap();
621             my $job_status;
622             my $output;
623             my $sql;
624             my @where_clauses;
625            
626             # defaults
627             my $time_horizon = 3600;
628            
629             $sql = '
630             select funcid, if(exitstatus,1,0) as exitstatus, count(*) as count
631             from (select
632             if (@jid = jobid,
633             if (@time = complete_time,
634             @rnk := @rnk + least(0, @inc := @inc + 1),
635             @rnk := @rnk + greatest(@inc, @inc := 1)
636             + least(0, @time := complete_time)
637             ),
638             @rnk := 1 + least(0, @jid := jobid)
639             + least(0, @time := complete_time)
640             + least(0, @inc := 1)
641             ) rank,
642             jobid,
643             funcid,
644             run_after,
645             exitstatus,
646             complete_time
647             from helios_job_history_tb,
648             (select (@jid := 0)) as x
649             where complete_time >= ?
650             order by jobid, complete_time desc
651             ) as y
652             where rank < 2
653             GROUP BY funcid, if(exitstatus,1,0)
654             ';
655            
656             # form values
657             if ( defined($q->param('time')) ) { $time_horizon = $q->param('time'); }
658             if ( defined($q->param('time')) && ($q->param('time') eq '') ) { $time_horizon = 3600; }
659             if ( defined($q->param('status')) ) { $job_status = $q->param('status'); }
660            
661             #t print $q->header();
662             #t print $sql,"
\n";
663             #t print $now - $time_horizon,"
\n";
664            
665             my $sth = $dbh->prepare($sql);
666             unless($sth) { throw Error::Simple($dbh->errstr); }
667            
668             $sth->execute($now - $time_horizon) or throw Error::Simple($dbh->errstr());
669            
670             my @job_types;
671             my $current_funcid;
672             my $current_success_jobs = 0;
673             my $current_failed_jobs = 0;
674             my $last_funcid;
675             my $first_result = 1;
676            
677             while ( my $result = $sth->fetchrow_arrayref() ) {
678             # print join("|",@$result),"
\n"; #p
679             if ($first_result) {
680             $last_funcid = $result->[0];
681             $current_funcid = $result->[0];
682             $first_result = 0;
683             }
684            
685             if ($current_funcid ne $result->[0] ) {
686             # flush
687             push(@job_types, { JOB_CLASS => $funcmap->{$current_funcid},
688             JOB_COUNT => $current_success_jobs + $current_failed_jobs,
689             JOB_COUNT_FAILED => $current_failed_jobs
690             }
691             );
692             $last_funcid = $result->[0];
693             $current_success_jobs = 0;
694             $current_failed_jobs = 0;
695             }
696             $current_funcid = $result->[0];
697             if ($result->[1] == 0) {
698             $current_success_jobs = $result->[2];
699             } else {
700             $current_failed_jobs = $result->[2];
701             }
702             }
703             push(@job_types, { JOB_CLASS => $funcmap->{$current_funcid},
704             JOB_COUNT => $current_success_jobs + $current_failed_jobs,
705             JOB_COUNT_FAILED => $current_failed_jobs
706             }
707             );
708             @job_types = sort { $a->{JOB_CLASS} cmp $b->{JOB_CLASS} } @job_types;
709            
710             my $tmpl = $self->load_tmpl('job_queue_view.html', die_on_bad_params => 0);
711             $tmpl->param(TITLE => 'Helios - Job Queue');
712             $tmpl->param("STATUS_".$job_status, 1);
713             $tmpl->param("TIME_".$time_horizon, 1);
714             $tmpl->param("JOB_DETAIL_CHECKED" => 0);
715             $tmpl->param(JOB_CLASSES => \@job_types);
716             return $tmpl->output();
717             }
718            
719            
720            
721             =head2 job_submit()
722            
723             The job_submit() run mode allows for manual submission of a job to the Helios collective via
724             Panoptes. This run mode is useful mainly for debugging purposes.
725            
726             This run mode uses the Helios web job submission interface (submitJob.pl) and requires the
727             job_submit_url option being set in the [global] section of helios.ini so it can submit the job to
728             the appropriate URL. eg:
729            
730             [global]
731             job_submit_url=http://localhost/cgi-bin/submitJob.pl
732            
733             This run mode is really just displaying a form with some of the details filled in. Once you
734             submit the form, the response you receive is actually the response returned from submitJob.pl. If
735             job submission was successful, this is normally a file of type text/xml with a section
736             (normally containing 0) and a section (containing the id of the job just submitted). If
737             there was an error during submission, the response will be an HTTP error, and submitJob.pl will
738             log a message in the Helios log.
739            
740             =cut
741            
742             sub job_submit {
743             my $self = shift;
744             my $q = $self->query();
745            
746             my $classmap = $self->loadClassMap();
747            
748             my @classes;
749             foreach (sort keys %$classmap) {
750             push(@classes, { job_type => $_, job_class => $classmap->{$_} });
751             }
752            
753            
754             my $tmpl = $self->load_tmpl('job_submit.html', die_on_bad_params => 0);
755             $tmpl->param(TITLE => 'Helios - Job Submit');
756             $tmpl->param(CLASSES => \@classes);
757             $tmpl->param(JOB_SUBMIT_URL => $CONF_PARAMS->{job_submit_url});
758             return $tmpl->output();
759             }
760            
761            
762             =head2 collective()
763            
764             The collective() run mode provides the Collective display, a list of what Helios service daemons
765             are running in the collective, with some limited convenience controls for admins that don't want
766             to deal with the Ctrl Panel.
767            
768             The collective() method actually reads the groupby CGI parameter and dispatches to
769             _collective_host() or _collective_service(), depending on which the user specified (the default
770             is service).
771            
772             =cut
773            
774             sub collective {
775             my $self = shift;
776             my $q = $self->query();
777             if ($q->param('groupby') eq 'service') {
778             return $self->_collective_service();
779             } else {
780             return $self->_collective_host();
781             }
782             }
783            
784            
785             =head2 _collective_host()
786            
787             The _collective_host() method provides the Collective display grouped by host.
788            
789             =cut
790            
791             sub _collective_host {
792             my $self = shift;
793             my $dbh = $self->dbh();
794             my $q = $self->query();
795             my $config = $self->loadParams('host');
796            
797             my $register_threshold = time() - 360;
798            
799             my $sql = <
800             SELECT host,
801             worker_class,
802             worker_version,
803             process_id,
804             register_time,
805             start_time
806             FROM helios_worker_registry_tb
807             WHERE register_time > ?
808             ORDER BY host, worker_class
809             STATUSSQL
810            
811             my $sth = $dbh->prepare($sql);
812             unless ($sth) { throw Error::Simple($dbh->errstr); }
813            
814             $sth->execute($register_threshold) or throw Error::Simple($dbh->errstr());
815            
816             my @collective;
817             my @dbresult;
818             my $current_host;
819             my $first_result = 1;
820             my $last_host = undef;
821             while ( my $result = $sth->fetchrow_arrayref() ) {
822             if ($first_result) {
823             $last_host = $result->[0];
824             $first_result = 0;
825             }
826             if ($result->[0] ne $last_host) {
827             push(@collective, $current_host);
828             undef $current_host;
829             $last_host = $result->[0];
830             }
831            
832             my $date_parts = $self->splitEpochDate($result->[4]);
833             $current_host->{HOST} = $result->[0];
834            
835             # calc uptime
836             my $uptime_string = '';
837             {
838             use integer;
839             my $uptime = time() - $result->[5];
840             my $uptime_days = $uptime/86400;
841             my $uptime_hours = ($uptime % 86400)/3600;
842             my $uptime_mins = (($uptime % 86400) % 3600)/60;
843             if ($uptime_days != 0) { $uptime_string .= $uptime_days.'d '; }
844             if ($uptime_hours != 0) { $uptime_string .= $uptime_hours.'h '; }
845             if ($uptime_mins != 0) { $uptime_string .= $uptime_mins.'m '; }
846             }
847            
848             # max_workers
849             my $max_workers = 1;
850             if ( defined($config->{ $result->[0] }->{ $result->[1] }->{MAX_WORKERS}) ) {
851             $max_workers = $config->{ $result->[0] }->{ $result->[1] }->{MAX_WORKERS};
852             }
853            
854             # figure out status (normal/overdrive/holding/halting)
855             my $status;
856             my $halt_status = 0;
857             my $hold_status = 0;
858             my $overdrive_status = 0;
859             if ( (defined( $config->{$result->[0] }->{ $result->[1] }->{OVERDRIVE}) && ($config->{$result->[0] }->{ $result->[1] }->{OVERDRIVE} == 1) ) ||
860             (defined( $config->{'*'}->{ $result->[1] }->{OVERDRIVE}) && ($config->{'*'}->{ $result->[1] }->{OVERDRIVE} == 1) ) ) {
861             $overdrive_status = 1;
862             $status = "Overdrive";
863             }
864             if ( (defined( $config->{$result->[0] }->{ $result->[1] }->{HOLD}) && ($config->{$result->[0] }->{ $result->[1] }->{HOLD} == 1) ) ||
865             (defined( $config->{'*'}->{ $result->[1] }->{HOLD}) && ($config->{'*'}->{ $result->[1] }->{HOLD} == 1) ) ) {
866             $hold_status = 1;
867             $status = "HOLDING";
868             }
869             if ( defined( $config->{$result->[0] }->{ $result->[1] }->{HALT}) ||
870             defined( $config->{'*'}->{ $result->[1] }->{HALT}) ) {
871             $halt_status = 1;
872             $status = "HALTING";
873             }
874             push(@{ $current_host->{SERVICES} },
875             { HOST => $result->[0],
876             SERVICE_CLASS => $result->[1],
877             SERVICE_VERSION => $result->[2],
878             PROCESS_ID => $result->[3],
879             REGISTER_TIME => $date_parts->{YYYY}.'-'.$date_parts->{MM}.'-'.$date_parts->{DD}.' '.$date_parts->{HH24}.':'.$date_parts->{MI}.':'.$date_parts->{SS},
880             UPTIME => $uptime_string,
881             STATUS => $status,
882             MAX_WORKERS => $max_workers,
883             OVERDRIVE => $overdrive_status,
884             HOLDING => $hold_status,
885             HALTING => $halt_status,
886             });
887             }
888             push(@collective, $current_host);
889            
890             my $tmpl = $self->load_tmpl('collective_host.html', die_on_bad_params => 0);
891             $tmpl->param(TITLE => 'Helios - Collective View');
892             $tmpl->param(COLLECTIVE => \@collective);
893             return $tmpl->output();
894             }
895            
896            
897             =head2 _collective_service()
898            
899             The _collective_service() method provides the Collective display grouped by service.
900            
901             =cut
902            
903             sub _collective_service {
904             my $self = shift;
905             my $dbh = $self->dbh();
906             my $q = $self->query();
907             my $config = $self->loadParams('worker_class');
908            
909             my $register_threshold = time() - 360;
910            
911             my $sql = <
912             SELECT worker_class AS service,
913             host,
914             worker_version AS version,
915             process_id,
916             register_time,
917             start_time
918             FROM helios_worker_registry_tb
919             WHERE register_time > ?
920             ORDER BY service, host
921             STATUSSQL
922            
923             my $sth = $dbh->prepare($sql);
924             unless ($sth) { throw Error::Simple($dbh->errstr); }
925            
926             $sth->execute($register_threshold) or throw Error::Simple($dbh->errstr());
927            
928             my @collective;
929             my @dbresult;
930             my $current_service;
931             my $first_result = 1;
932             my $last_service = undef;
933             #t print $q->header();
934             while ( my $result = $sth->fetchrow_arrayref() ) {
935             #t print join("|", @$result),"
\n";
936             if ($first_result) {
937             $last_service = $result->[0];
938             $first_result = 0;
939             }
940             if ($result->[0] ne $last_service) {
941             push(@collective, $current_service);
942             undef $current_service;
943             $last_service = $result->[0];
944             }
945            
946             my $date_parts = $self->splitEpochDate($result->[4]);
947             $current_service->{SERVICE_CLASS} = $result->[0];
948            
949             # calc uptime
950             my $uptime_string = '';
951             {
952             use integer;
953             my $uptime = time() - $result->[5];
954             my $uptime_days = $uptime/86400;
955             my $uptime_hours = ($uptime % 86400)/3600;
956             my $uptime_mins = (($uptime % 86400) % 3600)/60;
957             if ($uptime_days != 0) { $uptime_string .= $uptime_days.'d '; }
958             if ($uptime_hours != 0) { $uptime_string .= $uptime_hours.'h '; }
959             if ($uptime_mins != 0) { $uptime_string .= $uptime_mins.'m '; }
960             }
961            
962             # max_workers
963             my $max_workers = 1;
964             if ( defined($config->{ $result->[0] }->{ '*' }->{MAX_WORKERS}) ) {
965             $max_workers = $config->{ $result->[0] }->{ '*' }->{MAX_WORKERS};
966             }
967             if ( defined($config->{ $result->[0] }->{ $result->[1] }->{MAX_WORKERS}) ) {
968             $max_workers = $config->{ $result->[0] }->{ $result->[1] }->{MAX_WORKERS};
969             }
970            
971             # figure out status (normal/overdrive/holding/halting)
972             my $status;
973             my $halt_status = 0;
974             my $hold_status = 0;
975             my $overdrive_status = 0;
976             # determine overdrive status
977             if ( defined($config->{$result->[0]}->{'*'}->{OVERDRIVE}) && ($config->{$result->[0]}->{'*'}->{OVERDRIVE} == 1) ) {
978             $overdrive_status = 1;
979             }
980             if ( defined($config->{$result->[0]}->{$result->[1]}->{OVERDRIVE}) ) {
981             $overdrive_status = $config->{$result->[0]}->{$result->[1]}->{OVERDRIVE};
982             }
983            
984             # determine holding status
985             if ( defined($config->{$result->[0]}->{'*'}->{HOLD}) && ($config->{$result->[0]}->{'*'}->{HOLD} == 1) ) {
986             $hold_status = 1;
987             }
988             if ( defined($config->{$result->[0]}->{$result->[1]}->{HOLD}) ) {
989             $hold_status = $config->{$result->[0]}->{$result->[1]}->{HOLD};
990             }
991            
992            
993             # determine halt status; if it's even defined, that means the service instance is HALTing
994             if ( defined( $config->{$result->[0] }->{ '*' }->{HALT}) ||
995             defined( $config->{$result->[0]}->{ $result->[1] }->{HALT}) ) {
996             $halt_status = 1;
997             $status = "HALTING";
998             }
999             push(@{ $current_service->{HOSTS} },
1000             { HOST => $result->[1],
1001             SERVICE_CLASS => $result->[0],
1002             SERVICE_VERSION => $result->[2],
1003             PROCESS_ID => $result->[3],
1004             REGISTER_TIME => $date_parts->{YYYY}.'-'.$date_parts->{MM}.'-'.$date_parts->{DD}.' '.$date_parts->{HH24}.':'.$date_parts->{MI}.':'.$date_parts->{SS},
1005             UPTIME => $uptime_string,
1006             STATUS => $status,
1007             MAX_WORKERS => $max_workers,
1008             OVERDRIVE => $overdrive_status,
1009             HOLDING => $hold_status,
1010             HALTING => $halt_status,
1011             });
1012             }
1013             push(@collective, $current_service);
1014            
1015             my $tmpl = $self->load_tmpl('collective_service.html', die_on_bad_params => 0);
1016             $tmpl->param(TITLE => 'Helios - Collective View');
1017             $tmpl->param(COLLECTIVE => \@collective);
1018             return $tmpl->output();
1019             }
1020            
1021            
1022            
1023            
1024             =head1 OTHER METHODS
1025            
1026             These are auxiliary/utility methods used by the run mode methods.
1027            
1028             =head2 splitEpochDate($epoch_seconds)
1029            
1030             Given a datetime in epoch seconds, this method returns a hashref containing the component date
1031             parts. The keys of the hash follow Oracle naming conventions because that is what the author was
1032             most familiar with:
1033            
1034             YYYY four-digit year
1035             MM two-digit month
1036             DD two-digit day
1037             HH twenty-four hour
1038             HH12 twelve hour
1039             HH24 twenty-four hour
1040             MI two-digit minutes
1041             SS two-digit seconds
1042             AMPM ante/post meridium
1043            
1044             =cut
1045            
1046             sub splitEpochDate {
1047             my $self = shift;
1048             my $epoch_secs = shift;
1049            
1050             my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($epoch_secs);
1051            
1052             my $return_date;
1053            
1054             $return_date->{YYYY} = sprintf("%04d", $year + 1900);
1055             $return_date->{MM} = sprintf("%02d", $mon+1);
1056             $return_date->{DD} = sprintf("%02d", $mday);
1057             $return_date->{MI} = sprintf("%02d", $min);
1058             $return_date->{SS} = sprintf("%02d", $sec);
1059            
1060             # hours
1061             if ($hour == 12) {
1062             $return_date->{AMPM} = 'PM';
1063             $return_date->{HH12} = '12';
1064             $return_date->{HH24} = '12';
1065             $return_date->{HH} = '12';
1066             } elsif ($hour == 0) {
1067             $return_date->{AMPM} = 'AM';
1068             $return_date->{HH12} = '12';
1069             $return_date->{HH24} = '00';
1070             $return_date->{HH} = '12';
1071             } elsif ($hour > 12) {
1072             $return_date->{AMPM} = 'PM';
1073             $return_date->{HH12} = sprintf("%02d", $hour - 12);
1074             $return_date->{HH24} = sprintf("%02d", $hour);
1075             $return_date->{HH} = sprintf("%02d", $hour);
1076             } else {
1077             # hour is AM
1078             $return_date->{AMPM} = 'AM';
1079             $return_date->{HH12} = sprintf("%02d", $hour);
1080             $return_date->{HH24} = sprintf("%02d", $hour);
1081             $return_date->{HH} = sprintf("%02d", $hour);
1082             }
1083            
1084             return $return_date;
1085             }
1086            
1087            
1088             =head2 loadClassMap([$keyfield])
1089            
1090             Load the contents of the helios_class_map table into memory, returning it as a hashref. If
1091             $keyfield is specified (job_type, job_class), it will be the key field in the hash. If
1092             $keyfield is not specified, job_type is the default.
1093            
1094             =cut
1095            
1096             sub loadClassMap {
1097             my $self = shift;
1098             my $keyfield = @_ ? shift : 'job_type';
1099             my $dbh = $self->dbh();
1100            
1101             my $classmap;
1102             my $sql = "SELECT job_type, job_class FROM helios_class_map ";
1103            
1104             my $sth = $dbh->prepare($sql);
1105             unless($sth) { throw Error::Simple($dbh->errstr); }
1106            
1107             $sth->execute() or throw Error::Simple($dbh->errstr());
1108            
1109             while (my $result = $sth->fetchrow_arrayref() ) {
1110             if ($keyfield eq 'job_class') {
1111             $classmap->{$result->[1]} = $result->[0];
1112             } else {
1113             $classmap->{$result->[0]} = $result->[1];
1114             }
1115             }
1116             return $classmap;
1117             }
1118            
1119            
1120             =head2 loadFuncMap
1121            
1122             Load the contents of the funcmap table into memory, because it's small and lots of methods
1123             need it. THe funcmap table associates a funcid with a service class name for internal purposes.
1124            
1125             =cut
1126            
1127             sub loadFuncMap {
1128             my $self = shift;
1129             my $dbh = $self->dbh();
1130            
1131             my $funcmap;
1132            
1133             my $sql = "SELECT funcid, funcname FROM funcmap";
1134             my $sth = $dbh->prepare($sql);
1135             unless($sth) { throw Error::Simple($dbh->errstr); }
1136            
1137             $sth->execute() or throw Error::Simple($dbh->errstr());
1138            
1139             while (my $result = $sth->fetchrow_arrayref() ) {
1140             $funcmap->{$result->[0]} = $result->[1];
1141             $funcmap->{$result->[1]} = $result->[0];
1142             }
1143             return $funcmap;
1144             }
1145            
1146            
1147             =head2 loadParams([$keyfield])
1148            
1149             Returns a hashref data structure containing all of the Helios config params. The keyfield can be
1150             either 'worker_class' or 'host' ('worker_class' is the default).
1151            
1152             =cut
1153            
1154             sub loadParams {
1155             my $self = shift;
1156             my $keyfield = @_ ? shift : 'worker_class';
1157             my $dbh = $self->dbh();
1158             $dbh->{FetchHashKeyName} = 'NAME_lc';
1159             my $keyfield2;
1160             my $config;
1161            
1162             if ($keyfield eq 'worker_class') {
1163             $keyfield2 = 'host';
1164             } else {
1165             $keyfield2 = 'worker_class';
1166             }
1167            
1168             my $sql = "SELECT $keyfield, $keyfield2, param, value FROM helios_params_tb ORDER BY $keyfield, $keyfield2";
1169             my $sth = $dbh->prepare($sql);
1170             unless ($sth) { throw Error::Simple($dbh->errstr()); }
1171             $sth->execute() or throw Error::Simple($dbh->errstr());
1172            
1173             while(my $result = $sth->fetchrow_arrayref()) {
1174             $config->{ $result->[0] }->{ $result->[1] }->{ $result->[2] } = $result->[3];
1175             }
1176            
1177             return $config;
1178             }
1179            
1180            
1181             =head2 modParam($action, $worker_class, $host, $param, [$value])
1182            
1183             Modify Helios config parameters. Used by ctrl_panel() and collective() displays.
1184            
1185             Valid values for $action:
1186            
1187             =over 4
1188            
1189             =item add
1190            
1191             Add the given parameter for the given class and host
1192            
1193             =item delete
1194            
1195             Delete the given parameter for the given class and host
1196            
1197             =item modify
1198            
1199             Modify the given parameter for the given class and host with a new value. Effectively the same
1200             as a delete followed by an add.
1201            
1202             =back
1203            
1204             Worker class is the name of the class.
1205            
1206             Host is the name of the host. Use '*' to make the parameter global to all instances of the worker
1207             class.
1208            
1209             Returns a true value if successful and throws an Error::Simple exception otherwise.
1210            
1211             =cut
1212            
1213             sub modParam {
1214             my $self = shift;
1215             my $dbh = $self->dbh();
1216             my $action = shift;
1217             my $worker_class = shift;
1218             my $host = shift;
1219             my $param = shift;
1220             my $value = shift;
1221            
1222             my $sql;
1223            
1224             unless ($worker_class && $host && $param && $action) {
1225             throw Error::Simple("Worker class ($worker_class), host ($host), param ($param), and action ($action) required");
1226             }
1227            
1228             SWITCH: {
1229             if ($action eq 'add') {
1230             $sql = 'INSERT INTO helios_params_tb (host, worker_class, param, value) VALUES (?,?,?,?)';
1231             $dbh->do($sql, undef, $host, $worker_class, $param, $value) or throw Error::Simple('modParam add FAILURE: '.$dbh->errstr);
1232             last SWITCH;
1233             }
1234             if ($action eq 'delete') {
1235             $sql = 'DELETE FROM helios_params_tb WHERE host = ? AND worker_class = ? AND param = ?';
1236             $dbh->do($sql, undef, $host, $worker_class, $param) or throw Error::Simple('modParam delete FAILURE: '.$dbh->errstr);
1237             last SWITCH;
1238             }
1239             if ($action eq 'modify') {
1240             $self->modParam('delete', $worker_class, $host, $param);
1241             $self->modParam('add', $worker_class, $host, $param, $value);
1242             last SWITCH;
1243             }
1244             throw Error::Simple("modParam invalid action: $action");
1245             }
1246            
1247             return 1;
1248             }
1249            
1250            
1251            
1252            
1253             1;
1254             __END__