File Coverage

blib/lib/jQluster/Server/WebSocket.pm
Criterion Covered Total %
statement 58 68 85.2
branch 7 12 58.3
condition 2 5 40.0
subroutine 14 16 87.5
pod 2 2 100.0
total 83 103 80.5


line stmt bran cond sub pod time code
1             package jQluster::Server::WebSocket;
2 2     2   54105 use strict;
  2         5  
  2         85  
3 2     2   10 use warnings;
  2         4  
  2         70  
4 2     2   12 use base ("Plack::Component");
  2         7  
  2         1907  
5 2     2   39001 use jQluster::Server;
  2         6  
  2         60  
6 2     2   1784 use Plack::App::WebSocket;
  2         665857  
  2         77  
7 2     2   18 use Scalar::Util qw(weaken refaddr);
  2         3  
  2         109  
8 2     2   9 use Try::Tiny;
  2         4  
  2         93  
9 2     2   2026 use JSON qw(decode_json encode_json);
  2         52723  
  2         14  
10              
11             our $VERSION = "0.03";
12              
13             sub new {
14 1     1 1 5698 my ($class, @args) = @_;
15 1         13 my $self = $class->SUPER::new(@args);
16 1   50 0   86 $self->{logger} ||= sub {};
  0         0  
17 1         9 $self->{jqluster} = jQluster::Server->new(
18             logger => $self->{logger}
19             );
20 1         7 $self->{websocket_app} = $self->_create_websocket_app();
21 1         4 return $self;
22             }
23              
24             sub call {
25 3     3 1 187196 my ($self, $env) = @_;
26 3         42 return $self->{websocket_app}->call($env);
27             }
28              
29             sub _log {
30 0     0   0 my ($self, $level, $msg) = @_;
31 0         0 $self->{logger}->($level, $msg);
32             }
33              
34             sub _create_websocket_app {
35 1     1   2 my ($self) = @_;
36 1         5 weaken $self;
37             my $app = Plack::App::WebSocket->new(on_establish => sub {
38 3     3   21241 my ($conn) = @_;
39 3 50       24 return if !$self;
40             $conn->on(message => sub {
41 5         47502 my ($conn, $message_str) = @_;
42 5 50       31 return if !$self;
43             try {
44 5         206 my $message = decode_json($message_str);
45 5 50 33     42 if(!ref($message) || ref($message) ne "HASH") {
46 0         0 die("Message is not a HASH");
47             }
48 5 50       17 if(!defined($message->{message_type})) {
49 0         0 die("message_type is not defined in the message");
50             }
51 5 100       15 if($message->{message_type} eq "register") {
52 3         14 $self->_register($conn, $message);
53             }else {
54 2         17 $self->{jqluster}->distribute($message);
55             }
56             }catch {
57 0         0 my $e = shift;
58 0         0 $self->_log("error", $e);
59             }
60 3         35 });
  5         59  
61             $conn->on(finish => sub {
62 3 50       2918 return if !$self;
63 3         20 $self->{jqluster}->unregister(refaddr($conn));
64 3         543 undef $conn;
65 3         70 });
66 1         13 });
67 1         101 return $app;
68             }
69              
70             sub _register {
71 3     3   7 my ($self, $conn, $message) = @_;
72 3         10 weaken $self;
73             $self->{jqluster}->register(
74             unique_id => refaddr($conn),
75             message => $message,
76             sender => sub {
77 7     7   11 my ($send_message) = @_;
78             try {
79 7         250 $conn->send(encode_json($send_message));
80             }catch {
81 0           my $e = shift;
82 0           return !$self;
83 0           $self->_log("error", "send error: $e");
84             }
85 7         65 }
86 3         43 );
87             }
88              
89              
90             1;
91             __END__