File Coverage

blib/lib/Async/Redis/AutoPipeline.pm
Criterion Covered Total %
statement 27 73 36.9
branch 0 10 0.0
condition 2 4 50.0
subroutine 9 16 56.2
pod 0 2 0.0
total 38 105 36.1


line stmt bran cond sub pod time code
1             package Async::Redis::AutoPipeline;
2              
3 91     91   479 use strict;
  91         142  
  91         3079  
4 91     91   328 use warnings;
  91         139  
  91         3489  
5 91     91   1086 use 5.018;
  91         248  
6              
7 91     91   334 use Future;
  91         117  
  91         1536  
8 91     91   277 use Future::AsyncAwait;
  91         131  
  91         414  
9 91     91   3665 use Future::IO;
  91         165  
  91         3663  
10 91     91   469 use Async::Redis::Error::Disconnected;
  91         131  
  91         107091  
11              
12             sub new {
13 1     1 0 16 my ($class, %args) = @_;
14              
15             return bless {
16             redis => $args{redis},
17 1   50     9 max_depth => $args{max_depth} // 1000,
18             _queue => [],
19             _flush_pending => 0,
20             _flushing => 0,
21             }, $class;
22             }
23              
24             sub command {
25 0     0 0 0 my ($self, @args) = @_;
26              
27 0         0 my $future = Future->new;
28 0         0 push @{$self->{_queue}}, { cmd => \@args, future => $future };
  0         0  
29              
30             # Schedule flush exactly once per batch
31 0 0       0 unless ($self->{_flush_pending}) {
32 0         0 $self->{_flush_pending} = 1;
33 0         0 $self->_schedule_flush;
34             }
35              
36 0         0 return $future;
37             }
38              
39             sub _schedule_flush {
40 0     0   0 my ($self) = @_;
41              
42             # Use event loop's "next tick" mechanism
43             # sleep(0) yields to event loop then immediately returns
44             Future::IO->sleep(0)->on_done(sub {
45 0     0   0 $self->_do_flush;
46 0         0 });
47             }
48              
49             sub _do_flush {
50 0     0   0 my ($self) = @_;
51              
52             # Reentrancy guard
53 0 0       0 return if $self->{_flushing};
54 0         0 $self->{_flushing} = 1;
55              
56             # Reset pending flag before flush (allows new commands to queue)
57 0         0 $self->{_flush_pending} = 0;
58              
59             # Take current queue atomically
60 0         0 my @batch = splice @{$self->{_queue}};
  0         0  
61              
62 0 0       0 if (@batch) {
63             # Respect depth limit
64 0         0 my $max = $self->{max_depth};
65 0 0       0 if (@batch > $max) {
66             # Put excess back, schedule another flush
67 0         0 unshift @{$self->{_queue}}, splice(@batch, $max);
  0         0  
68 0         0 $self->{_flush_pending} = 1;
69 0         0 $self->_schedule_flush;
70             }
71              
72 0         0 $self->_send_batch(\@batch);
73             }
74              
75 0         0 $self->{_flushing} = 0;
76             }
77              
78             # Detach and return all queued-but-not-yet-flushed commands. Caller is
79             # responsible for failing their futures. Called by Async::Redis::_reader_fatal
80             # when the connection dies before a scheduled flush.
81             sub _detach_queued {
82 1     1   3 my ($self) = @_;
83 1   50     3 my $queued = $self->{_queue} // [];
84 1         2 $self->{_queue} = [];
85 1         2 $self->{_flush_pending} = 0;
86 1         2 return $queued;
87             }
88              
89             sub _send_batch {
90 0     0     my ($self, $batch) = @_;
91 0           my $redis = $self->{redis};
92              
93 0           my @commands = map { $_->{cmd} } @$batch;
  0            
94 0           my @futures = map { $_->{future} } @$batch;
  0            
95              
96 0     0     my $submit = (async sub {
97 0           my $buffer = '';
98 0           my @deadlines;
99 0           for my $cmd (@commands) {
100 0           $buffer .= $redis->_build_command(@$cmd);
101 0           push @deadlines, $redis->_calculate_deadline(@$cmd);
102             }
103              
104             await $redis->_with_write_gate(sub {
105             return (async sub {
106             if (!$redis->{_socket_live}) {
107             if ($redis->_reconnect_enabled) {
108             await $redis->_ensure_connected;
109             } else {
110             die Async::Redis::Error::Disconnected->new(
111             message => "Not connected",
112             );
113             }
114             }
115             for my $i (0 .. $#commands) {
116             $redis->_add_inflight(
117             $futures[$i],
118             $commands[$i][0],
119             [ @{$commands[$i]}[1..$#{$commands[$i]}] ],
120             $deadlines[$i],
121             'fail',
122             );
123             }
124             await $redis->_send($buffer);
125             })->();
126 0           });
127              
128 0           $redis->_ensure_reader;
129 0           })->();
130              
131             # Transport failure on submit cascades to every future that wasn't
132             # already failed by _reader_fatal.
133             $submit->on_fail(sub {
134 0     0     my ($err) = @_;
135 0           for my $f (@futures) {
136 0 0         $f->fail($err) unless $f->is_ready;
137             }
138 0           });
139              
140             # Ownership: the client's Future::Selector (_tasks) owns this submit
141             # task. Any caller currently awaiting inside run_until_ready sees a
142             # submit failure propagated via the selector. No ->retain needed.
143 0           $redis->{_tasks}->add(data => 'autopipe-submit', f => $submit);
144             }
145              
146             1;
147              
148             __END__