File Coverage

blib/lib/Metabrik/Client/Elasticsearch.pm
Criterion Covered Total %
statement 9 2014 0.4
branch 0 1206 0.0
condition 0 270 0.0
subroutine 3 130 2.3
pod 2 125 1.6
total 14 3745 0.3


line stmt bran cond sub pod time code
1             #
2             # $Id$
3             #
4             # client::elasticsearch Brik
5             #
6             package Metabrik::Client::Elasticsearch;
7 2     2   1176 use strict;
  2         5  
  2         59  
8 2     2   11 use warnings;
  2         4  
  2         62  
9              
10 2     2   10 use base qw(Metabrik::Client::Rest);
  2         5  
  2         980  
11              
12             sub brik_properties {
13             return {
14 0     0 1   revision => '$Revision$',
15             tags => [ qw(unstable es es) ],
16             author => 'GomoR ',
17             license => 'http://opensource.org/licenses/BSD-3-Clause',
18             attributes => {
19             datadir => [ qw(datadir) ],
20             nodes => [ qw(node_list) ],
21             cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
22             date => [ qw(date) ],
23             index => [ qw(index) ],
24             type => [ qw(type) ],
25             from => [ qw(number) ],
26             size => [ qw(count) ],
27             max => [ qw(count) ],
28             max_flush_count => [ qw(count) ],
29             max_flush_size => [ qw(count) ],
30             rtimeout => [ qw(seconds) ],
31             sniff_rtimeout => [ qw(seconds) ],
32             try => [ qw(count) ],
33             use_bulk_autoflush => [ qw(0|1) ],
34             use_indexing_optimizations => [ qw(0|1) ],
35             use_ignore_id => [ qw(0|1) ],
36             use_type => [ qw(0|1) ],
37             csv_header => [ qw(fields) ],
38             csv_encoded_fields => [ qw(fields) ],
39             csv_object_fields => [ qw(fields) ],
40             encoding => [ qw(utf8|ascii) ],
41             _es => [ qw(INTERNAL) ],
42             _bulk => [ qw(INTERNAL) ],
43             _scroll => [ qw(INTERNAL) ],
44             },
45             attributes_default => {
46             nodes => [ qw(http://localhost:9200) ],
47             cxn_pool => 'Sniff',
48             from => 0,
49             size => 10,
50             max => 0,
51             index => '*',
52             type => '*',
53             rtimeout => 60,
54             sniff_rtimeout => 3,
55             try => 3,
56             max_flush_count => 1_000,
57             max_flush_size => 1_000_000,
58             use_bulk_autoflush => 1,
59             use_indexing_optimizations => 0,
60             use_ignore_id => 0,
61             use_type => 1,
62             encoding => 'utf8',
63             },
64             commands => {
65             open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
66             open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
67             open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
68             open_scroll => [ qw(index|OPTIONAL size|OPTIONAL type|OPTIONAL query|OPTIONAL) ],
69             close_scroll => [ ],
70             total_scroll => [ ],
71             next_scroll => [ qw(count|OPTIONAL) ],
72             reindex => [ qw(index_source index_destination type_destination|OPTIONAL) ],
73             get_reindex_tasks => [ ],
74             cancel_reindex_task => [ qw(id) ],
75             get_taskid => [ qw(id) ],
76             show_reindex_progress => [ ],
77             loop_show_reindex_progress => [ qw(seconds|OPTIONAL) ],
78             index_document => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
79             index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
80             index_bulk_from_list => [ qw(document_list index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
81             clean_deleted_from_index => [ qw(index) ],
82             update_document => [ qw(document id index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
83             update_document_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
84             bulk_flush => [ qw(index|OPTIONAL) ],
85             query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
86             count => [ qw(index|OPTIONAL type|OPTIONAL) ],
87             get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
88             www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
89             delete_index => [ qw(index|indices_list) ],
90             update_alias => [ qw(new_index alias) ],
91             delete_document => [ qw(index type id) ],
92             delete_by_query => [ qw($query_hash index type proceed|OPTIONAL) ],
93             show_indices => [ qw(string_filter|OPTIONAL) ],
94             show_nodes => [ ],
95             show_health => [ ],
96             show_recovery => [ ],
97             show_allocation => [ ],
98             list_indices => [ qw(regex|OPTIONAL) ],
99             get_indices => [ qw(string_filter|OPTIONAL) ],
100             get_index => [ qw(index|indices_list) ],
101             get_index_stats => [ qw(index) ],
102             list_index_types => [ qw(index) ],
103             list_index_fields => [ qw(index) ],
104             list_indices_version => [ qw(index|indices_list) ],
105             open_index => [ qw(index|indices_list) ],
106             close_index => [ qw(index|indices_list) ],
107             get_aliases => [ qw(index) ],
108             put_alias => [ qw(index alias) ],
109             delete_alias => [ qw(index alias) ],
110             is_mapping_exists => [ qw(index mapping) ],
111             get_mappings => [ qw(index type|OPTIONAL) ],
112             create_index => [ qw(index shards|OPTIONAL) ],
113             create_index_with_mappings => [ qw(index mappings) ],
114             info => [ qw(nodes_list|OPTIONAL) ],
115             version => [ qw(nodes_list|OPTIONAL) ],
116             get_templates => [ ],
117             list_templates => [ ],
118             get_template => [ qw(name) ],
119             put_mapping => [ qw(index type mapping) ],
120             put_mapping_from_json_file => [ qw(index type file) ],
121             update_mapping_from_json_file => [ qw(file index type) ],
122             put_template => [ qw(name template) ],
123             put_template_from_json_file => [ qw(file name|OPTIONAL) ],
124             update_template_from_json_file => [ qw(file name|OPTIONAL) ],
125             get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
126             put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
127             set_index_readonly => [ qw(index|indices_list boolean|OPTIONAL) ],
128             reset_index_readonly => [ qw(index|indices_list|OPTIONAL) ],
129             list_index_readonly => [ ],
130             set_index_number_of_replicas => [ qw(index|indices_list number) ],
131             set_index_refresh_interval => [ qw(index|indices_list number) ],
132             get_index_settings => [ qw(index|indices_list) ],
133             get_index_readonly => [ qw(index|indices_list) ],
134             get_index_number_of_replicas => [ qw(index|indices) ],
135             get_index_refresh_interval => [ qw(index|indices_list) ],
136             get_index_number_of_shards => [ qw(index|indices_list) ],
137             delete_template => [ qw(name) ],
138             is_index_exists => [ qw(index) ],
139             is_type_exists => [ qw(index type) ],
140             is_document_exists => [ qw(index type document) ],
141             parse_error_string => [ qw(string) ],
142             refresh_index => [ qw(index) ],
143             export_as => [ qw(format index size|OPTIONAL callback|OPTIONAL) ],
144             export_as_csv => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
145             export_as_json => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
146             import_from => [ qw(format input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
147             import_from_csv => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
148             import_from_json => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
149             import_from_csv_worker => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
150             get_stats_process => [ ],
151             get_process => [ ],
152             get_cluster_state => [ ],
153             get_cluster_health => [ ],
154             get_cluster_settings => [ ],
155             put_cluster_settings => [ qw(settings) ],
156             count_green_indices => [ ],
157             count_yellow_indices => [ ],
158             count_red_indices => [ ],
159             list_green_indices => [ ],
160             list_yellow_indices => [ ],
161             list_red_indices => [ ],
162             count_indices => [ ],
163             list_indices_status => [ ],
164             count_shards => [ ],
165             count_size => [ qw(string_filter|OPTIONAL) ],
166             count_total_size => [ qw(string_filter|OPTIONAL) ],
167             count_count => [ ],
168             list_datatypes => [ ],
169             get_hits_total => [ qw(results) ],
170             disable_shard_allocation => [ ],
171             enable_shard_allocation => [ ],
172             flush_synced => [ ],
173             create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
174             create_shared_fs_snapshot_repository => [ qw(location
175             repository_name|OPTIONAL) ],
176             get_snapshot_repositories => [ ],
177             get_snapshot_status => [ ],
178             delete_snapshot_repository => [ qw(repository_name) ],
179             create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
180             body|OPTIONAL) ],
181             create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
182             repository_name|OPTIONAL) ],
183             is_snapshot_finished => [ ],
184             get_snapshot_state => [ ],
185             get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
186             delete_snapshot => [ qw(snapshot_name repository_name) ],
187             restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
188             restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
189             },
190             require_modules => {
191             'Metabrik::String::Json' => [ ],
192             'Metabrik::File::Csv' => [ ],
193             'Metabrik::File::Json' => [ ],
194             'Metabrik::File::Dump' => [ ],
195             'Metabrik::Format::Number' => [ ],
196             'Metabrik::Worker::Parallel' => [ ],
197             'Search::Elasticsearch' => [ ],
198             },
199             };
200             }
201              
202             sub brik_preinit {
203 0     0 1   my $self = shift;
204              
205 0           eval("use Search::Elasticsearch;");
206 0 0         if ($Search::Elasticsearch::VERSION < 5) {
207 0           $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
208             "with: run perl::module install Search::Elasticsearch");
209             }
210              
211 0           return $self->SUPER::brik_preinit;
212             }
213              
214             sub open {
215 0     0 0   my $self = shift;
216 0           my ($nodes, $cxn_pool) = @_;
217              
218 0   0       $nodes ||= $self->nodes;
219 0   0       $cxn_pool ||= $self->cxn_pool;
220 0 0         $self->brik_help_run_undef_arg('open', $nodes) or return;
221 0 0         $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
222 0 0         $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
223 0 0         $self->brik_help_run_empty_array_arg('open', $nodes) or return;
224              
225 0           for my $node (@$nodes) {
226 0 0         if ($node !~ m{https?://}) {
227 0           return $self->log->error("open: invalid node[$node], must start with http(s)");
228             }
229             }
230              
231 0           my $timeout = $self->rtimeout;
232              
233 0           my $nodes_str = join('|', @$nodes);
234 0           $self->log->debug("open: using nodes [$nodes_str]");
235              
236             #
237             # Timeout description here:
238             #
239             # Search::Elasticsearch::Role::Cxn
240             #
241              
242 0           my $es = Search::Elasticsearch->new(
243             nodes => $nodes,
244             cxn_pool => $cxn_pool,
245             timeout => $timeout,
246             max_retries => $self->try,
247             retry_on_timeout => 1,
248             sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
249             request_timeout => 60, # seconds, default 30
250             ping_timeout => 5, # seconds, default 2
251             dead_timeout => 120, # seconds, detault 60
252             max_dead_timeout => 3600, # seconds, default 3600
253             sniff_request_timeout => 15, # seconds, default 2
254             #trace_to => 'Stderr', # For debug purposes
255             );
256 0 0         if (! defined($es)) {
257 0           return $self->log->error("open: failed");
258             }
259              
260 0           $self->_es($es);
261              
262 0           return $nodes;
263             }
264              
265             #
266             # Search::Elasticsearch::Client::5_0::Bulk
267             #
268             sub open_bulk_mode {
269 0     0 0   my $self = shift;
270 0           my ($index, $type) = @_;
271              
272 0   0       $index ||= $self->index;
273 0   0       $type ||= $self->type;
274 0           my $es = $self->_es;
275 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
276 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
277 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
278              
279             my %args = (
280             index => $index,
281             on_error => sub {
282             #my ($action, $response, $i) = @_;
283              
284             #print Data::Dumper::Dumper($action)."\n";
285             #print Data::Dumper::Dumper($response)."\n";
286             #print Data::Dumper::Dumper($i)."\n";
287 0     0     print Data::Dumper::Dumper(\@_)."\n";
288             },
289 0           );
290              
291 0 0         if ($self->use_type) {
292 0           $args{type} = $type;
293             }
294              
295 0 0         if ($self->use_bulk_autoflush) {
296 0   0       my $max_count = $self->max_flush_count || 1_000;
297 0   0       my $max_size = $self->max_flush_size || 1_000_000;
298              
299 0           $args{max_count} = $max_count;
300 0           $args{max_size} = $max_size;
301 0           $args{max_time} = 0;
302              
303 0           $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
304             "max_flush_size [$max_size]");
305             }
306             else {
307 0           $args{max_count} = 0;
308 0           $args{max_size} = 0;
309 0           $args{max_time} = 0;
310 0           $args{on_error} = undef;
311             #$args{on_success} = sub {
312             #my ($action, $response, $i) = @_;
313             #};
314              
315 0           $self->log->info("open_bulk_mode: opening without automatic flushing");
316             }
317              
318 0           my $bulk;
319 0           eval {
320 0           $bulk = $es->bulk_helper(%args);
321             };
322 0 0         if ($@) {
323 0           chomp($@);
324 0           return $self->log->error("open_bulk_mode: failed: [$@]");
325             }
326              
327 0           $self->_bulk($bulk);
328              
329 0           return $self->nodes;
330             }
331              
332             sub open_scroll_scan_mode {
333 0     0 0   my $self = shift;
334 0           my ($index, $size) = @_;
335              
336 0 0         my $version = $self->version or return;
337 0 0         if ($version ge "5.0.0") {
338 0           return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
339             "$version, try open_scroll Command instead");
340             }
341              
342 0   0       $index ||= $self->index;
343 0   0       $size ||= $self->size;
344 0           my $es = $self->_es;
345 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
346 0 0         $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
347 0 0         $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
348              
349 0           my $scroll;
350 0           eval {
351 0           $scroll = $es->scroll_helper(
352             index => $index,
353             search_type => 'scan',
354             size => $size,
355             );
356             };
357 0 0         if ($@) {
358 0           chomp($@);
359 0           return $self->log->error("open_scroll_scan_mode: failed: $@");
360             }
361              
362 0           $self->_scroll($scroll);
363              
364 0           return $self->nodes;
365             }
366              
367             #
368             # Search::Elasticsearch::Client::5_0::Scroll
369             #
370             sub open_scroll {
371 0     0 0   my $self = shift;
372 0           my ($index, $size, $type, $query) = @_;
373              
374 0 0         my $version = $self->version or return;
375 0 0         if ($version lt "5.0.0") {
376 0           return $self->log->error("open_scroll: Command not supported for ES version ".
377             "$version, try open_scroll_scan_mode Command instead");
378             }
379              
380 0   0       $query ||= { query => { match_all => {} } };
381 0   0       $index ||= $self->index;
382 0   0       $type ||= $self->type;
383 0   0       $size ||= $self->size;
384 0           my $es = $self->_es;
385 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
386 0 0         $self->brik_help_run_undef_arg('open_scroll', $index) or return;
387 0 0         $self->brik_help_run_undef_arg('open_scroll', $size) or return;
388              
389 0           my $timeout = $self->rtimeout;
390              
391 0           my %args = (
392             scroll => "${timeout}s",
393             scroll_in_qs => 1, # By default (0), pass scroll_id in request body. When 1, pass
394             # it in query string.
395             index => $index,
396             size => $size,
397             body => $query,
398             );
399 0 0         if ($self->use_type) {
400 0 0         if ($type ne '*') {
401 0           $args{type} = $type;
402             }
403             }
404              
405             #
406             # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
407             #
408 0           my $scroll;
409 0           eval {
410 0           $scroll = $es->scroll_helper(%args);
411             };
412 0 0         if ($@) {
413 0           chomp($@);
414 0           return $self->log->error("open_scroll: failed: $@");
415             }
416              
417 0           $self->_scroll($scroll);
418              
419 0           $self->log->verbose("open_scroll: opened with size [$size] and timeout [${timeout}s]");
420              
421 0           return $self->nodes;
422             }
423              
424             #
425             # Search::Elasticsearch::Client::5_0::Scroll
426             #
427             sub close_scroll {
428 0     0 0   my $self = shift;
429              
430 0           my $scroll = $self->_scroll;
431 0 0         if (! defined($scroll)) {
432 0           return 1;
433             }
434              
435 0           $scroll->finish;
436 0           $self->_scroll(undef);
437              
438 0           return 1;
439             }
440              
441             sub total_scroll {
442 0     0 0   my $self = shift;
443              
444 0           my $scroll = $self->_scroll;
445 0 0         $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
446              
447 0           my $total;
448 0           eval {
449 0           $total = $scroll->total;
450             };
451 0 0         if ($@) {
452 0           chomp($@);
453 0           return $self->log->error("total_scroll: failed with: [$@]");
454             }
455              
456 0           return $total;
457             }
458              
459             sub next_scroll {
460 0     0 0   my $self = shift;
461 0           my ($count) = @_;
462              
463 0   0       $count ||= 1;
464              
465 0           my $scroll = $self->_scroll;
466 0 0         $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
467              
468 0           my $next;
469 0           eval {
470 0 0         if ($count > 1) {
471 0           my @docs = $scroll->next($count);
472 0 0         if (@docs > 0) {
473 0           $next = \@docs;
474             }
475             }
476             else {
477 0           $next = $scroll->next;
478             }
479             };
480 0 0         if ($@) {
481 0           chomp($@);
482 0           return $self->log->error("next_scroll: failed with: [$@]");
483             }
484              
485 0           return $next;
486             }
487              
488             #
489             # Search::Elasticsearch::Client::5_0::Direct
490             #
491             sub index_document {
492 0     0 0   my $self = shift;
493 0           my ($doc, $index, $type, $hash, $id) = @_;
494              
495 0   0       $index ||= $self->index;
496 0   0       $type ||= $self->type;
497 0           my $es = $self->_es;
498 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
499 0 0         $self->brik_help_run_undef_arg('index_document', $doc) or return;
500 0 0         $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
501 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
502 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
503              
504 0           my %args = (
505             index => $index,
506             body => $doc,
507             );
508 0 0         if (defined($id)) {
509 0           $args{id} = $id;
510             }
511              
512 0 0         if ($self->use_type) {
513 0           $args{type} = $type;
514             }
515              
516 0 0         if (defined($hash)) {
517 0 0         $self->brik_help_run_invalid_arg('index_document', $hash, 'HASH')
518             or return;
519 0           my $this_hash = { %$hash };
520 0 0 0       if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
521 0           $this_hash->{routing} = $doc->{$hash->{routing}};
522             }
523 0           %args = ( %args, %$this_hash );
524             }
525              
526 0           my $r;
527 0           eval {
528 0           $r = $es->index(%args);
529             };
530 0 0         if ($@) {
531 0           chomp($@);
532 0           return $self->log->error("index_document: index failed for ".
533             "index [$index]: [$@]");
534             }
535              
536 0           return $r;
537             }
538              
539             #
540             # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
541             #
542             sub reindex {
543 0     0 0   my $self = shift;
544 0           my ($index, $new, $type) = @_;
545              
546 0           my $es = $self->_es;
547 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
548 0 0         $self->brik_help_run_undef_arg('reindex', $index) or return;
549 0 0         $self->brik_help_run_undef_arg('reindex', $new) or return;
550              
551 0           my %args = (
552             body => {
553             conflicts => 'proceed',
554             source => { index => $index },
555             dest => { index => $new },
556             },
557             wait_for_completion => 'false', # Immediately return the task.
558             );
559              
560             # Change the type for destination doc
561 0 0         if ($self->use_type) {
562 0 0         if (defined($type)) {
563 0           $args{body}{dest}{type} = $type;
564             }
565             }
566              
567 0           my $r;
568 0           eval {
569 0           $r = $es->reindex(%args);
570             };
571 0 0         if ($@) {
572 0           chomp($@);
573 0           return $self->log->error("reindex: reindex failed for index [$index]: [$@]");
574             }
575              
576 0           return $r;
577             }
578              
579             #
580             # List reindex tasks
581             #
582             # curl -X GET "localhost:9200/_tasks?detailed=true&actions=*reindex" | jq .
583             #
584             # Cancel reindex task
585             #
586             # curl -X POST "localhost:9200/_tasks/7VelPnOxQm21HtuJNFUAvQ:120914725/_cancel" | jq .
587             #
588              
589             #
590             # Search::Elasticsearch::Client::6_0::Direct::Tasks
591             #
592             sub get_reindex_tasks {
593 0     0 0   my $self = shift;
594              
595 0           my $es = $self->_es;
596 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
597              
598 0           my $t = $es->tasks;
599              
600 0           my $list = $t->list;
601 0           my $nodes = $list->{nodes};
602 0 0         if (! defined($nodes)) {
603 0           return $self->log->error("get_reindex_tasks: no nodes found");
604             }
605              
606 0           my %tasks = ();
607 0           for my $node (keys %$nodes) {
608 0           for my $id (keys %{$nodes->{$node}}) {
  0            
609 0           my $tasks = $nodes->{$node}{tasks};
610 0           for my $task (keys %$tasks) {
611 0           my $action = $tasks->{$task}{action};
612 0 0 0       if ($action eq 'indices:data/write/reindex' && !exists($tasks{$task})) {
613 0           $tasks{$task} = $tasks->{$task};
614             }
615             }
616             }
617             }
618              
619 0           return \%tasks;
620             }
621              
622             sub cancel_reindex_task {
623 0     0 0   my $self = shift;
624 0           my ($id) = @_;
625              
626 0           my $es = $self->_es;
627 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
628 0 0         $self->brik_help_run_undef_arg('cancel_reindex_task', $id) or return;
629              
630 0           my $t = $es->tasks;
631              
632 0           return $t->cancel(task_id => $id);
633             }
634              
635             sub get_taskid {
636 0     0 0   my $self = shift;
637 0           my ($id) = @_;
638              
639 0           my $es = $self->_es;
640 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
641 0 0         $self->brik_help_run_undef_arg('get_taskid', $id) or return;
642              
643 0           my $t = $es->tasks;
644              
645 0           return $t->get(task_id => $id);
646             }
647              
648             sub show_reindex_progress {
649 0     0 0   my $self = shift;
650              
651 0           my $es = $self->_es;
652 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
653              
654 0 0         my $tasks = $self->get_reindex_tasks or return;
655 0 0         if (! keys %$tasks) {
656 0           $self->log->info("show_reindex_progress: no reindex task in progress");
657 0           return 0;
658             }
659              
660 0           for my $id (keys %$tasks) {
661 0 0         my $task = $self->get_taskid($id) or next;
662              
663 0           my $status = $task->{task}{status};
664 0           my $desc = $task->{task}{description};
665 0           my $total = $status->{total};
666 0           my $created = $status->{created};
667 0           my $deleted = $status->{deleted};
668 0           my $updated = $status->{updated};
669              
670 0           my $perc = ($created + $deleted + $updated) / $total * 100;
671              
672 0           printf("> Task [%s]: %.02f%%\n", $desc, $perc);
673 0           print "created[$created] deleted[$deleted] updated[$updated] total[$total]\n";
674             }
675              
676 0           return 1;
677             }
678              
679             sub loop_show_reindex_progress {
680 0     0 0   my $self = shift;
681 0           my ($sec) = @_;
682              
683 0   0       $sec ||= 60;
684 0           my $es = $self->_es;
685 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
686              
687 0           while (1) {
688 0 0         $self->show_reindex_progress or return;
689 0           sleep($sec);
690             }
691              
692 0           return 1;
693             }
694              
695             sub reindex_with_mapping_from_json_file {
696 0     0 0   my $self = shift;
697 0           my ($index, $new, $file) = @_;
698              
699 0           my $es = $self->_es;
700 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
701 0 0         $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $index)
702             or return;
703 0 0         $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $new) or return;
704 0 0         $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $file) or return;
705 0 0         $self->brik_help_run_file_not_found('reindex_with_mapping_from_json_file', $file)
706             or return;
707              
708 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
709 0 0         my $json = $fj->read($file) or return;
710              
711 0           return $self->reindex($index, $new, $json);
712             }
713              
714             #
715             # Search::Elasticsearch::Client::5_0::Direct
716             #
717             # To execute this Command using routing requires to use the correct field
718             # value directly in $hash->{routing}. We cannot "guess" it from arguments,
719             # this would be a little bit complicated to do in an efficient way.
720             #
721             sub update_document {
722 0     0 0   my $self = shift;
723 0           my ($doc, $id, $index, $type, $hash) = @_;
724              
725 0   0       $index ||= $self->index;
726 0   0       $type ||= $self->type;
727 0           my $es = $self->_es;
728 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
729 0 0         $self->brik_help_run_undef_arg('update_document', $doc) or return;
730 0 0         $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
731 0 0         $self->brik_help_run_undef_arg('update_document', $id) or return;
732 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
733 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
734              
735 0           my %args = (
736             id => $id,
737             index => $index,
738             body => { doc => $doc },
739             );
740              
741 0 0         if ($self->use_type) {
742 0           $args{type} = $type;
743             }
744              
745 0 0         if (defined($hash)) {
746 0 0         $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH')
747             or return;
748 0           %args = ( %args, %$hash );
749             }
750              
751 0           my $r;
752 0           eval {
753 0           $r = $es->update(%args);
754             };
755 0 0         if ($@) {
756 0           chomp($@);
757 0           return $self->log->error("update_document: index failed for index [$index]: [$@]");
758             }
759              
760 0           return $r;
761             }
762              
763             #
764             # Search::Elasticsearch::Client::5_0::Bulk
765             #
766             sub index_bulk {
767 0     0 0   my $self = shift;
768 0           my ($doc, $index, $type, $hash, $id) = @_;
769              
770 0           my $bulk = $self->_bulk;
771 0   0       $index ||= $self->index;
772 0   0       $type ||= $self->type;
773 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
774 0 0         $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
775 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
776 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
777              
778 0           my %args = (
779             source => $doc,
780             );
781 0 0         if (defined($id)) {
782 0           $args{id} = $id;
783             }
784              
785 0 0         if (defined($hash)) {
786 0 0         $self->brik_help_run_invalid_arg('index_bulk', $hash, 'HASH') or return;
787 0           my $this_hash = { %$hash };
788 0 0 0       if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
789 0           $this_hash->{routing} = $doc->{$hash->{routing}};
790             }
791 0           %args = ( %args, %$this_hash );
792             }
793              
794 0           my $r;
795 0           eval {
796 0           $r = $bulk->add_action(index => \%args);
797             };
798 0 0         if ($@) {
799 0           chomp($@);
800 0           my $p = $self->parse_error_string($@);
801 0 0 0       if (defined($p) && exists($p->{class})) {
802 0           my $class = $p->{class};
803 0           my $code = $p->{code};
804 0           my $node = $p->{node};
805 0           return $self->log->error("index_bulk: failed for index [$index] with error ".
806             "[$class] code [$code] for node [$node]");
807             }
808             else {
809 0           return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
810             }
811             }
812              
813 0           return $r;
814             }
815              
816             #
817             # Allows to index multiple docs at one time
818             # $bulk->index({ source => $doc1 }, { source => $doc2 }, ...);
819             #
820             sub index_bulk_from_list {
821 0     0 0   my $self = shift;
822 0           my ($list, $index, $type, $hash) = @_;
823              
824 0           my $bulk = $self->_bulk;
825 0   0       $index ||= $self->index;
826 0   0       $type ||= $self->type;
827 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
828 0 0         $self->brik_help_run_undef_arg('index_bulk_from_list', $list) or return;
829 0 0         $self->brik_help_run_invalid_arg('index_bulk_from_list', $list, 'ARRAY')
830             or return;
831 0 0         $self->brik_help_run_empty_array_arg('index_bulk_from_list', $list)
832             or return;
833 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
834 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
835              
836 0 0         if (defined($hash)) {
837 0 0         $self->brik_help_run_invalid_arg('index_bulk_from_list', $hash, 'HASH')
838             or return;
839             }
840              
841 0           my @args = ();
842 0           for my $doc (@$list) {
843 0           my %args = (
844             source => $doc,
845             );
846 0 0         if (defined($hash)) {
847 0           my $this_hash = { %$hash };
848 0 0 0       if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
849 0           $this_hash->{routing} = $doc->{$hash->{routing}};
850             }
851 0           %args = ( %args, %$this_hash );
852             }
853 0           push @args, \%args;
854             }
855              
856 0           my $r;
857 0           eval {
858 0           $r = $bulk->index(@args);
859             };
860 0 0         if ($@) {
861 0           chomp($@);
862 0           my $p = $self->parse_error_string($@);
863 0 0 0       if (defined($p) && exists($p->{class})) {
864 0           my $class = $p->{class};
865 0           my $code = $p->{code};
866 0           my $node = $p->{node};
867 0           return $self->log->error("index_bulk: failed for index [$index] with error ".
868             "[$class] code [$code] for node [$node]");
869             }
870             else {
871 0           return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
872             }
873             }
874              
875 0           return $r;
876             }
877              
878             #
879             # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
880             #
881             sub clean_deleted_from_index {
882 0     0 0   my $self = shift;
883 0           my ($index) = @_;
884              
885 0 0         $self->brik_help_run_undef_arg('clean_deleted_from_index', $index) or return;
886              
887 0           my $es = $self->_es;
888 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
889              
890 0           my $indices = $self->_es->indices;
891              
892 0           my $r;
893 0           eval {
894 0           $r = $indices->forcemerge(
895             index => $index,
896             only_expunge_deletes => 'true',
897             );
898             };
899 0 0         if ($@) {
900 0           chomp($@);
901 0           my $p = $self->parse_error_string($@);
902 0 0 0       if (defined($p) && exists($p->{class})) {
903 0           my $class = $p->{class};
904 0           my $code = $p->{code};
905 0           my $node = $p->{node};
906 0           return $self->log->error("clean_deleted_from_index: failed for index ".
907             "[$index] with error [$class] code [$code] for node [$node]");
908             }
909             else {
910 0           return $self->log->error("clean_deleted_from_index: index failed for ".
911             "index [$index]: [$@]");
912             }
913             }
914              
915 0           return $r;
916             }
917              
918             #
919             # To execute this Command using routing requires to use the correct field
920             # value directly in $hash->{routing}. We cannot "guess" it from arguments,
921             # this would be a little bit complicated to do in an efficient way.
922             #
923             sub update_document_bulk {
924 0     0 0   my $self = shift;
925 0           my ($doc, $index, $type, $hash, $id) = @_;
926              
927 0           my $bulk = $self->_bulk;
928 0   0       $index ||= $self->index;
929 0   0       $type ||= $self->type;
930 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
931 0 0         $self->brik_help_run_undef_arg('update_document_bulk', $doc) or return;
932 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
933 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
934              
935 0           my %args = (
936             index => $index,
937             doc => $doc,
938             );
939              
940 0 0         if ($self->use_type) {
941 0           $args{type} = $type;
942             }
943              
944 0 0         if (defined($id)) {
945 0           $args{id} = $id;
946             }
947              
948 0 0         if (defined($hash)) {
949 0 0         $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH')
950             or return;
951 0           %args = ( %args, %$hash );
952             }
953              
954 0           my $r;
955 0           eval {
956 0           $r = $bulk->update(\%args);
957             };
958 0 0         if ($@) {
959 0           chomp($@);
960 0           my $p = $self->parse_error_string($@);
961 0 0 0       if (defined($p) && exists($p->{class})) {
962 0           my $class = $p->{class};
963 0           my $code = $p->{code};
964 0           my $node = $p->{node};
965 0           return $self->log->error("update_document_bulk: failed for index [$index] ".
966             "with error [$class] code [$code] for node [$node]");
967             }
968             else {
969 0           return $self->log->error("update_document_bulk: index failed for ".
970             "index [$index]: [$@]");
971             }
972             }
973              
974 0           return $r;
975             }
976              
977             #
978             # We may have to call refresh_index after a bulk_flush, so we give an additional
979             # optional Argument for given index.
980             #
981             sub bulk_flush {
982 0     0 0   my $self = shift;
983 0           my ($index) = @_;
984              
985 0           my $bulk = $self->_bulk;
986 0 0         $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
987              
988 0           my $try = $self->try;
989              
990 0           RETRY:
991              
992             my $r;
993 0           eval {
994 0           $r = $bulk->flush;
995             };
996 0 0         if ($@) {
997 0           chomp($@);
998 0 0         if (--$try == 0) {
999 0           my $p = $self->parse_error_string($@);
1000 0 0 0       if (defined($p) && exists($p->{class})) {
1001 0           my $class = $p->{class};
1002 0           my $code = $p->{code};
1003 0           my $node = $p->{node};
1004 0           return $self->log->error("bulk_flush: failed after [$try] tries with error ".
1005             "[$class] code [$code] for node [$node]");
1006             }
1007             else {
1008 0           return $self->log->error("bulk_flush: failed after [$try]: [$@]");
1009             }
1010             }
1011 0           $self->log->warning("bulk_flush: sleeping 10 seconds before retry cause error ".
1012             "[$@]");
1013 0           sleep 10;
1014 0           goto RETRY;
1015             }
1016              
1017 0 0         if (defined($index)) {
1018 0           $self->refresh_index($index);
1019             }
1020              
1021 0           return $r;
1022             }
1023              
1024             #
1025             # Search::Elasticsearch::Client::2_0::Direct
1026             # Search::Elasticsearch::Client::5_0::Direct
1027             #
1028             sub count {
1029 0     0 0   my $self = shift;
1030 0           my ($index, $type) = @_;
1031              
1032 0   0       $index ||= $self->index;
1033 0   0       $type ||= $self->type;
1034 0           my $es = $self->_es;
1035 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1036              
1037 0           my %args = ();
1038 0 0 0       if (defined($index) && $index ne '*') {
1039 0           $args{index} = $index;
1040             }
1041 0 0         if ($self->use_type) {
1042 0 0 0       if (defined($type) && $type ne '*') {
1043 0           $args{type} = $type;
1044             }
1045             }
1046              
1047             #$args{body} = {
1048             #query => {
1049             #match => { title => 'Elasticsearch clients' },
1050             #},
1051             #}
1052              
1053 0           my $r;
1054 0 0         my $version = $self->version or return;
1055 0 0         if ($version ge "5.0.0") {
1056 0           eval {
1057 0           $r = $es->count(%args);
1058             };
1059             }
1060             else {
1061 0           eval {
1062 0           my %this_args = (
1063             index => $index,
1064             search_type => 'count',
1065             body => {
1066             query => {
1067             match_all => {},
1068             },
1069             },
1070             );
1071 0 0         if ($self->use_type) {
1072 0           $this_args{type} = $type;
1073             }
1074 0           $r = $es->search(%args);
1075             };
1076             }
1077 0 0         if ($@) {
1078 0           chomp($@);
1079 0           return $self->log->error("count: count failed for index [$index]: [$@]");
1080             }
1081              
1082 0 0 0       if ($version ge "5.0.0") {
    0          
1083 0 0         if (exists($r->{count})) {
1084 0           return $r->{count};
1085             }
1086             }
1087             elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
1088 0           return $r->{hits}{total};
1089             }
1090              
1091 0           return $self->log->error("count: nothing found");
1092             }
1093              
1094             #
1095             # https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
1096             # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
1097             #
1098             # Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
1099             #
1100             # To perform a query using routing requires to use the correct field
1101             # value directly in $hash->{routing}. We cannot "guess" it from $q,
1102             # this would be a little bit complicated to do in an efficient way.
1103             #
1104             sub query {
1105 0     0 0   my $self = shift;
1106 0           my ($query, $index, $type, $hash) = @_;
1107              
1108 0   0       $index ||= $self->index;
1109 0   0       $type ||= $self->type;
1110 0           my $es = $self->_es;
1111 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1112 0 0         $self->brik_help_run_undef_arg('query', $query) or return;
1113 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
1114 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
1115 0 0         $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
1116              
1117 0           my $timeout = $self->rtimeout;
1118              
1119 0           my %args = (
1120             index => $index,
1121             body => $query,
1122             );
1123              
1124 0 0         if (defined($hash)) {
1125 0 0         $self->brik_help_run_invalid_arg('query', $hash, 'HASH') or return;
1126 0           %args = ( %args, %$hash );
1127             }
1128              
1129 0 0         if ($self->use_type) {
1130 0 0         if ($type ne '*') {
1131 0           $args{type} = $type;
1132             }
1133             }
1134              
1135 0           my $r;
1136 0           eval {
1137 0           $r = $es->search(%args);
1138             };
1139 0 0         if ($@) {
1140 0           chomp($@);
1141 0           return $self->log->error("query: failed for index [$index]: [$@]");
1142             }
1143              
1144 0           return $r;
1145             }
1146              
1147             sub get_from_id {
1148 0     0 0   my $self = shift;
1149 0           my ($id, $index, $type) = @_;
1150              
1151 0   0       $index ||= $self->index;
1152 0   0       $type ||= $self->type;
1153 0           my $es = $self->_es;
1154 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1155 0 0         $self->brik_help_run_undef_arg('get_from_id', $id) or return;
1156 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
1157 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
1158              
1159 0           my $r;
1160 0           eval {
1161 0           my %this_args = (
1162             index => $index,
1163             id => $id,
1164             );
1165 0 0         if ($self->use_type) {
1166 0           $this_args{type} = $type;
1167             }
1168 0           $r = $es->get(%this_args);
1169             };
1170 0 0         if ($@) {
1171 0           chomp($@);
1172 0           return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
1173             }
1174              
1175 0           return $r;
1176             }
1177              
1178             #
1179             # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
1180             #
1181             sub www_search {
1182 0     0 0   my $self = shift;
1183 0           my ($query, $index, $type) = @_;
1184              
1185 0   0       $index ||= $self->index;
1186 0   0       $type ||= $self->type;
1187 0 0         $self->brik_help_run_undef_arg('www_search', $query) or return;
1188 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
1189 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
1190              
1191 0           my $from = $self->from;
1192 0           my $size = $self->size;
1193              
1194 0 0         my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
1195              
1196 0           my $nodes = $self->nodes;
1197 0           for my $node (@$nodes) {
1198             # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
1199 0           my $url = "$node/$index";
1200 0 0         if ($self->use_type) {
1201 0 0         if ($type ne '*') {
1202 0           $url .= "/$type";
1203             }
1204             }
1205 0           $url .= "/_search/?from=$from&size=$size&q=".$query;
1206              
1207 0 0         my $get = $self->SUPER::get($url) or next;
1208 0           my $body = $get->{content};
1209              
1210 0 0         my $decoded = $sj->decode($body) or next;
1211              
1212 0           return $decoded;
1213             }
1214              
1215 0           return;
1216             }
1217              
1218             #
1219             # Search::Elasticsearch::Client::2_0::Direct::Indices
1220             #
1221             sub delete_index {
1222 0     0 0   my $self = shift;
1223 0           my ($index) = @_;
1224              
1225 0           my $es = $self->_es;
1226 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1227 0 0         $self->brik_help_run_undef_arg('delete_index', $index) or return;
1228 0 0         $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
1229              
1230 0           my %args = (
1231             index => $index,
1232             );
1233              
1234 0           my $r;
1235 0           eval {
1236 0           $r = $es->indices->delete(%args);
1237             };
1238 0 0         if ($@) {
1239 0           chomp($@);
1240 0           return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
1241             }
1242              
1243 0           return $r;
1244             }
1245              
1246             #
1247             # Search::Elasticsearch::Client::2_0::Direct::Indices
1248             #
1249             # To execute this Command using routing requires to use the correct field
1250             # value directly in $hash->{routing}. We cannot "guess" it from arguments,
1251             # this would be a little bit complicated to do in an efficient way.
1252             #
1253             sub delete_document {
1254 0     0 0   my $self = shift;
1255 0           my ($index, $type, $id, $hash) = @_;
1256              
1257 0           my $es = $self->_es;
1258 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1259 0 0         $self->brik_help_run_undef_arg('delete_document', $index) or return;
1260 0 0         $self->brik_help_run_undef_arg('delete_document', $type) or return;
1261 0 0         $self->brik_help_run_undef_arg('delete_document', $id) or return;
1262              
1263 0           my %args = (
1264             index => $index,
1265             id => $id,
1266             );
1267              
1268 0 0         if ($self->use_type) {
1269 0           $args{type} = $type;
1270             }
1271              
1272 0 0         if (defined($hash)) {
1273 0 0         $self->brik_help_run_invalid_arg('delete_document', $hash, 'HASH')
1274             or return;
1275 0           %args = ( %args, %$hash );
1276             }
1277              
1278 0           my $r;
1279 0           eval {
1280 0           $r = $es->delete(%args);
1281             };
1282 0 0         if ($@) {
1283 0           chomp($@);
1284 0           return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
1285             }
1286              
1287 0           return $r;
1288             }
1289              
1290             #
1291             # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
1292             #
1293             # Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
1294             #
1295             sub delete_by_query {
1296 0     0 0   my $self = shift;
1297 0           my ($query, $index, $type, $proceed) = @_;
1298              
1299 0           my $es = $self->_es;
1300 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1301 0 0         $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
1302 0 0         $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
1303 0 0         $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
1304 0 0         $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
1305              
1306 0           my $timeout = $self->rtimeout;
1307              
1308 0           my %args = (
1309             index => $index,
1310             body => $query,
1311             );
1312              
1313 0 0         if ($self->use_type) {
1314 0           $args{type} = $type;
1315             }
1316              
1317 0 0 0       if (defined($proceed) && $proceed) {
1318 0           $args{conflicts} = 'proceed';
1319             }
1320              
1321 0           my $r;
1322 0           eval {
1323 0           $r = $es->delete_by_query(%args);
1324             };
1325 0 0         if ($@) {
1326 0           chomp($@);
1327 0           return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
1328             }
1329              
1330             # This may fail, we ignore it.
1331 0           $self->refresh_index($index);
1332              
1333 0           return $r;
1334             }
1335              
1336             #
1337             # Search::Elasticsearch::Client::2_0::Direct::Cat
1338             #
1339             # https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html
1340             #
1341             sub show_indices {
1342 0     0 0   my $self = shift;
1343 0           my ($string) = @_;
1344              
1345 0           my $es = $self->_es;
1346 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1347              
1348 0           my $r;
1349 0           eval {
1350 0           $r = $es->cat->indices;
1351             };
1352 0 0         if ($@) {
1353 0           chomp($@);
1354 0           return $self->log->error("show_indices: failed: [$@]");
1355             }
1356              
1357 0           my @lines = split(/\n/, $r);
1358              
1359 0 0         if (@lines == 0) {
1360 0           $self->log->warning("show_indices: nothing returned, no index?");
1361             }
1362              
1363 0           my @filtered = ();
1364 0 0         if (defined($string)) {
1365 0           for (@lines) {
1366 0 0         if (m{$string}) {
1367 0           push @filtered, $_;
1368             }
1369             }
1370 0           @lines = @filtered;
1371             }
1372              
1373 0           return \@lines;
1374             }
1375              
1376             #
1377             # Search::Elasticsearch::Client::2_0::Direct::Cat
1378             #
1379             sub show_nodes {
1380 0     0 0   my $self = shift;
1381              
1382 0           my $es = $self->_es;
1383 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1384              
1385 0           my $r;
1386 0           eval {
1387 0           $r = $es->cat->nodes;
1388             };
1389 0 0         if ($@) {
1390 0           chomp($@);
1391 0           return $self->log->error("show_nodes: failed: [$@]");
1392             }
1393              
1394 0           my @lines = split(/\n/, $r);
1395              
1396 0 0         if (@lines == 0) {
1397 0           $self->log->warning("show_nodes: nothing returned, no nodes?");
1398             }
1399              
1400 0           return \@lines;
1401             }
1402              
1403             #
1404             # Search::Elasticsearch::Client::2_0::Direct::Cat
1405             #
1406             sub show_health {
1407 0     0 0   my $self = shift;
1408              
1409 0           my $es = $self->_es;
1410 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1411              
1412 0           my $r;
1413 0           eval {
1414 0           $r = $es->cat->health;
1415             };
1416 0 0         if ($@) {
1417 0           chomp($@);
1418 0           return $self->log->error("show_health: failed: [$@]");
1419             }
1420              
1421 0           my @lines = split(/\n/, $r);
1422              
1423 0 0         if (@lines == 0) {
1424 0           $self->log->warning("show_health: nothing returned, no recovery?");
1425             }
1426              
1427 0           return \@lines;
1428             }
1429              
1430             #
1431             # Search::Elasticsearch::Client::2_0::Direct::Cat
1432             #
1433             sub show_recovery {
1434 0     0 0   my $self = shift;
1435              
1436 0           my $es = $self->_es;
1437 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1438              
1439 0           my $r;
1440 0           eval {
1441 0           $r = $es->cat->recovery;
1442             };
1443 0 0         if ($@) {
1444 0           chomp($@);
1445 0           return $self->log->error("show_recovery: failed: [$@]");
1446             }
1447              
1448 0           my @lines = split(/\n/, $r);
1449              
1450 0 0         if (@lines == 0) {
1451 0           $self->log->warning("show_recovery: nothing returned, no index?");
1452             }
1453              
1454 0           return \@lines;
1455             }
1456              
1457             #
1458             # curl -s 'localhost:9200/_cat/allocation?v'
1459             #
1460             sub show_allocation {
1461 0     0 0   my $self = shift;
1462              
1463 0           my $es = $self->_es;
1464 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1465              
1466 0           my $r;
1467 0           eval {
1468 0           $r = $es->cat->allocation;
1469             };
1470 0 0         if ($@) {
1471 0           chomp($@);
1472 0           return $self->log->error("show_allocation: failed: [$@]");
1473             }
1474              
1475 0           my @lines = split(/\n/, $r);
1476              
1477 0 0         if (@lines == 0) {
1478 0           $self->log->warning("show_allocation: nothing returned, no index?");
1479             }
1480              
1481 0           return \@lines;
1482             }
1483              
1484             sub list_indices {
1485 0     0 0   my $self = shift;
1486 0           my ($regex) = @_;
1487              
1488 0 0         my $get = $self->get_indices or return;
1489              
1490 0           my @indices = ();
1491 0           for (@$get) {
1492 0 0         if (defined($regex)) {
1493 0 0         if ($_->{index} =~ m{$regex}) {
1494 0           push @indices, $_->{index};
1495             }
1496             }
1497             else {
1498 0           push @indices, $_->{index};
1499             }
1500             }
1501              
1502 0           return [ sort { $a cmp $b } @indices ];
  0            
1503             }
1504              
1505             sub get_indices {
1506 0     0 0   my $self = shift;
1507 0           my ($string) = @_;
1508              
1509 0 0         my $lines = $self->show_indices($string) or return;
1510 0 0         if (@$lines == 0) {
1511 0           $self->log->warning("get_indices: no index found");
1512 0           return [];
1513             }
1514              
1515             #
1516             # Format depends on ElasticSearch version. We try to detect the format.
1517             #
1518             # 5.0.0:
1519             # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1 146 0 251.8kb 251.8kb"
1520             #
1521 0           my @indices = ();
1522 0           for (@$lines) {
1523 0           my @t = split(/\s+/);
1524 0 0         if (@t == 10) { # Version 5.0.0
    0          
    0          
1525 0           my $color = $t[0];
1526 0           my $state = $t[1];
1527 0           my $index = $t[2];
1528 0           my $id = $t[3];
1529 0           my $shards = $t[4];
1530 0           my $replicas = $t[5];
1531 0           my $count = $t[6];
1532 0           my $count2 = $t[7];
1533 0           my $total_size = $t[8];
1534 0           my $size = $t[9];
1535 0           push @indices, {
1536             color => $color,
1537             state => $state,
1538             index => $index,
1539             id => $id,
1540             shards => $shards,
1541             replicas => $replicas,
1542             count => $count,
1543             total_size => $total_size,
1544             size => $size,
1545             };
1546             }
1547             elsif (@t == 9) {
1548 0           my $index = $t[2];
1549 0           push @indices, {
1550             index => $index,
1551             };
1552             }
1553             elsif (@t == 8) {
1554 0           my $index = $t[1];
1555 0           push @indices, {
1556             index => $index,
1557             };
1558             }
1559             }
1560              
1561 0           return \@indices;
1562             }
1563              
1564             #
1565             # Search::Elasticsearch::Client::5_0::Direct::Indices
1566             #
1567             sub get_index {
1568 0     0 0   my $self = shift;
1569 0           my ($index) = @_;
1570            
1571 0           my $es = $self->_es;
1572 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1573 0 0         $self->brik_help_run_undef_arg('get_index', $index) or return;
1574 0 0         $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
1575              
1576 0           my %args = (
1577             index => $index,
1578             );
1579              
1580 0           my $r;
1581 0           eval {
1582 0           $r = $es->indices->get(%args);
1583             };
1584 0 0         if ($@) {
1585 0           chomp($@);
1586 0           return $self->log->error("get_index: get failed for index [$index]: [$@]");
1587             }
1588              
1589 0           return $r;
1590             }
1591              
1592             sub get_index_stats {
1593 0     0 0   my $self = shift;
1594 0           my ($index) = @_;
1595              
1596 0           my $es = $self->_es;
1597 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1598 0 0         $self->brik_help_run_undef_arg('get_index', $index) or return;
1599              
1600 0           my %args = (
1601             index => $index,
1602             );
1603              
1604 0           my $r;
1605 0           eval {
1606 0           $r = $es->indices->stats(%args);
1607             };
1608 0 0         if ($@) {
1609 0           chomp($@);
1610 0           return $self->log->error("get_index_stats: get failed for index [$index]: ".
1611             "[$@]");
1612             }
1613              
1614 0           return $r->{indices}{$index};
1615             }
1616              
1617             sub list_index_types {
1618 0     0 0   my $self = shift;
1619 0           my ($index) = @_;
1620              
1621 0           my $es = $self->_es;
1622 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1623 0 0         $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1624 0 0         $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1625              
1626 0 0         my $r = $self->get_mappings($index) or return;
1627 0 0         if (keys %$r > 1) {
1628 0           return $self->log->error("list_index_types: multiple indices found, choose one");
1629             }
1630              
1631 0           my @types = ();
1632 0           for my $this_index (keys %$r) {
1633 0           my $mappings = $r->{$this_index}{mappings};
1634 0           push @types, keys %$mappings;
1635             }
1636              
1637 0           my %uniq = map { $_ => 1 } @types;
  0            
1638              
1639 0           return [ sort { $a cmp $b } keys %uniq ];
  0            
1640             }
1641              
1642             #
1643             # By default, if you provide only one index and no type,
1644             # all types will be merged (including _default_)
1645             # If you specify one type (other than _default_), _default_ will be merged to it.
1646             #
1647             sub list_index_fields {
1648 0     0 0   my $self = shift;
1649 0           my ($index, $type) = @_;
1650              
1651 0           my $es = $self->_es;
1652 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1653 0 0         $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1654 0 0         $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1655              
1656 0           my $r;
1657 0 0         if (defined($type)) {
1658 0 0         $r = $self->get_mappings($index, $type) or return;
1659 0 0         if (keys %$r > 1) {
1660 0           return $self->log->error("list_index_fields: multiple indices found, ".
1661             "choose one");
1662             }
1663             # _default_ mapping may not exists.
1664 0 0         if ($self->is_mapping_exists($index, '_default_')) {
1665 0           my $r2 = $self->get_mappings($index, '_default_');
1666             # Merge
1667 0           for my $this_index (keys %$r2) {
1668 0           my $default = $r2->{$this_index}{mappings}{'_default_'};
1669 0           $r->{$this_index}{mappings}{_default_} = $default;
1670             }
1671             }
1672             }
1673             else {
1674 0 0         $r = $self->get_mappings($index) or return;
1675 0 0         if (keys %$r > 1) {
1676 0           return $self->log->error("list_index_fields: multiple indices found, ".
1677             "choose one");
1678             }
1679             }
1680              
1681 0           my @fields = ();
1682 0           for my $this_index (keys %$r) {
1683 0           my $mappings = $r->{$this_index}{mappings};
1684 0           for my $this_type (keys %$mappings) {
1685 0           my $properties = $mappings->{$this_type}{properties};
1686 0           push @fields, keys %$properties;
1687             }
1688             }
1689              
1690 0           my %uniq = map { $_ => 1 } @fields;
  0            
1691              
1692 0           return [ sort { $a cmp $b } keys %uniq ];
  0            
1693             }
1694              
1695             sub list_indices_version {
1696 0     0 0   my $self = shift;
1697 0           my ($index) = @_;
1698              
1699 0           my $es = $self->_es;
1700 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1701 0 0         $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1702 0 0         $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1703             or return;
1704              
1705 0 0         my $r = $self->get_index($index) or return;
1706              
1707 0           my @list = ();
1708 0           for my $this (keys %$r) {
1709 0           my $name = $this;
1710 0           my $version = $r->{$this}{settings}{index}{version}{created};
1711 0           push @list, {
1712             index => $name,
1713             version => $version,
1714             };
1715             }
1716              
1717 0           return \@list;
1718             }
1719              
1720             sub open_index {
1721 0     0 0   my $self = shift;
1722 0           my ($index) = @_;
1723              
1724 0           my $es = $self->_es;
1725 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1726 0 0         $self->brik_help_run_undef_arg('open_index', $index) or return;
1727 0 0         $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1728              
1729 0           my $r;
1730 0           eval {
1731 0           $r = $es->indices->open(
1732             index => $index,
1733             );
1734             };
1735 0 0         if ($@) {
1736 0           chomp($@);
1737 0           return $self->log->error("open_index: failed: [$@]");
1738             }
1739              
1740 0           return $r;
1741             }
1742              
1743             sub close_index {
1744 0     0 0   my $self = shift;
1745 0           my ($index) = @_;
1746              
1747 0           my $es = $self->_es;
1748 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1749 0 0         $self->brik_help_run_undef_arg('close_index', $index) or return;
1750 0 0         $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1751              
1752 0           my $r;
1753 0           eval {
1754 0           $r = $es->indices->close(
1755             index => $index,
1756             );
1757             };
1758 0 0         if ($@) {
1759 0           chomp($@);
1760 0           return $self->log->error("close_index: failed: [$@]");
1761             }
1762              
1763 0           return $r;
1764             }
1765              
1766             #
1767             # Search::Elasticsearch::Client::5_0::Direct::Indices
1768             #
1769             sub get_aliases {
1770 0     0 0   my $self = shift;
1771 0           my ($index) = @_;
1772              
1773 0   0       $index ||= $self->index;
1774 0           my $es = $self->_es;
1775 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1776              
1777             #
1778             # [DEPRECATION] [types removal] The parameter include_type_name should be
1779             # explicitly specified in get indices requests to prepare for 7.0. In 7.0
1780             # include_type_name will default to 'false', which means responses will
1781             # omit the type name in mapping definitions. - In request: {body => undef,
1782             # ignore => [],method => "GET",path => "/*",qs => {},serialize => "std"}
1783             #
1784              
1785 0           my %args = (
1786             index => $index,
1787             params => { include_type_name => 'false' },
1788             );
1789              
1790 0           my $r;
1791 0           eval {
1792 0           $r = $es->indices->get(%args);
1793             };
1794 0 0         if ($@) {
1795 0           chomp($@);
1796 0           return $self->log->error("get_aliases: get_aliases failed: [$@]");
1797             }
1798              
1799 0           my %aliases = ();
1800 0           for my $this (keys %$r) {
1801 0           $aliases{$this} = $r->{$this}{aliases};
1802             }
1803              
1804 0           return \%aliases;
1805             }
1806              
1807             #
1808             # Search::Elasticsearch::Client::5_0::Direct::Indices
1809             #
1810             sub put_alias {
1811 0     0 0   my $self = shift;
1812 0           my ($index, $alias) = @_;
1813              
1814 0           my $es = $self->_es;
1815 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1816 0 0         $self->brik_help_run_undef_arg('put_alias', $index) or return;
1817 0 0         $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1818              
1819 0           my %args = (
1820             index => $index,
1821             name => $alias,
1822             );
1823              
1824 0           my $r;
1825 0           eval {
1826 0           $r = $es->indices->put_alias(%args);
1827             };
1828 0 0         if ($@) {
1829 0           chomp($@);
1830 0           return $self->log->error("put_alias: put_alias failed: [$@]");
1831             }
1832              
1833 0           return $r;
1834             }
1835              
1836             #
1837             # Search::Elasticsearch::Client::5_0::Direct::Indices
1838             #
1839             sub delete_alias {
1840 0     0 0   my $self = shift;
1841 0           my ($index, $alias) = @_;
1842              
1843 0           my $es = $self->_es;
1844 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1845 0 0         $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1846 0 0         $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1847              
1848 0           my %args = (
1849             index => $index,
1850             name => $alias,
1851             );
1852              
1853 0           my $r;
1854 0           eval {
1855 0           $r = $es->indices->delete_alias(%args);
1856             };
1857 0 0         if ($@) {
1858 0           chomp($@);
1859 0           return $self->log->error("delete_alias: delete_alias failed: [$@]");
1860             }
1861              
1862 0           return $r;
1863             }
1864              
1865             sub update_alias {
1866 0     0 0   my $self = shift;
1867 0           my ($new_index, $alias) = @_;
1868              
1869 0           my $es = $self->_es;
1870 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1871 0 0         $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1872 0 0         $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1873              
1874             # Search for previous index with that alias, if any.
1875 0           my $prev_index;
1876 0 0         my $aliases = $self->get_aliases or return;
1877 0           while (my ($k, $v) = each %$aliases) {
1878 0           for my $this (keys %$v) {
1879 0 0         if ($this eq $alias) {
1880 0           $prev_index = $k;
1881 0           last;
1882             }
1883             }
1884 0 0         last if $prev_index;
1885             }
1886              
1887             # Delete previous alias if it exists.
1888 0 0         if (defined($prev_index)) {
1889 0 0         $self->delete_alias($prev_index, $alias) or return;
1890             }
1891              
1892 0           return $self->put_alias($new_index, $alias);
1893             }
1894              
1895             sub is_mapping_exists {
1896 0     0 0   my $self = shift;
1897 0           my ($index, $mapping) = @_;
1898              
1899 0 0         $self->brik_help_run_undef_arg('is_mapping_exists', $index) or return;
1900 0 0         $self->brik_help_run_undef_arg('is_mapping_exists', $mapping) or return;
1901              
1902 0 0         if (! $self->is_index_exists($index)) {
1903 0           return 0;
1904             }
1905              
1906 0 0         my $all = $self->get_mappings($index) or return;
1907 0           for my $this_index (keys %$all) {
1908 0           my $mappings = $all->{$this_index}{mappings};
1909 0           for my $this_mapping (keys %$mappings) {
1910 0 0         if ($this_mapping eq $mapping) {
1911 0           return 1;
1912             }
1913             }
1914             }
1915              
1916 0           return 0;
1917             }
1918              
1919             #
1920             # Search::Elasticsearch::Client::2_0::Direct::Indices
1921             #
1922             sub get_mappings {
1923 0     0 0   my $self = shift;
1924 0           my ($index, $type) = @_;
1925              
1926 0           my $es = $self->_es;
1927 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1928 0 0         $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1929 0 0         $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1930              
1931 0           my %args = (
1932             index => $index,
1933             params => { include_type_name => 'false' },
1934             );
1935              
1936 0 0         if ($self->use_type) {
1937 0           $args{type} = $type;
1938             }
1939              
1940 0           my $r;
1941 0           eval {
1942 0           $r = $es->indices->get_mapping(%args);
1943             };
1944 0 0         if ($@) {
1945 0           chomp($@);
1946 0           return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1947             "[$@]");
1948             }
1949              
1950 0           return $r;
1951             }
1952              
1953             #
1954             # Search::Elasticsearch::Client::2_0::Direct::Indices
1955             #
1956             sub create_index {
1957 0     0 0   my $self = shift;
1958 0           my ($index, $shards) = @_;
1959              
1960 0           my $es = $self->_es;
1961 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1962 0 0         $self->brik_help_run_undef_arg('create_index', $index) or return;
1963              
1964 0           my %args = (
1965             index => $index,
1966             );
1967              
1968 0 0         if (defined($shards)) {
1969 0           $args{body}{settings}{index}{number_of_shards} = $shards;
1970             }
1971              
1972 0           my $r;
1973 0           eval {
1974 0           $r = $es->indices->create(%args);
1975             };
1976 0 0         if ($@) {
1977 0           chomp($@);
1978 0           return $self->log->error("create_index: create failed ".
1979             "for index [$index]: [$@]");
1980             }
1981            
1982 0           return $r;
1983             }
1984              
1985             #
1986             # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1987             #
1988             sub create_index_with_mappings {
1989 0     0 0   my $self = shift;
1990 0           my ($index, $mappings) = @_;
1991              
1992 0           my $es = $self->_es;
1993 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
1994 0 0         $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1995 0 0         $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1996 0 0         $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH')
1997             or return;
1998              
1999 0           my $r;
2000 0           eval {
2001 0           $r = $es->indices->create(
2002             index => $index,
2003             body => {
2004             mappings => $mappings,
2005             },
2006             );
2007             };
2008 0 0         if ($@) {
2009 0           chomp($@);
2010 0           return $self->log->error("create_index_with_mappings: create failed for ".
2011             "index [$index]: [$@]");
2012             }
2013              
2014 0           return $r;
2015             }
2016              
2017             # GET http://localhost:9200/
2018             sub info {
2019 0     0 0   my $self = shift;
2020 0           my ($nodes) = @_;
2021              
2022 0   0       $nodes ||= $self->nodes;
2023 0 0         $self->brik_help_run_undef_arg('info', $nodes) or return;
2024 0 0         $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
2025 0 0         $self->brik_help_run_empty_array_arg('info', $nodes) or return;
2026              
2027 0           my $first = $nodes->[0];
2028              
2029 0 0         $self->get($first) or return;
2030              
2031 0           return $self->content;
2032             }
2033              
2034             sub version {
2035 0     0 0   my $self = shift;
2036 0           my ($nodes) = @_;
2037              
2038 0   0       $nodes ||= $self->nodes;
2039 0 0         $self->brik_help_run_undef_arg('version', $nodes) or return;
2040 0 0         $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
2041 0 0         $self->brik_help_run_empty_array_arg('version', $nodes) or return;
2042              
2043 0           my $first = $nodes->[0];
2044              
2045 0 0         $self->get($first) or return;
2046 0 0         my $content = $self->content or return;
2047              
2048 0           return $content->{version}{number};
2049             }
2050              
2051             #
2052             # Search::Elasticsearch::Client::2_0::Direct::Indices
2053             #
2054             sub get_templates {
2055 0     0 0   my $self = shift;
2056              
2057 0           my $es = $self->_es;
2058 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2059              
2060 0           my $r;
2061 0           eval {
2062 0           $r = $es->indices->get_template;
2063             };
2064 0 0         if ($@) {
2065 0           chomp($@);
2066 0           return $self->log->error("get_templates: failed: [$@]");
2067             }
2068              
2069 0           return $r;
2070             }
2071              
2072             sub list_templates {
2073 0     0 0   my $self = shift;
2074              
2075 0 0         my $content = $self->get_templates or return;
2076              
2077 0           return [ sort { $a cmp $b } keys %$content ];
  0            
2078             }
2079              
2080             #
2081             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2082             #
2083             sub get_template {
2084 0     0 0   my $self = shift;
2085 0           my ($template) = @_;
2086              
2087 0           my $es = $self->_es;
2088 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2089 0 0         $self->brik_help_run_undef_arg('get_template', $template) or return;
2090              
2091 0           my $r;
2092 0           eval {
2093 0           $r = $es->indices->get_template(
2094             name => $template,
2095             );
2096             };
2097 0 0         if ($@) {
2098 0           chomp($@);
2099 0           return $self->log->error("get_template: template failed for name [$template]: [$@]");
2100             }
2101              
2102 0           return $r;
2103             }
2104              
2105             sub put_mapping {
2106 0     0 0   my $self = shift;
2107 0           my ($index, $type, $mapping) = @_;
2108              
2109 0           my $es = $self->_es;
2110 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2111 0 0         $self->brik_help_run_undef_arg('put_mapping', $index) or return;
2112 0 0         $self->brik_help_run_undef_arg('put_mapping', $type) or return;
2113 0 0         $self->brik_help_run_undef_arg('put_mapping', $mapping) or return;
2114 0 0         $self->brik_help_run_invalid_arg('put_mapping', $mapping, 'HASH')
2115             or return;
2116              
2117 0           my $r;
2118 0           eval {
2119 0           my %this_args = (
2120             index => $index,
2121             body => $mapping,
2122             );
2123 0 0         if ($self->use_type) {
2124 0           $this_args{type} = $type;
2125             }
2126 0           $r = $es->indices->put_mapping(%this_args);
2127             };
2128 0 0         if ($@) {
2129 0           chomp($@);
2130 0           return $self->log->error("put_mapping: mapping failed ".
2131             "for index [$index]: [$@]");
2132             }
2133              
2134 0           return $r;
2135             }
2136              
2137             #
2138             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2139             #
2140             sub put_template {
2141 0     0 0   my $self = shift;
2142 0           my ($name, $template) = @_;
2143              
2144 0           my $es = $self->_es;
2145 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2146 0 0         $self->brik_help_run_undef_arg('put_template', $name) or return;
2147 0 0         $self->brik_help_run_undef_arg('put_template', $template) or return;
2148 0 0         $self->brik_help_run_invalid_arg('put_template', $template, 'HASH')
2149             or return;
2150              
2151 0           my $r;
2152 0           eval {
2153 0           $r = $es->indices->put_template(
2154             name => $name,
2155             body => $template,
2156             );
2157             };
2158 0 0         if ($@) {
2159 0           chomp($@);
2160 0           return $self->log->error("put_template: template failed ".
2161             "for name [$name]: [$@]");
2162             }
2163              
2164 0           return $r;
2165             }
2166              
2167             sub put_mapping_from_json_file {
2168 0     0 0   my $self = shift;
2169 0           my ($index, $type, $json_file) = @_;
2170              
2171 0           my $es = $self->_es;
2172 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2173 0 0         $self->brik_help_run_undef_arg('put_mapping_from_json_file', $index)
2174             or return;
2175 0 0         $self->brik_help_run_undef_arg('put_mapping_from_json_file', $type)
2176             or return;
2177 0 0         $self->brik_help_run_undef_arg('put_mapping_from_json_file', $json_file)
2178             or return;
2179 0 0         $self->brik_help_run_file_not_found('put_mapping_from_json_file',
2180             $json_file) or return;
2181              
2182 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2183 0 0         my $data = $fj->read($json_file) or return;
2184              
2185 0 0         if (! exists($data->{mappings})) {
2186 0           return $self->log->error("put_mapping_from_json_file: no mapping ".
2187             "data found");
2188             }
2189              
2190 0           return $self->put_mapping($index, $type, $data->{mappings});
2191             }
2192              
2193             sub update_mapping_from_json_file {
2194 0     0 0   my $self = shift;
2195 0           my ($json_file, $index, $type) = @_;
2196              
2197 0           my $es = $self->_es;
2198 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2199 0 0         $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2200             $json_file) or return;
2201 0 0         $self->brik_help_run_file_not_found('update_mapping_from_json_file',
2202             $json_file) or return;
2203 0 0         $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2204             $type) or return;
2205 0 0         $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2206             $index) or return;
2207              
2208 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2209 0 0         my $data = $fj->read($json_file) or return;
2210              
2211 0 0         if (! exists($data->{mappings})) {
2212 0           return $self->log->error("update_mapping_from_json_file: ".
2213             "no data found");
2214             }
2215              
2216 0           my $mappings = $data->{mappings};
2217              
2218 0           return $self->put_mapping($index, $type, $mappings);
2219             }
2220              
2221             sub put_template_from_json_file {
2222 0     0 0   my $self = shift;
2223 0           my ($json_file, $name) = @_;
2224              
2225 0           my $es = $self->_es;
2226 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2227 0 0         $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file)
2228             or return;
2229 0 0         $self->brik_help_run_file_not_found('put_template_from_json_file',
2230             $json_file) or return;
2231              
2232 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2233 0 0         my $data = $fj->read($json_file) or return;
2234              
2235 0 0         if (!defined($name)) {
2236 0           ($name) = $json_file =~ m{([^/]+)\.json$};
2237             }
2238              
2239 0 0         if (! defined($name)) {
2240 0           return $self->log->error("put_template_from_json_file: no template ".
2241             "name found");
2242             }
2243              
2244 0           return $self->put_template($name, $data);
2245             }
2246              
2247             sub update_template_from_json_file {
2248 0     0 0   my $self = shift;
2249 0           my ($json_file, $name) = @_;
2250              
2251 0           my $es = $self->_es;
2252 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2253 0 0         $self->brik_help_run_undef_arg('update_template_from_json_file',
2254             $json_file) or return;
2255 0 0         $self->brik_help_run_file_not_found('update_template_from_json_file',
2256             $json_file) or return;
2257              
2258 0 0         my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2259 0 0         my $data = $fj->read($json_file) or return;
2260              
2261 0 0         if (!defined($name)) {
2262 0           ($name) = $json_file =~ m{([^/]+)\.json$};
2263             }
2264              
2265 0 0         if (! defined($name)) {
2266 0           return $self->log->error("put_template_from_json_file: no template ".
2267             "name found");
2268             }
2269              
2270             # We ignore errors, template may not exist.
2271 0           $self->delete_template($name);
2272              
2273 0           return $self->put_template($name, $data);
2274             }
2275              
2276             #
2277             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2278             # Search::Elasticsearch::Client::2_0::Direct::Indices
2279             #
2280             sub get_settings {
2281 0     0 0   my $self = shift;
2282 0           my ($indices, $names) = @_;
2283              
2284 0           my $es = $self->_es;
2285 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2286              
2287 0           my %args = ();
2288 0 0         if (defined($indices)) {
2289 0 0         $self->brik_help_run_undef_arg('get_settings', $indices) or return;
2290 0 0         my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
2291             or return;
2292 0           $args{index} = $indices;
2293             }
2294 0 0         if (defined($names)) {
2295 0 0         $self->brik_help_run_file_not_found('get_settings', $names) or return;
2296 0 0         my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
2297             or return;
2298 0           $args{name} = $names;
2299             }
2300              
2301 0           my $r;
2302 0           eval {
2303 0           $r = $es->indices->get_settings(%args);
2304             };
2305 0 0         if ($@) {
2306 0           chomp($@);
2307 0           return $self->log->error("get_settings: failed: [$@]");
2308             }
2309              
2310 0           return $r;
2311             }
2312              
2313             #
2314             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2315             # Search::Elasticsearch::Client::2_0::Direct::Indices
2316             #
2317             # Example:
2318             #
2319             # run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
2320             #
2321             # XXX: should be renamed to put_index_settings
2322             #
2323             sub put_settings {
2324 0     0 0   my $self = shift;
2325 0           my ($settings, $indices) = @_;
2326              
2327 0           my $es = $self->_es;
2328 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2329 0 0         $self->brik_help_run_undef_arg('put_settings', $settings) or return;
2330 0 0         $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
2331              
2332 0           my %args = (
2333             body => $settings,
2334             );
2335 0 0         if (defined($indices)) {
2336 0 0         $self->brik_help_run_undef_arg('put_settings', $indices) or return;
2337 0 0         my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
2338             or return;
2339 0           $args{index} = $indices;
2340             }
2341              
2342 0           my $r;
2343 0           eval {
2344 0           $r = $es->indices->put_settings(%args);
2345             };
2346 0 0         if ($@) {
2347 0           chomp($@);
2348 0           return $self->log->error("put_settings: failed: [$@]");
2349             }
2350              
2351 0           return $r;
2352             }
2353              
2354             sub set_index_readonly {
2355 0     0 0   my $self = shift;
2356 0           my ($indices, $bool) = @_;
2357              
2358 0           my $es = $self->_es;
2359 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2360 0 0         $self->brik_help_run_undef_arg('set_index_readonly', $indices) or return;
2361 0 0         $self->brik_help_run_invalid_arg('set_index_readonly', $indices, 'ARRAY', 'SCALAR')
2362             or return;
2363              
2364 0 0         if (! defined($bool)) {
2365 0           $bool = 'true';
2366             }
2367             else {
2368 0 0         $bool = $bool ? 'true' : 'false';
2369             }
2370              
2371 0           my $settings = {
2372             'blocks.read_only' => $bool,
2373             'blocks.read_only_allow_delete' => 'true',
2374             };
2375              
2376 0           return $self->put_settings($settings, $indices);
2377             }
2378              
2379             #
2380             # curl -XPUT -H "Content-Type: application/json" http://localhost:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'
2381             # PUT synscan-2018-05/_settings
2382             # {
2383             # "index": {
2384             # "blocks":{
2385             # "read_only":"false",
2386             # "read_only_allow_delete":"true"
2387             # }
2388             # }
2389             #}
2390             #
2391             #
2392             # If it fails with the following error:
2393             #
2394             # [2018-09-12T13:38:40,012][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 403 ({"type"=>"cluster_block_exception", "reason"=>"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"})
2395             #
2396             # Use Kibana dev console and copy/paste both requests:
2397             #
2398             # PUT _all/_settings
2399             # {
2400             # "index": {
2401             # "blocks": {
2402             # "read_only_allow_delete": "false"
2403             # }
2404             # }
2405             # }
2406             #
2407             sub reset_index_readonly {
2408 0     0 0   my $self = shift;
2409 0           my ($indices) = @_;
2410              
2411 0   0       $indices ||= '*';
2412 0           my $es = $self->_es;
2413 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2414 0 0         $self->brik_help_run_invalid_arg('reset_index_readonly', $indices,
2415             'ARRAY', 'SCALAR') or return;
2416              
2417 0           my $settings = {
2418             blocks => {
2419             read_only_allow_delete => 'false',
2420             },
2421             };
2422              
2423             # Settings on '*' indices should be enough to reset for everyone.
2424 0           my $r = $self->put_settings($settings, $indices);
2425             #$self->log->info(Data::Dumper::Dumper($r));
2426              
2427 0           return 1;
2428             }
2429              
2430             sub list_index_readonly {
2431 0     0 0   my $self = shift;
2432              
2433 0           my $es = $self->_es;
2434 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2435              
2436 0 0         my $list = $self->list_indices or return;
2437              
2438 0           my @indices = ();
2439 0           for my $this (@$list) {
2440 0 0         my $ro = $self->get_index_readonly($this) or next;
2441 0 0         if (defined($ro->{index}{provided_name})) {
2442 0           push @indices, $ro->{index}{provided_name};
2443             }
2444             }
2445              
2446 0           return \@indices;
2447             }
2448              
2449             sub set_index_number_of_replicas {
2450 0     0 0   my $self = shift;
2451 0           my ($indices, $number) = @_;
2452              
2453 0           my $es = $self->_es;
2454 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2455 0 0         $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
2456 0 0         $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2457             or return;
2458              
2459 0           my $settings = { number_of_replicas => $number };
2460              
2461 0           return $self->put_settings($settings, $indices);
2462             }
2463              
2464             sub set_index_refresh_interval {
2465 0     0 0   my $self = shift;
2466 0           my ($indices, $number) = @_;
2467              
2468 0           my $es = $self->_es;
2469 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2470 0 0         $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
2471 0 0         $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2472             or return;
2473              
2474             # If there is a meaningful value not postfixed with a unity,
2475             # we default to add a `s' for a number of seconds.
2476 0 0 0       if ($number =~ /^\d+$/ && $number > 0) {
2477 0           $number .= 's';
2478             }
2479              
2480 0           my $settings = { refresh_interval => $number };
2481              
2482 0           return $self->put_settings($settings, $indices);
2483             }
2484              
2485             sub get_index_settings {
2486 0     0 0   my $self = shift;
2487 0           my ($indices) = @_;
2488              
2489 0           my $es = $self->_es;
2490 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2491 0 0         $self->brik_help_run_undef_arg('get_index_settings', $indices) or return;
2492 0 0         $self->brik_help_run_invalid_arg('get_index_settings', $indices, 'ARRAY', 'SCALAR')
2493             or return;
2494              
2495 0           my $settings = $self->get_settings($indices);
2496              
2497 0           my %indices = ();
2498 0           for (keys %$settings) {
2499 0           $indices{$_} = $settings->{$_}{settings};
2500             }
2501              
2502 0           return \%indices;
2503             }
2504              
2505             sub get_index_readonly {
2506 0     0 0   my $self = shift;
2507 0           my ($indices) = @_;
2508              
2509 0           my $es = $self->_es;
2510 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2511 0 0         $self->brik_help_run_undef_arg('get_index_readonly', $indices) or return;
2512 0 0         $self->brik_help_run_invalid_arg('get_index_readonly', $indices, 'ARRAY', 'SCALAR')
2513             or return;
2514              
2515 0           my $settings = $self->get_settings($indices);
2516              
2517 0           my %indices = ();
2518 0           for (keys %$settings) {
2519             #$indices{$_} = $settings->{$_}{settings}{index}{'blocks_write'};
2520 0           $indices{$_} = $settings->{$_}{settings};
2521             }
2522              
2523 0           return \%indices;
2524             }
2525              
2526             sub get_index_number_of_replicas {
2527 0     0 0   my $self = shift;
2528 0           my ($indices) = @_;
2529              
2530 0           my $es = $self->_es;
2531 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2532 0 0         $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
2533 0 0         $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2534             or return;
2535              
2536 0           my $settings = $self->get_settings($indices);
2537              
2538 0           my %indices = ();
2539 0           for (keys %$settings) {
2540 0           $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
2541             }
2542              
2543 0           return \%indices;
2544             }
2545              
2546             sub get_index_refresh_interval {
2547 0     0 0   my $self = shift;
2548 0           my ($indices, $number) = @_;
2549              
2550 0           my $es = $self->_es;
2551 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2552 0 0         $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
2553 0 0         $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2554             or return;
2555              
2556 0           my $settings = $self->get_settings($indices);
2557              
2558 0           my %indices = ();
2559 0           for (keys %$settings) {
2560 0           $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
2561             }
2562              
2563 0           return \%indices;
2564             }
2565              
2566             sub get_index_number_of_shards {
2567 0     0 0   my $self = shift;
2568 0           my ($indices, $number) = @_;
2569              
2570 0           my $es = $self->_es;
2571 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2572 0 0         $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
2573 0 0         $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
2574             or return;
2575              
2576 0           my $settings = $self->get_settings($indices);
2577              
2578 0           my %indices = ();
2579 0           for (keys %$settings) {
2580 0           $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
2581             }
2582              
2583 0           return \%indices;
2584             }
2585              
2586             #
2587             # http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2588             #
2589             sub delete_template {
2590 0     0 0   my $self = shift;
2591 0           my ($name) = @_;
2592              
2593 0           my $es = $self->_es;
2594 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2595 0 0         $self->brik_help_run_undef_arg('delete_template', $name) or return;
2596              
2597 0           my $r;
2598 0           eval {
2599 0           $r = $es->indices->delete_template(
2600             name => $name,
2601             );
2602             };
2603 0 0         if ($@) {
2604 0           chomp($@);
2605 0           return $self->log->error("delete_template: failed for name [$name]: [$@]");
2606             }
2607              
2608 0           return $r;
2609             }
2610              
2611             #
2612             # Return a boolean to state for index existence
2613             #
2614             sub is_index_exists {
2615 0     0 0   my $self = shift;
2616 0           my ($index) = @_;
2617              
2618 0           my $es = $self->_es;
2619 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2620 0 0         $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
2621              
2622 0           my $r;
2623 0           eval {
2624 0           $r = $es->indices->exists(
2625             index => $index,
2626             );
2627             };
2628 0 0         if ($@) {
2629 0           chomp($@);
2630 0           return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
2631             }
2632              
2633 0 0         return $r ? 1 : 0;
2634             }
2635              
2636             #
2637             # Return a boolean to state for index with type existence
2638             #
2639             sub is_type_exists {
2640 0     0 0   my $self = shift;
2641 0           my ($index, $type) = @_;
2642              
2643 0           my $es = $self->_es;
2644 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2645 0 0         $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
2646 0 0         $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
2647              
2648 0           my $r;
2649 0           eval {
2650 0           my %this_args = (
2651             index => $index,
2652             );
2653 0 0         if ($self->use_type) {
2654 0           $this_args{type} = $type;
2655             }
2656 0           $r = $es->indices->exists_type(%this_args);
2657             };
2658 0 0         if ($@) {
2659 0           chomp($@);
2660 0           return $self->log->error("is_type_exists: failed for index [$index] and ".
2661             "type [$type]: [$@]");
2662             }
2663              
2664 0 0         return $r ? 1 : 0;
2665             }
2666              
2667             #
2668             # Return a boolean to state for document existence
2669             #
2670             sub is_document_exists {
2671 0     0 0   my $self = shift;
2672 0           my ($index, $type, $document) = @_;
2673              
2674 0           my $es = $self->_es;
2675 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2676 0 0         $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
2677 0 0         $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
2678 0 0         $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
2679 0 0         $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
2680              
2681 0           my $r;
2682 0           eval {
2683 0           my %this_args = (
2684             index => $index,
2685             %$document,
2686             );
2687 0 0         if ($self->use_type) {
2688 0           $this_args{type} = $type;
2689             }
2690 0           $r = $es->exists(%this_args);
2691             };
2692 0 0         if ($@) {
2693 0           chomp($@);
2694 0           return $self->log->error("is_document_exists: failed for index [$index] and ".
2695             "type [$type]: [$@]");
2696             }
2697              
2698 0 0         return $r ? 1 : 0;
2699             }
2700              
2701             sub parse_error_string {
2702 0     0 0   my $self = shift;
2703 0           my ($string) = @_;
2704              
2705 0 0         $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
2706              
2707             # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
2708              
2709 0           my ($class, $node, $code, $message, $dump) = $string =~
2710             m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
2711              
2712 0 0 0       if (defined($dump) && length($dump)) {
2713 0 0         my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
2714 0           $dump = $sd->decode($dump);
2715             }
2716              
2717             # Sanity check
2718 0 0 0       if (defined($node) && $node =~ m{^http} && $code =~ m{^\d+$}
      0        
      0        
      0        
2719             && defined($dump) && ref($dump) eq 'HASH') {
2720             return {
2721 0           class => $class,
2722             node => $node,
2723             code => $code,
2724             message => $message,
2725             dump => $dump,
2726             };
2727             }
2728              
2729             # Were not able to decode, we return as-is.
2730             return {
2731 0           message => $string,
2732             };
2733             }
2734              
2735             #
2736             # Refresh an index to receive latest additions
2737             #
2738             # Search::Elasticsearch::Client::5_0::Direct::Indices
2739             # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
2740             #
2741             sub refresh_index {
2742 0     0 0   my $self = shift;
2743 0           my ($index) = @_;
2744              
2745 0           my $es = $self->_es;
2746 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2747 0 0         $self->brik_help_run_undef_arg('refresh_index', $index) or return;
2748              
2749 0           my $try = $self->try;
2750              
2751 0           RETRY:
2752              
2753             my $r;
2754 0           eval {
2755 0           $r = $es->indices->refresh(
2756             index => $index,
2757             );
2758             };
2759 0 0         if ($@) {
2760 0 0         if (--$try == 0) {
2761 0           chomp($@);
2762 0           my $p = $self->parse_error_string($@);
2763 0 0 0       if (defined($p) && exists($p->{class})) {
2764 0           my $class = $p->{class};
2765 0           my $code = $p->{code};
2766 0           my $node = $p->{node};
2767 0           return $self->log->error("refresh_index: failed for index [$index] ".
2768             "after [$try] tries with error [$class] code [$code] for node [$node]");
2769             }
2770             else {
2771 0           return $self->log->error("refresh_index: failed for index [$index] ".
2772             "after [$try]: [$@]");
2773             }
2774             }
2775 0           sleep 60;
2776 0           goto RETRY;
2777             }
2778              
2779 0           return $r;
2780             }
2781              
2782             sub export_as {
2783 0     0 0   my $self = shift;
2784 0           my ($format, $index, $size, $cb) = @_;
2785              
2786 0   0       $size ||= 10_000;
2787 0           my $es = $self->_es;
2788 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2789 0 0         $self->brik_help_run_undef_arg('export_as', $format) or return;
2790 0 0         $self->brik_help_run_undef_arg('export_as', $index) or return;
2791 0 0         $self->brik_help_run_undef_arg('export_as', $size) or return;
2792              
2793 0 0 0       if ($format ne 'csv' && $format ne 'json') {
2794 0           return $self->log->error("export_as: unsupported export format ".
2795             "[$format]");
2796             }
2797              
2798 0           my $max = $self->max;
2799 0           my $datadir = $self->datadir;
2800              
2801 0           $self->log->debug("export_as: selecting scroll Command...");
2802              
2803 0           my $scroll;
2804 0 0         my $version = $self->version or return;
2805 0 0         if ($version lt "5.0.0") {
2806 0 0         $scroll = $self->open_scroll_scan_mode($index, $size) or return;
2807             }
2808             else {
2809 0 0         $scroll = $self->open_scroll($index, $size) or return;
2810             }
2811              
2812 0           $self->log->debug("export_as: selecting scroll Command...OK.");
2813              
2814 0 0         my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2815              
2816 0           my $out;
2817             my $csv_header;
2818 0 0         if ($format eq 'csv') {
    0          
2819 0 0         $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
2820 0           $out->encoding($self->encoding);
2821 0           $out->separator(',');
2822 0           $out->escape('\\');
2823 0           $out->append(1);
2824 0           $out->first_line_is_header(0);
2825 0           $out->write_header(1);
2826 0           $out->use_quoting(1);
2827 0 0         if (defined($self->csv_header)) {
2828 0           my $sorted = [ sort { $a cmp $b } @{$self->csv_header} ];
  0            
  0            
2829 0           $out->header($sorted);
2830             }
2831 0 0         if (defined($self->csv_encoded_fields)) {
2832 0           $out->encoded_fields($self->csv_encoded_fields);
2833             }
2834 0 0         if (defined($self->csv_object_fields)) {
2835 0           $out->object_fields($self->csv_object_fields);
2836             }
2837              
2838 0           $csv_header = $out->header;
2839             }
2840             elsif ($format eq 'json') {
2841 0 0         $out = Metabrik::File::Json->new_from_brik_init($self) or return;
2842 0           $out->encoding($self->encoding);
2843             }
2844              
2845 0           my $total = $self->total_scroll;
2846 0           $self->log->info("export_as: total [$total] for index [$index]");
2847              
2848 0           my %types = ();
2849 0           my $read = 0;
2850 0           my $skipped = 0;
2851 0           my $exported = 0;
2852 0           my $start = time();
2853 0           my $done = $datadir."/$index.exported";
2854 0           my $start_time = time();
2855 0           my %chunk = ();
2856 0           while (my $next = $self->next_scroll(10000)) {
2857 0           for my $this (@$next) {
2858 0           $read++;
2859              
2860 0 0         if (defined($cb)) {
2861 0           $this = $cb->($this);
2862 0 0         if (! defined($this)) {
2863 0           $self->log->error("export_as: callback failed for index ".
2864             "[$index] at read [$read], skipping single entry");
2865 0           $skipped++;
2866 0           next;
2867             }
2868             }
2869              
2870 0           my $id = $this->{_id};
2871 0           my $doc = $this->{_source};
2872             # Prepare for when types will be removed from ES
2873 0   0       my $type = $this->{_type} || '_doc';
2874 0 0         if (! exists($types{$type})) {
2875 0 0         if ($format eq 'csv') {
    0          
2876             # If not given, we guess the CSV fields to use.
2877 0 0         if (! defined($csv_header)) {
2878 0 0         my $fields = $self->list_index_fields($index, $type)
2879             or return;
2880 0           $types{$type}{header} = [ '_id', @$fields ];
2881             }
2882             else {
2883 0           $types{$type}{header} = [ '_id', @$csv_header ];
2884             }
2885              
2886 0           $types{$type}{output} = $datadir."/$index:$type.csv";
2887             }
2888             elsif ($format eq 'json') {
2889 0           $types{$type}{output} = $datadir."/$index:$type.json";
2890             }
2891              
2892             # Verify it has not been exported yet
2893 0 0         if (-f $done) {
2894 0           return $self->log->error("export_as: export already done ".
2895             "for index [$index]");
2896             }
2897              
2898             $self->log->info("export_as: exporting to file [".
2899 0           $types{$type}{output}."] for type [$type], using ".
2900             "chunk size of [$size]");
2901             }
2902              
2903 0           my $h = { _id => $id };
2904              
2905 0           for my $k (keys %$doc) {
2906 0           $h->{$k} = $doc->{$k};
2907             }
2908              
2909 0 0         if ($format eq 'csv') {
2910 0           $out->header($types{$type}{header});
2911             }
2912              
2913 0           push @{$chunk{$type}}, $h;
  0            
2914 0 0         if (@{$chunk{$type}} > 999) {
  0            
2915 0           my $r = $out->write($chunk{$type}, $types{$type}{output});
2916 0 0         if (!defined($r)) {
2917 0           $self->log->warning("export_as: unable to process entry, ".
2918             "skipping");
2919 0           $skipped++;
2920 0           next;
2921             }
2922 0           $chunk{$type} = [];
2923             }
2924              
2925             # Log a status sometimes.
2926 0 0         if (! (++$exported % 100_000)) {
2927 0           my $now = time();
2928 0           my $perc = sprintf("%.02f", $exported / $total * 100);
2929 0           $self->log->info("export_as: fetched [$exported/$total] ".
2930             "($perc%) elements in ".($now - $start)." second(s) ".
2931             "from index [$index]");
2932 0           $start = time();
2933             }
2934              
2935             # Limit export to specified maximum
2936 0 0 0       if ($max > 0 && $exported >= $max) {
2937 0           $self->log->info("export_as: max export reached [$exported] ".
2938             "for index [$index], stopping");
2939 0           last;
2940             }
2941             }
2942             }
2943              
2944             # Process remaining data waiting to be written and build output file list
2945 0           my %files = ();
2946 0           for my $type (keys %types) {
2947 0 0         if (@{$chunk{$type}} > 0) {
  0            
2948 0           $out->write($chunk{$type}, $types{$type}{output});
2949 0           $files{$types{$type}{output}}++;
2950             }
2951             }
2952              
2953 0           $self->close_scroll;
2954              
2955 0           my $stop_time = time();
2956 0           my $duration = $stop_time - $start_time;
2957 0           my $eps = $exported;
2958 0 0         if ($duration > 0) {
2959 0           $eps = $exported / $duration;
2960             }
2961              
2962             my $result = {
2963             read => $read,
2964             exported => $exported,
2965             skipped => $read - $exported,
2966             total_count => $total,
2967             complete => ($exported == $total) ? 1 : 0,
2968             duration => $duration,
2969             eps => $eps,
2970 0 0         files => [ sort { $a cmp $b } keys %files ],
  0            
2971             };
2972              
2973             # Say the file has been processed, and put resulting stats.
2974 0 0         $fd->write($result, $done) or return;
2975              
2976 0           $self->log->info("export_as: done.");
2977              
2978 0           return $result;
2979             }
2980              
2981             sub export_as_csv {
2982 0     0 0   my $self = shift;
2983 0           my ($index, $size, $cb) = @_;
2984              
2985 0   0       $size ||= 10_000;
2986 0           my $es = $self->_es;
2987 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
2988 0 0         $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
2989 0 0         $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
2990              
2991 0           return $self->export_as('csv', $index, $size, $cb);
2992             }
2993              
2994             sub export_as_json {
2995 0     0 0   my $self = shift;
2996 0           my ($index, $size, $cb) = @_;
2997              
2998 0   0       $size ||= 10_000;
2999 0           my $es = $self->_es;
3000 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3001 0 0         $self->brik_help_run_undef_arg('export_as_json', $index) or return;
3002 0 0         $self->brik_help_run_undef_arg('export_as_json', $size) or return;
3003              
3004 0           return $self->export_as('json', $index, $size, $cb);
3005             }
3006              
3007             #
3008             # Optimization instructions:
3009             # https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
3010             #
3011             sub import_from {
3012 0     0 0   my $self = shift;
3013 0           my ($format, $input, $index, $type, $hash, $cb) = @_;
3014              
3015 0           my $es = $self->_es;
3016 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3017 0 0         $self->brik_help_run_undef_arg('import_from', $format) or return;
3018 0 0         $self->brik_help_run_undef_arg('import_from', $input) or return;
3019 0 0         $self->brik_help_run_file_not_found('import_from', $input) or return;
3020              
3021 0 0 0       if ($format ne 'csv' && $format ne 'json') {
3022 0           return $self->log->error("import_from: unsupported export format ".
3023             "[$format]");
3024             }
3025              
3026             # If index and/or types are not defined, we try to get them from
3027             # input filename
3028 0 0 0       if (! defined($index) || ! defined($type)) {
3029             # Example: index-DATE:type.csv
3030 0 0         if ($input =~ m{^(.+):(.+)\.(?:csv|json)(?:.*)?$}) {
3031 0           my ($this_index, $this_type) = $input =~
3032             m{^(.+):(.+)\.(?:csv|json)(?:.*)?$};
3033 0   0       $index ||= $this_index;
3034 0   0       $type ||= $this_type;
3035             }
3036             }
3037              
3038             # Verify it has not been indexed yet
3039 0           my $done = "$input.imported";
3040 0 0         if (-f $done) {
3041 0           $self->log->info("import_from: import already done for file ".
3042             "[$input]");
3043 0           return 0;
3044             }
3045              
3046             # And default to Attributes if guess failed.
3047 0   0       $index ||= $self->index;
3048 0   0       $type ||= $self->type;
3049 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
3050 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
3051              
3052 0 0         if ($index eq '*') {
3053 0           return $self->log->error("import_from: cannot import to invalid ".
3054             "index [$index]");
3055             }
3056 0 0         if ($self->use_type) {
3057 0 0         if ($type eq '*') {
3058 0           return $self->log->error("import_from: cannot import to invalid ".
3059             "type [$type]");
3060             }
3061             }
3062              
3063 0           $self->log->debug("input [$input]");
3064 0           $self->log->debug("index [$index]");
3065 0           $self->log->debug("type [$type]");
3066              
3067 0           my $count_before = 0;
3068 0 0         if ($self->is_index_exists($index)) {
3069 0           $count_before = $self->count($index, $type);
3070 0 0         if (! defined($count_before)) {
3071 0           return;
3072             }
3073 0           $self->log->info("import_from: current index [$index] count is ".
3074             "[$count_before]");
3075             }
3076              
3077 0           my $max = $self->max;
3078              
3079 0 0         $self->open_bulk_mode($index, $type) or return;
3080              
3081 0           $self->log->info("import_from: importing file [$input] to index ".
3082             "[$index] with type [$type]");
3083              
3084 0 0         my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
3085              
3086 0           my $out;
3087 0 0         if ($format eq 'csv') {
    0          
3088 0 0         $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
3089 0           $out->encoding($self->encoding);
3090 0           $out->separator(',');
3091 0           $out->escape('\\');
3092 0           $out->first_line_is_header(1);
3093 0           $out->encoded_fields($self->csv_encoded_fields);
3094 0           $out->object_fields($self->csv_object_fields);
3095             }
3096             elsif ($format eq 'json') {
3097 0 0         $out = Metabrik::File::Json->new_from_brik_init($self) or return;
3098 0           $out->encoding($self->encoding);
3099             }
3100              
3101 0           my $refresh_interval;
3102             my $number_of_replicas;
3103 0           my $start = time();
3104 0           my $speed_settings = {};
3105 0           my $imported = 0;
3106 0           my $first = 1;
3107 0           my $read = 0;
3108 0           my $skipped_chunks = 0;
3109 0           my $start_time = time();
3110 0           while (my $this = $out->read_next($input)) {
3111 0           $read++;
3112              
3113 0           my $h = {};
3114 0 0         my $id = $self->use_ignore_id ? undef : $this->{_id};
3115 0           delete $this->{_id};
3116 0           for my $k (keys %$this) {
3117 0           my $value = $this->{$k};
3118             # We keep only fields when they have a value.
3119             # No need to index data that is empty.
3120 0 0 0       if (defined($value) && length($value)) {
3121 0           $h->{$k} = $value;
3122             }
3123             }
3124              
3125 0 0         if (defined($cb)) {
3126 0           $h = $cb->($h);
3127 0 0         if (! defined($h)) {
3128 0           $self->log->error("import_from: callback failed for ".
3129             "index [$index] at read [$read], skipping single entry");
3130 0           $skipped_chunks++;
3131 0           next;
3132             }
3133             }
3134              
3135             # Set routing based on the provided field name, if any.
3136 0           my $this_hash;
3137 0 0 0       if (defined($hash) && defined($hash->{routing})
      0        
3138             && defined($h->{$hash->{routing}})) {
3139 0           $this_hash = { %$hash }; # Make a copy to avoid overwriting
3140             # user provided value.
3141 0           $this_hash->{routing} = $h->{$hash->{routing}};
3142             }
3143              
3144             #$self->log->info(Data::Dumper::Dumper($h));
3145              
3146 0           my $r;
3147 0           eval {
3148 0           $r = $self->index_bulk($h, $index, $type, $this_hash, $id);
3149             };
3150 0 0         if ($@) {
3151 0           chomp($@);
3152 0           $self->log->warning("import_from: error [$@]");
3153             }
3154 0 0         if (! defined($r)) {
3155 0           $self->log->error("import_from: bulk processing failed for ".
3156             "index [$index] at read [$read], skipping chunk");
3157 0           $skipped_chunks++;
3158 0           next;
3159             }
3160              
3161             # Gather index settings, and set values for speed.
3162             # We don't do it earlier, cause we need index to be created,
3163             # and it should have been done from index_bulk Command.
3164 0 0 0       if ($first && $self->is_index_exists($index)) {
3165             # Save current values so we can restore them at the end of Command.
3166             # We ignore errors here, this is non-blocking for indexing.
3167 0           $refresh_interval = $self->get_index_refresh_interval($index);
3168 0           $refresh_interval = $refresh_interval->{$index};
3169 0           $number_of_replicas = $self->get_index_number_of_replicas($index);
3170 0           $number_of_replicas = $number_of_replicas->{$index};
3171 0 0         if ($self->use_indexing_optimizations) {
3172 0           $self->set_index_number_of_replicas($index, 0);
3173             }
3174 0           $self->set_index_refresh_interval($index, -1);
3175 0           $first = 0;
3176             }
3177              
3178             # Log a status sometimes.
3179 0 0         if (! (++$imported % 100_000)) {
3180 0           my $now = time();
3181 0           $self->log->info("import_from: imported [$imported] entries in ".
3182             ($now - $start)." second(s) to index [$index]");
3183 0           $start = time();
3184             }
3185              
3186             # Limit import to specified maximum
3187 0 0 0       if ($max > 0 && $imported >= $max) {
3188 0           $self->log->info("import_from: max import reached [$imported] for ".
3189             "index [$index], stopping");
3190 0           last;
3191             }
3192             }
3193              
3194 0           $self->bulk_flush;
3195              
3196 0           my $stop_time = time();
3197 0           my $duration = $stop_time - $start_time;
3198 0   0       my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
3199              
3200 0           $self->refresh_index($index);
3201              
3202 0 0         my $count_current = $self->count($index, $type) or return;
3203 0           $self->log->info("import_from: after index [$index] count is [$count_current] ".
3204             "at EPS [$eps]");
3205              
3206 0           my $skipped = 0;
3207 0 0         my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
3208 0 0         if ($complete) { # If complete, import has been retried, and everything is now ok.
3209 0           $imported = $read;
3210             }
3211             else {
3212 0           $skipped = $read - ($count_current - $count_before);
3213             }
3214              
3215 0           my $result = {
3216             read => $read,
3217             imported => $imported,
3218             skipped => $skipped,
3219             previous_count => $count_before,
3220             current_count => $count_current,
3221             complete => $complete,
3222             duration => $duration,
3223             eps => $eps,
3224             };
3225              
3226             # Say the file has been processed, and put resulting stats.
3227 0 0         $fd->write($result, $done) or return;
3228              
3229             # Restore previous settings, if any
3230 0 0         if (defined($refresh_interval)) {
3231 0           $self->set_index_refresh_interval($index, $refresh_interval);
3232             }
3233 0 0 0       if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
3234 0           $self->set_index_number_of_replicas($index, $number_of_replicas);
3235             }
3236              
3237 0           return $result;
3238             }
3239              
3240             sub import_from_csv {
3241 0     0 0   my $self = shift;
3242 0           my ($input, $index, $type, $hash, $cb) = @_;
3243              
3244 0           my $es = $self->_es;
3245 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3246 0 0         $self->brik_help_run_undef_arg('import_from_csv', $input) or return;
3247 0 0         $self->brik_help_run_file_not_found('import_from_csv', $input)
3248             or return;
3249              
3250 0           return $self->import_from('csv', $input, $index, $type, $hash, $cb);
3251             }
3252              
3253             sub import_from_json {
3254 0     0 0   my $self = shift;
3255 0           my ($input, $index, $type, $hash, $cb) = @_;
3256              
3257 0           my $es = $self->_es;
3258 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3259 0 0         $self->brik_help_run_undef_arg('import_from_json', $input) or return;
3260 0 0         $self->brik_help_run_file_not_found('import_from_json', $input)
3261             or return;
3262              
3263 0           return $self->import_from('json', $input, $index, $type, $hash, $cb);
3264             }
3265              
3266             #
3267             # Same as import_from_csv Command but in worker mode for speed.
3268             #
3269             sub import_from_csv_worker {
3270 0     0 0   my $self = shift;
3271 0           my ($input_csv, $index, $type, $hash, $cb) = @_;
3272              
3273 0           my $es = $self->_es;
3274 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3275 0 0         $self->brik_help_run_undef_arg('import_from_csv_worker', $input_csv)
3276             or return;
3277 0 0         $self->brik_help_run_file_not_found('import_from_csv_worker', $input_csv)
3278             or return;
3279              
3280             # If index and/or types are not defined, we try to get them from input filename
3281 0 0 0       if (! defined($index) || ! defined($type)) {
3282             # Example: index-DATE:type.csv
3283 0 0         if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
3284 0           my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
3285 0   0       $index ||= $this_index;
3286 0   0       $type ||= $this_type;
3287             }
3288             }
3289              
3290             # Verify it has not been indexed yet
3291 0           my $done = "$input_csv.imported";
3292 0 0         if (-f $done) {
3293 0           $self->log->info("import_from_csv_worker: import already done for ".
3294             "file [$input_csv]");
3295 0           return 0;
3296             }
3297              
3298             # And default to Attributes if guess failed.
3299 0   0       $index ||= $self->index;
3300 0   0       $type ||= $self->type;
3301 0 0         $self->brik_help_set_undef_arg('index', $index) or return;
3302 0 0         $self->brik_help_set_undef_arg('type', $type) or return;
3303              
3304 0 0         if ($index eq '*') {
3305 0           return $self->log->error("import_from_csv_worker: cannot import to invalid ".
3306             "index [$index]");
3307             }
3308 0 0         if ($self->use_type) {
3309 0 0         if ($type eq '*') {
3310 0           return $self->log->error("import_from_csv_worker: cannot import to ".
3311             "invalid type [$type]");
3312             }
3313             }
3314              
3315 0           $self->log->debug("input [$input_csv]");
3316 0           $self->log->debug("index [$index]");
3317 0           $self->log->debug("type [$type]");
3318              
3319 0           my $count_before = 0;
3320 0 0         if ($self->is_index_exists($index)) {
3321 0           $count_before = $self->count($index, $type);
3322 0 0         if (! defined($count_before)) {
3323 0           return;
3324             }
3325 0           $self->log->info("import_from_csv_worker: current index [$index] count is ".
3326             "[$count_before]");
3327             }
3328              
3329 0           my $max = $self->max;
3330              
3331 0 0         $self->open_bulk_mode($index, $type) or return;
3332              
3333             #my $batch = undef;
3334 0           my $batch = 10_000;
3335              
3336 0           $self->log->info("import_from_csv_worker: importing file [$input_csv] to ".
3337             "index [$index] with type [$type] and batch [$batch]");
3338              
3339 0 0         my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
3340              
3341 0 0         my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
3342 0           $fc->separator(',');
3343 0           $fc->escape('\\');
3344 0           $fc->first_line_is_header(1);
3345 0           $fc->encoded_fields($self->csv_encoded_fields);
3346 0           $fc->object_fields($self->csv_object_fields);
3347              
3348 0 0         my $wp = Metabrik::Worker::Parallel->new_from_brik_init($self) or return;
3349 0           $wp->pool_size(2);
3350              
3351 0 0         $wp->create_manager or return;
3352              
3353 0           my $refresh_interval;
3354             my $number_of_replicas;
3355 0           my $start = time();
3356 0           my $speed_settings = {};
3357 0           my $imported = 0;
3358 0           my $first = 1;
3359 0           my $read = 0;
3360 0           my $skipped_chunks = 0;
3361 0           my $start_time = time();
3362 0           while (my $list = $fc->read_next($input_csv, $batch)) {
3363              
3364             $wp->start(sub {
3365 0     0     my @list = ();
3366 0           for my $this (@$list) {
3367 0           $read++;
3368              
3369 0           my $h = {};
3370 0           my $id = $this->{_id};
3371 0           delete $this->{_id};
3372 0           for my $k (keys %$this) {
3373 0           my $value = $this->{$k};
3374             # We keep only fields when they have a value.
3375             # No need to index data that is empty.
3376 0 0 0       if (defined($value) && length($value)) {
3377 0           $h->{$k} = $value;
3378             }
3379             }
3380              
3381 0 0         if (defined($cb)) {
3382 0           $h = $cb->($h);
3383 0 0         if (! defined($h)) {
3384 0           $self->log->error("import_from_csv_worker: callback failed for ".
3385             "index [$index] at read [$read], skipping single entry");
3386 0           $skipped_chunks++;
3387 0           next;
3388             }
3389             }
3390              
3391 0           push @list, $h;
3392             }
3393              
3394 0           my $r;
3395 0           eval {
3396 0           $r = $self->index_bulk_from_list(\@list, $index, $type, $hash);
3397             };
3398 0 0         if ($@) {
3399 0           chomp($@);
3400 0           $self->log->warning("import_from_csv_worker: error [$@]");
3401             }
3402 0 0         if (! defined($r)) {
3403 0           $self->log->error("import_from_csv_worker: bulk processing failed for ".
3404             "index [$index] at read [$read], skipping chunk");
3405 0           $skipped_chunks++;
3406 0           next;
3407             }
3408              
3409             # Log a status sometimes.
3410 0 0         if (! ($imported % 10_000)) {
3411 0           my $now = time();
3412 0           my $diff = sprintf("%.02f", $now - $start);
3413 0           my $eps = sprintf("%.02f", $imported / $diff);
3414 0           $self->log->info("import_from_csv_worker: imported [$imported] entries ".
3415             "in [$diff] second(s) to index [$index] at EPS [$eps]");
3416 0           $start = time();
3417             }
3418              
3419 0           exit(0);
3420 0           });
3421              
3422             # Gather index settings, and set values for speed.
3423             # We don't do it earlier, cause we need index to be created,
3424             # and it should have been done from index_bulk Command.
3425 0 0 0       if ($first && $self->is_index_exists($index)) {
3426             # Save current values so we can restore them at the end of Command.
3427             # We ignore errors here, this is non-blocking for indexing.
3428 0           $refresh_interval = $self->get_index_refresh_interval($index);
3429 0           $refresh_interval = $refresh_interval->{$index};
3430 0           $number_of_replicas = $self->get_index_number_of_replicas($index);
3431 0           $number_of_replicas = $number_of_replicas->{$index};
3432 0 0         if ($self->use_indexing_optimizations) {
3433 0           $self->set_index_number_of_replicas($index, 0);
3434             }
3435 0           $self->set_index_refresh_interval($index, -1);
3436 0           $first = 0;
3437             }
3438              
3439             # Log a status sometimes.
3440             #$imported += @$list;
3441             #if (! ($imported % 10_000)) {
3442             #my $now = time();
3443             #my $diff = sprintf("%.02f", $now - $start);
3444             #my $eps = sprintf("%.02f", 10_000 / $diff);
3445             #$self->log->info("import_from_csv_worker: imported [$imported] entries ".
3446             #"in [$diff] second(s) to index [$index] at EPS [$eps]");
3447             #$start = time();
3448             #}
3449              
3450             # Limit import to specified maximum
3451 0 0 0       if ($max > 0 && $imported >= $max) {
3452 0           $self->log->info("import_from_csv_worker: max import reached [$imported] for ".
3453             "index [$index], stopping");
3454 0           last;
3455             }
3456              
3457 0 0         last if (@$list < $batch);
3458              
3459 0           $imported += @$list;
3460             }
3461              
3462 0           $wp->stop;
3463              
3464 0           $self->bulk_flush;
3465              
3466 0           my $stop_time = time();
3467 0           my $duration = $stop_time - $start_time;
3468 0   0       my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
3469              
3470 0           $self->refresh_index($index);
3471              
3472 0 0         my $count_current = $self->count($index, $type) or return;
3473 0           $self->log->info("import_from_csv_worker: after index [$index] count ".
3474             "is [$count_current] at EPS [$eps]");
3475              
3476 0           my $skipped = 0;
3477 0 0         my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
3478 0 0         if ($complete) { # If complete, import has been retried, and everything is now ok.
3479 0           $imported = $read;
3480             }
3481             else {
3482 0           $skipped = $read - ($count_current - $count_before);
3483             }
3484              
3485 0           my $result = {
3486             read => $read,
3487             imported => $imported,
3488             skipped => $skipped,
3489             previous_count => $count_before,
3490             current_count => $count_current,
3491             complete => $complete,
3492             duration => $duration,
3493             eps => $eps,
3494             };
3495              
3496             # Say the file has been processed, and put resulting stats.
3497 0 0         $fd->write($result, $done) or return;
3498              
3499             # Restore previous settings, if any
3500 0 0         if (defined($refresh_interval)) {
3501 0           $self->set_index_refresh_interval($index, $refresh_interval);
3502             }
3503 0 0 0       if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
3504 0           $self->set_index_number_of_replicas($index, $number_of_replicas);
3505             }
3506              
3507 0           return $result;
3508             }
3509              
3510             #
3511             # http://localhost:9200/_nodes/stats/process?pretty
3512             #
3513             # Search::Elasticsearch::Client::2_0::Direct::Nodes
3514             #
3515             sub get_stats_process {
3516 0     0 0   my $self = shift;
3517              
3518 0           my $es = $self->_es;
3519 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3520              
3521 0           my $r;
3522 0           eval {
3523 0           $r = $es->nodes->stats(
3524             metric => [ qw(process) ],
3525             );
3526             };
3527 0 0         if ($@) {
3528 0           chomp($@);
3529 0           return $self->log->error("get_stats_process: failed: [$@]");
3530             }
3531              
3532 0           return $r;
3533             }
3534              
3535             #
3536             # curl http://localhost:9200/_nodes/process?pretty
3537             #
3538             # Search::Elasticsearch::Client::2_0::Direct::Nodes
3539             #
3540             sub get_process {
3541 0     0 0   my $self = shift;
3542              
3543 0           my $es = $self->_es;
3544 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3545              
3546 0           my $r;
3547 0           eval {
3548 0           $r = $es->nodes->info(
3549             metric => [ qw(process) ],
3550             );
3551             };
3552 0 0         if ($@) {
3553 0           chomp($@);
3554 0           return $self->log->error("get_process: failed: [$@]");
3555             }
3556              
3557 0           return $r;
3558             }
3559              
3560             #
3561             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3562             #
3563             sub get_cluster_state {
3564 0     0 0   my $self = shift;
3565              
3566 0           my $es = $self->_es;
3567 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3568              
3569 0           my $r;
3570 0           eval {
3571 0           $r = $es->cluster->state;
3572             };
3573 0 0         if ($@) {
3574 0           chomp($@);
3575 0           return $self->log->error("get_cluster_state: failed: [$@]");
3576             }
3577              
3578 0           return $r;
3579             }
3580              
3581             #
3582             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3583             #
3584             sub get_cluster_health {
3585 0     0 0   my $self = shift;
3586              
3587 0           my $es = $self->_es;
3588 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3589              
3590 0           my $r;
3591 0           eval {
3592 0           $r = $es->cluster->health;
3593             };
3594 0 0         if ($@) {
3595 0           chomp($@);
3596 0           return $self->log->error("get_cluster_health: failed: [$@]");
3597             }
3598              
3599 0           return $r;
3600             }
3601              
3602             #
3603             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3604             #
3605             sub get_cluster_settings {
3606 0     0 0   my $self = shift;
3607              
3608 0           my $es = $self->_es;
3609 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3610              
3611 0           my $r;
3612 0           eval {
3613 0           $r = $es->cluster->get_settings;
3614             };
3615 0 0         if ($@) {
3616 0           chomp($@);
3617 0           return $self->log->error("get_cluster_settings: failed: [$@]");
3618             }
3619              
3620 0           return $r;
3621             }
3622              
3623             #
3624             # Search::Elasticsearch::Client::2_0::Direct::Cluster
3625             #
3626             sub put_cluster_settings {
3627 0     0 0   my $self = shift;
3628 0           my ($settings) = @_;
3629              
3630 0           my $es = $self->_es;
3631 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3632 0 0         $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
3633 0 0         $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
3634              
3635 0           my %args = (
3636             body => $settings,
3637             );
3638              
3639 0           my $r;
3640 0           eval {
3641 0           $r = $es->cluster->put_settings(%args);
3642             };
3643 0 0         if ($@) {
3644 0           chomp($@);
3645 0           return $self->log->error("put_cluster_settings: failed: [$@]");
3646             }
3647              
3648 0           return $r;
3649             }
3650              
3651             sub count_green_indices {
3652 0     0 0   my $self = shift;
3653              
3654 0 0         my $get = $self->show_indices or return;
3655              
3656 0           my $count = 0;
3657 0           for (@$get) {
3658 0 0         if (/^\s*green\s+/) {
3659 0           $count++;
3660             }
3661             }
3662              
3663 0           return $count;
3664             }
3665              
3666             sub count_yellow_indices {
3667 0     0 0   my $self = shift;
3668              
3669 0 0         my $get = $self->show_indices or return;
3670              
3671 0           my $count = 0;
3672 0           for (@$get) {
3673 0 0         if (/^\s*yellow\s+/) {
3674 0           $count++;
3675             }
3676             }
3677              
3678 0           return $count;
3679             }
3680              
3681             sub count_red_indices {
3682 0     0 0   my $self = shift;
3683              
3684 0 0         my $get = $self->show_indices or return;
3685              
3686 0           my $count = 0;
3687 0           for (@$get) {
3688 0 0         if (/^\s*red\s+/) {
3689 0           $count++;
3690             }
3691             }
3692              
3693 0           return $count;
3694             }
3695              
3696             sub count_indices {
3697 0     0 0   my $self = shift;
3698              
3699 0 0         my $get = $self->show_indices or return;
3700              
3701 0           return scalar @$get;
3702             }
3703              
3704             sub list_indices_status {
3705 0     0 0   my $self = shift;
3706              
3707 0 0         my $get = $self->show_indices or return;
3708              
3709 0           my $count_red = 0;
3710 0           my $count_yellow = 0;
3711 0           my $count_green = 0;
3712 0           for (@$get) {
3713 0 0         if (/^\s*red\s+/) {
    0          
    0          
3714 0           $count_red++;
3715             }
3716             elsif (/^\s*yellow\s+/) {
3717 0           $count_yellow++;
3718             }
3719             elsif (/^\s*green\s+/) {
3720 0           $count_green++;
3721             }
3722             }
3723              
3724             return {
3725 0           red => $count_red,
3726             yellow => $count_yellow,
3727             green => $count_green,
3728             };
3729             }
3730              
3731             sub count_shards {
3732 0     0 0   my $self = shift;
3733              
3734 0 0         my $indices = $self->get_indices or return;
3735              
3736 0           my $count = 0;
3737 0           for (@$indices) {
3738 0           $count += $_->{shards};
3739             }
3740              
3741 0           return $count;
3742             }
3743              
3744             sub count_size {
3745 0     0 0   my $self = shift;
3746 0           my ($string) = @_;
3747              
3748 0 0         my $indices = $self->get_indices($string) or return;
3749              
3750 0 0         my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3751 0           $fn->decimal_point(".");
3752 0           $fn->kibi_suffix("kb");
3753 0           $fn->mebi_suffix("mb");
3754 0           $fn->gibi_suffix("gb");
3755 0           $fn->kilo_suffix("KB");
3756 0           $fn->mega_suffix("MB");
3757 0           $fn->giga_suffix("GB");
3758              
3759 0           my $size = 0;
3760 0           for (@$indices) {
3761 0           $size += $fn->to_number($_->{size});
3762             }
3763              
3764 0           return $fn->from_number($size);
3765             }
3766              
3767             sub count_total_size {
3768 0     0 0   my $self = shift;
3769 0           my ($string) = @_;
3770              
3771 0 0         my $indices = $self->get_indices($string) or return;
3772              
3773 0 0         my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3774 0           $fn->decimal_point(".");
3775 0           $fn->kibi_suffix("kb");
3776 0           $fn->mebi_suffix("mb");
3777 0           $fn->gibi_suffix("gb");
3778 0           $fn->kilo_suffix("KB");
3779 0           $fn->mega_suffix("MB");
3780 0           $fn->giga_suffix("GB");
3781              
3782 0           my $size = 0;
3783 0           for (@$indices) {
3784 0           $size += $fn->to_number($_->{total_size});
3785             }
3786              
3787 0           return $fn->from_number($size);
3788             }
3789              
3790             sub count_count {
3791 0     0 0   my $self = shift;
3792              
3793 0 0         my $indices = $self->get_indices or return;
3794              
3795 0 0         my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3796 0           $fn->kilo_suffix('k');
3797 0           $fn->mega_suffix('m');
3798 0           $fn->giga_suffix('M');
3799              
3800 0           my $count = 0;
3801 0           for (@$indices) {
3802 0           $count += $_->{count};
3803             }
3804              
3805 0           return $fn->from_number($count);
3806             }
3807              
3808             sub list_green_indices {
3809 0     0 0   my $self = shift;
3810              
3811 0 0         my $get = $self->get_indices or return;
3812              
3813 0           my @indices = ();
3814 0           for (@$get) {
3815 0 0         if ($_->{color} eq 'green') {
3816 0           push @indices, $_->{index};
3817             }
3818             }
3819              
3820 0           return \@indices;
3821             }
3822              
3823             sub list_yellow_indices {
3824 0     0 0   my $self = shift;
3825              
3826 0 0         my $get = $self->get_indices or return;
3827              
3828 0           my @indices = ();
3829 0           for (@$get) {
3830 0 0         if ($_->{color} eq 'yellow') {
3831 0           push @indices, $_->{index};
3832             }
3833             }
3834              
3835 0           return \@indices;
3836             }
3837              
3838             sub list_red_indices {
3839 0     0 0   my $self = shift;
3840              
3841 0 0         my $get = $self->get_indices or return;
3842              
3843 0           my @indices = ();
3844 0           for (@$get) {
3845 0 0         if ($_->{color} eq 'red') {
3846 0           push @indices, $_->{index};
3847             }
3848             }
3849              
3850 0           return \@indices;
3851             }
3852              
3853             #
3854             # https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
3855             #
3856             sub list_datatypes {
3857 0     0 0   my $self = shift;
3858              
3859             return {
3860 0           core => [ qw(string long integer short byte double float data boolean binary) ],
3861             };
3862             }
3863              
3864             #
3865             # Return total hits for last www_search
3866             #
3867             sub get_hits_total {
3868 0     0 0   my $self = shift;
3869 0           my ($run) = @_;
3870              
3871 0 0         $self->brik_help_run_undef_arg('get_hits_total', $run) or return;
3872              
3873 0 0         if (ref($run) eq 'HASH') {
3874 0 0 0       if (exists($run->{hits}) && exists($run->{hits}{total})) {
3875 0           return $run->{hits}{total};
3876             }
3877             }
3878              
3879 0           return $self->log->error("get_hits_total: last Command not compatible");
3880             }
3881              
3882             sub disable_shard_allocation {
3883 0     0 0   my $self = shift;
3884              
3885 0           my $settings = {
3886             persistent => {
3887             'cluster.routing.allocation.enable' => 'none',
3888             }
3889             };
3890              
3891 0           return $self->put_cluster_settings($settings);
3892             }
3893              
3894             sub enable_shard_allocation {
3895 0     0 0   my $self = shift;
3896              
3897 0           my $settings = {
3898             persistent => {
3899             'cluster.routing.allocation.enable' => 'all',
3900             }
3901             };
3902              
3903 0           return $self->put_cluster_settings($settings);
3904             }
3905              
3906             sub flush_synced {
3907 0     0 0   my $self = shift;
3908              
3909 0           my $es = $self->_es;
3910 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3911              
3912 0           my $r;
3913 0           eval {
3914 0           $r = $es->indices->flush_synced;
3915             };
3916 0 0         if ($@) {
3917 0           chomp($@);
3918 0           return $self->log->error("flush_synced: failed: [$@]");
3919             }
3920              
3921 0           return $r;
3922             }
3923              
3924             #
3925             # https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
3926             #
3927             # run client::elasticsearch create_snapshot_repository myrepo
3928             # "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
3929             #
3930             # You have to set path.repo in elasticsearch.yml like:
3931             # path.repo: ["/home/gomor/es-backups"]
3932             #
3933             # Search::Elasticsearch::Client::2_0::Direct::Snapshot
3934             #
3935             sub create_snapshot_repository {
3936 0     0 0   my $self = shift;
3937 0           my ($body, $repository_name) = @_;
3938              
3939 0           my $es = $self->_es;
3940 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3941 0 0         $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
3942              
3943 0   0       $repository_name ||= 'repository';
3944              
3945 0           my %args = (
3946             repository => $repository_name,
3947             body => $body,
3948             );
3949              
3950 0           my $r;
3951 0           eval {
3952 0           $r = $es->snapshot->create_repository(%args);
3953             };
3954 0 0         if ($@) {
3955 0           chomp($@);
3956 0           return $self->log->error("create_snapshot_repository: failed: [$@]");
3957             }
3958              
3959 0           return $r;
3960             }
3961              
3962             sub create_shared_fs_snapshot_repository {
3963 0     0 0   my $self = shift;
3964 0           my ($location, $repository_name) = @_;
3965              
3966 0   0       $repository_name ||= 'repository';
3967 0 0         $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
3968              
3969 0 0         if ($location !~ m{^/}) {
3970 0           return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
3971             "a full directory path, this one is invalid [$location]");
3972             }
3973              
3974 0           my $body = {
3975             #type => 'fs',
3976             settings => {
3977             compress => 'true',
3978             location => $location,
3979             },
3980             };
3981              
3982 0           return $self->create_snapshot_repository($body, $repository_name);
3983             }
3984              
3985             #
3986             # Search::Elasticsearch::Client::2_0::Direct::Snapshot
3987             #
3988             sub get_snapshot_repositories {
3989 0     0 0   my $self = shift;
3990              
3991 0           my $es = $self->_es;
3992 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
3993              
3994 0           my $r;
3995 0           eval {
3996 0           $r = $es->snapshot->get_repository;
3997             };
3998 0 0         if ($@) {
3999 0           chomp($@);
4000 0           return $self->log->error("get_snapshot_repositories: failed: [$@]");
4001             }
4002              
4003 0           return $r;
4004             }
4005              
4006             #
4007             # Search::Elasticsearch::Client::2_0::Direct::Snapshot
4008             #
4009             sub get_snapshot_status {
4010 0     0 0   my $self = shift;
4011              
4012 0           my $es = $self->_es;
4013 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4014              
4015 0           my $r;
4016 0           eval {
4017 0           $r = $es->snapshot->status;
4018             };
4019 0 0         if ($@) {
4020 0           chomp($@);
4021 0           return $self->log->error("get_snapshot_status: failed: [$@]");
4022             }
4023              
4024 0           return $r;
4025             }
4026              
4027             #
4028             # Search::Elasticsearch::Client::5_0::Direct::Snapshot
4029             #
4030             sub create_snapshot {
4031 0     0 0   my $self = shift;
4032 0           my ($snapshot_name, $repository_name, $body) = @_;
4033              
4034 0           my $es = $self->_es;
4035 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4036              
4037 0   0       $snapshot_name ||= 'snapshot';
4038 0   0       $repository_name ||= 'repository';
4039              
4040 0           my %args = (
4041             repository => $repository_name,
4042             snapshot => $snapshot_name,
4043             );
4044 0 0         if (defined($body)) {
4045 0           $args{body} = $body;
4046             }
4047              
4048 0           my $r;
4049 0           eval {
4050 0           $r = $es->snapshot->create(%args);
4051             };
4052 0 0         if ($@) {
4053 0           chomp($@);
4054 0           return $self->log->error("create_snapshot: failed: [$@]");
4055             }
4056              
4057 0           return $r;
4058             }
4059              
4060             sub create_snapshot_for_indices {
4061 0     0 0   my $self = shift;
4062 0           my ($indices, $snapshot_name, $repository_name) = @_;
4063              
4064 0 0         $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
4065              
4066 0   0       $snapshot_name ||= 'snapshot';
4067 0   0       $repository_name ||= 'repository';
4068              
4069 0           my $body = {
4070             indices => $indices,
4071             };
4072              
4073 0           return $self->create_snapshot($snapshot_name, $repository_name, $body);
4074             }
4075              
4076             sub is_snapshot_finished {
4077 0     0 0   my $self = shift;
4078              
4079 0 0         my $status = $self->get_snapshot_status or return;
4080              
4081 0 0         if (@{$status->{snapshots}} == 0) {
  0            
4082 0           return 1;
4083             }
4084              
4085 0           return 0;
4086             }
4087              
4088             sub get_snapshot_state {
4089 0     0 0   my $self = shift;
4090              
4091 0 0         if ($self->is_snapshot_finished) {
4092 0           return $self->log->info("get_snapshot_state: is already finished");
4093             }
4094              
4095 0 0         my $status = $self->get_snapshot_status or return;
4096              
4097 0           my @indices_done = ();
4098 0           my @indices_not_done = ();
4099              
4100 0           my $list = $status->{snapshots};
4101 0           for my $snapshot (@$list) {
4102 0           my $indices = $snapshot->{indices};
4103 0           for my $index (@$indices) {
4104 0           my $done = $index->{shards_stats}{done};
4105 0 0         if ($done) {
4106 0           push @indices_done, $index;
4107             }
4108             else {
4109 0           push @indices_not_done, $index;
4110             }
4111             }
4112             }
4113              
4114 0           return { done => \@indices_done, not_done => \@indices_not_done };
4115             }
4116              
4117       0 0   sub verify_snapshot_repository {
4118             }
4119              
4120             sub delete_snapshot_repository {
4121 0     0 0   my $self = shift;
4122 0           my ($repository_name) = @_;
4123              
4124 0           my $es = $self->_es;
4125 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4126 0 0         $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
4127              
4128 0           my $r;
4129 0           eval {
4130 0           $r = $es->snapshot->delete_repository(
4131             repository => $repository_name,
4132             );
4133             };
4134 0 0         if ($@) {
4135 0           chomp($@);
4136 0           return $self->log->error("delete_snapshot_repository: failed: [$@]");
4137             }
4138              
4139 0           return $r;
4140             }
4141              
4142             sub get_snapshot {
4143 0     0 0   my $self = shift;
4144 0           my ($snapshot_name, $repository_name) = @_;
4145              
4146 0           my $es = $self->_es;
4147 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4148              
4149 0   0       $snapshot_name ||= 'snapshot';
4150 0   0       $repository_name ||= 'repository';
4151              
4152 0           my $r;
4153 0           eval {
4154 0           $r = $es->snapshot->get(
4155             repository => $repository_name,
4156             snapshot => $snapshot_name,
4157             );
4158             };
4159 0 0         if ($@) {
4160 0           chomp($@);
4161 0           return $self->log->error("get_snapshot: failed: [$@]");
4162             }
4163              
4164 0           return $r;
4165             }
4166              
4167             #
4168             # Search::Elasticsearch::Client::5_0::Direct::Snapshot
4169             #
4170             sub delete_snapshot {
4171 0     0 0   my $self = shift;
4172 0           my ($snapshot_name, $repository_name) = @_;
4173              
4174 0           my $es = $self->_es;
4175 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4176 0 0         $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
4177 0 0         $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
4178              
4179 0           my $timeout = $self->rtimeout;
4180              
4181 0           my $r;
4182 0           eval {
4183 0           $r = $es->snapshot->delete(
4184             repository => $repository_name,
4185             snapshot => $snapshot_name,
4186             master_timeout => "${timeout}s",
4187             );
4188             };
4189 0 0         if ($@) {
4190 0           chomp($@);
4191 0           return $self->log->error("delete_snapshot: failed: [$@]");
4192             }
4193              
4194 0           return $r;
4195             }
4196              
4197             #
4198             # https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
4199             #
4200             sub restore_snapshot {
4201 0     0 0   my $self = shift;
4202 0           my ($snapshot_name, $repository_name, $body) = @_;
4203              
4204 0           my $es = $self->_es;
4205 0   0       $snapshot_name ||= 'snapshot';
4206 0   0       $repository_name ||= 'repository';
4207 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
4208 0 0         $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
4209 0 0         $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
4210              
4211 0           my %args = (
4212             repository => $repository_name,
4213             snapshot => $snapshot_name,
4214             );
4215 0 0         if (defined($body)) {
4216 0           $args{body} = $body;
4217             }
4218              
4219 0           my $r;
4220 0           eval {
4221 0           $r = $es->snapshot->restore(%args);
4222             };
4223 0 0         if ($@) {
4224 0           chomp($@);
4225 0           return $self->log->error("restore_snapshot: failed: [$@]");
4226             }
4227              
4228 0           return $r;
4229             }
4230              
4231             sub restore_snapshot_for_indices {
4232 0     0 0   my $self = shift;
4233 0           my ($indices, $snapshot_name, $repository_name) = @_;
4234              
4235 0   0       $snapshot_name ||= 'snapshot';
4236 0   0       $repository_name ||= 'repository';
4237 0 0         $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
4238 0 0         $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
4239 0 0         $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
4240              
4241 0           my $body = {
4242             indices => $indices,
4243             };
4244              
4245 0           return $self->restore_snapshot($snapshot_name, $repository_name, $body);
4246             }
4247              
4248             # shard occupation
4249             #
4250             # curl -XGET "http://127.0.0.1:9200/_cat/shards?v
4251             # Or https://www.elastic.co/guide/en/elasticsearch/reference/1.6/cluster-nodes-stats.html
4252             #
4253             # disk occuption:
4254             # curl -XGET http://127.0.0.1:9200/_cat/nodes?h=ip,h,diskAvail,diskTotal
4255             #
4256             #
4257             # Who is master: curl -XGET http://127.0.0.1:9200/_cat/master?v
4258             #
4259              
4260             # Check memory lock
4261              
4262             # curl -XGET 'localhost:9200/_nodes?filter_path=**.mlockall&pretty'
4263             # {
4264             # "nodes" : {
4265             # "3XXX" : {
4266             # "process" : {
4267             # "mlockall" : true
4268             # }
4269             # }
4270             # }
4271             # }
4272              
4273             1;
4274              
4275             __END__