File Coverage

blib/lib/Async/Redis/AutoPipeline.pm
Criterion Covered Total %
statement 14 62 22.5
branch 0 12 0.0
condition 0 5 0.0
subroutine 5 13 38.4
pod 0 2 0.0
total 19 94 20.2


line stmt bran cond sub pod time code
1             package Async::Redis::AutoPipeline;
2              
3 68     68   470 use strict;
  68         166  
  68         3219  
4 68     68   375 use warnings;
  68         138  
  68         3601  
5 68     68   1163 use 5.018;
  68         276  
6              
7 68     68   389 use Future;
  68         137  
  68         1880  
8 68     68   342 use Future::IO;
  68         140  
  68         84310  
9              
10             sub new {
11 0     0 0   my ($class, %args) = @_;
12              
13             return bless {
14             redis => $args{redis},
15 0   0       max_depth => $args{max_depth} // 1000,
16             _queue => [],
17             _flush_pending => 0,
18             _flushing => 0,
19             }, $class;
20             }
21              
22             sub command {
23 0     0 0   my ($self, @args) = @_;
24              
25 0           my $future = Future->new;
26 0           push @{$self->{_queue}}, { cmd => \@args, future => $future };
  0            
27              
28             # Schedule flush exactly once per batch
29 0 0         unless ($self->{_flush_pending}) {
30 0           $self->{_flush_pending} = 1;
31 0           $self->_schedule_flush;
32             }
33              
34 0           return $future;
35             }
36              
37             sub _schedule_flush {
38 0     0     my ($self) = @_;
39              
40             # Use event loop's "next tick" mechanism
41             # sleep(0) yields to event loop then immediately returns
42             Future::IO->sleep(0)->on_done(sub {
43 0     0     $self->_do_flush;
44 0           });
45             }
46              
47             sub _do_flush {
48 0     0     my ($self) = @_;
49              
50             # Reentrancy guard
51 0 0         return if $self->{_flushing};
52 0           $self->{_flushing} = 1;
53              
54             # Reset pending flag before flush (allows new commands to queue)
55 0           $self->{_flush_pending} = 0;
56              
57             # Take current queue atomically
58 0           my @batch = splice @{$self->{_queue}};
  0            
59              
60 0 0         if (@batch) {
61             # Respect depth limit
62 0           my $max = $self->{max_depth};
63 0 0         if (@batch > $max) {
64             # Put excess back, schedule another flush
65 0           unshift @{$self->{_queue}}, splice(@batch, $max);
  0            
66 0           $self->{_flush_pending} = 1;
67 0           $self->_schedule_flush;
68             }
69              
70 0           $self->_send_batch(\@batch);
71             }
72              
73 0           $self->{_flushing} = 0;
74             }
75              
76             sub _send_batch {
77 0     0     my ($self, $batch) = @_;
78              
79 0           my $redis = $self->{redis};
80              
81             # Build pipeline commands
82 0           my @commands = map { $_->{cmd} } @$batch;
  0            
83 0           my @futures = map { $_->{future} } @$batch;
  0            
84              
85             # Execute pipeline and distribute results
86             # Keep reference to the pipeline future to prevent it from being lost
87 0           my $pipeline_f = $redis->_execute_pipeline(\@commands);
88              
89             $pipeline_f->on_done(sub {
90 0     0     my ($results) = @_;
91              
92 0           for my $i (0 .. $#$results) {
93 0           my $result = $results->[$i];
94 0           my $future = $futures[$i];
95              
96             # Check if result is an error string
97 0 0 0       if (defined $result && "$result" =~ /^Redis error:/) {
98 0           $future->fail($result);
99             }
100             else {
101 0           $future->done($result);
102             }
103             }
104 0           });
105              
106             $pipeline_f->on_fail(sub {
107 0     0     my ($error) = @_;
108              
109             # Transport failure - fail all futures
110 0           for my $future (@futures) {
111 0 0         $future->fail($error) unless $future->is_ready;
112             }
113 0           });
114              
115             # Retain the future to keep the async operation alive
116 0           $pipeline_f->retain;
117             }
118              
119             1;
120              
121             __END__