File Coverage

blib/lib/Helios/TS.pm
Criterion Covered Total %
statement 12 12 100.0
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 16 16 100.0


line stmt bran cond sub pod time code
1             package Helios::TS;
2              
3 1     1   15 use 5.008;
  1         2  
  1         30  
4 1     1   4 use strict;
  1         1  
  1         25  
5 1     1   4 use warnings;
  1         1  
  1         30  
6 1     1   4 use base qw(TheSchwartz);
  1         1  
  1         532  
7             use fields qw(active_worker_class); # new fields for this subclass
8             use Carp qw( croak );
9             use List::Util qw( shuffle );
10             use Helios::TS::Job;
11              
12             use constant OK_ERRORS => { map { $_ => 1 } Data::ObjectDriver::Errors->UNIQUE_CONSTRAINT, };
13              
14             our $VERSION = '2.80';
15              
16             # FILE CHANGE HISTORY
17             # (This code is modified from the original TheSchwartz.pm where noted.)
18             # [LH] [2012-07-11]: driver_for(): Changed driver creation to use Helios driver
19             # to cache database connections.
20             # [LH] [2013-09-21]: find_job_for_workers(): Added code to enable job
21             # prioritization.
22             # [LH] [2013-10-04]: Implemented "virtual jobtypes" - funcmap entries without
23             # actual TheSchwartz::Worker subclasses to back them up. Switched to using
24             # Helios::TS::Job instead of base TheSchwartz::Job because of this.
25             # [LH] [2013-10-04]: work_once(): Commented out call to
26             # temporarily_remove_ability() because we do not think the issue it solves is
27             # a concern for Helios::TS (Oracle's indexes do not exhibit the issue t_r_a()
28             # is supposed to solve, and we're not sure MySQL indexes do anymore either).
29             # [LH] [2013-10-04]: Fix for Helios bug [RT79690], which appears to be a DBD
30             # problem where a LOB becomes unbound in a query.
31             # [LH] [2013-11-24]: Removed old code already commented out.
32              
33             our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
34             our $T_LOST_RACE;
35             our $FIND_JOB_BATCH_SIZE = 50;
36              
37             # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
38             # [LH] [2013-10-04]: Virtual jobtypes: Helios::TS->{active_worker_class}
39             # attribute and accessors for it.
40             sub new {
41             my $class = shift;
42             my %params = @_;
43             my $self = fields::new($class);
44             $self->SUPER::new(@_); # init base fields
45             if ( defined($params{active_worker_class})) {
46             $self->{active_worker_class} = $params{active_worker_class};
47             }
48             return $self;
49             }
50              
51             sub active_worker_class {
52             my Helios::TS $hts = shift;
53             return $hts->{active_worker_class};
54             }
55             sub set_active_worker_class {
56             my Helios::TS $hts = shift;
57             $hts->{active_worker_class} = shift;
58             }
59             # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
60              
61              
62             sub driver_for {
63             my Helios::TS $client = shift;
64             my($hashdsn) = @_;
65             my $driver;
66             my $t = time;
67             my $cache_duration = $client->{driver_cache_expiration};
68             if ($cache_duration && $client->{cached_drivers}{$hashdsn}{create_ts} && $client->{cached_drivers}{$hashdsn}{create_ts} + $cache_duration > $t) {
69             $driver = $client->{cached_drivers}{$hashdsn}{driver};
70             } else {
71             my $db = $client->{databases}{$hashdsn}
72             or croak "Ouch, I don't know about a database whose hash is $hashdsn";
73             if ($db->{driver}) {
74             $driver = $db->{driver};
75             } else {
76             # [LH] 2012-07-11: Changed driver creation to use Helios driver to
77             # cache database connections.
78             $driver = Helios::ObjectDriver::DBI->new(
79             dsn => $db->{dsn},
80             username => $db->{user},
81             password => $db->{pass},
82             );
83             }
84             $driver->prefix($db->{prefix}) if exists $db->{prefix};
85              
86             if ($cache_duration) {
87             $client->{cached_drivers}{$hashdsn}{driver} = $driver;
88             $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
89             }
90             }
91             return $driver;
92             }
93              
94              
95             sub find_job_for_workers {
96             my Helios::TS $client = shift;
97             my($worker_classes) = @_;
98             $worker_classes ||= $client->{current_abilities};
99             for my $hashdsn ($client->shuffled_databases) {
100             ## If the database is dead, skip it.
101             next if $client->is_database_dead($hashdsn);
102              
103             my $driver = $client->driver_for($hashdsn);
104             my $unixtime = $driver->dbd->sql_for_unixtime;
105              
106             my @jobs;
107             eval {
108             ## Search for jobs in this database where:
109             ## 1. funcname is in the list of abilities this $client supports;
110             ## 2. the job is scheduled to be run (run_after is in the past);
111             ## 3. no one else is working on the job (grabbed_until is in
112             ## in the past).
113             my @ids = map { $client->funcname_to_id($driver, $hashdsn, $_) }
114             @$worker_classes;
115              
116             # BEGIN CODE Copyright (C) 2012-3 by Logical Helion, LLC.
117             # [LH] [2013-09-21]: Added code to enable job prioritization.
118             my $direction = 'descend';
119             if ( $client->prioritize eq 'low' ) {
120             $direction = 'ascend';
121             }
122             # END CODE Copyright (C) 2012-3 by Logical Helion, LLC.
123              
124             # [LH] [2013-10-04]: Implemented "virtual jobtypes" - funcmap entries without
125             # actual TheSchwartz::Worker subclasses to back them up. Switched to using
126             # Helios::TS::Job instead of base TheSchwartz::Job because of this.
127             @jobs = $driver->search('Helios::TS::Job' => {
128             funcid => \@ids,
129             run_after => \ "<= $unixtime",
130             grabbed_until => \ "<= $unixtime",
131             }, { limit => $FIND_JOB_BATCH_SIZE,
132             ( $client->prioritize ? ( sort => 'priority',
133             direction => $direction ) : () )
134             }
135             );
136             };
137             if ($@) {
138             unless (OK_ERRORS->{ $driver->last_error || 0 }) {
139             $client->mark_database_as_dead($hashdsn);
140             }
141             }
142              
143             # for test harness race condition testing
144             $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
145              
146             my $job = $client->_grab_a_job($hashdsn, @jobs);
147             return $job if $job;
148             }
149             }
150              
151              
152             sub _grab_a_job {
153             my Helios::TS $client = shift;
154             my $hashdsn = shift;
155             my $driver = $client->driver_for($hashdsn);
156              
157             ## Got some jobs! Randomize them to avoid contention between workers.
158             my @jobs = shuffle(@_);
159              
160             JOB:
161             while (my $job = shift @jobs) {
162             # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
163             # [LH] [2013-10-04] [RT79690] Check the job to see that it has an arg()
164             # value. If it doesn't, throw it away and get a new one. This won't
165             # prevent the LOB from unbinding, but it will work around it in a
166             # relatively transparent way.
167             unless ( ref($job->arg()) ) {
168             next;
169             }
170             # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
171             ## Convert the funcid to a funcname, based on this database's map.
172             $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
173              
174             ## Update the job's grabbed_until column so that
175             ## no one else takes it.
176             # my $worker_class = $job->funcname;
177             # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
178             # [LH] [2013-10-04] The worker class is the "Active Worker Class" if
179             # it's set. Otherwise, assume it's just the job's jobtype (funcname).
180             my $worker_class = $client->{active_worker_class} || $job->funcname;
181             # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
182             my $old_grabbed_until = $job->grabbed_until;
183              
184             my $server_time = $client->get_server_time($driver)
185             or die "expected a server time";
186              
187             $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
188              
189             ## Update the job in the database, and end the transaction.
190             if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
191             ## We lost the race to get this particular job--another worker must
192             ## have got it and already updated it. Move on to the next job.
193             $T_LOST_RACE->() if $T_LOST_RACE;
194             next JOB;
195             }
196              
197             ## Now prepare the job, and return it.
198             my $handle = TheSchwartz::JobHandle->new({
199             dsn_hashed => $hashdsn,
200             jobid => $job->jobid,
201             });
202             $handle->client($client);
203             $job->handle($handle);
204             return $job;
205             }
206              
207             return undef;
208             }
209              
210              
211             sub work_once {
212             # [LH] [2013-10-04] Using Helios::TS not TheSchwartz.
213             my Helios::TS $client = shift;
214             my $job = shift; # optional specific job to work on
215              
216             ## Look for a job with our current set of abilities. Note that the
217             ## list of current abilities may not be equal to the full set of
218             ## abilities, to allow for even distribution between jobs.
219             $job ||= $client->find_job_for_workers;
220              
221             ## If we didn't find anything, restore our full abilities, and try
222             ## again.
223             if (!$job &&
224             @{ $client->{current_abilities} } < @{ $client->{all_abilities} }) {
225             $client->restore_full_abilities;
226             $job = $client->find_job_for_workers;
227             }
228              
229             # [LH] [2013-10-04]: Virtual Jobtypes: Use the active_worker_class
230             # instead of the job's funcname if active_worker_class is set.
231             my $class = $client->{active_worker_class} || ($job ? $job->funcname : undef);
232              
233             if ($job) {
234             my $priority = $job->priority ? ", priority " . $job->priority : "";
235             # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
236             if ($client->{active_worker_class}) {
237             $job->{active_worker_class} = $client->{active_worker_class};
238             }
239             # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
240             $job->debug("TheSchwartz::work_once got job of class '$class'$priority");
241             } else {
242             $client->debug("TheSchwartz::work_once found no jobs");
243             }
244              
245             ## If we still don't have anything, return.
246             return unless $job;
247              
248             ## Now that we found a job for this particular funcname, remove it
249             ## from our list of current abilities. So the next time we look for a
250             ## we'll find a job for a different funcname. This prevents starvation of
251             ## high funcid values because of the way MySQL's indexes work.
252             # [LH] [2013-10-04]: work_once(): Commented out call to
253             # temporarily_remove_ability() because we do not think the issue it solves is
254             # a concern for Helios::TS (Oracle's indexes do not exhibit the issue t_r_a()
255             # is supposed to solve, and we're not sure MySQL indexes do anymore either).
256             # $client->temporarily_remove_ability($class);
257              
258             $class->work_safely($job);
259              
260             ## We got a job, so return 1 so work_until_done (which calls this method)
261             ## knows to keep looking for jobs.
262             return 1;
263             }
264              
265              
266             1;
267             __END__