File Coverage

blib/lib/Amazon/MWS/Uploader.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             package Amazon::MWS::Uploader;
2              
3 6     6   114359 use utf8;
  6         171  
  6         44  
4 6     6   302 use strict;
  6         15  
  6         175  
5 6     6   41 use warnings;
  6         15  
  6         261  
6              
7 6     6   7929 use DBI;
  6         132906  
  6         574  
8 6     6   3128 use Amazon::MWS::XML::Feed;
  6         29  
  6         258  
9 6     6   2709 use Amazon::MWS::XML::Order;
  6         30  
  6         269  
10 6     6   2584 use Amazon::MWS::Client;
  6         30  
  6         277  
11 6     6   2745 use Amazon::MWS::XML::Response::FeedSubmissionResult;
  0            
  0            
12             use Amazon::MWS::XML::Response::OrderReport;
13             use Data::Dumper;
14             use File::Spec;
15             use DateTime;
16             use SQL::Abstract;
17             use Try::Tiny;
18             use Path::Tiny;
19             use Scalar::Util qw/blessed/;
20             use XML::Compile::Schema;
21              
22             use Moo;
23             use MooX::Types::MooseLike::Base qw(:all);
24             use namespace::clean;
25              
26             our $VERSION = '0.18';
27              
28             use constant {
29             AMW_ORDER_WILDCARD_ERROR => 999999,
30             DEBUG => $ENV{AMZ_UPLOADER_DEBUG},
31             };
32              
33             =head1 NAME
34              
35             Amazon::MWS::Uploader -- high level agent to upload products to AMWS
36              
37             =head1 DESCRIPTION
38              
39             This module provides a high-level interface to the upload process. It
40             has to keep track of the state to resume the uploading, which could
41             get stuck on the Amazon's side processing, so database credentials
42             have to be provided (or the database handle itself).
43              
44             The table structure needed is defined and commented in sql/amazon.sql
45              
46             =head1 SYNOPSIS
47              
48             my $agent = Amazon::MWS::Uploader->new(
49             db_dsn => 'DBI:mysql:database=XXX',
50             db_username => 'xxx',
51             db_password => 'xxx',
52             db_options => \%options
53             # or dbh => $dbh,
54            
55             schema_dir => '/path/to/xml_schema',
56             feed_dir => '/path/to/directory/for/xml',
57            
58             merchant_id => 'xxx',
59             access_key_id => 'xxx',
60             secret_key => 'xxx',
61            
62             marketplace_id => 'xxx',
63             endpoint => 'xxx',
64            
65             products => \@products,
66             );
67            
68             # say once a day, retrieve the full batch and send it up
69             $agent->upload;
70            
71             # every 10 minutes or so, continue the work started with ->upload, if any
72             $agent->resume;
73              
74              
75             =head1 UPGRADE NOTES
76              
77             When migrating from 0.05 to 0.06 please execute this SQL statement
78              
79             ALTER TABLE amazon_mws_products ADD COLUMN listed BOOLEAN;
80             UPDATE amazon_mws_products SET listed = 1 WHERE status = 'ok';
81              
82             When upgrading to 0.16, please execute this SQL statement:
83              
84             ALTER TABLE amazon_mws_products ADD COLUMN warnings TEXT;
85              
86             =head1 ACCESSORS
87              
88             The following keys must be passed at the constructor and can be
89             accessed read-only:
90              
91             =over 4
92              
93             =item dbh
94              
95             The DBI handle. If not provided, it will be built using the following
96             self-describing accessors:
97              
98             =item db_dsn
99              
100             =item db_username
101              
102             =item db_password
103              
104             =item db_options
105              
106             E.g.
107              
108             {
109             mysql_enable_utf8 => 1,
110             }
111              
112             AutoCommit and RaiseError are set by us.
113              
114             =cut
115              
# Plain connection settings, read by _build_dbh when it has to open the
# database connection itself.
has [qw/db_dsn db_password db_username/] => (is => 'ro');

# Optional extra attributes for DBI->connect; undef is accepted.
has db_options => (
    is  => 'ro',
    isa => AnyOf[Undef, HashRef],
);

# The database handle; built on first access by _build_dbh unless the
# caller supplied one to the constructor.
has dbh => (is => 'lazy');
123              
124             =item skus_warnings_modes
125              
126             Determines how to treat warnings. This is a hash reference with the
127             code of the warning as key and one of the following modes as value:
128              
129             =over 4
130              
131             =item warn
132              
133             Prints warning from Amazon with C<warn> function (default mode).
134              
135             =item print
136              
137             Prints warning from Amazon with C<print> function (default mode).
138              
139             =item skip
140              
141             Ignores warning from Amazon.
142              
143             =back
144              
145             =cut
146              
# Map of warning code => handling mode; starts out as an empty hashref.
has skus_warnings_modes => (
    is      => 'rw',
    isa     => HashRef,
    default => sub { return {}; },
);
151              
152             =item order_days_range
153              
154             When calling get_orders, check the orders for the last X days. It
155             accepts an integer which should be in the range 1-30. Defaults to 7.
156              
157             Keep in mind that if you change the default and you have a lot of
158             orders, you will get throttled because for each order we retrieve the
159             orderline as well.
160              
161             DEVEL NOTE: a possible smart fix would be to store this object in the
162             order (or into a closure) and make the orderline a lazy attribute
163             which will call C<ListOrderItems>.
164              
165             =cut
166              
# Number of days to look back for orders. Must be an integer between
# 1 and 30 inclusive; 7 when not specified.
has order_days_range => (
    is      => 'rw',
    default => sub { return 7 },
    isa     => sub {
        my ($days) = @_;
        if (!is_Int($days)) {
            die "Not an integer";
        }
        if ($days <= 0 || $days >= 31) {
            die "$days is out of range 1-30";
        }
    },
);
176              
177             =item shop_id
178              
179             You can pass an arbitrary identifier to the constructor which will be
180             used to keep the database records separated if you have multiple
181             amazon accounts. If not provided, the merchant id will be used, which
182             will work, but it's harder (for the humans) to spot and debug.
183              
184             =cut
185              
# Optional caller-chosen identifier for this shop.
has shop_id => (
    is => 'ro',
);

# Identifier actually used for the database records; see
# _build__unique_shop_id for how it is chosen.
has _unique_shop_id => (
    is => 'lazy',
);
189              
# Builder for _unique_shop_id: the shop_id when it holds a true value,
# the merchant_id otherwise.
sub _build__unique_shop_id {
    my ($self) = @_;
    return $self->shop_id || $self->merchant_id;
}
199              
200             =item debug
201              
202             Print out additional information.
203              
204             =item logfile
205              
206             Passed to L<Amazon::MWS::Client> constructor.
207              
208             =cut
209              
# debug: print additional information while working.
# logfile: handed over to the Amazon::MWS::Client constructor.
has [qw/debug logfile/] => (is => 'ro');
213              
214             =item quiet
215              
216             Boolean. Do not warn on timeouts and aborts (just print) if set to
217             true.
218              
219             =cut
220              
# Boolean flag, read-only after construction.
has quiet => (
    is => 'ro',
);
222              
# Builder for dbh: connect to the database described by db_dsn,
# db_username, db_password and db_options.
#
# RaiseError and AutoCommit are always forced to 1. The options are
# copied first, so the hashref the caller passed as db_options is left
# untouched.
#
# Dies when no DSN is configured or when the connection cannot be made.
sub _build_dbh {
    my $self = shift;
    my $dsn = $self->db_dsn;
    die "Missing dsn" unless $dsn;
    my %options = (
        %{ $self->db_options || {} },
        # force raise error and auto-commit
        RaiseError => 1,
        AutoCommit => 1,
    );
    my $dbh = DBI->connect($dsn, $self->db_username, $self->db_password,
                           \%options) or die "Couldn't connect to $dsn!";
    return $dbh;
}
235              
236             =item purge_missing_products
237              
238             If true, the first time C<products_to_upload> is called, products not
239             passed to the C<products> constructor will be purged from the
240             C<amazon_mws_products> table. Default to false.
241              
242             This setting is DEPRECATED because it can have some unwanted
243             side-effects. You are recommended to delete the obsoleted products
244             yourself.
245              
246             =cut
247              
# Read-write flag consulted by _build_products_to_upload.
has purge_missing_products => (
    is => 'rw',
);
249              
250              
251             =item reset_all_errors
252              
253             If set to a true value, don't skip previously failed items and
254             effectively reset all of them.
255              
256             Also, when the accessor is set for send_shipping_confirmation, try to
257             upload again previously failed orders.
258              
259             =cut
260              
# Read-only flag, fixed at construction time.
has reset_all_errors => (
    is => 'ro',
);
262              
263             =item reset_errors
264              
265             A string containing a comma separated list of error codes, optionally
266             prefixed with a "!" (to reverse its meaning).
267              
268             Example:
269              
270             "!6024,6023"
271              
272             Meaning: reupload all the products whose error code is B<not> 6024 or
273             6023.
274              
275             "6024,6023"
276              
277             Meaning: reupload the products whose error code was 6024 or 6023
278              
279             =cut
280              
# Comma separated list of numeric error codes, optionally prefixed by
# "!" to negate the selection. False values (undef, 0, '') are accepted
# as "nothing to reset".
#
# The pattern is anchored at both ends so the whole string has to be a
# well-formed list; an unanchored match would accept anything that
# merely starts with digits (e.g. "6024 foo"). A trailing comma is
# tolerated.
has reset_errors => (
    is  => 'ro',
    isa => sub {
        my $string = $_[0];
        # undef/0/'' is fine
        return unless $string;
        die "reset_errors must be a comma separated list of error code, optionally prefixed by a '!' to negate its meaning"
          if $string !~ m/\A\s*!?\s*[0-9]+(?:\s*,\s*[0-9]+)*\s*,?\s*\z/;
    },
);
290              
291              
# Parsed form of reset_errors, computed on first access by
# _build__reset_error_structure.
has _reset_error_structure => (
    is => 'lazy',
);
293              
# Builder for _reset_error_structure.
#
# Parses the reset_errors string into
#   { negate => 0|1, codes => { CODE => 1, ... } }
# where negate is 1 when the string starts with "!". Returns nothing
# when reset_errors is empty or yields no usable code.
sub _build__reset_error_structure {
    my ($self) = @_;
    my $spec = $self->reset_errors || '';
    # trim both ends
    for ($spec) {
        s/^\s*//;
        s/\s*$//;
    }
    return unless $spec;

    my $negate = 0;
    if ($spec =~ m/^\s*!\s*(.+)/) {
        ($spec, $negate) = ($1, 1);
    }

    my %codes;
    foreach my $code (split(/\s*,\s*/, $spec)) {
        next unless $code;
        $codes{$code} = 1;
    }
    return unless %codes;

    return {
            negate => $negate,
            codes  => \%codes,
           };
}
313              
314              
315             =item force
316              
317             Same as above, but only for the selected items. An arrayref is
318             expected here with the B<skus>.
319              
320             =cut
321              
# Arrayref of SKUs, read-only.
has force => (
    is  => 'ro',
    isa => ArrayRef,
);

# Lookup hash derived from force; see _build__force_hashref.
has _force_hashref => (
    is => 'lazy',
);
328              
# Builder for _force_hashref: turn the force arrayref into a hashref
# { SKU => 1, ... }. Without a force list the hashref is empty.
sub _build__force_hashref {
    my ($self) = @_;
    my $skus = $self->force;
    return {} unless $skus;
    return +{ map { ($_ => 1) } @$skus };
}
337              
338             =item limit_inventory
339              
340             If set to an integer, limit the inventory to this value. Setting this
341             to 0 will disable it.
342              
343             =item job_hours_timeout
344              
345             If set to an integer, abort the job after X hours are elapsed since
346             the job was started. Default to 3 hours. Set to 0 to disable (not
347             recommended).
348              
349             This doesn't affect jobs for order acknowledgements (C<order_ack>), see below.
350              
351             =item order_ack_days_timeout
352              
353             Order acknowledgements time out at a different rate, because they are
354             somewhat sensitive. Defaults to 30 days.
355              
356             =cut
357              
# Integer, 3 when not specified.
has job_hours_timeout => (
    is      => 'ro',
    isa     => Int,
    default => sub { return 3 },
);

# Integer, 30 when not specified.
has order_ack_days_timeout => (
    is      => 'ro',
    isa     => Int,
    default => sub { return 30 },
);

# Integer cap applied to product inventories in
# _build_products_to_upload; no default.
has limit_inventory => (
    is  => 'ro',
    isa => Int,
);
368              
369             =item schema_dir
370              
371             The directory where the xsd files for the feed building can be found.
372              
373             =item feeder
374              
375             A L<Amazon::MWS::XML::Feed> object. Lazy attribute, you shouldn't pass
376             this to the constructor, it is lazily built using C<products>,
377             C<merchant_id> and C<schema_dir>.
378              
379             =item feed_dir
380              
381             A working directory where to stash the uploaded feeds for inspection
382             if problems are detected.
383              
384             =item schema
385              
386             The L<XML::Compile::Schema> object, built lazily from the xsd files in C<schema_dir>
387              
388             =item xml_writer
389              
390             The xml writer, built lazily.
391              
392             =item xml_reader
393              
394             The xml reader, built lazily.
395              
396             =cut
397              
# Both directory attributes are mandatory and must point to an
# existing directory at construction time.
has schema_dir => (
    is       => 'ro',
    required => 1,
    isa      => sub {
        my ($dir) = @_;
        -d $dir or die "$_[0] is not a directory";
    },
);

has feed_dir => (
    is       => 'ro',
    required => 1,
    isa      => sub {
        my ($dir) = @_;
        -d $dir or die "$_[0] is not a directory";
    },
);
409              
# XML::Compile::Schema object, created on first access.
has schema => (
    is => 'lazy',
);

# Builder for schema: load every *.xsd file found directly inside
# schema_dir.
# NOTE(review): glob splits its pattern on whitespace, so a schema_dir
# containing spaces would presumably not be expanded as intended --
# confirm before relying on such paths.
sub _build_schema {
    my ($self) = @_;
    my $pattern = File::Spec->catfile($self->schema_dir, '*.xsd');
    my @xsd_files = glob $pattern;
    return XML::Compile::Schema->new(\@xsd_files);
}
418              
# Compiled writer for the AmazonEnvelope element, created on first
# access.
has xml_writer => (
    is => 'lazy',
);

# Builder for xml_writer.
sub _build_xml_writer {
    my ($self) = @_;
    my $schema = $self->schema;
    return $schema->compile(WRITER => 'AmazonEnvelope');
}
425              
# Compiled reader for the AmazonEnvelope element, created on first
# access.
has xml_reader => (
    is => 'lazy',
);

# Builder for xml_reader.
sub _build_xml_reader {
    my ($self) = @_;
    my $schema = $self->schema;
    return $schema->compile(READER => 'AmazonEnvelope');
}
432              
433              
434             =item generic_feeder
435              
436             Return a L<Amazon::MWS::XML::GenericFeed> object to build a feed using
437             the XML writer.
438              
439             =cut
440              
# Return a new Amazon::MWS::XML::GenericFeed object wired to this
# object's xml_writer and merchant_id.
# NOTE(review): no "use Amazon::MWS::XML::GenericFeed" is visible among
# the use statements; presumably another module loads it -- confirm.
sub generic_feeder {
    my ($self) = @_;
    my %feed_args = (
        xml_writer  => $self->xml_writer,
        merchant_id => $self->merchant_id,
    );
    return Amazon::MWS::XML::GenericFeed->new(%feed_args);
}
448              
449              
450             =item merchant_id
451              
452             The merchant ID provided by Amazon.
453              
454             =item access_key_id
455              
456             Provided by Amazon.
457              
458             =item secret_key
459              
460             Provided by Amazon.
461              
462             =item marketplace_id
463              
464             L<http://docs.developer.amazonservices.com/en_US/dev_guide/DG_Endpoints.html>
465              
466             =item endpoint
467              
468             Ditto.
469              
470             =cut
471              
# Mandatory, read-only settings handed to Amazon::MWS::Client by
# _build_client.
has [qw/merchant_id
        access_key_id
        secret_key
        marketplace_id
        endpoint/] => (is => 'ro', required => 1);
477              
478             =item products
479              
480             An arrayref of L<Amazon::MWS::XML::Product> objects, or anything that
481             (properly) responds to C<as_product_hash>, C<as_inventory_hash>,
482             C<as_price_hash>. See L<Amazon::MWS::XML::Product> for details.
483              
484             B<This is set as read-write, so you can set the product after the
485             object construction, but if you change it afterward, you will get
486             unexpected results>.
487              
488             This routine also checks whether each product needs upload and deletes
489             disappeared products. If you are doing the check yourself, use
490             C<checked_products>.
491              
492             =item checked_products
493              
494             As C<products>, but no check is performed. This takes precedence.
495              
496             =item sqla
497              
498             Lazy attribute to hold the C<SQL::Abstract> object.
499              
500             =cut
501              
# Arrayref of product objects; read-write.
has products => (
    is  => 'rw',
    isa => ArrayRef,
);

# SQL::Abstract instance used to generate the queries.
has sqla => (
    is      => 'ro',
    default => sub { SQL::Abstract->new },
);
511              
# Hashref sku => row of the products already recorded for this shop,
# created on first access.
has existing_products => (
    is => 'lazy',
);

# Builder for existing_products.
#
# Selects sku, timestamp_string, status, listed and error_code from
# amazon_mws_products for this shop, leaving out the rows whose status
# is "deleted". A false timestamp_string is normalised to 0. Returns a
# hashref keyed by sku.
sub _build_existing_products {
    my ($self) = @_;
    my @columns = qw/sku
                     timestamp_string
                     status
                     listed
                     error_code
                    /;
    my %where = (
        status  => { -not_in => [qw/deleted/] },
        shop_id => $self->_unique_shop_id,
    );
    my $sth = $self->_exe_query($self->sqla->select(amazon_mws_products => \@columns,
                                                    \%where));
    my %by_sku;
    while (my $record = $sth->fetchrow_hashref) {
        $record->{timestamp_string} ||= 0;
        $by_sku{ $record->{sku} } = $record;
    }
    return \%by_sku;
}
533              
# Arrayref of the products selected for upload, computed on first
# access by _build_products_to_upload.
has products_to_upload => (
    is => 'lazy',
);

# Arrayref of products; when set, _build_products_to_upload returns it
# as is.
has checked_products => (
    is  => 'rw',
    isa => ArrayRef,
);
537              
# Builder for products_to_upload.
#
# When checked_products is set it is returned untouched. Otherwise each
# entry of products is examined:
#
#  - the matching row in existing_products (if any) is flagged as
#    examined;
#  - products for which product_needs_upload is false are skipped;
#  - the inventory is capped to limit_inventory when that is set;
#  - failed variants are dropped from the children, unless the variant
#    is forced or the parent itself is new or failed.
#
# When purge_missing_products is set, the existing skus that were not
# examined are passed to delete_skus.
#
# Returns an arrayref of the products to upload (possibly empty).
sub _build_products_to_upload {
    my ($self) = @_;
    my $checked = $self->checked_products;
    return $checked if $checked;

    my $candidates = $self->products;
    unless ($candidates && @$candidates) {
        return [];
    }
    my $existing = $self->existing_products;
    my @todo;

  PRODUCT:
    foreach my $product (@$candidates) {
        my $sku = $product->sku;
        my $known = $existing->{$sku};
        if ($known) {
            # mark the item as visited
            $known->{_examined} = 1;
        }
        if ($self->debug) {
            print "Checking $sku\n";
        }
        $self->product_needs_upload($product->sku, $product->timestamp_string)
          or next PRODUCT;

        print "Scheduling product " . $product->sku . " for upload\n";

        my $limit = $self->limit_inventory;
        if ($limit) {
            my $real = $product->inventory;
            if ($real > $limit) {
                print "Limiting the $sku inventory from $real to $limit\n" if $self->debug;
                $product->inventory($limit);
            }
        }

        my $children = $product->children;
        if ($children) {
            my @kept;
            foreach my $child (@$children) {
                # skip failed children, but if the current status of
                # parent is failed, and we reached this point, retry.
                my $skip = !exists $self->_force_hashref->{$child}
                  && $existing->{$child}
                  && $existing->{$sku}
                  && $existing->{$sku}->{status} ne 'failed'
                  && $existing->{$child}->{status} eq 'failed';
                if ($skip) {
                    print "Ignoring failed variant $child\n";
                    next;
                }
                push @kept, $child;
            }
            $product->children(\@kept);
        }
        push @todo, $product;
    }

    if ($self->purge_missing_products) {
        # nuke the products not passed
        my @deletions;
        foreach my $row (values %$existing) {
            next if $row->{_examined};
            push @deletions, $row->{sku};
        }
        $self->delete_skus(@deletions) if @deletions;
    }
    return \@todo;
}
597              
598              
599             =item client
600              
601             An L<Amazon::MWS::Client> object, built lazily, so you don't have to
602             pass it.
603              
604             =back
605              
606             =cut
607              
# Amazon::MWS::Client object, created on first access.
has client => (
    is => 'lazy',
);

# Builder for client: forward the credentials, the endpoint and the
# debug/logfile settings to the Amazon::MWS::Client constructor.
sub _build_client {
    my ($self) = @_;
    my @forwarded = qw/merchant_id
                       marketplace_id
                       access_key_id
                       secret_key
                       debug
                       logfile
                       endpoint/;
    my %mws_args;
    foreach my $accessor (@forwarded) {
        $mws_args{$accessor} = $self->$accessor;
    }
    return Amazon::MWS::Client->new(%mws_args);
}
622              
# Hashref of compiled regular expressions, created on first access by
# _build__mismatch_patterns.
has _mismatch_patterns => (
    is  => 'lazy',
    isa => HashRef,
);
624              
# Builder for _mismatch_patterns.
#
# Returns a hashref of compiled patterns. Besides "asin", every
# attribute gets a shop_* pattern capturing the value quoted after
# "Merchant:" (or "Verkäufer:") and an amazon_* pattern capturing the
# value quoted after "Amazon:".
sub _build__mismatch_patterns {
    my ($self) = @_;
    my $merchant_re = qr{\s+\((?:Merchant|Verkäufer):\s+'(.*?)'\s+/};
    my $amazon_re = qr{\s+.*?/\s*Amazon:\s+'(.*?)'\)};

    # key suffix => field name as it appears in the message
    my @fields = (
        [ part_number  => 'part_number'  ],
        [ title        => 'item_name'    ],
        [ manufacturer => 'manufacturer' ],
        [ brand        => 'brand'        ],
        [ color        => 'color'        ],
        [ size         => 'size'         ],
    );

    my %patterns = (
        # informative only
        asin => qr{ASIN(?:\s+überein)?\s+([0-9A-Za-z]+)},
    );
    foreach my $pair (@fields) {
        my ($suffix, $field) = @$pair;
        $patterns{"shop_$suffix"}   = qr{$field$merchant_re};
        $patterns{"amazon_$suffix"} = qr{$field$amazon_re};
    }
    return \%patterns;
}
654              
655              
656             =head1 MAIN METHODS
657              
658             =head2 upload
659              
660             If the products are set, begin the routine to upload them. Because of
661             the asynchronous way AMWS works, at some point it will bail out,
662             saving the state in the database. You should reinstantiate the object
663             and call C<resume> on it every 10 minutes or so.
664              
665             The workflow is described here:
666             L<http://docs.developer.amazonservices.com/en_US/feeds/Feeds_Overview.html>
667              
668             This has to be done for each feed: Product, Inventory, Price, Image,
669             Relationship (for variants).
670              
671             This method generates the feeds in the feed directory, records the job
672             in the database and returns the job id. The actual uploading is done by C<resume>.
673              
674             =head2 resume
675              
676             Restore the state and resume where it was left.
677              
678             This method accepts an optional list of parameters. Each parameter may be:
679              
680             =over 4
681              
682             =item a scalar
683              
684             This is considered a job id.
685              
686             =item a hashref
687              
688             This will be merged in the query to retrieve the pending jobs. A
689             sample usage could be:
690              
691             $upload->resume({ task => [qw/upload product_deletion/] });
692              
693             to resume only those specific tasks.
694              
695             =back
696              
697             =head2 get_pending_jobs
698              
699             Return the list of hashref with the pending jobs out of the database.
700             Accepts the same parameters as C<resume> (which actually calls this
701             method).
702              
703             =cut
704              
705             =head1 INTERNAL METHODS
706              
707             =head2 prepare_feeds($type, { name => $feed_name, content => "<xml>..."}, { name => $feed_name2, content => "<xml>..."}, ....)
708              
709             Prepare the feed of type $type with the feeds provided as additional
710             arguments.
711              
712             Return the job id
713              
714              
715             =cut
716              
# Return the directory holding the feed files of a job:
# <feed_dir>/<sanitized shop id>/<job id>.
#
# Arguments:
#   $job_id - mandatory job identifier
#   $create - when true, the shop directory and the job directory are
#             created if missing
#
# Every character of the shop id other than ASCII letters, digits, "_"
# and "-" is stripped before it is used as a directory name.
#
# Dies when the job id is missing, when the sanitized shop id is empty,
# or when a directory has to be created and cannot be.
sub _feed_job_dir {
    my ($self, $job_id, $create) = @_;
    die "Missing job id" unless $job_id;
    my $shop_id = $self->_unique_shop_id;
    $shop_id =~ s/[^0-9A-Za-z_-]//g;
    die "The shop id without word characters results in an empty string"
      unless $shop_id;
    my $feed_root = File::Spec->catdir($self->feed_dir, $shop_id);
    my $feed_subdir = File::Spec->catdir($feed_root, $job_id);
    if ($create) {
        foreach my $dir ($feed_root, $feed_subdir) {
            next if -d $dir;
            # re-check -d after a failed mkdir: another process may
            # have created the directory in the meantime
            mkdir($dir) or -d $dir
              or die "Couldn't create directory $dir: $!";
        }
    }
    return $feed_subdir;
}
737              
# Return the absolute path of the XML file for the given job id and
# feed type: <job directory>/<feed type>.xml. The job directory is
# requested from _feed_job_dir with the create flag set.
#
# Dies when the job id or the feed type is missing.
sub _feed_file_for_method {
    my ($self, $job_id, $feed_type) = @_;
    die unless $job_id && $feed_type;
    my $job_dir = $self->_feed_job_dir($job_id, "create");
    return File::Spec->rel2abs(File::Spec->catfile($job_dir, $feed_type . '.xml'));
}
745              
# Read the whole file in one go and return its content. No I/O layer
# is requested when opening it. Dies when the file cannot be opened.
sub _slurp_file {
    my ($self, $file) = @_;
    open (my $fh, '<', $file) or die "Couldn't open $file $!";
    my $content = do {
        # slurp mode, restored when the block is left
        local $/ = undef;
        <$fh>;
    };
    close $fh;
    return $content;
}
754              
# Generate the feeds for products_to_upload and register them as a new
# "upload" job.
#
# For each of the product, inventory, price, image and variants feeds,
# the matching <name>_feed method of an Amazon::MWS::XML::Feed object
# is called; feeds without content are left out. The remaining feeds
# go to prepare_feeds, and the products are then marked as pending for
# the returned job id.
#
# Returns the job id, or nothing when there are no products or no job
# was created.
sub upload {
    my ($self) = @_;
    # create the feeds to be uploaded using the products
    my @products = @{ $self->products_to_upload };

    if (!@products) {
        print "No products, can't upload anything\n";
        return;
    }

    my $feeder = Amazon::MWS::XML::Feed->new(
                                             products => \@products,
                                             xml_writer => $self->xml_writer,
                                             merchant_id => $self->merchant_id,
                                            );
    my @feeds;
    for my $feed_name (qw/product
                          inventory
                          price
                          image
                          variants
                         /) {
        my $method = $feed_name . "_feed";
        my $content = $feeder->$method;
        next unless $content;
        push @feeds, {
                      name => $feed_name,
                      content => $content,
                     };
    }

    my $job_id = $self->prepare_feeds(upload => \@feeds);
    return unless $job_id;
    $self->_mark_products_as_pending($job_id, @products);
    return $job_id;
}
790              
# Record the given products as pending for the given job id in
# amazon_mws_products.
#
# For every product, the row identified by sku and shop id is updated
# when it exists and inserted otherwise. The job id, the "pending"
# status, an empty warnings field and the product's timestamp_string
# are stored.
#
# Dies with "Bad usage" when the job id is missing.
sub _mark_products_as_pending {
    my ($self, $job_id, @products) = @_;
    die "Bad usage" unless $job_id;
    # these skus were cleared up when asking for the products to upload
    for my $product (@products) {
        my %identifier = (
                          sku => $product->sku,
                          shop_id => $self->_unique_shop_id,
                         );
        my %data = (
                    amws_job_id => $job_id,
                    status => 'pending',
                    warnings => '', # clear out
                    timestamp_string => $product->timestamp_string,
                   );
        my $sqla = $self->sqla;
        my $check = $self->_exe_query($sqla->select(amazon_mws_products => [qw/sku/],
                                                    { %identifier }));
        my $existing = $check->fetchrow_hashref;
        $check->finish;
        if (!$existing) {
            $self->_exe_query($self->sqla->insert(amazon_mws_products => { %identifier, %data }));
        }
        else {
            $self->_exe_query($self->sqla->update(amazon_mws_products => \%data, \%identifier));
        }
    }
}
818              
819              
# Register a new job for $task and write out its feeds.
#
# Arguments:
#   $task  - task name, used as prefix of the job id
#   $feeds - arrayref of hashrefs { name => $feed_name,
#            content => $xml }
#
# A row is inserted into amazon_mws_jobs. Then, for every feed with
# content, the content is written to the file given by
# _feed_file_for_method and a row is inserted into amazon_mws_feeds.
# Feeds without content are ignored.
#
# Returns the job id ("<task>-<timestamp>"), or nothing when $feeds is
# empty.
#
# Dies when task or feeds are missing, when a feed with content has no
# name, or when the feed file cannot be opened, written or closed.
sub prepare_feeds {
    my ($self, $task, $feeds) = @_;
    unless ($task && $feeds) {
        # avoid "uninitialized" warnings while building the message
        my ($task_label, $feeds_label) = map { defined $_ ? $_ : 'undef' } ($task, $feeds);
        die "Missing task ($task_label) and feeds ($feeds_label)";
    }
    return unless @$feeds; # nothing to do
    my $job_id = $task . "-" . DateTime->now->strftime('%F-%H-%M-%S');
    my $job_started_epoch = time();

    $self->_exe_query($self->sqla
                      ->insert(amazon_mws_jobs => {
                                                   amws_job_id => $job_id,
                                                   shop_id => $self->_unique_shop_id,
                                                   task => $task,
                                                   job_started_epoch => $job_started_epoch,
                                                  }));

    # every feed follows the same procedure, as per
    # http://docs.developer.amazonservices.com/en_US/feeds/Feeds_Overview.html
    # The processing of the feed itself is tracked in amazon_mws_feeds

    # TODO: we could pass to the object some flags to filter out results.
    foreach my $feed (@$feeds) {
        # when there is no content, no need to create a job for it.
        my $content = $feed->{content} or next;

        my $name = $feed->{name} or die "Missing feed_name";
        my $file = $self->_feed_file_for_method($job_id, $name);
        open (my $fh, '>', $file) or die "Couldn't open $file $!";
        print {$fh} $content or die "Couldn't write to $file $!";
        # buffered write errors only surface when closing
        close $fh or die "Couldn't close $file $!";

        # and prepare a row for it
        my $insertion = {
                         feed_name => $name,
                         feed_file => $file,
                         amws_job_id => $job_id,
                         shop_id => $self->_unique_shop_id,
                        };
        $self->_exe_query($self->sqla
                          ->insert(amazon_mws_feeds => $insertion));
    }
    return $job_id;
}
867              
868              
# Return the rows (hashrefs) of amazon_mws_jobs which are neither
# aborted nor successful for our shop.
#
# Arguments can be plain scalars, taken as job ids to restrict the
# search to, and/or hashrefs with additional SQL::Abstract filters.
# Passing the same filter key twice is an error, and so is any other
# kind of reference.  The aborted, success and shop_id conditions
# always take precedence over the filters passed in.
#
# The rows are fetched ordered by job_started_epoch and returned
# grouped by task, in this order: product_deletion, upload,
# shipping_confirmation, order_ack.  Rows with another task are not
# returned.  For order_ack at most two rows are returned, taken from
# the end of the fetched list; for shipping_confirmation all the rows
# are returned, in reverse order.
sub get_pending_jobs {
    my ($self, @args) = @_;
    my %additional;
    my @named_jobs;
    foreach my $arg (@args) {
        if (!ref($arg)) {
            push @named_jobs, $arg;
        }
        elsif (ref($arg) eq 'HASH') {
            # add the filters
            foreach my $key (keys %$arg) {
                # check the presence, not the truth: a filter on a
                # false value (like 0) must be protected as well
                if (exists $additional{$key}) {
                    die "Attempt to overwrite $key in the additional parameters!\n";
                }
                else {
                    $additional{$key} = $arg->{$key};
                }
            }
        }
        else {
            die "Argument must be either a scalar with a job name and/or "
              . "an hashref with additional filters!";
        }
    }
    if (@named_jobs) {
        $additional{amws_job_id} = { -in => \@named_jobs };
    }
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_jobs => '*',
                                            {
                                             %additional,
                                             aborted => 0,
                                             success => 0,
                                             shop_id => $self->_unique_shop_id,
                                            },
                                            { -asc => 'job_started_epoch'});
    my $pending = $self->_exe_query($stmt, @bind);
    my %jobs;
    while (my $row = $pending->fetchrow_hashref) {
        $jobs{$row->{task}} ||= [];
        push @{$jobs{$row->{task}}}, $row;
    }
    my @out;
    foreach my $task (qw/product_deletion upload shipping_confirmation order_ack/) {
        if (my $list = delete $jobs{$task}) {
            if ($task eq 'order_ack') {
                # no more than two, starting from the end of the list
                for (1..2) {
                    push @out, pop @$list if @$list;
                }
            }
            elsif ($task eq 'shipping_confirmation') {
                # all of them, starting from the end of the list
                while (@$list) {
                    push @out, pop @$list;
                }
            }
            else {
                push @out, @$list if @$list;
            }
        }
    }
    return @out;
}
930              
# Loop over the pending jobs (the arguments are handed over verbatim
# to get_pending_jobs) and carry each of them on.
#
# A job whose feed directory is missing is canceled.  A job for which
# job_timed_out returns a true value is canceled as well, using that
# value as the number of elapsed seconds in the reason.  Any other
# job is passed to process_feeds.
sub resume {
    my ($self, @args) = @_;
    for my $job ($self->get_pending_jobs(@args)) {
        print "Working on $job->{amws_job_id}\n";

        # without the job directory there is nothing to resume
        unless (-d $self->_feed_job_dir($job->{amws_job_id})) {
            warn "No directory " . $self->_feed_job_dir($job->{amws_job_id}) .
              " found, removing job id $job->{amws_job_id}\n";
            $self->cancel_job($job->{task}, $job->{amws_job_id},
                              "Job canceled due to missing feed directory");
            next;
        }

        my $seconds_elapsed = $self->job_timed_out($job);
        unless ($seconds_elapsed) {
            $self->process_feeds($job);
            next;
        }

        $self->_print_or_warn_error("Timeout reached for $job->{amws_job_id}, aborting: "
                                    . Dumper($job));
        $self->cancel_job($job->{task}, $job->{amws_job_id},
                          "Job timed out after $seconds_elapsed seconds");
    }
}
954              
=head2 cancel_job($task, $job_id, $reason)

Abort the job: the C<aborted> flag is set in the C<amazon_mws_jobs>
table and C<$reason> is stored as the job status.

The products attached to the job get their status reverted as well:
C<ok> for a C<product_deletion> task, C<redo> for an C<upload> task.
For any other task the products are left alone.

=cut

sub cancel_job {
    my ($self, $task, $job_id, $reason) = @_;
    my %job_update = (
                      aborted => 1,
                      status => $reason,
                     );
    $self->_exe_query($self->sqla->update('amazon_mws_jobs',
                                          \%job_update,
                                          {
                                           amws_job_id => $job_id,
                                           shop_id => $self->_unique_shop_id,
                                          }));

    # and revert the products' status
    my %reverted_status_for = (
                               # let's pretend we were deleting good products
                               product_deletion => 'ok',
                               upload => 'redo',
                              );
    my $status = $reverted_status_for{$task};
    if ($status) {
        print "Updating product to $status for products with job id $job_id\n";
        $self->_exe_query($self->sqla->update('amazon_mws_products',
                                              { status => $status },
                                              {
                                               amws_job_id => $job_id,
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
}
992              
993              
994              
=head2 process_feeds(\%job_row)

Given the hashref with the db row of the job, check at which point it
is and resume.

The feeds of the job which are neither aborted nor successful are
passed one by one to C<upload_feed>, in C<amws_feed_pk> order,
stopping at the first one which returns false. Then all the feeds of
the job are counted: if at least one is aborted, the job is flagged
as aborted with status C<Feed error>; if all of them are successful,
the job is flagged as successful and, for an C<upload> task, its
products are marked as C<ok> and listed. Otherwise the job is left
untouched, to be resumed later.

=cut

sub process_feeds {
    my ($self, $row) = @_;
    # print Dumper($row);
    # upload the feeds one by one and stop if something is blocking
    my $job_id = $row->{amws_job_id};
    print "Processing job $job_id\n";

    # query the feeds table for this job
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
                                            {
                                             amws_job_id => $job_id,
                                             aborted => 0,
                                             success => 0,
                                             shop_id => $self->_unique_shop_id,
                                            },
                                            ['amws_feed_pk']);

    my $sth = $self->_exe_query($stmt, @bind);
    while (my $feed = $sth->fetchrow_hashref) {
        last unless $self->upload_feed($feed);
    }
    # we could have left the loop before the last row
    $sth->finish;

    ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
                                         {
                                          shop_id => $self->_unique_shop_id,
                                          amws_job_id => $job_id,
                                         });

    $sth = $self->_exe_query($stmt, @bind);

    my ($total, $success, $aborted) = (0, 0, 0);

    # query again and check if we have aborted jobs;
    while (my $feed = $sth->fetchrow_hashref) {
        $total++;
        $success++ if $feed->{success};
        $aborted++ if $feed->{aborted};
    }
    $sth->finish;

    # a job was aborted
    my $update;
    if ($aborted) {
        $update = {
                   aborted => 1,
                   status => 'Feed error',
                  };
        $self->_print_or_warn_error("Job $job_id aborted!\n");
    }
    elsif ($success == $total) {
        $update = { success => 1 };
        print "Job successful!\n";
        # if we're here, all the products are fine, so mark them as
        # such if it's an upload job
        if ($row->{task} eq 'upload') {
            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                  { status => 'ok',
                                                    listed_date => DateTime->now,
                                                    listed => 1,
                                                  },
                                                  {
                                                   amws_job_id => $job_id,
                                                   shop_id => $self->_unique_shop_id,
                                                  }));
        }
    }
    else {
        print "Job still to be processed\n";
    }
    if ($update) {
        $self->_exe_query($self->sqla->update(amazon_mws_jobs => $update,
                                              {
                                               amws_job_id => $job_id,
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
}
1080              
=head2 upload_feed(\%feed_row)

Routine to upload the feed. The argument is the hashref with the db
row of the feed, as found in the C<amazon_mws_feeds> table.

If the feed has no C<feed_id> yet, the feed file is submitted and the
id we get back is stored in the row. Then we ask if the processing is
complete, and if so the submission result is fetched and registered:
on success the feed is flagged as successful (and the related orders
are updated for the C<order_ack> and C<shipping_confirmation> feeds),
on failure the feed is flagged as aborted and the errors are
registered.

Return true if it's complete, false otherwise (still processing or
failed). A feed which was already flagged as aborted or successful is
not processed again, and its C<success> flag is returned.

Dies if the feed name is unknown, if the submission gets no response
or if the response carries no submission id.

=cut

sub upload_feed {
    my ($self, $record) = @_;
    my $job_id = $record->{amws_job_id};
    my $type = $record->{feed_name};
    my $feed_id = $record->{feed_id};
    print "Checking $type feed for $job_id\n";
    # http://docs.developer.amazonservices.com/en_US/feeds/Feeds_FeedType.html


    my %names = (
                 product => '_POST_PRODUCT_DATA_',
                 inventory => '_POST_INVENTORY_AVAILABILITY_DATA_',
                 price => '_POST_PRODUCT_PRICING_DATA_',
                 image => '_POST_PRODUCT_IMAGE_DATA_',
                 variants => '_POST_PRODUCT_RELATIONSHIP_DATA_',
                 order_ack => '_POST_ORDER_ACKNOWLEDGEMENT_DATA_',
                 shipping_confirmation => '_POST_ORDER_FULFILLMENT_DATA_',
                );

    die "Unrecognized type $type" unless $names{$type};

    # no feed id, it's a new batch
    if (!$feed_id) {
        print "No feed id found, doing a request for $job_id $type\n";
        my $feed_content = $self->_slurp_file($record->{feed_file});
        my $res;
        try {
            $res = $self->client
              ->SubmitFeed(content_type => 'text/xml; charset=utf-8',
                           FeedType => $names{$type},
                           FeedContent => $feed_content,
                           MarketplaceIdList => [$self->marketplace_id],
                          );
        }
        catch {
            warn "Failure to submit $type feed: \n";
            if (ref($_)) {
                if ($_->can('xml')) {
                    warn $_->xml;
                }
                else {
                    warn Dumper($_);
                }
            }
            else {
                warn $_;
            }
        };
        # do not register the failure on die, because in this case (no
        # response) there could be throttling, or network failure
        die unless $res;

        # update the feed_id row storing it and updating.
        if ($feed_id = $record->{feed_id} = $res->{FeedSubmissionId}) {
            $self->_exe_query($self->sqla
                              ->update(amazon_mws_feeds => $record,
                                       {
                                        amws_feed_pk => $record->{amws_feed_pk},
                                        shop_id => $self->_unique_shop_id,
                                       }));
        }
        else {
            # something is really wrong here, we have to die
            die "Couldn't get a submission id, response is " . Dumper($res);
        }
    }
    print "Feed is $feed_id\n";

    if (!$record->{processing_complete}) {
        if ($self->_check_processing_complete($feed_id, $type)) {
            # update the record and set the flag to true
            $self->_exe_query($self->sqla
                              ->update('amazon_mws_feeds',
                                       { processing_complete => 1 },
                                       {
                                        feed_id => $feed_id,
                                        shop_id => $self->_unique_shop_id,
                                       }));
        }
        else {
            print "Still processing\n";
            return;
        }
    }

    # check if we didn't already processed it: the result has to be
    # fetched only when the feed is neither aborted nor successful,
    # otherwise we would register the same result twice.
    if (!$record->{aborted} && !$record->{success}) {
        # we need a class to parse the result.
        my $result = $self->submission_result($feed_id);
        if ($result->is_success) {
            $self->_exe_query($self->sqla
                              ->update('amazon_mws_feeds',
                                       { success => 1 },
                                       {
                                        feed_id => $feed_id,
                                        shop_id => $self->_unique_shop_id,
                                       }));
            # if we have a success, print the warnings on the stderr.
            # if we have a failure, the warnings will just confuse us.

            if ($type eq 'order_ack') {
                # flip the confirmation bit
                $self->_exe_query($self->sqla->update(amazon_mws_orders => { confirmed => 1 },
                                                      { amws_job_id => $job_id,
                                                        shop_id => $self->_unique_shop_id }));
            }
            elsif ($type eq 'shipping_confirmation') {
                $self->_exe_query($self->sqla->update(amazon_mws_orders => { shipping_confirmation_ok => 1 },
                                                      { shipping_confirmation_job_id => $job_id,
                                                        shop_id => $self->_unique_shop_id }));
            }
            if (my $warn = $result->warnings) {
                if (my $warns = $result->skus_warnings) {
                    foreach my $w (@$warns) {
                        $self->_error_logger(warning => $w->{code},
                                             "$w->{sku}: $w->{error}");
                        # and register it in the db
                        if ($w->{sku} && $w->{error}) {
                            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                                  { warnings => "$job_id $w->{code} $w->{error}" },
                                                                  {
                                                                   sku => $w->{sku},
                                                                   shop_id => $self->_unique_shop_id,
                                                                  }));
                        }
                    }
                }
                else {
                    warn "$warn\n";
                }
            }
            return 1;
        }
        else {
            foreach my $err ($result->report_errors) {
                $self->_error_logger($err->{type},
                                     $err->{code},
                                     $err->{message});
            }
            $self->_exe_query($self->sqla
                              ->update('amazon_mws_feeds',
                                       {
                                        aborted => 1,
                                        errors => $result->errors,
                                       },
                                       {
                                        feed_id => $feed_id,
                                        shop_id => $self->_unique_shop_id,
                                       }));
            $self->register_errors($job_id, $result);

            if ($type eq 'order_ack') {
                $self->register_order_ack_errors($job_id, $result);
            }
            elsif ($type eq 'shipping_confirmation') {
                $self->register_ship_order_errors($job_id, $result);
            }

            # and we stop this job, has errors
            return 0;
        }
    }
    return $record->{success};
}
1252              
# Prepare and execute the SQL statement $stmt with the bind values
# @bind on our database handle, returning the executed statement
# handle.
#
# When the DEBUG constant is true, the statement and the bind values
# are printed out before the execution.
#
# Dies if the execution throws or returns false, reporting the
# statement, the bind values and the reason of the failure.
sub _exe_query {
    my ($self, $stmt, @bind) = @_;
    my $sth = $self->dbh->prepare($stmt);
    print $stmt, Dumper(\@bind) if DEBUG;
    # execute returns a true value on success ("0E0" when no row is
    # affected), so a false value is a failure even if the handle
    # doesn't raise exceptions.
    my $executed = eval { $sth->execute(@bind) };
    unless ($executed) {
        # grab the reason right now, $@ is a global
        my $reason = $@ || ($sth && $sth->errstr) || 'unknown error';
        die "Failed to execute $stmt with params" . Dumper(\@bind)
          . "Reason: $reason\n";
    }
    return $sth;
}
1265              
# Ask the client for the feed submission list and look up $feed_id in
# its FeedSubmissionInfo entries.  $type is the feed name, used only
# for the diagnostics.
#
# Returns 1 when the feed is found with the processing status _DONE_.
# Returns nothing when the feed is found in another status, when it
# is not in the list, or when the response has no FeedSubmissionInfo.
#
# Dies when the request fails (the exception is reported with a
# warning first) or gives no response.
sub _check_processing_complete {
    my ($self, $feed_id, $type) = @_;
    my $res;
    try {
        $res = $self->client->GetFeedSubmissionList;
    } catch {
        my $failure = $_;
        my $can_xml = ref($failure) && $failure->can('xml');
        if ($can_xml) {
            warn "checking processing complete error for $type $feed_id: " . $failure->xml;
        }
        else {
            warn "checking processing complete for $type $feed_id: " . Dumper($failure);
        }
    };
    die unless $res;
    print "Checking if the processing is complete for $type $feed_id\n"; # . Dumper($res);

    my $submissions = $res->{FeedSubmissionInfo};
    unless ($submissions) {
        warn "No FeedSubmissionInfo found for $type $feed_id:" . Dumper($res);
        return;
    }

    # pick the first entry carrying our id
    my $ours;
    for my $submission (@$submissions) {
        next unless $submission->{FeedSubmissionId} eq $feed_id;
        $ours = $submission;
        last;
    }

    unless ($ours) {
        # there is a remote possibility that in it in another
        # page, but it should be very unlikely, as we block the
        # process when the first one is not complete
        print "$feed_id not found in submission list\n";
        return;
    }

    # check the result
    return 1 if $ours->{FeedProcessingStatus} eq '_DONE_';

    print "Feed $type $feed_id still $ours->{FeedProcessingStatus}\n";
    return;
}
1312              
=head2 submission_result($feed_id)

Fetch the submission result of the given feed ID and return it as a
L<Amazon::MWS::XML::Response::FeedSubmissionResult> object, built
with the XML we got and our C<xml_reader>.

If the request fails, the exception is reported with a warning and
the method dies. It dies as well if the response is empty.

=cut

sub submission_result {
    my ($self, $feed_id) = @_;
    my $xml;
    try {
        $xml = $self->client
          ->GetFeedSubmissionResult(FeedSubmissionId => $feed_id);
    } catch {
        my $failure = $_;
        my $can_xml = ref($failure) && $failure->can('xml');
        if ($can_xml) {
            warn "submission result error: " . $failure->xml;
        }
        else {
            warn "submission result error: " . Dumper($failure);
        }
    };
    die unless $xml;

    my %args = (
                xml => $xml,
                xml_reader => $self->xml_reader,
               );
    return Amazon::MWS::XML::Response::FeedSubmissionResult->new(%args);
}
1342              
=head2 get_orders($from_date)

This is a self-contained method and doesn't require a product list.
The from_date must be a L<DateTime> object. If not provided, it
defaults to C<order_days_range> days before now.

The orders created after that date are listed for our marketplace,
following the pagination tokens, and returned as a list of
L<Amazon::MWS::XML::Order> objects. Each object gets a callback
which retrieves the items of the order when invoked. That callback
dies with a dedicated message if the request for the items is
throttled.

Beware that it's possible you get some items with 0 quantity, i.e.
single items cancelled. The application code needs to be prepared to
deal with such phantom items. You can check each order looping over
C<< $order->items >> checking for C<< $item->quantity >>.

Dies if a listing request fails or if an order has no
C<AmazonOrderId>.

=cut

sub get_orders {
    my ($self, $from_date) = @_;
    $from_date ||= DateTime->now->subtract(days => $self->order_days_range);

    # first page, then follow the tokens
    my @raw_orders;
    my $page;
    try {
        $page = $self->client->ListOrders(
                                          MarketplaceId => [$self->marketplace_id],
                                          CreatedAfter => $from_date,
                                         );
        push @raw_orders, @{ $page->{Orders}->{Order} };
    }
    catch {
        die Dumper($_);
    };
    while (my $token = $page->{NextToken}) {
        # print "Found next token!\n";
        try {
            $page = $self->client->ListOrdersByNextToken(NextToken => $token);
            push @raw_orders, @{ $page->{Orders}->{Order} };
        }
        catch {
            die Dumper($_);
        };
    }

    my @orders;
    for my $raw (@raw_orders) {
        my $amazon_id = $raw->{AmazonOrderId};
        die "Missing amazon AmazonOrderId?!" unless $amazon_id;

        # the items are not fetched here: the order object receives
        # this callback, which returns them as an arrayref.
        my $fetch_items = sub {
            my @items;
            my $items_page;
            try {
                $items_page = $self->client->ListOrderItems(AmazonOrderId => $amazon_id);
                push @items, @{ $items_page->{OrderItems}->{OrderItem} };
            }
            catch {
                my $failure = $_;
                my $throttled = blessed($failure)
                  && $failure->isa('Amazon::MWS::Exception::Throttled');
                die Dumper($failure) unless $throttled;
                die "Request is throttled. Consider to adjust order_days_range as documented at https://metacpan.org/pod/Amazon::MWS::Uploader#ACCESSORS";
            };
            while (my $items_token = $items_page->{NextToken}) {
                try {
                    $items_page =
                      $self->client->ListOrderItemsByNextToken(NextToken => $items_token);
                    push @items, @{ $items_page->{OrderItems}->{OrderItem} };
                }
                catch {
                    die Dumper($_);
                };
            }
            return \@items;
        };

        push @orders, Amazon::MWS::XML::Order->new(order => $raw,
                                                   retrieve_orderline_sub => $fetch_items);
    }
    return @orders;
}
1427              
=head2 order_already_registered($order)

Check in the amazon_mws_orders table if we already registered this
order, looking it up by its Amazon order number and our shop id.

Return the row of this table (as an hashref) if present, nothing
otherwise. Dies if called without an order.

=cut

sub order_already_registered {
    my ($self, $order) = @_;
    die "Bad usage, missing order" unless $order;
    my %wanted = (
                  amazon_order_id => $order->amazon_order_number,
                  shop_id => $self->_unique_shop_id,
                 );
    my $sth = $self->_exe_query($self->sqla->select(amazon_mws_orders => '*',
                                                    \%wanted));
    my $registered = $sth->fetchrow_hashref or return;
    # we need just the first row
    $sth->finish;
    return $registered;
}
1454              
=head2 acknowledge_successful_order(@orders)

Accept a list of L<Amazon::MWS::XML::Order> objects, prepare a
acknowledge feed with the C<Success> status, and insert the orders in
the database.

Orders which are already in the C<amazon_mws_orders> table are
skipped: with a message if they are confirmed, with a warning if they
are not. If no order is left after that, nothing is done.

=cut

sub acknowledge_successful_order {
    my ($self, @orders) = @_;
    my @orders_to_register;
    for my $candidate (@orders) {
        my $known = $self->order_already_registered($candidate);
        unless ($known) {
            push @orders_to_register, $candidate;
            next;
        }
        if ($known->{confirmed}) {
            print "Skipping already confirmed order $known->{amazon_order_id} => $known->{shop_order_id}\n";
        }
        else {
            # it's not complete, so print out diagnostics
            warn "Order $known->{amazon_order_id} uncompletely registered with id $known->{shop_order_id}, please indagate why (skipping)\n" . Dumper($known);
        }
    }
    return unless @orders_to_register;

    # here we have only one feed to upload and check
    my %ack_feed = (
                    name => 'order_ack',
                    content => $self->acknowledge_feed(Success => @orders_to_register),
                   );
    my $job_id = $self->prepare_feeds(order_ack => [ \%ack_feed ]);

    # store the pairing amazon order id / shop order id in our table
    for my $order (@orders_to_register) {
        my $pairing = {
                       shop_id => $self->_unique_shop_id,
                       amazon_order_id => $order->amazon_order_number,
                       # this will die if we try to insert an undef order_number
                       shop_order_id => $order->order_number,
                       amws_job_id => $job_id,
                      };
        $self->_exe_query($self->sqla->insert(amazon_mws_orders => $pairing));
    }
}
1500              
1501              
=head2 acknowledge_feed($status, @orders)

The first argument is usually C<Success>. The other arguments is a
list of L<Amazon::MWS::XML::Order> objects.

Return the C<OrderAcknowledgement> feed created by the generic
feeder, with one message for each order, numbered from 1, carrying
the given status code. Dies if the status or the orders are missing.

=cut


sub acknowledge_feed {
    my ($self, $status, @orders) = @_;
    die "Missing status" unless $status;
    die "Missing orders" unless @orders;

    my $feeder = $self->generic_feeder;

    my $message_id = 0;
    my @messages = map {
        my $ack = $_->as_ack_order_hashref;
        $ack->{StatusCode} = $status;
        +{
          MessageID => ++$message_id,
          OrderAcknowledgement => $ack,
         };
    } @orders;
    return $feeder->create_feed(OrderAcknowledgement => \@messages);
}
1529              
=head2 delete_skus(@skus)

Accept a list of skus. Prepare a C<product_deletion> feed and update
the database.

The skus which are found in the C<amazon_mws_products> table in
C<pending> or C<deleted> status are skipped. The skus for which no
status is found are deleted anyway, with a warning. The remaining
skus are flagged as C<deleted> in the table and linked to the new
job. If no sku is left, no feed is prepared.

=cut

sub delete_skus {
    my ($self, @skus) = @_;
    return unless @skus;
    print "Trying to purge missing items " . join(" ", @skus) . "\n";

    # delete only products which are not in pending status
    my $lookup = $self
      ->_exe_query($self->sqla
                   ->select('amazon_mws_products', [qw/sku status/],
                            {
                             sku => { -in => \@skus },
                             shop_id => $self->_unique_shop_id,
                            }));
    my %status_of;
    while (my $product = $lookup->fetchrow_hashref) {
        $status_of{$product->{sku}} = $product->{status};
    }

    my @purgeable;
    for my $sku (@skus) {
        my $status = $status_of{$sku};
        if (!$status) {
            warn "$sku not found in amazon_mws_products, deleting anyway\n";
        }
        elsif ($status eq 'pending' or $status eq 'deleted') {
            print "Skipping $sku deletion, in status $status\n";
            next;
        }
        push @purgeable, $sku;
    }
    @skus = @purgeable;

    if (!@skus) {
        print "Not purging anything\n";
        return;
    }
    print "Actually purging items " . join(" ", @skus) . "\n";

    my %deletion_feed = (
                         name => 'product',
                         content => $self->delete_skus_feed(@skus),
                        );
    my $job_id = $self->prepare_feeds(product_deletion => [ \%deletion_feed ]);

    # delete the skus locally
    $self->_exe_query($self->sqla->update('amazon_mws_products',
                                          {
                                           status => 'deleted',
                                           amws_job_id => $job_id,
                                          },
                                          {
                                           sku => { -in => \@skus },
                                           shop_id => $self->_unique_shop_id,
                                          }));
}
1593              
=head2 delete_skus_feed(@skus)

Prepare a feed (via C<create_feed>) to delete the given skus: a
C<Product> feed with one C<Delete> message for each sku, numbered
from 1. Return nothing if no sku is passed.

=cut

sub delete_skus_feed {
    my ($self, @skus) = @_;
    return unless @skus;
    my $feeder = $self->generic_feeder;
    my $message_id = 0;
    my @messages = map {
        +{
          MessageID => ++$message_id,
          OperationType => 'Delete',
          Product => {
                      SKU => $_,
                     },
         };
    } @skus;
    return $feeder->create_feed(Product => \@messages);
}
1617              
# Record the failure of an order acknowledgement job.
#
# Arguments: the job id and the feed result object, on which
# report_errors and errors are called.
#
# The error message and code are stored in amazon_mws_orders for the
# rows belonging to this job and shop. Afterwards the order status is
# refreshed via update_amw_order_status and reported with a warning.
sub register_order_ack_errors {
    my ($self, $job_id, $result) = @_;
    my @errors = $result->report_errors;

    # Render one error as "type message code". Missing fields become
    # empty strings so they don't raise "uninitialized" warnings.
    my $describe = sub {
        my $error = shift;
        return join(' ',
                    map { defined $error->{$_} ? $error->{$_} : '' }
                    qw/type message code/);
    };

    # we hope to have just one error, but in case...
    my %update;
    if (@errors > 1) {
        # pick just the first code, the field is an integer
        my ($first_with_code) = grep { $_->{code} } @errors;
        %update = (
                   # one error per line: a real newline, not a literal '\n'
                   error_msg => join("\n", map { $describe->($_) } @errors),
                   error_code => $first_with_code
                                 ? $first_with_code->{code}
                                 : AMW_ORDER_WILDCARD_ERROR,
                  );
    }
    elsif (@errors) {
        my $error = $errors[0];
        %update = (
                   error_msg => $describe->($error),
                   error_code => $error->{code},
                  );
    }
    else {
        # nothing structured to report: store the raw errors
        %update = (
                   error_msg => $result->errors,
                   error_code => AMW_ORDER_WILDCARD_ERROR,
                  );
    }
    # every branch above fills %update, so the update is unconditional
    $self->_exe_query($self->sqla->update('amazon_mws_orders',
                                          \%update,
                                          {
                                           amws_job_id => $job_id,
                                           shop_id => $self->_unique_shop_id,
                                          }));

    # then get the amazon order number and recheck
    my $sth = $self->_exe_query($self->sqla->select('amazon_mws_orders',
                                                    [qw/amazon_order_id
                                                        shop_order_id
                                                       /],
                                                    {
                                                     amws_job_id => $job_id,
                                                     shop_id => $self->_unique_shop_id,
                                                    }));
    my ($amw_order_number, $shop_order_id) = $sth->fetchrow_array;
    if ($sth->fetchrow_array) {
        warn "Multiple jobs found for $job_id in amazon_mws_orders!";
    }
    $sth->finish;
    if (my $status = $self->update_amw_order_status($amw_order_number)) {
        warn "$amw_order_number ($shop_order_id) has now status $status!\n";
    }
}
1676              
# Store the outcome of a shipping confirmation job in
# amazon_mws_orders: orders reported with an error get
# shipping_confirmation_error set, all the other orders of the job
# get shipping_confirmation_ok set.
sub register_ship_order_errors {
    my ($self, $job_id, $result) = @_;
    # the amazon order ids which belong to this job
    my @orders = $self->orders_in_shipping_job($job_id);
    my $errors = $result->orders_errors;

    # index the faults by order id, skipping those without one
    my %fault_for;
    foreach my $err (grep { $_->{order_id} } @$errors) {
        $fault_for{ $err->{order_id} } = {
                                          job_id => $job_id,
                                          code => $err->{code},
                                          error => $err->{error},
                                         };
    }

    foreach my $ord (@orders) {
        my $fault = $fault_for{$ord};
        my $outcome = $fault
          ? +{ shipping_confirmation_error => "$fault->{code} $fault->{error}" }
          : +{ shipping_confirmation_ok => 1 };
        $self->_exe_query($self->sqla->update('amazon_mws_orders',
                                              $outcome,
                                              {
                                               amazon_order_id => $ord,
                                               shipping_confirmation_job_id => $job_id,
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
}
1712              
1713              
1714             =head2 register_errors($job_id, $result)
1715              
1716             The first argument is the job ID. The second is a
1717             L<Amazon::MWS::XML::Response::FeedSubmissionResult> object.
1718              
1719             This method will update the status of the products (either C<failed>
1720             or C<redo>) in C<amazon_mws_products>.
1721              
1722             =head2 register_order_ack_errors($job_id, $result);
1723              
1724             Same arguments as above, but for order acknowledgements.
1725              
1726             =head2 register_ship_order_errors($job_id, $result);
1727              
1728             Same arguments as above, but for shipping notifications.
1729              
1730             =cut
1731              
# Update amazon_mws_products after a failed product job: the SKUs
# reported with an error become "failed" (with code and message), the
# remaining SKUs of the job become "redo".
sub register_errors {
    my ($self, $job_id, $result) = @_;
    # We don't have a products hashref anymore, so ask the database
    # which skus were scheduled for this job. Parsing back the
    # produced xml looks like an overkill: the skus without errors are
    # just marked as redo, waiting for the next cron call.
    my @products = $self->skus_in_job($job_id);
    my $errors = $result->skus_errors;

    # index the faults by sku, skipping those without one
    my %fault_for;
    foreach my $err (grep { $_->{sku} } @$errors) {
        $fault_for{ $err->{sku} } = {
                                     job_id => $job_id,
                                     code => $err->{code},
                                     error => $err->{error},
                                    };
    }

    foreach my $sku (@products) {
        if (my $fault = $fault_for{$sku}) {
            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                  {
                                                   status => 'failed',
                                                   error_code => $fault->{code},
                                                   error_msg => "$fault->{job_id} $fault->{code} $fault->{error}",
                                                  },
                                                  {
                                                   sku => $sku,
                                                   shop_id => $self->_unique_shop_id,
                                                  }));
            next;
        }
        # this is good, mark it to be redone
        $self->_exe_query($self->sqla->update('amazon_mws_products',
                                              { status => 'redo' },
                                              {
                                               sku => $sku,
                                               shop_id => $self->_unique_shop_id,
                                              }));
        print "Scheduling $sku for redoing\n";
    }
}
1771              
1772             =head2 skus_in_job($job_id)
1773              
1774             Check the amazon_mws_products table for the SKUs which were uploaded
1775             by the given job ID.
1776              
1777             =cut
1778              
# Return the list of SKUs stored in amazon_mws_products for the given
# job id and the current shop.
sub skus_in_job {
    my ($self, $job_id) = @_;
    my @query = $self->sqla->select('amazon_mws_products',
                                    ['sku'],
                                    {
                                     amws_job_id => $job_id,
                                     shop_id => $self->_unique_shop_id,
                                    });
    my $sth = $self->_exe_query(@query);
    my @skus;
    while (defined(my $record = $sth->fetchrow_hashref)) {
        push @skus, $record->{sku};
    }
    return @skus;
}
1793              
1794             =head2 get_asin_for_eans(@eans)
1795              
1796             Accept a list of EANs and return an hashref where the keys are the
1797             eans passed as arguments, and the values are the ASIN for the current
1798             marketplace. Max EANs: 5.
1799              
1800             http://docs.developer.amazonservices.com/en_US/products/Products_GetMatchingProductForId.html
1801              
1802             =head2 get_asin_for_skus(@skus)
1803              
1804             Same as above (with the same limit of 5 items), but for SKUs.
1805              
1806             =head2 get_asin_for_sku($sku)
1807              
1808             Same as above, but for a single sku. Return the ASIN or undef if not
1809             found.
1810              
1811             =head2 get_asin_for_ean($ean)
1812              
1813             Same as above, but for a single ean. Return the ASIN or undef if not
1814             found.
1815              
1816             =cut
1817              
# Look up the ASINs for the given seller SKUs, delegating to
# _get_asin_for_type with the SellerSKU id type.
sub get_asin_for_skus {
    my $self = shift;
    return $self->_get_asin_for_type('SellerSKU', @_);
}
1822              
# Look up the ASINs for the given EANs, delegating to
# _get_asin_for_type with the EAN id type.
sub get_asin_for_eans {
    my $self = shift;
    return $self->_get_asin_for_type('EAN', @_);
}
1827              
# Call GetMatchingProductForId for up to 5 ids of the given type and
# return an hashref mapping each returned Id to its ASIN. Dies when
# more than 5 ids are passed or, with a dump of the exception, when
# the API call fails.
sub _get_asin_for_type {
    my ($self, $type, @list) = @_;
    if (@list > 5) {
        die "Max 5 products to get the asin for $type!";
    }
    my $client = $self->client;
    my $res;
    try {
        $res = $client->GetMatchingProductForId(IdType => $type,
                                                IdList => \@list,
                                                MarketplaceId => $self->marketplace_id);
    }
    catch { die Dumper($_) };

    return {} unless $res && @$res;
    my %asin_for = map {
        ($_->{Id} => $_->{Products}->{Product}->{Identifiers}->{MarketplaceASIN}->{ASIN})
    } @$res;
    return \%asin_for;
}
1847              
# Return the ASIN for a single EAN, or nothing if the lookup didn't
# yield a value for it.
sub get_asin_for_ean {
    my ($self, $ean) = @_;
    my $asin_for = $self->get_asin_for_eans($ean);
    return unless $asin_for && $asin_for->{$ean};
    return $asin_for->{$ean};
}
1858              
# Return the ASIN for a single SKU, or nothing if the lookup didn't
# yield a value for it.
sub get_asin_for_sku {
    my ($self, $sku) = @_;
    my $asin_for = $self->get_asin_for_skus($sku);
    return unless $asin_for && $asin_for->{$sku};
    return $asin_for->{$sku};
}
1869              
1870              
1871             =head2 get_product_category_data($ean)
1872              
1873             Return the deep data structures returned by
1874             C<GetProductCategoriesForASIN>.
1875              
1876             =head2 get_product_categories($ean)
1877              
1878             Return a list of category codes (the ones passed to
1879             RecommendedBrowseNode) which exists on amazon.
1880              
1881             =cut
1882              
# Resolve the EAN to an ASIN and return the response of
# GetProductCategoriesForASIN for the current marketplace. Returns
# nothing if the EAN is missing or no ASIN is found for it.
sub get_product_category_data {
    my ($self, $ean) = @_;
    return unless $ean;
    my $asin = $self->get_asin_for_ean($ean)
      or return;
    # the API is always called in scalar context
    return scalar $self->client
      ->GetProductCategoriesForASIN(ASIN => $asin,
                                    MarketplaceId => $self->marketplace_id);
}
1895              
1896             =head2 get_product_category_names($ean)
1897              
1898             Return a list of arrayrefs with the category paths. Beware that we
1899             strip the first two parents, which heuristically appear meaningless
1900             (Category/Category).
1901              
1902             If this is not the case, please report this as a bug and we'll find a
1903             solution.
1904              
1905             You can call C<get_product_category_data> to inspect the raw response
1906             yourself.
1907              
1908             =cut
1909              
# Return a list of arrayrefs, one per category of the product, each
# holding the category path as produced by _parse_amz_cat. Categories
# with an empty path are left out. Warns and returns nothing if no
# category data is available.
sub get_product_category_names {
    my ($self, $ean) = @_;
    my $res = $self->get_product_category_data($ean);
    unless ($res) {
        warn "ASIN exists but no categories found. Bug?\n";
        return;
    }
    my @category_names;
    foreach my $cat (@$res) {
        my @list = $self->_parse_amz_cat($cat);
        next unless @list;
        push @category_names, \@list;
    }
    return @category_names;
}
1929              
# Walk a category structure up through its Parent links and return
# the category names from the outermost ancestor down to the category
# itself. When the path has at least two elements, the first two are
# removed.
sub _parse_amz_cat {
    my ($self, $node) = @_;
    my @names;
    # prepend while climbing, so the outermost ancestor ends up first
    for (; $node; $node = $node->{Parent}) {
        unshift @names, $node->{ProductCategoryName}
          if $node->{ProductCategoryName};
    }
    # the first two parents are Category/Category.
    splice(@names, 0, 2) if @names >= 2;
    return @names;
}
1947              
1948              
# Return the ProductCategoryId of each category found for the
# product. Warns and returns nothing if no category data is
# available.
sub get_product_categories {
    my ($self, $ean) = @_;
    my $res = $self->get_product_category_data($ean);
    unless ($res) {
        warn "ASIN exists but no categories found. Bug?\n";
        return;
    }
    my @ids;
    foreach my $category (@$res) {
        push @ids, $category->{ProductCategoryId};
    }
    return @ids;
}
1961              
1962              
1963             # http://docs.developer.amazonservices.com/en_US/products/Products_GetLowestOfferListingsForSKU.html
1964              
1965             =head2 get_lowest_price_for_asin($asin, $condition)
1966              
1967             Return the lowest price for asin, excluding ourselves. The second
1968             argument, condition, is optional and defaults to "New".
1969              
1970             If you need the full details, you have to call
1971             $self->client->GetLowestOfferListingsForASIN yourself and make sense
1972             of the output. This method is mostly a wrapper meant to simplify the
1973             routine.
1974              
1975             If we can't get any info, just return undef.
1976              
1977             Return undef if no prices are found.
1978              
1979             =head2 get_lowest_price_for_ean($ean, $condition)
1980              
1981             Same as above, but use the EAN instead
1982              
1983             =cut
1984              
# Resolve the EAN to an ASIN and return the result of
# get_lowest_price_for_asin for it. Returns nothing if the EAN is
# missing or no ASIN is found.
sub get_lowest_price_for_ean {
    my ($self, $ean, $condition) = @_;
    # a false $ean short-circuits, so no lookup is done for it
    my $asin = $ean && $self->get_asin_for_ean($ean);
    return unless $asin;
    return $self->get_lowest_price_for_asin($asin, $condition);
}
1992              
# Return the lowest landed price among the offer listings for the
# given ASIN, excluding our own offers.
#
# Arguments: the ASIN (mandatory, dies if missing) and the item
# condition, which defaults to "New".
#
# Returns undef if there are no listings or none of them carries a
# landed price amount. Dies with a dump of the exception if the API
# call fails.
sub get_lowest_price_for_asin {
    my ($self, $asin, $condition) = @_;
    die "Wrong usage, missing argument asin" unless $asin;
    my $listing;
    try {
        $listing = $self->client
          ->GetLowestOfferListingsForASIN(
                                          ASINList => [ $asin ],
                                          MarketplaceId => $self->marketplace_id,
                                          ExcludeMe => 1,
                                          ItemCondition => $condition || 'New',
                                         );
    }
    catch { die Dumper($_) };

    return unless $listing && @$listing;
    my $lowest;
    foreach my $item (@$listing) {
        # step through the structure without autovivifying it, and
        # ignore the listings which have no amount to compare
        my $price = $item->{Price} or next;
        my $landed = $price->{LandedPrice} or next;
        my $current = $landed->{Amount};
        next unless defined $current;
        # test for definedness, so an amount of 0 is kept as lowest
        if (!defined $lowest or $current < $lowest) {
            $lowest = $current;
        }
    }
    return $lowest;
}
2018              
2019             =head2 shipping_confirmation_feed(@shipped_orders)
2020              
2021             Return a feed string with the shipping confirmation. A list of
2022             L<Amazon::MWS::XML::ShippedOrder> object must be passed.
2023              
2024             =cut
2025              
# Build the OrderFulfillment feed for the given shipped orders, one
# message per order, numbered starting from 1. Dies if no order is
# passed.
sub shipping_confirmation_feed {
    my $self = shift;
    my @shipped_orders = @_;
    die "Missing Amazon::MWS::XML::ShippedOrder argument" unless @shipped_orders;
    my $feeder = $self->generic_feeder;
    my $message_id = 0;
    my @messages = map {
        +{
            MessageID => ++$message_id,
            OrderFulfillment => $_->as_shipping_confirmation_hashref,
        }
    } @shipped_orders;
    return $feeder->create_feed(OrderFulfillment => \@messages);
}
2041              
2042             =head2 send_shipping_confirmation($shipped_orders)
2043              
2044             Schedule the shipped orders (an L<Amazon::MWS::XML::ShippedOrder>
2045             object) for the uploading.
2046              
2047             =head2 order_already_shipped($shipped_order)
2048              
2049             Check if the shipped order (an L<Amazon::MWS::XML::ShippedOrder>) was
2050             already notified as shipped by looking into our table, returning the
2051             row with the order.
2052              
2053             To see the status, check shipping_confirmation_ok (already done),
2054             shipping_confirmation_error (faulty), shipping_confirmation_job_id (pending).
2055              
2056             =cut
2057              
# Return the amazon_mws_orders row matching the shipped order, or
# nothing if there is none. Dies if more than one row matches.
sub order_already_shipped {
    my ($self, $order) = @_;
    my $condition = $self->_condition_for_shipped_orders($order);
    my $sth = $self->_exe_query($self->sqla->select(amazon_mws_orders => '*', $condition));
    my $row = $sth->fetchrow_hashref;
    return unless $row;
    # a second row means the condition is ambiguous
    if ($sth->fetchrow_hashref) {
        die "Multiple results found in amazon_mws_orders for " . Dumper($condition);
    }
    return $row;
}
2071              
# Schedule the shipping confirmation for the given shipped orders.
#
# Each order is looked up with order_already_shipped. Orders already
# notified or with a pending job are skipped; orders with a previous
# error are skipped too, unless reset_all_errors is set. The
# remaining orders are sent in a single feed and the resulting job id
# is stored in amazon_mws_orders.
#
# Dies if an order has no row in amazon_mws_orders. Returns nothing
# when there is no order left to notify.
sub send_shipping_confirmation {
    my ($self, @orders) = @_;
    my @orders_to_notify;
    foreach my $ord (@orders) {
        if (my $report = $self->order_already_shipped($ord)) {
            if ($report->{shipping_confirmation_ok}) {
                print "Skipping ship-confirm for order $report->{amazon_order_id} $report->{shop_order_id}: already notified\n";
            }
            elsif (my $error = $report->{shipping_confirmation_error}) {
                if ($self->reset_all_errors) {
                    warn "Submitting again previously failed job $report->{amazon_order_id} $report->{shop_order_id}\n";
                    push @orders_to_notify, $ord;
                }
                else {
                    warn "Skipping ship-confirm for order $report->{amazon_order_id} $report->{shop_order_id} with error $error\n";
                }
            }
            elsif ($report->{shipping_confirmation_job_id}) {
                print "Skipping ship-confirm for order $report->{amazon_order_id} $report->{shop_order_id}: pending\n";
            }
            else {
                push @orders_to_notify, $ord;
            }
        }
        else {
            # There is no row here, so the ids for the message have to
            # come from the order object itself, skipping a missing one.
            my $amazon_order_id = $ord->amazon_order_id;
            my $merchant_order_id = $ord->merchant_order_id;
            my $ids = join(' ',
                           grep { defined $_ && length $_ }
                           ($amazon_order_id, $merchant_order_id));
            die "It looks like you are trying to send a shipping confirmation "
              . " without prior order acknowlegdement. "
              . "At least in the amazon_mws_orders there is no trace of "
              . $ids;
        }
    }
    return unless @orders_to_notify;
    my $feed_content = $self->shipping_confirmation_feed(@orders_to_notify);
    # here we have only one feed to upload and check
    my $job_id = $self->prepare_feeds(shipping_confirmation => [{
                                                                 name => 'shipping_confirmation',
                                                                 content => $feed_content,
                                                                }]);
    # and store the job id in the table, clearing any previous error
    foreach my $ord (@orders_to_notify) {
        $self->_exe_query($self->sqla->update(amazon_mws_orders => {
                                                                    shipping_confirmation_job_id => $job_id,
                                                                    shipping_confirmation_error => undef,
                                                                   },
                                              $self->_condition_for_shipped_orders($ord)));
    }
}
2119              
# Build the where-clause hashref to locate a shipped order in
# amazon_mws_orders: the shop id plus the amazon order id if the
# order has one, otherwise the merchant order id (matched against
# shop_order_id). Dies if the order is missing or has neither id.
sub _condition_for_shipped_orders {
    my ($self, $order) = @_;
    die "Missing order" unless $order;
    my $shop_id = $self->_unique_shop_id;

    my $amazon_order_id = $order->amazon_order_id;
    if ($amazon_order_id) {
        return { shop_id => $shop_id, amazon_order_id => $amazon_order_id };
    }
    # the merchant order id is asked only as a fallback
    my $order_id = $order->merchant_order_id;
    if ($order_id) {
        return { shop_id => $shop_id, shop_order_id => $order_id };
    }
    die "Missing amazon_order_id or merchant_order_id";
}
2135              
2136              
2137             =head2 orders_waiting_for_shipping
2138              
2139             Return a list of hashref with two keys, C<amazon_order_id> and
2140             C<shop_order_id> for each order which is waiting confirmation.
2141              
2142             This is implemented looking into amazon_mws_orders where there is no
2143             shipping confirmation job id.
2144              
2145             The confirmed flag (which means we acknowledged the order) is ignored
2146             to avoid stuck order_ack jobs to prevent the shipping confirmation.
2147              
2148             =cut
2149              
# Return the rows (hashrefs with amazon_order_id and shop_order_id)
# of the orders of this shop which have no shipping confirmation job
# id yet.
sub orders_waiting_for_shipping {
    my ($self) = @_;
    my @columns = qw/amazon_order_id shop_order_id/;
    my $sth = $self->_exe_query($self->sqla->select('amazon_mws_orders',
                                                    \@columns,
                                                    {
                                                     shop_id => $self->_unique_shop_id,
                                                     shipping_confirmation_job_id => undef,
                                                     # the "confirmed" flag is left out on
                                                     # purpose: unconfirmed orders have to
                                                     # be considered as well
                                                    }));
    my @out;
    while (defined(my $record = $sth->fetchrow_hashref)) {
        push @out, $record;
    }
    return @out;
}
2167              
2168             =head2 product_needs_upload($sku, $timestamp)
2169              
2170             Look up the product $sku with timestamp $timestamp and return the sku
2171             if the product needs to be uploaded, or nothing if it can be safely
2172             skipped. This method is stateless and doesn't alter anything.
2173              
2174             =cut
2175              
# Decide if the product $sku has to be uploaded: return the sku if
# so, nothing otherwise.
#
# Forced skus and skus without a record in existing_products are
# always scheduled. For the known ones the stored status decides:
# "ok" is uploaded only if the timestamp changed, "redo" always,
# "pending" never, "failed" only if the errors are to be reset.
# Dies on any other status.
sub product_needs_upload {
    my ($self, $sku, $timestamp) = @_;
    my $debug = $self->debug;
    return unless $sku;

    # if it's forced, we have nothing to check, just pass it.
    my $forced = $self->_force_hashref;
    if ($forced->{$sku}) {
        print "Forcing $sku as requested\n" if $debug;
        return $sku;
    }

    $timestamp ||= 0;
    my $existing = $self->existing_products;
    my $record = $existing->{$sku};
    unless ($record) {
        print "$sku wasn't uploaded so far, scheduling it\n" if $debug;
        return $sku;
    }

    my $status = $record->{status} || '';

    return $sku if $status eq 'redo';

    if ($status eq 'ok') {
        # unchanged timestamp: nothing to do
        return if $record->{timestamp_string} eq $timestamp;
        return $sku;
    }

    if ($status eq 'pending') {
        print "Skipping pending item $sku\n" if $debug;
        return;
    }

    if ($status eq 'failed') {
        return $sku if $self->reset_all_errors;

        my $reset = $self->_reset_error_structure;
        unless ($reset) {
            print "Skipping failed item $sku\n" if $debug;
            return;
        }
        # option for this error was passed.
        my $error = $record->{error_code};
        my $match = $reset->{codes}->{$error};
        # reset when exactly one of "code listed" and "negate" holds
        if ($match xor $reset->{negate}) {
            print "Resetting error for $sku with error code $error\n" if $debug;
            return $sku;
        }
        # was passed !this error or !random , so do not reset
        print "Skipping failed item $sku with error code $error\n" if $debug;
        return;
    }

    die "I shouldn't have reached this point with status <$status>";
}
2242              
2243             =head2 orders_in_shipping_job($job_id)
2244              
2245             Lookup the C<amazon_mws_orders> table and return a list of
2246             C<amazon_order_id> for the given shipping confirmation job. INTERNAL.
2247              
2248             =cut
2249              
# Return the list of amazon_order_id stored in amazon_mws_orders for
# the given shipping confirmation job and the current shop. Dies if
# the job id is missing.
sub orders_in_shipping_job {
    my ($self, $job_id) = @_;
    die unless $job_id;
    my @query = $self->sqla->select(amazon_mws_orders => ['amazon_order_id'],
                                    {
                                     shipping_confirmation_job_id => $job_id,
                                     shop_id => $self->_unique_shop_id,
                                    });
    my $sth = $self->_exe_query(@query);
    my @orders;
    while (defined(my $record = $sth->fetchrow_hashref)) {
        push @orders, $record->{amazon_order_id};
    }
    return @orders;
}
2264              
2265             =head2 put_product_on_error(sku => $sku, timestamp_string => $timestamp, error_code => $error_code, error_msg => $error)
2266              
2267             Register a custom error for the product $sku with error $error and
2268             $timestamp as the timestamp string. The error is optional, and will be
2269             "shop error" if not provided. The error code will be 1 if not provided.
2270              
2271             =cut
2272              
# Mark the product as failed in amazon_mws_products, updating the
# existing row for this shop and sku or inserting a new one.
#
# Named arguments: sku and timestamp_string are mandatory (dies
# otherwise), error_msg defaults to "shop error" and error_code to 1.
sub put_product_on_error {
    my ($self, %product) = @_;
    die "Missing sku" unless $product{sku};
    die "Missing timestamp" unless defined $product{timestamp_string};

    my %identifier = (
                      shop_id => $self->_unique_shop_id,
                      sku => $product{sku},
                     );
    my %errors = (
                  status => 'failed',
                  error_msg => $product{error_msg} || 'shop error',
                  error_code => $product{error_code} || 1,
                  timestamp_string => $product{timestamp_string},
                 );

    # check if we have it
    my $sth = $self->_exe_query($self->sqla
                                ->select('amazon_mws_products',
                                         [qw/sku/], { %identifier }));
    if ($sth->fetchrow_hashref) {
        $sth->finish;
        # report the message which is actually stored: the optional
        # error_msg argument may be missing
        print "Updating $identifier{sku} with error $errors{error_msg}\n";
        $self->_exe_query($self->sqla->update('amazon_mws_products',
                                              \%errors, \%identifier));
    }
    else {
        print "Inserting $identifier{sku} with error $errors{error_msg}\n";
        $self->_exe_query($self->sqla
                          ->insert('amazon_mws_products',
                                   {
                                    %identifier,
                                    %errors,
                                   }));
    }
}
2310              
2311              
2312             =head2 cancel_feed($feed_id)
2313              
2314             Call the CancelFeedSubmissions API and abort the feed and the
2315             belonging job if found in the list. Return the response, which
2316             probably is not even meaningful. It is a big FeedSubmissionInfo with
2317             the past feed submissions.
2318              
2319             =cut
2320              
# Cancel the given feed. If the feed is still pending in
# amazon_mws_feeds, mark it and its job as aborted and set the
# products of that job to "redo"; otherwise just warn. In both cases
# call CancelFeedSubmissions and return its response.
sub cancel_feed {
    my ($self, $feed) = @_;
    die "Missing feed id argument" unless $feed;
    # look for the feed among the pending ones
    my $sth = $self->_exe_query($self->sqla
                                ->select(amazon_mws_feeds => ['amws_job_id'],
                                         {
                                          shop_id => $self->_unique_shop_id,
                                          feed_id => $feed,
                                          aborted => 0,
                                          success => 0,
                                          processing_complete => 0,
                                         }));
    my $feed_record = $sth->fetchrow_hashref;
    if (!$feed_record) {
        warn "No $feed found in pending list, trying to cancel anyway\n";
    }
    else {
        $sth->finish;
        print "Found $feed in pending state\n";
        my $job_id = $feed_record->{amws_job_id};

        # abort it on our side
        $self->_exe_query($self->sqla
                          ->update('amazon_mws_feeds',
                                   {
                                    aborted => 1,
                                    errors => "Canceled by shop action",
                                   },
                                   {
                                    feed_id => $feed,
                                    shop_id => $self->_unique_shop_id,
                                   }));
        # and abort the job as well
        $self->_exe_query($self->sqla
                          ->update('amazon_mws_jobs',
                                   {
                                    aborted => 1,
                                    status => "Job aborted by cancel_feed",
                                   },
                                   {
                                    amws_job_id => $job_id,
                                    shop_id => $self->_unique_shop_id,
                                   }));
        # and set the belonging products to redo
        $self->_exe_query($self->sqla
                          ->update('amazon_mws_products',
                                   { status => 'redo' },
                                   {
                                    amws_job_id => $job_id,
                                    shop_id => $self->_unique_shop_id,
                                   }));
    }
    # do the api call
    return $self->client->CancelFeedSubmissions(IdList => [ $feed ]);
}
2376              
# Output a feed error or warning according to the mode set for its
# code in skus_warnings_modes: "print", "warn" (the default when the
# code is not listed) or "skip". Any other mode produces a warning
# about the invalid mode. Code 8008 is always set to "print".
sub _error_logger {
    my ($self, $error_or_warning, $error_code, $message) = @_;
    my $modes = $self->skus_warnings_modes;
    my $out_message = "$error_or_warning: $message ($error_code)\n";
    # hardcode 8008 as print
    $modes->{8008} = 'print';
    my $mode = exists $modes->{$error_code} ? $modes->{$error_code} : 'warn';

    my %action_for = (
                      print => sub { print $out_message },
                      warn => sub { warn $out_message },
                      skip => sub { }, # do nothing
                     );
    if (my $action = $action_for{$mode}) {
        $action->();
    }
    else {
        warn "Invalid mode $mode for $out_message";
    }
}
2400              
2401             =head2 update_amw_order_status($amazon_order_number)
2402              
2403             Check the order status on Amazon and update the row in the
2404             amazon_mws_orders table.
2405              
2406             =cut
2407              
# Refresh the status of one order from Amazon.
#
# Takes the Amazon order number. Looks up the matching row of
# amazon_mws_orders for the current shop (dying if more than one row
# matches), asks the client for the order via GetOrder and stores the
# reported status in the table. Returns the new status, or nothing when
# no order number was passed or Amazon did not return the order.
sub update_amw_order_status {
    my ($self, $order) = @_;
    return unless $order;
    my %where = (
                 amazon_order_id => $order,
                 shop_id => $self->_unique_shop_id,
                );
    my $sth = $self->_exe_query($self->sqla->select('amazon_mws_orders',
                                                    '*', { %where }));
    my $order_ref = $sth->fetchrow_hashref;
    die "Multiple rows found for $order!" if $sth->fetchrow_hashref;
    print Dumper($order_ref);
    my $res = $self->client->GetOrder(AmazonOrderId => [$order]);
    # Walk the response step by step so a missing Orders element is not
    # autovivified.
    my $found = ($res && $res->{Orders}) ? $res->{Orders}->{Order} : undef;
    unless ($found && @$found) {
        # Without an order there is no status to read: stop here rather
        # than building an order object out of undef and touching the db.
        warn "Order $order not found on amazon!";
        return;
    }
    if (@$found > 1) {
        warn "Multiple results for order $order: " . Dumper($res);
    }
    my $amazon_order = $found->[0];
    print Dumper($amazon_order);
    my $obj = Amazon::MWS::XML::Order->new(order => $amazon_order);
    my $status = $obj->order_status;
    $self->_exe_query($self->sqla->update('amazon_mws_orders',
                                          { status => $status },
                                          { %where }));
    return $status;
}
2443              
2444             =head2 get_products_with_error_code(@error_codes)
2445              
2446             Return a list of hashref with the rows from C<amazon_mws_products> for
2447             the current shop and the error code passed as argument. If no error
2448             codes are passed, fetch all the products in error.
2449              
2450             =head2 get_products_with_warnings
2451              
2452             Returns a list of hashref, with C<sku> and C<warnings> as keys, for
2453             each product in the shop which has the warnings set to something.
2454              
2455             =cut
2456              
# Return the rows (as hashrefs) of amazon_mws_products which belong to
# the current shop, are not flagged as deleted and carry an error code.
# When error codes are passed, only those are matched; otherwise any
# error code greater than zero. error_code and sku are passed as the
# ordering argument of the select.
sub get_products_with_error_code {
    my ($self, @errcodes) = @_;
    my $code_filter = @errcodes ? { -in => \@errcodes } : { '>' => 0 };
    my %criteria = (
                    status => { '!=' => 'deleted' },
                    shop_id => $self->_unique_shop_id,
                    error_code => $code_filter,
                   );
    my @query = $self->sqla->select('amazon_mws_products', '*',
                                    \%criteria,
                                    [qw/error_code sku/]);
    my $sth = $self->_exe_query(@query);
    my @found;
    while (my $product = $sth->fetchrow_hashref) {
        push @found, $product;
    }
    return @found;
}
2477              
# Return the rows (as hashrefs) of amazon_mws_products for the current
# shop whose status is 'ok' and whose warnings column is not the empty
# string. sku and warnings are passed as the ordering argument.
sub get_products_with_warnings {
    my ($self) = @_;
    my %criteria = (
                    status => 'ok',
                    shop_id => $self->_unique_shop_id,
                    warnings => { '!=' => '' },
                   );
    my @query = $self->sqla->select('amazon_mws_products', '*',
                                    \%criteria,
                                    [qw/sku warnings/]);
    my $sth = $self->_exe_query(@query);
    my @found;
    while (my $product = $sth->fetchrow_hashref) {
        push @found, $product;
    }
    return @found;
}
2494              
2495             =head2 mark_failed_products_as_redo(@skus)
2496              
2497             Alter the status of the failed skus passed as argument from 'failed'
2498             to 'redo' to trigger an update.
2499              
2500             =cut
2501              
# Switch the given skus of the current shop from status 'failed' to
# 'redo'. Does nothing when no sku is passed; otherwise returns whatever
# _exe_query returns for the update.
sub mark_failed_products_as_redo {
    my ($self, @skus) = @_;
    return unless @skus;
    my %changes = (status => 'redo');
    my %only_failed = (
                       shop_id => $self->_unique_shop_id,
                       status => 'failed',
                       sku => { -in => \@skus },
                      );
    my @query = $self->sqla->update('amazon_mws_products',
                                    \%changes, \%only_failed);
    return $self->_exe_query(@query);
}
2515              
2516             =head2 get_products_with_amazon_shop_mismatches(@errors)
2517              
2518             Parse the amazon_mws_products and return a hashref where the keys are
2519             the failed skus, and the values are hashrefs where the keys are the
2520             mismatched fields and the values are hashrefs with these keys:
2521              
2522             Mismatched fields may be: C<part_number>, C<title>, C<manufacturer>,
2523             C<brand>, C<color>, C<size>
2524              
2525             =over 4
2526              
2527             =item shop
2528              
2529             The value on the shop
2530              
2531             =item amazon
2532              
2533             The value of the amazon product
2534              
2535             =item error_code
2536              
2537             The error code
2538              
2539             =back
2540              
2541             E.g.
2542              
2543             my $mismatches = {12344 => {
2544             part_number => {
2545             shop => 'XY',
2546             amazon => 'XYZ',
2547             error_code => '8541',
2548             },
2549             title => {
2550             shop => 'ABC',
2551             amazon => 'DFG',
2552             error_code => '8541',
2553             },
2554             },
2555             .....
2556             };
2557              
2558             Optionally, if the error codes are passed to the argument, only those
2559             errors are fetched.
2560              
2561             =cut
2562              
2563              
# Build a hashref keyed by sku describing the shop/amazon mismatches of
# the failed products. Products are fetched with
# get_products_with_error_code (optionally restricted to @errors); only
# those in status 'failed' are considered. From the parsed error message
# only the entries which are hashrefs (the shop/amazon pairs) are kept,
# each one tagged with the product's error code.
sub get_products_with_amazon_shop_mismatches {
    my ($self, @errors) = @_;
    my %mismatches;
    for my $product ($self->get_products_with_error_code(@errors)) {
        next unless $product->{status} eq 'failed';
        my $parsed = $self->_parse_error_message_mismatches($product->{error_msg});
        # scalar entries are the raw captures, not pairs: leave them out
        my @pair_fields = grep { ref($parsed->{$_}) eq 'HASH' } keys %$parsed;
        for my $field (@pair_fields) {
            my $pair = $parsed->{$field};
            $pair->{error_code} = $product->{error_code};
            $mismatches{ $product->{sku} }{$field} = $pair;
        }
    }
    return \%mismatches;
}
2587              
2588             =head2 get_products_with_mismatches(@errors)
2589              
2590             Similar to C<get_products_with_amazon_shop_mismatches>, but instead
2591             return an arrayref where each element is a hashref with all the info
2592             collapsed.
2593              
2594             The structures reported by C<get_products_with_amazon_shop_mismatches>
2595             are flattened with an C<our_> and C<amazon_> prefix.
2596              
2597             our_part_number => 'XY',
2598             amazon_part_number => 'YZ',
2599             our_title => 'xx',
2600             amazon_title => 'yy',
2601             # etc.
2602              
2603              
2604             =cut
2605              
# Return an arrayref with one hashref per product in error (optionally
# restricted to @errors): the product row merged with the values parsed
# out of its error message. On a key clash the parsed value wins.
sub get_products_with_mismatches {
    my ($self, @errors) = @_;
    my @flattened;
    for my $product ($self->get_products_with_error_code(@errors)) {
        my $parsed = $self->_parse_error_message_mismatches($product->{error_msg});
        push @flattened, { %{$product}, %{$parsed} };
    }
    return \@flattened;
}
2617              
# Run every pattern of _mismatch_patterns against the error message and
# return a hashref with the first capture of each matching pattern,
# keyed by the pattern name. Patterns named shop_<field> or
# amazon_<field> are additionally collected as
# { <field> => { shop => ..., amazon => ... } }.
# An empty message yields an empty hashref.
sub _parse_error_message_mismatches {
    my ($self, $message) = @_;
    return {} unless $message;
    my %regex_for = %{$self->_mismatch_patterns};
    my %parsed;
    for my $name (keys %regex_for) {
        # a shop_/amazon_ prefix marks one side of a pair
        my ($side, $field) = $name =~ /\A(shop|amazon)_(.+)/;
        next unless $message =~ $regex_for{$name};
        my $captured = $1;
        $parsed{$field}{$side} = $captured if $side && $field;
        # the plain capture is stored under the full name as well, on
        # the assumption that it does not clash with a pair's field name
        $parsed{$name} = $captured;
    }
    return \%parsed;
}
2641              
2642             =head2 Order Report
2643              
2644             To get this feature working, you need an C<amzn-envelope.xsd> with
2645             OrderReport plugged in. Older versions are broken. Newer schema
2646             versions may be missing the Amazon.xsd file. So either you ask Amazon to
2647             give you a B<full set of xsd, which includes OrderReport in
2648             amzn-envelope.xsd>, or you apply this patch to amzn-envelope.xsd:
2649              
2650             --- a/amzn-envelope.xsd 2014-10-27 10:26:19.000000000 +0100
2651             +++ b/amzn-envelope.xsd 2015-03-26 10:56:16.000000000 +0100
2652             @@ -23,2 +23,3 @@
2653             <xsd:include schemaLocation="Price.xsd"/>
2654             + <xsd:include schemaLocation="OrderReport.xsd"/>
2655             <xsd:include schemaLocation="ProcessingReport.xsd"/>
2656             @@ -41,2 +42,3 @@
2657             <xsd:enumeration value="OrderFulfillment"/>
2658             + <xsd:enumeration value="OrderReport"/>
2659             <xsd:enumeration value="Override"/>
2660             @@ -83,2 +85,3 @@
2661             <xsd:element ref="OrderFulfillment"/>
2662             + <xsd:element ref="OrderReport"/>
2663             <xsd:element ref="Override"/>
2664              
2665             =head3 get_unprocessed_orders
2666              
2667             Return a list of objects with the orders.
2668              
2669             =head3 get_unprocessed_order_report_ids
2670              
2671             Return a list of unprocessed (i.e., which weren't acknowledged by us)
2672             order report ids.
2673              
2674             =cut
2675              
# Return the list produced by get_order_reports_by_id when given the ids
# from get_unprocessed_order_report_ids.
sub get_unprocessed_orders {
    my ($self) = @_;
    my @orders = $self->get_order_reports_by_id($self->get_unprocessed_order_report_ids);
    return @orders;
}
2682              
# Ask the client (GetReportList) for the unacknowledged reports of type
# _GET_ORDERS_DATA_ and return their ReportId values. Extra %options are
# appended to the request and so can override the defaults. Failures of
# the call are handed over to _handle_exception.
sub get_unprocessed_order_report_ids {
    my ($self, %options) = @_;
    my @request = (
                   Acknowledged => 0,
                   ReportTypeList => ['_GET_ORDERS_DATA_'],
                   %options,
                  );
    my $res;
    try {
        $res = $self->client->GetReportList(@request);
    } catch {
        _handle_exception($_);
    };

    # The next token is deliberately not followed: the remaining reports
    # are expected to be picked up by later runs.
    my $reports = $res ? $res->{ReportInfo} : undef;
    return grep { $_ } map { $_->{ReportId} } @{ $reports || [] };
}
2710              
2711              
2712             =head3 get_order_reports_by_id(@id_list)
2713              
2714             The GetReport operation has a maximum request quota of 15 and a
2715             restore rate of one request every minute.
2716              
2717             =cut
2718              
# Download (GetReport) each report of the id list and return the order
# objects parsed out of them, in the order of the ids. Reports coming
# back empty are skipped; failures of the call are handed over to
# _handle_exception.
sub get_order_reports_by_id {
    my ($self, @ids) = @_;
    my @orders;
    for my $report_id (@ids) {
        my $xml = try {
            $self->client->GetReport(ReportId => $report_id);
        } catch {
            _handle_exception($_);
        };
        next unless $xml;
        push @orders, $self->_parse_order_reports_xml($xml);
    }
    return @orders;
}
2737              
# Feed the XML to the xml_reader coderef and wrap the OrderReport
# structure of each Message into an
# Amazon::MWS::XML::Response::OrderReport object. Dies when a message
# has no OrderReport. Returns the list of objects (empty when there are
# no messages).
sub _parse_order_reports_xml {
    my ($self, $xml) = @_;
    my $data = $self->xml_reader->($xml);
    my @orders;
    for my $message (@{ $data->{Message} || [] }) {
        my $report = $message->{OrderReport}
          or die "Cannot found expected OrderReport in " . Dumper($message);
        push @orders,
          Amazon::MWS::XML::Response::OrderReport->new(struct => $report);
    }
    return @orders;
}
2755              
2756              
2757             =head2 acknowledge_reports(@ids)
2758              
2759             Mark the reports as processed.
2760              
2761             =head2 unacknowledge_reports(@ids)
2762              
2763             Mark the reports as not processed.
2764              
2765             =cut
2766              
# Flag the given report ids as acknowledged (flag 1).
sub acknowledge_reports {
    my $self = shift;
    return $self->_toggle_ack_reports(1, @_);
}
2771              
# Flag the given report ids as not acknowledged (flag 0).
sub unacknowledge_reports {
    my $self = shift;
    return $self->_toggle_ack_reports(0, @_);
}
2776              
# Set the Acknowledged flag ($flag) of the given report ids through
# UpdateReportAcknowledgements. Failures of the call are handed over to
# _handle_exception. Always returns nothing.
sub _toggle_ack_reports {
    my ($self, $flag, @ids) = @_;
    # the ids are sent in batches of at most 100 per call
    while (my @batch = splice(@ids, 0, 100)) {
        try {
            $self->client->UpdateReportAcknowledgements(ReportIdList => \@batch,
                                                        Acknowledged => $flag);
        } catch {
            _handle_exception($_);
        };
    }
    return;
}
2792              
# Turn a caught exception into a plain string and die with it.
#
# Unblessed errors are rethrown untouched. For objects the message is:
# the xml of an Amazon::MWS::Exception::Throttled, the error string of
# an Amazon::MWS::Exception (falling back to a dump of the object when
# the string is empty), or a dump of the object for anything else. For
# Amazon::MWS::Exception objects the stack trace is appended.
sub _handle_exception {
    my ($err) = @_;
    die $err unless blessed $err;

    my $is_mws_exception = $err->isa('Amazon::MWS::Exception');
    my $msg;
    if ($err->isa('Amazon::MWS::Exception::Throttled')) {
        $msg = $err->xml;
    }
    elsif ($is_mws_exception) {
        $msg = $err->error || Dumper($err);
    }
    else {
        $msg = Dumper($err);
    }
    $msg .= "\n" . $err->trace->as_string . "\n" if $is_mws_exception;
    die $msg;
}
2818              
2819             =head2 job_timed_out($job_row) [INTERNAL]
2820              
2821             Check if the hashref (which is a hashref of the amazon_mws_jobs row)
2822             has timed out, comparing with the C<order_ack_days_timeout> and
2823             C<job_hours_timeout> (depending on the job).
2824              
2825              
2826             =cut
2827              
# Tell whether a job (a row of amazon_mws_jobs as hashref) has run past
# its timeout. For the order_ack task the limit is
# order_ack_days_timeout (in days), for every other task
# job_hours_timeout (in hours). Dies when the row has no task or no
# job_started_epoch. Returns the elapsed seconds when the limit is
# exceeded, nothing otherwise; a timeout of 0 disables the check.
sub job_timed_out {
    my ($self, $job_row) = @_;
    my $task = $job_row->{task}
      or die "Missing task in $job_row->{amws_job_id}";
    my $started = $job_row->{job_started_epoch}
      or die "Missing job_started_epoch in $job_row->{amws_job_id}";
    my $timeout = $task eq 'order_ack'
      ? $self->order_ack_days_timeout * 60 * 60 * 24
      : $self->job_hours_timeout * 60 * 60;
    die "Something is off, timeout not defined" unless defined $timeout;
    return unless $timeout;
    my $elapsed = time() - $started;
    return $elapsed > $timeout ? $elapsed : ();
}
2852              
# Emit the arguments with print when the object is quiet, with warn
# otherwise. Returns the action taken ('print' or 'warn', undef when
# there was nothing to emit) followed by the arguments.
sub _print_or_warn_error {
    my ($self, @args) = @_;
    my $action;
    if (@args) {
        $action = $self->quiet ? 'print' : 'warn';
        if ($action eq 'print') {
            print @args;
        }
        else {
            warn @args;
        }
    }
    return ($action, @args);
}
2868              
2869             =head2 purge_old_jobs($limit)
2870              
2871             Eventually the jobs and feed tables grow and never get purged. You can
2872             call this method to remove from the db all the feeds older than
2873             C<order_ack_days_timeout> (30 by default).
2874              
2875             To avoid too much load on the db, you can set the limit to purge the
2876             jobs. Defaults to 500. Set it to 0 to disable it.
2877              
2878             =cut
2879              
# Remove old finished jobs from the database and from the feed directory.
#
# Selects the product_deletion and upload jobs which were started more
# than order_ack_days_timeout days ago and are flagged as aborted or
# successful (restricted to the object's shop_id when it is set),
# deletes the matching rows from amazon_mws_feeds and amazon_mws_jobs
# and removes the <feed_dir>/<shop_id>/<amws_job_id> directory of each
# job.
#
# $limit is the maximum number of jobs purged in one call. It defaults
# to 500; 0 means no limit.
sub purge_old_jobs {
    my ($self, $limit) = @_;
    unless (defined $limit) {
        $limit = 500;
    }
    my $range = time() - $self->order_ack_days_timeout * 60 * 60 * 24;
    my @and = (
               task => [qw/product_deletion
                           upload/],
               job_started_epoch => { '<', $range },
               [ -or => {
                         aborted => 1,
                         success => 1,
                        },
               ],
              );
    if (my $shop_id = $self->shop_id) {
        push @and, shop_id => $shop_id;
    }

    my $sth = $self->_exe_query($self->sqla
                                ->select(amazon_mws_jobs => [qw/amws_job_id shop_id/],
                                         [ -and => \@and ] ));
    my @purge_jobs;
    while (my $where = $sth->fetchrow_hashref) {
        push @purge_jobs, $where;
        # stop once exactly $limit jobs are collected (no limit when 0)
        last if $limit && @purge_jobs >= $limit;
    }
    $sth->finish;
    if (@purge_jobs) {
        # each collected row is a { amws_job_id, shop_id } hashref, so the
        # list doubles as the where clause of the two deletes
        $self->_exe_query($self->sqla->delete(amazon_mws_feeds => \@purge_jobs));
        $self->_exe_query($self->sqla->delete(amazon_mws_jobs => \@purge_jobs));
        while (@purge_jobs) {
            my $feed = shift @purge_jobs;
            my $dir = path($self->feed_dir)->child($feed->{shop_id}, $feed->{amws_job_id});
            if ($dir->exists) {
                print "Removing " . $dir->canonpath . "\n"; # unless $self->quiet;
                $dir->remove_tree;
            }
            else {
                print "$dir doesn't exist\n";
            }
        }
    }
    else {
        print "Nothing to purge\n" unless $self->quiet;
    }
    return;
}
2931              
2932             1;