File Coverage

blib/lib/Cassandra/Client/AsyncEV.pm
Criterion Covered Total %
statement 14 100 14.0
branch 0 28 0.0
condition 0 11 0.0
subroutine 5 23 21.7
pod 0 12 0.0
total 19 174 10.9


line stmt bran cond sub pod time code
1             package Cassandra::Client::AsyncEV;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::AsyncEV::VERSION = '0.21';
4 13     13   234 use 5.010;
  13         53  
5 13     13   67 use strict;
  13         26  
  13         311  
6 13     13   66 use warnings;
  13         26  
  13         780  
7              
8 13     13   88 use Time::HiRes qw(CLOCK_MONOTONIC);
  13         26  
  13         202  
9 13     13   1130 use vars qw/@TIMEOUTS/;
  13         54  
  13         18804  
10              
11             sub new {
12 0     0 0   my ($class, %args)= @_;
13              
14 0           my $options= $args{options};
15              
16 0           require EV;
17              
18             return bless {
19 0   0       timer_granularity => ($options->{timer_granularity} || 0.1),
20             ev_read => {},
21             ev_write => {},
22             ev_timeout => undef,
23             fh_to_obj => {},
24             timeouts => [],
25             ev => EV::Loop->new(),
26             }, $class;
27             }
28              
29             sub register {
30 0     0 0   my ($self, $fh, $connection)= @_;
31 0           $self->{fh_to_obj}{$fh}= $connection;
32 0           return;
33             }
34              
35             sub unregister {
36 0     0 0   my ($self, $fh)= @_;
37 0           delete $self->{fh_to_obj}{$fh};
38 0 0 0       if ($self->{timeouts} && grep { $_->[1] == $fh && !$_->[3] } @{$self->{timeouts}}) {
  0 0          
  0            
39 0           warn 'In unregister(): not all timeouts were dismissed!';
40             }
41 0           @{$self->{timeouts}}= grep { $_->[1] != $fh } @{$self->{timeouts}};
  0            
  0            
  0            
42 0 0         undef $self->{ev_timeout} unless @{$self->{timeouts}};
  0            
43 0           return;
44             }
45              
46             sub register_read {
47 0     0 0   my ($self, $fh)= @_;
48 0 0         my $connection= $self->{fh_to_obj}{$fh} or die;
49              
50 0     0     $self->{ev_read}{$fh}= $self->{ev}->io( $fh, &EV::READ, sub { $connection->can_read } );
  0            
51 0           return;
52             }
53              
54             sub register_write {
55 0     0 0   my ($self, $fh)= @_;
56 0 0         my $connection= $self->{fh_to_obj}{$fh} or die;
57              
58 0     0     $self->{ev_write}{$fh}= $self->{ev}->io( $fh, &EV::WRITE, sub { $connection->can_write } );
  0            
59 0           return;
60             }
61              
62             sub unregister_read {
63 0     0 0   my ($self, $fh)= @_;
64 0           undef $self->{ev_read}{$fh};
65              
66 0           return;
67             }
68              
69             sub unregister_write {
70 0     0 0   my ($self, $fh)= @_;
71 0           undef $self->{ev_write}{$fh};
72              
73 0           return;
74             }
75              
76             sub deadline {
77 0     0 0   my ($self, $fh, $id, $timeout)= @_;
78 0           local *TIMEOUTS= $self->{timeouts};
79              
80 0 0         if (!$self->{ev_timeout}) {
81             $self->{ev_timeout}= $self->{ev}->timer( $self->{timer_granularity}, $self->{timer_granularity}, sub {
82 0     0     $self->handle_timeouts(Time::HiRes::clock_gettime(CLOCK_MONOTONIC));
83 0           } );
84             }
85              
86 0           my $curtime= Time::HiRes::clock_gettime(CLOCK_MONOTONIC);
87 0           my $deadline= $curtime + $timeout;
88 0           my $additem= [ $deadline, $fh, $id, 0 ];
89              
90 0 0 0       if (@TIMEOUTS && $TIMEOUTS[-1][0] > $deadline) {
91             # Grumble... that's slow
92 0           push @TIMEOUTS, $additem;
93 0           @TIMEOUTS= sort { $a->[0] <=> $b->[0] } @TIMEOUTS;
  0            
94             } else {
95             # Common case
96 0           push @TIMEOUTS, $additem;
97             }
98              
99 0           return \($additem->[3]);
100             }
101              
102             sub handle_timeouts {
103 0     0 0   my ($self, $curtime)= @_;
104              
105 0           local *TIMEOUTS= $self->{timeouts};
106              
107 0           my %triggered_read;
108 0   0       while (@TIMEOUTS && $curtime >= $TIMEOUTS[0][0]) {
109 0           my $item= shift @TIMEOUTS;
110 0 0         if (!$item->[3]) { # If it timed out
111 0           my ($deadline, $fh, $id, $timedout)= @$item;
112 0           my $obj= $self->{fh_to_obj}{$fh};
113 0 0         $obj->can_read unless $triggered_read{$fh}++;
114 0 0         $obj->can_timeout($id) unless $item->[3]; # We may have received an answer...
115             }
116             }
117              
118 0 0         if (!@TIMEOUTS) {
119 0           $self->{ev_timeout}= undef;
120             }
121              
122 0           return;
123             }
124              
125             sub timer {
126 0     0 0   my ($self, $callback, $wait)= @_;
127 0           my $t; $t= $self->{ev}->timer($wait, 0, sub {
128 0     0     undef $t;
129 0           $callback->();
130 0           });
131             }
132              
133             sub later {
134 0     0 0   my ($self, $callback)= @_;
135 0           $self->timer($callback, 0);
136             }
137              
138             # $something->($async->wait(my $w)); my ($error, $result)= $w->();
139             sub wait {
140 0     0 0   my ($self)= @_;
141 0           my $output= \$_[1];
142              
143 0           my ($done, $in_run);
144 0           my @output;
145             my $callback= sub {
146 0     0     $done= 1;
147 0           @output= @_;
148 0 0         $self->{ev}->break() if $in_run;
149 0           };
150              
151             $$output= sub {
152 0 0   0     if ($self->{in_wait}) {
153 0           die "Unable to recursively wait for callbacks; are you doing synchronous Cassandra queries from asynchronous callbacks?";
154             }
155 0           local $self->{in_wait}= 1;
156              
157 0           $in_run= 1;
158 0 0         $self->{ev}->run unless $done;
159 0           return @output;
160 0           };
161              
162 0           return $callback;
163             }
164              
165             1;
166              
167             __END__