File Coverage

blib/lib/DBIx/QuickORM/STH/Fork.pm
Criterion Covered Total %
statement 64 74 86.4
branch 25 40 62.5
condition 3 9 33.3
subroutine 15 18 83.3
pod 0 9 0.0
total 107 150 71.3


line stmt bran cond sub pod time code
1             package DBIx::QuickORM::STH::Fork;
2 24     24   209 use strict;
  24         59  
  24         1016  
3 24     24   133 use warnings;
  24         68  
  24         2103  
4              
5             our $VERSION = '0.000019';
6              
7 24     24   181 use Role::Tiny::With qw/with/;
  24         81  
  24         14689  
8             with 'DBIx::QuickORM::Role::STH';
9             with 'DBIx::QuickORM::Role::Async';
10              
11 24     24   187 use Carp qw/croak/;
  24         53  
  24         1537  
12 24     24   204 use Time::HiRes qw/sleep/;
  24         54  
  24         280  
13 24     24   53096 use Cpanel::JSON::XS qw/decode_json/;
  24         163318  
  24         2063  
14              
15 24     24   761 use IO::Select;
  24         2311  
  24         1808  
16              
17 24         308 use DBIx::QuickORM::Util::HashBase qw{
18             <connection
19             <source
20              
21             only_one
22              
23             +dialect
24             +ready
25             <got_result
26             <done
27             <pid
28             <pipe
29             <ios
30 24     24   194 };
  24         52  
31              
32 0     0 0 0 sub cancel_supported { 1 }
33              
34 0   0 0 0 0 sub dialect { $_[0]->{+DIALECT} //= $_[0]->{+CONNECTION}->dialect }
35 5     5 0 67 sub clear { $_[0]->{+CONNECTION}->clear_fork($_[0]) }
36              
37             sub init {
38 5     5 0 30 my $self = shift;
39              
40 5 50       117 croak "'pid' is a required attribute" unless $self->{+PID};
41 5 50       167 croak "'pipe' is a required attribute" unless $self->{+PIPE};
42 5 50       54 croak "'connection' is a required attribute" unless $self->{+CONNECTION};
43 5 50       51 croak "'source' is a required attribute" unless $self->{+SOURCE};
44             }
45              
46             sub ready {
47 14     14 0 201626 my $self = shift;
48 14 100       126 return 1 if $self->{+READY};
49              
50 10   66     631 my $ios = $self->{+IOS} //= IO::Select->new($self->{+PIPE});
51 10 100       1389 return unless $ios->can_read(0);
52              
53 5         426 return $self->{+READY} = 1;
54             }
55              
56             sub result {
57 5     5 0 45 my $self = shift;
58 5 50       25 return $self->{+GOT_RESULT} if $self->{+GOT_RESULT};
59              
60 5 100       137 $self->wait unless $self->{+READY};
61              
62 5         32 my $pipe = $self->{+PIPE};
63 5         169 my $line = <$pipe>;
64 5         105 my $data = decode_json($line);
65              
66 5 50 33     69 unless ($data && exists $data->{result}) {
67 0         0 chomp($line);
68 0         0 croak "Got invalid data from pipe: $line";
69             }
70              
71 5         39 return $self->{+GOT_RESULT} = $data->{result};
72             }
73              
74             sub cancel {
75 0     0 0 0 my $self = shift;
76              
77 0 0       0 return if $self->{+DONE};
78              
79 0 0       0 close(delete $self->{+PIPE}) if $self->{+PIPE};
80              
81 0 0       0 kill('TERM', $self->{+PID}) or die "Could not kill pid $self->{+PID}: $!\n";
82              
83 0         0 $self->clear;
84 0         0 $self->{+DONE} = 1;
85             }
86              
87             sub next {
88 9     9 0 34 my $self = shift;
89 9         48 my $row = $self->_next;
90              
91 9 100       40 if ($self->{+ONLY_ONE}) {
92 3 100       23 croak "Expected only 1 row, but got more than one" if $self->_next;
93 2         10 $self->set_done;
94             }
95              
96 8         39 return $row;
97             }
98              
99             sub _next {
100 12     12   59 my $self = shift;
101              
102 12 50       56 return if $self->{+DONE};
103              
104 12 100       72 $self->result unless $self->{+GOT_RESULT};
105              
106 12         27 my $pipe = $self->{+PIPE};
107 12         69 my $line = <$pipe>;
108 12 100       35 if (defined $line) {
109 9         90 my $row = decode_json($line);
110 9 50       601 return $row if $row;
111             }
112              
113 3         37 $self->set_done;
114              
115 3         16 return;
116             }
117              
118             sub set_done {
119 9     9 0 42 my $self = shift;
120              
121 9 100       40 return if $self->{+DONE};
122              
123 5 50       161 close(delete $self->{+PIPE}) if $self->{+PIPE};
124 5         40 $self->clear;
125 5         26 $self->{+DONE} = 1;
126             }
127              
128             1;