File Coverage

blib/lib/Alvis/Pipeline/Read.pm
Criterion Covered Total %
statement 87 133 65.4
branch 24 70 34.2
condition n/a
subroutine 12 15 80.0
pod 3 3 100.0
total 126 221 57.0


line stmt bran cond sub pod time code
1             # $Id: Read.pm,v 1.12 2006/08/14 17:24:04 mike Exp $
2              
3             package Alvis::Pipeline::Read;
4 5     5   31 use vars qw(@ISA);
  5         10  
  5         311  
5             @ISA = qw(Alvis::Pipeline);
6              
7 5     5   28 use strict;
  5         9  
  5         143  
8 5     5   22 use warnings;
  5         56  
  5         126  
9 5     5   4921 use IO::File;
  5         64627  
  5         766  
10 5     5   6836 use IO::Socket::INET;
  5         100427  
  5         52  
11 5     5   3913 use Fcntl qw(:flock);
  5         34  
  5         8925  
12              
13              
14             sub new {
15 5     5 1 6565 my $class = shift();
16 5         103 my(%opts) = @_;
17              
18 5         40 my $this = bless {}, $class;
19 5 100       143 $this->{spooldir} = delete $opts{spooldir}
20             or die "new($class) with no spooldir";
21 4 100       46 $this->{port} = delete $opts{port}
22             or die "new($class) with no port";
23              
24 3         175 $this->_setopts(sleep => 10, %opts);
25              
26             # Asynchronicity: server process accepts pushes and stores them
27 3         38 $this->log(1, "forking");
28 3         3032 my $pid = fork();
29 3 50       191 die "couldn't fork: $!" if !defined $pid;
30 3 50       153 if ($pid == 0) {
31             # Child process
32 0         0 $this->_start_server();
33 0         0 die "_start_server() returned! It should never do that";
34             }
35              
36             # Automatic reaper prevents the child going zombie when we kill
37             # it. (Yes, "IGNORE" has a special-case meaning for SIGCHLD.)
38 3         266 $SIG{CHLD} = 'IGNORE';
39              
40 3         61 $this->{pid} = $pid;
41 3         351 $this->log(1, "parent $$ spawned harvesting child, pid=$pid");
42 3         126 return $this;
43             }
44              
45              
46             sub read {
47 18     18 1 17816 my $this = shift();
48 18         33 my($block) = @_;
49              
50 18         89 $this->log(2, "parent reading from spooldir");
51 18         58 my $dir = $this->{spooldir};
52 18         122 my($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
53 18         76 while ($lastread == $lastwrite) {
54 5         17 $this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
55 5 100       103 return undef if !$block;
56 3         10 $this->log(2, "no document yet, sleeping");
57 3         96 sleep $this->option("sleep");
58 3         129 ($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
59             }
60              
61 16         39 $lastread++;
62 16         44 my $filename = "$dir/$lastread";
63 16 50       111 my $f2 = new IO::File("<$filename")
64             or die "can't read file '$filename': $!";
65 16         1414 binmode $f2, ":utf8";
66 16         358 my $doc = join("", <$f2>);
67 16         77 $f2->close();
68 16         3062 unlink($filename);
69              
70 16         72 $this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
71 16         112 return $doc;
72             }
73              
74              
75             sub close {
76 3     3 1 3959 my $this = shift();
77              
78             # We need to kill the child process that is running the OAI server
79             # so that its Internet port is cleared for subsequent invocations.
80             # Also so that the parent can exit cleanly.
81 3         15 my $pid = $this->{pid};
82              
83 3         132 kill 15, $pid;
84 3         3000575 sleep 1;
85              
86 3 50       73 if (kill 0, $pid) {
87 0         0 warn "kill -15 failed; killing $pid with rude signal 9";
88 0         0 kill 9, $pid;
89 0         0 sleep 1;
90             }
91              
92 3 50       70 if (kill 0, $pid) {
93 0         0 die "can't kill child server with pid $pid";
94             }
95             }
96              
97              
98             sub _start_server {
99 0     0   0 my $this = shift();
100              
101 0         0 $this->log(1, "opening listener on port ", $this->{port});
102 0 0       0 my $listener = new IO::Socket::INET(Listen => 1,
103             LocalPort => $this->{port},
104             Proto => "tcp",
105             ReuseAddr => 1)
106             or die("can't listen on port '" . $this->{port} . "': $!");
107              
108 0         0 while (1) {
109 0         0 $this->log(1, "accepting connection");
110 0 0       0 $this->{socket} = $listener->accept()
111             or die "can't accept connection: $!";
112              
113 0         0 binmode $this->{socket}, ":utf8";
114 0         0 $this->log(1, "started background process, pid $$");
115 0         0 while (1) {
116 0         0 my $doc = $this->_read();
117 0 0       0 last if !defined $doc;
118 0         0 $this->_store_file($doc);
119             }
120             }
121             }
122              
123              
124             sub _read {
125 0     0   0 my $this = shift();
126              
127 0 0       0 my $s = $this->{socket}
128             or die "$this reading from non-existent socket";
129              
130 0         0 my $magic = $s->getline();
131 0 0       0 return undef if !defined $magic;
132 0 0       0 $magic eq "Alvis::Pipeline\n" or die "incorrect magic '$magic'";
133 0 0       0 my $version = $s->getline() or die "can't get protocol version: $!";
134 0 0       0 $version == 1 or die "unsupported protocol version '$version'";
135 0 0       0 my $count = $s->getline() or die "can't get object-length byte-count: $!";
136 0         0 chomp($count);
137 0         0 my $buf;
138 0         0 my $nread = $s->read($buf, $count); ### multiple reads may be necessary
139 0 0       0 die "can't read document: $!" if !defined $nread;
140 0 0       0 die "document was short: $nread of $count bytes" if $nread != $count;
141 0 0       0 my $term = $s->getline() or die "can't get terminator: $!";
142 0 0       0 $term eq "--end--\n" or die "incorrect terminator '$term'";
143              
144 0         0 return $buf;
145             }
146              
147              
148             sub _store_file {
149 0     0   0 my $this = shift();
150 0         0 my($doc) = @_;
151              
152 0         0 $this->log(2, "child writing to spooldir");
153 0         0 my $dir = $this->{spooldir};
154 0         0 my($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
155              
156 0         0 $lastwrite++;
157 0         0 my $filename = "$dir/$lastwrite";
158 0 0       0 my $f2 = new IO::File(">$filename")
159             or die "can't create new file '$filename': $!";
160 0         0 binmode $f2, ":utf8";
161 0 0       0 $f2->print($doc) or die "can't write '$filename': $!";
162 0 0       0 $f2->close() or die "can't close '$filename': $!";
163              
164 0         0 $this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
165             }
166              
167              
168             # A sequence file called "seq" is maintained in the spool directory,
169             # and is always locked when read and rewritten. The invariant it
170             # preserves between lock-read-write operations is that it contains two
171             # numbers, space-separated, followed by a newline. The first number is
172             # that of the last document read from the spool directory. The second
173             # number is that of the last document written to the spool directory,
174             # or zero if no document has yet been written. If the two numbers are
175             # equal, there are no documents available to be read.
176             #
177             # _lock_and_read() and _write_and_unlock() together implement safe
178             # maintenance of the sequence file. The former returns a filehandle,
179             # locked; it is the caller's responsibility to unlock the returned
180             # filehandle using _write_and_unlock(), like this:
181             # ($fh, $lastread, $lastwrite) = $this->_lock_and_read($dir);
182             # # Do some stuff
183             # $this->_write_and_unlock($dir, $fh, $lastread, $lastwrite);
184             #
185             sub _lock_and_read {
186 21     21   102 my $this = shift();
187 21         42 my($dir) = @_;
188              
189 21         61 my $seqfile = "$dir/seq";
190 21         43 my $fh;
191 21 100       1137 if (! -d $dir) {
192 2 50       307 mkdir($dir, 0777)
193             or die "can't create directory '$dir': $!";
194 2 50       111 my $f = new IO::File(">$seqfile")
195             or die "can't create initial '$seqfile': $!";
196 2         850 $f->close();
197             }
198              
199 21 50       265 $fh = new IO::File("+<$seqfile")
200             or die "can't read '$seqfile': $!";
201              
202 21 50       7436 flock($fh, LOCK_EX) or die "can't lock '$seqfile': $!";
203 21 50       168 seek($fh, 0, SEEK_SET) or die "can't seek to start of '$seqfile': $!";
204 21         54 my($lastread, $lastwrite);
205 21         1197 my $line = $fh->getline();
206 21 100       2790 if (defined $line) {
207 19         164 ($lastread, $lastwrite) = ($line =~ /(\d+) (\d+)/);
208             } else {
209             # File is empty: must have just been created
210 2         11 $lastread = $lastwrite = 0;
211             }
212              
213 21         152 $this->log(3, "got lastread='$lastread', lastwrite='$lastwrite'");
214 21         124 return ($fh, $lastread, $lastwrite);
215             }
216              
217              
218             sub _write_and_unlock {
219 21     21   37 my $this = shift();
220 21         47 my($dir, $fh, $lastread, $lastwrite) = @_;
221              
222 21         1254 my $seqfile = "$dir/seq";
223 5     5   38 use Carp;
  5         19  
  5         1252  
224 21 50       167 seek($fh, 0, SEEK_SET) or confess "can't seek to start of '$seqfile': $!";
225 21 50       141 $fh->print("$lastread $lastwrite\n") or die "can't rewrite '$seqfile': $!";
226 21 50       641 flock($fh, LOCK_UN) or die "can't unlock '$seqfile': $!";
227 21 50       71 $fh->close() or die "Truly unbelievable";
228 21         511 $this->log(3, "put lastread='$lastread', lastwrite='$lastwrite'");
229             }
230              
231              
232             # Test harness follows
233             # my $p = bless {
234             # spooldir => "/tmp/ap",
235             # }, "Alvis::Pipeline::Read";
236             #
237             # if (@ARGV) {
238             # $p->_store_file(join("", @ARGV));
239             # } else {
240             # my $doc = $p->read();
241             # die "no document queued" if !defined $doc;
242             # print $doc;
243             # }
244              
245             1;