line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
=head1 NAME |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
Coro::Channel - message queues |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
=head1 SYNOPSIS |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
use Coro; |
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
$q1 = new Coro::Channel ; |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
$q1->put ("xxx"); |
12
|
|
|
|
|
|
|
print $q1->get; |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
die unless $q1->size; |
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
=head1 DESCRIPTION |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
A Coro::Channel is the equivalent of a unix pipe (and similar to amiga |
19
|
|
|
|
|
|
|
message ports): you can put things into it on one end and read things out |
20
|
|
|
|
|
|
|
of it from the other end. If the capacity of the Channel is maxed out |
21
|
|
|
|
|
|
|
writers will block. Both ends of a Channel can be read/written from by as |
22
|
|
|
|
|
|
|
many coroutines as you want concurrently. |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
You don't have to load C<Coro::Channel> manually, it will be loaded |
25
|
|
|
|
|
|
|
automatically when you C<use Coro> and call the C<new> constructor. |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
=over 4 |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
=cut |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
package Coro::Channel; |
32
|
|
|
|
|
|
|
|
33
|
3
|
|
|
3
|
|
1185
|
use common::sense; |
|
3
|
|
|
|
|
9
|
|
|
3
|
|
|
|
|
15
|
|
34
|
|
|
|
|
|
|
|
35
|
3
|
|
|
3
|
|
149
|
use Coro (); |
|
3
|
|
|
|
|
8
|
|
|
3
|
|
|
|
|
52
|
|
36
|
3
|
|
|
3
|
|
626
|
use Coro::Semaphore (); |
|
3
|
|
|
|
|
5
|
|
|
3
|
|
|
|
|
1138
|
|
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
our $VERSION = 6.514; |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
# Slot indices into the blessed array that represents a channel.
sub DATA () { 0 }   # array ref holding the queued elements
sub SGET () { 1 }   # semaphore that counts data
sub SPUT () { 2 }   # semaphore that counts remaining space
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
=item $q = new Coro::Channel $maxsize |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
Create a new channel with the given maximum size (practically unlimited |
47
|
|
|
|
|
|
|
if C<$maxsize> is omitted or zero). Giving a size of one gives you a |
48
|
|
|
|
|
|
|
traditional channel, i.e. a queue that can store only a single element |
49
|
|
|
|
|
|
|
(which means there will be no buffering, and C<put> will wait until there |
50
|
|
|
|
|
|
|
is a corresponding C<get> call). To buffer one element you have to specify |
51
|
|
|
|
|
|
|
C<2>, and so on. |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
=cut |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
sub new {
   # Constructor: new Coro::Channel [$maxsize]
   #
   # Returns an array-based object with three slots: the (initially empty)
   # element list, a semaphore counting stored data and a semaphore counting
   # the remaining space. A missing or zero $maxsize means "practically
   # unlimited".
   my ($class, $maxsize) = @_;

   # bless into the invocant's class so that subclasses get objects of
   # their own type; fall back to this package for plain function calls
   $class = ref $class || $class || __PACKAGE__;

   # we cheat and set infinity == 2*10**9
   my $capacity = $maxsize || 2_000_000_000;

   bless [
      [],                                        # initially empty
      Coro::Semaphore::_alloc (0),               # counts data
      Coro::Semaphore::_alloc ($capacity - 1),   # counts remaining space
   ], $class
}
63
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
=item $q->put ($scalar) |
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
Put the given scalar into the queue. |
67
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
=cut |
69
|
|
|
|
|
|
|
|
70
|
|
|
|
|
|
|
sub put {
   # $q->put ($scalar)
   #
   # Append the scalar to the element list first, then announce it to
   # readers, and finally take one unit from the space semaphore.
   push @{ $_[0]->[DATA] }, $_[1];

   Coro::Semaphore::up   ($_[0]->[SGET]);   # one more element for get
   Coro::Semaphore::down ($_[0]->[SPUT]);   # consume one unit of space
}
75
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
=item $q->get |
77
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
Return the next element from the queue, waiting if necessary. |
79
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
=cut |
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
sub get {
   # $q->get
   #
   # Take one unit from the data semaphore, hand one unit back to the
   # space semaphore, then remove and return the oldest element
   # (shift yields undef when the list is empty).
   Coro::Semaphore::down ($_[0]->[SGET]);
   Coro::Semaphore::up   ($_[0]->[SPUT]);

   shift @{ $_[0]->[DATA] }
}
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
=item $q->shutdown |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
Shuts down the Channel by pushing a virtual end marker onto it: This |
91
|
|
|
|
|
|
|
changes the behaviour of the Channel when it becomes or is empty to return |
92
|
|
|
|
|
|
|
C<undef>, almost as if infinitely many C<undef> elements had been put |
93
|
|
|
|
|
|
|
into the queue. |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
Specifically, this function wakes up any pending C<get> calls and lets |
96
|
|
|
|
|
|
|
them return C<undef>, the same on future C<get> calls. C<size> will return |
97
|
|
|
|
|
|
|
the real number of stored elements, though. |
98
|
|
|
|
|
|
|
|
99
|
|
|
|
|
|
|
Another way to describe the behaviour is that C<get> calls will not block |
100
|
|
|
|
|
|
|
when the queue becomes empty but immediately return C<undef>. This means |
101
|
|
|
|
|
|
|
that calls to C<put> will work normally and the data will be returned on |
102
|
|
|
|
|
|
|
subsequent C<get> calls. |
103
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
This method is useful to signal the end of data to any consumers, quite |
105
|
|
|
|
|
|
|
similar to an end of stream on e.g. a tcp socket: You have one or more |
106
|
|
|
|
|
|
|
producers that C<put> data into the Channel and one or more consumers who |
107
|
|
|
|
|
|
|
C<get> them. When all producers have finished producing data, a call to |
108
|
|
|
|
|
|
|
C<shutdown> signals this fact to any consumers. |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
A common implementation uses one or more threads that C<get> from |
111
|
|
|
|
|
|
|
a channel until it returns C<undef>. To clean everything up, first |
112
|
|
|
|
|
|
|
C<shutdown> the channel, then C<join> the threads. |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
=cut |
115
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
sub shutdown {
   # $q->shutdown
   #
   # Raise the data semaphore by a very large amount (10**9) so that
   # readers taking a unit from it find one available even when no
   # elements are stored.
   Coro::Semaphore::adjust ($_[0]->[SGET], 1_000_000_000);
}
119
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
=item $q->size |
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
Return the number of elements waiting to be consumed. Please note that: |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
if ($q->size) { |
125
|
|
|
|
|
|
|
my $data = $q->get; |
126
|
|
|
|
|
|
|
... |
127
|
|
|
|
|
|
|
} |
128
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
is I<not> a race condition but instead works just fine. Note that the |
130
|
|
|
|
|
|
|
number of elements that wait can be larger than C<$maxsize>, as it |
131
|
|
|
|
|
|
|
includes any coroutines waiting to put data into the channel (but not any |
132
|
|
|
|
|
|
|
shutdown condition). |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
This means that the number returned is I<precisely> the number of calls |
135
|
|
|
|
|
|
|
to C<get> that will succeed instantly and return some data. Calling |
136
|
|
|
|
|
|
|
C<shutdown> has no effect on this number. |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
=cut |
139
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
sub size {
   # $q->size
   #
   # Number of elements currently held in the element list.
   my $stored = $_[0]->[DATA];

   scalar @$stored
}
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
# this is not undocumented by accident - if it breaks, you |
145
|
|
|
|
|
|
|
# get to keep the pieces |
146
|
|
|
|
|
|
|
sub adjust {
   # $q->adjust ($delta)
   #
   # Forward $delta to the adjust function of the space semaphore.
   # Deliberately undocumented in the POD.
   Coro::Semaphore::adjust ($_[0]->[SPUT], $_[1]);
}
149
|
|
|
|
|
|
|
|
150
|
|
|
|
|
|
|
1; |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
=back |
153
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
=head1 AUTHOR/SUPPORT/CONTACT |
155
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
Marc A. Lehmann |
157
|
|
|
|
|
|
|
http://software.schmorp.de/pkg/Coro.html |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
=cut |
160
|
|
|
|
|
|
|
|