File Coverage

blib/lib/AnyEvent/ZeroMQ/Handle.pm
Criterion Covered Total %
statement 2 4 50.0
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 4 6 66.6


line stmt bran cond sub pod time code
1             package AnyEvent::ZeroMQ::Handle;
2             BEGIN {
3 1     1   29061 $AnyEvent::ZeroMQ::Handle::VERSION = '0.01';
4             }
5             # ABSTRACT: AnyEvent::Handle-like interface for 0MQ sockets
6 1     1   2068 use Moose;
  0            
  0            
7              
8             use AnyEvent::ZeroMQ;
9             use AnyEvent::ZeroMQ::Types qw(IdentityStr);
10             use ZeroMQ::Raw::Constants qw(ZMQ_NOBLOCK ZMQ_IDENTITY);
11              
12             use Params::Util qw(_CODELIKE);
13             use Scalar::Util qw(weaken);
14             use Try::Tiny;
15             use POSIX qw(EAGAIN EWOULDBLOCK);
16              
17             use true;
18             use namespace::autoclean;
19              
20             has 'socket' => (
21             is => 'ro',
22             isa => 'ZeroMQ::Raw::Socket',
23             handles => [qw/bind connect/],
24             required => 1,
25             );
26              
27             before qw/bind connect/ => sub {
28             $_[0]->identity;
29             };
30              
31             after qw/bind connect/ => sub {
32             my $self = shift;
33             # this can change readability/writability status, so do the checks
34             # again
35             $self->read;
36             $self->write;
37             };
38              
39             has 'identity' => (
40             is => 'rw', # note: you can change this, but it has
41             # no effect until a new bind/connect.
42             isa => IdentityStr,
43             lazy_build => 1,
44             trigger => sub { shift->_change_identity(@_) },
45             );
46              
47             has 'on_read' => (
48             is => 'rw',
49             isa => 'CodeRef',
50             predicate => 'has_on_read',
51             clearer => 'clear_on_read',
52             trigger => sub { $_[0]->read },
53             );
54              
55             has 'on_error' => (
56             is => 'rw',
57             isa => 'CodeRef',
58             predicate => 'has_on_error',
59             clearer => 'clear_on_error',
60             );
61              
62             sub handle_error {
63             my ($self, $str) = @_;
64             return $self->on_error->($str)
65             if $self->has_on_error;
66              
67             warn "AnyEvent::ZeroMQ::Handle: error in callback (ignoring): $str";
68             }
69              
70             has 'on_drain' => (
71             is => 'rw',
72             isa => 'CodeRef',
73             predicate => 'has_on_drain',
74             clearer => 'clear_on_drain',
75             # i don't think we need to trigger this, since if we were
76             # writable, we would be drained.
77             );
78              
79             has [qw/read_watcher write_watcher/] => (
80             init_arg => undef,
81             is => 'ro',
82             lazy_build => 1,
83             );
84              
85             has [qw/read_buffer write_buffer/] => (
86             init_arg => undef,
87             is => 'ro',
88             default => sub { [] },
89             );
90              
91             sub _build_read_watcher {
92             my $self = shift;
93             weaken $self;
94             return AnyEvent::ZeroMQ->io(
95             poll => 'r',
96             socket => $self->socket,
97             cb => sub { $self->read },
98             );
99             }
100              
101             sub _build_write_watcher {
102             my $self = shift;
103             weaken $self;
104             return AnyEvent::ZeroMQ->io(
105             poll => 'w',
106             socket => $self->socket,
107             cb => sub { $self->write },
108             );
109             }
110              
111             sub _build_identity {
112             my ($self) = @_;
113             return $self->socket->getsockopt( ZMQ_IDENTITY );
114             }
115              
116             sub _change_identity {
117             my ($self, $new, $old) = @_;
118             return $self->socket->setsockopt( ZMQ_IDENTITY, $new );
119             }
120              
121             sub has_read_todo {
122             my $self = shift;
123             return exists $self->read_buffer->[0];
124             }
125              
126             sub readable {
127             my $self = shift;
128             return AnyEvent::ZeroMQ->probe( poll => 'r', socket => $self->socket );
129             }
130              
131             sub _read_once {
132             my ($self, $cb) = @_;
133             local $! = 0;
134             try {
135             my $msg = ZeroMQ::Raw::Message->new;
136             $self->socket->recv($msg, ZMQ_NOBLOCK);
137             $cb->($self, $msg->data);
138             }
139             catch {
140             if($! == EWOULDBLOCK || $! == EAGAIN){
141             return;
142             }
143             else {
144             $self->handle_error($_);
145             }
146             };
147             }
148              
149             sub read {
150             my $self = shift;
151              
152             while($self->readable && $self->has_read_todo){
153             $self->_read_once(shift @{$self->read_buffer});
154             }
155              
156             while($self->readable && $self->has_on_read){
157             $self->_read_once($self->on_read);
158             }
159              
160             if($self->has_read_todo || $self->has_on_read){
161             # ensure we have a watcher
162             $self->read_watcher;
163             }
164             else {
165             $self->clear_read_watcher;
166             }
167             }
168              
169             sub push_read {
170             my ($self, $cb) = @_;
171             push @{$self->read_buffer}, $cb;
172             $self->read;
173             }
174              
175             sub has_write_todo {
176             my $self = shift;
177             return exists $self->write_buffer->[0];
178             }
179              
180             sub writable {
181             my $self = shift;
182             return AnyEvent::ZeroMQ->probe( poll => 'w', socket => $self->socket );
183             }
184              
185             sub build_message {
186             my ($self, $cb_or_msg) = @_;
187             my $msg = $cb_or_msg;
188              
189             if(my $cb = _CODELIKE($cb_or_msg)){
190             $msg = $cb->($self);
191             }
192              
193             return $msg
194             if ref $msg && blessed $msg &&
195             $msg->isa('ZeroMQ::Raw::Message');
196              
197             return ZeroMQ::Raw::Message->new_from_scalar($msg)
198             if defined $msg;
199              
200             return;
201             }
202              
203             sub write {
204             my $self = shift;
205             $self->clear_write_watcher;
206              
207             my $wrote_something = 0;
208             while($self->writable && $self->has_write_todo){
209             $wrote_something++;
210             my $buf;
211             local $! = 0;
212             try {
213             $buf = shift @{$self->write_buffer};
214             my $msg = $self->build_message($buf);
215             $self->socket->send($msg, ZMQ_NOBLOCK) if $msg;
216             }
217             catch {
218             if($! == EWOULDBLOCK || $! == EAGAIN){
219             # the got_to_send ensures the string is generated by
220             # zmq and not by a dying write callback. if you
221             # supplied a callback that died with "would block",
222             # then it would be executed again and again and your
223             # program would lock up. bad.
224             unshift @{$self->write_buffer}, $buf if defined $buf;
225             }
226             else {
227             $self->handle_error($_);
228             }
229             }
230             }
231              
232             # XXX: the user is exposed to complexity here; he needs to clear
233             # the on_drain handler if he wants to push_write again.
234             $self->on_drain->($self, $self->writable)
235             if $wrote_something && $self->has_on_drain;
236              
237             $self->write_watcher if $self->has_write_todo;
238             }
239              
240             sub push_write {
241             my ($self, $msg) = @_;
242              
243             if(_CODELIKE($msg) || blessed $msg && $msg->isa('ZeroMQ::Raw::Message')){
244             push @{$self->write_buffer}, $msg;
245             }
246             else {
247             push @{$self->write_buffer}, ZeroMQ::Raw::Message->new_from_scalar($msg);
248             }
249             $self->write;
250             }
251              
252             with 'AnyEvent::ZeroMQ::Handle::Role::Readable',
253             'AnyEvent::ZeroMQ::Handle::Role::Writable',
254             'AnyEvent::ZeroMQ::Handle::Role::Generic';
255              
256             __PACKAGE__->meta->make_immutable;
257              
258             __END__
259             =pod
260              
261             =head1 NAME
262              
263             AnyEvent::ZeroMQ::Handle - AnyEvent::Handle-like interface for 0MQ sockets
264              
265             =head1 VERSION
266              
267             version 0.01
268              
269             =head1 AUTHOR
270              
271             Jonathan Rockway <jrockway@cpan.org>
272              
273             =head1 COPYRIGHT AND LICENSE
274              
275             This software is copyright (c) 2011 by Jonathan Rockway.
276              
277             This is free software; you can redistribute it and/or modify it under
278             the same terms as the Perl 5 programming language system itself.
279              
280             =cut
281