File Coverage

blib/lib/TheSchwartz.pm
Criterion Covered Total %
statement 39 439 8.8
branch 0 176 0.0
condition 0 66 0.0
subroutine 13 64 20.3
pod 30 45 66.6
total 82 790 10.3


line stmt bran cond sub pod time code
1             # $Id$
2              
3             package TheSchwartz;
4 24     24   159201 use 5.008001;
  24         232  
5 24     24   151 use strict;
  24         65  
  24         1221  
6             use fields
7 24     24   12898 qw( databases retry_seconds dead_dsns retry_at funcmap_cache verbose all_abilities current_abilities current_job cached_drivers driver_cache_expiration scoreboard prioritize floor batch_size strict_remove_ability);
  24         43596  
  24         114  
8              
9             our $VERSION = "1.17";
10              
11 24     24   4355 use Carp qw( croak );
  24         54  
  24         1967  
12 24     24   11729 use Data::ObjectDriver::Errors;
  24         4996  
  24         765  
13 24     24   12909 use Data::ObjectDriver::Driver::DBI;
  24         1230127  
  24         319  
14 24     24   1150 use Digest::MD5 qw( md5_hex );
  24         54  
  24         1811  
15 24     24   190 use List::Util qw( shuffle );
  24         50  
  24         2632  
16 24     24   11658 use TheSchwartz::FuncMap;
  24         65  
  24         682  
17 24     24   12300 use TheSchwartz::Job;
  24         74  
  24         719  
18 24     24   198 use TheSchwartz::JobHandle;
  24         47  
  24         111  
19              
20 24     24   742 use constant RETRY_DEFAULT => 30;
  24         54  
  24         2291  
21             use constant OK_ERRORS =>
22 24     24   148 { map { $_ => 1 } Data::ObjectDriver::Errors->UNIQUE_CONSTRAINT, };
  24         50  
  24         280  
  24         126137  
23              
24             # test harness hooks
25             our $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
26             our $T_LOST_RACE;
27              
28             ## Number of jobs to fetch at a time in find_job_for_workers.
29             our $FIND_JOB_BATCH_SIZE = 50;
30             our $RANDOMIZE_JOBS = 1;
31              
32             sub new {
33 0     0 1   my TheSchwartz $client = shift;
34 0           my %args = @_;
35 0 0         $client = fields::new($client) unless ref $client;
36              
37             croak "databases must be an arrayref if specified"
38 0 0 0       unless !exists $args{databases} || ref $args{databases} eq 'ARRAY';
39 0           my $databases = delete $args{databases};
40              
41 0   0       $client->{retry_seconds} = delete $args{retry_seconds} || RETRY_DEFAULT;
42 0           $client->set_prioritize( delete $args{prioritize} );
43 0           $client->set_verbose( delete $args{verbose} );
44 0           $client->set_scoreboard( delete $args{scoreboard} );
45             $client->{driver_cache_expiration} = delete $args{driver_cache_expiration}
46 0   0       || 0;
47 0   0       $client->{batch_size} = delete $args{batch_size} || $FIND_JOB_BATCH_SIZE;
48              
49 0           $client->{strict_remove_ability} = delete $args{strict_remove_ability};
50              
51 0           my $floor = delete $args{floor};
52 0 0         $client->set_floor($floor) if ($floor);
53              
54 0 0         croak "unknown options ", join( ', ', keys %args ) if keys %args;
55              
56 0           $client->hash_databases($databases);
57 0           $client->reset_abilities;
58 0           $client->{dead_dsns} = {};
59 0           $client->{retry_at} = {};
60 0           $client->{funcmap_cache} = {};
61              
62 0           return $client;
63             }
64              
65             sub debug {
66 0     0 0   my TheSchwartz $client = shift;
67 0 0         return unless $client->{verbose};
68 0           $client->{verbose}->(@_); # ($msg, $job) but $job is optional
69             }
70              
71             sub hash_databases {
72 0     0 0   my TheSchwartz $client = shift;
73 0           my ($list) = @_;
74 0           for my $ref (@$list) {
75 0           my $var;
76             my @parts;
77 0 0         if ( $ref->{driver} ) {
78 0           my $dbh;
79 0 0         if ( my $getter = $ref->{driver}->get_dbh ) {
80 0           $dbh = $getter->();
81             }
82             else {
83 0           $dbh = $ref->{driver}->dbh;
84             }
85 0           $dbh = tied( %{$dbh} );
  0            
86 0           my $dsn = "dbd:" . $dbh->{Driver}->{Name} . ":" . $dbh->{Name};
87 0   0       my $user = $dbh->{Username} || '';
88 0           @parts = ( $dsn, $user );
89             }
90             else {
91 0 0         @parts = map { $ref->{$_} || '' } qw(dsn user);
  0            
92             }
93 0           my $full = join '|', @parts;
94 0           $client->{databases}{ md5_hex($full) } = $ref;
95             }
96             }
97              
98             sub driver_for {
99 0     0 0   my TheSchwartz $client = shift;
100 0           my ($hashdsn) = @_;
101 0           my $driver;
102 0           my $t = time;
103 0           my $cache_duration = $client->{driver_cache_expiration};
104 0 0 0       if ( $cache_duration
      0        
105             && $client->{cached_drivers}{$hashdsn}{create_ts}
106             && $client->{cached_drivers}{$hashdsn}{create_ts} + $cache_duration
107             > $t )
108             {
109 0           $driver = $client->{cached_drivers}{$hashdsn}{driver};
110             }
111             else {
112 0 0         my $db = $client->{databases}{$hashdsn}
113             or croak
114             "Ouch, I don't know about a database whose hash is $hashdsn";
115 0 0         if ( $db->{driver} ) {
116 0           $driver = $db->{driver};
117             }
118             else {
119             $driver = Data::ObjectDriver::Driver::DBI->new(
120             dsn => $db->{dsn},
121             username => $db->{user},
122             password => $db->{pass},
123 0           );
124             }
125 0 0         $driver->prefix( $db->{prefix} ) if exists $db->{prefix};
126              
127 0 0         if ($cache_duration) {
128 0           $client->{cached_drivers}{$hashdsn}{driver} = $driver;
129 0           $client->{cached_drivers}{$hashdsn}{create_ts} = $t;
130             }
131             }
132 0           return $driver;
133             }
134              
135             sub mark_database_as_dead {
136 0     0 0   my TheSchwartz $client = shift;
137 0           my ($hashdsn) = @_;
138 0           $client->{dead_dsns}{$hashdsn} = 1;
139 0           $client->{retry_at}{$hashdsn} = time + $client->{retry_seconds};
140 0   0       $client->debug("Disabling DB $hashdsn because " . ($client->driver_for($hashdsn)->last_error() || 'unknown'));
141             }
142              
143             sub is_database_dead {
144 0     0 0   my TheSchwartz $client = shift;
145 0           my ($hashdsn) = @_;
146             ## If this database is marked as dead, check the retry time. If
147             ## it has passed, try the database again to see if it's undead.
148 0 0         if ( $client->{dead_dsns}{$hashdsn} ) {
149 0 0         if ( $client->{retry_at}{$hashdsn} < time ) {
150 0           delete $client->{dead_dsns}{$hashdsn};
151 0           delete $client->{retry_at}{$hashdsn};
152 0           return 0;
153             }
154             else {
155 0           return 1;
156             }
157             }
158 0           return 0;
159             }
160              
161             sub lookup_job {
162 0     0 1   my TheSchwartz $client = shift;
163 0           my $handle = $client->handle_from_string(@_);
164 0           my $driver = $client->driver_for( $handle->dsn_hashed );
165              
166 0           my $id = $handle->jobid;
167 0 0         my $job = $driver->lookup( 'TheSchwartz::Job' => $handle->jobid )
168             or return;
169              
170 0           $job->handle($handle);
171 0           $job->funcname(
172             $client->funcid_to_name( $driver, $handle->dsn_hashed, $job->funcid )
173             );
174 0           return $job;
175             }
176              
177             sub list_jobs {
178 0     0 1   my TheSchwartz $client = shift;
179 0           my $arg = shift;
180              
181 0           my ( %terms, %options );
182              
183             $terms{run_after} = { op => '<=', value => $arg->{run_after} }
184 0 0         if exists $arg->{run_after};
185              
186             $terms{grabbed_until} = { op => '<=', value => $arg->{grabbed_until} }
187 0 0         if exists $arg->{grabbed_until};
188              
189             $terms{jobid} = { op => '=', value => $arg->{jobid} }
190 0 0         if exists $arg->{jobid};
191              
192 0 0         die "No funcname" unless exists $arg->{funcname};
193              
194 0 0         $arg->{want_handle} = 1 unless defined $arg->{want_handle};
195              
196 0   0       my $limit = $arg->{limit} || $client->batch_size;
197              
198 0 0         if ( $arg->{coalesce} ) {
199 0   0       $arg->{coalesce_op} ||= '=';
200             }
201              
202 0           $options{limit} = $limit;
203 0 0         if ( $client->prioritize ) {
204             $options{sort} = [
205 0           { column => 'priority', direction => 'descend' },
206             { column => 'jobid' },
207             ];
208             }
209             else { # RT #34843
210 0           $options{sort} = [ { column => 'jobid' }, ];
211             }
212              
213 0 0         if ( $client->floor ) {
214 0           $terms{priority} = { op => '>=', value => $client->floor };
215             }
216              
217 0           my @jobs;
218 0           for my $hashdsn ( $client->shuffled_databases ) {
219             ## If the database is dead, skip it
220 0 0         next if $client->is_database_dead($hashdsn);
221 0           my $driver = $client->driver_for($hashdsn);
222 0 0         if ( ref( $arg->{funcname} ) ) {
223             $terms{funcid}
224 0           = [ map { $client->funcname_to_id( $driver, $hashdsn, $_ ) }
225 0           @{ $arg->{funcname} } ];
  0            
226             }
227             else {
228             $terms{funcid} = $client->funcname_to_id( $driver, $hashdsn,
229 0           $arg->{funcname} );
230             }
231              
232 0 0         if ( $arg->{want_handle} ) {
233             push @jobs, map {
234 0           my $handle = TheSchwartz::JobHandle->new(
  0            
235             { dsn_hashed => $hashdsn,
236             client => $client,
237             jobid => $_->jobid
238             }
239             );
240 0           $_->handle($handle);
241 0           $_;
242             } $driver->search( 'TheSchwartz::Job' => \%terms, \%options );
243             }
244             else {
245 0           push @jobs,
246             $driver->search( 'TheSchwartz::Job' => \%terms, \%options );
247             }
248             }
249 0           return @jobs;
250             }
251              
252             sub find_job_with_coalescing_prefix {
253 0     0 1   my TheSchwartz $client = shift;
254 0           my ( $funcname, $coval ) = @_;
255 0           $coval .= "%";
256 0           return $client->_find_job_with_coalescing( 'LIKE', $funcname, $coval );
257             }
258              
259             sub find_job_with_coalescing_value {
260 0     0 1   my TheSchwartz $client = shift;
261 0           return $client->_find_job_with_coalescing( '=', @_ );
262             }
263              
264             sub _find_job_with_coalescing {
265 0     0     my TheSchwartz $client = shift;
266 0           my ( $op, $funcname, $coval ) = @_;
267              
268 0           for my $hashdsn ( $client->shuffled_databases ) {
269             ## If the database is dead, skip it
270 0 0         next if $client->is_database_dead($hashdsn);
271              
272 0           my $driver = $client->driver_for($hashdsn);
273 0           my $unixtime = $driver->dbd->sql_for_unixtime;
274              
275 0           my %options = ( limit => $client->batch_size );
276 0 0         if ( $client->prioritize ) {
277             $options{sort} = [
278 0           { column => 'priority', direction => 'descend' },
279             { column => 'jobid' },
280             ];
281             }
282             else { # RT #34843
283 0           $options{sort} = [ { column => 'jobid' }, ];
284             }
285              
286 0           my @jobs;
287 0           eval {
288             ## Search for jobs in this database where:
289             ## 1. funcname is in the list of abilities this $client supports;
290             ## 2. the job is scheduled to be run (run_after is in the past);
291             ## 3. no one else is working on the job (grabbed_until is in
292             ## the past).
293 0           my $funcid
294             = $client->funcname_to_id( $driver, $hashdsn, $funcname );
295              
296 0           my %terms = (
297             funcid => $funcid,
298             run_after => \"<= $unixtime",
299             grabbed_until => \"<= $unixtime",
300             coalesce => { op => $op, value => $coval },
301             );
302              
303 0 0         if ( $client->floor ) {
304 0           $terms{priority} = { op => '>=', value => $client->floor };
305             }
306              
307 0           @jobs = $driver->search(
308             'TheSchwartz::Job' => \%terms,
309             \%options,
310             );
311             };
312 0 0         if ($@) {
313 0 0 0       unless ( OK_ERRORS->{ $driver->last_error || 0 } ) {
314 0           $client->mark_database_as_dead($hashdsn);
315             }
316             }
317              
318 0           my $job = $client->_grab_a_job( $hashdsn, @jobs );
319 0 0         return $job if $job;
320             }
321             }
322              
323             sub find_job_for_workers {
324 0     0 1   my TheSchwartz $client = shift;
325 0           my ($worker_classes) = @_;
326 0   0       $worker_classes ||= $client->{current_abilities};
327              
328 0           my %options = ( limit => $client->batch_size );
329 0 0         if ( $client->prioritize ) {
330             $options{sort} = [
331 0           { column => 'priority', direction => 'descend' },
332             { column => 'jobid' },
333             ];
334             }
335             else { # RT #34843
336 0           $options{sort} = [ { column => 'jobid' }, ];
337             }
338              
339 0           for my $hashdsn ( $client->shuffled_databases ) {
340             ## If the database is dead, skip it.
341 0 0         next if $client->is_database_dead($hashdsn);
342              
343 0           my $driver = $client->driver_for($hashdsn);
344 0           my $unixtime = $driver->dbd->sql_for_unixtime;
345              
346 0           my @jobs;
347 0           eval {
348             ## Search for jobs in this database where:
349             ## 1. funcname is in the list of abilities this $client supports;
350             ## 2. the job is scheduled to be run (run_after is in the past);
351             ## 3. no one else is working on the job (grabbed_until is in
352             ## the past).
353 0           my @ids = map { $client->funcname_to_id( $driver, $hashdsn, $_ ) }
  0            
354             @$worker_classes;
355              
356 0           my %terms = (
357             funcid => \@ids,
358             run_after => \"<= $unixtime",
359             grabbed_until => \"<= $unixtime",
360             );
361              
362 0 0         if ( $client->floor ) {
363 0           $terms{priority} = { op => '>=', value => $client->floor };
364             }
365              
366 0           @jobs = $driver->search(
367             'TheSchwartz::Job' => \%terms,
368             \%options,
369             );
370             };
371 0 0         if ($@) {
372 0 0 0       unless ( OK_ERRORS->{ $driver->last_error || 0 } ) {
373 0           $client->mark_database_as_dead($hashdsn);
374             }
375             }
376              
377             # for test harness race condition testing
378 0 0         $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->()
379             if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
380              
381 0           my $job = $client->_grab_a_job( $hashdsn, @jobs );
382 0 0         return $job if $job;
383             }
384             }
385              
386             sub get_server_time {
387 0     0 1   my TheSchwartz $client = shift;
388 0           my ($driver) = @_;
389 0           my $unixtime_sql = $driver->dbd->sql_for_unixtime;
390              
391             # RT #58049
392 0 0         $unixtime_sql .= ' FROM DUAL'
393             if ( $driver->dbd->isa('Data::ObjectDriver::Driver::DBD::Oracle') );
394              
395 0           return $driver->rw_handle->selectrow_array("SELECT $unixtime_sql");
396             }
397              
398             sub _grab_a_job {
399 0     0     my TheSchwartz $client = shift;
400 0           my $hashdsn = shift;
401 0           my $driver = $client->driver_for($hashdsn);
402              
403             ## Got some jobs! Randomize them to avoid contention between workers.
404 0 0         my @jobs = $RANDOMIZE_JOBS ? shuffle(@_) : @_;
405              
406             JOB:
407 0           while ( my $job = shift @jobs ) {
408             ## Convert the funcid to a funcname, based on this database's map.
409 0           $job->funcname(
410             $client->funcid_to_name( $driver, $hashdsn, $job->funcid ) );
411              
412             ## Update the job's grabbed_until column so that
413             ## no one else takes it.
414 0           my $worker_class = $job->funcname;
415 0           my $old_grabbed_until = $job->grabbed_until;
416              
417 0 0         my $server_time = $client->get_server_time($driver)
418             or die "expected a server time";
419              
420 0   0       $job->grabbed_until(
421             $server_time + ( $worker_class->grab_for || 1 ) );
422              
423             ## Update the job in the database, and end the transaction.
424             ## NOTE: For some reason, D::OD doesn't ensure the object's value is
425             ## in bounds of original search query, so we need to be more paranoid
426             ## to make sure it's not grabbed by other workers.
427 0           my $unixtime = $driver->dbd->sql_for_unixtime;
428 0 0         if ( $driver->update( $job, {
429             grabbed_until => [
430             '-and',
431             { op => '=', value => $old_grabbed_until},
432             \" <= $unixtime"
433             ]}) < 1 )
434             {
435             ## We lost the race to get this particular job--another worker must
436             ## have got it and already updated it. Move on to the next job.
437 0 0         $T_LOST_RACE->() if $T_LOST_RACE;
438 0           next JOB;
439             }
440              
441             ## Now prepare the job, and return it.
442 0           my $handle = TheSchwartz::JobHandle->new(
443             { dsn_hashed => $hashdsn,
444             jobid => $job->jobid,
445             }
446             );
447 0           $handle->client($client);
448 0           $job->handle($handle);
449 0           return $job;
450             }
451              
452 0           return;
453             }
454              
455             sub shuffled_databases {
456 0     0 0   my TheSchwartz $client = shift;
457 0           my @dsns = keys %{ $client->{databases} };
  0            
458 0           return shuffle(@dsns);
459             }
460              
461             sub insert_job_to_driver {
462 0     0 0   my $client = shift;
463 0           my ( $job, $driver, $hashdsn ) = @_;
464 0           eval {
465             ## Set the funcid of the job, based on the funcname. Since each
466             ## database has a separate cache, this needs to be calculated based
467             ## on the hashed DSN. Also: this might fail, if the database is dead.
468 0           $job->funcid(
469             $client->funcname_to_id( $driver, $hashdsn, $job->funcname ) );
470              
471             ## This is sub-optimal because of clock skew, but something is
472             ## better than a NULL value. And currently, nothing in TheSchwartz
473             ## code itself uses insert_time. TODO: use server time, but without
474             ## having to do a roundtrip just to get the server time.
475 0           $job->insert_time(time);
476              
477             ## Now, insert the job. This also might fail.
478 0           $driver->insert($job);
479             };
480 0 0         if ($@) {
    0          
481 0 0 0       unless ( OK_ERRORS->{ $driver->last_error || 0 } ) {
482 0           $client->mark_database_as_dead($hashdsn);
483             }
484             }
485             elsif ( $job->jobid ) {
486             ## We inserted the job successfully!
487             ## Attach a handle to the job, and return the handle.
488 0           my $handle = TheSchwartz::JobHandle->new(
489             { dsn_hashed => $hashdsn,
490             client => $client,
491             jobid => $job->jobid
492             }
493             );
494 0           $job->handle($handle);
495 0           return $handle;
496             }
497 0           return;
498             }
499              
500             sub insert_jobs {
501 0     0 1   my TheSchwartz $client = shift;
502 0           my (@jobs) = @_;
503              
504             ## Try each of the databases that are registered with $client, in
505             ## random order. If we successfully create the job, exit the loop.
506 0           my @handles;
507             DATABASE:
508 0           for my $hashdsn ( $client->shuffled_databases ) {
509             ## If the database is dead, skip it.
510 0 0         next if $client->is_database_dead($hashdsn);
511              
512 0           my $driver = $client->driver_for($hashdsn);
513 0           $driver->begin_work;
514 0           for my $j (@jobs) {
515 0           my $h = $client->insert_job_to_driver( $j, $driver, $hashdsn );
516 0 0         if ($h) {
517 0           push @handles, $h;
518             }
519             else {
520 0           $driver->rollback;
521 0           @handles = ();
522 0           next DATABASE;
523             }
524             }
525 0 0         last if eval { $driver->commit };
  0            
526 0           @handles = ();
527 0           next DATABASE;
528             }
529              
530 0 0         return wantarray ? @handles : scalar @handles;
531             }
532              
533             sub insert {
534 0     0 1   my TheSchwartz $client = shift;
535 0           my $job = shift;
536 0 0         if ( ref( $_[0] ) eq "TheSchwartz::Job" ) {
537 0           croak "Can't insert multiple jobs with method 'insert'\n";
538             }
539 0 0         unless ( ref($job) eq 'TheSchwartz::Job' ) {
540 0           $job = TheSchwartz::Job->new_from_array( $job, $_[0] );
541             }
542              
543             ## Try each of the databases that are registered with $client, in
544             ## random order. If we successfully create the job, exit the loop.
545 0           for my $hashdsn ( $client->shuffled_databases ) {
546             ## If the database is dead, skip it.
547 0 0         next if $client->is_database_dead($hashdsn);
548              
549 0           my $driver = $client->driver_for($hashdsn);
550              
551             ## Try to insert the job into this database. If we get a handle
552             ## back, return it.
553 0           my $handle = $client->insert_job_to_driver( $job, $driver, $hashdsn );
554 0 0         return $handle if $handle;
555             }
556              
557             ## If the job wasn't submitted successfully to any database, return.
558 0           return;
559             }
560              
561             sub handle_from_string {
562 0     0 0   my TheSchwartz $client = shift;
563 0           my $handle = TheSchwartz::JobHandle->new_from_string(@_);
564 0           $handle->client($client);
565 0           return $handle;
566             }
567              
568             sub can_do {
569 0     0 1   my TheSchwartz $client = shift;
570 0           my ($class) = @_;
571 0           push @{ $client->{all_abilities} }, $class;
  0            
572 0           push @{ $client->{current_abilities} }, $class;
  0            
573             }
574              
575             sub reset_abilities {
576 0     0 0   my TheSchwartz $client = shift;
577 0           $client->{all_abilities} = [];
578 0           $client->{current_abilities} = [];
579             }
580              
581             sub restore_full_abilities {
582 0     0 0   my $client = shift;
583 0           $client->{current_abilities} = [ @{ $client->{all_abilities} } ];
  0            
584             }
585              
586             sub temporarily_remove_ability {
587 0     0 0   my $client = shift;
588 0           my ($class) = @_;
589             $client->{current_abilities}
590 0           = [ grep { $_ ne $class } @{ $client->{current_abilities} } ];
  0            
  0            
591 0 0         if ( !@{ $client->{current_abilities} } ) {
  0            
592 0           $client->restore_full_abilities;
593             }
594             }
595              
596             sub work_on {
597 0     0 1   my TheSchwartz $client = shift;
598 0           my $hstr = shift; # Handle string
599 0 0         my $job = $client->lookup_job($hstr)
600             or return 0;
601 0           return $client->work_once($job);
602             }
603              
604             sub grab_and_work_on {
605 0     0 1   my TheSchwartz $client = shift;
606 0           my $hstr = shift; # Handle string
607 0 0         my $job = $client->lookup_job($hstr)
608             or return 0;
609              
610             ## check that the job is grabbable
611 0           my $hashdsn = $job->handle->dsn_hashed;
612 0           my $driver = $client->driver_for($hashdsn);
613 0           my $current_time = $client->get_server_time($driver);
614 0 0         return 0 if $current_time < $job->grabbed_until;
615              
616             ## grab the job the usual way
617 0 0         $job = $client->_grab_a_job( $hashdsn, $job )
618             or return 0;
619              
620 0           return $client->work_once($job);
621             }
622              
623             sub work {
624 0     0 1   my TheSchwartz $client = shift;
625 0           my ($delay) = @_;
626 0   0       $delay ||= 5;
627 0           while (1) {
628 0 0         sleep $delay unless $client->work_once;
629             }
630             }
631              
632             sub work_until_done {
633 0     0 1   my TheSchwartz $client = shift;
634 0           while (1) {
635 0 0         $client->work_once or last;
636             }
637             }
638              
639             ## Returns true if it did something, false if no jobs were found
640             sub work_once {
641 0     0 1   my TheSchwartz $client = shift;
642 0           my $job = shift; # optional specific job to work on
643              
644             ## Look for a job with our current set of abilities. Note that the
645             ## list of current abilities may not be equal to the full set of
646             ## abilities, to allow for even distribution between jobs.
647 0   0       $job ||= $client->find_job_for_workers;
648              
649             ## If we didn't find anything, restore our full abilities, and try
650             ## again.
651 0 0 0       if ( !$job
      0        
652             && !$client->{strict_remove_ability}
653 0           && @{ $client->{current_abilities} } < @{ $client->{all_abilities} } )
  0            
654             {
655 0           $client->restore_full_abilities;
656 0           $job = $client->find_job_for_workers;
657             }
658              
659 0 0         my $class = $job ? $job->funcname : undef;
660 0 0         if ($job) {
661 0 0         my $priority = $job->priority ? ", priority " . $job->priority : "";
662 0           $job->debug(
663             "TheSchwartz::work_once got job of class '$class'$priority");
664             }
665             else {
666 0           $client->debug("TheSchwartz::work_once found no jobs");
667             }
668              
669             ## If we still don't have anything, return.
670 0 0         return unless $job;
671              
672             ## Now that we found a job for this particular funcname, remove it
673             ## from our list of current abilities. So the next time we look for a
674             ## job, we'll find a job for a different funcname. This prevents starvation of
675             ## high funcid values because of the way MySQL's indexes work.
676             ## BUGBUG this looks odd since ordering by job_id should limit any skew ...
677 0 0         $client->temporarily_remove_ability($class) unless($client->{strict_remove_ability});
678              
679 0           $class->work_safely($job);
680              
681             ## We got a job, so return 1 so work_until_done (which calls this method)
682             ## knows to keep looking for jobs.
683 0           return 1;
684             }
685              
686             sub funcid_to_name {
687 0     0 0   my TheSchwartz $client = shift;
688 0           my ( $driver, $hashdsn, $funcid ) = @_;
689 0           my $cache = $client->_funcmap_cache($hashdsn);
690 0           return $cache->{funcid2name}{$funcid};
691             }
692              
693             sub funcname_to_id {
694 0     0 0   my TheSchwartz $client = shift;
695 0           my ( $driver, $hashdsn, $funcname ) = @_;
696 0           my $cache = $client->_funcmap_cache($hashdsn);
697 0 0         unless ( exists $cache->{funcname2id}{$funcname} ) {
698 0           my $map = TheSchwartz::FuncMap->create_or_find( $driver, $funcname );
699 0           $cache->{funcname2id}{ $map->funcname } = $map->funcid;
700 0           $cache->{funcid2name}{ $map->funcid } = $map->funcname;
701             }
702 0           return $cache->{funcname2id}{$funcname};
703             }
704              
705             sub _funcmap_cache {
706 0     0     my TheSchwartz $client = shift;
707 0           my ($hashdsn) = @_;
708 0 0         unless ( exists $client->{funcmap_cache}{$hashdsn} ) {
709 0           my $driver = $client->driver_for($hashdsn);
710 0           my @maps = $driver->search('TheSchwartz::FuncMap');
711 0           my $cache = { funcname2id => {}, funcid2name => {} };
712 0           for my $map (@maps) {
713 0           $cache->{funcname2id}{ $map->funcname } = $map->funcid;
714 0           $cache->{funcid2name}{ $map->funcid } = $map->funcname;
715             }
716 0           $client->{funcmap_cache}{$hashdsn} = $cache;
717             }
718 0           return $client->{funcmap_cache}{$hashdsn};
719             }
720              
721             # accessors
722              
723             sub verbose {
724 0     0 1   my TheSchwartz $client = shift;
725 0           return $client->{verbose};
726             }
727              
728             sub set_verbose {
729 0     0 1   my TheSchwartz $client = shift;
730 0           my $logger = shift; # or non-coderef to just print to stderr
731 0 0 0       if ( $logger && ref $logger ne "CODE" ) {
732             $logger = sub {
733 0     0     my $msg = shift;
734 0           $msg =~ s/\s+$//;
735 0           print STDERR "$msg\n";
736 0           };
737             }
738 0           $client->{verbose} = $logger;
739             }
740              
741             sub scoreboard {
742 0     0 1   my TheSchwartz $client = shift;
743              
744 0           return $client->{scoreboard};
745             }
746              
# Configure where this client writes its scoreboard file.
#
# $dir is the parent directory to use. The special values '1' and 'on' mean
# "pick a location for me": the first of /var/run and /dev/shm that exists
# is chosen (ending on /dev/shm if neither exists). A false $dir leaves the
# current configuration untouched.
#
# The scoreboard file is "$dir/theschwartz/scoreboard.<pid>", where <pid> is
# the pid of the process calling this method. The 'theschwartz' directory is
# created (mode 0755) when it does not exist yet.
#
# Returns the scoreboard path that was stored. Dies if the directory cannot
# be created.
sub set_scoreboard {
    my TheSchwartz $client = shift;
    my ($dir) = @_;

    return unless $dir;

    # They want the scoreboard but don't care where it goes
    if ( ( $dir eq '1' ) or ( $dir eq 'on' ) ) {

        # Find someplace in tmpfs to save this
        foreach my $candidate (qw(/var/run /dev/shm)) {
            $dir = $candidate;
            last if -e $dir;
        }
    }

    $dir .= '/theschwartz';
    unless ( -e $dir ) {
        unless ( mkdir( $dir, 0755 ) ) {

            # Remember why mkdir failed before the -d test can overwrite $!.
            my $mkdir_error = $!;

            # Another process may have created the directory between our
            # existence check and the mkdir call; that is not an error as
            # long as the directory is there now.
            die "Can't create scoreboard directory '$dir': $mkdir_error"
                unless -d $dir;
        }
    }

    $client->{scoreboard} = $dir . "/scoreboard.$$";
}
771              
# Write a fresh scoreboard file describing the job currently being worked.
#
# The file is truncated and filled with one "key=value" line each for:
#   pid      - this process id
#   funcname - the job's funcname (empty if it has none)
#   started  - the job's grabbed_until minus the worker class's grab_for
#              (1 is used when grab_for is false or there is no class)
#   arg      - the job argument as rendered by _serialize_args
#
# Does nothing when no scoreboard is configured or there is no current job.
# Failure to open or close the file is reported through $job->debug and is
# never fatal. Always returns nothing.
sub start_scoreboard {
    my TheSchwartz $client = shift;

    # Don't do anything if we're not configured to write to the scoreboard
    my $scoreboard = $client->scoreboard;
    return unless $scoreboard;

    # Don't do anything if (for some reason) we don't have a current job
    my $job = $client->current_job;
    return unless $job;

    my $class = $job->funcname;

    # Stop here on failure: there is no usable handle to print to.
    my $SB;
    unless ( open( $SB, '>', $scoreboard ) ) {
        $job->debug("Could not write scoreboard '$scoreboard': $!");
        return;
    }

    # grab_for is called on the funcname as a class method, so it can only
    # be consulted when a funcname is actually present.
    my $grab_for = $class ? ( $class->grab_for || 1 ) : 1;

    print $SB join(
        "\n",
        (   "pid=$$",
            'funcname=' . ( $class || '' ),
            'started=' . ( $job->grabbed_until - $grab_for ),
            'arg=' . _serialize_args( $job->arg ),
        )
        ),
        "\n";

    # Buffered write errors only surface at close time.
    close($SB)
        or $job->debug("Could not close scoreboard '$scoreboard': $!");

    return;
}
800              
# Quick and dirty serializer. Don't use Data::Dumper because we don't need to
# recurse indefinitely and we want to truncate the output produced.
#
# Takes a single job argument and returns a one-line string:
#   - non-reference (including undef): returned unchanged;
#   - HASH ref:  "key=value,key=value,..." with keys in sorted order;
#   - ARRAY ref: "value,value,..." in array order;
#   - any other reference: the empty string.
# Each value is stringified and cut to at most 200 characters; undefined
# values become empty strings, while defined false values such as 0 are
# kept as they are.
sub _serialize_args {
    my ($args) = @_;

    return $args unless ref $args;

    # Render one value: undef becomes '', everything else is truncated.
    my $clip = sub {
        my ($value) = @_;
        return substr( defined $value ? $value : '', 0, 200 );
    };

    if ( ref $args eq 'HASH' ) {

        # Sort the keys so the same argument always serializes the same way.
        return join ',',
            map { $_ . '=' . $clip->( $args->{$_} ) } sort keys %$args;
    }

    if ( ref $args eq 'ARRAY' ) {
        return join ',', map { $clip->($_) } @$args;
    }

    # Unsupported reference type: nothing sensible to show.
    return '';
}
821              
# Mark the current scoreboard entry as finished by appending a
# "done=<epoch seconds>" line to the scoreboard file.
#
# Does nothing when no scoreboard is configured. Failure to open the file is
# reported through the current job's debug method (when there is a current
# job) and is never fatal. Always returns nothing.
sub end_scoreboard {
    my TheSchwartz $client = shift;

    # Don't do anything if we're not configured to write to the scoreboard
    my $scoreboard = $client->scoreboard;
    return unless $scoreboard;

    my $job = $client->current_job;

    # Stop here on failure: there is no usable handle to print to. The
    # current job is not checked above, so guard the debug call.
    my $SB;
    unless ( open( $SB, '>>', $scoreboard ) ) {
        $job->debug("Could not append scoreboard '$scoreboard': $!")
            if $job;
        return;
    }

    print $SB "done=" . time . "\n";
    close($SB);

    return;
}
838              
# Delete this client's scoreboard file.
#
# Does nothing when no scoreboard is configured; otherwise returns the
# result of unlink.
sub clean_scoreboard {
    my TheSchwartz $client = shift;

    # Nothing to remove unless a scoreboard path was configured
    my $path = $client->scoreboard
        or return;

    return unlink($path);
}
848              
# Getter for the client's prioritize setting.
sub prioritize {
    my TheSchwartz $self = $_[0];
    return $self->{prioritize};
}
853              
# Setter for the client's prioritize setting; returns the stored value.
sub set_prioritize {
    my TheSchwartz $self = shift;
    my ($value) = @_;
    $self->{prioritize} = $value;
}
858              
# Getter for the client's floor setting.
sub floor {
    my TheSchwartz $self = $_[0];
    return $self->{floor};
}
863              
# Setter for the client's floor setting; returns the stored value.
# Dies unless prioritize has been enabled on this client first.
sub set_floor {
    my TheSchwartz $self = shift;
    my ($value) = @_;

    if ( !$self->prioritize ) {
        die "set_floor only works if prioritize is set.";
    }

    $self->{floor} = $value;
}
870              
# Getter for the client's batch_size setting.
sub batch_size {
    my TheSchwartz $self = $_[0];
    return $self->{batch_size};
}
875              
# Setter for the client's batch_size setting; returns the stored value.
sub set_batch_size {
    my TheSchwartz $self = shift;
    my ($value) = @_;
    $self->{batch_size} = $value;
}
880              
# The job currently being worked, so that if something dies, work_safely
# knows which job to mark as dead.
sub current_job {
    my TheSchwartz $self = $_[0];
    return $self->{current_job};
}
886              
# Record the job currently being worked; returns the stored value.
sub set_current_job {
    my TheSchwartz $self = shift;
    my ($job) = @_;
    $self->{current_job} = $job;
}
891              
# Getter for the client's strict_remove_ability setting.
sub strict_remove_ability {
    my TheSchwartz $self = $_[0];
    return $self->{strict_remove_ability};
}
896              
# Setter for the client's strict_remove_ability setting; returns the stored
# value.
sub set_strict_remove_ability {
    my TheSchwartz $self = shift;
    my ($value) = @_;
    $self->{strict_remove_ability} = $value;
}
901              
# Destructor: remove the scoreboard file belonging to the object being
# destroyed.
sub DESTROY {

    # A destructor can run at any moment (for example while an exception
    # is unwinding), and the unlink done by clean_scoreboard may set $!.
    # Localize the global status variables so destroying a client never
    # disturbs the error state of the code that happened to be running.
    local ( $., $@, $!, $^E, $? );

    foreach my $arg (@_) {

        # Call 'clean_scoreboard' on TheSchwartz objects
        if ( ref($arg) and $arg->isa('TheSchwartz') ) {
            $arg->clean_scoreboard;
        }
    }

    return;
}
911              
912             1;
913              
914             __END__