494
|
0 |
0 |
0 |
# Lazily build the protocol client and cache it in $self->{protocol}.
#
# On first use this:
#   * constructs a Protocol::Database::PostgreSQL::Client for this engine's
#     database name and outgoing sink;
#   * registers one handler per backend message type on the incoming source,
#     dispatching on $_->type via switch_str, with a final fallback handler
#     for unrecognised types.
#
# Handlers are wrapped with $curry::weak so that they receive $self as their
# first argument (the curry module documents this as holding the invocant
# weakly).
#
# Handlers that need the active query check for it first and log + return
# when there is none, instead of calling a method on undef.
$self->{'protocol'} //= do {
    my $pg = Protocol::Database::PostgreSQL::Client->new(
        database => $self->database_name,
        outgoing => $self->outgoing,
    );

    $self->incoming->switch_str(
        # Dispatch key: the message type name.
        sub { $_->type },

        authentication_request => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Auth request received: %s', $msg);
            my $type = $msg->auth_type;
            my $code = $AUTH_HANDLER{$type};
            unless ($code) {
                # Previously execution fell through and invoked an undefined
                # method name; fail with a meaningful message instead.
                $Database::Async::Engine::PostgreSQL::log->errorf('unknown auth type %s', $type);
                die sprintf('unknown auth type %s', $type // 'undef');
            }
            $self->$code($msg);
        }),

        password => $self->$curry::weak(sub {
            my ($self, %args) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Auth request received: %s', \%args);
            $self->protocol->{'user'} = $self->uri->user;
            $self->protocol->send_message(
                'PasswordMessage',
                password => $self->encode_text($self->uri->password),
            );
        }),

        parameter_status => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Parameter received: %s', $msg);
            $self->set_parameter(
                map { $self->decode_text($_) } $msg->key, $msg->value
            );
        }),

        row_description => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Row description %s', $msg);
            my $q = $self->active_query;
            unless ($q) {
                $Database::Async::Engine::PostgreSQL::log->errorf('No active query?');
                return;
            }
            $q->row_description($msg->description);
        }),

        data_row => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Have row data %s', $msg);
            my $q = $self->active_query;
            unless ($q) {
                $Database::Async::Engine::PostgreSQL::log->warnf('No active query for row data');
                return;
            }
            $q->row([ map { $self->decode_text($_) } $msg->fields ]);
        }),

        command_complete => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            # Declared outside the conditional so it stays in scope below;
            # the query is detached from the engine at this point.
            my $query = delete $self->{'active_query'};
            unless ($query) {
                $Database::Async::Engine::PostgreSQL::log->warnf('Command complete but no query');
                return;
            }
            $Database::Async::Engine::PostgreSQL::log->tracef('Completed query %s with result %s', $query, $msg->result);
            $query->done unless $query->completed->is_ready;
        }),

        no_data => $self->$curry::weak(sub {
            my ($self) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Completed query %s with no data', $self->active_query);
        }),

        send_request => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Send request for %s', $msg);
            $self->stream->write($msg);
        }),

        ready_for_query => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Ready for query, state is %s', $msg->state);
            $self->ready_for_query->set_string($msg->state);
            $self->db->engine_ready($self) if $self->db;
        }),

        backend_key_data => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Backend key data: pid %d, key 0x%08x', $msg->pid, $msg->key);
        }),

        parse_complete => $self->$curry::weak(sub {
            my ($self) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Parsing complete for query %s', $self->active_query);
        }),

        bind_complete => $self->$curry::weak(sub {
            my ($self) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Bind complete for query %s', $self->active_query);
        }),

        close_complete => $self->$curry::weak(sub {
            my ($self) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Close complete for query %s', $self->active_query);
        }),

        empty_query_response => $self->$curry::weak(sub {
            my ($self) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Query returned no results for %s', $self->active_query);
        }),

        error_response => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            my $query = $self->active_query;
            $Database::Async::Engine::PostgreSQL::log->warnf('Query returned error %s for %s', $msg->error, $query);
            # An error can arrive with no query in flight; there is then
            # nothing to fail, and the error has already been logged.
            return unless $query;
            my $f = $query->completed;
            $f->fail($msg->error) unless $f->is_ready;
        }),

        copy_in_response => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            my $query = $self->active_query;
            unless ($query) {
                $Database::Async::Engine::PostgreSQL::log->warnf('No active query for copy in response');
                return;
            }
            $Database::Async::Engine::PostgreSQL::log->tracef('Ready to copy data for %s', $query);
            my $proto = $self->protocol;
            my $src   = $query->streaming_input;

            # Once the input stream is ready (in whatever state), send the
            # CopyDone / Close / Sync sequence.
            $src->completed->on_ready(sub {
                my ($f) = @_;
                $Database::Async::Engine::PostgreSQL::log->tracef('Sending copy done notification, stream status was %s', $f->state);
                $proto->send_message('CopyDone', data => '');
                $proto->send_message('Close', portal => '', statement => '');
                $proto->send_message('Sync',  portal => '', statement => '');
            });

            # Forward each item emitted by the input source as copy data.
            $src->each(sub {
                $Database::Async::Engine::PostgreSQL::log->tracef('Sending %s', $_);
                $proto->send_copy_data($_);
            });

            $query->ready_to_stream->done unless $query->ready_to_stream->is_ready;
        }),

        copy_out_response => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('copy out starts %s', $msg);
        }),

        copy_data => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Have copy data %s', $msg);
            # Declared outside the conditional so it stays in scope below.
            my $query = $self->active_query;
            unless ($query) {
                $Database::Async::Engine::PostgreSQL::log->warnf('No active query for copy data');
                return;
            }
            for my $row ($msg->rows) {
                $query->row([ map { $self->decode_text($_) } @$row ]);
            }
        }),

        copy_done => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef('Copy done - %s', $msg);
        }),

        notification_response => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            # Reads the fields directly from the message hash.
            my ($chan, $data) = @{$msg}{'channel', 'data'};
            $Database::Async::Engine::PostgreSQL::log->tracef('Notification on channel %s containing %s', $chan, $data);
            $self->db->notification(
                $self,
                map { $self->decode_text($_) } $chan, $data
            );
        }),

        # Fallback for any message type without a dedicated handler.
        sub {
            $Database::Async::Engine::PostgreSQL::log->errorf('Unknown message %s (type %s)', $_, $_->type);
        }
    );

    $pg
};