File Coverage

blib/lib/Mojo/MySQL5/PubSub.pm
Criterion Covered Total %
statement 9 86 10.4
branch 0 26 0.0
condition 1 13 7.6
subroutine 3 11 27.2
pod 3 3 100.0
total 16 139 11.5


line stmt bran cond sub pod time code
1             package Mojo::MySQL5::PubSub;
2 7     7   29 use Mojo::Base 'Mojo::EventEmitter';
  7         8  
  7         27  
3              
4 7     7   885 use Scalar::Util 'weaken';
  7         11  
  7         352  
5              
6 7   50 7   25 use constant DEBUG => $ENV{MOJO_PUBSUB_DEBUG} || 0;
  7         9  
  7         6628  
7              
8             has 'mysql';
9              
10             sub DESTROY {
11 0     0     my $self = shift;
12 0 0 0       return unless $self->{db} and $self->mysql;
13 0           $self->mysql->db->query('delete from mojo_pubsub_subscribe where pid = ?', $self->_subscriber_pid);
14             }
15              
16             sub listen {
17 0     0 1   my ($self, $channel, $cb) = @_;
18 0           my $pid = $self->_subscriber_pid;
19 0           warn "listen channel:$channel subscriber:$pid\n" if DEBUG;
20 0           $self->mysql->db->query('replace mojo_pubsub_subscribe(pid, channel, ts) values (?, ?, current_timestamp)',
21             $pid, $channel);
22 0           push @{$self->{chans}{$channel}}, $cb;
  0            
23 0           return $cb;
24             }
25              
26             sub notify {
27 0     0 1   my ($self, $channel, $payload) = @_;
28 0           my $db = $self->mysql->db;
29 0   0       $payload //= '';
30 0           warn "notify channel:$channel $payload\n" if DEBUG;
31 0 0         $self->_init($db) unless $self->{init}++;
32 0           $db->query('insert into mojo_pubsub_notify(channel, payload) values (?, ?)', $channel, $payload);
33 0           return $self;
34             }
35              
36             sub unlisten {
37 0     0 1   my ($self, $channel, $cb) = @_;
38 0           my $pid = $self->_subscriber_pid;
39 0           warn "unlisten channel:$channel subscriber:$pid\n" if DEBUG;
40 0           my $chan = $self->{chans}{$channel};
41 0           @$chan = grep { $cb ne $_ } @$chan;
  0            
42 0 0         return $self if @$chan;
43 0           $self->mysql->db->query('delete from mojo_pubsub_subscribe where pid = ? and channel = ?', $pid, $channel);
44 0           delete $self->{chans}{$channel};
45 0           return $self;
46             }
47              
48             sub _notifications {
49 0     0     my $self = shift;
50 0           my $result = $self->{db}->query('select id, channel, payload from mojo_pubsub_notify where id > ?', $self->{last_id});
51              
52 0           while (my $row = $result->array) {
53 0           my ($id, $channel, $payload) = @$row;
54 0           $self->{last_id} = $id;
55 0 0         next unless exists $self->{chans}{$channel};
56 0           warn "received $id on $channel: $payload\n" if DEBUG;
57 0           for my $cb (@{$self->{chans}{$channel}}) { $self->$cb($payload) }
  0            
  0            
58             }
59             }
60              
61             sub _init {
62 0     0     my ($self, $db) = @_;
63              
64 0           $self->mysql->migrations->name('pubsub')->from_data->migrate;
65              
66             # cleanup old subscriptions and notifications
67 0           $db->query('delete from mojo_pubsub_notify where ts < date_add(current_timestamp, interval -10 minute)');
68 0           $db->query('delete from mojo_pubsub_subscribe where ts < date_add(current_timestamp, interval -1 hour)');
69             }
70              
71             sub _subscriber_pid {
72 0     0     my $self = shift;
73              
74             # Fork-safety
75 0 0 0       if (($self->{pid} //= $$) ne $$) {
76 0 0         my $pid = $self->{db}->pid if $self->{db};
77 0           warn 'forked subscriber pid:' . ($pid || 'N/A') . "\n" if DEBUG;
78 0 0         $self->{db}->disconnect if $pid;
79 0           delete @$self{qw(chans init pid db)};
80             }
81              
82 0 0         return $self->{db}->pid if $self->{db};
83              
84 0           $self->{db} = $self->mysql->db;
85 0           my $pid = $self->{db}->pid;
86 0           $self->{db}->connection->url->options->{query_timeout} = 610;
87              
88 0 0         $self->_init($self->{db}) unless $self->{init}++;
89              
90 0 0         if (defined $self->{last_id}) {
91              
92             # read unread notifications
93 0           $self->_notifications;
94             }
95             else {
96             # get id of the last message
97 0           my $array = $self->{db}->query('select id from mojo_pubsub_notify order by id desc limit 1')->array;
98 0 0         $self->{last_id} = defined $array ? $array->[0] : 0;
99             }
100              
101             # re-subscribe
102 0           $self->{db}->query('replace mojo_pubsub_subscribe(pid, channel) values (?, ?)', $pid, $_)
103 0           for keys %{$self->{chans}};
104              
105 0           weaken $self->{db}->{mysql};
106 0           weaken $self;
107              
108 0           my $cb;
109             $cb = sub {
110 0     0     my ($db, $err, $result) = @_;
111 0 0 0       if ($err) {
    0          
112 0           warn "wake up error: $err" if DEBUG;
113 0           eval { $db->disconnect };
  0            
114 0           delete $self->{db};
115 0           eval { $self->_subscriber_pid };
  0            
116             }
117             elsif ($self and $self->{db}) {
118 0           $self->_notifications;
119 0           $db->query('update mojo_pubsub_subscribe set ts = current_timestamp where pid = ?', $pid);
120 0           $db->query('select sleep(600)', $cb);
121             }
122 0           };
123 0           $self->{db}->query('select sleep(600)', $cb);
124              
125 0           warn "reconnect subscriber pid: $pid\n" if DEBUG;
126 0           $self->emit(reconnect => $self->{db});
127              
128 0           return $pid;
129             }
130              
131             1;
132              
133             =encoding utf8
134              
135             =head1 NAME
136              
137             Mojo::MySQL5::PubSub - Publish/Subscribe
138              
139             =head1 SYNOPSIS
140              
141             use Mojo::MySQL5::PubSub;
142              
143             my $pubsub = Mojo::MySQL5::PubSub->new(mysql => $mysql);
144             my $cb = $pubsub->listen(foo => sub {
145             my ($pubsub, $payload) = @_;
146             say "Received: $payload";
147             });
148             $pubsub->notify(foo => 'bar');
149             $pubsub->unlisten(foo => $cb);
150              
151             =head1 DESCRIPTION
152              
153             L is implementation of the publish/subscribe
154             pattern used by L.
155              
156             Although MySQL does not have C like PostgreSQL and other RDBMs,
157             this module implements similar feature.
158              
159             Single Database connection waits for notification by executing C on server.
160             C and subscribed channels in stored in C table.
161             Inserting new row in C table triggers C for
162             all connections waiting for notification.
163              
164             C privilege is needed for MySQL user to see other users processes.
165             C privilege is needed to be able to execute C for statements
166             started by other users.
167             C privilege may be needed to be able to define trigger.
168              
169             If your applications use this module using different MySQL users it is important
170             the migration script to be executed by user having C privilege on the database.
171              
172              
173             =head1 EVENTS
174              
175             L inherits all events from L and can
176             emit the following new ones.
177              
178             =head2 reconnect
179              
180             $pubsub->on(reconnect => sub {
181             my ($pubsub, $db) = @_;
182             ...
183             });
184              
185             Emitted after switching to a new database connection for sending and receiving
186             notifications.
187              
188             =head1 ATTRIBUTES
189              
190             L implements the following attributes.
191              
192             =head2 mysql
193              
194             my $mysql = $pubsub->mysql;
195             $pubsub = $pubsub->mysql(Mojo::MySQL5->new);
196              
197             L object this publish/subscribe container belongs to.
198              
199             =head1 METHODS
200              
201             L inherits all methods from L and
202             implements the following new ones.
203              
204             =head2 listen
205              
206             my $cb = $pubsub->listen(foo => sub {...});
207              
208             Subscribe to a channel, there is no limit on how many subscribers a channel can
209             have.
210              
211             # Subscribe to the same channel twice
212             $pubsub->listen(foo => sub {
213             my ($pubsub, $payload) = @_;
214             say "One: $payload";
215             });
216             $pubsub->listen(foo => sub {
217             my ($pubsub, $payload) = @_;
218             say "Two: $payload";
219             });
220              
221             =head2 notify
222              
223             $pubsub = $pubsub->notify('foo');
224             $pubsub = $pubsub->notify(foo => 'bar');
225              
226             Notify a channel.
227              
228             =head2 unlisten
229              
230             $pubsub = $pubsub->unlisten(foo => $cb);
231              
232             Unsubscribe from a channel.
233              
234             =head1 DEBUGGING
235              
236             You can set the C environment variable to get some
237             advanced diagnostics information printed to C.
238              
239             MOJO_PUBSUB_DEBUG=1
240              
241             =head1 SEE ALSO
242              
243             L, L, L.
244              
245             =cut
246              
247             __DATA__