File Coverage

blib/lib/IPC/Manager/Base/FS.pm
Criterion Covered Total %
statement 111 153 72.5
branch 24 54 44.4
condition 6 15 40.0
subroutine 26 38 68.4
pod 25 29 86.2
total 192 289 66.4


line stmt bran cond sub pod time code
1             package IPC::Manager::Base::FS;
2 2     2   1017 use strict;
  2         5  
  2         91  
3 2     2   9 use warnings;
  2         3  
  2         121  
4              
5             our $VERSION = '0.000005';
6              
7 2     2   56 use File::Spec;
  2         4  
  2         61  
8 2     2   475 use IO::Select;
  2         1534  
  2         102  
9              
10 2     2   11 use Carp qw/croak/;
  2         3  
  2         112  
11 2     2   1983 use File::Temp qw/tempdir/;
  2         30383  
  2         161  
12 2     2   14 use File::Path qw/remove_tree/;
  2         4  
  2         84  
13              
14 2     2   8 use parent 'IPC::Manager::Client';
  2         21  
  2         12  
15 2         11 use Object::HashBase qw{
16             +path
17             +pidfile
18             +resume_file
19             +select
20 2     2   140 };
  2         3  
21              
22 0     0 0 0 sub pending_messages { 0 }
23              
24 0     0 0 0 sub ready_messages { croak "Not Implemented" }
25 0     0 1 0 sub get_messages { croak "Not Implemented" }
26 0     0 1 0 sub send_message { croak "Not Implemented" }
27 0     0 1 0 sub check_path { croak "Not Implemented" }
28 0     0 1 0 sub make_path { croak "Not Implemented" }
29 0     0 1 0 sub path_type { croak "Not Implemented" }
30              
31 15     15 1 37 sub have_resume_file { -e $_[0]->resume_file }
32              
33             sub select {
34 9     9 0 16 my $self = shift;
35              
36 9 100       31 return $self->{+SELECT} if $self->{+SELECT};
37              
38 4         44 my $sel = IO::Select->new;
39 4         74 $sel->add($self->handles_for_select);
40              
41 4         235 return $self->{+SELECT} = $sel;
42             }
43              
44             sub all_stats {
45 2     2 1 5 my $self = shift;
46              
47 2         4 my $out = {};
48              
49 2 50       92 opendir(my $dh, $self->{+ROUTE}) or die "Could not open dir: $!";
50 2         52 for my $file (readdir($dh)) {
51 20 100       73 next unless $file =~ m/^(.+)\.stats$/;
52 8         21 my $peer = $1;
53 8 50       288 open(my $fh, '<', File::Spec->catfile($self->{+ROUTE}, $file)) or die "Could not open stats file: $!";
54 8         21 $out->{$peer} = do { local $/; $self->{+SERIALIZER}->deserialize(<$fh>) };
  8         22  
  8         260  
55 8         87 close($fh);
56             }
57              
58 2         7 close($dh);
59              
60 2         20 return $out;
61             }
62              
63             sub stats_file {
64 28     28 1 44 my $self = shift;
65 28         2841 return File::Spec->catfile($self->{+ROUTE}, "$self->{+ID}.stats");
66             }
67              
68             sub write_stats {
69 22     22 1 82 my $self = shift;
70              
71 22 50       100 open(my $fh, '>', $self->stats_file) or die "Could not open stats file: $!";
72 22         347 print $fh $self->{+SERIALIZER}->serialize($self->{+STATS});
73 22         1804 close($fh);
74             }
75              
76             sub read_stats {
77 6     6 1 47 my $self = shift;
78              
79 6 50       25 open(my $fh, '<', $self->stats_file) or die "Could not open stats file: $!";
80 6         16 my $stats = do { local $/; <$fh> };
  6         44  
  6         132  
81 6         49 close($fh);
82 6         43 $self->{+SERIALIZER}->deserialize($stats);
83             }
84              
85             sub pidfile {
86 8     8 1 12 my $self = shift;
87 8   33     56 return $self->{+PIDFILE} //= $self->peer_pid_file($self->{+ID});
88             }
89              
90             sub path {
91 41     41 1 58 my $self = shift;
92 41   66     2800 return $self->{+PATH} //= File::Spec->catfile($self->{+ROUTE}, $self->{+ID});
93             }
94              
95             sub resume_file {
96 40     40 1 60 my $self = shift;
97 40   66     492 return $self->{+RESUME_FILE} //= File::Spec->catfile($self->{+ROUTE}, $self->{+ID} . ".resume");
98             }
99              
100             sub peer_pid_file {
101 8     8 1 27 my $self = shift;
102 8         20 my ($peer_id) = @_;
103              
104 8         130 return File::Spec->catfile($self->{+ROUTE}, $peer_id . ".pid");
105             }
106              
107             sub init {
108 8     8 0 12 my $self = shift;
109              
110 8         27 $self->SUPER::init();
111              
112 8         17 my $id = $self->{+ID};
113 8         30 my $path = $self->path;
114              
115 8         26 my $pt = $self->path_type;
116              
117 8 50       20 if ($self->{+RECONNECT}) {
118 0 0       0 croak "${id} ${pt} does not exist" unless $self->check_path($path);
119 0         0 my $pidfile = $self->pidfile;
120 0 0       0 if (open(my $fh, '<', $pidfile)) {
121 0         0 chomp(my $pid = <$fh>);
122 0 0 0     0 croak "Looks like the connection is already running in pid $pid" if $pid && $self->pid_is_running($pid);
123 0         0 close($fh);
124             }
125             }
126             else {
127 8 50       323 croak "${id} ${pt} already exists" if -e $path;
128 8         30 $self->make_path($path);
129             }
130              
131 8         40 $self->write_pid;
132             }
133              
134             sub clear_pid {
135 0     0 1 0 my $self = shift;
136              
137 0         0 my $pidfile = $self->pidfile;
138 0 0       0 unlink($pidfile) or die "Could not unlink pidfile '$pidfile': $!";
139             }
140              
141             sub write_pid {
142 8     8 1 16 my $self = shift;
143              
144 8         24 my $pidfile = $self->pidfile;
145 8 50       1089 open(my $fh, '>', $pidfile) or die "Could not open pidfile '$pidfile': $!";
146 8         136 print $fh $self->{+PID};
147 8         453 close($fh);
148             }
149              
150             sub requeue_message {
151 0     0 1 0 my $self = shift;
152 0         0 $self->pid_check;
153 0 0       0 open(my $fh, '>>', $self->resume_file) or die "Could not open resume file: $!";
154 0         0 for my $msg (@_) {
155 0         0 print $fh $self->{+SERIALIZER}->serialize($msg), "\n";
156             }
157 0         0 close($fh);
158             }
159              
160             sub read_resume_file {
161 25     25 1 40 my $self = shift;
162              
163 25         40 my @out;
164              
165 25         67 my $rf = $self->resume_file;
166 25 50       567 return @out unless -e $rf;
167              
168 0 0       0 open(my $fh, '<', $rf) or die "Could not open resume file: $!";
169 0         0 while (my $line = <$fh>) {
170 0         0 push @out => IPC::Manager::Message->new($self->{+SERIALIZER}->deserialize($line));
171             }
172 0         0 close($fh);
173              
174 0 0       0 unlink($rf) or die "Could not unlink resume file";
175              
176 0         0 return @out;
177             }
178              
179             sub post_disconnect_hook {
180 8     8 1 16 my $self = shift;
181 8         35 $self->SUPER::post_disconnect_hook;
182 8         19 remove_tree($self->path, {keep_root => 0, safe => 1});
183             }
184              
185             sub pre_suspend_hook {
186 0     0 1 0 my $self = shift;
187 0         0 $self->clear_pid;
188             }
189              
190             sub peers {
191 10     10 1 17 my $self = shift;
192              
193 10         15 my @out;
194              
195 10 50       555 opendir(my $dh, $self->{+ROUTE}) or die "Could not open dir: $!";
196 10         331 for my $file (readdir($dh)) {
197 94 100       204 next if $file eq $self->{+ID};
198 86 100       220 next if $file =~ m/^(\.|_)/;
199 66 100       165 next if $file =~ m/\.pid$/;
200 32 100       67 $self->peer_exists($file) or next;
201              
202 18         50 push @out => $file;
203             }
204              
205 10         34 close($dh);
206              
207 10         159 return sort @out;
208             }
209              
210             sub peer_pid {
211 0     0 1 0 my $self = shift;
212 0         0 my ($peer_id) = @_;
213              
214 0 0       0 my $path = $self->peer_exists($peer_id) or return undef;
215 0         0 my $pidfile = $self->peer_pid_file($peer_id);
216 0 0       0 return 0 unless -f $pidfile;
217 0 0       0 open(my $fh, '<', $pidfile) or return 0;
218 0         0 chomp(my $pid = <$fh>);
219 0         0 close($fh);
220 0         0 return $pid;
221             }
222              
223             sub peer_exists {
224 58     58 1 87 my $self = shift;
225 58         105 my ($peer_id) = @_;
226              
227 58 50       108 croak "'peer_id' is required" unless $peer_id;
228              
229 58         456 my $path = File::Spec->catdir($self->{+ROUTE}, $peer_id);
230 58 100       209 return $path if $self->check_path($path);
231 14         117 return undef;
232             }
233              
234             sub spawn {
235 2     2 1 4 my $class = shift;
236 2         8 my (%params) = @_;
237              
238 2   33     27 my $template = delete $params{template} // "PerlIPCManager-$$-XXXXXX";
239 2         12 my $dir = tempdir($template, TMPDIR => 1, CLEANUP => 0, %params);
240              
241 2         1331 return "$dir";
242             }
243              
244             sub unspawn {
245 0     0 1   my $class = shift;
246 0           my ($route) = @_;
247 0           remove_tree($route, {keep_root => 0, safe => 1});
248             }
249              
250             1;
251              
252             __END__