File Coverage

blib/lib/Eixo/Queue/SocketPair.pm
Criterion Covered Total %
statement 56 74 75.6
branch 5 12 41.6
condition n/a
subroutine 16 18 88.8
pod 0 4 0.0
total 77 108 71.3


line stmt bran cond sub pod time code
1             package Eixo::Queue::SocketPair;
2              
3 1     1   83006 use strict;
  1         2  
  1         41  
4 1     1   609 use Eixo::Base::Clase qw(Eixo::QueueInmediate);
  1         14372  
  1         6  
5              
6              
7 1     1   782 use Socket;
  1         4185  
  1         540  
8 1     1   599 use IO::Handle;
  1         5989  
  1         68  
9 1     1   682 use IO::Select;
  1         1901  
  1         58  
10 1     1   538 use Eixo::Queue::Job;
  1         10  
  1         8  
11              
12 1     1   9788 use Eixo::Queue::SocketPairDriver;
  1         4  
  1         7  
13              
14             has(
15              
16             backend=>undef,
17              
18             pid_c=>undef,
19              
20             initiated=>undef,
21              
22             jobSent=>undef,
23              
24             input=>undef,
25              
26             output=>undef,
27              
28             );
29              
30             sub init{
31 1     1 0 202 my ($self) = @_;
32              
33 1         3 my ($driver) = $self->__openCommunications;
34            
35 1         53 $self->__startBackend($driver);
36              
37 1         24 $self->initiated(1);
38              
39             }
40              
41             sub DESTROY{
42 1     1   685 my ($self) = @_;
43              
44 1 50       4 if($self->pid_c){
45              
46 1         9 kill(10, $self->pid_c);
47              
48 1         631 waitpid($self->pid_c, 0);
49              
50             }
51              
52             }
53              
54              
55             sub __openCommunications{
56 3     3   60 my ($self) = @_;
57              
58 3         6 &Eixo::Queue::SocketPairDriver::open();
59             }
60              
61             sub __startBackend{
62 1     1   2 my ($self, $driver) = @_;
63              
64 1         2 my ($a, $b) = ($self->__openCommunications(), $self->__openCommunications);
65              
66              
67 1 50       933 if(my $pid = fork){
68            
69 1         42 $a->A;
70 1         12 $b->A;
71              
72 1         8 $self->{input} = $a;
73 1         7 $self->{output} = $b;
74              
75 1         10 $self->pid_c($pid);
76              
77             }
78             else{
79 0         0 $a->B;
80 0         0 $b->B;
81              
82 0         0 $self->{input} = $b;
83 0         0 $self->{output} = $a;
84              
85 0         0 eval{
86              
87 0         0 $self->__backendLoop();
88              
89             };
90 0 0       0 if($@){
91            
92 1     1   385 use Data::Dumper;
  1         2  
  1         444  
93              
94 0         0 print Dumper($@);
95              
96 0         0 exit 1;
97             }
98              
99 0         0 exit 0;
100            
101             }
102            
103             }
104              
105             sub __backendLoop{
106 0     0   0 my ($self) = @_;
107              
108 0         0 while(my $job = $self->input->receive){
109              
110 0         0 $job = Eixo::Queue::Job->unserialize($job);
111              
112 0         0 $self->backend->($job);
113              
114 0         0 $self->output->send($job->serialize);
115             }
116             }
117              
118             sub add{
119 1000     1000 0 937 my ($self, $job) = @_;
120              
121 1000 50       2241 unless($self->initiated){
122 0         0 die(ref($self) . '::add Queue not initiated');
123             }
124              
125 1000 50       5463 return undef if($self->jobSent);
126              
127 1000         4591 $self->__toBackend($job);
128              
129 1000         4540 $self->jobSent(1);
130              
131 1000         5731 return 1;
132             }
133              
134             sub __toBackend{
135 1000     1000   954 my ($self, $job) = @_;
136              
137 1000         2041 $self->output->send($job->serialize);
138             }
139              
140             sub wait{
141 1000     1000 0 1074 my ($self) = @_;
142              
143 1000 50       1638 if($self->jobSent){
144              
145 1000         5542 $self->__waitBackend;
146              
147             }
148             }
149             sub __waitBackend{
150              
151 1000     1000   2112 my $data = $_[0]->input->receive;
152              
153 1000         3781 $_[0]->jobSent(0);
154              
155 1000         6259 return Eixo::Queue::Job->unserialize($data);
156              
157            
158             }
159              
160              
161             sub status{
162 0     0 0   my ($self) = @_;
163              
164 0           $self->jobSent;
165             }
166              
167             1;