File Coverage

blib/lib/Net/Clacks/PostgreSQL2Clacks.pm
Criterion Covered Total %
statement 47 151 31.1
branch 0 52 0.0
condition n/a
subroutine 16 22 72.7
pod 5 5 100.0
total 68 230 29.5


line stmt bran cond sub pod time code
1             package Net::Clacks::PostgreSQL2Clacks;
2             #---AUTOPRAGMASTART---
3 1     1   4020 use v5.36;
  1         5  
4 1     1   6 use strict;
  1         3  
  1         32  
5 1     1   6 use diagnostics;
  1         2  
  1         8  
6 1     1   43 use mro 'c3';
  1         2  
  1         8  
7 1     1   40 use English qw(-no_match_vars);
  1         3  
  1         7  
8 1     1   519 use Carp qw[carp croak confess cluck longmess shortmess];
  1         3  
  1         124  
9             our $VERSION = 35;
10 1     1   8 use autodie qw( close );
  1         2  
  1         9  
11 1     1   424 use Array::Contains;
  1         3  
  1         113  
12 1     1   7 use utf8;
  1         2  
  1         21  
13 1     1   57 use Encode qw(is_utf8 encode_utf8 decode_utf8);
  1         3  
  1         62  
14 1     1   7 use Data::Dumper;
  1         2  
  1         79  
15 1     1   7 use builtin qw[true false is_bool];
  1         3  
  1         56  
16 1     1   6 no warnings qw(experimental::builtin); ## no critic (TestingAndDebugging::ProhibitNoWarnings)
  1         2  
  1         83  
17             #---AUTOPRAGMAEND---
18              
19 1     1   8 use Net::Clacks::Client;
  1         2  
  1         121  
20 1     1   2695 use DBI;
  1         30802  
  1         100  
21 1     1   10 use Time::HiRes qw(sleep);
  1         3  
  1         10  
22              
23 0     0 1   sub new($class, %config) {
  0            
  0            
  0            
24 0           my $self = bless \%config, $class;
25              
26 0 0         if(!defined($self->{clacks})) {
27 0           croak("clacks config not defined");
28             }
29              
30 0           my @requireds = qw(username password clientname);
31 0 0         if(!defined($self->{clacks}->{socket})) {
32 0           push @requireds, 'server';
33 0           push @requireds, 'port';
34             }
35              
36 0           foreach my $required (@requireds) {
37 0 0         if(!defined($self->{clacks}->{$required})) {
38 0           croak("clacks setting $required not defined");
39             }
40             }
41            
42 0 0         if(!defined($self->{clacks}->{caching})) {
43 0           $self->{clacks}->{caching} = 0;
44             }
45              
46 0 0         if(!defined($self->{postgres})) {
47 0           croak("postgres config not defined");
48             }
49              
50 0           foreach my $required (qw(url username password)) {
51 0 0         if(!defined($self->{postgres}->{$required})) {
52 0           croak("postgres setting $required not defined");
53             }
54             }
55              
56 0           my $clacks;
57              
58 0 0         if(defined($self->{clacks}->{socket})) {
59             # Unix domain sockets
60             $clacks = Net::Clacks::Client->newSocket($self->{clacks}->{socket}, $self->{clacks}->{username}, $self->{clacks}->{password}, $self->{clacks}->{clientname}, $self->{clacks}->{caching})
61 0 0         or croak("Failed to connect to Clacks");
62             } else {
63             # TCP/IP connection
64             $clacks = Net::Clacks::Client->new($self->{clacks}->{server}, $self->{clacks}->{port}, $self->{clacks}->{username}, $self->{clacks}->{password}, $self->{clacks}->{clientname}, $self->{clacks}->{caching})
65 0 0         or croak("Failed to connect to Clacks");
66             }
67              
68             my $dbh = DBI->connect($self->{postgres}->{url}, $self->{postgres}->{username}, $self->{postgres}->{password},
69             {
70 0 0         AutoCommit => 0,
71             RaiseError => 0,
72             AutoInactiveDestroy => 1,
73             }) or croak($EVAL_ERROR);
74              
75 0           $self->{clacks} = $clacks;
76 0           $self->{dbh} = $dbh;
77              
78 0           $self->{nextping} = time + 10;
79              
80 0           $self->{dbh}->do('LISTEN clacksmessage');
81 0           $self->{dbh}->commit;
82              
83 0           return $self;
84             }
85              
86 0     0 1   sub initFunctions($self) {
  0            
  0            
87              
88 0           my @stmts = $self->getStatements();
89              
90             # Create/update function on PostgreSQL side
91 0           foreach my $stmt (@stmts) {
92 0 0         if(!$self->{dbh}->do($stmt)) {
93 0           croak($self->{dbh}->errstr);
94             }
95             }
96 0           $self->{dbh}->commit;
97              
98 0           return;
99             }
100              
101 0     0 1   sub run($self) {
  0            
  0            
102 0           while(1) {
103 0           my @lines = $self->runOnce();
104 0           foreach my $line (@lines) {
105 0           print STDERR $line, "\n";
106             }
107 0           sleep(0.1);
108             }
109              
110 0           return;
111             }
112              
113 0     0 1   sub runOnce($self) {
  0            
  0            
114 0 0         if(!$self->{dbh}->ping) {
115 0           croak("Database connection failed!");
116             }
117 0           my @lines;
118              
119 0           $self->{clacks}->doNetwork();
120              
121 0 0         if(time < $self->{nextping}) {
122 0           $self->{clacks}->ping();
123 0           $self->{nextping} = time + 10;
124 0           $self->{clacks}->doNetwork();
125             }
126              
127              
128 0           while((my $message = $self->{clacks}->getNext())) {
129             # We mostly do this to keep the inbuffer nice and empty.
130             # Since we neither LISTEN nor expect any other message, we should be fine
131 0 0         if($message->{type} eq 'serverinfo') {
    0          
    0          
132 0           push @lines, "Connected to " . $message->{data};
133             } elsif($message->{type} eq 'disconnect') {
134 0 0         if($message->{data} eq 'timeout') {
135 0           push @lines, "Connection timeout! If you use runOnce() instead of run(), make sure to call it at least every 10 seconds!";
136             }
137 0           push @lines, "Connection to server lost.";
138             } elsif($message->{type} eq 'reconnected') {
139 0           push @lines, "Reconnected to server.";
140             } else {
141 0           push @lines, "Clacks-Message: " . $message->{type} . " ignored.\n";
142             }
143             }
144              
145              
146 0           while((my $notify = $self->{dbh}->pg_notifies)) {
147 0           my ($nname, $npid, $npayload) = @{$notify};
  0            
148              
149 0           my ($command, $name, $value) = split/\§\§\§CLACKSDELIMETER\§\§\§/, $npayload;
150              
151 0 0         if(!defined($name)) {
152 0           $name = '';
153             }
154              
155 0 0         if(!defined($value)) {
156 0           $value = '';
157             }
158              
159 0           my $line = join(' ', $command, $name, $value);
160 0 0         if($command eq 'NOTIFY') {
    0          
    0          
    0          
    0          
    0          
    0          
161 0           $self->{clacks}->notify($name);
162             } elsif($command eq 'SET') {
163 0           $self->{clacks}->set($name, $value);
164             } elsif($command eq 'STORE') {
165 0           $self->{clacks}->store($name, $value);
166             } elsif($command eq 'SETANDSTORE') {
167 0           $self->{clacks}->setAndStore($name, $value);
168             } elsif($command eq 'INCREMENT') {
169 0           $self->{clacks}->increment($name, $value);
170             } elsif($command eq 'DECREMENT') {
171 0           $self->{clacks}->decrement($name, $value);
172             } elsif($command eq 'REMOVE') {
173 0           $self->{clacks}->remove($name);
174             } else {
175 0           $line = 'UNKNOWN COMMAND: ' . $line;
176             }
177              
178 0           push @lines, $line;
179 0           $self->{dbh}->commit;
180 0           $self->{clacks}->doNetwork();
181             }
182 0           $self->{dbh}->commit;
183 0           $self->{clacks}->doNetwork();
184              
185 0           return @lines;
186             }
187              
188              
189 0     0     sub DESTROY($self) {
  0            
  0            
190             # Try to disconnect cleanly, but socket might already be DESTROYed, so catch any errors
191 0           eval { ## no critic (ErrorHandling::RequireCheckingReturnValueOfEval)
192 0           $self->{clacks}->disconnect();
193 0           $self->{dbh}->disconnect();
194             };
195              
196 0           return;
197             }
198              
199 0     0 1   sub getStatements($self) {
  0            
  0            
200             return (
201 0           "CREATE OR REPLACE FUNCTION clacks_notify(clacksname text) RETURNS void AS \$\$
202             BEGIN
203             PERFORM pg_notify('clacksmessage', 'NOTIFY' || '§§§CLACKSDELIMETER§§§' || clacksname);
204             RETURN;
205             END
206             \$\$ LANGUAGE plpgsql;",
207              
208             "CREATE OR REPLACE FUNCTION clacks_set(clacksname text, clacksvalue text) RETURNS void AS \$\$
209             BEGIN
210             PERFORM pg_notify('clacksmessage', 'SET' || '§§§CLACKSDELIMETER§§§' || clacksname || '§§§CLACKSDELIMETER§§§' || clacksvalue);
211             RETURN;
212             END
213             \$\$ LANGUAGE plpgsql;",
214              
215             "CREATE OR REPLACE FUNCTION clacks_store(clacksname text, clacksvalue text) RETURNS void AS \$\$
216             BEGIN
217             PERFORM pg_notify('clacksmessage', 'STORE' || '§§§CLACKSDELIMETER§§§' || clacksname || '§§§CLACKSDELIMETER§§§' || clacksvalue);
218             RETURN;
219             END
220             \$\$ LANGUAGE plpgsql;",
221              
222             "CREATE OR REPLACE FUNCTION clacks_setandstore(clacksname text, clacksvalue text) RETURNS void AS \$\$
223             BEGIN
224             PERFORM pg_notify('clacksmessage', 'SETANDSTORE' || '§§§CLACKSDELIMETER§§§' || clacksname || '§§§CLACKSDELIMETER§§§' || clacksvalue);
225             RETURN;
226             END
227             \$\$ LANGUAGE plpgsql;",
228              
229             "CREATE OR REPLACE FUNCTION clacks_increment(clacksname text, clacksvalue text) RETURNS void AS \$\$
230             BEGIN
231             PERFORM pg_notify('clacksmessage', 'INCREMENT' || '§§§CLACKSDELIMETER§§§' || clacksname || '§§§CLACKSDELIMETER§§§' || clacksvalue);
232             RETURN;
233             END
234             \$\$ LANGUAGE plpgsql;",
235              
236             "CREATE OR REPLACE FUNCTION clacks_decrement(clacksname text, clacksvalue text) RETURNS void AS \$\$
237             BEGIN
238             PERFORM pg_notify('clacksmessage', 'DECREMENT' || '§§§CLACKSDELIMETER§§§' || clacksname || '§§§CLACKSDELIMETER§§§' || clacksvalue);
239             RETURN;
240             END
241             \$\$ LANGUAGE plpgsql;",
242              
243             "CREATE OR REPLACE FUNCTION clacks_remove(clacksname text) RETURNS void AS \$\$
244             BEGIN
245             PERFORM pg_notify('clacksmessage', 'REMOVE' || '§§§CLACKSDELIMETER§§§' || clacksname);
246             RETURN;
247             END
248             \$\$ LANGUAGE plpgsql;",
249             );
250             };
251              
252              
253             1;
254             __END__