line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Thread::Task::Concurrent; |
2
|
|
|
|
|
|
|
|
3
|
2
|
|
|
2
|
|
25840
|
use warnings; |
|
2
|
|
|
|
|
4
|
|
|
2
|
|
|
|
|
81
|
|
4
|
2
|
|
|
2
|
|
10
|
use strict; |
|
2
|
|
|
|
|
4
|
|
|
2
|
|
|
|
|
62
|
|
5
|
2
|
|
|
2
|
|
43
|
use 5.010; |
|
2
|
|
|
|
|
6
|
|
|
2
|
|
|
|
|
135
|
|
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
# Snapshot of $threads::threads taken while this module is loaded. When it
# is false the module warns (see below) and runs every task serially.
my $can_use_threads = $threads::threads;

use threads::shared;
use Thread::Queue;
use Thread::Task::Concurrent::Util qw/unshared_clone/;

# Logging backend behind tmsg(). Both variants print "[<label>] <message>"
# to STDERR; they differ only in the label used when the caller gives none.
my $tmsg_sub;

unless ($can_use_threads) {
    warn "threads module not loaded, working in serial mode";

    # Serial mode: unlabelled messages are attributed to "main".
    $tmsg_sub = sub {
        my ( $message, $label ) = @_;
        $label //= "main";
        say STDERR '[' . $label . '] ' . $message;
    };
}
else {
    # Threaded mode: unlabelled messages carry the calling thread's id.
    $tmsg_sub = sub {
        my ( $message, $label ) = @_;
        $label //= threads->tid;
        say STDERR '[' . $label . '] ' . $message;
    };
}
22
|
|
|
|
|
|
|
|
23
|
2
|
|
|
2
|
|
1720
|
use Mouse; |
|
2
|
|
|
|
|
79761
|
|
|
2
|
|
|
|
|
13
|
|
24
|
|
|
|
|
|
|
|
25
|
2
|
|
|
2
|
|
697
|
use Mouse::Exporter; |
|
2
|
|
|
|
|
6
|
|
|
2
|
|
|
|
|
9
|
|
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
# Export tmsg() unchanged to every package that uses this module.
Mouse::Exporter->setup_import_methods( as_is => ['tmsg'] );

# Written as a numeric literal, so the underscore is only a digit separator.
our $VERSION = 0.01_05;

# --- public attributes -------------------------------------------------

# Input queue; BUILD installs a fresh Thread::Queue here.
has queue => ( is => 'rw' );

# Code reference invoked as $task->( $item, $arg ) for every queued item.
has task => (
    is       => 'rw',
    required => 1,
);

# Extra value handed to every task invocation as its second argument.
has arg => ( is => 'rw' );

# Number of worker threads start() creates when threads are available.
has max_instances => (
    is      => 'rw',
    default => 4,
);

# Array reference of the thread objects created by start().
has threads => ( is => 'rw' );

# When true, progress messages are written through tmsg().
has verbose => ( is => 'rw' );

# Output queue; BUILD installs a fresh Thread::Queue here.
has result_queue => ( is => 'rw' );

# Set to 1 by join() once all work has been processed.
has finished => ( is => 'rw' );

# --- private attributes ------------------------------------------------

# Epoch seconds recorded by start(), used for the elapsed-time message.
has _start_time => ( is => 'rw' );

# Worker closure built by start(); join() runs it directly in serial mode.
has _real_task => ( is => 'rw' );
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
# Mouse construction hook: gives every new object its own pair of queues,
# one for pending work items and one for task results.
sub BUILD {
    my $self = shift;

    my $work_queue = Thread::Queue->new();
    $self->queue($work_queue);

    my $output_queue = Thread::Queue->new();
    $self->result_queue($output_queue);
}
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
{
    # State shared between the main thread and all workers. These lexicals
    # are file-scoped, so every object of this class in the process uses
    # the same three variables; running two objects at once is therefore
    # not supported by this design.
    my $enqueue_finished : shared;    # true once join() announced "no more items"
    my $wait : shared;                # mutex + condition variable for idle workers
    my $tasks_running : shared;       # task invocations currently in progress

    # Builds the worker closure, spawns up to max_instances worker threads
    # (none when threads are unavailable) and returns $self for chaining.
    sub start {
        my ($self) = @_;

        tmsg( "starting", "main" ) if ( $self->verbose );
        $self->_start_time(time);

        my $q    = $self->queue;
        my $rq   = $self->result_queue;
        my $task = $self->task;
        my $arg  = $self->arg;

        $tasks_running = 0;
        {
            # A previous join() leaves the flag set; without this reset the
            # new workers would see "finished + empty queue" and exit
            # before the caller had a chance to enqueue anything.
            lock($wait);
            $enqueue_finished = 0;
        }

        my $real_task = sub {
          ELEMENT:
            while (1) {
                my $i;
                {
                    # The emptiness test, the finished test and cond_wait
                    # all happen under the same lock that enqueue() and
                    # join() hold while broadcasting, so a wake-up can no
                    # longer slip in between "queue is empty" and
                    # "go to sleep".
                    lock($wait);
                    until ( defined( $i = $q->dequeue_nb ) ) {
                        if ($enqueue_finished) {

                            # wake the other idle workers so they exit too
                            cond_broadcast($wait);
                            last ELEMENT;
                        }
                        cond_wait($wait);
                    }
                }

                {
                    lock($tasks_running);
                    $tasks_running++;
                }

                tmsg("running task ...") if ( $self->verbose );
                my @result = $task->( $i, $arg );
                $rq->enqueue(@result) if (@result);

                {
                    lock($tasks_running);
                    $tasks_running--;

                    tmsg( "task done, tasks running: " . $tasks_running . ", pending: " . $q->pending )
                        if ( $self->verbose );
                }
            }
        };

        $self->_real_task($real_task);

        # Without thread support no workers are created; join() then runs
        # the closure in the calling thread instead.
        my @threads;
        if ($can_use_threads) {
            for ( 1 .. $self->max_instances ) {
                push @threads, threads->create($real_task);
            }
        }
        $self->threads( \@threads );

        return $self;
    }

    # Declares the input complete, waits for every worker to drain the
    # queue (or drains it inline in serial mode), marks the object as
    # finished and returns $self.
    sub join {
        my ($self) = @_;

        {
            # Flag and broadcast are published together under the lock
            # the workers use for their wait loop: a worker is either
            # asleep (and gets woken) or will see the flag on its next
            # check.
            lock($wait);
            $enqueue_finished = 1;
            cond_broadcast($wait);
        }

        my $threads = $self->threads;
        for my $t (@$threads) {
            tmsg( "waiting for thread " . $t->tid, 'main' ) if ( $self->verbose );
            $t->join;
            tmsg( "thread " . $t->tid . " joined successfully", 'main' ) if ( $self->verbose );
        }

        #execute task if we have no thread support (@$threads is empty)
        $self->_real_task->() unless ($can_use_threads);

        $self->finished(1);
        tmsg( "time: " . sprintf( "%dd %dh %dm %ds", ( gmtime( time - $self->_start_time ) )[ 7, 2, 1, 0 ] ),
            'main' )
            if ( $self->verbose );
        return $self;
    }

    # Adds the given items to the input queue, wakes idle workers and
    # returns $self for chaining.
    sub enqueue {
        my ($self) = shift;

        # Enqueue first, then broadcast under the lock: a worker that found
        # the queue empty still holds the lock until it is inside
        # cond_wait, so this broadcast cannot be missed.
        $self->queue->enqueue(@_);
        lock($wait);
        cond_broadcast($wait);
        return $self;
    }

    # Returns an array reference with unshared copies of all results, or
    # nothing (empty list / undef) when no task produced a result. Calls
    # join() first if that has not happened yet.
    sub result {
        my ($self) = @_;

        $self->join
            unless ( $self->finished );

        my $rq = $self->result_queue;
        my @results;
        while ( defined( my $item = $rq->dequeue_nb ) ) {
            push @results, unshared_clone($item);
        }

        return \@results if (@results);
        return;
    }
}
179
|
|
|
|
|
|
|
|
180
|
0
|
|
|
0
|
1
|
|
# Exported logging helper. Takes the message text and an optional label and
# hands both to the backend selected when the module was loaded; the
# backend's return value is passed through.
sub tmsg {
    my ( $message, $label ) = @_;
    return $tmsg_sub->( $message, $label );
}

__PACKAGE__->meta->make_immutable;

1;
185
|
|
|
|
|
|
|
__END__ |