File Coverage

blib/lib/AnyEvent/Consul/Exec.pm
Criterion Covered Total %
statement 29 116 25.0
branch 0 30 0.0
condition 0 11 0.0
subroutine 10 26 38.4
pod 0 2 0.0
total 39 185 21.0


line stmt bran cond sub pod time code
1             package AnyEvent::Consul::Exec;
2             $AnyEvent::Consul::Exec::VERSION = '0.004';
3             # ABSTRACT: Execute a remote command across a Consul cluster
4              
5 4     4   1944358 use 5.020;
  4         49  
6 4     4   30 use warnings;
  4         8  
  4         136  
7 4     4   23 use strict;
  4         9  
  4         106  
8 4     4   2204 use experimental qw(postderef);
  4         14793  
  4         41  
9              
10 4     4   1444 use Consul 0.022;
  4         392990  
  4         159  
11 4     4   1188 use AnyEvent;
  4         5772  
  4         146  
12 4     4   618 use AnyEvent::Consul;
  4         26287  
  4         127  
13 4     4   38 use JSON::MaybeXS;
  4         9  
  4         392  
14 4     4   3590 use Type::Params qw(compile);
  4         49705  
  4         37  
15 4     4   1058 use Types::Standard qw(ClassName Dict Str Optional CodeRef ArrayRef Int slurpy);
  4         11  
  4         26  
16              
17             my @callbacks = map { "on_$_" } qw(submit ack output exit done error); # constructor keys for the optional callbacks: on_submit, on_ack, on_output, on_exit, on_done, on_error
18              
19             sub new {
               # Constructor. Takes the class name followed by named arguments:
               #   command        - required string, the command to run
               #   wait, min_node_count - optional integers
               #   dc, node, service, tag - optional strings
               #   consul_args    - optional arrayref
               #   on_submit, on_ack, on_output, on_exit, on_done, on_error - optional code refs
               # Returns the validated argument hash, with defaults filled in, blessed into $class.
               # The Type::Params check is held in a `state` variable, so it is compiled only once.
20             state $check = compile(
21             ClassName,
22             slurpy Dict[
23             command => Str,
24             wait => Optional[Int],
25             dc => Optional[Str],
26             node => Optional[Str],
27             min_node_count => Optional[Int],
28             service => Optional[Str],
29             tag => Optional[Str],
30             consul_args => Optional[ArrayRef],
31 0     0 0   map { $_ => Optional[CodeRef] } @callbacks,
  0            
32             ],
33             );
34 0           my ($class, $self) = $check->(@_);
               # Any callback the caller did not supply becomes a no-op sub, so later code can call it unconditionally.
               # NOTE(review): map is used in void context here purely for its side effect; a for loop would be the usual idiom.
35 0   0 0     map { $self->{$_} //= sub {} } @callbacks;
  0            
               # Defaults: wait of 2, no extra client arguments, no minimum node count.
36 0   0       $self->{wait} //= 2;
37 0   0       $self->{consul_args} //= [];
               # dc_args is the (dc => NAME) pair that gets spliced into every Consul call below; empty when no dc was given.
38 0 0         $self->{dc_args} = $self->{dc} ? [dc => $self->{dc}] : [];
39 0   0       $self->{min_node_count} //= 0;
40 0           return bless $self, $class;
41             }
42              
43             sub _wait_responses {
               # Fetch every KV entry under "_rexec/<session id>" and dispatch the matching callback
               # for each entry modified after $index. Re-issues itself with the index taken from the
               # response metadata until $self->{_done} is set.
               # $index is passed through as the `index` argument of get_all - presumably a Consul
               # blocking query; confirm against the Consul client documentation.
44 0     0     my ($self, $index) = @_;
45              
46             $self->{_c}->kv->get_all(
47             "_rexec/$self->{_sid}",
48             index => $index,
49             $self->{dc_args}->@*,
50             cb => sub {
51 0     0     my ($kv, $meta) = @_;
               # Only entries modified after the index we asked about are treated as new.
52 0           my @changed = grep { $_->modify_index > $index } $kv->@*;
  0            
53              
54 0           for my $kv (@changed) {
               # Strip the "_rexec/<session id>/" prefix; what remains identifies the entry.
               # NOTE(review): the session id is interpolated into the pattern unquoted - presumably
               # it never contains regex metacharacters; confirm.
55 0           my ($key) = $kv->key =~ m{^_rexec/$self->{_sid}/(.+)};
56 0 0         unless ($key) {
57 0           warn "W: consul told us '".$kv->key."' changed, but we aren't interested in it, consul bug?\n";
58 0           next;
59             }
60              
               # The job entry itself appearing is reported through on_submit.
61 0 0         if ($key eq 'job') {
62 0           $self->{on_submit}->();
63 0           next;
64             }
65              
               # Every other key is split into at most three parts: node, action, id.
               # NOTE(review): the malformed case below only warns - there is no `next`, so the
               # comparisons that follow still run with an undefined $act.
66 0           my ($node, $act, $id) = split '/', $key, 3;
67 0 0         unless ($act) {
68 0           warn "W: malformed rexec response: $key\n";
69             }
70              
               # ack: count the node and notify.
71 0 0         if ($act eq 'ack') {
72 0           $self->{_nack}++;
73 0           $self->{on_ack}->($node);
74 0           next;
75             }
76              
               # out: pass the entry's value to on_output.
77 0 0         if ($act eq 'out') {
78 0           $self->{on_output}->($node, $kv->value);
79 0           next;
80             }
81              
               # exit: count the node and pass the entry's value to on_exit.
82 0 0         if ($act eq 'exit') {
83 0           $self->{_nexit}++;
84 0           $self->{on_exit}->($node, $kv->value);
85              
86             # XXX super naive. there might be some that haven't acked yet
87             # should schedule done for a lil bit in the future
               # Done once the exit count equals the ack count and reaches min_node_count.
               # NOTE(review): _nack is only ever set by the ack branch above, so it can still be
               # undef here if an exit is seen before any ack.
88 0 0 0       if ( $self->{_nack} == $self->{_nexit}
89             && $self->{_nexit} >= $self->{min_node_count})
90             {
91 0           $self->{_done} = 1;
92 0           $self->_cleanup(sub { $self->{on_done}->() });
  0            
93             }
94 0           next;
95             }
96              
97 0           warn "W: $node: unknown action: $act\n";
98             }
99              
                # Keep watching from the index reported by this response unless the job finished above.
100 0 0         $self->_wait_responses($meta->index) unless $self->{_done};
101             },
102 0           );
103             }
104              
105             sub _fire_event {
                # Fire the "_rexec" event carrying a JSON payload that names the KV prefix and the
                # session id, then start watching for responses from index 0.
106 0     0     my ($self) = @_;
107             my $payload = {
108             Prefix => "_rexec",
109             Session => $self->{_sid},
110 0           };
                # node, service and tag are only passed along when the caller supplied a true value.
111             $self->{_c}->event->fire(
112             "_rexec",
113             payload => encode_json($payload),
114             $self->{dc_args}->@*,
115             $self->{node} ? (node => $self->{node}) : (),
116             $self->{service} ? (service => $self->{service}) : (),
117             $self->{tag} ? (tag => $self->{tag}) : (),
118 0     0     cb => sub { $self->_wait_responses(0) },
119 0 0         );
    0          
    0          
120             }
121              
122             sub _setup_job {
                # Store the JSON-encoded job description at "_rexec/<session id>/job", passing the
                # session id as the `acquire` argument, and fire the event once the put completes.
123 0     0     my ($self) = @_;
124             my $job = {
125             Command => $self->{command},
                # wait is given by the caller in seconds and scaled up here.
126 0           Wait => $self->{wait} * 1_000_000_000, # nanoseconds
127             };
128             $self->{_c}->kv->put(
129             "_rexec/$self->{_sid}/job",
130             encode_json($job),
131             acquire => $self->{_sid},
132             $self->{dc_args}->@*,
133 0     0     cb => sub { $self->_fire_event },
134 0           );
135             }
136              
137             sub _start_session {
                # Create the Consul session the job runs under, then continue with _setup_job.
                # With a dc configured, a "consul" service entry is looked up in that dc first and the
                # session is created against that entry's node; otherwise the session is created
                # without naming a node.
138 0     0     my ($self) = @_;
139              
                # Shared continuation for both branches: remember the session id, install a repeating
                # timer that renews the session, then submit the job.
140             my $session_started_cb = sub {
141 0     0     $self->{_sid} = shift;
                # NOTE(review): AnyEvent documents `after` and `interval` as numeric seconds; these are
                # the strings "5s" - confirm they are interpreted as intended.
                # The timer lives for as long as _refresh_guard holds it; _cleanup deletes that key.
142             $self->{_refresh_guard} = AnyEvent->timer(after => "5s", interval => "5s", cb => sub {
143             $self->{_c}->session->renew(
144             $self->{_sid},
145             $self->{dc_args}->@*,
146 0           );
147 0           });
148 0           $self->_setup_job;
149 0           };
150              
151 0 0         if ($self->{dc}) {
152             $self->{_c}->health->service(
153             "consul",
154             $self->{dc_args}->@*,
155             cb => sub {
156 0     0     my ($services) = @_;
                # Use the first entry returned.
157 0           my $service = shift $services->@*;
158 0 0         unless ($service) {
159             # XXX no consuls at remote DC
                # NOTE(review): `...` is Perl's unimplemented-statement placeholder and dies when
                # reached, so this case is not reported through on_error.
160             ...
161 0           }
162 0           my $node = $service->node->name;
163             $self->{_c}->session->create(
164             Consul::Session->new(
165             name => 'Remote exec via ...', # XXX local node name
166             behavior => 'delete',
167             ttl => "15s",
168             node => $node,
169             ),
170             $self->{dc_args}->@*,
171 0           cb => $session_started_cb,
172             );
173             },
                # A failed lookup tears everything down and then reports through on_error.
174             error_cb => sub {
175 0     0     my ($err) = @_;
176 0           $self->_cleanup(sub { $self->{on_error}->($err) });
  0            
177             },
178 0           );
179             }
180              
181             else {
                # No dc given: create the session without a node argument. No per-call error_cb is
                # passed here; presumably failures reach the client-level error_cb installed in
                # start() - confirm.
182             $self->{_c}->session->create(
183 0           Consul::Session->new(
184             name => 'Remote exec',
185             behavior => 'delete',
186             ttl => "15s",
187             ),
188             cb => $session_started_cb,
189             );
190             }
191             }
192              
193             sub _cleanup {
                # Tear down the job state and then invoke $cb (called with no arguments).
                # Always drops the renewal timer. If a session id is set, the session is destroyed and
                # the "_rexec/<session id>" KV prefix is deleted recursively before the session id and
                # client are forgotten; otherwise they are forgotten straight away.
194 0     0     my ($self, $cb) = @_;
195 0           delete $self->{_refresh_guard};
196 0 0         if ($self->{_sid}) {
197             $self->{_c}->session->destroy(
198             $self->{_sid},
199             $self->{dc_args}->@*,
200             cb => sub {
                # Runs once the destroy call has called back: remove everything stored under the job's prefix.
201             $self->{_c}->kv->delete(
202             "_rexec/$self->{_sid}",
203             recurse => 1,
204             $self->{dc_args}->@*,
205             cb => sub {
206 0           delete $self->{_sid};
207 0           delete $self->{_c};
208 0           $cb->();
209             },
210 0     0     );
211 0           });
212             }
213             else {
214 0           delete $self->{_sid};
215 0           delete $self->{_c};
216 0           $cb->();
217             }
218             }
219              
220             sub start {
                # Public entry point: begin executing the command.
                # Creates the AnyEvent::Consul client from consul_args plus an error_cb that cleans up
                # and then reports the error through on_error, then starts session setup.
                # Returns nothing; progress is reported through the callbacks given to new().
221 0     0 0   my ($self) = @_;
222             $self->{_c} = AnyEvent::Consul->new($self->{consul_args}->@*, error_cb => sub {
223 0     0     my ($err) = @_;
224 0           $self->_cleanup(sub { $self->{on_error}->($err) });
  0            
225 0           });
226 0           $self->_start_session;
227 0           return;
228             }
231              
232             =pod
233              
234             =encoding UTF-8
235              
236             =for markdown [![Build Status](https://secure.travis-ci.org/robn/AnyEvent-Consul-Exec.png)](http://travis-ci.org/robn/AnyEvent-Consul-Exec)
237              
238             =head1 NAME
239              
240             AnyEvent::Consul::Exec - Execute a remote command across a Consul cluster
241              
242             =head1 SYNOPSIS
243              
244             use AnyEvent;
245             use AnyEvent::Consul::Exec;
246            
247             my $cv = AE::cv;
248            
249             my $e = AnyEvent::Consul::Exec->new(
250            
251             # command to run
252             command => 'uptime',
253              
254             # number of seconds target will wait for command, without sending
255             # output, before terminating it
256             wait => 2,
257            
258             # called once job is submitted to Consul
259             on_submit => sub {
260             say "job submitted";
261             },
262            
263             # called as each target node starts to process the job
264             # multiple calls, once per node
265             on_ack => sub {
266             my ($node) = @_;
267             say "$node: ack";
268             },
269            
270             # called when a node has output from the job
271             # can be called zero or more times per node, as more output
272             # becomes available
273             on_output => sub {
274             my ($node, $output) = @_;
275             say "$node: output:";
276             say "$node> $_" for split("\n", $output);
277             },
278            
279             # called when the node completes a job
280             # multiple calls, one per node
281             on_exit => sub {
282             my ($node, $rc) = @_;
283             say "$node: exit: $rc";
284             },
285            
286             # called once all nodes have reported completion
287             # object is unusable past this point
288             on_done => sub {
289             say "job done";
290             $cv->send;
291             },
292            
293             # called if an error occurs anywhere during processing (not command errors)
294             # typically called if Consul is unable to service requests
295             # object is unusable past this point
296             on_error => sub {
297             my ($err) = @_;
298             say "error: $err";
299             $cv->send;
300             },
301             );
302            
303             # begin execution
304             $e->start;
305              
306             $cv->recv;
307              
308             =head1 DESCRIPTION
309              
310             AnyEvent::Consul::Exec is an interface to Consul's "exec" agent function. This
311             is the same thing you get when you run C<consul exec>.
312              
313             C<consul exec> is great, but its output is text-based, making it awkward to
314             parse to determine what happened on each node that ran the command.
315             C<AnyEvent::Consul::Exec> replaces the client portion with a library you can
316             use to get info about what is happening on each node as it happens.
317              
318             As the name implies, it expects to be run inside an L<AnyEvent> event loop.
319              
320             =head1 BASICS
321              
322             Start off by instantiating a C<AnyEvent::Consul::Exec> object with the command
323             you want to run:
324              
325             my $e = AnyEvent::Consul::Exec->new(
326             command => 'uptime',
327             );
328              
329             Then call C<start> to kick it off:
330              
331             $e->start;
332              
333             As the C<AnyEvent> event loop progresses, the command will be executed on
334             remote nodes. Output and results of that command on each node will be posted to
335             callbacks you can optionally provide to the constructor.
336              
337             When calling the constructor, you can include the C<consul_args> option with an
338             arrayref as a value. Anything in that arrayref will be passed as-is to the
339             C<AnyEvent::Consul> constructor. Use this to set the various client options
340             documented in L<AnyEvent::Consul> and L<Consul>.
341              
342             The C<wait> option will tell the target agent how long to wait, without
343             receiving output, before killing the command. This does the same thing as the
344             C<-wait> option to C<consul exec>.
345              
346             The C<node>, C<service> and C<tag> options each take basic regexes that will be used to
347             match nodes to run the command on. See the corresponding options to C<consul exec>
348             for more info.
349              
350             If you specify C<min_node_count>, at *least* this many nodes must report in
351             before we consider a job done. Without this, some nodes might report back
352             results before we've seen an ack from the others, and your job may prematurely
353             be canceled on those other nodes, or your on_done callback will be called
354             prematurely. This is most useful if C<node> is a regex that matches an
355             explicit amount of nodes, for example:
356              
357             node => '^(host1|host2|host3)$',
358             min_node_count => 3,
359              
360             The C<dc> option can take the name of the datacenter to run the command in. The
361             exec mechanism is limited to a single datacenter. This option will cause
362             L<AnyEvent::Consul::Exec> to find a Consul agent in the named datacenter and
363             execute the command there (without it, the local node is used).
364              
365             =head1 CALLBACKS
366              
367             C<AnyEvent::Consul::Exec> will arrange for various callbacks to be called as
368             the command is run on each node and its output and exit code returned. Set this
369             up by passing code refs to the constructor:
370              
371             =over 4
372              
373             =item * C<on_submit>
374              
375             Called when the command is fully accepted by Consul (ie in the KV store, ready
376             for nodes to find).
377              
378             =item * C<on_ack>
379              
380             Called for each node as they notice the command has been entered into the KV
381             store and start running it.
382              
383             =item * C<on_output>
384              
385             Called when a command emits some output. May be called multiple times per node,
386             or not at all if the command has no output.
387              
388             =item * C<on_exit>
389              
390             Called when a command completes.
391              
392             =item * C<on_done>
393              
394             Called when all remote commands have completed. After this call, the object is
395             no longer useful.
396              
397             =item * C<on_error>
398              
399             Called if an error occurs while communicating with Consul (local agent
400             unavailable, quorum loss, etc). After this call, the object is no longer
401             useful.
402              
403             =back
404              
405             =head1 CAVEATS
406              
407             Consul's remote execution protocol is internal to Consul itself and is not
408             documented. This module has been confirmed to work in Consul 0.9.0 (the latest
409             release at the time of writing). The Consul authors L<intend to change the underlying
410             mechanism|https://github.com/hashicorp/consul/issues/1120> in the future, but
411             this module should continue to work.
412              
413             =head1 SUPPORT
414              
415             =head2 Bugs / Feature Requests
416              
417             Please report any bugs or feature requests through the issue tracker
418             at L<https://github.com/robn/AnyEvent-Consul-Exec/issues>.
419             You will be notified automatically of any progress on your issue.
420              
421             =head2 Source Code
422              
423             This is open source software. The code repository is available for
424             public review and contribution under the terms of the license.
425              
426             L<https://github.com/robn/AnyEvent-Consul-Exec>
427              
428             git clone https://github.com/robn/AnyEvent-Consul-Exec.git
429              
430             =head1 AUTHORS
431              
432             =over 4
433              
434             =item *
435              
436             Rob Norris
437              
438             =back
439              
440             =head1 CONTRIBUTORS
441              
442             =over 4
443              
444             =item *
445              
446             Matthew Horsfall
447              
448             =back
449              
450             =head1 COPYRIGHT AND LICENSE
451              
452             This software is copyright (c) 2017 by Rob N ★ and was supported by FastMail
453             Pty Ltd.
454              
455             This is free software; you can redistribute it and/or modify it under
456             the same terms as the Perl 5 programming language system itself.
457              
458             =cut