File Coverage

blib/lib/Parallel/Pvm/Scheduler.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Parallel::Pvm::Scheduler;
2              
3 1     1   21107 use 5.008;
  1         4  
  1         44  
4 1     1   5 use strict;
  1         2  
  1         30  
5 1     1   4 use warnings;
  1         5  
  1         35  
6              
7 1     1   418 use Parallel::Pvm 1.40;
  0            
  0            
8             ;
# Exporter is loaded and inherited from, but every export list below is
# empty, so nothing is placed into the caller's namespace.
require Exporter;
our @ISA = ('Exporter');

# The ':all' tag supports
#     use Parallel::Pvm::Scheduler ':all';
# It currently names no symbols.
our %EXPORT_TAGS = ( 'all' => [] );

# Symbols exportable on request (the contents of ':all') and by default.
our @EXPORT_OK = @{ $EXPORT_TAGS{'all'} };
our @EXPORT    = ();

our $VERSION = '0.01';
31              
32              
33             # Preloaded methods go here.
34              
35              
36             =head1 NAME
37              
38             Parallel::Pvm::Scheduler - Perl extension for distributing jobs through PVM
39              
40             =head1 SYNOPSIS
41              
42             use Parallel::Pvm::Scheduler;
43             my $prm = new Parallel::Pvm::Scheduler();
44              
45             # Create an array of tasks:
46             $tasks[0][0] = "/myhome/program1";
47             $tasks[0][1] = "program1 parameter 1";
48             $tasks[0][2] = "program1 parameter 2";
49              
50             $tasks[1][0] = "/myhome/program2_noparameters";
51              
52             $tasks[2][0] = "/myhome/program1";
53             $tasks[2][1] = "program1 parameter 1";
54            
55             # Submit the tasks
56             for ($i = 0; $i < scalar(@tasks); $i++)
57             {
58             $prm->submit("user-defined task description", @{ $tasks[$i] });
59             }
60              
61             # Wait for the tasks to complete
62             $prm->recaptureHosts(1);
63              
64              
65             =head1 DESCRIPTION
66              
67             Parallel-Pvm-Scheduler is a module designed to allow one to distribute a large
68             number of independent jobs across a cluster using PVM. It first queries the number of
69             available machines in the cluster, then submits exactly one job per available machine.
70             When a job is finished, it recaptures the machine and submits the next job. It repeats
71             this until all the jobs are complete.
72              
73             If all the available machines are in use, it waits until one becomes free before submitting
74             the next job; the call to submit therefore sleeps until some task completes.
75              
76             =head2 EXPORT
77              
78             None by default.
79              
80             =head1 SEE ALSO
81              
82             Parallel::Pvm perl module
83              
84             =head1 AUTHOR
85              
86             Ryan Golhar, E<lt>golharam@umdnj.eduE<gt>
87              
88             =head1 COPYRIGHT AND LICENSE
89              
90             Copyright 2006 by Ryan Golhar
91              
92             This library is free software; you can redistribute it and/or modify
93             it under the same terms as Perl itself.
94              
95             =cut
96              
# PVM message tags. $MSGTAG is the tag used when probing for and receiving
# task messages; $MSGTASKEXIT is declared but not referenced by any code
# visible in this file.
my ($MSGTAG, $MSGTASKEXIT) = (1, 999);
99              
100             =head2 new
101              
102             Title : new
103             Usage : my $prm = new Parallel::Pvm::Scheduler();
104             Function: Creates and initializes a PVM resource manager
105             Returns : Parallel::Pvm::Scheduler
106              
107             =cut
108              
sub new {
    # Constructor: reads the PVM configuration and builds the host table.
    #
    # Args    : none are used; any extra arguments are accepted and ignored.
    # Returns : a new scheduler object blessed into the invoking class.
    my ($class) = @_;

    # Accept Class->new, $obj->new and a plain function call, and bless into
    # the invoking class so that subclasses get objects of their own type.
    $class = ref($class) || $class || __PACKAGE__;

    # The first value returned by config is not used here; the remaining
    # values are one hash ref per host.
    my (undef, @CONF) = Parallel::Pvm::config();

    # Map each host id to its name and mark every host as free.
    my %HOSTS;
    foreach my $node (@CONF) {
        $HOSTS{ $node->{'hi_tid'} } = {
            'name' => $node->{'hi_name'},
            'busy' => 0,
        };
    }

    my $self = {
        CONF      => \@CONF,          # host entries as returned by config
        FREEHOSTS => scalar(@CONF),   # number of hosts not marked busy
        HOSTS     => \%HOSTS,         # host id => { name, busy }
        TID       => [],              # ids of the tasks still outstanding
        TIDCMD    => {},              # task id => description and command
    };

    return bless $self, $class;
}
129              
sub DESTROY {
    # Destructor hook: calls Parallel::Pvm::exit with no arguments.
    # NOTE(review): presumably this detaches the process from PVM; confirm
    # against the Parallel::Pvm documentation.
    Parallel::Pvm::exit();
}
133              
134             =head2 getHostCount
135              
136             Title : getHostCount
137             Usage : $hostcount = $prm->getHostCount();
138             Function: Returns the number of hosts within the PVM
139             Returns : integer
140              
141             =cut
142              
sub getHostCount {
    my $self = shift;

    # One CONF entry was stored per host in the PVM configuration.
    return scalar @{ $self->{CONF} };
}
150              
151             =head2 getFreeHostCount
152              
153             Title : getFreeHostCount
154             Usage : $freehostcount = $prm->getFreeHostCount();
155             Function: Returns the number of free hosts within the PVM
156             Returns : integer
157              
158             =cut
159              
sub getFreeHostCount {
    my $self = shift;

    # FREEHOSTS is maintained by _allocateHost and _deallocateHost.
    return $self->{FREEHOSTS};
}
167              
168             =head2 submit
169              
170             Title : submit
171             Usage : $prm->submit($taskdesc, @argv);
172             Function: Submits a task described by an argument vector
173             Args : $taskdesc is a taskdescription. It can be anything set by the user
174             @argv is an argument vector. The first element is the program to execute
175             the remaining elements are the parameters to the program.
176             Notes : To execute multiple programs in serial for a single task, make a perl script
177             and submit the perl script as the task.
178            
179             =cut
180              
sub submit {
    # Run one task on a free host, waiting for a host if all are busy.
    #
    # Args    : $text    - free-form task description chosen by the caller
    #           @pvmargv - argument vector handed to the spawned 'pvmit' task
    # Returns : the value returned by Parallel::Pvm::spawn (recorded as the
    #           task id) on success; nothing if the spawn failed.
    # Dies    : if the host table is empty, since no host could ever
    #           become free.
    my ($self, $text, @pvmargv) = @_;

    my $HOSTS_ref  = $self->{HOSTS};
    my $TID_ref    = $self->{TID};
    my $TIDcmd_ref = $self->{TIDCMD};

    # Without any hosts the wait loop below would never terminate.
    die "No hosts available in the PVM configuration!\n"
        if scalar(keys %$HOSTS_ref) == 0;

    # If there are no free hosts, check the outstanding tasks for completion
    # (which frees their hosts) and sleep 3 seconds between attempts.
    while ($self->{FREEHOSTS} == 0) {
        recaptureHosts($self);
        sleep(3) if $self->{FREEHOSTS} == 0;
    }

    # Find a free host and spawn on that host
    my $host = _allocateHost($self);
    my $info = Parallel::Pvm::spawn('pvmit', 1, PvmTaskHost, $host, \@pvmargv);

    if ($info <= 0) {
        print STDERR "Error: Could not spawn child: $info, @pvmargv\n";

        # _allocateHost returns the host *name*, while _deallocateHost takes
        # ($self, $hostid), so translate the name back to the id of the busy
        # host entry before releasing it.
        foreach my $hostid (keys %$HOSTS_ref) {
            my $entry = $HOSTS_ref->{$hostid};
            next unless $entry->{'busy'};
            next unless defined($entry->{'name'}) && $entry->{'name'} eq $host;

            _deallocateHost($self, $hostid);
            last;
        }

        # A failed spawn must not be recorded as an outstanding task.
        return;
    }

    push @$TID_ref, $info;

    # Remember the description and command line for the completion report.
    my $cmds = join(' ', @pvmargv);
    $TIDcmd_ref->{$info} = "$text\n$cmds";

    print STDERR "Started $info on $host\n";

    return $info;
}
213              
214             =head2 recaptureHosts
215              
216             Title : recaptureHosts
217             Usage : $prm->recaptureHosts($block);
218             Function: Checks tasks for completion and prints their output
219             Args : $block
220             = 0, then iterate through all the tasks and query for their completion.
221             if done, free the machine
222             = 1, then iterate through all tasks waiting for them to complete. Does not
223             return until the tasks are complete.
224             =cut
225              
sub recaptureHosts {
    # Walk the outstanding task ids, print the output of every task that has
    # a message waiting, free its host and drop it from the task list.
    #
    # Args : $block - optional, defaults to 0.
    #          0: a task with no message yet is skipped.
    #          1: a task with no message yet is re-probed after a 3 second
    #             sleep instead of being skipped; once the sweep is over the
    #             free-host counter is checked against the configuration.
    my ($self, $block) = @_;
    $block = 0 unless defined($block);

    my $hosts   = $self->{HOSTS};
    my $tids    = $self->{TID};
    my $cmd_for = $self->{TIDCMD};
    my $conf    = $self->{CONF};

    my $pos = 0;

    TASK:
    while ($pos < scalar(@$tids)) {
        my $childtid = $tids->[$pos];

        # A negative entry is reported and skipped; it stays in the list.
        if ($childtid < 0) {
            print STDERR "Error with PVM Task: $childtid\n";
            $pos++;
            next TASK;
        }

        my $bufid = Parallel::Pvm::probe($childtid, $MSGTAG);

        if ($bufid < 0) {
            print STDERR "There was an error probing on $childtid: $bufid!\n";
            $pos++;
            next TASK;
        }

        if ($bufid > 0) {
            my $hostid = Parallel::Pvm::tidtohost($childtid);
            print STDERR "Recieved message from $childtid on ".$hosts->{$hostid}{'name'}."\n";

            $bufid = Parallel::Pvm::recv($childtid, $MSGTAG);

            # NOTE(review): unpack is called in scalar context here, as in the
            # original; confirm against the Parallel::Pvm documentation that
            # this yields the message payload.
            my $output = Parallel::Pvm::unpack();
            print "($childtid) ", $cmd_for->{$childtid}, "\n";
            print "$output\n" if (defined($output));

            _deallocateHost($self, $hostid);

            # Forget the finished task. The splice shifts the next task id
            # into position $pos, so the index must not advance.
            splice(@$tids, $pos, 1);
            delete($cmd_for->{$childtid});
            next TASK;
        }

        # No message is waiting for this task yet.
        if ($block == 1) {
            # Blocking mode: probe the same task again after a pause.
            sleep(3);
            next TASK;
        }

        $pos++;
    }

    if ($block == 1) {
        # After a blocking sweep every host should be free again.
        if ($self->{FREEHOSTS} != scalar(@$conf)) {
            print STDERR "There was a problem reconciling freehosts with pvm configuration!!!\n";

            for my $node (@$conf) {
                my ($hostid, $hostname) = @{$node}{ 'hi_tid', 'hi_name' };

                print STDERR "$hostname is not free\n"
                    if $hosts->{$hostid}{'busy'} != 0;
            }
        }
    }
}
297              
298             =head2 _allocateHost
299              
300             Title : _allocateHost
301             Function: Internal Function: Used to allocate which host runs the next task
302             =cut
303              
sub _allocateHost {
    # Mark the first free host found as busy and return its name.
    # Hosts are visited in hash-key order; dies if none is free.
    my ($self) = @_;
    my $host_table = $self->{HOSTS};

    for my $hostid (keys %{$host_table}) {
        my $entry = $host_table->{$hostid};
        next unless $entry->{'busy'} == 0;

        my $hostname = $entry->{'name'};
        print STDERR "Allocating host $hostname\n";

        # Claim the host and keep the free-host counter in step.
        $entry->{'busy'} = 1;
        $self->{FREEHOSTS}--;

        return $hostname;
    }

    # Every host in the table was busy.
    die "Unable to locate free host!\n";
}
327              
328             =head2 _deallocateHost
329              
330             Title : _deallocateHost
331             Function: Internal Function: Frees a host making it available for another task
332             =cut
333              
sub _deallocateHost {
    # Mark the host with id $hostid as free again and bump the free-host
    # counter. Dies if that host is not currently marked busy.
    my ($self, $hostid) = @_;
    my $host_table = $self->{HOSTS};

    die "Host not allocated: ". $host_table->{$hostid}{'name'} ."!\n"
        if $host_table->{$hostid}{'busy'} == 0;

    print STDERR "Deallocating host ". $host_table->{$hostid}{'name'} ."\n";

    $host_table->{$hostid}{'busy'} = 0;
    $self->{FREEHOSTS}++;
}
346              
347             1;
348             __END__