File Coverage

blib/lib/Mojo/IOLoop/HoboProcess.pm
Criterion Covered Total %
statement 30 80 37.5
branch 2 22 9.0
condition 0 13 0.0
subroutine 10 18 55.5
pod 3 4 75.0
total 45 137 32.8


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Like Mojo::IOLoop::Subprocess, spawns subprocesses with MCE::Hobo instead.
4             ##
5             ###############################################################################
6              
7             package Mojo::IOLoop::HoboProcess;
8              
9 2     2   529637 use strict;
  2         6  
  2         60  
10 2     2   13 use warnings;
  2         5  
  2         77  
11              
12 2     2   12 use Mojo::Base -base;
  2         8  
  2         15  
13              
14             our $VERSION = '0.005';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17              
18 2     2   317 use Carp 'croak';
  2         4  
  2         86  
19 2     2   1016 use MCE::Hobo;
  2         98903  
  2         14  
20 2     2   132 use Mojo::IOLoop;
  2         8  
  2         24  
21 2     2   82 use Mojo::IOLoop::Stream;
  2         7  
  2         29  
22 2     2   71 use Scalar::Util 'weaken';
  2         6  
  2         178  
23 2     2   15 use Socket qw( PF_UNIX PF_UNSPEC SOCK_STREAM );
  2         6  
  2         289  
24              
25             BEGIN {
26 2 50 0 2   25 $ENV{'MOJO_REACTOR'} ||= 'Mojo::Reactor::Poll' if ($^O eq 'MSWin32');
27              
28 2 50       39 unless (Mojo::IOLoop->can('hoboprocess')) {
29             # Inject Mojo::IOLoop::hoboprocess, no impact to Mojo::IOLoop::subprocess
30              
31 2 0   0 0 1580 eval '
  0            
  0            
  0            
  0            
32             sub Mojo::IOLoop::hoboprocess {
33             my $ioloop = shift;
34             my $subprocess = Mojo::IOLoop::HoboProcess->new;
35              
36             weaken $subprocess->ioloop(
37             ref $ioloop ? $ioloop : $ioloop->singleton
38             )->{"ioloop"};
39              
40             return $subprocess->run(@_);
41             }
42             ';
43             }
44             }
45              
46             has ioloop => sub { Mojo::IOLoop->singleton };
47             has timeout => 0;
48              
49 0   0 0 1   sub exit { my $self = shift->{'hobo'} || MCE::Hobo->self; $self->exit(@_) }
  0            
50              
51 0     0 1   sub pid { shift->{'pid'} }
52              
53             sub run {
54 0     0 1   my ($self, $child, $parent) = @_;
55              
56             # Start the shared-manager process if not already started
57 0           MCE::Shared->start;
58              
59             # Make socketpair or pipe for event notification
60 0           my ($reader, $writer);
61              
62 0 0         if ($^O eq 'MSWin32') {
63 0 0         socketpair($reader, $writer, PF_UNIX, SOCK_STREAM, PF_UNSPEC)
64             or croak "Can't create socketpair: $!";
65             }
66             else {
67 0 0         pipe($reader, $writer) or croak "Can't create pipe: $!";
68             }
69              
70 0           $writer->autoflush(1);
71              
72             # Child
73             my $hobo = MCE::Hobo->create({ posix_exit => 1 }, sub {
74 0     0     close $reader;
75              
76 0           $self->ioloop->reset;
77 0           $self->{'pid'} = MCE::Hobo->pid;
78              
79 0   0       my $results = eval { [ $self->$child ] } || [];
80 0 0         my $error = $@; $error = '' if ($error eq "Hobo exited (0)\n");
  0            
81              
82 0 0 0       $error = "Hobo ". $self->{'pid'} ." exited abnormally"
83             if ($error && $error =~ /^Hobo exited \(\S+\)$/);
84              
85 0           print $writer "done\n";
86 0           close $writer;
87              
88 0           [ $error, @{ $results } ];
  0            
89 0           });
90              
91 0 0         croak "Can't spawn Hobo: $!" unless defined($hobo);
92 0           $self->{'hobo'} = $hobo, $self->{'pid'} = $hobo->pid;
93              
94             # Parent
95 0           my $me = $$;
96 0           my $stream = Mojo::IOLoop::Stream->new($reader);
97              
98 0           $stream->timeout($self->timeout);
99 0           $self->ioloop->stream($stream);
100              
101             $stream->on( read => sub {
102 0     0     my $results = eval { $hobo->join };
  0            
103 0   0       my $error = $hobo->error || shift @{ $results };
104 0           $self->$parent($error, @{ $results });
  0            
105 0           });
106              
107             $stream->on( timeout => sub {
108 0     0     $hobo->kill('QUIT');
109 0           eval { $hobo->join };
  0            
110 0           $self->$parent( "Hobo ". $self->pid ." timed out", () );
111 0           });
112              
113             $stream->on( close => sub {
114 0 0   0     return unless $$ == $me;
115 0 0         unless (exists $hobo->{'JOINED'}) {
116 0           eval { $hobo->join };
  0            
117 0           $self->$parent( "Hobo ". $self->{'pid'} ." exited abnormally", () );
118             }
119 0           });
120              
121 0           return $self;
122             }
123              
124             1;
125              
126             __END__