File Coverage

blib/lib/jQluster/Server.pm
Criterion Covered Total %
statement 55 63 87.3
branch 12 18 66.6
condition 3 5 60.0
subroutine 12 13 92.3
pod 4 4 100.0
total 86 103 83.5


line stmt bran cond sub pod time code
1             package jQluster::Server;
2 3     3   28241 use 5.10.0;
  3         13  
  3         130  
3 3     3   15 use strict;
  3         5  
  3         84  
4 3     3   15 use warnings;
  3         6  
  3         86  
5 3     3   16 use Carp;
  3         6  
  3         291  
6 3     3   2454 use Data::UUID;
  3         3817  
  3         3895  
7              
8             our $VERSION = "0.03";
9              
10             sub new {
11 9     9 1 19575 my ($class, %args) = @_;
12             my $self = bless {
13             id_generator => Data::UUID->new,
14             registry => {},
15             uids_for_remote_node_id => {},
16 0     0   0 logger => $args{logger} // sub { },
17 9   50     1160 }, $class;
18 9         64 return $self;
19             }
20              
21             sub _generate_message_id {
22 16     16   25 my ($self) = @_;
23 16         2563 return $self->{id_generator}->create_str();
24             }
25              
26             sub _log {
27 19     19   35 my ($self, $level, $msg) = @_;
28 19         65 $self->{logger}->($level, $msg);
29             }
30              
31             sub register {
32 15     15 1 1034 my ($self, %args) = @_;
33 15         31 foreach my $key (qw(unique_id message sender)) {
34 45 50       134 croak "$key parameter is mandatory" if not defined $args{$key};
35             }
36 15         35 foreach my $msg_key (qw(message_id from message_type)) {
37 45 50       129 if(!defined($args{message}{$msg_key})) {
38 0         0 croak "The register message does not have $msg_key field. Something is wrong.";
39             }
40             }
41 15 50       48 if($args{message}{message_type} ne "register") {
42 0         0 croak "Message type is $args{message}{message_type}, not 'register'. Something is wrong.";
43             }
44 15         65 my %reg_entry = (
45             unique_id => $args{unique_id},
46             sender => $args{sender},
47             remote_node_id => $args{message}{from}
48             );
49 15 100       57 if(exists $self->{registry}{$reg_entry{unique_id}}) {
50 1         27 croak "Duplicate registration for unique ID: $reg_entry{unique_id}";
51             }
52 14         41 $self->{registry}{$reg_entry{unique_id}} = \%reg_entry;
53 14         49 $self->{uids_for_remote_node_id}{$reg_entry{remote_node_id}}{$reg_entry{unique_id}} = 1;
54 14         64 $self->_log(info => "Accept registration: unique_id = $reg_entry{unique_id}, remote_node_id = $reg_entry{remote_node_id}");
55              
56 14         862 $self->distribute({
57             message_id => $self->_generate_message_id(),
58             message_type => "register_reply",
59             from => undef, to => $reg_entry{remote_node_id},
60             body => { error => undef, in_reply_to => $args{message}{message_id} }
61             });
62             }
63              
64             sub unregister {
65 6     6 1 57 my ($self, $unique_id) = @_;
66 6         17 my $entry = delete $self->{registry}{$unique_id};
67 6 100       20 return if !defined($entry);
68 5         17 delete $self->{uids_for_remote_node_id}{$entry->{remote_node_id}}{$entry->{unique_id}};
69 5         24 $self->_log(info => "Unregister: unique_id = $unique_id, remote_node_id = $entry->{remote_node_id}");
70             }
71              
72             my %REPLY_MESSAGE_TYPE_FOR = (
73             select_and_get => "select_and_get_reply",
74             select_and_listen => "select_and_listen_reply"
75             );
76              
77             sub _try_reply_error_to {
78 2     2   4 my ($self, $orig_message, $error) = @_;
79 2         5 my $reply_message_type = $REPLY_MESSAGE_TYPE_FOR{$orig_message->{message_type}};
80 2 50       7 if(!defined($reply_message_type)) {
81 0         0 $self->_log("error", "Unknown message type: $orig_message->{message_type}: cannot reply to it.");
82 0         0 return;
83             }
84             $self->distribute({
85 2         5 message_id => $self->_generate_message_id(),
86             message_type => $reply_message_type,
87             from => undef, to => $orig_message->{from},
88             body => { error => $error, in_reply_to => $orig_message->{message_id} }
89             });
90             }
91              
92             sub distribute {
93 23     23 1 135 my ($self, $message) = @_;
94 23         65 my $to = $message->{to};
95 23 50       54 if(!defined($to)) {
96 0         0 return;
97             }
98 23         79 my $uid_map = $self->{uids_for_remote_node_id}{$to};
99 23 100 66     161 if(!defined($uid_map) || !%$uid_map) {
100 2         9 $self->_try_reply_error_to($message, "Target remote node ($to) does not exist.");
101 2         13 return;
102             }
103 21         71 foreach my $uid (keys %$uid_map) {
104 26         344 my $entry = $self->{registry}{$uid};
105 26 50       53 if(!defined($entry)) {
106 0         0 $self->_log("error", "UID registry has a key for $uid, but it does not map to an entry object. Something is wrong.");
107 0         0 next;
108             }
109 26         71 $entry->{sender}->($message);
110             }
111             }
112              
113             1;
114              
115             __END__