File Coverage

blib/lib/App/aep.pm
Criterion Covered Total %
statement 169 675 25.0
branch 21 172 12.2
condition 20 125 16.0
subroutine 21 48 43.7
pod 0 29 0.0
total 231 1049 22.0


line stmt bran cond sub pod time code
1             package App::aep;
2              
3             # ABSTRACT: Allows you to run a command within a container and control its start up
4              
5             # Core
6 13     13   266213 use warnings;
  13         17  
  13         602  
7 13     13   101 use strict;
  13         52  
  13         219  
8 13     13   512 use utf8;
  13         216  
  13         64  
9 13     13   311 use v5.28;
  13         30  
10              
11             # Core - Modules
12 13     13   511 use Socket qw(AF_INET PF_UNIX SOCK_STREAM);
  13         3303  
  13         803  
13 13     13   392 use IO::Socket::INET;
  13         12132  
  13         198  
14              
15             # Core - Experimental (stable)
16 13     13   5694 use experimental 'signatures';
  13         1213  
  13         79  
17              
18             # Debug
19 13     13   8434 use Data::Dumper;
  13         87450  
  13         958  
20              
21             # External
22 13         76 use POE qw(
23             Session::PlainCall
24             Wheel::SocketFactory
25             Wheel::ReadWrite
26             Wheel::Run
27             Filter::Stackable
28             Filter::Line
29             Filter::JSONMaybeXS
30 13     13   612 );
  13         14445  
31 13     13   541835 use Try::Tiny;
  13         1775  
  13         130160  
32              
33             # Ensure unbuffered output for container environments
34             STDOUT->autoflush(1);
35             STDERR->autoflush(1);
36              
37             # Version of this software
38             our $VERSION = '0.013';
39              
40             # create a new blessed object, we will carry any passed arguments forward.
41 12         17 sub new ( $class, @args )
42 12     12 0 1756 {
  12         17  
  12         13  
43 12         58 my $self = bless { '_passed_args' => $args[ 0 ]->{ '_passed_args' }, }, $class;
44 12         35 return $self;
45             }
46              
47             # POE::Kernel's _start, in this case it also tells the kernel to capture signals
48 9         16 sub _start ( $self, @args )
49 9     9   5424 {
  9         14  
  9         13  
50 9         45 poe->kernel->sig( INT => 'sig_int' );
51 9         682 poe->kernel->sig( TERM => 'sig_term' );
52 9         299 poe->kernel->sig( CHLD => 'sig_chld' );
53 9         1055 poe->kernel->sig( USR => 'sig_usr' );
54              
55             # Store the main session ID so sub-sessions can post events back to us
56 9         186 poe->heap->{ '_' }->{ 'main_session' } = poe->session->ID;
57              
58 9         139 my $debug = poe->heap->{ '_' }->{ 'debug' };
59 9         57 $debug->( 'STDERR', __LINE__, 'Signals(INT,TERM,CHLD,USR) trapped.' );
60              
61             # What command are we meant to be running?
62 9         34 my $opt = poe->heap->{ '_' }->{ 'opt' };
63              
64             # Initialize lock server order tracking
65 9 50       89 if ( $opt->lock_server )
66             {
67 0   0     0 my $order_str = $opt->lock_server_order || '';
68 0         0 my @raw_steps = grep { $_ ne '' } split( /,/, $order_str );
  0         0  
69              
70             # Each step may contain || for parallel groups: "redis1||redis2" becomes ['redis1', 'redis2']
71 0         0 my @order;
72 0         0 for my $step_str ( @raw_steps )
73             {
74 0         0 my @ids = split( /\|\|/, $step_str );
75 0         0 push @order, \@ids;
76             }
77              
78 0         0 poe->heap->{ 'lock' }->{ 'order' } = \@order;
79 0         0 poe->heap->{ 'lock' }->{ 'order_idx' } = 0;
80 0         0 poe->heap->{ 'lock' }->{ 'order_orig' } = [ map { [ @{ $_ } ] } @order ];
  0         0  
  0         0  
81 0         0 poe->heap->{ 'lock' }->{ 'waiting' } = {};
82 0         0 poe->heap->{ 'lock' }->{ 'unknown_queue' } = [];
83 0         0 poe->heap->{ 'lock' }->{ 'step_completed' } = 0;
84             }
85              
86             # Initialize command state
87 9         88 poe->heap->{ 'command' } = {};
88 9         59 poe->heap->{ 'command' }->{ 'restart_count' } = 0;
89 9         44 poe->heap->{ 'command' }->{ 'running' } = 0;
90 9         56 poe->heap->{ 'command' }->{ 'trigger_ok' } = 0;
91              
92 9 100 66     44 if ( $opt->docker_health_check || $opt->lock_client )
    50          
93             {
94 1         7 poe->heap->{ 'services' }->{ 'afunixcli' } = POE::Session::PlainCall->create(
95             'object_states' => [
96             App::aep->new() => {
97             '_start' => 'afunixcli_client_start',
98             'afunixcli_server_connected' => 'afunixcli_server_connected',
99             'afunixcli_client_error' => 'afunixcli_client_error',
100             'afunixcli_server_input' => 'afunixcli_server_input',
101             'afunixcli_server_error' => 'afunixcli_server_error',
102             'afunixcli_client_send' => 'afunixcli_client_send',
103             'afunixcli_client_reconnect' => 'afunixcli_client_reconnect',
104             },
105             ],
106             'heap' => poe->heap,
107             );
108             }
109             elsif ( $opt->lock_server )
110             {
111 0         0 poe->heap->{ 'services' }->{ 'afunixsrv' } = POE::Session::PlainCall->create(
112             'object_states' => [
113             App::aep->new() => {
114             '_start' => 'afunixsrv_server_start',
115             'afunixsrv_client_connected' => 'afunixsrv_client_connected',
116             'afunixsrv_server_error' => 'afunixsrv_server_error',
117             'afunixsrv_client_input' => 'afunixsrv_client_input',
118             'afunixsrv_client_error' => 'afunixsrv_client_error',
119             'afunixsrv_server_send' => 'afunixsrv_server_send'
120             },
121             ],
122             'heap' => poe->heap,
123             );
124             }
125              
126 9         333 poe->kernel->yield( 'scheduler' );
127              
128 9         680 return;
129             }
130              
131             # As server
132             sub afunixsrv_server_start
133             {
134 0     0 0 0 my $debug = poe->heap->{ '_' }->{ 'debug' };
135 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
136              
137 0         0 my $socket_path = poe->heap->{ '_' }->{ 'config' }->{ 'AEP_SOCKETPATH' };
138 0         0 poe->heap->{ 'afunixsrv' }->{ 'socket_path' } = $socket_path;
139              
140 0 0       0 if ( -e $socket_path )
141             {
142 0         0 unlink $socket_path;
143             }
144              
145             # Unix domain socket listener
146 0         0 poe->heap->{ 'afunixsrv' }->{ 'server' } = POE::Wheel::SocketFactory->new(
147             'SocketDomain' => PF_UNIX,
148             'BindAddress' => $socket_path,
149             'SuccessEvent' => 'afunixsrv_client_connected',
150             'FailureEvent' => 'afunixsrv_server_error',
151             );
152              
153             # TCP socket listener
154 0   0     0 my $tcp_host = $opt->lock_server_host || '0.0.0.0';
155 0   0     0 my $tcp_port = $opt->lock_server_port || 60000;
156              
157 0         0 poe->heap->{ 'afunixsrv' }->{ 'tcp_server' } = POE::Wheel::SocketFactory->new(
158             'SocketDomain' => AF_INET,
159             'BindAddress' => $tcp_host,
160             'BindPort' => $tcp_port,
161             'Reuse' => 'yes',
162             'SuccessEvent' => 'afunixsrv_client_connected',
163             'FailureEvent' => 'afunixsrv_server_error',
164             );
165              
166 0         0 $debug->( 'STDERR', __LINE__, "Lock server listening on unix:$socket_path and tcp:$tcp_host:$tcp_port" );
167              
168 0         0 return;
169             }
170              
171             # As client
172             sub afunixcli_client_start
173             {
174 1     1 0 374 my $debug = poe->heap->{ '_' }->{ 'debug' };
175 1         6 my $opt = poe->heap->{ '_' }->{ 'opt' };
176              
177 1   50     6 my $transport = $opt->lock_transport || 'auto';
178              
179 1         16 my $socket_path = poe->heap->{ '_' }->{ 'config' }->{ 'AEP_SOCKETPATH' };
180 1         6 poe->heap->{ 'afunixcli' }->{ 'socket_path' } = $socket_path;
181              
182 1 50 33     13 if ( $transport eq 'tcp' || $transport eq 'auto' )
183             {
184             # Try TCP first (or only TCP if transport is 'tcp')
185 1   50     3 my $tcp_host = $opt->lock_client_host || 'aep-master';
186 1   50     5 my $tcp_port = $opt->lock_client_port || 60000;
187              
188 1         6 $debug->( 'STDERR', __LINE__, "Lock client connecting via TCP to $tcp_host:$tcp_port (transport=$transport)." );
189              
190 1         3 poe->heap->{ 'afunixcli' }->{ 'transport_attempted' } = 'tcp';
191              
192 1         15 poe->heap->{ 'afunixcli' }->{ 'client' } = POE::Wheel::SocketFactory->new(
193             'SocketDomain' => AF_INET,
194             'RemoteAddress' => $tcp_host,
195             'RemotePort' => $tcp_port,
196             'SuccessEvent' => 'afunixcli_server_connected',
197             'FailureEvent' => 'afunixcli_client_error',
198             );
199             }
200             else
201             {
202             # Unix socket only
203 0 0       0 if ( !-e $socket_path )
204             {
205 0         0 $debug->( 'STDERR', __LINE__, "Control socket '$socket_path' does not exist, refusing to continue." );
206 0         0 die;
207             }
208              
209 0         0 $debug->( 'STDERR', __LINE__, "Lock client connecting via Unix socket $socket_path." );
210              
211 0         0 poe->heap->{ 'afunixcli' }->{ 'transport_attempted' } = 'unix';
212              
213 0         0 poe->heap->{ 'afunixcli' }->{ 'client' } = POE::Wheel::SocketFactory->new(
214             'SocketDomain' => PF_UNIX,
215             'RemoteAddress' => $socket_path,
216             'SuccessEvent' => 'afunixcli_server_connected',
217             'FailureEvent' => 'afunixcli_client_error',
218             );
219             }
220              
221 1         12559 return;
222             }
223              
224             # As server
225 0         0 sub afunixsrv_server_error ( $self, $syscall, $errno, $error, $wid )
  0         0  
  0         0  
  0         0  
226 0     0 0 0 {
  0         0  
  0         0  
227 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
228              
229 0 0       0 if ( !$errno )
230             {
231 0         0 $error = "Normal disconnection.";
232             }
233              
234 0         0 $debug->( 'STDERR', __LINE__, "Server AA socket encountered $syscall error $errno: $error", 'error' );
235              
236 0         0 delete poe->heap->{ 'services' }->{ 'afunixsrv' };
237 0         0 return;
238             }
239              
240             # As client
241 1         2 sub afunixcli_client_error ( $self, $syscall, $errno, $error, $wid )
  1         2  
  1         2  
  1         2  
242 1     1 0 340 {
  1         1  
  1         2  
243 1         4 my $debug = poe->heap->{ '_' }->{ 'debug' };
244 1         9 my $opt = poe->heap->{ '_' }->{ 'opt' };
245              
246 1 50       7 if ( !$errno )
247             {
248 0         0 $error = "Normal disconnection.";
249             }
250              
251 1         7 $debug->( 'STDERR', __LINE__, "Client socket encountered $syscall error $errno: $error", 'error' );
252              
253             # If running a docker health check and connection failed, exit unhealthy
254 1 50       5 if ( $opt->docker_health_check )
255             {
256 1         9 $debug->( 'STDERR', __LINE__, "Health check connection failed, exiting unhealthy.", 'error' );
257 1         4 poe->heap->{ '_' }->{ 'set_exit' }->( '1', 'health-check-failed' );
258 1         3 poe->kernel->stop();
259 1         950 return;
260             }
261              
262             # Auto transport fallback: if TCP was attempted and failed, try Unix socket
263 0   0     0 my $transport = $opt->lock_transport || 'auto';
264 0   0     0 my $attempted = poe->heap->{ 'afunixcli' }->{ 'transport_attempted' } || '';
265              
266 0 0 0     0 if ( $transport eq 'auto' && $attempted eq 'tcp' )
267             {
268 0         0 my $socket_path = poe->heap->{ 'afunixcli' }->{ 'socket_path' };
269              
270 0 0 0     0 if ( $socket_path && -e $socket_path )
271             {
272 0         0 $debug->( 'STDERR', __LINE__, "TCP connection failed, falling back to Unix socket $socket_path." );
273              
274 0         0 poe->heap->{ 'afunixcli' }->{ 'transport_attempted' } = 'unix';
275              
276 0         0 poe->heap->{ 'afunixcli' }->{ 'client' } = POE::Wheel::SocketFactory->new(
277             'SocketDomain' => PF_UNIX,
278             'RemoteAddress' => $socket_path,
279             'SuccessEvent' => 'afunixcli_server_connected',
280             'FailureEvent' => 'afunixcli_client_error',
281             );
282 0         0 return;
283             }
284             else
285             {
286 0         0 $debug->( 'STDERR', __LINE__, "TCP failed and Unix socket '$socket_path' does not exist." );
287             }
288             }
289              
290 0         0 delete poe->heap->{ 'services' }->{ 'afunixcli' };
291              
292             # Check if retries are disabled
293 0 0       0 if ( $opt->lock_client_noretry )
294             {
295 0         0 $debug->( 'STDERR', __LINE__, "lock-client-noretry is set, exiting.", 'error' );
296 0         0 poe->heap->{ '_' }->{ 'set_exit' }->( '1', 'lock-client-noretry' );
297 0         0 poe->kernel->stop();
298 0         0 return;
299             }
300              
301             # Increment retry counter
302 0   0     0 poe->heap->{ 'afunixcli' }->{ 'retry_count' } ||= 0;
303 0         0 poe->heap->{ 'afunixcli' }->{ 'retry_count' }++;
304              
305 0   0     0 my $max_retries = $opt->lock_client_retry || 0;
306 0         0 my $retry_count = poe->heap->{ 'afunixcli' }->{ 'retry_count' };
307              
308             # 0 = infinite retries, otherwise check max
309 0 0 0     0 if ( $max_retries > 0 && $retry_count > $max_retries )
310             {
311 0         0 $debug->( 'STDERR', __LINE__, "Max retries ($max_retries) exceeded, exiting.", 'error' );
312 0         0 poe->heap->{ '_' }->{ 'set_exit' }->( '1', 'lock-client-retries-exhausted' );
313 0         0 poe->kernel->stop();
314 0         0 return;
315             }
316              
317 0   0     0 my $delay = $opt->lock_client_retry_delay || 5;
318 0 0       0 $debug->( 'STDERR', __LINE__,
319             "Scheduling reconnect attempt $retry_count in ${delay}s (max: "
320             . ( $max_retries == 0 ? 'infinite' : $max_retries ) . ")." );
321 0         0 poe->kernel->delay( 'afunixcli_client_reconnect' => $delay );
322              
323 0         0 return;
324             }
325              
326             # As client - reconnect after a failed connection
327             sub afunixcli_client_reconnect
328             {
329 0     0 0 0 my $debug = poe->heap->{ '_' }->{ 'debug' };
330 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
331              
332 0         0 $debug->( 'STDERR', __LINE__, "Attempting lock client reconnect." );
333              
334 0   0     0 my $transport = $opt->lock_transport || 'auto';
335              
336 0 0 0     0 if ( $transport eq 'tcp' || $transport eq 'auto' )
337             {
338 0   0     0 my $tcp_host = $opt->lock_client_host || 'aep-master';
339 0   0     0 my $tcp_port = $opt->lock_client_port || 60000;
340              
341 0         0 $debug->( 'STDERR', __LINE__, "Reconnecting via TCP to $tcp_host:$tcp_port." );
342              
343 0         0 poe->heap->{ 'afunixcli' }->{ 'transport_attempted' } = 'tcp';
344              
345 0         0 poe->heap->{ 'afunixcli' }->{ 'client' } = POE::Wheel::SocketFactory->new(
346             'SocketDomain' => AF_INET,
347             'RemoteAddress' => $tcp_host,
348             'RemotePort' => $tcp_port,
349             'SuccessEvent' => 'afunixcli_server_connected',
350             'FailureEvent' => 'afunixcli_client_error',
351             );
352             }
353             else
354             {
355 0         0 my $socket_path = poe->heap->{ '_' }->{ 'config' }->{ 'AEP_SOCKETPATH' };
356 0         0 poe->heap->{ 'afunixcli' }->{ 'socket_path' } = $socket_path;
357              
358 0 0       0 if ( !-e $socket_path )
359             {
360 0         0 $debug->( 'STDERR', __LINE__, "Control socket '$socket_path' does not exist, will retry." );
361 0   0     0 my $delay = $opt->lock_client_retry_delay || 5;
362 0         0 poe->kernel->delay( 'afunixcli_client_reconnect' => $delay );
363 0         0 return;
364             }
365              
366 0         0 $debug->( 'STDERR', __LINE__, "Reconnecting via Unix socket $socket_path." );
367              
368 0         0 poe->heap->{ 'afunixcli' }->{ 'transport_attempted' } = 'unix';
369              
370 0         0 poe->heap->{ 'afunixcli' }->{ 'client' } = POE::Wheel::SocketFactory->new(
371             'SocketDomain' => PF_UNIX,
372             'RemoteAddress' => $socket_path,
373             'SuccessEvent' => 'afunixcli_server_connected',
374             'FailureEvent' => 'afunixcli_client_error',
375             );
376             }
377              
378 0         0 return;
379             }
380              
381             # As server
382 0         0 sub afunixsrv_client_connected ( $self, $socket, @args )
  0         0  
383 0     0 0 0 {
  0         0  
  0         0  
384              
385             # Generate an ID we can use
386 0         0 my $client_id = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'id' }++;
387              
388             # Store the socket within it so it cannot go out of scope
389 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $client_id }->{ 'socket' } = $socket;
390              
391             # Send a debug message for the event of a client connecting
392 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
393 0         0 $debug->( 'STDERR', __LINE__, "Client connected." );
394              
395             # Create a stackable filter so we can talk in json
396 0         0 my $filter = POE::Filter::Stackable->new();
397 0         0 $filter->push( POE::Filter::Line->new(), POE::Filter::JSONMaybeXS->new(), );
398              
399             # Create a rw_wheel to deal with the client
400 0         0 my $rw_wheel = POE::Wheel::ReadWrite->new(
401             'Handle' => $socket,
402             'Filter' => $filter,
403             'InputEvent' => 'afunixsrv_client_input',
404             'ErrorEvent' => 'afunixsrv_client_error',
405             );
406              
407             # Store the wheel next to the socket
408 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $client_id }->{ 'wheel' } = $rw_wheel;
409              
410             # Store the filter so it never falls out of scope
411 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $client_id }->{ 'filter' } = $filter;
412              
413             # Store tx/rx about the connection
414 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $client_id }->{ 'tx_count' } = 0;
415 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $client_id }->{ 'rx_count' } = 0;
416              
417             # Create a mapping from the wheelid to the client
418 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'cid2wid' }->{ $client_id } = $rw_wheel->ID;
419              
420             # And the other way
421 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'wid2cid' }->{ $rw_wheel->ID } = $client_id;
422              
423             # Also make a note under the obj, for cleaning up
424 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $client_id }->{ 'wid' } = $rw_wheel->ID;
425              
426             # Send a message to the connected client (direct put, not cross-session yield)
427 0         0 my $msg = { 'event' => 'hello' };
428 0         0 $rw_wheel->put( $msg );
429              
430 0         0 return;
431             }
432              
433             # As client
434 0         0 sub afunixcli_server_connected ( $self, $socket, @args )
  0         0  
435 0     0 0 0 {
  0         0  
  0         0  
436             # Store the socket within it so it cannot go out of scope
437 0         0 poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'obj' } = $socket;
438              
439             # Send a debug message for the event of a client connecting
440 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
441 0         0 $debug->( 'STDERR', __LINE__, "Server connected." );
442              
443             # Create a stackable filter so we can talk in json
444 0         0 my $filter = POE::Filter::Stackable->new();
445 0         0 $filter->push( POE::Filter::Line->new(), POE::Filter::JSONMaybeXS->new(), );
446              
447             # Create a rw_wheel to deal with the client
448 0         0 my $rw_wheel = POE::Wheel::ReadWrite->new(
449             'Handle' => $socket,
450             'Filter' => $filter,
451             'InputEvent' => 'afunixcli_server_input',
452             'ErrorEvent' => 'afunixcli_server_error',
453             );
454              
455             # Store the wheel next to the socket
456 0         0 poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'wheel' } = $rw_wheel;
457              
458             # Store the filter so it never falls out of scope
459 0         0 poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'filter' } = $filter;
460              
461             # Store tx/rx about the connection
462 0         0 poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'tx_count' } = 0;
463 0         0 poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'rx_count' } = 0;
464              
465 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
466              
467 0 0       0 if ( $opt->docker_health_check )
468             {
469             # Health check mode: request status from server
470 0         0 my $msg = { 'event' => 'health_check' };
471 0         0 $rw_wheel->put( $msg );
472             }
473             else
474             {
475             # Send our lock-id to the server so it knows who we are
476 0         0 my $msg = { 'event' => 'hello', 'lock_id' => $opt->lock_id };
477 0         0 $rw_wheel->put( $msg );
478             }
479              
480 0         0 return;
481             }
482              
483             # As server
484 0         0 sub afunixsrv_server_send ( $self, $cid, $pkt )
  0         0  
485 0     0 0 0 {
  0         0  
  0         0  
486 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
487              
488 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid }->{ 'tx_count' }++;
489              
490 0         0 my $wheel = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid }->{ 'wheel' };
491              
492             # Format the packet, should be small
493 0         0 my $packet = Dumper( $pkt );
494 0         0 $packet =~ s#[\r\n]##g;
495 0         0 $packet =~ s#\s+# #g;
496              
497 0         0 $debug->( 'STDERR', __LINE__, "Client($cid) TX: $packet", 'debug' );
498              
499 0         0 $wheel->put( $pkt );
500              
501 0         0 return;
502             }
503              
504             # As client
505 0         0 sub afunixcli_client_send ( $self, $pkt )
506 0     0 0 0 {
  0         0  
  0         0  
507 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
508              
509 0         0 poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'tx_count' }++;
510              
511 0         0 my $wheel = poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'wheel' };
512              
513             # Format the packet, should be small
514 0         0 my $packet = Dumper( $pkt );
515 0         0 $packet =~ s#[\r\n]##g;
516 0         0 $packet =~ s#\s+# #g;
517              
518 0         0 $debug->( 'STDERR', __LINE__, "Server(-) TX: $packet", 'debug' );
519              
520 0         0 $wheel->put( $pkt );
521              
522 0         0 return;
523             }
524              
525             # As server - handle input from a connected lock client
526 0         0 sub afunixsrv_client_input ( $self, $input, $wid )
  0         0  
527 0     0 0 0 {
  0         0  
  0         0  
528 0         0 my $cid = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'wid2cid' }->{ $wid };
529 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
530 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
531              
532             # Increment the received packet count
533 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid }->{ 'rx_count' }++;
534              
535             # Shortcut to the wheel the client is connected to
536 0         0 my $wheel = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid }->{ 'wheel' };
537              
538             # Format the packet, should be small
539 0         0 my $packet = Dumper( $input );
540 0         0 $packet =~ s#[\r\n]##g;
541 0         0 $packet =~ s#\s+# #g;
542              
543 0         0 $debug->( 'STDERR', __LINE__, "Client($cid) RX: $packet", 'debug' );
544              
545 0   0     0 my $event = $input->{ 'event' } || '';
546              
547             # Client is saying hello with its lock-id
548 0 0 0     0 if ( $event eq 'hello' && defined $input->{ 'lock_id' } )
    0          
    0          
549             {
550 0         0 my $lock_id = $input->{ 'lock_id' };
551 0         0 $debug->( 'STDERR', __LINE__, "Client($cid) identified as lock-id: $lock_id" );
552              
553             # Store the lock-id for this client
554 0         0 poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid }->{ 'lock_id' } = $lock_id;
555              
556             # Map lock_id to cid for quick lookup
557 0         0 poe->heap->{ 'lock' }->{ 'id2cid' }->{ $lock_id } = $cid;
558              
559             # Check if this client is next in the order
560 0         0 _lock_server_check_next();
561             }
562             # Client is reporting that its lock-trigger passed (command started successfully)
563             elsif ( $event eq 'trigger_ok' )
564             {
565 0   0     0 my $lock_id = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid }->{ 'lock_id' } || 'unknown';
566 0         0 $debug->( 'STDERR', __LINE__, "Client($cid) lock-id '$lock_id' reports trigger success." );
567              
568             # Increment step_completed counter for parallel groups
569 0         0 poe->heap->{ 'lock' }->{ 'step_completed' }++;
570              
571 0         0 my $order = poe->heap->{ 'lock' }->{ 'order' };
572 0         0 my $idx = poe->heap->{ 'lock' }->{ 'order_idx' };
573              
574 0 0       0 if ( $idx < scalar( @{ $order } ) )
  0         0  
575             {
576 0         0 my $step = $order->[ $idx ];
577 0 0       0 my @ids = ref $step eq 'ARRAY' ? @{ $step } : ( $step );
  0         0  
578 0         0 my $step_size = scalar @ids;
579              
580             $debug->( 'STDERR', __LINE__,
581 0         0 "Step $idx: " . poe->heap->{ 'lock' }->{ 'step_completed' } . "/$step_size completed." );
582              
583             # Only advance when all IDs in the current parallel step have reported trigger_ok
584 0 0       0 if ( poe->heap->{ 'lock' }->{ 'step_completed' } >= $step_size )
585             {
586 0         0 poe->heap->{ 'lock' }->{ 'step_completed' } = 0;
587 0         0 poe->heap->{ 'lock' }->{ 'order_idx' }++;
588 0         0 _lock_server_check_next();
589             }
590             }
591             }
592             # Client is requesting a health check
593             elsif ( $event eq 'health_check' )
594             {
595 0   0     0 my $order = poe->heap->{ 'lock' }->{ 'order' } || [];
596 0   0     0 my $idx = poe->heap->{ 'lock' }->{ 'order_idx' } || 0;
597 0         0 my $total_steps = scalar @{ $order };
  0         0  
598              
599             # Build lists of cleared and waiting IDs
600 0         0 my @cleared;
601             my @waiting;
602 0         0 for my $i ( 0 .. $#{ $order } )
  0         0  
603             {
604 0         0 my $step = $order->[ $i ];
605 0 0       0 my @ids = ref $step eq 'ARRAY' ? @{ $step } : ( $step );
  0         0  
606 0 0       0 if ( $i < $idx )
607             {
608 0         0 push @cleared, @ids;
609             }
610             else
611             {
612 0         0 push @waiting, @ids;
613             }
614             }
615              
616             my $status = {
617             'event' => 'health_status',
618             'status' => 'ok',
619 0 0       0 'clients_connected' => scalar( keys %{ poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' } || {} } ),
  0         0  
620             'order_progress' => "$idx/$total_steps",
621             'cleared' => \@cleared,
622             'waiting' => \@waiting,
623             };
624 0         0 $wheel->put( $status );
625             }
626              
627 0         0 return;
628             }
629              
630             # Check if the next client in the lock order is connected and ready
631             sub _lock_server_check_next
632             {
633 0     0   0 my $debug = poe->heap->{ '_' }->{ 'debug' };
634 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
635 0         0 my $order = poe->heap->{ 'lock' }->{ 'order' };
636 0         0 my $idx = poe->heap->{ 'lock' }->{ 'order_idx' };
637              
638             # Check if we have exhausted the order list
639 0 0       0 if ( $idx >= scalar( @{ $order } ) )
  0         0  
640             {
641 0         0 $debug->( 'STDERR', __LINE__, "Lock order list exhausted." );
642 0         0 _lock_server_handle_exhaust();
643 0         0 return;
644             }
645              
646 0         0 my $step = $order->[ $idx ];
647 0 0       0 my @ids = ref $step eq 'ARRAY' ? @{ $step } : ( $step );
  0         0  
648 0         0 my $step_label = join( '||', @ids );
649 0         0 $debug->( 'STDERR', __LINE__, "Lock order: checking step $idx [$step_label]." );
650              
651             # For each ID in the current parallel step, send run if connected
652 0         0 for my $next_id ( @ids )
653             {
654 0         0 my $cid = poe->heap->{ 'lock' }->{ 'id2cid' }->{ $next_id };
655 0 0       0 if ( defined $cid )
656             {
657             # Only send run if we haven't already sent it
658 0 0       0 if ( !poe->heap->{ 'lock' }->{ 'run_sent' }->{ $next_id } )
659             {
660 0         0 $debug->( 'STDERR', __LINE__, "Lock-id '$next_id' is connected (cid $cid), sending run." );
661 0         0 my $msg = { 'event' => 'run' };
662 0         0 my $wheel = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid }->{ 'wheel' };
663 0         0 $wheel->put( $msg );
664 0         0 poe->heap->{ 'lock' }->{ 'run_sent' }->{ $next_id } = 1;
665             }
666             }
667             else
668             {
669 0         0 $debug->( 'STDERR', __LINE__, "Lock-id '$next_id' not yet connected, waiting." );
670             }
671             }
672              
673             # Also process any unknown clients based on lock-server-default
674 0         0 _lock_server_process_unknown();
675              
676 0         0 return;
677             }
678              
679             # Handle unknown lock-ids based on --lock-server-default
680             sub _lock_server_process_unknown
681             {
682 0     0   0 my $debug = poe->heap->{ '_' }->{ 'debug' };
683 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
684 0   0     0 my $default_act = $opt->lock_server_default || 'ignore';
685 0         0 my $order = poe->heap->{ 'lock' }->{ 'order' };
686              
687             # Build a set of known lock-ids from the order list (which is now array-of-arrays)
688 0         0 my %known;
689 0         0 for my $step ( @{ $order } )
  0         0  
690             {
691 0 0       0 my @ids = ref $step eq 'ARRAY' ? @{ $step } : ( $step );
  0         0  
692 0         0 $known{ $_ } = 1 for @ids;
693             }
694              
695             # Check all connected clients for unknown lock-ids
696 0   0     0 my $clients = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' } || {};
697 0         0 for my $cid ( keys %{ $clients } )
  0         0  
698             {
699 0         0 my $lid = $clients->{ $cid }->{ 'lock_id' };
700 0 0       0 next unless defined $lid;
701 0 0       0 next if $known{ $lid };
702 0 0       0 next if $clients->{ $cid }->{ 'unknown_handled' };
703              
704 0 0       0 if ( $default_act eq 'run' )
    0          
705             {
706 0         0 $debug->( 'STDERR', __LINE__, "Unknown lock-id '$lid' (cid $cid): sending run (default=run)." );
707 0         0 my $msg = { 'event' => 'run' };
708 0         0 my $wheel = $clients->{ $cid }->{ 'wheel' };
709 0 0       0 $wheel->put( $msg ) if $wheel;
710 0         0 $clients->{ $cid }->{ 'unknown_handled' } = 1;
711             }
712             elsif ( $default_act eq 'runlast' )
713             {
714             # Queue it - will be processed after order list exhaustion
715 0         0 push @{ poe->heap->{ 'lock' }->{ 'unknown_queue' } }, $cid
716 0 0       0 unless grep { $_ == $cid } @{ poe->heap->{ 'lock' }->{ 'unknown_queue' } };
  0         0  
  0         0  
717             }
718             else
719             {
720             # ignore
721 0         0 $debug->( 'STDERR', __LINE__, "Unknown lock-id '$lid' (cid $cid): ignoring (default=ignore)." );
722 0         0 $clients->{ $cid }->{ 'unknown_handled' } = 1;
723             }
724             }
725              
726 0         0 return;
727             }
728              
729             # Handle what happens when the lock order list is fully exhausted
730             sub _lock_server_handle_exhaust
731             {
732 0     0   0 my $debug = poe->heap->{ '_' }->{ 'debug' };
733 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
734 0   0     0 my $action = $opt->lock_server_exhaust_action || 'idle';
735              
736             # First, run any "runlast" queued unknowns
737 0   0     0 my $queue = poe->heap->{ 'lock' }->{ 'unknown_queue' } || [];
738 0         0 for my $cid ( @{ $queue } )
  0         0  
739             {
740 0         0 $debug->( 'STDERR', __LINE__, "Exhaust: sending run to queued unknown cid $cid." );
741 0         0 my $msg = { 'event' => 'run' };
742 0         0 my $wheel = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid }->{ 'wheel' };
743 0 0       0 $wheel->put( $msg ) if $wheel;
744             }
745 0         0 poe->heap->{ 'lock' }->{ 'unknown_queue' } = [];
746              
747 0 0       0 if ( $action eq 'exit' )
    0          
    0          
748             {
749 0         0 $debug->( 'STDERR', __LINE__, "Lock order exhausted: exiting." );
750 0         0 poe->heap->{ '_' }->{ 'set_exit' }->( '0', 'lock-order-exhausted' );
751 0         0 poe->kernel->stop();
752             }
753             elsif ( $action eq 'restart' )
754             {
755 0         0 $debug->( 'STDERR', __LINE__, "Lock order exhausted: restarting order list." );
756 0         0 poe->heap->{ 'lock' }->{ 'order_idx' } = 0;
757 0         0 poe->heap->{ 'lock' }->{ 'order' } = [ map { [ @{ $_ } ] } @{ poe->heap->{ 'lock' }->{ 'order_orig' } } ];
  0         0  
  0         0  
  0         0  
758 0         0 poe->heap->{ 'lock' }->{ 'id2cid' } = {};
759 0         0 poe->heap->{ 'lock' }->{ 'step_completed' } = 0;
760 0         0 poe->heap->{ 'lock' }->{ 'run_sent' } = {};
761             }
762             elsif ( $action eq 'execute' )
763             {
764 0         0 $debug->( 'STDERR', __LINE__, "Lock order exhausted: starting own command." );
765 0         0 poe->kernel->yield( 'command_start' );
766             }
767             else
768             {
769             # idle - do nothing, just keep the event loop alive
770 0         0 $debug->( 'STDERR', __LINE__, "Lock order exhausted: idling." );
771             }
772              
773 0         0 return;
774             }
775              
776             # As client - handle input from the lock server
777 0         0 sub afunixcli_server_input ( $self, $input, $wid )
  0         0  
778 0     0 0 0 {
  0         0  
  0         0  
779 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
780              
781             # Increment the received packet count
782 0         0 poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'rx_count' }++;
783              
784             # Shortcut to the wheel the client is connected to
785 0         0 my $wheel = poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'wheel' };
786              
787             # Format the packet, should be small
788 0         0 my $packet = Dumper( $input );
789 0         0 $packet =~ s#[\r\n]##g;
790 0         0 $packet =~ s#\s+# #g;
791              
792 0         0 $debug->( 'STDERR', __LINE__, "Server(-) RX: $packet", 'debug' );
793              
794 0   0     0 my $event = $input->{ 'event' } || '';
795              
796             # Server says run - start our command (post to main session, not this socket session)
797 0 0       0 if ( $event eq 'run' )
    0          
798             {
799 0         0 $debug->( 'STDERR', __LINE__, "Received 'run' from lock server, starting command." );
800 0         0 poe->heap->{ 'command' }->{ 'lock_cleared' } = 1;
801             # Cancel the timeout if one was set
802 0         0 poe->kernel->post( poe->heap->{ '_' }->{ 'main_session' }, 'lock_client_timeout_cancel' );
803 0         0 poe->kernel->post( poe->heap->{ '_' }->{ 'main_session' }, 'command_start' );
804             }
805             # Server sends health status
806             elsif ( $event eq 'health_status' )
807             {
808 0         0 require JSON::MaybeXS;
809 0         0 say STDOUT JSON::MaybeXS::encode_json( $input );
810 0         0 poe->heap->{ '_' }->{ 'set_exit' }->( 0, 'health-check-ok' );
811 0         0 poe->kernel->stop();
812             }
813              
814 0         0 return;
815             }
816              
817             # As server
818 0         0 sub afunixsrv_client_error ( $self, $syscall, $errno, $error, $wid )
  0         0  
  0         0  
  0         0  
819 0     0 0 0 {
  0         0  
  0         0  
820 0         0 my $cid = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'wid2cid' }->{ $wid };
821 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
822              
823 0 0       0 if ( !$errno )
824             {
825 0         0 $error = "Normal disconnection for wheel: $wid, cid: $cid";
826             }
827              
828 0         0 $debug->( 'STDERR', __LINE__, "Server session encountered $syscall error $errno: $error", 'error' );
829              
830             # Clean up the dead client's state
831 0 0       0 if ( defined $cid )
832             {
833 0         0 my $lock_id = poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid }->{ 'lock_id' };
834              
835             # Remove the lock id2cid mapping if it exists
836 0 0       0 if ( defined $lock_id )
837             {
838 0         0 delete poe->heap->{ 'lock' }->{ 'id2cid' }->{ $lock_id };
839             }
840              
841             # Remove wid2cid and cid2wid mappings
842 0         0 delete poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'wid2cid' }->{ $wid };
843 0         0 delete poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'cid2wid' }->{ $cid };
844              
845             # Delete the client's obj entry
846 0         0 delete poe->heap->{ 'afunixsrv' }->{ 'client' }->{ 'obj' }->{ $cid };
847             }
848              
849 0         0 return;
850             }
851              
852             # As client
853 0         0 sub afunixcli_server_error ( $self, $syscall, $errno, $error, $wid )
  0         0  
  0         0  
  0         0  
854 0     0 0 0 {
  0         0  
  0         0  
855 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
856              
857 0 0       0 if ( !$errno )
858             {
859 0         0 $error = "Normal disconnection for wheel: $wid";
860             }
861              
862 0         0 $debug->( 'STDERR', __LINE__, "Server session encountered $syscall error $errno: $error", 'error' );
863              
864 0         0 return;
865             }
866              
867             # --- Command execution via POE::Wheel::Run ---
868              
869             # Start the child command process
870             sub command_start
871             {
872 10     10 0 200020 my $debug = poe->heap->{ '_' }->{ 'debug' };
873 10         62 my $opt = poe->heap->{ '_' }->{ 'opt' };
874              
875             # Do not start if already running
876 10 50       50 if ( poe->heap->{ 'command' }->{ 'running' } )
877             {
878 0         0 $debug->( 'STDERR', __LINE__, "Command already running, skipping start." );
879 0         0 return;
880             }
881              
882 10   50     80 my $cmd = $opt->command || 'aep --help';
883 10   100     77 my $cmd_args = $opt->command_args || '';
884              
885             # Build the program + args array for Wheel::Run
886 10         89 my @args = grep { $_ ne '' } split( /,/, $cmd_args );
  6         18  
887              
888 10         60 $debug->( 'STDERR', __LINE__, "Starting command: $cmd " . join( ' ', @args ) );
889              
890             # Reset trigger state for this run
891 10         28 poe->heap->{ 'command' }->{ 'trigger_ok' } = 0;
892              
893 10         170 my $wheel = POE::Wheel::Run->new(
894             'Program' => $cmd,
895             'ProgramArgs' => \@args,
896             'StdoutEvent' => 'command_stdout',
897             'StderrEvent' => 'command_stderr',
898             'CloseEvent' => 'command_close',
899             'ErrorEvent' => 'command_error',
900             );
901              
902 10         61685 poe->heap->{ 'command' }->{ 'wheel' } = $wheel;
903 10         537 poe->heap->{ 'command' }->{ 'pid' } = $wheel->PID;
904 10         221 poe->heap->{ 'command' }->{ 'running' } = 1;
905              
906 10         203 $debug->( 'STDERR', __LINE__, "Command started with PID: " . $wheel->PID );
907              
908             # Tell the kernel to watch this child
909 10         121 poe->kernel->sig_child( $wheel->PID, 'sig_chld' );
910              
911             # If we are a lock client with a time-based trigger, set the timer now
912 10 50       920 if ( $opt->lock_client )
913             {
914 0         0 _lock_trigger_setup();
915             }
916              
917 10         365 return;
918             }
919              
920             # Handle stdout from the child process
921 4         15 sub command_stdout ( $self, $line, $wid )
  4         12  
922 4     4 0 3975 {
  4         11  
  4         10  
923 4         15 my $debug = poe->heap->{ '_' }->{ 'debug' };
924 4         31 my $opt = poe->heap->{ '_' }->{ 'opt' };
925              
926             # Pass through to our own stdout
927 4         166 say STDOUT $line;
928              
929             # Check lock trigger if we are a lock client
930 4 50 33     32 if ( $opt->lock_client && !poe->heap->{ 'command' }->{ 'trigger_ok' } )
931             {
932 0         0 _lock_trigger_check( 'stdout', $line );
933             }
934              
935 4         57 return;
936             }
937              
938             # Handle stderr from the child process
939 1         2 sub command_stderr ( $self, $line, $wid )
  1         4  
940 1     1 0 2592 {
  1         2  
  1         3  
941 1         6 my $debug = poe->heap->{ '_' }->{ 'debug' };
942 1         9 my $opt = poe->heap->{ '_' }->{ 'opt' };
943              
944             # Pass through to our own stderr
945 1         57 say STDERR $line;
946              
947             # Check lock trigger if we are a lock client
948 1 50 33     7 if ( $opt->lock_client && !poe->heap->{ 'command' }->{ 'trigger_ok' } )
949             {
950 0         0 _lock_trigger_check( 'stderr', $line );
951             }
952              
953 1         16 return;
954             }
955              
956             # Handle child process close (all filehandles closed)
957 10         22 sub command_close ( $self, $wid )
958 10     10 0 457 {
  10         23  
  10         44  
959 10         34 my $debug = poe->heap->{ '_' }->{ 'debug' };
960 10         67 my $opt = poe->heap->{ '_' }->{ 'opt' };
961              
962 10         81 $debug->( 'STDERR', __LINE__, "Command process closed (wheel $wid)." );
963              
964 10         46 poe->heap->{ 'command' }->{ 'running' } = 0;
965 10         68 delete poe->heap->{ 'command' }->{ 'wheel' };
966              
967             # Do not restart if we are shutting down
968 10 50       3587 if ( poe->heap->{ 'command' }->{ 'shutting_down' } )
969             {
970 0         0 $debug->( 'STDERR', __LINE__, "Command exited during shutdown, not restarting." );
971 0         0 return;
972             }
973              
974             # Check restart logic
975 10   100     144 my $max_restart = $opt->command_restart || 0;
976 10   100     111 my $no_restart = $opt->command_norestart || 0;
977              
978 10 100       79 if ( $no_restart )
979             {
980             # In lock-client mode, don't exit yet -- wait for the trigger to fire
981             # and report back to the server before shutting down
982 7 50 33     74 if ( $opt->lock_client && !poe->heap->{ 'command' }->{ 'trigger_ok' } )
983             {
984 0         0 $debug->( 'STDERR', __LINE__, "Command exited, waiting for lock trigger before shutdown." );
985 0         0 return;
986             }
987 7         91 $debug->( 'STDERR', __LINE__, "Command exited, no-restart flag set." );
988 7         32 poe->heap->{ '_' }->{ 'set_exit' }->( '0', 'command-exited-norestart' );
989 7         38 return;
990             }
991              
992 3         11 my $count = poe->heap->{ 'command' }->{ 'restart_count' };
993              
994             # -1 means infinite restarts, otherwise check the limit
995 3 100 66     28 if ( $max_restart == -1 || $count < $max_restart )
996             {
997 2         4 poe->heap->{ 'command' }->{ 'restart_count' }++;
998 2   50     12 my $delay_ms = $opt->command_restart_delay || 1000;
999 2         8 my $delay_s = $delay_ms / 1000;
1000              
1001 2         11 $debug->( 'STDERR', __LINE__,
1002             "Command exited, restarting in ${delay_ms}ms (attempt " . ( $count + 1 ) . ")." );
1003 2         19 poe->kernel->delay( 'command_start' => $delay_s );
1004             }
1005             else
1006             {
1007 1         15 $debug->( 'STDERR', __LINE__, "Command exited, max restarts ($max_restart) reached." );
1008 1         7 poe->heap->{ '_' }->{ 'set_exit' }->( '0', 'command-exited-max-restarts' );
1009             }
1010              
1011 3         236 return;
1012             }
1013              
1014             # Handle errors from the child process wheel
1015 20         43 sub command_error ( $self, $syscall, $errno, $error, $wid, @extra )
  20         41  
  20         26  
  20         46  
  20         32  
1016 20     20 0 13043 {
  20         49  
  20         26  
1017 20         52 my $debug = poe->heap->{ '_' }->{ 'debug' };
1018              
1019             # errno 0 on read means EOF, which is normal
1020 20 50       194 if ( !$errno )
1021             {
1022 20         87 return;
1023             }
1024              
1025 0         0 $debug->( 'STDERR', __LINE__, "Command wheel error: $syscall errno=$errno: $error", 'error' );
1026              
1027 0         0 return;
1028             }
1029              
1030             # --- Lock trigger logic ---
1031              
1032             # Parse the lock-trigger spec and set up the appropriate watcher
1033             sub _lock_trigger_setup
1034             {
1035 0     0   0 my $debug = poe->heap->{ '_' }->{ 'debug' };
1036 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
1037              
1038 0   0     0 my $trigger = $opt->lock_trigger || 'none:time:10000';
1039 0         0 my ( $handle, $filter, $spec ) = split( /:/, $trigger, 3 );
1040              
1041 0   0     0 poe->heap->{ 'command' }->{ 'trigger' } = {
      0        
      0        
1042             'handle' => $handle || 'none',
1043             'filter' => $filter || 'time',
1044             'spec' => $spec || '10000',
1045             };
1046              
1047 0         0 $debug->( 'STDERR', __LINE__, "Lock trigger configured: handle=$handle filter=$filter spec=$spec" );
1048              
1049             # If the trigger is time-based, set up a delay
1050 0 0       0 if ( $filter eq 'time' )
    0          
    0          
1051             {
1052 0   0     0 my $delay_ms = $spec || 10000;
1053 0         0 my $delay_s = $delay_ms / 1000;
1054 0         0 $debug->( 'STDERR', __LINE__, "Time-based trigger: will fire in ${delay_ms}ms." );
1055 0         0 poe->kernel->delay( 'lock_trigger_fire' => $delay_s );
1056             }
1057             # If the trigger is connect-based, try a TCP connection
1058             elsif ( $filter eq 'connect' )
1059             {
1060 0         0 $debug->( 'STDERR', __LINE__, "Connect-based trigger: will try connecting to $spec." );
1061 0         0 poe->kernel->delay( 'lock_trigger_connect' => 1 );
1062             }
1063             # If the trigger is script-based, run the script
1064             elsif ( $filter eq 'script' )
1065             {
1066 0         0 $debug->( 'STDERR', __LINE__, "Script-based trigger: will run $spec." );
1067 0         0 poe->kernel->delay( 'lock_trigger_script' => 1 );
1068             }
1069             # text and regex triggers are checked inline via _lock_trigger_check
1070              
1071 0         0 return;
1072             }
1073              
1074             # Check a line of output against text/regex triggers
1075 0         0 sub _lock_trigger_check ( $source, $line )
1076 0     0   0 {
  0         0  
  0         0  
1077 0         0 my $debug = poe->heap->{ '_' }->{ 'debug' };
1078 0         0 my $trigger = poe->heap->{ 'command' }->{ 'trigger' };
1079              
1080 0 0       0 return unless $trigger;
1081              
1082 0         0 my $handle = $trigger->{ 'handle' };
1083 0         0 my $filter = $trigger->{ 'filter' };
1084 0         0 my $spec = $trigger->{ 'spec' };
1085              
1086             # Check if this source matches the handle
1087 0 0 0     0 return if ( $handle eq 'stdout' && $source ne 'stdout' );
1088 0 0 0     0 return if ( $handle eq 'stderr' && $source ne 'stderr' );
1089             # 'both' and 'none' match everything (none has no output filter)
1090              
1091 0 0       0 if ( $filter eq 'text' )
    0          
1092             {
1093 0 0       0 if ( index( $line, $spec ) != -1 )
1094             {
1095 0         0 $debug->( 'STDERR', __LINE__, "Text trigger matched: '$spec' found in $source output." );
1096 0         0 poe->kernel->yield( 'lock_trigger_fire' );
1097             }
1098             }
1099             elsif ( $filter eq 'regex' )
1100             {
1101 0 0       0 if ( $line =~ m{$spec} )
1102             {
1103 0         0 $debug->( 'STDERR', __LINE__, "Regex trigger matched: /$spec/ found in $source output." );
1104 0         0 poe->kernel->yield( 'lock_trigger_fire' );
1105             }
1106             }
1107              
1108 0         0 return;
1109             }
1110              
1111             # Fire the lock trigger - report success to the lock server
1112             sub lock_trigger_fire
1113             {
1114 0     0 0 0 my $debug = poe->heap->{ '_' }->{ 'debug' };
1115 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
1116              
1117             # Only fire once
1118 0 0       0 if ( poe->heap->{ 'command' }->{ 'trigger_ok' } )
1119             {
1120 0         0 return;
1121             }
1122              
1123 0         0 poe->heap->{ 'command' }->{ 'trigger_ok' } = 1;
1124              
1125 0         0 $debug->( 'STDERR', __LINE__, "Lock trigger fired, reporting success to server." );
1126              
1127             # Send trigger_ok directly via the wheel (not via yield, to avoid cross-session issues)
1128 0 0       0 if ( poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'wheel' } )
1129             {
1130 0         0 my $msg = { 'event' => 'trigger_ok', 'lock_id' => $opt->lock_id };
1131 0         0 poe->heap->{ 'afunixcli' }->{ 'server' }->{ 'wheel' }->put( $msg );
1132 0         0 $debug->( 'STDERR', __LINE__, "Sent trigger_ok to server." );
1133             }
1134              
1135             # If the command has already exited, schedule shutdown after a brief delay
1136             # to allow the trigger_ok message to flush to the server
1137 0 0       0 if ( !poe->heap->{ 'command' }->{ 'running' } )
1138             {
1139 0         0 $debug->( 'STDERR', __LINE__, "Trigger fired and command already exited, shutting down shortly." );
1140 0         0 poe->heap->{ '_' }->{ 'set_exit' }->( '0', 'trigger-ok-command-exited' );
1141 0         0 poe->kernel->delay( 'scheduler' => 0.5 );
1142             }
1143              
1144 0         0 return;
1145             }
1146              
1147             # Attempt a TCP connect for the connect trigger type
1148             sub lock_trigger_connect
1149             {
1150 0     0 0 0 my $debug = poe->heap->{ '_' }->{ 'debug' };
1151 0         0 my $trigger = poe->heap->{ 'command' }->{ 'trigger' };
1152 0   0     0 my $spec = $trigger->{ 'spec' } || '';
1153              
1154             # Already triggered
1155 0 0       0 return if poe->heap->{ 'command' }->{ 'trigger_ok' };
1156              
1157             # Parse host:port from spec
1158 0         0 my ( $host, $port ) = split( /:/, $spec, 2 );
1159 0 0 0     0 if ( !$host || !$port )
1160             {
1161 0         0 $debug->( 'STDERR', __LINE__, "Connect trigger: invalid spec '$spec', expected host:port." );
1162 0         0 return;
1163             }
1164              
1165 0         0 $debug->( 'STDERR', __LINE__, "Connect trigger: trying $host:$port." );
1166              
1167             my $ok = try {
1168 0     0   0 my $sock = IO::Socket::INET->new(
1169             PeerAddr => $host,
1170             PeerPort => $port,
1171             Proto => 'tcp',
1172             Timeout => 2,
1173             );
1174 0 0       0 if ( $sock )
1175             {
1176 0         0 close( $sock );
1177 0         0 return 1;
1178             }
1179 0         0 return 0;
1180             }
1181             catch {
1182 0     0   0 return 0;
1183 0         0 };
1184              
1185 0 0       0 if ( $ok )
1186             {
1187 0         0 $debug->( 'STDERR', __LINE__, "Connect trigger: connection to $host:$port succeeded." );
1188 0         0 poe->kernel->yield( 'lock_trigger_fire' );
1189             }
1190             else
1191             {
1192             # Retry after 1 second
1193 0         0 poe->kernel->delay( 'lock_trigger_connect' => 1 );
1194             }
1195              
1196 0         0 return;
1197             }
1198              
1199             # Run an external script for the script trigger type
1200             sub lock_trigger_script
1201             {
1202 0     0 0 0 my $debug = poe->heap->{ '_' }->{ 'debug' };
1203 0         0 my $trigger = poe->heap->{ 'command' }->{ 'trigger' };
1204 0   0     0 my $spec = $trigger->{ 'spec' } || '';
1205              
1206             # Already triggered
1207 0 0       0 return if poe->heap->{ 'command' }->{ 'trigger_ok' };
1208              
1209 0         0 $debug->( 'STDERR', __LINE__, "Script trigger: running '$spec'." );
1210              
1211             # WARNING: system() blocks the event loop. Using alarm() to cap execution time.
1212 0         0 my $exit_code;
1213 0         0 eval {
1214 0     0   0 local $SIG{ 'ALRM' } = sub { die "script_timeout\n" };
  0         0  
1215 0         0 alarm( 30 );
1216 0         0 $exit_code = system( $spec );
1217 0         0 alarm( 0 );
1218             };
1219 0 0       0 if ( $@ )
1220             {
1221 0         0 $debug->( 'STDERR', __LINE__, "Script trigger: '$spec' timed out after 30s." );
1222 0         0 $exit_code = -1;
1223             }
1224              
1225 0 0       0 if ( $exit_code == 0 )
1226             {
1227 0         0 $debug->( 'STDERR', __LINE__, "Script trigger: '$spec' exited 0 (success)." );
1228 0         0 poe->kernel->yield( 'lock_trigger_fire' );
1229             }
1230             else
1231             {
1232 0         0 $debug->( 'STDERR', __LINE__, "Script trigger: '$spec' exited non-zero, retrying." );
1233 0         0 poe->kernel->delay( 'lock_trigger_script' => 1 );
1234             }
1235              
1236 0         0 return;
1237             }
1238              
1239             # --- Lock client timeout ---
1240              
1241             # Fire when the lock client timeout expires without receiving "run"
1242             sub lock_client_timeout_fire
1243             {
1244 0     0 0 0 my $debug = poe->heap->{ '_' }->{ 'debug' };
1245 0         0 my $opt = poe->heap->{ '_' }->{ 'opt' };
1246              
1247             # If command already started (run received), ignore
1248 0 0       0 return if poe->heap->{ 'command' }->{ 'lock_cleared' };
1249              
1250 0         0 $debug->( 'STDERR', __LINE__,
1251             "WARNING: Lock client timeout reached (" . $opt->lock_client_timeout . "s), starting command without server permission.",
1252             'error' );
1253 0         0 poe->heap->{ 'command' }->{ 'lock_cleared' } = 1;
1254 0         0 poe->kernel->yield( 'command_start' );
1255              
1256 0         0 return;
1257             }
1258              
1259             # Cancel the lock client timeout (called when "run" is received)
1260             sub lock_client_timeout_cancel
1261             {
1262 0     0 0 0 poe->kernel->delay( 'lock_client_timeout_fire' );
1263 0         0 return;
1264             }
1265              
1266             # --- Signal handlers ---
1267              
1268             sub sig_int
1269             {
1270              
1271             # Set an appropriate exit
1272 0     0 0 0 poe->heap->{ '_' }->{ 'set_exit' }->( '1', 'sigint' );
1273              
1274             # Announce the event
1275 0         0 poe->heap->{ '_' }->{ 'debug' }->( 'STDERR', __LINE__, 'Signal: INT - starting controlled shutdown.' );
1276              
1277             # Tell the kernel to ignore the term we are handling it
1278 0         0 poe->kernel->sig_handled();
1279              
1280             # Send kill to the child process if running
1281 0 0       0 if ( poe->heap->{ 'command' }->{ 'wheel' } )
1282             {
1283 0         0 poe->heap->{ 'command' }->{ 'wheel' }->kill( 'INT' );
1284             }
1285              
1286             # Prevent restarts during shutdown
1287 0         0 poe->heap->{ 'command' }->{ 'shutting_down' } = 1;
1288              
1289             # Clean up the unix socket file if it exists
1290 0         0 my $socket_path = poe->heap->{'afunixsrv'}->{'socket_path'};
1291 0 0 0     0 unlink $socket_path if $socket_path && -e $socket_path;
1292              
1293             # Stop the event wheel
1294 0         0 poe->kernel->stop();
1295              
1296 0         0 return;
1297             }
1298              
1299             sub sig_term
1300             {
1301              
1302             # Set an appropriate exit
1303 0     0 0 0 poe->heap->{ '_' }->{ 'set_exit' }->( '1', 'sigterm' );
1304              
1305             # Announce the event
1306 0         0 poe->heap->{ '_' }->{ 'debug' }->( 'STDERR', __LINE__, 'Signal: TERM - starting controlled shutdown.' );
1307              
1308             # Tell the kernel to ignore the term we are handling it
1309 0         0 poe->kernel->sig_handled();
1310              
1311             # Send kill to the child process if running
1312 0 0       0 if ( poe->heap->{ 'command' }->{ 'wheel' } )
1313             {
1314 0         0 poe->heap->{ 'command' }->{ 'wheel' }->kill( 'TERM' );
1315             }
1316              
1317             # Prevent restarts during shutdown
1318 0         0 poe->heap->{ 'command' }->{ 'shutting_down' } = 1;
1319              
1320             # Clean up the unix socket file if it exists
1321 0         0 my $socket_path = poe->heap->{'afunixsrv'}->{'socket_path'};
1322 0 0 0     0 unlink $socket_path if $socket_path && -e $socket_path;
1323              
1324             # Stop the event wheel
1325 0         0 poe->kernel->stop();
1326              
1327 0         0 return;
1328             }
1329              
1330             sub sig_chld
1331             {
1332              
1333             # Announce the event
1334 20     20 0 8290 poe->heap->{ '_' }->{ 'debug' }->( 'STDERR', __LINE__, 'Signal CHLD received.' );
1335              
1336             # Let POE handle the child reaping
1337 20         51 poe->kernel->sig_handled();
1338              
1339 20         309 return;
1340             }
1341              
1342             sub sig_usr
1343             {
1344              
1345             # Announce the event
1346 0     0 0 0 poe->heap->{ '_' }->{ 'debug' }->( 'STDERR', __LINE__, 'Signal USR, ignoring' );
1347              
1348 0         0 return;
1349             }
1350              
1351             # --- Scheduler ---
1352              
1353             # The scheduler decides what to do based on the operating mode
1354             sub scheduler
1355             {
1356 8     8 0 2200 my $debug = poe->heap->{ '_' }->{ 'debug' };
1357 8         104 my $opt = poe->heap->{ '_' }->{ 'opt' };
1358              
1359             # If called after trigger_ok, this is a deferred shutdown
1360 8 50 33     58 if ( poe->heap->{ 'command' }->{ 'trigger_ok' } && !poe->heap->{ 'command' }->{ 'running' } )
1361             {
1362 0         0 $debug->( 'STDERR', __LINE__, "Scheduler: deferred shutdown after trigger." );
1363 0         0 poe->kernel->stop();
1364 0         0 return;
1365             }
1366              
1367 8 50       104 if ( $opt->lock_client )
    50          
1368             {
1369             # Lock client mode: wait for the server to tell us to run
1370             # The afunixcli_server_input handler will post command_start when it receives "run"
1371 0         0 $debug->( 'STDERR', __LINE__, "Scheduler: lock-client mode, waiting for server signal." );
1372              
1373             # Set a timeout if configured
1374 0   0     0 my $timeout = $opt->lock_client_timeout || 0;
1375 0 0       0 if ( $timeout > 0 )
1376             {
1377 0         0 $debug->( 'STDERR', __LINE__, "Scheduler: lock-client timeout set to ${timeout}s." );
1378 0         0 poe->kernel->delay( 'lock_client_timeout_fire' => $timeout );
1379             }
1380             }
1381             elsif ( $opt->lock_server )
1382             {
1383             # Lock server mode: listen for connections and process the order
1384             # The afunixsrv_client_input handler manages the ordering protocol
1385 0         0 $debug->( 'STDERR', __LINE__, "Scheduler: lock-server mode, listening for clients." );
1386             }
1387             else
1388             {
1389             # Standalone mode: start the command immediately
1390 8         82 $debug->( 'STDERR', __LINE__, "Scheduler: standalone mode, starting command." );
1391 8         31 poe->kernel->yield( 'command_start' );
1392             }
1393              
1394 8         461 return;
1395             }
1396              
1397             __END__