File Coverage

blib/lib/Metabrik/Client/Elasticsearch.pm
Criterion Covered Total %
statement 9 2023 0.4
branch 0 1210 0.0
condition 0 270 0.0
subroutine 3 130 2.3
pod 2 125 1.6
total 14 3758 0.3


line stmt bran cond sub pod time code
1             #
2             # $Id$
3             #
4             # client::elasticsearch Brik
5             #
6             package Metabrik::Client::Elasticsearch;
7 2     2   1565 use strict;
  2         5  
  2         60  
8 2     2   12 use warnings;
  2         8  
  2         65  
9              
10 2     2   10 use base qw(Metabrik::Client::Rest);
  2         4  
  2         998  
11              
12             sub brik_properties {
13             return {
14 0     0 1   revision => '$Revision$',
15             tags => [ qw(unstable es es) ],
16             author => 'GomoR ',
17             license => 'http://opensource.org/licenses/BSD-3-Clause',
18             attributes => {
19             datadir => [ qw(datadir) ],
20             nodes => [ qw(node_list) ],
21             cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
22             date => [ qw(date) ],
23             index => [ qw(index) ],
24             type => [ qw(type) ],
25             from => [ qw(number) ],
26             size => [ qw(count) ],
27             max => [ qw(count) ],
28             max_flush_count => [ qw(count) ],
29             max_flush_size => [ qw(count) ],
30             rtimeout => [ qw(seconds) ],
31             sniff_rtimeout => [ qw(seconds) ],
32             try => [ qw(count) ],
33             use_bulk_autoflush => [ qw(0|1) ],
34             use_indexing_optimizations => [ qw(0|1) ],
35             use_ignore_id => [ qw(0|1) ],
36             use_type => [ qw(0|1) ],
37             csv_header => [ qw(fields) ],
38             csv_encoded_fields => [ qw(fields) ],
39             csv_object_fields => [ qw(fields) ],
40             encoding => [ qw(utf8|ascii) ],
41             disable_deprecation_logging => [ qw(0|1) ],
42             es_version => [ qw(0|1) ],
43             _es => [ qw(INTERNAL) ],
44             _bulk => [ qw(INTERNAL) ],
45             _scroll => [ qw(INTERNAL) ],
46             },
47             attributes_default => {
48             nodes => [ qw(http://localhost:9200) ],
49             cxn_pool => 'Sniff',
50             from => 0,
51             size => 10,
52             max => 0,
53             index => '*',
54             type => '*',
55             rtimeout => 60,
56             sniff_rtimeout => 3,
57             try => 3,
58             max_flush_count => 1_000,
59             max_flush_size => 1_000_000,
60             use_bulk_autoflush => 1,
61             use_indexing_optimizations => 0,
62             use_ignore_id => 0,
63             use_type => 1,
64             encoding => 'utf8',
65             disable_deprecation_logging => 0,
66             es_version => '7',
67             },
68             commands => {
69             open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
70             open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
71             open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
72             open_scroll => [ qw(index|OPTIONAL size|OPTIONAL type|OPTIONAL query|OPTIONAL) ],
73             close_scroll => [ ],
74             total_scroll => [ ],
75             next_scroll => [ qw(count|OPTIONAL) ],
76             reindex => [ qw(index_source index_destination type_destination|OPTIONAL) ],
77             get_reindex_tasks => [ ],
78             cancel_reindex_task => [ qw(id) ],
79             get_taskid => [ qw(id) ],
80             show_reindex_progress => [ ],
81             loop_show_reindex_progress => [ qw(seconds|OPTIONAL) ],
82             index_document => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
83             index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
84             index_bulk_from_list => [ qw(document_list index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
85             clean_deleted_from_index => [ qw(index) ],
86             update_document => [ qw(document id index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
87             update_document_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
88             bulk_flush => [ qw(index|OPTIONAL) ],
89             query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
90             count => [ qw(index|OPTIONAL type|OPTIONAL) ],
91             get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
92             www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
93             delete_index => [ qw(index|indices_list) ],
94             update_alias => [ qw(new_index alias) ],
95             delete_document => [ qw(index type id) ],
96             delete_by_query => [ qw($query_hash index type proceed|OPTIONAL) ],
97             show_indices => [ qw(string_filter|OPTIONAL) ],
98             show_nodes => [ ],
99             show_health => [ ],
100             show_recovery => [ ],
101             show_allocation => [ ],
102             list_indices => [ qw(regex|OPTIONAL) ],
103             get_indices => [ qw(string_filter|OPTIONAL) ],
104             get_index => [ qw(index|indices_list) ],
105             get_index_stats => [ qw(index) ],
106             list_index_types => [ qw(index) ],
107             list_index_fields => [ qw(index) ],
108             list_indices_version => [ qw(index|indices_list) ],
109             open_index => [ qw(index|indices_list) ],
110             close_index => [ qw(index|indices_list) ],
111             get_aliases => [ qw(index) ],
112             put_alias => [ qw(index alias) ],
113             delete_alias => [ qw(index alias) ],
114             is_mapping_exists => [ qw(index mapping) ],
115             get_mappings => [ qw(index type|OPTIONAL) ],
116             create_index => [ qw(index shards|OPTIONAL) ],
117             create_index_with_mappings => [ qw(index mappings) ],
118             info => [ qw(nodes_list|OPTIONAL) ],
119             version => [ qw(nodes_list|OPTIONAL) ],
120             get_templates => [ ],
121             list_templates => [ ],
122             get_template => [ qw(name) ],
123             put_mapping => [ qw(index type mapping) ],
124             put_mapping_from_json_file => [ qw(index type file) ],
125             update_mapping_from_json_file => [ qw(file index type) ],
126             put_template => [ qw(name template) ],
127             put_template_from_json_file => [ qw(file name|OPTIONAL) ],
128             update_template_from_json_file => [ qw(file name|OPTIONAL) ],
129             get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
130             put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
131             set_index_readonly => [ qw(index|indices_list boolean|OPTIONAL) ],
132             reset_index_readonly => [ qw(index|indices_list|OPTIONAL) ],
133             list_index_readonly => [ ],
134             set_index_number_of_replicas => [ qw(index|indices_list number) ],
135             set_index_refresh_interval => [ qw(index|indices_list number) ],
136             get_index_settings => [ qw(index|indices_list) ],
137             get_index_readonly => [ qw(index|indices_list) ],
138             get_index_number_of_replicas => [ qw(index|indices) ],
139             get_index_refresh_interval => [ qw(index|indices_list) ],
140             get_index_number_of_shards => [ qw(index|indices_list) ],
141             delete_template => [ qw(name) ],
142             is_index_exists => [ qw(index) ],
143             is_type_exists => [ qw(index type) ],
144             is_document_exists => [ qw(index type document) ],
145             parse_error_string => [ qw(string) ],
146             refresh_index => [ qw(index) ],
147             export_as => [ qw(format index size|OPTIONAL callback|OPTIONAL) ],
148             export_as_csv => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
149             export_as_json => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
150             import_from => [ qw(format input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
151             import_from_csv => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
152             import_from_json => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
153             import_from_csv_worker => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
154             get_stats_process => [ ],
155             get_process => [ ],
156             get_cluster_state => [ ],
157             get_cluster_health => [ ],
158             get_cluster_settings => [ ],
159             put_cluster_settings => [ qw(settings) ],
160             count_green_indices => [ ],
161             count_yellow_indices => [ ],
162             count_red_indices => [ ],
163             list_green_indices => [ ],
164             list_yellow_indices => [ ],
165             list_red_indices => [ ],
166             count_indices => [ ],
167             list_indices_status => [ ],
168             count_shards => [ ],
169             count_size => [ qw(string_filter|OPTIONAL) ],
170             count_total_size => [ qw(string_filter|OPTIONAL) ],
171             count_count => [ ],
172             list_datatypes => [ ],
173             get_hits_total => [ qw(results) ],
174             disable_shard_allocation => [ ],
175             enable_shard_allocation => [ ],
176             flush_synced => [ ],
177             create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
178             create_shared_fs_snapshot_repository => [ qw(location
179             repository_name|OPTIONAL) ],
180             get_snapshot_repositories => [ ],
181             get_snapshot_status => [ ],
182             delete_snapshot_repository => [ qw(repository_name) ],
183             create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
184             body|OPTIONAL) ],
185             create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
186             repository_name|OPTIONAL) ],
187             is_snapshot_finished => [ ],
188             get_snapshot_state => [ ],
189             get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
190             delete_snapshot => [ qw(snapshot_name repository_name) ],
191             restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
192             restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
193             },
194             require_modules => {
195             'Metabrik::String::Json' => [ ],
196             'Metabrik::File::Csv' => [ ],
197             'Metabrik::File::Json' => [ ],
198             'Metabrik::File::Dump' => [ ],
199             'Metabrik::Format::Number' => [ ],
200             'Metabrik::Worker::Parallel' => [ ],
201             'Search::Elasticsearch' => [ ],
202             },
203             };
204             }
205              
206             sub brik_preinit {
207 0     0 1   my $self = shift;
208              
209 0           eval("use Search::Elasticsearch;");
210 0 0         if ($Search::Elasticsearch::VERSION < 5) {
211 0           $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
212             "with: run perl::module install Search::Elasticsearch");
213             }
214              
215 0           return $self->SUPER::brik_preinit;
216             }
217              
218             sub open {
219 0     0 0   my $self = shift;
220 0           my ($nodes, $cxn_pool) = @_;
221              
222 0   0       $nodes ||= $self->nodes;
223 0   0       $cxn_pool ||= $self->cxn_pool;
224 0 0         $self->brik_help_run_undef_arg('open', $nodes) or return;
225 0 0         $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
226 0 0         $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
227 0 0         $self->brik_help_run_empty_array_arg('open', $nodes) or return;
228              
229 0           for my $node (@$nodes) {
230 0 0         if ($node !~ m{https?://}) {
231 0           return $self->log->error("open: invalid node[$node], must start with http(s)");
232             }
233             }
234              
235 0           my $timeout = $self->rtimeout;
236              
237 0           my $nodes_str = join('|', @$nodes);
238 0           $self->log->debug("open: using nodes [$nodes_str]");
239              
240             #
241             # Timeout description here:
242             #
243             # Search::Elasticsearch::Role::Cxn
244             #
245              
246 0           my %args = (
247             nodes => $nodes,
248             cxn_pool => $cxn_pool,
249             timeout => $timeout,
250             max_retries => $self->try,
251             retry_on_timeout => 1,
252             sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
253             request_timeout => 60, # seconds, default 30
254             ping_timeout => 5, # seconds, default 2
255             dead_timeout => 120, # seconds, detault 60
256             max_dead_timeout => 3600, # seconds, default 3600
257             sniff_request_timeout => 15, # seconds, default 2
258             #trace_to => 'Stderr', # For debug purposes
259             );
260              
261 0 0         if ($self->disable_deprecation_logging) {
262 0           $args{deprecate_to} = ['File', '/dev/null'];
263             }
264              
265 0           my $es = Search::Elasticsearch->new(%args);
266 0 0         if (! defined($es)) {
267 0           return $self->log->error("open: failed");
268             }
269              
270 0           $self->_es($es);
271              
272 0           return $nodes;
273             }
274              
275             #
276             # Search::Elasticsearch::Client::5_0::Bulk
277             #
278             sub open_bulk_mode {
279 0     0 0   my $self = shift;
280 0           my ($index, $type) = @_;
281              
282 0   0       $index ||= $self->index;
283 0   0       $type ||= $self->type;
284 0           my $es = $self->_es;
285 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
286 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
287 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
288              
289             my %args = (
290             index => $index,
291             on_error => sub {
292             #my ($action, $response, $i) = @_;
293              
294             #print Data::Dumper::Dumper($action)."\n";
295             #print Data::Dumper::Dumper($response)."\n";
296             #print Data::Dumper::Dumper($i)."\n";
297 0     0     print Data::Dumper::Dumper(\@_)."\n";
298             },
299 0           );
300              
301 0 0         if ($self->use_type) {
302 0           $args{type} = $type;
303             }
304              
305 0 0         if ($self->use_bulk_autoflush) {
306 0   0       my $max_count = $self->max_flush_count || 1_000;
307 0   0       my $max_size = $self->max_flush_size || 1_000_000;
308              
309 0           $args{max_count} = $max_count;
310 0           $args{max_size} = $max_size;
311 0           $args{max_time} = 0;
312              
313 0           $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
314             "max_flush_size [$max_size]");
315             }
316             else {
317 0           $args{max_count} = 0;
318 0           $args{max_size} = 0;
319 0           $args{max_time} = 0;
320 0           $args{on_error} = undef;
321             #$args{on_success} = sub {
322             #my ($action, $response, $i) = @_;
323             #};
324              
325 0           $self->log->info("open_bulk_mode: opening without automatic flushing");
326             }
327              
328 0           my $bulk;
329 0           eval {
330 0           $bulk = $es->bulk_helper(%args);
331             };
332 0 0         if ($@) {
333 0           chomp($@);
334 0           return $self->log->error("open_bulk_mode: failed: [$@]");
335             }
336              
337 0           $self->_bulk($bulk);
338              
339 0           return $self->nodes;
340             }
341              
342             sub open_scroll_scan_mode {
343 0     0 0   my $self = shift;
344 0           my ($index, $size) = @_;
345              
346 0 0         my $version = $self->version or return;
347 0 0         if ($version ge "5.0.0") {
348 0           return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
349             "$version, try open_scroll Command instead");
350             }
351              
352 0   0       $index ||= $self->index;
353 0   0       $size ||= $self->size;
354 0           my $es = $self->_es;
355 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
356 0 0         $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
357 0 0         $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
358              
359 0           my $scroll;
360 0           eval {
361 0           $scroll = $es->scroll_helper(
362             index => $index,
363             search_type => 'scan',
364             size => $size,
365             );
366             };
367 0 0         if ($@) {
368 0           chomp($@);
369 0           return $self->log->error("open_scroll_scan_mode: failed: $@");
370             }
371              
372 0           $self->_scroll($scroll);
373              
374 0           return $self->nodes;
375             }
376              
377             #
378             # Search::Elasticsearch::Client::5_0::Scroll
379             #
380             sub open_scroll {
381 0     0 0   my $self = shift;
382 0           my ($index, $size, $type, $query) = @_;
383              
384 0 0         my $version = $self->version or return;
385 0 0         if ($version lt "5.0.0") {
386 0           return $self->log->error("open_scroll: Command not supported for ES version ".
387             "$version, try open_scroll_scan_mode Command instead");
388             }
389              
390 0   0       $query ||= { query => { match_all => {} } };
391 0   0       $index ||= $self->index;
392 0   0       $type ||= $self->type;
393 0   0       $size ||= $self->size;
394 0           my $es = $self->_es;
395 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
396 0 0         $self->brik_help_run_undef_arg('open_scroll', $index) or return;
397 0 0         $self->brik_help_run_undef_arg('open_scroll', $size) or return;
398              
399 0           my $timeout = $self->rtimeout;
400              
401 0           my %args = (
402             scroll => "${timeout}s",
403             # Starting with Search::Elasticsearch 7.x, scroll_in_qs does not exist anymore
404             #scroll_in_qs => 1, # By default (0), pass scroll_id in request body. When 1, pass
405             # it in query string.
406             index => $index,
407             size => $size,
408             body => $query,
409             );
410 0 0         if ($self->use_type) {
411 0 0         if ($type ne '*') {
412 0           $args{type} = $type;
413             }
414             }
415              
416             #
417             # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
418             #
419 0           my $scroll;
420 0           eval {
421 0           $scroll = $es->scroll_helper(%args);
422             };
423 0 0         if ($@) {
424 0           chomp($@);
425 0           return $self->log->error("open_scroll: failed: $@");
426             }
427              
428 0           $self->_scroll($scroll);
429              
430 0           $self->log->verbose("open_scroll: opened with size [$size] and timeout [${timeout}s]");
431              
432 0           return $self->nodes;
433             }
434              
435             #
436             # Search::Elasticsearch::Client::5_0::Scroll
437             #
438             sub close_scroll {
439 0     0 0   my $self = shift;
440              
441 0           my $scroll = $self->_scroll;
442 0 0         if (! defined($scroll)) {
443 0           return 1;
444             }
445              
446 0           $scroll->finish;
447 0           $self->_scroll(undef);
448              
449 0           return 1;
450             }
451              
452             sub total_scroll {
453 0     0 0   my $self = shift;
454              
455 0           my $scroll = $self->_scroll;
456 0 0         $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
457              
458 0           my $total;
459 0           eval {
460 0           $total = $scroll->total;
461             };
462 0 0         if ($@) {
463 0           chomp($@);
464 0           return $self->log->error("total_scroll: failed with: [$@]");
465             }
466              
467 0           return $total;
468             }
469              
470             sub next_scroll {
471 0     0 0   my $self = shift;
472 0           my ($count) = @_;
473              
474 0   0       $count ||= 1;
475              
476 0           my $scroll = $self->_scroll;
477 0 0         $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
478              
479 0           my $try = $self->try;
480              
481 0           RETRY:
482              
483             my $next;
484 0           eval {
485 0 0         if ($count > 1) {
486 0           my @docs = $scroll->next($count);
487 0 0         if (@docs > 0) {
488 0           $next = \@docs;
489             }
490             }
491             else {
492 0           $next = $scroll->next;
493             }
494             };
495 0 0         if ($@) {
496 0           chomp($@);
497 0 0         if (--$try == 0) {
498 0           return $self->log->error("next_scroll: failed after try [$try] tries ".
499             "with error [$@]");
500             }
501 0           $self->log->warning("next_scroll: sleeping 10 seconds before retry cause error: [$@]");
502 0           sleep 10;
503 0           goto RETRY;
504             }
505              
506 0           return $next;
507             }
508              
509             #
510             # Search::Elasticsearch::Client::5_0::Direct
511             #
512             sub index_document {
513 0     0 0   my $self = shift;
514 0           my ($doc, $index, $type, $hash, $id) = @_;
515              
516 0   0       $index ||= $self->index;
517 0   0       $type ||= $self->type;
518 0           my $es = $self->_es;
519 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
520 0 0         $self->brik_help_run_undef_arg('index_document', $doc) or return;
521 0 0         $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
522 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
523 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
524              
525 0           my %args = (
526             index => $index,
527             body => $doc,
528             );
529 0 0         if (defined($id)) {
530 0           $args{id} = $id;
531             }
532              
533 0 0         if ($self->use_type) {
534 0           $args{type} = $type;
535             }
536              
537 0 0         if (defined($hash)) {
538 0 0         $self->brik_help_run_invalid_arg('index_document', $hash, 'HASH')
539             or return;
540 0           my $this_hash = { %$hash };
541 0 0 0       if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
542 0           $this_hash->{routing} = $doc->{$hash->{routing}};
543             }
544 0           %args = ( %args, %$this_hash );
545             }
546              
547 0           my $r;
548 0           eval {
549 0           $r = $es->index(%args);
550             };
551 0 0         if ($@) {
552 0           chomp($@);
553 0           return $self->log->error("index_document: index failed for ".
554             "index [$index]: [$@]");
555             }
556              
557 0           return $r;
558             }
559              
560             #
561             # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
562             #
563             sub reindex {
564 0     0 0   my $self = shift;
565 0           my ($index, $new, $type) = @_;
566              
567 0           my $es = $self->_es;
568 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
569 0 0         $self->brik_help_run_undef_arg('reindex', $index) or return;
570 0 0         $self->brik_help_run_undef_arg('reindex', $new) or return;
571              
572 0           my %args = (
573             body => {
574             conflicts => 'proceed',
575             source => { index => $index },
576             dest => { index => $new },
577             },
578             wait_for_completion => 'false', # Immediately return the task.
579             );
580              
581             # Change the type for destination doc
582 0 0         if ($self->use_type) {
583 0 0         if (defined($type)) {
584 0           $args{body}{dest}{type} = $type;
585             }
586             }
587              
588 0           my $r;
589 0           eval {
590 0           $r = $es->reindex(%args);
591             };
592 0 0         if ($@) {
593 0           chomp($@);
594 0           return $self->log->error("reindex: reindex failed for index [$index]: [$@]");
595             }
596              
597 0           return $r;
598             }
599              
600             #
601             # List reindex tasks
602             #
603             # curl -X GET "localhost:9200/_tasks?detailed=true&actions=*reindex" | jq .
604             #
605             # Cancel reindex task
606             #
607             # curl -X POST "localhost:9200/_tasks/7VelPnOxQm21HtuJNFUAvQ:120914725/_cancel" | jq .
608             #
609              
610             #
611             # Search::Elasticsearch::Client::6_0::Direct::Tasks
612             #
613             sub get_reindex_tasks {
614 0     0 0   my $self = shift;
615              
616 0           my $es = $self->_es;
617 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
618              
619 0           my $t = $es->tasks;
620              
621 0           my $list = $t->list;
622 0           my $nodes = $list->{nodes};
623 0 0         if (! defined($nodes)) {
624 0           return $self->log->error("get_reindex_tasks: no nodes found");
625             }
626              
627 0           my %tasks = ();
628 0           for my $node (keys %$nodes) {
629 0           for my $id (keys %{$nodes->{$node}}) {
  0            
630 0           my $tasks = $nodes->{$node}{tasks};
631 0           for my $task (keys %$tasks) {
632 0           my $action = $tasks->{$task}{action};
633 0 0 0       if ($action eq 'indices:data/write/reindex' && !exists($tasks{$task})) {
634 0           $tasks{$task} = $tasks->{$task};
635             }
636             }
637             }
638             }
639              
640 0           return \%tasks;
641             }
642              
643             sub cancel_reindex_task {
644 0     0 0   my $self = shift;
645 0           my ($id) = @_;
646              
647 0           my $es = $self->_es;
648 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
649 0 0         $self->brik_help_run_undef_arg('cancel_reindex_task', $id) or return;
650              
651 0           my $t = $es->tasks;
652              
653 0           return $t->cancel(task_id => $id);
654             }
655              
656             sub get_taskid {
657 0     0 0   my $self = shift;
658 0           my ($id) = @_;
659              
660 0           my $es = $self->_es;
661 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
662 0 0         $self->brik_help_run_undef_arg('get_taskid', $id) or return;
663              
664 0           my $t = $es->tasks;
665              
666 0           return $t->get(task_id => $id);
667             }
668              
669             sub show_reindex_progress {
670 0     0 0   my $self = shift;
671              
672 0           my $es = $self->_es;
673 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
674              
675 0 0         my $tasks = $self->get_reindex_tasks or return;
676 0 0         if (! keys %$tasks) {
677 0           $self->log->info("show_reindex_progress: no reindex task in progress");
678 0           return 0;
679             }
680              
681 0           for my $id (keys %$tasks) {
682 0 0         my $task = $self->get_taskid($id) or next;
683              
684 0           my $status = $task->{task}{status};
685 0           my $desc = $task->{task}{description};
686 0           my $total = $status->{total};
687 0           my $created = $status->{created};
688 0           my $deleted = $status->{deleted};
689 0           my $updated = $status->{updated};
690              
691 0           my $perc = ($created + $deleted + $updated) / $total * 100;
692              
693 0           printf("> Task [%s]: %.02f%%\n", $desc, $perc);
694 0           print "created[$created] deleted[$deleted] updated[$updated] total[$total]\n";
695             }
696              
697 0           return 1;
698             }
699              
700             sub loop_show_reindex_progress {
701 0     0 0   my $self = shift;
702 0           my ($sec) = @_;
703              
704 0   0       $sec ||= 60;
705 0           my $es = $self->_es;
706 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
707              
708 0           while (1) {
709 0 0         $self->show_reindex_progress or return;
710 0           sleep($sec);
711             }
712              
713 0           return 1;
714             }
715              
716             sub reindex_with_mapping_from_json_file {
717 0     0 0   my $self = shift;
718 0           my ($index, $new, $file) = @_;
719              
720 0           my $es = $self->_es;
721 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
722 0 0         $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $index)
723             or return;
724 0 0         $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $new) or return;
725 0 0         $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $file) or return;
726 0 0         $self->brik_help_run_file_not_found('reindex_with_mapping_from_json_file', $file)
727             or return;
728              
729 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
730 0 0         my $json = $fj->read($file) or return;
731              
732 0           return $self->reindex($index, $new, $json);
733             }
734              
735             #
736             # Search::Elasticsearch::Client::5_0::Direct
737             #
738             # To execute this Command using routing requires to use the correct field
739             # value directly in $hash->{routing}. We cannot "guess" it from arguments,
740             # this would be a little bit complicated to do in an efficient way.
741             #
742             sub update_document {
743 0     0 0   my $self = shift;
744 0           my ($doc, $id, $index, $type, $hash) = @_;
745              
746 0   0       $index ||= $self->index;
747 0   0       $type ||= $self->type;
748 0           my $es = $self->_es;
749 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
750 0 0         $self->brik_help_run_undef_arg('update_document', $doc) or return;
751 0 0         $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
752 0 0         $self->brik_help_run_undef_arg('update_document', $id) or return;
753 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
754 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
755              
756 0           my %args = (
757             id => $id,
758             index => $index,
759             body => { doc => $doc },
760             );
761              
762 0 0         if ($self->use_type) {
763 0           $args{type} = $type;
764             }
765              
766 0 0         if (defined($hash)) {
767 0 0         $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH')
768             or return;
769 0           %args = ( %args, %$hash );
770             }
771              
772 0           my $r;
773 0           eval {
774 0           $r = $es->update(%args);
775             };
776 0 0         if ($@) {
777 0           chomp($@);
778 0           return $self->log->error("update_document: index failed for index [$index]: [$@]");
779             }
780              
781 0           return $r;
782             }
783              
784             #
785             # Search::Elasticsearch::Client::5_0::Bulk
786             #
787             sub index_bulk {
788 0     0 0   my $self = shift;
789 0           my ($doc, $index, $type, $hash, $id) = @_;
790              
791 0           my $bulk = $self->_bulk;
792 0   0       $index ||= $self->index;
793 0   0       $type ||= $self->type;
794 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
795 0 0         $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
796 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
797 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
798              
799 0           my %args = (
800             source => $doc,
801             );
802 0 0         if (defined($id)) {
803 0           $args{id} = $id;
804             }
805              
806 0 0         if (defined($hash)) {
807 0 0         $self->brik_help_run_invalid_arg('index_bulk', $hash, 'HASH') or return;
808 0           my $this_hash = { %$hash };
809 0 0 0       if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
810 0           $this_hash->{routing} = $doc->{$hash->{routing}};
811             }
812 0           %args = ( %args, %$this_hash );
813             }
814              
815 0           my $r;
816 0           eval {
817 0           $r = $bulk->add_action(index => \%args);
818             };
819 0 0         if ($@) {
820 0           chomp($@);
821 0           my $p = $self->parse_error_string($@);
822 0 0 0       if (defined($p) && exists($p->{class})) {
823 0           my $class = $p->{class};
824 0           my $code = $p->{code};
825 0           my $node = $p->{node};
826 0           return $self->log->error("index_bulk: failed for index [$index] with error ".
827             "[$class] code [$code] for node [$node]");
828             }
829             else {
830 0           return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
831             }
832             }
833              
834 0           return $r;
835             }
836              
837             #
838             # Allows to index multiple docs at one time
839             # $bulk->index({ source => $doc1 }, { source => $doc2 }, ...);
840             #
841             sub index_bulk_from_list {
842 0     0 0   my $self = shift;
843 0           my ($list, $index, $type, $hash) = @_;
844              
845 0           my $bulk = $self->_bulk;
846 0   0       $index ||= $self->index;
847 0   0       $type ||= $self->type;
848 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
849 0 0         $self->brik_help_run_undef_arg('index_bulk_from_list', $list) or return;
850 0 0         $self->brik_help_run_invalid_arg('index_bulk_from_list', $list, 'ARRAY')
851             or return;
852 0 0         $self->brik_help_run_empty_array_arg('index_bulk_from_list', $list)
853             or return;
854 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
855 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
856              
857 0 0         if (defined($hash)) {
858 0 0         $self->brik_help_run_invalid_arg('index_bulk_from_list', $hash, 'HASH')
859             or return;
860             }
861              
862 0           my @args = ();
863 0           for my $doc (@$list) {
864 0           my %args = (
865             source => $doc,
866             );
867 0 0         if (defined($hash)) {
868 0           my $this_hash = { %$hash };
869 0 0 0       if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
870 0           $this_hash->{routing} = $doc->{$hash->{routing}};
871             }
872 0           %args = ( %args, %$this_hash );
873             }
874 0           push @args, \%args;
875             }
876              
877 0           my $r;
878 0           eval {
879 0           $r = $bulk->index(@args);
880             };
881 0 0         if ($@) {
882 0           chomp($@);
883 0           my $p = $self->parse_error_string($@);
884 0 0 0       if (defined($p) && exists($p->{class})) {
885 0           my $class = $p->{class};
886 0           my $code = $p->{code};
887 0           my $node = $p->{node};
888 0           return $self->log->error("index_bulk: failed for index [$index] with error ".
889             "[$class] code [$code] for node [$node]");
890             }
891             else {
892 0           return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
893             }
894             }
895              
896 0           return $r;
897             }
898              
899             #
900             # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
901             #
902             sub clean_deleted_from_index {
903 0     0 0   my $self = shift;
904 0           my ($index) = @_;
905              
906 0 0         $self->brik_help_run_undef_arg('clean_deleted_from_index', $index) or return;
907              
908 0           my $es = $self->_es;
909 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
910              
911 0           my $indices = $self->_es->indices;
912              
913 0           my $r;
914 0           eval {
915 0           $r = $indices->forcemerge(
916             index => $index,
917             only_expunge_deletes => 'true',
918             );
919             };
920 0 0         if ($@) {
921 0           chomp($@);
922 0           my $p = $self->parse_error_string($@);
923 0 0 0       if (defined($p) && exists($p->{class})) {
924 0           my $class = $p->{class};
925 0           my $code = $p->{code};
926 0           my $node = $p->{node};
927 0           return $self->log->error("clean_deleted_from_index: failed for index ".
928             "[$index] with error [$class] code [$code] for node [$node]");
929             }
930             else {
931 0           return $self->log->error("clean_deleted_from_index: index failed for ".
932             "index [$index]: [$@]");
933             }
934             }
935              
936 0           return $r;
937             }
938              
939             #
940             # To execute this Command using routing requires to use the correct field
941             # value directly in $hash->{routing}. We cannot "guess" it from arguments,
942             # this would be a little bit complicated to do in an efficient way.
943             #
944             sub update_document_bulk {
945 0     0 0   my $self = shift;
946 0           my ($doc, $index, $type, $hash, $id) = @_;
947              
948 0           my $bulk = $self->_bulk;
949 0   0       $index ||= $self->index;
950 0   0       $type ||= $self->type;
951 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
952 0 0         $self->brik_help_run_undef_arg('update_document_bulk', $doc) or return;
953 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
954 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
955              
956 0           my %args = (
957             index => $index,
958             doc => $doc,
959             );
960              
961 0 0         if ($self->use_type) {
962 0           $args{type} = $type;
963             }
964              
965 0 0         if (defined($id)) {
966 0           $args{id} = $id;
967             }
968              
969 0 0         if (defined($hash)) {
970 0 0         $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH')
971             or return;
972 0           %args = ( %args, %$hash );
973             }
974              
975 0           my $r;
976 0           eval {
977 0           $r = $bulk->update(\%args);
978             };
979 0 0         if ($@) {
980 0           chomp($@);
981 0           my $p = $self->parse_error_string($@);
982 0 0 0       if (defined($p) && exists($p->{class})) {
983 0           my $class = $p->{class};
984 0           my $code = $p->{code};
985 0           my $node = $p->{node};
986 0           return $self->log->error("update_document_bulk: failed for index [$index] ".
987             "with error [$class] code [$code] for node [$node]");
988             }
989             else {
990 0           return $self->log->error("update_document_bulk: index failed for ".
991             "index [$index]: [$@]");
992             }
993             }
994              
995 0           return $r;
996             }
997              
998             #
999             # We may have to call refresh_index after a bulk_flush, so we give an additional
1000             # optional Argument for given index.
1001             #
1002             sub bulk_flush {
1003 0     0 0   my $self = shift;
1004 0           my ($index) = @_;
1005              
1006 0           my $bulk = $self->_bulk;
1007 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
1008              
1009 0           my $try = $self->try;
1010              
1011 0           RETRY:
1012              
1013             my $r;
1014 0           eval {
1015 0           $r = $bulk->flush;
1016             };
1017 0 0         if ($@) {
1018 0           chomp($@);
1019 0 0         if (--$try == 0) {
1020 0           my $p = $self->parse_error_string($@);
1021 0 0 0       if (defined($p) && exists($p->{class})) {
1022 0           my $class = $p->{class};
1023 0           my $code = $p->{code};
1024 0           my $node = $p->{node};
1025 0           return $self->log->error("bulk_flush: failed after [$try] tries with error ".
1026             "[$class] code [$code] for node [$node]");
1027             }
1028             else {
1029 0           return $self->log->error("bulk_flush: failed after [$try]: [$@]");
1030             }
1031             }
1032 0           $self->log->warning("bulk_flush: sleeping 10 seconds before retry cause error ".
1033             "[$@]");
1034 0           sleep 10;
1035 0           goto RETRY;
1036             }
1037              
1038 0 0         if (defined($index)) {
1039 0           $self->refresh_index($index);
1040             }
1041              
1042 0           return $r;
1043             }
1044              
1045             #
1046             # Search::Elasticsearch::Client::2_0::Direct
1047             # Search::Elasticsearch::Client::5_0::Direct
1048             #
1049             sub count {
1050 0     0 0   my $self = shift;
1051 0           my ($index, $type) = @_;
1052              
1053 0   0       $index ||= $self->index;
1054 0   0       $type ||= $self->type;
1055 0           my $es = $self->_es;
1056 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1057              
1058 0           my %args = ();
1059 0 0 0       if (defined($index) && $index ne '*') {
1060 0           $args{index} = $index;
1061             }
1062 0 0         if ($self->use_type) {
1063 0 0 0       if (defined($type) && $type ne '*') {
1064 0           $args{type} = $type;
1065             }
1066             }
1067              
1068             #$args{body} = {
1069             #query => {
1070             #match => { title => 'Elasticsearch clients' },
1071             #},
1072             #}
1073              
1074 0           my $r;
1075 0 0         my $version = $self->version or return;
1076 0 0         if ($version ge "5.0.0") {
1077 0           eval {
1078 0           $r = $es->count(%args);
1079             };
1080             }
1081             else {
1082 0           eval {
1083 0           $r = $es->search(%args);
1084             };
1085             }
1086 0 0         if ($@) {
1087 0           chomp($@);
1088 0           return $self->log->error("count: count failed for index [$index]: [$@]");
1089             }
1090              
1091 0 0 0       if ($version ge "5.0.0") {
    0          
1092 0 0         if (exists($r->{count})) {
1093 0           return $r->{count};
1094             }
1095             }
1096             elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
1097 0           return $r->{hits}{total};
1098             }
1099              
1100 0           return $self->log->error("count: nothing found");
1101             }
1102              
1103             #
1104             # https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
1105             # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
1106             #
1107             # Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
1108             #
1109             # To perform a query using routing requires to use the correct field
1110             # value directly in $hash->{routing}. We cannot "guess" it from $q,
1111             # this would be a little bit complicated to do in an efficient way.
1112             #
1113             sub query {
1114 0     0 0   my $self = shift;
1115 0           my ($query, $index, $type, $hash) = @_;
1116              
1117 0   0       $index ||= $self->index;
1118 0   0       $type ||= $self->type;
1119 0           my $es = $self->_es;
1120 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1121 0 0         $self->brik_help_run_undef_arg('query', $query) or return;
1122 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
1123 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
1124 0 0         $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
1125              
1126 0           my $timeout = $self->rtimeout;
1127 0           my $es_version = $self->es_version;
1128              
1129 0           my %args = (
1130             index => $index,
1131             body => $query,
1132             );
1133              
1134 0 0         if ($es_version == 7) {
1135 0           $args{track_total_hits} = 'true';
1136             }
1137              
1138 0 0         if (defined($hash)) {
1139 0 0         $self->brik_help_run_invalid_arg('query', $hash, 'HASH') or return;
1140 0           %args = ( %args, %$hash );
1141             }
1142              
1143 0 0         if ($self->use_type) {
1144 0 0         if ($type ne '*') {
1145 0           $args{type} = $type;
1146             }
1147             }
1148              
1149 0           my $r;
1150 0           eval {
1151 0           $r = $es->search(%args);
1152             };
1153 0 0         if ($@) {
1154 0           chomp($@);
1155 0           return $self->log->error("query: failed for index [$index]: [$@]");
1156             }
1157              
1158 0           return $r;
1159             }
1160              
1161             sub get_from_id {
1162 0     0 0   my $self = shift;
1163 0           my ($id, $index, $type) = @_;
1164              
1165 0   0       $index ||= $self->index;
1166 0   0       $type ||= $self->type;
1167 0           my $es = $self->_es;
1168 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1169 0 0         $self->brik_help_run_undef_arg('get_from_id', $id) or return;
1170 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
1171 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
1172              
1173 0           my $r;
1174 0           eval {
1175 0           my %this_args = (
1176             index => $index,
1177             id => $id,
1178             );
1179 0 0         if ($self->use_type) {
1180 0           $this_args{type} = $type;
1181             }
1182 0           $r = $es->get(%this_args);
1183             };
1184 0 0         if ($@) {
1185 0           chomp($@);
1186 0           return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
1187             }
1188              
1189 0           return $r;
1190             }
1191              
1192             #
1193             # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
1194             #
1195             sub www_search {
1196 0     0 0   my $self = shift;
1197 0           my ($query, $index, $type) = @_;
1198              
1199 0   0       $index ||= $self->index;
1200 0   0       $type ||= $self->type;
1201 0 0         $self->brik_help_run_undef_arg('www_search', $query) or return;
1202 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
1203 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
1204              
1205 0           my $from = $self->from;
1206 0           my $size = $self->size;
1207              
1208 0 0         my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
1209              
1210 0           my $nodes = $self->nodes;
1211 0           for my $node (@$nodes) {
1212             # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
1213 0           my $url = "$node/$index";
1214 0 0         if ($self->use_type) {
1215 0 0         if ($type ne '*') {
1216 0           $url .= "/$type";
1217             }
1218             }
1219 0           $url .= "/_search/?from=$from&size=$size&q=".$query;
1220              
1221 0 0         my $get = $self->SUPER::get($url) or next;
1222 0           my $body = $get->{content};
1223              
1224 0 0         my $decoded = $sj->decode($body) or next;
1225              
1226 0           return $decoded;
1227             }
1228              
1229 0           return;
1230             }
1231              
1232             #
1233             # Search::Elasticsearch::Client::2_0::Direct::Indices
1234             #
1235             sub delete_index {
1236 0     0 0   my $self = shift;
1237 0           my ($index) = @_;
1238              
1239 0           my $es = $self->_es;
1240 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1241 0 0         $self->brik_help_run_undef_arg('delete_index', $index) or return;
1242 0 0         $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
1243              
1244 0           my %args = (
1245             index => $index,
1246             );
1247              
1248 0           my $r;
1249 0           eval {
1250 0           $r = $es->indices->delete(%args);
1251             };
1252 0 0         if ($@) {
1253 0           chomp($@);
1254 0           return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
1255             }
1256              
1257 0           return $r;
1258             }
1259              
1260             #
1261             # Search::Elasticsearch::Client::2_0::Direct::Indices
1262             #
1263             # To execute this Command using routing requires to use the correct field
1264             # value directly in $hash->{routing}. We cannot "guess" it from arguments,
1265             # this would be a little bit complicated to do in an efficient way.
1266             #
1267             sub delete_document {
1268 0     0 0   my $self = shift;
1269 0           my ($index, $type, $id, $hash) = @_;
1270              
1271 0           my $es = $self->_es;
1272 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1273 0 0         $self->brik_help_run_undef_arg('delete_document', $index) or return;
1274 0 0         $self->brik_help_run_undef_arg('delete_document', $id) or return;
1275              
1276 0           my %args = (
1277             index => $index,
1278             id => $id,
1279             );
1280              
1281 0 0         if ($self->use_type) {
1282 0           $args{type} = $type;
1283             }
1284              
1285 0 0         if (defined($hash)) {
1286 0 0         $self->brik_help_run_invalid_arg('delete_document', $hash, 'HASH')
1287             or return;
1288 0           %args = ( %args, %$hash );
1289             }
1290              
1291 0           my $r;
1292 0           eval {
1293 0           $r = $es->delete(%args);
1294             };
1295 0 0         if ($@) {
1296 0           chomp($@);
1297 0           return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
1298             }
1299              
1300 0           return $r;
1301             }
1302              
1303             #
1304             # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
1305             #
1306             # Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
1307             #
1308             sub delete_by_query {
1309 0     0 0   my $self = shift;
1310 0           my ($query, $index, $type, $proceed) = @_;
1311              
1312 0           my $es = $self->_es;
1313 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1314 0 0         $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
1315 0 0         $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
1316 0 0         $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
1317 0 0         $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
1318              
1319 0           my $timeout = $self->rtimeout;
1320              
1321 0           my %args = (
1322             index => $index,
1323             body => $query,
1324             );
1325              
1326 0 0         if ($self->use_type) {
1327 0           $args{type} = $type;
1328             }
1329              
1330 0 0 0       if (defined($proceed) && $proceed) {
1331 0           $args{conflicts} = 'proceed';
1332             }
1333              
1334 0           my $r;
1335 0           eval {
1336 0           $r = $es->delete_by_query(%args);
1337             };
1338 0 0         if ($@) {
1339 0           chomp($@);
1340 0           return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
1341             }
1342              
1343             # This may fail, we ignore it.
1344 0           $self->refresh_index($index);
1345              
1346 0           return $r;
1347             }
1348              
1349             #
1350             # Search::Elasticsearch::Client::2_0::Direct::Cat
1351             #
1352             # https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html
1353             #
1354             sub show_indices {
1355 0     0 0   my $self = shift;
1356 0           my ($string) = @_;
1357              
1358 0           my $es = $self->_es;
1359 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1360              
1361 0           my $r;
1362 0           eval {
1363 0           $r = $es->cat->indices;
1364             };
1365 0 0         if ($@) {
1366 0           chomp($@);
1367 0           return $self->log->error("show_indices: failed: [$@]");
1368             }
1369              
1370 0           my @lines = split(/\n/, $r);
1371              
1372 0 0         if (@lines == 0) {
1373 0           $self->log->warning("show_indices: nothing returned, no index?");
1374             }
1375              
1376 0           my @filtered = ();
1377 0 0         if (defined($string)) {
1378 0           for (@lines) {
1379 0 0         if (m{$string}) {
1380 0           push @filtered, $_;
1381             }
1382             }
1383 0           @lines = @filtered;
1384             }
1385              
1386 0           return \@lines;
1387             }
1388              
1389             #
1390             # Search::Elasticsearch::Client::2_0::Direct::Cat
1391             #
1392             sub show_nodes {
1393 0     0 0   my $self = shift;
1394              
1395 0           my $es = $self->_es;
1396 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1397              
1398 0           my $r;
1399 0           eval {
1400 0           $r = $es->cat->nodes;
1401             };
1402 0 0         if ($@) {
1403 0           chomp($@);
1404 0           return $self->log->error("show_nodes: failed: [$@]");
1405             }
1406              
1407 0           my @lines = split(/\n/, $r);
1408              
1409 0 0         if (@lines == 0) {
1410 0           $self->log->warning("show_nodes: nothing returned, no nodes?");
1411             }
1412              
1413 0           return \@lines;
1414             }
1415              
1416             #
1417             # Search::Elasticsearch::Client::2_0::Direct::Cat
1418             #
1419             sub show_health {
1420 0     0 0   my $self = shift;
1421              
1422 0           my $es = $self->_es;
1423 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1424              
1425 0           my $r;
1426 0           eval {
1427 0           $r = $es->cat->health;
1428             };
1429 0 0         if ($@) {
1430 0           chomp($@);
1431 0           return $self->log->error("show_health: failed: [$@]");
1432             }
1433              
1434 0           my @lines = split(/\n/, $r);
1435              
1436 0 0         if (@lines == 0) {
1437 0           $self->log->warning("show_health: nothing returned, no recovery?");
1438             }
1439              
1440 0           return \@lines;
1441             }
1442              
1443             #
1444             # Search::Elasticsearch::Client::2_0::Direct::Cat
1445             #
1446             sub show_recovery {
1447 0     0 0   my $self = shift;
1448              
1449 0           my $es = $self->_es;
1450 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1451              
1452 0           my $r;
1453 0           eval {
1454 0           $r = $es->cat->recovery;
1455             };
1456 0 0         if ($@) {
1457 0           chomp($@);
1458 0           return $self->log->error("show_recovery: failed: [$@]");
1459             }
1460              
1461 0           my @lines = split(/\n/, $r);
1462              
1463 0 0         if (@lines == 0) {
1464 0           $self->log->warning("show_recovery: nothing returned, no index?");
1465             }
1466              
1467 0           return \@lines;
1468             }
1469              
1470             #
1471             # curl -s 'localhost:9200/_cat/allocation?v'
1472             #
1473             sub show_allocation {
1474 0     0 0   my $self = shift;
1475              
1476 0           my $es = $self->_es;
1477 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1478              
1479 0           my $r;
1480 0           eval {
1481 0           $r = $es->cat->allocation;
1482             };
1483 0 0         if ($@) {
1484 0           chomp($@);
1485 0           return $self->log->error("show_allocation: failed: [$@]");
1486             }
1487              
1488 0           my @lines = split(/\n/, $r);
1489              
1490 0 0         if (@lines == 0) {
1491 0           $self->log->warning("show_allocation: nothing returned, no index?");
1492             }
1493              
1494 0           return \@lines;
1495             }
1496              
1497             sub list_indices {
1498 0     0 0   my $self = shift;
1499 0           my ($regex) = @_;
1500              
1501 0 0         my $get = $self->get_indices or return;
1502              
1503 0           my @indices = ();
1504 0           for (@$get) {
1505 0 0         if (defined($regex)) {
1506 0 0         if ($_->{index} =~ m{$regex}) {
1507 0           push @indices, $_->{index};
1508             }
1509             }
1510             else {
1511 0           push @indices, $_->{index};
1512             }
1513             }
1514              
1515 0           return [ sort { $a cmp $b } @indices ];
  0            
1516             }
1517              
1518             sub get_indices {
1519 0     0 0   my $self = shift;
1520 0           my ($string) = @_;
1521              
1522 0 0         my $lines = $self->show_indices($string) or return;
1523 0 0         if (@$lines == 0) {
1524 0           $self->log->warning("get_indices: no index found");
1525 0           return [];
1526             }
1527              
1528             #
1529             # Format depends on ElasticSearch version. We try to detect the format.
1530             #
1531             # 5.0.0:
1532             # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1 146 0 251.8kb 251.8kb"
1533             #
1534 0           my @indices = ();
1535 0           for (@$lines) {
1536 0           my @t = split(/\s+/);
1537 0 0         if (@t == 10) { # Version 5.0.0
    0          
    0          
1538 0           my $color = $t[0];
1539 0           my $state = $t[1];
1540 0           my $index = $t[2];
1541 0           my $id = $t[3];
1542 0           my $shards = $t[4];
1543 0           my $replicas = $t[5];
1544 0           my $count = $t[6];
1545 0           my $count2 = $t[7];
1546 0           my $total_size = $t[8];
1547 0           my $size = $t[9];
1548 0           push @indices, {
1549             color => $color,
1550             state => $state,
1551             index => $index,
1552             id => $id,
1553             shards => $shards,
1554             replicas => $replicas,
1555             count => $count,
1556             total_size => $total_size,
1557             size => $size,
1558             };
1559             }
1560             elsif (@t == 9) {
1561 0           my $index = $t[2];
1562 0           push @indices, {
1563             index => $index,
1564             };
1565             }
1566             elsif (@t == 8) {
1567 0           my $index = $t[1];
1568 0           push @indices, {
1569             index => $index,
1570             };
1571             }
1572             }
1573              
1574 0           return \@indices;
1575             }
1576              
1577             #
1578             # Search::Elasticsearch::Client::5_0::Direct::Indices
1579             #
1580             sub get_index {
1581 0     0 0   my $self = shift;
1582 0           my ($index) = @_;
1583            
1584 0           my $es = $self->_es;
1585 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1586 0 0         $self->brik_help_run_undef_arg('get_index', $index) or return;
1587 0 0         $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
1588              
1589 0           my %args = (
1590             index => $index,
1591             );
1592              
1593 0           my $r;
1594 0           eval {
1595 0           $r = $es->indices->get(%args);
1596             };
1597 0 0         if ($@) {
1598 0           chomp($@);
1599 0           return $self->log->error("get_index: get failed for index [$index]: [$@]");
1600             }
1601              
1602 0           return $r;
1603             }
1604              
1605             sub get_index_stats {
1606 0     0 0   my $self = shift;
1607 0           my ($index) = @_;
1608              
1609 0           my $es = $self->_es;
1610 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1611 0 0         $self->brik_help_run_undef_arg('get_index', $index) or return;
1612              
1613 0           my %args = (
1614             index => $index,
1615             );
1616              
1617 0           my $r;
1618 0           eval {
1619 0           $r = $es->indices->stats(%args);
1620             };
1621 0 0         if ($@) {
1622 0           chomp($@);
1623 0           return $self->log->error("get_index_stats: get failed for index [$index]: ".
1624             "[$@]");
1625             }
1626              
1627 0           return $r->{indices}{$index};
1628             }
1629              
1630             sub list_index_types {
1631 0     0 0   my $self = shift;
1632 0           my ($index) = @_;
1633              
1634 0           my $es = $self->_es;
1635 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1636 0 0         $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1637 0 0         $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1638              
1639 0 0         my $r = $self->get_mappings($index) or return;
1640 0 0         if (keys %$r > 1) {
1641 0           return $self->log->error("list_index_types: multiple indices found, choose one");
1642             }
1643              
1644 0           my @types = ();
1645 0           for my $this_index (keys %$r) {
1646 0           my $mappings = $r->{$this_index}{mappings};
1647 0           push @types, keys %$mappings;
1648             }
1649              
1650 0           my %uniq = map { $_ => 1 } @types;
  0            
1651              
1652 0           return [ sort { $a cmp $b } keys %uniq ];
  0            
1653             }
1654              
1655             #
1656             # By default, if you provide only one index and no type,
1657             # all types will be merged (including _default_)
1658             # If you specify one type (other than _default_), _default_ will be merged to it.
1659             #
1660             sub list_index_fields {
1661 0     0 0   my $self = shift;
1662 0           my ($index, $type) = @_;
1663              
1664 0           my $es = $self->_es;
1665 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1666 0 0         $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1667 0 0         $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1668              
1669 0           my $r;
1670 0 0         if (defined($type)) {
1671 0 0         $r = $self->get_mappings($index, $type) or return;
1672 0 0         if (keys %$r > 1) {
1673 0           return $self->log->error("list_index_fields: multiple indices found, ".
1674             "choose one");
1675             }
1676             # _default_ mapping may not exists.
1677 0 0         if ($self->is_mapping_exists($index, '_default_')) {
1678 0           my $r2 = $self->get_mappings($index, '_default_');
1679             # Merge
1680 0           for my $this_index (keys %$r2) {
1681 0           my $default = $r2->{$this_index}{mappings}{'_default_'};
1682 0           $r->{$this_index}{mappings}{_default_} = $default;
1683             }
1684             }
1685             }
1686             else {
1687 0 0         $r = $self->get_mappings($index) or return;
1688 0 0         if (keys %$r > 1) {
1689 0           return $self->log->error("list_index_fields: multiple indices found, ".
1690             "choose one");
1691             }
1692             }
1693              
1694 0           my @fields = ();
1695 0           for my $this_index (keys %$r) {
1696 0           my $mappings = $r->{$this_index}{mappings};
1697 0           for my $this_type (keys %$mappings) {
1698 0           my $properties = $mappings->{$this_type}{properties};
1699 0           push @fields, keys %$properties;
1700             }
1701             }
1702              
1703 0           my %uniq = map { $_ => 1 } @fields;
  0            
1704              
1705 0           return [ sort { $a cmp $b } keys %uniq ];
  0            
1706             }
1707              
1708             sub list_indices_version {
1709 0     0 0   my $self = shift;
1710 0           my ($index) = @_;
1711              
1712 0           my $es = $self->_es;
1713 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1714 0 0         $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1715 0 0         $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1716             or return;
1717              
1718 0 0         my $r = $self->get_index($index) or return;
1719              
1720 0           my @list = ();
1721 0           for my $this (keys %$r) {
1722 0           my $name = $this;
1723 0           my $version = $r->{$this}{settings}{index}{version}{created};
1724 0           push @list, {
1725             index => $name,
1726             version => $version,
1727             };
1728             }
1729              
1730 0           return \@list;
1731             }
1732              
1733             sub open_index {
1734 0     0 0   my $self = shift;
1735 0           my ($index) = @_;
1736              
1737 0           my $es = $self->_es;
1738 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1739 0 0         $self->brik_help_run_undef_arg('open_index', $index) or return;
1740 0 0         $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1741              
1742 0           my $r;
1743 0           eval {
1744 0           $r = $es->indices->open(
1745             index => $index,
1746             );
1747             };
1748 0 0         if ($@) {
1749 0           chomp($@);
1750 0           return $self->log->error("open_index: failed: [$@]");
1751             }
1752              
1753 0           return $r;
1754             }
1755              
1756             sub close_index {
1757 0     0 0   my $self = shift;
1758 0           my ($index) = @_;
1759              
1760 0           my $es = $self->_es;
1761 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1762 0 0         $self->brik_help_run_undef_arg('close_index', $index) or return;
1763 0 0         $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1764              
1765 0           my $r;
1766 0           eval {
1767 0           $r = $es->indices->close(
1768             index => $index,
1769             );
1770             };
1771 0 0         if ($@) {
1772 0           chomp($@);
1773 0           return $self->log->error("close_index: failed: [$@]");
1774             }
1775              
1776 0           return $r;
1777             }
1778              
1779             #
1780             # Search::Elasticsearch::Client::5_0::Direct::Indices
1781             #
1782             sub get_aliases {
1783 0     0 0   my $self = shift;
1784 0           my ($index) = @_;
1785              
1786 0   0       $index ||= $self->index;
1787 0           my $es = $self->_es;
1788 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1789              
1790             #
1791             # [DEPRECATION] [types removal] The parameter include_type_name should be
1792             # explicitly specified in get indices requests to prepare for 7.0. In 7.0
1793             # include_type_name will default to 'false', which means responses will
1794             # omit the type name in mapping definitions. - In request: {body => undef,
1795             # ignore => [],method => "GET",path => "/*",qs => {},serialize => "std"}
1796             #
1797              
1798 0           my %args = (
1799             index => $index,
1800             params => { include_type_name => 'false' },
1801             );
1802              
1803 0           my $r;
1804 0           eval {
1805 0           $r = $es->indices->get(%args);
1806             };
1807 0 0         if ($@) {
1808 0           chomp($@);
1809 0           return $self->log->error("get_aliases: get_aliases failed: [$@]");
1810             }
1811              
1812 0           my %aliases = ();
1813 0           for my $this (keys %$r) {
1814 0           $aliases{$this} = $r->{$this}{aliases};
1815             }
1816              
1817 0           return \%aliases;
1818             }
1819              
1820             #
1821             # Search::Elasticsearch::Client::5_0::Direct::Indices
1822             #
1823             sub put_alias {
1824 0     0 0   my $self = shift;
1825 0           my ($index, $alias) = @_;
1826              
1827 0           my $es = $self->_es;
1828 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1829 0 0         $self->brik_help_run_undef_arg('put_alias', $index) or return;
1830 0 0         $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1831              
1832 0           my %args = (
1833             index => $index,
1834             name => $alias,
1835             );
1836              
1837 0           my $r;
1838 0           eval {
1839 0           $r = $es->indices->put_alias(%args);
1840             };
1841 0 0         if ($@) {
1842 0           chomp($@);
1843 0           return $self->log->error("put_alias: put_alias failed: [$@]");
1844             }
1845              
1846 0           return $r;
1847             }
1848              
1849             #
1850             # Search::Elasticsearch::Client::5_0::Direct::Indices
1851             #
1852             sub delete_alias {
1853 0     0 0   my $self = shift;
1854 0           my ($index, $alias) = @_;
1855              
1856 0           my $es = $self->_es;
1857 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1858 0 0         $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1859 0 0         $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1860              
1861 0           my %args = (
1862             index => $index,
1863             name => $alias,
1864             );
1865              
1866 0           my $r;
1867 0           eval {
1868 0           $r = $es->indices->delete_alias(%args);
1869             };
1870 0 0         if ($@) {
1871 0           chomp($@);
1872 0           return $self->log->error("delete_alias: delete_alias failed: [$@]");
1873             }
1874              
1875 0           return $r;
1876             }
1877              
1878             sub update_alias {
1879 0     0 0   my $self = shift;
1880 0           my ($new_index, $alias) = @_;
1881              
1882 0           my $es = $self->_es;
1883 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1884 0 0         $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1885 0 0         $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1886              
1887             # Search for previous index with that alias, if any.
1888 0           my $prev_index;
1889 0 0         my $aliases = $self->get_aliases or return;
1890 0           while (my ($k, $v) = each %$aliases) {
1891 0           for my $this (keys %$v) {
1892 0 0         if ($this eq $alias) {
1893 0           $prev_index = $k;
1894 0           last;
1895             }
1896             }
1897 0 0         last if $prev_index;
1898             }
1899              
1900             # Delete previous alias if it exists.
1901 0 0         if (defined($prev_index)) {
1902 0 0         $self->delete_alias($prev_index, $alias) or return;
1903             }
1904              
1905 0           return $self->put_alias($new_index, $alias);
1906             }
1907              
1908             sub is_mapping_exists {
1909 0     0 0   my $self = shift;
1910 0           my ($index, $mapping) = @_;
1911              
1912 0 0         $self->brik_help_run_undef_arg('is_mapping_exists', $index) or return;
1913 0 0         $self->brik_help_run_undef_arg('is_mapping_exists', $mapping) or return;
1914              
1915 0 0         if (! $self->is_index_exists($index)) {
1916 0           return 0;
1917             }
1918              
1919 0 0         my $all = $self->get_mappings($index) or return;
1920 0           for my $this_index (keys %$all) {
1921 0           my $mappings = $all->{$this_index}{mappings};
1922 0           for my $this_mapping (keys %$mappings) {
1923 0 0         if ($this_mapping eq $mapping) {
1924 0           return 1;
1925             }
1926             }
1927             }
1928              
1929 0           return 0;
1930             }
1931              
1932             #
1933             # Search::Elasticsearch::Client::2_0::Direct::Indices
1934             #
1935             sub get_mappings {
1936 0     0 0   my $self = shift;
1937 0           my ($index, $type) = @_;
1938              
1939 0           my $es = $self->_es;
1940 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1941 0 0         $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1942 0 0         $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1943              
1944 0           my %args = (
1945             index => $index,
1946             params => { include_type_name => 'false' },
1947             );
1948              
1949 0 0         if ($self->use_type) {
1950 0           $args{type} = $type;
1951             }
1952              
1953 0           my $r;
1954 0           eval {
1955 0           $r = $es->indices->get_mapping(%args);
1956             };
1957 0 0         if ($@) {
1958 0           chomp($@);
1959 0           return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1960             "[$@]");
1961             }
1962              
1963 0           return $r;
1964             }
1965              
1966             #
1967             # Search::Elasticsearch::Client::2_0::Direct::Indices
1968             #
1969             sub create_index {
1970 0     0 0   my $self = shift;
1971 0           my ($index, $shards) = @_;
1972              
1973 0           my $es = $self->_es;
1974 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1975 0 0         $self->brik_help_run_undef_arg('create_index', $index) or return;
1976              
1977 0           my %args = (
1978             index => $index,
1979             );
1980              
1981 0 0         if (defined($shards)) {
1982 0           $args{body}{settings}{index}{number_of_shards} = $shards;
1983             }
1984              
1985 0           my $r;
1986 0           eval {
1987 0           $r = $es->indices->create(%args);
1988             };
1989 0 0         if ($@) {
1990 0           chomp($@);
1991 0           return $self->log->error("create_index: create failed ".
1992             "for index [$index]: [$@]");
1993             }
1994            
1995 0           return $r;
1996             }
1997              
1998             #
1999             # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
2000             #
2001             sub create_index_with_mappings {
2002 0     0 0   my $self = shift;
2003 0           my ($index, $mappings) = @_;
2004              
2005 0           my $es = $self->_es;
2006 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2007 0 0         $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
2008 0 0         $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
2009 0 0         $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH')
2010             or return;
2011              
2012 0           my $r;
2013 0           eval {
2014 0           $r = $es->indices->create(
2015             index => $index,
2016             body => {
2017             mappings => $mappings,
2018             },
2019             );
2020             };
2021 0 0         if ($@) {
2022 0           chomp($@);
2023 0           return $self->log->error("create_index_with_mappings: create failed for ".
2024             "index [$index]: [$@]");
2025             }
2026              
2027 0           return $r;
2028             }
2029              
2030             # GET http://localhost:9200/
2031             sub info {
2032 0     0 0   my $self = shift;
2033 0           my ($nodes) = @_;
2034              
2035 0   0       $nodes ||= $self->nodes;
2036 0 0         $self->brik_help_run_undef_arg('info', $nodes) or return;
2037 0 0         $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
2038 0 0         $self->brik_help_run_empty_array_arg('info', $nodes) or return;
2039              
2040 0           my $first = $nodes->[0];
2041              
2042 0 0         $self->get($first) or return;
2043              
2044 0           return $self->content;
2045             }
2046              
2047             sub version {
2048 0     0 0   my $self = shift;
2049 0           my ($nodes) = @_;
2050              
2051 0   0       $nodes ||= $self->nodes;
2052 0 0         $self->brik_help_run_undef_arg('version', $nodes) or return;
2053 0 0         $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
2054 0 0         $self->brik_help_run_empty_array_arg('version', $nodes) or return;
2055              
2056 0           my $first = $nodes->[0];
2057              
2058 0 0         $self->get($first) or return;
2059 0 0         my $content = $self->content or return;
2060              
2061 0           return $content->{version}{number};
2062             }
2063              
2064             #
2065             # Search::Elasticsearch::Client::2_0::Direct::Indices
2066             #
2067             sub get_templates {
2068 0     0 0   my $self = shift;
2069              
2070 0           my $es = $self->_es;
2071 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2072              
2073 0           my $r;
2074 0           eval {
2075 0           $r = $es->indices->get_template;
2076             };
2077 0 0         if ($@) {
2078 0           chomp($@);
2079 0           return $self->log->error("get_templates: failed: [$@]");
2080             }
2081              
2082 0           return $r;
2083             }
2084              
2085             sub list_templates {
2086 0     0 0   my $self = shift;
2087              
2088 0 0         my $content = $self->get_templates or return;
2089              
2090 0           return [ sort { $a cmp $b } keys %$content ];
  0            
2091             }
2092              
2093             #
2094             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2095             #
2096             sub get_template {
2097 0     0 0   my $self = shift;
2098 0           my ($template) = @_;
2099              
2100 0           my $es = $self->_es;
2101 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2102 0 0         $self->brik_help_run_undef_arg('get_template', $template) or return;
2103              
2104 0           my $r;
2105 0           eval {
2106 0           $r = $es->indices->get_template(
2107             name => $template,
2108             );
2109             };
2110 0 0         if ($@) {
2111 0           chomp($@);
2112 0           return $self->log->error("get_template: template failed for name [$template]: [$@]");
2113             }
2114              
2115 0           return $r;
2116             }
2117              
2118             sub put_mapping {
2119 0     0 0   my $self = shift;
2120 0           my ($index, $type, $mapping) = @_;
2121              
2122 0           my $es = $self->_es;
2123 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2124 0 0         $self->brik_help_run_undef_arg('put_mapping', $index) or return;
2125 0 0         $self->brik_help_run_undef_arg('put_mapping', $type) or return;
2126 0 0         $self->brik_help_run_undef_arg('put_mapping', $mapping) or return;
2127 0 0         $self->brik_help_run_invalid_arg('put_mapping', $mapping, 'HASH')
2128             or return;
2129              
2130 0           my $r;
2131 0           eval {
2132 0           my %this_args = (
2133             index => $index,
2134             body => $mapping,
2135             );
2136 0 0         if ($self->use_type) {
2137 0           $this_args{type} = $type;
2138             }
2139 0           $r = $es->indices->put_mapping(%this_args);
2140             };
2141 0 0         if ($@) {
2142 0           chomp($@);
2143 0           return $self->log->error("put_mapping: mapping failed ".
2144             "for index [$index]: [$@]");
2145             }
2146              
2147 0           return $r;
2148             }
2149              
2150             #
2151             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2152             #
2153             sub put_template {
2154 0     0 0   my $self = shift;
2155 0           my ($name, $template) = @_;
2156              
2157 0           my $es = $self->_es;
2158 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2159 0 0         $self->brik_help_run_undef_arg('put_template', $name) or return;
2160 0 0         $self->brik_help_run_undef_arg('put_template', $template) or return;
2161 0 0         $self->brik_help_run_invalid_arg('put_template', $template, 'HASH')
2162             or return;
2163              
2164 0           my $r;
2165 0           eval {
2166 0           $r = $es->indices->put_template(
2167             name => $name,
2168             body => $template,
2169             );
2170             };
2171 0 0         if ($@) {
2172 0           chomp($@);
2173 0           return $self->log->error("put_template: template failed ".
2174             "for name [$name]: [$@]");
2175             }
2176              
2177 0           return $r;
2178             }
2179              
2180             sub put_mapping_from_json_file {
2181 0     0 0   my $self = shift;
2182 0           my ($index, $type, $json_file) = @_;
2183              
2184 0           my $es = $self->_es;
2185 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2186 0 0         $self->brik_help_run_undef_arg('put_mapping_from_json_file', $index)
2187             or return;
2188 0 0         $self->brik_help_run_undef_arg('put_mapping_from_json_file', $type)
2189             or return;
2190 0 0         $self->brik_help_run_undef_arg('put_mapping_from_json_file', $json_file)
2191             or return;
2192 0 0         $self->brik_help_run_file_not_found('put_mapping_from_json_file',
2193             $json_file) or return;
2194              
2195 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2196 0 0         my $data = $fj->read($json_file) or return;
2197              
2198 0 0         if (! exists($data->{mappings})) {
2199 0           return $self->log->error("put_mapping_from_json_file: no mapping ".
2200             "data found");
2201             }
2202              
2203 0           return $self->put_mapping($index, $type, $data->{mappings});
2204             }
2205              
2206             sub update_mapping_from_json_file {
2207 0     0 0   my $self = shift;
2208 0           my ($json_file, $index, $type) = @_;
2209              
2210 0           my $es = $self->_es;
2211 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2212 0 0         $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2213             $json_file) or return;
2214 0 0         $self->brik_help_run_file_not_found('update_mapping_from_json_file',
2215             $json_file) or return;
2216 0 0         $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2217             $type) or return;
2218 0 0         $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2219             $index) or return;
2220              
2221 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2222 0 0         my $data = $fj->read($json_file) or return;
2223              
2224 0 0         if (! exists($data->{mappings})) {
2225 0           return $self->log->error("update_mapping_from_json_file: ".
2226             "no data found");
2227             }
2228              
2229 0           my $mappings = $data->{mappings};
2230              
2231 0           return $self->put_mapping($index, $type, $mappings);
2232             }
2233              
2234             sub put_template_from_json_file {
2235 0     0 0   my $self = shift;
2236 0           my ($json_file, $name) = @_;
2237              
2238 0           my $es = $self->_es;
2239 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2240 0 0         $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file)
2241             or return;
2242 0 0         $self->brik_help_run_file_not_found('put_template_from_json_file',
2243             $json_file) or return;
2244              
2245 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2246 0 0         my $data = $fj->read($json_file) or return;
2247              
2248 0 0         if (!defined($name)) {
2249 0           ($name) = $json_file =~ m{([^/]+)\.json$};
2250             }
2251              
2252 0 0         if (! defined($name)) {
2253 0           return $self->log->error("put_template_from_json_file: no template ".
2254             "name found");
2255             }
2256              
2257 0           return $self->put_template($name, $data);
2258             }
2259              
2260             sub update_template_from_json_file {
2261 0     0 0   my $self = shift;
2262 0           my ($json_file, $name) = @_;
2263              
2264 0           my $es = $self->_es;
2265 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2266 0 0         $self->brik_help_run_undef_arg('update_template_from_json_file',
2267             $json_file) or return;
2268 0 0         $self->brik_help_run_file_not_found('update_template_from_json_file',
2269             $json_file) or return;
2270              
2271 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2272 0 0         my $data = $fj->read($json_file) or return;
2273              
2274 0 0         if (!defined($name)) {
2275 0           ($name) = $json_file =~ m{([^/]+)\.json$};
2276             }
2277              
2278 0 0         if (! defined($name)) {
2279 0           return $self->log->error("put_template_from_json_file: no template ".
2280             "name found");
2281             }
2282              
2283             # We ignore errors, template may not exist.
2284 0           $self->delete_template($name);
2285              
2286 0           return $self->put_template($name, $data);
2287             }
2288              
2289             #
2290             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2291             # Search::Elasticsearch::Client::2_0::Direct::Indices
2292             #
2293             sub get_settings {
2294 0     0 0   my $self = shift;
2295 0           my ($indices, $names) = @_;
2296              
2297 0           my $es = $self->_es;
2298 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2299              
2300 0           my %args = ();
2301 0 0         if (defined($indices)) {
2302 0 0         $self->brik_help_run_undef_arg('get_settings', $indices) or return;
2303 0 0         my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
2304             or return;
2305 0           $args{index} = $indices;
2306             }
2307 0 0         if (defined($names)) {
2308 0 0         $self->brik_help_run_file_not_found('get_settings', $names) or return;
2309 0 0         my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
2310             or return;
2311 0           $args{name} = $names;
2312             }
2313              
2314 0           my $r;
2315 0           eval {
2316 0           $r = $es->indices->get_settings(%args);
2317             };
2318 0 0         if ($@) {
2319 0           chomp($@);
2320 0           return $self->log->error("get_settings: failed: [$@]");
2321             }
2322              
2323 0           return $r;
2324             }
2325              
2326             #
2327             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2328             # Search::Elasticsearch::Client::2_0::Direct::Indices
2329             #
2330             # Example:
2331             #
2332             # run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
2333             #
2334             # XXX: should be renamed to put_index_settings
2335             #
2336             sub put_settings {
2337 0     0 0   my $self = shift;
2338 0           my ($settings, $indices) = @_;
2339              
2340 0           my $es = $self->_es;
2341 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2342 0 0         $self->brik_help_run_undef_arg('put_settings', $settings) or return;
2343 0 0         $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
2344              
2345 0           my %args = (
2346             body => $settings,
2347             );
2348 0 0         if (defined($indices)) {
2349 0 0         $self->brik_help_run_undef_arg('put_settings', $indices) or return;
2350 0 0         my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
2351             or return;
2352 0           $args{index} = $indices;
2353             }
2354              
2355 0           my $r;
2356 0           eval {
2357 0           $r = $es->indices->put_settings(%args);
2358             };
2359 0 0         if ($@) {
2360 0           chomp($@);
2361 0           return $self->log->error("put_settings: failed: [$@]");
2362             }
2363              
2364 0           return $r;
2365             }
2366              
2367             sub set_index_readonly {
2368 0     0 0   my $self = shift;
2369 0           my ($indices, $bool) = @_;
2370              
2371 0           my $es = $self->_es;
2372 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2373 0 0         $self->brik_help_run_undef_arg('set_index_readonly', $indices) or return;
2374 0 0         $self->brik_help_run_invalid_arg('set_index_readonly', $indices, 'ARRAY', 'SCALAR')
2375             or return;
2376              
2377 0 0         if (! defined($bool)) {
2378 0           $bool = 'true';
2379             }
2380             else {
2381 0 0         $bool = $bool ? 'true' : 'false';
2382             }
2383              
2384 0           my $settings = {
2385             'blocks.read_only' => $bool,
2386             'blocks.read_only_allow_delete' => 'true',
2387             };
2388              
2389 0           return $self->put_settings($settings, $indices);
2390             }
2391              
2392             #
2393             # curl -XPUT -H "Content-Type: application/json" http://localhost:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'
2394             # PUT synscan-2018-05/_settings
2395             # {
2396             # "index": {
2397             # "blocks":{
2398             # "read_only":"false",
2399             # "read_only_allow_delete":"true"
2400             # }
2401             # }
2402             #}
2403             #
2404             #
2405             # If it fails with the following error:
2406             #
2407             # [2018-09-12T13:38:40,012][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 403 ({"type"=>"cluster_block_exception", "reason"=>"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"})
2408             #
2409             # Use Kibana dev console and copy/paste both requests:
2410             #
2411             # PUT _all/_settings
2412             # {
2413             # "index": {
2414             # "blocks": {
2415             # "read_only_allow_delete": "false"
2416             # }
2417             # }
2418             # }
2419             #
2420             sub reset_index_readonly {
2421 0     0 0   my $self = shift;
2422 0           my ($indices) = @_;
2423              
2424 0   0       $indices ||= '*';
2425 0           my $es = $self->_es;
2426 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2427 0 0         $self->brik_help_run_invalid_arg('reset_index_readonly', $indices,
2428             'ARRAY', 'SCALAR') or return;
2429              
2430 0           my $settings = {
2431             blocks => {
2432             read_only_allow_delete => 'false',
2433             },
2434             };
2435              
2436             # Settings on '*' indices should be enough to reset for everyone.
2437 0           my $r = $self->put_settings($settings, $indices);
2438             #$self->log->info(Data::Dumper::Dumper($r));
2439              
2440 0           return 1;
2441             }
2442              
2443             sub list_index_readonly {
2444 0     0 0   my $self = shift;
2445              
2446 0           my $es = $self->_es;
2447 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2448              
2449 0 0         my $list = $self->list_indices or return;
2450              
2451 0           my @indices = ();
2452 0           for my $this (@$list) {
2453 0 0         my $ro = $self->get_index_readonly($this) or next;
2454 0 0         if (defined($ro->{index}{provided_name})) {
2455 0           push @indices, $ro->{index}{provided_name};
2456             }
2457             }
2458              
2459 0           return \@indices;
2460             }
2461              
2462             sub set_index_number_of_replicas {
2463 0     0 0   my $self = shift;
2464 0           my ($indices, $number) = @_;
2465              
2466 0           my $es = $self->_es;
2467 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2468 0 0         $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
2469 0 0         $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2470             or return;
2471              
2472 0           my $settings = { number_of_replicas => $number };
2473              
2474 0           return $self->put_settings($settings, $indices);
2475             }
2476              
2477             sub set_index_refresh_interval {
2478 0     0 0   my $self = shift;
2479 0           my ($indices, $number) = @_;
2480              
2481 0           my $es = $self->_es;
2482 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2483 0 0         $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
2484 0 0         $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2485             or return;
2486              
2487             # If there is a meaningful value not postfixed with a unity,
2488             # we default to add a `s' for a number of seconds.
2489 0 0 0       if ($number =~ /^\d+$/ && $number > 0) {
2490 0           $number .= 's';
2491             }
2492              
2493 0           my $settings = { refresh_interval => $number };
2494              
2495 0           return $self->put_settings($settings, $indices);
2496             }
2497              
2498             sub get_index_settings {
2499 0     0 0   my $self = shift;
2500 0           my ($indices) = @_;
2501              
2502 0           my $es = $self->_es;
2503 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2504 0 0         $self->brik_help_run_undef_arg('get_index_settings', $indices) or return;
2505 0 0         $self->brik_help_run_invalid_arg('get_index_settings', $indices, 'ARRAY', 'SCALAR')
2506             or return;
2507              
2508 0           my $settings = $self->get_settings($indices);
2509              
2510 0           my %indices = ();
2511 0           for (keys %$settings) {
2512 0           $indices{$_} = $settings->{$_}{settings};
2513             }
2514              
2515 0           return \%indices;
2516             }
2517              
2518             sub get_index_readonly {
2519 0     0 0   my $self = shift;
2520 0           my ($indices) = @_;
2521              
2522 0           my $es = $self->_es;
2523 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2524 0 0         $self->brik_help_run_undef_arg('get_index_readonly', $indices) or return;
2525 0 0         $self->brik_help_run_invalid_arg('get_index_readonly', $indices, 'ARRAY', 'SCALAR')
2526             or return;
2527              
2528 0           my $settings = $self->get_settings($indices);
2529              
2530 0           my %indices = ();
2531 0           for (keys %$settings) {
2532             #$indices{$_} = $settings->{$_}{settings}{index}{'blocks_write'};
2533 0           $indices{$_} = $settings->{$_}{settings};
2534             }
2535              
2536 0           return \%indices;
2537             }
2538              
2539             sub get_index_number_of_replicas {
2540 0     0 0   my $self = shift;
2541 0           my ($indices) = @_;
2542              
2543 0           my $es = $self->_es;
2544 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2545 0 0         $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
2546 0 0         $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2547             or return;
2548              
2549 0           my $settings = $self->get_settings($indices);
2550              
2551 0           my %indices = ();
2552 0           for (keys %$settings) {
2553 0           $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
2554             }
2555              
2556 0           return \%indices;
2557             }
2558              
2559             sub get_index_refresh_interval {
2560 0     0 0   my $self = shift;
2561 0           my ($indices, $number) = @_;
2562              
2563 0           my $es = $self->_es;
2564 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2565 0 0         $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
2566 0 0         $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2567             or return;
2568              
2569 0           my $settings = $self->get_settings($indices);
2570              
2571 0           my %indices = ();
2572 0           for (keys %$settings) {
2573 0           $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
2574             }
2575              
2576 0           return \%indices;
2577             }
2578              
2579             sub get_index_number_of_shards {
2580 0     0 0   my $self = shift;
2581 0           my ($indices, $number) = @_;
2582              
2583 0           my $es = $self->_es;
2584 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2585 0 0         $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
2586 0 0         $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
2587             or return;
2588              
2589 0           my $settings = $self->get_settings($indices);
2590              
2591 0           my %indices = ();
2592 0           for (keys %$settings) {
2593 0           $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
2594             }
2595              
2596 0           return \%indices;
2597             }
2598              
2599             #
2600             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2601             #
2602             sub delete_template {
2603 0     0 0   my $self = shift;
2604 0           my ($name) = @_;
2605              
2606 0           my $es = $self->_es;
2607 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2608 0 0         $self->brik_help_run_undef_arg('delete_template', $name) or return;
2609              
2610 0           my $r;
2611 0           eval {
2612 0           $r = $es->indices->delete_template(
2613             name => $name,
2614             );
2615             };
2616 0 0         if ($@) {
2617 0           chomp($@);
2618 0           return $self->log->error("delete_template: failed for name [$name]: [$@]");
2619             }
2620              
2621 0           return $r;
2622             }
2623              
2624             #
2625             # Return a boolean to state for index existence
2626             #
2627             sub is_index_exists {
2628 0     0 0   my $self = shift;
2629 0           my ($index) = @_;
2630              
2631 0           my $es = $self->_es;
2632 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2633 0 0         $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
2634              
2635 0           my $r;
2636 0           eval {
2637 0           $r = $es->indices->exists(
2638             index => $index,
2639             );
2640             };
2641 0 0         if ($@) {
2642 0           chomp($@);
2643 0           return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
2644             }
2645              
2646 0 0         return $r ? 1 : 0;
2647             }
2648              
2649             #
2650             # Return a boolean to state for index with type existence
2651             #
2652             sub is_type_exists {
2653 0     0 0   my $self = shift;
2654 0           my ($index, $type) = @_;
2655              
2656 0           my $es = $self->_es;
2657 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2658 0 0         $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
2659 0 0         $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
2660              
2661 0           my $r;
2662 0           eval {
2663 0           my %this_args = (
2664             index => $index,
2665             );
2666 0 0         if ($self->use_type) {
2667 0           $this_args{type} = $type;
2668             }
2669 0           $r = $es->indices->exists_type(%this_args);
2670             };
2671 0 0         if ($@) {
2672 0           chomp($@);
2673 0           return $self->log->error("is_type_exists: failed for index [$index] and ".
2674             "type [$type]: [$@]");
2675             }
2676              
2677 0 0         return $r ? 1 : 0;
2678             }
2679              
2680             #
2681             # Return a boolean to state for document existence
2682             #
2683             sub is_document_exists {
2684 0     0 0   my $self = shift;
2685 0           my ($index, $type, $document) = @_;
2686              
2687 0           my $es = $self->_es;
2688 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2689 0 0         $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
2690 0 0         $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
2691 0 0         $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
2692 0 0         $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
2693              
2694 0           my $r;
2695 0           eval {
2696 0           my %this_args = (
2697             index => $index,
2698             %$document,
2699             );
2700 0 0         if ($self->use_type) {
2701 0           $this_args{type} = $type;
2702             }
2703 0           $r = $es->exists(%this_args);
2704             };
2705 0 0         if ($@) {
2706 0           chomp($@);
2707 0           return $self->log->error("is_document_exists: failed for index [$index] and ".
2708             "type [$type]: [$@]");
2709             }
2710              
2711 0 0         return $r ? 1 : 0;
2712             }
2713              
2714             sub parse_error_string {
2715 0     0 0   my $self = shift;
2716 0           my ($string) = @_;
2717              
2718 0 0         $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
2719              
2720             # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
2721              
2722 0           my ($class, $node, $code, $message, $dump) = $string =~
2723             m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
2724              
2725 0 0 0       if (defined($dump) && length($dump)) {
2726 0 0         my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
2727 0           $dump = $sd->decode($dump);
2728             }
2729              
2730             # Sanity check
2731 0 0 0       if (defined($node) && $node =~ m{^http} && $code =~ m{^\d+$}
      0        
      0        
      0        
2732             && defined($dump) && ref($dump) eq 'HASH') {
2733             return {
2734 0           class => $class,
2735             node => $node,
2736             code => $code,
2737             message => $message,
2738             dump => $dump,
2739             };
2740             }
2741              
2742             # Were not able to decode, we return as-is.
2743             return {
2744 0           message => $string,
2745             };
2746             }
2747              
2748             #
2749             # Refresh an index to receive latest additions
2750             #
2751             # Search::Elasticsearch::Client::5_0::Direct::Indices
2752             # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
2753             #
2754             sub refresh_index {
2755 0     0 0   my $self = shift;
2756 0           my ($index) = @_;
2757              
2758 0           my $es = $self->_es;
2759 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2760 0 0         $self->brik_help_run_undef_arg('refresh_index', $index) or return;
2761              
2762 0           my $try = $self->try;
2763              
2764 0           RETRY:
2765              
2766             my $r;
2767 0           eval {
2768 0           $r = $es->indices->refresh(
2769             index => $index,
2770             );
2771             };
2772 0 0         if ($@) {
2773 0 0         if (--$try == 0) {
2774 0           chomp($@);
2775 0           my $p = $self->parse_error_string($@);
2776 0 0 0       if (defined($p) && exists($p->{class})) {
2777 0           my $class = $p->{class};
2778 0           my $code = $p->{code};
2779 0           my $node = $p->{node};
2780 0           return $self->log->error("refresh_index: failed for index [$index] ".
2781             "after [$try] tries with error [$class] code [$code] for node [$node]");
2782             }
2783             else {
2784 0           return $self->log->error("refresh_index: failed for index [$index] ".
2785             "after [$try]: [$@]");
2786             }
2787             }
2788 0           sleep 60;
2789 0           goto RETRY;
2790             }
2791              
2792 0           return $r;
2793             }
2794              
2795             sub export_as {
2796 0     0 0   my $self = shift;
2797 0           my ($format, $index, $size, $cb) = @_;
2798              
2799 0   0       $size ||= 10_000;
2800 0           my $es = $self->_es;
2801 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2802 0 0         $self->brik_help_run_undef_arg('export_as', $format) or return;
2803 0 0         $self->brik_help_run_undef_arg('export_as', $index) or return;
2804 0 0         $self->brik_help_run_undef_arg('export_as', $size) or return;
2805              
2806 0 0 0       if ($format ne 'csv' && $format ne 'json') {
2807 0           return $self->log->error("export_as: unsupported export format ".
2808             "[$format]");
2809             }
2810              
2811 0           my $max = $self->max;
2812 0           my $datadir = $self->datadir;
2813              
2814 0           $self->log->debug("export_as: selecting scroll Command...");
2815              
2816 0           my $scroll;
2817 0 0         my $version = $self->version or return;
2818 0 0         if ($version lt "5.0.0") {
2819 0 0         $scroll = $self->open_scroll_scan_mode($index, $size) or return;
2820             }
2821             else {
2822 0 0         $scroll = $self->open_scroll($index, $size) or return;
2823             }
2824              
2825 0           $self->log->debug("export_as: selecting scroll Command...OK.");
2826              
2827 0 0         my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2828              
2829 0           my $out;
2830             my $csv_header;
2831 0 0         if ($format eq 'csv') {
    0          
2832 0 0         $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
2833 0           $out->encoding($self->encoding);
2834 0           $out->separator(',');
2835 0           $out->escape('\\');
2836 0           $out->append(1);
2837 0           $out->first_line_is_header(0);
2838 0           $out->write_header(1);
2839 0           $out->use_quoting(1);
2840 0 0         if (defined($self->csv_header)) {
2841 0           my $sorted = [ sort { $a cmp $b } @{$self->csv_header} ];
  0            
  0            
2842 0           $out->header($sorted);
2843             }
2844 0 0         if (defined($self->csv_encoded_fields)) {
2845 0           $out->encoded_fields($self->csv_encoded_fields);
2846             }
2847 0 0         if (defined($self->csv_object_fields)) {
2848 0           $out->object_fields($self->csv_object_fields);
2849             }
2850              
2851 0           $csv_header = $out->header;
2852             }
2853             elsif ($format eq 'json') {
2854 0 0         $out = Metabrik::File::Json->new_from_brik_init($self) or return;
2855 0           $out->encoding($self->encoding);
2856             }
2857              
2858 0           my $total = $self->total_scroll;
2859 0           $self->log->info("export_as: total [$total] for index [$index]");
2860              
2861 0           my %types = ();
2862 0           my $read = 0;
2863 0           my $skipped = 0;
2864 0           my $exported = 0;
2865 0           my $start = time();
2866 0           my $done = $datadir."/$index.exported";
2867 0           my $start_time = time();
2868 0           my %chunk = ();
2869 0           while (my $next = $self->next_scroll(10000)) {
2870 0           for my $this (@$next) {
2871 0           $read++;
2872              
2873 0 0         if (defined($cb)) {
2874 0           $this = $cb->($this);
2875 0 0         if (! defined($this)) {
2876 0           $self->log->error("export_as: callback failed for index ".
2877             "[$index] at read [$read], skipping single entry");
2878 0           $skipped++;
2879 0           next;
2880             }
2881             }
2882              
2883 0           my $id = $this->{_id};
2884 0           my $doc = $this->{_source};
2885             # Prepare for when types will be removed from ES
2886 0   0       my $type = $this->{_type} || '_doc';
2887 0 0         if (! exists($types{$type})) {
2888 0 0         if ($format eq 'csv') {
    0          
2889             # If not given, we guess the CSV fields to use.
2890 0 0         if (! defined($csv_header)) {
2891 0 0         my $fields = $self->list_index_fields($index, $type)
2892             or return;
2893 0           $types{$type}{header} = [ '_id', @$fields ];
2894             }
2895             else {
2896 0           $types{$type}{header} = [ '_id', @$csv_header ];
2897             }
2898              
2899 0           $types{$type}{output} = $datadir."/$index:$type.csv";
2900             }
2901             elsif ($format eq 'json') {
2902 0           $types{$type}{output} = $datadir."/$index:$type.json";
2903             }
2904              
2905             # Verify it has not been exported yet
2906 0 0         if (-f $done) {
2907 0           return $self->log->error("export_as: export already done ".
2908             "for index [$index]");
2909             }
2910              
2911             $self->log->info("export_as: exporting to file [".
2912 0           $types{$type}{output}."] for type [$type], using ".
2913             "chunk size of [$size]");
2914             }
2915              
2916 0           my $h = { _id => $id };
2917              
2918 0           for my $k (keys %$doc) {
2919 0           $h->{$k} = $doc->{$k};
2920             }
2921              
2922 0 0         if ($format eq 'csv') {
2923 0           $out->header($types{$type}{header});
2924             }
2925              
2926 0           push @{$chunk{$type}}, $h;
  0            
2927 0 0         if (@{$chunk{$type}} > 999) {
  0            
2928 0           my $r = $out->write($chunk{$type}, $types{$type}{output});
2929 0 0         if (!defined($r)) {
2930 0           $self->log->warning("export_as: unable to process entry, ".
2931             "skipping");
2932 0           $skipped++;
2933 0           next;
2934             }
2935 0           $chunk{$type} = [];
2936             }
2937              
2938             # Log a status sometimes.
2939 0 0         if (! (++$exported % 100_000)) {
2940 0           my $now = time();
2941 0           my $perc = sprintf("%.02f", $exported / $total * 100);
2942 0           $self->log->info("export_as: fetched [$exported/$total] ".
2943             "($perc%) elements in ".($now - $start)." second(s) ".
2944             "from index [$index]");
2945 0           $start = time();
2946             }
2947              
2948             # Limit export to specified maximum
2949 0 0 0       if ($max > 0 && $exported >= $max) {
2950 0           $self->log->info("export_as: max export reached [$exported] ".
2951             "for index [$index], stopping");
2952 0           last;
2953             }
2954             }
2955             }
2956              
2957             # Process remaining data waiting to be written and build output file list
2958 0           my %files = ();
2959 0           for my $type (keys %types) {
2960 0 0         if (@{$chunk{$type}} > 0) {
  0            
2961 0           $out->write($chunk{$type}, $types{$type}{output});
2962 0           $files{$types{$type}{output}}++;
2963             }
2964             }
2965              
2966 0           $self->close_scroll;
2967              
2968 0           my $stop_time = time();
2969 0           my $duration = $stop_time - $start_time;
2970 0           my $eps = $exported;
2971 0 0         if ($duration > 0) {
2972 0           $eps = $exported / $duration;
2973             }
2974              
2975             my $result = {
2976             read => $read,
2977             exported => $exported,
2978             skipped => $read - $exported,
2979             total_count => $total,
2980             complete => ($exported == $total) ? 1 : 0,
2981             duration => $duration,
2982             eps => $eps,
2983 0 0         files => [ sort { $a cmp $b } keys %files ],
  0            
2984             };
2985              
2986             # Say the file has been processed, and put resulting stats.
2987 0 0         $fd->write($result, $done) or return;
2988              
2989 0           $self->log->info("export_as: done.");
2990              
2991 0           return $result;
2992             }
2993              
2994             sub export_as_csv {
2995 0     0 0   my $self = shift;
2996 0           my ($index, $size, $cb) = @_;
2997              
2998 0   0       $size ||= 10_000;
2999 0           my $es = $self->_es;
3000 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3001 0 0         $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
3002 0 0         $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
3003              
3004 0           return $self->export_as('csv', $index, $size, $cb);
3005             }
3006              
3007             sub export_as_json {
3008 0     0 0   my $self = shift;
3009 0           my ($index, $size, $cb) = @_;
3010              
3011 0   0       $size ||= 10_000;
3012 0           my $es = $self->_es;
3013 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3014 0 0         $self->brik_help_run_undef_arg('export_as_json', $index) or return;
3015 0 0         $self->brik_help_run_undef_arg('export_as_json', $size) or return;
3016              
3017 0           return $self->export_as('json', $index, $size, $cb);
3018             }
3019              
3020             #
3021             # Optimization instructions:
3022             # https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
3023             #
3024             sub import_from {
3025 0     0 0   my $self = shift;
3026 0           my ($format, $input, $index, $type, $hash, $cb) = @_;
3027              
3028 0           my $es = $self->_es;
3029 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3030 0 0         $self->brik_help_run_undef_arg('import_from', $format) or return;
3031 0 0         $self->brik_help_run_undef_arg('import_from', $input) or return;
3032 0 0         $self->brik_help_run_file_not_found('import_from', $input) or return;
3033              
3034 0 0 0       if ($format ne 'csv' && $format ne 'json') {
3035 0           return $self->log->error("import_from: unsupported export format ".
3036             "[$format]");
3037             }
3038              
3039             # If index and/or types are not defined, we try to get them from
3040             # input filename
3041 0 0 0       if (! defined($index) || ! defined($type)) {
3042             # Example: index-DATE:type.csv
3043 0 0         if ($input =~ m{^(.+):(.+)\.(?:csv|json)(?:.*)?$}) {
3044 0           my ($this_index, $this_type) = $input =~
3045             m{^(.+):(.+)\.(?:csv|json)(?:.*)?$};
3046 0   0       $index ||= $this_index;
3047 0   0       $type ||= $this_type;
3048             }
3049             }
3050              
3051             # Verify it has not been indexed yet
3052 0           my $done = "$input.imported";
3053 0 0         if (-f $done) {
3054 0           $self->log->info("import_from: import already done for file ".
3055             "[$input]");
3056 0           return 0;
3057             }
3058              
3059             # And default to Attributes if guess failed.
3060 0   0       $index ||= $self->index;
3061 0   0       $type ||= $self->type;
3062 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
3063 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
3064              
3065 0 0         if ($index eq '*') {
3066 0           return $self->log->error("import_from: cannot import to invalid ".
3067             "index [$index]");
3068             }
3069 0 0         if ($self->use_type) {
3070 0 0         if ($type eq '*') {
3071 0           return $self->log->error("import_from: cannot import to invalid ".
3072             "type [$type]");
3073             }
3074             }
3075              
3076 0           $self->log->debug("input [$input]");
3077 0           $self->log->debug("index [$index]");
3078 0           $self->log->debug("type [$type]");
3079              
3080 0           my $count_before = 0;
3081 0 0         if ($self->is_index_exists($index)) {
3082 0           $count_before = $self->count($index, $type);
3083 0 0         if (! defined($count_before)) {
3084 0           return;
3085             }
3086 0           $self->log->info("import_from: current index [$index] count is ".
3087             "[$count_before]");
3088             }
3089              
3090 0           my $max = $self->max;
3091              
3092 0 0         $self->open_bulk_mode($index, $type) or return;
3093              
3094 0           $self->log->info("import_from: importing file [$input] to index ".
3095             "[$index] with type [$type]");
3096              
3097 0 0         my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
3098              
3099 0           my $out;
3100 0 0         if ($format eq 'csv') {
    0          
3101 0 0         $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
3102 0           $out->encoding($self->encoding);
3103 0           $out->separator(',');
3104 0           $out->escape('\\');
3105 0           $out->first_line_is_header(1);
3106 0           $out->encoded_fields($self->csv_encoded_fields);
3107 0           $out->object_fields($self->csv_object_fields);
3108             }
3109             elsif ($format eq 'json') {
3110 0 0         $out = Metabrik::File::Json->new_from_brik_init($self) or return;
3111 0           $out->encoding($self->encoding);
3112             }
3113              
3114 0           my $refresh_interval;
3115             my $number_of_replicas;
3116 0           my $start = time();
3117 0           my $speed_settings = {};
3118 0           my $imported = 0;
3119 0           my $first = 1;
3120 0           my $read = 0;
3121 0           my $skipped_chunks = 0;
3122 0           my $start_time = time();
3123 0           while (my $this = $out->read_next($input)) {
3124 0           $read++;
3125              
3126 0           my $h = {};
3127 0 0         my $id = $self->use_ignore_id ? undef : $this->{_id};
3128 0           delete $this->{_id};
3129 0           for my $k (keys %$this) {
3130 0           my $value = $this->{$k};
3131             # We keep only fields when they have a value.
3132             # No need to index data that is empty.
3133 0 0 0       if (defined($value) && length($value)) {
3134 0           $h->{$k} = $value;
3135             }
3136             }
3137              
3138 0 0         if (defined($cb)) {
3139 0           $h = $cb->($h);
3140 0 0         if (! defined($h)) {
3141 0           $self->log->error("import_from: callback failed for ".
3142             "index [$index] at read [$read], skipping single entry");
3143 0           $skipped_chunks++;
3144 0           next;
3145             }
3146             }
3147              
3148             # Set routing based on the provided field name, if any.
3149 0           my $this_hash;
3150 0 0 0       if (defined($hash) && defined($hash->{routing})
      0        
3151             && defined($h->{$hash->{routing}})) {
3152 0           $this_hash = { %$hash }; # Make a copy to avoid overwriting
3153             # user provided value.
3154 0           $this_hash->{routing} = $h->{$hash->{routing}};
3155             }
3156              
3157             #$self->log->info(Data::Dumper::Dumper($h));
3158              
3159 0           my $r;
3160 0           eval {
3161 0           $r = $self->index_bulk($h, $index, $type, $this_hash, $id);
3162             };
3163 0 0         if ($@) {
3164 0           chomp($@);
3165 0           $self->log->warning("import_from: error [$@]");
3166             }
3167 0 0         if (! defined($r)) {
3168 0           $self->log->error("import_from: bulk processing failed for ".
3169             "index [$index] at read [$read], skipping chunk");
3170 0           $skipped_chunks++;
3171 0           next;
3172             }
3173              
3174             # Gather index settings, and set values for speed.
3175             # We don't do it earlier, cause we need index to be created,
3176             # and it should have been done from index_bulk Command.
3177 0 0 0       if ($first && $self->is_index_exists($index)) {
3178             # Save current values so we can restore them at the end of Command.
3179             # We ignore errors here, this is non-blocking for indexing.
3180 0           $refresh_interval = $self->get_index_refresh_interval($index);
3181 0           $refresh_interval = $refresh_interval->{$index};
3182 0           $number_of_replicas = $self->get_index_number_of_replicas($index);
3183 0           $number_of_replicas = $number_of_replicas->{$index};
3184 0 0         if ($self->use_indexing_optimizations) {
3185 0           $self->set_index_number_of_replicas($index, 0);
3186             }
3187 0           $self->set_index_refresh_interval($index, -1);
3188 0           $first = 0;
3189             }
3190              
3191             # Log a status sometimes.
3192 0 0         if (! (++$imported % 100_000)) {
3193 0           my $now = time();
3194 0           $self->log->info("import_from: imported [$imported] entries in ".
3195             ($now - $start)." second(s) to index [$index]");
3196 0           $start = time();
3197             }
3198              
3199             # Limit import to specified maximum
3200 0 0 0       if ($max > 0 && $imported >= $max) {
3201 0           $self->log->info("import_from: max import reached [$imported] for ".
3202             "index [$index], stopping");
3203 0           last;
3204             }
3205             }
3206              
3207 0           $self->bulk_flush;
3208              
3209 0           my $stop_time = time();
3210 0           my $duration = $stop_time - $start_time;
3211 0   0       my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
3212              
3213 0           $self->refresh_index($index);
3214              
3215 0 0         my $count_current = $self->count($index, $type) or return;
3216 0           $self->log->info("import_from: after index [$index] count is [$count_current] ".
3217             "at EPS [$eps]");
3218              
3219 0           my $skipped = 0;
3220 0 0         my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
3221 0 0         if ($complete) { # If complete, import has been retried, and everything is now ok.
3222 0           $imported = $read;
3223             }
3224             else {
3225 0           $skipped = $read - ($count_current - $count_before);
3226             }
3227              
3228 0           my $result = {
3229             read => $read,
3230             imported => $imported,
3231             skipped => $skipped,
3232             previous_count => $count_before,
3233             current_count => $count_current,
3234             complete => $complete,
3235             duration => $duration,
3236             eps => $eps,
3237             };
3238              
3239             # Say the file has been processed, and put resulting stats.
3240 0 0         $fd->write($result, $done) or return;
3241              
3242             # Restore previous settings, if any
3243 0 0         if (defined($refresh_interval)) {
3244 0           $self->set_index_refresh_interval($index, $refresh_interval);
3245             }
3246 0 0 0       if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
3247 0           $self->set_index_number_of_replicas($index, $number_of_replicas);
3248             }
3249              
3250 0           return $result;
3251             }
3252              
3253             sub import_from_csv {
3254 0     0 0   my $self = shift;
3255 0           my ($input, $index, $type, $hash, $cb) = @_;
3256              
3257 0           my $es = $self->_es;
3258 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3259 0 0         $self->brik_help_run_undef_arg('import_from_csv', $input) or return;
3260 0 0         $self->brik_help_run_file_not_found('import_from_csv', $input)
3261             or return;
3262              
3263 0           return $self->import_from('csv', $input, $index, $type, $hash, $cb);
3264             }
3265              
3266             sub import_from_json {
3267 0     0 0   my $self = shift;
3268 0           my ($input, $index, $type, $hash, $cb) = @_;
3269              
3270 0           my $es = $self->_es;
3271 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3272 0 0         $self->brik_help_run_undef_arg('import_from_json', $input) or return;
3273 0 0         $self->brik_help_run_file_not_found('import_from_json', $input)
3274             or return;
3275              
3276 0           return $self->import_from('json', $input, $index, $type, $hash, $cb);
3277             }
3278              
3279             #
3280             # Same as import_from_csv Command but in worker mode for speed.
3281             #
3282             sub import_from_csv_worker {
3283 0     0 0   my $self = shift;
3284 0           my ($input_csv, $index, $type, $hash, $cb) = @_;
3285              
3286 0           my $es = $self->_es;
3287 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3288 0 0         $self->brik_help_run_undef_arg('import_from_csv_worker', $input_csv)
3289             or return;
3290 0 0         $self->brik_help_run_file_not_found('import_from_csv_worker', $input_csv)
3291             or return;
3292              
3293             # If index and/or types are not defined, we try to get them from input filename
3294 0 0 0       if (! defined($index) || ! defined($type)) {
3295             # Example: index-DATE:type.csv
3296 0 0         if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
3297 0           my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
3298 0   0       $index ||= $this_index;
3299 0   0       $type ||= $this_type;
3300             }
3301             }
3302              
3303             # Verify it has not been indexed yet
3304 0           my $done = "$input_csv.imported";
3305 0 0         if (-f $done) {
3306 0           $self->log->info("import_from_csv_worker: import already done for ".
3307             "file [$input_csv]");
3308 0           return 0;
3309             }
3310              
3311             # And default to Attributes if guess failed.
3312 0   0       $index ||= $self->index;
3313 0   0       $type ||= $self->type;
3314 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
3315 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
3316              
3317 0 0         if ($index eq '*') {
3318 0           return $self->log->error("import_from_csv_worker: cannot import to invalid ".
3319             "index [$index]");
3320             }
3321 0 0         if ($self->use_type) {
3322 0 0         if ($type eq '*') {
3323 0           return $self->log->error("import_from_csv_worker: cannot import to ".
3324             "invalid type [$type]");
3325             }
3326             }
3327              
3328 0           $self->log->debug("input [$input_csv]");
3329 0           $self->log->debug("index [$index]");
3330 0           $self->log->debug("type [$type]");
3331              
3332 0           my $count_before = 0;
3333 0 0         if ($self->is_index_exists($index)) {
3334 0           $count_before = $self->count($index, $type);
3335 0 0         if (! defined($count_before)) {
3336 0           return;
3337             }
3338 0           $self->log->info("import_from_csv_worker: current index [$index] count is ".
3339             "[$count_before]");
3340             }
3341              
3342 0           my $max = $self->max;
3343              
3344 0 0         $self->open_bulk_mode($index, $type) or return;
3345              
3346             #my $batch = undef;
3347 0           my $batch = 10_000;
3348              
3349 0           $self->log->info("import_from_csv_worker: importing file [$input_csv] to ".
3350             "index [$index] with type [$type] and batch [$batch]");
3351              
3352 0 0         my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
3353              
3354 0 0         my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
3355 0           $fc->separator(',');
3356 0           $fc->escape('\\');
3357 0           $fc->first_line_is_header(1);
3358 0           $fc->encoded_fields($self->csv_encoded_fields);
3359 0           $fc->object_fields($self->csv_object_fields);
3360              
3361 0 0         my $wp = Metabrik::Worker::Parallel->new_from_brik_init($self) or return;
3362 0           $wp->pool_size(2);
3363              
3364 0 0         $wp->create_manager or return;
3365              
3366 0           my $refresh_interval;
3367             my $number_of_replicas;
3368 0           my $start = time();
3369 0           my $speed_settings = {};
3370 0           my $imported = 0;
3371 0           my $first = 1;
3372 0           my $read = 0;
3373 0           my $skipped_chunks = 0;
3374 0           my $start_time = time();
3375 0           while (my $list = $fc->read_next($input_csv, $batch)) {
3376              
3377             $wp->start(sub {
3378 0     0     my @list = ();
3379 0           for my $this (@$list) {
3380 0           $read++;
3381              
3382 0           my $h = {};
3383 0           my $id = $this->{_id};
3384 0           delete $this->{_id};
3385 0           for my $k (keys %$this) {
3386 0           my $value = $this->{$k};
3387             # We keep only fields when they have a value.
3388             # No need to index data that is empty.
3389 0 0 0       if (defined($value) && length($value)) {
3390 0           $h->{$k} = $value;
3391             }
3392             }
3393              
3394 0 0         if (defined($cb)) {
3395 0           $h = $cb->($h);
3396 0 0         if (! defined($h)) {
3397 0           $self->log->error("import_from_csv_worker: callback failed for ".
3398             "index [$index] at read [$read], skipping single entry");
3399 0           $skipped_chunks++;
3400 0           next;
3401             }
3402             }
3403              
3404 0           push @list, $h;
3405             }
3406              
3407 0           my $r;
3408 0           eval {
3409 0           $r = $self->index_bulk_from_list(\@list, $index, $type, $hash);
3410             };
3411 0 0         if ($@) {
3412 0           chomp($@);
3413 0           $self->log->warning("import_from_csv_worker: error [$@]");
3414             }
3415 0 0         if (! defined($r)) {
3416 0           $self->log->error("import_from_csv_worker: bulk processing failed for ".
3417             "index [$index] at read [$read], skipping chunk");
3418 0           $skipped_chunks++;
3419 0           next;
3420             }
3421              
3422             # Log a status sometimes.
3423 0 0         if (! ($imported % 10_000)) {
3424 0           my $now = time();
3425 0           my $diff = sprintf("%.02f", $now - $start);
3426 0           my $eps = sprintf("%.02f", $imported / $diff);
3427 0           $self->log->info("import_from_csv_worker: imported [$imported] entries ".
3428             "in [$diff] second(s) to index [$index] at EPS [$eps]");
3429 0           $start = time();
3430             }
3431              
3432 0           exit(0);
3433 0           });
3434              
3435             # Gather index settings, and set values for speed.
3436             # We don't do it earlier, cause we need index to be created,
3437             # and it should have been done from index_bulk Command.
3438 0 0 0       if ($first && $self->is_index_exists($index)) {
3439             # Save current values so we can restore them at the end of Command.
3440             # We ignore errors here, this is non-blocking for indexing.
3441 0           $refresh_interval = $self->get_index_refresh_interval($index);
3442 0           $refresh_interval = $refresh_interval->{$index};
3443 0           $number_of_replicas = $self->get_index_number_of_replicas($index);
3444 0           $number_of_replicas = $number_of_replicas->{$index};
3445 0 0         if ($self->use_indexing_optimizations) {
3446 0           $self->set_index_number_of_replicas($index, 0);
3447             }
3448 0           $self->set_index_refresh_interval($index, -1);
3449 0           $first = 0;
3450             }
3451              
3452             # Log a status sometimes.
3453             #$imported += @$list;
3454             #if (! ($imported % 10_000)) {
3455             #my $now = time();
3456             #my $diff = sprintf("%.02f", $now - $start);
3457             #my $eps = sprintf("%.02f", 10_000 / $diff);
3458             #$self->log->info("import_from_csv_worker: imported [$imported] entries ".
3459             #"in [$diff] second(s) to index [$index] at EPS [$eps]");
3460             #$start = time();
3461             #}
3462              
3463             # Limit import to specified maximum
3464 0 0 0       if ($max > 0 && $imported >= $max) {
3465 0           $self->log->info("import_from_csv_worker: max import reached [$imported] for ".
3466             "index [$index], stopping");
3467 0           last;
3468             }
3469              
3470 0 0         last if (@$list < $batch);
3471              
3472 0           $imported += @$list;
3473             }
3474              
3475 0           $wp->stop;
3476              
3477 0           $self->bulk_flush;
3478              
3479 0           my $stop_time = time();
3480 0           my $duration = $stop_time - $start_time;
3481 0   0       my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
3482              
3483 0           $self->refresh_index($index);
3484              
3485 0 0         my $count_current = $self->count($index, $type) or return;
3486 0           $self->log->info("import_from_csv_worker: after index [$index] count ".
3487             "is [$count_current] at EPS [$eps]");
3488              
3489 0           my $skipped = 0;
3490 0 0         my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
3491 0 0         if ($complete) { # If complete, import has been retried, and everything is now ok.
3492 0           $imported = $read;
3493             }
3494             else {
3495 0           $skipped = $read - ($count_current - $count_before);
3496             }
3497              
3498 0           my $result = {
3499             read => $read,
3500             imported => $imported,
3501             skipped => $skipped,
3502             previous_count => $count_before,
3503             current_count => $count_current,
3504             complete => $complete,
3505             duration => $duration,
3506             eps => $eps,
3507             };
3508              
3509             # Say the file has been processed, and put resulting stats.
3510 0 0         $fd->write($result, $done) or return;
3511              
3512             # Restore previous settings, if any
3513 0 0         if (defined($refresh_interval)) {
3514 0           $self->set_index_refresh_interval($index, $refresh_interval);
3515             }
3516 0 0 0       if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
3517 0           $self->set_index_number_of_replicas($index, $number_of_replicas);
3518             }
3519              
3520 0           return $result;
3521             }
3522              
3523             #
3524             # http://localhost:9200/_nodes/stats/process?pretty
3525             #
3526             # Search::Elasticsearch::Client::2_0::Direct::Nodes
3527             #
3528             sub get_stats_process {
3529 0     0 0   my $self = shift;
3530              
3531 0           my $es = $self->_es;
3532 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3533              
3534 0           my $r;
3535 0           eval {
3536 0           $r = $es->nodes->stats(
3537             metric => [ qw(process) ],
3538             );
3539             };
3540 0 0         if ($@) {
3541 0           chomp($@);
3542 0           return $self->log->error("get_stats_process: failed: [$@]");
3543             }
3544              
3545 0           return $r;
3546             }
3547              
3548             #
3549             # curl http://localhost:9200/_nodes/process?pretty
3550             #
3551             # Search::Elasticsearch::Client::2_0::Direct::Nodes
3552             #
3553             sub get_process {
3554 0     0 0   my $self = shift;
3555              
3556 0           my $es = $self->_es;
3557 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3558              
3559 0           my $r;
3560 0           eval {
3561 0           $r = $es->nodes->info(
3562             metric => [ qw(process) ],
3563             );
3564             };
3565 0 0         if ($@) {
3566 0           chomp($@);
3567 0           return $self->log->error("get_process: failed: [$@]");
3568             }
3569              
3570 0           return $r;
3571             }
3572              
3573             #
3574             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3575             #
3576             sub get_cluster_state {
3577 0     0 0   my $self = shift;
3578              
3579 0           my $es = $self->_es;
3580 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3581              
3582 0           my $r;
3583 0           eval {
3584 0           $r = $es->cluster->state;
3585             };
3586 0 0         if ($@) {
3587 0           chomp($@);
3588 0           return $self->log->error("get_cluster_state: failed: [$@]");
3589             }
3590              
3591 0           return $r;
3592             }
3593              
3594             #
3595             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3596             #
3597             sub get_cluster_health {
3598 0     0 0   my $self = shift;
3599              
3600 0           my $es = $self->_es;
3601 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3602              
3603 0           my $r;
3604 0           eval {
3605 0           $r = $es->cluster->health;
3606             };
3607 0 0         if ($@) {
3608 0           chomp($@);
3609 0           return $self->log->error("get_cluster_health: failed: [$@]");
3610             }
3611              
3612 0           return $r;
3613             }
3614              
3615             #
3616             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3617             #
3618             sub get_cluster_settings {
3619 0     0 0   my $self = shift;
3620              
3621 0           my $es = $self->_es;
3622 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3623              
3624 0           my $r;
3625 0           eval {
3626 0           $r = $es->cluster->get_settings;
3627             };
3628 0 0         if ($@) {
3629 0           chomp($@);
3630 0           return $self->log->error("get_cluster_settings: failed: [$@]");
3631             }
3632              
3633 0           return $r;
3634             }
3635              
3636             #
3637             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3638             #
3639             sub put_cluster_settings {
3640 0     0 0   my $self = shift;
3641 0           my ($settings) = @_;
3642              
3643 0           my $es = $self->_es;
3644 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3645 0 0         $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
3646 0 0         $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
3647              
3648 0           my %args = (
3649             body => $settings,
3650             );
3651              
3652 0           my $r;
3653 0           eval {
3654 0           $r = $es->cluster->put_settings(%args);
3655             };
3656 0 0         if ($@) {
3657 0           chomp($@);
3658 0           return $self->log->error("put_cluster_settings: failed: [$@]");
3659             }
3660              
3661 0           return $r;
3662             }
3663              
3664             sub count_green_indices {
3665 0     0 0   my $self = shift;
3666              
3667 0 0         my $get = $self->show_indices or return;
3668              
3669 0           my $count = 0;
3670 0           for (@$get) {
3671 0 0         if (/^\s*green\s+/) {
3672 0           $count++;
3673             }
3674             }
3675              
3676 0           return $count;
3677             }
3678              
3679             sub count_yellow_indices {
3680 0     0 0   my $self = shift;
3681              
3682 0 0         my $get = $self->show_indices or return;
3683              
3684 0           my $count = 0;
3685 0           for (@$get) {
3686 0 0         if (/^\s*yellow\s+/) {
3687 0           $count++;
3688             }
3689             }
3690              
3691 0           return $count;
3692             }
3693              
3694             sub count_red_indices {
3695 0     0 0   my $self = shift;
3696              
3697 0 0         my $get = $self->show_indices or return;
3698              
3699 0           my $count = 0;
3700 0           for (@$get) {
3701 0 0         if (/^\s*red\s+/) {
3702 0           $count++;
3703             }
3704             }
3705              
3706 0           return $count;
3707             }
3708              
3709             sub count_indices {
3710 0     0 0   my $self = shift;
3711              
3712 0 0         my $get = $self->show_indices or return;
3713              
3714 0           return scalar @$get;
3715             }
3716              
3717             sub list_indices_status {
3718 0     0 0   my $self = shift;
3719              
3720 0 0         my $get = $self->show_indices or return;
3721              
3722 0           my $count_red = 0;
3723 0           my $count_yellow = 0;
3724 0           my $count_green = 0;
3725 0           for (@$get) {
3726 0 0         if (/^\s*red\s+/) {
    0          
    0          
3727 0           $count_red++;
3728             }
3729             elsif (/^\s*yellow\s+/) {
3730 0           $count_yellow++;
3731             }
3732             elsif (/^\s*green\s+/) {
3733 0           $count_green++;
3734             }
3735             }
3736              
3737             return {
3738 0           red => $count_red,
3739             yellow => $count_yellow,
3740             green => $count_green,
3741             };
3742             }
3743              
3744             sub count_shards {
3745 0     0 0   my $self = shift;
3746              
3747 0 0         my $indices = $self->get_indices or return;
3748              
3749 0           my $count = 0;
3750 0           for (@$indices) {
3751 0           $count += $_->{shards};
3752             }
3753              
3754 0           return $count;
3755             }
3756              
3757             sub count_size {
3758 0     0 0   my $self = shift;
3759 0           my ($string) = @_;
3760              
3761 0 0         my $indices = $self->get_indices($string) or return;
3762              
3763 0 0         my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3764 0           $fn->decimal_point(".");
3765 0           $fn->kibi_suffix("kb");
3766 0           $fn->mebi_suffix("mb");
3767 0           $fn->gibi_suffix("gb");
3768 0           $fn->kilo_suffix("KB");
3769 0           $fn->mega_suffix("MB");
3770 0           $fn->giga_suffix("GB");
3771              
3772 0           my $size = 0;
3773 0           for (@$indices) {
3774 0           $size += $fn->to_number($_->{size});
3775             }
3776              
3777 0           return $fn->from_number($size);
3778             }
3779              
3780             sub count_total_size {
3781 0     0 0   my $self = shift;
3782 0           my ($string) = @_;
3783              
3784 0 0         my $indices = $self->get_indices($string) or return;
3785              
3786 0 0         my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3787 0           $fn->decimal_point(".");
3788 0           $fn->kibi_suffix("kb");
3789 0           $fn->mebi_suffix("mb");
3790 0           $fn->gibi_suffix("gb");
3791 0           $fn->kilo_suffix("KB");
3792 0           $fn->mega_suffix("MB");
3793 0           $fn->giga_suffix("GB");
3794              
3795 0           my $size = 0;
3796 0           for (@$indices) {
3797 0           $size += $fn->to_number($_->{total_size});
3798             }
3799              
3800 0           return $fn->from_number($size);
3801             }
3802              
3803             sub count_count {
3804 0     0 0   my $self = shift;
3805              
3806 0 0         my $indices = $self->get_indices or return;
3807              
3808 0 0         my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3809 0           $fn->kilo_suffix('k');
3810 0           $fn->mega_suffix('m');
3811 0           $fn->giga_suffix('M');
3812              
3813 0           my $count = 0;
3814 0           for (@$indices) {
3815 0           $count += $_->{count};
3816             }
3817              
3818 0           return $fn->from_number($count);
3819             }
3820              
3821             sub list_green_indices {
3822 0     0 0   my $self = shift;
3823              
3824 0 0         my $get = $self->get_indices or return;
3825              
3826 0           my @indices = ();
3827 0           for (@$get) {
3828 0 0         if ($_->{color} eq 'green') {
3829 0           push @indices, $_->{index};
3830             }
3831             }
3832              
3833 0           return \@indices;
3834             }
3835              
3836             sub list_yellow_indices {
3837 0     0 0   my $self = shift;
3838              
3839 0 0         my $get = $self->get_indices or return;
3840              
3841 0           my @indices = ();
3842 0           for (@$get) {
3843 0 0         if ($_->{color} eq 'yellow') {
3844 0           push @indices, $_->{index};
3845             }
3846             }
3847              
3848 0           return \@indices;
3849             }
3850              
3851             sub list_red_indices {
3852 0     0 0   my $self = shift;
3853              
3854 0 0         my $get = $self->get_indices or return;
3855              
3856 0           my @indices = ();
3857 0           for (@$get) {
3858 0 0         if ($_->{color} eq 'red') {
3859 0           push @indices, $_->{index};
3860             }
3861             }
3862              
3863 0           return \@indices;
3864             }
3865              
3866             #
3867             # https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
3868             #
3869             sub list_datatypes {
3870 0     0 0   my $self = shift;
3871              
3872             return {
3873 0           core => [ qw(string long integer short byte double float data boolean binary) ],
3874             };
3875             }
3876              
3877             #
3878             # Return total hits for last www_search
3879             #
3880             sub get_hits_total {
3881 0     0 0   my $self = shift;
3882 0           my ($run) = @_;
3883              
3884 0 0         $self->brik_help_run_undef_arg('get_hits_total', $run) or return;
3885              
3886 0 0         if (ref($run) eq 'HASH') {
3887 0 0 0       if (exists($run->{hits}) && exists($run->{hits}{total})) {
3888             # In ES 7.x, total is now a hash. We rewrite it to only keep the
3889             # number:
3890 0 0         if (ref($run->{hits}{total}) eq 'HASH') {
3891 0           return $run->{hits}{total}{value};
3892             }
3893 0           return $run->{hits}{total};
3894             }
3895             }
3896              
3897 0           return $self->log->error("get_hits_total: last Command not compatible");
3898             }
3899              
3900             sub disable_shard_allocation {
3901 0     0 0   my $self = shift;
3902              
3903 0           my $settings = {
3904             persistent => {
3905             'cluster.routing.allocation.enable' => 'none',
3906             }
3907             };
3908              
3909 0           return $self->put_cluster_settings($settings);
3910             }
3911              
3912             sub enable_shard_allocation {
3913 0     0 0   my $self = shift;
3914              
3915 0           my $settings = {
3916             persistent => {
3917             'cluster.routing.allocation.enable' => 'all',
3918             }
3919             };
3920              
3921 0           return $self->put_cluster_settings($settings);
3922             }
3923              
3924             sub flush_synced {
3925 0     0 0   my $self = shift;
3926              
3927 0           my $es = $self->_es;
3928 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3929              
3930 0           my $r;
3931 0           eval {
3932 0           $r = $es->indices->flush_synced;
3933             };
3934 0 0         if ($@) {
3935 0           chomp($@);
3936 0           return $self->log->error("flush_synced: failed: [$@]");
3937             }
3938              
3939 0           return $r;
3940             }
3941              
3942             #
3943             # https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
3944             #
3945             # run client::elasticsearch create_snapshot_repository myrepo
3946             # "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
3947             #
3948             # You have to set path.repo in elasticsearch.yml like:
3949             # path.repo: ["/home/gomor/es-backups"]
3950             #
3951             # Search::Elasticsearch::Client::2_0::Direct::Snapshot
3952             #
3953             sub create_snapshot_repository {
3954 0     0 0   my $self = shift;
3955 0           my ($body, $repository_name) = @_;
3956              
3957 0           my $es = $self->_es;
3958 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3959 0 0         $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
3960              
3961 0   0       $repository_name ||= 'repository';
3962              
3963 0           my %args = (
3964             repository => $repository_name,
3965             body => $body,
3966             );
3967              
3968 0           my $r;
3969 0           eval {
3970 0           $r = $es->snapshot->create_repository(%args);
3971             };
3972 0 0         if ($@) {
3973 0           chomp($@);
3974 0           return $self->log->error("create_snapshot_repository: failed: [$@]");
3975             }
3976              
3977 0           return $r;
3978             }
3979              
3980             sub create_shared_fs_snapshot_repository {
3981 0     0 0   my $self = shift;
3982 0           my ($location, $repository_name) = @_;
3983              
3984 0   0       $repository_name ||= 'repository';
3985 0 0         $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
3986              
3987 0 0         if ($location !~ m{^/}) {
3988 0           return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
3989             "a full directory path, this one is invalid [$location]");
3990             }
3991              
3992 0           my $body = {
3993             #type => 'fs',
3994             settings => {
3995             compress => 'true',
3996             location => $location,
3997             },
3998             };
3999              
4000 0           return $self->create_snapshot_repository($body, $repository_name);
4001             }
4002              
4003             #
4004             # Search::Elasticsearch::Client::2_0::Direct::Snapshot
4005             #
4006             sub get_snapshot_repositories {
4007 0     0 0   my $self = shift;
4008              
4009 0           my $es = $self->_es;
4010 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4011              
4012 0           my $r;
4013 0           eval {
4014 0           $r = $es->snapshot->get_repository;
4015             };
4016 0 0         if ($@) {
4017 0           chomp($@);
4018 0           return $self->log->error("get_snapshot_repositories: failed: [$@]");
4019             }
4020              
4021 0           return $r;
4022             }
4023              
4024             #
4025             # Search::Elasticsearch::Client::2_0::Direct::Snapshot
4026             #
4027             sub get_snapshot_status {
4028 0     0 0   my $self = shift;
4029              
4030 0           my $es = $self->_es;
4031 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4032              
4033 0           my $r;
4034 0           eval {
4035 0           $r = $es->snapshot->status;
4036             };
4037 0 0         if ($@) {
4038 0           chomp($@);
4039 0           return $self->log->error("get_snapshot_status: failed: [$@]");
4040             }
4041              
4042 0           return $r;
4043             }
4044              
4045             #
4046             # Search::Elasticsearch::Client::5_0::Direct::Snapshot
4047             #
4048             sub create_snapshot {
4049 0     0 0   my $self = shift;
4050 0           my ($snapshot_name, $repository_name, $body) = @_;
4051              
4052 0           my $es = $self->_es;
4053 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4054              
4055 0   0       $snapshot_name ||= 'snapshot';
4056 0   0       $repository_name ||= 'repository';
4057              
4058 0           my %args = (
4059             repository => $repository_name,
4060             snapshot => $snapshot_name,
4061             );
4062 0 0         if (defined($body)) {
4063 0           $args{body} = $body;
4064             }
4065              
4066 0           my $r;
4067 0           eval {
4068 0           $r = $es->snapshot->create(%args);
4069             };
4070 0 0         if ($@) {
4071 0           chomp($@);
4072 0           return $self->log->error("create_snapshot: failed: [$@]");
4073             }
4074              
4075 0           return $r;
4076             }
4077              
4078             sub create_snapshot_for_indices {
4079 0     0 0   my $self = shift;
4080 0           my ($indices, $snapshot_name, $repository_name) = @_;
4081              
4082 0 0         $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
4083              
4084 0   0       $snapshot_name ||= 'snapshot';
4085 0   0       $repository_name ||= 'repository';
4086              
4087 0           my $body = {
4088             indices => $indices,
4089             };
4090              
4091 0           return $self->create_snapshot($snapshot_name, $repository_name, $body);
4092             }
4093              
4094             sub is_snapshot_finished {
4095 0     0 0   my $self = shift;
4096              
4097 0 0         my $status = $self->get_snapshot_status or return;
4098              
4099 0 0         if (@{$status->{snapshots}} == 0) {
  0            
4100 0           return 1;
4101             }
4102              
4103 0           return 0;
4104             }
4105              
4106             sub get_snapshot_state {
4107 0     0 0   my $self = shift;
4108              
4109 0 0         if ($self->is_snapshot_finished) {
4110 0           return $self->log->info("get_snapshot_state: is already finished");
4111             }
4112              
4113 0 0         my $status = $self->get_snapshot_status or return;
4114              
4115 0           my @indices_done = ();
4116 0           my @indices_not_done = ();
4117              
4118 0           my $list = $status->{snapshots};
4119 0           for my $snapshot (@$list) {
4120 0           my $indices = $snapshot->{indices};
4121 0           for my $index (@$indices) {
4122 0           my $done = $index->{shards_stats}{done};
4123 0 0         if ($done) {
4124 0           push @indices_done, $index;
4125             }
4126             else {
4127 0           push @indices_not_done, $index;
4128             }
4129             }
4130             }
4131              
4132 0           return { done => \@indices_done, not_done => \@indices_not_done };
4133             }
4134              
4135       0 0   sub verify_snapshot_repository {
4136             }
4137              
4138             sub delete_snapshot_repository {
4139 0     0 0   my $self = shift;
4140 0           my ($repository_name) = @_;
4141              
4142 0           my $es = $self->_es;
4143 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4144 0 0         $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
4145              
4146 0           my $r;
4147 0           eval {
4148 0           $r = $es->snapshot->delete_repository(
4149             repository => $repository_name,
4150             );
4151             };
4152 0 0         if ($@) {
4153 0           chomp($@);
4154 0           return $self->log->error("delete_snapshot_repository: failed: [$@]");
4155             }
4156              
4157 0           return $r;
4158             }
4159              
4160             sub get_snapshot {
4161 0     0 0   my $self = shift;
4162 0           my ($snapshot_name, $repository_name) = @_;
4163              
4164 0           my $es = $self->_es;
4165 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4166              
4167 0   0       $snapshot_name ||= 'snapshot';
4168 0   0       $repository_name ||= 'repository';
4169              
4170 0           my $r;
4171 0           eval {
4172 0           $r = $es->snapshot->get(
4173             repository => $repository_name,
4174             snapshot => $snapshot_name,
4175             );
4176             };
4177 0 0         if ($@) {
4178 0           chomp($@);
4179 0           return $self->log->error("get_snapshot: failed: [$@]");
4180             }
4181              
4182 0           return $r;
4183             }
4184              
4185             #
4186             # Search::Elasticsearch::Client::5_0::Direct::Snapshot
4187             #
4188             sub delete_snapshot {
4189 0     0 0   my $self = shift;
4190 0           my ($snapshot_name, $repository_name) = @_;
4191              
4192 0           my $es = $self->_es;
4193 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4194 0 0         $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
4195 0 0         $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
4196              
4197 0           my $timeout = $self->rtimeout;
4198              
4199 0           my $r;
4200 0           eval {
4201 0           $r = $es->snapshot->delete(
4202             repository => $repository_name,
4203             snapshot => $snapshot_name,
4204             master_timeout => "${timeout}s",
4205             );
4206             };
4207 0 0         if ($@) {
4208 0           chomp($@);
4209 0           return $self->log->error("delete_snapshot: failed: [$@]");
4210             }
4211              
4212 0           return $r;
4213             }
4214              
4215             #
4216             # https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
4217             #
4218             sub restore_snapshot {
4219 0     0 0   my $self = shift;
4220 0           my ($snapshot_name, $repository_name, $body) = @_;
4221              
4222 0           my $es = $self->_es;
4223 0   0       $snapshot_name ||= 'snapshot';
4224 0   0       $repository_name ||= 'repository';
4225 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4226 0 0         $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
4227 0 0         $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
4228              
4229 0           my %args = (
4230             repository => $repository_name,
4231             snapshot => $snapshot_name,
4232             );
4233 0 0         if (defined($body)) {
4234 0           $args{body} = $body;
4235             }
4236              
4237 0           my $r;
4238 0           eval {
4239 0           $r = $es->snapshot->restore(%args);
4240             };
4241 0 0         if ($@) {
4242 0           chomp($@);
4243 0           return $self->log->error("restore_snapshot: failed: [$@]");
4244             }
4245              
4246 0           return $r;
4247             }
4248              
4249             sub restore_snapshot_for_indices {
4250 0     0 0   my $self = shift;
4251 0           my ($indices, $snapshot_name, $repository_name) = @_;
4252              
4253 0   0       $snapshot_name ||= 'snapshot';
4254 0   0       $repository_name ||= 'repository';
4255 0 0         $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
4256 0 0         $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
4257 0 0         $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
4258              
4259 0           my $body = {
4260             indices => $indices,
4261             };
4262              
4263 0           return $self->restore_snapshot($snapshot_name, $repository_name, $body);
4264             }
4265              
4266             # shard occupation
4267             #
4268             # curl -XGET "http://127.0.0.1:9200/_cat/shards?v
4269             # Or https://www.elastic.co/guide/en/elasticsearch/reference/1.6/cluster-nodes-stats.html
4270             #
4271             # disk occuption:
4272             # curl -XGET http://127.0.0.1:9200/_cat/nodes?h=ip,h,diskAvail,diskTotal
4273             #
4274             #
4275             # Who is master: curl -XGET http://127.0.0.1:9200/_cat/master?v
4276             #
4277              
4278             # Check memory lock
4279              
4280             # curl -XGET 'localhost:9200/_nodes?filter_path=**.mlockall&pretty'
4281             # {
4282             # "nodes" : {
4283             # "3XXX" : {
4284             # "process" : {
4285             # "mlockall" : true
4286             # }
4287             # }
4288             # }
4289             # }
4290              
4291             1;
4292              
4293             __END__