File Coverage

blib/lib/Metabrik/Client/Elasticsearch/Tasks.pm
Criterion Covered Total %
statement 9 58 15.5
branch 0 20 0.0
condition 0 5 0.0
subroutine 3 10 30.0
pod 2 7 28.5
total 14 100 14.0


line stmt bran cond sub pod time code
1             #
2             # $Id$
3             #
4             # client::elasticsearch::tasks Brik
5             #
6             package Metabrik::Client::Elasticsearch::Tasks;
7 1     1   692 use strict;
  1         3  
  1         31  
8 1     1   5 use warnings;
  1         2  
  1         28  
9              
10             #
11             # DOC: Search::Elasticsearch::Client::6_0::Direct::Tasks
12             #
13              
14 1     1   5 use base qw(Metabrik::Client::Elasticsearch);
  1         3  
  1         740  
15              
16             sub brik_properties {
17             return {
18 0     0 1   revision => '$Revision$',
19             tags => [ qw(unstable) ],
20             author => 'GomoR ',
21             license => 'http://opensource.org/licenses/BSD-3-Clause',
22             attributes => {
23             },
24             attributes_default => {
25             },
26             commands => {
27             list => [ ],
28             get_taskid => [ qw(id) ],
29             get_forcemerge => [ ],
30             show_forcemerge_progress => [ ],
31             loop_show_forcemerge_progress => [ qw(seconds|OPTIONAL) ],
32             },
33             };
34             }
35              
36             sub brik_init {
37 0     0 1   my $self = shift;
38              
39 0 0         $self->open or return 0;
40              
41 0           return $self->SUPER::brik_init;
42             }
43              
44             sub list {
45 0     0 0   my $self = shift;
46              
47 0           return $self->_es->tasks->list;
48             }
49              
50             sub get_taskid {
51 0     0 0   my $self = shift;
52 0           my ($id) = @_;
53              
54 0 0         $self->brik_help_run_undef_arg('get_taskid', $id) or return;
55              
56 0           my $tasks = $self->_es->tasks;
57              
58 0           return $tasks->get(task_id => $id);
59             }
60              
61             sub get_forcemerge {
62 0     0 0   my $self = shift;
63              
64 0 0         my $list = $self->list or return;
65              
66 0           my $nodes = $list->{nodes};
67 0 0         if (! defined($nodes)) {
68 0           return $self->log->error("get_forcemerge: no nodes found");
69             }
70              
71 0           my %tasks = ();
72 0           for my $node (keys %$nodes) {
73 0           for my $id (keys %{$nodes->{$node}}) {
  0            
74 0           my $tasks = $nodes->{$node}{tasks};
75 0           for my $task (keys %$tasks) {
76 0           my $action = $tasks->{$task}{action};
77 0 0 0       if ($action eq 'indices:admin/forcemerge'
78             && !exists($tasks{$task})) {
79 0           $tasks{$task} = $tasks->{$task};
80             }
81             }
82             }
83             }
84              
85 0           return \%tasks;
86             }
87              
88             sub show_forcemerge_progress {
89 0     0 0   my $self = shift;
90              
91 0 0         my $tasks = $self->get_forcemerge or return;
92 0 0         if (! keys %$tasks) {
93 0           $self->log->info("show_forcemerge_progress: no forcemerge task ".
94             "in progress");
95 0           return 0;
96             }
97              
98 0           my $count = 1;
99 0           for my $id (sort { $a cmp $b } keys %$tasks) {
  0            
100 0 0         my $task = $self->get_taskid($id) or next;
101              
102 0           my $desc = $task->{task}{description};
103 0           my $start_time = $task->{task}{start_time_in_millis};
104 0           my $running_time = $task->{task}{running_time_in_nanos};
105              
106 0           print "Task ID [$id] count [".$count++."] is running...\n";
107             }
108              
109 0           return 1;
110             }
111              
112             sub loop_show_forcemerge_progress {
113 0     0 0   my $self = shift;
114 0           my ($sec) = @_;
115              
116 0   0       $sec ||= 60;
117 0           my $es = $self->_es;
118 0 0         $self->brik_help_run_undef_arg('open', $es) or return;
119              
120 0           while (1) {
121 0 0         $self->show_forcemerge_progress or return;
122 0           sleep($sec);
123             }
124              
125 0           return 1;
126             }
127              
128             1;
129              
130             __END__