File Coverage

blib/lib/App/Alice/Stream.pm
Criterion Covered Total %
statement 18 73 24.6
branch 0 12 0.0
condition 0 6 0.0
subroutine 6 21 28.5
pod 1 10 10.0
total 25 122 20.4


line stmt bran cond sub pod time code
1             package App::Alice::Stream;
2              
3 4     4   3018 use JSON;
  4         37020  
  4         28  
4 4     4   4532 use Time::HiRes qw/time/;
  4         7740  
  4         20  
5 4     4   730 use Try::Tiny;
  4         8  
  4         251  
6 4     4   23 use Any::Moose;
  4         8  
  4         46  
7              
8 4     4   2316 use strict;
  4         9  
  4         110  
9 4     4   22 use warnings;
  4         15  
  4         3339  
10              
11             has queue => (
12             is => 'rw',
13             isa => 'ArrayRef[HashRef]',
14             default => sub { [] },
15             );
16              
17 0     0 0   sub clear_queue {$_[0]->queue([])}
18 0     0 0   sub enqueue {push @{shift->queue}, @_}
  0            
19 0     0 0   sub queue_empty {return @{$_[0]->queue} == 0}
  0            
20              
21             has [qw/offset last_send start_time/]=> (
22             is => 'rw',
23             isa => 'Num',
24             default => 0,
25             );
26              
27             has [qw/delayed started closed/] => (
28             is => 'rw',
29             isa => 'Bool',
30             default => 0,
31             );
32              
33             has 'seperator' => (
34             is => 'ro',
35             isa => 'Str',
36             default => 'xalicex',
37             );
38              
39             has 'timer' => (
40             is => 'rw',
41             );
42              
43             has 'writer' => (
44             is => 'rw',
45             required => 1,
46             );
47              
48             has min_bytes => (
49             is => 'ro',
50             default => 1024,
51             );
52              
53             sub BUILD {
54 0     0 1   my $self = shift;
55 0           my $local_time = time;
56 0   0       my $remote_time = $self->start_time || $local_time;
57 0           $self->offset($local_time - $remote_time);
58 0           my $writer = $self->writer->(
59             [200, ['Content-Type' => 'multipart/mixed; boundary='.$self->seperator.'; charset=utf-8']]
60             );
61 0           $self->writer($writer);
62 0           $self->_send;
63             }
64              
65             sub _send {
66 0     0     my $self = shift;
67 0     0     try { $self->send }
68 0     0     catch { $self->close };
  0            
69             }
70              
71             sub send {
72 0     0 0   my ($self, @messages) = @_;
73 0 0         die "Sending on a closed stream" if $self->closed;
74 0 0         $self->enqueue(@messages) if @messages;
75 0 0 0       return if $self->delayed or $self->queue_empty;
76 0 0         if (my $delay = $self->flooded) {
77 0           $self->delay($delay);
78 0           return;
79             }
80 0           $self->writer->write( $self->to_string );
81 0           $self->flush;
82             }
83              
84             sub close {
85 0     0 0   my $self = shift;
86 0     0     try {$self->writer->write($self->to_string)};
  0            
87 0           $self->flush;
88 0           $self->writer->close;
89 0           $self->timer(undef);
90 0           $self->closed(1);
91             }
92              
93             sub flooded {
94 0     0 0   my $self = shift;
95 0           my $diff = time - $self->last_send;
96 0 0         if ($diff < 0.2) {
97 0           return 0.2 - $diff;
98             }
99 0           return 0;
100             }
101              
102             sub delay {
103 0     0 0   my ($self, $delay) = @_;
104 0           $self->delayed(1);
105             $self->timer(AnyEvent->timer(
106             after => $delay,
107             cb => sub {
108 0     0     $self->delayed(0);
109 0           $self->timer(undef);
110 0           $self->_send;
111             },
112 0           ));
113             }
114              
115             sub flush {
116 0     0 0   my $self = shift;
117 0           $self->clear_queue;
118 0           $self->last_send(time);
119             }
120              
121             sub to_string {
122 0     0 0   my $self = shift;
123 0           my $output;
124              
125 0 0         if (! $self->started) {
126 0           $output .= "--".$self->seperator."\n";
127 0           $self->started(1);
128             }
129              
130 0           $output .= to_json({
131             queue => $self->queue,
132             time => time - $self->offset,
133             }, {utf8 => 1});
134              
135 0           $output .= "\n--" . $self->seperator . "\n";
136 0           $output .= " " x ($self->min_bytes - length $output);
137              
138 0           return $output
139             }
140              
141             __PACKAGE__->meta->make_immutable;
142             1;