File Coverage

blib/lib/Bif/Sync/Plugin/Identity.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package Bif::Sync::Plugin::Identity;
2 1     1   5 use strict;
  1         2  
  1         34  
3 1     1   4 use warnings;
  1         2  
  1         23  
4 1     1   535 use Coro;
  0            
  0            
5             use DBIx::ThinSQL qw/qv sq/;
6             use Log::Any '$log';
7              
8             our $VERSION = '0.1.5_7';
9              
10             my %import_functions = (
11             CHANGESET => {},
12             QUIT => {},
13             CANCEL => {},
14             );
15              
16             my $identity_functions = {
17             entity_contact_method_delta => 'func_import_update_entity_contact_method',
18             entity_contact_method => 'func_import_new_entity_contact_method',
19             entity_delta => 'func_import_update_entity',
20             entity => 'func_import_new_entity',
21             identity_delta => 'func_import_update_identity',
22             identity => 'func_import_new_identity',
23             node => 'func_import_new_node',
24             change_delta => 'func_import_update_change',
25             change => 'func_import_new_change',
26             };
27              
28             sub Bif::Sync::real_import_identity {
29             my $self = shift;
30             my $result = $self->recv_changesets($identity_functions);
31             return 'IdentityImported' if $result eq 'RecvChangesets';
32             return $result;
33             }
34              
35             sub Bif::Sync::real_sync_identity {
36             my $self = shift;
37             my $id = shift || die caller;
38             my $prefix = shift;
39             my $tmp = $self->temp_table;
40              
41             $prefix = '' unless defined $prefix;
42             my $prefix2 = $prefix . '_';
43             my $db = $self->db;
44              
45             $self->trigger_on_update( 'matching: ' . $prefix2 );
46              
47             my @refs = $db->xarrayrefs(
48             select => [qw/rm.prefix rm.hash/],
49             from => 'self_related_changes_merkle rm',
50             where =>
51             [ 'rm.self_id = ', qv($id), ' AND rm.prefix LIKE ', qv($prefix2) ],
52             );
53              
54             my $here = { map { $_->[0] => $_->[1] } @refs };
55             $self->write( 'MATCH', $prefix2, $here );
56             my ( $action, $mprefix, $there ) = $self->read;
57              
58             return "expected MATCH $prefix2 {} (not $action $mprefix ...)"
59             unless $action eq 'MATCH'
60             and $mprefix eq $prefix2
61             and ref $there eq 'HASH';
62              
63             my @next;
64             my @missing;
65              
66             while ( my ( $k, $v ) = each %$here ) {
67             if ( !exists $there->{$k} ) {
68             push( @missing, $k );
69             }
70             elsif ( $there->{$k} ne $v ) {
71             push( @next, $k );
72             }
73             }
74              
75             if (@missing) {
76             my @where;
77             foreach my $miss (@missing) {
78             push( @where, ' OR ' ) if @where;
79             push( @where, "c.uuid LIKE ", qv( $miss . '%' ) ),;
80             }
81              
82             $self->db->xdo(
83             insert_into => "$tmp(id,ucount)",
84             select => [ 'c.id', 'c.ucount' ],
85             from => 'changes c',
86             inner_join => 'self_related_changes src',
87             on => {
88             'src.change_id' => \'c.id',
89             'src.self_id' => $id,
90             },
91             where => \@where,
92             );
93             }
94              
95             if (@next) {
96             foreach my $next ( sort @next ) {
97             $self->real_sync_identity( $id, $next, $tmp );
98             }
99             }
100              
101             return unless $prefix eq '';
102             return 'IdentitySync';
103             }
104              
105             sub Bif::Sync::real_transfer_identity_changes {
106             my $self = shift;
107             my $tmp = $self->temp_table;
108              
109             my $fh = select;
110             my $send = async {
111             select $fh;
112              
113             my $total = $self->db->xval(
114             select => 'COALESCE(sum(t.ucount), 0)',
115             from => "$tmp t",
116             );
117              
118             $self->changes_tosend( $self->changes_tosend + $total );
119             $self->write( 'TOTAL', $total );
120              
121             my $change_list = $self->db->xprepare(
122             select => [
123             'c.id', 'c.uuid',
124             'p.uuid AS parent_uuid', 'n.uuid AS identity_uuid',
125             'c.mtime', 'c.mtimetz',
126             'c.author', 'c.author_contact',
127             'c.author_contact_method', 'c.author_shortname',
128             'c.lang', 'c.message',
129             'c.action', 'c.ucount',
130             ],
131             from => "$tmp tmp",
132             inner_join => 'changes c',
133             on => 'c.id = tmp.id',
134             left_join => 'nodes n',
135              
136             # Don't fetch the identity_uuid for the first identity
137             # change
138             on => 'n.id = c.identity_id AND n.first_change_id != c.id',
139             left_join => 'changes p',
140             on => 'p.id = c.parent_id',
141             order_by => 'c.id ASC',
142             );
143              
144             $change_list->execute;
145             return $self->send_identity_changes( $change_list, $total );
146             };
147              
148             my $r1 = $self->recv_identity_deltas;
149             my $r2 = $send->join;
150              
151             $self->db->xdo( delete_from => $tmp );
152             $self->db->xdo(
153             insert_into => 'func_merge_changes',
154             values => { merge => 1 },
155             );
156              
157             if ( $r1 =~ m/^\d+$/ ) {
158             $self->write( 'Recv', $r1 );
159             my ( $recv, $count ) = $self->read;
160             return 'TransferIdentityChanges' if $recv eq 'Recv' and $count == $r2;
161             $log->debug("MEH: $count $r2");
162             return $recv;
163             }
164              
165             $self->write( 'ProtocolError', $r1 );
166             return $r1;
167             }
168              
169             sub Bif::Sync::real_export_identity {
170             my $self = shift;
171             my $id = shift;
172              
173             my $total = $self->db->xval(
174             select => 'COUNT(eru.change_id)',
175             from => 'entity_related_changes eru',
176             where => { 'eru.entity_id' => $id },
177             );
178              
179             my $recv = $self->send_changesets(
180             $total,
181             [
182             select => 'eru.change_id AS id',
183             from => 'entity_related_changes eru',
184             where => { 'eru.entity_id' => $id },
185             order_by => 'eru.change_id ASC',
186             ]
187             );
188              
189             return 'IdentityExported' if $recv eq 'ChangesetsSent';
190             return $recv;
191             }
192              
193             1;
194              
195             =head1 NAME
196              
197             =for bif-doc #perl
198              
199             Bif::Sync::Plugin::Identity - synchronisation plugin for identities
200