line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Elasticsearch::Bulk; |
2
|
|
|
|
|
|
|
$Elasticsearch::Bulk::VERSION = '1.05'; |
3
|
4
|
|
|
4
|
|
235194
|
use Moo; |
|
4
|
|
|
|
|
82944
|
|
|
4
|
|
|
|
|
27
|
|
4
|
|
|
|
|
|
|
with 'Elasticsearch::Role::Bulk', 'Elasticsearch::Role::Is_Sync'; |
5
|
4
|
|
|
4
|
|
9312
|
use Elasticsearch::Util qw(parse_params throw); |
|
4
|
|
|
|
|
14
|
|
|
4
|
|
|
|
|
34
|
|
6
|
4
|
|
|
4
|
|
4589
|
use Try::Tiny; |
|
4
|
|
|
|
|
4130
|
|
|
4
|
|
|
|
|
250
|
|
7
|
4
|
|
|
4
|
|
3229
|
use namespace::clean; |
|
4
|
|
|
|
|
41412
|
|
|
4
|
|
|
|
|
32
|
|
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
#=================================== |
10
|
|
|
|
|
|
|
sub add_action {
#===================================
    # Queue one or more bulk actions and auto-flush when a threshold is hit.
    #
    # Arguments (after the invocant) are consumed two at a time and handed
    # to _encode_action(); whatever strings it returns are appended to the
    # shared buffer.  After every pair the running byte size and the action
    # count are updated, and flush() is called as soon as either the
    # max_size or the max_count limit (when non-zero) has been reached.
    #
    # Always returns 1.
    my ( $self, @pairs ) = @_;

    # Read the buffer and the limits once, up front.
    my $queue       = $self->_buffer;
    my $size_limit  = $self->max_size;
    my $count_limit = $self->max_count;

    while (@pairs) {
        my @encoded = $self->_encode_action( splice @pairs, 0, 2 );
        push @{$queue}, @encoded;

        # Each encoded string costs its length plus one extra byte
        # (presumably the newline separator of the bulk body -- confirm).
        my $bytes = $self->_buffer_size;
        for my $chunk (@encoded) {
            $bytes += length($chunk) + 1;
        }
        $self->_buffer_size($bytes);

        # One action per pair, regardless of how many strings it encoded to.
        my $queued = $self->_buffer_count( $self->_buffer_count + 1 );

        # A limit of 0 (or undef) disables that particular check.
        if (   ( $size_limit && $bytes >= $size_limit )
            || ( $count_limit && $queued >= $count_limit ) )
        {
            $self->flush;
        }
    }

    return 1;
}
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
#=================================== |
36
|
|
|
|
|
|
|
sub flush {
#===================================
    # Send every buffered action to Elasticsearch in one bulk() request.
    #
    # Returns { items => [] } straight away when the buffer size is zero.
    # Otherwise the response of $self->es->bulk(...) is returned (undef
    # when called in void context).
    #
    # Errors from bulk() are re-thrown.  The buffer is cleared first only
    # when the error is a 'Request' error that is not also 'Unavailable';
    # for any other error the buffered actions are left in place.
    my ($self) = @_;

    if ( !$self->_buffer_size ) {
        return { items => [] };
    }

    if ( $self->verbose ) {

        # Progress dot, with autoflush enabled only for this scope.
        local $| = 1;
        print ".";
    }

    my $pending  = $self->_buffer;
    my $response = try {
        $self->es->bulk( %{ $self->_bulk_args }, body => $pending );
    }
    catch {
        my $failure = $_;
        if ( $failure->is('Request') and not $failure->is('Unavailable') ) {
            $self->clear_buffer;
        }
        die $failure;
    };

    # NOTE(review): clear_buffer runs before _report receives $pending; if
    # clear_buffer empties the array in place, _report sees no actions --
    # confirm against the role that defines both methods.
    $self->clear_buffer;
    $self->_report( $pending, $response );

    return $response if defined wantarray;
    return;
}
63
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
#=================================== |
65
|
|
|
|
|
|
|
sub reindex {
#===================================
    # Pull documents from a source and bulk-index them through this object.
    #
    # Named params:
    #   source - required.  Either a HASH ref of scrolled-search params
    #            (an optional 'es' key selects the client used for the
    #            search, defaulting to $self->es), or a code ref that
    #            returns a list of documents on each call and an empty
    #            list when exhausted.
    #   Any other params are passed, untouched, to _doc_transformer().
    #
    # Throws a 'Param' error when <source> is missing.  Returns 1.
    my ( $self, $params ) = parse_params(@_);
    my $src = $params->{source}
        or throw( 'Param', "Missing required param <source>" );

    my $transform = $self->_doc_transformer($params);

    if ( ref $src eq 'HASH' ) {

        # Shallow-copy so that deleting 'es' below does not modify the
        # caller's hash.  This must only happen for HASH sources:
        # dereferencing a code ref as a hash dies with
        # "Not a HASH reference".
        $src = {%$src};

        my $es     = delete $src->{es} || $self->es;
        my $scroll = $es->scroll_helper(
            search_type => 'scan',
            size        => 500,
            %$src
        );

        # Wrap the scroll in the same "call until empty" interface that a
        # caller-supplied code ref provides.
        $src = sub {
            $scroll->refill_buffer;
            $scroll->drain_buffer;
        };

        print "Reindexing " . $scroll->total . " docs\n"
            if $self->verbose;
    }

    while ( my @docs = $src->() ) {

        # The transformer may return a false value to skip a document.
        $self->index( grep {$_} map { $transform->($_) } @docs );
    }
    $self->flush;
    return 1;
}
97
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
1; |
99
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
=pod |
101
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
=encoding UTF-8 |
103
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
=head1 NAME |
105
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
Elasticsearch::Bulk - A helper module for the Bulk API and for reindexing |
107
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
=head1 VERSION |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
version 1.05 |
111
|
|
|
|
|
|
|
|
112
|
|
|
|
|
|
|
=head1 SYNOPSIS |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
use Elasticsearch; |
115
|
|
|
|
|
|
|
use Elasticsearch::Bulk; |
116
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
my $es = Elasticsearch->new; |
118
|
|
|
|
|
|
|
my $bulk = Elasticsearch::Bulk->new( |
119
|
|
|
|
|
|
|
es => $es, |
120
|
|
|
|
|
|
|
index => 'my_index', |
121
|
|
|
|
|
|
|
type => 'my_type' |
122
|
|
|
|
|
|
|
); |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
# Index docs: |
125
|
|
|
|
|
|
|
$bulk->index({ id => 1, source => { foo => 'bar' }}); |
126
|
|
|
|
|
|
|
$bulk->add_action( index => { id => 1, source => { foo=> 'bar' }}); |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
# Create docs: |
129
|
|
|
|
|
|
|
$bulk->create({ id => 1, source => { foo => 'bar' }}); |
130
|
|
|
|
|
|
|
$bulk->add_action( create => { id => 1, source => { foo=> 'bar' }}); |
131
|
|
|
|
|
|
|
$bulk->create_docs({ foo => 'bar' }); |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
# Delete docs: |
134
|
|
|
|
|
|
|
$bulk->delete({ id => 1}); |
135
|
|
|
|
|
|
|
$bulk->add_action( delete => { id => 1 }); |
136
|
|
|
|
|
|
|
$bulk->delete_ids(1,2,3); |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
# Update docs: |
139
|
|
|
|
|
|
|
$bulk->update({ id => 1, script => '...' }); |
140
|
|
|
|
|
|
|
$bulk->add_action( update => { id => 1, script => '...' }); |
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
# Manual flush |
143
|
|
|
|
|
|
|
$bulk->flush; |
144
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
# Reindex docs: |
146
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
147
|
|
|
|
|
|
|
es => $es, |
148
|
|
|
|
|
|
|
index => 'new_index', |
149
|
|
|
|
|
|
|
verbose => 1 |
150
|
|
|
|
|
|
|
); |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
$bulk->reindex( source => { index => 'old_index' }); |
153
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
=head1 DESCRIPTION |
155
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
This module provides a wrapper for the L<Elasticsearch::Client::Direct/bulk()> |
157
|
|
|
|
|
|
|
method which makes it easier to run multiple create, index, update or delete |
158
|
|
|
|
|
|
|
actions in a single request. It also provides a simple interface |
159
|
|
|
|
|
|
|
for L<reindexing documents|/REINDEXING DOCUMENTS>. |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
The L<Elasticsearch::Bulk> module acts as a queue, buffering up actions |
162
|
|
|
|
|
|
|
until it reaches a maximum count of actions, or a maximum size of JSON request |
163
|
|
|
|
|
|
|
body, at which point it issues a C<bulk()> request. |
164
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
Once you have finished adding actions, call L</flush()> to force the final |
166
|
|
|
|
|
|
|
C<bulk()> request on the items left in the queue. |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
This class does L<Elasticsearch::Role::Bulk> and |
169
|
|
|
|
|
|
|
L<Elasticsearch::Role::Is_Sync>. |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
=head1 CREATING A NEW INSTANCE |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
=head2 C<new()> |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
176
|
|
|
|
|
|
|
es => $es, # required |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
index => 'default_index', # optional |
179
|
|
|
|
|
|
|
type => 'default_type', # optional |
180
|
|
|
|
|
|
|
%other_bulk_params # optional |
181
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
max_count => 1_000, # optional |
183
|
|
|
|
|
|
|
max_size => 1_000_000, # optional |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
verbose => 0 | 1, # optional |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
on_success => sub {...}, # optional |
188
|
|
|
|
|
|
|
on_error => sub {...}, # optional |
189
|
|
|
|
|
|
|
on_conflict => sub {...}, # optional |
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
); |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
The C<new()> method returns a new C<$bulk> object. You must pass your |
195
|
|
|
|
|
|
|
Elasticsearch client as the C<es> argument. |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
The C<index> and C<type> parameters provide default values for |
198
|
|
|
|
|
|
|
C<index> and C<type>, which can be overridden in each action. |
199
|
|
|
|
|
|
|
You can also pass any other values which are accepted |
200
|
|
|
|
|
|
|
by the L<bulk()|Elasticsearch::Client::Direct/bulk()> method. |
201
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
See L</flush()> for more information about the other parameters. |
203
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
=head1 FLUSHING THE BUFFER |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
=head2 C<flush()> |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
$result = $bulk->flush; |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
The C<flush()> method sends all buffered actions to Elasticsearch using |
211
|
|
|
|
|
|
|
a L<bulk()|Elasticsearch::Client::Direct/bulk()> request. |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
=head2 Auto-flushing |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
An automatic L</flush()> is triggered whenever the C<max_count> or C<max_size> |
216
|
|
|
|
|
|
|
threshold is breached. This causes all actions in the buffer to be |
217
|
|
|
|
|
|
|
sent to Elasticsearch. |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
=over |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
=item * C<max_count> |
222
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
The maximum number of actions to allow before triggering a L</flush()>. |
224
|
|
|
|
|
|
|
This can be disabled by setting C<max_count> to C<0>. Defaults to |
225
|
|
|
|
|
|
|
C<1,000>. |
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
=item * C<max_size> |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
The maximum size of JSON request body to allow before triggering a |
230
|
|
|
|
|
|
|
L</flush()>. This can be disabled by setting C<max_size> to C<0>. Defaults |
231
|
|
|
|
|
|
|
to C<1_000_000> bytes. |
232
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
=back |
234
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
=head2 Errors when flushing |
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
There are three levels of error which can be thrown when L</flush()> |
238
|
|
|
|
|
|
|
is called, either manually or automatically. |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
=over |
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
=item * Temporary Elasticsearch errors |
243
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
For instance, a C<NoNodes> error which indicates that your cluster is down. |
245
|
|
|
|
|
|
|
These errors do not clear the buffer, as they can be retried later on. |
246
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
=item * Request errors |
248
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
For instance, if one of your actions is malformed (eg you are missing |
250
|
|
|
|
|
|
|
a required parameter like C<index>) then the whole L</flush()> request is |
251
|
|
|
|
|
|
|
aborted and the buffer is cleared of all actions. |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
=item * Action errors |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
Individual actions may fail. For instance, a C<create> action will fail |
256
|
|
|
|
|
|
|
if a document with the same C<index>, C<type> and C<id> already exists. |
257
|
|
|
|
|
|
|
These action errors are reported via L<callbacks|/Using callbacks>. |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
=back |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
=head2 Using callbacks |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
By default, any I<Action errors> (see above) cause warnings to be |
264
|
|
|
|
|
|
|
written to C<STDERR>. However, you can use the C<on_error>, C<on_conflict> |
265
|
|
|
|
|
|
|
and C<on_success> callbacks for more fine-grained control. |
266
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
All callbacks receive the following arguments: |
268
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
=over |
270
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
=item C<$action> |
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
The name of the action, ie C<index>, C<create>, C<update> or C<delete>. |
274
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
=item C<$response> |
276
|
|
|
|
|
|
|
|
277
|
|
|
|
|
|
|
The response that Elasticsearch returned for this action. |
278
|
|
|
|
|
|
|
|
279
|
|
|
|
|
|
|
=item C<$i> |
280
|
|
|
|
|
|
|
|
281
|
|
|
|
|
|
|
The index of the action, ie the first action in the flush request |
282
|
|
|
|
|
|
|
will have C<$i> set to C<0>, the second will have C<$i> set to C<1> etc. |
283
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
=back |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
=head3 C<on_success> |
287
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
289
|
|
|
|
|
|
|
es => $es, |
290
|
|
|
|
|
|
|
on_success => sub { |
291
|
|
|
|
|
|
|
my ($action,$response,$i) = @_; |
292
|
|
|
|
|
|
|
# do something |
293
|
|
|
|
|
|
|
}, |
294
|
|
|
|
|
|
|
); |
295
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
The C<on_success> callback is called for every action that has a successful |
297
|
|
|
|
|
|
|
response. |
298
|
|
|
|
|
|
|
|
299
|
|
|
|
|
|
|
=head3 C<on_conflict> |
300
|
|
|
|
|
|
|
|
301
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
302
|
|
|
|
|
|
|
es => $es, |
303
|
|
|
|
|
|
|
on_conflict => sub { |
304
|
|
|
|
|
|
|
my ($action,$response,$i,$version) = @_; |
305
|
|
|
|
|
|
|
# do something |
306
|
|
|
|
|
|
|
}, |
307
|
|
|
|
|
|
|
); |
308
|
|
|
|
|
|
|
|
309
|
|
|
|
|
|
|
The C<on_conflict> callback is called for actions that have triggered |
310
|
|
|
|
|
|
|
a C<Conflict> error, eg trying to C<create> a document which already |
311
|
|
|
|
|
|
|
exists. The C<$version> argument will contain the version number |
312
|
|
|
|
|
|
|
of the document currently stored in Elasticsearch (if found). |
313
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
=head3 C<on_error> |
315
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
317
|
|
|
|
|
|
|
es => $es, |
318
|
|
|
|
|
|
|
on_error => sub { |
319
|
|
|
|
|
|
|
my ($action,$response,$i) = @_; |
320
|
|
|
|
|
|
|
# do something |
321
|
|
|
|
|
|
|
}, |
322
|
|
|
|
|
|
|
); |
323
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
The C<on_error> callback is called for any error (unless the C<on_conflict> |
325
|
|
|
|
|
|
|
callback has already been called). |
326
|
|
|
|
|
|
|
|
327
|
|
|
|
|
|
|
=head2 Disabling callbacks and autoflush |
328
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
If you want to be in control of flushing, and you just want to receive |
330
|
|
|
|
|
|
|
the raw response that Elasticsearch sends instead of using callbacks, |
331
|
|
|
|
|
|
|
then you can do so as follows: |
332
|
|
|
|
|
|
|
|
333
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
334
|
|
|
|
|
|
|
es => $es, |
335
|
|
|
|
|
|
|
max_count => 0, |
336
|
|
|
|
|
|
|
max_size => 0, |
337
|
|
|
|
|
|
|
on_error => undef |
338
|
|
|
|
|
|
|
); |
339
|
|
|
|
|
|
|
|
340
|
|
|
|
|
|
|
$bulk->add_action(....); |
341
|
|
|
|
|
|
|
$response = $bulk->flush; |
342
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
=head1 CREATE, INDEX, UPDATE, DELETE |
344
|
|
|
|
|
|
|
|
345
|
|
|
|
|
|
|
=head2 C<add_action()> |
346
|
|
|
|
|
|
|
|
347
|
|
|
|
|
|
|
$bulk->add_action( |
348
|
|
|
|
|
|
|
create => { ...params... }, |
349
|
|
|
|
|
|
|
index => { ...params... }, |
350
|
|
|
|
|
|
|
update => { ...params... }, |
351
|
|
|
|
|
|
|
delete => { ...params... } |
352
|
|
|
|
|
|
|
); |
353
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
The C<add_action()> method allows you to add multiple C<create>, C<index>, |
355
|
|
|
|
|
|
|
C<update> and C<delete> actions to the queue. The first value is the action |
356
|
|
|
|
|
|
|
type, and the second value is the parameters that describe that action. |
357
|
|
|
|
|
|
|
See the individual helper methods below for details. |
358
|
|
|
|
|
|
|
|
359
|
|
|
|
|
|
|
B<Note:> Parameters like C<index> or C<type> can be specified as C<index> or as |
360
|
|
|
|
|
|
|
C<_index>, so the following two lines are equivalent: |
361
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
index => { index => 'index', type => 'type', id => 1, source => {...}}, |
363
|
|
|
|
|
|
|
index => { _index => 'index', _type => 'type', _id => 1, _source => {...}}, |
364
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
B<Note:> The C<index> and C<type> parameters can be specified in the |
366
|
|
|
|
|
|
|
params for any action, but if not specified, will default to the C<index> |
367
|
|
|
|
|
|
|
and C<type> values specified in L</new()>. These are required parameters: |
368
|
|
|
|
|
|
|
they must be specified either in L</new()> or in every action. |
369
|
|
|
|
|
|
|
|
370
|
|
|
|
|
|
|
=head2 C<create()> |
371
|
|
|
|
|
|
|
|
372
|
|
|
|
|
|
|
$bulk->create( |
373
|
|
|
|
|
|
|
{ index => 'custom_index', source => { doc body }}, |
374
|
|
|
|
|
|
|
{ type => 'custom_type', id => 1, source => { doc body }}, |
375
|
|
|
|
|
|
|
... |
376
|
|
|
|
|
|
|
); |
377
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
The C<create()> helper method allows you to add multiple C<create> actions. |
379
|
|
|
|
|
|
|
It accepts the same parameters as L<Elasticsearch::Client::Direct/create()> |
380
|
|
|
|
|
|
|
except that the document body should be passed as the C<source> or C<_source> |
381
|
|
|
|
|
|
|
parameter, instead of as C<body>. |
382
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
=head2 C<create_docs()> |
384
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
$bulk->create_docs( |
386
|
|
|
|
|
|
|
{ doc body }, |
387
|
|
|
|
|
|
|
{ doc body }, |
388
|
|
|
|
|
|
|
... |
389
|
|
|
|
|
|
|
); |
390
|
|
|
|
|
|
|
|
391
|
|
|
|
|
|
|
The C<create_docs()> helper is a shorter form of L</create()> which can be used |
392
|
|
|
|
|
|
|
when you are using the default C<index> and C<type> as set in L</new()> |
393
|
|
|
|
|
|
|
and you are not specifying a custom C<id> per document. In this case, |
394
|
|
|
|
|
|
|
you can just pass the individual document bodies. |
395
|
|
|
|
|
|
|
|
396
|
|
|
|
|
|
|
=head2 C<index()> |
397
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
$bulk->index( |
399
|
|
|
|
|
|
|
{ index => 'custom_index', source => { doc body }}, |
400
|
|
|
|
|
|
|
{ type => 'custom_type', id => 1, source => { doc body }}, |
401
|
|
|
|
|
|
|
... |
402
|
|
|
|
|
|
|
); |
403
|
|
|
|
|
|
|
|
404
|
|
|
|
|
|
|
The C<index()> helper method allows you to add multiple C<index> actions. |
405
|
|
|
|
|
|
|
It accepts the same parameters as L<Elasticsearch::Client::Direct/index()> |
406
|
|
|
|
|
|
|
except that the document body should be passed as the C<source> or C<_source> |
407
|
|
|
|
|
|
|
parameter, instead of as C<body>. |
408
|
|
|
|
|
|
|
|
409
|
|
|
|
|
|
|
=head2 C<delete()> |
410
|
|
|
|
|
|
|
|
411
|
|
|
|
|
|
|
$bulk->delete( |
412
|
|
|
|
|
|
|
{ index => 'custom_index', id => 1}, |
413
|
|
|
|
|
|
|
{ type => 'custom_type', id => 2}, |
414
|
|
|
|
|
|
|
... |
415
|
|
|
|
|
|
|
); |
416
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
The C<delete()> helper method allows you to add multiple C<delete> actions. |
418
|
|
|
|
|
|
|
It accepts the same parameters as L<Elasticsearch::Client::Direct/delete()>. |
419
|
|
|
|
|
|
|
|
420
|
|
|
|
|
|
|
=head2 C<delete_ids()> |
421
|
|
|
|
|
|
|
|
422
|
|
|
|
|
|
|
$bulk->delete_ids(1,2,3...) |
423
|
|
|
|
|
|
|
|
424
|
|
|
|
|
|
|
The C<delete_ids()> helper method can be used when all of the documents you |
425
|
|
|
|
|
|
|
want to delete have the default C<index> and C<type> as set in L</new()>. |
426
|
|
|
|
|
|
|
In this case, all you have to do is to pass in a list of IDs. |
427
|
|
|
|
|
|
|
|
428
|
|
|
|
|
|
|
=head2 C<update()> |
429
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
$bulk->update( |
431
|
|
|
|
|
|
|
{ id => 1, |
432
|
|
|
|
|
|
|
doc => { partial doc }, |
433
|
|
|
|
|
|
|
doc_as_upsert => 1 |
434
|
|
|
|
|
|
|
}, |
435
|
|
|
|
|
|
|
{ id => 2, |
436
|
|
|
|
|
|
|
lang => 'mvel', |
437
|
|
|
|
|
|
|
script => 'ctx._source.counter+=incr', |
438
|
|
|
|
|
|
|
params => { incr => 1}, |
439
|
|
|
|
|
|
|
upsert => { upsert doc } |
440
|
|
|
|
|
|
|
}, |
441
|
|
|
|
|
|
|
... |
442
|
|
|
|
|
|
|
); |
443
|
|
|
|
|
|
|
|
444
|
|
|
|
|
|
|
The C<update()> helper method allows you to add multiple C<update> actions. |
445
|
|
|
|
|
|
|
It accepts the same parameters as L<Elasticsearch::Client::Direct/update()>. |
446
|
|
|
|
|
|
|
An update can either use a I<partial doc> which gets merged with an existing |
447
|
|
|
|
|
|
|
doc (example 1 above), or can use a C<script> to update an existing doc |
448
|
|
|
|
|
|
|
(example 2 above). |
449
|
|
|
|
|
|
|
|
450
|
|
|
|
|
|
|
=head1 REINDEXING DOCUMENTS |
451
|
|
|
|
|
|
|
|
452
|
|
|
|
|
|
|
A common use case for bulk indexing is to reindex a whole index when |
453
|
|
|
|
|
|
|
changing the type mappings or analysis chain. This typically |
454
|
|
|
|
|
|
|
combines bulk indexing with L<scrolled searches|Elasticsearch::Scroll>: |
455
|
|
|
|
|
|
|
the scrolled search pulls all of the data from the source index, and |
456
|
|
|
|
|
|
|
the bulk indexer indexes the data into the new index. |
457
|
|
|
|
|
|
|
|
458
|
|
|
|
|
|
|
=head2 C<reindex()> |
459
|
|
|
|
|
|
|
|
460
|
|
|
|
|
|
|
$bulk->reindex( |
461
|
|
|
|
|
|
|
source => $source, # required |
462
|
|
|
|
|
|
|
transform => \&transform, # optional |
463
|
|
|
|
|
|
|
version_type => 'external|internal', # optional |
464
|
|
|
|
|
|
|
); |
465
|
|
|
|
|
|
|
|
466
|
|
|
|
|
|
|
The C<reindex()> method requires a C<$source> parameter, which provides |
467
|
|
|
|
|
|
|
the source for the documents which are to be reindexed. |
468
|
|
|
|
|
|
|
|
469
|
|
|
|
|
|
|
=head2 Reindexing from another index |
470
|
|
|
|
|
|
|
|
471
|
|
|
|
|
|
|
If the C<source> argument is a HASH ref, then the hash is passed to |
472
|
|
|
|
|
|
|
L<Elasticsearch::Scroll/new()> to create a new scrolled search. |
473
|
|
|
|
|
|
|
|
474
|
|
|
|
|
|
|
$bulk = Elasticsearch::Bulk->new( |
475
|
|
|
|
|
|
|
index => 'new_index', |
476
|
|
|
|
|
|
|
verbose => 1 |
477
|
|
|
|
|
|
|
); |
478
|
|
|
|
|
|
|
|
479
|
|
|
|
|
|
|
$bulk->reindex( |
480
|
|
|
|
|
|
|
source => { |
481
|
|
|
|
|
|
|
index => 'old_index', |
482
|
|
|
|
|
|
|
size => 500, # default |
483
|
|
|
|
|
|
|
search_type => 'scan' # default |
484
|
|
|
|
|
|
|
} |
485
|
|
|
|
|
|
|
); |
486
|
|
|
|
|
|
|
|
487
|
|
|
|
|
|
|
If a default C<index> or C<type> has been specified in the call to |
488
|
|
|
|
|
|
|
L</new()>, then it will replace the C<index> and C<type> values for |
489
|
|
|
|
|
|
|
the docs returned from the scrolled search. In the example above, |
490
|
|
|
|
|
|
|
all docs will be retrieved from C<"old_index"> and will be bulk indexed |
491
|
|
|
|
|
|
|
into C<"new_index">. |
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
=head2 Reindexing from a generic source |
494
|
|
|
|
|
|
|
|
495
|
|
|
|
|
|
|
The C<source> parameter also accepts a coderef or an anonymous sub, |
496
|
|
|
|
|
|
|
which should return one or more new documents every time it is executed. |
497
|
|
|
|
|
|
|
This allows you to pass any iterator, wrapped in an anonymous sub: |
498
|
|
|
|
|
|
|
|
499
|
|
|
|
|
|
|
my $iter = get_iterator_from_somewhere(); |
500
|
|
|
|
|
|
|
|
501
|
|
|
|
|
|
|
$bulk->reindex( |
502
|
|
|
|
|
|
|
source => sub { $iter->next } |
503
|
|
|
|
|
|
|
); |
504
|
|
|
|
|
|
|
|
505
|
|
|
|
|
|
|
=head2 Transforming docs on the fly |
506
|
|
|
|
|
|
|
|
507
|
|
|
|
|
|
|
The C<transform> parameter allows you to change documents on the fly, |
508
|
|
|
|
|
|
|
using a callback. The callback receives the document as the only argument, |
509
|
|
|
|
|
|
|
and should return the updated document, or C<undef> if the document should |
510
|
|
|
|
|
|
|
not be indexed: |
511
|
|
|
|
|
|
|
|
512
|
|
|
|
|
|
|
$bulk->reindex( |
513
|
|
|
|
|
|
|
source => { index => 'old_index' }, |
514
|
|
|
|
|
|
|
transform => sub { |
515
|
|
|
|
|
|
|
my $doc = shift; |
516
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
# don't index doc marked as valid:false |
518
|
|
|
|
|
|
|
return undef unless $doc->{_source}{valid}; |
519
|
|
|
|
|
|
|
|
520
|
|
|
|
|
|
|
# convert $tag to @tags |
521
|
|
|
|
|
|
|
$doc->{_source}{tags} = [ delete $doc->{_source}{tag}]; |
522
|
|
|
|
|
|
|
return $doc |
523
|
|
|
|
|
|
|
} |
524
|
|
|
|
|
|
|
); |
525
|
|
|
|
|
|
|
|
526
|
|
|
|
|
|
|
=head2 Reindexing from another cluster |
527
|
|
|
|
|
|
|
|
528
|
|
|
|
|
|
|
By default, L</reindex()> expects the source and destination indices |
529
|
|
|
|
|
|
|
to be in the same cluster. To pull data from one cluster and index it into |
530
|
|
|
|
|
|
|
another, you can use two separate C<$es> objects: |
531
|
|
|
|
|
|
|
|
532
|
|
|
|
|
|
|
$es_target = Elasticsearch->new( nodes => 'localhost:9200' ); |
533
|
|
|
|
|
|
|
$es_source = Elasticsearch->new( nodes => 'search1:9200' ); |
534
|
|
|
|
|
|
|
|
535
|
|
|
|
|
|
|
Elasticsearch::Bulk->new( |
536
|
|
|
|
|
|
|
es => $es_target, |
537
|
|
|
|
|
|
|
verbose => 1 |
538
|
|
|
|
|
|
|
) |
539
|
|
|
|
|
|
|
-> reindex( |
540
|
|
|
|
|
|
|
source => { |
541
|
|
|
|
|
|
|
es => $es_source, |
542
|
|
|
|
|
|
|
index => 'my_index' |
543
|
|
|
|
|
|
|
} |
544
|
|
|
|
|
|
|
); |
545
|
|
|
|
|
|
|
|
546
|
|
|
|
|
|
|
=head2 Parents and routing |
547
|
|
|
|
|
|
|
|
548
|
|
|
|
|
|
|
If you are using parent-child relationships or custom C<routing> values, |
549
|
|
|
|
|
|
|
and you want to preserve these when you reindex your documents, then |
550
|
|
|
|
|
|
|
you will need to request these values specifically, as follows: |
551
|
|
|
|
|
|
|
|
552
|
|
|
|
|
|
|
$bulk->reindex( |
553
|
|
|
|
|
|
|
source => { |
554
|
|
|
|
|
|
|
index => 'old_index', |
555
|
|
|
|
|
|
|
fields => ['_source','_parent','_routing'] |
556
|
|
|
|
|
|
|
} |
557
|
|
|
|
|
|
|
); |
558
|
|
|
|
|
|
|
|
559
|
|
|
|
|
|
|
=head2 Working with version numbers |
560
|
|
|
|
|
|
|
|
561
|
|
|
|
|
|
|
Every document in Elasticsearch has a current C<version> number, which |
562
|
|
|
|
|
|
|
is used for L<optimistic concurrency control|http://en.wikipedia.org/wiki/Optimistic_concurrency_control>, |
563
|
|
|
|
|
|
|
that is, to ensure that you don't overwrite changes that have been made |
564
|
|
|
|
|
|
|
by another process. |
565
|
|
|
|
|
|
|
|
566
|
|
|
|
|
|
|
All CRUD operations accept a C<version> parameter and a C<version_type> |
567
|
|
|
|
|
|
|
parameter which tells Elasticsearch that the change should only be made |
568
|
|
|
|
|
|
|
if the current document corresponds to these parameters. The |
569
|
|
|
|
|
|
|
C<version_type> parameter can have the following values: |
570
|
|
|
|
|
|
|
|
571
|
|
|
|
|
|
|
=over |
572
|
|
|
|
|
|
|
|
573
|
|
|
|
|
|
|
=item * C<internal> |
574
|
|
|
|
|
|
|
|
575
|
|
|
|
|
|
|
Use Elasticsearch version numbers. Documents are only changed if the |
576
|
|
|
|
|
|
|
document in Elasticsearch has the B<same> C<version> number that is |
577
|
|
|
|
|
|
|
specified in the CRUD operation. After the change, the new |
578
|
|
|
|
|
|
|
version number is C<version+1>. |
579
|
|
|
|
|
|
|
|
580
|
|
|
|
|
|
|
=item * C<external> |
581
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
Use an external versioning system, such as timestamps or version numbers |
583
|
|
|
|
|
|
|
from an external database. Documents are only changed if the document |
584
|
|
|
|
|
|
|
in Elasticsearch has a B<lower> C<version> number than the one |
585
|
|
|
|
|
|
|
specified in the CRUD operation. After the change, the new version |
586
|
|
|
|
|
|
|
number is C<version>. |
587
|
|
|
|
|
|
|
|
588
|
|
|
|
|
|
|
=back |
589
|
|
|
|
|
|
|
|
590
|
|
|
|
|
|
|
If you would like to reindex documents from one index to another, preserving |
591
|
|
|
|
|
|
|
the C<version> numbers from the original index, then you need the following: |
592
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
$bulk->reindex( |
594
|
|
|
|
|
|
|
source => { |
595
|
|
|
|
|
|
|
index => 'old_index', |
596
|
|
|
|
|
|
|
version => 1, # retrieve version numbers in search |
597
|
|
|
|
|
|
|
}, |
598
|
|
|
|
|
|
|
version_type => 'external' # use these "external" version numbers |
599
|
|
|
|
|
|
|
); |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
=head1 AUTHOR |
602
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
Clinton Gormley <drtech@cpan.org> |
604
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
=head1 COPYRIGHT AND LICENSE |
606
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
This software is Copyright (c) 2014 by Elasticsearch BV. |
608
|
|
|
|
|
|
|
|
609
|
|
|
|
|
|
|
This is free software, licensed under: |
610
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
The Apache License, Version 2.0, January 2004 |
612
|
|
|
|
|
|
|
|
613
|
|
|
|
|
|
|
=cut |
614
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
__END__ |
616
|
|
|
|
|
|
|
|
617
|
|
|
|
|
|
|
# ABSTRACT: A helper module for the Bulk API and for reindexing |
618
|
|
|
|
|
|
|
|