line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
#!perl
|
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
# ########################################################################## #
|
4
|
|
|
|
|
|
|
# Title: Data stream sink
|
5
|
|
|
|
|
|
|
# Creation date: 2007-03-05
|
6
|
|
|
|
|
|
|
# Author: Michael Zedeler
|
7
|
|
|
|
|
|
|
# Description: Receives data from data stream
|
8
|
|
|
|
|
|
|
# File: $Source: /data/cvs/lib/DSlib/lib/DS/Target.pm,v $
|
9
|
|
|
|
|
|
|
# Repository: kronhjorten
|
10
|
|
|
|
|
|
|
# State: $State: Exp $
|
11
|
|
|
|
|
|
|
# Documentation: inline
|
12
|
|
|
|
|
|
|
# Recepient: -
|
13
|
|
|
|
|
|
|
# #TODO Sources should not by default be constructed with an explicit typespec, since this may be derived from the data source
|
14
|
|
|
|
|
|
|
# ########################################################################## #
|
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
package DS::Target;
|
17
|
|
|
|
|
|
|
|
18
|
11
|
|
|
11
|
|
3413
|
use strict;
|
|
11
|
|
|
|
|
30
|
|
|
11
|
|
|
|
|
412
|
|
19
|
11
|
|
|
11
|
|
55
|
use Carp qw{ croak cluck confess carp };
|
|
11
|
|
|
|
|
21
|
|
|
11
|
|
|
|
|
767
|
|
20
|
11
|
|
|
11
|
|
937
|
use Carp::Assert;
|
|
11
|
|
|
|
|
1314
|
|
|
11
|
|
|
|
|
65
|
|
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
our ($VERSION) = $DS::VERSION;
|
23
|
|
|
|
|
|
|
our ($REVISION) = '$Revision: 1.2 $' =~ /(\d+\.\d+)/;
|
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
sub new {
|
27
|
15
|
|
|
15
|
1
|
86
|
my( $class, $source_type, $source ) = @_;
|
28
|
|
|
|
|
|
|
|
29
|
15
|
|
|
|
|
31
|
my $self = {};
|
30
|
15
|
|
|
|
|
35
|
bless $self, $class;
|
31
|
|
|
|
|
|
|
|
32
|
15
|
100
|
|
|
|
53
|
if( defined( $source_type ) ) {
|
33
|
8
|
|
|
|
|
47
|
$self->in_type( $source_type );
|
34
|
|
|
|
|
|
|
}
|
35
|
|
|
|
|
|
|
|
36
|
15
|
50
|
|
|
|
52
|
if( defined( $source ) ) {
|
37
|
0
|
|
|
|
|
0
|
$self->attach_source( $source );
|
38
|
|
|
|
|
|
|
}
|
39
|
|
|
|
|
|
|
|
40
|
15
|
|
|
|
|
74
|
return $self;
|
41
|
|
|
|
|
|
|
}
|
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
# Receives row - invoked by source.
|
44
|
|
|
|
|
|
|
sub receive_row {
|
45
|
1
|
|
|
1
|
1
|
204
|
confess("This method is abstract. Don't call it. Override it.");
|
46
|
|
|
|
|
|
|
}
|
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
sub attach_source {
|
49
|
12
|
|
|
12
|
1
|
2147
|
my( $self, $source ) = @_;
|
50
|
|
|
|
|
|
|
|
51
|
12
|
|
|
|
|
25
|
my $result;
|
52
|
|
|
|
|
|
|
|
53
|
12
|
|
|
|
|
44
|
assert( $source, '$source should be defined' );
|
54
|
12
|
|
|
|
|
114
|
assert( $source->isa('DS::Source'), "ref(\$source) = " . ref( $source ) . " != DS::Source" );
|
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
# First break link with old source, if any
|
57
|
11
|
100
|
|
|
|
77
|
if( $self->{source} ) {
|
58
|
1
|
|
|
|
|
3
|
$self->{source}->{target} = undef;
|
59
|
|
|
|
|
|
|
}
|
60
|
11
|
50
|
|
|
|
61
|
if( $self->source( $source ) ) {
|
61
|
10
|
|
|
|
|
49
|
$source->target( $self );
|
62
|
10
|
|
|
|
|
15
|
$result = 1;
|
63
|
|
|
|
|
|
|
}
|
64
|
10
|
|
|
|
|
31
|
return $result;
|
65
|
|
|
|
|
|
|
}
|
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
# This is a primarily private method
|
68
|
|
|
|
|
|
|
# Important caveat: this method is just a field accessor method.
|
69
|
|
|
|
|
|
|
# Maintaining consistent links with source is handled by attach_source
|
70
|
|
|
|
|
|
|
sub source {
|
71
|
62
|
|
|
62
|
1
|
1003
|
my( $self, $source ) = @_;
|
72
|
|
|
|
|
|
|
|
73
|
62
|
|
|
|
|
67
|
my $result;
|
74
|
62
|
100
|
|
|
|
120
|
if( $source ) {
|
75
|
32
|
|
|
|
|
151
|
assert($source->isa('DS::Source'));
|
76
|
32
|
100
|
|
|
|
180
|
if( my $source_type = $source->out_type ) {
|
77
|
31
|
100
|
|
|
|
133
|
if( not $self->validate_source_type( $source_type ) ) {
|
78
|
3
|
|
|
|
|
3
|
my $target_fields = join(", ", keys %{$self->in_type->{fields}});
|
|
3
|
|
|
|
|
10
|
|
79
|
3
|
|
|
|
|
6
|
my $source_fields = join(", ", keys %{$source_type->{fields}});
|
|
3
|
|
|
|
|
9
|
|
80
|
3
|
|
|
|
|
445
|
croak("Validation of source ($source with fields $source_fields) for me ($self with fields $target_fields) failed.");
|
81
|
|
|
|
|
|
|
} #TODO Consider throwing a type incompatibility exception here
|
82
|
|
|
|
|
|
|
} else {
|
83
|
1
|
|
|
|
|
169
|
cluck("Type checking of stream from $source to $self skipped because source has no outgoing type specification.");
|
84
|
|
|
|
|
|
|
}
|
85
|
29
|
|
|
|
|
195
|
$self->{source} = $source;
|
86
|
29
|
|
|
|
|
49
|
$result = 1;
|
87
|
|
|
|
|
|
|
} else {
|
88
|
30
|
|
|
|
|
64
|
$result = $self->{source};
|
89
|
|
|
|
|
|
|
}
|
90
|
59
|
|
|
|
|
231
|
return $result;
|
91
|
|
|
|
|
|
|
}
|
92
|
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
sub validate_source_type {
|
94
|
20
|
|
|
20
|
1
|
27
|
my( $self, $source_type ) = @_;
|
95
|
20
|
|
|
|
|
28
|
my $result = 1;
|
96
|
|
|
|
|
|
|
# We accept if the sender of data are passing more fields to us, than we require
|
97
|
20
|
50
|
|
|
|
41
|
if( $self->in_type ) {
|
98
|
20
|
|
|
|
|
40
|
$result = $source_type->contains( $self->in_type );
|
99
|
|
|
|
|
|
|
} else {
|
100
|
0
|
|
|
|
|
0
|
cluck("Type checking of stream to $self skipped because $self has no ingoing type specification.");
|
101
|
|
|
|
|
|
|
}
|
102
|
20
|
|
|
|
|
196
|
return $result;
|
103
|
|
|
|
|
|
|
}
|
104
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
sub in_type {
|
106
|
75
|
|
|
75
|
1
|
1447
|
my( $self, $type ) = @_;
|
107
|
|
|
|
|
|
|
|
108
|
75
|
|
|
|
|
85
|
my $result;
|
109
|
75
|
100
|
|
|
|
133
|
if( defined( $type ) ) {
|
110
|
22
|
|
|
|
|
144
|
assert($type->isa('DS::TypeSpec'));
|
111
|
21
|
|
|
|
|
147
|
$self->{in_type} = $type;
|
112
|
|
|
|
|
|
|
} else {
|
113
|
53
|
|
|
|
|
75
|
$result = $self->{in_type};
|
114
|
|
|
|
|
|
|
}
|
115
|
74
|
|
|
|
|
221
|
return $result;
|
116
|
|
|
|
|
|
|
}
|
117
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
1;
|
119
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
__END__
|