File Coverage

blib/lib/Mercury/Pattern/PushPull.pm
Criterion Covered Total %
statement 45 45 100.0
branch 5 6 83.3
condition 2 3 66.6
subroutine 9 9 100.0
pod 5 5 100.0
total 66 68 97.0


line stmt bran cond sub pod time code
1             package Mercury::Pattern::PushPull;
2             our $VERSION = '0.014';
3             # ABSTRACT: Manage a push/pull pattern for a single topic
4              
5             #pod =head1 SYNOPSIS
6             #pod
7             #pod # Connect the pusher
8             #pod my $push_ua = Mojo::UserAgent->new;
9             #pod my $push_tx = $ua->websocket( '/push/foo' );
10             #pod
11             #pod # Connect the puller socket
12             #pod my $pull_ua = Mojo::UserAgent->new;
13             #pod my $pull_tx = $ua->websocket( '/pull/foo' );
14             #pod
15             #pod # Connect the two sockets using push/pull
16             #pod my $pattern = Mercury::Pattern::PushPull->new;
17             #pod $pattern->add_pusher( $push_tx );
18             #pod $pattern->add_puller( $pull_tx );
19             #pod
20             #pod # Send a message
21             #pod $pull_tx->on( message => sub {
22             #pod my ( $tx, $msg ) = @_;
23             #pod print $msg; # Hello, World!
24             #pod } );
25             #pod $push_tx->send( 'Hello, World!' );
26             #pod
27             #pod =head1 DESCRIPTION
28             #pod
29             #pod This pattern connects pushers, which send messages, to pullers, which
30             #pod recieve messages. Each message sent by a pusher will be received by
31             #pod a single puller. This pattern is useful for dealing out jobs to workers.
32             #pod
33             #pod =head1 SEE ALSO
34             #pod
35             #pod =over
36             #pod
37             #pod =item L
38             #pod
39             #pod =item L
40             #pod
41             #pod =back
42             #pod
43             #pod =cut
44              
45 2     2   12 use Mojo::Base 'Mojo';
  2         4  
  2         9  
46              
47             #pod =attr pullers
48             #pod
49             #pod Connected websockets ready to receive messages.
50             #pod
51             #pod =cut
52              
53             has pullers => sub { [] };
54              
55             #pod =attr pushers
56             #pod
57             #pod Connected websockets who will be pushing messages.
58             #pod
59             #pod =cut
60              
61             has pushers => sub { [] };
62              
63             #pod =attr current_puller_index
64             #pod
65             #pod The puller we will use to send the next message from a pusher.
66             #pod
67             #pod =cut
68              
69             has current_puller_index => sub { 0 };
70              
71             #pod =method add_puller
72             #pod
73             #pod $pat->add_puller( $tx );
74             #pod
75             #pod Add a puller to this broker. Pullers are given messages in a round-robin, one
76             #pod at a time, by pushers.
77             #pod
78             #pod =cut
79              
80             sub add_puller {
81 6     6 1 42 my ( $self, $tx ) = @_;
82             $tx->on( finish => sub {
83 5     5   232 my ( $tx ) = @_;
84 5         20 $self->remove_puller( $tx );
85 6         41 } );
86 6         34 push @{ $self->pullers }, $tx;
  6         25  
87 6         25 return;
88             }
89              
90             #pod =method add_pusher
91             #pod
92             #pod $pat->add_pusher( $tx );
93             #pod
94             #pod Add a pusher to this broker. Pushers send messages to be processed by pullers.
95             #pod
96             #pod =cut
97              
98             sub add_pusher {
99 2     2 1 16 my ( $self, $tx ) = @_;
100             $tx->on( message => sub {
101 7     7   37957 my ( $tx, $msg ) = @_;
102 7         39 $self->send_message( $msg );
103 2         16 } );
104             $tx->on( finish => sub {
105 2     2   105 my ( $tx ) = @_;
106 2         11 $self->remove_pusher( $tx );
107 2         18 } );
108 2         9 push @{ $self->pushers }, $tx;
  2         8  
109 2         5 return;
110             }
111              
112             #pod =method send_message
113             #pod
114             #pod $pat->send_message( $msg );
115             #pod
116             #pod Send the given message to the next puller in line.
117             #pod
118             #pod =cut
119              
120             sub send_message {
121 8     8 1 50 my ( $self, $msg ) = @_;
122 8         23 my $i = $self->current_puller_index;
123 8         31 my @pullers = @{ $self->pullers };
  8         19  
124 8         53 $pullers[ $i ]->send( $msg );
125 8         1165 $self->current_puller_index( ( $i + 1 ) % @pullers );
126 8         54 return;
127             }
128              
129             #pod =method remove_puller
130             #pod
131             #pod $pat->remove_puller( $tx );
132             #pod
133             #pod Remove a puller from the list. Called automatically when the puller socket
134             #pod is closed.
135             #pod
136             #pod =cut
137              
138             sub remove_puller {
139 5     5 1 15 my ( $self, $tx ) = @_;
140 5         9 my @pullers = @{ $self->pullers };
  5         16  
141 5         37 for my $i ( 0.. $#pullers ) {
142 6 100       30 if ( $pullers[$i] eq $tx ) {
143 5         10 splice @{ $self->pullers }, $i, 1;
  5         14  
144 5         34 my $current_puller_index = $self->current_puller_index;
145 5 100 66     33 if ( $i > 0 && $current_puller_index >= $i ) {
146 1         3 $self->current_puller_index( $current_puller_index - 1 );
147             }
148 5         21 return;
149             }
150             }
151             }
152              
153             #pod =method remove_pusher
154             #pod
155             #pod $pat->remove_pusher( $tx );
156             #pod
157             #pod Remove a pusher from the list. Called automatically when the pusher socket
158             #pod is closed.
159             #pod
160             #pod =cut
161              
162             sub remove_pusher {
163 2     2 1 7 my ( $self, $tx ) = @_;
164 2         3 my @pushers = @{ $self->pushers };
  2         9  
165 2         16 for my $i ( 0.. $#pushers ) {
166 2 50       12 if ( $pushers[$i] eq $tx ) {
167 2         5 splice @pushers, $i, 1;
168 2         7 return;
169             }
170             }
171             }
172              
173             1;
174              
175             __END__