File Coverage

blib/lib/Metabrik/Client/Elasticsearch.pm
Criterion Covered Total %
statement 9 2018 0.4
branch 0 1208 0.0
condition 0 270 0.0
subroutine 3 130 2.3
pod 2 125 1.6
total 14 3751 0.3


line stmt bran cond sub pod time code
1             #
2             # $Id$
3             #
4             # client::elasticsearch Brik
5             #
6             package Metabrik::Client::Elasticsearch;
7 2     2   1565 use strict;
  2         4  
  2         58  
8 2     2   10 use warnings;
  2         6  
  2         68  
9              
10 2     2   11 use base qw(Metabrik::Client::Rest);
  2         4  
  2         971  
11              
12             sub brik_properties {
13             return {
14 0     0 1   revision => '$Revision$',
15             tags => [ qw(unstable es es) ],
16             author => 'GomoR ',
17             license => 'http://opensource.org/licenses/BSD-3-Clause',
18             attributes => {
19             datadir => [ qw(datadir) ],
20             nodes => [ qw(node_list) ],
21             cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
22             date => [ qw(date) ],
23             index => [ qw(index) ],
24             type => [ qw(type) ],
25             from => [ qw(number) ],
26             size => [ qw(count) ],
27             max => [ qw(count) ],
28             max_flush_count => [ qw(count) ],
29             max_flush_size => [ qw(count) ],
30             rtimeout => [ qw(seconds) ],
31             sniff_rtimeout => [ qw(seconds) ],
32             try => [ qw(count) ],
33             use_bulk_autoflush => [ qw(0|1) ],
34             use_indexing_optimizations => [ qw(0|1) ],
35             use_ignore_id => [ qw(0|1) ],
36             use_type => [ qw(0|1) ],
37             csv_header => [ qw(fields) ],
38             csv_encoded_fields => [ qw(fields) ],
39             csv_object_fields => [ qw(fields) ],
40             encoding => [ qw(utf8|ascii) ],
41             disable_deprecation_logging => [ qw(0|1) ],
42             es_version => [ qw(0|1) ],
43             _es => [ qw(INTERNAL) ],
44             _bulk => [ qw(INTERNAL) ],
45             _scroll => [ qw(INTERNAL) ],
46             },
47             attributes_default => {
48             nodes => [ qw(http://localhost:9200) ],
49             cxn_pool => 'Sniff',
50             from => 0,
51             size => 10,
52             max => 0,
53             index => '*',
54             type => '*',
55             rtimeout => 60,
56             sniff_rtimeout => 3,
57             try => 3,
58             max_flush_count => 1_000,
59             max_flush_size => 1_000_000,
60             use_bulk_autoflush => 1,
61             use_indexing_optimizations => 0,
62             use_ignore_id => 0,
63             use_type => 1,
64             encoding => 'utf8',
65             disable_deprecation_logging => 0,
66             es_version => '7',
67             },
68             commands => {
69             open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
70             open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
71             open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
72             open_scroll => [ qw(index|OPTIONAL size|OPTIONAL type|OPTIONAL query|OPTIONAL) ],
73             close_scroll => [ ],
74             total_scroll => [ ],
75             next_scroll => [ qw(count|OPTIONAL) ],
76             reindex => [ qw(index_source index_destination type_destination|OPTIONAL) ],
77             get_reindex_tasks => [ ],
78             cancel_reindex_task => [ qw(id) ],
79             get_taskid => [ qw(id) ],
80             show_reindex_progress => [ ],
81             loop_show_reindex_progress => [ qw(seconds|OPTIONAL) ],
82             index_document => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
83             index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
84             index_bulk_from_list => [ qw(document_list index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
85             clean_deleted_from_index => [ qw(index) ],
86             update_document => [ qw(document id index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
87             update_document_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
88             bulk_flush => [ qw(index|OPTIONAL) ],
89             query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
90             count => [ qw(index|OPTIONAL type|OPTIONAL) ],
91             get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
92             www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
93             delete_index => [ qw(index|indices_list) ],
94             update_alias => [ qw(new_index alias) ],
95             delete_document => [ qw(index type id) ],
96             delete_by_query => [ qw($query_hash index type proceed|OPTIONAL) ],
97             show_indices => [ qw(string_filter|OPTIONAL) ],
98             show_nodes => [ ],
99             show_health => [ ],
100             show_recovery => [ ],
101             show_allocation => [ ],
102             list_indices => [ qw(regex|OPTIONAL) ],
103             get_indices => [ qw(string_filter|OPTIONAL) ],
104             get_index => [ qw(index|indices_list) ],
105             get_index_stats => [ qw(index) ],
106             list_index_types => [ qw(index) ],
107             list_index_fields => [ qw(index) ],
108             list_indices_version => [ qw(index|indices_list) ],
109             open_index => [ qw(index|indices_list) ],
110             close_index => [ qw(index|indices_list) ],
111             get_aliases => [ qw(index) ],
112             put_alias => [ qw(index alias) ],
113             delete_alias => [ qw(index alias) ],
114             is_mapping_exists => [ qw(index mapping) ],
115             get_mappings => [ qw(index type|OPTIONAL) ],
116             create_index => [ qw(index shards|OPTIONAL) ],
117             create_index_with_mappings => [ qw(index mappings) ],
118             info => [ qw(nodes_list|OPTIONAL) ],
119             version => [ qw(nodes_list|OPTIONAL) ],
120             get_templates => [ ],
121             list_templates => [ ],
122             get_template => [ qw(name) ],
123             put_mapping => [ qw(index type mapping) ],
124             put_mapping_from_json_file => [ qw(index type file) ],
125             update_mapping_from_json_file => [ qw(file index type) ],
126             put_template => [ qw(name template) ],
127             put_template_from_json_file => [ qw(file name|OPTIONAL) ],
128             update_template_from_json_file => [ qw(file name|OPTIONAL) ],
129             get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
130             put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
131             set_index_readonly => [ qw(index|indices_list boolean|OPTIONAL) ],
132             reset_index_readonly => [ qw(index|indices_list|OPTIONAL) ],
133             list_index_readonly => [ ],
134             set_index_number_of_replicas => [ qw(index|indices_list number) ],
135             set_index_refresh_interval => [ qw(index|indices_list number) ],
136             get_index_settings => [ qw(index|indices_list) ],
137             get_index_readonly => [ qw(index|indices_list) ],
138             get_index_number_of_replicas => [ qw(index|indices) ],
139             get_index_refresh_interval => [ qw(index|indices_list) ],
140             get_index_number_of_shards => [ qw(index|indices_list) ],
141             delete_template => [ qw(name) ],
142             is_index_exists => [ qw(index) ],
143             is_type_exists => [ qw(index type) ],
144             is_document_exists => [ qw(index type document) ],
145             parse_error_string => [ qw(string) ],
146             refresh_index => [ qw(index) ],
147             export_as => [ qw(format index size|OPTIONAL callback|OPTIONAL) ],
148             export_as_csv => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
149             export_as_json => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
150             import_from => [ qw(format input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
151             import_from_csv => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
152             import_from_json => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
153             import_from_csv_worker => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
154             get_stats_process => [ ],
155             get_process => [ ],
156             get_cluster_state => [ ],
157             get_cluster_health => [ ],
158             get_cluster_settings => [ ],
159             put_cluster_settings => [ qw(settings) ],
160             count_green_indices => [ ],
161             count_yellow_indices => [ ],
162             count_red_indices => [ ],
163             list_green_indices => [ ],
164             list_yellow_indices => [ ],
165             list_red_indices => [ ],
166             count_indices => [ ],
167             list_indices_status => [ ],
168             count_shards => [ ],
169             count_size => [ qw(string_filter|OPTIONAL) ],
170             count_total_size => [ qw(string_filter|OPTIONAL) ],
171             count_count => [ ],
172             list_datatypes => [ ],
173             get_hits_total => [ qw(results) ],
174             disable_shard_allocation => [ ],
175             enable_shard_allocation => [ ],
176             flush_synced => [ ],
177             create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
178             create_shared_fs_snapshot_repository => [ qw(location
179             repository_name|OPTIONAL) ],
180             get_snapshot_repositories => [ ],
181             get_snapshot_status => [ ],
182             delete_snapshot_repository => [ qw(repository_name) ],
183             create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
184             body|OPTIONAL) ],
185             create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
186             repository_name|OPTIONAL) ],
187             is_snapshot_finished => [ ],
188             get_snapshot_state => [ ],
189             get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
190             delete_snapshot => [ qw(snapshot_name repository_name) ],
191             restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
192             restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
193             },
194             require_modules => {
195             'Metabrik::String::Json' => [ ],
196             'Metabrik::File::Csv' => [ ],
197             'Metabrik::File::Json' => [ ],
198             'Metabrik::File::Dump' => [ ],
199             'Metabrik::Format::Number' => [ ],
200             'Metabrik::Worker::Parallel' => [ ],
201             'Search::Elasticsearch' => [ ],
202             },
203             };
204             }
205              
206             sub brik_preinit {
207 0     0 1   my $self = shift;
208              
209 0           eval("use Search::Elasticsearch;");
210 0 0         if ($Search::Elasticsearch::VERSION < 5) {
211 0           $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
212             "with: run perl::module install Search::Elasticsearch");
213             }
214              
215 0           return $self->SUPER::brik_preinit;
216             }
217              
218             sub open {
219 0     0 0   my $self = shift;
220 0           my ($nodes, $cxn_pool) = @_;
221              
222 0   0       $nodes ||= $self->nodes;
223 0   0       $cxn_pool ||= $self->cxn_pool;
224 0 0         $self->brik_help_run_undef_arg('open', $nodes) or return;
225 0 0         $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
226 0 0         $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
227 0 0         $self->brik_help_run_empty_array_arg('open', $nodes) or return;
228              
229 0           for my $node (@$nodes) {
230 0 0         if ($node !~ m{https?://}) {
231 0           return $self->log->error("open: invalid node[$node], must start with http(s)");
232             }
233             }
234              
235 0           my $timeout = $self->rtimeout;
236              
237 0           my $nodes_str = join('|', @$nodes);
238 0           $self->log->debug("open: using nodes [$nodes_str]");
239              
240             #
241             # Timeout description here:
242             #
243             # Search::Elasticsearch::Role::Cxn
244             #
245              
246 0           my %args = (
247             nodes => $nodes,
248             cxn_pool => $cxn_pool,
249             timeout => $timeout,
250             max_retries => $self->try,
251             retry_on_timeout => 1,
252             sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
253             request_timeout => 60, # seconds, default 30
254             ping_timeout => 5, # seconds, default 2
255             dead_timeout => 120, # seconds, detault 60
256             max_dead_timeout => 3600, # seconds, default 3600
257             sniff_request_timeout => 15, # seconds, default 2
258             #trace_to => 'Stderr', # For debug purposes
259             );
260              
261 0 0         if ($self->disable_deprecation_logging) {
262 0           $args{deprecate_to} = ['File', '/dev/null'];
263             }
264              
265 0           my $es = Search::Elasticsearch->new(%args);
266 0 0         if (! defined($es)) {
267 0           return $self->log->error("open: failed");
268             }
269              
270 0           $self->_es($es);
271              
272 0           return $nodes;
273             }
274              
275             #
276             # Search::Elasticsearch::Client::5_0::Bulk
277             #
278             sub open_bulk_mode {
279 0     0 0   my $self = shift;
280 0           my ($index, $type) = @_;
281              
282 0   0       $index ||= $self->index;
283 0   0       $type ||= $self->type;
284 0           my $es = $self->_es;
285 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
286 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
287 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
288              
289             my %args = (
290             index => $index,
291             on_error => sub {
292             #my ($action, $response, $i) = @_;
293              
294             #print Data::Dumper::Dumper($action)."\n";
295             #print Data::Dumper::Dumper($response)."\n";
296             #print Data::Dumper::Dumper($i)."\n";
297 0     0     print Data::Dumper::Dumper(\@_)."\n";
298             },
299 0           );
300              
301 0 0         if ($self->use_type) {
302 0           $args{type} = $type;
303             }
304              
305 0 0         if ($self->use_bulk_autoflush) {
306 0   0       my $max_count = $self->max_flush_count || 1_000;
307 0   0       my $max_size = $self->max_flush_size || 1_000_000;
308              
309 0           $args{max_count} = $max_count;
310 0           $args{max_size} = $max_size;
311 0           $args{max_time} = 0;
312              
313 0           $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
314             "max_flush_size [$max_size]");
315             }
316             else {
317 0           $args{max_count} = 0;
318 0           $args{max_size} = 0;
319 0           $args{max_time} = 0;
320 0           $args{on_error} = undef;
321             #$args{on_success} = sub {
322             #my ($action, $response, $i) = @_;
323             #};
324              
325 0           $self->log->info("open_bulk_mode: opening without automatic flushing");
326             }
327              
328 0           my $bulk;
329 0           eval {
330 0           $bulk = $es->bulk_helper(%args);
331             };
332 0 0         if ($@) {
333 0           chomp($@);
334 0           return $self->log->error("open_bulk_mode: failed: [$@]");
335             }
336              
337 0           $self->_bulk($bulk);
338              
339 0           return $self->nodes;
340             }
341              
342             sub open_scroll_scan_mode {
343 0     0 0   my $self = shift;
344 0           my ($index, $size) = @_;
345              
346 0 0         my $version = $self->version or return;
347 0 0         if ($version ge "5.0.0") {
348 0           return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
349             "$version, try open_scroll Command instead");
350             }
351              
352 0   0       $index ||= $self->index;
353 0   0       $size ||= $self->size;
354 0           my $es = $self->_es;
355 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
356 0 0         $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
357 0 0         $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
358              
359 0           my $scroll;
360 0           eval {
361 0           $scroll = $es->scroll_helper(
362             index => $index,
363             search_type => 'scan',
364             size => $size,
365             );
366             };
367 0 0         if ($@) {
368 0           chomp($@);
369 0           return $self->log->error("open_scroll_scan_mode: failed: $@");
370             }
371              
372 0           $self->_scroll($scroll);
373              
374 0           return $self->nodes;
375             }
376              
377             #
378             # Search::Elasticsearch::Client::5_0::Scroll
379             #
380             sub open_scroll {
381 0     0 0   my $self = shift;
382 0           my ($index, $size, $type, $query) = @_;
383              
384 0 0         my $version = $self->version or return;
385 0 0         if ($version lt "5.0.0") {
386 0           return $self->log->error("open_scroll: Command not supported for ES version ".
387             "$version, try open_scroll_scan_mode Command instead");
388             }
389              
390 0   0       $query ||= { query => { match_all => {} } };
391 0   0       $index ||= $self->index;
392 0   0       $type ||= $self->type;
393 0   0       $size ||= $self->size;
394 0           my $es = $self->_es;
395 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
396 0 0         $self->brik_help_run_undef_arg('open_scroll', $index) or return;
397 0 0         $self->brik_help_run_undef_arg('open_scroll', $size) or return;
398              
399 0           my $timeout = $self->rtimeout;
400              
401 0           my %args = (
402             scroll => "${timeout}s",
403             # Starting with Search::Elasticsearch 7.x, scroll_in_qs does not exist anymore
404             #scroll_in_qs => 1, # By default (0), pass scroll_id in request body. When 1, pass
405             # it in query string.
406             index => $index,
407             size => $size,
408             body => $query,
409             );
410 0 0         if ($self->use_type) {
411 0 0         if ($type ne '*') {
412 0           $args{type} = $type;
413             }
414             }
415              
416             #
417             # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
418             #
419 0           my $scroll;
420 0           eval {
421 0           $scroll = $es->scroll_helper(%args);
422             };
423 0 0         if ($@) {
424 0           chomp($@);
425 0           return $self->log->error("open_scroll: failed: $@");
426             }
427              
428 0           $self->_scroll($scroll);
429              
430 0           $self->log->verbose("open_scroll: opened with size [$size] and timeout [${timeout}s]");
431              
432 0           return $self->nodes;
433             }
434              
435             #
436             # Search::Elasticsearch::Client::5_0::Scroll
437             #
438             sub close_scroll {
439 0     0 0   my $self = shift;
440              
441 0           my $scroll = $self->_scroll;
442 0 0         if (! defined($scroll)) {
443 0           return 1;
444             }
445              
446 0           $scroll->finish;
447 0           $self->_scroll(undef);
448              
449 0           return 1;
450             }
451              
452             sub total_scroll {
453 0     0 0   my $self = shift;
454              
455 0           my $scroll = $self->_scroll;
456 0 0         $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
457              
458 0           my $total;
459 0           eval {
460 0           $total = $scroll->total;
461             };
462 0 0         if ($@) {
463 0           chomp($@);
464 0           return $self->log->error("total_scroll: failed with: [$@]");
465             }
466              
467 0           return $total;
468             }
469              
470             sub next_scroll {
471 0     0 0   my $self = shift;
472 0           my ($count) = @_;
473              
474 0   0       $count ||= 1;
475              
476 0           my $scroll = $self->_scroll;
477 0 0         $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
478              
479 0           my $next;
480 0           eval {
481 0 0         if ($count > 1) {
482 0           my @docs = $scroll->next($count);
483 0 0         if (@docs > 0) {
484 0           $next = \@docs;
485             }
486             }
487             else {
488 0           $next = $scroll->next;
489             }
490             };
491 0 0         if ($@) {
492 0           chomp($@);
493 0           return $self->log->error("next_scroll: failed with: [$@]");
494             }
495              
496 0           return $next;
497             }
498              
499             #
500             # Search::Elasticsearch::Client::5_0::Direct
501             #
502             sub index_document {
503 0     0 0   my $self = shift;
504 0           my ($doc, $index, $type, $hash, $id) = @_;
505              
506 0   0       $index ||= $self->index;
507 0   0       $type ||= $self->type;
508 0           my $es = $self->_es;
509 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
510 0 0         $self->brik_help_run_undef_arg('index_document', $doc) or return;
511 0 0         $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
512 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
513 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
514              
515 0           my %args = (
516             index => $index,
517             body => $doc,
518             );
519 0 0         if (defined($id)) {
520 0           $args{id} = $id;
521             }
522              
523 0 0         if ($self->use_type) {
524 0           $args{type} = $type;
525             }
526              
527 0 0         if (defined($hash)) {
528 0 0         $self->brik_help_run_invalid_arg('index_document', $hash, 'HASH')
529             or return;
530 0           my $this_hash = { %$hash };
531 0 0 0       if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
532 0           $this_hash->{routing} = $doc->{$hash->{routing}};
533             }
534 0           %args = ( %args, %$this_hash );
535             }
536              
537 0           my $r;
538 0           eval {
539 0           $r = $es->index(%args);
540             };
541 0 0         if ($@) {
542 0           chomp($@);
543 0           return $self->log->error("index_document: index failed for ".
544             "index [$index]: [$@]");
545             }
546              
547 0           return $r;
548             }
549              
550             #
551             # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
552             #
553             sub reindex {
554 0     0 0   my $self = shift;
555 0           my ($index, $new, $type) = @_;
556              
557 0           my $es = $self->_es;
558 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
559 0 0         $self->brik_help_run_undef_arg('reindex', $index) or return;
560 0 0         $self->brik_help_run_undef_arg('reindex', $new) or return;
561              
562 0           my %args = (
563             body => {
564             conflicts => 'proceed',
565             source => { index => $index },
566             dest => { index => $new },
567             },
568             wait_for_completion => 'false', # Immediately return the task.
569             );
570              
571             # Change the type for destination doc
572 0 0         if ($self->use_type) {
573 0 0         if (defined($type)) {
574 0           $args{body}{dest}{type} = $type;
575             }
576             }
577              
578 0           my $r;
579 0           eval {
580 0           $r = $es->reindex(%args);
581             };
582 0 0         if ($@) {
583 0           chomp($@);
584 0           return $self->log->error("reindex: reindex failed for index [$index]: [$@]");
585             }
586              
587 0           return $r;
588             }
589              
590             #
591             # List reindex tasks
592             #
593             # curl -X GET "localhost:9200/_tasks?detailed=true&actions=*reindex" | jq .
594             #
595             # Cancel reindex task
596             #
597             # curl -X POST "localhost:9200/_tasks/7VelPnOxQm21HtuJNFUAvQ:120914725/_cancel" | jq .
598             #
599              
600             #
601             # Search::Elasticsearch::Client::6_0::Direct::Tasks
602             #
603             sub get_reindex_tasks {
604 0     0 0   my $self = shift;
605              
606 0           my $es = $self->_es;
607 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
608              
609 0           my $t = $es->tasks;
610              
611 0           my $list = $t->list;
612 0           my $nodes = $list->{nodes};
613 0 0         if (! defined($nodes)) {
614 0           return $self->log->error("get_reindex_tasks: no nodes found");
615             }
616              
617 0           my %tasks = ();
618 0           for my $node (keys %$nodes) {
619 0           for my $id (keys %{$nodes->{$node}}) {
  0            
620 0           my $tasks = $nodes->{$node}{tasks};
621 0           for my $task (keys %$tasks) {
622 0           my $action = $tasks->{$task}{action};
623 0 0 0       if ($action eq 'indices:data/write/reindex' && !exists($tasks{$task})) {
624 0           $tasks{$task} = $tasks->{$task};
625             }
626             }
627             }
628             }
629              
630 0           return \%tasks;
631             }
632              
633             sub cancel_reindex_task {
634 0     0 0   my $self = shift;
635 0           my ($id) = @_;
636              
637 0           my $es = $self->_es;
638 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
639 0 0         $self->brik_help_run_undef_arg('cancel_reindex_task', $id) or return;
640              
641 0           my $t = $es->tasks;
642              
643 0           return $t->cancel(task_id => $id);
644             }
645              
646             sub get_taskid {
647 0     0 0   my $self = shift;
648 0           my ($id) = @_;
649              
650 0           my $es = $self->_es;
651 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
652 0 0         $self->brik_help_run_undef_arg('get_taskid', $id) or return;
653              
654 0           my $t = $es->tasks;
655              
656 0           return $t->get(task_id => $id);
657             }
658              
659             sub show_reindex_progress {
660 0     0 0   my $self = shift;
661              
662 0           my $es = $self->_es;
663 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
664              
665 0 0         my $tasks = $self->get_reindex_tasks or return;
666 0 0         if (! keys %$tasks) {
667 0           $self->log->info("show_reindex_progress: no reindex task in progress");
668 0           return 0;
669             }
670              
671 0           for my $id (keys %$tasks) {
672 0 0         my $task = $self->get_taskid($id) or next;
673              
674 0           my $status = $task->{task}{status};
675 0           my $desc = $task->{task}{description};
676 0           my $total = $status->{total};
677 0           my $created = $status->{created};
678 0           my $deleted = $status->{deleted};
679 0           my $updated = $status->{updated};
680              
681 0           my $perc = ($created + $deleted + $updated) / $total * 100;
682              
683 0           printf("> Task [%s]: %.02f%%\n", $desc, $perc);
684 0           print "created[$created] deleted[$deleted] updated[$updated] total[$total]\n";
685             }
686              
687 0           return 1;
688             }
689              
690             sub loop_show_reindex_progress {
691 0     0 0   my $self = shift;
692 0           my ($sec) = @_;
693              
694 0   0       $sec ||= 60;
695 0           my $es = $self->_es;
696 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
697              
698 0           while (1) {
699 0 0         $self->show_reindex_progress or return;
700 0           sleep($sec);
701             }
702              
703 0           return 1;
704             }
705              
706             sub reindex_with_mapping_from_json_file {
707 0     0 0   my $self = shift;
708 0           my ($index, $new, $file) = @_;
709              
710 0           my $es = $self->_es;
711 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
712 0 0         $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $index)
713             or return;
714 0 0         $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $new) or return;
715 0 0         $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $file) or return;
716 0 0         $self->brik_help_run_file_not_found('reindex_with_mapping_from_json_file', $file)
717             or return;
718              
719 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
720 0 0         my $json = $fj->read($file) or return;
721              
722 0           return $self->reindex($index, $new, $json);
723             }
724              
725             #
726             # Search::Elasticsearch::Client::5_0::Direct
727             #
728             # To execute this Command using routing requires to use the correct field
729             # value directly in $hash->{routing}. We cannot "guess" it from arguments,
730             # this would be a little bit complicated to do in an efficient way.
731             #
732             sub update_document {
733 0     0 0   my $self = shift;
734 0           my ($doc, $id, $index, $type, $hash) = @_;
735              
736 0   0       $index ||= $self->index;
737 0   0       $type ||= $self->type;
738 0           my $es = $self->_es;
739 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
740 0 0         $self->brik_help_run_undef_arg('update_document', $doc) or return;
741 0 0         $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
742 0 0         $self->brik_help_run_undef_arg('update_document', $id) or return;
743 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
744 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
745              
746 0           my %args = (
747             id => $id,
748             index => $index,
749             body => { doc => $doc },
750             );
751              
752 0 0         if ($self->use_type) {
753 0           $args{type} = $type;
754             }
755              
756 0 0         if (defined($hash)) {
757 0 0         $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH')
758             or return;
759 0           %args = ( %args, %$hash );
760             }
761              
762 0           my $r;
763 0           eval {
764 0           $r = $es->update(%args);
765             };
766 0 0         if ($@) {
767 0           chomp($@);
768 0           return $self->log->error("update_document: index failed for index [$index]: [$@]");
769             }
770              
771 0           return $r;
772             }
773              
774             #
775             # Search::Elasticsearch::Client::5_0::Bulk
776             #
777             sub index_bulk {
778 0     0 0   my $self = shift;
779 0           my ($doc, $index, $type, $hash, $id) = @_;
780              
781 0           my $bulk = $self->_bulk;
782 0   0       $index ||= $self->index;
783 0   0       $type ||= $self->type;
784 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
785 0 0         $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
786 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
787 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
788              
789 0           my %args = (
790             source => $doc,
791             );
792 0 0         if (defined($id)) {
793 0           $args{id} = $id;
794             }
795              
796 0 0         if (defined($hash)) {
797 0 0         $self->brik_help_run_invalid_arg('index_bulk', $hash, 'HASH') or return;
798 0           my $this_hash = { %$hash };
799 0 0 0       if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
800 0           $this_hash->{routing} = $doc->{$hash->{routing}};
801             }
802 0           %args = ( %args, %$this_hash );
803             }
804              
805 0           my $r;
806 0           eval {
807 0           $r = $bulk->add_action(index => \%args);
808             };
809 0 0         if ($@) {
810 0           chomp($@);
811 0           my $p = $self->parse_error_string($@);
812 0 0 0       if (defined($p) && exists($p->{class})) {
813 0           my $class = $p->{class};
814 0           my $code = $p->{code};
815 0           my $node = $p->{node};
816 0           return $self->log->error("index_bulk: failed for index [$index] with error ".
817             "[$class] code [$code] for node [$node]");
818             }
819             else {
820 0           return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
821             }
822             }
823              
824 0           return $r;
825             }
826              
827             #
828             # Allows to index multiple docs at one time
829             # $bulk->index({ source => $doc1 }, { source => $doc2 }, ...);
830             #
831             sub index_bulk_from_list {
832 0     0 0   my $self = shift;
833 0           my ($list, $index, $type, $hash) = @_;
834              
835 0           my $bulk = $self->_bulk;
836 0   0       $index ||= $self->index;
837 0   0       $type ||= $self->type;
838 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
839 0 0         $self->brik_help_run_undef_arg('index_bulk_from_list', $list) or return;
840 0 0         $self->brik_help_run_invalid_arg('index_bulk_from_list', $list, 'ARRAY')
841             or return;
842 0 0         $self->brik_help_run_empty_array_arg('index_bulk_from_list', $list)
843             or return;
844 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
845 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
846              
847 0 0         if (defined($hash)) {
848 0 0         $self->brik_help_run_invalid_arg('index_bulk_from_list', $hash, 'HASH')
849             or return;
850             }
851              
852 0           my @args = ();
853 0           for my $doc (@$list) {
854 0           my %args = (
855             source => $doc,
856             );
857 0 0         if (defined($hash)) {
858 0           my $this_hash = { %$hash };
859 0 0 0       if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
860 0           $this_hash->{routing} = $doc->{$hash->{routing}};
861             }
862 0           %args = ( %args, %$this_hash );
863             }
864 0           push @args, \%args;
865             }
866              
867 0           my $r;
868 0           eval {
869 0           $r = $bulk->index(@args);
870             };
871 0 0         if ($@) {
872 0           chomp($@);
873 0           my $p = $self->parse_error_string($@);
874 0 0 0       if (defined($p) && exists($p->{class})) {
875 0           my $class = $p->{class};
876 0           my $code = $p->{code};
877 0           my $node = $p->{node};
878 0           return $self->log->error("index_bulk: failed for index [$index] with error ".
879             "[$class] code [$code] for node [$node]");
880             }
881             else {
882 0           return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
883             }
884             }
885              
886 0           return $r;
887             }
888              
889             #
890             # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
891             #
892             sub clean_deleted_from_index {
893 0     0 0   my $self = shift;
894 0           my ($index) = @_;
895              
896 0 0         $self->brik_help_run_undef_arg('clean_deleted_from_index', $index) or return;
897              
898 0           my $es = $self->_es;
899 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
900              
901 0           my $indices = $self->_es->indices;
902              
903 0           my $r;
904 0           eval {
905 0           $r = $indices->forcemerge(
906             index => $index,
907             only_expunge_deletes => 'true',
908             );
909             };
910 0 0         if ($@) {
911 0           chomp($@);
912 0           my $p = $self->parse_error_string($@);
913 0 0 0       if (defined($p) && exists($p->{class})) {
914 0           my $class = $p->{class};
915 0           my $code = $p->{code};
916 0           my $node = $p->{node};
917 0           return $self->log->error("clean_deleted_from_index: failed for index ".
918             "[$index] with error [$class] code [$code] for node [$node]");
919             }
920             else {
921 0           return $self->log->error("clean_deleted_from_index: index failed for ".
922             "index [$index]: [$@]");
923             }
924             }
925              
926 0           return $r;
927             }
928              
929             #
930             # To execute this Command using routing requires to use the correct field
931             # value directly in $hash->{routing}. We cannot "guess" it from arguments,
932             # this would be a little bit complicated to do in an efficient way.
933             #
934             sub update_document_bulk {
935 0     0 0   my $self = shift;
936 0           my ($doc, $index, $type, $hash, $id) = @_;
937              
938 0           my $bulk = $self->_bulk;
939 0   0       $index ||= $self->index;
940 0   0       $type ||= $self->type;
941 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
942 0 0         $self->brik_help_run_undef_arg('update_document_bulk', $doc) or return;
943 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
944 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
945              
946 0           my %args = (
947             index => $index,
948             doc => $doc,
949             );
950              
951 0 0         if ($self->use_type) {
952 0           $args{type} = $type;
953             }
954              
955 0 0         if (defined($id)) {
956 0           $args{id} = $id;
957             }
958              
959 0 0         if (defined($hash)) {
960 0 0         $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH')
961             or return;
962 0           %args = ( %args, %$hash );
963             }
964              
965 0           my $r;
966 0           eval {
967 0           $r = $bulk->update(\%args);
968             };
969 0 0         if ($@) {
970 0           chomp($@);
971 0           my $p = $self->parse_error_string($@);
972 0 0 0       if (defined($p) && exists($p->{class})) {
973 0           my $class = $p->{class};
974 0           my $code = $p->{code};
975 0           my $node = $p->{node};
976 0           return $self->log->error("update_document_bulk: failed for index [$index] ".
977             "with error [$class] code [$code] for node [$node]");
978             }
979             else {
980 0           return $self->log->error("update_document_bulk: index failed for ".
981             "index [$index]: [$@]");
982             }
983             }
984              
985 0           return $r;
986             }
987              
988             #
989             # We may have to call refresh_index after a bulk_flush, so we give an additional
990             # optional Argument for given index.
991             #
992             sub bulk_flush {
993 0     0 0   my $self = shift;
994 0           my ($index) = @_;
995              
996 0           my $bulk = $self->_bulk;
997 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
998              
999 0           my $try = $self->try;
1000              
1001 0           RETRY:
1002              
1003             my $r;
1004 0           eval {
1005 0           $r = $bulk->flush;
1006             };
1007 0 0         if ($@) {
1008 0           chomp($@);
1009 0 0         if (--$try == 0) {
1010 0           my $p = $self->parse_error_string($@);
1011 0 0 0       if (defined($p) && exists($p->{class})) {
1012 0           my $class = $p->{class};
1013 0           my $code = $p->{code};
1014 0           my $node = $p->{node};
1015 0           return $self->log->error("bulk_flush: failed after [$try] tries with error ".
1016             "[$class] code [$code] for node [$node]");
1017             }
1018             else {
1019 0           return $self->log->error("bulk_flush: failed after [$try]: [$@]");
1020             }
1021             }
1022 0           $self->log->warning("bulk_flush: sleeping 10 seconds before retry cause error ".
1023             "[$@]");
1024 0           sleep 10;
1025 0           goto RETRY;
1026             }
1027              
1028 0 0         if (defined($index)) {
1029 0           $self->refresh_index($index);
1030             }
1031              
1032 0           return $r;
1033             }
1034              
1035             #
1036             # Search::Elasticsearch::Client::2_0::Direct
1037             # Search::Elasticsearch::Client::5_0::Direct
1038             #
1039             sub count {
1040 0     0 0   my $self = shift;
1041 0           my ($index, $type) = @_;
1042              
1043 0   0       $index ||= $self->index;
1044 0   0       $type ||= $self->type;
1045 0           my $es = $self->_es;
1046 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1047              
1048 0           my %args = ();
1049 0 0 0       if (defined($index) && $index ne '*') {
1050 0           $args{index} = $index;
1051             }
1052 0 0         if ($self->use_type) {
1053 0 0 0       if (defined($type) && $type ne '*') {
1054 0           $args{type} = $type;
1055             }
1056             }
1057              
1058             #$args{body} = {
1059             #query => {
1060             #match => { title => 'Elasticsearch clients' },
1061             #},
1062             #}
1063              
1064 0           my $r;
1065 0 0         my $version = $self->version or return;
1066 0 0         if ($version ge "5.0.0") {
1067 0           eval {
1068 0           $r = $es->count(%args);
1069             };
1070             }
1071             else {
1072 0           eval {
1073 0           $r = $es->search(%args);
1074             };
1075             }
1076 0 0         if ($@) {
1077 0           chomp($@);
1078 0           return $self->log->error("count: count failed for index [$index]: [$@]");
1079             }
1080              
1081 0 0 0       if ($version ge "5.0.0") {
    0          
1082 0 0         if (exists($r->{count})) {
1083 0           return $r->{count};
1084             }
1085             }
1086             elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
1087 0           return $r->{hits}{total};
1088             }
1089              
1090 0           return $self->log->error("count: nothing found");
1091             }
1092              
1093             #
1094             # https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
1095             # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
1096             #
1097             # Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
1098             #
1099             # To perform a query using routing requires to use the correct field
1100             # value directly in $hash->{routing}. We cannot "guess" it from $q,
1101             # this would be a little bit complicated to do in an efficient way.
1102             #
1103             sub query {
1104 0     0 0   my $self = shift;
1105 0           my ($query, $index, $type, $hash) = @_;
1106              
1107 0   0       $index ||= $self->index;
1108 0   0       $type ||= $self->type;
1109 0           my $es = $self->_es;
1110 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1111 0 0         $self->brik_help_run_undef_arg('query', $query) or return;
1112 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
1113 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
1114 0 0         $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
1115              
1116 0           my $timeout = $self->rtimeout;
1117 0           my $es_version = $self->es_version;
1118              
1119 0           my %args = (
1120             index => $index,
1121             body => $query,
1122             );
1123              
1124 0 0         if ($es_version == 7) {
1125 0           $args{track_total_hits} = 'true';
1126             }
1127              
1128 0 0         if (defined($hash)) {
1129 0 0         $self->brik_help_run_invalid_arg('query', $hash, 'HASH') or return;
1130 0           %args = ( %args, %$hash );
1131             }
1132              
1133 0 0         if ($self->use_type) {
1134 0 0         if ($type ne '*') {
1135 0           $args{type} = $type;
1136             }
1137             }
1138              
1139 0           my $r;
1140 0           eval {
1141 0           $r = $es->search(%args);
1142             };
1143 0 0         if ($@) {
1144 0           chomp($@);
1145 0           return $self->log->error("query: failed for index [$index]: [$@]");
1146             }
1147              
1148 0           return $r;
1149             }
1150              
1151             sub get_from_id {
1152 0     0 0   my $self = shift;
1153 0           my ($id, $index, $type) = @_;
1154              
1155 0   0       $index ||= $self->index;
1156 0   0       $type ||= $self->type;
1157 0           my $es = $self->_es;
1158 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1159 0 0         $self->brik_help_run_undef_arg('get_from_id', $id) or return;
1160 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
1161 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
1162              
1163 0           my $r;
1164 0           eval {
1165 0           my %this_args = (
1166             index => $index,
1167             id => $id,
1168             );
1169 0 0         if ($self->use_type) {
1170 0           $this_args{type} = $type;
1171             }
1172 0           $r = $es->get(%this_args);
1173             };
1174 0 0         if ($@) {
1175 0           chomp($@);
1176 0           return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
1177             }
1178              
1179 0           return $r;
1180             }
1181              
1182             #
1183             # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
1184             #
1185             sub www_search {
1186 0     0 0   my $self = shift;
1187 0           my ($query, $index, $type) = @_;
1188              
1189 0   0       $index ||= $self->index;
1190 0   0       $type ||= $self->type;
1191 0 0         $self->brik_help_run_undef_arg('www_search', $query) or return;
1192 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
1193 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
1194              
1195 0           my $from = $self->from;
1196 0           my $size = $self->size;
1197              
1198 0 0         my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
1199              
1200 0           my $nodes = $self->nodes;
1201 0           for my $node (@$nodes) {
1202             # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
1203 0           my $url = "$node/$index";
1204 0 0         if ($self->use_type) {
1205 0 0         if ($type ne '*') {
1206 0           $url .= "/$type";
1207             }
1208             }
1209 0           $url .= "/_search/?from=$from&size=$size&q=".$query;
1210              
1211 0 0         my $get = $self->SUPER::get($url) or next;
1212 0           my $body = $get->{content};
1213              
1214 0 0         my $decoded = $sj->decode($body) or next;
1215              
1216 0           return $decoded;
1217             }
1218              
1219 0           return;
1220             }
1221              
1222             #
1223             # Search::Elasticsearch::Client::2_0::Direct::Indices
1224             #
1225             sub delete_index {
1226 0     0 0   my $self = shift;
1227 0           my ($index) = @_;
1228              
1229 0           my $es = $self->_es;
1230 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1231 0 0         $self->brik_help_run_undef_arg('delete_index', $index) or return;
1232 0 0         $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
1233              
1234 0           my %args = (
1235             index => $index,
1236             );
1237              
1238 0           my $r;
1239 0           eval {
1240 0           $r = $es->indices->delete(%args);
1241             };
1242 0 0         if ($@) {
1243 0           chomp($@);
1244 0           return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
1245             }
1246              
1247 0           return $r;
1248             }
1249              
1250             #
1251             # Search::Elasticsearch::Client::2_0::Direct::Indices
1252             #
1253             # To execute this Command using routing requires to use the correct field
1254             # value directly in $hash->{routing}. We cannot "guess" it from arguments,
1255             # this would be a little bit complicated to do in an efficient way.
1256             #
1257             sub delete_document {
1258 0     0 0   my $self = shift;
1259 0           my ($index, $type, $id, $hash) = @_;
1260              
1261 0           my $es = $self->_es;
1262 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1263 0 0         $self->brik_help_run_undef_arg('delete_document', $index) or return;
1264 0 0         $self->brik_help_run_undef_arg('delete_document', $id) or return;
1265              
1266 0           my %args = (
1267             index => $index,
1268             id => $id,
1269             );
1270              
1271 0 0         if ($self->use_type) {
1272 0           $args{type} = $type;
1273             }
1274              
1275 0 0         if (defined($hash)) {
1276 0 0         $self->brik_help_run_invalid_arg('delete_document', $hash, 'HASH')
1277             or return;
1278 0           %args = ( %args, %$hash );
1279             }
1280              
1281 0           my $r;
1282 0           eval {
1283 0           $r = $es->delete(%args);
1284             };
1285 0 0         if ($@) {
1286 0           chomp($@);
1287 0           return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
1288             }
1289              
1290 0           return $r;
1291             }
1292              
1293             #
1294             # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
1295             #
1296             # Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
1297             #
1298             sub delete_by_query {
1299 0     0 0   my $self = shift;
1300 0           my ($query, $index, $type, $proceed) = @_;
1301              
1302 0           my $es = $self->_es;
1303 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1304 0 0         $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
1305 0 0         $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
1306 0 0         $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
1307 0 0         $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
1308              
1309 0           my $timeout = $self->rtimeout;
1310              
1311 0           my %args = (
1312             index => $index,
1313             body => $query,
1314             );
1315              
1316 0 0         if ($self->use_type) {
1317 0           $args{type} = $type;
1318             }
1319              
1320 0 0 0       if (defined($proceed) && $proceed) {
1321 0           $args{conflicts} = 'proceed';
1322             }
1323              
1324 0           my $r;
1325 0           eval {
1326 0           $r = $es->delete_by_query(%args);
1327             };
1328 0 0         if ($@) {
1329 0           chomp($@);
1330 0           return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
1331             }
1332              
1333             # This may fail, we ignore it.
1334 0           $self->refresh_index($index);
1335              
1336 0           return $r;
1337             }
1338              
1339             #
1340             # Search::Elasticsearch::Client::2_0::Direct::Cat
1341             #
1342             # https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html
1343             #
1344             sub show_indices {
1345 0     0 0   my $self = shift;
1346 0           my ($string) = @_;
1347              
1348 0           my $es = $self->_es;
1349 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1350              
1351 0           my $r;
1352 0           eval {
1353 0           $r = $es->cat->indices;
1354             };
1355 0 0         if ($@) {
1356 0           chomp($@);
1357 0           return $self->log->error("show_indices: failed: [$@]");
1358             }
1359              
1360 0           my @lines = split(/\n/, $r);
1361              
1362 0 0         if (@lines == 0) {
1363 0           $self->log->warning("show_indices: nothing returned, no index?");
1364             }
1365              
1366 0           my @filtered = ();
1367 0 0         if (defined($string)) {
1368 0           for (@lines) {
1369 0 0         if (m{$string}) {
1370 0           push @filtered, $_;
1371             }
1372             }
1373 0           @lines = @filtered;
1374             }
1375              
1376 0           return \@lines;
1377             }
1378              
1379             #
1380             # Search::Elasticsearch::Client::2_0::Direct::Cat
1381             #
1382             sub show_nodes {
1383 0     0 0   my $self = shift;
1384              
1385 0           my $es = $self->_es;
1386 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1387              
1388 0           my $r;
1389 0           eval {
1390 0           $r = $es->cat->nodes;
1391             };
1392 0 0         if ($@) {
1393 0           chomp($@);
1394 0           return $self->log->error("show_nodes: failed: [$@]");
1395             }
1396              
1397 0           my @lines = split(/\n/, $r);
1398              
1399 0 0         if (@lines == 0) {
1400 0           $self->log->warning("show_nodes: nothing returned, no nodes?");
1401             }
1402              
1403 0           return \@lines;
1404             }
1405              
1406             #
1407             # Search::Elasticsearch::Client::2_0::Direct::Cat
1408             #
1409             sub show_health {
1410 0     0 0   my $self = shift;
1411              
1412 0           my $es = $self->_es;
1413 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1414              
1415 0           my $r;
1416 0           eval {
1417 0           $r = $es->cat->health;
1418             };
1419 0 0         if ($@) {
1420 0           chomp($@);
1421 0           return $self->log->error("show_health: failed: [$@]");
1422             }
1423              
1424 0           my @lines = split(/\n/, $r);
1425              
1426 0 0         if (@lines == 0) {
1427 0           $self->log->warning("show_health: nothing returned, no recovery?");
1428             }
1429              
1430 0           return \@lines;
1431             }
1432              
1433             #
1434             # Search::Elasticsearch::Client::2_0::Direct::Cat
1435             #
1436             sub show_recovery {
1437 0     0 0   my $self = shift;
1438              
1439 0           my $es = $self->_es;
1440 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1441              
1442 0           my $r;
1443 0           eval {
1444 0           $r = $es->cat->recovery;
1445             };
1446 0 0         if ($@) {
1447 0           chomp($@);
1448 0           return $self->log->error("show_recovery: failed: [$@]");
1449             }
1450              
1451 0           my @lines = split(/\n/, $r);
1452              
1453 0 0         if (@lines == 0) {
1454 0           $self->log->warning("show_recovery: nothing returned, no index?");
1455             }
1456              
1457 0           return \@lines;
1458             }
1459              
1460             #
1461             # curl -s 'localhost:9200/_cat/allocation?v'
1462             #
1463             sub show_allocation {
1464 0     0 0   my $self = shift;
1465              
1466 0           my $es = $self->_es;
1467 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1468              
1469 0           my $r;
1470 0           eval {
1471 0           $r = $es->cat->allocation;
1472             };
1473 0 0         if ($@) {
1474 0           chomp($@);
1475 0           return $self->log->error("show_allocation: failed: [$@]");
1476             }
1477              
1478 0           my @lines = split(/\n/, $r);
1479              
1480 0 0         if (@lines == 0) {
1481 0           $self->log->warning("show_allocation: nothing returned, no index?");
1482             }
1483              
1484 0           return \@lines;
1485             }
1486              
1487             sub list_indices {
1488 0     0 0   my $self = shift;
1489 0           my ($regex) = @_;
1490              
1491 0 0         my $get = $self->get_indices or return;
1492              
1493 0           my @indices = ();
1494 0           for (@$get) {
1495 0 0         if (defined($regex)) {
1496 0 0         if ($_->{index} =~ m{$regex}) {
1497 0           push @indices, $_->{index};
1498             }
1499             }
1500             else {
1501 0           push @indices, $_->{index};
1502             }
1503             }
1504              
1505 0           return [ sort { $a cmp $b } @indices ];
  0            
1506             }
1507              
1508             sub get_indices {
1509 0     0 0   my $self = shift;
1510 0           my ($string) = @_;
1511              
1512 0 0         my $lines = $self->show_indices($string) or return;
1513 0 0         if (@$lines == 0) {
1514 0           $self->log->warning("get_indices: no index found");
1515 0           return [];
1516             }
1517              
1518             #
1519             # Format depends on ElasticSearch version. We try to detect the format.
1520             #
1521             # 5.0.0:
1522             # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1 146 0 251.8kb 251.8kb"
1523             #
1524 0           my @indices = ();
1525 0           for (@$lines) {
1526 0           my @t = split(/\s+/);
1527 0 0         if (@t == 10) { # Version 5.0.0
    0          
    0          
1528 0           my $color = $t[0];
1529 0           my $state = $t[1];
1530 0           my $index = $t[2];
1531 0           my $id = $t[3];
1532 0           my $shards = $t[4];
1533 0           my $replicas = $t[5];
1534 0           my $count = $t[6];
1535 0           my $count2 = $t[7];
1536 0           my $total_size = $t[8];
1537 0           my $size = $t[9];
1538 0           push @indices, {
1539             color => $color,
1540             state => $state,
1541             index => $index,
1542             id => $id,
1543             shards => $shards,
1544             replicas => $replicas,
1545             count => $count,
1546             total_size => $total_size,
1547             size => $size,
1548             };
1549             }
1550             elsif (@t == 9) {
1551 0           my $index = $t[2];
1552 0           push @indices, {
1553             index => $index,
1554             };
1555             }
1556             elsif (@t == 8) {
1557 0           my $index = $t[1];
1558 0           push @indices, {
1559             index => $index,
1560             };
1561             }
1562             }
1563              
1564 0           return \@indices;
1565             }
1566              
1567             #
1568             # Search::Elasticsearch::Client::5_0::Direct::Indices
1569             #
1570             sub get_index {
1571 0     0 0   my $self = shift;
1572 0           my ($index) = @_;
1573            
1574 0           my $es = $self->_es;
1575 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1576 0 0         $self->brik_help_run_undef_arg('get_index', $index) or return;
1577 0 0         $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
1578              
1579 0           my %args = (
1580             index => $index,
1581             );
1582              
1583 0           my $r;
1584 0           eval {
1585 0           $r = $es->indices->get(%args);
1586             };
1587 0 0         if ($@) {
1588 0           chomp($@);
1589 0           return $self->log->error("get_index: get failed for index [$index]: [$@]");
1590             }
1591              
1592 0           return $r;
1593             }
1594              
1595             sub get_index_stats {
1596 0     0 0   my $self = shift;
1597 0           my ($index) = @_;
1598              
1599 0           my $es = $self->_es;
1600 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1601 0 0         $self->brik_help_run_undef_arg('get_index', $index) or return;
1602              
1603 0           my %args = (
1604             index => $index,
1605             );
1606              
1607 0           my $r;
1608 0           eval {
1609 0           $r = $es->indices->stats(%args);
1610             };
1611 0 0         if ($@) {
1612 0           chomp($@);
1613 0           return $self->log->error("get_index_stats: get failed for index [$index]: ".
1614             "[$@]");
1615             }
1616              
1617 0           return $r->{indices}{$index};
1618             }
1619              
1620             sub list_index_types {
1621 0     0 0   my $self = shift;
1622 0           my ($index) = @_;
1623              
1624 0           my $es = $self->_es;
1625 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1626 0 0         $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1627 0 0         $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1628              
1629 0 0         my $r = $self->get_mappings($index) or return;
1630 0 0         if (keys %$r > 1) {
1631 0           return $self->log->error("list_index_types: multiple indices found, choose one");
1632             }
1633              
1634 0           my @types = ();
1635 0           for my $this_index (keys %$r) {
1636 0           my $mappings = $r->{$this_index}{mappings};
1637 0           push @types, keys %$mappings;
1638             }
1639              
1640 0           my %uniq = map { $_ => 1 } @types;
  0            
1641              
1642 0           return [ sort { $a cmp $b } keys %uniq ];
  0            
1643             }
1644              
1645             #
1646             # By default, if you provide only one index and no type,
1647             # all types will be merged (including _default_)
1648             # If you specify one type (other than _default_), _default_ will be merged to it.
1649             #
1650             sub list_index_fields {
1651 0     0 0   my $self = shift;
1652 0           my ($index, $type) = @_;
1653              
1654 0           my $es = $self->_es;
1655 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1656 0 0         $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1657 0 0         $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1658              
1659 0           my $r;
1660 0 0         if (defined($type)) {
1661 0 0         $r = $self->get_mappings($index, $type) or return;
1662 0 0         if (keys %$r > 1) {
1663 0           return $self->log->error("list_index_fields: multiple indices found, ".
1664             "choose one");
1665             }
1666             # _default_ mapping may not exists.
1667 0 0         if ($self->is_mapping_exists($index, '_default_')) {
1668 0           my $r2 = $self->get_mappings($index, '_default_');
1669             # Merge
1670 0           for my $this_index (keys %$r2) {
1671 0           my $default = $r2->{$this_index}{mappings}{'_default_'};
1672 0           $r->{$this_index}{mappings}{_default_} = $default;
1673             }
1674             }
1675             }
1676             else {
1677 0 0         $r = $self->get_mappings($index) or return;
1678 0 0         if (keys %$r > 1) {
1679 0           return $self->log->error("list_index_fields: multiple indices found, ".
1680             "choose one");
1681             }
1682             }
1683              
1684 0           my @fields = ();
1685 0           for my $this_index (keys %$r) {
1686 0           my $mappings = $r->{$this_index}{mappings};
1687 0           for my $this_type (keys %$mappings) {
1688 0           my $properties = $mappings->{$this_type}{properties};
1689 0           push @fields, keys %$properties;
1690             }
1691             }
1692              
1693 0           my %uniq = map { $_ => 1 } @fields;
  0            
1694              
1695 0           return [ sort { $a cmp $b } keys %uniq ];
  0            
1696             }
1697              
1698             sub list_indices_version {
1699 0     0 0   my $self = shift;
1700 0           my ($index) = @_;
1701              
1702 0           my $es = $self->_es;
1703 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1704 0 0         $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1705 0 0         $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1706             or return;
1707              
1708 0 0         my $r = $self->get_index($index) or return;
1709              
1710 0           my @list = ();
1711 0           for my $this (keys %$r) {
1712 0           my $name = $this;
1713 0           my $version = $r->{$this}{settings}{index}{version}{created};
1714 0           push @list, {
1715             index => $name,
1716             version => $version,
1717             };
1718             }
1719              
1720 0           return \@list;
1721             }
1722              
1723             sub open_index {
1724 0     0 0   my $self = shift;
1725 0           my ($index) = @_;
1726              
1727 0           my $es = $self->_es;
1728 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1729 0 0         $self->brik_help_run_undef_arg('open_index', $index) or return;
1730 0 0         $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1731              
1732 0           my $r;
1733 0           eval {
1734 0           $r = $es->indices->open(
1735             index => $index,
1736             );
1737             };
1738 0 0         if ($@) {
1739 0           chomp($@);
1740 0           return $self->log->error("open_index: failed: [$@]");
1741             }
1742              
1743 0           return $r;
1744             }
1745              
1746             sub close_index {
1747 0     0 0   my $self = shift;
1748 0           my ($index) = @_;
1749              
1750 0           my $es = $self->_es;
1751 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1752 0 0         $self->brik_help_run_undef_arg('close_index', $index) or return;
1753 0 0         $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1754              
1755 0           my $r;
1756 0           eval {
1757 0           $r = $es->indices->close(
1758             index => $index,
1759             );
1760             };
1761 0 0         if ($@) {
1762 0           chomp($@);
1763 0           return $self->log->error("close_index: failed: [$@]");
1764             }
1765              
1766 0           return $r;
1767             }
1768              
1769             #
1770             # Search::Elasticsearch::Client::5_0::Direct::Indices
1771             #
1772             sub get_aliases {
1773 0     0 0   my $self = shift;
1774 0           my ($index) = @_;
1775              
1776 0   0       $index ||= $self->index;
1777 0           my $es = $self->_es;
1778 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1779              
1780             #
1781             # [DEPRECATION] [types removal] The parameter include_type_name should be
1782             # explicitly specified in get indices requests to prepare for 7.0. In 7.0
1783             # include_type_name will default to 'false', which means responses will
1784             # omit the type name in mapping definitions. - In request: {body => undef,
1785             # ignore => [],method => "GET",path => "/*",qs => {},serialize => "std"}
1786             #
1787              
1788 0           my %args = (
1789             index => $index,
1790             params => { include_type_name => 'false' },
1791             );
1792              
1793 0           my $r;
1794 0           eval {
1795 0           $r = $es->indices->get(%args);
1796             };
1797 0 0         if ($@) {
1798 0           chomp($@);
1799 0           return $self->log->error("get_aliases: get_aliases failed: [$@]");
1800             }
1801              
1802 0           my %aliases = ();
1803 0           for my $this (keys %$r) {
1804 0           $aliases{$this} = $r->{$this}{aliases};
1805             }
1806              
1807 0           return \%aliases;
1808             }
1809              
1810             #
1811             # Search::Elasticsearch::Client::5_0::Direct::Indices
1812             #
1813             sub put_alias {
1814 0     0 0   my $self = shift;
1815 0           my ($index, $alias) = @_;
1816              
1817 0           my $es = $self->_es;
1818 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1819 0 0         $self->brik_help_run_undef_arg('put_alias', $index) or return;
1820 0 0         $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1821              
1822 0           my %args = (
1823             index => $index,
1824             name => $alias,
1825             );
1826              
1827 0           my $r;
1828 0           eval {
1829 0           $r = $es->indices->put_alias(%args);
1830             };
1831 0 0         if ($@) {
1832 0           chomp($@);
1833 0           return $self->log->error("put_alias: put_alias failed: [$@]");
1834             }
1835              
1836 0           return $r;
1837             }
1838              
1839             #
1840             # Search::Elasticsearch::Client::5_0::Direct::Indices
1841             #
1842             sub delete_alias {
1843 0     0 0   my $self = shift;
1844 0           my ($index, $alias) = @_;
1845              
1846 0           my $es = $self->_es;
1847 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1848 0 0         $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1849 0 0         $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1850              
1851 0           my %args = (
1852             index => $index,
1853             name => $alias,
1854             );
1855              
1856 0           my $r;
1857 0           eval {
1858 0           $r = $es->indices->delete_alias(%args);
1859             };
1860 0 0         if ($@) {
1861 0           chomp($@);
1862 0           return $self->log->error("delete_alias: delete_alias failed: [$@]");
1863             }
1864              
1865 0           return $r;
1866             }
1867              
1868             sub update_alias {
1869 0     0 0   my $self = shift;
1870 0           my ($new_index, $alias) = @_;
1871              
1872 0           my $es = $self->_es;
1873 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1874 0 0         $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1875 0 0         $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1876              
1877             # Search for previous index with that alias, if any.
1878 0           my $prev_index;
1879 0 0         my $aliases = $self->get_aliases or return;
1880 0           while (my ($k, $v) = each %$aliases) {
1881 0           for my $this (keys %$v) {
1882 0 0         if ($this eq $alias) {
1883 0           $prev_index = $k;
1884 0           last;
1885             }
1886             }
1887 0 0         last if $prev_index;
1888             }
1889              
1890             # Delete previous alias if it exists.
1891 0 0         if (defined($prev_index)) {
1892 0 0         $self->delete_alias($prev_index, $alias) or return;
1893             }
1894              
1895 0           return $self->put_alias($new_index, $alias);
1896             }
1897              
1898             sub is_mapping_exists {
1899 0     0 0   my $self = shift;
1900 0           my ($index, $mapping) = @_;
1901              
1902 0 0         $self->brik_help_run_undef_arg('is_mapping_exists', $index) or return;
1903 0 0         $self->brik_help_run_undef_arg('is_mapping_exists', $mapping) or return;
1904              
1905 0 0         if (! $self->is_index_exists($index)) {
1906 0           return 0;
1907             }
1908              
1909 0 0         my $all = $self->get_mappings($index) or return;
1910 0           for my $this_index (keys %$all) {
1911 0           my $mappings = $all->{$this_index}{mappings};
1912 0           for my $this_mapping (keys %$mappings) {
1913 0 0         if ($this_mapping eq $mapping) {
1914 0           return 1;
1915             }
1916             }
1917             }
1918              
1919 0           return 0;
1920             }
1921              
1922             #
1923             # Search::Elasticsearch::Client::2_0::Direct::Indices
1924             #
1925             sub get_mappings {
1926 0     0 0   my $self = shift;
1927 0           my ($index, $type) = @_;
1928              
1929 0           my $es = $self->_es;
1930 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1931 0 0         $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1932 0 0         $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1933              
1934 0           my %args = (
1935             index => $index,
1936             params => { include_type_name => 'false' },
1937             );
1938              
1939 0 0         if ($self->use_type) {
1940 0           $args{type} = $type;
1941             }
1942              
1943 0           my $r;
1944 0           eval {
1945 0           $r = $es->indices->get_mapping(%args);
1946             };
1947 0 0         if ($@) {
1948 0           chomp($@);
1949 0           return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1950             "[$@]");
1951             }
1952              
1953 0           return $r;
1954             }
1955              
1956             #
1957             # Search::Elasticsearch::Client::2_0::Direct::Indices
1958             #
1959             sub create_index {
1960 0     0 0   my $self = shift;
1961 0           my ($index, $shards) = @_;
1962              
1963 0           my $es = $self->_es;
1964 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1965 0 0         $self->brik_help_run_undef_arg('create_index', $index) or return;
1966              
1967 0           my %args = (
1968             index => $index,
1969             );
1970              
1971 0 0         if (defined($shards)) {
1972 0           $args{body}{settings}{index}{number_of_shards} = $shards;
1973             }
1974              
1975 0           my $r;
1976 0           eval {
1977 0           $r = $es->indices->create(%args);
1978             };
1979 0 0         if ($@) {
1980 0           chomp($@);
1981 0           return $self->log->error("create_index: create failed ".
1982             "for index [$index]: [$@]");
1983             }
1984            
1985 0           return $r;
1986             }
1987              
1988             #
1989             # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1990             #
1991             sub create_index_with_mappings {
1992 0     0 0   my $self = shift;
1993 0           my ($index, $mappings) = @_;
1994              
1995 0           my $es = $self->_es;
1996 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1997 0 0         $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1998 0 0         $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1999 0 0         $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH')
2000             or return;
2001              
2002 0           my $r;
2003 0           eval {
2004 0           $r = $es->indices->create(
2005             index => $index,
2006             body => {
2007             mappings => $mappings,
2008             },
2009             );
2010             };
2011 0 0         if ($@) {
2012 0           chomp($@);
2013 0           return $self->log->error("create_index_with_mappings: create failed for ".
2014             "index [$index]: [$@]");
2015             }
2016              
2017 0           return $r;
2018             }
2019              
2020             # GET http://localhost:9200/
2021             sub info {
2022 0     0 0   my $self = shift;
2023 0           my ($nodes) = @_;
2024              
2025 0   0       $nodes ||= $self->nodes;
2026 0 0         $self->brik_help_run_undef_arg('info', $nodes) or return;
2027 0 0         $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
2028 0 0         $self->brik_help_run_empty_array_arg('info', $nodes) or return;
2029              
2030 0           my $first = $nodes->[0];
2031              
2032 0 0         $self->get($first) or return;
2033              
2034 0           return $self->content;
2035             }
2036              
2037             sub version {
2038 0     0 0   my $self = shift;
2039 0           my ($nodes) = @_;
2040              
2041 0   0       $nodes ||= $self->nodes;
2042 0 0         $self->brik_help_run_undef_arg('version', $nodes) or return;
2043 0 0         $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
2044 0 0         $self->brik_help_run_empty_array_arg('version', $nodes) or return;
2045              
2046 0           my $first = $nodes->[0];
2047              
2048 0 0         $self->get($first) or return;
2049 0 0         my $content = $self->content or return;
2050              
2051 0           return $content->{version}{number};
2052             }
2053              
2054             #
2055             # Search::Elasticsearch::Client::2_0::Direct::Indices
2056             #
2057             sub get_templates {
2058 0     0 0   my $self = shift;
2059              
2060 0           my $es = $self->_es;
2061 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2062              
2063 0           my $r;
2064 0           eval {
2065 0           $r = $es->indices->get_template;
2066             };
2067 0 0         if ($@) {
2068 0           chomp($@);
2069 0           return $self->log->error("get_templates: failed: [$@]");
2070             }
2071              
2072 0           return $r;
2073             }
2074              
2075             sub list_templates {
2076 0     0 0   my $self = shift;
2077              
2078 0 0         my $content = $self->get_templates or return;
2079              
2080 0           return [ sort { $a cmp $b } keys %$content ];
  0            
2081             }
2082              
2083             #
2084             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2085             #
2086             sub get_template {
2087 0     0 0   my $self = shift;
2088 0           my ($template) = @_;
2089              
2090 0           my $es = $self->_es;
2091 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2092 0 0         $self->brik_help_run_undef_arg('get_template', $template) or return;
2093              
2094 0           my $r;
2095 0           eval {
2096 0           $r = $es->indices->get_template(
2097             name => $template,
2098             );
2099             };
2100 0 0         if ($@) {
2101 0           chomp($@);
2102 0           return $self->log->error("get_template: template failed for name [$template]: [$@]");
2103             }
2104              
2105 0           return $r;
2106             }
2107              
2108             sub put_mapping {
2109 0     0 0   my $self = shift;
2110 0           my ($index, $type, $mapping) = @_;
2111              
2112 0           my $es = $self->_es;
2113 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2114 0 0         $self->brik_help_run_undef_arg('put_mapping', $index) or return;
2115 0 0         $self->brik_help_run_undef_arg('put_mapping', $type) or return;
2116 0 0         $self->brik_help_run_undef_arg('put_mapping', $mapping) or return;
2117 0 0         $self->brik_help_run_invalid_arg('put_mapping', $mapping, 'HASH')
2118             or return;
2119              
2120 0           my $r;
2121 0           eval {
2122 0           my %this_args = (
2123             index => $index,
2124             body => $mapping,
2125             );
2126 0 0         if ($self->use_type) {
2127 0           $this_args{type} = $type;
2128             }
2129 0           $r = $es->indices->put_mapping(%this_args);
2130             };
2131 0 0         if ($@) {
2132 0           chomp($@);
2133 0           return $self->log->error("put_mapping: mapping failed ".
2134             "for index [$index]: [$@]");
2135             }
2136              
2137 0           return $r;
2138             }
2139              
2140             #
2141             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2142             #
2143             sub put_template {
2144 0     0 0   my $self = shift;
2145 0           my ($name, $template) = @_;
2146              
2147 0           my $es = $self->_es;
2148 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2149 0 0         $self->brik_help_run_undef_arg('put_template', $name) or return;
2150 0 0         $self->brik_help_run_undef_arg('put_template', $template) or return;
2151 0 0         $self->brik_help_run_invalid_arg('put_template', $template, 'HASH')
2152             or return;
2153              
2154 0           my $r;
2155 0           eval {
2156 0           $r = $es->indices->put_template(
2157             name => $name,
2158             body => $template,
2159             );
2160             };
2161 0 0         if ($@) {
2162 0           chomp($@);
2163 0           return $self->log->error("put_template: template failed ".
2164             "for name [$name]: [$@]");
2165             }
2166              
2167 0           return $r;
2168             }
2169              
2170             sub put_mapping_from_json_file {
2171 0     0 0   my $self = shift;
2172 0           my ($index, $type, $json_file) = @_;
2173              
2174 0           my $es = $self->_es;
2175 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2176 0 0         $self->brik_help_run_undef_arg('put_mapping_from_json_file', $index)
2177             or return;
2178 0 0         $self->brik_help_run_undef_arg('put_mapping_from_json_file', $type)
2179             or return;
2180 0 0         $self->brik_help_run_undef_arg('put_mapping_from_json_file', $json_file)
2181             or return;
2182 0 0         $self->brik_help_run_file_not_found('put_mapping_from_json_file',
2183             $json_file) or return;
2184              
2185 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2186 0 0         my $data = $fj->read($json_file) or return;
2187              
2188 0 0         if (! exists($data->{mappings})) {
2189 0           return $self->log->error("put_mapping_from_json_file: no mapping ".
2190             "data found");
2191             }
2192              
2193 0           return $self->put_mapping($index, $type, $data->{mappings});
2194             }
2195              
2196             sub update_mapping_from_json_file {
2197 0     0 0   my $self = shift;
2198 0           my ($json_file, $index, $type) = @_;
2199              
2200 0           my $es = $self->_es;
2201 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2202 0 0         $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2203             $json_file) or return;
2204 0 0         $self->brik_help_run_file_not_found('update_mapping_from_json_file',
2205             $json_file) or return;
2206 0 0         $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2207             $type) or return;
2208 0 0         $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2209             $index) or return;
2210              
2211 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2212 0 0         my $data = $fj->read($json_file) or return;
2213              
2214 0 0         if (! exists($data->{mappings})) {
2215 0           return $self->log->error("update_mapping_from_json_file: ".
2216             "no data found");
2217             }
2218              
2219 0           my $mappings = $data->{mappings};
2220              
2221 0           return $self->put_mapping($index, $type, $mappings);
2222             }
2223              
2224             sub put_template_from_json_file {
2225 0     0 0   my $self = shift;
2226 0           my ($json_file, $name) = @_;
2227              
2228 0           my $es = $self->_es;
2229 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2230 0 0         $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file)
2231             or return;
2232 0 0         $self->brik_help_run_file_not_found('put_template_from_json_file',
2233             $json_file) or return;
2234              
2235 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2236 0 0         my $data = $fj->read($json_file) or return;
2237              
2238 0 0         if (!defined($name)) {
2239 0           ($name) = $json_file =~ m{([^/]+)\.json$};
2240             }
2241              
2242 0 0         if (! defined($name)) {
2243 0           return $self->log->error("put_template_from_json_file: no template ".
2244             "name found");
2245             }
2246              
2247 0           return $self->put_template($name, $data);
2248             }
2249              
2250             sub update_template_from_json_file {
2251 0     0 0   my $self = shift;
2252 0           my ($json_file, $name) = @_;
2253              
2254 0           my $es = $self->_es;
2255 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2256 0 0         $self->brik_help_run_undef_arg('update_template_from_json_file',
2257             $json_file) or return;
2258 0 0         $self->brik_help_run_file_not_found('update_template_from_json_file',
2259             $json_file) or return;
2260              
2261 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2262 0 0         my $data = $fj->read($json_file) or return;
2263              
2264 0 0         if (!defined($name)) {
2265 0           ($name) = $json_file =~ m{([^/]+)\.json$};
2266             }
2267              
2268 0 0         if (! defined($name)) {
2269 0           return $self->log->error("put_template_from_json_file: no template ".
2270             "name found");
2271             }
2272              
2273             # We ignore errors, template may not exist.
2274 0           $self->delete_template($name);
2275              
2276 0           return $self->put_template($name, $data);
2277             }
2278              
2279             #
2280             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2281             # Search::Elasticsearch::Client::2_0::Direct::Indices
2282             #
2283             sub get_settings {
2284 0     0 0   my $self = shift;
2285 0           my ($indices, $names) = @_;
2286              
2287 0           my $es = $self->_es;
2288 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2289              
2290 0           my %args = ();
2291 0 0         if (defined($indices)) {
2292 0 0         $self->brik_help_run_undef_arg('get_settings', $indices) or return;
2293 0 0         my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
2294             or return;
2295 0           $args{index} = $indices;
2296             }
2297 0 0         if (defined($names)) {
2298 0 0         $self->brik_help_run_file_not_found('get_settings', $names) or return;
2299 0 0         my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
2300             or return;
2301 0           $args{name} = $names;
2302             }
2303              
2304 0           my $r;
2305 0           eval {
2306 0           $r = $es->indices->get_settings(%args);
2307             };
2308 0 0         if ($@) {
2309 0           chomp($@);
2310 0           return $self->log->error("get_settings: failed: [$@]");
2311             }
2312              
2313 0           return $r;
2314             }
2315              
2316             #
2317             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2318             # Search::Elasticsearch::Client::2_0::Direct::Indices
2319             #
2320             # Example:
2321             #
2322             # run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
2323             #
2324             # XXX: should be renamed to put_index_settings
2325             #
2326             sub put_settings {
2327 0     0 0   my $self = shift;
2328 0           my ($settings, $indices) = @_;
2329              
2330 0           my $es = $self->_es;
2331 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2332 0 0         $self->brik_help_run_undef_arg('put_settings', $settings) or return;
2333 0 0         $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
2334              
2335 0           my %args = (
2336             body => $settings,
2337             );
2338 0 0         if (defined($indices)) {
2339 0 0         $self->brik_help_run_undef_arg('put_settings', $indices) or return;
2340 0 0         my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
2341             or return;
2342 0           $args{index} = $indices;
2343             }
2344              
2345 0           my $r;
2346 0           eval {
2347 0           $r = $es->indices->put_settings(%args);
2348             };
2349 0 0         if ($@) {
2350 0           chomp($@);
2351 0           return $self->log->error("put_settings: failed: [$@]");
2352             }
2353              
2354 0           return $r;
2355             }
2356              
2357             sub set_index_readonly {
2358 0     0 0   my $self = shift;
2359 0           my ($indices, $bool) = @_;
2360              
2361 0           my $es = $self->_es;
2362 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2363 0 0         $self->brik_help_run_undef_arg('set_index_readonly', $indices) or return;
2364 0 0         $self->brik_help_run_invalid_arg('set_index_readonly', $indices, 'ARRAY', 'SCALAR')
2365             or return;
2366              
2367 0 0         if (! defined($bool)) {
2368 0           $bool = 'true';
2369             }
2370             else {
2371 0 0         $bool = $bool ? 'true' : 'false';
2372             }
2373              
2374 0           my $settings = {
2375             'blocks.read_only' => $bool,
2376             'blocks.read_only_allow_delete' => 'true',
2377             };
2378              
2379 0           return $self->put_settings($settings, $indices);
2380             }
2381              
2382             #
2383             # curl -XPUT -H "Content-Type: application/json" http://localhost:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'
2384             # PUT synscan-2018-05/_settings
2385             # {
2386             # "index": {
2387             # "blocks":{
2388             # "read_only":"false",
2389             # "read_only_allow_delete":"true"
2390             # }
2391             # }
2392             #}
2393             #
2394             #
2395             # If it fails with the following error:
2396             #
2397             # [2018-09-12T13:38:40,012][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 403 ({"type"=>"cluster_block_exception", "reason"=>"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"})
2398             #
2399             # Use Kibana dev console and copy/paste both requests:
2400             #
2401             # PUT _all/_settings
2402             # {
2403             # "index": {
2404             # "blocks": {
2405             # "read_only_allow_delete": "false"
2406             # }
2407             # }
2408             # }
2409             #
2410             sub reset_index_readonly {
2411 0     0 0   my $self = shift;
2412 0           my ($indices) = @_;
2413              
2414 0   0       $indices ||= '*';
2415 0           my $es = $self->_es;
2416 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2417 0 0         $self->brik_help_run_invalid_arg('reset_index_readonly', $indices,
2418             'ARRAY', 'SCALAR') or return;
2419              
2420 0           my $settings = {
2421             blocks => {
2422             read_only_allow_delete => 'false',
2423             },
2424             };
2425              
2426             # Settings on '*' indices should be enough to reset for everyone.
2427 0           my $r = $self->put_settings($settings, $indices);
2428             #$self->log->info(Data::Dumper::Dumper($r));
2429              
2430 0           return 1;
2431             }
2432              
2433             sub list_index_readonly {
2434 0     0 0   my $self = shift;
2435              
2436 0           my $es = $self->_es;
2437 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2438              
2439 0 0         my $list = $self->list_indices or return;
2440              
2441 0           my @indices = ();
2442 0           for my $this (@$list) {
2443 0 0         my $ro = $self->get_index_readonly($this) or next;
2444 0 0         if (defined($ro->{index}{provided_name})) {
2445 0           push @indices, $ro->{index}{provided_name};
2446             }
2447             }
2448              
2449 0           return \@indices;
2450             }
2451              
2452             sub set_index_number_of_replicas {
2453 0     0 0   my $self = shift;
2454 0           my ($indices, $number) = @_;
2455              
2456 0           my $es = $self->_es;
2457 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2458 0 0         $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
2459 0 0         $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2460             or return;
2461              
2462 0           my $settings = { number_of_replicas => $number };
2463              
2464 0           return $self->put_settings($settings, $indices);
2465             }
2466              
2467             sub set_index_refresh_interval {
2468 0     0 0   my $self = shift;
2469 0           my ($indices, $number) = @_;
2470              
2471 0           my $es = $self->_es;
2472 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2473 0 0         $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
2474 0 0         $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2475             or return;
2476              
2477             # If there is a meaningful value not postfixed with a unity,
2478             # we default to add a `s' for a number of seconds.
2479 0 0 0       if ($number =~ /^\d+$/ && $number > 0) {
2480 0           $number .= 's';
2481             }
2482              
2483 0           my $settings = { refresh_interval => $number };
2484              
2485 0           return $self->put_settings($settings, $indices);
2486             }
2487              
2488             sub get_index_settings {
2489 0     0 0   my $self = shift;
2490 0           my ($indices) = @_;
2491              
2492 0           my $es = $self->_es;
2493 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2494 0 0         $self->brik_help_run_undef_arg('get_index_settings', $indices) or return;
2495 0 0         $self->brik_help_run_invalid_arg('get_index_settings', $indices, 'ARRAY', 'SCALAR')
2496             or return;
2497              
2498 0           my $settings = $self->get_settings($indices);
2499              
2500 0           my %indices = ();
2501 0           for (keys %$settings) {
2502 0           $indices{$_} = $settings->{$_}{settings};
2503             }
2504              
2505 0           return \%indices;
2506             }
2507              
2508             sub get_index_readonly {
2509 0     0 0   my $self = shift;
2510 0           my ($indices) = @_;
2511              
2512 0           my $es = $self->_es;
2513 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2514 0 0         $self->brik_help_run_undef_arg('get_index_readonly', $indices) or return;
2515 0 0         $self->brik_help_run_invalid_arg('get_index_readonly', $indices, 'ARRAY', 'SCALAR')
2516             or return;
2517              
2518 0           my $settings = $self->get_settings($indices);
2519              
2520 0           my %indices = ();
2521 0           for (keys %$settings) {
2522             #$indices{$_} = $settings->{$_}{settings}{index}{'blocks_write'};
2523 0           $indices{$_} = $settings->{$_}{settings};
2524             }
2525              
2526 0           return \%indices;
2527             }
2528              
2529             sub get_index_number_of_replicas {
2530 0     0 0   my $self = shift;
2531 0           my ($indices) = @_;
2532              
2533 0           my $es = $self->_es;
2534 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2535 0 0         $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
2536 0 0         $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2537             or return;
2538              
2539 0           my $settings = $self->get_settings($indices);
2540              
2541 0           my %indices = ();
2542 0           for (keys %$settings) {
2543 0           $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
2544             }
2545              
2546 0           return \%indices;
2547             }
2548              
2549             sub get_index_refresh_interval {
2550 0     0 0   my $self = shift;
2551 0           my ($indices, $number) = @_;
2552              
2553 0           my $es = $self->_es;
2554 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2555 0 0         $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
2556 0 0         $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2557             or return;
2558              
2559 0           my $settings = $self->get_settings($indices);
2560              
2561 0           my %indices = ();
2562 0           for (keys %$settings) {
2563 0           $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
2564             }
2565              
2566 0           return \%indices;
2567             }
2568              
2569             sub get_index_number_of_shards {
2570 0     0 0   my $self = shift;
2571 0           my ($indices, $number) = @_;
2572              
2573 0           my $es = $self->_es;
2574 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2575 0 0         $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
2576 0 0         $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
2577             or return;
2578              
2579 0           my $settings = $self->get_settings($indices);
2580              
2581 0           my %indices = ();
2582 0           for (keys %$settings) {
2583 0           $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
2584             }
2585              
2586 0           return \%indices;
2587             }
2588              
2589             #
2590             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2591             #
2592             sub delete_template {
2593 0     0 0   my $self = shift;
2594 0           my ($name) = @_;
2595              
2596 0           my $es = $self->_es;
2597 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2598 0 0         $self->brik_help_run_undef_arg('delete_template', $name) or return;
2599              
2600 0           my $r;
2601 0           eval {
2602 0           $r = $es->indices->delete_template(
2603             name => $name,
2604             );
2605             };
2606 0 0         if ($@) {
2607 0           chomp($@);
2608 0           return $self->log->error("delete_template: failed for name [$name]: [$@]");
2609             }
2610              
2611 0           return $r;
2612             }
2613              
2614             #
2615             # Return a boolean to state for index existence
2616             #
2617             sub is_index_exists {
2618 0     0 0   my $self = shift;
2619 0           my ($index) = @_;
2620              
2621 0           my $es = $self->_es;
2622 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2623 0 0         $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
2624              
2625 0           my $r;
2626 0           eval {
2627 0           $r = $es->indices->exists(
2628             index => $index,
2629             );
2630             };
2631 0 0         if ($@) {
2632 0           chomp($@);
2633 0           return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
2634             }
2635              
2636 0 0         return $r ? 1 : 0;
2637             }
2638              
2639             #
2640             # Return a boolean to state for index with type existence
2641             #
2642             sub is_type_exists {
2643 0     0 0   my $self = shift;
2644 0           my ($index, $type) = @_;
2645              
2646 0           my $es = $self->_es;
2647 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2648 0 0         $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
2649 0 0         $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
2650              
2651 0           my $r;
2652 0           eval {
2653 0           my %this_args = (
2654             index => $index,
2655             );
2656 0 0         if ($self->use_type) {
2657 0           $this_args{type} = $type;
2658             }
2659 0           $r = $es->indices->exists_type(%this_args);
2660             };
2661 0 0         if ($@) {
2662 0           chomp($@);
2663 0           return $self->log->error("is_type_exists: failed for index [$index] and ".
2664             "type [$type]: [$@]");
2665             }
2666              
2667 0 0         return $r ? 1 : 0;
2668             }
2669              
2670             #
2671             # Return a boolean to state for document existence
2672             #
2673             sub is_document_exists {
2674 0     0 0   my $self = shift;
2675 0           my ($index, $type, $document) = @_;
2676              
2677 0           my $es = $self->_es;
2678 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2679 0 0         $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
2680 0 0         $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
2681 0 0         $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
2682 0 0         $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
2683              
2684 0           my $r;
2685 0           eval {
2686 0           my %this_args = (
2687             index => $index,
2688             %$document,
2689             );
2690 0 0         if ($self->use_type) {
2691 0           $this_args{type} = $type;
2692             }
2693 0           $r = $es->exists(%this_args);
2694             };
2695 0 0         if ($@) {
2696 0           chomp($@);
2697 0           return $self->log->error("is_document_exists: failed for index [$index] and ".
2698             "type [$type]: [$@]");
2699             }
2700              
2701 0 0         return $r ? 1 : 0;
2702             }
2703              
2704             sub parse_error_string {
2705 0     0 0   my $self = shift;
2706 0           my ($string) = @_;
2707              
2708 0 0         $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
2709              
2710             # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
2711              
2712 0           my ($class, $node, $code, $message, $dump) = $string =~
2713             m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
2714              
2715 0 0 0       if (defined($dump) && length($dump)) {
2716 0 0         my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
2717 0           $dump = $sd->decode($dump);
2718             }
2719              
2720             # Sanity check
2721 0 0 0       if (defined($node) && $node =~ m{^http} && $code =~ m{^\d+$}
      0        
      0        
      0        
2722             && defined($dump) && ref($dump) eq 'HASH') {
2723             return {
2724 0           class => $class,
2725             node => $node,
2726             code => $code,
2727             message => $message,
2728             dump => $dump,
2729             };
2730             }
2731              
2732             # Were not able to decode, we return as-is.
2733             return {
2734 0           message => $string,
2735             };
2736             }
2737              
2738             #
2739             # Refresh an index to receive latest additions
2740             #
2741             # Search::Elasticsearch::Client::5_0::Direct::Indices
2742             # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
2743             #
2744             sub refresh_index {
2745 0     0 0   my $self = shift;
2746 0           my ($index) = @_;
2747              
2748 0           my $es = $self->_es;
2749 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2750 0 0         $self->brik_help_run_undef_arg('refresh_index', $index) or return;
2751              
2752 0           my $try = $self->try;
2753              
2754 0           RETRY:
2755              
2756             my $r;
2757 0           eval {
2758 0           $r = $es->indices->refresh(
2759             index => $index,
2760             );
2761             };
2762 0 0         if ($@) {
2763 0 0         if (--$try == 0) {
2764 0           chomp($@);
2765 0           my $p = $self->parse_error_string($@);
2766 0 0 0       if (defined($p) && exists($p->{class})) {
2767 0           my $class = $p->{class};
2768 0           my $code = $p->{code};
2769 0           my $node = $p->{node};
2770 0           return $self->log->error("refresh_index: failed for index [$index] ".
2771             "after [$try] tries with error [$class] code [$code] for node [$node]");
2772             }
2773             else {
2774 0           return $self->log->error("refresh_index: failed for index [$index] ".
2775             "after [$try]: [$@]");
2776             }
2777             }
2778 0           sleep 60;
2779 0           goto RETRY;
2780             }
2781              
2782 0           return $r;
2783             }
2784              
2785             sub export_as {
2786 0     0 0   my $self = shift;
2787 0           my ($format, $index, $size, $cb) = @_;
2788              
2789 0   0       $size ||= 10_000;
2790 0           my $es = $self->_es;
2791 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2792 0 0         $self->brik_help_run_undef_arg('export_as', $format) or return;
2793 0 0         $self->brik_help_run_undef_arg('export_as', $index) or return;
2794 0 0         $self->brik_help_run_undef_arg('export_as', $size) or return;
2795              
2796 0 0 0       if ($format ne 'csv' && $format ne 'json') {
2797 0           return $self->log->error("export_as: unsupported export format ".
2798             "[$format]");
2799             }
2800              
2801 0           my $max = $self->max;
2802 0           my $datadir = $self->datadir;
2803              
2804 0           $self->log->debug("export_as: selecting scroll Command...");
2805              
2806 0           my $scroll;
2807 0 0         my $version = $self->version or return;
2808 0 0         if ($version lt "5.0.0") {
2809 0 0         $scroll = $self->open_scroll_scan_mode($index, $size) or return;
2810             }
2811             else {
2812 0 0         $scroll = $self->open_scroll($index, $size) or return;
2813             }
2814              
2815 0           $self->log->debug("export_as: selecting scroll Command...OK.");
2816              
2817 0 0         my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2818              
2819 0           my $out;
2820             my $csv_header;
2821 0 0         if ($format eq 'csv') {
    0          
2822 0 0         $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
2823 0           $out->encoding($self->encoding);
2824 0           $out->separator(',');
2825 0           $out->escape('\\');
2826 0           $out->append(1);
2827 0           $out->first_line_is_header(0);
2828 0           $out->write_header(1);
2829 0           $out->use_quoting(1);
2830 0 0         if (defined($self->csv_header)) {
2831 0           my $sorted = [ sort { $a cmp $b } @{$self->csv_header} ];
  0            
  0            
2832 0           $out->header($sorted);
2833             }
2834 0 0         if (defined($self->csv_encoded_fields)) {
2835 0           $out->encoded_fields($self->csv_encoded_fields);
2836             }
2837 0 0         if (defined($self->csv_object_fields)) {
2838 0           $out->object_fields($self->csv_object_fields);
2839             }
2840              
2841 0           $csv_header = $out->header;
2842             }
2843             elsif ($format eq 'json') {
2844 0 0         $out = Metabrik::File::Json->new_from_brik_init($self) or return;
2845 0           $out->encoding($self->encoding);
2846             }
2847              
2848 0           my $total = $self->total_scroll;
2849 0           $self->log->info("export_as: total [$total] for index [$index]");
2850              
2851 0           my %types = ();
2852 0           my $read = 0;
2853 0           my $skipped = 0;
2854 0           my $exported = 0;
2855 0           my $start = time();
2856 0           my $done = $datadir."/$index.exported";
2857 0           my $start_time = time();
2858 0           my %chunk = ();
2859 0           while (my $next = $self->next_scroll(10000)) {
2860 0           for my $this (@$next) {
2861 0           $read++;
2862              
2863 0 0         if (defined($cb)) {
2864 0           $this = $cb->($this);
2865 0 0         if (! defined($this)) {
2866 0           $self->log->error("export_as: callback failed for index ".
2867             "[$index] at read [$read], skipping single entry");
2868 0           $skipped++;
2869 0           next;
2870             }
2871             }
2872              
2873 0           my $id = $this->{_id};
2874 0           my $doc = $this->{_source};
2875             # Prepare for when types will be removed from ES
2876 0   0       my $type = $this->{_type} || '_doc';
2877 0 0         if (! exists($types{$type})) {
2878 0 0         if ($format eq 'csv') {
    0          
2879             # If not given, we guess the CSV fields to use.
2880 0 0         if (! defined($csv_header)) {
2881 0 0         my $fields = $self->list_index_fields($index, $type)
2882             or return;
2883 0           $types{$type}{header} = [ '_id', @$fields ];
2884             }
2885             else {
2886 0           $types{$type}{header} = [ '_id', @$csv_header ];
2887             }
2888              
2889 0           $types{$type}{output} = $datadir."/$index:$type.csv";
2890             }
2891             elsif ($format eq 'json') {
2892 0           $types{$type}{output} = $datadir."/$index:$type.json";
2893             }
2894              
2895             # Verify it has not been exported yet
2896 0 0         if (-f $done) {
2897 0           return $self->log->error("export_as: export already done ".
2898             "for index [$index]");
2899             }
2900              
2901             $self->log->info("export_as: exporting to file [".
2902 0           $types{$type}{output}."] for type [$type], using ".
2903             "chunk size of [$size]");
2904             }
2905              
2906 0           my $h = { _id => $id };
2907              
2908 0           for my $k (keys %$doc) {
2909 0           $h->{$k} = $doc->{$k};
2910             }
2911              
2912 0 0         if ($format eq 'csv') {
2913 0           $out->header($types{$type}{header});
2914             }
2915              
2916 0           push @{$chunk{$type}}, $h;
  0            
2917 0 0         if (@{$chunk{$type}} > 999) {
  0            
2918 0           my $r = $out->write($chunk{$type}, $types{$type}{output});
2919 0 0         if (!defined($r)) {
2920 0           $self->log->warning("export_as: unable to process entry, ".
2921             "skipping");
2922 0           $skipped++;
2923 0           next;
2924             }
2925 0           $chunk{$type} = [];
2926             }
2927              
2928             # Log a status sometimes.
2929 0 0         if (! (++$exported % 100_000)) {
2930 0           my $now = time();
2931 0           my $perc = sprintf("%.02f", $exported / $total * 100);
2932 0           $self->log->info("export_as: fetched [$exported/$total] ".
2933             "($perc%) elements in ".($now - $start)." second(s) ".
2934             "from index [$index]");
2935 0           $start = time();
2936             }
2937              
2938             # Limit export to specified maximum
2939 0 0 0       if ($max > 0 && $exported >= $max) {
2940 0           $self->log->info("export_as: max export reached [$exported] ".
2941             "for index [$index], stopping");
2942 0           last;
2943             }
2944             }
2945             }
2946              
2947             # Process remaining data waiting to be written and build output file list
2948 0           my %files = ();
2949 0           for my $type (keys %types) {
2950 0 0         if (@{$chunk{$type}} > 0) {
  0            
2951 0           $out->write($chunk{$type}, $types{$type}{output});
2952 0           $files{$types{$type}{output}}++;
2953             }
2954             }
2955              
2956 0           $self->close_scroll;
2957              
2958 0           my $stop_time = time();
2959 0           my $duration = $stop_time - $start_time;
2960 0           my $eps = $exported;
2961 0 0         if ($duration > 0) {
2962 0           $eps = $exported / $duration;
2963             }
2964              
2965             my $result = {
2966             read => $read,
2967             exported => $exported,
2968             skipped => $read - $exported,
2969             total_count => $total,
2970             complete => ($exported == $total) ? 1 : 0,
2971             duration => $duration,
2972             eps => $eps,
2973 0 0         files => [ sort { $a cmp $b } keys %files ],
  0            
2974             };
2975              
2976             # Say the file has been processed, and put resulting stats.
2977 0 0         $fd->write($result, $done) or return;
2978              
2979 0           $self->log->info("export_as: done.");
2980              
2981 0           return $result;
2982             }
2983              
2984             sub export_as_csv {
2985 0     0 0   my $self = shift;
2986 0           my ($index, $size, $cb) = @_;
2987              
2988 0   0       $size ||= 10_000;
2989 0           my $es = $self->_es;
2990 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2991 0 0         $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
2992 0 0         $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
2993              
2994 0           return $self->export_as('csv', $index, $size, $cb);
2995             }
2996              
2997             sub export_as_json {
2998 0     0 0   my $self = shift;
2999 0           my ($index, $size, $cb) = @_;
3000              
3001 0   0       $size ||= 10_000;
3002 0           my $es = $self->_es;
3003 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3004 0 0         $self->brik_help_run_undef_arg('export_as_json', $index) or return;
3005 0 0         $self->brik_help_run_undef_arg('export_as_json', $size) or return;
3006              
3007 0           return $self->export_as('json', $index, $size, $cb);
3008             }
3009              
3010             #
3011             # Optimization instructions:
3012             # https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
3013             #
3014             sub import_from {
3015 0     0 0   my $self = shift;
3016 0           my ($format, $input, $index, $type, $hash, $cb) = @_;
3017              
3018 0           my $es = $self->_es;
3019 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3020 0 0         $self->brik_help_run_undef_arg('import_from', $format) or return;
3021 0 0         $self->brik_help_run_undef_arg('import_from', $input) or return;
3022 0 0         $self->brik_help_run_file_not_found('import_from', $input) or return;
3023              
3024 0 0 0       if ($format ne 'csv' && $format ne 'json') {
3025 0           return $self->log->error("import_from: unsupported export format ".
3026             "[$format]");
3027             }
3028              
3029             # If index and/or types are not defined, we try to get them from
3030             # input filename
3031 0 0 0       if (! defined($index) || ! defined($type)) {
3032             # Example: index-DATE:type.csv
3033 0 0         if ($input =~ m{^(.+):(.+)\.(?:csv|json)(?:.*)?$}) {
3034 0           my ($this_index, $this_type) = $input =~
3035             m{^(.+):(.+)\.(?:csv|json)(?:.*)?$};
3036 0   0       $index ||= $this_index;
3037 0   0       $type ||= $this_type;
3038             }
3039             }
3040              
3041             # Verify it has not been indexed yet
3042 0           my $done = "$input.imported";
3043 0 0         if (-f $done) {
3044 0           $self->log->info("import_from: import already done for file ".
3045             "[$input]");
3046 0           return 0;
3047             }
3048              
3049             # And default to Attributes if guess failed.
3050 0   0       $index ||= $self->index;
3051 0   0       $type ||= $self->type;
3052 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
3053 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
3054              
3055 0 0         if ($index eq '*') {
3056 0           return $self->log->error("import_from: cannot import to invalid ".
3057             "index [$index]");
3058             }
3059 0 0         if ($self->use_type) {
3060 0 0         if ($type eq '*') {
3061 0           return $self->log->error("import_from: cannot import to invalid ".
3062             "type [$type]");
3063             }
3064             }
3065              
3066 0           $self->log->debug("input [$input]");
3067 0           $self->log->debug("index [$index]");
3068 0           $self->log->debug("type [$type]");
3069              
3070 0           my $count_before = 0;
3071 0 0         if ($self->is_index_exists($index)) {
3072 0           $count_before = $self->count($index, $type);
3073 0 0         if (! defined($count_before)) {
3074 0           return;
3075             }
3076 0           $self->log->info("import_from: current index [$index] count is ".
3077             "[$count_before]");
3078             }
3079              
3080 0           my $max = $self->max;
3081              
3082 0 0         $self->open_bulk_mode($index, $type) or return;
3083              
3084 0           $self->log->info("import_from: importing file [$input] to index ".
3085             "[$index] with type [$type]");
3086              
3087 0 0         my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
3088              
3089 0           my $out;
3090 0 0         if ($format eq 'csv') {
    0          
3091 0 0         $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
3092 0           $out->encoding($self->encoding);
3093 0           $out->separator(',');
3094 0           $out->escape('\\');
3095 0           $out->first_line_is_header(1);
3096 0           $out->encoded_fields($self->csv_encoded_fields);
3097 0           $out->object_fields($self->csv_object_fields);
3098             }
3099             elsif ($format eq 'json') {
3100 0 0         $out = Metabrik::File::Json->new_from_brik_init($self) or return;
3101 0           $out->encoding($self->encoding);
3102             }
3103              
3104 0           my $refresh_interval;
3105             my $number_of_replicas;
3106 0           my $start = time();
3107 0           my $speed_settings = {};
3108 0           my $imported = 0;
3109 0           my $first = 1;
3110 0           my $read = 0;
3111 0           my $skipped_chunks = 0;
3112 0           my $start_time = time();
3113 0           while (my $this = $out->read_next($input)) {
3114 0           $read++;
3115              
3116 0           my $h = {};
3117 0 0         my $id = $self->use_ignore_id ? undef : $this->{_id};
3118 0           delete $this->{_id};
3119 0           for my $k (keys %$this) {
3120 0           my $value = $this->{$k};
3121             # We keep only fields when they have a value.
3122             # No need to index data that is empty.
3123 0 0 0       if (defined($value) && length($value)) {
3124 0           $h->{$k} = $value;
3125             }
3126             }
3127              
3128 0 0         if (defined($cb)) {
3129 0           $h = $cb->($h);
3130 0 0         if (! defined($h)) {
3131 0           $self->log->error("import_from: callback failed for ".
3132             "index [$index] at read [$read], skipping single entry");
3133 0           $skipped_chunks++;
3134 0           next;
3135             }
3136             }
3137              
3138             # Set routing based on the provided field name, if any.
3139 0           my $this_hash;
3140 0 0 0       if (defined($hash) && defined($hash->{routing})
      0        
3141             && defined($h->{$hash->{routing}})) {
3142 0           $this_hash = { %$hash }; # Make a copy to avoid overwriting
3143             # user provided value.
3144 0           $this_hash->{routing} = $h->{$hash->{routing}};
3145             }
3146              
3147             #$self->log->info(Data::Dumper::Dumper($h));
3148              
3149 0           my $r;
3150 0           eval {
3151 0           $r = $self->index_bulk($h, $index, $type, $this_hash, $id);
3152             };
3153 0 0         if ($@) {
3154 0           chomp($@);
3155 0           $self->log->warning("import_from: error [$@]");
3156             }
3157 0 0         if (! defined($r)) {
3158 0           $self->log->error("import_from: bulk processing failed for ".
3159             "index [$index] at read [$read], skipping chunk");
3160 0           $skipped_chunks++;
3161 0           next;
3162             }
3163              
3164             # Gather index settings, and set values for speed.
3165             # We don't do it earlier, cause we need index to be created,
3166             # and it should have been done from index_bulk Command.
3167 0 0 0       if ($first && $self->is_index_exists($index)) {
3168             # Save current values so we can restore them at the end of Command.
3169             # We ignore errors here, this is non-blocking for indexing.
3170 0           $refresh_interval = $self->get_index_refresh_interval($index);
3171 0           $refresh_interval = $refresh_interval->{$index};
3172 0           $number_of_replicas = $self->get_index_number_of_replicas($index);
3173 0           $number_of_replicas = $number_of_replicas->{$index};
3174 0 0         if ($self->use_indexing_optimizations) {
3175 0           $self->set_index_number_of_replicas($index, 0);
3176             }
3177 0           $self->set_index_refresh_interval($index, -1);
3178 0           $first = 0;
3179             }
3180              
3181             # Log a status sometimes.
3182 0 0         if (! (++$imported % 100_000)) {
3183 0           my $now = time();
3184 0           $self->log->info("import_from: imported [$imported] entries in ".
3185             ($now - $start)." second(s) to index [$index]");
3186 0           $start = time();
3187             }
3188              
3189             # Limit import to specified maximum
3190 0 0 0       if ($max > 0 && $imported >= $max) {
3191 0           $self->log->info("import_from: max import reached [$imported] for ".
3192             "index [$index], stopping");
3193 0           last;
3194             }
3195             }
3196              
3197 0           $self->bulk_flush;
3198              
3199 0           my $stop_time = time();
3200 0           my $duration = $stop_time - $start_time;
3201 0   0       my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
3202              
3203 0           $self->refresh_index($index);
3204              
3205 0 0         my $count_current = $self->count($index, $type) or return;
3206 0           $self->log->info("import_from: after index [$index] count is [$count_current] ".
3207             "at EPS [$eps]");
3208              
3209 0           my $skipped = 0;
3210 0 0         my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
3211 0 0         if ($complete) { # If complete, import has been retried, and everything is now ok.
3212 0           $imported = $read;
3213             }
3214             else {
3215 0           $skipped = $read - ($count_current - $count_before);
3216             }
3217              
3218 0           my $result = {
3219             read => $read,
3220             imported => $imported,
3221             skipped => $skipped,
3222             previous_count => $count_before,
3223             current_count => $count_current,
3224             complete => $complete,
3225             duration => $duration,
3226             eps => $eps,
3227             };
3228              
3229             # Say the file has been processed, and put resulting stats.
3230 0 0         $fd->write($result, $done) or return;
3231              
3232             # Restore previous settings, if any
3233 0 0         if (defined($refresh_interval)) {
3234 0           $self->set_index_refresh_interval($index, $refresh_interval);
3235             }
3236 0 0 0       if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
3237 0           $self->set_index_number_of_replicas($index, $number_of_replicas);
3238             }
3239              
3240 0           return $result;
3241             }
3242              
3243             sub import_from_csv {
3244 0     0 0   my $self = shift;
3245 0           my ($input, $index, $type, $hash, $cb) = @_;
3246              
3247 0           my $es = $self->_es;
3248 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3249 0 0         $self->brik_help_run_undef_arg('import_from_csv', $input) or return;
3250 0 0         $self->brik_help_run_file_not_found('import_from_csv', $input)
3251             or return;
3252              
3253 0           return $self->import_from('csv', $input, $index, $type, $hash, $cb);
3254             }
3255              
3256             sub import_from_json {
3257 0     0 0   my $self = shift;
3258 0           my ($input, $index, $type, $hash, $cb) = @_;
3259              
3260 0           my $es = $self->_es;
3261 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3262 0 0         $self->brik_help_run_undef_arg('import_from_json', $input) or return;
3263 0 0         $self->brik_help_run_file_not_found('import_from_json', $input)
3264             or return;
3265              
3266 0           return $self->import_from('json', $input, $index, $type, $hash, $cb);
3267             }
3268              
3269             #
3270             # Same as import_from_csv Command but in worker mode for speed.
3271             #
3272             sub import_from_csv_worker {
3273 0     0 0   my $self = shift;
3274 0           my ($input_csv, $index, $type, $hash, $cb) = @_;
3275              
3276 0           my $es = $self->_es;
3277 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3278 0 0         $self->brik_help_run_undef_arg('import_from_csv_worker', $input_csv)
3279             or return;
3280 0 0         $self->brik_help_run_file_not_found('import_from_csv_worker', $input_csv)
3281             or return;
3282              
3283             # If index and/or types are not defined, we try to get them from input filename
3284 0 0 0       if (! defined($index) || ! defined($type)) {
3285             # Example: index-DATE:type.csv
3286 0 0         if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
3287 0           my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
3288 0   0       $index ||= $this_index;
3289 0   0       $type ||= $this_type;
3290             }
3291             }
3292              
3293             # Verify it has not been indexed yet
3294 0           my $done = "$input_csv.imported";
3295 0 0         if (-f $done) {
3296 0           $self->log->info("import_from_csv_worker: import already done for ".
3297             "file [$input_csv]");
3298 0           return 0;
3299             }
3300              
3301             # And default to Attributes if guess failed.
3302 0   0       $index ||= $self->index;
3303 0   0       $type ||= $self->type;
3304 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
3305 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
3306              
3307 0 0         if ($index eq '*') {
3308 0           return $self->log->error("import_from_csv_worker: cannot import to invalid ".
3309             "index [$index]");
3310             }
3311 0 0         if ($self->use_type) {
3312 0 0         if ($type eq '*') {
3313 0           return $self->log->error("import_from_csv_worker: cannot import to ".
3314             "invalid type [$type]");
3315             }
3316             }
3317              
3318 0           $self->log->debug("input [$input_csv]");
3319 0           $self->log->debug("index [$index]");
3320 0           $self->log->debug("type [$type]");
3321              
3322 0           my $count_before = 0;
3323 0 0         if ($self->is_index_exists($index)) {
3324 0           $count_before = $self->count($index, $type);
3325 0 0         if (! defined($count_before)) {
3326 0           return;
3327             }
3328 0           $self->log->info("import_from_csv_worker: current index [$index] count is ".
3329             "[$count_before]");
3330             }
3331              
3332 0           my $max = $self->max;
3333              
3334 0 0         $self->open_bulk_mode($index, $type) or return;
3335              
3336             #my $batch = undef;
3337 0           my $batch = 10_000;
3338              
3339 0           $self->log->info("import_from_csv_worker: importing file [$input_csv] to ".
3340             "index [$index] with type [$type] and batch [$batch]");
3341              
3342 0 0         my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
3343              
3344 0 0         my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
3345 0           $fc->separator(',');
3346 0           $fc->escape('\\');
3347 0           $fc->first_line_is_header(1);
3348 0           $fc->encoded_fields($self->csv_encoded_fields);
3349 0           $fc->object_fields($self->csv_object_fields);
3350              
3351 0 0         my $wp = Metabrik::Worker::Parallel->new_from_brik_init($self) or return;
3352 0           $wp->pool_size(2);
3353              
3354 0 0         $wp->create_manager or return;
3355              
3356 0           my $refresh_interval;
3357             my $number_of_replicas;
3358 0           my $start = time();
3359 0           my $speed_settings = {};
3360 0           my $imported = 0;
3361 0           my $first = 1;
3362 0           my $read = 0;
3363 0           my $skipped_chunks = 0;
3364 0           my $start_time = time();
3365 0           while (my $list = $fc->read_next($input_csv, $batch)) {
3366              
3367             $wp->start(sub {
3368 0     0     my @list = ();
3369 0           for my $this (@$list) {
3370 0           $read++;
3371              
3372 0           my $h = {};
3373 0           my $id = $this->{_id};
3374 0           delete $this->{_id};
3375 0           for my $k (keys %$this) {
3376 0           my $value = $this->{$k};
3377             # We keep only fields when they have a value.
3378             # No need to index data that is empty.
3379 0 0 0       if (defined($value) && length($value)) {
3380 0           $h->{$k} = $value;
3381             }
3382             }
3383              
3384 0 0         if (defined($cb)) {
3385 0           $h = $cb->($h);
3386 0 0         if (! defined($h)) {
3387 0           $self->log->error("import_from_csv_worker: callback failed for ".
3388             "index [$index] at read [$read], skipping single entry");
3389 0           $skipped_chunks++;
3390 0           next;
3391             }
3392             }
3393              
3394 0           push @list, $h;
3395             }
3396              
3397 0           my $r;
3398 0           eval {
3399 0           $r = $self->index_bulk_from_list(\@list, $index, $type, $hash);
3400             };
3401 0 0         if ($@) {
3402 0           chomp($@);
3403 0           $self->log->warning("import_from_csv_worker: error [$@]");
3404             }
3405 0 0         if (! defined($r)) {
3406 0           $self->log->error("import_from_csv_worker: bulk processing failed for ".
3407             "index [$index] at read [$read], skipping chunk");
3408 0           $skipped_chunks++;
3409 0           next;
3410             }
3411              
3412             # Log a status sometimes.
3413 0 0         if (! ($imported % 10_000)) {
3414 0           my $now = time();
3415 0           my $diff = sprintf("%.02f", $now - $start);
3416 0           my $eps = sprintf("%.02f", $imported / $diff);
3417 0           $self->log->info("import_from_csv_worker: imported [$imported] entries ".
3418             "in [$diff] second(s) to index [$index] at EPS [$eps]");
3419 0           $start = time();
3420             }
3421              
3422 0           exit(0);
3423 0           });
3424              
3425             # Gather index settings, and set values for speed.
3426             # We don't do it earlier, cause we need index to be created,
3427             # and it should have been done from index_bulk Command.
3428 0 0 0       if ($first && $self->is_index_exists($index)) {
3429             # Save current values so we can restore them at the end of Command.
3430             # We ignore errors here, this is non-blocking for indexing.
3431 0           $refresh_interval = $self->get_index_refresh_interval($index);
3432 0           $refresh_interval = $refresh_interval->{$index};
3433 0           $number_of_replicas = $self->get_index_number_of_replicas($index);
3434 0           $number_of_replicas = $number_of_replicas->{$index};
3435 0 0         if ($self->use_indexing_optimizations) {
3436 0           $self->set_index_number_of_replicas($index, 0);
3437             }
3438 0           $self->set_index_refresh_interval($index, -1);
3439 0           $first = 0;
3440             }
3441              
3442             # Log a status sometimes.
3443             #$imported += @$list;
3444             #if (! ($imported % 10_000)) {
3445             #my $now = time();
3446             #my $diff = sprintf("%.02f", $now - $start);
3447             #my $eps = sprintf("%.02f", 10_000 / $diff);
3448             #$self->log->info("import_from_csv_worker: imported [$imported] entries ".
3449             #"in [$diff] second(s) to index [$index] at EPS [$eps]");
3450             #$start = time();
3451             #}
3452              
3453             # Limit import to specified maximum
3454 0 0 0       if ($max > 0 && $imported >= $max) {
3455 0           $self->log->info("import_from_csv_worker: max import reached [$imported] for ".
3456             "index [$index], stopping");
3457 0           last;
3458             }
3459              
3460 0 0         last if (@$list < $batch);
3461              
3462 0           $imported += @$list;
3463             }
3464              
3465 0           $wp->stop;
3466              
3467 0           $self->bulk_flush;
3468              
3469 0           my $stop_time = time();
3470 0           my $duration = $stop_time - $start_time;
3471 0   0       my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
3472              
3473 0           $self->refresh_index($index);
3474              
3475 0 0         my $count_current = $self->count($index, $type) or return;
3476 0           $self->log->info("import_from_csv_worker: after index [$index] count ".
3477             "is [$count_current] at EPS [$eps]");
3478              
3479 0           my $skipped = 0;
3480 0 0         my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
3481 0 0         if ($complete) { # If complete, import has been retried, and everything is now ok.
3482 0           $imported = $read;
3483             }
3484             else {
3485 0           $skipped = $read - ($count_current - $count_before);
3486             }
3487              
3488 0           my $result = {
3489             read => $read,
3490             imported => $imported,
3491             skipped => $skipped,
3492             previous_count => $count_before,
3493             current_count => $count_current,
3494             complete => $complete,
3495             duration => $duration,
3496             eps => $eps,
3497             };
3498              
3499             # Say the file has been processed, and put resulting stats.
3500 0 0         $fd->write($result, $done) or return;
3501              
3502             # Restore previous settings, if any
3503 0 0         if (defined($refresh_interval)) {
3504 0           $self->set_index_refresh_interval($index, $refresh_interval);
3505             }
3506 0 0 0       if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
3507 0           $self->set_index_number_of_replicas($index, $number_of_replicas);
3508             }
3509              
3510 0           return $result;
3511             }
3512              
3513             #
3514             # http://localhost:9200/_nodes/stats/process?pretty
3515             #
3516             # Search::Elasticsearch::Client::2_0::Direct::Nodes
3517             #
3518             sub get_stats_process {
3519 0     0 0   my $self = shift;
3520              
3521 0           my $es = $self->_es;
3522 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3523              
3524 0           my $r;
3525 0           eval {
3526 0           $r = $es->nodes->stats(
3527             metric => [ qw(process) ],
3528             );
3529             };
3530 0 0         if ($@) {
3531 0           chomp($@);
3532 0           return $self->log->error("get_stats_process: failed: [$@]");
3533             }
3534              
3535 0           return $r;
3536             }
3537              
3538             #
3539             # curl http://localhost:9200/_nodes/process?pretty
3540             #
3541             # Search::Elasticsearch::Client::2_0::Direct::Nodes
3542             #
3543             sub get_process {
3544 0     0 0   my $self = shift;
3545              
3546 0           my $es = $self->_es;
3547 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3548              
3549 0           my $r;
3550 0           eval {
3551 0           $r = $es->nodes->info(
3552             metric => [ qw(process) ],
3553             );
3554             };
3555 0 0         if ($@) {
3556 0           chomp($@);
3557 0           return $self->log->error("get_process: failed: [$@]");
3558             }
3559              
3560 0           return $r;
3561             }
3562              
3563             #
3564             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3565             #
3566             sub get_cluster_state {
3567 0     0 0   my $self = shift;
3568              
3569 0           my $es = $self->_es;
3570 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3571              
3572 0           my $r;
3573 0           eval {
3574 0           $r = $es->cluster->state;
3575             };
3576 0 0         if ($@) {
3577 0           chomp($@);
3578 0           return $self->log->error("get_cluster_state: failed: [$@]");
3579             }
3580              
3581 0           return $r;
3582             }
3583              
3584             #
3585             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3586             #
3587             sub get_cluster_health {
3588 0     0 0   my $self = shift;
3589              
3590 0           my $es = $self->_es;
3591 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3592              
3593 0           my $r;
3594 0           eval {
3595 0           $r = $es->cluster->health;
3596             };
3597 0 0         if ($@) {
3598 0           chomp($@);
3599 0           return $self->log->error("get_cluster_health: failed: [$@]");
3600             }
3601              
3602 0           return $r;
3603             }
3604              
3605             #
3606             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3607             #
3608             sub get_cluster_settings {
3609 0     0 0   my $self = shift;
3610              
3611 0           my $es = $self->_es;
3612 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3613              
3614 0           my $r;
3615 0           eval {
3616 0           $r = $es->cluster->get_settings;
3617             };
3618 0 0         if ($@) {
3619 0           chomp($@);
3620 0           return $self->log->error("get_cluster_settings: failed: [$@]");
3621             }
3622              
3623 0           return $r;
3624             }
3625              
3626             #
3627             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3628             #
3629             sub put_cluster_settings {
3630 0     0 0   my $self = shift;
3631 0           my ($settings) = @_;
3632              
3633 0           my $es = $self->_es;
3634 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3635 0 0         $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
3636 0 0         $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
3637              
3638 0           my %args = (
3639             body => $settings,
3640             );
3641              
3642 0           my $r;
3643 0           eval {
3644 0           $r = $es->cluster->put_settings(%args);
3645             };
3646 0 0         if ($@) {
3647 0           chomp($@);
3648 0           return $self->log->error("put_cluster_settings: failed: [$@]");
3649             }
3650              
3651 0           return $r;
3652             }
3653              
3654             sub count_green_indices {
3655 0     0 0   my $self = shift;
3656              
3657 0 0         my $get = $self->show_indices or return;
3658              
3659 0           my $count = 0;
3660 0           for (@$get) {
3661 0 0         if (/^\s*green\s+/) {
3662 0           $count++;
3663             }
3664             }
3665              
3666 0           return $count;
3667             }
3668              
3669             sub count_yellow_indices {
3670 0     0 0   my $self = shift;
3671              
3672 0 0         my $get = $self->show_indices or return;
3673              
3674 0           my $count = 0;
3675 0           for (@$get) {
3676 0 0         if (/^\s*yellow\s+/) {
3677 0           $count++;
3678             }
3679             }
3680              
3681 0           return $count;
3682             }
3683              
3684             sub count_red_indices {
3685 0     0 0   my $self = shift;
3686              
3687 0 0         my $get = $self->show_indices or return;
3688              
3689 0           my $count = 0;
3690 0           for (@$get) {
3691 0 0         if (/^\s*red\s+/) {
3692 0           $count++;
3693             }
3694             }
3695              
3696 0           return $count;
3697             }
3698              
3699             sub count_indices {
3700 0     0 0   my $self = shift;
3701              
3702 0 0         my $get = $self->show_indices or return;
3703              
3704 0           return scalar @$get;
3705             }
3706              
3707             sub list_indices_status {
3708 0     0 0   my $self = shift;
3709              
3710 0 0         my $get = $self->show_indices or return;
3711              
3712 0           my $count_red = 0;
3713 0           my $count_yellow = 0;
3714 0           my $count_green = 0;
3715 0           for (@$get) {
3716 0 0         if (/^\s*red\s+/) {
    0          
    0          
3717 0           $count_red++;
3718             }
3719             elsif (/^\s*yellow\s+/) {
3720 0           $count_yellow++;
3721             }
3722             elsif (/^\s*green\s+/) {
3723 0           $count_green++;
3724             }
3725             }
3726              
3727             return {
3728 0           red => $count_red,
3729             yellow => $count_yellow,
3730             green => $count_green,
3731             };
3732             }
3733              
3734             sub count_shards {
3735 0     0 0   my $self = shift;
3736              
3737 0 0         my $indices = $self->get_indices or return;
3738              
3739 0           my $count = 0;
3740 0           for (@$indices) {
3741 0           $count += $_->{shards};
3742             }
3743              
3744 0           return $count;
3745             }
3746              
3747             sub count_size {
3748 0     0 0   my $self = shift;
3749 0           my ($string) = @_;
3750              
3751 0 0         my $indices = $self->get_indices($string) or return;
3752              
3753 0 0         my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3754 0           $fn->decimal_point(".");
3755 0           $fn->kibi_suffix("kb");
3756 0           $fn->mebi_suffix("mb");
3757 0           $fn->gibi_suffix("gb");
3758 0           $fn->kilo_suffix("KB");
3759 0           $fn->mega_suffix("MB");
3760 0           $fn->giga_suffix("GB");
3761              
3762 0           my $size = 0;
3763 0           for (@$indices) {
3764 0           $size += $fn->to_number($_->{size});
3765             }
3766              
3767 0           return $fn->from_number($size);
3768             }
3769              
3770             sub count_total_size {
3771 0     0 0   my $self = shift;
3772 0           my ($string) = @_;
3773              
3774 0 0         my $indices = $self->get_indices($string) or return;
3775              
3776 0 0         my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3777 0           $fn->decimal_point(".");
3778 0           $fn->kibi_suffix("kb");
3779 0           $fn->mebi_suffix("mb");
3780 0           $fn->gibi_suffix("gb");
3781 0           $fn->kilo_suffix("KB");
3782 0           $fn->mega_suffix("MB");
3783 0           $fn->giga_suffix("GB");
3784              
3785 0           my $size = 0;
3786 0           for (@$indices) {
3787 0           $size += $fn->to_number($_->{total_size});
3788             }
3789              
3790 0           return $fn->from_number($size);
3791             }
3792              
3793             sub count_count {
3794 0     0 0   my $self = shift;
3795              
3796 0 0         my $indices = $self->get_indices or return;
3797              
3798 0 0         my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3799 0           $fn->kilo_suffix('k');
3800 0           $fn->mega_suffix('m');
3801 0           $fn->giga_suffix('M');
3802              
3803 0           my $count = 0;
3804 0           for (@$indices) {
3805 0           $count += $_->{count};
3806             }
3807              
3808 0           return $fn->from_number($count);
3809             }
3810              
3811             sub list_green_indices {
3812 0     0 0   my $self = shift;
3813              
3814 0 0         my $get = $self->get_indices or return;
3815              
3816 0           my @indices = ();
3817 0           for (@$get) {
3818 0 0         if ($_->{color} eq 'green') {
3819 0           push @indices, $_->{index};
3820             }
3821             }
3822              
3823 0           return \@indices;
3824             }
3825              
3826             sub list_yellow_indices {
3827 0     0 0   my $self = shift;
3828              
3829 0 0         my $get = $self->get_indices or return;
3830              
3831 0           my @indices = ();
3832 0           for (@$get) {
3833 0 0         if ($_->{color} eq 'yellow') {
3834 0           push @indices, $_->{index};
3835             }
3836             }
3837              
3838 0           return \@indices;
3839             }
3840              
3841             sub list_red_indices {
3842 0     0 0   my $self = shift;
3843              
3844 0 0         my $get = $self->get_indices or return;
3845              
3846 0           my @indices = ();
3847 0           for (@$get) {
3848 0 0         if ($_->{color} eq 'red') {
3849 0           push @indices, $_->{index};
3850             }
3851             }
3852              
3853 0           return \@indices;
3854             }
3855              
3856             #
3857             # https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
3858             #
3859             sub list_datatypes {
3860 0     0 0   my $self = shift;
3861              
3862             return {
3863 0           core => [ qw(string long integer short byte double float data boolean binary) ],
3864             };
3865             }
3866              
3867             #
3868             # Return total hits for last www_search
3869             #
3870             sub get_hits_total {
3871 0     0 0   my $self = shift;
3872 0           my ($run) = @_;
3873              
3874 0 0         $self->brik_help_run_undef_arg('get_hits_total', $run) or return;
3875              
3876 0 0         if (ref($run) eq 'HASH') {
3877 0 0 0       if (exists($run->{hits}) && exists($run->{hits}{total})) {
3878             # In ES 7.x, total is now a hash. We rewrite it to only keep the
3879             # number:
3880 0 0         if (ref($run->{hits}{total}) eq 'HASH') {
3881 0           return $run->{hits}{total}{value};
3882             }
3883 0           return $run->{hits}{total};
3884             }
3885             }
3886              
3887 0           return $self->log->error("get_hits_total: last Command not compatible");
3888             }
3889              
3890             sub disable_shard_allocation {
3891 0     0 0   my $self = shift;
3892              
3893 0           my $settings = {
3894             persistent => {
3895             'cluster.routing.allocation.enable' => 'none',
3896             }
3897             };
3898              
3899 0           return $self->put_cluster_settings($settings);
3900             }
3901              
3902             sub enable_shard_allocation {
3903 0     0 0   my $self = shift;
3904              
3905 0           my $settings = {
3906             persistent => {
3907             'cluster.routing.allocation.enable' => 'all',
3908             }
3909             };
3910              
3911 0           return $self->put_cluster_settings($settings);
3912             }
3913              
3914             sub flush_synced {
3915 0     0 0   my $self = shift;
3916              
3917 0           my $es = $self->_es;
3918 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3919              
3920 0           my $r;
3921 0           eval {
3922 0           $r = $es->indices->flush_synced;
3923             };
3924 0 0         if ($@) {
3925 0           chomp($@);
3926 0           return $self->log->error("flush_synced: failed: [$@]");
3927             }
3928              
3929 0           return $r;
3930             }
3931              
3932             #
3933             # https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
3934             #
3935             # run client::elasticsearch create_snapshot_repository myrepo
3936             # "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
3937             #
3938             # You have to set path.repo in elasticsearch.yml like:
3939             # path.repo: ["/home/gomor/es-backups"]
3940             #
3941             # Search::Elasticsearch::Client::2_0::Direct::Snapshot
3942             #
3943             sub create_snapshot_repository {
3944 0     0 0   my $self = shift;
3945 0           my ($body, $repository_name) = @_;
3946              
3947 0           my $es = $self->_es;
3948 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3949 0 0         $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
3950              
3951 0   0       $repository_name ||= 'repository';
3952              
3953 0           my %args = (
3954             repository => $repository_name,
3955             body => $body,
3956             );
3957              
3958 0           my $r;
3959 0           eval {
3960 0           $r = $es->snapshot->create_repository(%args);
3961             };
3962 0 0         if ($@) {
3963 0           chomp($@);
3964 0           return $self->log->error("create_snapshot_repository: failed: [$@]");
3965             }
3966              
3967 0           return $r;
3968             }
3969              
3970             sub create_shared_fs_snapshot_repository {
3971 0     0 0   my $self = shift;
3972 0           my ($location, $repository_name) = @_;
3973              
3974 0   0       $repository_name ||= 'repository';
3975 0 0         $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
3976              
3977 0 0         if ($location !~ m{^/}) {
3978 0           return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
3979             "a full directory path, this one is invalid [$location]");
3980             }
3981              
3982 0           my $body = {
3983             #type => 'fs',
3984             settings => {
3985             compress => 'true',
3986             location => $location,
3987             },
3988             };
3989              
3990 0           return $self->create_snapshot_repository($body, $repository_name);
3991             }
3992              
3993             #
3994             # Search::Elasticsearch::Client::2_0::Direct::Snapshot
3995             #
3996             sub get_snapshot_repositories {
3997 0     0 0   my $self = shift;
3998              
3999 0           my $es = $self->_es;
4000 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4001              
4002 0           my $r;
4003 0           eval {
4004 0           $r = $es->snapshot->get_repository;
4005             };
4006 0 0         if ($@) {
4007 0           chomp($@);
4008 0           return $self->log->error("get_snapshot_repositories: failed: [$@]");
4009             }
4010              
4011 0           return $r;
4012             }
4013              
4014             #
4015             # Search::Elasticsearch::Client::2_0::Direct::Snapshot
4016             #
4017             sub get_snapshot_status {
4018 0     0 0   my $self = shift;
4019              
4020 0           my $es = $self->_es;
4021 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4022              
4023 0           my $r;
4024 0           eval {
4025 0           $r = $es->snapshot->status;
4026             };
4027 0 0         if ($@) {
4028 0           chomp($@);
4029 0           return $self->log->error("get_snapshot_status: failed: [$@]");
4030             }
4031              
4032 0           return $r;
4033             }
4034              
4035             #
4036             # Search::Elasticsearch::Client::5_0::Direct::Snapshot
4037             #
4038             sub create_snapshot {
4039 0     0 0   my $self = shift;
4040 0           my ($snapshot_name, $repository_name, $body) = @_;
4041              
4042 0           my $es = $self->_es;
4043 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4044              
4045 0   0       $snapshot_name ||= 'snapshot';
4046 0   0       $repository_name ||= 'repository';
4047              
4048 0           my %args = (
4049             repository => $repository_name,
4050             snapshot => $snapshot_name,
4051             );
4052 0 0         if (defined($body)) {
4053 0           $args{body} = $body;
4054             }
4055              
4056 0           my $r;
4057 0           eval {
4058 0           $r = $es->snapshot->create(%args);
4059             };
4060 0 0         if ($@) {
4061 0           chomp($@);
4062 0           return $self->log->error("create_snapshot: failed: [$@]");
4063             }
4064              
4065 0           return $r;
4066             }
4067              
4068             sub create_snapshot_for_indices {
4069 0     0 0   my $self = shift;
4070 0           my ($indices, $snapshot_name, $repository_name) = @_;
4071              
4072 0 0         $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
4073              
4074 0   0       $snapshot_name ||= 'snapshot';
4075 0   0       $repository_name ||= 'repository';
4076              
4077 0           my $body = {
4078             indices => $indices,
4079             };
4080              
4081 0           return $self->create_snapshot($snapshot_name, $repository_name, $body);
4082             }
4083              
4084             sub is_snapshot_finished {
4085 0     0 0   my $self = shift;
4086              
4087 0 0         my $status = $self->get_snapshot_status or return;
4088              
4089 0 0         if (@{$status->{snapshots}} == 0) {
  0            
4090 0           return 1;
4091             }
4092              
4093 0           return 0;
4094             }
4095              
4096             sub get_snapshot_state {
4097 0     0 0   my $self = shift;
4098              
4099 0 0         if ($self->is_snapshot_finished) {
4100 0           return $self->log->info("get_snapshot_state: is already finished");
4101             }
4102              
4103 0 0         my $status = $self->get_snapshot_status or return;
4104              
4105 0           my @indices_done = ();
4106 0           my @indices_not_done = ();
4107              
4108 0           my $list = $status->{snapshots};
4109 0           for my $snapshot (@$list) {
4110 0           my $indices = $snapshot->{indices};
4111 0           for my $index (@$indices) {
4112 0           my $done = $index->{shards_stats}{done};
4113 0 0         if ($done) {
4114 0           push @indices_done, $index;
4115             }
4116             else {
4117 0           push @indices_not_done, $index;
4118             }
4119             }
4120             }
4121              
4122 0           return { done => \@indices_done, not_done => \@indices_not_done };
4123             }
4124              
4125       0 0   sub verify_snapshot_repository {
4126             }
4127              
4128             sub delete_snapshot_repository {
4129 0     0 0   my $self = shift;
4130 0           my ($repository_name) = @_;
4131              
4132 0           my $es = $self->_es;
4133 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4134 0 0         $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
4135              
4136 0           my $r;
4137 0           eval {
4138 0           $r = $es->snapshot->delete_repository(
4139             repository => $repository_name,
4140             );
4141             };
4142 0 0         if ($@) {
4143 0           chomp($@);
4144 0           return $self->log->error("delete_snapshot_repository: failed: [$@]");
4145             }
4146              
4147 0           return $r;
4148             }
4149              
4150             sub get_snapshot {
4151 0     0 0   my $self = shift;
4152 0           my ($snapshot_name, $repository_name) = @_;
4153              
4154 0           my $es = $self->_es;
4155 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4156              
4157 0   0       $snapshot_name ||= 'snapshot';
4158 0   0       $repository_name ||= 'repository';
4159              
4160 0           my $r;
4161 0           eval {
4162 0           $r = $es->snapshot->get(
4163             repository => $repository_name,
4164             snapshot => $snapshot_name,
4165             );
4166             };
4167 0 0         if ($@) {
4168 0           chomp($@);
4169 0           return $self->log->error("get_snapshot: failed: [$@]");
4170             }
4171              
4172 0           return $r;
4173             }
4174              
4175             #
4176             # Search::Elasticsearch::Client::5_0::Direct::Snapshot
4177             #
4178             sub delete_snapshot {
4179 0     0 0   my $self = shift;
4180 0           my ($snapshot_name, $repository_name) = @_;
4181              
4182 0           my $es = $self->_es;
4183 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4184 0 0         $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
4185 0 0         $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
4186              
4187 0           my $timeout = $self->rtimeout;
4188              
4189 0           my $r;
4190 0           eval {
4191 0           $r = $es->snapshot->delete(
4192             repository => $repository_name,
4193             snapshot => $snapshot_name,
4194             master_timeout => "${timeout}s",
4195             );
4196             };
4197 0 0         if ($@) {
4198 0           chomp($@);
4199 0           return $self->log->error("delete_snapshot: failed: [$@]");
4200             }
4201              
4202 0           return $r;
4203             }
4204              
4205             #
4206             # https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
4207             #
4208             sub restore_snapshot {
4209 0     0 0   my $self = shift;
4210 0           my ($snapshot_name, $repository_name, $body) = @_;
4211              
4212 0           my $es = $self->_es;
4213 0   0       $snapshot_name ||= 'snapshot';
4214 0   0       $repository_name ||= 'repository';
4215 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4216 0 0         $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
4217 0 0         $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
4218              
4219 0           my %args = (
4220             repository => $repository_name,
4221             snapshot => $snapshot_name,
4222             );
4223 0 0         if (defined($body)) {
4224 0           $args{body} = $body;
4225             }
4226              
4227 0           my $r;
4228 0           eval {
4229 0           $r = $es->snapshot->restore(%args);
4230             };
4231 0 0         if ($@) {
4232 0           chomp($@);
4233 0           return $self->log->error("restore_snapshot: failed: [$@]");
4234             }
4235              
4236 0           return $r;
4237             }
4238              
4239             sub restore_snapshot_for_indices {
4240 0     0 0   my $self = shift;
4241 0           my ($indices, $snapshot_name, $repository_name) = @_;
4242              
4243 0   0       $snapshot_name ||= 'snapshot';
4244 0   0       $repository_name ||= 'repository';
4245 0 0         $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
4246 0 0         $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
4247 0 0         $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
4248              
4249 0           my $body = {
4250             indices => $indices,
4251             };
4252              
4253 0           return $self->restore_snapshot($snapshot_name, $repository_name, $body);
4254             }
4255              
4256             # shard occupation
4257             #
4258             # curl -XGET "http://127.0.0.1:9200/_cat/shards?v
4259             # Or https://www.elastic.co/guide/en/elasticsearch/reference/1.6/cluster-nodes-stats.html
4260             #
4261             # disk occuption:
4262             # curl -XGET http://127.0.0.1:9200/_cat/nodes?h=ip,h,diskAvail,diskTotal
4263             #
4264             #
4265             # Who is master: curl -XGET http://127.0.0.1:9200/_cat/master?v
4266             #
4267              
4268             # Check memory lock
4269              
4270             # curl -XGET 'localhost:9200/_nodes?filter_path=**.mlockall&pretty'
4271             # {
4272             # "nodes" : {
4273             # "3XXX" : {
4274             # "process" : {
4275             # "mlockall" : true
4276             # }
4277             # }
4278             # }
4279             # }
4280              
4281             1;
4282              
4283             __END__