File Coverage

blib/lib/Amazon/SQS/Consumer.pm
Criterion Covered Total %
statement 12 12 100.0
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 16 16 100.0


line stmt bran cond sub pod time code
1             package Amazon::SQS::Consumer;
2              
3 1     1   6129 use 5.006;
  1         4  
  1         46  
4 1     1   6 use strict;
  1         2  
  1         38  
5 1     1   6 use warnings;
  1         1  
  1         44  
6              
7 1     1   7 use base 'Amazon::SQS::ProducerConsumer::Base';
  1         1  
  1         113  
8             use JSON::XS;
9             use Encode qw( encode_utf8 is_utf8 );
10              
11             use constant {
12             DEFAULT_N_MESSAGES => 10,
13             DEFAULT_WAIT_SECONDS => 30,
14             SECONDS_BETWEEN_TRIES => 10
15             };
16              
17             =head1 NAME
18              
19             Amazon::SQS::Consumer - Receive messages from an Amazon Simple Queue Service (SQS) queue
20              
21             =cut
22              
23             sub say (@) { warn join ' ', (split ' ', scalar localtime)[2,1,4,3], "[$$]", (split '/', $0)[-1], @_, "\n"; return @_; }
24             $SIG{INT} = sub { say 'caught signal INT'; exit 0; };
25             $SIG{CHLD} = 'IGNORE';
26              
27             =head1 SYNOPSIS
28              
29             use Amazon::SQS::Consumer;
30              
31             my $in_queue = new Amazon::SQS::Consumer
32             AWSAccessKeyId => 'PUBLIC_KEY_HERE',
33             SecretAccessKey => 'SECRET_KEY_HERE',
34             queue => 'YourInputQueue';
35              
36             while ( my $item = $in_queue->next ) {
37             # Do stuff with the item
38             }
39              
40             =head1 METHODS
41              
42             =head2 new(%params)
43              
44             This is the constructor, it will return you an Amazon::SQS::Consumer object to work with. It takes these parameters:
45              
46             =over
47              
48             =item AWSAccessKeyId (required)
49              
50             Your AWS access key.
51              
52             =item SecretAccessKey (required)
53              
54             Your secret key, WARNING! don't give this out or someone will be able to use your account and incur charges on your behalf.
55              
56             =item queue (required)
57              
58             The URL of the queue to receive messages from.
59              
60             =item wait_seconds (optional)
61              
62             The number of seconds to wait for a new message when the queue is empty.
63              
64             =item debug (optional)
65              
66             A flag to turn on debugging. It is turned off by default.
67              
68             =back
69              
70             =cut
71              
72             sub new {
73             my $class = shift;
74             my %args = @_;
75              
76             my $me = \%args;
77             bless $me, $class;
78             $me->initialize;
79             return $me;
80             }
81              
82             sub initialize {
83             my $me = shift;
84              
85             $me->{n_messages} ||= DEFAULT_N_MESSAGES;
86             $me->{wait_seconds} ||= DEFAULT_WAIT_SECONDS;
87             $me->SUPER::initialize;
88             }
89              
90             =head2 next()
91              
92             This will receive a message from this Publisher's queue. When the queue is empty it will wait a new message for wait_seconds seconds.
93              
94             =cut
95              
96             sub next {
97             my $me = shift;
98              
99             # If we're done with the previous message, delete it
100             $me->delete_previous();
101              
102             if ( @ARGV ) {
103             $me->{messages} = [ map { MessageId => undef, Body => $_ }, @ARGV ];
104             undef @ARGV;
105             $me->{no_loop} = 't';
106             }
107              
108             my $seconds_to_wait = $me->{wait_seconds};
109             do {
110              
111             # If there no messages in the cache, get some from the queue
112             $me->{messages} = $me->receive_messages(
113             Queue => $me->{queue},
114             MaxNumberOfMessages => $me->{n_messages},
115             defined $me->{timeout} ? ( VisibilityTimeout => $me->{timeout} ) : ()
116             ) unless defined $me->{messages} && @{$me->{messages}} or $me->{no_loop};
117              
118             # If there's a message in the cache, return it
119             if ( my $message = shift @{$me->{messages}} ) {
120             $me->{DeleteMessageHandle} = $message->{ReceiptHandle};
121             my $object;
122             eval {
123             my $body = $message->{Body};
124             $body = encode_utf8( $body ) if is_utf8( $body );
125             $object = decode_json $body;
126             };
127             if ( $@ ) {
128             say "left bad message in queue; could not decode JSON from $message->{Body}: $@";
129             } else {
130             return $object;
131             }
132             } elsif ( $me->{no_loop} ) {
133             $seconds_to_wait = 0;
134             } else {
135             # Otherwise, wait a few seconds and try again
136             say "waiting $seconds_to_wait seconds for new messages"
137             if $seconds_to_wait == $me->{wait_seconds};
138             sleep SECONDS_BETWEEN_TRIES;
139             $seconds_to_wait -= SECONDS_BETWEEN_TRIES;
140             }
141              
142             } while ( $me->{forever} or $seconds_to_wait > 0 );
143              
144             # If we've retried for a while and gotten no messages, give up
145             return undef;
146              
147             }
148              
149             sub delete_previous {
150             my $me = shift;
151              
152             if ( $me->{DeleteMessageHandle} ) {
153             say "deleting message $me->{DeleteMessageHandle}" if $me->{debug};
154             $me->delete_message( Queue => $me->{queue}, ReceiptHandle => $me->{DeleteMessageHandle} );
155             }
156             }
157              
158             sub defer { delete $_[0]->{DeleteMessageHandle} }
159              
160              
161             =head1 AUTHOR
162              
163             Nic Wolff,
164              
165             =head1 BUGS
166              
167             Please report any bugs or feature requests to C, or through
168             the web interface at L. I will be notified, and then you'll
169             automatically be notified of progress on your bug as I make changes.
170              
171              
172              
173              
174             =head1 SUPPORT
175              
176             You can find documentation for this module with the perldoc command.
177              
178             perldoc Amazon::SQS::ProducerConsumer
179              
180              
181             You can also look for information at:
182              
183             =over 4
184              
185             =item * RT: CPAN's request tracker (report bugs here)
186              
187             L
188              
189             =item * AnnoCPAN: Annotated CPAN documentation
190              
191             L
192              
193             =item * CPAN Ratings
194              
195             L
196              
197             =item * Search CPAN
198              
199             L
200              
201             =back
202              
203              
204             =head1 ACKNOWLEDGEMENTS
205              
206              
207             =head1 LICENSE AND COPYRIGHT
208              
209             Copyright 2011 Nic Wolff.
210              
211             This program is free software; you can redistribute it and/or modify it
212             under the terms of either: the GNU General Public License as published
213             by the Free Software Foundation; or the Artistic License.
214              
215             See http://dev.perl.org/licenses/ for more information.
216              
217              
218             =cut
219              
220             1; # End of Amazon::SQS::ProducerConsumer