| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package Elasticsearch::Bulk; |
|
2
|
|
|
|
|
|
|
$Elasticsearch::Bulk::VERSION = '1.05'; |
|
3
|
4
|
|
|
4
|
|
235194
|
use Moo; |
|
|
4
|
|
|
|
|
82944
|
|
|
|
4
|
|
|
|
|
27
|
|
|
4
|
|
|
|
|
|
|
with 'Elasticsearch::Role::Bulk', 'Elasticsearch::Role::Is_Sync'; |
|
5
|
4
|
|
|
4
|
|
9312
|
use Elasticsearch::Util qw(parse_params throw); |
|
|
4
|
|
|
|
|
14
|
|
|
|
4
|
|
|
|
|
34
|
|
|
6
|
4
|
|
|
4
|
|
4589
|
use Try::Tiny; |
|
|
4
|
|
|
|
|
4130
|
|
|
|
4
|
|
|
|
|
250
|
|
|
7
|
4
|
|
|
4
|
|
3229
|
use namespace::clean; |
|
|
4
|
|
|
|
|
41412
|
|
|
|
4
|
|
|
|
|
32
|
|
|
8
|
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
#=================================== |
|
10
|
|
|
|
|
|
|
sub add_action {
#===================================
    # Queue one or more bulk actions.
    #
    # Arguments are a flat list of (action_type => \%params) pairs, e.g.
    #   $bulk->add_action( index => {...}, delete => {...} );
    # Each pair is handed to _encode_action() (presumably returning the
    # JSON line(s) for that action - defined outside this file), the lines
    # are appended to the buffer, and the buffer's size and action count are
    # updated. As soon as a non-zero max_size or max_count threshold is
    # reached, flush() is called.
    #
    # Returns 1.
    my $self = shift;

    my ( $buffer, $size_limit, $count_limit )
        = ( $self->_buffer, $self->max_size, $self->max_count );

    # NOTE(review): $buffer is fetched once and reused after any auto-flush
    # below; that is only correct if clear_buffer() empties this same
    # arrayref in place - confirm in Elasticsearch::Role::Bulk.

    my @pending = @_;
    while (@pending) {
        # Take the next (type, params) pair. With an odd-length list the
        # final call receives a single element.
        my @lines = $self->_encode_action( splice @pending, 0, 2 );
        push @{$buffer}, @lines;

        # Every line is sent followed by a newline, hence the "1 +".
        my $added = 0;
        $added += 1 + length($_) for @lines;

        my $size = $self->_buffer_size + $added;
        $self->_buffer_size($size);

        my $count = $self->_buffer_count( 1 + $self->_buffer_count );

        # A limit of 0 disables that threshold.
        my $size_reached  = $size_limit  && $size  >= $size_limit;
        my $count_reached = $count_limit && $count >= $count_limit;
        $self->flush if $size_reached || $count_reached;
    }

    return 1;
}
|
34
|
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
#=================================== |
|
36
|
|
|
|
|
|
|
sub flush {
#===================================
    # Send every buffered action to Elasticsearch in one bulk() request.
    #
    # Returns { items => [] } immediately when the buffer is empty.
    # Otherwise calls $self->es->bulk() with the stored _bulk_args plus the
    # buffer as the request body, clears the buffer, hands the buffer and
    # the response to _report() (presumably where the on_success / on_error /
    # on_conflict callbacks are dispatched - defined outside this file), and
    # returns the raw bulk() response (undef in void context).
    #
    # Errors from bulk() are always rethrown. Beforehand the buffer is
    # cleared only for 'Request' errors that are not 'Unavailable', as the
    # POD describes: malformed requests are discarded, while temporary
    # failures keep the buffer so the flush can be retried. Anything thrown
    # that is not an exception object with an is() method (e.g. a plain
    # string die) is rethrown unchanged, with the buffer left intact,
    # instead of dying on a method call against a non-object.
    my $self = shift;

    return { items => [] }
        unless $self->_buffer_size;

    if ( $self->verbose ) {
        local $| = 1;    # make the progress dot visible immediately
        print ".";
    }

    # NOTE(review): the same arrayref is passed to _report() after
    # clear_buffer(); whether it still holds the sent actions depends on
    # how clear_buffer() resets the buffer (not visible here) - confirm.
    my $buffer = $self->_buffer;
    my $results = try {
        $self->es->bulk( %{ $self->_bulk_args }, body => $buffer );
    }
    catch {
        my $error = $_;
        require Scalar::Util;
        $self->clear_buffer
            if Scalar::Util::blessed($error)
            && $error->can('is')
            && $error->is('Request')
            && !$error->is('Unavailable');
        die $error;
    };

    $self->clear_buffer;
    $self->_report( $buffer, $results );
    return defined wantarray ? $results : undef;
}
|
63
|
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
#=================================== |
|
65
|
|
|
|
|
|
|
sub reindex {
#===================================
    # Bulk-index every document produced by a source, optionally passing
    # each one through a transformer, then flush whatever remains buffered.
    #
    # Params (parsed with parse_params()):
    #   source - required. Either
    #            * a HASH ref of scroll_helper() arguments, with an optional
    #              'es' key naming the client to read from (defaults to this
    #              object's es); search_type 'scan' and size 500 are used
    #              unless overridden; or
    #            * a CODE ref returning a list of docs on each call and an
    #              empty list when exhausted.
    #   The whole $params hash is also given to _doc_transformer() (the POD
    #   documents 'transform' and 'version_type' for this).
    #
    # Throws a 'Param' error when source is missing. Returns 1.
    my ( $self, $params ) = parse_params(@_);

    my $src = $params->{source}
        or throw( 'Param', "Missing required param <source>" );

    my $transform = $self->_doc_transformer($params);

    if ( ref $src eq 'HASH' ) {
        # Work on a copy so deleting 'es' does not modify the caller's hash.
        # Only HASH sources are copied: a CODE ref source is used as-is.
        my %scroll_args = %$src;
        my $es = delete $scroll_args{es} || $self->es;
        my $scroll = $es->scroll_helper(
            search_type => 'scan',
            size        => 500,
            %scroll_args
        );

        # Wrap the scroll in an iterator that yields one batch per call.
        $src = sub {
            $scroll->refill_buffer;
            $scroll->drain_buffer;
        };

        print "Reindexing " . $scroll->total . " docs\n"
            if $self->verbose;
    }

    while ( my @docs = $src->() ) {
        # Docs for which the transformer returns a false value are skipped.
        $self->index( grep {$_} map { $transform->($_) } @docs );
    }

    $self->flush;
    return 1;
}
|
97
|
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
1; |
|
99
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
=pod |
|
101
|
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
=encoding UTF-8 |
|
103
|
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
=head1 NAME |
|
105
|
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
Elasticsearch::Bulk - A helper module for the Bulk API and for reindexing |
|
107
|
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
=head1 VERSION |
|
109
|
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
version 1.05 |
|
111
|
|
|
|
|
|
|
|
|
112
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
113
|
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
use Elasticsearch; |
|
115
|
|
|
|
|
|
|
use Elasticsearch::Bulk; |
|
116
|
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
my $es = Elasticsearch->new; |
|
118
|
|
|
|
|
|
|
my $bulk = Elasticsearch::Bulk->new( |
|
119
|
|
|
|
|
|
|
es => $es, |
|
120
|
|
|
|
|
|
|
index => 'my_index', |
|
121
|
|
|
|
|
|
|
type => 'my_type' |
|
122
|
|
|
|
|
|
|
); |
|
123
|
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
# Index docs: |
|
125
|
|
|
|
|
|
|
$bulk->index({ id => 1, source => { foo => 'bar' }}); |
|
126
|
|
|
|
|
|
|
$bulk->add_action( index => { id => 1, source => { foo=> 'bar' }}); |
|
127
|
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
# Create docs: |
|
129
|
|
|
|
|
|
|
$bulk->create({ id => 1, source => { foo => 'bar' }}); |
|
130
|
|
|
|
|
|
|
$bulk->add_action( create => { id => 1, source => { foo=> 'bar' }}); |
|
131
|
|
|
|
|
|
|
$bulk->create_docs({ foo => 'bar' }); |
|
132
|
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
# Delete docs: |
|
134
|
|
|
|
|
|
|
$bulk->delete({ id => 1}); |
|
135
|
|
|
|
|
|
|
$bulk->add_action( delete => { id => 1 }); |
|
136
|
|
|
|
|
|
|
$bulk->delete_ids(1,2,3); |
|
137
|
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
# Update docs: |
|
139
|
|
|
|
|
|
|
$bulk->update({ id => 1, script => '...' }); |
|
140
|
|
|
|
|
|
|
$bulk->add_action( update => { id => 1, script => '...' }); |
|
141
|
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
# Manual flush |
|
143
|
|
|
|
|
|
|
$bulk->flush; |
|
144
|
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
# Reindex docs: |
|
146
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
|
147
|
|
|
|
|
|
|
es => $es, |
|
148
|
|
|
|
|
|
|
index => 'new_index', |
|
149
|
|
|
|
|
|
|
verbose => 1 |
|
150
|
|
|
|
|
|
|
); |
|
151
|
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
$bulk->reindex( source => { index => 'old_index' }); |
|
153
|
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
155
|
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
This module provides a wrapper for the L<Elasticsearch::Client::Direct/bulk()> |
|
157
|
|
|
|
|
|
|
method which makes it easier to run multiple create, index, update or delete |
|
158
|
|
|
|
|
|
|
actions in a single request. It also provides a simple interface |
|
159
|
|
|
|
|
|
|
for L<reindexing documents|/REINDEXING DOCUMENTS>. |
|
160
|
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
The L<Elasticsearch::Bulk> module acts as a queue, buffering up actions |
|
162
|
|
|
|
|
|
|
until it reaches a maximum count of actions, or a maximum size of JSON request |
|
163
|
|
|
|
|
|
|
body, at which point it issues a C<bulk()> request. |
|
164
|
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
Once you have finished adding actions, call L</flush()> to force the final |
|
166
|
|
|
|
|
|
|
C<bulk()> request on the items left in the queue. |
|
167
|
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
This class does L<Elasticsearch::Role::Bulk> and |
|
169
|
|
|
|
|
|
|
L<Elasticsearch::Role::Is_Sync>. |
|
170
|
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
=head1 CREATING A NEW INSTANCE |
|
172
|
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
=head2 C<new()> |
|
174
|
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
|
176
|
|
|
|
|
|
|
es => $es, # required |
|
177
|
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
index => 'default_index', # optional |
|
179
|
|
|
|
|
|
|
type => 'default_type', # optional |
|
180
|
|
|
|
|
|
|
%other_bulk_params # optional |
|
181
|
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
max_count => 1_000, # optional |
|
183
|
|
|
|
|
|
|
max_size => 1_000_000, # optional |
|
184
|
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
verbose => 0 | 1, # optional |
|
186
|
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
on_success => sub {...}, # optional |
|
188
|
|
|
|
|
|
|
on_error => sub {...}, # optional |
|
189
|
|
|
|
|
|
|
on_conflict => sub {...}, # optional |
|
190
|
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
); |
|
193
|
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
The C<new()> method returns a new C<$bulk> object. You must pass your |
|
195
|
|
|
|
|
|
|
Elasticsearch client as the C<es> argument. |
|
196
|
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
The C<index> and C<type> parameters provide default values for |
|
198
|
|
|
|
|
|
|
C<index> and C<type>, which can be overridden in each action. |
|
199
|
|
|
|
|
|
|
You can also pass any other values which are accepted |
|
200
|
|
|
|
|
|
|
by the L<bulk()|Elasticsearch::Client::Direct/bulk()> method. |
|
201
|
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
See L</flush()> for more information about the other parameters. |
|
203
|
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
=head1 FLUSHING THE BUFFER |
|
205
|
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
=head2 C<flush()> |
|
207
|
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
$result = $bulk->flush; |
|
209
|
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
The C<flush()> method sends all buffered actions to Elasticsearch using |
|
211
|
|
|
|
|
|
|
a L<bulk()|Elasticsearch::Client::Direct/bulk()> request. |
|
212
|
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
=head2 Auto-flushing |
|
214
|
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
An automatic L</flush()> is triggered whenever the C<max_count> or C<max_size> |
|
216
|
|
|
|
|
|
|
threshold is breached. This causes all actions in the buffer to be |
|
217
|
|
|
|
|
|
|
sent to Elasticsearch. |
|
218
|
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
=over |
|
220
|
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
=item * C<max_count> |
|
222
|
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
The maximum number of actions to allow before triggering a L</flush()>. |
|
224
|
|
|
|
|
|
|
This can be disabled by setting C<max_count> to C<0>. Defaults to |
|
225
|
|
|
|
|
|
|
C<1,000>. |
|
226
|
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
=item * C<max_size> |
|
228
|
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
The maximum size of JSON request body to allow before triggering a |
|
230
|
|
|
|
|
|
|
L</flush()>. This can be disabled by setting C<max_size> to C<0>. Defaults |
|
231
|
|
|
|
|
|
|
to C<1,000,000> bytes. |
|
232
|
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
=back |
|
234
|
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
=head2 Errors when flushing |
|
236
|
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
There are three levels of error which can be thrown when L</flush()> |
|
238
|
|
|
|
|
|
|
is called, either manually or automatically. |
|
239
|
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
=over |
|
241
|
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
=item * Temporary Elasticsearch errors |
|
243
|
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
For instance, a C<NoNodes> error which indicates that your cluster is down. |
|
245
|
|
|
|
|
|
|
These errors do not clear the buffer, as they can be retried later on. |
|
246
|
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
=item * Request errors |
|
248
|
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
For instance, if one of your actions is malformed (eg you are missing |
|
250
|
|
|
|
|
|
|
a required parameter like C<index>) then the whole L</flush()> request is |
|
251
|
|
|
|
|
|
|
aborted and the buffer is cleared of all actions. |
|
252
|
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
=item * Action errors |
|
254
|
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
Individual actions may fail. For instance, a C<create> action will fail |
|
256
|
|
|
|
|
|
|
if a document with the same C<index>, C<type> and C<id> already exists. |
|
257
|
|
|
|
|
|
|
These action errors are reported via L<callbacks|/Using callbacks>. |
|
258
|
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
=back |
|
260
|
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
=head2 Using callbacks |
|
262
|
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
By default, any I<Action errors> (see above) cause warnings to be |
|
264
|
|
|
|
|
|
|
written to C<STDERR>. However, you can use the C<on_error>, C<on_conflict> |
|
265
|
|
|
|
|
|
|
and C<on_success> callbacks for more fine-grained control. |
|
266
|
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
All callbacks receive the following arguments: |
|
268
|
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
=over |
|
270
|
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
=item C<$action> |
|
272
|
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
The name of the action, ie C<index>, C<create>, C<update> or C<delete>. |
|
274
|
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
=item C<$response> |
|
276
|
|
|
|
|
|
|
|
|
277
|
|
|
|
|
|
|
The response that Elasticsearch returned for this action. |
|
278
|
|
|
|
|
|
|
|
|
279
|
|
|
|
|
|
|
=item C<$i> |
|
280
|
|
|
|
|
|
|
|
|
281
|
|
|
|
|
|
|
The index of the action, ie the first action in the flush request |
|
282
|
|
|
|
|
|
|
will have C<$i> set to C<0>, the second will have C<$i> set to C<1> etc. |
|
283
|
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
=back |
|
285
|
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
=head3 C<on_success> |
|
287
|
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
|
289
|
|
|
|
|
|
|
es => $es, |
|
290
|
|
|
|
|
|
|
on_success => sub { |
|
291
|
|
|
|
|
|
|
my ($action,$response,$i) = @_; |
|
292
|
|
|
|
|
|
|
# do something |
|
293
|
|
|
|
|
|
|
}, |
|
294
|
|
|
|
|
|
|
); |
|
295
|
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
The C<on_success> callback is called for every action that has a successful |
|
297
|
|
|
|
|
|
|
response. |
|
298
|
|
|
|
|
|
|
|
|
299
|
|
|
|
|
|
|
=head3 C<on_conflict> |
|
300
|
|
|
|
|
|
|
|
|
301
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
|
302
|
|
|
|
|
|
|
es => $es, |
|
303
|
|
|
|
|
|
|
on_conflict => sub { |
|
304
|
|
|
|
|
|
|
my ($action,$response,$i,$version) = @_; |
|
305
|
|
|
|
|
|
|
# do something |
|
306
|
|
|
|
|
|
|
}, |
|
307
|
|
|
|
|
|
|
); |
|
308
|
|
|
|
|
|
|
|
|
309
|
|
|
|
|
|
|
The C<on_conflict> callback is called for actions that have triggered |
|
310
|
|
|
|
|
|
|
a C<Conflict> error, eg trying to C<create> a document which already |
|
311
|
|
|
|
|
|
|
exists. The C<$version> argument will contain the version number |
|
312
|
|
|
|
|
|
|
of the document currently stored in Elasticsearch (if found). |
|
313
|
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
=head3 C<on_error> |
|
315
|
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
|
317
|
|
|
|
|
|
|
es => $es, |
|
318
|
|
|
|
|
|
|
on_error => sub { |
|
319
|
|
|
|
|
|
|
my ($action,$response,$i) = @_; |
|
320
|
|
|
|
|
|
|
# do something |
|
321
|
|
|
|
|
|
|
}, |
|
322
|
|
|
|
|
|
|
); |
|
323
|
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
The C<on_error> callback is called for any error (unless the C<on_conflict> |
|
325
|
|
|
|
|
|
|
callback has already been called). |
|
326
|
|
|
|
|
|
|
|
|
327
|
|
|
|
|
|
|
=head2 Disabling callbacks and autoflush |
|
328
|
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
If you want to be in control of flushing, and you just want to receive |
|
330
|
|
|
|
|
|
|
the raw response that Elasticsearch sends instead of using callbacks, |
|
331
|
|
|
|
|
|
|
then you can do so as follows: |
|
332
|
|
|
|
|
|
|
|
|
333
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
|
334
|
|
|
|
|
|
|
es => $es, |
|
335
|
|
|
|
|
|
|
max_count => 0, |
|
336
|
|
|
|
|
|
|
max_size => 0, |
|
337
|
|
|
|
|
|
|
on_error => undef |
|
338
|
|
|
|
|
|
|
); |
|
339
|
|
|
|
|
|
|
|
|
340
|
|
|
|
|
|
|
$bulk->add_action(....); |
|
341
|
|
|
|
|
|
|
$response = $bulk->flush; |
|
342
|
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
=head1 CREATE, INDEX, UPDATE, DELETE |
|
344
|
|
|
|
|
|
|
|
|
345
|
|
|
|
|
|
|
=head2 C<add_action()> |
|
346
|
|
|
|
|
|
|
|
|
347
|
|
|
|
|
|
|
$bulk->add_action( |
|
348
|
|
|
|
|
|
|
create => { ...params... }, |
|
349
|
|
|
|
|
|
|
index => { ...params... }, |
|
350
|
|
|
|
|
|
|
update => { ...params... }, |
|
351
|
|
|
|
|
|
|
delete => { ...params... } |
|
352
|
|
|
|
|
|
|
); |
|
353
|
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
The C<add_action()> method allows you to add multiple C<create>, C<index>, |
|
355
|
|
|
|
|
|
|
C<update> and C<delete> actions to the queue. The first value is the action |
|
356
|
|
|
|
|
|
|
type, and the second value is the parameters that describe that action. |
|
357
|
|
|
|
|
|
|
See the individual helper methods below for details. |
|
358
|
|
|
|
|
|
|
|
|
359
|
|
|
|
|
|
|
B<Note:> Parameters like C<index> or C<type> can be specified as C<index> or as |
|
360
|
|
|
|
|
|
|
C<_index>, so the following two lines are equivalent: |
|
361
|
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
index => { index => 'index', type => 'type', id => 1, source => {...}}, |
|
363
|
|
|
|
|
|
|
index => { _index => 'index', _type => 'type', _id => 1, _source => {...}}, |
|
364
|
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
B<Note:> The C<index> and C<type> parameters can be specified in the |
|
366
|
|
|
|
|
|
|
params for any action, but if not specified, will default to the C<index> |
|
367
|
|
|
|
|
|
|
and C<type> values specified in L</new()>. These are required parameters: |
|
368
|
|
|
|
|
|
|
they must be specified either in L</new()> or in every action. |
|
369
|
|
|
|
|
|
|
|
|
370
|
|
|
|
|
|
|
=head2 C<create()> |
|
371
|
|
|
|
|
|
|
|
|
372
|
|
|
|
|
|
|
$bulk->create( |
|
373
|
|
|
|
|
|
|
{ index => 'custom_index', source => { doc body }}, |
|
374
|
|
|
|
|
|
|
{ type => 'custom_type', id => 1, source => { doc body }}, |
|
375
|
|
|
|
|
|
|
... |
|
376
|
|
|
|
|
|
|
); |
|
377
|
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
The C<create()> helper method allows you to add multiple C<create> actions. |
|
379
|
|
|
|
|
|
|
It accepts the same parameters as L<Elasticsearch::Client::Direct/create()> |
|
380
|
|
|
|
|
|
|
except that the document body should be passed as the C<source> or C<_source> |
|
381
|
|
|
|
|
|
|
parameter, instead of as C<body>. |
|
382
|
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
=head2 C<create_docs()> |
|
384
|
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
$bulk->create_docs( |
|
386
|
|
|
|
|
|
|
{ doc body }, |
|
387
|
|
|
|
|
|
|
{ doc body }, |
|
388
|
|
|
|
|
|
|
... |
|
389
|
|
|
|
|
|
|
); |
|
390
|
|
|
|
|
|
|
|
|
391
|
|
|
|
|
|
|
The C<create_docs()> helper is a shorter form of L</create()> which can be used |
|
392
|
|
|
|
|
|
|
when you are using the default C<index> and C<type> as set in L</new()> |
|
393
|
|
|
|
|
|
|
and you are not specifying a custom C<id> per document. In this case, |
|
394
|
|
|
|
|
|
|
you can just pass the individual document bodies. |
|
395
|
|
|
|
|
|
|
|
|
396
|
|
|
|
|
|
|
=head2 C<index()> |
|
397
|
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
$bulk->index( |
|
399
|
|
|
|
|
|
|
{ index => 'custom_index', source => { doc body }}, |
|
400
|
|
|
|
|
|
|
{ type => 'custom_type', id => 1, source => { doc body }}, |
|
401
|
|
|
|
|
|
|
... |
|
402
|
|
|
|
|
|
|
); |
|
403
|
|
|
|
|
|
|
|
|
404
|
|
|
|
|
|
|
The C<index()> helper method allows you to add multiple C<index> actions. |
|
405
|
|
|
|
|
|
|
It accepts the same parameters as L<Elasticsearch::Client::Direct/index()> |
|
406
|
|
|
|
|
|
|
except that the document body should be passed as the C<source> or C<_source> |
|
407
|
|
|
|
|
|
|
parameter, instead of as C<body>. |
|
408
|
|
|
|
|
|
|
|
|
409
|
|
|
|
|
|
|
=head2 C<delete()> |
|
410
|
|
|
|
|
|
|
|
|
411
|
|
|
|
|
|
|
$bulk->delete( |
|
412
|
|
|
|
|
|
|
{ index => 'custom_index', id => 1}, |
|
413
|
|
|
|
|
|
|
{ type => 'custom_type', id => 2}, |
|
414
|
|
|
|
|
|
|
... |
|
415
|
|
|
|
|
|
|
); |
|
416
|
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
The C<delete()> helper method allows you to add multiple C<delete> actions. |
|
418
|
|
|
|
|
|
|
It accepts the same parameters as L<Elasticsearch::Client::Direct/delete()>. |
|
419
|
|
|
|
|
|
|
|
|
420
|
|
|
|
|
|
|
=head2 C<delete_ids()> |
|
421
|
|
|
|
|
|
|
|
|
422
|
|
|
|
|
|
|
$bulk->delete_ids(1,2,3...) |
|
423
|
|
|
|
|
|
|
|
|
424
|
|
|
|
|
|
|
The C<delete_ids()> helper method can be used when all of the documents you |
|
425
|
|
|
|
|
|
|
want to delete have the default C<index> and C<type> as set in L</new()>. |
|
426
|
|
|
|
|
|
|
In this case, all you have to do is to pass in a list of IDs. |
|
427
|
|
|
|
|
|
|
|
|
428
|
|
|
|
|
|
|
=head2 C<update()> |
|
429
|
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
$bulk->update( |
|
431
|
|
|
|
|
|
|
{ id => 1, |
|
432
|
|
|
|
|
|
|
doc => { partial doc }, |
|
433
|
|
|
|
|
|
|
doc_as_upsert => 1 |
|
434
|
|
|
|
|
|
|
}, |
|
435
|
|
|
|
|
|
|
{ id => 2, |
|
436
|
|
|
|
|
|
|
lang => 'mvel', |
|
437
|
|
|
|
|
|
|
script => 'ctx._source.counter+=incr', |
|
438
|
|
|
|
|
|
|
params => { incr => 1}, |
|
439
|
|
|
|
|
|
|
upsert => { upsert doc } |
|
440
|
|
|
|
|
|
|
}, |
|
441
|
|
|
|
|
|
|
... |
|
442
|
|
|
|
|
|
|
); |
|
443
|
|
|
|
|
|
|
|
|
444
|
|
|
|
|
|
|
The C<update()> helper method allows you to add multiple C<update> actions. |
|
445
|
|
|
|
|
|
|
It accepts the same parameters as L<Elasticsearch::Client::Direct/update()>. |
|
446
|
|
|
|
|
|
|
An update can either use a I<partial doc> which gets merged with an existing |
|
447
|
|
|
|
|
|
|
doc (example 1 above), or can use a C<script> to update an existing doc |
|
448
|
|
|
|
|
|
|
(example 2 above). |
|
449
|
|
|
|
|
|
|
|
|
450
|
|
|
|
|
|
|
=head1 REINDEXING DOCUMENTS |
|
451
|
|
|
|
|
|
|
|
|
452
|
|
|
|
|
|
|
A common use case for bulk indexing is to reindex a whole index when |
|
453
|
|
|
|
|
|
|
changing the type mappings or analysis chain. This typically |
|
454
|
|
|
|
|
|
|
combines bulk indexing with L<scrolled searches|Elasticsearch::Scroll>: |
|
455
|
|
|
|
|
|
|
the scrolled search pulls all of the data from the source index, and |
|
456
|
|
|
|
|
|
|
the bulk indexer indexes the data into the new index. |
|
457
|
|
|
|
|
|
|
|
|
458
|
|
|
|
|
|
|
=head2 C<reindex()> |
|
459
|
|
|
|
|
|
|
|
|
460
|
|
|
|
|
|
|
$bulk->reindex( |
|
461
|
|
|
|
|
|
|
source => $source, # required |
|
462
|
|
|
|
|
|
|
transform => \&transform, # optional |
|
463
|
|
|
|
|
|
|
version_type => 'external|internal', # optional |
|
464
|
|
|
|
|
|
|
); |
|
465
|
|
|
|
|
|
|
|
|
466
|
|
|
|
|
|
|
The C<reindex()> method requires a C<$source> parameter, which provides |
|
467
|
|
|
|
|
|
|
the source for the documents which are to be reindexed. |
|
468
|
|
|
|
|
|
|
|
|
469
|
|
|
|
|
|
|
=head2 Reindexing from another index |
|
470
|
|
|
|
|
|
|
|
|
471
|
|
|
|
|
|
|
If the C<source> argument is a HASH ref, then the hash is passed to |
|
472
|
|
|
|
|
|
|
L<Elasticsearch::Scroll/new()> to create a new scrolled search. |
|
473
|
|
|
|
|
|
|
|
|
474
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
|
475
|
|
|
|
|
|
|
es => $es, |
index => 'new_index', |
|
476
|
|
|
|
|
|
|
verbose => 1 |
|
477
|
|
|
|
|
|
|
); |
|
478
|
|
|
|
|
|
|
|
|
479
|
|
|
|
|
|
|
$bulk->reindex( |
|
480
|
|
|
|
|
|
|
source => { |
|
481
|
|
|
|
|
|
|
index => 'old_index', |
|
482
|
|
|
|
|
|
|
size => 500, # default |
|
483
|
|
|
|
|
|
|
search_type => 'scan' # default |
|
484
|
|
|
|
|
|
|
} |
|
485
|
|
|
|
|
|
|
); |
|
486
|
|
|
|
|
|
|
|
|
487
|
|
|
|
|
|
|
If a default C<index> or C<type> has been specified in the call to |
|
488
|
|
|
|
|
|
|
L</new()>, then it will replace the C<index> and C<type> values for |
|
489
|
|
|
|
|
|
|
the docs returned from the scrolled search. In the example above, |
|
490
|
|
|
|
|
|
|
all docs will be retrieved from C<"old_index"> and will be bulk indexed |
|
491
|
|
|
|
|
|
|
into C<"new_index">. |
|
492
|
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
=head2 Reindexing from a generic source |
|
494
|
|
|
|
|
|
|
|
|
495
|
|
|
|
|
|
|
The C<source> parameter also accepts a coderef or an anonymous sub, |
|
496
|
|
|
|
|
|
|
which should return one or more new documents every time it is executed, |
and an empty list once there are no more documents to reindex. |
|
497
|
|
|
|
|
|
|
This allows you to pass any iterator, wrapped in an anonymous sub: |
|
498
|
|
|
|
|
|
|
|
|
499
|
|
|
|
|
|
|
my $iter = get_iterator_from_somewhere(); |
|
500
|
|
|
|
|
|
|
|
|
501
|
|
|
|
|
|
|
$bulk->reindex( |
|
502
|
|
|
|
|
|
|
source => sub { $iter->next } |
|
503
|
|
|
|
|
|
|
); |
|
504
|
|
|
|
|
|
|
|
|
505
|
|
|
|
|
|
|
=head2 Transforming docs on the fly |
|
506
|
|
|
|
|
|
|
|
|
507
|
|
|
|
|
|
|
The C<transform> parameter allows you to change documents on the fly, |
|
508
|
|
|
|
|
|
|
using a callback. The callback receives the document as the only argument, |
|
509
|
|
|
|
|
|
|
and should return the updated document, or C<undef> if the document should |
|
510
|
|
|
|
|
|
|
not be indexed: |
|
511
|
|
|
|
|
|
|
|
|
512
|
|
|
|
|
|
|
$bulk->reindex( |
|
513
|
|
|
|
|
|
|
source => { index => 'old_index' }, |
|
514
|
|
|
|
|
|
|
transform => sub { |
|
515
|
|
|
|
|
|
|
my $doc = shift; |
|
516
|
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
# don't index doc marked as valid:false |
|
518
|
|
|
|
|
|
|
return undef unless $doc->{_source}{valid}; |
|
519
|
|
|
|
|
|
|
|
|
520
|
|
|
|
|
|
|
# convert $tag to @tags |
|
521
|
|
|
|
|
|
|
$doc->{_source}{tags} = [ delete $doc->{_source}{tag}]; |
|
522
|
|
|
|
|
|
|
return $doc |
|
523
|
|
|
|
|
|
|
} |
|
524
|
|
|
|
|
|
|
); |
|
525
|
|
|
|
|
|
|
|
|
526
|
|
|
|
|
|
|
=head2 Reindexing from another cluster |
|
527
|
|
|
|
|
|
|
|
|
528
|
|
|
|
|
|
|
By default, L</reindex()> expects the source and destination indices |
|
529
|
|
|
|
|
|
|
to be in the same cluster. To pull data from one cluster and index it into |
|
530
|
|
|
|
|
|
|
another, you can use two separate C<$es> objects: |
|
531
|
|
|
|
|
|
|
|
|
532
|
|
|
|
|
|
|
$es_target = Elasticsearch->new( nodes => 'localhost:9200' ); |
|
533
|
|
|
|
|
|
|
$es_source = Elasticsearch->new( nodes => 'search1:9200' ); |
|
534
|
|
|
|
|
|
|
|
|
535
|
|
|
|
|
|
|
Elasticsearch::Bulk->new( |
|
536
|
|
|
|
|
|
|
es => $es_target, |
|
537
|
|
|
|
|
|
|
verbose => 1 |
|
538
|
|
|
|
|
|
|
) |
|
539
|
|
|
|
|
|
|
-> reindex( |
|
540
|
|
|
|
|
|
|
source => { |
|
541
|
|
|
|
|
|
|
es => $es_source, |
|
542
|
|
|
|
|
|
|
index => 'my_index' |
|
543
|
|
|
|
|
|
|
} |
|
544
|
|
|
|
|
|
|
); |
|
545
|
|
|
|
|
|
|
|
|
546
|
|
|
|
|
|
|
=head2 Parents and routing |
|
547
|
|
|
|
|
|
|
|
|
548
|
|
|
|
|
|
|
If you are using parent-child relationships or custom C<routing> values, |
|
549
|
|
|
|
|
|
|
and you want to preserve these when you reindex your documents, then |
|
550
|
|
|
|
|
|
|
you will need to request these values specifically, as follows: |
|
551
|
|
|
|
|
|
|
|
|
552
|
|
|
|
|
|
|
$bulk->reindex( |
|
553
|
|
|
|
|
|
|
source => { |
|
554
|
|
|
|
|
|
|
index => 'old_index', |
|
555
|
|
|
|
|
|
|
fields => ['_source','_parent','_routing'] |
|
556
|
|
|
|
|
|
|
} |
|
557
|
|
|
|
|
|
|
); |
|
558
|
|
|
|
|
|
|
|
|
559
|
|
|
|
|
|
|
=head2 Working with version numbers |
|
560
|
|
|
|
|
|
|
|
|
561
|
|
|
|
|
|
|
Every document in Elasticsearch has a current C<version> number, which |
|
562
|
|
|
|
|
|
|
is used for L<optimistic concurrency control|http://en.wikipedia.org/wiki/Optimistic_concurrency_control>, |
|
563
|
|
|
|
|
|
|
that is, to ensure that you don't overwrite changes that have been made |
|
564
|
|
|
|
|
|
|
by another process. |
|
565
|
|
|
|
|
|
|
|
|
566
|
|
|
|
|
|
|
All CRUD operations accept a C<version> parameter and a C<version_type> |
|
567
|
|
|
|
|
|
|
parameter which tells Elasticsearch that the change should only be made |
|
568
|
|
|
|
|
|
|
if the current document corresponds to these parameters. The |
|
569
|
|
|
|
|
|
|
C<version_type> parameter can have the following values: |
|
570
|
|
|
|
|
|
|
|
|
571
|
|
|
|
|
|
|
=over |
|
572
|
|
|
|
|
|
|
|
|
573
|
|
|
|
|
|
|
=item * C<internal> |
|
574
|
|
|
|
|
|
|
|
|
575
|
|
|
|
|
|
|
Use Elasticsearch version numbers. Documents are only changed if the |
|
576
|
|
|
|
|
|
|
document in Elasticsearch has the B<same> C<version> number that is |
|
577
|
|
|
|
|
|
|
specified in the CRUD operation. After the change, the new |
|
578
|
|
|
|
|
|
|
version number is C<version+1>. |
|
579
|
|
|
|
|
|
|
|
|
580
|
|
|
|
|
|
|
=item * C<external> |
|
581
|
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
Use an external versioning system, such as timestamps or version numbers |
|
583
|
|
|
|
|
|
|
from an external database. Documents are only changed if the document |
|
584
|
|
|
|
|
|
|
in Elasticsearch has a B<lower> C<version> number than the one |
|
585
|
|
|
|
|
|
|
specified in the CRUD operation. After the change, the new version |
|
586
|
|
|
|
|
|
|
number is C<version>. |
|
587
|
|
|
|
|
|
|
|
|
588
|
|
|
|
|
|
|
=back |
|
589
|
|
|
|
|
|
|
|
|
590
|
|
|
|
|
|
|
If you would like to reindex documents from one index to another, preserving |
|
591
|
|
|
|
|
|
|
the C<version> numbers from the original index, then you need the following: |
|
592
|
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
$bulk->reindex( |
|
594
|
|
|
|
|
|
|
source => { |
|
595
|
|
|
|
|
|
|
index => 'old_index', |
|
596
|
|
|
|
|
|
|
version => 1, # retrieve version numbers in search |
|
597
|
|
|
|
|
|
|
}, |
|
598
|
|
|
|
|
|
|
version_type => 'external' # use these "external" version numbers |
|
599
|
|
|
|
|
|
|
); |
|
600
|
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
=head1 AUTHOR |
|
602
|
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
Clinton Gormley <drtech@cpan.org> |
|
604
|
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
=head1 COPYRIGHT AND LICENSE |
|
606
|
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
This software is Copyright (c) 2014 by Elasticsearch BV. |
|
608
|
|
|
|
|
|
|
|
|
609
|
|
|
|
|
|
|
This is free software, licensed under: |
|
610
|
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
The Apache License, Version 2.0, January 2004 |
|
612
|
|
|
|
|
|
|
|
|
613
|
|
|
|
|
|
|
=cut |
|
614
|
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
__END__ |
|
616
|
|
|
|
|
|
|
|
|
617
|
|
|
|
|
|
|
# ABSTRACT: A helper module for the Bulk API and for reindexing |
|
618
|
|
|
|
|
|
|
|