File Coverage

blib/lib/AnyEvent/Consul/Exec.pm
Criterion Covered Total %
statement 29 101 28.7
branch 0 24 0.0
condition 0 6 0.0
subroutine 10 24 41.6
pod 0 2 0.0
total 39 157 24.8


line stmt bran cond sub pod time code
1             package AnyEvent::Consul::Exec;
2             $AnyEvent::Consul::Exec::VERSION = '0.002';
3             # ABSTRACT: Execute a remote command across a Consul cluster
4              
5 2     2   625947 use 5.020;
  2         7  
6 2     2   10 use warnings;
  2         4  
  2         54  
7 2     2   9 use strict;
  2         4  
  2         40  
8 2     2   524 use experimental qw(postderef);
  2         5293  
  2         11  
9              
10 2     2   863 use Consul 0.021;
  2         392469  
  2         85  
11 2     2   1054 use AnyEvent;
  2         6863  
  2         73  
12 2     2   524 use AnyEvent::Consul;
  2         31986  
  2         69  
13 2     2   25 use JSON::MaybeXS;
  2         5  
  2         148  
14 2     2   958 use Type::Params qw(compile);
  2         25295  
  2         23  
15 2     2   653 use Types::Standard qw(ClassName Dict Str Optional CodeRef ArrayRef Int slurpy);
  2         6  
  2         13  
16              
17             my @callbacks = map { "on_$_" } qw(submit ack output exit done error);
               # ^ Names of the constructor's callback options (on_submit, on_ack,
               #   on_output, on_exit, on_done, on_error). new() uses this list both to
               #   build its type check and to install no-op defaults.
18              
19             sub new {
               # Constructor. Validates the named arguments and returns the validated
               # hash blessed into the invoking class.
               #
               # Arguments (after the class name):
               #   command     - Str, required
               #   wait        - Int, optional; defaults to 2
               #   node, service, tag - Str, optional
               #   consul_args - ArrayRef, optional; defaults to []
               #   on_submit, on_ack, on_output, on_exit, on_done, on_error
               #               - CodeRef, optional; each defaults to a no-op sub
               #
               # The compiled checker is kept in a `state` variable, so it is built
               # only once.
20             state $check = compile(
21             ClassName,
22             slurpy Dict[
23             command => Str,
24             wait => Optional[Int],
25             node => Optional[Str],
26             service => Optional[Str],
27             tag => Optional[Str],
28             consul_args => Optional[ArrayRef],
               # One optional CodeRef entry per name in @callbacks.
29 0     0 0   map { $_ => Optional[CodeRef] } @callbacks,
  0            
30             ],
31             );
32 0           my ($class, $self) = $check->(@_);
               # Give every callback that was not supplied an empty sub so later code
               # can call it unconditionally.
               # NOTE(review): map is used in void context purely for its side
               # effect; a `for` loop would be the conventional spelling.
33 0   0 0     map { $self->{$_} //= sub {} } @callbacks;
  0            
34 0   0       $self->{wait} //= 2;
35 0   0       $self->{consul_args} //= [];
36 0           return bless $self, $class;
37             }
38              
39             sub _wait_responses {
               # Fetches every KV entry under "_rexec/<session id>", dispatches the ones
               # modified since $index to the user callbacks, then calls itself again
               # with the index reported in the response metadata until the job is
               # marked done.
               #
               # Parameters:
               #   $self  - the exec object; reads {_c} and {_sid}
               #   $index - only entries whose modify_index is greater than this are
               #            processed; it is also passed to get_all as `index`
               #            (presumably making the request a Consul blocking query -
               #            confirm against the Consul client docs)
40 0     0     my ($self, $index) = @_;
41              
42             $self->{_c}->kv->get_all(
43             "_rexec/$self->{_sid}",
44             index => $index,
45             cb => sub {
46 0     0     my ($kv, $meta) = @_;
               # Keep only the entries that changed after the index we asked about.
47 0           my @changed = grep { $_->modify_index > $index } $kv->@*;
  0            
48              
49 0           for my $kv (@changed) {
               # Strip the "_rexec/<sid>/" prefix; what remains identifies the entry.
               # NOTE(review): {_sid} is interpolated into the pattern without
               # \Q...\E, so any regex metacharacters in it would be live.
50 0           my ($key) = $kv->key =~ m{^_rexec/$self->{_sid}/(.+)};
51 0 0         unless ($key) {
52 0           warn "W: consul told us '".$kv->key."' changed, but we aren't interested in it, consul bug?\n";
53 0           next;
54             }
55              
               # The "job" key is the job definition itself showing up in the store.
56 0 0         if ($key eq 'job') {
57 0           $self->{on_submit}->();
58 0           next;
59             }
60              
               # Any other key is split into at most three parts: node/action/id.
61 0           my ($node, $act, $id) = split '/', $key, 3;
62 0 0         unless ($act) {
63 0           warn "W: malformed rexec response: $key\n";
               # NOTE(review): there is no `next` here, so a malformed key falls
               # through to the comparisons below with $act undefined and ends at
               # the "unknown action" warning.
64             }
65              
               # ack: count the acknowledgement and report the node.
66 0 0         if ($act eq 'ack') {
67 0           $self->{_nack}++;
68 0           $self->{on_ack}->($node);
69 0           next;
70             }
71              
               # out: pass the entry's value to on_output.
72 0 0         if ($act eq 'out') {
73 0           $self->{on_output}->($node, $kv->value);
74 0           next;
75             }
76              
               # exit: count the exit and pass the entry's value to on_exit; once the
               # exit count equals the ack count, mark the job done and clean up
               # before calling on_done.
77 0 0         if ($act eq 'exit') {
78 0           $self->{_nexit}++;
79 0           $self->{on_exit}->($node, $kv->value);
80 0 0         if ($self->{_nack} == $self->{_nexit}) {
81             # XXX super naive. there might be some that haven't acked yet
82             # should schedule done for a lil bit in the future
83 0           $self->{_done} = 1;
84 0           $self->_cleanup(sub { $self->{on_done}->() });
  0            
85             }
86 0           next;
87             }
88              
89 0           warn "W: $node: unknown action: $act\n";
90             }
91              
               # Issue the next request from the index in this response's metadata,
               # unless the job was marked done above.
92 0 0         $self->_wait_responses($meta->index) unless $self->{_done};
93             },
94 0           );
95             }
96              
97             sub _fire_event {
               # Fires the "_rexec" Consul event, then starts watching for responses.
               #
               # The event payload is a JSON object holding the KV prefix ("_rexec") and
               # the session id. The node, service and tag options are passed to
               # event->fire only when they hold a true value. When the fire call
               # completes, _wait_responses is started from index 0.
98 0     0     my ($self) = @_;
99             my $payload = {
100             Prefix => "_rexec",
101             Session => $self->{_sid},
102 0           };
103             $self->{_c}->event->fire(
104             "_rexec",
105             payload => encode_json($payload),
               # Each optional filter contributes either a key/value pair or nothing.
106             $self->{node} ? (node => $self->{node}) : (),
107             $self->{service} ? (service => $self->{service}) : (),
108             $self->{tag} ? (tag => $self->{tag}) : (),
109 0     0     cb => sub { $self->_wait_responses(0) },
110 0 0         );
    0          
    0          
111             }
112              
113             sub _setup_job {
               # Writes the job definition to "_rexec/<session id>/job" as JSON, then
               # fires the exec event once the put completes.
               #
               # The job holds the command string and the wait time; {wait} is in
               # seconds and is multiplied up to nanoseconds for the stored value.
114 0     0     my ($self) = @_;
115             my $job = {
116             Command => $self->{command},
117 0           Wait => $self->{wait} * 1_000_000_000, # nanoseconds
118             };
119 0     0     $self->{_c}->kv->put("_rexec/$self->{_sid}/job", encode_json($job), cb => sub { $self->_fire_event });
  0            
120             }
121              
122             sub _start_session {
               # Creates a Consul session named "exec" with a ttl of "10s". When the
               # create call completes, its first callback argument is stored as {_sid},
               # a repeating timer is installed that renews the session, and the job is
               # set up.
123 0     0     my ($self) = @_;
124             $self->{_c}->session->create(Consul::Session->new(name => "exec", ttl => "10s"), cb => sub {
125 0     0     $self->{_sid} = shift;
               # The timer watcher is held in {_refresh_guard}; _cleanup deletes it.
               # NOTE(review): `after` and `interval` are given as the string "5s";
               # AnyEvent documents both as numbers of seconds - confirm this is
               # intended and does not raise a non-numeric warning.
126             $self->{_refresh_guard} = AnyEvent->timer(after => "5s", interval => "5s", cb => sub {
127 0           $self->{_c}->session->renew($self->{_sid});
128 0           });
129 0           $self->_setup_job;
130 0           });
131             }
132              
133             sub _cleanup {
               # Tears down the job state and then calls $cb with no arguments.
               #
               # Parameters:
               #   $self - the exec object
               #   $cb   - code ref to call once cleanup has finished
               #
               # The refresh timer watcher is dropped first. If a session id is
               # recorded, everything under "_rexec/<session id>" is deleted
               # recursively and the rest runs in that call's callback; otherwise it
               # runs immediately. In both paths {_sid} and {_c} are removed before
               # $cb is called.
134 0     0     my ($self, $cb) = @_;
135 0           delete $self->{_refresh_guard};
136 0 0         if ($self->{_sid}) {
137             $self->{_c}->kv->delete("_rexec/$self->{_sid}", recurse => 1, cb => sub {
138 0     0     delete $self->{_sid};
139 0           delete $self->{_c};
140 0           $cb->();
141 0           });
142             }
143             else {
144 0           delete $self->{_sid};
145 0           delete $self->{_c};
146 0           $cb->();
147             }
148             }
149              
150             sub start {
               # Begins execution: builds the AnyEvent::Consul client and starts the
               # session. Returns nothing (bare return).
               #
               # The client is constructed from the contents of {consul_args} followed
               # by an error_cb. That error_cb runs _cleanup and then calls on_error
               # with the error it received.
               # NOTE(review): _cleanup may itself issue a KV delete through the same
               # client; what happens if that request also reaches error_cb is not
               # visible here - confirm.
151 0     0 0   my ($self) = @_;
152             $self->{_c} = AnyEvent::Consul->new($self->{consul_args}->@*, error_cb => sub {
153 0     0     my ($err) = @_;
154 0           $self->_cleanup(sub { $self->{on_error}->($err) });
  0            
155 0           });
156 0           $self->_start_session;
157 0           return;
158             }
159              
160             1;
161              
162             =pod
163              
164             =encoding UTF-8
165              
166             =for markdown [![Build Status](https://secure.travis-ci.org/robn/AnyEvent-Consul-Exec.png)](http://travis-ci.org/robn/AnyEvent-Consul-Exec)
167              
168             =head1 NAME
169              
170             AnyEvent::Consul::Exec - Execute a remote command across a Consul cluster
171              
172             =head1 SYNOPSIS
173              
174             use AnyEvent;
175             use AnyEvent::Consul::Exec;
176            
177             my $cv = AE::cv;
178            
179             my $e = AnyEvent::Consul::Exec->new(
180            
181             # command to run
182             command => 'uptime',
183              
184             # number of seconds target will wait for command, without sending
185             # output, before terminating it
186             wait => 2,
187            
188             # called once job is submitted to Consul
189             on_submit => sub {
190             say "job submitted";
191             },
192            
193             # called as each target node starts to process the job
194             # multiple calls, once per node
195             on_ack => sub {
196             my ($node) = @_;
197             say "$node: ack";
198             },
199            
200             # called when a node has output from the job
201             # can be called zero or more times per node, as more output
202             # becomes available
203             on_output => sub {
204             my ($node, $output) = @_;
205             say "$node: output:";
206             say "$node> $_" for split("\n", $output);
207             },
208            
209             # called when the node completes a job
210             # multiple calls, one per node
211             on_exit => sub {
212             my ($node, $rc) = @_;
213             say "$node: exit: $rc";
214             },
215            
216             # called once all nodes have reported completion
217             # object is unusable past this point
218             on_done => sub {
219             say "job done";
220             $cv->send;
221             },
222            
223             # called if an error occurs anywhere during processing (not command errors)
224             # typically called if Consul is unable to service requests
225             # object is unusable past this point
226             on_error => sub {
227             my ($err) = @_;
228             say "error: $err";
229             $cv->send;
230             },
231             );
232            
233             # begin execution
234             $e->start;
235              
236             $cv->recv;
237              
238             =head1 DESCRIPTION
239              
240             AnyEvent::Consul::Exec is an interface to Consul's "exec" agent function. This
241             is the same thing you get when you run C<consul exec>.
242              
243             C<consul exec> is great, but its output is text-based, making it awkward to
244             parse to determine what happened on each node that ran the command.
245             C<AnyEvent::Consul::Exec> replaces the client portion with a library you can
246             use to get info about what is happening on each node as it happens.
247              
248             As the name implies, it expects to be run inside an L<AnyEvent> event loop.
249              
250             =head1 BASICS
251              
252             Start off by instantiating a C<AnyEvent::Consul::Exec> object with the command
253             you want to run:
254              
255             my $e = AnyEvent::Consul::Exec->new(
256             command => 'uptime',
257             );
258              
259             Then call C<start> to kick it off:
260              
261             $e->start;
262              
263             As the C<AnyEvent> event loop progresses, the command will be executed on
264             remote nodes. Output and results of that command on each node will be posted to
265             callbacks you can optionally provide to the constructor.
266              
267             When calling the constructor, you can include the C<consul_args> option with an
268             arrayref as a value. Anything in that arrayref will be passed as-is to the
269             C<AnyEvent::Consul> constructor. Use this to set the various client options
270             documented in L<AnyEvent::Consul> and L<Consul>.
271              
272             The C<wait> option will tell the target agent how long to wait, without
273             receiving output, before killing the command. This does the same thing as the
274             C<-wait> option to C<consul exec>.
275              
276             The C<node>, C<service> and C<tag> options each take basic regexes that will be used to
277             match nodes to run the command on. See the corresponding options to C<consul exec>
278             for more info.
279              
280             =head1 CALLBACKS
281              
282             C<AnyEvent::Consul::Exec> will arrange for various callbacks to be called as
283             the command is run on each node and its output and exit code returned. Set this
284             up by passing code refs to the constructor:
285              
286             =over 4
287              
288             =item * C<on_submit>
289              
290             Called when the command is fully accepted by Consul (ie in the KV store, ready
291             for nodes to find).
292              
293             =item * C<on_ack>
294              
295             Called for each node as they notice the command has been entered into the KV
296             store and start running it.
297              
298             =item * C<on_output>
299              
300             Called when a command emits some output. May be called multiple times per node,
301             or not at all if the command has no output.
302              
303             =item * C<on_exit>
304              
305             Called when a command completes.
306              
307             =item * C<on_done>
308              
309             Called when all remote commands have completed. After this call, the object is
310             no longer useful.
311              
312             =item * C<on_error>
313              
314             Called if an error occurs while communicating with Consul (local agent
315             unavailable, quorum loss, etc). After this call, the object is no longer
316             useful.
317              
318             =back
319              
320             =head1 CAVEATS
321              
322             Consul's remote execution protocol is internal to Consul itself and is not
323             documented. This module has been confirmed to work in Consul 0.9.0 (the latest
324             release at the time of writing). The Consul authors L<may change the underlying
325             mechanism|https://github.com/hashicorp/consul/issues/1120> in the future, but
326             this module should continue to work.
327              
328             =head1 SUPPORT
329              
330             =head2 Bugs / Feature Requests
331              
332             Please report any bugs or feature requests through the issue tracker
333             at L<https://github.com/robn/AnyEvent-Consul-Exec/issues>.
334             You will be notified automatically of any progress on your issue.
335              
336             =head2 Source Code
337              
338             This is open source software. The code repository is available for
339             public review and contribution under the terms of the license.
340              
341             L<https://github.com/robn/AnyEvent-Consul-Exec>
342              
343             git clone https://github.com/robn/AnyEvent-Consul-Exec.git
344              
345             =head1 AUTHORS
346              
347             =over 4
348              
349             =item *
350              
351             Rob N ★
352              
353             =back
354              
355             =head1 COPYRIGHT AND LICENSE
356              
357             This software is copyright (c) 2017 by Rob N ★ and was supported by FastMail
358             Pty Ltd.
359              
360             This is free software; you can redistribute it and/or modify it under
361             the same terms as the Perl 5 programming language system itself.
362              
363             =cut