File Coverage

blib/lib/Amazon/SQS/Producer.pm
Criterion Covered Total %
statement 12 12 100.0
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 16 16 100.0


line stmt bran cond sub pod time code
1             package Amazon::SQS::Producer;
2              
3 1     1   24630 use 5.006;
  1         5  
  1         42  
4 1     1   7 use strict;
  1         2  
  1         39  
5 1     1   8 use warnings;
  1         6  
  1         33  
6              
7 1     1   7 use base 'Amazon::SQS::ProducerConsumer::Base';
  1         2  
  1         700  
8             use JSON::XS;
9              
10             use constant MAX_RETRIES => 3;
11              
12             =head1 NAME
13              
14             Amazon::SQS::Producer - Publish messages to an Amazon Simple Queue Service (SQS) queue
15              
16             =cut
17              
18             sub say (@) { warn join ' ', (split ' ', scalar localtime)[2,1,4,3], "[$$]", (split '/', $0)[-1], @_, "\n"; return @_; }
19             $SIG{INT} = sub { say 'caught signal INT'; exit 0; };
20             $SIG{CHLD} = 'IGNORE';
21              
22             =head1 SYNOPSIS
23              
24             use Amazon::SQS::Producer;
25              
26             my $out_queue = new Amazon::SQS::Producer
27             AWSAccessKeyId => 'PUBLIC_KEY_HERE',
28             SecretAccessKey => 'SECRET_KEY_HERE',
29             queue => 'YourOutputQueue',
30             consumer => 'ConsumerForOutputQueue';
31              
32             $out_queue->publish(
33             $existingObjectRef,
34             url => $enclosure_URL,
35             pubdate => $pubDate,
36             title => $title,
37             description => $description,
38             rss_guid => $guid,
39             );
40              
41             =head1 METHODS
42              
43             =head2 new(%params)
44              
45             This is the constructor, it will return you an Amazon::SQS::Producer object to work with. It takes these parameters:
46              
47             =over
48              
49             =item AWSAccessKeyId (required)
50              
51             Your AWS access key.
52              
53             =item SecretAccessKey (required)
54              
55             Your secret key, WARNING! don't give this out or someone will be able to use your account and incur charges on your behalf.
56              
57             =item queue (required)
58              
59             The URL of the queue to publish messages to.
60              
61             =item consumer (optional)
62              
63             The name of an executable that will consume messages from the queue we're publishing to. An instance will be launched after the each message is published, up to the maximum set by...
64              
65             =item start_consumers (optional)
66              
67             The maximum number of consumer instance to launch.
68              
69             =item debug (optional)
70              
71             A flag to turn on debugging. It is turned off by default.
72              
73             =back
74              
75             =cut
76              
77             sub new {
78             my $class = shift;
79             my %args = @_;
80              
81             my $me = \%args;
82             bless $me, $class;
83             $me->initialize;
84             return $me;
85             }
86              
87             sub initialize {
88             my $me = shift;
89              
90             $me->{sleep_after_starting_consumer} = 2 if not exists $me->{sleep_after_starting_consumer};
91             $me->SUPER::initialize;
92             }
93              
94             =head2 publish(%params)
95              
96             This will publish a message to this Publisher's queue, and start a consumer if this is the first message this Publisher has published. The message body will be a JSON representaton of the method's argument hash. If the first argument is a reference to a hash it will be dereferenced and merged with the other parameters given.
97              
98             =cut
99              
100             sub publish {
101             if ( ref $_[0] and ! $_[0]->{queue} ) { goto &fork_consumer }
102             if ( ref $_[1] and $_[1]->{_chain_consumers} ) { goto &fork_consumer }
103              
104             my $me = shift;
105             my $old_data = shift if ref $_[0];
106             my $data = { %$old_data, @_ };
107             my $encoded_data = encode_json $data;
108              
109             say "Queueing message: $encoded_data" if $data->{_debug};
110             return if $data->{_test};
111              
112             my $retries;
113             my $message_id;
114             until (
115             $message_id = $me->send_message(
116             Queue => $me->{queue},
117             MessageBody => $encoded_data,
118             )
119             ) {
120             say "couldn't queue message: ", $me->error;
121             if ( $retries++ < MAX_RETRIES ) {
122             say "trying again in $retries seconds";
123             sleep $retries;
124             } else {
125             say "giving up trying to publish to queue $me->{queue} with message body: $encoded_data",
126             return;
127             }
128             }
129              
130             if ( $me->{consumer} and $me->{started_consumers}++ < $me->{start_consumers} ) {
131             my $pid = fork;
132             if ( not defined $pid ) {
133             say "couldn't fork";
134             } elsif ( not $pid ) {
135             close STDIN; open STDIN, '/dev/null';
136             close STDOUT; open STDOUT, '/dev/null';
137             close STDERR; open STDERR, '>>/tmp/getfeeds.log';
138             sleep $me->{sleep_after_starting_consumer};
139             exec $me->{consumer};
140             } else {
141             say "started consumer $me->{consumer} with PID $pid for queue $me->{queue}";
142             }
143             }
144              
145             return $message_id;
146              
147             }
148              
149             sub fork_and_publish {
150             my $me = shift;
151              
152             my $pid = fork;
153             if ( not defined $pid ) {
154             say "couldn't fork";
155             } elsif ( not $pid ) {
156             $me->publish( @_ );
157             } else {
158             say "forked to publish to queue $me->{queue} with PID $pid";
159             }
160             }
161              
162             sub fork_consumer {
163             my $me = shift;
164             my $old_data = shift if ref $_[0];
165             my %data = @_;
166              
167             if ( $me->{consumer} ) {
168             my $pid = fork;
169             if ( not defined $pid ) {
170             say "couldn't fork";
171             } elsif ( not $pid ) {
172             close STDIN; open STDIN, '/dev/null';
173             close STDOUT; open STDOUT, '/dev/null';
174             close STDERR; open STDERR, '>>/tmp/getfeeds.log';
175             $ENV{PATH} .= ':.';
176             sleep $me->{sleep_after_starting_consumer};
177             exec $me->{consumer}, encode_json { %$old_data, %data };
178             } else {
179             say "forked consumer $me->{consumer} with PID $pid";
180             }
181             }
182             }
183              
184             =head1 AUTHOR
185              
186             Nic Wolff,
187              
188             =head1 BUGS
189              
190             Please report any bugs or feature requests to C, or through
191             the web interface at L. I will be notified, and then you'll
192             automatically be notified of progress on your bug as I make changes.
193              
194              
195              
196              
197             =head1 SUPPORT
198              
199             You can find documentation for this module with the perldoc command.
200              
201             perldoc Amazon::SQS::ProducerConsumer
202              
203              
204             You can also look for information at:
205              
206             =over 4
207              
208             =item * RT: CPAN's request tracker (report bugs here)
209              
210             L
211              
212             =item * AnnoCPAN: Annotated CPAN documentation
213              
214             L
215              
216             =item * CPAN Ratings
217              
218             L
219              
220             =item * Search CPAN
221              
222             L
223              
224             =back
225              
226              
227             =head1 ACKNOWLEDGEMENTS
228              
229              
230             =head1 LICENSE AND COPYRIGHT
231              
232             Copyright 2011 Nic Wolff.
233              
234             This program is free software; you can redistribute it and/or modify it
235             under the terms of either: the GNU General Public License as published
236             by the Free Software Foundation; or the Artistic License.
237              
238             See http://dev.perl.org/licenses/ for more information.
239              
240              
241             =cut
242              
243             1; # End of Amazon::SQS::ProducerConsumer