File Coverage

blib/lib/APNS/Agent.pm
Criterion Covered Total %
statement 99 149 66.4
branch 13 38 34.2
condition 1 21 4.7
subroutine 23 29 79.3
pod 0 3 0.0
total 136 240 56.6


line stmt bran cond sub pod time code
1             package APNS::Agent;
2 2     2   187545 use 5.010;
  2         7  
  2         94  
3 2     2   12 use strict;
  2         4  
  2         68  
4 2     2   11 use warnings;
  2         12  
  2         109  
5              
6             our $VERSION = "0.06";
7              
8 2     2   2181 use AnyEvent::APNS;
  2         378515  
  2         77  
9 2     2   2006 use Cache::LRU;
  2         1445  
  2         64  
10 2     2   13 use Encode qw/decode_utf8/;
  2         4  
  2         208  
11 2     2   12 use JSON::XS;
  2         5  
  2         108  
12 2     2   1870 use Log::Minimal;
  2         75591  
  2         15  
13 2     2   2354 use Plack::Request;
  2         1026583  
  2         77  
14 2     2   1931 use Router::Boom::Method;
  2         20559  
  2         394  
15              
16             use Class::Accessor::Lite::Lazy 0.03 (
17             new => 1,
18             ro => [qw/
19             certificate
20             private_key
21             sandbox
22             debug_port
23             /],
24             ro_lazy => {
25             on_error_response => sub {
26             sub {
27 0         0 my $self = shift;
28 0         0 my %d = %{$_[0]};
  0         0  
29 0   0     0 warnf "identifier:%s\tstate:%s\ttoken:%s", $d{identifier}, $d{state}, $d{token} || '';
30             }
31 0         0 },
32 1         19 disconnect_interval => sub { 60 },
33 1         11 send_interval => sub { 0.01 },
34 1         18 _sent_cache => sub { Cache::LRU->new(size => 10000) },
35 1         11 _queue => sub { [] },
36             __apns => '_build_apns',
37 1         11 _sent => sub { 0 },
38             },
39 2         42 rw => [qw/_last_sent_at _disconnect_timer/],
40 2     2   1982 );
  2         2286  
41              
42             sub to_app {
43 1     1 0 10314 my $self = shift;
44              
45 1         13 my $router = Router::Boom::Method->new;
46 1         12 $router->add(POST => '/' => '_do_main');
47 1         26 $router->add(GET => '/monitor' => '_do_monitor');
48              
49             sub {
50 3     3   35331 my $env = shift;
51 3         33 my ($target_method) = $router->match(@$env{qw/REQUEST_METHOD PATH_INFO/});
52              
53 3 50       1117 return [404, [], ['NOT FOUND']] unless $target_method;
54              
55 3         38 my $req = Plack::Request->new($env);
56 3         46 $self->$target_method($req);
57 1         25 };
58             }
59              
60             sub _do_main {
61 1     1   3 my ($self, $req) = @_;
62              
63 1 50       7 my $token = $req->param('token') or return [400, [], ['Bad Request']];
64              
65 1         882 my $payload;
66 1 50       5 if (my $payload_json = $req->param('payload') ) {
    50          
67 0         0 state $json_driver = JSON::XS->new->utf8;
68 0         0 local $@;
69 0         0 $payload = eval { $json_driver->decode($payload_json) };
  0         0  
70 0 0       0 return [400, [], ['BAD REQUEST']] if $@;
71             }
72             elsif (my $alert = $req->param('alert')) {
73 1         37 $payload = +{
74             alert => decode_utf8($alert),
75             };
76             }
77 1 50       76 return [400, [], ['BAD REQUEST']] unless $payload;
78              
79 1         4 my @payloads = map {[$_, $payload]} split /,/, $token;
  1         5  
80 1         2 push @{$self->_queue}, @payloads;
  1         8  
81              
82 1         7 infof "event:payload queued\ttoken:%s", $token;
83 1 50       46 if ($self->__apns->connected) {
84 0         0 $self->_sending;
85             }
86             else {
87 1         16 $self->_connect_to_apns;
88             }
89 1         30 return [200, [], ['Accepted']];
90             }
91              
92             sub _do_monitor {
93 2     2   6 my ($self, $req) = @_;
94              
95 2         17 my $result = {
96             sent => $self->_sent,
97 2         13 queued => scalar( @{ $self->_queue } ),
98             };
99 2         45 my $body = encode_json($result);
100              
101 2         28 return [200, [
102             'Content-Type' => 'application/json; charset=utf-8',
103             'Content-Length' => length($body),
104             ], [$body]];
105             }
106              
107             sub _build_apns {
108 1     1   34031 my $self = shift;
109              
110             AnyEvent::APNS->new(
111             certificate => $self->certificate,
112             private_key => $self->private_key,
113             sandbox => $self->sandbox,
114             on_error => sub {
115 0     0   0 my ($handle, $fatal, $message) = @_;
116              
117 0         0 my $t; $t = AnyEvent->timer(
118             after => 0,
119             interval => 10,
120             cb => sub {
121 0         0 undef $t;
122 0         0 infof "event:reconnect";
123 0         0 $self->_connect_to_apns;
124             },
125 0         0 );
126 0         0 warnf "event:error\tfatal:$fatal\tmessage:$message";
127             },
128             on_connect => sub {
129 1     1   4338 infof "event:on_connect";
130 1         19 $self->_disconnect_timer($self->_build_disconnect_timer);
131              
132 1 50       32 if (@{$self->_queue}) {
  1         5  
133 1         22 $self->_sending;
134             }
135             },
136             on_error_response => sub {
137 0     0   0 my ($identifier, $state) = @_;
138 0   0     0 my $data = $self->_sent_cache->get($identifier) || {};
139 0         0 $self->on_error_response->($self, {
140             identifier => $identifier,
141             state => $state,
142             token => $data->{token},
143             payload => $data->{payload},
144             });
145             },
146 1 50       8 ($self->debug_port ? (debug_port => $self->debug_port) : ()),
147             );
148             }
149              
150             sub _apns {
151 2     2   5 my $self = shift;
152              
153 2         9 my $apns = $self->__apns;
154 2 100       29 $apns->connect unless $apns->connected;
155 2         900 $apns;
156             }
157 1     1   5 sub _connect_to_apns { goto \&_apns }
158              
159             sub _build_disconnect_timer {
160 1     1   3 my $self = shift;
161              
162 1 50       5 if (my $interval = $self->disconnect_interval) {
163             AnyEvent->timer(
164             after => $interval,
165             interval => $interval,
166             cb => sub {
167 0 0 0 0   0 if ($self->{__apns} && (time - ($self->_last_sent_at || 0) > $interval)) {
      0        
168 0         0 delete $self->{__apns};
169 0         0 delete $self->{_disconnect_timer};
170 0         0 infof "event:close apns";
171             }
172             },
173 1         22 );
174             }
175 0         0 else { undef }
176             }
177              
178             sub _sending {
179 1     1   2 my $self = shift;
180              
181             $self->{_send_timer} ||= AnyEvent->timer(
182             after => $self->send_interval,
183             interval => $self->send_interval,
184             cb => sub {
185 1     1   9459 my $msg = shift @{ $self->_queue };
  1         11  
186 1 50       24 if ($msg) {
187 1         7 $self->_send(@$msg);
188             }
189             else {
190 0         0 delete $self->{_send_timer};
191             }
192             },
193 1   33     11 );
194             }
195              
196             sub _send {
197 1     1   5 my ($self, $token, $payload) = @_;
198              
199 1         3 local $@;
200 1         2 my $identifier;
201 1         3 eval {
202 1         7 $identifier = $self->_apns->send(pack("H*", $token) => {
203             aps => $payload,
204             });
205             };
206              
207 1 50       2095 if (my $err = $@) {
208 0 0       0 if ($err =~ m!Can't call method "push_write" on an undefined value!) {
209             # AnyEvent::APNS->handle is missing
210 0         0 delete $self->{_send_timer};
211 0         0 unshift @{ $self->_queue }, [$token, $payload];
  0         0  
212 0         0 $self->_connect_to_apns;
213             }
214             else {
215 0         0 die $err;
216             }
217             }
218             else {
219 1         7 $self->_sent_cache->set($identifier => {
220             token => $token,
221             payload => $payload,
222             });
223 1         58 $self->_last_sent_at(time);
224 1         23 infof "event:send\ttoken:$token\tidentifier:$identifier";
225 1         13 $self->{_sent}++;
226 1         13 $identifier;
227             }
228             }
229              
230             sub parse_options {
231 0     0 0   my ($class, @argv) = @_;
232              
233 0           require Getopt::Long;
234 0           require Pod::Usage;
235 0           require Hash::Rename;
236              
237 0           my $p = Getopt::Long::Parser->new(
238             config => [qw/posix_default no_ignore_case auto_help pass_through bundling/]
239             );
240 0 0         $p->getoptionsfromarray(\@argv, \my %opt, qw/
241             certificate=s
242             private-key=s
243             disconnect-interval=i
244             sandbox!
245             debug-port=i
246             /) or Pod::Usage::pod2usage();
247 0 0 0       Pod::Usage::pod2usage() if !$opt{certificate} || !$opt{'private-key'};
248              
249 0     0     Hash::Rename::hash_rename(\%opt, code => sub {tr/-/_/});
  0            
250 0           (\%opt, \@argv);
251             }
252              
253             sub run {
254 0     0 0   my $self = shift;
255 0 0         my %args = @_ == 1 ? %{$_[0]} : @_;
  0            
256 0 0 0       if (!$args{listen} && !$args{port} && !$ENV{SERVER_STARTER_PORT}) {
      0        
257 0           $args{port} = 4905;
258             }
259 0           require Plack::Loader;
260 0           Plack::Loader->load(Twiggy => %args)->run($self->to_app);
261             }
262              
263             1;
264             __END__