File Coverage

blib/lib/Cassandra/Client/AsyncAnyEvent.pm
Criterion Covered Total %
statement 14 95 14.7
branch 0 22 0.0
condition 0 11 0.0
subroutine 5 23 21.7
pod 0 12 0.0
total 19 163 11.6


line stmt bran cond sub pod time code
1             package Cassandra::Client::AsyncAnyEvent;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::AsyncAnyEvent::VERSION = '0.21';
4 13     13   224 use 5.010;
  13         61  
5 13     13   102 use strict;
  13         26  
  13         410  
6 13     13   62 use warnings;
  13         23  
  13         761  
7              
8 13     13   94 use Time::HiRes qw(CLOCK_MONOTONIC);
  13         29  
  13         173  
9 13     13   1285 use vars qw/@TIMEOUTS/;
  13         31  
  13         20114  
10              
11             sub new {
12 0     0 0   my ($class, %args)= @_;
13              
14 0           my $options= $args{options};
15              
16 0           require AnyEvent;
17              
18             return bless {
19 0   0       timer_granularity => ($options->{timer_granularity} || 0.1),
20             ae_read => {},
21             ae_write => {},
22             ae_timeout => undef,
23             fh_to_obj => {},
24             timeouts => [],
25             }, $class;
26             }
27              
28             sub register {
29 0     0 0   my ($self, $fh, $connection)= @_;
30 0           $self->{fh_to_obj}{$fh}= $connection;
31 0           return;
32             }
33              
34             sub unregister {
35 0     0 0   my ($self, $fh)= @_;
36 0           delete $self->{fh_to_obj}{$fh};
37 0 0 0       if ($self->{timeouts} && grep { $_->[1] == $fh && !$_->[3] } @{$self->{timeouts}}) {
  0 0          
  0            
38 0           warn 'In unregister(): not all timeouts were dismissed!';
39             }
40 0           @{$self->{timeouts}}= grep { $_->[1] != $fh } @{$self->{timeouts}};
  0            
  0            
  0            
41 0 0         undef $self->{ae_timeout} unless @{$self->{timeouts}};
  0            
42 0           return;
43             }
44              
45             sub register_read {
46 0     0 0   my ($self, $fh)= @_;
47 0 0         my $connection= $self->{fh_to_obj}{$fh} or die;
48              
49             $self->{ae_read}{$fh}= AnyEvent->io(
50             poll => 'r',
51             fh => $fh,
52             cb => sub {
53 0     0     $connection->can_read;
54             },
55 0           );
56              
57 0           return;
58             }
59              
60             sub register_write {
61 0     0 0   my ($self, $fh)= @_;
62 0 0         my $connection= $self->{fh_to_obj}{$fh} or die;
63              
64             $self->{ae_write}{$fh}= AnyEvent->io(
65             poll => 'w',
66             fh => $fh,
67             cb => sub {
68 0     0     $connection->can_write;
69             },
70 0           );
71              
72 0           return;
73             }
74              
75             sub unregister_read {
76 0     0 0   my ($self, $fh)= @_;
77 0           undef $self->{ae_read}{$fh};
78              
79 0           return;
80             }
81              
82             sub unregister_write {
83 0     0 0   my ($self, $fh)= @_;
84 0           undef $self->{ae_write}{$fh};
85              
86 0           return;
87             }
88              
89             sub deadline {
90 0     0 0   my ($self, $fh, $id, $timeout)= @_;
91 0           local *TIMEOUTS= $self->{timeouts};
92              
93 0 0         if (!$self->{ae_timeout}) {
94             $self->{ae_timeout}= AnyEvent->timer(
95             after => $self->{timer_granularity},
96             interval => $self->{timer_granularity},
97 0     0     cb => sub { $self->handle_timeouts(Time::HiRes::clock_gettime(CLOCK_MONOTONIC)) },
98 0           );
99             }
100              
101 0           my $curtime= Time::HiRes::clock_gettime(CLOCK_MONOTONIC);
102 0           my $deadline= $curtime + $timeout;
103 0           my $additem= [ $deadline, $fh, $id, 0 ];
104              
105 0 0 0       if (@TIMEOUTS && $TIMEOUTS[-1][0] > $deadline) {
106             # Grumble... that's slow
107 0           push @TIMEOUTS, $additem;
108 0           @TIMEOUTS= sort { $a->[0] <=> $b->[0] } @TIMEOUTS;
  0            
109             } else {
110             # Common case
111 0           push @TIMEOUTS, $additem;
112             }
113              
114 0           return \($additem->[3]);
115             }
116              
117             sub handle_timeouts {
118 0     0 0   my ($self, $curtime)= @_;
119              
120 0           local *TIMEOUTS= $self->{timeouts};
121              
122 0           my %triggered_read;
123 0   0       while (@TIMEOUTS && $curtime >= $TIMEOUTS[0][0]) {
124 0           my $item= shift @TIMEOUTS;
125 0 0         if (!$item->[3]) { # If it timed out
126 0           my ($deadline, $fh, $id, $timedout)= @$item;
127 0           my $obj= $self->{fh_to_obj}{$fh};
128 0 0         $obj->can_read unless $triggered_read{$fh}++;
129 0 0         $obj->can_timeout($id) unless $item->[3]; # We may have received an answer...
130             }
131             }
132              
133 0 0         if (!@TIMEOUTS) {
134 0           $self->{ae_timeout}= undef;
135             }
136              
137 0           return;
138             }
139              
140             sub timer {
141 0     0 0   my ($self, $callback, $wait)= @_;
142 0           my $t; $t= AE::timer($wait, 0, sub {
143 0     0     undef $t;
144 0           $callback->();
145 0           });
146             }
147              
148             sub later {
149 0     0 0   my ($self, $callback)= @_;
150 0           &AE::postpone($callback);
151             }
152              
153             # $something->($async->wait(my $w)); my ($error, $result)= $w->();
154             sub wait {
155 0     0 0   my ($self)= @_;
156 0           my $output= \$_[1];
157              
158 0           my $cv= AnyEvent->condvar;
159 0           my @output;
160             my $callback= sub {
161 0     0     @output= @_;
162 0           $cv->send;
163 0           };
164              
165             $$output= sub {
166 0     0     $cv->recv;
167 0           return @output;
168 0           };
169              
170 0           return $callback;
171             }
172              
173             1;
174              
175             __END__