File Coverage

lib/MediaCloud/JobManager/Broker/RabbitMQ.pm
Criterion Covered Total %
statement 42 306 13.7
branch 0 98 0.0
condition 0 28 0.0
subroutine 14 39 35.9
pod 0 11 0.0
total 56 482 11.6


line stmt bran cond sub pod time code
1             package MediaCloud::JobManager::Broker::RabbitMQ;
2              
3             #
4             # RabbitMQ job broker (using Celery protocol)
5             #
6             # Usage:
7             #
8             # MediaCloud::JobManager::Broker::RabbitMQ->new();
9             #
10              
11 1     1   4 use strict;
  1         1  
  1         29  
12 1     1   3 use warnings;
  1         1  
  1         32  
13 1     1   4 use Modern::Perl "2012";
  1         1  
  1         8  
14              
15 1     1   120 use Moose;
  1         1  
  1         7  
16             with 'MediaCloud::JobManager::Broker';
17              
18 1     1   6109 use Net::AMQP::RabbitMQ;
  1         5422  
  1         35  
19 1     1   8 use UUID::Tiny ':std';
  1         2  
  1         226  
20 1     1   543 use Tie::Cache;
  1         2027  
  1         33  
21 1     1   719 use JSON;
  1         7138  
  1         4  
22 1     1   133 use Data::Dumper;
  1         2  
  1         45  
23 1     1   4 use Readonly;
  1         1  
  1         35  
24              
25 1     1   4 use Log::Log4perl qw(:easy);
  1         2  
  1         11  
# Configure Log4perl's "easy" logger as soon as this module is loaded: DEBUG
# threshold, UTF-8 output, and a "<ISO 8601 timestamp> [<PID>]: <message>"
# line layout.
Log::Log4perl->easy_init(
    {
        layout => "%d{ISO8601} [%P]: %m%n",
        utf8   => 1,
        level  => $DEBUG,
    }
);

# Enable autoflush ($| applies to the currently selected output handle)
$| = 1;
36              
37 1     1   530 use MediaCloud::JobManager;
  1         3  
  1         14  
38 1     1   3 use MediaCloud::JobManager::Job;
  1         2  
  1         3150  
39              
# RabbitMQ default timeout (used when the constructor gets no "timeout" argument)
Readonly my $RABBITMQ_DEFAULT_TIMEOUT => 60;

# RabbitMQ delivery modes
Readonly my $RABBITMQ_DELIVERY_MODE_NONPERSISTENT => 1;
Readonly my $RABBITMQ_DELIVERY_MODE_PERSISTENT    => 2;

# RabbitMQ queue durability
Readonly my $RABBITMQ_QUEUE_TRANSIENT => 0;
Readonly my $RABBITMQ_QUEUE_DURABLE   => 1;

# Job manager priorities mapped to RabbitMQ message priorities
Readonly my %RABBITMQ_PRIORITIES => (
    $MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_LOW    => 0,
    $MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_NORMAL => 1,
    $MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_HIGH   => 2,
);

# JSON (de)serializer shared by all broker instances
my $json = JSON->new->utf8->canonical->allow_nonref;

# RabbitMQ connection credentials (private read-write attributes, declared in
# a fixed order)
for my $attribute_spec (
    [ '_hostname' => 'Str' ],
    [ '_port'     => 'Int' ],
    [ '_username' => 'Str' ],
    [ '_password' => 'Str' ],
    [ '_vhost'    => 'Str' ],
    [ '_timeout'  => 'Int' ],
  )
{
    my ( $attribute_name, $attribute_type ) = @{ $attribute_spec };
    has( $attribute_name => ( is => 'rw', isa => $attribute_type ) );
}

# RabbitMQ connection pool, keyed by connection ID (PID + credentials)
my %_rabbitmq_connection_for_connection_id;

# "reply_to" queue names, keyed by connection ID and then by function name
#
# We emulate Celery's "RPC via RabbitMQ" behavior, in which results are stuffed
# into per-client result queues and can be retrieved only by the same client
# that requested the job using run_remotely() or add_to_queue():
#
# http://docs.celeryproject.org/en/latest/userguide/tasks.html#rpc-result-backend-rabbitmq-qpid
my %_reply_to_queues_for_connection_id_function_name;

# Memory-limited results caches, keyed by connection ID and then by function
# name
#
# When fetching messages from the "reply_to" queue of a function,
# run_remotely() can't requeue messages that belong to some other job ID, so
# it has to put them somewhere. This hash of hashes serves as a backlog for
# job results nobody has asked for yet.
#
# It's not ideal that some job results might get invalidated, but Celery does
# that too (it purges results deemed too old).
my %_results_caches_for_connection_id_function_name;

# Limits of every results cache above
Readonly my $RABBITMQ_RESULTS_CACHE_MAXCOUNT => 1024 * 100;
Readonly my $RABBITMQ_RESULTS_CACHE_MAXBYTES => 1024 * 1024 * 10;
95              
# Constructor hook (Moose BUILD)
#
# Copies the connection settings from the constructor arguments into the
# private attributes, falling back to a default for every setting that is not
# defined, and then connects to RabbitMQ right away.
#
# Recognized arguments (all optional): hostname, port, username, password,
# vhost, timeout.
sub BUILD
{
    my ( $self, $args ) = @_;

    # Setting name => default value, applied in this order
    my @settings_with_defaults = (
        [ hostname => 'localhost' ],
        [ port     => 5672 ],
        [ username => 'guest' ],
        [ password => 'guest' ],
        [ vhost    => '/' ],
        [ timeout  => $RABBITMQ_DEFAULT_TIMEOUT ],
    );

    for my $setting ( @settings_with_defaults )
    {
        my ( $name, $default_value ) = @{ $setting };

        # Every setting is stored in the attribute named "_<setting>"
        my $accessor = '_' . $name;
        $self->$accessor( $args->{ $name } // $default_value );
    }

    # Connect to the current connection ID (PID + credentials)
    $self->_mq();
}
113              
# Builds the string that uniquely identifies a RabbitMQ connection: current
# PID plus all connection settings. A forked child has a different PID and
# therefore gets a different identifier, i.e. its own connection.
#
# Note that the identifier embeds the password in clear text.
sub _connection_identifier($)
{
    my ( $self ) = @_;

    my @identifier_parts = (
        $$,    # reconnect when running on a fork too
        $self->_hostname,
        $self->_port,
        $self->_username,
        $self->_password,
        $self->_vhost,
        $self->_timeout,
    );

    return sprintf( 'PID=%d; hostname=%s; port=%d; username: %s; password=%s; vhost=%s, timeout=%d',
        @identifier_parts );
}
126              
# Returns the RabbitMQ connection handler for the current connection ID,
# connecting first if there is no connection for that ID yet.
#
# On first use for a connection ID this:
#   * tries to connect once a second, up to $max_connect_attempts times,
#     because RabbitMQ might not be up yet;
#   * opens the (constant) channel and limits prefetch to one message;
#   * (re)initializes the per-connection "reply_to" queue names and results
#     caches.
#
# Dies (LOGDIE) if the connection can't be established or the channel can't be
# opened.
sub _mq($)
{
    my $self = shift;

    my $conn_id = $self->_connection_identifier();

    unless ( $_rabbitmq_connection_for_connection_id{ $conn_id } )
    {

        # Connect to RabbitMQ, open channel
        DEBUG( "Connecting to RabbitMQ (PID: $$, hostname: " .
              $self->_hostname . ", port: " . $self->_port . ", username: " . $self->_username . ")..." );

        # RabbitMQ might not yet be up at the time of connecting, so try for up to a minute
        my $max_connect_attempts = 60;

        my $mq;
        my $connected = 0;
        my $last_error_message;
        for ( my $retry = 0 ; $retry < $max_connect_attempts ; ++$retry )
        {
            eval {
                if ( $retry > 0 )
                {
                    DEBUG( "Retrying #$retry..." );
                }

                $mq = Net::AMQP::RabbitMQ->new();
                $mq->connect(
                    $self->_hostname,
                    {
                        user     => $self->_username,
                        password => $self->_password,
                        port     => $self->_port,
                        vhost    => $self->_vhost,
                        timeout  => $self->_timeout,
                    }
                );
            };
            if ( $@ )
            {
                $last_error_message = $@;
                WARN( "Unable to connect to RabbitMQ, will retry: $last_error_message" );
                sleep( 1 );
            }
            else
            {
                $connected = 1;
                last;
            }
        }
        unless ( $connected )
        {
            LOGDIE( "Unable to connect to RabbitMQ, giving up: $last_error_message" );
        }

        my $channel_number = _channel_number();
        unless ( $channel_number )
        {
            LOGDIE( "Channel number is unset." );
        }

        eval {
            $mq->channel_open( $channel_number );

            # Fetch one message at a time
            $mq->basic_qos( $channel_number, { prefetch_count => 1 } );
        };
        if ( $@ )
        {
            LOGDIE( "Unable to open channel $channel_number: $@" );
        }

        $_rabbitmq_connection_for_connection_id{ $conn_id } = $mq;

        # Start with empty per-connection maps. (Assigning an empty list "()"
        # to a hash element stores undef, not an empty hash.)
        $_reply_to_queues_for_connection_id_function_name{ $conn_id } = {};
        $_results_caches_for_connection_id_function_name{ $conn_id }  = {};
    }

    return $_rabbitmq_connection_for_connection_id{ $conn_id };
}
206              
# Returns the "reply_to" queue name for the current connection and the
# provided function name.
#
# The name is a random UUID that is generated on first request and then reused
# for every later request with the same connection ID + function name.
sub _reply_to_queue($$)
{
    my ( $self, $function_name ) = @_;

    my $conn_id = $self->_connection_identifier();

    # Make sure the per-connection map is a real (possibly empty) hashref;
    # assigning "()" here would only store undef
    my $reply_to_queues = ( $_reply_to_queues_for_connection_id_function_name{ $conn_id } //= {} );

    unless ( $reply_to_queues->{ $function_name } )
    {
        $reply_to_queues->{ $function_name } = _random_uuid();
    }

    return $reply_to_queues->{ $function_name };
}
227              
# Returns a reference to the results cache for the current connection and the
# provided function name.
#
# The cache is a hash tied to Tie::Cache (limited by
# $RABBITMQ_RESULTS_CACHE_MAXCOUNT and $RABBITMQ_RESULTS_CACHE_MAXBYTES); it is
# created on first request and reused afterwards.
sub _results_cache_hashref($$)
{
    my ( $self, $function_name ) = @_;

    my $conn_id = $self->_connection_identifier();

    # Make sure the per-connection map is a real (possibly empty) hashref;
    # assigning "()" here would only store undef
    my $results_caches = ( $_results_caches_for_connection_id_function_name{ $conn_id } //= {} );

    unless ( defined $results_caches->{ $function_name } )
    {
        $results_caches->{ $function_name } = {};

        tie %{ $results_caches->{ $function_name } }, 'Tie::Cache',
          {
            MaxCount => $RABBITMQ_RESULTS_CACHE_MAXCOUNT,
            MaxBytes => $RABBITMQ_RESULTS_CACHE_MAXBYTES
          };
    }

    return $results_caches->{ $function_name };
}
253              
# Returns the number of the channel we should be talking to.
#
# Every PID + credentials pair has its own connection, so one constant channel
# number is enough.
sub _channel_number()
{
    my $constant_channel_number = 1;

    return $constant_channel_number;
}
260              
# Declares a queue and, optionally, a same-named exchange bound to it.
#
# Parameters:
#   $queue_name                - name of the queue (must be defined)
#   $durable                   - "durable" flag for the queue (and exchange)
#   $declare_and_bind_exchange - if true, also declare an exchange named like
#                                the queue and bind the queue to it using the
#                                queue name as the routing key
#   $lazy_queue                - (optional) if true, the queue is declared with
#                                'x-queue-mode' => 'lazy' instead of 'default'
#
# The queue's 'x-max-priority' is set to the number of known job priorities.
#
# Dies (LOGCONFESS / LOGDIE) on undefined queue name or on broker errors.
sub _declare_queue($$$$;$)
{
    my ( $self, $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue ) = @_;

    LOGCONFESS( 'Queue name is undefined' ) unless defined $queue_name;

    my $mq      = $self->_mq();
    my $channel = _channel_number();

    my %queue_options = (
        durable     => $durable,
        auto_delete => 0,
    );
    my $queue_mode      = $lazy_queue ? 'lazy' : 'default';
    my %queue_arguments = (
        'x-max-priority' => _priority_count(),
        'x-queue-mode'   => $queue_mode,
    );

    eval { $mq->queue_declare( $channel, $queue_name, \%queue_options, \%queue_arguments ); };
    LOGDIE( "Unable to declare queue '$queue_name': $@" ) if $@;

    if ( $declare_and_bind_exchange )
    {
        # Exchange and routing key are both named after the queue
        my $exchange_name = $queue_name;
        my $routing_key   = $queue_name;

        my %exchange_options = (
            durable     => $durable,
            auto_delete => 0,
        );

        eval {
            $mq->exchange_declare( $channel, $exchange_name, \%exchange_options );
            $mq->queue_bind( $channel, $queue_name, $exchange_name, $routing_key );
        };
        if ( $@ )
        {
            LOGDIE( "Unable to bind queue '$queue_name' to exchange '$exchange_name': $@" );
        }
    }
}
310              
# Declares a task (job) queue: durable, with a same-named exchange declared
# and bound to it.
#
# Parameters: queue name (must be defined) and the optional "lazy queue" flag.
sub _declare_task_queue($$;$)
{
    my ( $self, $queue_name, $lazy_queue ) = @_;

    LOGCONFESS( 'Queue name is undefined' ) unless defined $queue_name;

    my $with_exchange = 1;

    return $self->_declare_queue( $queue_name, $RABBITMQ_QUEUE_DURABLE, $with_exchange, $lazy_queue );
}
325              
# Declares a results ("reply_to") queue: transient, without declaring or
# binding an exchange.
#
# Parameters: queue name (must be defined) and the optional "lazy queue" flag.
sub _declare_results_queue($$;$)
{
    my ( $self, $queue_name, $lazy_queue ) = @_;

    LOGCONFESS( 'Queue name is undefined' ) unless defined $queue_name;

    my $with_exchange = 0;

    return $self->_declare_queue( $queue_name, $RABBITMQ_QUEUE_TRANSIENT, $with_exchange, $lazy_queue );
}
340              
# Encodes a payload as JSON and publishes it to the given routing key.
#
# Parameters:
#   $routing_key   - routing key to publish to (must be true)
#   $payload       - data structure to encode (must be true)
#   $extra_options - (optional) hashref of publish options
#   $extra_props   - (optional) hashref of message properties; merged over the
#                    default "content_type" / "content_encoding" properties,
#                    so it may override them
#
# Dies (LOGCONFESS / LOGDIE) on missing arguments, on JSON encoding errors and
# on publishing errors.
sub _publish_json_message($$$;$$)
{
    my ( $self, $routing_key, $payload, $extra_options, $extra_props ) = @_;

    my $mq = $self->_mq();

    LOGCONFESS( 'Routing key is undefined.' ) unless $routing_key;
    LOGCONFESS( 'Payload is undefined.' )     unless $payload;

    my $encoded_payload = eval { $json->encode( $payload ) };
    LOGDIE( "Unable to encode JSON message: $@" ) if $@;

    # Caller-provided options are passed on as a copy
    my %publish_options = $extra_options ? %{ $extra_options } : ();

    # Later pairs win, so caller-provided properties override the defaults
    my %message_props = (
        content_type     => 'application/json',
        content_encoding => 'utf-8',
        ( $extra_props ? %{ $extra_props } : () ),
    );

    eval { $mq->publish( _channel_number(), $routing_key, $encoded_payload, \%publish_options, \%message_props ); };
    if ( $@ )
    {
        LOGDIE( "Unable to publish message to routing key '$routing_key': $@" );
    }
}
385              
# Returns a freshly generated random UUID as a string.
sub _random_uuid()
{
    # Celery uses v4 (random) UUIDs
    my $uuid_version = UUID_RANDOM;

    return create_uuid_as_string( $uuid_version );
}
391              
# Maps a job manager priority to the corresponding RabbitMQ message priority.
#
# Dies (LOGDIE) if the priority is not one of the keys of %RABBITMQ_PRIORITIES.
sub _priority_to_int($)
{
    my ( $priority ) = @_;

    if ( exists $RABBITMQ_PRIORITIES{ $priority } )
    {
        return $RABBITMQ_PRIORITIES{ $priority };
    }

    LOGDIE( "Unknown job priority: $priority" );
}
403              
# Returns the number of distinct job priorities known to this broker.
sub _priority_count()
{
    my @known_priorities = keys( %RABBITMQ_PRIORITIES );

    return scalar( @known_priorities );
}
408              
# Processes a single job message received by a worker.
#
# Steps:
#   1. validates the message ("correlation_id", "delivery_tag", non-empty JSON
#      body that decodes to an object whose "task" equals $function_name);
#   2. runs the job via $function_name->run_locally() with the payload's
#      "kwargs" and "id";
#   3. if the message has a "reply_to" property, publishes a SUCCESS / FAILURE
#      response to that queue;
#   4. ACKs the message, whether the job succeeded or failed.
#
# Parameters:
#   $function_name - job class name; also the expected task name
#   $message       - message hashref as returned by recv()
#
# Dies (LOGDIE) on invalid messages and on broker errors; a dying job does not
# make this method die.
sub _process_worker_message($$$)
{
    my ( $self, $function_name, $message ) = @_;

    my $mq = $self->_mq();

    my $correlation_id = $message->{ props }->{ correlation_id };
    unless ( $correlation_id )
    {
        LOGDIE( '"correlation_id" is empty.' );
    }

    # "reply_to" might be empty if sending back job results is disabled via
    # !publish_results()
    my $reply_to = $message->{ props }->{ reply_to };

    my $priority = $message->{ props }->{ priority } // 0;

    my $delivery_tag = $message->{ delivery_tag };
    unless ( $delivery_tag )
    {
        LOGDIE( "'delivery_tag' is empty." );
    }

    my $payload_json = $message->{ body };
    unless ( $payload_json )
    {
        LOGDIE( 'Message payload is empty.' );
    }

    my $payload;
    eval { $payload = $json->decode( $payload_json ); };
    if ( $@ )
    {
        LOGDIE( 'Unable to decode payload JSON: ' . $@ );
    }
    unless ( $payload and ref( $payload ) eq ref( {} ) )
    {
        # Decoding succeeded, so $@ is empty here -- say what is actually wrong
        LOGDIE( 'Unable to decode payload JSON: payload is not a JSON object' );
    }

    # Missing task name is treated like a mismatching one
    if ( ( $payload->{ task } // '' ) ne $function_name )
    {
        LOGDIE( "Task name is not '$function_name'; maybe you're using same queue for multiple types of jobs?" );
    }

    my $celery_job_id = $payload->{ id };
    my $args          = $payload->{ kwargs };

    # Do the job
    my $job_result;
    eval { $job_result = $function_name->run_locally( $args, $celery_job_id ); };

    # Capture the error right away; $@ is a global that any later eval clobbers
    my $error_message = $@;

    # If the job has failed, run_locally() has already printed the error
    # message multiple times at this point so we don't repeat ourselves

    if ( $reply_to )
    {    # undef if !publish_results()

        # Construct response based on whether the job succeeded or failed
        my $response;
        if ( $error_message )
        {
            ERROR( "Job '$celery_job_id' died: $error_message" );
            $response = {
                'status'    => 'FAILURE',
                'traceback' => "Job died: $error_message",
                'result'    => {
                    'exc_message' => 'Task has failed',
                    'exc_type'    => 'Exception',
                },
                'task_id'  => $celery_job_id,
                'children' => []
            };
        }
        else
        {
            $response = {
                'status'    => 'SUCCESS',
                'traceback' => undef,
                'result'    => $job_result,
                'task_id'   => $celery_job_id,
                'children'  => []
            };
        }

        # Send message back with the job result
        eval {
            $self->_declare_results_queue( $reply_to, $function_name->lazy_queue() );
            $self->_publish_json_message(
                $reply_to,
                $response,
                {
                    # Options
                },
                {
                    # Properties
                    delivery_mode  => $RABBITMQ_DELIVERY_MODE_NONPERSISTENT,
                    priority       => $priority,
                    correlation_id => $celery_job_id,
                }
            );
        };
        if ( $@ )
        {
            LOGDIE( "Unable to publish job $celery_job_id result: $@" );
        }
    }

    # ACK the message (mark the job as completed)
    eval { $mq->ack( _channel_number(), $delivery_tag ); };
    if ( $@ )
    {
        LOGDIE( "Unable to mark job $celery_job_id as completed: $@" );
    }
}
522              
# Starts a worker for the given function: declares the task queue named after
# the function, starts consuming from it and processes every received message
# with _process_worker_message().
#
# Keeps running for as long as recv() returns messages.
sub start_worker($$)
{
    my ( $self, $function_name ) = @_;

    my $mq = $self->_mq();

    $self->_declare_task_queue( $function_name, $function_name->lazy_queue() );

    my $consume_options = {

        # Don't assume that the job is finished when it reaches the worker
        no_ack => 0,
    };
    my $consumer_tag = $mq->consume( _channel_number(), $function_name, $consume_options );

    INFO( "Consumer tag: $consumer_tag" );
    INFO( "Worker is ready and accepting jobs" );

    my $recv_timeout = 0;    # block until message is received
    while ( my $message = $mq->recv( $recv_timeout ) )
    {
        $self->_process_worker_message( $function_name, $message );
    }
}
546              
# Runs a job on RabbitMQ and waits for its result.
#
# Posts the job (always asking for results to be published), then looks for
# the result message with a matching "correlation_id": first in the results
# cache, then by consuming the "reply_to" queue. Result messages that belong to
# other jobs are stored in the results cache for later calls.
#
# Parameters:
#   $function_name - job class name (also the task queue name)
#   $args          - hashref of job arguments (or undef)
#   $priority      - job priority (a key of %RABBITMQ_PRIORITIES)
#
# Returns the job's result. Dies (LOGDIE) if the job has failed, if the
# response is malformed, or on broker errors.
sub run_job_sync($$$$)
{
    my ( $self, $function_name, $args, $priority ) = @_;

    my $mq = $self->_mq();

    # Post the job
    my $publish_results = 1;    # always publish results when running synchronously
    my $celery_job_id = $self->_run_job_on_rabbitmq( $function_name, $args, $priority, $publish_results );

    # Declare result queue (ignore function's publish_results())
    my $reply_to_queue = $self->_reply_to_queue( $function_name );
    eval { $self->_declare_results_queue( $reply_to_queue, $function_name->lazy_queue() ); };
    if ( $@ )
    {
        LOGDIE( "Unable to declare results queue '$reply_to_queue': $@" );
    }

    my $results_cache = $self->_results_cache_hashref( $function_name );

    my $message;
    if ( exists $results_cache->{ $celery_job_id } )
    {
        # Result for this job ID was fetched previously -- return from cache
        DEBUG( "Results message for job ID '$celery_job_id' found in cache" );
        $message = $results_cache->{ $celery_job_id };
        delete $results_cache->{ $celery_job_id };

    }
    else
    {
        # Result not yet fetched -- process the result queue

        my $channel_number  = _channel_number();
        my $consume_options = {};

        # NOTE(review): the consumer started here is never cancelled, so every
        # call seems to add one more consumer on this queue -- confirm whether
        # that is intended
        $mq->consume( $channel_number, $reply_to_queue, $consume_options );

        my $recv_timeout = 0;    # block until message is received

        while ( my $queue_message = $mq->recv( $recv_timeout ) )
        {
            my $correlation_id = $queue_message->{ props }->{ correlation_id };
            unless ( $correlation_id )
            {
                LOGDIE( '"correlation_id" is empty.' );
            }

            if ( $correlation_id eq $celery_job_id )
            {
                DEBUG( "Found results message with job ID '$celery_job_id'." );
                $message = $queue_message;
                last;

            }
            else
            {
                # Message belongs to some other job -- add to cache and continue
                DEBUG( "Results message '$correlation_id' does not belong to job ID '$celery_job_id'." );
                $results_cache->{ $correlation_id } = $queue_message;
            }
        }
    }

    unless ( $message )
    {
        LOGDIE( "At this point, message should have been fetched either from broker or from cache" );
    }

    my $correlation_id = $message->{ props }->{ correlation_id };
    unless ( $correlation_id )
    {
        LOGDIE( '"correlation_id" is empty.' );
    }
    if ( $correlation_id ne $celery_job_id )
    {
        # We are not inside a loop here, so "next" is not an option: a message
        # chosen for this job ID but carrying some other ID is a fatal
        # inconsistency
        LOGDIE( "'correlation_id' ('$correlation_id') is not equal to job ID ('$celery_job_id')." );
    }

    my $payload_json = $message->{ body };
    unless ( $payload_json )
    {
        LOGDIE( 'Message payload is empty.' );
    }

    my $payload;
    eval { $payload = $json->decode( $payload_json ); };
    if ( $@ )
    {
        LOGDIE( 'Unable to decode payload JSON: ' . $@ );
    }
    unless ( $payload and ref( $payload ) eq ref( {} ) )
    {
        # Decoding succeeded, so $@ is empty here -- say what is actually wrong
        LOGDIE( 'Unable to decode payload JSON: payload is not a JSON object' );
    }

    if ( ( $payload->{ task_id } // '' ) ne $celery_job_id )
    {
        LOGDIE( "'task_id' ('" . ( $payload->{ task_id } // '' ) . "') is not equal to job ID ('$celery_job_id')." );
    }

    # Return job result
    my $status = $payload->{ status } // '';
    if ( $status eq 'SUCCESS' )
    {
        # Job completed successfully
        return $payload->{ result };

    }
    elsif ( $status eq 'FAILURE' )
    {
        # Job failed -- pass the failure to the caller
        LOGDIE( "Job '$celery_job_id' failed: " . ( $payload->{ traceback } // '' ) );

    }
    else
    {
        # Unknown value
        LOGDIE( "Unknown 'status' value: " . $status );
    }
}
664              
# Adds a job to the queue without waiting for it to finish.
#
# Whether the job's result gets published is decided by the function's own
# publish_results(). Returns the job ID returned by _run_job_on_rabbitmq().
sub run_job_async($$$$)
{
    my ( $self, $function_name, $args, $priority ) = @_;

    # List assignment keeps publish_results() in list context, exactly as if
    # it were called inside the argument list
    my ( $publish_results ) = $function_name->publish_results();

    return $self->_run_job_on_rabbitmq( $function_name, $args, $priority, $publish_results );
}
671              
# Posts a job to RabbitMQ as a Celery-style task message.
#
# Parameters:
#   $function_name   - job class name; used as task name, queue name, exchange
#                      name and routing key
#   $args            - hashref of job arguments (undef is treated as {})
#   $priority        - job priority (a key of %RABBITMQ_PRIORITIES)
#   $publish_results - if true, a "reply_to" results queue is declared and
#                      announced in the message; otherwise "reply_to" is empty
#
# Returns the generated job ID (random UUID). Dies (LOGDIE) if $args is not a
# hashref or if the job can't be published.
sub _run_job_on_rabbitmq($$$$$)
{
    my ( $self, $function_name, $args, $priority, $publish_results ) = @_;

    $args //= {};
    unless ( ref( $args ) eq ref( {} ) )
    {
        LOGDIE( "'args' is not a hashref." );
    }

    my $celery_job_id = _random_uuid();

    # Encode payload
    my $payload = {
        'expires'   => undef,
        'utc'       => JSON::true,
        'args'      => [],
        'chord'     => undef,
        'callbacks' => undef,
        'errbacks'  => undef,
        'taskset'   => undef,
        'id'        => $celery_job_id,

        # Force scalar context: an empty list returned here would otherwise
        # shift every following key / value pair of the hash
        'retries'   => scalar( $function_name->retries() ),
        'task'      => $function_name,
        'timelimit' => [ undef, undef, ],
        'eta'       => undef,
        'kwargs'    => $args,
    };

    # Declare task queue
    $self->_declare_task_queue( $function_name, $function_name->lazy_queue() );

    my $reply_to_queue;
    if ( $publish_results )
    {
        # Declare result queue before posting a job (just like Celery does)
        $reply_to_queue = $self->_reply_to_queue( $function_name );
        $self->_declare_results_queue( $reply_to_queue, $function_name->lazy_queue() );
    }
    else
    {
        $reply_to_queue = '';    # undef doesn't work with Net::AMQP::RabbitMQ
    }

    # Post a job
    eval {
        my $rabbitmq_priority = _priority_to_int( $priority );
        $self->_publish_json_message(
            $function_name,
            $payload,
            {
                # Options
                exchange => $function_name
            },
            {
                # Properties
                delivery_mode  => $RABBITMQ_DELIVERY_MODE_PERSISTENT,
                priority       => $rabbitmq_priority,
                correlation_id => $celery_job_id,
                reply_to       => $reply_to_queue,
            }
        );
    };
    if ( $@ )
    {
        LOGDIE( "Unable to add job '$celery_job_id' to queue: $@" );
    }

    return $celery_job_id;
}
745              
# Returns the job ID for a job handle. For this broker the handle already is
# the job ID, so it is returned unchanged.
sub job_id_from_handle($$)
{
    my $self = shift;
    my ( $job_handle ) = @_;

    return $job_handle;
}
752              
# Not implemented for this broker: always dies (LOGDIE).
sub set_job_progress($$$$)
{
    my $self = shift;
    my ( $job, $numerator, $denominator ) = @_;

    LOGDIE( "FIXME not implemented." );
}
759              
# Not implemented for this broker: always dies (LOGDIE).
sub job_status($$$)
{
    my $self = shift;
    my ( $function_name, $job_id ) = @_;

    LOGDIE( "FIXME not implemented." );
}
766              
# Not implemented for this broker: always dies (LOGDIE).
sub show_jobs($)
{
    my ( $self ) = @_;

    LOGDIE( "FIXME not implemented." );
}
773              
# Not implemented for this broker: always dies (LOGDIE).
sub cancel_job($)
{
    my $self = shift;
    my ( $job_id ) = @_;

    LOGDIE( "FIXME not implemented." );
}
780              
# Not implemented for this broker: always dies (LOGDIE).
sub server_status($$)
{
    my ( $self ) = @_;

    LOGDIE( "FIXME not implemented." );
}
787              
# Not implemented for this broker: always dies (LOGDIE).
sub workers($)
{
    my ( $self ) = @_;

    LOGDIE( "FIXME not implemented." );
}
794              
795 1     1   8 no Moose; # gets rid of scaffolding
  1         1  
  1         9  
796              
797             1;
798