File Coverage

blib/lib/AnyEvent/Consul/Exec.pm
Criterion Covered Total %
statement 29 115 25.2
branch 0 30 0.0
condition 0 6 0.0
subroutine 10 26 38.4
pod 0 2 0.0
total 39 179 21.7


line stmt bran cond sub pod time code
1             package AnyEvent::Consul::Exec;
2             $AnyEvent::Consul::Exec::VERSION = '0.003';
3             # ABSTRACT: Execute a remote command across a Consul cluster
4              
5 3     3   918039 use 5.020;
  3         9  
6 3     3   14 use warnings;
  3         5  
  3         80  
7 3     3   15 use strict;
  3         7  
  3         77  
8 3     3   833 use experimental qw(postderef);
  3         7335  
  3         17  
9              
10 3     3   791 use Consul 0.022;
  3         323532  
  3         88  
11 3     3   784 use AnyEvent;
  3         4228  
  3         65  
12 3     3   307 use AnyEvent::Consul;
  3         19344  
  3         71  
13 3     3   21 use JSON::MaybeXS;
  3         7  
  3         188  
14 3     3   1040 use Type::Params qw(compile);
  3         27497  
  3         24  
15 3     3   718 use Types::Standard qw(ClassName Dict Str Optional CodeRef ArrayRef Int slurpy);
  3         8  
  3         13  
16              
17             my @callbacks = map { "on_$_" } qw(submit ack output exit done error);
18              
19             sub new {
20             state $check = compile(
21             ClassName,
22             slurpy Dict[
23             command => Str,
24             wait => Optional[Int],
25             dc => Optional[Str],
26             node => Optional[Str],
27             service => Optional[Str],
28             tag => Optional[Str],
29             consul_args => Optional[ArrayRef],
30 0     0 0   map { $_ => Optional[CodeRef] } @callbacks,
  0            
31             ],
32             );
33 0           my ($class, $self) = $check->(@_);
34 0   0 0     map { $self->{$_} //= sub {} } @callbacks;
  0            
35 0   0       $self->{wait} //= 2;
36 0   0       $self->{consul_args} //= [];
37 0 0         $self->{dc_args} = $self->{dc} ? [dc => $self->{dc}] : [];
38 0           return bless $self, $class;
39             }
40              
41             sub _wait_responses {
42 0     0     my ($self, $index) = @_;
43              
44             $self->{_c}->kv->get_all(
45             "_rexec/$self->{_sid}",
46             index => $index,
47             $self->{dc_args}->@*,
48             cb => sub {
49 0     0     my ($kv, $meta) = @_;
50 0           my @changed = grep { $_->modify_index > $index } $kv->@*;
  0            
51              
52 0           for my $kv (@changed) {
53 0           my ($key) = $kv->key =~ m{^_rexec/$self->{_sid}/(.+)};
54 0 0         unless ($key) {
55 0           warn "W: consul told us '".$kv->key."' changed, but we aren't interested in it, consul bug?\n";
56 0           next;
57             }
58              
59 0 0         if ($key eq 'job') {
60 0           $self->{on_submit}->();
61 0           next;
62             }
63              
64 0           my ($node, $act, $id) = split '/', $key, 3;
65 0 0         unless ($act) {
66 0           warn "W: malformed rexec response: $key\n";
67             }
68              
69 0 0         if ($act eq 'ack') {
70 0           $self->{_nack}++;
71 0           $self->{on_ack}->($node);
72 0           next;
73             }
74              
75 0 0         if ($act eq 'out') {
76 0           $self->{on_output}->($node, $kv->value);
77 0           next;
78             }
79              
80 0 0         if ($act eq 'exit') {
81 0           $self->{_nexit}++;
82 0           $self->{on_exit}->($node, $kv->value);
83 0 0         if ($self->{_nack} == $self->{_nexit}) {
84             # XXX super naive. there might be some that haven't acked yet
85             # should schedule done for a lil bit in the future
86 0           $self->{_done} = 1;
87 0           $self->_cleanup(sub { $self->{on_done}->() });
  0            
88             }
89 0           next;
90             }
91              
92 0           warn "W: $node: unknown action: $act\n";
93             }
94              
95 0 0         $self->_wait_responses($meta->index) unless $self->{_done};
96             },
97 0           );
98             }
99              
100             sub _fire_event {
101 0     0     my ($self) = @_;
102             my $payload = {
103             Prefix => "_rexec",
104             Session => $self->{_sid},
105 0           };
106             $self->{_c}->event->fire(
107             "_rexec",
108             payload => encode_json($payload),
109             $self->{dc_args}->@*,
110             $self->{node} ? (node => $self->{node}) : (),
111             $self->{service} ? (service => $self->{service}) : (),
112             $self->{tag} ? (tag => $self->{tag}) : (),
113 0     0     cb => sub { $self->_wait_responses(0) },
114 0 0         );
    0          
    0          
115             }
116              
117             sub _setup_job {
118 0     0     my ($self) = @_;
119             my $job = {
120             Command => $self->{command},
121 0           Wait => $self->{wait} * 1_000_000_000, # nanoseconds
122             };
123             $self->{_c}->kv->put(
124             "_rexec/$self->{_sid}/job",
125             encode_json($job),
126             acquire => $self->{_sid},
127             $self->{dc_args}->@*,
128 0     0     cb => sub { $self->_fire_event },
129 0           );
130             }
131              
132             sub _start_session {
133 0     0     my ($self) = @_;
134              
135             my $session_started_cb = sub {
136 0     0     $self->{_sid} = shift;
137             $self->{_refresh_guard} = AnyEvent->timer(after => "5s", interval => "5s", cb => sub {
138             $self->{_c}->session->renew(
139             $self->{_sid},
140             $self->{dc_args}->@*,
141 0           );
142 0           });
143 0           $self->_setup_job;
144 0           };
145              
146 0 0         if ($self->{dc}) {
147             $self->{_c}->health->service(
148             "consul",
149             $self->{dc_args}->@*,
150             cb => sub {
151 0     0     my ($services) = @_;
152 0           my $service = shift $services->@*;
153 0 0         unless ($service) {
154             # XXX no consuls at remote DC
155             ...
156 0           }
157 0           my $node = $service->node->name;
158             $self->{_c}->session->create(
159             Consul::Session->new(
160             name => 'Remote exec via ...', # XXX local node name
161             behavior => 'delete',
162             ttl => "15s",
163             node => $node,
164             ),
165             $self->{dc_args}->@*,
166 0           cb => $session_started_cb,
167             );
168             },
169             error_cb => sub {
170 0     0     my ($err) = @_;
171 0           $self->_cleanup(sub { $self->{on_error}->($err) });
  0            
172             },
173 0           );
174             }
175              
176             else {
177             $self->{_c}->session->create(
178 0           Consul::Session->new(
179             name => 'Remote exec',
180             behavior => 'delete',
181             ttl => "15s",
182             ),
183             cb => $session_started_cb,
184             );
185             }
186             }
187              
188             sub _cleanup {
189 0     0     my ($self, $cb) = @_;
190 0           delete $self->{_refresh_guard};
191 0 0         if ($self->{_sid}) {
192             $self->{_c}->session->destroy(
193             $self->{_sid},
194             $self->{dc_args}->@*,
195             cb => sub {
196             $self->{_c}->kv->delete(
197             "_rexec/$self->{_sid}",
198             recurse => 1,
199             $self->{dc_args}->@*,
200             cb => sub {
201 0           delete $self->{_sid};
202 0           delete $self->{_c};
203 0           $cb->();
204             },
205 0     0     );
206 0           });
207             }
208             else {
209 0           delete $self->{_sid};
210 0           delete $self->{_c};
211 0           $cb->();
212             }
213             }
214              
215             sub start {
216 0     0 0   my ($self) = @_;
217             $self->{_c} = AnyEvent::Consul->new($self->{consul_args}->@*, error_cb => sub {
218 0     0     my ($err) = @_;
219 0           $self->_cleanup(sub { $self->{on_error}->($err) });
  0            
220 0           });
221 0           $self->_start_session;
222 0           return;
223             }
224              
225             1;
226              
227             =pod
228              
229             =encoding UTF-8
230              
231             =for markdown [![Build Status](https://secure.travis-ci.org/robn/AnyEvent-Consul-Exec.png)](http://travis-ci.org/robn/AnyEvent-Consul-Exec)
232              
233             =head1 NAME
234              
235             AnyEvent::Consul::Exec - Execute a remote command across a Consul cluster
236              
237             =head1 SYNOPSIS
238              
239             use AnyEvent;
240             use AnyEvent::Consul::Exec;
241            
242             my $cv = AE::cv;
243            
244             my $e = AnyEvent::Consul::Exec->new(
245            
246             # command to run
247             command => 'uptime',
248              
249             # number of seconds target will wait for command, without sending
250             # output, before terminating it
251             wait => 2,
252            
253             # called once job is submitted to Consul
254             on_submit => sub {
255             say "job submitted";
256             },
257            
258             # called as each target node starts to process the job
259             # multiple calls, once per node
260             on_ack => sub {
261             my ($node) = @_;
262             say "$node: ack";
263             },
264            
265             # called when a node has output from the job
266             # can be called zero or more times per node, as more output
267             # becomes available
268             on_output => sub {
269             my ($node, $output) = @_;
270             say "$node: output:";
271             say "$node> $_" for split("\n", $output);
272             },
273            
274             # called when the node completes a job
275             # multiple calls, one per node
276             on_exit => sub {
277             my ($node, $rc) = @_;
278             say "$node: exit: $rc";
279             },
280            
281             # called once all nodes have reported completion
282             # object is unusable past this point
283             on_done => sub {
284             say "job done";
285             $cv->send;
286             },
287            
288             # called if an error occurs anywhere during processing (not command errors)
289             # typically called if Consul is unable to service requests
290             # object is unusable past this point
291             on_error => sub {
292             my ($err) = @_;
293             say "error: $err";
294             $cv->send;
295             },
296             );
297            
298             # begin execution
299             $e->start;
300              
301             $cv->recv;
302              
303             =head1 DESCRIPTION
304              
305             AnyEvent::Consul::Exec is an interface to Consul's "exec" agent function. This
306             is the same thing you get when you run L.
307              
308             C is great, but its output is text-based, making it awkward to
309             parse to determine what happened on each node that ran the command.
310             C replaces the client portion with a library you can
311             use to get info about what is happening on each node as it happens.
312              
313             As the name implies, it expects to be run inside an L event loop.
314              
315             =head1 BASICS
316              
317             Start off by instantiating a C object with the command
318             you want to run:
319              
320             my $e = AnyEvent::Consul::Exec->new(
321             command => 'uptime',
322             );
323              
324             Then call C to kick it off:
325              
326             $e->start;
327              
328             As the C event loop progresses, the command will be executed on
329             remote nodes. Output and results of that command on each node will be posted to
330             callbacks you can optionally provide to the constructor.
331              
332             When calling the constructor, you can include the C option with an
333             arrayref as a value. Anything in that arrayref will be passed as-is to the
334             C constructor. Use this to set the various client options
335             documented in L and L.
336              
337             The C option will tell the target agent how long to wait, without
338             receiving output, before killing the command. This does the same thing as the
339             C<-wait> option to C.
340              
341             The C, C and C each take basic regexes that will be used to
342             match nodes to run the command on. See the corresponding options to C
343             for more info.
344              
345             The C option can take the name of the datacenter to run the command in. The
346             exec mechanism is limited to a single datacentre. This option will cause
347             L to find a Consul agent in the named datacenter and
348             execute the command there (without it, the local node is used).
349              
350             =head1 CALLBACKS
351              
352             C will arrange for various callbacks to be called as
353             the command is run on each node and its output and exit code returned. Set this
354             up by passing code refs to the constructor:
355              
356             =over 4
357              
358             =item * C
359              
360             Called when the command is fully accepted by Consul (ie in the KV store, ready
361             for nodes to find).
362              
363             =item * C
364              
365             Called for each node as they notice the command has been entered into the KV
366             store and start running it.
367              
368             =item * C
369              
370             Called when a command emits some output. May be called multiple times per node,
371             or not at all if the command has no output.
372              
373             =item * C
374              
375             Called when a command completes.
376              
377             =item * C
378              
379             Called when all remote commands have completed. After this call, the object is
380             no longer useful.
381              
382             =item * C
383              
384             Called if an error occurs while communicating with Consul (local agent
385             unavailable, quorum loss, etc). After this call, the object is no longer
386             useful.
387              
388             =back
389              
390             =head1 CAVEATS
391              
392             Consul's remote execution protocol is internal to Consul itself and is not
393             documented. This module has been confirmed to work in Consul 0.9.0 (the latest
394             release at the time of writing). The Consul authors L
395             mechanism|https://github.com/hashicorp/consul/issues/1120> in the future, but
396             this module should continue to work.
397              
398             =head1 SUPPORT
399              
400             =head2 Bugs / Feature Requests
401              
402             Please report any bugs or feature requests through the issue tracker
403             at L.
404             You will be notified automatically of any progress on your issue.
405              
406             =head2 Source Code
407              
408             This is open source software. The code repository is available for
409             public review and contribution under the terms of the license.
410              
411             L
412              
413             git clone https://github.com/robn/AnyEvent-Consul-Exec.git
414              
415             =head1 AUTHORS
416              
417             =over 4
418              
419             =item *
420              
421             Rob N ★
422              
423             =back
424              
425             =head1 COPYRIGHT AND LICENSE
426              
427             This software is copyright (c) 2017 by Rob N ★ and was supported by FastMail
428             Pty Ltd.
429              
430             This is free software; you can redistribute it and/or modify it under
431             the same terms as the Perl 5 programming language system itself.
432              
433             =cut