| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | # $Id: Thread.pm,v 1.25 2010/03/27 19:51:24 dk Exp $ | 
| 2 |  |  |  |  |  |  | package IO::Lambda::Thread; | 
| 3 | 1 |  |  | 1 |  | 896 | use base qw(IO::Lambda); | 
|  | 1 |  |  |  |  | 1 |  | 
|  | 1 |  |  |  |  | 105 |  | 
| 4 | 1 |  |  | 1 |  | 5 | use strict; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 22 |  | 
| 5 | 1 |  |  | 1 |  | 4 | use warnings; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 23 |  | 
| 6 | 1 |  |  | 1 |  | 5 | use Exporter; | 
|  | 1 |  |  |  |  | 13 |  | 
|  | 1 |  |  |  |  | 32 |  | 
| 7 | 1 |  |  | 1 |  | 898 | use Socket; | 
|  | 1 |  |  |  |  | 3843 |  | 
|  | 1 |  |  |  |  | 642 |  | 
| 8 | 1 |  |  | 1 |  | 913 | use IO::Handle; | 
|  | 1 |  |  |  |  | 6818 |  | 
|  | 1 |  |  |  |  | 46 |  | 
| 9 | 1 |  |  | 1 |  | 6 | use IO::Lambda qw(:all :dev swap_frame); | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 1234 |  | 
| 10 |  |  |  |  |  |  |  | 
| 11 |  |  |  |  |  |  | our $DISABLED; | 
| 12 |  |  |  |  |  |  | eval { require threads; }; | 
| 13 |  |  |  |  |  |  | $DISABLED = $@ if $@; | 
| 14 |  |  |  |  |  |  |  | 
| 15 |  |  |  |  |  |  | our $DEBUG = $IO::Lambda::DEBUG{thread}; | 
| 16 |  |  |  |  |  |  |  | 
| 17 |  |  |  |  |  |  | our @EXPORT_OK = qw(threaded new_thread); | 
| 18 |  |  |  |  |  |  | our %EXPORT_TAGS = (all => \@EXPORT_OK); | 
| 19 |  |  |  |  |  |  |  | 
| 20 | 0 |  |  | 0 |  |  | sub _d { "threaded(" . _o($_[0]) . ")" } | 
| 21 |  |  |  |  |  |  |  | 
| 22 |  |  |  |  |  |  | sub thread_init | 
| 23 |  |  |  |  |  |  | { | 
| 24 | 0 |  |  | 0 | 0 |  | my ( $r, $cb, $pass_handle, @param) = @_; | 
| 25 |  |  |  |  |  |  |  | 
| 26 | 0 |  |  | 0 |  |  | $SIG{KILL} = sub { threads-> exit(0) }; | 
|  | 0 |  |  |  |  |  |  | 
| 27 | 0 |  |  |  |  |  | $SIG{PIPE} = 'IGNORE'; | 
| 28 | 0 |  |  |  |  |  | eval "END { IO::Lambda::__end(); };"; | 
| 29 | 0 | 0 |  |  |  |  | warn "thread(", threads->tid, ") started\n" if $DEBUG; | 
| 30 |  |  |  |  |  |  |  | 
| 31 | 0 |  |  |  |  |  | my @ret; | 
| 32 | 0 | 0 |  |  |  |  | eval { @ret = $cb-> (( $pass_handle ? $r : ()), @param) if $cb }; | 
|  | 0 | 0 |  |  |  |  |  | 
| 33 |  |  |  |  |  |  |  | 
| 34 | 0 | 0 |  |  |  |  | warn "thread(", threads->tid, ") ended: [@ret]\n" if $DEBUG; | 
| 35 | 0 |  |  |  |  |  | close($r); | 
| 36 | 0 |  |  |  |  |  | undef $r; | 
| 37 | 0 | 0 |  |  |  |  | die $@ if $@; | 
| 38 |  |  |  |  |  |  |  | 
| 39 | 0 |  |  |  |  |  | return @ret; | 
| 40 |  |  |  |  |  |  | } | 
| 41 |  |  |  |  |  |  |  | 
| 42 |  |  |  |  |  |  | sub new_thread | 
| 43 |  |  |  |  |  |  | { | 
| 44 | 0 | 0 |  | 0 | 1 |  | return undef, $DISABLED if $DISABLED; | 
| 45 |  |  |  |  |  |  |  | 
| 46 | 0 |  |  |  |  |  | my ( @args, $cb, $pass_handle, @param); | 
| 47 | 0 | 0 | 0 |  |  |  | @args = shift if $_[0] and ref($_[0]) and ref($_[0]) eq 'HASH'; | 
|  |  |  | 0 |  |  |  |  | 
| 48 | 0 |  |  |  |  |  | ( $cb, $pass_handle, @param) = @_; | 
| 49 |  |  |  |  |  |  |  | 
| 50 | 0 |  |  |  |  |  | my $r = IO::Handle-> new; | 
| 51 | 0 |  |  |  |  |  | my $w = IO::Handle-> new; | 
| 52 | 0 |  |  |  |  |  | socketpair( $r, $w, AF_UNIX, SOCK_STREAM, PF_UNSPEC); | 
| 53 | 0 |  |  |  |  |  | $w-> blocking(0); | 
| 54 |  |  |  |  |  |  |  | 
| 55 | 0 |  |  |  |  |  | my ($t) = threads-> create( | 
| 56 |  |  |  |  |  |  | @args, | 
| 57 |  |  |  |  |  |  | \&thread_init, | 
| 58 |  |  |  |  |  |  | $r, $cb, $pass_handle, @param | 
| 59 |  |  |  |  |  |  | ); | 
| 60 |  |  |  |  |  |  |  | 
| 61 | 0 |  |  |  |  |  | close($r); | 
| 62 |  |  |  |  |  |  |  | 
| 63 | 0 | 0 |  |  |  |  | warn "new thread(", $t->tid, ")\n" if $DEBUG; | 
| 64 | 0 |  |  |  |  |  | return ($t, $w); | 
| 65 |  |  |  |  |  |  | } | 
| 66 |  |  |  |  |  |  |  | 
| 67 |  |  |  |  |  |  | # overridden IO::Lambda methods | 
| 68 |  |  |  |  |  |  |  | 
| 69 |  |  |  |  |  |  | sub DESTROY | 
| 70 |  |  |  |  |  |  | { | 
| 71 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 72 |  |  |  |  |  |  |  | 
| 73 | 0 | 0 | 0 |  |  |  | return if defined($self->{tid}) and $self->{tid} != threads-> tid; | 
| 74 |  |  |  |  |  |  |  | 
| 75 | 0 | 0 |  |  |  |  | close($self->{socket}) if $self-> {socket}; | 
| 76 | 0 |  |  |  |  |  | delete @{$self}{qw(socket thread)}; | 
|  | 0 |  |  |  |  |  |  | 
| 77 |  |  |  |  |  |  |  | 
| 78 | 0 |  |  |  |  |  | $self-> SUPER::DESTROY; | 
| 79 |  |  |  |  |  |  | } | 
| 80 |  |  |  |  |  |  |  | 
| 81 | 0 |  |  | 0 | 1 |  | sub thread { $_[0]-> {thread} } | 
| 82 | 0 |  |  | 0 | 1 |  | sub socket { $_[0]-> {socket} } | 
| 83 |  |  |  |  |  |  |  | 
| 84 |  |  |  |  |  |  | sub threaded(&) | 
| 85 |  |  |  |  |  |  | { | 
| 86 | 0 |  |  | 0 | 1 |  | my $cb = shift; | 
| 87 |  |  |  |  |  |  |  | 
| 88 |  |  |  |  |  |  | # use overridden IO::Lambda, because we need | 
| 89 |  |  |  |  |  |  | # give the caller a chance to join | 
| 90 |  |  |  |  |  |  | # for it, if the lambda gets terminated | 
| 91 |  |  |  |  |  |  | __PACKAGE__-> new( sub { | 
| 92 |  |  |  |  |  |  | # Save context. This is needed because the caller | 
| 93 |  |  |  |  |  |  | # may have his own this. lambda(&) does the same | 
| 94 |  |  |  |  |  |  | # protection | 
| 95 | 0 |  |  | 0 |  |  | my $this  = shift; | 
| 96 | 0 |  |  |  |  |  | my @frame = swap_frame($this); | 
| 97 |  |  |  |  |  |  |  | 
| 98 | 0 | 0 |  |  |  |  | warn _d($this), " started\n" if $DEBUG; | 
| 99 |  |  |  |  |  |  |  | 
| 100 |  |  |  |  |  |  | # can start a thread? | 
| 101 | 0 |  |  |  |  |  | my ( $t, $r) = new_thread( $cb, 1 ); | 
| 102 | 0 | 0 |  |  |  |  | return $r unless $t; | 
| 103 |  |  |  |  |  |  |  | 
| 104 |  |  |  |  |  |  | # save this | 
| 105 | 0 |  |  |  |  |  | $this-> {tid}    = threads-> tid; | 
| 106 | 0 |  |  |  |  |  | $this-> {thread} = $t; | 
| 107 | 0 |  |  |  |  |  | $this-> {socket} = $r; | 
| 108 |  |  |  |  |  |  |  | 
| 109 |  |  |  |  |  |  | # now wait | 
| 110 | 0 |  |  |  |  |  | context $this-> {socket}; | 
| 111 |  |  |  |  |  |  | readable { | 
| 112 | 0 |  |  |  |  |  | my $this = this; | 
| 113 | 0 |  |  |  |  |  | delete $this-> {thread}; | 
| 114 | 0 |  |  |  |  |  | close($this-> {socket}); | 
| 115 | 0 |  |  |  |  |  | delete @{$this}{qw(socket thread)}; | 
|  | 0 |  |  |  |  |  |  | 
| 116 | 0 |  |  |  |  |  | $this-> clear; | 
| 117 | 0 | 0 |  |  |  |  | warn _d($this), " joining\n" if $DEBUG; | 
| 118 | 0 |  |  |  |  |  | $t-> join; | 
| 119 | 0 |  |  |  |  |  | }; | 
| 120 |  |  |  |  |  |  |  | 
| 121 |  |  |  |  |  |  | # restore context | 
| 122 | 0 |  |  |  |  |  | swap_frame(@frame); | 
| 123 | 0 |  |  |  |  |  | }); | 
| 124 |  |  |  |  |  |  | } | 
| 125 |  |  |  |  |  |  |  | 
| 126 |  |  |  |  |  |  | 1; | 
| 127 |  |  |  |  |  |  |  | 
| 128 |  |  |  |  |  |  | __DATA__ |