File Coverage

blib/lib/Confluent/SchemaRegistry.pm
Criterion Covered Total %
statement 51 156 32.6
branch 5 74 6.7
condition 2 120 1.6
subroutine 16 39 41.0
pod 4 17 23.5
total 78 406 19.2


line stmt bran cond sub pod time code
1             package Confluent::SchemaRegistry;
2              
3             =head1 NAME
4              
5             Confluent::SchemaRegistry - A simple client for interacting with B<Confluent Schema Registry>.
6              
7             =head1 SYNOPSIS
8              
9             use Confluent::SchemaRegistry;
10              
11             my $sr = Confluent::SchemaRegistry->new( { host => 'https://my-schema-registry.org' } );
12              
13             =head1 DESCRIPTION
14              
15             C<Confluent::SchemaRegistry> provides a simple way to interact with B<Confluent Schema Registry>
16             (L<https://docs.confluent.io/current/schema-registry/docs/index.html>) enabling writing into
17             B<Apache Kafka> (L<https://kafka.apache.org/>) according to I<Apache Avro> schema specification
18             (L<https://avro.apache.org/>).
19              
20             =head2 HEAD UP
21              
22             =over 4
23              
24             =item Confluent Schema Registry documentation
25              
26             Full RESTful API documentation of B<Schema Registry> is available here:
27             L<https://docs.confluent.io/current/schema-registry/docs/api.html?_ga=2.234767710.1188695207.1526911788-1213051144.1524553242#>
28              
29             =item Avro package
30              
31             B<Avro> package is a dependency of I<Confluent::SchemaRegistry> but is not available in CPAN index.
32             Perhaps you may find and download it directly from GitHub repository at L<https://github.com/apache/avro/tree/master/lang/perl>.
33             Please, refer its documentation for installation.
34              
35             =back
36              
37              
38             =cut
39              
40 1     1   106262 use 5.010;
  1         4  
41 1     1   19 use strict;
  1         2  
  1         24  
42 1     1   13 use warnings;
  1         2  
  1         23  
43              
44 1     1   747 use JSON::XS;
  1         7791  
  1         61  
45 1     1   510 use REST::Client;
  1         51295  
  1         40  
46 1     1   9 use HTTP::Status qw/:is/;
  1         3  
  1         244  
47 1     1   8 use Try::Tiny;
  1         2  
  1         49  
48 1     1   728 use Aspect;
  1         37962  
  1         18  
49 1     1   1268 use Avro::Schema;
  1         17568  
  1         39  
50              
51              
52 1     1   471 use version 0.77; our $VERSION = version->declare('v1.0.0');
  1         1909  
  1         7  
53              
54             our $COMPATIBILITY_LEVELS = [ qw/NONE FULL FORWARD BACKWARD/ ];
55              
56              
57             =head1 INSTALL
58              
59             Installation of C<Kafka::Consumer::Avro> is a canonical:
60              
61             perl Makefile.PL
62             make
63             make test
64             make install
65              
66             =head2 TEST NOTES
67              
68             Tests expect that in the target host is available Schema Registry listening on C<http://localhost:8081>, otherwise most of the test are skipped.
69              
70             You can alternatively set a different URL by exporting C<CONFLUENT_SCHEMA_REGISTY_URL> environment variable.
71              
72             =head1 USAGE
73              
74             =head2 Constructor
75              
76             =head3 new( [%config] )
77              
78             Construct a new C<Confluent::SchemaRegistry>. Takes an optional hash that provides
79             configuration flags for the L<REST::Client> internal object.
80              
81             The config flags, according to C<REST::Client::new> specs, are:
82              
83             =over 4
84              
85             =item host
86              
87             The host at which I<Schema Registry> is listening.
88              
89             The default is L<http://localhost:8081>
90              
91             =item timeout
92              
93             A timeout in seconds for requests made with the client. After the timeout the
94             client will return a 500.
95              
96             The default is 5 minutes.
97              
98             =item cert
99              
100             The path to a X509 certificate file to be used for client authentication.
101              
102             The default is to not use a certificate/key pair.
103              
104             =item key
105              
106             The path to a X509 key file to be used for client authentication.
107              
108             The default is to not use a certificate/key pair.
109              
110             =item ca
111              
112             The path to a certificate authority file to be used to verify host
113             certificates.
114              
115             The default is to not use a certificates authority.
116              
117             =item pkcs12
118              
119             The path to a PKCS12 certificate to be used for client authentication.
120              
121             =item pkcs12password
122              
123             The password for the PKCS12 certificate specified with 'pkcs12'.
124              
125             =item follow
126              
127             Boolean that determins whether REST::Client attempts to automatically follow
128             redirects/authentication.
129              
130             The default is false.
131              
132             =item useragent
133              
134             An L<LWP::UserAgent> object, ready to make http requests.
135              
136             REST::Client will provide a default for you if you do not set this.
137              
138             =back
139              
140             =cut
141              
142             sub new {
143 1     1 1 165 my $this = shift;
144 1   33     9 my $class = ref($this) || $this;
145 1         5 my %config = @_;
146 1         2 my $self = {};
147              
148             # Creazione client REST
149 1 50       5 $config{host} = 'http://localhost:8081' unless defined $config{host};
150 1         23 $self->{_CLIENT} = REST::Client->new( %config );
151 1         5857 $self->{_CLIENT}->addHeader('Content-Type', 'application/vnd.schemaregistry.v1+json');
152 1         17 $self->{_CLIENT}->{_ERROR} = undef; # will be set in case of unsuccessfully responses
153 1         3 $self->{_CLIENT}->{_RESPONSE} = undef; # will be set with normalized response contents
154              
155 1         5 $self = bless($self, $class);
156              
157             # Recupero la configurazione globale del registry per testare se le coordinate fanno
158             # effettivamente riferimento ad un Confluent Schema Registry
159 1         18 my $res = $self->get_top_level_config();
160             return undef
161 1 50 33     63 unless defined($res) && grep(/^$res$/, @$COMPATIBILITY_LEVELS);
162            
163 0         0 return $self;
164             }
165              
166              
167             #
168             # BEGIN Using Aspect to simplify error handling
169             #
170             my $rest_client_calls = qr/^REST::Client::(GET|PUT|POST|DELETE)/;
171              
172             # Clear internal error and response before every REST::Client call
173             before {
174             $_->self->{_RESPONSE} = undef;
175             $_->self->{_ERROR} = undef;
176             #print STDERR 'Calling ' . $_->sub_name . ' : ';
177             } call $rest_client_calls;
178              
179             # Verify if REST calls are successfull
180             after {
181             if (is_success($_->self->responseCode())) {
182             $_->self->{_RESPONSE} = _normalize_content( $_->self->responseContent() );
183             $_->return_value(1); # success
184             } else {
185             $_->self->{_ERROR} = _normalize_content( $_->self->responseContent() );
186             $_->return_value(0); # failure
187             }
188             #print STDERR $_->self->responseCode() . "\n";
189             } call $rest_client_calls;
190              
191             #
192             # END Aspect
193             #
194              
195              
196              
197             ##############################################################################################
198             # CLASS METHODS
199             #
200              
201             sub _normalize_content {
202 1     1   23 my $res = shift;
203             return undef
204 1 50       5 unless defined($res);
205 1 50       5 return $res
206             if ref($res) eq 'HASH';
207             return try {
208 1     1   61 decode_json($res);
209             } catch {
210 1     1   19 $res;
211             }
212 1         11 }
213              
214             sub _encode_error {
215             {
216 0     0   0 error_code => $_[0],
217             message => $_[1]
218             }
219             }
220              
221             ##############################################################################################
222             # PRIVATE METHODS
223             #
224            
225 1     1   30 sub _client { $_[0]->{_CLIENT} } # RESTful client
226 0     0   0 sub _set_error { $_[0]->_client->{_ERROR} = _normalize_content($_[1]); } # get internal error
227 0     0   0 sub _get_error { $_[0]->_client->{_ERROR} } # get internal error
228 0     0   0 sub _get_response { $_[0]->_client->{_RESPONSE} } # return http response
229              
230              
231              
232             ##############################################################################################
233             # PUBLIC METHODS
234             #
235            
236             =head2 METHODS
237              
238             C<Confluent::SchemRegistry> exposes the following methods.
239              
240             =cut
241              
242              
243             =head3 get_response_content()
244              
245             Returns the body (content) of the last method call to Schema Registry.
246              
247             =cut
248              
249 0     0 1 0 sub get_response_content { $_[0]->_get_response() }
250            
251              
252             =head3 get_error()
253              
254             Returns the error structure of the last method call to Schema Registry.
255              
256             =cut
257              
258 0     0 1 0 sub get_error { $_[0]->_get_error() }
259            
260              
261             =head3 add_schema( %params )
262              
263             Registers a new schema version under a subject.
264              
265             Returns the generated id for the new schema or C<undef>.
266              
267             Params keys are:
268              
269             =over 4
270              
271             =item SUBJECT ($scalar)
272              
273             the name of the Kafka topic
274              
275             =item TYPE ($scalar)
276              
277             the type of schema ("key" or "value")
278              
279             =item SCHEMA ($hashref or $json)
280              
281             the schema to add
282              
283             =back
284              
285             =cut
286              
287             sub add_schema {
288 0     0 1 0 my $self = shift;
289 0         0 my %params = @_;
290             return undef
291             unless defined($params{SUBJECT})
292             && defined($params{TYPE})
293             && $params{SUBJECT} =~ m/^.+$/
294 0 0 0     0 && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
295             return undef
296 0 0       0 unless defined($params{SCHEMA});
297 0         0 my $schema = _normalize_content($params{SCHEMA});
298 0         0 $schema = encode_json({
299             schema => encode_json($schema)
300             });
301             return $self->_get_response()->{id}
302 0 0       0 if $self->_client()->POST('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions', $schema);
303 0         0 return undef;
304             }
305              
306              
307             # List all the registered subjects
308             #
309             # Returns the list of subjects (ARRAY) or C<undef>
310             sub get_subjects {
311 0     0 0 0 my $self = shift;
312 0         0 $self->_client()->GET('/subjects');
313 0         0 return $self->_get_response();
314             }
315              
316              
317             # Delete a subject
318             #
319             # SUBJECT...: the name of the Kafka topic
320             # TYPE......: the type of schema ("key" or "value")
321             #
322             # Returns the list of versions for the deleted subject or C<undef>
323             sub delete_subject {
324 0     0 0 0 my $self = shift;
325 0         0 my %params = @_;
326             return undef
327             unless defined($params{SUBJECT})
328             && defined($params{TYPE})
329             && $params{SUBJECT} =~ m/^.+$/
330 0 0 0     0 && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
331 0         0 $self->_client()->DELETE('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE});
332 0         0 return $self->_get_response()
333             }
334              
335              
336             # List the schema versions registered under a subject
337             #
338             # SUBJECT...: the name of the Kafka topic
339             # TYPE......: the type of schema ("key" or "value")
340             #
341             # Returns the list of schema versions (ARRAY)
342             sub get_schema_versions {
343 0     0 0 0 my $self = shift;
344 0         0 my %params = @_;
345             return undef
346             unless defined($params{SUBJECT})
347             && defined($params{TYPE})
348             && $params{SUBJECT} =~ m/^.+$/
349 0 0 0     0 && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
350 0         0 $self->_client()->GET('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions');
351 0         0 return $self->_get_response();
352             }
353              
354              
355             # Fetch a schema by globally unique id
356             #
357             # SCHEMA_ID...: the globally unique id of the schema
358             #
359             # Returns the schema in Avro::Schema format or C<undef>
360             sub get_schema_by_id {
361 0     0 0 0 my $self = shift;
362 0         0 my %params = @_;
363             return undef
364             unless defined($params{SCHEMA_ID})
365 0 0 0     0 && $params{SCHEMA_ID} =~ m/^\d+$/;
366 0 0       0 if ( $self->_client()->GET('/schemas/ids/' . $params{SCHEMA_ID})) {
367 0 0       0 if (exists $self->_get_response()->{schema}) {
368 0         0 my $avro_schema;
369             try {
370 0     0   0 $avro_schema = Avro::Schema->parse($self->_get_response()->{schema});
371             } catch {
372 0     0   0 $self->_set_error( _encode_error(-2, $_->{'-text'}) );
373 0         0 };
374 0         0 return $avro_schema;
375             }
376             }
377 0         0 return undef;
378             }
379              
380              
381             # Fetch a specific version of the schema registered under a subject
382             #
383             # SUBJECT...: the name of the Kafka topic
384             # TYPE......: the type of schema ("key" or "value")
385             # VERSION...: the schema version to fetch; if omitted the latest version is fetched
386             #
387             # Returns the schema in Avro::Schema format or C<undef>
388             sub get_schema {
389 0     0 0 0 my $self = shift;
390 0         0 my %params = @_;
391             return undef
392             unless defined($params{SUBJECT})
393             && defined($params{TYPE})
394             && $params{SUBJECT} =~ m/^.+$/
395 0 0 0     0 && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
396             return undef
397             if defined($params{VERSION})
398 0 0 0     0 && $params{VERSION} !~ m/^\d+$/;
399 0 0       0 $params{VERSION} = 'latest' unless defined($params{VERSION});
400 0 0       0 if ($self->_client()->GET('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions/' . $params{VERSION})) {
401 0         0 my $sv = $self->_get_response();
402 0 0       0 if (exists $sv->{schema}) {
403             try {
404 0     0   0 $sv->{schema} = Avro::Schema->parse($sv->{schema});
405             } catch {
406 0     0   0 $self->_set_error( _encode_error(-2, $_->{'-text'}) );
407 0         0 return undef;
408 0         0 };
409             }
410 0         0 return $sv;
411             }
412 0         0 return undef;
413             }
414              
415              
416             # Delete a specific version of the schema registered under a subject
417             #
418             # SUBJECT...: the name of the Kafka topic
419             # TYPE......: the type of schema ("key" or "value")
420             # VERSION...: the schema version to delete
421             #
422             # Returns the deleted version number (NUMBER) or C<undef>
423             sub delete_schema {
424 0     0 0 0 my $self = shift;
425 0         0 my %params = @_;
426             return undef
427             unless defined($params{SUBJECT})
428             && defined($params{TYPE})
429             && $params{SUBJECT} =~ m/^.+$/
430 0 0 0     0 && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
431             return undef
432             unless defined($params{VERSION})
433 0 0 0     0 && $params{VERSION} =~ m/^\d+$/;
434 0         0 $self->_client()->DELETE('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions/' . $params{VERSION});
435 0         0 return $self->_get_response();
436             }
437              
438              
439             # Delete all versions of the schema registered under subject "Kafka-value"
440             #
441             # SUBJECT...: the name of the Kafka topic
442             # TYPE......: the type of schema ("key" or "value")
443             #
444             # Returns the list of deleted versions or C<undef>
445             sub delete_all_schemas {
446 0     0 0 0 my $self = shift;
447 0         0 my %params = @_;
448             return undef
449             unless defined($params{SUBJECT})
450             && defined($params{TYPE})
451             && $params{SUBJECT} =~ m/^.+$/
452 0 0 0     0 && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
453 0         0 $self->_client()->DELETE('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE});
454 0         0 return $self->_get_response();
455             }
456              
457              
458             # Check whether the schema $SCHEMA has been registered under subject "${SUBJECT}-${TYPE}"
459             #
460             # SUBJECT...: the name of the Kafka topic
461             # TYPE......: the type of schema ("key" or "value")
462             # SCHEMA....: the schema (HASH or JSON) to check for
463             #
464             # If found, returns the schema info (HASH) otherwise C<undef>
465             sub check_schema {
466 0     0 0 0 my $self = shift;
467 0         0 my %params = @_;
468             return undef
469             unless defined($params{SUBJECT})
470             && defined($params{TYPE})
471             && $params{SUBJECT} =~ m/^.+$/
472 0 0 0     0 && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
473             return undef
474 0 0       0 unless defined($params{SCHEMA});
475 0         0 my $schema = _normalize_content($params{SCHEMA});
476 0         0 $schema = encode_json({
477             schema => encode_json($schema)
478             });
479 0         0 $self->_client()->POST('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE}, $schema);
480 0         0 my $schema_info = $self->_get_response();
481             return undef
482 0 0       0 unless $schema_info;
483 0         0 $schema_info->{schema} = Avro::Schema->parse($schema_info->{schema});
484 0         0 return $schema_info;
485             }
486              
487              
488             # Test compatibility of the schema $SCHEMA with the version $VERSION of the schema under subject "${SUBJECT}-${TYPE}"
489             #
490             # SUBJECT...: the name of the Kafka topic
491             # TYPE......: the type of schema ("key" or "value")
492             # VERSION...: the schema version to test; if omitted latest version is used
493             # SCHEMA....: the schema (HASH or JSON) to check for
494             #
495             # returns TRUE if the providied schema is compatible with the latest one (BOOLEAN)
496             sub test_schema {
497 0     0 0 0 my $self = shift;
498 0         0 my %params = @_;
499             return undef
500             unless defined($params{SUBJECT})
501             && defined($params{TYPE})
502             && $params{SUBJECT} =~ m/^.+$/
503 0 0 0     0 && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
504             return undef
505             if defined($params{VERSION})
506 0 0 0     0 && $params{VERSION} !~ m/^\d+$/;
507 0 0       0 $params{VERSION} = 'latest' unless defined($params{VERSION});
508             return undef
509 0 0       0 unless defined($params{SCHEMA});
510 0         0 my $schema = _normalize_content($params{SCHEMA});
511 0         0 $schema = {
512             schema => encode_json($schema)
513             };
514 0         0 $self->_client()->POST('/compatibility/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions/' . $params{VERSION}, encode_json($schema));
515             return undef
516 0 0       0 unless defined $self->_get_response();
517             return $self->_get_response()->{is_compatible}
518 0 0       0 if exists($self->_get_response()->{is_compatible});
519 0         0 return undef;
520             }
521              
522              
523             # Get top level config
524             #
525             # Return top-level compatibility level or C<undef>
526             sub get_top_level_config {
527 1     1 0 5 my $self = shift;
528             return $self->_get_response()->{compatibilityLevel}
529 1 50       4 if $self->_client()->GET('/config');
530 1         48 return undef;
531             }
532              
533              
534             # Update compatibility requirements globally
535             # $ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
536             # --data '{"compatibility": "NONE"}' \
537             # http://localhost:8081/config
538             # {"compatibility":"NONE"}
539             sub set_top_level_config {
540 0     0 0   my $self = shift;
541 0           my %params = @_;
542             $self->_set_error( _encode_error(-1, 'Unexpected value for COMPATIBILITY_LEVEL param') )
543             and return undef
544             unless defined($params{COMPATIBILITY_LEVEL})
545 0 0 0       && grep(/^$params{COMPATIBILITY_LEVEL}$/, @$COMPATIBILITY_LEVELS);
      0        
546 0           $self->_client()->PUT('/config', encode_json( { compatibility => $params{COMPATIBILITY_LEVEL} } ));
547             return $self->_get_response()->{compatibility}
548 0 0         if defined $self->_get_response();
549 0           return undef;
550             }
551              
552              
553             # Get compatibility requirements under the subject
554             #
555             # Return compatibility level for the subject or C<undef>
556             sub get_config {
557 0     0 0   my $self = shift;
558 0           my %params = @_;
559             return undef
560             unless defined($params{SUBJECT})
561             && defined($params{TYPE})
562             && $params{SUBJECT} =~ m/^.+$/
563 0 0 0       && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
564             return $self->_get_response()->{compatibilityLevel}
565 0 0         if $self->_client()->GET('/config/' . $params{SUBJECT} . '-' . $params{TYPE});
566 0           return undef;
567             }
568              
569             # Update compatibility requirements under the subject
570             #
571             # Return the new compatibility level for the subject or C<undef>
572             sub set_config {
573 0     0 0   my $self = shift;
574 0           my %params = @_;
575             $self->_set_error( _encode_error(-1, 'Bad SUBJECT or TYPE parameter') )
576             and return undef
577             unless defined($params{SUBJECT})
578             && defined($params{TYPE})
579             && $params{SUBJECT} =~ m/^.+$/
580 0 0 0       && $params{TYPE} =~ m/^key|value$/;
      0        
      0        
      0        
581             $self->_set_error( _encode_error(-1, 'Unexpected value for COMPATIBILITY_LEVEL param') )
582             and return undef
583             unless defined($params{COMPATIBILITY_LEVEL})
584 0 0 0       && grep(/^$params{COMPATIBILITY_LEVEL}$/, @$COMPATIBILITY_LEVELS);
      0        
585 0           $self->_client()->PUT('/config/' . $params{SUBJECT} . '-' . $params{TYPE}, encode_json( { compatibility => $params{COMPATIBILITY_LEVEL} } ));
586             return $self->_get_response()->{compatibility}
587 0 0         if defined $self->_get_response();
588 0           return undef;
589             }
590              
591             =head1 TODO
592              
593             ...
594              
595             =head1 AUTHOR
596              
597             Alvaro Livraghi, E<lt>alvarol@cpan.orgE<gt>
598              
599             =head1 CONTRIBUTE
600              
601             L<https://github.com/alivraghi/Confluent-SchemaRegistry>
602              
603             =head1 BUGS
604              
605             Please use GitHub project link above to report problems or contact authors.
606              
607             =head1 COPYRIGHT AND LICENSE
608              
609             Copyright 2018 by Alvaro Livraghi
610              
611             This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
612              
613             =cut
614              
615             1;