File Coverage

blib/lib/POE/Component/Server/Syslog/TCP.pm
Criterion Covered Total %
statement 65 101 64.3
branch 11 28 39.2
condition 2 9 22.2
subroutine 12 16 75.0
pod 4 8 50.0
total 94 162 58.0


line stmt bran cond sub pod time code
1             # $Id: TCP.pm 446 2004-12-27 00:57:57Z sungo $
2             package POE::Component::Server::Syslog::TCP;
3              
4 5     5   992683 use warnings;
  5         11  
  5         205  
5 5     5   26 use strict;
  5         10  
  5         593  
6              
7             our $VERSION = '1.20';
8              
9             sub BINDADDR () { '0.0.0.0' }
10             sub BINDPORT () { 514 }
11             sub DATAGRAM_MAXLEN () { 1024 } # syslogd defaults to this. as do most
12             # libc implementations of syslog
13              
14 5     5   6393 use Params::Validate qw(validate_with);
  5         99421  
  5         488  
15 5     5   49 use Carp qw(carp croak);
  5         23  
  5         291  
16 5     5   31 use Socket;
  5         12  
  5         5018  
17              
18 5         49 use POE qw(
19             Driver::SysRW
20             Wheel::SocketFactory
21             Wheel::ReadWrite
22             Filter::Syslog
23 5     5   37 );
  5         9  
24              
25              
26             sub spawn {
27 3     3 1 1099 my $class = shift;
28              
29             my %args = validate_with(
30             params => \@_,
31             spec => {
32             InputState => {
33             type => &Params::Validate::CODEREF,
34             optional => 1,
35 0     0   0 default => sub {},
36             },
37             ErrorState => {
38             type => &Params::Validate::CODEREF,
39             optional => 1,
40 0     0   0 default => sub {},
41             },
42 3         359 BindAddress => {
43             type => &Params::Validate::SCALAR,
44             optional => 1,
45             default => BINDADDR,
46             },
47             BindPort => {
48             type => &Params::Validate::SCALAR,
49             optional => 1,
50             default => BINDPORT,
51             },
52             MaxLen => {
53             type => &Params::Validate::SCALAR,
54             optional => 1,
55             default => DATAGRAM_MAXLEN,
56             },
57             Alias => {
58             type => &Params::Validate::SCALAR,
59             optional => 1,
60             },
61             },
62             );
63              
64 3         47 $args{type} = 'tcp';
65 3         37 $args{filter} = POE::Filter::Syslog->new();
66              
67 3         118 my $sess = POE::Session->create(
68             inline_states => {
69             _start => \&start,
70             _stop => \&shutdown,
71              
72             socket_connect => \&socket_connect,
73             socket_error => \&socket_error,
74             socket_input => \&socket_input,
75             register => \®ister,
76             unregister => \&unregister,
77             shutdown => \&shutdown,
78              
79             client_input => $args{InputState},
80             client_error => $args{ErrorState},
81              
82             },
83             heap => \%args,
84             );
85              
86              
87 3         639 return $sess;
88             }
89              
90             sub start {
91 3     3 0 1029 $_[HEAP]->{socketfactory} = POE::Wheel::SocketFactory->new(
92             BindAddress => $_[HEAP]->{BindAddress},
93             BindPort => $_[HEAP]->{BindPort},
94             SuccessEvent => 'socket_connect',
95             FailureEvent => 'client_error',
96             ListenQueue => $_[HEAP]->{MaxLen},
97             Reuse => 'yes',
98             );
99              
100 3 50       4553 unless($_[HEAP]->{socketfactory}) {
101 0         0 croak("Unable to setup socketfactory");
102             }
103 3 100       39 $_[KERNEL]->alias_set( $_[HEAP]->{Alias} ) if $_[HEAP]->{Alias};
104 3         56 return;
105             }
106              
107             sub socket_connect {
108 3     3 0 1517560 my $handle = $_[ARG0];
109 3         9 my $host;
110              
111 3 50       33 if( ( sockaddr_in( getpeername($handle) ) )[1]) {
112 3         61 $host = gethostbyaddr( ( sockaddr_in( getpeername($handle) ) )[1], AF_INET );
113             }
114             else {
115 0         0 $host = '[unknown]';
116             }
117              
118 3         935 my $wheel = POE::Wheel::ReadWrite->new(
119             Handle => $handle,
120             Driver => POE::Driver::SysRW->new(),
121             Filter => POE::Filter::Syslog->new(),
122             InputEvent => 'socket_input',
123             ErrorEvent => 'socket_error',
124             );
125              
126 3         1172 $_[HEAP]->{wheels}->{ $wheel->ID } = {
127             wheel => $wheel,
128             host => $host,
129             };
130 3         27 return;
131             }
132              
133             sub socket_error {
134 0     0 0 0 my ($errop, $errnum, $errstr, $wid) = @_[ARG0 .. ARG3];
135 0 0 0     0 unless( ($errnum == 0) && ($errop eq 'read') ) {
136 0         0 $_[KERNEL]->yield( 'client_error', $errop, $errnum, $errstr );
137             }
138 0         0 delete $_[HEAP]->{wheels}->{ $wid };
139 0         0 return;
140             }
141              
142             sub socket_input {
143 3     3 0 394 my ($input, $wid) = @_[ARG0, ARG1];
144 3         12 my $info = $_[HEAP]->{wheels}->{ $wid };
145              
146 3 50 33     46 if(ref $input && ref $input eq 'ARRAY') {
    50 33        
147 0         0 foreach my $record (@{ $input }) {
  0         0  
148 0         0 $input->{host} = $info->{host};
149 0         0 $_[KERNEL]->yield( 'client_input', $record );
150             }
151             }
152             elsif(ref $input && ref $input eq 'HASH') {
153 3         9 $input->{host} = $info->{host};
154 3         19 $_[KERNEL]->yield( 'client_input', $input );
155 3         31 $_[KERNEL]->post( $_, $_[HEAP]->{sessions}->{$_}->{inputevent}, $input )
156 3         8730 for keys %{ $_[HEAP]->{sessions} };
157             }
158             else {
159 0         0 $_[KERNEL]->yield( 'client_error', $input );
160 0         0 $_[KERNEL]->post( $_, $_[HEAP]->{sessions}->{$_}->{errorevent}, $input )
161 0         0 for grep { defined $_[HEAP]->{sessions}->{errorevent} }
  0         0  
162             keys %{ $_[HEAP]->{sessions} };
163             }
164 3         138 return;
165             }
166              
167             sub shutdown {
168 4     4 1 7412 my ($kernel,$heap) = @_[KERNEL,HEAP];
169 4 100       25 if($heap->{socketfactory}) {
170 3         24 $heap->{socketfactory}->pause_accept();
171 3         175 delete $heap->{socketfactory};
172             }
173 4         6721 delete $heap->{wheels};
174 4         1952 $kernel->alarm_remove_all();
175 4         192 $kernel->alias_remove( $_ ) for $kernel->alias_list();
176 4         21 $kernel->refcount_decrement( $_, __PACKAGE__ )
177 4         178 for keys %{ $heap->{sessions} };
178 4         86 return;
179             }
180              
181             sub register {
182 1     1 1 763 my ($kernel,$self,$sender) = @_[KERNEL,HEAP,SENDER];
183 1         6 my $sender_id = $sender->ID();
184 1         5 my %args;
185 1 50       6 if ( ref $_[ARG0] eq 'HASH' ) {
    0          
186 1         3 %args = %{ $_[ARG0] };
  1         7  
187             }
188             elsif ( ref $_[ARG0] eq 'ARRAY' ) {
189 0         0 %args = @{ $_[ARG0] };
  0         0  
190             }
191             else {
192 0         0 %args = @_[ARG0..$#_];
193             }
194 1         11 $args{lc $_} = delete $args{$_} for keys %args;
195 1 50       10 unless ( $args{inputevent} ) {
196 0         0 warn "No 'inputevent' argument supplied\n";
197 0         0 return;
198             }
199 1 50       7 if ( defined $self->{sessions}->{ $sender_id } ) {
200 0         0 $self->{sessions}->{ $sender_id } = \%args;
201             }
202             else {
203 1         3 $self->{sessions}->{ $sender_id } = \%args;
204 1         7 $kernel->refcount_increment( $sender_id, __PACKAGE__ );
205             }
206 1         41 return;
207             }
208              
209             sub unregister {
210 0     0 1   my ($kernel,$self,$sender) = @_[KERNEL,HEAP,SENDER];
211 0           my $sender_id = $sender->ID();
212 0           my %args;
213 0 0         if ( ref $_[ARG0] eq 'HASH' ) {
    0          
214 0           %args = %{ $_[ARG0] };
  0            
215             }
216             elsif ( ref $_[ARG0] eq 'ARRAY' ) {
217 0           %args = @{ $_[ARG0] };
  0            
218             }
219             else {
220 0           %args = @_[ARG0..$#_];
221             }
222 0           $args{lc $_} = delete $args{$_} for keys %args;
223 0           my $data = delete $self->{sessions}->{ $sender_id };
224 0 0         $kernel->refcount_decrement( $sender_id, __PACKAGE__ ) if $data;
225 0           return;
226             }
227              
228             1;
229             __END__