File Coverage

blib/lib/IPC/Queue/Duplex.pm
Criterion Covered Total %
statement 52 84 61.9
branch 10 30 33.3
condition 2 12 16.6
subroutine 9 11 81.8
pod 5 5 100.0
total 78 142 54.9


line stmt bran cond sub pod time code
1             # -*-cperl-*-
2             #
3             # IPC::Queue::Duplex - Filesystem based request / response queue
4             # Copyright (c) 2017 Ashish Gulhati
5             #
6             # $Id: lib/IPC/Queue/Duplex.pm v1.007 Sun Jun 25 01:43:14 PDT 2017 $
7              
8             package IPC::Queue::Duplex;
9              
10 3     3   44392 use warnings;
  3         6  
  3         93  
11 3     3   14 use strict;
  3         6  
  3         53  
12 3     3   1778 use File::Temp;
  3         54760  
  3         183  
13 3     3   1139 use File::Copy qw(cp);
  3         5100  
  3         140  
14 3     3   1042 use IPC::Queue::Duplex::Job;
  3         8  
  3         79  
15 3     3   21 use Fcntl qw(:flock);
  3         5  
  3         1905  
16              
17             our ( $VERSION ) = '$Revision: 1.007 $' =~ /\s+([\d\.]+)/;
18              
19             sub new {
20 4     4 1 78 my ($class, %args) = @_;
21 4 50 33     298 return unless $args{Dir} and -d $args{Dir};
22 4         38 bless { Dir => $args{Dir} }, $class;
23             }
24              
25             sub add {
26 10     10 1 7449 my ($self, $jobstr) = @_;
27 10 50       63 return unless $jobstr;
28 10         146 my $job = File::Temp->new( DIR => $self->{Dir}, SUFFIX => '.iqd');
29 10         7682 print $job "$jobstr\n";
30             # $job->unlink_on_destroy(0);
31 10         102 $job->close; my $filename = $job->filename; $filename =~ s/\.iqd/.job/;
  10         445  
  10         160  
32 10         49 cp($job->filename,$filename);
33 10         10921 bless { File => $filename }, 'IPC::Queue::Duplex::Job';
34             }
35              
36             sub addfile {
37 0     0 1 0 my ($self, $filename, $jobstr) = @_;
38 0 0 0     0 return unless $filename and $jobstr;
39 0         0 my $filetemp = $filename; $filetemp .= '.iqdtmp';
  0         0  
40 0         0 open JOBFILE, ">$filetemp";
41 0         0 print JOBFILE "$jobstr\n";
42 0         0 close JOBFILE;
43 0         0 rename $filetemp, $filename;
44 0         0 bless { File => $filename }, 'IPC::Queue::Duplex::Job';
45             }
46              
47             sub get {
48 407     407 1 4988 my $self = shift;
49 407         974 my $filework = '';
50 407         1033 while (1) {
51 407         928 my $filename; my $oldestage;
52 407         31540 foreach my $file (glob("$self->{Dir}/*.job")) {
53 10 50 33     96 if( !defined($filename) or -M $file > $oldestage ) {
54 10         36 $filename = $file;
55 10         1465 $oldestage = -M $file;
56             }
57             }
58 407 100       2304 last unless $filename;
59 10 50       334 next unless -e $filename;
60 10         1680 open(my $jobfh, ">>", $filename);
61 10 50       109 close $jobfh, next unless flock($jobfh, LOCK_EX | LOCK_NB);
62 10         40 $filework = $filename; $filework =~ s/\.job/.wrk/;
  10         90  
63 10 50       1564 close $jobfh, last if rename $filename,$filework;
64             }
65 407 100       1398 if ($filework) {
66 10         318 open (JOB, $filework);
67 10         153 my $jobstr = <JOB>; chomp $jobstr;
  10         39  
68 10         74 close JOB;
69 10         199 return bless { File => $filework, Request => $jobstr }, 'IPC::Queue::Duplex::Job';
70             }
71 397         1622 return undef;
72             }
73              
74             sub getresponse {
75 0     0 1   my $self = shift;
76 0           my ($filename, $filefin);
77 0           while (1) {
78 0           my $oldestage;
79 0           foreach my $file (glob("$self->{Dir}/*.fin")) {
80 0 0 0       if( !defined($filename) or -M $file > $oldestage ) {
81 0           $filename = $file;
82 0           $oldestage = -M $file;
83             }
84             }
85 0 0         last unless $filename;
86 0 0         next unless -e $filename;
87 0           open(my $jobfh, ">>", $filename);
88 0 0         close $jobfh, next unless flock($jobfh, LOCK_EX | LOCK_NB);
89 0           $filefin = $filename; $filefin =~ s/\.fin/.rsp/;
  0            
90 0 0         close $jobfh, last if rename $filename,$filefin;
91             }
92 0 0         if ($filefin) {
93 0           open (JOBFIN, $filefin);
94 0           my $response = <JOBFIN>; chomp $response;
  0            
95 0           close JOBFIN; unlink $filefin;
  0            
96 0           return bless { File => $filename, Response => $response }, 'IPC::Queue::Duplex::Job';
97             }
98 0           return undef;
99             }
100              
101             1; # End of IPC::Queue::Duplex
102              
103             =head1 NAME
104              
105             IPC::Queue::Duplex - Filesystem based request / response queue
106              
107             =head1 VERSION
108              
109             $Revision: 1.007 $
110             $Date: Sun Jun 25 01:43:14 PDT 2017 $
111              
112             =head1 SYNOPSIS
113              
114             (Enqueuer)
115              
116             use IPC::Queue::Duplex;
117              
118             my $client = new IPC::Queue::Duplex (Dir => $dir);
119             my $job = $client->add($jobstr);
120             my $response = $job->response;
121              
122             (Worker)
123              
124             use IPC::Queue::Duplex;
125              
126             my $server = new IPC::Queue::Duplex (Dir => $dir);
127             my $job = $server->get;
128             process_job($job);
129             $job->finish($result);
130              
131             =head1 METHODS
132              
133             =head2 new
134              
135             Creates and returns a new IPC::Queue::Duplex object. Requires one
136             named parameter:
137              
138             =over
139              
140             Dir - The directory that will contain the queue for this object. It's
141             important to use a unique directory per queue.
142              
143             =back
144              
145             =head2 add
146              
147             Adds a job to the queue and returns an IPC::Queue::Duplex::Job
148             object. A single argument is required, the job request as a string.
149              
150             =head2 get
151              
152             Gets a job from the queue and returns an IPC::Queue::Duplex::Job
153             object. Returns undef if there is no job waiting.
154              
155             =head2 addfile
156              
157             Adds a job to the queue, with an explicitly provided filename, and
158             returns an IPC::Queue::Duplex::Job object. Two arguments are required:
159             the filename to use for the job, and the job request string, in that
160             order.
161              
162             =head2 getresponse
163              
164             Get a response from the queue. Normally a requester would hold on to
165             their job object and get the response by calling the response method
166             on that object. However, a requester can also handle responses
167             asynchronously and call this method to get the next waiting response
168             instead. Returns an IPC::Queue::Duplex::Job object, or undef if there
169             is no response on the queue.
170              
171             =head1 AUTHOR
172              
173             Ashish Gulhati, C<< >>
174              
175             =head1 BUGS
176              
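177             Please report any bugs or feature requests to C<bug-ipc-queue-duplex at rt.cpan.org>, or through
178             the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=IPC-Queue-Duplex>. I will be notified, and then you'll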
179             automatically be notified of progress on your bug as I make changes.
180              
181             =head1 SUPPORT
182              
183             You can find documentation for this module with the perldoc command.
184              
185             perldoc IPC::Queue::Duplex
186              
187             You can also look for information at:
188              
189             =over 4
190              
191             =item * RT: CPAN's request tracker
192              
193             L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=IPC-Queue-Duplex>
194              
195             =item * AnnoCPAN: Annotated CPAN documentation
196              
197             L<http://annocpan.org/dist/IPC-Queue-Duplex>
198              
199             =item * CPAN Ratings
200              
201             L<http://cpanratings.perl.org/d/IPC-Queue-Duplex>
202              
203             =item * Search CPAN
204              
205             L<http://search.cpan.org/dist/IPC-Queue-Duplex/>
206              
207             =back
208              
209             =head1 LICENSE AND COPYRIGHT
210              
211             Copyright (c) 2017 Ashish Gulhati.
212              
213             This program is free software; you can redistribute it and/or modify it
214             under the terms of the Artistic License 2.0.
215              
216             See L<http://www.perlfoundation.org/artistic_license_2_0> for the full
217             license terms.