File Coverage

blib/lib/Parallel/Scoreboard.pm
Criterion Covered Total %
statement 81 87 93.1
branch 19 36 52.7
condition 4 12 33.3
subroutine 16 18 88.8
pod 4 4 100.0
total 124 157 78.9


line stmt bran cond sub pod time code
1             package Parallel::Scoreboard;
2              
3 2     2   1045 use strict;
  2         3  
  2         75  
4 2     2   10 use warnings;
  2         3  
  2         70  
5              
6 2     2   7 use Digest::MD5 qw(md5);
  2         4  
  2         115  
7 2     2   9 use Fcntl qw(:flock);
  2         3  
  2         268  
8 2     2   11 use IO::Handle;
  2         4  
  2         92  
9 2     2   1089 use POSIX qw(:fcntl_h);
  2         11401  
  2         10  
10 2     2   2532 use File::Path;
  2         4  
  2         170  
11              
12             our $VERSION = '0.08';
13              
14             use Class::Accessor::Lite (
15 2         15 ro => [ qw(base_dir worker_id) ],
16 2     2   1160 );
  2         2277  
17              
18             sub new {
19 2     2 1 11 my $klass = shift;
20 2         6 my %args = @_;
21             die "mandatory parameter:base_dir is missing"
22 2 50       7 unless $args{base_dir};
23             # create base_dir if necessary
24 2 50       30 if (! -e $args{base_dir}) {
25             mkpath $args{base_dir}
26 0 0 0     0 or -e $args{base_dir} or die "failed to create directory:$args{base_dir}:$!";
27             }
28             # build object
29             my $self = bless {
30 16     16   421 worker_id => sub { $$ },
31 2         30 %args,
32             }, $klass;
33             # remove my status file, just in case
34 2         7 unlink $self->_build_filename();
35            
36 2         7 return $self;
37             }
38              
39             sub DESTROY {
40 2     2   1364 my $self = shift;
41             # if file is open, close and unlink
42 2 50       34 if ($self->{fh}) {
43 2         32 close $self->{fh};
44             # during global destruction we may already have lost this
45 2 100       46 unlink $self->_build_filename() if ($self->{base_dir});
46             }
47             }
48              
49             sub update {
50 2     2 1 250 my ($self, $status) = @_;
51             # open file at the first invocation (tmpfn => lock => rename)
52 2         6 my $id = $self->worker_id->();
53 2 50 33     17 if ($self->{fh} && $self->{id_for_fh} ne $id) {
54             # fork? close but do not unlock
55 0         0 close $self->{fh};
56 0         0 undef $self->{fh};
57             }
58 2 50       5 unless ($self->{fh}) {
59 2         13 my $fn = $self->_build_filename();
60 2 50       14241 open my $fh, '>', "$fn.tmp"
61             or die "failed to open file:$fn.tmp:$!";
62 2         43 autoflush $fh 1;
63 2 50       193 flock $fh, LOCK_EX
64             or die "failed to flock file:$fn.tmp:$!";
65 2 50       203 rename "$fn.tmp", $fn
66             or die "failed to rename file:$fn.tmp to $fn:$!";
67 2         8 $self->{fh} = $fh;
68 2         22 $self->{id_for_fh} = $id;
69             }
70             # write to file with size of the status and its checksum
71 2 50       23 seek $self->{fh}, 0, SEEK_SET
72             or die "seek failed:$!";
73 2         3 print {$self->{fh}} (
  2         188  
74             md5($status),
75             pack("N", length $status),
76             $status,
77             );
78             }
79              
80             sub read_all {
81 3     3 1 2003151 my $self = shift;
82 3         15 my %ret;
83             $self->_for_all(
84             sub {
85 8     8   14 my ($id, $fh) = @_;
86             # detect collision using md5
87 8         29 for (1..10) {
88 8 50       35 seek $fh, 0, SEEK_SET
89             or die "seek failed:$!";
90 8         5 my $data = do { local $/; join '', <$fh> };
  8         39  
  8         175  
91             # silently ignore if data is too short
92 8 50       27 return if length($data) < 16 + 4;
93             # parse input
94 8         15 my $md5 = substr($data, 0, 16);
95 8         31 my $size = unpack("N", substr($data, 16, 4));
96 8         14 my $status = substr($data, 20, $size);
97             # compare md5 to detect collision
98             next
99 8 50       61 if md5($status) ne $md5;
100             # have read correct data, save and return
101 8         29 $ret{$id} = $status;
102 8         19 return;
103             }
104             # failed to read data in 10 consecutive attempts, bug?
105 0         0 warn "failed to read status of id:$id, skipping";
106             }
107 3         93 );
108 3         38 \%ret;
109             }
110              
111             sub cleanup {
112 0     0 1 0 my $self = shift;
113 0     0   0 $self->_for_all(sub {});
114             }
115              
116             sub _for_all {
117 3     3   13 my ($self, $cb) = @_;
118 3         659 my @files = glob "$self->{base_dir}/status_*";
119 3         35 for my $fn (@files) {
120             # obtain id from filename (or else ignore)
121 9 50       109 $fn =~ m|/status_(.*)$|
122             or next;
123 9         45 my $id = $1;
124             # ignore files removed after glob but before open
125 9 50       367 open my $fh, '+<', $fn
126             or next;
127             # check if the file is still opened by the owner process using flock
128 9 100 100     43 if ($id ne $self->worker_id->() && flock $fh, LOCK_EX | LOCK_NB) {
129             # the owner has died, remove status file
130 1         7 close $fh;
131             unlink $fn
132             or
133             not $!{ENOENT} and
134 1 50 0     203 warn "failed to remove an obsolete scoreboard file:$fn:$!";
135 1         8 next;
136             }
137             # invoke
138 8         27 $cb->($id, $fh);
139             # close
140 8         82 close $fh;
141             }
142             }
143              
144             sub _build_filename {
145 5     5   9 my $self = shift;
146 5         30 return "$self->{base_dir}/status_" . $self->worker_id->();
147             }
148              
149             1;
150             __END__