File Coverage

blib/lib/PubNub/PubSub.pm
Criterion Covered Total %
statement 45 104 43.2
branch 6 42 14.2
condition 8 43 18.6
subroutine 9 14 64.2
pod 4 4 100.0
total 72 207 34.7


line stmt bran cond sub pod time code
1             package PubNub::PubSub;
2              
3 2     2   32781 use strict;
  2         5  
  2         77  
4 2     2   22 use v5.10;
  2         5  
  2         118  
5             our $VERSION = '1.0.0';
6              
7 2     2   11 use Carp;
  2         7  
  2         180  
8 2     2   1132 use Mojo::JSON qw/encode_json/;
  2         103604  
  2         115  
9 2     2   1219 use Mojo::UserAgent;
  2         370812  
  2         21  
10 2     2   72 use Mojo::Util qw/url_escape/;
  2         4  
  2         87  
11              
12 2     2   869 use PubNub::PubSub::Message;
  2         4  
  2         1615  
13              
14             sub new {
15 1     1 1 16 my $class = shift;
16 1 50       8 my %args = @_ % 2 ? %{$_[0]} : @_;
  0         0  
17              
18 1   50     7 $args{host} ||= 'pubsub.pubnub.com';
19 1   50     4 $args{port} ||= 80;
20 1   50     7 $args{timeout} ||= 60; # for ua timeout
21 1   50     4 $args{publish_queue} ||= [];
22              
23 1 50       4 my $proto = ($args{port} == 443) ? 'https://' : 'http://';
24 1   33     5 $args{web_host} ||= $proto . $args{host};
25              
26 1         3 return bless \%args, $class;
27             }
28              
29             sub __ua {
30 0     0   0 my $self = shift;
31              
32 0 0       0 return $self->{ua} if exists $self->{ua};
33              
34 0         0 my $ua = Mojo::UserAgent->new;
35 0         0 $ua->max_redirects(3);
36 0         0 $ua->inactivity_timeout($self->{timeout});
37 0         0 $ua->proxy->detect; # env proxy
38 0         0 $ua->cookie_jar(0);
39 0         0 $ua->max_connections(100);
40 0         0 $self->{ua} = $ua;
41              
42 0         0 return $ua;
43             }
44              
45             sub publish {
46 0     0 1 0 my $self = shift;
47              
48 0 0       0 my %params = @_ % 2 ? %{$_[0]} : @_;
  0         0  
49 0   0     0 my $callback = $params{callback} || $self->{publish_callback};
50              
51 0         0 my $ua = $self->__ua;
52              
53 0         0 my @steps = map {
54 0         0 my $ref = $_;
55 0         0 my $url = $ref->{url};
56             sub {
57 0     0   0 my $delay = shift;
58 0         0 my $end = $delay->begin;
59             $ua->get($url => sub {
60 0 0       0 $callback->($_[1]->res, $ref->{message}) if $callback;
61 0         0 $end->();
62 0         0 });
63             }
64 0         0 } $self->__construct_publish_urls(%params);
65              
66 0         0 Mojo::IOLoop->delay(@steps)->wait;
67             }
68              
69             sub __construct_publish_urls {
70 3     3   8505 my ($self, %params) = @_;
71              
72 3   33     18 my $pub_key = $params{pub_key} || $self->{pub_key};
73 3 50       5 $pub_key or croak "pub_key is required.";
74 3   33     14 my $sub_key = $params{sub_key} || $self->{sub_key};
75 3 50       7 $sub_key or croak "sub_key is required.";
76 3   33     11 my $channel = $params{channel} || $self->{channel};
77 3 50       6 $channel or croak "channel is required.";
78 3 50       7 $params{messages} or croak "messages is required.";
79              
80 6         992 return map {
81 6         17 my $json = $_->json;
82 6         294 my $uri = Mojo::URL->new( $self->{web_host} . qq~/publish/$pub_key/$sub_key/0/$channel/0/~ . url_escape($json) );
83 6         1015 $uri->query($_->query_params(\%params));
84 6         226 { url => $uri->to_string, message => $_ };
85 3         4 } map { PubNub::PubSub::Message->new($_) } @{$params{messages}};
  3         9  
86             }
87              
88             sub subscribe {
89 0     0 1   my $self = shift;
90 0 0         my %params = @_ % 2 ? %{$_[0]} : @_;
  0            
91              
92 0   0       my $sub_key = $params{sub_key} || $self->{sub_key};
93 0 0         $sub_key or croak "sub_key is required.";
94 0   0       my $channel = $params{channel} || $self->{channel};
95 0 0         $channel or croak "channel is required.";
96              
97 0 0         my $callback = $params{callback} or croak "callback is required.";
98 0   0       my $timetoken = $params{timetoken} || '0';
99              
100 0           my $ua = $self->__ua;
101              
102 0           my $tx = $ua->get($self->{web_host} . "/subscribe/$sub_key/$channel/0/$timetoken");
103 0 0         unless ($tx->success) {
104             # for example $tx->error->{message} =~ /Inactivity timeout/
105 0           return $self->subscribe(%params, timetoken => $timetoken);
106             }
107 0           my $json = $tx->res->json;
108              
109 0 0         my $rtn = $callback ? $callback->(@{ $json->[0] }) : 1;
  0            
110 0 0         return unless $rtn;
111              
112 0           $timetoken = $json->[1];
113 0           return $self->subscribe(%params, timetoken => $timetoken);
114             }
115              
116             sub history {
117 0     0 1   my $self = shift;
118              
119 0 0 0       if (scalar(@_) == 1 and ref($_[0]) ne 'HASH' and $_[0] =~ /^\d+$/) {
      0        
120 0           @_ = (count => $_[0]);
121 0           warn "->history(\$num) is deprecated and will be removed in next few releases.\n";
122             }
123              
124 0 0         my %params = @_ % 2 ? %{$_[0]} : @_;
  0            
125              
126 0   0       my $sub_key = delete $params{sub_key} || $self->{sub_key};
127 0 0         $sub_key or croak "sub_key is required.";
128 0   0       my $channel = delete $params{channel} || $self->{channel};
129 0 0         $channel or croak "channel is required.";
130              
131 0           my $ua = $self->__ua;
132              
133 0           my $tx = $ua->get($self->{web_host} . "/v2/history/sub-key/$sub_key/channel/$channel" => form => \%params);
134 0 0         return [$tx->error->{message}] unless $tx->success;
135 0           return $tx->res->json;
136             }
137              
138             1;
139             __END__