File Coverage

blib/lib/IO/Socket/RedisPubSub.pm
Criterion Covered Total %
statement 30 68 44.1
branch 2 18 11.1
condition 5 13 38.4
subroutine 8 12 66.6
pod 0 6 0.0
total 45 117 38.4


line stmt bran cond sub pod time code
package IO::Socket::RedisPubSub;
use strict;
use warnings;
use IO::Socket;
use Exporter ();

# Package globals consulted by Exporter. Declared with `our` instead of the
# obsolete `use vars` pragma; the declaration is at file scope so the names
# are visible both inside the BEGIN block and to the rest of the file.
our ( $VERSION, @ISA, @EXPORT, @EXPORT_OK, %EXPORT_TAGS );

# Populated at compile time, exactly as before, so the inheritance from
# Exporter and the export lists are in place before any other code runs.
BEGIN {
    $VERSION     = '0.02';
    @ISA         = qw(Exporter);
    @EXPORT      = qw();
    @EXPORT_OK   = qw(publish subscribe pull);
    %EXPORT_TAGS = ();
}

# True value returned to require/use. The sub definitions that follow are
# compile-time only, so this stays the last statement executed at run time.
1;
16              
# Read one reply from the redis server and package the response into
# a hash ref.
#
# The reply is read from $self->{connection} using getline/read. Any
# arguments after $self are accepted for the benefit of existing callers
# (which pass a label such as 'publish') but are not used.
#
# Returns one of:
#   { type => 'int',   value => N }                  for ":N"   (N may be negative)
#   { type => 'error', value => TEXT }               for "-TEXT"
#   { type => 'line',  value => TEXT }               for "+TEXT"
#   { type => 'bulkerror' }                          for "$-1"  (nil bulk)
#   { type => 'bulk', length => N, value => DATA }   for "$N" followed by N bytes
#   { type => 'multibulkerror' }                     for "*-1"  (nil multi-bulk)
#   { type => 'bulkmulti', values => [ ... ] }       for "*N"; one nested reply per item
# or undef, after warning "bad read", on end of stream, a short payload or
# an unrecognised type byte.
sub _redis_read {
    my $self = shift;
    my $conn = $self->{connection};

    my $start = $conn->getline;
    if ( !defined $start ) {

        # End of stream or read error: report it the same way as any other
        # unparseable reply instead of matching regexes against undef.
        warn "bad read";
        return undef;    # callers rely on exactly one (undef) value here
    }

    # Drop the line terminator so text replies are returned without it.
    $start =~ s/\r?\n\z//;

    return { type => 'int',   value => $1 } if $start =~ /\A:(-?\d+)/;
    return { type => 'error', value => $1 } if $start =~ /\A-(.*)/s;
    return { type => 'line',  value => $1 } if $start =~ /\A\+(.*)/s;

    if ( $start =~ /\A\$(-?\d+)/ ) {

        # Copy the capture at once; $1 must not be trusted across later calls.
        my $length = $1;
        return { type => 'bulkerror' } if $length < 0;

        my $payload = '';
        my $got     = $conn->read ( $payload, $length );
        if ( !defined $got || $got != $length ) {
            warn "bad read";
            return undef;
        }

        # Consume the line terminator that follows the payload.
        $conn->getline;
        return { type => 'bulk', length => $length, value => $payload };
    }

    if ( $start =~ /\A\*(-?\d+)/ ) {
        my $count = $1;
        return { type => 'multibulkerror' } if $count < 0;

        my @res;
        for my $index ( 1 .. $count ) {

            # scalar() keeps one slot per element even if a nested read fails.
            push @res, scalar $self->_redis_read ( "multibulk $index" );
        }
        return { type => 'bulkmulti', values => \@res };
    }

    warn "bad read";
    return undef;
}
42              
# Constructor. Accepts optional named arguments:
#   host - server host name (a false value selects 'localhost')
#   port - server port      (a false value selects 6379)
# May be called on the class or on an existing instance. A connection
# attempt is made immediately; its outcome is not inspected here, so the
# object is returned whether or not the connection succeeded.
sub new {
    my $class = shift;
    my %args  = @_;

    my $self = bless {}, ( ref ($class) || $class );

    my ( $host, $port ) = @args{qw(host port)};
    $host ||= 'localhost';
    $port ||= 6379;

    $self->connect ( $host, $port );
    return $self;
}
52              
# Publish $msg on $channel and return the server's reply as produced by
# _redis_read.
#
# The redis protocol prefixes every argument with its length in BYTES, so
# both arguments are reduced to byte strings before being measured:
#   * strings that fit in single bytes are sent exactly as before;
#   * strings containing wide characters are sent as UTF-8, with a length
#     prefix that now matches the bytes actually written.
# An undefined argument is sent as the empty string rather than producing a
# malformed frame.
sub publish {
    my ( $self, $channel, $msg ) = @_;
    my $conn = $self->{connection};

    # $channel and $msg are private copies, so normalising them in place
    # does not touch the caller's variables.
    for my $arg ( $channel, $msg ) {
        $arg = '' unless defined $arg;
        utf8::encode ( $arg ) unless utf8::downgrade ( $arg, 1 );
    }

    my $cl = length $channel;
    my $ml = length $msg;

    my $cmd = "*3\r\n\$7\r\npublish\r\n\$$cl\r\n$channel\r\n\$$ml\r\n$msg\r\n";
    $conn->print ( $cmd );
    return $self->_redis_read ( 'publish' );
}
63              
# Subscribe to $channel and return the server's reply as produced by
# _redis_read.
#
# The redis protocol prefixes the argument with its length in BYTES, so the
# channel name is reduced to a byte string before being measured: names that
# fit in single bytes are sent exactly as before, names containing wide
# characters are sent as UTF-8 with a matching length prefix. An undefined
# channel is sent as the empty string rather than producing a malformed
# frame.
sub subscribe {
    my ( $self, $channel ) = @_;
    my $conn = $self->{connection};

    # $channel is a private copy, so the caller's variable is not modified.
    $channel = '' unless defined $channel;
    utf8::encode ( $channel ) unless utf8::downgrade ( $channel, 1 );

    my $cl  = length $channel;
    my $cmd = "*2\r\n\$9\r\nsubscribe\r\n\$$cl\r\n$channel\r\n";
    $conn->print ( $cmd );
    return $self->_redis_read ( 'subscribe' );
}
72              
# Read the next reply from the server and return ( $channel, $message ),
# taken from the second and third elements of a multi-bulk reply (the first
# element, the message kind, is not returned).
#
# When nothing could be read (_redis_read returned undef, e.g. the
# connection was closed) an EMPTY list is returned, so that
#     while ( my ( $channel, $message ) = $rs->pull ) { ... }
# terminates instead of spinning forever on ( undef, undef ).
# Replies that are not multi-bulk still yield ( undef, undef ).
sub pull {
    my ( $self ) = @_;

    my $reply = $self->_redis_read ( 'pull' );
    return unless defined $reply;

    # Look the elements up without autovivifying missing structure.
    my $values = $reply->{values} || [];
    my ( $channel, $message ) =
        map { $_ ? $_->{value} : undef } @{$values}[ 1, 2 ];

    return ( $channel, $message );
}
82              
# Open a TCP connection to $host:$port, first closing any connection this
# object already holds. The new socket (or the failed attempt's false
# result) is stored in $self->{connection}.
#
# Returns ( local address as a dotted string, local port ) on success and
# ( undef, undef ) when the socket could not be created or is not connected.
sub connect {
    my ( $self, $host, $port ) = @_;

    $self->close;

    my $sock = IO::Socket::INET->new ( PeerAddr => "$host:$port" );
    $self->{connection} = $sock;

    if ( !$sock || !$sock->connected ) {
        return ( undef, undef );
    }

    # sockaddr_in unpacks in list context as ( port, packed address ).
    my ( $local_port, $packed_addr ) = sockaddr_in ( $sock->sockname );
    return ( inet_ntoa ( $packed_addr ), $local_port );
}
92              
# Close the current connection if there is one and it is still connected.
# Returns the result of the socket's close when it was called, otherwise the
# false value that stopped the check. $self->{connection} is left in place.
sub close {
    my ( $self ) = @_;
    my $sock = $self->{connection};
    return $sock && $sock->connected && $sock->close;
}
98              
99             =head1 NAME
100              
101             IO::Socket::RedisPubSub - A simple redis publish/subscribe client.
102              
103             =head1 SYNOPSIS
104              
105             # Somewhere
106             use IO::Socket::RedisPubSub qw(subscribe pull);
107              
108             my $rs = IO::Socket::RedisPubSub->new;
109              
110             $rs->subscribe ( 'newsfeed' );
111              
112             while ( my ( $channel, $message ) = $rs->pull ) {
113             print "Got $message on $channel";
114             }
115              
116             # Elsewhere
117             use IO::Socket::RedisPubSub qw(publish);
118              
119             IO::Socket::RedisPubSub->new->publish ( 'newsfeed', 'hi there' );
120              
121            
122             =head1 DESCRIPTION
123              
124             A very simple Redis client. It implements only the publish/subscribe features.
125              
126             =head1 AUTHOR
127              
128             Martin Redmond
129             CPAN ID: REDS
130             Tinychat.com
131             martin@tinychat.com
132             http://tinychat.com/about.html
133              
134             =head1 COPYRIGHT
135              
136             This program is free software; you can redistribute
137             it and/or modify it under the same terms as Perl itself.
138              
139             The full text of the license can be found in the
140             LICENSE file included with this module.
141              
142              
143             =head1 SEE ALSO
144              
145             perl(1).
146              
147             =cut
148