File Coverage

blib/lib/Qudo/Parallel/Manager.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Qudo::Parallel::Manager;
2 1     1   4 use strict;
  1         1  
  1         36  
3 1     1   5 use warnings;
  1         1  
  1         24  
4 1     1   450 use Qudo;
  0            
  0            
5             use UNIVERSAL::require;
6             use Parallel::Prefork::SpareWorkers qw(:status);
7             use Sub::Throttle qw/throttle/;
8             use IO::Socket;
9              
10             our $VERSION = '0.06';
11              
12             sub new {
13             my ($class, %args) = @_;
14              
15             my $max_request_par_child = delete $args{max_request_par_child} || 30;
16             my $max_workers = delete $args{max_workers} || 1;
17             my $min_spare_workers = delete $args{min_spare_workers} || 1;
18             my $max_spare_workers = delete $args{max_spare_workers} || $max_workers;
19             my $auto_load_worker = delete $args{auto_load_worker} || 1;
20             my $work_delay = $args{work_delay} || 5;
21             my $admin = delete $args{admin} || 0;
22             my $admin_host = delete $args{admin_host} || '127.0.0.1';
23             my $admin_port = delete $args{admin_port} || 90000;
24             my $debug = delete $args{debug} || 0;
25              
26             my $qudo = Qudo->new(%args);
27              
28             $qudo->manager->register_hooks(qw/Qudo::Hook::Scoreboard/);
29              
30             my $self = bless {
31             max_workers => $max_workers,
32             max_request_par_child => $max_request_par_child,
33             min_spare_workers => $min_spare_workers,
34             max_spare_workers => $max_spare_workers,
35             work_delay => $work_delay,
36             admin => $admin,
37             admin_host => $admin_host,
38             admin_port => $admin_port,
39             debug => $debug,
40             qudo => $qudo,
41             }, $class;
42              
43             if ($auto_load_worker) {
44             for my $worker (@{$qudo->{manager_abilities}}) {
45             $self->debug("Setting up the $worker\n");
46             $worker->use or die $@
47             }
48             }
49              
50             $self;
51             }
52              
53             sub debug {
54             my ($self, $msg) = @_;
55             warn $msg if $self->{debug};
56             }
57              
58             sub run {
59             my $self = shift;
60              
61             $self->debug("START WORKING : $$\n");
62              
63             my $pm = $self->pm;
64             my $c_pid = $self->start_admin_port;
65              
66             while ($pm->signal_received ne 'TERM') {
67             $pm->start and next;
68              
69             $self->debug("spawn $$\n");
70              
71             {
72             my $manager = $self->{qudo}->manager;
73             for my $dsn ($manager->shuffled_databases) {
74             my $db = $manager->driver_for($dsn);
75             $db->reconnect;
76             }
77              
78             my $reqs_before_exit = $self->{max_request_par_child};
79              
80             local $SIG{TERM} = sub { $reqs_before_exit = 0 };
81              
82             while ($reqs_before_exit > 0) {
83             if (throttle(0.5, sub { $manager->work_once })) {
84             $self->debug("WORK $$\n");
85             --$reqs_before_exit
86             } else {
87             sleep $self->{work_delay};
88             }
89             }
90             }
91              
92             $self->debug("FINISHED $$\n");
93             $pm->finish;
94             }
95              
96             $pm->wait_all_children;
97              
98             $self->stop_admin_port($c_pid);
99             }
100              
101             sub stop_admin_port {
102             my ($self, $pid) = @_;
103             return unless $pid;
104             kill 'TERM', $pid;
105             }
106              
107             sub start_admin_port {
108             my $self = shift;
109              
110             return unless $self->{admin};
111              
112             my $pid = fork();
113             die "fork failed: $!" unless defined $pid;
114             return $pid if $pid; # main process
115              
116             my $admin = IO::Socket::INET->new(
117             Listen => 5,
118             LocalAddr => $self->{admin_host},
119             LocalPort => $self->{admin_port},
120             Proto => 'tcp',
121             Type => SOCK_STREAM,
122             ReuseAddr => 1,
123             ) or die "Cannot open server socket: $!";
124              
125             while (my $remote = $admin->accept) {
126             my $status = join ' ', $self->pm->scoreboard->get_statuses;
127             $remote->print($status);
128             $remote->close;
129             }
130             }
131              
132             sub pm {
133             my $self = shift;
134              
135             $self->{pm} ||= do {
136              
137             my $pm = Parallel::Prefork::SpareWorkers->new({
138             max_workers => $self->{max_workers},
139             min_spare_workers => $self->{min_spare_workers},
140             max_spare_workers => $self->{max_spare_workers},
141             trap_signals => {
142             TERM => 'TERM',
143             HUP => 'TERM',
144             },
145             });
146              
147             {
148             no strict 'refs'; ## no critic.
149             *{"Qudo::Parallel::Manager::Registrar::pm"} = sub { $pm }
150             }
151              
152             $pm;
153             };
154             }
155              
156             1;
157             __END__