File Coverage

blib/lib/Catmandu/Store/ElasticSearch/Bag.pm
Criterion Covered Total %
statement 15 62 24.1
branch 0 22 0.0
condition 0 11 0.0
subroutine 5 17 29.4
pod 0 8 0.0
total 20 120 16.6


line stmt bran cond sub pod time code
1             package Catmandu::Store::ElasticSearch::Bag;
2              
3 1     1   7 use Catmandu::Sane;
  1         1  
  1         14  
4 1     1   358 use Moo;
  1         44  
  1         8  
5 1     1   1152 use Catmandu::Hits;
  1         57901  
  1         35  
6 1     1   453 use Catmandu::Store::ElasticSearch::Searcher;
  1         2  
  1         32  
7 1     1   421 use Catmandu::Store::ElasticSearch::CQL;
  1         2  
  1         1534  
8              
9             with 'Catmandu::Bag';
10             with 'Catmandu::Searchable';
11              
12             has buffer_size => (is => 'ro', lazy => 1, builder => 'default_buffer_size');
13             has _bulk => (is => 'ro', lazy => 1, builder => '_build_bulk');
14             has cql_mapping => (is => 'ro');
15             has on_error => (is => 'ro', default => sub { 'IGNORE' });
16              
17 0     0 0   sub default_buffer_size { 100 }
18              
19             sub _build_bulk {
20 0     0     my ($self) = @_;
21 0           my %args = (
22             index => $self->store->index_name,
23             type => $self->name,
24             max_count => $self->buffer_size,
25 0           on_error => \&{$self->on_error},
26             );
27 0 0         if ($self->log->is_debug) {
28             $args{on_success} = sub {
29 0     0     my ($action, $res, $i) = @_; # TODO return doc instead of index
30 0           $self->log->debug($res);
31 0           };
32             }
33 0           $self->store->es->bulk_helper(%args);
34             }
35              
36             sub generator {
37 0     0 0   my ($self) = @_;
38             sub {
39 0     0     state $scroll = $self->store->es->scroll_helper(
40             index => $self->store->index_name,
41             type => $self->name,
42             search_type => 'scan',
43             size => $self->buffer_size, # TODO divide by number of shards
44             body => {
45             query => {match_all => {}},
46             },
47             );
48 0   0       my $data = $scroll->next // return;
49 0           $data->{_source};
50 0           };
51             }
52              
53             sub count {
54 0     0 0   my ($self) = @_;
55 0           $self->store->es->count(
56             index => $self->store->index_name,
57             type => $self->name,
58             )->{count};
59             }
60              
61             sub get { # TODO ignore missing
62             my ($self, $id) = @_;
63             try {
64             $self->store->es->get_source(
65             index => $self->store->index_name,
66             type => $self->name,
67             id => $id,
68             );
69             } catch_case [
70             'Search::Elasticsearch::Error::Missing' => sub { undef }
71             ];
72             }
73              
74             sub add {
75             my ($self, $data) = @_;
76             $self->_bulk->index({
77             id => $data->{_id},
78             source => $data,
79             });
80             }
81              
82             sub delete {
83             my ($self, $id) = @_;
84             $self->_bulk->delete({id => $id});
85             }
86              
87             sub delete_all { # TODO refresh
88 0     0 0   my ($self) = @_;
89 0           my $es = $self->store->es;
90 0           $es->delete_by_query(
91             index => $self->store->index_name,
92             type => $self->name,
93             body => {
94             query => {match_all => {}},
95             },
96             );
97             }
98              
99             sub delete_by_query { # TODO refresh
100             my ($self, %args) = @_;
101             my $es = $self->store->es;
102             $es->delete_by_query(
103             index => $self->store->index_name,
104             type => $self->name,
105             body => {
106             query => $args{query},
107             },
108             );
109             }
110              
111             sub commit {
112 0     0 0   my ($self) = @_;
113 0           $self->_bulk->flush;
114             }
115              
116             sub search {
117             my ($self, %args) = @_;
118              
119             my $start = delete $args{start};
120             my $limit = delete $args{limit};
121             my $bag = delete $args{reify};
122              
123             if ($bag) {
124             $args{fields} = [];
125             }
126              
127             my $res = $self->store->es->search(
128             index => $self->store->index_name,
129             type => $self->name,
130             body => {
131             %args,
132             from => $start,
133             size => $limit,
134             },
135             );
136              
137             my $docs = $res->{hits}{hits};
138              
139             my $hits = {
140             start => $start,
141             limit => $limit,
142             total => $res->{hits}{total},
143             };
144              
145             if ($bag) {
146             $hits->{hits} = [ map { $bag->get($_->{_id}) } @$docs ];
147             } elsif ($args{fields}) {
148             $hits->{hits} = [ map { $_->{fields} || {} } @$docs ];
149             } else {
150             $hits->{hits} = [ map { $_->{_source} } @$docs ];
151             }
152              
153             $hits = Catmandu::Hits->new($hits);
154              
155             for my $key (qw(facets suggest)) {
156             $hits->{$key} = $res->{$key} if exists $args{$key};
157             }
158              
159             if ($args{highlight}) {
160             for my $hit (@$docs) {
161             if (my $hl = $hit->{highlight}) {
162             $hits->{highlight}{$hit->{_id}} = $hl;
163             }
164             }
165             }
166              
167             $hits;
168             }
169              
170             sub searcher {
171             my ($self, %args) = @_;
172             Catmandu::Store::ElasticSearch::Searcher->new(%args, bag => $self);
173             }
174              
175             sub translate_sru_sortkeys {
176 0     0 0   my ($self, $sortkeys) = @_;
177 0           [ grep { defined $_ } map { $self->_translate_sru_sortkey($_) } split /\s+/, $sortkeys ];
  0            
  0            
178             }
179              
180             sub _translate_sru_sortkey {
181 0     0     my ($self, $sortkey) = @_;
182 0           my ($field, $schema, $asc) = split /,/, $sortkey;
183 0 0         $field || return;
184 0 0         if (my $map = $self->cql_mapping) {
185 0           $field = lc $field;
186 0 0         $field =~ s/(?<=[^_])_(?=[^_])//g if $map->{strip_separating_underscores};
187 0   0       $map = $map->{indexes} || return;
188 0   0       $map = $map->{$field} || return;
189 0 0         $map->{sort} || return;
190 0 0 0       if (ref $map->{sort} && $map->{sort}{field}) {
    0          
    0          
191 0           $field = $map->{sort}{field};
192             } elsif (ref $map->{field}) {
193 0           $field = $map->{field}->[0];
194             } elsif ($map->{field}) {
195 0           $field = $map->{field};
196             }
197             }
198 0   0       $asc //= 1;
199 0 0         +{ $field => $asc ? 'asc' : 'desc' };
200             }
201              
202             sub translate_cql_query {
203 0     0 0   my ($self, $query) = @_;
204 0           Catmandu::Store::ElasticSearch::CQL->new(mapping => $self->cql_mapping)->parse($query);
205             }
206              
207             sub normalize_query {
208 0     0 0   my ($self, $query) = @_;
209 0 0         if (ref $query) {
    0          
210 0           $query;
211             } elsif ($query) {
212 0           {query_string => {query => $query}};
213             } else {
214 0           {match_all => {}};
215             }
216             }
217              
218             =head1 SEE ALSO
219              
220             L, L
221              
222             =cut
223              
224             1;