File Coverage

blib/lib/Email/Blaster.pm
Criterion Covered Total %
statement 31 33 93.9
branch n/a
condition n/a
subroutine 11 11 100.0
pod n/a
total 42 44 95.4


line stmt bran cond sub pod time code
1             package Email::Blaster;
2              
3 2     2   14 use strict;
  2         2  
  2         48  
4 2     2   10 use warnings 'all';
  2         2  
  2         50  
5 2     2   8 use Carp 'confess';
  2         2  
  2         86  
6 2     2   1600 use forks;
  1         258876  
  1         10  
7 1     1   164 use forks::shared;
  1         3  
  1         8  
8 1     1   90 use POSIX 'ceil';
  1         3  
  1         9  
9 1     1   713 use HTTP::Date 'time2iso';
  1         3287  
  1         29  
10 1     1   100 use Time::HiRes qw( gettimeofday usleep );
  1         2  
  1         9  
11 1     1   307 use Digest::MD5 'md5_hex';
  1         2  
  1         24  
12 1     1   739 use Email::Blaster::ConfigLoader;
  1         4  
  1         32  
13 1     1   346 use Email::Blaster::Event;
  0            
  0            
14             use Email::Blaster::Event::Type;
15             use Email::Blaster::Transmission;
16              
17             our $VERSION = '0.0001_06';
18             our $InstanceClass = __PACKAGE__;
19             our $instance;
20             my @progress : shared = ( );
21              
22              
23             #==============================================================================
24             sub new
25             {
26             my ($class) = @_;
27            
28             no strict 'refs';
29             return ${"$InstanceClass\::instance"} if ${"$InstanceClass\::instance"};
30            
31             $class = ref($class) ? ref($class) : $class;
32            
33             my $s = ${"$InstanceClass\::instance"} = bless {
34             config => Email::Blaster::ConfigLoader->load(),
35             continue_running => 1,
36             }, $class;
37             $s->config->_init;
38            
39             # Load up our assembler:
40             $s->_load_class( $s->config->message_assembler );
41             $s->{message_assembler} = $s->config->message_assembler->new( );
42            
43             # Load up our sender:
44             $s->_load_class( $s->config->mail_sender );
45             $s->{mail_sender} = $s->config->mail_sender->new( );
46            
47             return $s;
48             }# end new()
49              
50              
51             #==============================================================================
52             sub run
53             {
54             my ($s) = @_;
55            
56             # Wait until we get a new transmission, then process it:
57             while( $s->continue_running )
58             {
59             my $trans;
60             warn "Waiting for a transmission...\n";
61             unless( $trans = $s->find_new_transmission() )
62             {
63             sleep(5);
64             next;
65             }# end unless();
66            
67             @progress = ( );
68             warn "Processing $trans...\n";
69             $trans->is_started( 1 );
70             $trans->started_on( time2iso() );
71             $trans->blaster_hostname( $s->config->hostname );
72             $trans->update();
73            
74             # Call our initializer(s):
75             $s->handle_event(
76             type => 'init_transmission',
77             transmission_id => $trans->id
78             );
79            
80             # Spread the workload across some workers:
81             my @workers = ( );
82             my @sendlogs = $trans->sendlogs;
83             my @bulk_sendlogs = grep { ! $_->throttled_domain } @sendlogs;
84             push @workers, $s->_init_throttled_workers( \@bulk_sendlogs );
85            
86             my $boss = threads->create(sub {
87             while( $s->continue_running && grep { $_->is_running } @workers )
88             {
89             my $running = scalar( grep { $_->is_running } @workers);
90             my ( $ids, $processed );
91             SCOPE: {
92             lock(@progress);
93             $processed = scalar(@progress);
94             $ids = [ @progress ];
95             @progress = ( );
96             };
97             $s->mark_sendlogs_as_finished( $ids ) if @$ids;
98             warn "Waiting for $running workers - Finished $processed this round...\n";
99             sleep(1);
100             }# end while()
101             });
102            
103             # Call our initializer(s):
104             $s->handle_event(
105             type => 'begin_transmission',
106             transmission_id => $trans->id
107             );
108            
109             # Wait for our workers to finish:
110             $_->join foreach ( $boss, @workers );
111            
112             # Don't forget any straggler sendlogs:
113             warn "Finished @{[ scalar(@progress) ]} in cleanup...\n";
114             $s->mark_sendlogs_as_finished( [ @progress ] ) if @progress;
115            
116             # Mark it as completed:
117             $trans->is_completed( 1 );
118             $trans->completed_on( time2iso() );
119             $trans->update();
120            
121             # Call our initializer(s):
122             $s->handle_event(
123             type => 'end_transmission',
124             transmission_id => $trans->id
125             );
126            
127             }# end while()
128             }# end run()
129              
130              
131             #==============================================================================
132             sub send_message
133             {
134             my ($s, $sendlog, $transmission) = @_;
135            
136             my $msg = $s->message_assembler->assemble(
137             $s,
138             $sendlog,
139             $transmission
140             );
141            
142             $s->mail_sender->send_message(
143             blaster => $s,
144             subject => $msg->{subject},
145             content => $msg->{content},
146             transmission => $transmission,
147             sendlog => $sendlog,
148             );
149             }# end send_message()
150              
151              
152             #==============================================================================
153             sub _init_throttled_workers
154             {
155             my ($s, $bulk_sendlogs) = @_;
156            
157             my @workers = ( );
158             my $per_worker = ceil( scalar(@$bulk_sendlogs) / $s->config->max_bulk_workers );
159             while( my @chunk = splice(@$bulk_sendlogs, 0, $per_worker) )
160             {
161             push @workers, threads->create(sub {
162             my $sendlogs = shift;
163             my $trans;
164             map {
165             $trans ||= $_->transmission;
166             my $queued_as = $s->send_message(
167             $_,
168             $trans
169             );
170             lock( @progress );
171             push @progress, $_->id . ':' . $queued_as;
172             } @$sendlogs;
173            
174             return;
175             }, \@chunk);
176             }# end while()
177            
178             return @workers;
179             }# end _init_throttled_workers()
180              
181              
182             #==============================================================================
183             sub mark_sendlogs_as_finished
184             {
185             my ($s, $items) = @_;
186            
187             my $sth = Email::Blaster::Model->db_Main->prepare(<<"SQL");
188             UPDATE sendlogs SET
189             is_sent = 1,
190             queued_as = ?,
191             sent_on = '@{[ time2iso ]}'
192             WHERE sendlog_id = ?
193             SQL
194             foreach my $item ( @$items )
195             {
196             my ($id, $queued_as) = split /:/, $item;
197             $sth->execute( $queued_as, $id );
198             }# end foreach()
199             $sth->finish();
200             }# end mark_sendlogs_as_finished()
201              
202              
203             #==============================================================================
204             sub find_new_transmission;
205              
206              
207             #==============================================================================
208             sub current { $instance || shift->new }
209             sub config { shift->{config} }
210             sub message_assembler { shift->{message_assembler} }
211             sub mail_sender { shift->{mail_sender} }
212              
213              
214             #==============================================================================
215             sub continue_running
216             {
217             my ($s) = shift;
218            
219             @_ ? $s->{continue_running} = shift : $s->{continue_running};
220             }# end continue_running()
221              
222              
223             #==============================================================================
224             sub handle_event
225             {
226             my ($s, %args) = @_;
227            
228             my ($type) = Email::Blaster::Event::Type->search( event_type_name => delete($args{type}) );
229              
230             my $event = Email::Blaster::Event->create(
231             %args,
232             event_type_id => $type->id,
233             );
234            
235             my $group = $type->event_type_name;
236             map {
237             $s->_load_class( $_ );
238             $_->new->run( $event );
239             } @{ $s->config->handlers->$group->handler };
240            
241             1;
242             }# end handle_event()
243              
244              
245             #==============================================================================
246             sub _load_class
247             {
248             my ($s, $class) = @_;
249            
250             (my $file = "$class.pm") =~ s/::/\//g;
251             eval { require $file unless $INC{$file}; 1 } or confess "Cannot load $class: $@";
252             }# end _load_class()
253              
254              
255             1;# return true:
256              
257             __END__