line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Amazon::SQS::Producer; |
2
|
|
|
|
|
|
|
|
3
|
1
|
|
|
1
|
|
24630
|
use 5.006; |
|
1
|
|
|
|
|
5
|
|
|
1
|
|
|
|
|
42
|
|
4
|
1
|
|
|
1
|
|
7
|
use strict; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
39
|
|
5
|
1
|
|
|
1
|
|
8
|
use warnings; |
|
1
|
|
|
|
|
6
|
|
|
1
|
|
|
|
|
33
|
|
6
|
|
|
|
|
|
|
|
7
|
1
|
|
|
1
|
|
7
|
use base 'Amazon::SQS::ProducerConsumer::Base'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
700
|
|
8
|
|
|
|
|
|
|
use JSON::XS; |
9
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
use constant MAX_RETRIES => 3; |
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
=head1 NAME |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
Amazon::SQS::Producer - Publish messages to an Amazon Simple Queue Service (SQS) queue |
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
=cut |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
# Write one timestamped log line to STDERR (via warn) and hand the arguments
# back to the caller.
# Line layout: "<day> <month> <year> <hh:mm:ss> [<pid>] <program basename> <args...> \n"
sub say (@) {
    # scalar localtime reads like "Thu Sep  9 12:34:56 2021"; pick out the
    # day, month, year and time fields in that order.
    my @when = ( split ' ', scalar localtime )[ 2, 1, 4, 3 ];

    # Last path component of the running program's name.
    my @program = ( split '/', $0 )[-1];

    my $line = join ' ', @when, "[$$]", @program, @_, "\n";
    warn $line;

    return @_;
}
19
|
|
|
|
|
|
|
# Process-wide signal handling, installed as a side effect of loading this
# module.

# On SIGINT: log the event, then exit with status 0.
$SIG{INT} = sub {
    say 'caught signal INT';
    exit 0;
};

# Ignore SIGCHLD: this module never waits on the children it forks.
# NOTE(review): on most Unix platforms this also makes the system reap
# children automatically -- confirm for the deployment platform.
$SIG{CHLD} = 'IGNORE';
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
=head1 SYNOPSIS |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
use Amazon::SQS::Producer; |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
my $out_queue = new Amazon::SQS::Producer |
27
|
|
|
|
|
|
|
AWSAccessKeyId => 'PUBLIC_KEY_HERE', |
28
|
|
|
|
|
|
|
SecretAccessKey => 'SECRET_KEY_HERE', |
29
|
|
|
|
|
|
|
queue => 'YourOutputQueue', |
30
|
|
|
|
|
|
|
consumer => 'ConsumerForOutputQueue'; |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
$out_queue->publish( |
33
|
|
|
|
|
|
|
$existingObjectRef, |
34
|
|
|
|
|
|
|
url => $enclosure_URL, |
35
|
|
|
|
|
|
|
pubdate => $pubDate, |
36
|
|
|
|
|
|
|
title => $title, |
37
|
|
|
|
|
|
|
description => $description, |
38
|
|
|
|
|
|
|
rss_guid => $guid, |
39
|
|
|
|
|
|
|
); |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
=head1 METHODS |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
=head2 new(%params) |
44
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
This is the constructor, it will return you an Amazon::SQS::Producer object to work with. It takes these parameters: |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
=over |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
=item AWSAccessKeyId (required) |
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
Your AWS access key. |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
=item SecretAccessKey (required) |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
Your secret key, WARNING! don't give this out or someone will be able to use your account and incur charges on your behalf. |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
=item queue (required) |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
The URL of the queue to publish messages to. |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
=item consumer (optional) |
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
The name of an executable that will consume messages from the queue we're publishing to. An instance will be launched after each message is published, up to the maximum set by C<start_consumers>. |
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
=item start_consumers (optional) |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
The maximum number of consumer instances to launch. |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
=item debug (optional) |
70
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
A flag to turn on debugging. It is turned off by default. |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
=back |
74
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
=cut |
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
# Constructor. Takes a flat key/value list, blesses that hash as the object
# itself, and runs initialize() on it before handing the object back.
sub new {
    my ( $class, %params ) = @_;

    my $self = bless \%params, $class;
    $self->initialize;

    return $self;
}
86
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
# Fill in producer-specific defaults, then defer to the base class.
# Returns whatever the base class initialize() returns.
sub initialize {
    my ($self) = @_;

    # Seconds a freshly forked child sleeps before exec'ing the consumer;
    # only defaulted when the caller did not supply the key at all.
    unless ( exists $self->{sleep_after_starting_consumer} ) {
        $self->{sleep_after_starting_consumer} = 2;
    }

    return $self->SUPER::initialize;
}
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
=head2 publish(%params) |
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
This will publish a message to this Publisher's queue, and start a consumer if this is the first message this Publisher has published. The message body will be a JSON representation of the method's argument hash. If the first argument is a reference to a hash it will be dereferenced and merged with the other parameters given. |
97
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
=cut |
99
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
# Publish one message to this producer's queue.
#
# Arguments (after the invocant): an optional leading hash reference followed
# by key/value pairs. The pairs are merged over a copy of the hash and the
# result is JSON-encoded to form the message body.
#
# Special cases:
#   * If the object has no {queue}, or the leading hash reference has a true
#     {_chain_consumers}, control passes to fork_consumer with the same @_.
#   * A true {_debug} in the merged data logs the encoded message.
#   * A true {_test} in the merged data returns before anything is sent.
#
# After a successful send, a consumer process is forked while fewer than
# {start_consumers} have been started by this object.
#
# Returns the value send_message returned, or nothing in test mode or once
# MAX_RETRIES retries have failed.
sub publish {
    if ( ref $_[0] and !$_[0]->{queue} )           { goto &fork_consumer }
    if ( ref $_[1] and $_[1]->{_chain_consumers} ) { goto &fork_consumer }

    my $me = shift;

    # Deliberately not "my $x = shift if ...": a conditionally executed "my"
    # has undefined behaviour and can carry a previous call's hash into this
    # one. Defaulting to an empty hash also keeps the merge below from dying
    # when no hash reference was passed.
    my $old_data     = ref( $_[0] ) ? shift(@_) : {};
    my $data         = { %$old_data, @_ };
    my $encoded_data = encode_json $data;

    say "Queueing message: $encoded_data" if $data->{_debug};
    return if $data->{_test};

    my $retries = 0;
    my $message_id;
    until (
        $message_id = $me->send_message(
            Queue       => $me->{queue},
            MessageBody => $encoded_data,
        )
    ) {
        say "couldn't queue message: ", $me->error;
        if ( $retries++ < MAX_RETRIES ) {
            # Linear back-off: wait 1, then 2, then 3 seconds.
            say "trying again in $retries seconds";
            sleep $retries;
        } else {
            # The statement must end with a semicolon: with a comma here the
            # "return" is evaluated as an argument of say, so the sub returns
            # before the message is ever logged.
            say "giving up trying to publish to queue $me->{queue} with message body: $encoded_data";
            return;
        }
    }

    if ( $me->{consumer} and $me->{started_consumers}++ < $me->{start_consumers} ) {
        my $pid = fork;
        if ( not defined $pid ) {
            say "couldn't fork";
        } elsif ( not $pid ) {
            # Child: detach from the parent's standard streams. STDOUT has to
            # be opened for writing; a two-argument open of '/dev/null' would
            # open it read-only.
            close STDIN;  open STDIN,  '<',  '/dev/null';
            close STDOUT; open STDOUT, '>',  '/dev/null';
            close STDERR; open STDERR, '>>', '/tmp/getfeeds.log';
            sleep $me->{sleep_after_starting_consumer};

            # exec only returns on failure. Exit in that case so the child
            # never falls through into the caller's code.
            exec( $me->{consumer} ) or do {
                say "couldn't exec consumer $me->{consumer}: $!";
                exit 1;
            };
        } else {
            say "started consumer $me->{consumer} with PID $pid for queue $me->{queue}";
        }
    }

    return $message_id;
}
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
# Publish from a forked child process so the caller does not wait on the
# queue. Arguments after the invocant are passed through to publish()
# unchanged. In the parent the fork is only logged; the outcome of the
# publish is not reported back.
sub fork_and_publish {
    my $me = shift;

    my $pid = fork;
    if ( not defined $pid ) {
        say "couldn't fork";
    } elsif ( not $pid ) {
        # Child: publish, then terminate. Without the exit the child would
        # return into the caller's code and run the rest of the program a
        # second time alongside the parent.
        $me->publish( @_ );
        exit 0;
    } else {
        say "forked to publish to queue $me->{queue} with PID $pid";
    }
}
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
# Fork a child that exec's the configured consumer directly, passing the
# message as a single JSON-encoded command-line argument instead of sending
# it through the queue.
#
# Arguments (after the invocant): an optional leading hash reference followed
# by key/value pairs; the pairs are merged over a copy of the hash.
# Does nothing when the object has no {consumer}.
sub fork_consumer {
    my $me = shift;

    # Deliberately not "my $x = shift if ...": a conditionally executed "my"
    # has undefined behaviour and can carry a previous call's hash into this
    # one. Defaulting to an empty hash also keeps the merge below from dying
    # when no hash reference was passed.
    my $old_data = ref( $_[0] ) ? shift(@_) : {};
    my %data     = @_;

    if ( $me->{consumer} ) {
        my $pid = fork;
        if ( not defined $pid ) {
            say "couldn't fork";
        } elsif ( not $pid ) {
            # Child: detach from the parent's standard streams. STDOUT has to
            # be opened for writing; a two-argument open of '/dev/null' would
            # open it read-only.
            close STDIN;  open STDIN,  '<',  '/dev/null';
            close STDOUT; open STDOUT, '>',  '/dev/null';
            close STDERR; open STDERR, '>>', '/tmp/getfeeds.log';

            # Appends the current directory to the executable search path.
            $ENV{PATH} .= ':.';
            sleep $me->{sleep_after_starting_consumer};

            # exec only returns on failure. Exit in that case so the child
            # never falls through into the caller's code.
            exec( $me->{consumer}, encode_json( { %$old_data, %data } ) ) or do {
                say "couldn't exec consumer $me->{consumer}: $!";
                exit 1;
            };
        } else {
            say "forked consumer $me->{consumer} with PID $pid";
        }
    }
}
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
=head1 AUTHOR |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
Nic Wolff, |
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
=head1 BUGS |
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
Please report any bugs or feature requests to C<bug-amazon-sqs-producerconsumer at rt.cpan.org>, or through |
191
|
|
|
|
|
|
|
the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Amazon-SQS-ProducerConsumer>. I will be notified, and then you'll |
192
|
|
|
|
|
|
|
automatically be notified of progress on your bug as I make changes. |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
=head1 SUPPORT |
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
You can find documentation for this module with the perldoc command. |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
perldoc Amazon::SQS::ProducerConsumer |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
You can also look for information at: |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
=over 4 |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
=item * RT: CPAN's request tracker (report bugs here) |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
L |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
=item * AnnoCPAN: Annotated CPAN documentation |
213
|
|
|
|
|
|
|
|
214
|
|
|
|
|
|
|
L |
215
|
|
|
|
|
|
|
|
216
|
|
|
|
|
|
|
=item * CPAN Ratings |
217
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
L |
219
|
|
|
|
|
|
|
|
220
|
|
|
|
|
|
|
=item * Search CPAN |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
L |
223
|
|
|
|
|
|
|
|
224
|
|
|
|
|
|
|
=back |
225
|
|
|
|
|
|
|
|
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
=head1 ACKNOWLEDGEMENTS |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
|
230
|
|
|
|
|
|
|
=head1 LICENSE AND COPYRIGHT |
231
|
|
|
|
|
|
|
|
232
|
|
|
|
|
|
|
Copyright 2011 Nic Wolff. |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
This program is free software; you can redistribute it and/or modify it |
235
|
|
|
|
|
|
|
under the terms of either: the GNU General Public License as published |
236
|
|
|
|
|
|
|
by the Free Software Foundation; or the Artistic License. |
237
|
|
|
|
|
|
|
|
238
|
|
|
|
|
|
|
See http://dev.perl.org/licenses/ for more information. |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
=cut |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
1; # End of Amazon::SQS::ProducerConsumer |