File Coverage

blib/lib/Async/Redis/Pipeline.pm
Criterion Covered Total %
statement 11 70 15.7
branch 0 26 0.0
condition 0 5 0.0
subroutine 4 12 33.3
pod 5 6 83.3
total 20 119 16.8


line stmt bran cond sub pod time code
1             package Async::Redis::Pipeline;
2              
3 73     73   569 use strict;
  73         219  
  73         3432  
4 73     73   413 use warnings;
  73         211  
  73         4075  
5 73     73   1236 use 5.018;
  73         277  
6              
7 73     73   433 use Future::AsyncAwait;
  73         161  
  73         547  
8              
9             sub new {
10 0     0 1   my ($class, %args) = @_;
11              
12             return bless {
13             redis => $args{redis},
14             commands => [],
15             executed => 0,
16 0   0       max_depth => $args{max_depth} // 10000,
17             }, $class;
18             }
19              
20             # Queue a command - returns self for chaining
21             sub _queue {
22 0     0     my ($self, $cmd, @args) = @_;
23              
24 0 0         die "Pipeline already executed" if $self->{executed};
25              
26 0 0         if (@{$self->{commands}} >= $self->{max_depth}) {
  0            
27 0           die "Pipeline depth limit ($self->{max_depth}) exceeded";
28             }
29              
30 0           push @{$self->{commands}}, [$cmd, @args];
  0            
31 0           return $self;
32             }
33              
34             # Generate AUTOLOAD to capture any command call
35             our $AUTOLOAD;
36              
37             sub AUTOLOAD {
38 0     0     my $self = shift;
39 0           my $cmd = $AUTOLOAD;
40 0           $cmd =~ s/.*:://;
41 0 0         return if $cmd eq 'DESTROY';
42              
43 0           return $self->_queue(uc($cmd), @_);
44             }
45              
46             # Allow explicit command() calls
47             sub command {
48 0     0 1   my ($self, $cmd, @args) = @_;
49 0           return $self->_queue($cmd, @args);
50             }
51              
52             # Explicit add() for backwards compatibility
53             sub add {
54 0     0 0   my ($self, @cmd) = @_;
55 0           return $self->_queue(@cmd);
56             }
57              
58             # Queue a registered script by name
59             # Usage: $pipe->run_script('name', @keys_then_args)
60             sub run_script {
61 0     0 1   my ($self, $name, @args) = @_;
62              
63 0 0         die "Pipeline already executed" if $self->{executed};
64 0 0         die "Script name required" unless defined $name;
65              
66 0 0         if (@{$self->{commands}} >= $self->{max_depth}) {
  0            
67 0           die "Pipeline depth limit ($self->{max_depth}) exceeded";
68             }
69              
70             # Queue with special marker - resolved at execute time
71 0           push @{$self->{commands}}, ['__SCRIPT__', $name, @args];
  0            
72 0           return $self;
73             }
74              
75 0     0 1   async sub execute {
76 0           my ($self) = @_;
77              
78             # Mark as executed (single-use)
79 0 0         if ($self->{executed}) {
80 0           return [];
81             }
82 0           $self->{executed} = 1;
83              
84 0           my @commands = @{$self->{commands}};
  0            
85 0 0         return [] unless @commands;
86              
87 0           my $redis = $self->{redis};
88              
89             # Resolve __SCRIPT__ entries to EVALSHA commands
90 0           my %scripts_used;
91 0           for my $cmd (@commands) {
92 0 0         if ($cmd->[0] eq '__SCRIPT__') {
93 0           my (undef, $name, @args) = @$cmd;
94              
95 0 0         my $script = $redis->get_script($name)
96             or die "Unknown script '$name' in pipeline - use define_command() first";
97              
98 0           $scripts_used{$name} = $script;
99              
100 0           my $num_keys = $script->num_keys;
101 0 0         if ($num_keys eq 'dynamic') {
102 0           $num_keys = shift @args;
103 0 0         die "Key count required as first argument for dynamic script '$name'"
104             unless defined $num_keys;
105             }
106              
107             # Convert to EVALSHA command
108 0           @$cmd = ('EVALSHA', $script->sha, $num_keys, @args);
109             }
110             }
111              
112             # Preload any scripts used (ensures EVALSHA will work)
113 0           for my $script (values %scripts_used) {
114 0           await $redis->script_load($script->script);
115             }
116              
117             # Apply key prefixing if configured
118 0 0 0       if (defined $redis->{prefix} && $redis->{prefix} ne '') {
119 0           require Async::Redis::KeyExtractor;
120 0           for my $cmd (@commands) {
121 0           my ($name, @args) = @$cmd;
122             @args = Async::Redis::KeyExtractor::apply_prefix(
123 0           $redis->{prefix}, $name, @args
124             );
125 0           @$cmd = ($name, @args);
126             }
127             }
128              
129             # Execute pipeline via Redis connection
130 0           return await $redis->_execute_pipeline(\@commands);
131             }
132              
133 0     0 1   sub count { scalar @{shift->{commands}} }
  0            
134              
135             1;
136              
137             __END__