line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Amazon::SQS::Consumer; |
2
|
|
|
|
|
|
|
|
3
|
1
|
|
|
1
|
|
6129
|
use 5.006; |
|
1
|
|
|
|
|
4
|
|
|
1
|
|
|
|
|
46
|
|
4
|
1
|
|
|
1
|
|
6
|
use strict; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
38
|
|
5
|
1
|
|
|
1
|
|
6
|
use warnings; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
44
|
|
6
|
|
|
|
|
|
|
|
7
|
1
|
|
|
1
|
|
7
|
use base 'Amazon::SQS::ProducerConsumer::Base'; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
113
|
|
8
|
|
|
|
|
|
|
use JSON::XS; |
9
|
|
|
|
|
|
|
use Encode qw( encode_utf8 is_utf8 ); |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
use constant { |
12
|
|
|
|
|
|
|
DEFAULT_N_MESSAGES => 10, |
13
|
|
|
|
|
|
|
DEFAULT_WAIT_SECONDS => 30, |
14
|
|
|
|
|
|
|
SECONDS_BETWEEN_TRIES => 10 |
15
|
|
|
|
|
|
|
}; |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
=head1 NAME |
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
Amazon::SQS::Consumer - Receive messages from an Amazon Simple Queue Service (SQS) queue |
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
=cut |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
# Log a message on STDERR (via warn) and hand the arguments back.
#
# The emitted line is made of, separated by single spaces: day-of-month,
# month name, year and time taken from localtime, the process id in square
# brackets, the last path component of $0, the arguments, and a final
# newline (which also stops warn from appending "at ... line ...").
#
# Returns the arguments unchanged (their count in scalar context).
sub say (@) {
    # scalar localtime reads like "Thu Sep  9 12:00:00 2023"; the slice
    # reorders its fields to "9 Sep 2023 12:00:00".
    my @stamp = ( split ' ', scalar localtime )[ 2, 1, 4, 3 ];

    # Kept as a list so that an empty $0 contributes nothing at all.
    my @script_name = ( split '/', $0 )[-1];

    warn join( ' ', @stamp, "[$$]", @script_name, @_, "\n" );

    return @_;
}
24
|
|
|
|
|
|
|
# Process-wide signal setup; this runs as a side effect of loading the module.

# On SIGINT, log the event through say() and leave with exit status 0.
$SIG{INT} = sub {
    say 'caught signal INT';
    exit 0;
};

# SIGCHLD is ignored (on most Unix platforms this also lets exited child
# processes be reaped automatically; see perlipc).
$SIG{CHLD} = 'IGNORE';
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
=head1 SYNOPSIS |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
use Amazon::SQS::Consumer; |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
my $in_queue = new Amazon::SQS::Consumer |
32
|
|
|
|
|
|
|
AWSAccessKeyId => 'PUBLIC_KEY_HERE', |
33
|
|
|
|
|
|
|
SecretAccessKey => 'SECRET_KEY_HERE', |
34
|
|
|
|
|
|
|
queue => 'YourInputQueue'; |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
while ( my $item = $in_queue->next ) { |
37
|
|
|
|
|
|
|
# Do stuff with the item |
38
|
|
|
|
|
|
|
} |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
=head1 METHODS |
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
=head2 new(%params) |
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
This is the constructor; it returns an Amazon::SQS::Consumer object to work with. It takes these parameters: |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
=over |
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
=item AWSAccessKeyId (required) |
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
Your AWS access key. |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
=item SecretAccessKey (required) |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
Your secret key. WARNING: don't give this out, or someone will be able to use your account and incur charges on your behalf. |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
=item queue (required) |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
The URL of the queue to receive messages from. |
59
|
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
=item wait_seconds (optional) |
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
The number of seconds to wait for a new message when the queue is empty. |
63
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
=item debug (optional) |
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
A flag to turn on debugging. It is turned off by default. |
67
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
=back |
69
|
|
|
|
|
|
|
|
70
|
|
|
|
|
|
|
=cut |
71
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
# Constructor. Takes a flat list of key/value parameters, turns them into the
# object's own hash, blesses it into $class and runs initialize() on it.
#
# Returns the new object.
sub new {
    my ( $class, @params ) = @_;

    # The parameters themselves become the object's state.
    my %state = @params;
    my $me    = bless \%state, $class;

    $me->initialize;

    return $me;
}
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
# Fill in defaults for settings the caller left unset (or set to a false
# value), then delegate to the parent class's initialize().
#
# Returns whatever the parent's initialize() returns.
sub initialize {
    my ($me) = @_;

    $me->{n_messages}   = DEFAULT_N_MESSAGES   unless $me->{n_messages};
    $me->{wait_seconds} = DEFAULT_WAIT_SECONDS unless $me->{wait_seconds};

    return $me->SUPER::initialize;
}
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
=head2 next() |
91
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
This will receive a message from this Consumer's queue. When the queue is empty it will wait for a new message for up to wait_seconds seconds. |
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
=cut |
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
# Return the next message from the queue as a decoded JSON value, or undef
# when no message could be obtained.
#
# Behaviour, step by step:
#   * The message returned by the previous call is deleted from the queue
#     first (unless defer() removed its receipt handle in the meantime).
#   * If @ARGV is non-empty, its elements are taken as message bodies and
#     served instead of polling the queue; @ARGV is emptied and the object
#     is put into "no_loop" mode, in which the queue is never contacted.
#   * Otherwise messages are fetched in batches of n_messages and cached in
#     $me->{messages}. When nothing is available, the method sleeps
#     SECONDS_BETWEEN_TRIES seconds and retries until wait_seconds is used
#     up, or indefinitely when $me->{forever} is set.
#   * A message whose body is not valid JSON is logged and skipped; it is
#     never scheduled for deletion, so it stays in the queue.
sub next {
    my $me = shift;

    # If we're done with the previous message, delete it, then forget its
    # receipt handle so that a later call cannot delete it a second time.
    $me->delete_previous();
    delete $me->{DeleteMessageHandle};

    # Message bodies given on the command line replace polling. The "+{"
    # makes it unambiguous that each element is an anonymous hash.
    if ( @ARGV ) {
        $me->{messages} = [ map { +{ MessageId => undef, Body => $_ } } @ARGV ];
        undef @ARGV;
        $me->{no_loop} = 't';
    }

    my $seconds_to_wait = $me->{wait_seconds};
    do {

        # If there are no messages in the cache, get some from the queue
        my $cache_is_empty = !( defined $me->{messages} && @{ $me->{messages} } );
        if ( $cache_is_empty and not $me->{no_loop} ) {
            $me->{messages} = $me->receive_messages(
                Queue               => $me->{queue},
                MaxNumberOfMessages => $me->{n_messages},
                defined $me->{timeout} ? ( VisibilityTimeout => $me->{timeout} ) : ()
            );
        }

        # If there's a message in the cache, return it
        if ( my $message = shift @{ $me->{messages} } ) {
            my $object;
            my $decoded = eval {
                my $body = $message->{Body};

                # decode_json wants UTF-8 encoded bytes, not characters
                $body = encode_utf8( $body ) if is_utf8( $body );
                $object = decode_json( $body );
                1;
            };

            if ( $decoded ) {
                # Only a message that is actually handed to the caller is
                # scheduled for deletion by the following call.
                $me->{DeleteMessageHandle} = $message->{ReceiptHandle};
                return $object;
            }

            say( "left bad message in queue; could not decode JSON from $message->{Body}: $@" );
        } elsif ( $me->{no_loop} ) {
            # The command-line messages are used up. Stop right here, even
            # when "forever" is set, instead of spinning without sleeping.
            return undef;
        } else {
            # Otherwise, wait a few seconds and try again; the notice is
            # only logged before the first sleep.
            say( "waiting $seconds_to_wait seconds for new messages" )
                if $seconds_to_wait == $me->{wait_seconds};
            sleep SECONDS_BETWEEN_TRIES;
            $seconds_to_wait -= SECONDS_BETWEEN_TRIES;
        }

    } while ( $me->{forever} or $seconds_to_wait > 0 );

    # If we've retried for a while and gotten no messages, give up.
    # An explicit undef (rather than a bare return) is kept so that the
    # result in list context stays the same for existing callers.
    return undef;
}
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
# Delete from the queue the message whose receipt handle is stored in
# $me->{DeleteMessageHandle}, if there is one.
#
# The handle is removed from the object once delete_message() has returned,
# so calling this method again does not issue a second delete for the same
# message. If delete_message() dies, the handle is left in place.
#
# Returns the (scalar) result of delete_message(), or nothing when there was
# no message to delete.
sub delete_previous {
    my $me = shift;

    my $handle = $me->{DeleteMessageHandle};
    return unless $handle;

    say( "deleting message $handle" ) if $me->{debug};
    my $result = $me->delete_message( Queue => $me->{queue}, ReceiptHandle => $handle );

    # The message is gone; forget its handle so it is not deleted again.
    delete $me->{DeleteMessageHandle};

    return $result;
}
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
# Keep the current message in the queue: remove the stored receipt handle so
# that delete_previous() finds nothing to delete.
#
# Returns the removed handle (undef if none was stored).
sub defer {
    my ($me) = @_;
    return delete $me->{DeleteMessageHandle};
}
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
=head1 AUTHOR |
162
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
Nic Wolff, |
164
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
=head1 BUGS |
166
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
Please report any bugs or feature requests to C, or through |
168
|
|
|
|
|
|
|
the web interface at L. I will be notified, and then you'll |
169
|
|
|
|
|
|
|
automatically be notified of progress on your bug as I make changes. |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
=head1 SUPPORT |
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
You can find documentation for this module with the perldoc command. |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
perldoc Amazon::SQS::ProducerConsumer |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
You can also look for information at: |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
=over 4 |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
=item * RT: CPAN's request tracker (report bugs here) |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
L |
188
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
=item * AnnoCPAN: Annotated CPAN documentation |
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
L |
192
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
=item * CPAN Ratings |
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
L |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
=item * Search CPAN |
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
L |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
=back |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
=head1 ACKNOWLEDGEMENTS |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
=head1 LICENSE AND COPYRIGHT |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
Copyright 2011 Nic Wolff. |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
This program is free software; you can redistribute it and/or modify it |
212
|
|
|
|
|
|
|
under the terms of either: the GNU General Public License as published |
213
|
|
|
|
|
|
|
by the Free Software Foundation; or the Artistic License. |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
See http://dev.perl.org/licenses/ for more information. |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
=cut |
219
|
|
|
|
|
|
|
|
220
|
|
|
|
|
|
|
1; # End of Amazon::SQS::Consumer |