File Coverage

blib/lib/Convos/Core.pm
Criterion Covered Total %
statement 24 231 10.3
branch 0 56 0.0
condition 3 13 23.0
subroutine 8 46 17.3
pod 10 10 100.0
total 45 356 12.6


line stmt bran cond sub pod time code
1             package Convos::Core;
2              
3             =head1 NAME
4              
5             Convos::Core - TODO
6              
7             =head1 SYNOPSIS
8              
9             TODO
10              
11             =cut
12              
13 36     36   145 use Mojo::Base -base;
  36         55  
  36         220  
14 36     36   4076 use Mojo::JSON 'j';
  36         56  
  36         1474  
15 36     36   155 use Mojolicious::Validator;
  36         48  
  36         225  
16 36     36   16000 use Convos::Core::Connection;
  36         91  
  36         439  
17 36     36   1307 use Convos::Core::Util qw( as_id id_as );
  36         81  
  36         1983  
18 36     36   197 use Time::HiRes qw( time );
  36         48  
  36         280  
19 36   100 36   4597 use constant CONNECT_INTERVAL => $ENV{CONVOS_CONNECT_INTERVAL} || 2;
  36         49  
  36         2348  
20 36   50 36   145 use constant DEBUG => $ENV{CONVOS_DEBUG} // 0;
  36         44  
  36         104944  
21              
22             =head1 ATTRIBUTES
23              
24             =head2 archive
25              
26             Holds a L object.
27              
28             =head2 log
29              
30             Holds a L object.
31              
32             =head2 redis
33              
34             Holds a L object.
35              
36             =cut
37              
38             has archive => sub { require Convos::Archive::File; Convos::Archive::File->new; };
39             has log => sub { Mojo::Log->new };
40             has redis => sub { die 'redis connection required in constructor' };
41              
42             =head1 METHODS
43              
44             =head2 control
45              
46             $self->control($command, $cb);
47              
48             Used to issue a control command.
49              
50             =cut
51              
52             sub control {
53 0     0 1   my ($self, @args) = @_;
54 0           my $cb = pop @args;
55              
56 0           $self->redis->lpush('core:control', join(':', @args), $cb);
57 0           $self;
58             }
59              
60             =head2 start
61              
62             Will fetch connection information from the database and try to connect to them.
63              
64             =cut
65              
66             sub start {
67 0     0 1   my $self = shift;
68              
69 0 0         die "Convos::Core is already started" if $self->{start}++;
70              
71             # TODO: Remove in future versions and/or move to Convos::Upgrader
72             $self->redis->del($_)
73 0           for qw( convos:backend:lock convos:backend:pid convos:backend:started convos:host2convos convos:loopback:names );
74              
75 0           $self->redis->del('core:control'); # need to clear instructions queued while backend was stopped
76 0           $self->_start_control_channel;
77              
78 0           Scalar::Util::weaken($self);
79             Mojo::IOLoop->delay(
80             sub {
81 0     0     my ($delay) = @_;
82 0           $self->redis->smembers('connections', $delay->begin);
83             },
84             sub {
85 0     0     my ($delay, $connections) = @_;
86 0           for my $id (@$connections) {
87 0           $self->_connection($id)->_state('disconnected')->{core_connect_timer} = 1;
88             }
89             },
90 0           );
91              
92             $self->{connect_tid} ||= Mojo::IOLoop->recurring(
93             CONNECT_INTERVAL,
94             sub {
95 0     0     for my $conn (values %{$self->{connections}}) {
  0            
96 0 0         next if --$conn->{core_connect_timer};
97 0           $conn->connect;
98 0           last;
99             }
100             }
101 0   0       );
102              
103 0           return $self;
104             }
105              
106             sub _start_control_channel {
107 0     0     my $self = shift;
108 0           my $cb;
109              
110 0           Scalar::Util::weaken($self);
111              
112             $cb = sub {
113 0     0     my ($redis, $instruction) = @_;
114 0           $redis->brpop($instruction->[0], 0, $cb);
115 0 0         $instruction->[1] or return;
116 0           my ($command, $login, $name) = split /:/, $instruction->[1];
117 0           my $action = "ctrl_$command";
118 0           $self->$action($login, $name);
119 0           };
120              
121 0           $self->{control} = Mojo::Redis->new(server => $self->redis->server);
122 0           $self->{control}->$cb(['core:control']);
123             $self->{control}->on(
124             error => sub {
125 0     0     my ($redis, $error) = @_;
126 0           $self->log->error("[core:control] Stopping Mojo::IOLoop on 'core:control' error: $error ");
127 0           Mojo::IOLoop->stop;
128             },
129 0           );
130             }
131              
132             =head2 add_connection
133              
134             $self->add_connection({
135             login => $str,
136             name => $str,
137             nick => $str,
138             server => $str, # irc_server[:port]
139             }, $callback);
140              
141             Add a new connection to redis. Will create a new connection id and
142             set all the keys in the %connection hash
143              
144             =cut
145              
146             sub add_connection {
147 0     0 1   my ($self, $input, $cb) = @_;
148 0           my $validation = $self->_validation($input, qw( login name nick password server username ));
149              
150 0 0         if ($validation->has_error) {
151 0           $self->$cb($validation, undef);
152 0           return $self;
153             }
154              
155 0           my ($login, $name) = $validation->param([qw( login name )]);
156              
157 0           warn "[core:$login] add ", _dumper($validation->output), "\n" if DEBUG;
158 0           Scalar::Util::weaken($self);
159             Mojo::IOLoop->delay(
160             sub {
161 0     0     my ($delay) = @_;
162 0           $self->redis->exists("user:$login:connection:$name", $delay->begin);
163             },
164             sub {
165 0     0     my ($delay, $exists) = @_;
166              
167 0 0         if ($exists) {
168 0           $validation->error(name => ['exists']);
169 0           $self->$cb($validation, undef);
170 0           return;
171             }
172              
173             $self->redis->execute(
174 0           [sadd => "connections", "$login:$name"],
175             [sadd => "user:$login:connections", $name],
176 0           [hmset => "user:$login:connection:$name", %{$validation->output}, state => 'disconnected'],
177             $delay->begin,
178             );
179             },
180             sub {
181 0     0     my ($delay, @saved) = @_;
182 0           $self->control(start => $login, $name, $delay->begin);
183             },
184             sub {
185 0     0     my ($delay, $started) = @_;
186 0           $self->$cb($validation, $validation->output);
187             },
188 0           );
189             }
190              
191             =head2 update_connection
192              
193             $self->update_connection({
194             login => $str,
195             name => $str,
196             nick => $str,
197             server => $str, # irc_server[:port]
198             }, $callback);
199              
200             Update a connection's settings. This might issue a reconnect or issue
201             IRC commands to reflect the changes.
202              
203             =cut
204              
205             sub update_connection {
206 0     0 1   my ($self, $input, $cb) = @_;
207 0           my $validation = $self->_validation($input, qw( login name nick password server username ));
208              
209 0 0         if ($validation->has_error) {
210 0           $self->$cb($validation, undef);
211 0           return $self;
212             }
213              
214 0           my ($login, $name) = $validation->param([qw( login name )]);
215 0           my $conn = Convos::Core::Connection->new(%{$validation->output});
  0            
216 0           my $redis = $self->redis;
217              
218 0           warn "[core:$login] update ", _dumper($validation->output), "\n" if DEBUG;
219              
220             Mojo::IOLoop->delay(
221             sub {
222 0     0     my ($delay) = @_;
223 0           $redis->hgetall("user:$login:connection:$name", $delay->begin);
224             },
225             sub {
226 0     0     my ($delay, $current) = @_;
227              
228 0 0 0       unless ($current and %$current) {
229 0           $validation->error(name => ['no_such_connection']);
230 0           $self->$cb($validation, undef);
231 0           return;
232             }
233              
234 0           $delay->begin->(@_); # pass on $current and $conversations
235 0           $redis->zrange("user:$login:conversations", 0, 1, $delay->begin);
236 0           $redis->hmset("user:$login:connection:$name", $validation->output, $delay->begin);
237             },
238             sub {
239 0     0     my ($delay, $current, $conversations) = @_;
240              
241 0           $conn = $validation->output; # get rid of the extra junk from Connection->new()
242              
243 0 0         if ($current->{server} ne $conn->{server}) {
244 0           $self->control(restart => $login, $name, sub { });
  0            
245 0           $self->$cb(undef, $conn);
246 0           return;
247             }
248 0 0         if ($current->{nick} ne $conn->{nick}) {
249 0           warn "[core:$login] NICK $conn->{nick}\n" if DEBUG;
250 0           $redis->publish("convos:user:$login:$name", "dummy-uuid NICK $conn->{nick}");
251             }
252              
253 0           $self->$cb(undef, $conn);
254             },
255 0           );
256              
257 0           return $self;
258             }
259              
260             =head2 delete_connection
261              
262             $self->delete_connection({
263             login => $str,
264             name => $str,
265             }, $cb);
266              
267             =cut
268              
269             sub delete_connection {
270 0     0 1   my ($self, $input, $cb) = @_;
271 0           my $validation = $self->_validation($input);
272              
273 0           $validation->required('login');
274 0           $validation->required('name');
275              
276 0 0         if ($validation->has_error) {
277 0           $self->$cb($validation);
278 0           return $self;
279             }
280              
281 0           my ($login, $name) = $validation->param([qw( login name )]);
282              
283 0           warn "[core:$login] delete $name\n" if DEBUG;
284             Mojo::IOLoop->delay(
285             sub {
286 0     0     my ($delay) = @_;
287 0           $self->redis->del("user:$login:connection:$name", $delay->begin);
288 0           $self->redis->srem("connections", "$login:$name", $delay->begin);
289 0           $self->redis->srem("user:$login:connections", $name, $delay->begin);
290             },
291             sub {
292 0     0     my ($delay, @removed) = @_;
293              
294 0 0         unless ($removed[0]) {
295 0           $validation->error(name => ['no_such_connection']);
296 0           $self->$cb($validation);
297 0           return $self;
298             }
299              
300 0           $self->redis->keys("user:$login:connection:$name*", $delay->begin); # jht: not sure if i like this...
301 0           $self->redis->zrange("user:$login:conversations", 0, -1, $delay->begin);
302 0           $self->control(stop => $login, $name, $delay->begin);
303             },
304             sub {
305 0     0     my ($delay, $keys, $conversations) = @_;
306 0 0         $self->redis->del(@$keys, $delay->begin) if @$keys;
307 0           $self->redis->zrem("user:$login:conversations", $_) for grep {/^$name\b/} @$conversations;
  0            
308 0           $self->$cb(undef);
309             },
310 0           );
311             }
312              
313             =head2 delete_user
314              
315             $self = $self->delete_user(
316             { login => $str },
317             sub { my ($self, $err) = @_; ... },
318             );
319              
320             This method will delete a user and all the conversations, connections, and
321             related data. It will also stop all the connections.
322              
323             =cut
324              
325             sub delete_user {
326 0     0 1   my ($self, $input, $cb) = @_;
327 0           my $redis = $self->redis;
328 0           my $login = $input->{login};
329              
330             Mojo::IOLoop->delay(
331             sub {
332 0     0     my ($delay) = @_;
333 0           $redis->smembers("user:$login:connections", $delay->begin);
334 0           $redis->keys("user:$login:*", $delay->begin);
335             },
336             sub {
337 0     0     my ($delay, $connections, $keys) = @_;
338              
339 0 0         $redis->del(@$keys, $delay->begin) if @$keys;
340 0           $redis->del("user:$login", $delay->begin);
341 0           $redis->srem("users", $login, $delay->begin);
342              
343 0           for my $name (@$connections) {
344 0           my $conn = $self->_connection("$login:$name");
345 0           $self->control(stop => $login, $name, $delay->begin);
346 0           $self->archive->flush($conn);
347 0           $redis->srem("connections", "$login:$name", $delay->begin);
348             }
349             },
350             sub {
351 0     0     my ($delay, @deleted) = @_;
352 0           $self->$cb('');
353             },
354 0           );
355              
356 0           return $self;
357             }
358              
359             =head2 ctrl_stop
360              
361             $self->ctrl_stop($login, $server);
362              
363             Stop a connection by connection id.
364              
365             =cut
366              
367             sub ctrl_stop {
368 0     0 1   my ($self, $login, $server) = @_;
369 0           my $id = join ':', $login, $server;
370 0 0         my $conn = $self->{connections}{$id} or return;
371              
372 0           Scalar::Util::weaken($self);
373 0     0     $conn->disconnect(sub { delete $self->{connections}{$id} });
  0            
374             }
375              
376             =head2 ctrl_restart
377              
378             $self->ctrl_restart($login, $server);
379              
380             Restart a connection by connection id.
381              
382             =cut
383              
384             sub ctrl_restart {
385 0     0 1   my ($self, $login, $server) = @_;
386 0           my $id = join ':', $login, $server;
387              
388 0 0         if (my $conn = $self->{connections}{$id}) {
389 0           Scalar::Util::weaken($self);
390             $conn->disconnect(
391             sub {
392 0     0     delete $self->{connections}{$id};
393 0           $self->ctrl_start($login => $server);
394             }
395 0           );
396             }
397             else {
398 0           $self->ctrl_start($login => $server);
399             }
400             }
401              
402             =head2 ctrl_start
403              
404             Start a single connection by connection id.
405              
406             =cut
407              
408             sub ctrl_start {
409 0     0 1   my ($self, $login, $name) = @_;
410 0           $self->_connection("$login:$name")->connect;
411             }
412              
413             =head2 login
414              
415             $self->login({ login => $str, password => $str }, $callback);
416              
417             Will call callback after authenticating the user. C<$callback> will receive
418             either:
419              
420             $callback->($self, ''); # success
421             $callback->($self, 'error message'); # on error
422              
423             =cut
424              
425             sub login {
426 0     0 1   my ($self, $input, $cb) = @_;
427 0           my $validation = $self->_validation($input);
428 0           my $output;
429              
430 0           $validation->required('login');
431 0           $validation->required('password');
432              
433 0 0         if ($validation->has_error) {
434 0           $self->$cb($validation);
435 0           return $self;
436             }
437              
438 0           $output = $validation->output;
439 0           $output->{login} = lc $output->{login};
440              
441             Mojo::IOLoop->delay(
442             sub {
443 0     0     my $delay = shift;
444 0           $self->redis->hget("user:$output->{login}", "digest", $delay->begin);
445             },
446             sub {
447 0     0     my ($delay, $digest) = @_;
448 0 0         if (!$digest) {
    0          
449 0           $validation->error(login => ['no_such_user']);
450 0           $self->$cb($validation);
451             }
452             elsif ($digest eq crypt scalar $validation->param('password'), $digest) {
453 0           warn "[core:$output->{login}] Valid password\n" if DEBUG;
454 0           $self->$cb(undef);
455             }
456             else {
457 0           $validation->error(login => ['invalid_password']);
458 0           $self->$cb($validation);
459             }
460             }
461 0           );
462             }
463              
464             sub _connection {
465 0     0     my ($self, $id) = @_;
466 0           my $conn = $self->{connections}{$id};
467              
468 0 0         unless ($conn) {
469 0           my ($login, $name) = split /:/, $id;
470 0           Scalar::Util::weaken($self);
471 0           $conn = Convos::Core::Connection->new(redis => $self->redis, log => $self->log, login => $login, name => $name);
472 0 0 0 0     $conn->on(save => sub { $_[1]->{message} and $_[1]->{timestamp} and $self->archive->save(@_); });
  0            
473 0           $self->{connections}{$id} = $conn;
474             }
475              
476 0           $conn;
477             }
478              
479             sub _dumper { # function
480 0     0     Data::Dumper->new([@_])->Indent(0)->Sortkeys(1)->Terse(1)->Dump;
481             }
482              
483             sub _validation {
484 0     0     my ($self, $input, @names) = @_;
485 0           my $validation;
486              
487 0 0         if (UNIVERSAL::isa($input, 'Mojolicious::Validator::Validation')) {
488 0           $validation = $input;
489             }
490             else {
491 0           $validation = Mojolicious::Validator->new->validation;
492 0           $validation->input($input);
493             }
494              
495 0           for my $k (@names) {
496 0 0         if ($k eq 'password') { $validation->optional('password') }
  0 0          
    0          
    0          
    0          
    0          
497 0           elsif ($k eq 'username') { $validation->optional('username') }
498 0           elsif ($k eq 'login') { $validation->required('login')->size(3, 30) }
499 0           elsif ($k eq 'name') { $validation->required('name')->like(qr{^[-a-z0-9]+$}) } # network name
500 0           elsif ($k eq 'nick') { $validation->required('nick')->size(1, 30) }
501 0           elsif ($k eq 'server') { $validation->required('server')->like($Convos::Core::Util::SERVER_NAME_RE) }
502 0           else { $validation->required($k) }
503             }
504              
505 0           $validation;
506             }
507              
508             sub DESTROY {
509 0     0     my $self = shift;
510 0           my $tid;
511              
512 0 0         Mojo::IOLoop->remove($tid) if $tid = $self->{connect_tid};
513             }
514              
515             =head1 COPYRIGHT
516              
517             See L.
518              
519             =head1 AUTHOR
520              
521             Jan Henning Thorsen
522              
523             Marcus Ramberg
524              
525             =cut
526              
527             1;