File Coverage

lib/App/dupfind/Threaded/ThreadManagement.pm
Criterion Covered Total %
statement 11 14 78.5
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 16 19 84.2


line stmt bran cond sub pod time code
1             # ABSTRACT: Thread management logic, abstracted safely away in its own namespace
2              
3 9     9   4504 use strict;
  9         16  
  9         269  
4 9     9   41 use warnings;
  9         15  
  9         394  
5              
6             package App::dupfind::Threaded::ThreadManagement;
7             {
8             $App::dupfind::Threaded::ThreadManagement::VERSION = '0.140230'; # TRIAL
9             }
10              
11 9     9   147 use 5.010;
  9         36  
  9         873  
12              
13             BEGIN
14             {
15 9     9   547 $SIG{TERM} = $SIG{INT} = sub { $_->kill( 'KILL' ) for threads->list };
  0         0  
16             }
17              
18 9     9   8679 use threads;
  0            
  0            
19             use threads::shared;
20              
21             our $counter :shared = 0;
22             our $term_flag :shared = 0;
23             our $init_flag :shared = 0;
24             our $mapped = &share( {} );
25              
26             use Moo;
27              
28             use Thread::Queue;
29             use Time::HiRes 'usleep';
30              
31             use lib 'lib';
32              
33             extends 'App::dupfind';
34              
35             with 'App::dupfind::Threaded::Overrides';
36              
37             has work_queue => ( is => 'rw', default => sub { Thread::Queue->new } );
38              
39             before threads_progress => sub { require Term::ProgressBar };
40              
41              
42             sub mapped { $mapped }
43              
44             sub counter { $counter }
45              
46             sub reset_all
47             {
48             my $self = shift;
49              
50             $self->reset_queue;
51              
52             $self->clear_counter;
53              
54             $self->reset_mapped;
55              
56             $self->init_flag( 0 );
57              
58             $self->term_flag( 0 );
59             }
60              
61             sub reset_queue { shift->work_queue( Thread::Queue->new ) };
62              
63             sub clear_counter { lock $counter; $counter = 0; return $counter; }
64              
65             sub reset_mapped { $mapped = &share( {} ); $mapped; }
66              
67             sub increment_counter { lock $counter; return ++$counter; }
68              
69             sub term_flag
70             {
71             shift;
72              
73             if ( @_ ) { lock $term_flag; $term_flag = shift; }
74              
75             return $term_flag
76             }
77              
78             sub init_flag
79             {
80             shift;
81              
82             if ( @_ ) { lock $init_flag; $init_flag = shift; }
83              
84             return $init_flag
85             }
86              
87             sub push_mapped
88             {
89             my ( $self, $key, @vals ) = @_;
90              
91             lock $mapped;
92              
93             $mapped->{ $key } ||= &share( [] );
94              
95             push @{ $mapped->{ $key } }, @vals;
96              
97             return $mapped;
98             }
99              
100             sub delete_mapped
101             {
102             my ( $self, @keys ) = @_;
103              
104             lock $mapped;
105              
106             delete $mapped->{ $_ } for @keys;
107              
108             return $mapped;
109             }
110              
111             sub create_thread_pool
112             {
113             my ( $self, $map_code, $dup_count ) = @_;
114              
115             $self->init_flag( 1 );
116              
117             threads->create( threads_progress => $self => $dup_count )
118             if $self->opts->{progress};
119              
120             for ( 1 .. $self->opts->{threads} )
121             {
122             # $map coderef is responsible for calling $self->increment_counter!
123              
124             threads->create( $map_code );
125             }
126             }
127              
128             sub end_wait_thread_pool
129             {
130             my $self = shift;
131              
132             $self->term_flag( 1 );
133              
134             $self->work_queue->end;
135              
136             $_->join for threads->list;
137             }
138              
139             sub threads_progress
140             {
141             my ( $self, $task_item_count ) = @_;
142              
143             my $last_update = 0;
144              
145             my $threads_progress = Term::ProgressBar->new
146             (
147             {
148             name => ' ...PROGRESS',
149             count => $task_item_count,
150             remove => 1,
151             }
152             );
153              
154             while ( !$self->term_flag )
155             {
156             usleep 1000; # sleep for 1 millisecond
157              
158             $threads_progress->update( $self->counter )
159             if $self->counter > $last_update;
160              
161             last if $self->counter == $task_item_count;
162              
163             $last_update = $self->counter;
164             }
165              
166             $threads_progress->update( $task_item_count );
167             }
168              
169             __PACKAGE__->meta->make_immutable;
170              
171             1;
172              
173             __END__