| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package MediaCloud::JobManager::Broker::RabbitMQ; |
|
2
|
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
|
|
4
|
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
|
|
8
|
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
|
|
11
|
1
|
|
|
1
|
|
11
|
use strict; |
|
|
1
|
|
|
|
|
4
|
|
|
|
1
|
|
|
|
|
47
|
|
|
12
|
1
|
|
|
1
|
|
8
|
use warnings; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
60
|
|
|
13
|
1
|
|
|
1
|
|
8
|
use Modern::Perl "2012"; |
|
|
1
|
|
|
|
|
3
|
|
|
|
1
|
|
|
|
|
12
|
|
|
14
|
|
|
|
|
|
|
|
|
15
|
1
|
|
|
1
|
|
292
|
use Moose; |
|
|
1
|
|
|
|
|
3
|
|
|
|
1
|
|
|
|
|
11
|
|
|
16
|
|
|
|
|
|
|
with 'MediaCloud::JobManager::Broker'; |
|
17
|
|
|
|
|
|
|
|
|
18
|
1
|
|
|
1
|
|
9828
|
use Net::AMQP::RabbitMQ; |
|
|
1
|
|
|
|
|
4358
|
|
|
|
1
|
|
|
|
|
37
|
|
|
19
|
1
|
|
|
1
|
|
9
|
use UUID::Tiny ':std'; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
234
|
|
|
20
|
1
|
|
|
1
|
|
330
|
use Tie::Cache; |
|
|
1
|
|
|
|
|
2214
|
|
|
|
1
|
|
|
|
|
31
|
|
|
21
|
1
|
|
|
1
|
|
487
|
use JSON; |
|
|
1
|
|
|
|
|
6295
|
|
|
|
1
|
|
|
|
|
7
|
|
|
22
|
1
|
|
|
1
|
|
161
|
use Data::Dumper; |
|
|
1
|
|
|
|
|
3
|
|
|
|
1
|
|
|
|
|
61
|
|
|
23
|
1
|
|
|
1
|
|
6
|
use Readonly; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
41
|
|
|
24
|
|
|
|
|
|
|
|
|
25
|
1
|
|
|
1
|
|
6
|
use Log::Log4perl qw(:easy); |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
9
|
|
|
26
|
|
|
|
|
|
|
Log::Log4perl->easy_init( |
|
27
|
|
|
|
|
|
|
{ |
|
28
|
|
|
|
|
|
|
level => $DEBUG, |
|
29
|
|
|
|
|
|
|
utf8 => 1, |
|
30
|
|
|
|
|
|
|
layout => "%d{ISO8601} [%P]: %m%n" |
|
31
|
|
|
|
|
|
|
} |
|
32
|
|
|
|
|
|
|
); |
|
33
|
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
$| = 1; |
|
36
|
|
|
|
|
|
|
|
|
37
|
1
|
|
|
1
|
|
832
|
use MediaCloud::JobManager; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
18
|
|
|
38
|
1
|
|
|
1
|
|
5
|
use MediaCloud::JobManager::Job; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
3032
|
|
|
39
|
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
Readonly my $RABBITMQ_DEFAULT_TIMEOUT => 60; |
|
42
|
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
Readonly my $RABBITMQ_DEFAULT_RETRIES => 60; |
|
45
|
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
Readonly my $RABBITMQ_DELIVERY_MODE_NONPERSISTENT => 1; |
|
48
|
|
|
|
|
|
|
Readonly my $RABBITMQ_DELIVERY_MODE_PERSISTENT => 2; |
|
49
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
Readonly my $RABBITMQ_QUEUE_TRANSIENT => 0; |
|
52
|
|
|
|
|
|
|
Readonly my $RABBITMQ_QUEUE_DURABLE => 1; |
|
53
|
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
Readonly my %RABBITMQ_PRIORITIES => ( |
|
56
|
|
|
|
|
|
|
$MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_LOW => 0, |
|
57
|
|
|
|
|
|
|
$MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_NORMAL => 1, |
|
58
|
|
|
|
|
|
|
$MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_HIGH => 2, |
|
59
|
|
|
|
|
|
|
); |
|
60
|
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
my $json = JSON->new->allow_nonref->canonical->utf8; |
|
63
|
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
has '_hostname' => ( is => 'rw', isa => 'Str' ); |
|
66
|
|
|
|
|
|
|
has '_port' => ( is => 'rw', isa => 'Int' ); |
|
67
|
|
|
|
|
|
|
has '_username' => ( is => 'rw', isa => 'Str' ); |
|
68
|
|
|
|
|
|
|
has '_password' => ( is => 'rw', isa => 'Str' ); |
|
69
|
|
|
|
|
|
|
has '_vhost' => ( is => 'rw', isa => 'Str' ); |
|
70
|
|
|
|
|
|
|
has '_timeout' => ( is => 'rw', isa => 'Int' ); |
|
71
|
|
|
|
|
|
|
has '_retries' => ( is => 'rw', isa => 'Int' ); |
|
72
|
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
|
|
74
|
|
|
|
|
|
|
my %_rabbitmq_connection_for_connection_id; |
|
75
|
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
my %_reply_to_queues_for_connection_id_function_name; |
|
84
|
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
|
|
91
|
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
my %_results_caches_for_connection_id_function_name; |
|
95
|
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
|
|
97
|
|
|
|
|
|
|
Readonly my $RABBITMQ_RESULTS_CACHE_MAXCOUNT => 1024 * 100; |
|
98
|
|
|
|
|
|
|
Readonly my $RABBITMQ_RESULTS_CACHE_MAXBYTES => 1024 * 1024 * 10; |
|
99
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
sub BUILD |
|
102
|
|
|
|
|
|
|
{ |
|
103
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
|
104
|
0
|
|
|
|
|
|
my $args = shift; |
|
105
|
|
|
|
|
|
|
|
|
106
|
0
|
|
0
|
|
|
|
$self->_hostname( $args->{ hostname } // 'localhost' ); |
|
107
|
0
|
|
0
|
|
|
|
$self->_port( $args->{ port } // 5672 ); |
|
108
|
0
|
|
0
|
|
|
|
$self->_username( $args->{ username } // 'guest' ); |
|
109
|
0
|
|
0
|
|
|
|
$self->_password( $args->{ password } // 'guest' ); |
|
110
|
0
|
|
|
|
|
|
my $default_vhost = '/'; |
|
111
|
0
|
|
0
|
|
|
|
$self->_vhost( $args->{ vhost } // $default_vhost ); |
|
112
|
0
|
|
0
|
|
|
|
$self->_timeout( $args->{ timeout } // $RABBITMQ_DEFAULT_TIMEOUT ); |
|
113
|
0
|
|
0
|
|
|
|
$self->_retries( $args->{ retries } // $RABBITMQ_DEFAULT_RETRIES ); |
|
114
|
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
|
|
116
|
0
|
|
|
|
|
|
my $mq = $self->_mq(); |
|
117
|
|
|
|
|
|
|
} |
|
118
|
|
|
|
|
|
|
|
|
119
|
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
|
|
121
|
|
|
|
|
|
|
sub _connection_identifier($) |
|
122
|
|
|
|
|
|
|
{ |
|
123
|
0
|
|
|
0
|
|
|
my $self = shift; |
|
124
|
|
|
|
|
|
|
|
|
125
|
|
|
|
|
|
|
|
|
126
|
0
|
|
|
|
|
|
my $pid = $$; |
|
127
|
|
|
|
|
|
|
|
|
128
|
0
|
|
|
|
|
|
return sprintf( |
|
129
|
|
|
|
|
|
|
'PID=%d; hostname=%s; port=%d; username: %s; password=%s; vhost=%s, timeout=%d, retries=%d', |
|
130
|
|
|
|
|
|
|
$pid, $self->_hostname, $self->_port, $self->_username, |
|
131
|
|
|
|
|
|
|
$self->_password, $self->_vhost, $self->_timeout, $self->_retries |
|
132
|
|
|
|
|
|
|
); |
|
133
|
|
|
|
|
|
|
} |
|
134
|
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
sub _mq($) |
|
137
|
|
|
|
|
|
|
{ |
|
138
|
0
|
|
|
0
|
|
|
my $self = shift; |
|
139
|
|
|
|
|
|
|
|
|
140
|
0
|
|
|
|
|
|
my $conn_id = $self->_connection_identifier(); |
|
141
|
|
|
|
|
|
|
|
|
142
|
0
|
0
|
|
|
|
|
unless ( $_rabbitmq_connection_for_connection_id{ $conn_id } ) |
|
143
|
|
|
|
|
|
|
{ |
|
144
|
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
|
|
146
|
0
|
|
|
|
|
|
DEBUG( "Connecting to RabbitMQ (PID: $$, hostname: " . |
|
147
|
|
|
|
|
|
|
$self->_hostname . ", port: " . $self->_port . ", username: " . $self->_username . ")..." ); |
|
148
|
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
|
|
150
|
0
|
|
|
|
|
|
my $mq; |
|
151
|
0
|
|
|
|
|
|
my $connected = 0; |
|
152
|
0
|
|
|
|
|
|
my $last_error_message; |
|
153
|
0
|
|
|
|
|
|
for ( my $retry = 0 ; $retry < $self->_retries ; ++$retry ) |
|
154
|
|
|
|
|
|
|
{ |
|
155
|
0
|
|
|
|
|
|
eval { |
|
156
|
0
|
0
|
|
|
|
|
if ( $retry > 0 ) |
|
157
|
|
|
|
|
|
|
{ |
|
158
|
0
|
|
|
|
|
|
DEBUG( "Retrying #$retry..." ); |
|
159
|
|
|
|
|
|
|
} |
|
160
|
|
|
|
|
|
|
|
|
161
|
0
|
|
|
|
|
|
$mq = Net::AMQP::RabbitMQ->new(); |
|
162
|
0
|
|
|
|
|
|
$mq->connect( |
|
163
|
|
|
|
|
|
|
$self->_hostname, |
|
164
|
|
|
|
|
|
|
{ |
|
165
|
|
|
|
|
|
|
user => $self->_username, |
|
166
|
|
|
|
|
|
|
password => $self->_password, |
|
167
|
|
|
|
|
|
|
port => $self->_port, |
|
168
|
|
|
|
|
|
|
vhost => $self->_vhost, |
|
169
|
|
|
|
|
|
|
timeout => $self->_timeout, |
|
170
|
|
|
|
|
|
|
} |
|
171
|
|
|
|
|
|
|
); |
|
172
|
|
|
|
|
|
|
}; |
|
173
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
174
|
|
|
|
|
|
|
{ |
|
175
|
0
|
|
|
|
|
|
$last_error_message = $@; |
|
176
|
0
|
|
|
|
|
|
WARN( "Unable to connect to RabbitMQ, will retry: $last_error_message" ); |
|
177
|
0
|
|
|
|
|
|
sleep( 1 ); |
|
178
|
|
|
|
|
|
|
} |
|
179
|
|
|
|
|
|
|
else |
|
180
|
|
|
|
|
|
|
{ |
|
181
|
0
|
|
|
|
|
|
$connected = 1; |
|
182
|
0
|
|
|
|
|
|
last; |
|
183
|
|
|
|
|
|
|
} |
|
184
|
|
|
|
|
|
|
} |
|
185
|
0
|
0
|
|
|
|
|
unless ( $connected ) |
|
186
|
|
|
|
|
|
|
{ |
|
187
|
0
|
|
|
|
|
|
LOGDIE( "Unable to connect to RabbitMQ, giving up: $last_error_message" ); |
|
188
|
|
|
|
|
|
|
} |
|
189
|
|
|
|
|
|
|
|
|
190
|
0
|
|
|
|
|
|
my $channel_number = _channel_number(); |
|
191
|
0
|
0
|
|
|
|
|
unless ( $channel_number ) |
|
192
|
|
|
|
|
|
|
{ |
|
193
|
0
|
|
|
|
|
|
LOGDIE( "Channel number is unset." ); |
|
194
|
|
|
|
|
|
|
} |
|
195
|
|
|
|
|
|
|
|
|
196
|
0
|
|
|
|
|
|
eval { |
|
197
|
0
|
|
|
|
|
|
$mq->channel_open( $channel_number ); |
|
198
|
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
|
|
200
|
0
|
|
|
|
|
|
$mq->basic_qos( $channel_number, { prefetch_count => 1 } ); |
|
201
|
|
|
|
|
|
|
}; |
|
202
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
203
|
|
|
|
|
|
|
{ |
|
204
|
0
|
|
|
|
|
|
LOGDIE( "Unable to open channel $channel_number: $@" ); |
|
205
|
|
|
|
|
|
|
} |
|
206
|
|
|
|
|
|
|
|
|
207
|
0
|
|
|
|
|
|
$_rabbitmq_connection_for_connection_id{ $conn_id } = $mq; |
|
208
|
0
|
|
|
|
|
|
$_reply_to_queues_for_connection_id_function_name{ $conn_id } = (); |
|
209
|
0
|
|
|
|
|
|
$_results_caches_for_connection_id_function_name{ $conn_id } = (); |
|
210
|
|
|
|
|
|
|
} |
|
211
|
|
|
|
|
|
|
|
|
212
|
0
|
|
|
|
|
|
return $_rabbitmq_connection_for_connection_id{ $conn_id }; |
|
213
|
|
|
|
|
|
|
} |
|
214
|
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
|
|
216
|
|
|
|
|
|
|
sub _reply_to_queue($$) |
|
217
|
|
|
|
|
|
|
{ |
|
218
|
0
|
|
|
0
|
|
|
my ( $self, $function_name ) = @_; |
|
219
|
|
|
|
|
|
|
|
|
220
|
0
|
|
|
|
|
|
my $conn_id = $self->_connection_identifier(); |
|
221
|
|
|
|
|
|
|
|
|
222
|
0
|
0
|
|
|
|
|
unless ( defined $_reply_to_queues_for_connection_id_function_name{ $conn_id } ) |
|
223
|
|
|
|
|
|
|
{ |
|
224
|
0
|
|
|
|
|
|
$_reply_to_queues_for_connection_id_function_name{ $conn_id } = (); |
|
225
|
|
|
|
|
|
|
} |
|
226
|
|
|
|
|
|
|
|
|
227
|
0
|
0
|
|
|
|
|
unless ( $_reply_to_queues_for_connection_id_function_name{ $conn_id }{ $function_name } ) |
|
228
|
|
|
|
|
|
|
{ |
|
229
|
0
|
|
|
|
|
|
my $reply_to_queue = _random_uuid(); |
|
230
|
0
|
|
|
|
|
|
$_reply_to_queues_for_connection_id_function_name{ $conn_id }{ $function_name } = $reply_to_queue; |
|
231
|
|
|
|
|
|
|
} |
|
232
|
|
|
|
|
|
|
|
|
233
|
0
|
|
|
|
|
|
return $_reply_to_queues_for_connection_id_function_name{ $conn_id }{ $function_name }; |
|
234
|
|
|
|
|
|
|
} |
|
235
|
|
|
|
|
|
|
|
|
236
|
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
sub _results_cache_hashref($$) |
|
238
|
|
|
|
|
|
|
{ |
|
239
|
0
|
|
|
0
|
|
|
my ( $self, $function_name ) = @_; |
|
240
|
|
|
|
|
|
|
|
|
241
|
0
|
|
|
|
|
|
my $conn_id = $self->_connection_identifier(); |
|
242
|
|
|
|
|
|
|
|
|
243
|
0
|
0
|
|
|
|
|
unless ( defined $_results_caches_for_connection_id_function_name{ $conn_id } ) |
|
244
|
|
|
|
|
|
|
{ |
|
245
|
0
|
|
|
|
|
|
$_results_caches_for_connection_id_function_name{ $conn_id } = (); |
|
246
|
|
|
|
|
|
|
} |
|
247
|
|
|
|
|
|
|
|
|
248
|
0
|
0
|
|
|
|
|
unless ( defined $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name } ) |
|
249
|
|
|
|
|
|
|
{ |
|
250
|
0
|
|
|
|
|
|
$_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name } = {}; |
|
251
|
|
|
|
|
|
|
|
|
252
|
0
|
|
|
|
|
|
tie %{ $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name } }, 'Tie::Cache', |
|
|
0
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
{ |
|
254
|
|
|
|
|
|
|
MaxCount => $RABBITMQ_RESULTS_CACHE_MAXCOUNT, |
|
255
|
|
|
|
|
|
|
MaxBytes => $RABBITMQ_RESULTS_CACHE_MAXBYTES |
|
256
|
|
|
|
|
|
|
}; |
|
257
|
|
|
|
|
|
|
} |
|
258
|
|
|
|
|
|
|
|
|
259
|
0
|
|
|
|
|
|
return $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name }; |
|
260
|
|
|
|
|
|
|
} |
|
261
|
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
sub _channel_number() |
|
264
|
|
|
|
|
|
|
{ |
|
265
|
|
|
|
|
|
|
|
|
266
|
0
|
|
|
0
|
|
|
return 1; |
|
267
|
|
|
|
|
|
|
} |
|
268
|
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
sub _declare_queue($$$$;$) |
|
270
|
|
|
|
|
|
|
{ |
|
271
|
0
|
|
|
0
|
|
|
my ( $self, $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue ) = @_; |
|
272
|
|
|
|
|
|
|
|
|
273
|
0
|
0
|
|
|
|
|
unless ( defined $queue_name ) |
|
274
|
|
|
|
|
|
|
{ |
|
275
|
0
|
|
|
|
|
|
LOGCONFESS( 'Queue name is undefined' ); |
|
276
|
|
|
|
|
|
|
} |
|
277
|
|
|
|
|
|
|
|
|
278
|
0
|
|
|
|
|
|
my $mq = $self->_mq(); |
|
279
|
|
|
|
|
|
|
|
|
280
|
0
|
|
|
|
|
|
my $channel_number = _channel_number(); |
|
281
|
0
|
|
|
|
|
|
my $options = { |
|
282
|
|
|
|
|
|
|
durable => $durable, |
|
283
|
|
|
|
|
|
|
auto_delete => 0, |
|
284
|
|
|
|
|
|
|
}; |
|
285
|
0
|
0
|
|
|
|
|
my $arguments = { |
|
286
|
|
|
|
|
|
|
'x-max-priority' => _priority_count(), |
|
287
|
|
|
|
|
|
|
'x-queue-mode' => ( $lazy_queue ? 'lazy' : 'default' ), |
|
288
|
|
|
|
|
|
|
}; |
|
289
|
|
|
|
|
|
|
|
|
290
|
0
|
|
|
|
|
|
eval { $mq->queue_declare( $channel_number, $queue_name, $options, $arguments ); }; |
|
|
0
|
|
|
|
|
|
|
|
291
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
292
|
|
|
|
|
|
|
{ |
|
293
|
0
|
|
|
|
|
|
LOGDIE( "Unable to declare queue '$queue_name': $@" ); |
|
294
|
|
|
|
|
|
|
} |
|
295
|
|
|
|
|
|
|
|
|
296
|
0
|
0
|
|
|
|
|
if ( $declare_and_bind_exchange ) |
|
297
|
|
|
|
|
|
|
{ |
|
298
|
0
|
|
|
|
|
|
my $exchange_name = $queue_name; |
|
299
|
0
|
|
|
|
|
|
my $routing_key = $queue_name; |
|
300
|
|
|
|
|
|
|
|
|
301
|
0
|
|
|
|
|
|
eval { |
|
302
|
0
|
|
|
|
|
|
$mq->exchange_declare( |
|
303
|
|
|
|
|
|
|
$channel_number, |
|
304
|
|
|
|
|
|
|
$exchange_name, |
|
305
|
|
|
|
|
|
|
{ |
|
306
|
|
|
|
|
|
|
durable => $durable, |
|
307
|
|
|
|
|
|
|
auto_delete => 0, |
|
308
|
|
|
|
|
|
|
} |
|
309
|
|
|
|
|
|
|
); |
|
310
|
0
|
|
|
|
|
|
$mq->queue_bind( $channel_number, $queue_name, $exchange_name, $routing_key ); |
|
311
|
|
|
|
|
|
|
}; |
|
312
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
313
|
|
|
|
|
|
|
{ |
|
314
|
0
|
|
|
|
|
|
LOGDIE( "Unable to bind queue '$queue_name' to exchange '$exchange_name': $@" ); |
|
315
|
|
|
|
|
|
|
} |
|
316
|
|
|
|
|
|
|
} |
|
317
|
|
|
|
|
|
|
} |
|
318
|
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
sub _declare_task_queue($$;$) |
|
320
|
|
|
|
|
|
|
{ |
|
321
|
0
|
|
|
0
|
|
|
my ( $self, $queue_name, $lazy_queue ) = @_; |
|
322
|
|
|
|
|
|
|
|
|
323
|
0
|
0
|
|
|
|
|
unless ( defined $queue_name ) |
|
324
|
|
|
|
|
|
|
{ |
|
325
|
0
|
|
|
|
|
|
LOGCONFESS( 'Queue name is undefined' ); |
|
326
|
|
|
|
|
|
|
} |
|
327
|
|
|
|
|
|
|
|
|
328
|
0
|
|
|
|
|
|
my $durable = $RABBITMQ_QUEUE_DURABLE; |
|
329
|
0
|
|
|
|
|
|
my $declare_and_bind_exchange = 1; |
|
330
|
|
|
|
|
|
|
|
|
331
|
0
|
|
|
|
|
|
return $self->_declare_queue( $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue ); |
|
332
|
|
|
|
|
|
|
} |
|
333
|
|
|
|
|
|
|
|
|
334
|
|
|
|
|
|
|
sub _declare_results_queue($$;$) |
|
335
|
|
|
|
|
|
|
{ |
|
336
|
0
|
|
|
0
|
|
|
my ( $self, $queue_name, $lazy_queue ) = @_; |
|
337
|
|
|
|
|
|
|
|
|
338
|
0
|
0
|
|
|
|
|
unless ( defined $queue_name ) |
|
339
|
|
|
|
|
|
|
{ |
|
340
|
0
|
|
|
|
|
|
LOGCONFESS( 'Queue name is undefined' ); |
|
341
|
|
|
|
|
|
|
} |
|
342
|
|
|
|
|
|
|
|
|
343
|
0
|
|
|
|
|
|
my $durable = $RABBITMQ_QUEUE_TRANSIENT; |
|
344
|
0
|
|
|
|
|
|
my $declare_and_bind_exchange = 0; |
|
345
|
|
|
|
|
|
|
|
|
346
|
0
|
|
|
|
|
|
return $self->_declare_queue( $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue ); |
|
347
|
|
|
|
|
|
|
} |
|
348
|
|
|
|
|
|
|
|
|
349
|
|
|
|
|
|
|
sub _publish_json_message($$$;$$) |
|
350
|
|
|
|
|
|
|
{ |
|
351
|
0
|
|
|
0
|
|
|
my ( $self, $routing_key, $payload, $extra_options, $extra_props ) = @_; |
|
352
|
|
|
|
|
|
|
|
|
353
|
0
|
|
|
|
|
|
my $mq = $self->_mq(); |
|
354
|
|
|
|
|
|
|
|
|
355
|
0
|
0
|
|
|
|
|
unless ( $routing_key ) |
|
356
|
|
|
|
|
|
|
{ |
|
357
|
0
|
|
|
|
|
|
LOGCONFESS( 'Routing key is undefined.' ); |
|
358
|
|
|
|
|
|
|
} |
|
359
|
0
|
0
|
|
|
|
|
unless ( $payload ) |
|
360
|
|
|
|
|
|
|
{ |
|
361
|
0
|
|
|
|
|
|
LOGCONFESS( 'Payload is undefined.' ); |
|
362
|
|
|
|
|
|
|
} |
|
363
|
|
|
|
|
|
|
|
|
364
|
0
|
|
|
|
|
|
my $payload_json; |
|
365
|
0
|
|
|
|
|
|
eval { $payload_json = $json->encode( $payload ); }; |
|
|
0
|
|
|
|
|
|
|
|
366
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
367
|
|
|
|
|
|
|
{ |
|
368
|
0
|
|
|
|
|
|
LOGDIE( "Unable to encode JSON message: $@" ); |
|
369
|
|
|
|
|
|
|
} |
|
370
|
|
|
|
|
|
|
|
|
371
|
0
|
|
|
|
|
|
my $channel_number = _channel_number(); |
|
372
|
|
|
|
|
|
|
|
|
373
|
0
|
|
|
|
|
|
my $options = {}; |
|
374
|
0
|
0
|
|
|
|
|
if ( $extra_options ) |
|
375
|
|
|
|
|
|
|
{ |
|
376
|
0
|
|
|
|
|
|
$options = { %{ $options }, %{ $extra_options } }; |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
377
|
|
|
|
|
|
|
} |
|
378
|
0
|
|
|
|
|
|
my $props = { |
|
379
|
|
|
|
|
|
|
content_type => 'application/json', |
|
380
|
|
|
|
|
|
|
content_encoding => 'utf-8', |
|
381
|
|
|
|
|
|
|
}; |
|
382
|
0
|
0
|
|
|
|
|
if ( $extra_props ) |
|
383
|
|
|
|
|
|
|
{ |
|
384
|
0
|
|
|
|
|
|
$props = { %{ $props }, %{ $extra_props } }; |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
} |
|
386
|
|
|
|
|
|
|
|
|
387
|
0
|
|
|
|
|
|
eval { $mq->publish( $channel_number, $routing_key, $payload_json, $options, $props ); }; |
|
|
0
|
|
|
|
|
|
|
|
388
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
389
|
|
|
|
|
|
|
{ |
|
390
|
0
|
|
|
|
|
|
LOGDIE( "Unable to publish message to routing key '$routing_key': $@" ); |
|
391
|
|
|
|
|
|
|
} |
|
392
|
|
|
|
|
|
|
} |
|
393
|
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
sub _random_uuid() |
|
395
|
|
|
|
|
|
|
{ |
|
396
|
|
|
|
|
|
|
|
|
397
|
0
|
|
|
0
|
|
|
return create_uuid_as_string( UUID_RANDOM ); |
|
398
|
|
|
|
|
|
|
} |
|
399
|
|
|
|
|
|
|
|
|
400
|
|
|
|
|
|
|
sub _priority_to_int($) |
|
401
|
|
|
|
|
|
|
{ |
|
402
|
0
|
|
|
0
|
|
|
my $priority = shift; |
|
403
|
|
|
|
|
|
|
|
|
404
|
0
|
0
|
|
|
|
|
unless ( exists $RABBITMQ_PRIORITIES{ $priority } ) |
|
405
|
|
|
|
|
|
|
{ |
|
406
|
0
|
|
|
|
|
|
LOGDIE( "Unknown job priority: $priority" ); |
|
407
|
|
|
|
|
|
|
} |
|
408
|
|
|
|
|
|
|
|
|
409
|
0
|
|
|
|
|
|
return $RABBITMQ_PRIORITIES{ $priority }; |
|
410
|
|
|
|
|
|
|
} |
|
411
|
|
|
|
|
|
|
|
|
412
|
|
|
|
|
|
|
sub _priority_count() |
|
413
|
|
|
|
|
|
|
{ |
|
414
|
0
|
|
|
0
|
|
|
return scalar( keys( %RABBITMQ_PRIORITIES ) ); |
|
415
|
|
|
|
|
|
|
} |
|
416
|
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
sub _process_worker_message($$$) |
|
418
|
|
|
|
|
|
|
{ |
|
419
|
0
|
|
|
0
|
|
|
my ( $self, $function_name, $message ) = @_; |
|
420
|
|
|
|
|
|
|
|
|
421
|
0
|
|
|
|
|
|
my $mq = $self->_mq(); |
|
422
|
|
|
|
|
|
|
|
|
423
|
0
|
|
|
|
|
|
my $correlation_id = $message->{ props }->{ correlation_id }; |
|
424
|
0
|
0
|
|
|
|
|
unless ( $correlation_id ) |
|
425
|
|
|
|
|
|
|
{ |
|
426
|
0
|
|
|
|
|
|
LOGDIE( '"correlation_id" is empty.' ); |
|
427
|
|
|
|
|
|
|
} |
|
428
|
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
|
|
431
|
0
|
|
|
|
|
|
my $reply_to = $message->{ props }->{ reply_to }; |
|
432
|
|
|
|
|
|
|
|
|
433
|
0
|
|
0
|
|
|
|
my $priority = $message->{ props }->{ priority } // 0; |
|
434
|
|
|
|
|
|
|
|
|
435
|
0
|
|
|
|
|
|
my $delivery_tag = $message->{ delivery_tag }; |
|
436
|
0
|
0
|
|
|
|
|
unless ( $delivery_tag ) |
|
437
|
|
|
|
|
|
|
{ |
|
438
|
0
|
|
|
|
|
|
LOGDIE( "'delivery_tag' is empty." ); |
|
439
|
|
|
|
|
|
|
} |
|
440
|
|
|
|
|
|
|
|
|
441
|
0
|
|
|
|
|
|
my $payload_json = $message->{ body }; |
|
442
|
0
|
0
|
|
|
|
|
unless ( $payload_json ) |
|
443
|
|
|
|
|
|
|
{ |
|
444
|
0
|
|
|
|
|
|
LOGDIE( 'Message payload is empty.' ); |
|
445
|
|
|
|
|
|
|
} |
|
446
|
|
|
|
|
|
|
|
|
447
|
0
|
|
|
|
|
|
my $payload; |
|
448
|
0
|
|
|
|
|
|
eval { $payload = $json->decode( $payload_json ); }; |
|
|
0
|
|
|
|
|
|
|
|
449
|
0
|
0
|
0
|
|
|
|
if ( $@ or ( !$payload ) or ( ref( $payload ) ne ref( {} ) ) ) |
|
|
|
|
0
|
|
|
|
|
|
450
|
|
|
|
|
|
|
{ |
|
451
|
0
|
|
|
|
|
|
LOGDIE( 'Unable to decode payload JSON: ' . $@ ); |
|
452
|
|
|
|
|
|
|
} |
|
453
|
|
|
|
|
|
|
|
|
454
|
0
|
0
|
|
|
|
|
if ( $payload->{ task } ne $function_name ) |
|
455
|
|
|
|
|
|
|
{ |
|
456
|
0
|
|
|
|
|
|
LOGDIE( "Task name is not '$function_name'; maybe you're using same queue for multiple types of jobs?" ); |
|
457
|
|
|
|
|
|
|
} |
|
458
|
|
|
|
|
|
|
|
|
459
|
0
|
|
|
|
|
|
my $celery_job_id = $payload->{ id }; |
|
460
|
0
|
|
|
|
|
|
my $args = $payload->{ kwargs }; |
|
461
|
|
|
|
|
|
|
|
|
462
|
|
|
|
|
|
|
|
|
463
|
0
|
|
|
|
|
|
my $job_result; |
|
464
|
0
|
|
|
|
|
|
eval { $job_result = $function_name->run_locally( $args, $celery_job_id ); }; |
|
|
0
|
|
|
|
|
|
|
|
465
|
0
|
|
|
|
|
|
my $error_message = $@; |
|
466
|
|
|
|
|
|
|
|
|
467
|
|
|
|
|
|
|
|
|
468
|
|
|
|
|
|
|
|
|
469
|
|
|
|
|
|
|
|
|
470
|
0
|
0
|
|
|
|
|
if ( $reply_to ) |
|
471
|
|
|
|
|
|
|
{ |
|
472
|
|
|
|
|
|
|
|
|
473
|
|
|
|
|
|
|
|
|
474
|
0
|
|
|
|
|
|
my $response; |
|
475
|
0
|
0
|
|
|
|
|
if ( $error_message ) |
|
476
|
|
|
|
|
|
|
{ |
|
477
|
0
|
|
|
|
|
|
ERROR( "Job '$celery_job_id' died: $@" ); |
|
478
|
0
|
|
|
|
|
|
$response = { |
|
479
|
|
|
|
|
|
|
'status' => 'FAILURE', |
|
480
|
|
|
|
|
|
|
'traceback' => "Job died: $error_message", |
|
481
|
|
|
|
|
|
|
'result' => { |
|
482
|
|
|
|
|
|
|
'exc_message' => 'Task has failed', |
|
483
|
|
|
|
|
|
|
'exc_type' => 'Exception', |
|
484
|
|
|
|
|
|
|
}, |
|
485
|
|
|
|
|
|
|
'task_id' => $celery_job_id, |
|
486
|
|
|
|
|
|
|
'children' => [] |
|
487
|
|
|
|
|
|
|
}; |
|
488
|
|
|
|
|
|
|
} |
|
489
|
|
|
|
|
|
|
else |
|
490
|
|
|
|
|
|
|
{ |
|
491
|
0
|
|
|
|
|
|
$response = { |
|
492
|
|
|
|
|
|
|
'status' => 'SUCCESS', |
|
493
|
|
|
|
|
|
|
'traceback' => undef, |
|
494
|
|
|
|
|
|
|
'result' => $job_result, |
|
495
|
|
|
|
|
|
|
'task_id' => $celery_job_id, |
|
496
|
|
|
|
|
|
|
'children' => [] |
|
497
|
|
|
|
|
|
|
}; |
|
498
|
|
|
|
|
|
|
} |
|
499
|
|
|
|
|
|
|
|
|
500
|
|
|
|
|
|
|
|
|
501
|
0
|
|
|
|
|
|
eval { |
|
502
|
0
|
|
|
|
|
|
$self->_declare_results_queue( $reply_to, $function_name->lazy_queue() ); |
|
503
|
0
|
|
|
|
|
|
$self->_publish_json_message( |
|
504
|
|
|
|
|
|
|
$reply_to, |
|
505
|
|
|
|
|
|
|
$response, |
|
506
|
|
|
|
|
|
|
{ |
|
507
|
|
|
|
|
|
|
|
|
508
|
|
|
|
|
|
|
}, |
|
509
|
|
|
|
|
|
|
{ |
|
510
|
|
|
|
|
|
|
|
|
511
|
|
|
|
|
|
|
delivery_mode => $RABBITMQ_DELIVERY_MODE_NONPERSISTENT, |
|
512
|
|
|
|
|
|
|
priority => $priority, |
|
513
|
|
|
|
|
|
|
correlation_id => $celery_job_id, |
|
514
|
|
|
|
|
|
|
} |
|
515
|
|
|
|
|
|
|
); |
|
516
|
|
|
|
|
|
|
}; |
|
517
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
518
|
|
|
|
|
|
|
{ |
|
519
|
0
|
|
|
|
|
|
LOGDIE( "Unable to publish job $celery_job_id result: $@" ); |
|
520
|
|
|
|
|
|
|
} |
|
521
|
|
|
|
|
|
|
} |
|
522
|
|
|
|
|
|
|
|
|
523
|
|
|
|
|
|
|
|
|
524
|
0
|
|
|
|
|
|
eval { $mq->ack( _channel_number(), $delivery_tag ); }; |
|
|
0
|
|
|
|
|
|
|
|
525
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
526
|
|
|
|
|
|
|
{ |
|
527
|
0
|
|
|
|
|
|
LOGDIE( "Unable to mark job $celery_job_id as completed: $@" ); |
|
528
|
|
|
|
|
|
|
} |
|
529
|
|
|
|
|
|
|
} |
|
530
|
|
|
|
|
|
|
|
|
531
|
|
|
|
|
|
|
sub start_worker($$) |
|
532
|
|
|
|
|
|
|
{ |
|
533
|
0
|
|
|
0
|
0
|
|
my ( $self, $function_name ) = @_; |
|
534
|
|
|
|
|
|
|
|
|
535
|
0
|
|
|
|
|
|
my $mq = $self->_mq(); |
|
536
|
|
|
|
|
|
|
|
|
537
|
0
|
|
|
|
|
|
$self->_declare_task_queue( $function_name, $function_name->lazy_queue() ); |
|
538
|
|
|
|
|
|
|
|
|
539
|
0
|
|
|
|
|
|
my $consume_options = { |
|
540
|
|
|
|
|
|
|
|
|
541
|
|
|
|
|
|
|
|
|
542
|
|
|
|
|
|
|
no_ack => 0, |
|
543
|
|
|
|
|
|
|
}; |
|
544
|
0
|
|
|
|
|
|
my $consumer_tag = $mq->consume( _channel_number(), $function_name, $consume_options ); |
|
545
|
|
|
|
|
|
|
|
|
546
|
0
|
|
|
|
|
|
INFO( "Consumer tag: $consumer_tag" ); |
|
547
|
0
|
|
|
|
|
|
INFO( "Worker is ready and accepting jobs" ); |
|
548
|
0
|
|
|
|
|
|
my $recv_timeout = 0; |
|
549
|
0
|
|
|
|
|
|
while ( my $message = $mq->recv( 0 ) ) |
|
550
|
|
|
|
|
|
|
{ |
|
551
|
0
|
|
|
|
|
|
$self->_process_worker_message( $function_name, $message ); |
|
552
|
|
|
|
|
|
|
} |
|
553
|
|
|
|
|
|
|
} |
|
554
|
|
|
|
|
|
|
|
|
555
|
|
|
|
|
|
|
sub run_job_sync($$$$) |
|
556
|
|
|
|
|
|
|
{ |
|
557
|
0
|
|
|
0
|
0
|
|
my ( $self, $function_name, $args, $priority ) = @_; |
|
558
|
|
|
|
|
|
|
|
|
559
|
0
|
|
|
|
|
|
my $mq = $self->_mq(); |
|
560
|
|
|
|
|
|
|
|
|
561
|
|
|
|
|
|
|
|
|
562
|
0
|
|
|
|
|
|
my $publish_results = 1; |
|
563
|
0
|
|
|
|
|
|
my $celery_job_id = $self->_run_job_on_rabbitmq( $function_name, $args, $priority, $publish_results ); |
|
564
|
|
|
|
|
|
|
|
|
565
|
|
|
|
|
|
|
|
|
566
|
0
|
|
|
|
|
|
my $reply_to_queue = $self->_reply_to_queue( $function_name ); |
|
567
|
0
|
|
|
|
|
|
eval { $self->_declare_results_queue( $reply_to_queue, $function_name->lazy_queue() ); }; |
|
|
0
|
|
|
|
|
|
|
|
568
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
569
|
|
|
|
|
|
|
{ |
|
570
|
0
|
|
|
|
|
|
LOGDIE( "Unable to declare results queue '$reply_to_queue': $@" ); |
|
571
|
|
|
|
|
|
|
} |
|
572
|
|
|
|
|
|
|
|
|
573
|
0
|
|
|
|
|
|
my $results_cache = $self->_results_cache_hashref( $function_name ); |
|
574
|
|
|
|
|
|
|
|
|
575
|
0
|
|
|
|
|
|
my $message; |
|
576
|
0
|
0
|
|
|
|
|
if ( exists $results_cache->{ $celery_job_id } ) |
|
577
|
|
|
|
|
|
|
{ |
|
578
|
|
|
|
|
|
|
|
|
579
|
0
|
|
|
|
|
|
DEBUG( "Results message for job ID '$celery_job_id' found in cache" ); |
|
580
|
0
|
|
|
|
|
|
$message = $results_cache->{ $celery_job_id }; |
|
581
|
0
|
|
|
|
|
|
delete $results_cache->{ $celery_job_id }; |
|
582
|
|
|
|
|
|
|
|
|
583
|
|
|
|
|
|
|
} |
|
584
|
|
|
|
|
|
|
else |
|
585
|
|
|
|
|
|
|
{ |
|
586
|
|
|
|
|
|
|
|
|
587
|
|
|
|
|
|
|
|
|
588
|
0
|
|
|
|
|
|
my $channel_number = _channel_number(); |
|
589
|
0
|
|
|
|
|
|
my $consume_options = {}; |
|
590
|
0
|
|
|
|
|
|
my $consumer_tag = $mq->consume( $channel_number, $reply_to_queue, $consume_options ); |
|
591
|
|
|
|
|
|
|
|
|
592
|
0
|
|
|
|
|
|
my $recv_timeout = 0; |
|
593
|
|
|
|
|
|
|
|
|
594
|
0
|
|
|
|
|
|
while ( my $queue_message = $mq->recv( 0 ) ) |
|
595
|
|
|
|
|
|
|
{ |
|
596
|
0
|
|
|
|
|
|
my $correlation_id = $queue_message->{ props }->{ correlation_id }; |
|
597
|
0
|
0
|
|
|
|
|
unless ( $correlation_id ) |
|
598
|
|
|
|
|
|
|
{ |
|
599
|
0
|
|
|
|
|
|
LOGDIE( '"correlation_id" is empty.' ); |
|
600
|
|
|
|
|
|
|
} |
|
601
|
|
|
|
|
|
|
|
|
602
|
0
|
0
|
|
|
|
|
if ( $correlation_id eq $celery_job_id ) |
|
603
|
|
|
|
|
|
|
{ |
|
604
|
0
|
|
|
|
|
|
DEBUG( "Found results message with job ID '$celery_job_id'." ); |
|
605
|
0
|
|
|
|
|
|
$message = $queue_message; |
|
606
|
0
|
|
|
|
|
|
last; |
|
607
|
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
} |
|
609
|
|
|
|
|
|
|
else |
|
610
|
|
|
|
|
|
|
{ |
|
611
|
|
|
|
|
|
|
|
|
612
|
0
|
|
|
|
|
|
DEBUG( "Results message '$correlation_id' does not belong to job ID '$celery_job_id'." ); |
|
613
|
0
|
|
|
|
|
|
$results_cache->{ $correlation_id } = $queue_message; |
|
614
|
|
|
|
|
|
|
} |
|
615
|
|
|
|
|
|
|
} |
|
616
|
|
|
|
|
|
|
} |
|
617
|
|
|
|
|
|
|
|
|
618
|
0
|
0
|
|
|
|
|
unless ( $message ) |
|
619
|
|
|
|
|
|
|
{ |
|
620
|
0
|
|
|
|
|
|
LOGDIE( "At this point, message should have been fetched either from broker or from cache" ); |
|
621
|
|
|
|
|
|
|
} |
|
622
|
|
|
|
|
|
|
|
|
623
|
0
|
|
|
|
|
|
my $correlation_id = $message->{ props }->{ correlation_id }; |
|
624
|
0
|
0
|
|
|
|
|
unless ( $correlation_id ) |
|
625
|
|
|
|
|
|
|
{ |
|
626
|
0
|
|
|
|
|
|
LOGDIE( '"correlation_id" is empty.' ); |
|
627
|
|
|
|
|
|
|
} |
|
628
|
0
|
0
|
|
|
|
|
if ( $correlation_id ne $celery_job_id ) |
|
629
|
|
|
|
|
|
|
{ |
|
630
|
|
|
|
|
|
|
|
|
631
|
0
|
|
|
|
|
|
DEBUG( "'correlation_id' ('$correlation_id') is not equal to job ID ('$celery_job_id')." ); |
|
632
|
0
|
|
|
|
|
|
next; |
|
633
|
|
|
|
|
|
|
} |
|
634
|
|
|
|
|
|
|
|
|
635
|
0
|
|
|
|
|
|
my $payload_json = $message->{ body }; |
|
636
|
0
|
0
|
|
|
|
|
unless ( $payload_json ) |
|
637
|
|
|
|
|
|
|
{ |
|
638
|
0
|
|
|
|
|
|
LOGDIE( 'Message payload is empty.' ); |
|
639
|
|
|
|
|
|
|
} |
|
640
|
|
|
|
|
|
|
|
|
641
|
0
|
|
|
|
|
|
my $payload; |
|
642
|
0
|
|
|
|
|
|
eval { $payload = $json->decode( $payload_json ); }; |
|
|
0
|
|
|
|
|
|
|
|
643
|
0
|
0
|
0
|
|
|
|
if ( $@ or ( !$payload ) or ( ref( $payload ) ne ref( {} ) ) ) |
|
|
|
|
0
|
|
|
|
|
|
644
|
|
|
|
|
|
|
{ |
|
645
|
0
|
|
|
|
|
|
LOGDIE( 'Unable to decode payload JSON: ' . $@ ); |
|
646
|
|
|
|
|
|
|
} |
|
647
|
|
|
|
|
|
|
|
|
648
|
0
|
0
|
|
|
|
|
if ( $payload->{ task_id } ne $celery_job_id ) |
|
649
|
|
|
|
|
|
|
{ |
|
650
|
0
|
|
|
|
|
|
LOGDIE( "'task_id' ('$payload->{ task_id }') is not equal to job ID ('$celery_job_id')." ); |
|
651
|
|
|
|
|
|
|
} |
|
652
|
|
|
|
|
|
|
|
|
653
|
|
|
|
|
|
|
|
|
654
|
0
|
0
|
|
|
|
|
if ( $payload->{ status } eq 'SUCCESS' ) |
|
|
|
0
|
|
|
|
|
|
|
655
|
|
|
|
|
|
|
{ |
|
656
|
|
|
|
|
|
|
|
|
657
|
0
|
|
|
|
|
|
return $payload->{ result }; |
|
658
|
|
|
|
|
|
|
|
|
659
|
|
|
|
|
|
|
} |
|
660
|
|
|
|
|
|
|
elsif ( $payload->{ status } eq 'FAILURE' ) |
|
661
|
|
|
|
|
|
|
{ |
|
662
|
|
|
|
|
|
|
|
|
663
|
0
|
|
|
|
|
|
LOGDIE( "Job '$celery_job_id' failed: " . $payload->{ traceback } ); |
|
664
|
|
|
|
|
|
|
|
|
665
|
|
|
|
|
|
|
} |
|
666
|
|
|
|
|
|
|
else |
|
667
|
|
|
|
|
|
|
{ |
|
668
|
|
|
|
|
|
|
|
|
669
|
0
|
|
|
|
|
|
LOGDIE( "Unknown 'status' value: " . $payload->{ status } ); |
|
670
|
|
|
|
|
|
|
} |
|
671
|
|
|
|
|
|
|
} |
|
672
|
|
|
|
|
|
|
|
|
673
|
|
|
|
|
|
|
sub run_job_async($$$$) |
|
674
|
|
|
|
|
|
|
{ |
|
675
|
0
|
|
|
0
|
0
|
|
my ( $self, $function_name, $args, $priority ) = @_; |
|
676
|
|
|
|
|
|
|
|
|
677
|
0
|
|
|
|
|
|
return $self->_run_job_on_rabbitmq( $function_name, $args, $priority, $function_name->publish_results() ); |
|
678
|
|
|
|
|
|
|
} |
|
679
|
|
|
|
|
|
|
|
|
680
|
|
|
|
|
|
|
sub _run_job_on_rabbitmq($$$$$) |
|
681
|
|
|
|
|
|
|
{ |
|
682
|
0
|
|
|
0
|
|
|
my ( $self, $function_name, $args, $priority, $publish_results ) = @_; |
|
683
|
|
|
|
|
|
|
|
|
684
|
0
|
0
|
|
|
|
|
unless ( defined( $args ) ) |
|
685
|
|
|
|
|
|
|
{ |
|
686
|
0
|
|
|
|
|
|
$args = {}; |
|
687
|
|
|
|
|
|
|
} |
|
688
|
0
|
0
|
|
|
|
|
unless ( ref( $args ) eq ref( {} ) ) |
|
689
|
|
|
|
|
|
|
{ |
|
690
|
0
|
|
|
|
|
|
LOGDIE( "'args' is not a hashref." ); |
|
691
|
|
|
|
|
|
|
} |
|
692
|
|
|
|
|
|
|
|
|
693
|
0
|
|
|
|
|
|
my $celery_job_id = create_uuid_as_string( UUID_RANDOM ); |
|
694
|
|
|
|
|
|
|
|
|
695
|
|
|
|
|
|
|
|
|
696
|
0
|
|
|
|
|
|
my $payload = { |
|
697
|
|
|
|
|
|
|
'expires' => undef, |
|
698
|
|
|
|
|
|
|
'utc' => JSON::true, |
|
699
|
|
|
|
|
|
|
'args' => [], |
|
700
|
|
|
|
|
|
|
'chord' => undef, |
|
701
|
|
|
|
|
|
|
'callbacks' => undef, |
|
702
|
|
|
|
|
|
|
'errbacks' => undef, |
|
703
|
|
|
|
|
|
|
'taskset' => undef, |
|
704
|
|
|
|
|
|
|
'id' => $celery_job_id, |
|
705
|
|
|
|
|
|
|
'retries' => $function_name->retries(), |
|
706
|
|
|
|
|
|
|
'task' => $function_name, |
|
707
|
|
|
|
|
|
|
'timelimit' => [ undef, undef, ], |
|
708
|
|
|
|
|
|
|
'eta' => undef, |
|
709
|
|
|
|
|
|
|
'kwargs' => $args, |
|
710
|
|
|
|
|
|
|
}; |
|
711
|
|
|
|
|
|
|
|
|
712
|
|
|
|
|
|
|
|
|
713
|
0
|
|
|
|
|
|
$self->_declare_task_queue( $function_name, $function_name->lazy_queue() ); |
|
714
|
|
|
|
|
|
|
|
|
715
|
0
|
|
|
|
|
|
my $reply_to_queue; |
|
716
|
0
|
0
|
|
|
|
|
if ( $publish_results ) |
|
717
|
|
|
|
|
|
|
{ |
|
718
|
|
|
|
|
|
|
|
|
719
|
0
|
|
|
|
|
|
$reply_to_queue = $self->_reply_to_queue( $function_name ); |
|
720
|
0
|
|
|
|
|
|
$self->_declare_results_queue( $reply_to_queue, $function_name->lazy_queue() ); |
|
721
|
|
|
|
|
|
|
} |
|
722
|
|
|
|
|
|
|
else |
|
723
|
|
|
|
|
|
|
{ |
|
724
|
0
|
|
|
|
|
|
$reply_to_queue = ''; |
|
725
|
|
|
|
|
|
|
} |
|
726
|
|
|
|
|
|
|
|
|
727
|
|
|
|
|
|
|
|
|
728
|
0
|
|
|
|
|
|
eval { |
|
729
|
0
|
|
|
|
|
|
my $rabbitmq_priority = _priority_to_int( $priority ); |
|
730
|
0
|
|
|
|
|
|
$self->_publish_json_message( |
|
731
|
|
|
|
|
|
|
$function_name, |
|
732
|
|
|
|
|
|
|
$payload, |
|
733
|
|
|
|
|
|
|
{ |
|
734
|
|
|
|
|
|
|
|
|
735
|
|
|
|
|
|
|
exchange => $function_name |
|
736
|
|
|
|
|
|
|
}, |
|
737
|
|
|
|
|
|
|
{ |
|
738
|
|
|
|
|
|
|
|
|
739
|
|
|
|
|
|
|
delivery_mode => $RABBITMQ_DELIVERY_MODE_PERSISTENT, |
|
740
|
|
|
|
|
|
|
priority => $rabbitmq_priority, |
|
741
|
|
|
|
|
|
|
correlation_id => $celery_job_id, |
|
742
|
|
|
|
|
|
|
reply_to => $reply_to_queue, |
|
743
|
|
|
|
|
|
|
} |
|
744
|
|
|
|
|
|
|
); |
|
745
|
|
|
|
|
|
|
}; |
|
746
|
0
|
0
|
|
|
|
|
if ( $@ ) |
|
747
|
|
|
|
|
|
|
{ |
|
748
|
0
|
|
|
|
|
|
LOGDIE( "Unable to add job '$celery_job_id' to queue: $@" ); |
|
749
|
|
|
|
|
|
|
} |
|
750
|
|
|
|
|
|
|
|
|
751
|
0
|
|
|
|
|
|
return $celery_job_id; |
|
752
|
|
|
|
|
|
|
} |
|
753
|
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
sub job_id_from_handle($$) |
|
755
|
|
|
|
|
|
|
{ |
|
756
|
0
|
|
|
0
|
0
|
|
my ( $self, $job_handle ) = @_; |
|
757
|
|
|
|
|
|
|
|
|
758
|
0
|
|
|
|
|
|
return $job_handle; |
|
759
|
|
|
|
|
|
|
} |
|
760
|
|
|
|
|
|
|
|
|
761
|
|
|
|
|
|
|
sub set_job_progress($$$$) |
|
762
|
|
|
|
|
|
|
{ |
|
763
|
0
|
|
|
0
|
0
|
|
my ( $self, $job, $numerator, $denominator ) = @_; |
|
764
|
|
|
|
|
|
|
|
|
765
|
0
|
|
|
|
|
|
LOGDIE( "FIXME not implemented." ); |
|
766
|
|
|
|
|
|
|
} |
|
767
|
|
|
|
|
|
|
|
|
768
|
|
|
|
|
|
|
sub job_status($$$) |
|
769
|
|
|
|
|
|
|
{ |
|
770
|
0
|
|
|
0
|
0
|
|
my ( $self, $function_name, $job_id ) = @_; |
|
771
|
|
|
|
|
|
|
|
|
772
|
0
|
|
|
|
|
|
LOGDIE( "FIXME not implemented." ); |
|
773
|
|
|
|
|
|
|
} |
|
774
|
|
|
|
|
|
|
|
|
775
|
|
|
|
|
|
|
sub show_jobs($) |
|
776
|
|
|
|
|
|
|
{ |
|
777
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
|
778
|
|
|
|
|
|
|
|
|
779
|
0
|
|
|
|
|
|
LOGDIE( "FIXME not implemented." ); |
|
780
|
|
|
|
|
|
|
} |
|
781
|
|
|
|
|
|
|
|
|
782
|
|
|
|
|
|
|
sub cancel_job($) |
|
783
|
|
|
|
|
|
|
{ |
|
784
|
0
|
|
|
0
|
0
|
|
my ( $self, $job_id ) = @_; |
|
785
|
|
|
|
|
|
|
|
|
786
|
0
|
|
|
|
|
|
LOGDIE( "FIXME not implemented." ); |
|
787
|
|
|
|
|
|
|
} |
|
788
|
|
|
|
|
|
|
|
|
789
|
|
|
|
|
|
|
sub server_status($$) |
|
790
|
|
|
|
|
|
|
{ |
|
791
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
|
792
|
|
|
|
|
|
|
|
|
793
|
0
|
|
|
|
|
|
LOGDIE( "FIXME not implemented." ); |
|
794
|
|
|
|
|
|
|
} |
|
795
|
|
|
|
|
|
|
|
|
796
|
|
|
|
|
|
|
sub workers($) |
|
797
|
|
|
|
|
|
|
{ |
|
798
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
|
799
|
|
|
|
|
|
|
|
|
800
|
0
|
|
|
|
|
|
LOGDIE( "FIXME not implemented." ); |
|
801
|
|
|
|
|
|
|
} |
|
802
|
|
|
|
|
|
|
|
|
803
|
1
|
|
|
1
|
|
19
|
no Moose; |
|
|
1
|
|
|
|
|
3
|
|
|
|
1
|
|
|
|
|
16
|
|
|
804
|
|
|
|
|
|
|
|
|
805
|
|
|
|
|
|
|
1; |
|
806
|
|
|
|
|
|
|
|