File Coverage

blib/lib/AnyEvent/ZeroMQ/Subscribe.pm
Criterion Covered Total %
statement 2 4 50.0
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 4 6 66.6


line stmt bran cond sub pod time code
1             package AnyEvent::ZeroMQ::Subscribe;
2             BEGIN {
3 2     2   4061 $AnyEvent::ZeroMQ::Subscribe::VERSION = '0.01';
4             }
5             # ABSTRACT: Non-blocking OO abstraction over ZMQ_SUB publish/subscribe sockets
6 2     2   875 use Moose;
  0            
  0            
7             use true;
8             use namespace::autoclean;
9             use MooseX::Types::Set::Object;
10             use Scalar::Util qw(weaken);
11             use ZeroMQ::Raw::Constants qw(ZMQ_SUB ZMQ_SUBSCRIBE ZMQ_UNSUBSCRIBE);
12              
13             with 'AnyEvent::ZeroMQ::Role::WithHandle' =>
14             { socket_type => ZMQ_SUB, socket_direction => '' },
15             'MooseX::Traits';
16              
17             has '+_trait_namespace' => ( default => 'AnyEvent::ZeroMQ::Subscribe::Trait' );
18              
19             has 'topics' => (
20             is => 'rw',
21             isa => 'Set::Object',
22             coerce => 1,
23             default => sub { [''] },
24             trigger => sub {
25             my ($self, $new, $old) = @_;
26             $self->_topics_changed($new, $old);
27             },
28             );
29              
30             sub _topics_changed {
31             my ($self, $new, $old) = @_;
32             return unless $old;
33             # sets are excellent, let's go shopping
34             my $subscribe = $new - $old;
35             my $unsubscribe = $old - $new;
36             $self->_unsubscribe($_) for $unsubscribe->members;
37             $self->_subscribe($_) for $subscribe->members;
38             return $new;
39             }
40              
41             has 'on_read' => (
42             is => 'rw',
43             isa => 'CodeRef',
44             predicate => 'has_on_read',
45             clearer => 'clear_on_read',
46             trigger => sub {
47             my ($self, $val) = @_;
48             weaken $self;
49             $self->handle->on_read(sub { $self->_receive_item(@_) });
50             },
51             );
52              
53             sub _receive_item {
54             my ($self, $h, $item, @rest) = @_;
55             # if we don't has_on_read, got_item can never be called.
56             confess 'BUG: receive_item called but there is no on_read'
57             unless $self->has_on_read; # but check anyway.
58              
59             $self->_call_callback( $self->on_read, $item, @rest );
60             }
61              
62             sub _call_callback { # i wonder what this does
63             my ($self, $cb, $item, @rest) = @_;
64             return $cb->($self, $item, @rest); # who would have guessed!
65             }
66              
67             sub push_read {
68             my ($self, $cb) = @_;
69             weaken $self;
70             $self->handle->push_read(sub {
71             my ($h, $item, @rest) = @_;
72             $self->_call_callback($cb, $item, @rest);
73             });
74             }
75              
76             sub _subscribe {
77             my ($self, $topic) = @_;
78             $self->handle->socket->setsockopt(ZMQ_SUBSCRIBE, $topic);
79             }
80              
81             sub _unsubscribe {
82             my ($self, $topic) = @_;
83             $self->handle->socket->setsockopt(ZMQ_UNSUBSCRIBE, $topic);
84             }
85              
86             after 'BUILD' => sub {
87             my $self = shift;
88             $self->_subscribe($_) for $self->topics->members;
89             };
90              
91             with 'AnyEvent::ZeroMQ::Handle::Role::Generic',
92             'AnyEvent::ZeroMQ::Handle::Role::Readable';
93              
94             __PACKAGE__->meta->make_immutable;
95              
96             __END__
97             =pod
98              
99             =head1 NAME
100              
101             AnyEvent::ZeroMQ::Subscribe - Non-blocking OO abstraction over ZMQ_SUB publish/subscribe sockets
102              
103             =head1 VERSION
104              
105             version 0.01
106              
107             =head1 AUTHOR
108              
109             Jonathan Rockway <jrockway@cpan.org>
110              
111             =head1 COPYRIGHT AND LICENSE
112              
113             This software is copyright (c) 2011 by Jonathan Rockway.
114              
115             This is free software; you can redistribute it and/or modify it under
116             the same terms as the Perl 5 programming language system itself.
117              
118             =cut
119