File Coverage

blib/lib/IPC/MPS/EV.pm
Criterion Covered Total %
statement 210 307 68.4
branch 63 164 38.4
condition 9 50 18.0
subroutine 20 25 80.0
pod 0 13 0.0
total 302 559 54.0


line stmt bran cond sub pod time code
1             package IPC::MPS::EV;
2              
3 2     2   110884 use strict;
  2         4  
  2         44  
4 2     2   8 use warnings;
  2         2  
  2         36  
5              
6 2     2   6 use Exporter;
  2         2  
  2         130  
7             our @ISA = qw(Exporter);
8             our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node vpid2pid);
9              
10             our $VERSION = '0.21';
11              
12 2     2   10 use Carp;
  2         4  
  2         92  
13 2     2   22 use EV;
  2         4  
  2         44  
14 2     2   980 use Socket qw(SOCK_NONBLOCK);
  2         5526  
  2         248  
15 2     2   716 use IO::Socket;
  2         26908  
  2         6  
16 2     2   660 use Scalar::Util qw(refaddr);
  2         4  
  2         74  
17 2     2   1062 use Storable qw(freeze thaw);
  2         4902  
  2         5416  
18              
19              
20             my $DEBUG = 0;
21             $DEBUG and require Data::Dumper;
22              
23             my @spawn = ();
24             my %msg = ();
25             my %fh2vpid = ();
26             my %vpid2fh = ();
27             my %fh2fh = ();
28             my $self_vpid = 0;
29             my $self_parent_fh;
30             my $self_parent_vpid = 0;
31             my $self_parent_closed = 0;
32             my %listener = ();
33             my %node = ();
34             my %snd = ();
35             my $ipc_loop = 0;
36              
37             my %vpid2pid = ();
38 1     1 0 33 sub vpid2pid { my ($vpid) = @_; $vpid2pid{$vpid} }
  1         25  
39              
40             my @rcv = ();
41             my %r_bufs = ();
42             my %w_bufs = ();
43              
44             my %pack = ();
45             my %unpack = ();
46              
47             my %closed = ();
48              
49             my %fh2rw = ();
50             my %fh2ww = ();
51              
52             my ($waited_vpid, $waited_msg, @waited_rv);
53              
54             my $blksize = 1024 * 16;
55              
56              
57             END {
58 2 50 0 2   150 $ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
59 2         113 close $_ foreach values %fh2fh;
60             }
61              
62             sub spawn(&) {
63 2     2 0 164 my ($spawn) = @_;
64 2 50       120 socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK, PF_UNSPEC) or die "socketpair: $!";
65 2         10 my $vpid = refaddr $child;
66 2         6 push @spawn, [$vpid, $child, $parent, $spawn];
67 2         6 return $vpid;
68             }
69              
70              
71             sub msg($$) {
72 3     3 0 63 my ($msg, $sub) = @_;
73 3         24 $msg{$msg} = $sub;
74             }
75              
76              
77             sub snd($$;@) {
78 9     9 0 1266 my ($vpid, $msg, @args) = @_;
79 9 50       19 defined $vpid or carp("Argument vpid required"), return;
80 9 50       18 defined $msg or carp("Argument msg required"), return;
81 9 100       28 $vpid = $self_parent_vpid if $vpid == 0;
82 9 50       17 $DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
83 9         11 push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
  9         30  
84 9 50 33     50 $closed{$vpid} = 1 if $msg eq "close" or $msg eq "exit";
85 9         26 w_event_cb_reg($vpid);
86 9         15 return 1;
87             }
88              
89              
90 0     0 0 0 sub quit() { EV::unloop }
91              
92              
93             sub snd_wt($$;@) {
94 1     1 0 504 my ($vpid, $msg, @args) = @_;
95 1 50       5 defined $vpid or carp("Argument vpid required"), return;
96 1 50       2 defined $msg or carp("Argument msg required"), return;
97 1         3 snd($vpid, $msg, @args);
98 1         4 wt($vpid, $msg);
99             }
100              
101              
102             sub listener($$;%) {
103 0     0 0 0 my ($host, $port, %args) = @_;
104 0 0       0 defined $host or carp("Argument host required"), return;
105 0 0       0 defined $port or carp("Argument port required"), return;
106 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
107 0 0       0 if ($sock) {
108 0 0       0 _pack_unpack($sock, %args) or return;
109 0         0 $listener{$sock} = $sock;
110             $fh2rw{$sock} = EV::io($sock, EV::READ, sub {
111 0     0   0 my $w = shift;
112 0         0 my $fh = $w->fh;
113 0 0       0 $DEBUG > 1 and print "Read event for listener from $self_vpid: \n";
114 0         0 my $sock = $fh->accept;
115 0         0 $pack{$sock} = $pack{$fh};
116 0         0 $unpack{$sock} = $unpack{$fh};
117 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
118 0         0 my $vpid = refaddr $sock;
119 0         0 $node{$sock} = $vpid;
120 0         0 $fh2vpid{$sock} = $vpid;
121 0         0 $vpid2fh{$vpid} = $sock;
122 0         0 $fh2fh{$sock} = $sock;
123 0         0 $fh2rw{$sock} = EV::io($sock, EV::READ, \&r_event_cb);
124 0         0 });
125 0         0 return $sock;
126             } else {
127 0         0 carp "Cannot open socket '$host:$port' in $self_vpid: $!";
128 0         0 return;
129             }
130             }
131              
132              
133             sub open_node($$;%) {
134 0     0 0 0 my ($host, $port, %args) = @_;
135 0 0       0 defined $host or carp("Argument host required"), return;
136 0 0       0 defined $port or carp("Argument port required"), return;
137 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
138 0         0 my $addr = sockaddr_in($port,inet_aton($host));
139 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
140 0         0 my $rv = $sock->connect($addr);
141 0 0       0 if ($rv) {
142 0 0       0 _pack_unpack($sock, %args) or return;
143 0         0 my $vpid = refaddr $sock;
144 0         0 $node{$sock} = $vpid;
145 0         0 $fh2vpid{$sock} = $vpid;
146 0         0 $vpid2fh{$vpid} = $sock;
147 0         0 $fh2fh{$sock} = $sock;
148 0         0 $fh2rw{$sock} = EV::io($sock, EV::READ, \&r_event_cb);
149 0         0 return $vpid;
150             } else {
151 0         0 carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
152 0         0 return;
153             }
154             }
155              
156              
157             sub _pack_unpack($%) {
158 0     0   0 my ($fh, %args) = @_;
159 0 0 0     0 if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
    0 0        
160 0         0 my $r = eval {
161 0         0 my $r = $unpack->($pack->({a => ["b"]}));
162 0 0 0     0 if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
      0        
      0        
163             $$r{a}[0] and $$r{a}[0] eq "b")
164             {
165 0         0 return 1;
166             } else {
167 0         0 return 0;
168             }
169             };
170 0 0 0     0 if (not $r or $@) {
171 0         0 carp "False pack unpack test";
172 0         0 return;
173             }
174 0         0 $pack{$fh} = $pack;
175 0         0 $unpack{$fh} = $unpack;
176             } elsif ($args{pack} or $args{unpack}) {
177 0         0 carp "pack and unpack is pair options";
178 0         0 return;
179             }
180 0         0 return 1;
181             }
182              
183              
184             sub receive(&) {
185 3     3 0 84 my ($receive) = @_;
186              
187 3 50       10 $DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";
188              
189 3         108 local $SIG{CHLD} = "IGNORE";
190 3         45 local $SIG{PIPE} = "IGNORE";
191              
192 3         10 foreach (@spawn) {
193 2         8 my ($vpid, $child, $parent, $spawn) = @$_;
194              
195 2         1751 my $kid_pid = fork;
196 2 50       119 defined $kid_pid or die "Can't fork: $!";
197              
198 2 100       61 unless ($kid_pid) {
199              
200 1         20 foreach (@spawn) {
201 1         43 close $$_[1];
202 1 50       28 close $$_[2] if $$_[2] ne $parent;
203             }
204              
205 1         15 close $_ foreach values %fh2fh, values %listener;
206 1         9 $_->stop foreach values %fh2rw, values %fh2ww;
207 1         13 @spawn = ();
208 1         20 %listener = ();
209 1         6 %node = ();
210 1         8 %msg = ();
211 1         10 %fh2vpid = ();
212 1         4 %vpid2fh = ();
213 1         14 %fh2fh = ();
214 1         49 %snd = ();
215              
216 1         3 %vpid2pid = ();
217              
218 1         3 $ipc_loop = 0;
219              
220 1         2 @rcv = ();
221 1         2 %r_bufs = ();
222 1         6 %w_bufs = ();
223              
224 1         7 %pack = ();
225 1         6 %unpack = ();
226              
227 1         3 %closed = ();
228              
229 1         4 %fh2rw = ();
230 1         1 %fh2ww = ();
231              
232 1         4 ($waited_vpid, $waited_msg, @waited_rv) = ();
233              
234 1         7 $self_parent_fh = $parent;
235 1         3 $self_parent_vpid = $self_vpid;
236              
237 1         7 $self_vpid = $vpid;
238              
239 1         5 $fh2vpid{$self_parent_fh} = $self_parent_vpid;
240 1         13 $vpid2fh{$self_parent_vpid} = $self_parent_fh;
241 1         3 $fh2fh{$self_parent_fh} = $self_parent_fh;
242              
243 1         43 $fh2rw{$self_parent_fh} = EV::io($self_parent_fh, EV::READ, \&r_event_cb);
244              
245 1         26 $spawn->();
246              
247 0         0 exit;
248             }
249             else {
250 1         49 $vpid2pid{$vpid} = $kid_pid;
251             }
252             }
253              
254              
255 2         20 foreach (@spawn) {
256 1         4 my ($vpid, $child, $parent, $spawn, $receive) = @$_;
257 1         20 close $parent;
258 1         9 $fh2vpid{$child} = $vpid;
259 1         23 $vpid2fh{$vpid} = $child;
260 1         21 $fh2fh{$child} = $child;
261 1         65 $fh2rw{$child} = EV::io($child, EV::READ, \&r_event_cb);
262             }
263 2         67 @spawn = ();
264              
265              
266              
267 2         35 $receive->();
268              
269              
270              
271 2 50       7 unless ($ipc_loop) {
272 2         9 $ipc_loop = 1;
273 2         20 w_event_cb_reg();
274 2         1035 EV::loop;
275 0         0 $ipc_loop = 0;
276             }
277             }
278              
279              
280             sub wt($$) {
281 1     1 0 2 ($waited_vpid, $waited_msg) = @_;
282 1 50       3 defined $waited_vpid or carp("Argument vpid required"), return;
283 1 50       2 defined $waited_msg or carp("Argument msg required"), return;
284 1 50       3 $waited_vpid = $self_parent_vpid if $waited_vpid == 0;
285 1         4 foreach my $i (0 .. $#rcv) {
286 0         0 my ($from, $msg, $args)= @{$rcv[$i]};
  0         0  
287 0 0 0     0 if ($from eq $waited_vpid and $msg eq $waited_msg) {
288 0         0 splice @rcv, $i, 1;
289 0 0       0 return wantarray ? @$args : $$args[0];
290             }
291             }
292 1 50       27 $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
293 1         29 w_event_cb_reg();
294 1         14 EV::loop;
295 1         2 my @rv = @waited_rv;
296 1         2 ($waited_vpid, $waited_msg, @waited_rv) = ();
297 1 50       6 return wantarray ? @rv : $rv[0];
298             }
299              
300              
301             sub w_event_cb_reg {
302 26     26 0 45 my ($to_vpid) = @_;
303              
304 26 100       87 foreach my $to (defined $to_vpid ? $to_vpid : keys %snd) {
305 25 100       27 if (@{$snd{$to}}) {
  25         5639  
306 10         14 my $fh = $vpid2fh{$to};
307 10 100       21 unless ($fh) {
308 2 50       4 if (@spawn) {
309 2 50       4 carp "Probably have forgotten to call receive." if not defined $to_vpid;
310 2         8 next;
311             } else {
312 0 0       0 if ($self_parent_fh) {
313 0 0       0 unless ($self_parent_closed) {
314 0         0 $fh = $self_parent_fh;
315             } else {
316 0         0 next;
317             }
318             } else {
319 0         0 carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
320 0         0 next;
321             }
322             }
323             }
324 8 50       23 unless (exists $w_bufs{$fh}) {
325 8         12 my $packet;
326 8 50       16 if (my $pack = $pack{$fh}) {
327 0         0 $packet = $pack->(shift @{$snd{$to}});
  0         0  
328             } else {
329 8         9 $packet = freeze shift @{$snd{$to}};
  8         33  
330             }
331 8         495 my $buf = join "", pack("N", length $packet), $packet;
332 8         22 $w_bufs{$fh} = $buf;
333 8 50 0     16 $DEBUG and (@{$snd{$to}} or delete $snd{$to});
  0         0  
334 8         52 $fh2ww{$fh} = EV::io($fh, EV::WRITE, \&w_event_cb);
335             }
336             }
337             }
338             }
339              
340              
341              
342              
343             sub r_event_cb {
344 9     9 0 38 my $w = shift;
345 9         40 my $fh = $w->fh;
346              
347 9 50       26 $DEBUG > 1 and print "Read event from $self_vpid: \n";
348              
349 9         134 my $len = sysread $fh, (my $_buf), $blksize;
350 9 100       32 if ($len) {
    50          
351 8         30 $r_bufs{$fh} .= $_buf;
352             NEXT_MSG: {
353 8         10 my $buf = $r_bufs{$fh};
  8         17  
354 8 50       17 if (length $buf >= 4) {
355 8         47 my $packet_length = unpack "N", substr $buf, 0, 4, "";
356 8 50       19 if (length $buf >= $packet_length) {
357 8         23 my $packet = substr $buf, 0, $packet_length, "";
358 8   50     34 $r_bufs{$fh} = $buf || "";
359 8 50 0     23 $DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});
360              
361 8         15 my ($from, $to, $msg, $args);
362 8 50       21 if (my $unpack = $unpack{$fh}) {
363 0         0 ($from, $to, $msg, $args) = @{$unpack->($packet)};
  0         0  
364             } else {
365 8         8 ($from, $to, $msg, $args) = @{thaw $packet};
  8         34  
366             }
367              
368 8 50       217 if ($node{$fh}) {
369 0         0 $from = $node{$fh};
370 0         0 $to = $self_vpid;
371             }
372              
373 8 50       21 $DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
374 8 50       21 if ($to == $self_vpid) {
    0          
375 8 50       13 $DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
376 8 100 66     32 if (defined $waited_vpid and defined $waited_msg) {
377 1         4 push @rcv, [$from, $msg, $args];
378             } else {
379 7 50       15 if ($msg{$msg}) {
380 7         16 push @rcv, [$from, $msg, $args];
381             } else {
382 0 0       0 $DEBUG and print "Unknown message '$msg'\n";
383             }
384             }
385             } elsif ($vpid2fh{$to}) {
386 0 0       0 $DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
387 0         0 push @{$snd{$to}}, [$from, $to, $msg, $args];
  0         0  
388 0         0 w_event_cb_reg();
389             } else {
390 0         0 carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
391             }
392              
393 8 50       38 redo NEXT_MSG if $r_bufs{$fh};
394             }
395             }
396             }
397             } elsif (defined $len) {
398 1 50       5 if (exists $fh2ww{$fh}) {
399 0         0 delete $fh2ww{$fh};
400             }
401 1         2 my $vpid = delete $fh2vpid{$fh};
402 1         3 delete $vpid2fh{$vpid};
403 1         1 delete $fh2rw{$fh};
404 1         3 delete $r_bufs{$fh};
405 1         2 delete $w_bufs{$fh};
406 1         2 delete $fh2fh{$fh};
407 1         3 delete $vpid2pid{$vpid};
408 1         2 delete $pack{$fh};
409 1         3 delete $unpack{$fh};
410 1 50       3 if (my $node_vpid = $node{$fh}) {
411 0         0 delete $node{$fh};
412 0 0       0 if ($msg{NODE_CLOSED}) {
413 0 0       0 $msg{NODE_CLOSED}->($node_vpid, $fh->connected ? 1 : 0) unless $closed{$vpid};
    0          
414 0         0 w_event_cb_reg();
415             }
416             } else {
417 1 50       4 if ($msg{SPAWN_CLOSED}) {
418 0 0       0 $msg{SPAWN_CLOSED}->($vpid) unless $closed{$vpid};
419 0         0 w_event_cb_reg();
420             }
421             }
422 1         1 delete $closed{$vpid};
423 1         22 close $fh;
424 1 50 33     17 if ($self_parent_fh and $self_parent_fh eq $fh) {
425 1         3 $self_parent_closed = 1;
426 1 50 33     3 unless (defined $waited_vpid and defined $waited_msg) {
427 1 50       3 unless (@rcv) {
428 1         322 exit;
429             }
430             }
431             }
432             } else {
433 0 0       0 $DEBUG and die "Can't read '$fh': $!";
434             }
435              
436 8 100 66     26 if (defined $waited_vpid and defined $waited_msg) {
437 1         6 foreach my $i (0 .. $#rcv) {
438 1         1 my ($from, $msg, $args)= @{$rcv[$i]};
  1         3  
439 1 50 33     11 if ($msg eq $waited_msg and $from eq $waited_vpid) {
440 1         3 splice @rcv, $i, 1;
441 1 50       3 $DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
442 1         2 @waited_rv = @$args;
443 1         3 EV::unloop();
444 1         4 return;
445             }
446             }
447 0 0       0 unless (exists $vpid2fh{$waited_vpid}) {
448 0         0 EV::unloop();
449 0         0 return;
450             }
451             } else {
452 7         23 while (my $rcv = shift @rcv) {
453 7         11 my ($from, $msg, $args)= @{$rcv};
  7         14  
454 7 50       31 $msg{$msg}->($from, @$args) unless $closed{$from};
455 6         14 w_event_cb_reg();
456             }
457             }
458             }
459              
460              
461              
462             sub w_event_cb {
463 8     8 0 19 my $w = shift;
464 8         26 my $fh = $w->fh;
465              
466 8 50       108 $DEBUG > 1 and print "Write event from $self_vpid.\n";
467 8 50       25 $fh2fh{$fh} or return;
468              
469 8         12 my $buf = $w_bufs{$fh};
470 8         262 my $len = syswrite $fh, $buf, $blksize;
471 8 50       25 if ($len) {
472 8         17 substr $buf, 0, $len, "";
473 8 50       19 if (length $buf) {
474 0         0 $w_bufs{$fh} = $buf;
475             } else {
476 8         25 delete $w_bufs{$fh};
477 8         21 delete $fh2ww{$fh};
478 8         21 w_event_cb_reg();
479             }
480             } else {
481 0 0         $DEBUG and die "Can't write to '$fh': $!";
482             }
483             }
484              
485              
486              
487             1;
488              
489              
490             __END__