File Coverage

blib/lib/AnyEvent/Filesys/Watcher/ReadDirectoryChanges/Queue.pm
Criterion Covered Total %
statement 42 43 97.6
branch 6 12 50.0
condition n/a
subroutine 11 11 100.0
pod 5 5 100.0
total 64 71 90.1


line stmt bran cond sub pod time code
1             package AnyEvent::Filesys::Watcher::ReadDirectoryChanges::Queue;
2              
3 2     2   4063 use strict;
  2         5  
  2         87  
4              
5             # Once this module works it should be inlined with the MS-DOS backend because
6             # it is only relevant there. For the time being, ship it separately, so that
7             # it can be tested independently.
8              
9 2     2   12 use Locale::TextDomain ('AnyEvent-Filesys-Watcher');
  2         4  
  2         25  
10 2     2   1966 use Thread::Queue 3.13;
  2         14272  
  2         74  
11 2     2   1388 use Socket;
  2         10183  
  2         1275  
12 2     2   1241 use IO::Handle;
  2         15380  
  2         120  
13 2     2   1155 use IO::Select;
  2         3893  
  2         1123  
14              
15             sub new {
16 1     1 1 663 my ($class, @args) = @_;
17              
18 1         10 my $q = Thread::Queue->new(@args);
19              
20             # You cannot do a select on an anonymous pipe on MS-DOS. But sockets
21             # seem to work.
22 1 50       193 socketpair my $rh, my $wh, AF_UNIX, SOCK_STREAM, PF_UNSPEC
23             or die __x("cannot create pipe: {error}", error => $!);
24 1         15 shutdown $rh, 1;
25 1         6 shutdown $wh, 0;
26 1         11 $rh = IO::Handle->new_from_fd($rh, 'r');
27 1         137 $wh = IO::Handle->new_from_fd($wh, 'w');
28 1         112 $rh->autoflush(1);
29 1         67 $wh->autoflush(1);
30              
31 1 50       38 if ($q->pending) {
32 0 0       0 $wh->print('1')
33             or die __x("cannot write to pipe: {error}", error => $!);
34             }
35              
36             bless {
37 1         27 __q => $q,
38             __rh => $rh,
39             __wh => $wh,
40             }, $class;
41             }
42              
43             sub handle {
44 1     1 1 1003 shift->{__rh};
45             }
46              
47             sub enqueue {
48 2     2 1 2433 my ($self, @items) = @_;
49              
50 2         33 $self->{__q}->enqueue(@items);
51 2 50       170 if ($self->{__q}->pending) {
52 2 50       32 $self->{__wh}->print('1')
53             or die __x("cannot write to pipe: {error}", error => $!);
54             }
55              
56 2         93 return $self;
57             }
58              
59             sub dequeue {
60 2     2 1 3107 my ($self, @args) = @_;
61              
62 2         12 my @items = $self->{__q}->dequeue(@args);
63 2 100       195 if (!$self->{__q}->pending) {
64             # Maybe it is better to set the handle to non-blocking instead of doing
65             # a select()? Whatever is more portable.
66 1         20 while (IO::Select->new($self->{__rh})->can_read(0)) {
67 1         129 $self->{__rh}->getc;
68             }
69             }
70              
71 2         146 return @items;
72             }
73              
74             sub pending {
75 4     4 1 3076 shift->{__q}->pending;
76             }
77              
78             1;