File Coverage

blib/lib/TheSchwartz/Moosified.pm
Criterion Covered Total %
statement 36 308 11.6
branch 0 92 0.0
condition 0 37 0.0
subroutine 12 41 29.2
pod 8 21 38.1
total 56 499 11.2


line stmt bran cond sub pod time code
1             package TheSchwartz::Moosified;
2              
3 2     2   30032 use Moose;
  2         648867  
  2         13  
4 2     2   9240 use Moose::Util::TypeConstraints;
  2         4  
  2         18  
5 2     2   2329 use Carp;
  2         7  
  2         126  
6 2     2   8 use Scalar::Util qw( refaddr );
  2         2  
  2         74  
7 2     2   8 use List::Util qw( shuffle );
  2         2  
  2         90  
8 2     2   7 use File::Spec ();
  2         2  
  2         23  
9 2     2   1304 use Storable ();
  2         4774  
  2         54  
10 2     2   764 use TheSchwartz::Moosified::Utils qw/insert_id sql_for_unixtime bind_param_attr run_in_txn order_by_priority/;
  2         3  
  2         111  
11 2     2   713 use TheSchwartz::Moosified::Job;
  2         7  
  2         73  
12 2     2   17 use TheSchwartz::Moosified::JobHandle;
  2         2  
  2         6632  
13              
14             our $VERSION = '0.05_005';
15             our $AUTHORITY = 'cpan:FAYLAND';
16              
17             ## Number of jobs to fetch at a time in find_job_for_workers.
18             our $FIND_JOB_BATCH_SIZE = 50;
19              
20             # subtype-s
21             my $sub_verbose = sub {
22             my $msg = shift;
23             $msg =~ s/\s+$//;
24             print STDERR "$msg\n";
25             };
26             subtype 'TheSchwartz.Moosified.Verbose'
27             => as 'CodeRef'
28             => where { 1; };
29             coerce 'TheSchwartz.Moosified.Verbose'
30             => from 'Int'
31             => via {
32             if ($_) {
33             return $sub_verbose;
34             } else {
35             return sub { 0 };
36             }
37             };
38              
39             has 'verbose' => ( is => 'rw', isa => 'TheSchwartz.Moosified.Verbose', coerce => 1, default => 0 );
40             has 'prioritize' => ( is => 'rw', isa => 'Bool', default => 0 );
41              
42             has 'retry_seconds' => (is => 'rw', isa => 'Int', default => 30);
43             has 'retry_at' => ( is => 'rw', isa => 'HashRef', default => sub { {} } );
44              
45             has 'databases' => (
46             is => 'rw',
47             isa => 'ArrayRef',
48             default => sub { [] },
49             );
50              
51             has 'all_abilities' => ( is => 'rw', isa => 'ArrayRef', default => sub { [] } );
52             has 'current_abilities' => ( is => 'rw', isa => 'ArrayRef', default => sub { [] } );
53             has 'current_job' => ( is => 'rw', isa => 'Object' );
54              
55             has 'funcmap_cache' => ( is => 'rw', isa => 'HashRef', default => sub { {} } );
56              
57             has 'scoreboard' => (
58             is => 'rw',
59             isa => 'Str',
60             trigger => \&_trigger_scoreboard,
61             );
62              
63             has 'prefix' => ( is => 'rw', isa => 'Str', default => '' );
64             has 'error_length' => ( is => 'rw', isa => 'Int', default => 255 );
65              
66             sub debug {
67 0     0 0   my $self = shift;
68            
69 0 0         return unless $self->verbose;
70 0           $self->verbose->(@_);
71             }
72              
73             sub shuffled_databases {
74 0     0 0   my $self = shift;
75 0           return shuffle( @{ $self->databases } );
  0            
76             }
77              
78             sub _try_insert {
79 0     0     my $self = shift;
80 0           my $job = shift;
81 0           my $dbh = shift;
82              
83 0           $job->funcid( $self->funcname_to_id( $dbh, $job->funcname ) );
84              
85             run_in_txn {
86 0     0     $job->insert_time(time());
87              
88 0           my $row = $job->as_hashref;
89 0 0 0       if ($dbh->{Driver}{Name} && $dbh->{Driver}{Name} eq 'Pg') {
90 0           delete $row->{jobid};
91             }
92 0           my @col = keys %$row;
93              
94 0           my $table_job = $self->prefix . 'job';
95 0           my $sql = sprintf 'INSERT INTO %s (%s) VALUES (%s)',
96             $table_job, join( ", ", @col ), join( ", ", ("?") x @col );
97              
98 0           my $sth = $dbh->prepare_cached($sql);
99 0           my $i = 1;
100 0           for my $col (@col) {
101             $sth->bind_param(
102             $i++,
103 0           $row->{$col},
104             bind_param_attr( $dbh, $col ),
105             );
106             }
107 0           $sth->execute();
108              
109 0           my $jobid = insert_id( $dbh, $sth, $table_job, "jobid" );
110 0           $job->jobid($jobid);
111 0           } $dbh;
112             }
113              
114             sub insert {
115 0     0 1   my $self = shift;
116              
117 0           my $job;
118 0 0         if ( ref $_[0] eq 'TheSchwartz::Moosified::Job' ) {
119 0           $job = $_[0];
120             }
121             else {
122 0           $job = TheSchwartz::Moosified::Job->new(funcname => $_[0], arg => $_[1]);
123             }
124 0 0         $job->arg( Storable::nfreeze( $job->arg ) ) if ref $job->arg;
125              
126 0           for my $dbh ( $self->shuffled_databases ) {
127 0           eval {
128 0           $self->_try_insert($job,$dbh);
129             };
130 0 0         $self->debug("insert failed: $@") if $@;
131              
132 0 0         next unless $job->jobid;
133              
134             ## We inserted the job successfully!
135             ## Attach a handle to the job, and return the handle.
136 0           my $handle = TheSchwartz::Moosified::JobHandle->new({
137             dbh => $dbh,
138             client => $self,
139             jobid => $job->jobid
140             });
141 0           $job->handle($handle);
142 0           return $handle;
143             }
144              
145 0           return;
146             }
147              
148             sub find_job_for_workers {
149 0     0 1   my $client = shift;
150              
151 0           my ($worker_classes) = @_;
152 0   0       $worker_classes ||= $client->{current_abilities};
153            
154 0 0         return unless (scalar @$worker_classes);
155              
156 0           my $limit = $FIND_JOB_BATCH_SIZE;
157              
158 0           for my $dbh ( $client->shuffled_databases ) {
159              
160 0           my $unixtime = sql_for_unixtime($dbh);
161 0 0         my $order_by = $client->prioritize ? order_by_priority($dbh) : '';
162              
163 0           my @jobs;
164 0           eval {
165             ## Search for jobs in this database where:
166             ## 1. funcname is in the list of abilities this $client supports;
167             ## 2. the job is scheduled to be run (run_after is in the past);
168             ## 3. no one else is working on the job (grabbed_until is in
169             ## in the past).
170 0           my @ids = map { $client->funcname_to_id( $dbh, $_ ) }
  0            
171             @$worker_classes;
172              
173 0           my $ids = join(',', @ids);
174 0           my $table_job = $client->prefix . 'job';
175 0           my $sql = qq~SELECT * FROM $table_job WHERE funcid IN ($ids) AND run_after <= $unixtime AND grabbed_until <= $unixtime $order_by LIMIT $limit~;
176              
177 0           my $sth = $dbh->prepare_cached($sql);
178 0           $sth->execute();
179 0           while ( my $ref = $sth->fetchrow_hashref ) {
180 0           my $job = TheSchwartz::Moosified::Job->new( $ref );
181 0           push @jobs, $job;
182             }
183 0           $sth->finish;
184             };
185             # if ($@) {
186             #
187             # }
188              
189 0           my $job = $client->_grab_a_job($dbh, @jobs);
190 0 0         return $job if $job;
191             }
192             }
193              
194             sub get_server_time {
195 0     0 0   my ( $client, $dbh ) = @_;
196 0           my $unixtime_sql = sql_for_unixtime($dbh);
197 0           return $dbh->selectrow_array("SELECT $unixtime_sql");
198             }
199              
200             sub _grab_a_job {
201 0     0     my ( $client, $dbh, @jobs ) = @_;
202              
203             ## Got some jobs! Randomize them to avoid contention between workers.
204 0           @jobs = shuffle(@jobs);
205              
206             JOB:
207 0           while (my $job = shift @jobs) {
208             ## Convert the funcid to a funcname, based on this database's map.
209 0           $job->funcname( $client->funcid_to_name($dbh, $job->funcid) );
210              
211             ## Update the job's grabbed_until column so that
212             ## no one else takes it.
213 0           my $worker_class = $job->funcname;
214 0           my $old_grabbed_until = $job->grabbed_until;
215            
216 0 0         my $server_time = $client->get_server_time($dbh)
217             or die "expected a server time";
218            
219 0   0       my $new_grabbed_until = $server_time + ($worker_class->grab_for || 1);
220              
221             # Prevent a condition that could result in more than one client
222             # grabbing the same job b/c it doesn't look grabbed
223 0 0         next JOB if ($new_grabbed_until == $old_grabbed_until);
224              
225             ## Update the job in the database, and end the transaction.
226              
227 0           my $table_job = $client->prefix . 'job';
228 0           my $sql = qq~UPDATE $table_job SET grabbed_until = ? WHERE jobid = ? AND grabbed_until = ?~;
229 0           my $sth = $dbh->prepare($sql);
230 0           $sth->execute($new_grabbed_until, $job->jobid, $old_grabbed_until);
231 0           my $rows = $sth->rows;
232              
233 0 0         next JOB unless $rows == 1;
234            
235 0           $job->grabbed_until( $new_grabbed_until );
236              
237             ## Now prepare the job, and return it.
238 0           my $handle = TheSchwartz::Moosified::JobHandle->new({
239             dbh => $dbh,
240             client => $client,
241             jobid => $job->jobid,
242             });
243 0           $job->handle($handle);
244 0           return $job;
245             }
246              
247 0           return undef;
248             }
249              
250             sub list_jobs {
251 0     0 0   my ( $self, $arg ) = @_;
252              
253 0 0         die "No funcname" unless exists $arg->{funcname};
254              
255 0           my @options;
256             push @options, {
257             key => 'run_after',
258             op => '<=',
259             value => $arg->{run_after}
260 0 0         } if exists $arg->{run_after};
261             push @options, {
262             key => 'grabbed_until',
263             op => '<=',
264             value => $arg->{grabbed_until}
265 0 0         } if exists $arg->{grabbed_until};
266              
267 0 0         if ( $arg->{coalesce} ) {
268 0   0       $arg->{coalesce_op} ||= '=';
269             push @options, {
270             key => 'coalesce',
271             op => $arg->{coalesce_op},
272             value => $arg->{coalesce}
273 0           };
274             }
275            
276 0   0       my $limit = $arg->{limit} || $FIND_JOB_BATCH_SIZE;
277              
278 0           my @jobs;
279 0           for my $dbh ( $self->shuffled_databases ) {
280 0 0         my $order_by = $self->prioritize ? order_by_priority($dbh) : '';
281              
282 0           eval {
283            
284 0           my ($funcid, $funcop);
285 0 0         if ( ref($arg->{funcname}) ) { # ARRAYREF
286 0           $funcid = '(' . join(',', map { $self->funcname_to_id($dbh, $_) } @{$arg->{funcname}}) . ')';
  0            
  0            
287 0           $funcop = 'IN';
288             } else {
289 0           $funcid = $self->funcname_to_id($dbh, $arg->{funcname});
290 0           $funcop = '=';
291             }
292              
293 0           my $table_job = $self->prefix . 'job';
294 0           my $sql = qq~SELECT * FROM $table_job WHERE funcid $funcop $funcid~;
295 0           my @value = ();
296 0           for (@options) {
297 0           $sql .= " AND $_->{key} $_->{op} ?";
298 0           push @value, $_->{value};
299             }
300 0           $sql .= qq~ $order_by LIMIT $limit~;
301              
302 0           my $sth = $dbh->prepare_cached($sql);
303 0           $sth->execute(@value);
304 0           while ( my $ref = $sth->fetchrow_hashref ) {
305 0           $ref->{funcname} = $self->funcid_to_name($dbh, $ref->{funcid});
306 0           my $job = TheSchwartz::Moosified::Job->new( $ref );
307 0 0         if ($arg->{want_handle}) {
308 0           my $handle = TheSchwartz::Moosified::JobHandle->new({
309             dbh => $dbh,
310             client => $self,
311             jobid => $job->jobid
312             });
313 0           $job->handle($handle);
314             }
315 0           push @jobs, $job;
316             }
317 0           $sth->finish;
318             };
319             }
320              
321 0           return @jobs;
322             }
323              
324             sub find_job_with_coalescing_prefix {
325 0     0 1   my $client = shift;
326 0           my ($funcname, $coval) = @_;
327 0           $coval .= "%";
328 0           return $client->_find_job_with_coalescing('LIKE', $funcname, $coval);
329             }
330              
331             sub find_job_with_coalescing_value {
332 0     0 1   my $client = shift;
333 0           return $client->_find_job_with_coalescing('=', @_);
334             }
335              
336             sub _find_job_with_coalescing {
337 0     0     my $client = shift;
338 0           my ($op, $funcname, $coval) = @_;
339              
340 0           my $limit = $FIND_JOB_BATCH_SIZE;
341 0 0         my $order_by = $client->prioritize ? 'ORDER BY priority DESC' : '';
342              
343 0           for my $dbh ( $client->shuffled_databases ) {
344 0           my $unixtime = sql_for_unixtime($dbh);
345              
346 0           my @jobs;
347 0           eval {
348             ## Search for jobs in this database where:
349             ## 1. funcname is in the list of abilities this $client supports;
350             ## 2. the job is scheduled to be run (run_after is in the past);
351             ## 3. no one else is working on the job (grabbed_until is in
352             ## in the past).
353 0           my $funcid = $client->funcname_to_id($dbh, $funcname);
354              
355 0           my $table_job = $client->prefix . 'job';
356 0           my $sql = qq~SELECT * FROM $table_job WHERE funcid = ? AND run_after <= $unixtime AND grabbed_until <= $unixtime AND coalesce $op ? $order_by LIMIT $limit~;
357 0           my $sth = $dbh->prepare_cached($sql);
358 0           $sth->execute( $funcid, $coval );
359 0           while ( my $ref = $sth->fetchrow_hashref ) {
360 0           $ref->{funcname} = $funcname;
361 0           my $job = TheSchwartz::Moosified::Job->new( $ref );
362 0           push @jobs, $job;
363             }
364 0           $sth->finish;
365             };
366             # if ($@) {
367             #
368             # }
369              
370 0           my $job = $client->_grab_a_job($dbh, @jobs);
371 0 0         return $job if $job;
372             }
373             }
374              
375             sub can_do {
376 0     0 1   my $client = shift;
377 0           my ($class) = @_;
378 0           push @{ $client->{all_abilities} }, $class;
  0            
379 0           push @{ $client->{current_abilities} }, $class;
  0            
380             }
381              
382             sub reset_abilities {
383 0     0 0   my $client = shift;
384 0           $client->{all_abilities} = [];
385 0           $client->{current_abilities} = [];
386             }
387              
388             sub restore_full_abilities {
389 0     0 0   my $client = shift;
390 0           $client->{current_abilities} = [ @{ $client->{all_abilities} } ];
  0            
391             }
392              
393             sub temporarily_remove_ability {
394 0     0 0   my $client = shift;
395 0           my ($class) = @_;
396             $client->{current_abilities} = [
397 0           grep { $_ ne $class } @{ $client->{current_abilities} }
  0            
  0            
398             ];
399 0 0         if (!@{ $client->{current_abilities} }) {
  0            
400 0           $client->restore_full_abilities;
401             }
402             }
403              
404             sub work {
405 0     0 1   my $client = shift;
406 0           my ($delay) = @_;
407 0   0       $delay ||= 5;
408 0           while (1) {
409 0 0         sleep $delay unless $client->work_once;
410             }
411             }
412              
413             sub work_until_done {
414 0     0 1   my $client = shift;
415 0           while (1) {
416 0 0         $client->work_once or last;
417             }
418             }
419              
420             ## Returns true if it did something, false if no jobs were found
421             sub work_once {
422 0     0 1   my $client = shift;
423 0           my $job = shift; # optional specific job to work on
424              
425             ## Look for a job with our current set of abilities. Note that the
426             ## list of current abilities may not be equal to the full set of
427             ## abilities, to allow for even distribution between jobs.
428 0   0       $job ||= $client->find_job_for_workers;
429              
430             ## If we didn't find anything, restore our full abilities, and try
431             ## again.
432 0 0 0       if (!$job &&
433 0           @{ $client->{current_abilities} } < @{ $client->{all_abilities} }) {
  0            
434 0           $client->restore_full_abilities;
435 0           $job = $client->find_job_for_workers;
436             }
437              
438 0 0         my $class = $job ? $job->funcname : undef;
439 0 0         if ($job) {
440 0 0         my $priority = $job->priority ? ", priority " . $job->priority : "";
441 0           $job->debug("TheSchwartz::work_once got job of class '$class'$priority");
442             } else {
443 0           $client->debug("TheSchwartz::work_once found no jobs");
444             }
445              
446             ## If we still don't have anything, return.
447 0 0         return unless $job;
448              
449             ## Now that we found a job for this particular funcname, remove it
450             ## from our list of current abilities. So the next time we look for a
451             ## we'll find a job for a different funcname. This prevents starvation of
452             ## high funcid values because of the way MySQL's indexes work.
453 0           $client->temporarily_remove_ability($class);
454              
455 0           $class->work_safely($job);
456              
457             ## We got a job, so return 1 so work_until_done (which calls this method)
458             ## knows to keep looking for jobs.
459 0           return 1;
460             }
461              
462             sub funcid_to_name {
463 0     0 0   my ( $client, $dbh, $funcid ) = @_;
464 0           my $cache = $client->_funcmap_cache($dbh);
465 0           return $cache->{funcid2name}{$funcid};
466             }
467              
468             sub funcname_to_id {
469 0     0 0   my ( $self, $dbh, $funcname ) = @_;
470              
471 0           my $dbid = refaddr $dbh;
472 0           my $cache = $self->_funcmap_cache($dbh);
473 0           my $table_funcmap = $self->prefix . 'funcmap';
474              
475 0 0         unless ( exists $cache->{funcname2id}{$funcname} ) {
476 0           my $id;
477 0           eval {
478             run_in_txn {
479             ## This might fail in a race condition since funcname is UNIQUE
480 0     0     my $sth = $dbh->prepare_cached(
481             "INSERT INTO $table_funcmap (funcname) VALUES (?)");
482 0           $sth->execute($funcname);
483              
484 0           $id = insert_id( $dbh, $sth, $table_funcmap, "funcid" );
485 0           } $dbh;
486             };
487              
488             ## If we got an exception, try to load the record again
489 0 0         if ($@) {
490 0           my $sth = $dbh->prepare_cached(
491             "SELECT funcid FROM $table_funcmap WHERE funcname = ?");
492 0           $sth->execute($funcname);
493 0           ($id) = $sth->fetchrow_array;
494 0           $sth->finish;
495 0 0         croak "Can't find or create funcname $funcname: $@" unless $id;
496             }
497              
498 0           $cache->{funcname2id}{ $funcname } = $id;
499 0           $cache->{funcid2name}{ $id } = $funcname;
500 0           $self->funcmap_cache->{$dbid} = $cache;
501             }
502              
503 0           $cache->{funcname2id}{$funcname};
504             }
505              
506             sub _funcmap_cache {
507 0     0     my ( $client, $dbh ) = @_;
508 0           my $dbid = refaddr $dbh;
509 0           my $table_funcmap = $client->prefix . 'funcmap';
510 0 0         unless ( exists $client->funcmap_cache->{$dbid} ) {
511 0           my $cache = { funcname2id => {}, funcid2name => {} };
512 0           my $sth = $dbh->prepare_cached("SELECT funcid, funcname FROM $table_funcmap");
513 0           $sth->execute;
514 0           while ( my $row = $sth->fetchrow_arrayref ) {
515 0           $cache->{funcname2id}{ $row->[1] } = $row->[0];
516 0           $cache->{funcid2name}{ $row->[0] } = $row->[1];
517             }
518 0           $sth->finish;
519 0           $client->funcmap_cache->{$dbid} = $cache;
520             }
521 0           return $client->funcmap_cache->{$dbid};
522             }
523              
524             sub _trigger_scoreboard {
525 0     0     my ($self, $dir) = @_;
526            
527 0 0         return unless $dir;
528 0 0         return if (-f $dir); # no endless loop
529              
530             # They want the scoreboard but don't care where it goes
531 0 0 0       if (($dir eq '1') or ($dir eq 'on')) {
532 0           $dir = File::Spec->tmpdir();
533             }
534              
535 0           $self->{scoreboard} = $dir."/theschwartz.scoreboard.$$";
536             }
537              
538             sub start_scoreboard {
539 0     0 0   my $client = shift;
540              
541             # Don't do anything if we're not configured to write to the scoreboard
542 0           my $scoreboard = $client->scoreboard;
543 0 0         return unless $scoreboard;
544              
545             # Don't do anything of (for some reason) we don't have a current job
546 0           my $job = $client->current_job;
547 0 0         return unless $job;
548              
549 0           my $class = $job->funcname;
550              
551 0 0         open(my $sb, '>', $scoreboard)
552             or $job->debug("Could not write scoreboard '$scoreboard': $!");
553 0   0       print $sb join("\n", ("pid=$$",
      0        
554             'funcname='.($class||''),
555             'started='.($job->grabbed_until-($class->grab_for||1)),
556             'arg='._serialize_args($job->arg),
557             )
558             ), "\n";
559 0           close($sb);
560              
561 0           return;
562             }
563              
564             # Quick and dirty serializer. Don't use Data::Dumper because we don't need to
565             # recurse indefinitely and we want to truncate the output produced
566             sub _serialize_args {
567 0     0     my ($args) = @_;
568              
569 0 0         if (ref $args) {
570 0 0         if (ref $args eq 'HASH') {
    0          
571             return join ',',
572 0   0       map { ($_||'').'='.substr($args->{$_}||'', 0, 200) }
  0   0        
573             keys %$args;
574             } elsif (ref $args eq 'ARRAY') {
575             return join ',',
576 0   0       map { substr($_||'', 0, 200) }
  0            
577             @$args;
578             }
579             } else {
580 0           return $args;
581             }
582             }
583              
584             sub end_scoreboard {
585 0     0 0   my $client = shift;
586              
587             # Don't do anything if we're not configured to write to the scoreboard
588 0           my $scoreboard = $client->scoreboard;
589 0 0         return unless $scoreboard;
590              
591 0           my $job = $client->current_job;
592              
593 0 0         open(my $sb, '>>', $scoreboard)
594             or $client->debug("Could not append scoreboard '$scoreboard': $!");
595 0           print $sb "done=".time."\n";
596 0           close($sb);
597              
598 0           return;
599             }
600              
601             sub clean_scoreboard {
602 0     0 0   my $client = shift;
603              
604             # Don't do anything if we're not configured to write to the scoreboard
605 0           my $scoreboard = $client->scoreboard;
606 0 0         return unless $scoreboard;
607              
608 0           unlink($scoreboard);
609             }
610              
611             sub DEMOLISH {
612 0     0 0   foreach my $arg (@_) {
613             # Call 'clean_scoreboard' on TheSchwartz objects
614 0 0 0       if (ref($arg) and $arg->isa('TheSchwartz::Moosified')) {
615 0           $arg->clean_scoreboard;
616             }
617             }
618             }
619              
620 2     2   16 no Moose;
  2         3  
  2         15  
621 2     2   693 no Moose::Util::TypeConstraints;
  2         3  
  2         19  
622             1; # End of TheSchwartz::Moosified
623             __END__
624              
625             =head1 NAME
626              
627             TheSchwartz::Moosified - TheSchwartz based on Moose!
628              
629             =head1 SYNOPSIS
630              
631             use TheSchwartz::Moosified;
632              
633             my $client = TheSchwartz::Moosified->new();
634             $client->databases([$dbh]);
635            
636             # rest are the same as TheSchwartz
637            
638             # in some place we insert job into TheSchwartz::Moosified
639             # in another place we run this job
640            
641             # 1, insert job in cgi/Catalyst
642             use TheSchwartz::Moosified;
643             my $client = TheSchwartz::Moosified->new();
644             $client->databases([$dbh]);
645             $client->insert('My::Worker::A', { args1 => 1, args2 => 2 } );
646            
647             # 2, defined the heavy things in My::Worker::A
648             package My::Worker::A;
649             use base 'TheSchwartz::Moosified::Worker';
650             sub work {
651             my ($class, $job) = @_;
652            
653             # $job is an instance of TheSchwartz::Moosified::Job
654             my $args = $job->args;
655             # do heavy things like resize photos, add 1 to 2 etc.
656             $job->completed;
657             }
658            
659             # 3, run the worker in a non-stop script
660             use TheSchwartz::Moosified;
661             my $client = TheSchwartz::Moosified->new();
662             $client->databases([$dbh]);
663             $client->can_do('My::Worker::A');
664             $client->work();
665              
666             =head1 DESCRIPTION
667              
668             L<TheSchwartz> is a powerful job queue. This module is a Moose implemention.
669              
670             read more on L<TheSchwartz>
671              
672             =head1 SETTING
673              
674             =over 4
675              
676             =item * C<databases>
677              
678             Databases containing TheSchwartz jobs, shuffled before each use.
679              
680             my $dbh1 = DBI->conncet(@dbi_info);
681             my $dbh2 = $schema->storage->dbh;
682             my $client = TheSchwartz::Moosified->new( databases => [ $dbh1, $dbh2 ] );
683            
684             # or
685             my $client = TheSchwartz::Moosified->new();
686             $client->databases( [ $dbh1, $dbh2 ] );
687              
688             =item * C<verbose>
689              
690             controls debug logging.
691              
692             my $client = TheSchwartz::Moosified->new( verbose => 1 );
693            
694             # or
695             my $client = TheSchwartz::Moosified->new();
696             $client->verbose( 1 );
697             $client->verbose( sub {
698             my $msg = shift;
699             print STDERR "[INFO] $msg\n";
700             } );
701              
702             =item * C<prefix>
703              
704             optional prefix for tables. compatible with L<TheSchwartz::Simple>
705              
706             my $client = TheSchwartz::Moosified->new( prefix => 'theschwartz_' );
707              
708             =item * C<scoreboard>
709              
710             save job info to file. by default, the file will be saved at $tmpdir/theschwartz/scoreboard.$$
711              
712             my $client = TheSchwartz::Moosified->new( scoreboard => 1 );
713            
714             # or
715             my $client = TheSchwartz::Moosified->new();
716             # be sure the file is there
717             $client->scoreboard( "/home/fayland/theschwartz/scoreboard.log" );
718              
719             =item * C<error_length>
720              
721             optional, defaults to 255. Messages logged to the C<failure_log> (the
722             C<error> table) are truncated to this length. Setting this to zero means no
723             truncation (although the database you are using may truncate this for you).
724              
725             =back
726              
727             =head1 POSTING JOBS
728              
729             The methods of TheSchwartz clients used by applications posting jobs to the
730             queue are:
731              
732             =head2 C<$client-E<gt>insert( $job )>
733              
734             Adds the given C<TheSchwartz::Job> to one of the client's job databases.
735              
736             =head2 C<$client-E<gt>insert( $funcname, $arg )>
737              
738             Adds a new job with funcname C<$funcname> and arguments C<$arg> to the queue.
739              
740             =head1 WORKING
741              
742             The methods of TheSchwartz clients for use in worker processes are:
743              
744             =head2 C<$client-E<gt>can_do( $ability )>
745              
746             Adds C<$ability> to the list of abilities C<$client> is capable of performing.
747             Subsequent calls to that client's C<work> methods will find jobs requiring the
748             given ability.
749              
750             =head2 C<$client-E<gt>work_once()>
751              
752             Find and perform one job C<$client> can do.
753              
754             =head2 C<$client-E<gt>work_until_done()>
755              
756             Find and perform jobs C<$client> can do until no more such jobs are found in
757             any of the client's job databases.
758              
759             =head2 C<$client-E<gt>work( [$delay] )>
760              
761             Find and perform any jobs C<$client> can do, forever. When no job is available,
762             the working process will sleep for C<$delay> seconds (or 5, if not specified)
763             before looking again.
764              
765             =head2 C<$client-E<gt>find_job_for_workers( [$abilities] )>
766              
767             Returns a C<TheSchwartz::Job> for a random job that the client can do. If
768             specified, the job returned matches one of the abilities in the arrayref
769             C<$abilities>, rather than C<$client>'s abilities.
770              
771             =head2 C<$client-E<gt>find_job_with_coalescing_value( $ability, $coval )>
772              
773             Returns a C<TheSchwartz::Job> for a random job for a worker capable of
774             C<$ability> and with a coalescing value of C<$coval>.
775              
776             =head2 C<$client-E<gt>find_job_with_coalescing_prefix( $ability, $coval )>
777              
778             Returns a C<TheSchwartz::Job> for a random job for a worker capable of
779             C<$ability> and with a coalescing value beginning with C<$coval>.
780              
781             Note the C<TheSchwartz> implementation of this function uses a C<LIKE> query to
782             find matching jobs, with all the attendant performance implications for your
783             job databases.
784              
785             =head1 SEE ALSO
786              
787             L<TheSchwartz>, L<TheSchwartz::Simple>
788              
789             =head1 AUTHOR
790              
791             Fayland Lam, C<< <fayland at gmail.com> >>
792              
793             Jeremy Stashewsky, C<< <jstash+cpan at gmail.com> >>
794              
795             =head1 COPYRIGHT & LICENSE
796              
797             Copyright 2008 Fayland Lam, all rights reserved.
798              
799             This program is free software; you can redistribute it and/or modify it
800             under the same terms as Perl itself.
801              
802             =cut