line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Helios::TS; |
2
|
|
|
|
|
|
|
|
3
|
1
|
|
|
1
|
|
13
|
use 5.008; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
34
|
|
4
|
1
|
|
|
1
|
|
3
|
use strict; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
20
|
|
5
|
1
|
|
|
1
|
|
3
|
use warnings; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
24
|
|
6
|
1
|
|
|
1
|
|
2
|
use base qw(TheSchwartz); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
445
|
|
7
|
|
|
|
|
|
|
use fields qw(active_worker_class); # new fields for this subclass |
8
|
|
|
|
|
|
|
use Carp qw( croak ); |
9
|
|
|
|
|
|
|
use List::Util qw( shuffle ); |
10
|
|
|
|
|
|
|
use Helios::TS::Job; |
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
use constant OK_ERRORS => { map { $_ => 1 } Data::ObjectDriver::Errors->UNIQUE_CONSTRAINT, }; |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
our $VERSION = '2.80'; |
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
# FILE CHANGE HISTORY |
17
|
|
|
|
|
|
|
# (This code is modified from the original TheSchwartz.pm where noted.) |
18
|
|
|
|
|
|
|
# [LH] [2012-07-11]: driver_for(): Changed driver creation to use Helios driver |
19
|
|
|
|
|
|
|
# to cache database connections. |
20
|
|
|
|
|
|
|
# [LH] [2013-09-21]: find_job_for_workers(): Added code to enable job |
21
|
|
|
|
|
|
|
# prioritization. |
22
|
|
|
|
|
|
|
# [LH] [2013-10-04]: Implemented "virtual jobtypes" - funcmap entries without |
23
|
|
|
|
|
|
|
# actual TheSchwartz::Worker subclasses to back them up. Switched to using |
24
|
|
|
|
|
|
|
# Helios::TS::Job instead of base TheSchwartz::Job because of this. |
25
|
|
|
|
|
|
|
# [LH] [2013-10-04]: work_once(): Commented out call to |
26
|
|
|
|
|
|
|
# temporarily_remove_ability() because we do not think the issue it solves is |
27
|
|
|
|
|
|
|
# a concern for Helios::TS (Oracle's indexes do not exhibit the issue t_r_a() |
28
|
|
|
|
|
|
|
# is supposed to solve, and we're not sure MySQL indexes do anymore either). |
29
|
|
|
|
|
|
|
# [LH] [2013-10-04]: Fix for Helios bug [RT79690], which appears to be a DBD |
30
|
|
|
|
|
|
|
# problem where a LOB becomes unbound in a query. |
31
|
|
|
|
|
|
|
# [LH] [2013-11-24]: Removed old code already commented out. |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE; |
34
|
|
|
|
|
|
|
our $T_LOST_RACE; |
35
|
|
|
|
|
|
|
our $FIND_JOB_BATCH_SIZE = 50; |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
# BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
38
|
|
|
|
|
|
|
# [LH] [2013-10-04]: Virtual jobtypes: Helios::TS->{active_worker_class} |
39
|
|
|
|
|
|
|
# attribute and accessors for it. |
40
|
|
|
|
|
|
|
# Construct a Helios::TS client.
#
# Takes the named options of the base class constructor, plus:
#   active_worker_class - optional worker class name to store on the client.
#                         When set, _grab_a_job() and work_once() use it in
#                         place of the job's own funcname.
#
# Returns the initialised Helios::TS object.
sub new {
    my $class  = shift;
    my %params = @_;

    # Take the subclass-only option out of the argument list before
    # delegating to the base class.
    # NOTE(review): TheSchwartz->new is understood to croak on option names
    # it does not recognise, so forwarding active_worker_class unchanged
    # would make construction fail exactly when the option is supplied --
    # confirm against the installed TheSchwartz version.
    my $active_worker_class = delete $params{active_worker_class};

    my $self = fields::new($class);
    $self->SUPER::new(%params);    # init base fields

    if ( defined $active_worker_class ) {
        $self->{active_worker_class} = $active_worker_class;
    }
    return $self;
}
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
# Accessor: returns whatever is currently stored in this client's
# active_worker_class field.
sub active_worker_class {
    my Helios::TS $self = $_[0];
    my $worker_class = $self->{active_worker_class};
    return $worker_class;
}
55
|
|
|
|
|
|
|
# Mutator: stores the given value in this client's active_worker_class
# field.  The stored value is also the sub's result.
sub set_active_worker_class {
    my Helios::TS $self = shift;
    my $worker_class = shift;
    return $self->{active_worker_class} = $worker_class;
}
59
|
|
|
|
|
|
|
# END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC. |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
# Return the object driver for the database identified by $hashdsn.
#
# When the client has a non-zero driver_cache_expiration, a driver cached
# less than that many seconds ago is reused.  Otherwise the driver comes
# from the database's configuration entry (a ready-made {driver} if one was
# supplied, else a new Helios::ObjectDriver::DBI built from dsn/user/pass)
# and is cached again if caching is enabled.
#
# Croaks if $hashdsn is not a key of the client's {databases} table.
sub driver_for {
    my Helios::TS $client = shift;
    my ($hashdsn) = @_;

    my $now = time;
    my $ttl = $client->{driver_cache_expiration};

    # Fast path: an unexpired cached driver.
    if ($ttl) {
        my $created = $client->{cached_drivers}{$hashdsn}{create_ts};
        return $client->{cached_drivers}{$hashdsn}{driver}
            if $created && $created + $ttl > $now;
    }

    my $db = $client->{databases}{$hashdsn}
        or croak "Ouch, I don't know about a database whose hash is $hashdsn";

    # [LH] 2012-07-11: Changed driver creation to use Helios driver to
    # cache database connections.
    my $driver = $db->{driver} || Helios::ObjectDriver::DBI->new(
        dsn      => $db->{dsn},
        username => $db->{user},
        password => $db->{pass},
    );

    if ( exists $db->{prefix} ) {
        $driver->prefix( $db->{prefix} );
    }

    # Remember the driver and when it was obtained, if caching is on.
    if ($ttl) {
        my $slot = $client->{cached_drivers}{$hashdsn} ||= {};
        $slot->{driver}    = $driver;
        $slot->{create_ts} = $now;
    }

    return $driver;
}
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
# Look through the client's databases (in shuffled order) for a runnable
# job and try to grab one.
#
# Parameters:
#   $worker_classes - optional arrayref of funcnames to search for; defaults
#                     to the client's {current_abilities}.
#
# Returns the grabbed job as handed back by _grab_a_job(), or nothing
# (undef in scalar context) when no database yielded a job.
sub find_job_for_workers {
    my Helios::TS $client = shift;
    my ($worker_classes) = @_;
    $worker_classes ||= $client->{current_abilities};

    for my $hashdsn ( $client->shuffled_databases ) {
        ## If the database is dead, skip it.
        next if $client->is_database_dead($hashdsn);

        my $driver   = $client->driver_for($hashdsn);
        my $unixtime = $driver->dbd->sql_for_unixtime;

        my @jobs;
        eval {
            ## Search for jobs in this database where:
            ## 1. funcname is in the list of abilities this $client supports;
            ## 2. the job is scheduled to be run (run_after is in the past);
            ## 3. no one else is working on the job (grabbed_until is in
            ##    the past).
            my @ids = map { $client->funcname_to_id( $driver, $hashdsn, $_ ) }
                @$worker_classes;

            # BEGIN CODE Copyright (C) 2012-3 by Logical Helion, LLC.
            # [LH] [2013-09-21]: Added code to enable job prioritization.
            # The prioritize setting may be undefined when prioritization is
            # not configured, so check definedness before the string
            # comparison to avoid an "uninitialized value" warning on every
            # poll.
            my $prioritize = $client->prioritize;
            my $direction  = 'descend';
            if ( defined $prioritize && $prioritize eq 'low' ) {
                $direction = 'ascend';
            }
            # END CODE Copyright (C) 2012-3 by Logical Helion, LLC.

            # [LH] [2013-10-04]: Implemented "virtual jobtypes" - funcmap entries without
            # actual TheSchwartz::Worker subclasses to back them up.  Switched to using
            # Helios::TS::Job instead of base TheSchwartz::Job because of this.
            @jobs = $driver->search(
                'Helios::TS::Job' => {
                    funcid        => \@ids,
                    run_after     => \ "<= $unixtime",
                    grabbed_until => \ "<= $unixtime",
                },
                {
                    limit => $FIND_JOB_BATCH_SIZE,
                    (
                        $prioritize
                        ? ( sort => 'priority', direction => $direction )
                        : ()
                    )
                }
            );
        };
        if ($@) {
            # Any failure other than the tolerated error codes takes this
            # database out of rotation.
            unless ( OK_ERRORS->{ $driver->last_error || 0 } ) {
                $client->mark_database_as_dead($hashdsn);
            }
        }

        # for test harness race condition testing
        $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;

        my $job = $client->_grab_a_job( $hashdsn, @jobs );
        return $job if $job;
    }

    # Nothing found.  Return explicitly: the value of a sub whose last
    # statement is a loop is otherwise unspecified.
    return;
}
150
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
# Try to claim one job out of a list of candidates found in one database.
#
# Parameters:
#   $hashdsn - hash key identifying the database the candidates came from
#   @jobs    - candidate job objects (remaining arguments)
#
# Returns the first job whose grabbed_until column this client managed to
# update, with a TheSchwartz::JobHandle attached; undef if none was claimed.
# Dies if the server time cannot be obtained.
sub _grab_a_job {
    my Helios::TS $client = shift;
    my ( $hashdsn, @candidates ) = @_;
    my $driver = $client->driver_for($hashdsn);

    ## Work through the candidates in random order to avoid contention
    ## between workers.
    my @queue = shuffle(@candidates);

    JOB:
    while ( my $job = shift @queue ) {
        # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
        # [LH] [2013-10-04] [RT79690] A job whose arg() is not a reference
        # is discarded and the next candidate tried.  This does not stop
        # the LOB from unbinding, but works around it in a relatively
        # transparent way.
        next JOB unless ref( $job->arg );
        # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.

        ## Translate the funcid into a funcname using this database's map.
        my $funcname = $client->funcid_to_name( $driver, $hashdsn, $job->funcid );
        $job->funcname($funcname);

        # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
        # [LH] [2013-10-04] The worker class is the "Active Worker Class"
        # when one is set; otherwise it is the job's jobtype (funcname).
        my $worker_class = $client->{active_worker_class} || $job->funcname;
        # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.

        ## Remember the current grabbed_until so the update below only
        ## succeeds if nobody has changed it in the meantime.
        my $previous_grab = $job->grabbed_until;

        my $server_time = $client->get_server_time($driver)
            or die "expected a server time";

        my $grab_seconds = $worker_class->grab_for || 1;
        $job->grabbed_until( $server_time + $grab_seconds );

        ## Write the new grabbed_until to the database so that no one else
        ## takes the job.
        my $rows_updated =
            $driver->update( $job, { grabbed_until => $previous_grab } );
        if ( $rows_updated < 1 ) {
            ## Lost the race for this job -- another worker must have
            ## updated it first.  Move on to the next candidate.
            $T_LOST_RACE->() if $T_LOST_RACE;
            next JOB;
        }

        ## The job is ours: attach a handle and hand it back.
        my $handle = TheSchwartz::JobHandle->new(
            {
                dsn_hashed => $hashdsn,
                jobid      => $job->jobid,
            }
        );
        $handle->client($client);
        $job->handle($handle);
        return $job;
    }

    return undef;
}
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
# Find (or accept) a single job and run it.
#
# Parameters:
#   $job - optional specific job to work on; when omitted, one is looked up
#          with find_job_for_workers().
#
# Returns 1 after a job has been passed to work_safely(), so that callers
# looping on this method know to keep looking; returns nothing when no job
# was available.
sub work_once {
    # [LH] [2013-10-04] Using Helios::TS not TheSchwartz.
    my Helios::TS $client = shift;
    my $job = shift;    # optional specific job to work on

    ## Look for a job with our current set of abilities.  The current
    ## abilities may be a subset of the full set, to allow for even
    ## distribution between jobs.
    $job = $client->find_job_for_workers unless $job;

    ## Nothing found with a reduced ability list: restore the full list
    ## and look once more.
    if (   !$job
        && @{ $client->{current_abilities} } < @{ $client->{all_abilities} } )
    {
        $client->restore_full_abilities;
        $job = $client->find_job_for_workers;
    }

    ## If we still don't have anything, say so and return.
    unless ($job) {
        $client->debug("TheSchwartz::work_once found no jobs");
        return;
    }

    # [LH] [2013-10-04]: Virtual Jobtypes: Use the active_worker_class
    # instead of the job's funcname if active_worker_class is set.
    my $class = $client->{active_worker_class} || $job->funcname;

    my $priority = "";
    if ( $job->priority ) {
        $priority = ", priority " . $job->priority;
    }

    # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
    # Pass the client's active worker class on to the job when one is set.
    $job->{active_worker_class} = $client->{active_worker_class}
        if $client->{active_worker_class};
    # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.

    $job->debug("TheSchwartz::work_once got job of class '$class'$priority");

    ## The base class would at this point remove the funcname from the list
    ## of current abilities, so that the next lookup finds a job for a
    ## different funcname (preventing starvation of high funcid values
    ## because of the way MySQL's indexes work).
    # [LH] [2013-10-04]: work_once(): Commented out call to
    # temporarily_remove_ability() because we do not think the issue it solves is
    # a concern for Helios::TS (Oracle's indexes do not exhibit the issue t_r_a()
    # is supposed to solve, and we're not sure MySQL indexes do anymore either).
    # $client->temporarily_remove_ability($class);

    $class->work_safely($job);

    ## We got a job, so return 1 so work_until_done (which calls this method)
    ## knows to keep looking for jobs.
    return 1;
}
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
1; |
267
|
|
|
|
|
|
|
__END__ |