File Coverage

blib/lib/Cassandra/Client/AsyncAnyEvent.pm
Criterion Covered Total %
statement 14 93 15.0
branch 0 22 0.0
condition 0 11 0.0
subroutine 5 22 22.7
pod 0 11 0.0
total 19 159 11.9


line stmt bran cond sub pod time code
1             package Cassandra::Client::AsyncAnyEvent;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::AsyncAnyEvent::VERSION = '0.13_004'; # TRIAL
4              
5 1     1   13 $Cassandra::Client::AsyncAnyEvent::VERSION = '0.13004';use 5.010;
  1         3  
6 1     1   4 use strict;
  1         2  
  1         15  
7 1     1   4 use warnings;
  1         1  
  1         22  
8              
9 1     1   315 use Time::HiRes qw(CLOCK_MONOTONIC);
  1         1263  
  1         6  
10 1     1   238 use vars qw/@TIMEOUTS/;
  1         3  
  1         1002  
11              
12             sub new {
13 0     0 0   my ($class, %args)= @_;
14              
15 0           my $options= $args{options};
16              
17 0           require AnyEvent;
18              
19             return bless {
20 0   0       timer_granularity => ($options->{timer_granularity} || 0.1),
21             ae_read => {},
22             ae_write => {},
23             ae_timeout => undef,
24             fh_to_obj => {},
25             timeouts => [],
26             }, $class;
27             }
28              
29             sub register {
30 0     0 0   my ($self, $fh, $connection)= @_;
31 0           $self->{fh_to_obj}{$fh}= $connection;
32 0           return;
33             }
34              
35             sub unregister {
36 0     0 0   my ($self, $fh)= @_;
37 0           delete $self->{fh_to_obj}{$fh};
38 0 0 0       if ($self->{timeouts} && grep { $_->[1] == $fh && !$_->[3] } @{$self->{timeouts}}) {
  0 0          
  0            
39 0           warn 'In unregister(): not all timeouts were dismissed!';
40             }
41 0           @{$self->{timeouts}}= grep { $_->[1] != $fh } @{$self->{timeouts}};
  0            
  0            
  0            
42 0 0         undef $self->{ae_timeout} unless @{$self->{timeouts}};
  0            
43 0           return;
44             }
45              
46             sub register_read {
47 0     0 0   my ($self, $fh)= @_;
48 0 0         my $connection= $self->{fh_to_obj}{$fh} or die;
49              
50             $self->{ae_read}{$fh}= AnyEvent->io(
51             poll => 'r',
52             fh => $fh,
53             cb => sub {
54 0     0     $connection->can_read;
55             },
56 0           );
57              
58 0           return;
59             }
60              
61             sub register_write {
62 0     0 0   my ($self, $fh)= @_;
63 0 0         my $connection= $self->{fh_to_obj}{$fh} or die;
64              
65             $self->{ae_write}{$fh}= AnyEvent->io(
66             poll => 'w',
67             fh => $fh,
68             cb => sub {
69 0     0     $connection->can_write;
70             },
71 0           );
72              
73 0           return;
74             }
75              
76             sub unregister_read {
77 0     0 0   my ($self, $fh)= @_;
78 0           undef $self->{ae_read}{$fh};
79              
80 0           return;
81             }
82              
83             sub unregister_write {
84 0     0 0   my ($self, $fh)= @_;
85 0           undef $self->{ae_write}{$fh};
86              
87 0           return;
88             }
89              
90             sub deadline {
91 0     0 0   my ($self, $fh, $id, $timeout)= @_;
92 0           local *TIMEOUTS= $self->{timeouts};
93              
94 0 0         if (!$self->{ae_timeout}) {
95             $self->{ae_timeout}= AnyEvent->timer(
96             after => $self->{timer_granularity},
97             interval => $self->{timer_granularity},
98 0     0     cb => sub { $self->handle_timeouts(Time::HiRes::clock_gettime(CLOCK_MONOTONIC)) },
99 0           );
100             }
101              
102 0           my $curtime= Time::HiRes::clock_gettime(CLOCK_MONOTONIC);
103 0           my $deadline= $curtime + $timeout;
104 0           my $additem= [ $deadline, $fh, $id, 0 ];
105              
106 0 0 0       if (@TIMEOUTS && $TIMEOUTS[-1][0] > $deadline) {
107             # Grumble... that's slow
108 0           push @TIMEOUTS, $additem;
109 0           @TIMEOUTS= sort { $a->[0] <=> $b->[0] } @TIMEOUTS;
  0            
110             } else {
111             # Common case
112 0           push @TIMEOUTS, $additem;
113             }
114              
115 0           return \($additem->[3]);
116             }
117              
118             sub handle_timeouts {
119 0     0 0   my ($self, $curtime)= @_;
120              
121 0           local *TIMEOUTS= $self->{timeouts};
122              
123 0           my %triggered_read;
124 0   0       while (@TIMEOUTS && $curtime >= $TIMEOUTS[0][0]) {
125 0           my $item= shift @TIMEOUTS;
126 0 0         if (!$item->[3]) { # If it timed out
127 0           my ($deadline, $fh, $id, $timedout)= @$item;
128 0           my $obj= $self->{fh_to_obj}{$fh};
129 0 0         $obj->can_read unless $triggered_read{$fh}++;
130 0 0         $obj->can_timeout($id) unless $item->[3]; # We may have received an answer...
131             }
132             }
133              
134 0 0         if (!@TIMEOUTS) {
135 0           $self->{ae_timeout}= undef;
136             }
137              
138 0           return;
139             }
140              
141             sub timer {
142 0     0 0   my ($self, $callback, $wait)= @_;
143 0           my $t; $t= AE::timer($wait, 0, sub {
144 0     0     undef $t;
145 0           $callback->();
146 0           });
147             }
148              
149             # $something->($async->wait(my $w)); my ($error, $result)= $w->();
150             sub wait {
151 0     0 0   my ($self)= @_;
152 0           my $output= \$_[1];
153              
154 0           my $cv= AnyEvent->condvar;
155 0           my @output;
156             my $callback= sub {
157 0     0     @output= @_;
158 0           $cv->send;
159 0           };
160              
161             $$output= sub {
162 0     0     $cv->recv;
163 0           return @output;
164 0           };
165              
166 0           return $callback;
167             }
168              
169             1;
170              
171             __END__