File Coverage

blib/lib/Thread/Task/Concurrent.pm
Criterion Covered Total %
statement 85 97 87.6
branch 12 26 46.1
condition 4 9 44.4
subroutine 14 15 93.3
pod 6 6 100.0
total 121 153 79.0


line stmt bran cond sub pod time code
1             package Thread::Task::Concurrent;
2              
3 2     2   25840 use warnings;
  2         4  
  2         81  
4 2     2   10 use strict;
  2         4  
  2         62  
5 2     2   43 use 5.010;
  2         6  
  2         135  
6              
7             my $can_use_threads = $threads::threads;
8              
9 2     2   1033 use threads::shared;
  2         1313  
  2         11  
10              
11 2     2   2071 use Thread::Queue;
  2         8381  
  2         61  
12 2     2   624 use Thread::Task::Concurrent::Util qw/unshared_clone/;
  2         14  
  2         306  
13              
14             my ($tmsg_sub);
15              
16             if ($can_use_threads) {
17             $tmsg_sub = sub { say STDERR '[' . ( $_[1] // threads->tid ) . '] ' . $_[0]; };
18             } else {
19             warn "threads module not loaded, working in serial mode";
20             $tmsg_sub = sub { say STDERR '[' . ( $_[1] // "main" ) . '] ' . $_[0]; }
21             }
22              
23 2     2   1720 use Mouse;
  2         79761  
  2         13  
24              
25 2     2   697 use Mouse::Exporter;
  2         6  
  2         9  
26              
27             Mouse::Exporter->setup_import_methods( as_is => ['tmsg'] );
28              
29             our $VERSION = 0.01_05;
30              
31             has queue => ( is => 'rw' );
32             has task => ( is => 'rw', required => 1 );
33             has arg => ( is => 'rw' );
34             has max_instances => ( is => 'rw', default => 4 );
35             has threads => ( is => 'rw' );
36             has verbose => ( is => 'rw' );
37             has result_queue => ( is => 'rw' );
38             has finished => ( is => 'rw' );
39             has _start_time => ( is => 'rw' );
40             has _real_task => ( is => 'rw' );
41              
42             sub BUILD {
43 1     1 1 165 my ($self) = @_;
44              
45 1         9 my $q = Thread::Queue->new();
46 1         70 $self->queue($q);
47              
48 1         3 $self->result_queue( Thread::Queue->new() );
49             }
50              
51             {
52             my $enqueue_finished : shared;
53             my $wait : shared;
54             my $tasks_running : shared;
55              
56             sub start {
57 1     1 1 45 my ($self) = @_;
58              
59 1 50       6 tmsg( "starting", "main" ) if ( $self->verbose );
60 1         8 $self->_start_time(time);
61              
62 1         4 my $q = $self->queue;
63 1         4 my $rq = $self->result_queue;
64              
65 1         3 my $task = $self->task;
66 1         4 my $arg = $self->arg;
67              
68 1         2 $tasks_running = 0;
69              
70             my $real_task = sub {
71             ELEMENT:
72 1     1   3 while (1) {
73             {
74 6         7 lock($enqueue_finished);
  6         5  
75 6 100 66     25 if ( $enqueue_finished && $q->pending == 0 ) {
76             #broadcast as much as possible, so no thread gets stuck
77 1         8 lock($wait);
78 1         24 cond_broadcast($wait);
79 1         4 last ELEMENT;
80             }
81             }
82              
83 5         52 my $i = $q->dequeue_nb;
84 5 50       36 unless ( defined($i) ) {
85 0         0 lock($wait);
86 0         0 cond_wait($wait);
87 0         0 next ELEMENT;
88             }
89             {
90 5         14 lock($tasks_running);
  5         5  
91 5         6 $tasks_running++;
92             }
93              
94 5 50       17 tmsg("running task ...") if ( $self->verbose );
95 5         12 my @result = $task->( $i, $arg );
96 5 50 33     111 $rq->enqueue(@result) if ( @result && @result > 0 );
97              
98             {
99 5         203 lock($tasks_running);
  5         6  
100 5         4 $tasks_running--;
101              
102 5 50       23 tmsg( "task done, tasks running: " . $tasks_running . ", pending: " . $q->pending )
103             if ( $self->verbose );
104             }
105             }
106 1         7 };
107              
108 1         4 $self->_real_task($real_task);
109 1         2 my @threads;
110 1         6 for ( my $i = 0; $i < $self->max_instances; $i++ ) {
111             # early exit if no threads are loaded/supported
112 1 50       5 last unless ($can_use_threads);
113 0         0 push @threads, threads->create($real_task);
114              
115             }
116 1         5 $self->threads( \@threads );
117              
118 1         6 return $self;
119             }
120              
121             sub join {
122 1     1 1 5 my ($self) = @_;
123              
124             {
125 1         2 lock($enqueue_finished);
  1         2  
126 1         1 $enqueue_finished = 1;
127             }
128              
129 1         12 my $threads = $self->threads;
130 1         3 for my $t (@$threads) {
131              
132             #broadcast as much as possible, so no thread gets stuck
133             {
134 0         0 lock($wait);
  0         0  
135 0         0 cond_broadcast($wait);
136             }
137 0 0       0 tmsg( "waiting for thread " . $t->tid, 'main' ) if ( $self->verbose );
138 0         0 $t->join;
139 0 0       0 tmsg( "thread " . $t->tid . " joined successfully", 'main' ) if ( $self->verbose );
140             }
141              
142             #execute task if we have no thread support (@$threads is empty)
143 1 50       6 $self->_real_task->() unless ($can_use_threads);
144              
145 1         3 $self->finished(1);
146 1 50       5 tmsg( "time: " . sprintf( "%dd %dh %dm %ds", ( gmtime( time - $self->_start_time ) )[ 7, 2, 1, 0 ] ),
147             'main' )
148             if ( $self->verbose );
149 1         3 $self;
150             }
151              
152             sub enqueue {
153 1     1 1 5 my ($self) = shift;
154              
155 1         5 $self->queue->enqueue(@_);
156 1         80 lock($wait);
157 1         26 cond_broadcast($wait);
158 1         4 return $self;
159             }
160              
161             sub result {
162 1     1 1 5 my ($self) = @_;
163              
164 1 50       12 $self->join
165             unless ( $self->finished );
166 1         3 my $rq = $self->result_queue;
167 1         2 my @results;
168 1         3 while ( defined( my $item = $rq->dequeue_nb ) ) {
169 5         49 push @results, unshared_clone($item);
170             }
171              
172 1 50 33     15 if ( @results && @results > 0 ) {
173 1         3 return \@results;
174             } else {
175 0           return;
176             }
177             }
178             }
179              
180 0     0 1   sub tmsg { return $tmsg_sub->(@_); }
181              
182             __PACKAGE__->meta->make_immutable;
183              
184             1;
185             __END__