File Coverage

blib/lib/AnyEvent/Handle/ZeroMQ.pm
Criterion Covered Total %
statement 19 21 90.4
branch n/a
condition n/a
subroutine 7 7 100.0
pod n/a
total 26 28 92.8


line stmt bran cond sub pod time code
1             package AnyEvent::Handle::ZeroMQ;
2              
3 1     1   29807 use 5.006;
  1         5  
  1         40  
4 1     1   9 use strict;
  1         5  
  1         36  
5 1     1   5 use warnings;
  1         7  
  1         60  
6              
7             =head1 NAME
8              
9             AnyEvent::Handle::ZeroMQ - Integrate AnyEvent and ZeroMQ with AnyEvent::Handle like ways.
10              
11             =head1 VERSION
12              
13             Version 0.09
14              
15             =cut
16              
17             our $VERSION = '0.09';
18              
19              
20             =head1 SYNOPSIS
21              
22             use AnyEvent::Handle::ZeroMQ;
23             use AE;
24             use ZeroMQ;
25              
26             my $ctx = ZeroMQ::Context->new;
27             my $socket = $ctx->socket(ZMQ_XREP);
28             $socket->bind('tcp://0:8888');
29              
30             my $hdl = AnyEvent::Handle::ZeroMQ->new(
31             socket => $socket,
32             on_drain => sub { print "the write queue is empty\n" },
33             on_error => sub { my($error_msg) = @_; ... },
34             # catch errors when occured in the reading callback
35             );
36             # or $hdl->on_drain( sub { ... } );
37             # or $hdl->on_error( sub { ... } );
38             $hdl->push_read( sub {
39             my($hdl, $data) = @_;
40              
41             my @out;
42             while( defined( my $msg = shift @$data ) ) {
43             push @out, $msg;
44             last if $msg->size == 0;
45             }
46             while( my $msg = shift @$data ) {
47             print "get: ",$msg->data,$/;
48             }
49             push @out, "get!";
50             $hdl->push_write(\@out);
51             } );
52              
53             AE::cv->recv;
54              
55             =cut
56              
57 1     1   6 use strict;
  1         2  
  1         43  
58 1     1   5 use warnings;
  1         1  
  1         33  
59              
60 1     1   939 use AE;
  1         9120  
  1         44  
61 1     1   578 use ZeroMQ qw(:all);
  0            
  0            
62             use Scalar::Util qw(weaken);
63              
64             use base qw(Exporter);
65             our %EXPORT_TAGS = ( constant => [qw(SOCKET RQUEUE WQUEUE RWATCHER WWATCHER ON_DRAIN DEALER ROUTER)] );
66             our @EXPORT_OK = map { @$_ } values %EXPORT_TAGS;
67              
68             use constant {
69             SOCKET => 0,
70             RQUEUE => 1,
71             WQUEUE => 2,
72             WATCHER => 3,
73             ON_DRAIN => 4,
74             DEALER => 5,
75             ROUTER => 6,
76             ON_ERROR => 7,
77             };
78              
79             =head1 METHODS
80              
81             =head2 new( socket => $zmq_socket, on_drain(optional) => cb(hdl) )
82              
83             =cut
84              
85             sub new {
86             my $class = shift;
87             my %args = @_;
88             my $socket = $args{socket};
89              
90             my $fd = $socket->getsockopt(ZMQ_FD);
91              
92             my($self, $wself);
93              
94             $self = $wself = bless [
95             $socket,
96             [],
97             [],
98             AE::io($fd, 0, sub { _consume_read_write($wself) }),
99             undef,
100             ], $class;
101              
102             weaken $wself;
103              
104             if( exists $args{on_drain} ) {
105             on_drain($self, $args{on_drain});
106             }
107             if( exists $args{on_error} ) {
108             on_error($self, $args{on_error});
109             }
110              
111             return $self;
112             }
113              
114             sub _consume_read_write {
115             _consume_write(@_);
116             _consume_read(@_);
117             }
118              
119             =head2 push_read( cb(hdl, data (array_ref) ) )
120              
121             =cut
122              
123             sub _consume_read {
124             my $self = shift;
125              
126             my $socket = $self->[SOCKET];
127             my $rqueue = $self->[RQUEUE];
128              
129             while( $socket->getsockopt(ZMQ_EVENTS) & ZMQ_POLLIN && @$rqueue ) {
130             my @msgs;
131             {
132             push @msgs, $socket->recv;
133             redo if $socket->getsockopt(ZMQ_RCVMORE);
134             }
135             my $cb = shift @$rqueue;
136             eval { $cb->($self, \@msgs) };
137             if( $@ ) {
138             if( $self->[ON_ERROR] ) {
139             $self->[ON_ERROR]($@);
140             }
141             else {
142             warn $@;
143             }
144             }
145             }
146             }
147              
148             sub push_read {
149             my $self = shift;
150             push @{$self->[RQUEUE]}, pop;
151             _consume_read($self);
152             }
153              
154             =head2 push_write( data (array_ref) )
155              
156             =cut
157              
158             sub _consume_write {
159             my $self = shift;
160              
161             my $socket = $self->[SOCKET];
162             my $wqueue = $self->[WQUEUE];
163              
164             my $write_something;
165             while( $socket->getsockopt(ZMQ_EVENTS) & ZMQ_POLLOUT && @$wqueue ) {
166             my $msgs = shift @$wqueue;
167             while( defined( my $msg = shift @$msgs ) ) {
168             $socket->send($msg, @$msgs ? ZMQ_SNDMORE : 0);
169             }
170              
171             $write_something = 1;
172             }
173              
174             $self->[ON_DRAIN]($self) if( !@$wqueue && $write_something && $self->[ON_DRAIN] );
175             }
176              
177             sub push_write {
178             my $self = shift;
179             push @{$self->[WQUEUE]}, shift;
180             _consume_write($self);
181             }
182              
183             if( !exists(&ZeroMQ::Socket::DESTROY) ) {
184             *ZeroMQ::Socket::DESTROY = sub {
185             my $self = shift;
186             eval { $self->close };
187             };
188             }
189              
190             =head2 old_cb = on_drain( cb(hdl) )
191              
192             =cut
193              
194             sub on_drain {
195             my $self = shift;
196             my $cb = pop;
197              
198             $cb->($self) if( $cb && !@{$self->[WQUEUE]} );
199              
200             my $old_cb = $self->[ON_DRAIN];
201             $self->[ON_DRAIN] = $cb;
202              
203             return $old_cb;
204             }
205              
206             =head2 old_cb = on_error( cb(hdl) )
207              
208             =cut
209              
210             sub on_error {
211             my $self = shift;
212             my $cb = pop;
213              
214             my $old_cb = $self->[ON_ERROR];
215             $self->[ON_ERROR] = $cb;
216              
217             return $old_cb;
218             }
219              
220             =head1 DIFFERENCES
221              
222             There is also a module called L in CPAN.
223              
224             AnyEvent::ZeroMQ::* is a huge, heavy,
225             and full-functioned framework, but this module is a simple,
226             lightweight library with less dependency, and runs faster.
227              
228             So this module is only occupy a smaller namespace
229             under AnyEvent::Handle::
230              
231             This module and AnyEvent::ZeroMQ::* are not replacable to each other.
232              
233             =head1 AUTHOR
234              
235             Cindy Wang (CindyLinz)
236              
237             =head1 BUGS
238              
239             Please report any bugs or feature requests to C, or through
240             the web interface at L. I will be notified, and then you'll
241             automatically be notified of progress on your bug as I make changes.
242              
243              
244              
245              
246             =head1 SUPPORT
247              
248             You can find documentation for this module with the perldoc command.
249              
250             perldoc AnyEvent::Handle::ZeroMQ
251              
252              
253             You can also look for information at:
254              
255             =over 4
256              
257             =item * RT: CPAN's request tracker (report bugs here)
258              
259             L
260              
261             =item * AnnoCPAN: Annotated CPAN documentation
262              
263             L
264              
265             =item * CPAN Ratings
266              
267             L
268              
269             =item * Search CPAN
270              
271             L
272              
273             =back
274              
275              
276             =head1 ACKNOWLEDGEMENTS
277              
278              
279             =head1 LICENSE AND COPYRIGHT
280              
281             Copyright 2011 Cindy Wang (CindyLinz).
282              
283             This program is free software; you can redistribute it and/or modify it
284             under the terms of either: the GNU General Public License as published
285             by the Free Software Foundation; or the Artistic License.
286              
287             See http://dev.perl.org/licenses/ for more information.
288              
289              
290             =cut
291              
292             1; # End of AnyEvent::Handle::ZeroMQ