File Coverage

blib/lib/App/Chart/Glib/Ex/DirBroadcast.pm
Criterion Covered Total %
statement 45 174 25.8
branch 5 56 8.9
condition 1 21 4.7
subroutine 14 27 51.8
pod 7 9 77.7
total 72 287 25.0


line stmt bran cond sub pod time code
1             # connect_front, or priority level
2             # message length limit
3              
4              
5              
6             # Copyright 2007, 2008, 2009, 2010, 2015 Kevin Ryde
7              
8             # This file is part of Chart.
9             #
10             # Chart is free software; you can redistribute it and/or modify it under the
11             # terms of the GNU General Public License as published by the Free Software
12             # Foundation; either version 3, or (at your option) any later version.
13             #
14             # Chart is distributed in the hope that it will be useful, but WITHOUT ANY
15             # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16             # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
17             # details.
18             #
19             # You should have received a copy of the GNU General Public License along
20             # with Chart. If not, see <http://www.gnu.org/licenses/>.
21              
22             package App::Chart::Glib::Ex::DirBroadcast;
23 1     1   437 use 5.008;
  1         3  
24 1     1   5 use strict;
  1         2  
  1         16  
25 1     1   4 use warnings;
  1         2  
  1         29  
26 1     1   4 use Carp 'carp','croak';
  1         1  
  1         46  
27 1     1   8 use File::Spec;
  1         3  
  1         43  
28              
29 1     1   305 use Class::Singleton 1.03; # 1.03 for _new_instance()
  1         274  
  1         29  
30 1     1   9 use base 'Class::Singleton';
  1         3  
  1         154  
31             *_new_instance = \&new;
32              
33 1     1   7 use constant MAXLEN => 16384;
  1         1  
  1         1297  
34              
35              
36             sub new {
37 1     1 1 8 my ($class, $directory) = @_;
38 1         4 return bless { directory => $directory }, $class;
39             }
40              
41             sub DESTROY {
42 1     1   711 my ($self) = @_;
43 1         3 delete $self->{'listen_source_ids'};
44             # close socket before removing file
45 1         1 delete $self->{'listen_sock'};
46 1 50       24 if (my $filename = delete $self->{'listen_filename'}) {
47             ### DirBroadcast remove: $filename
48 0         0 unlink ($filename);
49             }
50             }
51              
52             sub directory {
53 0     0 1 0 my ($self, $newval) = @_;
54 0 0       0 ref($self) or $self = $self->instance;
55              
56 0 0       0 if (@_ < 2) { return $self->{'directory'}; }
  0         0  
57 0 0       0 if ($self->{'listen_source_ids'}) {
58 0         0 croak 'DirBroadcast: cannot set directory after listen';
59             }
60 0         0 $self->{'directory'} = $newval;
61             }
62              
63             # connections is a hashref of key to arrayref of subrs, ie.
64             #
65             # $self->{'connections'} = { 'foo' => [ \&handler1, \&handler2 ],
66             # 'bar' => [ \&handler3, \&handler4 ] };
67             #
68             sub connect {
69 1     1 1 9 my ($self, $key, $subr) = @_;
70 1 50       7 ref($self) or $self = $self->instance;
71              
72 1   50     22 my $aref = ($self->{'connections'}->{$key} ||= []);
73 1         4 push @$aref, $subr;
74             }
75             sub connect_first {
76 0     0 0 0 my ($self, $key, $subr) = @_;
77 0 0       0 ref($self) or $self = $self->instance;
78 0   0     0 my $aref = ($self->{'connections'}->{$key} ||= []);
79 0         0 unshift @$aref, $subr;
80             }
81              
82             sub connect_for_object {
83 0     0 1 0 my ($self, $key, $subr, $obj) = @_;
84 0 0       0 ref($self) or $self = $self->instance;
85              
86 0         0 require Scalar::Util;
87 0         0 Scalar::Util::weaken ($obj);
88 0         0 my $csubr;
89             $csubr = sub {
90 0 0   0   0 if ($obj) {
91 0         0 $subr->($obj, @_);
92             } else {
93 0         0 _disconnect ($self, $key, $csubr);
94             }
95 0         0 };
96 0         0 $self->connect ($key, $csubr);
97             }
98              
99             sub _disconnect {
100 0     0   0 my ($self, $key, $subr) = @_;
101 0 0       0 if (my $aref = $self->{'connections'}->{$key}) {
102 0         0 @$aref = grep {$_ != $subr} @$aref;
  0         0  
103             }
104             }
105              
106             sub send_locally {
107 1     1 1 5 my ($self, $key, @data) = @_;
108 1 50       5 ref($self) or $self = $self->instance;
109              
110 1 50       9 if ($self->{'hold'}) {
111 0     0   0 push @{$self->{'hold_list'}}, sub { send_locally ($self, $key, @data); };
  0         0  
  0         0  
112              
113             } else {
114 1 50       3 if (my $aref = $self->{'connections'}->{$key}) {
115 1         3 foreach my $subr (@$aref) {
116 1         3 $subr->(@data);
117             }
118             }
119             }
120             }
121              
122             sub listen {
123 0     0 1   my ($self) = @_;
124 0 0         ref($self) or $self = $self->instance;
125              
126 0 0         if ($self->{'listen_source_ids'}) { return; } # already done
  0            
127 0           my $directory = $self->{'directory'};
128 0 0         if (! defined $directory) {
129 0           croak 'DirBroadcast cannot listen until broadcast directory is set';
130             }
131              
132 0           require File::Path;
133 0           File::Path::mkpath ($directory);
134              
135 0           require Sys::Hostname;
136 0           my $hostname = Sys::Hostname::hostname();
137 0           my $listen_filename = $self->{'listen_filename'}
138             = File::Spec->catfile ($directory, "$hostname.$$");
139 0           unlink ($listen_filename); # possible previous leftover
140              
141             # as usual socket() and friends get FD_CLOEXEC set automatically, no need
142             # to do anything special to avoid propagating $listen_sock fd down to
143             # subprocess jobs
144 0           require Socket;
145 0           require IO::Socket;
146             my $listen_sock = $self->{'listen_sock'}
147 0           = do { local $^F = 0; # ensure close-on-exec for the socket
  0            
148 0           IO::Socket->new (Domain => Socket::AF_UNIX(),
149             Type => Socket::SOCK_DGRAM(),
150             Local => $listen_filename) };
151 0 0         binmode ($listen_sock, ':raw') or die;
152             ### DirBroadcast listen: $listen_filename, $listen_sock->fileno
153              
154 0           require Glib;
155 0           require App::Chart::Glib::Ex::MoreUtils;
156 0           require Glib::Ex::SourceIds;
157              
158 0           $self->{'listen_source_ids'}
159             = Glib::Ex::SourceIds->new
160             (Glib::IO->add_watch ($listen_sock->fileno,
161             ['in', 'hup', 'err'],
162             \&_do_listen_read,
163             App::Chart::Glib::Ex::MoreUtils::ref_weak($self)));
164             }
165              
166             sub _do_listen_read {
167 0     0     my ($fd, $conditions, $ref_weak_self) = @_;
168 0   0       my $self = $$ref_weak_self || return 0; # Glib::SOURCE_REMOVE
169              
170             ### DirBroadcast read: $fd, $conditions
171 0           my $buf;
172 0           my $ret = $self->{'listen_sock'}->recv ($buf, MAXLEN, 0);
173 0 0         if (! defined $ret) {
174 0           warn __PACKAGE__." listen read error: $!";
175 0           delete $self->{'listen_source_ids'};
176 0           delete $self->{'listen_sock'};
177 0           return 0; # Glib::SOURCE_REMOVE
178             }
179             ### receive: " bytes=".length($buf), "'$ret'"
180 0           require Storable;
181 0           my $args;
182 0 0         if (! eval { $args = Storable::thaw ($buf); 1 }) {
  0            
  0            
183 0           carp "DirBroadcast: error thawing message, ignoring";
184 0           return 1; # Glib::SOURCE_CONTINUE
185             }
186              
187             ### $args
188 0           $self->send_locally (@$args);
189 0           return 1; # Glib::SOURCE_CONTINUE
190             }
191              
192             my $send_sock;
193              
194             sub send {
195 0     0 1   my ($self, $key, @data) = @_;
196 0 0         ref($self) or $self = $self->instance;
197              
198 0 0         if ($self->{'hold'}) {
199             ### send hold back: $key
200 0     0     push @{$self->{'hold_list'}}, sub { $self->send ($key, @data); };
  0            
  0            
201 0           return;
202             }
203             #### send: $key, \@data
204              
205 0           $self->send_locally ($key, @data);
206              
207 0           my $directory = $self->{'directory'};
208 0           require IO::Dir;
209 0   0       my $dh = IO::Dir->new ($directory) || return;
210 0           my @filenames = $dh->read;
211 0           $dh->close;
212              
213 0   0       my $pattern = ($self->{'pattern'} ||= do {
214 0           require Sys::Hostname;
215 0           my $hostname = Sys::Hostname::hostname();
216 0           qr/^\Q$hostname\E\.[0-9]+$/o
217             });
218              
219 0           my $frozen;
220 0           foreach my $filename (@filenames) {
221 0 0         if ($filename !~ $pattern) { next; }
  0            
222 0           $filename = File::Spec->catfile ($directory, $filename);
223              
224 0 0 0       if ($filename eq ($self->{'listen_filename'}||'')) {
225 0           next; # ourselves
226             }
227             ### DirBroadcast to: $filename
228              
229             # send_sock created on first of any DirBroadcast instance and then kept
230             # open
231 0   0       $send_sock ||= do {
232 0           require IO::Socket;
233 0           require Socket;
234 0           my $sock = do {
235 0           local $^F = 0; # ensure close-on-exec for the socket
236 0           IO::Socket->new (Domain => Socket::AF_UNIX(),
237             Type => Socket::SOCK_DGRAM());
238             };
239 0           $sock->blocking(0);
240 0 0         binmode ($sock, ':raw') or die;
241 0           $sock
242             };
243              
244             # put off freezing until we find someone to send to
245 0 0         if (! defined $frozen) {
246 0           require Storable;
247 0           $frozen = Storable::freeze ([$key, @data]);
248 0 0         if (length ($frozen) > MAXLEN) {
249 0           croak 'DirBroadcast: message too long: ',length($frozen);
250             }
251             }
252              
253 0           my $sun = Socket::sockaddr_un ($filename);
254 0           my $sent = $send_sock->send ($frozen, 0, $sun);
255 0 0 0       if (! defined $sent || $sent != length($frozen)) {
256             ### send: (! defined $sent && "removing, error $!") || "removing, short send $sent bytes"
257 0           unlink ($filename);
258             }
259             }
260             }
261              
262             sub hold {
263 0     0 0   my ($self) = @_;
264 0 0         ref($self) or $self = $self->instance;
265 0           return App::Chart::Glib::Ex::DirBroadcast::Hold->new ($self);
266             }
267              
268             package App::Chart::Glib::Ex::DirBroadcast::Hold;
269 1     1   7 use strict;
  1         2  
  1         17  
270 1     1   4 use warnings;
  1         2  
  1         138  
271              
272             sub new {
273 0     0     my ($class, $dirb) = @_;
274 0           my $self = bless { }, $class;
275 0           $self->{'target'} = $dirb;
276 0           require Scalar::Util;
277 0           Scalar::Util::weaken ($self->{'target'});
278 0           $dirb->{'hold'} ++;
279 0           return $self;
280             }
281              
282             sub DESTROY {
283 0     0     my ($self) = @_;
284 0   0       my $dirb = delete $self->{'target'} || return;
285 0 0         if (-- $dirb->{'hold'}) { return; }
  0            
286              
287 0           my $hold_list = $dirb->{'hold_list'};
288             ### DirBroadcast::Hold now run: $hold_list
289 0           while (my $subr = shift @$hold_list) {
290 0           $subr->();
291             }
292             }
293              
294             1;
295             __END__
296              
297             =head1 NAME
298              
299             App::Chart::Glib::Ex::DirBroadcast -- broadcast messages through a directory of Unix domain sockets
300              
301             =head1 SYNOPSIS
302              
303             use App::Chart::Glib::Ex::DirBroadcast;
304             App::Chart::Glib::Ex::DirBroadcast->directory ('/my/directory');
305             App::Chart::Glib::Ex::DirBroadcast->listen;
306              
307             App::Chart::Glib::Ex::DirBroadcast->connect ('my-key', sub { print @_; });
308              
309             App::Chart::Glib::Ex::DirBroadcast->send ('my-key', "hello\n");
310              
311             =head1 DESCRIPTION
312              
313             DirBroadcast is a message broadcasting system based on Unix domain datagram
314             sockets in a given directory, with a Glib main loop IO watch listening and
315             calling connected handlers. It's intended for use between multiple running
316             copies of a single application so they can notify each other of changes to
317             files etc.
318              
319             Messages have a string "key" which is a name or type decided by the
320             application, and then any parameters which Storable can handle
321             (L<Storable>). You can have either a single broadcast directory used for
322             all purposes, or create multiple DirBroadcast objects. The method functions
323             described below take either the class name C<App::Chart::Glib::Ex::DirBroadcast> for the
324             single global, or a DirBroadcast object.
325              
326             =head1 FUNCTIONS
327              
328             =over 4
329              
330             =item C<< App::Chart::Glib::Ex::DirBroadcast->new ($directory) >>
331              
332             Create and return a new DirBroadcast object communicating through the given
333             C<$directory>. The directory is not touched by C<new>; it's created, if it
334             doesn't already exist, when C<listen> is called.
335              
336             my $dirb = App::Chart::Glib::Ex::DirBroadcast->new ('/var/run/myapp');
337              
338             =item C<< App::Chart::Glib::Ex::DirBroadcast->directory ($directory) >>
339              
340             =item C<< App::Chart::Glib::Ex::DirBroadcast->directory () >>
341              
342             =item C<< $dirb->directory ($directory) >>
343              
344             =item C<< $dirb->directory () >>
345              
346             Get or set the filesystem directory used for broadcasts.
347              
348             =item C<< App::Chart::Glib::Ex::DirBroadcast->send ($key, $data, ...) >>
349              
350             =item C<< App::Chart::Glib::Ex::DirBroadcast->send_locally ($key, $data, ...) >>
351              
352             =item C<< $dirb->send ($key, $data, ...) >>
353              
354             =item C<< $dirb->send_locally ($key, $data, ...) >>
355              
356             Send a message of C<$key> and optional C<$data> values. C<send> broadcasts
357             to all processes, including the current process, or C<send_locally> just to
358             the current process.
359              
360             A send within the current process just means direct calls to functions
361             registered by C<connect> below. This takes place immediately within the
362             C<send> or C<send_locally>, there's no queuing and the current process
363             doesn't have to have a C<listen> active.
364              
365             The data values can be anything C<Storable> can freeze (see L<Storable>).
366             For C<send_locally> there's no copying, the values are simply passed to the
367             connected functions, so the values can be anything at all.
368              
369             =item C<< App::Chart::Glib::Ex::DirBroadcast->listen () >>
370              
371             =item C<< $dirb->listen () >>
372              
373             Create a Unix domain datagram socket in the broadcast directory to receive
374             messages from other processes, and set up a C<< Glib::IO->add_watch >> to
375             call the functions registered with C<connect> when a message is received.
376              
377             =item C<< App::Chart::Glib::Ex::DirBroadcast->connect ($key, $subr) >>
378              
379             =item C<< $dirb->connect ($key, $subr) >>
380              
381             Connect coderef C<$subr> to be called for messages of C<$key>. The
382             arguments to C<$subr> are the data values passed to C<send>.
383              
384             =item C<< App::Chart::Glib::Ex::DirBroadcast->connect_for_object ($key, $osubr, $obj) >>
385              
386             =item C<< $dirb->connect_for_object ($key, $osubr, $obj) >>
387              
388             Connect coderef C<$osubr> to be called for notifications of C<$key>, for as
389             long as Perl object C<$obj> exists. C<$obj> is the first argument in each
390             call, followed by the notify data,
391              
392             sub my_func {
393             my ($obj, $data...) = @_;
394             }
395              
396             If C<$obj> is destroyed then C<$osubr> is no longer called. Only a weak
397             reference to C<$obj> is kept, so connecting a handler doesn't by itself
398             keep C<$obj> alive.
399              
400             =back
401              
402             =head1 SEE ALSO
403              
404             L<Glib>, L<Glib::MainLoop>
405              
406             =cut