File Coverage

blib/lib/IO/Storm/Component.pm
Criterion Covered Total %
statement 92 118 77.9
branch 9 26 34.6
condition 0 6 0.0
subroutine 20 22 90.9
pod 6 8 75.0
total 127 180 70.5


line stmt bran cond sub pod time code
1             # ABSTRACT: The base class for Bolts and Spouts.
2              
3             package IO::Storm::Component;
4             $IO::Storm::Component::VERSION = '0.17';
5             # Imports
6 3     3   97377 use strict;
  3         5  
  3         90  
7 3     3   21 use warnings;
  3         3  
  3         71  
8 3     3   24 use v5.10;
  3         7  
  3         89  
9 3     3   10 use IO::Handle qw(autoflush);
  3         4  
  3         237  
10 3     3   1443 use IO::File;
  3         2225  
  3         363  
11 3     3   15 use Log::Log4perl qw(:easy);
  3         3  
  3         96  
12 3     3   1429 use JSON::XS;
  3         4  
  3         117  
13 3     3   12 use Data::Dumper;
  3         1  
  3         113  
14 3     3   14 use IO::Storm::Tuple;
  3         3  
  3         51  
15              
16             # Setup Moo for object-oriented niceties
17 3     3   9 use Moo;
  3         3  
  3         20  
18 3     3   5224 use namespace::clean;
  3         5  
  3         22  
19              
20             # Setup STDIN/STDOUT/STDERR to use UTF8
21             binmode STDERR, ':utf8';
22 3     3   19 binmode STDIN, ':encoding(UTF-8)';
  3         5  
  3         24  
23             binmode STDOUT, ':utf8';
24              
25             has '_pending_commands' => (
26             is => 'rw',
27             default => sub { [] },
28             );
29              
30             has '_pending_taskids' => (
31             is => 'rw',
32             default => sub { [] },
33             );
34              
35             has '_stdin' => (
36             is => 'rw',
37             default => sub {
38             my $io = IO::Handle->new;
39             $io->fdopen( fileno(STDIN), 'r' );
40             }
41             );
42              
43             has 'max_lines' => (
44             is => 'rw',
45             default => 100
46             );
47              
48             has 'max_blank_msgs' => (
49             is => 'rw',
50             default => 500
51             );
52              
53             has '_json' => (
54             is => 'rw',
55             default => sub { JSON::XS->new->allow_blessed->convert_blessed }
56             );
57              
58             has '_topology_name' => (
59             is => 'rw',
60             init_args => undef
61             );
62              
63             has '_task_id' => (
64             is => 'rw',
65             init_args => undef
66             );
67              
68             has '_component_name' => (
69             is => 'rw',
70             init_args => undef
71             );
72              
73             has '_debug' => (
74             is => 'rw',
75             init_args => undef
76             );
77              
78             has '_storm_conf' => (
79             is => 'rw',
80             init_args => undef
81             );
82              
83             has '_context' => (
84             is => 'rw',
85             init_args => undef
86             );
87              
88             my $logger = Log::Log4perl->get_logger('storm');
89              
90              
91             sub _setup_component {
92 0     0   0 my ( $self, $storm_conf, $context ) = @_;
93 0         0 my $conf_is_hash = ref($storm_conf) eq ref {};
94 0 0 0     0 $self->_topology_name(
95             ( $conf_is_hash && exists( $storm_conf->{'topology.name'} ) )
96             ? $storm_conf->{'topology.name'}
97             : ''
98             );
99 0 0       0 $self->_task_id( exists( $context->{taskid} ) ? $context->{taskid} : '' );
100 0         0 $self->_component_name('');
101 0 0       0 if ( exists( $context->{'task->component'} ) ) {
102 0         0 my $task_component = $context->{'task->component'};
103 0 0       0 if ( exists( $task_component->{ $self->_task_id } ) ) {
104 0         0 $self->_component_name( $task_component->{ $self->_task_id } );
105             }
106             }
107             $self->_debug(
108 0 0 0     0 ( $conf_is_hash && exists( $storm_conf->{'topology.debug'} ) )
109             ? $storm_conf->{'topology.debug'}
110             : 0
111             );
112 0         0 $self->_storm_conf($storm_conf);
113 0         0 $self->_context($context);
114             }
115              
116              
117             sub read_message {
118 13     13 1 58 $logger->debug('start read_message');
119 13         77 my $self = shift;
120 13         14 my $blank_lines = 0;
121 13         9 my $message_size = 0;
122 13         17 my $line = '';
123              
124 13         16 my @messages = ();
125 13         12 while (1) {
126 26         129 $line = $self->_stdin->getline;
127 26 50       1161 if ( defined($line) ) {
128 26         77 $logger->debug("read_message: line=$line");
129             }
130             else {
131 0         0 $logger->error( "Received EOF while trying to read stdin from "
132             . "Storm, pipe appears to be broken, exiting." );
133 0         0 exit(1);
134             }
135 26 100       156 if ( $line eq "end\n" ) {
    50          
    50          
136 13         19 last;
137             }
138             elsif ( $line eq '' ) {
139 0         0 $logger->error( "Received EOF while trying to read stdin from "
140             . "Storm, pipe appears to be broken, exiting." );
141 0         0 exit(1);
142             }
143             elsif ( $line eq "\n" ) {
144 0         0 $blank_lines++;
145 0 0       0 if ( $blank_lines % 1000 == 0 ) {
146 0         0 $logger->warn( "While trying to read a command or pending "
147             . "task ID, Storm has instead sent $blank_lines "
148             . "'\\n' messages." );
149 0         0 next;
150             }
151             }
152 13         22 chomp($line);
153 13         17 push( @messages, $line );
154             }
155              
156 13         124 return $self->_json->decode( join( "\n", @messages ) );
157             }
158              
159             sub read_task_ids {
160 7     7 0 480 my $self = shift;
161              
162 7 50       8 if ( scalar( @{ $self->_pending_taskids } ) ) {
  7         37  
163 0         0 return shift( @{ $self->_pending_taskids } );
  0         0  
164             }
165             else {
166 7         20 my $msg = $self->read_message();
167 7         28 while ( ref($msg) ne 'ARRAY' ) {
168 3         3 push( @{ $self->_pending_commands }, $msg );
  3         12  
169 3         7 $msg = $self->read_message();
170             }
171              
172 7         30 return $msg;
173             }
174             }
175              
176             sub read_command {
177 3     3 0 30 my $self = shift;
178              
179 3 100       4 if ( @{ $self->_pending_commands } ) {
  3         13  
180 2         3 return shift( @{ $self->_pending_commands } );
  2         7  
181             }
182             else {
183 1         3 my $msg = $self->read_message();
184 1         7 while ( ref($msg) eq 'ARRAY' ) {
185 0         0 push( @{ $self->_pending_taskids }, $msg );
  0         0  
186 0         0 $msg = $self->read_message();
187             }
188 1         3 return $msg;
189             }
190             }
191              
192              
193             sub read_tuple {
194 2     2 1 790 my $self = shift;
195 2         15 $logger->debug('read_tuple');
196              
197 2         15 my $tupmap = $self->read_command();
198              
199 2         19 return IO::Storm::Tuple->new(
200             id => $tupmap->{id},
201             component => $tupmap->{comp},
202             stream => $tupmap->{stream},
203             task => $tupmap->{task},
204             values => $tupmap->{tuple}
205             );
206             }
207              
208              
209             sub read_handshake {
210 1     1 1 2676 my $self = shift;
211              
212             # TODO: Figure out how to redirect stdout to ensure that print
213             # statements/functions won't crash the Storm Java worker
214              
215 1         11 autoflush STDOUT 1;
216 1         53 autoflush STDERR 1;
217              
218 1         24 my $msg = $self->read_message();
219             $logger->debug(
220 1     0   8 sub { 'Received initial handshake from Storm: ' . Dumper($msg) } );
  0         0  
221              
222             # Write a blank PID file out to the pidDir
223 1         10 my $pid = $$;
224 1         3 my $pid_dir = $msg->{pidDir};
225 1         4 my $filename = $pid_dir . '/' . $pid;
226 1 50       75 open my $fh, '>', $filename
227             or die "Cant't write to '$filename': $!\n";
228 1         11 $fh->close;
229 1         19 $logger->debug("Sending process ID $pid to Storm");
230 1         12 $self->send_message( { pid => int($pid) } );
231              
232 1         14 return [ $msg->{conf}, $msg->{context} ];
233             }
234              
235              
236             sub send_message {
237 11     11 1 4083 my ( $self, $msg ) = @_;
238 11         485 say $self->_json->encode($msg);
239 11         123 say "end";
240             }
241              
242              
243             sub sync {
244 1     1 1 1447 my $self = shift;
245 1         7 $self->send_message( { command => 'sync' } );
246             }
247              
248              
249             sub log {
250 1     1 1 1118 my ( $self, $message ) = @_;
251 1         5 $self->send_message( { command => 'log', msg => $message } );
252             }
253              
254             1;
255              
256             __END__