Condition Coverage

blib/lib/Database/Async/Engine/PostgreSQL.pm
Criterion Covered Total %
condition 18 94 19.1


and 3 conditions

line !l l&&!r l&&r condition
317 0 0 17 $host eq '*' || $host eq $self->uri->host and $port eq '*' || $port eq $self->uri->port
0 0 17 $host eq '*' || $host eq $self->uri->host and $port eq '*' || $port eq $self->uri->port and $user eq '*' || $user eq $self->database_user
0 0 17 $host eq '*' || $host eq $self->uri->host and $port eq '*' || $port eq $self->uri->port and $user eq '*' || $user eq $self->database_user and $db eq '*' || $db eq $self->database_name
887 0 0 0 my $idle = $self->{'idle'} and $v
906 0 0 0 defined $txt and my $encoding = $self->encoding
913 0 0 0 defined $txt and my $encoding = $self->encoding

or 2 conditions

line l !l condition
141 0 0 +(shift())->{'read_len'} //= 2097152
152 0 0 +(shift())->{'write_len'} //= 2097152
181 0 0 $uri->query_param("sslmode") // "prefer"
182 0 0 $Protocol::Database::PostgreSQL::Constants::SSL_NAME_MAP{$mode} // die("unknown SSL mode " . $mode)
277 8 0 ($uri->dbname // $uri->user) // "postgres"
282 8 0 $uri->user // "postgres"
384 0 0 +(shift())->{'is_replication'} //= 0
385 0 0 +(shift())->{'application_name'} //= "perl"

or 3 conditions

line l !l&&r !l&&!r condition
110 0 0 0 $self->{'connection'} //= $self->connect
173 0 0 0 $self->{'uri'} ||= $self->uri_for_service($self->service)
234 0 0 0 $qp{'application_name'} //= $self->application_name
273 0 0 0 +(shift())->{'service'} //= $ENV{'PGSERVICE'}
277 8 0 0 $uri->dbname // $uri->user
287 2 16 0 $ENV{'PGPASSFILE'} || 'File::HomeDir'->my_home . '/.pgpass'
317 9 8 0 $host eq '*' || $host eq $self->uri->host
9 8 0 $port eq '*' || $port eq $self->uri->port
9 8 0 $user eq '*' || $user eq $self->database_user
9 8 0 $db eq '*' || $db eq $self->database_name
330 2 18 0 ($self->uri->password // $ENV{'PGPASSWORD'}) || $self->password_from_file
484 0 0 0 $self->{'ryu'} //= do { $self->add_child(my $ryu = "Ryu::Async"->new); $ryu }
500 0 0 0 $self->{'outgoing'} //= $self->ryu->source
511 0 0 0 $self->{'incoming'} //= $self->ryu->source
523 0 0 0 $self->{'connected'} //= do { my $obs = "Ryu::Observable"->new(0); $obs->subscribe($self->$curry::weak(sub { my($self, $v) = @_; return if $v; if (my $query = delete $self->{'active_query'}) { $query->completed->fail("disconnected") unless $query->completed->is_ready; } ; if (my $db = $self->db) { $db->engine_disconnected($self); } ; } )); $obs }
554 0 0 0 $self->{'authenticated'} //= $self->loop->new_future
679 0 0 0 $self->{'protocol'} //= do { my $pg = "Protocol::Database::PostgreSQL::Client"->new("database", $self->database_name, "outgoing", $self->outgoing); $self->incoming->switch_str(sub { $_->type; } , "authentication_request", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Auth request received: %s", $msg); $Database::Async::Engine::PostgreSQL::log->errorf("unknown auth type %s", $msg->auth_type) unless my $code = $AUTH_HANDLER{$msg->auth_type}; $self->$code($msg); } ), "password", $self->$curry::weak(sub { my($self, %args) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Auth request received: %s", \%args); $self->protocol->{'user'} = $self->uri->user; $self->protocol->send_message("PasswordMessage", "password", $self->encode_text($self->database_password)); } ), "parameter_status", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Parameter received: %s", $msg); $self->set_parameter(map($self->decode_text($_), $msg->key, $msg->value)); } ), "row_description", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Row description %s", $msg); $Database::Async::Engine::PostgreSQL::log->errorf("No active query?") unless my $q = $self->active_query; $q->row_description($msg->description); } ), "data_row", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Have row data %s", $msg); $self->{'fc'} ||= $self->active_query->row_data->flow_control->each($self->$curry::weak(sub { my($self) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Flow control event - will %s stream", $_ ? "resume" : "pause"); $self->stream->want_readready($_) if $self->stream; } )); $self->active_query->row([map($self->decode_text($_), $msg->fields)]); } ), "command_complete", $self->$curry::weak(sub { my($self, $msg) = @_; delete $self->{'fc'}; unless (my $query = delete $self->{'active_query'}) { $Database::Async::Engine::PostgreSQL::log->warnf("Command complete but no query"); return; } ; $Database::Async::Engine::PostgreSQL::log->tracef("Completed query %s with result %s", $query, $msg->result); $query->done unless $query->completed->is_ready; } ), "no_data", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Completed query %s with no data", $self->active_query); } ), "send_request", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Send request for %s", $msg); $self->stream->write($msg); } ), "ready_for_query", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Ready for query, state is %s", $msg->state); $self->ready_for_query->set_string($msg->state); $self->db->engine_ready($self) if $self->db; } ), "backend_key_data", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Backend key data: pid %d, key 0x%08x", $msg->pid, $msg->key); } ), "parse_complete", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Parsing complete for query %s", $self->active_query); } ), "bind_complete", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Bind complete for query %s", $self->active_query); } ), "close_complete", $self->$curry::weak(sub { my($self, $msg) = @_; delete $self->{'fc'}; $Database::Async::Engine::PostgreSQL::log->tracef("Close complete for query %s", $self->active_query); } ), "empty_query_response", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Query returned no results for %s", $self->active_query); } ), "error_response", $self->$curry::weak(sub { my($self, $msg) = @_; if (my $query = $self->active_query) { $Database::Async::Engine::PostgreSQL::log->warnf("Query returned error %s for %s", $msg->error, $self->active_query); my $f = $query->completed; $f->fail($msg->error) unless $f->is_ready; } else { $Database::Async::Engine::PostgreSQL::log->errorf("Received error %s with no active query", $msg->error); } ; } ), "copy_in_response", $self->$curry::weak(sub { my($self, $msg) = @_; my $query = $self->active_query; $Database::Async::Engine::PostgreSQL::log->tracef("Ready to copy data for %s", $query); my $proto = $self->protocol; { my $src = $query->streaming_input; $src->completed->on_ready(sub { my($f) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Sending copy done notification, stream status was %s", $f->state); $proto->send_message("CopyDone", "data", ""); $proto->send_message("Close", "portal", "", "statement", ""); $proto->send_message("Sync", "portal", "", "statement", ""); } ); $src->each(sub { $Database::Async::Engine::PostgreSQL::log->tracef("Sending %s", $_); $proto->send_copy_data($_); } ); } ; $query->ready_to_stream->done unless $query->ready_to_stream->is_ready; } ), "copy_out_response", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("copy out starts %s", $msg); } ), "copy_data", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Have copy data %s", $msg); unless (my $query = $self->active_query) { $Database::Async::Engine::PostgreSQL::log->warnf("No active query for copy data"); return; } ; $query->row([map($self->decode_text($_), @$_)]) foreach ($msg->rows); } ), "copy_done", $self->$curry::weak(sub { my($self, $msg) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Copy done - %s", $msg); } ), "notification_response", $self->$curry::weak(sub { my($self, $msg) = @_; my($chan, $data) = @{$msg;}{"channel", "data"}; $Database::Async::Engine::PostgreSQL::log->tracef("Notification on channel %s containing %s", $chan, $data); $self->db->notification($self, map($self->decode_text($_), $chan, $data)); } ), sub { $Database::Async::Engine::PostgreSQL::log->errorf("Unknown message %s (type %s)", $_, $_->type); } ); $pg }
718 0 0 0 $self->{'fc'} ||= $self->active_query->row_data->flow_control->each($self->$curry::weak(sub { my($self) = @_; $Database::Async::Engine::PostgreSQL::log->tracef("Flow control event - will %s stream", $_ ? "resume" : "pause"); $self->stream->want_readready($_) if $self->stream; } ))
879 0 0 0 $self->{'idle'} //= $self->loop->new_future->on_ready(sub { delete $self->{'idle'}; } )
884 0 0 0 $self->{'ready_for_query'} //= do { "Ryu::Observable"->new(0)->subscribe($self->$curry::weak(sub { my($self, $v) = @_; return unless my $idle = $self->{'idle'} and $v; $idle->done unless $idle->is_ready; } )) }