File Coverage

blib/lib/Bif/Sync/Client.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package Bif::Sync::Client;
2 41     41   226 use strict;
  41         93  
  41         1129  
3 41     41   232 use warnings;
  41         89  
  41         1351  
4 41     41   257 use feature 'state';
  41         298  
  41         3815  
5 41     41   224 use AnyEvent;
  41         89  
  41         911  
6 41     41   218 use Bif::Mo;
  41         84  
  41         360  
7 41     41   24489 use Coro::Handle;
  0            
  0            
8             use Log::Any '$log';
9             use Sys::Cmd qw/spawn/;
10              
11             our $VERSION = '0.1.5_7';
12             extends 'Bif::Sync';
13              
14             has name => ( required => 1, );
15              
16             has location => ( is => 'ro', );
17              
18             has hub_id => (
19             is => 'rw',
20             default => sub {
21             my $self = shift;
22             my $hub_id = $self->db->xval(
23             select => 'h.id',
24             from => 'hubs h',
25             where => { 'h.location' => $self->location },
26             );
27             return $hub_id;
28             },
29             );
30              
31             has child => ( is => 'rw' );
32              
33             has child_watcher => ( is => 'rw' );
34              
35             has stderr_watcher => ( is => 'rw' );
36              
37             has debug_bifsync => ( is => 'ro' );
38              
39             has rh => ( is => 'rw' );
40              
41             has wh => ( is => 'rw' );
42              
43             sub BUILD {
44             my $self = shift;
45             $self->trigger_on_update('connecting...');
46              
47             if ( $self->location =~ m!^ssh://(.+)! ) {
48             $self->child(
49             spawn( 'ssh', $1, 'bifsync', $self->debug_bifsync ? '--debug' : (),
50             )
51             );
52             }
53             else {
54             my @bifsync;
55             if ($main::BIF_BIFSYNC) { # set by t/lib/Test/Bif.pm (for Win32 tests)
56             @bifsync = ( $^X, $main::BIF_BIFSYNC );
57             }
58             elsif ( $^O eq 'MSWin32' ) { # Assume bifsync next to bif
59             @bifsync = ( $^X, Path::Tiny::path($0)->parent->child('bifsync') );
60             }
61             else {
62             @bifsync = ('bifsync'); # rest of world can use PATH
63             }
64              
65             $self->child(
66             spawn(
67             @bifsync, $self->debug_bifsync
68             ? '--debug'
69             : (),
70             $self->location
71             )
72             );
73             }
74              
75             $self->child_watcher(
76             AE::child $self->child->pid,
77             sub {
78             $self->on_error->('child process ended unexpectedly');
79             }
80             );
81              
82             my $stderr = $self->child->stderr;
83             my $name = $self->name;
84             my $debug = $self->debug;
85              
86             $self->stderr_watcher(
87             AE::io $stderr,
88             0,
89             sub {
90             # This doesn't need to be every time, but I can't find
91             # where Coro is setting default fh back to 1
92             state $once = select $App::bif::pager->fh if $App::bif::pager;
93              
94             my $line = $stderr->getline;
95             if ( !defined $line ) {
96             $self->stderr_watcher(undef);
97             return;
98             }
99              
100             # This needs to be the same in the cleanup_errors code if
101             # you don't want errors going to fd2...
102             if ($debug) {
103             $line =~ s!\n$!!;
104             $log->debug("$name: $line");
105             }
106             else {
107             warn "$name: $line";
108             }
109             }
110             );
111              
112             $self->rh(
113             Coro::Handle->new_from_fh( $self->child->stdout, timeout => 30 ) );
114             $self->wh(
115             Coro::Handle->new_from_fh( $self->child->stdin, timeout => 30 ) );
116              
117             $self->new_temp_table;
118             return;
119             }
120              
121             sub trigger_on_update {
122             my $self = shift;
123              
124             if ( my $msg = shift ) {
125             return $self->on_update->( $self, $msg );
126             }
127              
128             if ( $self->changes_tosend ) {
129             if ( $self->changes_torecv ) {
130             $self->on_update->(
131             $self,
132             'sent: '
133             . ( $self->changes_sent // '' ) . '/'
134             . $self->changes_tosend
135             . ' received: '
136             . ( $self->changes_recv // '' ) . '/'
137             . $self->changes_torecv
138             . ' duplicates: '
139             . $self->changes_dup
140             );
141             }
142             else {
143             $self->on_update->(
144             $self,
145             'sent: '
146             . ( $self->changes_sent // '' ) . '/'
147             . $self->changes_tosend
148             );
149             }
150             }
151             elsif ( $self->changes_torecv ) {
152             $self->on_update->(
153             $self,
154             'received: '
155             . ( $self->changes_recv // '' ) . '/'
156             . $self->changes_torecv
157             . ' duplicates: '
158             . $self->changes_dup
159             );
160             }
161             else {
162             $self->on_update->( $self, 'no changes' );
163             }
164             }
165              
166             sub bootstrap_identity {
167             my $self = shift;
168              
169             $self->write( 'IMPORT', 'self' );
170              
171             my ( $action, $type, $uuid ) = $self->read;
172             return $action
173             unless ( $action eq 'EXPORT' and $type eq 'identity', and $uuid );
174              
175             require Bif::Sync::Plugin::Identity;
176              
177             my $status = $self->real_import_identity;
178             return $status unless $status eq 'IdentityImported';
179              
180             my $dbw = $self->db;
181             my ( $iid, $uid ) = $dbw->xlist(
182             select => [ 'n.id', 'n.first_change_id' ],
183             from => 'nodes n',
184             inner_join => 'identities i',
185             on => 'i.id = n.id',
186             where => { 'n.uuid' => $uuid, },
187             );
188              
189             return 'IdentityNotImported' unless $iid;
190              
191             $dbw->xdo(
192             insert_into => 'bifkv',
193             values => { key => 'self', identity_id => $iid },
194             );
195              
196             return $status;
197             }
198              
199             sub pull_hub {
200             my $self = shift;
201             my $name = shift;
202              
203             $self->write( 'IMPORT', 'hub' );
204              
205             my ( $action, $type, $uuid ) = $self->read;
206             if ( $action eq 'EXPORT' and $type eq 'hub' ) {
207             return 'NoUUID' unless $uuid;
208             require Bif::Sync::Plugin::Repo;
209             return $self->real_import_hub($uuid);
210             }
211             return $action;
212             }
213              
214             sub sync_hub {
215             my $self = shift;
216             my $id = shift || die 'sync_hub($id)';
217              
218             my $hub = $self->db->xhashref(
219             select => [ 'n.uuid', 'h.hash' ],
220             from => 'hubs h',
221             inner_join => 'nodes n',
222             on => 'n.id = h.id',
223             where => { 'h.id' => $id },
224             );
225              
226             $self->write( 'SYNC', 'hub', $hub->{uuid}, $hub->{hash} );
227             my ( $action, $type, $uuid, $hash ) = $self->read;
228              
229             return 'ProtocolError'
230             unless defined $action && $action eq 'SYNC'
231             and defined $type && $type eq 'hub'
232             and defined $uuid && $uuid eq $hub->{uuid}
233             and defined $hash;
234              
235             if ( $hash eq $hub->{hash} ) {
236             $self->trigger_on_update('no changes');
237             return 'HubMatch';
238             }
239              
240             require Bif::Sync::Plugin::Repo;
241             return $self->real_sync( 'hub', $id );
242             }
243              
244             sub transfer_hub_changes {
245             my $self = shift;
246             require Bif::Sync::Plugin::Repo;
247              
248             $self->write( 'TRANSFER', 'hub_changes' );
249             return $self->real_transfer_hub_changes;
250             }
251              
252             sub sync_project {
253             my $self = shift;
254             my $id = shift || die 'sync_project($id)';
255              
256             my $pinfo = $self->db->xhashref(
257             select => [ 'n.uuid', 'p.hash', 'n.id' ],
258             from => 'projects p',
259             inner_join => 'nodes n',
260             on => 'n.id = p.id',
261             where => {
262             'p.id' => $id,
263             },
264             );
265              
266             $self->write( 'SYNC', 'project', $pinfo->{uuid}, $pinfo->{hash} );
267             my ( $action, $type, $uuid, $hash ) = $self->read;
268              
269             return 'ProtocolError'
270             unless defined $action && $action eq 'SYNC'
271             and defined $type && $type eq 'project'
272             and defined $uuid && $uuid eq $pinfo->{uuid}
273             and defined $hash;
274              
275             if ( $hash eq $pinfo->{hash} ) {
276             $self->trigger_on_update('no changes');
277             return 'ProjectMatch';
278             }
279              
280             require Bif::Sync::Plugin::Project;
281             return $self->real_sync( 'project', $id );
282             }
283              
284             sub transfer_project_related_changes {
285             my $self = shift;
286             require Bif::Sync::Plugin::Project;
287              
288             $self->write( 'TRANSFER', 'project_related_changes' );
289             return $self->real_transfer_project_related_changes;
290             }
291              
292             sub export_project {
293             my $self = shift;
294             my $pinfo = shift;
295              
296             $self->write( 'EXPORT', 'project', $pinfo->{uuid}, $pinfo->{path} );
297              
298             my ( $action, $type ) = $self->read;
299             if ( $action eq 'IMPORT' and $type eq 'project' ) {
300             require Bif::Sync::Plugin::Project;
301             return $self->real_export_project( $pinfo->{id} );
302             }
303             return $action;
304             }
305              
306             sub cleanup_errors {
307             my $self = shift;
308             return unless $self->stderr_watcher;
309             $self->stderr_watcher(undef);
310             return if $^O eq 'MSWin32';
311              
312             my $name = $self->name;
313             my $child = $self->child or return;
314             my $stderr = $child->stderr or return;
315              
316             $stderr->blocking(0);
317              
318             my $debug = $self->debug;
319             while ( my $line = $stderr->getline ) {
320             if ($debug) {
321             $line =~ s!\n$!!;
322             $log->debug("$name: $line");
323             }
324             else {
325             warn "$name: $line";
326             }
327             }
328              
329             return;
330             }
331              
332             sub disconnect {
333             my $self = shift;
334             $self->cleanup_errors;
335              
336             $self->write('QUIT') if $self->child_watcher;
337             $self->child_watcher(undef);
338              
339             return unless my $child = $self->child;
340             $child->close;
341             $child->wait_child;
342             $self->child(undef);
343              
344             return;
345             }
346              
347             sub DESTROY {
348             my $self = shift;
349             $self->disconnect;
350             }
351              
352             1;
353              
354             __END__