File Coverage

blib/lib/Parallel/Scoreboard.pm
Criterion Covered Total %
statement 81 87 93.1
branch 20 36 55.5
condition 4 12 33.3
subroutine 16 18 88.8
pod 4 4 100.0
total 125 157 79.6


line stmt bran cond sub pod time code
1             package Parallel::Scoreboard;
2              
3 3     3   2457 use strict;
  3         5  
  3         147  
4 3     3   16 use warnings;
  3         6  
  3         213  
5              
6 3     3   18 use Digest::MD5 qw(md5);
  3         5  
  3         350  
7 3     3   20 use Fcntl qw(:flock);
  3         4  
  3         537  
8 3     3   21 use IO::Handle;
  3         17  
  3         183  
9 3     3   1829 use POSIX qw(:fcntl_h);
  3         24997  
  3         24  
10 3     3   6624 use File::Path;
  3         8  
  3         337  
11              
12             our $VERSION = '0.09';
13              
14             use Class::Accessor::Lite (
15 3         30 ro => [ qw(base_dir worker_id) ],
16 3     3   2032 );
  3         4624  
17              
18             sub new {
19 3     3 1 23 my $klass = shift;
20 3         12 my %args = @_;
21             die "mandatory parameter:base_dir is missing"
22 3 50       51 unless $args{base_dir};
23             # create base_dir if necessary
24 3 50       42 if (! -e $args{base_dir}) {
25             mkpath $args{base_dir}
26 0 0 0     0 or -e $args{base_dir} or die "failed to create directory:$args{base_dir}:$!";
27             }
28             # build object
29             my $self = bless {
30 21     21   3874 worker_id => sub { $$ },
31 3         24 %args,
32             }, $klass;
33             # remove my status file, just in case
34 3         13 unlink $self->_build_filename();
35            
36 3         49 return $self;
37             }
38              
39             sub DESTROY {
40 3     3   1533 my $self = shift;
41             # if file is open, close and unlink
42 3 50       26 if ($self->{fh}) {
43 3         54 close $self->{fh};
44             # during global destruction we may already have lost this
45 3 100       31 unlink $self->_build_filename() if ($self->{base_dir});
46             }
47             }
48              
49             sub update {
50 3     3 1 429 my ($self, $status) = @_;
51             # open file at the first invocation (tmpfn => lock => rename)
52 3         80 my $id = $self->worker_id->();
53 3 50 33     19 if ($self->{fh} && $self->{id_for_fh} ne $id) {
54             # fork? close but do not unlock
55 0         0 close $self->{fh};
56 0         0 undef $self->{fh};
57             }
58 3 50       27 unless ($self->{fh}) {
59 3         10 my $fn = $self->_build_filename();
60 3 50       430 open my $fh, '>', "$fn.tmp"
61             or die "failed to open file:$fn.tmp:$!";
62 3         70 autoflush $fh 1;
63 3 50       224 flock $fh, LOCK_EX
64             or die "failed to flock file:$fn.tmp:$!";
65 3 50       386 rename "$fn.tmp", $fn
66             or die "failed to rename file:$fn.tmp to $fn:$!";
67 3         12 $self->{fh} = $fh;
68 3         11 $self->{id_for_fh} = $id;
69             }
70             # write to file with size of the status and its checksum
71 3 50       54 seek $self->{fh}, 0, SEEK_SET
72             or die "seek failed:$!";
73 3         8 print {$self->{fh}} (
  3         239  
74             md5($status),
75             pack("N", length $status),
76             $status,
77             );
78             }
79              
80             sub read_all {
81 4     4 1 2005275 my $self = shift;
82 4         32 my %ret;
83             $self->_for_all(
84             sub {
85 9     9   34 my ($id, $fh) = @_;
86             # detect collision using md5
87 9         55 for (1..10) {
88 9 50       144 seek $fh, 0, SEEK_SET
89             or die "seek failed:$!";
90 9         15 my $data = do { local $/; join '', <$fh> };
  9         69  
  9         353  
91             # silently ignore if data is too short
92 9 50       58 return if length($data) < 16 + 4;
93             # parse input
94 9         20 my $md5 = substr($data, 0, 16);
95 9         187 my $size = unpack("N", substr($data, 16, 4));
96 9         60 my $status = substr($data, 20, $size);
97             # compare md5 to detect collision
98             next
99 9 50       81 if md5($status) ne $md5;
100             # have read correct data, save and return
101 9         84 $ret{$id} = $status;
102 9         33 return;
103             }
104             # failed to read data in 10 consecutive attempts, bug?
105 0         0 warn "failed to read status of id:$id, skipping";
106             }
107 4         121 );
108 4         65 \%ret;
109             }
110              
111             sub cleanup {
112 0     0 1 0 my $self = shift;
113 0     0   0 $self->_for_all(sub {});
114             }
115              
116             sub _for_all {
117 4     4   40 my ($self, $cb) = @_;
118 4         1493 my @files = glob "$self->{base_dir}/status_*";
119 4         56 for my $fn (@files) {
120             # obtain id from filename (or else ignore);
121             # skip .tmp to avoid racing with update()
122 11 100       165 $fn =~ m|/status_(.*)(?
123             or next;
124 10         70 my $id = $1;
125             # ignore files removed after glob but before open
126 10 50       531 open my $fh, '+<', $fn
127             or next;
128             # check if the file is still opened by the owner process using flock
129 10 100 100     96 if ($id ne $self->worker_id->() && flock $fh, LOCK_EX | LOCK_NB) {
130             # the owner has died, remove status file
131 1         13 close $fh;
132             unlink $fn
133             or
134             not $!{ENOENT} and
135 1 50 0     196 warn "failed to remove an obsolete scoreboard file:$fn:$!";
136 1         13 next;
137             }
138             # invoke
139 9         46 $cb->($id, $fh);
140             # close
141 9         211 close $fh;
142             }
143             }
144              
145             sub _build_filename {
146 8     8   17 my $self = shift;
147 8         92 return "$self->{base_dir}/status_" . $self->worker_id->();
148             }
149              
150             1;
151             __END__