File Coverage

blib/lib/Bif/Sync/Server.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Bif::Sync::Server;
2 1     1   6 use strict;
  1         1  
  1         29  
3 1     1   7 use warnings;
  1         1  
  1         26  
4 1     1   4 use Bif::Mo;
  1         5  
  1         6  
5 1     1   77149 use Bif::Sync::Plugin::Identity;
  0            
  0            
6             use Bif::Sync::Plugin::Project;
7             use Bif::Sync::Plugin::Repo;
8             use Coro::Handle;
9             use DBIx::ThinSQL qw/sq/;
10             use Log::Any '$log';
11              
12             our $VERSION = '0.1.5_5';
13             extends 'Bif::Sync';
14              
15             has hub_id => ( is => 'rw', );
16              
17             # Names are reversed, so that the methods make sense from the server's
18             # point of view.
19              
20             my %METHODS = (
21             EXPORT => {
22             project => 'import_project',
23             },
24             IMPORT => {
25             hub => 'export_hub',
26             project => 'sync_project',
27             self => 'export_self',
28             },
29             SYNC => {
30             hub => 'sync_hub',
31             project => 'sync_project',
32             },
33             TRANSFER => {
34             hub_changes => 'real_transfer_hub_changes',
35             project_related_changes => 'real_transfer_project_related_changes',
36             },
37             QUIT => {},
38             );
39              
40             sub BUILD {
41             my $self = shift;
42             $self->hub_id( $self->db->get_local_hub_id );
43             }
44              
45             sub run {
46             my $self = shift;
47             $self->rh( Coro::Handle->new_from_fh( *STDIN, timeout => 30 ) );
48             $self->wh( Coro::Handle->new_from_fh(*STDOUT) );
49             $self->new_temp_table;
50              
51             while (1) {
52             my ( $action, $type, @rest ) = $self->read;
53              
54             if ( $action eq 'EOF' ) {
55             return;
56             }
57             elsif ( $action eq 'INVALID' ) {
58             next;
59             }
60             elsif ( $action eq 'QUIT' ) {
61             $self->write('Bye');
62             return;
63             }
64              
65             # TODO a VERSION check
66              
67             if ( !exists $METHODS{$action} ) {
68             $self->write( 'InvalidAction', 'Invalid Action: ' . $action );
69             next;
70             }
71              
72             if ( !$type ) {
73             $self->write( 'MissingType', 'missing [2] type' );
74             next;
75             }
76              
77             my $method = $METHODS{$action}->{$type};
78              
79             if ( !$self->can($method) ) {
80             $self->write( 'TypeNotImplemented',
81             'type not implemented: ' . $type );
82             next;
83             }
84              
85             my $response = eval {
86             $self->db->txn(
87             sub {
88             $self->$method(@rest);
89             }
90             );
91             };
92              
93             if ($@) {
94             $log->error($@);
95             $self->write( 'InternalServerError', 'Internal Server Error: ',
96             $action, $type, @rest );
97             next;
98             }
99              
100             if ( $response eq 'EOF' ) {
101             return;
102             }
103             elsif ( $response eq 'INVALID' ) {
104             next;
105             }
106             elsif ( $response eq 'QUIT' ) {
107             $self->write('Bye');
108             return;
109             }
110             }
111              
112             return;
113             }
114              
115             sub export_self {
116             my $self = shift;
117             my $db = $self->db;
118              
119             my ( $id, $uuid ) = $db->xlist(
120             select => [ 'bif.identity_id', 'n.uuid' ],
121             from => 'bifkv bif',
122             inner_join => 'nodes n',
123             on => 'n.id = bif.identity_id',
124             where => { 'bif.key' => 'self' },
125             );
126              
127             if ( !$uuid ) {
128             $self->write( 'SelfNotFound', 'self identity not found here' );
129             return 'SelfNotFound';
130             }
131              
132             $self->write( 'EXPORT', 'identity', $uuid );
133             return $self->real_export_identity($id);
134             }
135              
136             sub export_hub {
137             my $self = shift;
138             my $db = $self->db;
139              
140             my ( $id, $uuid ) = $db->xlist(
141             select => [ 'h.id', 'n.uuid' ],
142             from => 'bifkv b',
143             inner_join => 'hubs h',
144             on => 'h.id = b.hub_id',
145             inner_join => 'nodes n',
146             on => 'n.id = h.id',
147             where => { 'b.key' => 'local' },
148             );
149              
150             if ( !$uuid ) {
151             $self->write( 'HubNotFound', 'local hub not found' );
152             return 'HubNotFound';
153             }
154              
155             $self->write( 'EXPORT', 'hub', $uuid );
156             return $self->real_export_hub($id);
157             }
158              
159             sub sync_hub {
160             my $self = shift;
161             my $uuid = shift;
162             my $hash = shift;
163             my $db = $self->db;
164              
165             unless ( defined $uuid and defined $hash ) {
166             $self->write( 'ProtocolError', 'sync_hub($uuid,$hash)' );
167             return 'ProtocolError';
168             }
169              
170             my $hub = $db->xhashref(
171             select => [ 'h.id', 'n.uuid', 'h.hash' ],
172             from => 'nodes n',
173             inner_join => 'hubs h',
174             on => 'h.id = n.id',
175             where => { 'n.uuid' => $uuid },
176             );
177              
178             if ( !$hub ) {
179             $self->write( 'RepoNotFound', 'hub not found here' );
180             return 'RepoNotFound';
181             }
182              
183             $self->write( 'SYNC', 'hub', $hub->{uuid}, $hub->{hash} );
184              
185             return 'HubMatch' if $hub->{hash} eq $hash;
186             return $self->real_sync( 'hub', $hub->{id} );
187             }
188              
189             sub import_project {
190             my $self = shift;
191             my $uuid = shift;
192             my $path = shift;
193              
194             $self->write( 'ProtocolError', 'uuid/path is required' )
195             unless $uuid and $path;
196              
197             my $local = $self->db->xhashref(
198             select => [ 'p.id AS id', 'n2.uuid AS other_uuid', ],
199             from => '(select 1,2)',
200             left_join => 'nodes n',
201             on => { 'n.uuid' => $uuid },
202             left_join => 'nodes n2',
203             on => { 'n2.path' => $path },
204             limit => 1,
205             );
206              
207             if ( $local->{id} ) {
208             $self->write( 'ProjectFound', 'project exists' );
209             return 'ProjectFound';
210             }
211             elsif ( $local->{other_uuid} ) {
212             $self->write( 'PathExists', 'path is ' . $local->{other_uuid} );
213             return 'PathExists';
214             }
215              
216             $self->write( 'IMPORT', 'project', $uuid );
217             my $status = $self->real_import_project($uuid);
218              
219             $self->db->xdo(
220             update => 'projects',
221             set => 'local = 1',
222             where => {
223             id => sq(
224             select => 'n.id',
225             from => 'nodes n',
226             where => { 'n.uuid' => $uuid, },
227             ),
228             },
229             );
230              
231             return $status;
232             }
233              
234             sub sync_project {
235             my $self = shift;
236             my $uuid = shift;
237             my $hash = shift;
238             my $db = $self->db;
239              
240             unless ( defined $uuid and defined $hash ) {
241             $self->write( 'ProtocolError', 'sync_project($uuid,$hash)' );
242             return 'ProtocolError';
243             }
244              
245             my $pinfo = $self->db->xhashref(
246             select => [ 'n.id', 'p.hash' ],
247             from => 'nodes n',
248             inner_join => 'projects p',
249             on => 'p.id = n.id',
250             where => { 'n.uuid' => $uuid },
251             );
252              
253             if ( !$pinfo ) {
254             $self->write( 'ProjectNotFound', 'project not found: ' . $uuid );
255             return 'ProjectNotFound';
256             }
257              
258             $self->write( 'SYNC', 'project', $uuid, $pinfo->{hash} );
259              
260             return 'ProjectMatch' if $pinfo->{hash} eq $hash;
261             return $self->real_sync( 'project', $pinfo->{id} );
262             }
263              
264             sub disconnect {
265             my $self = shift;
266             $log->info('disconnect');
267             $self->rh->close;
268             $self->wh->close;
269             return;
270             }
271              
272             1;
273              
274             =head1 NAME
275              
276             =for bif-doc #perl
277              
278             Bif::Sync::Server - server for communication with a client
279