File Coverage

blib/lib/IPC/Manager/Spawn.pm
Criterion Covered Total %
statement 77 93 82.8
branch 12 32 37.5
condition 7 18 38.8
subroutine 16 17 94.1
pod 8 9 88.8
total 120 169 71.0


line stmt bran cond sub pod time code
1             package IPC::Manager::Spawn;
2 2     2   15 use strict;
  2         5  
  2         67  
3 2     2   7 use warnings;
  2         3  
  2         115  
4              
5             our $VERSION = '0.000005';
6              
7 2     2   9 use Carp qw/croak/;
  2         18  
  2         115  
8 2     2   9 use IPC::Manager::Serializer::JSON();
  2         4  
  2         130  
9              
10             use overload(
11             fallback => 1,
12              
13 2     2   38 '""' => sub { $_[0]->info },
14 2     2   12 );
  2         2  
  2         15  
15              
16 2         13 use Object::HashBase qw{
17            
18            
19            
20            
21            
22            
23            
24 2     2   1191 };
  2         11526  
25              
26             sub init {
27 2     2 0 78 my $self = shift;
28              
29 2   33     80 $self->{+PID} //= $$;
30 2   50     6 $self->{+GUARD} //= 1;
31              
32 2 50       7 croak "'protocol' is a required attribute" unless $self->{+PROTOCOL};
33 2 50       9 croak "'route' is a required attribute" unless $self->{+ROUTE};
34 2 50       8 croak "'serializer' is a required attribute" unless $self->{+SERIALIZER};
35             }
36              
37             sub info {
38 4     4 1 1376 my $self = shift;
39 4         7 return IPC::Manager::Serializer::JSON->serialize([@{$self}{PROTOCOL(), SERIALIZER(), ROUTE()}]);
  4         34  
40             }
41              
42             sub connect {
43 2     2 1 4 my $self = shift;
44 2         6 my ($id) = @_;
45 2         45 return $self->{+PROTOCOL}->connect($id, $self->{+SERIALIZER}, $self->{+ROUTE});
46             }
47              
48             sub terminate {
49 2     2 1 4 my $self = shift;
50 2         6 my ($con) = @_;
51 2   33     10 $con //= $self->connect('spawn');
52              
53 2         12 $con->broadcast({terminate => 1});
54              
55 2 50       19 if ($self->{+SIGNAL()}) {
56 0         0 for my $peer ($con->peers) {
57 0 0       0 my $pid = eval { $con->peer_pid($peer) } or next;
  0         0  
58 0 0       0 next if $pid == $$;
59 0 0       0 kill($self->{+SIGNAL()}, $pid) if $self->{+SIGNAL()};
60             }
61             }
62             }
63              
64             sub wait {
65 2     2 1 5 my $self = shift;
66 2         7 my ($con) = @_;
67 2   33     8 $con //= $self->connect('spawn');
68              
69 2         21 for my $client (IPC::Manager::Client->local_clients($self->{+ROUTE})) {
70 8 100       33 eval { $client->disconnect; 1 } or warn $@;
  8         48  
  6         25  
71             }
72              
73 2         6 while (1) {
74 2         5 my @found;
75 2         12 for my $peer ($con->peers) {
76 0 0       0 next if $peer eq $con->id;
77 0 0       0 next unless eval { $con->peer_pid($peer) };
  0         0  
78 0         0 push @found => $peer;
79             }
80              
81 2 50       12 last unless @found;
82              
83 0         0 print "Waiting for clients to go away: " . join(', ' => sort @found) . "\n";
84 0         0 sleep 1;
85             }
86             }
87              
88             sub sanity_delta {
89 2     2 1 3 my $self = shift;
90 2         7 my ($con) = @_;
91 2   33     7 $con //= $self->connect('spawn');
92              
93 2         35 my $stats = $con->all_stats;
94              
95 2         5 my $deltas = {};
96 2         22 for my $peer1 (keys %$stats) {
97 8         13 my $stat = $stats->{$peer1};
98 8   50     18 my $sent = $stat->{sent} // {};
99 8   50     18 my $read = $stat->{read} // {};
100              
101 8         14 for my $peer2 (keys %$sent) {
102 14         34 $deltas->{"$peer2 -> $peer1"} += $sent->{$peer2};
103             }
104              
105 8         17 for my $peer2 (keys %$read) {
106 14         32 $deltas->{"$peer1 -> $peer2"} -= $read->{$peer2};
107             }
108             }
109              
110 2 50       7 delete $deltas->{$_} for grep { /(:spawn|spawn:)/ || !$deltas->{$_} } keys %$deltas;
  14         60  
111              
112 2 50       8 return undef unless keys %$deltas;
113 2         17 return $deltas;
114             }
115              
116             sub sanity_check {
117 2     2 1 4 my $self = shift;
118              
119 2 50       9 my $delta = $self->sanity_delta(@_) or return;
120              
121 2         6 die "\nMessages sent vs received mismatch:\n Positive means sent and not recieved.\n negative means recieved more messages than were sent\n" . join("\n" => map { " $delta->{$_} $_" } sort keys %$delta) . "\n\n";
  2         45  
122             }
123              
124             sub DESTROY {
125 2     2   6252 my $self = shift;
126 2 50       13 return unless $self->{+GUARD};
127              
128 2         9 $self->shutdown();
129             }
130              
131             sub unspawn {
132 0     0 1 0 my $self = shift;
133              
134 0         0 $self->{+PROTOCOL}->unspawn($self->{+ROUTE}, delete $self->{+STASH});
135             }
136              
137             sub shutdown {
138 2     2 1 4 my $self = shift;
139              
140 2 50       13 return unless $self->{+PID} == $$;
141              
142 2         10 my $con = $self->connect('spawn');
143              
144 2         45 $self->terminate($con);
145 2         12 $self->wait($con);
146 2         21 $self->sanity_check($con);
147              
148 0           $con->disconnect;
149 0           $con = undef;
150              
151 0           $self->unspawn;
152             }
153              
154             1;
155              
156             __END__