File Coverage

blib/lib/Amazon/MWS/Uploader.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             package Amazon::MWS::Uploader;
2              
3 6     6   86010 use utf8;
  6         10  
  6         39  
4 6     6   202 use strict;
  6         9  
  6         154  
5 6     6   27 use warnings;
  6         9  
  6         178  
6              
7 6     6   9896 use DBI;
  6         102119  
  6         480  
8 6     6   3696 use Amazon::MWS::XML::Feed;
  6         22  
  6         218  
9 6     6   3248 use Amazon::MWS::XML::Order;
  6         20  
  6         276  
10 6     6   3415 use Amazon::MWS::Client;
  6         16  
  6         229  
11 6     6   2724 use Amazon::MWS::XML::Response::FeedSubmissionResult;
  0            
  0            
12             use Amazon::MWS::XML::Response::OrderReport;
13             use Data::Dumper;
14             use File::Spec;
15             use DateTime;
16             use SQL::Abstract;
17             use Try::Tiny;
18             use Path::Tiny;
19             use Scalar::Util qw/blessed/;
20             use XML::Compile::Schema;
21              
22             use Moo;
23             use MooX::Types::MooseLike::Base qw(:all);
24             use namespace::clean;
25              
26             our $VERSION = '0.18';
27              
28             use constant {
29             AMW_ORDER_WILDCARD_ERROR => 999999,
30             DEBUG => $ENV{AMZ_UPLOADER_DEBUG},
31             };
32              
33             =head1 NAME
34              
35             Amazon::MWS::Uploader -- high level agent to upload products to AMWS
36              
37             =head1 DESCRIPTION
38              
39             This module provides a high-level interface to the upload process. It
40             has to keep track of the state to resume the uploading, which could
41             get stuck on the Amazon's side processing, so database credentials
42             have to be provided (or the database handle itself).
43              
44             The table structure needed is defined and commented in sql/amazon.sql
45              
46             =head1 SYNOPSIS
47              
48             my $agent = Amazon::MWS::Uploader->new(
49             db_dsn => 'DBI:mysql:database=XXX',
50             db_username => 'xxx',
51             db_password => 'xxx',
52             db_options => \%options
53             # or dbh => $dbh,
54            
55             schema_dir => '/path/to/xml_schema',
56             feed_dir => '/path/to/directory/for/xml',
57            
58             merchant_id => 'xxx',
59             access_key_id => 'xxx',
60             secret_key => 'xxx',
61            
62             marketplace_id => 'xxx',
63             endpoint => 'xxx',
64            
65             products => \@products,
66             );
67            
68             # say once a day, retrieve the full batch and send it up
69             $agent->upload;
70            
71             # every 10 minutes or so, continue the work started with ->upload, if any
72             $agent->resume;
73              
74              
75             =head1 UPGRADE NOTES
76              
77             When migrating from 0.05 to 0.06 please execute this SQL statement
78              
79             ALTER TABLE amazon_mws_products ADD COLUMN listed BOOLEAN;
80             UPDATE amazon_mws_products SET listed = 1 WHERE status = 'ok';
81              
82             When upgrading to 0.16, please execute this SQL statement:
83              
84             ALTER TABLE amazon_mws_products ADD COLUMN warnings TEXT;
85              
86             =head1 ACCESSORS
87              
88             The following keys must be passed at the constructor and can be
89             accessed read-only:
90              
91             =over 4
92              
93             =item dbh
94              
95             The DBI handle. If not provided will be built using the following
96             self-describing accessor:
97              
98             =item db_dsn
99              
100             =item db_username
101              
102             =item db_password
103              
104             =item db_options
105              
106             E.g.
107              
108             {
109             mysql_enable_utf8 => 1,
110             }
111              
112             AutoCommit and RaiseError are set by us.
113              
114             =cut
115              
# Connection parameters; the dbh builder reads them when it has to open
# the connection itself.
has $_ => (is => 'ro') for qw/db_dsn db_password db_username/;

# Extra DBI options: undef or a hash reference.
has db_options => (
    is  => 'ro',
    isa => AnyOf[Undef, HashRef],
);

# The database handle (lazy attribute).
has dbh => (is => 'lazy');
123              
124             =item skus_warnings_modes
125              
126             Determines how to treat warnings. This is a hash reference with the
127             code of the warning as key and one of the following modes as value:
128              
129             =over 4
130              
131             =item warn
132              
133             Prints warning from Amazon with C<warn> function (default mode).
134              
135             =item print
136              
137             Prints warning from Amazon with C<print> function (default mode).
138              
139             =item skip
140              
141             Ignores warning from Amazon.
142              
143             =back
144              
145             =cut
146              
# Warning code => mode mapping; starts out as an empty hash reference.
has skus_warnings_modes => (
    is      => 'rw',
    isa     => HashRef,
    default => sub { return {} },
);
151              
152             =item order_days_range
153              
154             When calling get_orders, check the orders for the last X days. It
155             accepts an integer which should be in the range 1-30. Defaults to 7.
156              
157             Keep in mind that if you change the default and you have a lot of
158             orders, you will get throttled because for each order we retrieve the
159             orderline as well.
160              
161             DEVEL NOTE: a possible smart fix would be to store this object in the
162             order (or into a closure) and make the orderline a lazy attribute
163             which will call C<ListOrderItems>.
164              
165             =cut
166              
# Number of days to look back for orders: an integer between 1 and 30,
# 7 when not specified.
has order_days_range => (
    is      => 'rw',
    default => sub { 7 },
    isa     => sub {
        my ($days) = @_;
        is_Int($days)
          or die "Not an integer";
        ($days > 0 && $days < 31)
          or die "$days is out of range 1-30";
    },
);
176              
177             =item shop_id
178              
179             You can pass an arbitrary identifier to the constructor which will be
180             used to keep the database records separated if you have multiple
181             amazon accounts. If not provided, the merchant id will be used, which
182             will work, but it's harder (for the humans) to spot and debug.
183              
184             =cut
185              
186             has shop_id => (is => 'ro');
187              
188             has _unique_shop_id => (is => 'lazy');
189              
# Builder for _unique_shop_id: the shop_id when it holds a true value,
# the merchant id otherwise.
sub _build__unique_shop_id {
    my ($self) = @_;
    my $shop_id = $self->shop_id;
    return $shop_id ? $shop_id : $self->merchant_id;
}
199              
200             =item debug
201              
202             Print out additional information.
203              
204             =item logfile
205              
206             Passed to L<Amazon::MWS::Client> constructor.
207              
208             =cut
209              
210             has debug => (is => 'ro');
211              
212             has logfile => (is => 'ro');
213              
214             =item quiet
215              
216             Boolean. Do not warn on timeouts and aborts (just print) if set to
217             true.
218              
219             =cut
220              
221             has quiet => (is => 'ro');
222              
# Builder for dbh: connect with db_dsn, db_username, db_password and
# db_options. RaiseError and AutoCommit are always forced on.
# Dies when db_dsn is missing or when the connection cannot be made.
sub _build_dbh {
    my $self = shift;
    my $dsn = $self->db_dsn;
    die "Missing dsn" unless $dsn;
    # work on a copy, so the hash reference handed in by the caller is
    # left untouched
    my %options = (
        %{ $self->db_options || {} },
        # force raise error and auto-commit
        RaiseError => 1,
        AutoCommit => 1,
    );
    my $dbh = DBI->connect($dsn, $self->db_username, $self->db_password,
                           \%options) or die "Couldn't connect to $dsn!";
    return $dbh;
}
235              
236             =item purge_missing_products
237              
238             If true, the first time C<products_to_upload> is called, products not
239             passed to the C<products> constructor will be purged from the
240             C<amazon_mws_products> table. Defaults to false.
241              
242             This setting is DEPRECATED because it can have some unwanted
243             side-effects. You are recommended to delete the obsoleted products
244             yourself.
245              
246             =cut
247              
248             has purge_missing_products => (is => 'rw');
249              
250              
251             =item reset_all_errors
252              
253             If set to a true value, don't skip previously failed items and
254             effectively reset all of them.
255              
256             Also, when the accessor is set for send_shipping_confirmation, try to
257             upload again previously failed orders.
258              
259             =cut
260              
261             has reset_all_errors => (is => 'ro');
262              
263             =item reset_errors
264              
265             A string containing a comma separated list of error codes, optionally
266             prefixed with a "!" (to reverse its meaning).
267              
268             Example:
269              
270             "!6024,6023"
271              
272             Meaning: reupload all the products whose error code is B<not> 6024 or
273             6023.
274              
275             "6024,6023"
276              
277             Meaning: reupload the products whose error code was 6024 or 6023
278              
279             =cut
280              
# Comma separated list of numeric error codes, optionally prefixed by a
# "!" which negates its meaning. False values (undef, 0, '') are accepted.
has reset_errors => (
    is  => 'ro',
    isa => sub {
        my $string = $_[0];
        # undef/0/'' is fine
        return unless $string;
        # The pattern is anchored on both ends: without the end anchor
        # anything starting with digits was accepted, whatever followed.
        # A trailing comma is still tolerated.
        die "reset_errors must be a comma separated list of error code, optionally prefixed by a '!' to negate its meaning"
          if $string !~ m/^\s*!?\s*[0-9]+(?:\s*,\s*[0-9]+)*\s*,?\s*$/;
    },
);
290              
291              
292             has _reset_error_structure => (is => 'lazy');
293              
# Builder for _reset_error_structure. Parses the reset_errors string and
# returns a hash reference with two keys: "negate" (1 when the list was
# prefixed by "!", 0 otherwise) and "codes" (hash reference keyed by
# error code). Returns nothing when there is no code to reset.
sub _build__reset_error_structure {
    my ($self) = @_;
    my $spec = $self->reset_errors || '';
    # trim both ends
    $spec =~ s/^\s*//;
    $spec =~ s/\s*$//;
    return unless $spec;

    my $negate = 0;
    if ($spec =~ m/^\s*!\s*(.+)/) {
        ($spec, $negate) = ($1, 1);
    }

    my %codes;
    foreach my $code (split(/\s*,\s*/, $spec)) {
        # false pieces (empty strings, 0) are discarded
        $codes{$code} = 1 if $code;
    }
    return unless %codes;
    return { negate => $negate, codes => \%codes };
}
313              
314              
315             =item force
316              
317             Same as above, but only for the selected items. An arrayref is
318             expected here with the B<skus>.
319              
320             =cut
321              
322             has force => (is => 'ro',
323             isa => ArrayRef,
324             );
325              
326              
327             has _force_hashref => (is => 'lazy');
328              
# Builder for _force_hashref: turn the "force" list of skus into a
# lookup hash (sku => 1). An unset "force" yields an empty hash.
sub _build__force_hashref {
    my ($self) = @_;
    my $skus = $self->force || [];
    my %forced = map { $_ => 1 } @$skus;
    return \%forced;
}
337              
338             =item limit_inventory
339              
340             If set to an integer, limit the inventory to this value. Setting this
341             to 0 will disable it.
342              
343             =item job_hours_timeout
344              
345             If set to an integer, abort the job after X hours are elapsed since
346             the job was started. Default to 3 hours. Set to 0 to disable (not
347             recommended).
348              
349             This doesn't affect jobs for order acknowledgements (C<order_ack>), see below.
350              
351             =item order_ack_days_timeout
352              
353             Order acknowledgements time out at a different rate, because they are
354             somewhat sensitive.
355              
356             =cut
357              
# Hours before a job is considered timed out; 3 when not specified.
has job_hours_timeout => (
    is      => 'ro',
    isa     => Int,
    default => sub { 3 },
);

# Same idea for order acknowledgements, expressed in days; 30 when not
# specified.
has order_ack_days_timeout => (
    is      => 'ro',
    isa     => Int,
    default => sub { 30 },
);

# Optional cap on the inventory sent to Amazon.
has limit_inventory => (
    is  => 'ro',
    isa => Int,
);
368              
369             =item schema_dir
370              
371             The directory where the xsd files for the feed building can be found.
372              
373             =item feeder
374              
375             A L<Amazon::MWS::XML::Feed> object. Lazy attribute, you shouldn't pass
376             this to the constructor, it is lazily built using C<products>,
377             C<merchant_id> and C<schema_dir>.
378              
379             =item feed_dir
380              
381             A working directory where to stash the uploaded feeds for inspection
382             if problems are detected.
383              
384             =item schema
385              
386             The L<XML::Compile::Schema> object, built lazily from C<schema_dir>
387              
388             =item xml_writer
389              
390             The xml writer, built lazily.
391              
392             =item xml_reader
393              
394             The xml reader, built lazily.
395              
396             =cut
397              
# schema_dir and feed_dir are both mandatory and must point to an
# existing directory.
foreach my $directory_attribute (qw/schema_dir feed_dir/) {
    has $directory_attribute => (
        is       => 'ro',
        required => 1,
        isa      => sub {
            die "$_[0] is not a directory" unless -d $_[0];
        },
    );
}
409              
410             has schema => (is => 'lazy');
411              
# Builder for schema: hand every *.xsd file found in schema_dir to
# XML::Compile::Schema and return the resulting object.
sub _build_schema {
    my $self = shift;
    # core module, loaded locally so nothing else in the file needs to change
    require File::Glob;
    my $pattern = File::Spec->catfile($self->schema_dir, '*.xsd');
    # The builtin glob splits its argument on whitespace, so a schema_dir
    # with a space in its name would match nothing. bsd_glob takes the
    # pattern as a single string.
    my @files = File::Glob::bsd_glob($pattern);
    my $schema = XML::Compile::Schema->new(\@files);
    return $schema;
}
418              
419             has xml_writer => (is => 'lazy');
420              
# Builder for xml_writer: compile a writer for the AmazonEnvelope element
# out of the schema.
sub _build_xml_writer {
    my ($self) = @_;
    my $schema = $self->schema;
    return $schema->compile(WRITER => 'AmazonEnvelope');
}
425              
426             has xml_reader => (is => 'lazy');
427              
# Builder for xml_reader: compile a reader for the AmazonEnvelope element
# out of the schema.
sub _build_xml_reader {
    my ($self) = @_;
    my $schema = $self->schema;
    return $schema->compile(READER => 'AmazonEnvelope');
}
432              
433              
434             =item generic_feeder
435              
436             Return a L<Amazon::MWS::XML::GenericFeed> object to build a feed using
437             the XML writer.
438              
439             =cut
440              
# Return a new Amazon::MWS::XML::GenericFeed object, set up with our xml
# writer and merchant id.
sub generic_feeder {
    my $self = shift;
    # The class does not appear in this file's import list, so load it
    # here instead of relying on another module having pulled it in.
    require Amazon::MWS::XML::GenericFeed;
    return Amazon::MWS::XML::GenericFeed->new(
                                              xml_writer => $self->xml_writer,
                                              merchant_id => $self->merchant_id,
                                             );
}
448              
449              
450             =item merchant_id
451              
452             The merchant ID provided by Amazon.
453              
454             =item access_key_id
455              
456             Provided by Amazon.
457              
458             =item secret_key
459              
460             Provided by Amazon.
461              
462             =item marketplace_id
463              
464             L<http://docs.developer.amazonservices.com/en_US/dev_guide/DG_Endpoints.html>
465              
466             =item endpoint
467              
468             Ditto.
469              
470             =cut
471              
# Amazon credentials and endpoint settings: all mandatory, all read-only.
has $_ => (is => 'ro', required => 1)
  for qw/merchant_id access_key_id secret_key marketplace_id endpoint/;
477              
478             =item products
479              
480             An arrayref of L<Amazon::MWS::XML::Product> objects, or anything that
481             (properly) responds to C<as_product_hash>, C<as_inventory_hash>,
482             C<as_price_hash>. See L<Amazon::MWS::XML::Product> for details.
483              
484             B<This is set as read-write, so you can set the product after the
485             object construction, but if you change it afterward, you will get
486             unexpected results>.
487              
488             This routine also checks if the product needs upload and deletes
489             disappeared products. If you are doing the check yourself, use
490             C<checked_products>.
491              
492             =item checked_products
493              
494             As C<products>, but no check is performed. This takes precedence.
495              
496             =item sqla
497              
498             Lazy attribute to hold the C<SQL::Abstract> object.
499              
500             =cut
501              
# The products to process (array reference, read-write).
has products => (
    is  => 'rw',
    isa => ArrayRef,
);

# SQL::Abstract object used to generate the queries.
has sqla => (
    is      => 'ro',
    default => sub { SQL::Abstract->new },
);
511              
512             has existing_products => (is => 'lazy');
513              
# Builder for existing_products. Reads from amazon_mws_products the rows
# of this shop whose status is not "deleted" and returns them as a hash
# reference keyed by sku. A false timestamp_string is normalized to 0.
sub _build_existing_products {
    my ($self) = @_;
    my @columns = qw/sku
                     timestamp_string
                     status
                     listed
                     error_code
                    /;
    my %where = (
                 status => { -not_in => [qw/deleted/] },
                 shop_id => $self->_unique_shop_id,
                );
    my $sth = $self->_exe_query($self->sqla->select(amazon_mws_products => \@columns,
                                                    \%where));
    my %uploaded;
    while (my $row = $sth->fetchrow_hashref) {
        $row->{timestamp_string} ||= 0;
        $uploaded{ $row->{sku} } = $row;
    }
    return \%uploaded;
}
533              
534             has products_to_upload => (is => 'lazy');
535              
536             has checked_products => (is => 'rw', isa => ArrayRef);
537              
# Builder for products_to_upload.
#
# When checked_products is set, it is returned as is. Otherwise every
# object in "products" is examined: the ones for which
# product_needs_upload returns false are dropped, the inventory is capped
# to limit_inventory (when set), and failed variants are removed from the
# children list unless the parent itself is recorded as failed.
# If purge_missing_products is true, the existing skus which were not
# found among the passed products are handed to delete_skus.
#
# Returns an array reference with the products to upload.
sub _build_products_to_upload {
    my ($self) = @_;
    my $checked = $self->checked_products;
    return $checked if $checked;

    my $candidates = $self->products;
    return [] unless $candidates && @$candidates;

    my $existing = $self->existing_products;
    my @todo;
  PRODUCT:
    foreach my $product (@$candidates) {
        my $sku = $product->sku;
        my $known = $existing->{$sku};
        # mark the item as visited
        $known->{_examined} = 1 if $known;

        print "Checking $sku\n" if $self->debug;
        next PRODUCT
          unless $self->product_needs_upload($product->sku, $product->timestamp_string);

        print "Scheduling product " . $product->sku . " for upload\n";

        my $limit = $self->limit_inventory;
        if ($limit) {
            my $real = $product->inventory;
            if ($real > $limit) {
                print "Limiting the $sku inventory from $real to $limit\n" if $self->debug;
                $product->inventory($limit);
            }
        }

        my $children = $product->children;
        if ($children) {
            my $parent_row = $existing->{$sku};
            my @good_children;
            foreach my $child (@$children) {
                my $child_row = $existing->{$child};
                # skip failed children, but if the current status of
                # parent is failed, and we reached this point, retry.
                if ($child_row
                    && $parent_row
                    && $parent_row->{status} ne 'failed'
                    && $child_row->{status} eq 'failed') {
                    print "Ignoring failed variant $child\n";
                    next;
                }
                push @good_children, $child;
            }
            $product->children(\@good_children);
        }
        push @todo, $product;
    }

    if ($self->purge_missing_products) {
        # nuke the products not passed
        my @deletions;
        foreach my $row (values %$existing) {
            push @deletions, $row->{sku} unless $row->{_examined};
        }
        $self->delete_skus(@deletions) if @deletions;
    }
    return \@todo;
}
596              
597              
598             =item client
599              
600             An L<Amazon::MWS::Client> object, built lazily, so you don't have to
601             pass it.
602              
603             =back
604              
605             =cut
606              
607             has client => (is => 'lazy');
608              
# Builder for client: create the Amazon::MWS::Client object, forwarding
# the credentials, the endpoint and the debug and logfile settings.
sub _build_client {
    my ($self) = @_;
    my @forwarded = qw/merchant_id
                       marketplace_id
                       access_key_id
                       secret_key
                       debug
                       logfile
                       endpoint/;
    my %mws_args;
    foreach my $accessor (@forwarded) {
        $mws_args{$accessor} = $self->$accessor;
    }
    return Amazon::MWS::Client->new(%mws_args);
}
621              
622             has _mismatch_patterns => (is => 'lazy', isa => HashRef);
623              
# Builder for _mismatch_patterns. Returns a hash reference of compiled
# regular expressions: "asin" plus, for each of part_number, title,
# manufacturer, brand, color and size, a "shop_*" pattern capturing the
# merchant's value and an "amazon_*" pattern capturing Amazon's value.
sub _build__mismatch_patterns {
    my ($self) = @_;
    my $merchant_re = qr{\s+\((?:Merchant|Verkäufer):\s+'(.*?)'\s+/};
    my $amazon_re = qr{\s+.*?/\s*Amazon:\s+'(.*?)'\)};

    my %patterns = (
                    # informative only
                    asin => qr{ASIN(?:\s+überein)?\s+([0-9A-Za-z]+)},
                   );

    # key suffix => label as it appears in the message
    my @fields = (
                  [ part_number  => 'part_number'  ],
                  [ title        => 'item_name'    ],
                  [ manufacturer => 'manufacturer' ],
                  [ brand        => 'brand'        ],
                  [ color        => 'color'        ],
                  [ size         => 'size'         ],
                 );
    foreach my $field (@fields) {
        my ($suffix, $label) = @$field;
        $patterns{"shop_$suffix"} = qr{$label$merchant_re};
        $patterns{"amazon_$suffix"} = qr{$label$amazon_re};
    }
    return \%patterns;
}
653              
654              
655             =head1 MAIN METHODS
656              
657             =head2 upload
658              
659             If the products are set, begin the routine to upload them. Because of
660             the asynchronous way AMWS works, at some point it will bail out,
661             saving the state in the database. You should reinstantiate the object
662             and call C<resume> on it every 10 minutes or so.
663              
664             The workflow is described here:
665             L<http://docs.developer.amazonservices.com/en_US/feeds/Feeds_Overview.html>
666              
667             This has to be done for each feed: Product, Inventory, Price, Image,
668             Relationship (for variants).
669              
670             This method first generate the feeds in the feed directory, and then
671             calls C<resume>, which is in charge for the actual uploading.
672              
673             =head2 resume
674              
675             Restore the state and resume where it was left.
676              
677             This method accepts an optional list of parameters. Each parameter may be:
678              
679             =over 4
680              
681             =item a scalar
682              
683             This is considered a job id.
684              
685             =item a hashref
686              
687             This will be merged in the query to retrieve the pending jobs. A
688             sample usage could be:
689              
690             $upload->resume({ task => [qw/upload product_deletion/] });
691              
692             to resume only those specific tasks.
693              
694             =back
695              
696             =head2 get_pending_jobs
697              
698             Return the list of hashref with the pending jobs out of the database.
699             Accepts the same parameters as C<resume> (which actually calls this
700             method).
701              
702             =cut
703              
704             =head1 INTERNAL METHODS
705              
706             =head2 prepare_feeds($type, { name => $feed_name, content => "<xml>..."}, { name => $feed_name2, content => "<xml>..."}, ....)
707              
708             Prepare the feed of type $type with the feeds provided as additional
709             arguments.
710              
711             Return the job id
712              
713              
714             =cut
715              
# Return the directory holding the feed files of a job, which is
# <feed_dir>/<shop id>/<job id>. The shop id is stripped of every
# character which is not a letter, a digit, "_" or "-".
#
# Arguments: the job id (mandatory) and a boolean; when the boolean is
# true the directories are created if missing.
#
# Dies if the job id is missing, if the sanitized shop id is empty, or if
# a directory has to be created and the creation fails.
sub _feed_job_dir {
    my ($self, $job_id, $create) = @_;
    die "Missing job id" unless $job_id;
    my $shop_id = $self->_unique_shop_id;
    $shop_id =~ s/[^0-9A-Za-z_-]//g;
    die "The shop id without word characters results in an empty string"
      unless $shop_id;
    my $feed_root = File::Spec->catdir($self->feed_dir,
                                       $shop_id);
    my $feed_subdir = File::Spec->catdir($feed_root,
                                         $job_id);
    if ($create) {
        foreach my $dir ($feed_root, $feed_subdir) {
            next if -d $dir;
            unless (mkdir $dir) {
                # save the error before the next file test overwrites it
                my $error = $!;
                # if the directory is there anyway, someone else created
                # it in the meanwhile, which is fine
                die "Couldn't create directory $dir: $error" unless -d $dir;
            }
        }
    }
    return $feed_subdir;
}
736              
# Return the absolute path of the XML file for the given job id and feed
# type. The job directory is requested with the creation flag set.
# Dies if the job id or the feed type is missing.
sub _feed_file_for_method {
    my ($self, $job_id, $feed_type) = @_;
    die unless $job_id && $feed_type;
    my $basename = $feed_type . '.xml';
    my $directory = $self->_feed_job_dir($job_id, "create");
    return File::Spec->rel2abs(File::Spec->catfile($directory, $basename));
}
744              
# Read the whole file in one go and return its content.
# Dies if the file cannot be opened.
sub _slurp_file {
    my ($self, $file) = @_;
    open (my $fh, '<', $file) or die "Couldn't open $file $!";
    # no record separator inside the block: a single read returns it all
    my $content = do { local $/ = undef; <$fh> };
    close $fh;
    return $content;
}
753              
# Build the product, inventory, price, image and variants feeds for the
# products to upload, register them with prepare_feeds and flag the
# products as pending.
#
# Returns the job id, or nothing if there are no products or no job was
# created.
sub upload {
    my ($self) = @_;
    # create the feeds to be uploaded using the products
    my @products = @{ $self->products_to_upload };
    if (!@products) {
        print "No products, can't upload anything\n";
        return;
    }

    my $feeder = Amazon::MWS::XML::Feed->new(
                                             products => \@products,
                                             xml_writer => $self->xml_writer,
                                             merchant_id => $self->merchant_id,
                                            );
    my @feeds;
    for my $feed_name (qw/product
                          inventory
                          price
                          image
                          variants
                         /) {
        my $method = $feed_name . "_feed";
        # feeds without content are left out
        my $content = $feeder->$method or next;
        push @feeds, { name => $feed_name, content => $content };
    }

    my $job_id = $self->prepare_feeds(upload => \@feeds) or return;
    $self->_mark_products_as_pending($job_id, @products);
    return $job_id;
}
789              
# Record in amazon_mws_products that the given products belong to the
# job and are pending. For each product the row (identified by sku and
# shop id) is updated when it exists and inserted otherwise; the
# warnings column is emptied and the timestamp string stored.
# Dies if the job id is missing.
sub _mark_products_as_pending {
    my ($self, $job_id, @products) = @_;
    die "Bad usage" unless $job_id;
    # these skus were cleared up when asking for the products to upload
    for my $product (@products) {
        my $identifier = {
                          sku => $product->sku,
                          shop_id => $self->_unique_shop_id,
                         };
        my $data = {
                    amws_job_id => $job_id,
                    status => 'pending',
                    warnings => '', # clear out
                    timestamp_string => $product->timestamp_string,
                   };
        my $lookup = $self->_exe_query($self->sqla->select(amazon_mws_products => [qw/sku/],
                                                           { %$identifier }));
        my $found = $lookup->fetchrow_hashref;
        $lookup->finish;

        my @statement = $found
          ? $self->sqla->update(amazon_mws_products => $data, $identifier)
          : $self->sqla->insert(amazon_mws_products => { %$identifier, %$data });
        $self->_exe_query(@statement);
    }
}
817              
818              
# Register a new job for $task and write out its feeds.
#
# Arguments: the task name and an array reference of hash references with
# the keys "name" and "content". A row is inserted in amazon_mws_jobs;
# then, for each feed with content, the content is written to the file
# given by _feed_file_for_method and a row is inserted in
# amazon_mws_feeds.
#
# Returns the job id, or nothing when the list of feeds is empty.
# Dies on missing arguments, on a feed without a name, or when a feed
# file cannot be opened, written or closed.
sub prepare_feeds {
    my ($self, $task, $feeds) = @_;
    die "Missing task ($task) and feeds ($feeds)" unless $task && $feeds;
    return unless @$feeds; # nothing to do
    my $job_id = $task . "-" . DateTime->now->strftime('%F-%H-%M-%S');
    my $job_started_epoch = time();

    $self->_exe_query($self->sqla
                      ->insert(amazon_mws_jobs => {
                                                   amws_job_id => $job_id,
                                                   shop_id => $self->_unique_shop_id,
                                                   task => $task,
                                                   job_started_epoch => $job_started_epoch,
                                                  }));

    # to complete the process, we need to fill out these five
    # feeds. every feed has the same procedure, as per
    # http://docs.developer.amazonservices.com/en_US/feeds/Feeds_Overview.html
    # so we put a flag on the feed when it is done. The processing
    # of the feed itself is tracked in the amazon_mws_feeds

    # TODO: we could pass to the object some flags to filter out results.
    foreach my $feed (@$feeds) {
        # write out the feed if we got something to do, and add a row
        # to the feeds.

        # when there is no content, no need to create a job for it.
        if (my $content = $feed->{content}) {
            my $name = $feed->{name} or die "Missing feed_name";
            my $file = $self->_feed_file_for_method($job_id, $name);
            open (my $fh, '>', $file) or die "Couldn't open $file $!";
            # The output is buffered, so a write error (disk full, quota)
            # may surface only at close: check both, otherwise a truncated
            # feed would be registered.
            print $fh $content or die "Couldn't write to $file $!";
            close $fh or die "Couldn't close $file $!";
            # and prepare a row for it

            my $insertion = {
                             feed_name => $name,
                             feed_file => $file,
                             amws_job_id => $job_id,
                             shop_id => $self->_unique_shop_id,
                            };
            $self->_exe_query($self->sqla
                              ->insert(amazon_mws_feeds => $insertion));
        }
    }
    return $job_id;
}
866              
867              
# Return the rows (hashrefs) of amazon_mws_jobs which are neither
# aborted nor successful for the current shop.
#
# Arguments (all optional, in any order):
#   - plain scalars: job ids; when at least one is given, only the
#     jobs with these amws_job_id are selected;
#   - hashrefs: additional filters for the WHERE clause. The same key
#     cannot be passed twice (it dies). The aborted, success and
#     shop_id conditions are always enforced and cannot be overridden.
# Any other kind of argument is fatal.
#
# The rows are fetched by ascending job_started_epoch and returned
# grouped by task in this order: product_deletion, upload,
# shipping_confirmation, order_ack. For shipping_confirmation the
# order is reversed (last started first); for order_ack only the two
# last started jobs are returned, last started first. Rows with any
# other task are not returned.
sub get_pending_jobs {
    my ($self, @args) = @_;
    my %additional;
    my @named_jobs;
    foreach my $arg (@args) {
        if (!ref($arg)) {
            push @named_jobs, $arg;
        }
        elsif (ref($arg) eq 'HASH') {
            # add the filters
            foreach my $key (keys %$arg) {
                # use exists: a filter with a false value (0, '' or
                # undef, i.e. IS NULL) is a filter as well and must
                # not be silently replaced.
                if (exists $additional{$key}) {
                    die "Attempt to overwrite $key in the additional parameters!\n";
                }
                $additional{$key} = $arg->{$key};
            }
        }
        else {
            die "Argument must be either a scalar with a job name and/or "
              . "an hashref with additional filters!";
        }
    }
    if (@named_jobs) {
        $additional{amws_job_id} = { -in => \@named_jobs };
    }
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_jobs => '*',
                                            {
                                             %additional,
                                             # these come last, so they always win
                                             aborted => 0,
                                             success => 0,
                                             shop_id => $self->_unique_shop_id,
                                            },
                                            { -asc => 'job_started_epoch'});
    my $pending = $self->_exe_query($stmt, @bind);

    # group the rows by task, preserving the fetch order
    my %jobs;
    while (my $row = $pending->fetchrow_hashref) {
        push @{ $jobs{$row->{task}} }, $row;
    }

    my @out;
    foreach my $task (qw/product_deletion upload shipping_confirmation order_ack/) {
        my $list = delete $jobs{$task} or next;
        if ($task eq 'order_ack') {
            # at most two jobs, starting from the last started
            for (1..2) {
                push @out, pop @$list if @$list;
            }
        }
        elsif ($task eq 'shipping_confirmation') {
            # all of them, last started first
            push @out, reverse @$list;
        }
        else {
            push @out, @$list;
        }
    }
    return @out;
}
929              
# Loop over the pending jobs (the arguments are passed verbatim to
# get_pending_jobs) and try to move each of them forward:
#   - a job whose feed directory is missing is cancelled;
#   - a job for which job_timed_out returns a true value is cancelled;
#   - any other job is handed over to process_feeds.
sub resume {
    my ($self, @args) = @_;
    for my $job ($self->get_pending_jobs(@args)) {
        my ($task, $job_id) = @$job{qw/task amws_job_id/};
        print "Working on $job_id\n";

        # without the job directory there are no feeds to upload
        unless (-d $self->_feed_job_dir($job_id)) {
            warn "No directory " . $self->_feed_job_dir($job_id) .
              " found, removing job id $job_id\n";
            $self->cancel_job($task, $job_id,
                              "Job canceled due to missing feed directory");
            next;
        }

        if (my $elapsed = $self->job_timed_out($job)) {
            $self->_print_or_warn_error("Timeout reached for $job_id, aborting: "
                                        . Dumper($job));
            $self->cancel_job($task, $job_id,
                              "Job timed out after $elapsed seconds");
            next;
        }

        $self->process_feeds($job);
    }
}
953              
954             =head2 cancel_job($task, $job_id, $reason)
955              
956             Abort the job setting the aborted flag in C<amazon_mws_jobs> table.
957              
958             =cut
959              
# Set the aborted flag on the job row, storing $reason in its status
# column, then revert the status of the products bound to the job:
# 'ok' for a product_deletion job, 'redo' for an upload job. For any
# other task the products are left alone.
sub cancel_job {
    my ($self, $task, $job_id, $reason) = @_;

    my %job_fields = (
        aborted => 1,
        status  => $reason,
    );
    my %job_where = (
        amws_job_id => $job_id,
        shop_id     => $self->_unique_shop_id,
    );
    $self->_exe_query(
        $self->sqla->update('amazon_mws_jobs', \%job_fields, \%job_where));

    # and revert the products' status
    my $status
      = $task eq 'product_deletion' ? 'ok'      # let's pretend we were deleting good products
      : $task eq 'upload'           ? 'redo'
      :                               undef;

    if ($status) {
        print "Updating product to $status for products with job id $job_id\n";
        my %product_where = (
            amws_job_id => $job_id,
            shop_id     => $self->_unique_shop_id,
        );
        $self->_exe_query(
            $self->sqla->update('amazon_mws_products',
                                { status => $status },
                                \%product_where));
    }
}
991              
992              
993              
994             =head2 process_feeds(\%job_row)
995              
996             Given the hashref with the db row of the job, check at which point it
997             is and resume.
998              
999             =cut
1000              
# Push forward the job described by $row (a hashref with at least the
# amws_job_id and task keys).
#
# First, the feeds of the job which are neither aborted nor successful
# are passed one by one, ordered by amws_feed_pk, to upload_feed; the
# loop stops at the first feed for which upload_feed returns false.
#
# Then all the feeds of the job are read again and counted:
#   - if any is aborted, the job is flagged as aborted ('Feed error');
#   - else, if all of them are successful, the job is flagged as
#     successful and, for an 'upload' task, its products are marked as
#     'ok' and listed;
#   - otherwise the job row is left untouched.
sub process_feeds {
    my ($self, $row) = @_;
    # upload the feeds one by one and stop if something is blocking
    my $job_id = $row->{amws_job_id};
    print "Processing job $job_id\n";

    # query the feeds table for this job
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
                                            {
                                             amws_job_id => $job_id,
                                             aborted => 0,
                                             success => 0,
                                             shop_id => $self->_unique_shop_id,
                                            },
                                            ['amws_feed_pk']);

    my $sth = $self->_exe_query($stmt, @bind);
    while (my $feed = $sth->fetchrow_hashref) {
        last unless $self->upload_feed($feed);
    }
    # we could have left the loop before exhausting the rows
    $sth->finish;

    ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
                                         {
                                          shop_id => $self->_unique_shop_id,
                                          amws_job_id => $job_id,
                                         });

    $sth = $self->_exe_query($stmt, @bind);

    my ($total, $success, $aborted) = (0, 0, 0);

    # query again and check if we have aborted jobs;
    while (my $feed = $sth->fetchrow_hashref) {
        $total++;
        $success++ if $feed->{success};
        $aborted++ if $feed->{aborted};
    }

    # columns to set on the job row, if its state changed
    my $update;
    if ($aborted) {
        # a feed was aborted
        $update = {
                   aborted => 1,
                   status => 'Feed error',
                  };
        $self->_print_or_warn_error("Job $job_id aborted!\n");
    }
    elsif ($success == $total) {
        $update = { success => 1 };
        print "Job successful!\n";
        # if we're here, all the products are fine, so mark them as
        # such if it's an upload job
        if ($row->{task} eq 'upload') {
            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                  { status => 'ok',
                                                    listed_date => DateTime->now,
                                                    listed => 1,
                                                  },
                                                  {
                                                   amws_job_id => $job_id,
                                                   shop_id => $self->_unique_shop_id,
                                                  }));
        }
    }
    else {
        print "Job still to be processed\n";
    }
    if ($update) {
        $self->_exe_query($self->sqla->update(amazon_mws_jobs => $update,
                                              {
                                               amws_job_id => $job_id,
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
}
1079              
1080             =head2 upload_feed(\%feed_record)
1081              
1082             Routine to upload the feed. Return true if it's complete, false
1083             otherwise.
1084              
1085             =cut
1086              
# Move a single feed forward. $record is a hashref with the row of the
# amazon_mws_feeds table; the keys read here are amws_job_id,
# feed_name, feed_id, feed_file, amws_feed_pk, processing_complete,
# aborted and success.
#
# Steps:
#   1. without a feed_id the feed file is submitted and the returned
#      FeedSubmissionId is stored in the row;
#   2. unless the row says the processing is complete, the remote
#      status is checked; if still in progress, return nothing;
#   3. unless the row is already flagged as aborted or successful, the
#      submission result is fetched and the outcome is recorded in the
#      feeds, orders and products tables.
#
# Returns true if the feed is successfully processed, false otherwise.
# Dies on an unknown feed_name, when the submission gets no response
# (nothing is recorded in this case, because it could be throttling or
# a network failure) or when the response carries no submission id.
sub upload_feed {
    my ($self, $record) = @_;
    my $job_id = $record->{amws_job_id};
    my $type   = $record->{feed_name};
    my $feed_id = $record->{feed_id};
    print "Checking $type feed for $job_id\n";
    # http://docs.developer.amazonservices.com/en_US/feeds/Feeds_FeedType.html

    my %names = (
                 product => '_POST_PRODUCT_DATA_',
                 inventory => '_POST_INVENTORY_AVAILABILITY_DATA_',
                 price => '_POST_PRODUCT_PRICING_DATA_',
                 image => '_POST_PRODUCT_IMAGE_DATA_',
                 variants => '_POST_PRODUCT_RELATIONSHIP_DATA_',
                 order_ack => '_POST_ORDER_ACKNOWLEDGEMENT_DATA_',
                 shipping_confirmation => '_POST_ORDER_FULFILLMENT_DATA_',
                );

    die "Unrecognized type $type" unless $names{$type};

    # no feed id, it's a new batch
    if (!$feed_id) {
        print "No feed id found, doing a request for $job_id $type\n";
        my $feed_content = $self->_slurp_file($record->{feed_file});
        my $res;
        try {
            $res = $self->client
              ->SubmitFeed(content_type => 'text/xml; charset=utf-8',
                           FeedType => $names{$type},
                           FeedContent => $feed_content,
                           MarketplaceIdList => [$self->marketplace_id],
                          );
        }
        catch {
            warn "Failure to submit $type feed: \n";
            # call ->can only on objects: on a plain reference it
            # would die inside the catch block hiding the real error
            if (blessed($_) && $_->can('xml')) {
                warn $_->xml;
            }
            elsif (ref($_)) {
                warn Dumper($_);
            }
            else {
                warn $_;
            }
        };
        # do not register the failure on die, because in this case (no
        # response) there could be throttling, or network failure
        die "No response submitting the $type feed for job $job_id\n"
          unless $res;

        # update the feed_id row storing it and updating.
        if ($feed_id = $record->{feed_id} = $res->{FeedSubmissionId}) {
            $self->_exe_query($self->sqla
                              ->update(amazon_mws_feeds => $record,
                                       {
                                        amws_feed_pk => $record->{amws_feed_pk},
                                        shop_id => $self->_unique_shop_id,
                                       }));
        }
        else {
            # something is really wrong here, we have to die
            die "Couldn't get a submission id, response is " . Dumper($res);
        }
    }
    print "Feed is $feed_id\n";

    if (!$record->{processing_complete}) {
        if ($self->_check_processing_complete($feed_id, $type)) {
            # update the record and set the flag to true
            $self->_exe_query($self->sqla
                              ->update('amazon_mws_feeds',
                                       { processing_complete => 1 },
                                       {
                                        feed_id => $feed_id,
                                        shop_id => $self->_unique_shop_id,
                                       }));
        }
        else {
            print "Still processing\n";
            return;
        }
    }

    # check if we didn't already process it: the result is needed only
    # when the feed is neither aborted nor successful. (This used to
    # be an "or", which is always true and defeated the check.)
    if (!$record->{aborted} && !$record->{success}) {
        # we need a class to parse the result.
        my $result = $self->submission_result($feed_id);
        if ($result->is_success) {
            $self->_exe_query($self->sqla
                              ->update('amazon_mws_feeds',
                                       { success => 1 },
                                       {
                                        feed_id => $feed_id,
                                        shop_id => $self->_unique_shop_id,
                                       }));
            # if we have a success, print the warnings on the stderr.
            # if we have a failure, the warnings will just confuse us.

            if ($type eq 'order_ack') {
                # flip the confirmation bit
                $self->_exe_query($self->sqla->update(amazon_mws_orders => { confirmed => 1 },
                                                      { amws_job_id => $job_id,
                                                        shop_id => $self->_unique_shop_id }));
            }
            elsif ($type eq 'shipping_confirmation') {
                $self->_exe_query($self->sqla->update(amazon_mws_orders => { shipping_confirmation_ok => 1 },
                                                      { shipping_confirmation_job_id => $job_id,
                                                        shop_id => $self->_unique_shop_id }));
            }
            if (my $warn = $result->warnings) {
                if (my $warns = $result->skus_warnings) {
                    foreach my $w (@$warns) {
                        $self->_error_logger(warning => $w->{code},
                                             "$w->{sku}: $w->{error}");
                        # and register it in the db
                        if ($w->{sku} && $w->{error}) {
                            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                                  { warnings => "$job_id $w->{code} $w->{error}" },
                                                                  {
                                                                   sku => $w->{sku},
                                                                   shop_id => $self->_unique_shop_id,
                                                                  }));
                        }
                    }
                }
                else {
                    warn "$warn\n";
                }
            }
            return 1;
        }
        else {
            foreach my $err ($result->report_errors) {
                $self->_error_logger($err->{type},
                                     $err->{code},
                                     $err->{message});
            }
            $self->_exe_query($self->sqla
                              ->update('amazon_mws_feeds',
                                       {
                                        aborted => 1,
                                        errors => $result->errors,
                                       },
                                       {
                                        feed_id => $feed_id,
                                        shop_id => $self->_unique_shop_id,
                                       }));
            $self->register_errors($job_id, $result);

            if ($type eq 'order_ack') {
                $self->register_order_ack_errors($job_id, $result);
            }
            elsif ($type eq 'shipping_confirmation') {
                $self->register_ship_order_errors($job_id, $result);
            }

            # and we stop this job, has errors
            return 0;
        }
    }
    # already processed: report what the row says
    return $record->{success};
}
1251              
# Prepare $stmt on the object's database handle and execute it with
# the @bind values. When the DEBUG constant is true, the statement and
# the bind values are printed before the execution.
#
# Returns the executed statement handle. If the execution throws an
# exception, dies with a message carrying the statement, the bind
# values and the original error.
sub _exe_query {
    my ($self, $stmt, @bind) = @_;
    my $sth = $self->dbh->prepare($stmt);
    print $stmt, Dumper(\@bind) if DEBUG;
    # rely on the value of the eval block and not on the truth of $@
    # alone, which could be reset between the failure and the check
    my $executed = eval {
        $sth->execute(@bind);
        1;
    };
    unless ($executed) {
        my $error = $@ || 'unknown error';
        # report the underlying error too, otherwise there is no way
        # to tell why the statement failed
        die "Failed to execute $stmt with params" . Dumper(\@bind)
          . "Error: $error";
    }
    return $sth;
}
1264              
# Ask for the feed submission list and look for $feed_id in it ($type
# is used only for the diagnostics).
#
# Returns 1 when the feed is listed with the status '_DONE_', nothing
# when it has another status, when it is not in the list, or when the
# response has no FeedSubmissionInfo. Dies if the request yields no
# response.
sub _check_processing_complete {
    my ($self, $feed_id, $type) = @_;
    my $res;
    try {
        $res = $self->client->GetFeedSubmissionList;
    }
    catch {
        my $failure = $_;
        my $details = (ref($failure) && $failure->can('xml'))
          ? "checking processing complete error for $type $feed_id: " . $failure->xml
          : "checking processing complete for $type $feed_id: " . Dumper($failure);
        warn $details;
    };
    die unless $res;
    print "Checking if the processing is complete for $type $feed_id\n";

    my $submissions = $res->{FeedSubmissionInfo};
    unless ($submissions) {
        warn "No FeedSubmissionInfo found for $type $feed_id:" . Dumper($res);
        return;
    }

    # pick the first entry with our id
    my $match;
    for my $candidate (@$submissions) {
        next unless $candidate->{FeedSubmissionId} eq $feed_id;
        $match = $candidate;
        last;
    }

    unless ($match) {
        # there is a remote possibility that in it in another
        # page, but it should be very unlikely, as we block the
        # process when the first one is not complete
        print "$feed_id not found in submission list\n";
        return;
    }

    # check the result
    return 1 if $match->{FeedProcessingStatus} eq '_DONE_';

    print "Feed $type $feed_id still $match->{FeedProcessingStatus}\n";
    return;
}
1311              
1312             =head2 submission_result($feed_id)
1313              
1314             Return a L<Amazon::MWS::XML::Response::FeedSubmissionResult> object
1315             for the given feed ID.
1316              
1317             =cut
1318              
# Fetch the processing report of the feed $feed_id and wrap it in an
# Amazon::MWS::XML::Response::FeedSubmissionResult object, built with
# the xml got from the client and the object's xml_reader.
#
# A failing request is reported with a warning; the method then dies
# because there is no xml to parse.
sub submission_result {
    my ($self, $feed_id) = @_;
    my $xml;
    try {
        $xml = $self->client->GetFeedSubmissionResult(FeedSubmissionId => $feed_id);
    }
    catch {
        my $failure = $_;
        my $details = (ref($failure) && $failure->can('xml'))
          ? $failure->xml
          : Dumper($failure);
        warn "submission result error: " . $details;
    };
    die unless $xml;

    my %args = (
        xml        => $xml,
        xml_reader => $self->xml_reader,
    );
    return Amazon::MWS::XML::Response::FeedSubmissionResult->new(%args);
}
1341              
1342             =head2 get_orders($from_date)
1343              
1344             This is a self-contained method and doesn't require a product list.
1345             The from_date must be a L<DateTime> object. If not provided, it defaults
1346             to C<order_days_range> days before the current time.
1347              
1348             Returns a list of Amazon::MWS::XML::Order objects.
1349              
1350             Beware that it's possible you get some items with 0 quantity, i.e.
1351             single items cancelled. The application code needs to be prepared to
1352             deal with such phantom items. You can check each order looping over
1353             C<< $order->items >> checking for C<< $item->quantity >>.
1354              
1355             =cut
1356              
# Retrieve the orders created after $from_date (when false, now minus
# order_days_range days is used), following the NextToken of the
# responses until there are no more pages.
#
# Returns a list of Amazon::MWS::XML::Order objects. Each of them gets
# a closure (retrieve_orderline_sub) which fetches the items of the
# order, following the NextToken as well, and returns them as an
# arrayref. Any failing request is fatal.
sub get_orders {
    my ($self, $from_date) = @_;
    if (!$from_date) {
        $from_date = DateTime->now;
        $from_date->subtract(days => $self->order_days_range);
    }

    # first page
    my @raw_orders;
    my $page;
    try {
        $page = $self->client->ListOrders(
                                          MarketplaceId => [$self->marketplace_id],
                                          CreatedAfter => $from_date,
                                         );
        push @raw_orders, @{ $page->{Orders}->{Order} };
    }
    catch {
        die Dumper($_);
    };

    # and the following ones, if any
    while (my $token = $page->{NextToken}) {
        try {
            $page = $self->client->ListOrdersByNextToken(NextToken => $token);
            push @raw_orders, @{ $page->{Orders}->{Order} };
        }
        catch {
            die Dumper($_);
        };
    }

    my @orders = map {
        my $raw = $_;
        my $amazon_id = $raw->{AmazonOrderId};
        die "Missing amazon AmazonOrderId?!" unless $amazon_id;

        # the items are fetched when the order object calls this sub
        my $fetch_items = sub {
            my ($chunk, @found);
            try {
                $chunk = $self->client->ListOrderItems(AmazonOrderId => $amazon_id);
                push @found, @{ $chunk->{OrderItems}->{OrderItem} };
            }
            catch {
                my $failure = $_;
                die Dumper($failure)
                  unless blessed($failure)
                  && $failure->isa('Amazon::MWS::Exception::Throttled');
                die "Request is throttled. Consider to adjust order_days_range as documented at https://metacpan.org/pod/Amazon::MWS::Uploader#ACCESSORS";
            };
            while (my $more = $chunk->{NextToken}) {
                try {
                    $chunk = $self->client->ListOrderItemsByNextToken(NextToken => $more);
                    push @found, @{ $chunk->{OrderItems}->{OrderItem} };
                }
                catch {
                    die Dumper($_);
                };
            }
            return \@found;
        };

        Amazon::MWS::XML::Order->new(order => $raw,
                                     retrieve_orderline_sub => $fetch_items);
    } @raw_orders;

    return @orders;
}
1426              
1427             =head2 order_already_registered($order)
1428              
1429             Check in the amazon_mws_orders table if we already registered this
1430             order.
1431              
1432             Return the row for this table (as a hashref) if present, nothing
1433             otherwise.
1434              
1435             =cut
1436              
# Look up $order (an object providing amazon_order_number) in the
# amazon_mws_orders table for the current shop.
#
# Returns the first matching row as a hashref, or nothing when there
# is no such row. Dies if no order is passed.
sub order_already_registered {
    my ($self, $order) = @_;
    die "Bad usage, missing order" unless $order;

    my %where = (
        amazon_order_id => $order->amazon_order_number,
        shop_id         => $self->_unique_shop_id,
    );
    my $sth = $self->_exe_query(
        $self->sqla->select(amazon_mws_orders => '*', \%where));

    my $row = $sth->fetchrow_hashref
      or return;
    # we need only the first row
    $sth->finish;
    return $row;
}
1453              
1454             =head2 acknowledge_successful_order(@orders)
1455              
1456             Accept a list of L<Amazon::MWS::XML::Order> objects, prepare an
1457             acknowledgement feed with the C<Success> status, and insert the orders in
1458             the database.
1459              
1460             =cut
1461              
# Acknowledge with the Success status the given orders which are not
# in the amazon_mws_orders table yet.
#
# The orders already in the table are skipped: with a notice on the
# standard output if confirmed, with a warning carrying the dump of
# the row otherwise. If nothing is left, return. For the others a
# single order_ack job is prepared and a row binding the amazon order
# id, the shop order id and the job id is inserted for each order.
sub acknowledge_successful_order {
    my ($self, @orders) = @_;

    my @fresh;
    for my $candidate (@orders) {
        my $known = $self->order_already_registered($candidate);
        unless ($known) {
            push @fresh, $candidate;
            next;
        }
        if ($known->{confirmed}) {
            print "Skipping already confirmed order $known->{amazon_order_id} => $known->{shop_order_id}\n";
        }
        else {
            # it's not complete, so print out diagnostics
            warn "Order $known->{amazon_order_id} uncompletely registered with id $known->{shop_order_id}, please indagate why (skipping)\n" . Dumper($known);
        }
    }
    return unless @fresh;

    # here we have only one feed to upload and check
    my %ack_feed = (
        name    => 'order_ack',
        content => $self->acknowledge_feed(Success => @fresh),
    );
    my $job_id = $self->prepare_feeds(order_ack => [ \%ack_feed ]);

    # store the pairing amazon order id / shop order id in our table
    for my $order (@fresh) {
        my $pairing = {
            shop_id         => $self->_unique_shop_id,
            amazon_order_id => $order->amazon_order_number,
            # this will die if we try to insert an undef order_number
            shop_order_id   => $order->order_number,
            amws_job_id     => $job_id,
        };
        $self->_exe_query($self->sqla->insert(amazon_mws_orders => $pairing));
    }
}
1499              
1500              
1501             =head2 acknowledge_feed($status, @orders)
1502              
1503             The first argument is usually C<Success>. The remaining arguments are a
1504             list of L<Amazon::MWS::XML::Order> objects.
1505              
1506             =cut
1507              
1508              
# Build an OrderAcknowledgement feed via the generic feeder: one
# message for each order, numbered from 1, carrying the structure
# returned by the order's as_ack_order_hashref with its StatusCode
# set to $status.
#
# Returns what create_feed returns. Dies without a status or without
# orders.
sub acknowledge_feed {
    my ($self, $status, @orders) = @_;
    die "Missing status" unless $status;
    die "Missing orders" unless @orders;

    my $feeder = $self->generic_feeder;

    my $message_id = 0;
    my @messages = map {
        my $ack = $_->as_ack_order_hashref;
        $ack->{StatusCode} = $status;
        +{
            MessageID            => ++$message_id,
            OrderAcknowledgement => $ack,
        }
    } @orders;

    return $feeder->create_feed(OrderAcknowledgement => \@messages);
}
1528              
1529             =head2 delete_skus(@skus)
1530              
1531             Accept a list of skus. Prepare a C<product_deletion> feed and update
1532             the database.
1533              
1534             =cut
1535              
# Remove the given skus from Amazon: prepare a product_deletion job
# with a single 'product' feed and flag the skus as 'deleted' in the
# amazon_mws_products table, binding them to the new job.
#
# The skus whose local status is 'pending' or 'deleted' are skipped.
# The skus without a row (or without a true status) in our table are
# deleted anyway, with a warning. Returns nothing if there is nothing
# to delete.
sub delete_skus {
    my ($self, @skus) = @_;
    return unless @skus;
    print "Trying to purge missing items " . join(" ", @skus) . "\n";

    # delete only products which are not in pending status
    my %lookup = (
        sku     => { -in => \@skus },
        shop_id => $self->_unique_shop_id,
    );
    my $sth = $self->_exe_query(
        $self->sqla->select('amazon_mws_products', [qw/sku status/], \%lookup));

    # what we know about these skus
    my %status_of;
    while (my $row = $sth->fetchrow_hashref) {
        $status_of{$row->{sku}} = $row->{status};
    }

    my @purgeable;
    for my $sku (@skus) {
        my $status = $status_of{$sku};
        if (!$status) {
            warn "$sku not found in amazon_mws_products, deleting anyway\n";
        }
        elsif ($status eq 'pending' or $status eq 'deleted') {
            print "Skipping $sku deletion, in status $status\n";
            next;
        }
        push @purgeable, $sku;
    }

    unless (@purgeable) {
        print "Not purging anything\n";
        return;
    }
    print "Actually purging items " . join(" ", @purgeable) . "\n";

    my %deletion_feed = (
        name    => 'product',
        content => $self->delete_skus_feed(@purgeable),
    );
    my $job_id = $self->prepare_feeds(product_deletion => [ \%deletion_feed ]);

    # delete the skus locally
    my %fields = (
        status      => 'deleted',
        amws_job_id => $job_id,
    );
    my %where = (
        sku     => { -in => \@purgeable },
        shop_id => $self->_unique_shop_id,
    );
    $self->_exe_query(
        $self->sqla->update('amazon_mws_products', \%fields, \%where));
}
1592              
1593             =head2 delete_skus_feed(@skus)
1594              
1595             Prepare a feed (via C<create_feed>) to delete the given skus.
1596              
1597             =cut
1598              
# Build a Product feed via the generic feeder with a 'Delete' message
# for each given sku, numbered from 1.
#
# Returns what create_feed returns, or nothing without skus.
sub delete_skus_feed {
    my ($self, @skus) = @_;
    return unless @skus;

    my $feeder = $self->generic_feeder;

    my $message_id = 0;
    my @messages = map {
        +{
            MessageID     => ++$message_id,
            OperationType => 'Delete',
            Product       => { SKU => $_ },
        }
    } @skus;

    return $feeder->create_feed(Product => \@messages);
}
1616              
# Store the errors of an order acknowledgement job in the
# amazon_mws_orders rows belonging to $job_id, then look up the order
# attached to the job and refresh its status via
# update_amw_order_status.
#
# $result must provide report_errors (a list of hashrefs with the keys
# type, message and code) and errors (used as message when
# report_errors is empty).
sub register_order_ack_errors {
    my ($self, $job_id, $result) = @_;
    my @errors = $result->report_errors;

    # Render a single error as "<type> <message> <code>".
    my $describe = sub {
        my ($error) = @_;
        return $error->{type} . ' ' . $error->{message} . ' ' . $error->{code};
    };

    my %update;
    if (@errors > 1) {
        # we hope to have just one error, but in case there are more:
        # the error_code field is an integer, so pick just the first
        # code available, and store all the messages, one per line.
        my ($first_with_code) = grep { $_->{code} } @errors;
        my $error_code = AMW_ORDER_WILDCARD_ERROR;
        if ($first_with_code) {
            $error_code = $first_with_code->{code};
        }
        %update = (
            # double quotes: we want a real newline, not a literal \n
            error_msg  => join("\n", map { $describe->($_) } @errors),
            error_code => $error_code,
        );
    }
    elsif (@errors) {
        my $error = $errors[0];
        %update = (
            error_msg  => $describe->($error),
            error_code => $error->{code},
        );
    }
    else {
        # no structured errors: fall back to the raw errors of the result
        %update = (
            error_msg  => $result->errors,
            error_code => AMW_ORDER_WILDCARD_ERROR,
        );
    }

    # every branch above fills %update, so the update is unconditional
    $self->_exe_query($self->sqla->update('amazon_mws_orders',
                                          \%update,
                                          {
                                           amws_job_id => $job_id,
                                           shop_id => $self->_unique_shop_id,
                                          }));

    # then get the amazon order number and recheck
    my $sth = $self->_exe_query($self->sqla->select('amazon_mws_orders',
                                                    [qw/amazon_order_id
                                                        shop_order_id
                                                       /],
                                                    {
                                                     amws_job_id => $job_id,
                                                     shop_id => $self->_unique_shop_id,
                                                    }));
    my ($amw_order_number, $shop_order_id) = $sth->fetchrow_array;
    if ($sth->fetchrow_array) {
        warn "Multiple jobs found for $job_id in amazon_mws_orders!";
    }
    $sth->finish;
    if (my $status = $self->update_amw_order_status($amw_order_number)) {
        warn "$amw_order_number ($shop_order_id) has now status $status!\n";
    }
}
1675              
# Record the outcome of a shipping confirmation job.
#
# Every order attached to $job_id (see orders_in_shipping_job) gets
# either shipping_confirmation_error set to "<code> <error>", when
# $result->orders_errors reports a fault for that amazon order id, or
# shipping_confirmation_ok set to 1 otherwise.
sub register_ship_order_errors {
    my ($self, $job_id, $result) = @_;
    # here we get the amazon ids
    my @orders = $self->orders_in_shipping_job($job_id);
    my $errors = $result->orders_errors;

    # index the faults by order id, ignoring those without one; if an
    # order id is reported more than once, the last report wins.
    my @errors_with_order = grep { $_->{order_id} } @$errors;
    my %fault_for;
    foreach my $reported (@errors_with_order) {
        $fault_for{ $reported->{order_id} } = $reported;
    }

    foreach my $order_id (@orders) {
        my $fault = $fault_for{$order_id};
        my %outcome = $fault
          ? (shipping_confirmation_error => "$fault->{code} $fault->{error}")
          : (shipping_confirmation_ok => 1);
        $self->_exe_query($self->sqla->update('amazon_mws_orders',
                                              \%outcome,
                                              {
                                               amazon_order_id => $order_id,
                                               shipping_confirmation_job_id => $job_id,
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
}
1711              
1712              
1713             =head2 register_errors($job_id, $result)
1714              
1715             The first argument is the job ID. The second is a
1716             L<Amazon::MWS::XML::Response::FeedSubmissionResult> object.
1717              
1718             This method will update the status of the products (either C<failed>
1719             or C<redo>) in C<amazon_mws_products>.
1720              
1721             =head2 register_order_ack_errors($job_id, $result);
1722              
1723             Same arguments as above, but for order acknowledgements.
1724              
1725             =head2 register_ship_order_errors($job_id, $result);
1726              
1727             Same arguments as above, but for shipping notifications.
1728              
1729             =cut
1730              
# Update amazon_mws_products after a failed product job.
#
# The SKUs scheduled for $job_id which appear in $result->skus_errors
# are marked as 'failed' (with error code and message), all the others
# are marked as 'redo' so they are picked up again later.
sub register_errors {
    my ($self, $job_id, $result) = @_;
    # We don't have a products hashref anymore, so ask the database
    # which skus were scheduled for this job. Parsing back the produced
    # xml looks like an overkill.
    my @products = $self->skus_in_job($job_id);
    my $errors = $result->skus_errors;

    # index the reported errors by sku, ignoring those without one
    my @errors_with_sku = grep { $_->{sku} } @$errors;
    my %failure_for;
    foreach my $reported (@errors_with_sku) {
        $failure_for{ $reported->{sku} } = $reported;
    }

    foreach my $sku (@products) {
        my $failure = $failure_for{$sku};
        unless ($failure) {
            # this is good, mark it to be redone
            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                  { status => 'redo' },
                                                  {
                                                   sku => $sku,
                                                   shop_id => $self->_unique_shop_id,
                                                  }));
            print "Scheduling $sku for redoing\n";
            next;
        }
        $self->_exe_query($self->sqla->update('amazon_mws_products',
                                              {
                                               status => 'failed',
                                               error_code => $failure->{code},
                                               error_msg => "$job_id $failure->{code} $failure->{error}",
                                              },
                                              {
                                               sku => $sku,
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
}
1770              
1771             =head2 skus_in_job($job_id)
1772              
1773             Check the amazon_mws_product for the SKU which were uploaded by the
1774             given job ID.
1775              
1776             =cut
1777              
# Return the list of SKUs stored in amazon_mws_products for the given
# job id and the current shop.
sub skus_in_job {
    my ($self, $job_id) = @_;
    my ($stmt, @bind) = $self->sqla->select('amazon_mws_products',
                                            ['sku'],
                                            {
                                             amws_job_id => $job_id,
                                             shop_id => $self->_unique_shop_id,
                                            });
    my $sth = $self->_exe_query($stmt, @bind);
    my @found;
    while (my $record = $sth->fetchrow_hashref) {
        push @found, $record->{sku};
    }
    return @found;
}
1792              
1793             =head2 get_asin_for_eans(@eans)
1794              
1795             Accept a list of EANs and return a hashref where the keys are the
1796             EANs passed as arguments, and the values are the ASINs for the current
1797             marketplace. Max EANs: 5.
1798              
1799             http://docs.developer.amazonservices.com/en_US/products/Products_GetMatchingProductForId.html
1800              
1801             =head2 get_asin_for_skus(@skus)
1802              
1803             Same as above (with the same limit of 5 items), but for SKUs.
1804              
1805             =head2 get_asin_for_sku($sku)
1806              
1807             Same as above, but for a single sku. Return the ASIN or undef if not
1808             found.
1809              
1810             =head2 get_asin_for_ean($ean)
1811              
1812             Same as above, but for a single ean. Return the ASIN or undef if not
1813             found.
1814              
1815             =cut
1816              
# Look up the ASINs for the given seller SKUs: thin wrapper around
# _get_asin_for_type with the SellerSKU id type.
sub get_asin_for_skus {
    my $self = shift;
    my @skus = @_;
    return $self->_get_asin_for_type('SellerSKU', @skus);
}
1821              
# Look up the ASINs for the given EANs: thin wrapper around
# _get_asin_for_type with the EAN id type.
sub get_asin_for_eans {
    my $self = shift;
    my @eans = @_;
    return $self->_get_asin_for_type('EAN', @eans);
}
1826              
# Call GetMatchingProductForId for up to 5 identifiers of type $type
# and return a hashref mapping each returned Id to its marketplace
# ASIN. Dies when more than 5 identifiers are passed, or (with a dump
# of the exception) when the API call fails.
sub _get_asin_for_type {
    my ($self, $type, @list) = @_;
    die "Max 5 products to get the asin for $type!" if @list > 5;
    my $client = $self->client;
    my $res;
    try {
        $res = $client->GetMatchingProductForId(
            IdType        => $type,
            IdList        => \@list,
            MarketplaceId => $self->marketplace_id,
        );
    }
    catch { die Dumper($_) };

    my %asin_for;
    return \%asin_for unless $res && @$res;
    foreach my $match (@$res) {
        my $identifiers = $match->{Products}->{Product}->{Identifiers};
        $asin_for{ $match->{Id} } = $identifiers->{MarketplaceASIN}->{ASIN};
    }
    return \%asin_for;
}
1846              
# Return the ASIN for a single EAN, or nothing if get_asin_for_eans
# has no (true) value for it.
sub get_asin_for_ean {
    my ($self, $ean) = @_;
    my $asin_for = $self->get_asin_for_eans($ean);
    return unless $asin_for && $asin_for->{$ean};
    return $asin_for->{$ean};
}
1857              
# Return the ASIN for a single SKU, or nothing if get_asin_for_skus
# has no (true) value for it.
sub get_asin_for_sku {
    my ($self, $sku) = @_;
    my $asin_for = $self->get_asin_for_skus($sku);
    return unless $asin_for && $asin_for->{$sku};
    return $asin_for->{$sku};
}
1868              
1869              
1870             =head2 get_product_category_data($ean)
1871              
1872             Return the deep data structures returned by
1873             C<GetProductCategoriesForASIN>.
1874              
1875             =head2 get_product_categories($ean)
1876              
1877             Return a list of category codes (the ones passed to
1878             RecommendedBrowseNode) which exists on amazon.
1879              
1880             =cut
1881              
# Resolve the EAN to an ASIN and return the response of
# GetProductCategoriesForASIN for the current marketplace. Return
# nothing when the EAN is missing or no ASIN is found for it.
sub get_product_category_data {
    my ($self, $ean) = @_;
    return unless $ean;
    my $asin = $self->get_asin_for_ean($ean);
    return unless $asin;
    my $categories = $self->client->GetProductCategoriesForASIN(
        ASIN          => $asin,
        MarketplaceId => $self->marketplace_id,
    );
    return $categories;
}
1894              
1895             =head2 get_product_category_names($ean)
1896              
1897             Return a list of arrayrefs with the category paths. Beware that we
1898             strip the first two parents, which heuristically appear meaningless
1899             (Category/Category).
1900              
1901             If this is not the case, please report this as a bug and we'll find a
1902             solution.
1903              
1904             You can call C<get_product_category_data> to inspect the raw response
1905             yourself.
1906              
1907             =cut
1908              
# Return a list of arrayrefs, one for each category of the product
# whose parsed path (see _parse_amz_cat) is not empty. Warn and return
# nothing when get_product_category_data yields no data.
sub get_product_category_names {
    my ($self, $ean) = @_;
    my $res = $self->get_product_category_data($ean);
    unless ($res) {
        warn "ASIN exists but no categories found. Bug?\n";
        return;
    }
    my @category_names;
    foreach my $category (@$res) {
        my @path = $self->_parse_amz_cat($category);
        next unless @path;
        push @category_names, \@path;
    }
    return @category_names;
}
1928              
# Walk a category structure from the given node up through its Parent
# links and return the category names from the root down to the node.
# When the path has at least two elements, the first two are dropped
# (the first two parents are Category/Category).
sub _parse_amz_cat {
    my ($self, $cat) = @_;
    my @path;
    for (my $node = $cat; $node; $node = $node->{Parent}) {
        my $name = $node->{ProductCategoryName};
        # we walk upwards, so prepend to get the root first
        unshift @path, $name if $name;
    }
    splice(@path, 0, 2) if @path >= 2;
    return @path;
}
1946              
1947              
# Return the ProductCategoryId of each category returned by
# get_product_category_data. Warn and return nothing when there is no
# data.
sub get_product_categories {
    my ($self, $ean) = @_;
    my $res = $self->get_product_category_data($ean);
    unless ($res) {
        warn "ASIN exists but no categories found. Bug?\n";
        return;
    }
    my @ids;
    foreach my $category (@$res) {
        push @ids, $category->{ProductCategoryId};
    }
    return @ids;
}
1960              
1961              
1962             # http://docs.developer.amazonservices.com/en_US/products/Products_GetLowestOfferListingsForSKU.html
1963              
1964             =head2 get_lowest_price_for_asin($asin, $condition)
1965              
1966             Return the lowest price for asin, excluding ourselves. The second
1967             argument, condition, is optional and defaults to "New".
1968              
1969             If you need the full details, you have to call
1970             $self->client->GetLowestOfferListingsForASIN yourself and make sense
1971             of the output. This method is mostly a wrapper meant to simplify the
1972             routine.
1973              
1974             If we can't get any info, just return undef.
1975              
1976             Return undef if no prices are found.
1977              
1978             =head2 get_lowest_price_for_ean($ean, $condition)
1979              
1980             Same as above, but use the EAN instead
1981              
1982             =cut
1983              
# Resolve the EAN to an ASIN and delegate to get_lowest_price_for_asin.
# Return nothing when the EAN is missing or has no ASIN.
sub get_lowest_price_for_ean {
    my ($self, $ean, $condition) = @_;
    if ($ean) {
        if (my $asin = $self->get_asin_for_ean($ean)) {
            return $self->get_lowest_price_for_asin($asin, $condition);
        }
    }
    return;
}
1991              
# Return the lowest landed price among the offers returned by
# GetLowestOfferListingsForASIN for $asin (our own offers excluded).
# $condition is optional and defaults to "New".
#
# Dies if $asin is missing or, with a dump of the exception, if the
# API call fails. Returns nothing when no listing is returned, and
# undef when no listing carries a landed price amount.
sub get_lowest_price_for_asin {
    my ($self, $asin, $condition) = @_;
    die "Wrong usage, missing argument asin" unless $asin;
    my $listing;
    try {
        $listing = $self->client->GetLowestOfferListingsForASIN(
            ASINList      => [ $asin ],
            MarketplaceId => $self->marketplace_id,
            ExcludeMe     => 1,
            ItemCondition => $condition || 'New',
        );
    }
    catch { die Dumper($_) };

    return unless $listing && @$listing;
    my $lowest;
    foreach my $item (@$listing) {
        my $current = $item->{Price}->{LandedPrice}->{Amount};
        # An offer without amount must not take part in the comparison:
        # undef would compare as 0 and reset the minimum found so far.
        next unless defined $current;
        # Test definedness rather than truth, so that a price of 0 is
        # a valid minimum as well.
        if (!defined $lowest || $current < $lowest) {
            $lowest = $current;
        }
    }
    return $lowest;
}
2017              
2018             =head2 shipping_confirmation_feed(@shipped_orders)
2019              
2020             Return a feed string with the shipping confirmation. A list of
2021             L<Amazon::MWS::XML::ShippedOrder> object must be passed.
2022              
2023             =cut
2024              
# Build the shipping confirmation feed for the given shipped orders.
#
# Each order must provide as_shipping_confirmation_hashref; the
# messages are numbered from 1 in the order they were passed. Dies
# when called without orders.
sub shipping_confirmation_feed {
    my ($self, @shipped_orders) = @_;
    die "Missing Amazon::MWS::XML::ShippedOrder argument" unless @shipped_orders;
    my $feeder = $self->generic_feeder;
    my $message_id = 0;
    my @messages = map {
        +{
            MessageID        => ++$message_id,
            OrderFulfillment => $_->as_shipping_confirmation_hashref,
        }
    } @shipped_orders;
    return $feeder->create_feed(OrderFulfillment => \@messages);
}
2040              
2041             =head2 send_shipping_confirmation($shipped_orders)
2042              
2043             Schedule the shipped orders (an L<Amazon::MWS::XML::ShippedOrder>
2044             object) for the uploading.
2045              
2046             =head2 order_already_shipped($shipped_order)
2047              
2048             Check if the shipped order (an L<Amazon::MWS::XML::ShippedOrder> object)
2049             was already notified as shipped by looking into our table, returning the
2050             row with the order.
2051              
2052             To see the status, check shipping_confirmation_ok (already done),
2053             shipping_confirmation_error (faulty), shipping_confirmation_job_id (pending).
2054              
2055             =cut
2056              
# Return the amazon_mws_orders row (a hashref) matching the given
# shipped order, or nothing if there is none. Dies if more than one
# row matches.
sub order_already_shipped {
    my ($self, $order) = @_;
    my $condition = $self->_condition_for_shipped_orders($order);
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_orders => '*', $condition);
    my $sth = $self->_exe_query($stmt, @bind);
    my $row = $sth->fetchrow_hashref;
    return unless $row;
    if ($sth->fetchrow_hashref) {
        die "Multiple results found in amazon_mws_orders for " . Dumper($condition);
    }
    return $row;
}
2070              
# Schedule the shipping confirmation for the given shipped orders.
#
# Orders already notified, pending, or previously failed (unless
# reset_all_errors is set) are skipped. The remaining ones are put in
# a single feed, and the resulting job id is stored in their
# amazon_mws_orders rows, clearing any previous error.
#
# Dies if an order has no row in amazon_mws_orders.
sub send_shipping_confirmation {
    my ($self, @orders) = @_;
    my @orders_to_notify;
    foreach my $ord (@orders) {
        my $report = $self->order_already_shipped($ord);
        unless ($report) {
            # there is no row, so the ids must come from the order itself
            my $order_ids = join(' ', grep { defined $_ }
                                 $ord->amazon_order_id, $ord->merchant_order_id);
            die "It looks like you are trying to send a shipping confirmation "
              . " without prior order acknowlegdement. "
              . "At least in the amazon_mws_orders there is no trace of "
              . $order_ids;
        }
        if ($report->{shipping_confirmation_ok}) {
            print "Skipping ship-confirm for order $report->{amazon_order_id} $report->{shop_order_id}: already notified\n";
        }
        elsif (my $error = $report->{shipping_confirmation_error}) {
            if ($self->reset_all_errors) {
                warn "Submitting again previously failed job $report->{amazon_order_id} $report->{shop_order_id}\n";
                push @orders_to_notify, $ord;
            }
            else {
                warn "Skipping ship-confirm for order $report->{amazon_order_id} $report->{shop_order_id} with error $error\n";
            }
        }
        elsif ($report->{shipping_confirmation_job_id}) {
            print "Skipping ship-confirm for order $report->{amazon_order_id} $report->{shop_order_id}: pending\n";
        }
        else {
            push @orders_to_notify, $ord;
        }
    }
    return unless @orders_to_notify;
    my $feed_content = $self->shipping_confirmation_feed(@orders_to_notify);
    # here we have only one feed to upload and check
    my $job_id = $self->prepare_feeds(shipping_confirmation => [{
                                                                 name => 'shipping_confirmation',
                                                                 content => $feed_content,
                                                                }]);
    # and store the job id in the table
    foreach my $ord (@orders_to_notify) {
        $self->_exe_query($self->sqla->update(amazon_mws_orders => {
                                                                    shipping_confirmation_job_id => $job_id,
                                                                    shipping_confirmation_error => undef,
                                                                   },
                                              $self->_condition_for_shipped_orders($ord)));
    }
}
2118              
# Build the WHERE hashref to find the given shipped order in
# amazon_mws_orders: the shop id plus the amazon order id or, if that
# is missing, the merchant order id (stored as shop_order_id). Dies
# when the order or both ids are missing.
sub _condition_for_shipped_orders {
    my ($self, $order) = @_;
    $order or die "Missing order";
    my %where = (shop_id => $self->_unique_shop_id);
    my $amazon_order_id = $order->amazon_order_id;
    if ($amazon_order_id) {
        $where{amazon_order_id} = $amazon_order_id;
        return \%where;
    }
    my $shop_order_id = $order->merchant_order_id;
    die "Missing amazon_order_id or merchant_order_id" unless $shop_order_id;
    $where{shop_order_id} = $shop_order_id;
    return \%where;
}
2134              
2135              
2136             =head2 orders_waiting_for_shipping
2137              
2138             Return a list of hashref with two keys, C<amazon_order_id> and
2139             C<shop_order_id> for each order which is waiting confirmation.
2140              
2141             This is implemented looking into amazon_mws_orders where there is no
2142             shipping confirmation job id.
2143              
2144             The confirmed flag (which means we acknowledged the order) is ignored
2145             to avoid stuck order_ack jobs to prevent the shipping confirmation.
2146              
2147             =cut
2148              
# Return the rows (hashrefs with amazon_order_id and shop_order_id) of
# the orders of the current shop which have no shipping confirmation
# job id yet.
sub orders_waiting_for_shipping {
    my $self = shift;
    my ($stmt, @bind) = $self->sqla->select('amazon_mws_orders',
                                            [ 'amazon_order_id', 'shop_order_id' ],
                                            {
                                             shop_id => $self->_unique_shop_id,
                                             shipping_confirmation_job_id => undef,
                                             # the confirmed flag is deliberately not
                                             # checked: unconfirmed orders must be
                                             # considered as well.
                                            });
    my $sth = $self->_exe_query($stmt, @bind);
    my @waiting;
    while (my $record = $sth->fetchrow_hashref) {
        push @waiting, $record;
    }
    return @waiting;
}
2166              
2167             =head2 product_needs_upload($sku, $timestamp)
2168              
2169             Lookup the product $sku with timestamp $timestamp and return the sku
2170             if the product needs to be uploaded, or nothing if it can be safely
2171             skipped. This method is stateless and doesn't alter anything.
2172              
2173             =cut
2174              
# Decide whether the product $sku with timestamp $timestamp has to be
# uploaded. Return the sku if so, nothing otherwise.
#
# Forced skus and skus without a (true) record in existing_products
# are always uploaded. For the others the decision depends on the
# stored status:
#   ok      - upload only if the timestamp changed
#   redo    - upload
#   failed  - upload only if the error is being reset
#   pending - skip
# Any other status is fatal.
sub product_needs_upload {
    my ($self, $sku, $timestamp) = @_;
    my $debug = $self->debug;
    return unless $sku;

    # if it's forced, we have nothing to check, just pass it.
    my $forced = $self->_force_hashref;
    if ($forced->{$sku}) {
        print "Forcing $sku as requested\n" if $debug;
        return $sku;
    }

    $timestamp ||= 0;
    my $existing = $self->existing_products;
    my $record = exists $existing->{$sku} ? $existing->{$sku} : undef;
    unless ($record) {
        print "$sku wasn't uploaded so far, scheduling it\n" if $debug;
        return $sku;
    }

    my $status = $record->{status} || '';

    if ($status eq 'ok') {
        return if $record->{timestamp_string} eq $timestamp;
        return $sku;
    }

    return $sku if $status eq 'redo';

    if ($status eq 'pending') {
        print "Skipping pending item $sku\n" if $debug;
        return;
    }

    if ($status ne 'failed') {
        die "I shouldn't have reached this point with status <$status>";
    }

    # from here on the product is in failed status
    return $sku if $self->reset_all_errors;

    my $reset = $self->_reset_error_structure;
    unless ($reset) {
        print "Skipping failed item $sku\n" if $debug;
        return;
    }

    # option for this error was passed.
    my $error = $record->{error_code};
    my $match = $reset->{codes}->{$error};
    if ($match xor $reset->{negate}) {
        # the code is listed and not negated, or it is not listed
        # and the list is negated: reset it
        print "Resetting error for $sku with error code $error\n" if $debug;
        return $sku;
    }
    # was passed !this error or !random , so do not reset
    print "Skipping failed item $sku with error code $error\n" if $debug;
    return;
}
2241              
2242             =head2 orders_in_shipping_job($job_id)
2243              
2244             Lookup the C<amazon_mws_orders> table and return a list of
2245             C<amazon_order_id> for the given shipping confirmation job. INTERNAL.
2246              
2247             =cut
2248              
# Return the list of amazon_order_id stored in amazon_mws_orders for
# the given shipping confirmation job and the current shop. Dies when
# the job id is missing.
sub orders_in_shipping_job {
    my ($self, $job_id) = @_;
    $job_id or die;
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_orders => ['amazon_order_id'],
                                            {
                                             shipping_confirmation_job_id => $job_id,
                                             shop_id => $self->_unique_shop_id,
                                            });
    my $sth = $self->_exe_query($stmt, @bind);
    my @order_ids;
    while (my $record = $sth->fetchrow_hashref) {
        push @order_ids, $record->{amazon_order_id};
    }
    return @order_ids;
}
2263              
2264             =head2 put_product_on_error(sku => $sku, timestamp_string => $timestamp, error_code => $error_code, error_msg => $error)
2265              
2266             Register a custom error for the product $sku with error $error and
2267             $timestamp as the timestamp string. The error is optional, and will be
2268             "shop error" if not provided. The error code will be 1 if not provided.
2269              
2270             =cut
2271              
# Register a custom error for a product in amazon_mws_products,
# updating the existing row for the sku or inserting a new one.
#
# Named arguments:
#   sku              - mandatory
#   timestamp_string - mandatory (must be defined)
#   error_msg        - optional, defaults to "shop error"
#   error_code       - optional, defaults to 1
#
# Dies when sku or timestamp_string are missing.
sub put_product_on_error {
    my ($self, %product) = @_;
    die "Missing sku" unless $product{sku};
    die "Missing timestamp" unless defined $product{timestamp_string};

    my %identifier = (
                      shop_id => $self->_unique_shop_id,
                      sku => $product{sku},
                     );
    my %errors = (
                  status => 'failed',
                  error_msg => $product{error_msg} || 'shop error',
                  error_code => $product{error_code} || 1,
                  timestamp_string => $product{timestamp_string},
                 );

    # check if we have it
    my $sth = $self->_exe_query($self->sqla
                                ->select('amazon_mws_products',
                                         [qw/sku/], { %identifier }));
    if ($sth->fetchrow_hashref) {
        $sth->finish;
        # report the message which is actually stored (with the default
        # applied), not the possibly undefined argument
        print "Updating $identifier{sku} with error $errors{error_msg}\n";
        $self->_exe_query($self->sqla->update('amazon_mws_products',
                                              \%errors, \%identifier));
    }
    else {
        print "Inserting $identifier{sku} with error $errors{error_msg}\n";
        $self->_exe_query($self->sqla
                          ->insert('amazon_mws_products',
                                   {
                                    %identifier,
                                    %errors,
                                   }));
    }
}
2309              
2310              
2311             =head2 cancel_feed($feed_id)
2312              
2313             Call the CancelFeedSubmissions API and abort the feed and the
2314             belonging job if found in the list. Return the response, which
2315             probably is not even meaningful. It is a big FeedSubmissionInfo with
2316             the past feed submissions.
2317              
2318             =cut
2319              
# Cancel the feed with the given id.
#
# If the feed is found in our tables as not aborted, not successful
# and not completed, the feed and its job are marked as aborted and
# the products of the job are set to 'redo'. Otherwise a warning is
# emitted. In both cases CancelFeedSubmissions is called and its
# response returned. Dies when the feed id is missing.
sub cancel_feed {
    my ($self, $feed) = @_;
    die "Missing feed id argument" unless $feed;
    # first look for the feed in our pending list
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => ['amws_job_id'],
                                            {
                                             shop_id => $self->_unique_shop_id,
                                             feed_id => $feed,
                                             aborted => 0,
                                             success => 0,
                                             processing_complete => 0,
                                            });
    my $sth = $self->_exe_query($stmt, @bind);
    my $pending = $sth->fetchrow_hashref;
    if (!$pending) {
        warn "No $feed found in pending list, trying to cancel anyway\n";
    }
    else {
        $sth->finish;
        print "Found $feed in pending state\n";
        # abort it on our side
        $self->_exe_query($self->sqla->update('amazon_mws_feeds',
                                              {
                                               aborted => 1,
                                               errors => "Canceled by shop action",
                                              },
                                              {
                                               feed_id => $feed,
                                               shop_id => $self->_unique_shop_id,
                                              }));
        # and abort the job as well
        $self->_exe_query($self->sqla->update('amazon_mws_jobs',
                                              {
                                               aborted => 1,
                                               status => "Job aborted by cancel_feed",
                                              },
                                              {
                                               amws_job_id => $pending->{amws_job_id},
                                               shop_id => $self->_unique_shop_id,
                                              }));
        # and set the belonging products to redo
        $self->_exe_query($self->sqla->update('amazon_mws_products',
                                              {
                                               status => 'redo',
                                              },
                                              {
                                               amws_job_id => $pending->{amws_job_id},
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
    # then do the api call
    return $self->client->CancelFeedSubmissions(IdList => [ $feed ]);
}
2375              
# Emit the message "$error_or_warning: $message ($error_code)"
# according to the mode configured for the error code in
# skus_warnings_modes: 'print' (STDOUT), 'warn' (the default) or
# 'skip' (silently ignored). Any other mode triggers a warning about
# the invalid mode. The code 8008 is hardcoded to 'print' (this is
# stored in the modes hashref itself).
sub _error_logger {
    my ($self, $error_or_warning, $error_code, $message) = @_;
    my $modes = $self->skus_warnings_modes;
    my $out_message = "$error_or_warning: $message ($error_code)\n";
    # hardcode 8008 as print
    $modes->{8008} = 'print';
    my $mode = exists $modes->{$error_code} ? $modes->{$error_code} : 'warn';

    my %handler_for = (
                       print => sub { print $out_message },
                       warn  => sub { warn $out_message },
                       skip  => sub { return },
                      );
    my $handler = $handler_for{$mode}
      || sub { warn "Invalid mode $mode for $out_message" };
    $handler->();
}
2399              
2400             =head2 update_amw_order_status($amazon_order_number)
2401              
2402             Check the order status on Amazon and update the row in the
2403             amazon_mws_orders table.
2404              
2405             =cut
2406              
# Fetch the current status of an order from Amazon and store it in the
# matching amazon_mws_orders row of this shop.
#
# $order is the Amazon order id.  Returns the status reported by Amazon,
# or nothing when no order id is given or Amazon does not return the
# order (a warning is emitted in the latter case and the table is left
# untouched).  Dies if more than one local row matches the order id.
sub update_amw_order_status {
    my ($self, $order) = @_;
    return unless $order;

    my %where = (
        amazon_order_id => $order,
        shop_id => $self->_unique_shop_id,
    );

    # first, look the order up in our own table
    my $sth = $self->_exe_query($self->sqla->select('amazon_mws_orders',
                                                    '*',
                                                    { %where }));
    my $order_ref = $sth->fetchrow_hashref;
    die "Multiple rows found for $order!" if $sth->fetchrow_hashref;
    # diagnostic dump, only when the DEBUG constant of this file is set
    print Dumper($order_ref) if DEBUG();

    my $res = $self->client->GetOrder(AmazonOrderId => [$order]);
    unless ($res && $res->{Orders}->{Order} && @{$res->{Orders}->{Order}}) {
        # without an order structure there is nothing to build the order
        # object from, so stop here instead of passing undef to it
        warn "Order $order not found on amazon!";
        return;
    }
    if (@{$res->{Orders}->{Order}} > 1) {
        warn "Multiple results for order $order: " . Dumper($res);
    }
    # in case of multiple results only the first one is considered
    my $amazon_order = $res->{Orders}->{Order}->[0];
    print Dumper($amazon_order) if DEBUG();

    my $obj = Amazon::MWS::XML::Order->new(order => $amazon_order);
    my $status = $obj->order_status;
    $self->_exe_query($self->sqla->update('amazon_mws_orders',
                                          { status => $status },
                                          { %where }));
    return $status;
}
2442              
2443             =head2 get_products_with_error_code(@error_codes)
2444              
2445             Return a list of hashref with the rows from C<amazon_mws_products> for
2446             the current shop and the error code passed as argument. If no error
2447             codes are passed, fetch all the products in error.
2448              
2449             =head2 get_products_with_warnings
2450              
2451             Returns a list of hashref, with C<sku> and C<warnings> as keys, for
2452             each product in the shop which has the warnings set to something.
2453              
2454             =cut
2455              
# Fetch the rows of amazon_mws_products of the current shop which are not
# marked as deleted and carry an error code.  When @errcodes is given only
# those codes are matched, otherwise any error code greater than zero.
# The rows are returned as a list of hashrefs, ordered by error_code and
# then by sku.
sub get_products_with_error_code {
    my ($self, @errcodes) = @_;
    my $code_filter = @errcodes ? { -in => \@errcodes } : { '>' => 0 };
    my %criteria = (
        status     => { '!=' => 'deleted' },
        shop_id    => $self->_unique_shop_id,
        error_code => $code_filter,
    );
    my @query = $self->sqla->select('amazon_mws_products', '*',
                                    \%criteria,
                                    [qw/error_code sku/]);
    my $sth = $self->_exe_query(@query);
    my (@records, $row);
    push @records, $row while $row = $sth->fetchrow_hashref;
    return @records;
}
2476              
# Fetch the rows of amazon_mws_products of the current shop whose status
# is 'ok' and whose warnings column is not the empty string.  The rows are
# returned as a list of hashrefs, ordered by sku and then by warnings.
sub get_products_with_warnings {
    my ($self) = @_;
    my %criteria = (
        status   => 'ok',
        shop_id  => $self->_unique_shop_id,
        warnings => { '!=' => '' },
    );
    my @query = $self->sqla->select('amazon_mws_products', '*',
                                    \%criteria,
                                    [qw/sku warnings/]);
    my $sth = $self->_exe_query(@query);
    my (@records, $row);
    push @records, $row while $row = $sth->fetchrow_hashref;
    return @records;
}
2493              
2494             =head2 mark_failed_products_as_redo(@skus)
2495              
2496             Alter the status of the failed skus passed as argument from 'failed'
2497             to 'redo' to trigger an update.
2498              
2499             =cut
2500              
# Switch the given skus of the current shop from status 'failed' to
# 'redo'.  Rows in any other status are not matched by the update.
# Does nothing (and returns nothing) when no sku is passed.
sub mark_failed_products_as_redo {
    my ($self, @skus) = @_;
    @skus or return;
    my %new_values = (status => 'redo');
    my %criteria = (
        shop_id => $self->_unique_shop_id,
        status  => 'failed',
        sku     => { -in => \@skus },
    );
    return $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                 \%new_values,
                                                 \%criteria));
}
2514              
2515             =head2 get_products_with_amazon_shop_mismatches(@errors)
2516              
2517             Parse the amazon_mws_products and return a hashref where the keys are
2518             the failed skus, and the values are hashrefs where the keys are the
2519             mismatched fields and the values are hashrefs with these keys:
2520              
2521             Mismatched fields may be: C<part_number>, C<title>, C<manufacturer>,
2522             C<brand>, C<color>, C<size>
2523              
2524             =over 4
2525              
2526             =item shop
2527              
2528             The value on the shop
2529              
2530             =item amazon
2531              
2532             The value of the amazon product
2533              
2534             =item error_code
2535              
2536             The error code
2537              
2538             =back
2539              
2540             E.g.
2541              
2542             my $mismatches = {12344 => {
2543             part_number => {
2544             shop => 'XY',
2545             amazon => 'XYZ',
2546             error_code => '8541',
2547             },
2548             title => {
2549             shop => 'ABC',
2550             amazon => 'DFG',
2551             error_code => '8541',
2552             },
2553             },
2554             .....
2555             };
2556              
2557             Optionally, if error codes are passed as arguments, only those
2558             errors are fetched.
2559              
2560             =cut
2561              
2562              
# Build a hashref keyed by sku with the shop/amazon mismatches parsed out
# of the error message of each failed product.  The optional @errors are
# handed over to get_products_with_error_code to restrict the error codes.
sub get_products_with_amazon_shop_mismatches {
    my ($self, @errors) = @_;
    my %mismatches;
    # only the products in 'failed' status are of interest
    my @failed = grep { $_->{status} eq 'failed' }
      $self->get_products_with_error_code(@errors);
    for my $product (@failed) {
        my $parsed = $self->_parse_error_message_mismatches($product->{error_msg});
        # the parser also reports plain scalars: here we want only the
        # pairs, which come as hashrefs
        my @paired = grep { ref($parsed->{$_}) eq 'HASH' } keys %$parsed;
        for my $field (@paired) {
            my $pair = $parsed->{$field};
            $pair->{error_code} = $product->{error_code};
            $mismatches{ $product->{sku} }{$field} = $pair;
        }
    }
    return \%mismatches;
}
2586              
2587             =head2 get_products_with_mismatches(@errors)
2588              
2589             Similar to C<get_products_with_amazon_shop_mismatches>, but instead
2590             return an arrayref where each element is a hashref with all the info
2591             collapsed.
2592              
2593             The structures reported by C<get_products_with_amazon_shop_mismatches>
2594             are flattened with an C<our_> and C<amazon_> prefix.
2595              
2596             our_part_number => 'XY',
2597             amazon_part_number => 'YZ',
2598             our_title => 'xx',
2599             amazon_title => 'yy',
2600             # etc.
2601              
2602              
2603             =cut
2604              
# Return an arrayref with one hashref per product in error: the product
# row merged with whatever could be parsed out of its error message (on
# key clashes the parsed value wins).  The optional @errors are handed
# over to get_products_with_error_code to restrict the error codes.
sub get_products_with_mismatches {
    my ($self, @errors) = @_;
    my @out = map {
        my $product = $_;
        my $parsed = $self->_parse_error_message_mismatches($product->{error_msg});
        +{ %$product, %$parsed };
    } $self->get_products_with_error_code(@errors);
    return \@out;
}
2616              
# Run every pattern of _mismatch_patterns against the error message and
# return a hashref with the first capture of each matching pattern, keyed
# by the pattern name.  Patterns named shop_<field> or amazon_<field> are
# additionally collected as a pair under <field>, with the keys 'shop' and
# 'amazon'.  An empty or missing message yields an empty hashref.
sub _parse_error_message_mismatches {
    my ($self, $message) = @_;
    return {} unless $message;
    my %regex_for = %{$self->_mismatch_patterns};
    my %out;
    for my $key (keys %regex_for) {
        next unless $message =~ $regex_for{$key};
        # save the capture before the next match clobbers it
        my $value = $1;
        # if the pattern name starts with shop_ or amazon_, it's a pair
        my ($target, $mismatch) = $key =~ /\A(shop|amazon)_(.+)/;
        $out{$mismatch}{$target} = $value if $target && $mismatch;
        # and in any case store a scalar (and let's hope not to conflict)
        $out{$key} = $value;
    }
    return \%out;
}
2640              
2641             =head2 Order Report
2642              
2643             To get this feature working, you need an C<amzn-envelope.xsd> with
2644             OrderReport plugged in. Older versions are broken. Newer schema
2645             versions may be missing the Amazon.xsd file. So either you ask Amazon to
2646             give you a B<full set of xsd, which includes OrderReport in
2647             amzn-envelope.xsd> or you apply this patch to amzn-envelope.xsd:
2648              
2649             --- a/amzn-envelope.xsd 2014-10-27 10:26:19.000000000 +0100
2650             +++ b/amzn-envelope.xsd 2015-03-26 10:56:16.000000000 +0100
2651             @@ -23,2 +23,3 @@
2652             <xsd:include schemaLocation="Price.xsd"/>
2653             + <xsd:include schemaLocation="OrderReport.xsd"/>
2654             <xsd:include schemaLocation="ProcessingReport.xsd"/>
2655             @@ -41,2 +42,3 @@
2656             <xsd:enumeration value="OrderFulfillment"/>
2657             + <xsd:enumeration value="OrderReport"/>
2658             <xsd:enumeration value="Override"/>
2659             @@ -83,2 +85,3 @@
2660             <xsd:element ref="OrderFulfillment"/>
2661             + <xsd:element ref="OrderReport"/>
2662             <xsd:element ref="Override"/>
2663              
2664             =head3 get_unprocessed_orders
2665              
2666             Return a list of objects with the orders.
2667              
2668             =head3 get_unprocessed_order_report_ids
2669              
2670             Return a list of unprocessed (i.e., which weren't acknowledged by us)
2671             order report ids.
2672              
2673             =cut
2674              
# Collect the ids of the unprocessed order reports and return the list of
# orders that get_order_reports_by_id produces for them.
sub get_unprocessed_orders {
    my $self = shift;
    my @report_ids = $self->get_unprocessed_order_report_ids;
    my @reported_orders = $self->get_order_reports_by_id(@report_ids);
    return @reported_orders;
}
2681              
# Ask the client for the list of unacknowledged _GET_ORDERS_DATA_ reports
# and return their report ids.  %options is appended to the GetReportList
# arguments, so it can add to or override the defaults.  Failures of the
# request are passed to _handle_exception.
sub get_unprocessed_order_report_ids {
    my ($self, %options) = @_;
    my %request = (
        Acknowledged   => 0,
        ReportTypeList => ['_GET_ORDERS_DATA_'],
        %options,
    );
    my $res;
    try {
        $res = $self->client->GetReportList(%request);
    } catch {
        _handle_exception($_);
    };

    # for now, do not ask for the next token, we will process them all
    # eventually

    my @reportids;
    if ($res and $res->{ReportInfo}) {
        # reports without a (true) id are left out
        @reportids = grep { $_ } map { $_->{ReportId} } @{$res->{ReportInfo}};
    }
    return @reportids;
}
2709              
2710              
2711             =head3 get_order_reports_by_id(@id_list)
2712              
2713             The GetReport operation has a maximum request quota of 15 and a
2714             restore rate of one request every minute.
2715              
2716             =cut
2717              
# Download each report of @ids with GetReport, one request per id, and
# return the list of order objects parsed out of them, in the order of the
# ids.  Failures of a request are passed to _handle_exception; reports
# coming back empty are skipped.
sub get_order_reports_by_id {
    my ($self, @ids) = @_;
    my @orders;
    for my $report_id (@ids) {
        my $xml;
        try {
            $xml = $self->client->GetReport(ReportId => $report_id);
        } catch {
            _handle_exception($_);
        };
        next unless $xml;
        push @orders, $self->_parse_order_reports_xml($xml);
    }
    return @orders;
}
2736              
# Feed the xml to the xml_reader and wrap the OrderReport structure of
# every message found into an Amazon::MWS::XML::Response::OrderReport
# object.  Returns the list of objects, empty when the document carries no
# messages.  Dies when a message has no OrderReport.
sub _parse_order_reports_xml {
    my ($self, $xml) = @_;
    my $data = $self->xml_reader->($xml);
    my @orders;
    for my $message (@{ $data->{Message} || [] }) {
        my $struct = $message->{OrderReport}
          or die "Cannot found expected OrderReport in " . Dumper($message);
        push @orders, Amazon::MWS::XML::Response::OrderReport->new(struct => $struct);
    }
    return @orders;
}
2754              
2755              
2756             =head2 acknowledge_reports(@ids)
2757              
2758             Mark the reports as processed.
2759              
2760             =head2 unacknowledge_reports(@ids)
2761              
2762             Mark the reports as not processed.
2763              
2764             =cut
2765              
# Flag the reports with the given ids as acknowledged (flag 1).
sub acknowledge_reports {
    my $self = shift;
    return $self->_toggle_ack_reports(1, @_);
}
2770              
# Flag the reports with the given ids as not acknowledged (flag 0).
sub unacknowledge_reports {
    my $self = shift;
    return $self->_toggle_ack_reports(0, @_);
}
2775              
# Send UpdateReportAcknowledgements for the given report ids, setting
# Acknowledged to $flag.  The ids are sent in batches of at most 100 per
# request; failures are passed to _handle_exception.  Returns nothing.
sub _toggle_ack_reports {
    my ($self, $flag, @ids) = @_;
    # max 100 ids per run; the loop ends when no id is left
    while (my @batch = splice(@ids, 0, 100)) {
        try {
            $self->client->UpdateReportAcknowledgements(ReportIdList => \@batch,
                                                        Acknowledged => $flag);
        } catch {
            _handle_exception($_);
        };
    }
    return;
}
2791              
# Turn an exception into a plain message and die with it.  This is a
# function, not a method, and never returns.
#
# Errors which are not objects are rethrown as they are.  For the objects
# the message is the xml for Amazon::MWS::Exception::Throttled, the error
# string (or a dump when that is empty) for the other
# Amazon::MWS::Exception ones, and a dump for anything else.  The stack
# trace is appended for every Amazon::MWS::Exception.
sub _handle_exception {
    my ($err) = @_;
    die $err unless blessed $err;

    my $is_mws_exception = $err->isa('Amazon::MWS::Exception');
    my $msg;
    if ($err->isa('Amazon::MWS::Exception::Throttled')) {
        $msg = $err->xml;
    }
    elsif ($is_mws_exception) {
        $msg = $err->error || Dumper($err);
    }
    else {
        $msg = Dumper($err);
    }
    $msg .= "\n" . $err->trace->as_string . "\n" if $is_mws_exception;
    die $msg;
}
2817              
2818             =head2 job_timed_out($job_row) [INTERNAL]
2819              
2820             Check if the hashref (which is a hashref of the amazon_mws_jobs row)
2821             has timed out, comparing with the C<order_ack_days_timeout> and
2822             C<job_hours_timeout> (depending on the job).
2823              
2824              
2825             =cut
2826              
# Tell if the job described by $job_row (hashref with at least the keys
# task and job_started_epoch) is running for too long.
#
# The timeout is order_ack_days_timeout (in days) for the order_ack task
# and job_hours_timeout (in hours) for any other task; a timeout of 0
# disables the check.  Returns the elapsed seconds if the job timed out,
# nothing otherwise.  Dies if the task or the start time is missing.
sub job_timed_out {
    my ($self, $job_row) = @_;
    my $task = $job_row->{task}
      or die "Missing task in $job_row->{amws_job_id}";
    my $started = $job_row->{job_started_epoch}
      or die "Missing job_started_epoch in $job_row->{amws_job_id}";
    my $now = time();
    my $timeout = $task eq 'order_ack'
      ? $self->order_ack_days_timeout * 60 * 60 * 24
      : $self->job_hours_timeout * 60 * 60;
    die "Something is off, timeout not defined" unless defined $timeout;
    return unless $timeout;
    my $elapsed = $now - $started;
    return unless $elapsed > $timeout;
    return $elapsed;
}
2851              
# Output the arguments: with print if the quiet attribute is set, with
# warn otherwise.  Returns the action taken ('print' or 'warn') followed
# by the arguments; without arguments nothing is output and the action is
# undef.
sub _print_or_warn_error {
    my ($self, @args) = @_;
    my $action;
    if (@args) {
        $action = $self->quiet ? 'print' : 'warn';
        if ($action eq 'print') {
            print @args;
        }
        else {
            warn @args;
        }
    }
    return ($action, @args);
}
2867              
2868             =head2 purge_old_jobs($limit)
2869              
2870             Eventually the jobs and feed tables grow and never get purged. You can
2871             call this method to remove from the db all the feeds older than
2872             C<order_ack_days_timeout> (30 by default).
2873              
2874             To avoid too much load on the db, you can set the limit to purge the
2875             jobs. Defaults to 500. Set it to 0 to disable it.
2876              
2877             =cut
2878              
# Remove old finished jobs from the database and from the disk.
#
# A job is purged when its task is product_deletion or upload, it started
# more than order_ack_days_timeout days ago and it is flagged as aborted
# or as successful.  If the shop_id attribute is set only the jobs of
# that shop are considered.  For each purged job the matching rows of
# amazon_mws_feeds and amazon_mws_jobs are deleted and the directory
# <feed_dir>/<shop_id>/<amws_job_id> is removed.
#
# $limit is the maximum number of jobs purged in one call: it defaults to
# 500, and 0 means no limit.
sub purge_old_jobs {
    my ($self, $limit) = @_;
    unless (defined $limit) {
        $limit = 500;
    }
    my $range = time() - $self->order_ack_days_timeout * 60 * 60 * 24;
    my @and = (
               task => [qw/product_deletion
                           upload/],
               job_started_epoch => { '<', $range },
               [ -or => {
                         aborted => 1,
                         success => 1,
                        },
               ],
              );
    if (my $shop_id = $self->shop_id) {
        push @and, shop_id => $shop_id;
    }

    my $sth = $self->_exe_query($self->sqla
                                ->select(amazon_mws_jobs => [qw/amws_job_id shop_id/],
                                         [ -and => \@and ] ));
    my @purge_jobs;
    while (my $where = $sth->fetchrow_hashref) {
        # stop as soon as $limit jobs are collected, not one later
        last if $limit and @purge_jobs >= $limit;
        push @purge_jobs, $where;
    }
    $sth->finish;
    if (@purge_jobs) {
        # each fetched row (amws_job_id + shop_id) is used as one
        # alternative of the where clause
        $self->_exe_query($self->sqla->delete(amazon_mws_feeds => \@purge_jobs));
        $self->_exe_query($self->sqla->delete(amazon_mws_jobs => \@purge_jobs));
        while (@purge_jobs) {
            my $feed = shift @purge_jobs;
            my $dir = path($self->feed_dir)->child($feed->{shop_id}, $feed->{amws_job_id});
            if ($dir->exists) {
                print "Removing " . $dir->canonpath . "\n"; # unless $self->quiet;
                $dir->remove_tree;
            }
            else {
                print "$dir doesn't exist\n";
            }
        }
    }
    else {
        print "Nothing to purge\n" unless $self->quiet;
    }
}
2930              
2931             1;