File Coverage

blib/lib/Async/Redis/AutoPipeline.pm
Criterion Covered Total %
statement 17 65 26.1
branch 0 12 0.0
condition 0 5 0.0
subroutine 6 14 42.8
pod 0 2 0.0
total 23 98 23.4


line stmt bran cond sub pod time code
1             package Async::Redis::AutoPipeline;
2              
3 73     73   513 use strict;
  73         173  
  73         3549  
4 73     73   451 use warnings;
  73         159  
  73         3855  
5 73     73   1303 use 5.018;
  73         270  
6              
7 73     73   401 use Future;
  73         143  
  73         1979  
8 73     73   378 use Future::IO;
  73         137  
  73         4649  
9 73     73   490 use Scalar::Util qw(blessed);
  73         142  
  73         91531  
10              
11             sub new {
12 0     0 0   my ($class, %args) = @_;
13              
14             return bless {
15             redis => $args{redis},
16 0   0       max_depth => $args{max_depth} // 1000,
17             _queue => [],
18             _flush_pending => 0,
19             _flushing => 0,
20             }, $class;
21             }
22              
23             sub command {
24 0     0 0   my ($self, @args) = @_;
25              
26 0           my $future = Future->new;
27 0           push @{$self->{_queue}}, { cmd => \@args, future => $future };
  0            
28              
29             # Schedule flush exactly once per batch
30 0 0         unless ($self->{_flush_pending}) {
31 0           $self->{_flush_pending} = 1;
32 0           $self->_schedule_flush;
33             }
34              
35 0           return $future;
36             }
37              
38             sub _schedule_flush {
39 0     0     my ($self) = @_;
40              
41             # Use event loop's "next tick" mechanism
42             # sleep(0) yields to event loop then immediately returns
43             Future::IO->sleep(0)->on_done(sub {
44 0     0     $self->_do_flush;
45 0           });
46             }
47              
48             sub _do_flush {
49 0     0     my ($self) = @_;
50              
51             # Reentrancy guard
52 0 0         return if $self->{_flushing};
53 0           $self->{_flushing} = 1;
54              
55             # Reset pending flag before flush (allows new commands to queue)
56 0           $self->{_flush_pending} = 0;
57              
58             # Take current queue atomically
59 0           my @batch = splice @{$self->{_queue}};
  0            
60              
61 0 0         if (@batch) {
62             # Respect depth limit
63 0           my $max = $self->{max_depth};
64 0 0         if (@batch > $max) {
65             # Put excess back, schedule another flush
66 0           unshift @{$self->{_queue}}, splice(@batch, $max);
  0            
67 0           $self->{_flush_pending} = 1;
68 0           $self->_schedule_flush;
69             }
70              
71 0           $self->_send_batch(\@batch);
72             }
73              
74 0           $self->{_flushing} = 0;
75             }
76              
77             sub _send_batch {
78 0     0     my ($self, $batch) = @_;
79              
80 0           my $redis = $self->{redis};
81              
82             # Build pipeline commands
83 0           my @commands = map { $_->{cmd} } @$batch;
  0            
84 0           my @futures = map { $_->{future} } @$batch;
  0            
85              
86             # Execute pipeline and distribute results
87             # Keep reference to the pipeline future to prevent it from being lost
88 0           my $pipeline_f = $redis->_execute_pipeline(\@commands);
89              
90             $pipeline_f->on_done(sub {
91 0     0     my ($results) = @_;
92              
93 0           for my $i (0 .. $#$results) {
94 0           my $result = $results->[$i];
95 0           my $future = $futures[$i];
96              
97             # Check if result is an error object (from pipeline inline capture)
98 0 0 0       if (blessed($result) && $result->isa('Async::Redis::Error')) {
99 0           $future->fail($result);
100             }
101             else {
102 0           $future->done($result);
103             }
104             }
105 0           });
106              
107             $pipeline_f->on_fail(sub {
108 0     0     my ($error) = @_;
109              
110             # Transport failure - fail all futures
111 0           for my $future (@futures) {
112 0 0         $future->fail($error) unless $future->is_ready;
113             }
114 0           });
115              
116             # Retain the future to keep the async operation alive
117 0           $pipeline_f->retain;
118             }
119              
120             1;
121              
122             __END__