File Coverage

blib/lib/IO/Storm/Spout.pm
Criterion Covered Total %
statement 21 43 48.8
branch 2 16 12.5
condition n/a
subroutine 6 11 54.5
pod 6 6 100.0
total 35 76 46.0


line stmt bran cond sub pod time code
1             # ABSTRACT: The base class for all IO::Storm Spout.
2              
3             package IO::Storm::Spout;
4             $IO::Storm::Spout::VERSION = '0.17';
5             # Imports
6 1     1   99861 use strict;
  1         1  
  1         30  
7 1     1   13 use warnings;
  1         1  
  1         26  
8 1     1   8 use v5.10;
  1         2  
  1         26  
9              
10 1     1   3 use Moo;
  1         1  
  1         7  
11 1     1   251 use namespace::clean;
  1         1  
  1         7  
12              
13             extends 'IO::Storm::Component';
14              
15              
16             sub initialize {
17 0     0 1 0 my ( $self, $storm_conf, $context ) = @_;
18             }
19              
20              
21             sub ack {
22 0     0 1 0 my ( $self, $id ) = @_;
23             }
24              
25              
26             sub fail {
27 0     0 1 0 my ( $self, $id ) = @_;
28             }
29              
30              
31             sub next_tuple {
32 0     0 1 0 my ($self) = @_;
33             }
34              
35              
36             sub emit ($$;$) {
37 1     1 1 976 my ( $self, $tuple, $args ) = @_;
38              
39 1         4 my $msg = { command => 'emit', tuple => $tuple };
40              
41 1 50       4 if ( defined($args) ) {
42 0 0       0 if ( defined( $args->{tup_id} ) ) {
43 0         0 $msg->{id} = $args->{tup_id};
44             }
45 0 0       0 if ( defined( $args->{stream} ) ) {
46 0         0 $msg->{stream} = $args->{stream};
47             }
48 0 0       0 if ( defined( $args->{direct_task} ) ) {
49 0         0 $msg->{task} = $args->{direct_task};
50             }
51             }
52              
53 1         7 $self->send_message($msg);
54              
55 1 50       4 if ( defined $msg->{task} ) {
56 0         0 return $msg->{task};
57             }
58             else {
59 1         8 return $self->read_task_ids();
60             }
61             }
62              
63              
64             sub run {
65 0     0 1   my ($self) = @_;
66              
67 0           my ( $storm_conf, $context ) = $self->read_handshake();
68 0           $self->_setup_component( $storm_conf, $context );
69 0           $self->initialize( $storm_conf, $context );
70              
71 0           while (1) {
72 0           my $msg = $self->read_command();
73 0 0         if ( $msg->{command} eq 'next' ) {
    0          
    0          
74 0           $self->next_tuple;
75             }
76             elsif ( $msg->{command} eq 'ack' ) {
77 0           $self->ack( $msg->{id} );
78             }
79             elsif ( $msg->{command} eq 'fail' ) {
80 0           $self->fail( $msg->{id} );
81             }
82 0           $self->sync();
83             }
84             }
85              
86             1;
87              
88             __END__