File Coverage

blib/lib/Thread/Pipeline.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package Thread::Pipeline;
2             {
3             $Thread::Pipeline::VERSION = '0.004';
4             }
5              
6             # $Id: Pipeline.pm 16 2012-12-28 09:03:33Z xliosha@gmail.com $
7              
8             # NAME: Thread::Pipeline
9             # ABSTRACT: multithreaded pipeline manager
10              
11              
12              
13 1     1   25096 use 5.010;
  1         5  
  1         85  
14 1     1   6 use strict;
  1         2  
  1         36  
15 1     1   7 use warnings;
  1         2  
  1         37  
16 1     1   6 use utf8;
  1         2  
  1         7  
17 1     1   24 use Carp;
  1         2  
  1         104  
18              
19 1     1   1059 use threads;
  0            
  0            
20             use threads::shared;
21             use Thread::Queue::Any;
22              
23              
24              
25             sub new {
26             my ($class, $blocks, %opt) = @_;
27             my $self :shared = shared_clone {
28             blocks => {},
29             out_queue => Thread::Queue::Any->new(),
30             input_ids => [],
31             };
32             bless $self, $class;
33              
34             if ( ref $blocks eq 'HASH' ) {
35             while ( my ($id, $info) = each %$blocks ) {
36             $self->add_block( $id => $info );
37             }
38             }
39             elsif ( ref $blocks eq 'ARRAY' ) {
40             for my $i ( 0 .. @$blocks/2 - 1 ) {
41             my ( $id, $info, $next_id ) = @$blocks[ $i*2 .. $i*2+2 ];
42             my %block = %$info;
43             $block{main_input} //= 1 if $i == 0;
44             $block{out} //= $next_id // '_out';
45             $self->add_block( $id => \%block );
46             }
47             }
48              
49             return $self;
50             }
51              
52              
53              
54             sub add_block {
55             my ($self, $block_id, $block_info, %opt) = @_;
56              
57             my $queue :shared = Thread::Queue::Any->new();
58             my $block = shared_clone {
59             queue => $queue,
60             };
61              
62             my $threads_num :shared = $block_info->{num_threads} || 1;
63             my $thread_sub = sub {
64             while (1) {
65             # get incoming data block
66             my ($in_data) = $queue->dequeue();
67              
68             # process it
69             my @out_data;
70             if ( defined $in_data || $block_info->{need_finalize} ) {
71             eval { @out_data = $block_info->{sub}->( $in_data, $self ); 1 }
72             or carp "Worker '$block_id' died in thread tid=" . threads->tid() . ": $@";
73             }
74              
75             # send results to next block
76             if ( $block_info->{out} ) {
77             for my $item ( @out_data ) {
78             next if !defined $item;
79             $self->enqueue( $item, block => $block_info->{out} );
80             }
81             }
82              
83             # finish work if incoming data was undefined
84             last if !defined $in_data;
85             }
86              
87             lock $threads_num;
88             $threads_num --;
89              
90             # send undef to next block
91             if ( !$threads_num ) {
92             $block_info->{post_sub}->() if $block_info->{post_sub};
93             if ( $block_info->{out} && $block_info->{out} ne '_out' ) {
94             $self->no_more_data($block_info->{out});
95             }
96             }
97              
98             return;
99             };
100              
101             my @threads = map { threads->create($thread_sub) } ( 1 .. $threads_num );
102             $block->{threads} = shared_clone \@threads;
103              
104             $self->{blocks}->{$block_id} = $block;
105             push @{ $self->{input_ids} }, $block_id if $block_info->{main_input};
106              
107             return $self;
108             }
109              
110              
111              
112             sub enqueue {
113             my ($self, $data, %opt) = @_;
114              
115             my $ids = $opt{block} || $self->{input_ids};
116              
117             for my $block_id ( @{ ref $ids ? $ids : [$ids] } ) {
118             if ( $block_id eq '_out' ) {
119             $self->{out_queue}->enqueue($data);
120             }
121             else {
122             my $block = $self->{blocks}->{$block_id};
123             croak "Unknown block id: $block_id" if !$block;
124             $block->{queue}->enqueue( $data );
125             }
126             }
127              
128             return $self;
129             }
130              
131              
132              
133             sub no_more_data {
134             my ($self, $ids) = @_;
135             $ids ||= $self->{input_ids};
136              
137             for my $block_id ( @{ ref $ids ? $ids : [$ids] } ) {
138             my $num = $self->get_threads_num($block_id);
139             my $block = $self->{blocks}->{$block_id};
140             $block->{queue}->enqueue( undef ) for ( 1 .. $num );
141             }
142              
143             return $self;
144             }
145              
146              
147              
148              
149             sub get_results {
150             my ($self, %opt) = @_;
151              
152             for my $block ( values %{ $self->{blocks} } ) {
153             for my $thread ( @{ $block->{threads} } ) {
154             $thread->join();
155             }
156             }
157              
158             my @result;
159             while ( my @items = $self->{out_queue}->dequeue_dontwait() ) {
160             push @result, @items;
161             }
162              
163             return @result;
164             }
165              
166              
167              
168             sub get_threads_num {
169             my ($self, $block_id) = @_;
170              
171             my $block = $self->{blocks}->{$block_id};
172             croak "Unknown block id: $block_id" if !$block;
173              
174             return scalar @{ $block->{threads} };
175             }
176              
177              
178              
179             1;
180              
181             __END__