File Coverage

blib/lib/BrLock.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2             #!/usr/local/bin/perl -w
3              
4             package BrLock;
5              
6             =head1 NAME
7              
8             BrLock - Distributed lock with minimal message exchanges
9             over a reliable network.
10              
11             =head1 SYNOPSIS
12              
13             BrLock->new('cfg_file', # configuration file, (see DESCRIPTION).
14             '127.0.0.1', # this node's ip.
15             3001); # port to be bound to this node.
16              
17             BrLock->br_lock();
18             # enter critical region
19             BrLock->br_unlock();
20              
21             =head1 DESCRIPTION
22              
23             BrLock features a distributed lock, using the algorithm of
24             Carvalho and Roucairol, On mutual exclusion in computer networks,
25             Communications of the ACM, Feb. 1983.
26              
27             The algorithm features minimal messages for acquiring the next lock,
28             but with the trade-off of network being reliable enough to ensure that
29             all nodes are alive. In fact, a node won't be able to acquire the
30             next lock if it can't communicate with all other nodes (unless it
31             was the last node to acquire the lock).
32              
33             If this is a hard constraint for you, you may want to use IPC::Lock.
34              
35             The list containing all nodes that may apply for this lock must be
36             described in the configuration file passed as parameters to the
37             environment builder (see SYNOPSIS). The file must look like this:
38              
39             0 0
40             0.0.0.0 0
41             x.x.x.x port
42             y.y.y.y port
43             ...
44              
45             The first line must have two zeros, and the second must have the
46             0.0.0.0 ip and the 0 port (deprecated parameters, see TODO). The
47             next lines must have a node ip and a node port. All nodes must use
48             the same configuration file, so a node will read its own
49             parameters.
50              
51             A valid cfg_file for BrLock->new('cfg_file', '127.0.0.1', 3001),
52             for an environment with 3 nodes, is:
53             0 0
54             0.0.0.0 0
55             127.0.0.1 3002
56             127.0.0.1 3001
57             127.0.0.1 3003
58              
59             Note the networking setup will be made by this module. So, after
60             calling BrLock->new, the node will be listening at 127.0.0.1:3001 in
61             the above example.
62              
63              
64             =head1 TODO
65              
66             =over
67              
68             =item * Accept entire configuration as parameters thus not requiring
69             a configuration file.
70              
71             =item * Switch to PerlOO, so multiple locks can be used at once.
72              
73             =back
74              
75             =head1 AUTHORS
76              
77             Ribamar Santarosa
78              
79             Tarcisio Genaro
80              
81             =head1 SEE ALSO
82              
83            
84              
85             =cut
86              
87              
88              
89              
90 1     1   26290 use IO::Socket;
  1         30531  
  1         4  
91 1     1   1664 use Switch;
  1         57313  
  1         7  
92 1     1   205368 use threads;
  0            
  0            
93             use threads::shared;
94             use XML::Parser;
95             use BrLock::BrXML; # shipped toghether with this.
96             use BrLock::SomePerlFunc; # shipped toghether with this.
97             use warnings;
98             use diagnostics;
99              
100             use strict;
101             use base 'Exporter';
102              
103             our $VERSION = 0.1_00;
104             our @ISA = qw(Exporter);
105             our @EXPORT = qw(br_lock br_unlock br_free);
106              
107              
###
# General use constants.
###
use constant DEBUG => 0;
use constant TRUE => 1;
use constant FALSE => 0;
# Maximum accepted size for UDP messages. It is not specified anywhere:
use constant UDP_MSG_SIZE => 1024;
# Maximum random number for unknown tests (see file mutex.txt):
use constant TEST_RANDOM_NUMBER => 100000;
# Fallback IP address, for when we are not told what ours is:
use constant OUR_IP => "127.0.0.1";
#use constant OUR_IP => "143.106.73.160";
# Fallback port, for when we are not told what ours is:
use constant OUR_PORT => 3002;


####
# Global variables
###

our $debug;        # true to print debug info.
our $our_port;     # port this node listens on.
our $our_ip;       # ip address of this node.
# File from which the configuration options are read.
our $config_file;

# Variables filled with configuration data by parse_cfgfile().
our ( $s,              # max waiting time for a request.
      $t,              # max resource use time.
      $our_id,         # our site ID.
      %resource_info,  # the resource entry (Site, Port, SiteId); it is
                       # the first entry parsed into @sites.
    ):shared;

# The list of sites is a list of hashes
#   ( Site => $u , Port => $v , SiteId => $cur_siteid,
#     AuthBy => TRUE or FALSE, RepDeferred => TRUE or FALSE )
# -- FALSE is the default for both flags; $cur_siteid is the number of
# the line in the config file, the very first line of the file having
# number -1.

our @sites:shared = ();  # peer sites taking part in the protocol.
our $osn:shared = 0;     # timestamp for our messages.

###
# Global protocol variables (we'll go to hell for using globals). Note
# this doesn't mean the protocol won't use the other globals. In an
# implemented class, these would be private values.
###

our @baffer:shared = ();     # buffer of received (raw XML) messages.
our $n_auth:shared = 0;      # how many sites gave us auth (to optimize the wait).
our $hsn:shared = 0;         # max known timestamp.
our $inside:shared = FALSE;  # are we using the resource?
our $waiting:shared = FALSE; # are we waiting for the resource?
# When set, the worker threads finish. It has to be shared: a plain
# package variable is copied into every new thread, so a value set by the
# main thread after the workers started would never be seen by them.
our $br_end:shared = 0;

###
# Protocol constants.
###

use constant BR_REP => 0; # reply message.
use constant BR_REQ => 1; # request message.
172              
173              
# parse_cfgfile(f):
#   Parse the configuration file f and set the globals
#   $s, $t, $our_id, @sites and %resource_info.
#
#   File layout: the first line holds "$s $t"; every following line
#   holds "ip port", separated by whitespace. Lines are numbered from 0
#   starting at the second line of the file, and that number becomes the
#   SiteId. The line matching ($our_ip, $our_port) sets $our_id and is
#   not stored; every other line becomes an entry of @sites. Finally the
#   first stored entry is moved out of @sites into %resource_info.
#   Blank lines are skipped and are not numbered.
#
#   The function assumes it runs only once: it appends to @sites without
#   clearing it first.
#
# Parameters: the config file name.
#
# Returns:
#   0 -> success
#   a string containing an error message otherwise (TODO: not that good).
#   On a parse error @sites may already hold the entries read so far.
#
# TODO: not really a general function; it is really a br_ function, as
#       it sets up the br_ data structures.
#
sub parse_cfgfile {
    my ($file) = @_;

    # Three-argument open: the name is always treated as a plain file to
    # read, whatever characters it contains. The lexical handle is closed
    # automatically on the early returns below.
    my $cfg_fh;
    return "File $file not found.\n" unless open $cfg_fh, '<', $file;

    my $line = <$cfg_fh>;
    return "Nothing in file $file.\n" unless defined $line;

    # First line: $s $t. \S/\s (rather than "not a space") keep a
    # trailing "\r" or newline out of the captured fields.
    return "Can't parse first line of the config file $file.\n"
        unless $line =~ m/(\S+)\s+(\S+)\s*$/;
    $s = $1;
    $t = $2;

    # The rest of the file: one "ip port" pair per line.
    my $cur_siteid = 0;
    while (defined($line = <$cfg_fh>)) {
        next unless $line =~ m/\S/;    # tolerate blank lines.

        return "Can't parse config file $file.\n"
            unless $line =~ m/(\S+)\s+(\S+)\s*$/;
        my ($ip, $port) = ($1, $2);

        my $is_us = (defined $our_ip and defined $our_port
                     and $port eq $our_port and $ip eq $our_ip);
        if ($is_us) {
            # we've found our own identification.
            $our_id = $cur_siteid;
        }
        else {
            # &share({}) - the ampersand bypasses share()'s prototype so
            # that it accepts an anonymous hash.
            my $site = &share({});
            $site->{Site}        = $ip;
            $site->{Port}        = $port;
            $site->{SiteId}      = $cur_siteid;
            $site->{AuthBy}      = FALSE;
            $site->{RepDeferred} = FALSE;
            push @sites, $site;
        }
        $cur_siteid++;
    }
    close $cfg_fh;

    # Move the resource entry (first stored line) from @sites into its
    # own hash. With no stored entry the three values are simply undef.
    my $resource = shift(@sites) || {};
    share(%resource_info);
    %resource_info = ( Site   => $resource->{Site},
                       Port   => $resource->{Port},
                       SiteId => $resource->{SiteId},
                     );
    return 0;
}
238              
239              
240              
241             ###
242             # Protocol functions. The names of these functions start
243             # with "br_", which recalls "Brazilian", which in turn recalls
244             # other things.
245             ###
246              
# br_send(msg_type, receiver, osn):
#   Builds the XML message for $msg_type (BR_REP or BR_REQ) carrying our
#   site id and the timestamp $osn, and sends it over TCP to $receiver.
#
#   $receiver must be an element of @sites, i.e. a hash with at least the
#   Site and Port keys.
#
#   Returns 0 when send_tcp_string() reports success (delivery itself
#   cannot be confirmed from here), and 1 when the message type is not
#   valid or the send failed.
#

sub br_send {
    my ($msg_type, $receiver, $osn) = @_;

    # Translate the protocol constant into its wire name. No XML parsing
    # happens here, only packing.
    if ($msg_type eq BR_REP) {
        $msg_type = "REP";
    }
    elsif ($msg_type eq BR_REQ) {
        $msg_type = "REQ";
    }

    unless ($msg_type eq "REQ" or $msg_type eq "REP") {
        # oops: bad argument passed...
        print "br_send(): \$msg_type must be either BR_REP or BR_REQ.\n"
            if $debug;
        return 1;
    }

    my $xml_str = xmlmessage_brpack($msg_type, $our_id, $osn,
                                    choose_integer(TEST_RANDOM_NUMBER));

    # Send a TCP package with the XML message to $receiver's host:port.
    my $sent = send_tcp_string($xml_str,
                               $receiver->{Site}, $receiver->{Port});
    return $sent ? 0 : 1;
}
277              
# br_xml_to_brdata(xml_str):
#   Converts the XML string into the list ($msg, $j, $k):
#     $msg - BR_REP or BR_REQ,
#     $j   - the entry of @sites whose SiteId equals the sender's id,
#     $k   - the sequence number (timestamp) carried by the message.
#   The list is ready to be used as the parameter list of br_ functions
#   such as br_receiving or br_send.
#
#   Returns nothing (empty list / undef in scalar context) when the
#   message type is not "REQ"/"REP" or the sender is not listed in
#   @sites, so callers must check that $msg is defined.
#
# Uses Globals:
#   @sites (read only).
#

sub br_xml_to_brdata {
    my ($xml_str) = @_;

    # xmlparse_brmsg() also yields a random number as fourth value; it
    # is of no use here and is dropped by the list assignment.
    my ($type, $site_id, $site_sequence) = xmlparse_brmsg($xml_str);

    # "REP" => BR_REP: the left side is a string, the right a constant.
    my %msg_for = ( REP => BR_REP, REQ => BR_REQ );
    unless (defined $type and exists $msg_for{$type}) {
        # oops: xmlparse_brmsg went wrong...
        print "br_xml_to_brdata(): Message must be either \"REQ\"".
              " or \"REP\".\n" if $debug;
        return;
    }

    # Search @sites for the sender. $sender stays undef when nothing
    # matches, instead of being autovivified by a later dereference.
    my $sender;
    if (defined $site_id) {
        foreach my $site (@sites) {
            next unless defined $site->{SiteId}
                    and $site->{SiteId} eq $site_id;
            $sender = $site;
            last;
        }
    }
    unless (defined $sender) {
        # oops: can't find this site_id in @sites.
        my $shown_id = defined $site_id ? $site_id : '';
        print "Can't find site_id = [$shown_id] in \@sites.\n" if $debug;
        return;
    }

    return ($msg_for{$type}, $sender, $site_sequence);
}
320              
# inside():
#   Tells whether this host is, at this moment, in the critical region.
#   Returns the current value of the shared $inside flag (TRUE or FALSE).
sub inside {
    my $in_critical_region = $inside;
    return $in_critical_region;
}
327              
# waiting():
#   Tells whether this host is, at this moment, waiting to enter the
#   critical region. Returns the current value of the shared $waiting
#   flag (TRUE or FALSE).
sub waiting {
    my $is_waiting = $waiting;
    return $is_waiting;
}
334              
335              
# br_n_auth():
#   Counts the entries of @sites whose AuthBy flag is set, i.e. the
#   number of sites that have already given us their authorization.
#   @sites is locked for the duration of the count.
sub br_n_auth {
    lock @sites;
    my $granted = grep { $_->{AuthBy} } @sites;
    return $granted;
}
348              
# br_lock():
#   Acquires the distributed lock (first half of what used to be
#   br_wanna_resource(); br_unlock() is the other half).
#
#   Marks us as waiting, stamps the request with $hsn + 1, sends a
#   BR_REQ to every site we do not hold an authorization from, and then
#   blocks until br_n_auth() reports an authorization from every entry
#   of @sites. On return $inside is TRUE and $waiting is FALSE.
#
#   Only one instance of this function must be running at any time, or
#   strange things may happen; nothing here checks for it.
#
#   No parameters. No meaningful return value.
#
#   Note: the result of br_send() is not checked, so an undelivered
#   request leaves this function waiting forever.

sub br_lock {
    $waiting = TRUE;
    $osn = $hsn + 1;
    foreach my $site (@sites) {
        next if $site->{AuthBy};
        br_send(BR_REQ, $site, $osn);
        print "br_wanna:send(REQ, $site->{Site}:$site->{Port}, $osn)\n"
            if $debug;
    }

    # Wait for all sites to give us auth. Poll with a short pause (4-arg
    # select = sub-second sleep) rather than spinning: a tight loop burns
    # a whole CPU and keeps grabbing the lock on @sites while the
    # receiving thread needs to make progress.
    while (br_n_auth() < @sites) {
        select(undef, undef, undef, 0.01);
    }

    $inside = TRUE;
    $waiting = FALSE;
}
380              
# br_unlock():
#   Leaves the critical region: clears $inside and, for every site whose
#   reply we had deferred, gives up that site's authorization (keeping
#   $n_auth in step), clears both flags and sends it the pending BR_REP
#   stamped with $osn.
#
#   No parameters. No meaningful return value.
sub br_unlock {
    $inside = FALSE;
    for my $site (@sites) {
        unless ($site->{RepDeferred}) {
            print "br_wanna:undeferred $site->{Site}:$site->{Port}.\n"
                if $debug;
            next;
        }

        $n_auth-- if $site->{AuthBy};
        $site->{AuthBy} = FALSE;
        $site->{RepDeferred} = FALSE;
        br_send(BR_REP, $site, $osn);
        print "br_wanna:send(REP, $site->{Site}:$site->{Port}, $osn)\n"
            if $debug;
    }
}
397              
398              
# br_receiving(msg, j, k):
#   Processes one message $msg (BR_REP or BR_REQ) sent by site $j (an
#   entry of @sites) with timestamp $k.
#
#   Do not multithread it; instead, use a buffer to handle the received
#   messages (the algorithm presumes they are processed in FIFO order).
#
#   Every valid message first advances $hsn to max($hsn, $k) + 1. Then:
#     BR_REP - records the authorization from $j (AuthBy, $n_auth).
#     BR_REQ - either defers the reply (we are inside, or we are waiting
#              and have priority), or gives up $j's authorization and
#              answers with BR_REP; when we are waiting and had $j's
#              authorization, a BR_REQ is sent as well so that $j
#              authorizes us again later.
#
#   Calls without a defined message type or without a site entry (what a
#   failed parse produces) are ignored and leave all state untouched.
#   No meaningful return value.

sub br_receiving {
    my ($msg, $j, $k) = @_;

    # Ignore garbage before it can bump $hsn.
    return unless defined $msg and ref $j;
    $k = 0 unless defined $k;    # a missing timestamp counts as 0.

    print "br_rec:****($msg, $j->{Site}:$j->{Port}, $k)\n" if $debug;
    $hsn = ( $k > $hsn ? $k : $hsn ) + 1;
    print "br_rec:k=$k, osn=$osn, hsn=$hsn, n=$n_auth\n" if $debug;

    # The type is compared with eq, as br_send() does.
    if ($msg eq BR_REP) {
        # one more auth...
        $n_auth++ if not $j->{AuthBy};
        print "br_rec: REP ($j->{Site}:$j->{Port}, $k)\n"
            if $debug;
        $j->{AuthBy} = TRUE;
        return;
    }

    return unless $msg eq BR_REQ;

    # We have priority when the request is stamped later than ours, or
    # at the same time by a site with a bigger id.
    my $priority =
        (($k > $osn) or
         ( ($k == $osn) and ($our_id < $j->{SiteId}) ) );

    # If we are using the resource, or want it and come first, the
    # sender has to wait: remember that we owe it a reply.
    if ( $inside or ($waiting and $priority) ) {
        print "br_rec:inside (k=$k)\n" if $debug and $inside;
        print "br_rec:priority and waiting(k=$k)\n"
            if $debug and $priority and $waiting;
        print "br_rec: deferred $j->{Site}:$j->{Port})\n"
            if $debug;
        $j->{RepDeferred} = TRUE;
        #TODO: really return?
        return;
    }

    # We lose the authorization from the sender and give it the BR_REP
    # because we don't have enough priority.
    #TODO: the $inside test here seems to be redundant.
    if ( (not ($inside or $waiting)) or
         ( ($waiting) and
           (not $priority) and
           (not $j->{AuthBy})
         )
       ) {
        print "br_rec:(not inside||wait) (k=$k)\n" if $debug
            and (not ($inside or $waiting));
        print "br_rec: not \$j->{AuthBy} (k=$k)\n" if $debug
            and ($inside or $waiting);
        print "br_rec:send(REP, $j->{Site}:$j->{Port})\n"
            if $debug;
        $n_auth-- if $j->{AuthBy};
        $j->{AuthBy} = FALSE;
        br_send(BR_REP, $j, $osn);
        # Shouldn't we send REQ again, since we may be waiting?
        # No: if we're waiting and haven't got auth yet, we're in the
        # sender's RepDeferred list and it will send us the auth in
        # the future.
        return;
    }

    # We lose the authorization from the sender and give it the BR_REP
    # because it has the greater priority, but we also ask it for a
    # BR_REP as soon as possible, so that we enter its RepDeferred list.
    if ( ($waiting) and
         (not $priority) and
         ($j->{AuthBy})
       ) {
        print "br_rec: \$j->{AuthBy} (k=$k)\n" if $debug;
        print "br_rec:send(REP, $j->{Site}:$j->{Port},$osn)\n"
            if $debug;
        print "br_rec:send(REQ, $j->{Site}:$j->{Port},$osn)\n"
            if $debug;
        $n_auth-- if $j->{AuthBy};
        $j->{AuthBy} = FALSE;
        br_send(BR_REP, $j, $osn);
        br_send(BR_REQ, $j, $osn);
        return;
    }
}
486              
# br_handle_received():
#   Worker loop: takes the raw XML messages out of the global buffer
#   @baffer in arrival order, parses each with br_xml_to_brdata() and
#   hands the result to br_receiving(). It does not stop when the buffer
#   is empty; it keeps waiting for new messages until $br_end is set.
#
#   Messages that cannot be converted (br_xml_to_brdata() gives no
#   message type or no site) are dropped instead of being passed on.
#
#   No parameters. No meaningful return value.

sub br_handle_received {
    $| = 1;
    while (not $br_end) {
        my $xml_str = shift @baffer;
        unless ($xml_str) {
            # Nothing to do: pause briefly (4-arg select = sub-second
            # sleep) instead of spinning on the empty buffer.
            select(undef, undef, undef, 0.01);
            next;
        }

        my ($msg, $sender, $timestamp) = br_xml_to_brdata($xml_str);
        next unless defined $msg and defined $sender;
        br_receiving($msg, $sender, $timestamp);
    }
}
511              
# br_listen(sock):
#   Thread body acting as our "server": accepts incoming connections on
#   the listening socket $sock, reads each connection until the peer
#   closes it, and appends the text received (one XML message) to the
#   global buffer @baffer. Runs until $br_end is set; note that a thread
#   blocked in accept() only notices $br_end after the next connection.
#
#   Parameters: the listening socket (an IO::Socket object).
#   No meaningful return value; returns at once, with a warning, when no
#   socket is given.

sub br_listen {
    my ($sock) = @_;
    unless (defined $sock) {
        warn "br_listen(): no listening socket.\n";
        return;
    }

    while (not $br_end) {
        my $connection = $sock->accept();
        unless (defined $connection) {
            # accept() failed (e.g. it was interrupted): try again
            # after a short pause rather than dying on an undefined
            # handle below.
            select(undef, undef, undef, 0.01);
            next;
        }

        # Lexical line variable, so that the global $_ is left alone.
        my $rec_msg = "";
        while (defined(my $chunk = <$connection>)) {
            $rec_msg .= $chunk;
        }
        close $connection;

        push @baffer, $rec_msg if $rec_msg;
    }
}
537              
538              
539             ###
540             # New.
541             ###
542              
543              
544              
545             ####
546             # Global variables
547             ###
548              
# new(cfg_file, ip, port):
#   Sets up the locking environment of this node: resets every global,
#   parses the configuration file, binds a listening TCP socket to the
#   node's port, and starts the two worker threads (br_handle_received
#   and br_listen).
#
# Parameters:
#   cfg_file - configuration file name (see DESCRIPTION in the POD).
#   ip       - this node's ip, as written in cfg_file; OUR_IP if omitted.
#   port     - port to be bound to this node; OUR_PORT if omitted.
#   For compatibility, the older (cfg_file, port, ip) order is accepted
#   too: it is recognised by the second argument being all digits while
#   the third is not.
#
# Returns the blessed object. The state itself lives in package globals,
# so only one environment exists per process.
#
# Dies when the configuration file cannot be parsed or the listening
# socket cannot be created; going on would leave a node that believes it
# has no peers, or that can never receive a reply.
sub new {
    # OO stuff.
    my $this  = shift;
    my $class = ref($this) || $this;
    my $self  = bless {}, $class;

    # config. vars.
    my ($cfg, $ip, $port) = @_;
    if (defined $ip and $ip =~ m/^\d+$/
        and not (defined $port and $port =~ m/^\d+$/)) {
        # legacy call: new(cfg_file, port, ip).
        ($ip, $port) = ($port, $ip);
    }
    $config_file = $cfg;
    $our_ip      = defined $ip   ? $ip   : OUR_IP;
    $our_port    = defined $port ? $port : OUR_PORT;

    # config. vars filled by parse_cfgfile().
    $s = 0;
    $t = 0;
    $our_id = 0;
    %resource_info = ();
    @sites = ();

    # protocol vars.
    $osn = 0;
    @baffer = ();
    $n_auth = 0;
    $hsn = 0;
    $inside = FALSE;
    $waiting = FALSE;

    $br_end = 0;

    $BrXML::brxml_debug = $debug = DEBUG;

    # parse_cfgfile() returns 0 on success and a message on failure.
    my $cfg_error = parse_cfgfile($config_file);
    die "BrLock->new(): $cfg_error" if $cfg_error;

    my $sock = IO::Socket::INET->new(
        LocalPort => $our_port,
        Proto     => 'tcp',
        Listen    => 1,
        Reuse     => 1,
    );
    die "BrLock->new(): can't listen on port $our_port: $@\n"
        unless defined $sock;

    # Thread handling the received messages ("shift @baffer").
    # Both threads are detached: nothing ever joins them.
    threads->new(\&br_handle_received)->detach();

    # Thread accepting connections and receiving the messages
    # ("push @baffer, $rec_msg").
    threads->new(\&br_listen, $sock)->detach();

    return $self;
}
599              
# br_free():
#   Placeholder for tearing the environment down. It does nothing yet.
#
#   No parameters. Returns nothing.
#
# TODO: find some way to stop the threads. The intended code was:
#   print "br_free(): about to set \$br_end.\n" if $debug;
#   $br_end = 1;
#   print "br_free(): \$br_end set.\n" if $debug;
sub br_free {
    return;
}
606              
607             1;
608