703
|
0 |
0 |
0 |
# Lazily build the protocol client and cache it in $self->{protocol}.
#
# On first use this creates a Protocol::Database::PostgreSQL::Client bound to
# this engine's database name and outgoing sink, then registers one handler
# per message type on $self->incoming via switch_str. The first callback
# passed to switch_str yields the message type; the final bare sub is the
# fallback that logs unrecognised messages.
#
# Every handler is wrapped with $self->$curry::weak(...), so it receives the
# engine as its first argument (presumably held weakly so the callbacks do
# not keep the engine alive - confirm against the curry module).
#
# The value of the do-block (and therefore the cached value) is the client.
$self->{'protocol'} //= do {
    my $pg = "Protocol::Database::PostgreSQL::Client"->new(
        database => $self->database_name,
        outgoing => $self->outgoing,
    );

    $self->incoming->switch_str(
        sub { $_->type },

        authentication_request => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Auth request received: %s", $msg);
            my $auth_type = $msg->auth_type;
            my $code      = $AUTH_HANDLER{$auth_type};
            unless ($code) {
                # Previously execution fell through to $self->$code with an
                # undefined $code, which died with an opaque message. Stay
                # fatal, but say why.
                $Database::Async::Engine::PostgreSQL::log->errorf("unknown auth type %s", $auth_type);
                die sprintf("unknown auth type %s", $auth_type // '(undef)');
            }
            $self->$code($msg);
        }),

        password => $self->$curry::weak(sub {
            my ($self, %args) = @_;
            # NOTE(review): this trace message is identical to the one in the
            # authentication_request handler - possibly a copy/paste; confirm.
            $Database::Async::Engine::PostgreSQL::log->tracef("Auth request received: %s", \%args);
            $self->protocol->{'user'} = $self->uri->user;
            $self->protocol->send_message(
                "PasswordMessage",
                password => $self->encode_text($self->database_password),
            );
        }),

        parameter_status => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Parameter received: %s", $msg);
            $self->set_parameter(
                map { $self->decode_text($_) } $msg->key, $msg->value
            );
        }),

        row_description => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Row description %s", $msg);
            my $q = $self->active_query;
            unless ($q) {
                # Without an active query there is nothing to attach the
                # description to; bail out like the other guarded handlers
                # instead of calling a method on undef.
                $Database::Async::Engine::PostgreSQL::log->errorf("No active query?");
                return;
            }
            $q->row_description($msg->description);
        }),

        data_row => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Have row data %s", $msg);
            # Subscribe to the query's flow-control events once per query;
            # the subscription is dropped again on command/close complete.
            $self->{'fc'} ||= $self->active_query->row_data->flow_control->each(
                $self->$curry::weak(sub {
                    my ($self) = @_;
                    $Database::Async::Engine::PostgreSQL::log->tracef("Flow control event - will %s stream", $_ ? "resume" : "pause");
                    $self->stream->want_readready($_) if $self->stream;
                })
            );
            $self->active_query->row([ map { $self->decode_text($_) } $msg->fields ]);
        }),

        command_complete => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            delete $self->{'fc'};
            # Declare $query outside the guard: a `my` inside the condition
            # of `unless (...) { }` is scoped to that statement only.
            my $query = delete $self->{'active_query'};
            unless ($query) {
                $Database::Async::Engine::PostgreSQL::log->warnf("Command complete but no query");
                return;
            }
            $Database::Async::Engine::PostgreSQL::log->tracef("Completed query %s with result %s", $query, $msg->result);
            $query->done unless $query->completed->is_ready;
        }),

        no_data => $self->$curry::weak(sub {
            my ($self) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Completed query %s with no data", $self->active_query);
        }),

        send_request => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Send request for %s", $msg);
            $self->stream->write($msg);
        }),

        ready_for_query => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Ready for query, state is %s", $msg->state);
            $self->ready_for_query->set_string($msg->state);
            $self->db->engine_ready($self) if $self->db;
        }),

        backend_key_data => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Backend key data: pid %d, key 0x%08x", $msg->pid, $msg->key);
        }),

        parse_complete => $self->$curry::weak(sub {
            my ($self) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Parsing complete for query %s", $self->active_query);
        }),

        bind_complete => $self->$curry::weak(sub {
            my ($self) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Bind complete for query %s", $self->active_query);
        }),

        close_complete => $self->$curry::weak(sub {
            my ($self) = @_;
            delete $self->{'fc'};
            $Database::Async::Engine::PostgreSQL::log->tracef("Close complete for query %s", $self->active_query);
        }),

        empty_query_response => $self->$curry::weak(sub {
            my ($self) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Query returned no results for %s", $self->active_query);
        }),

        error_response => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            my $query = $self->active_query;
            unless ($query) {
                $Database::Async::Engine::PostgreSQL::log->errorf("Received error %s with no active query", $msg->error);
                return;
            }
            $Database::Async::Engine::PostgreSQL::log->warnf("Query returned error %s for %s", $msg->error, $query);
            # Only fail the completion future if nothing has resolved it yet.
            my $f = $query->completed;
            $f->fail($msg->error) unless $f->is_ready;
        }),

        copy_in_response => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            my $query = $self->active_query;
            $Database::Async::Engine::PostgreSQL::log->tracef("Ready to copy data for %s", $query);
            my $proto = $self->protocol;
            {
                my $src = $query->streaming_input;
                # Once the input source finishes (in any state), end the
                # copy and close out the statement.
                $src->completed->on_ready(sub {
                    my ($f) = @_;
                    $Database::Async::Engine::PostgreSQL::log->tracef("Sending copy done notification, stream status was %s", $f->state);
                    $proto->send_message("CopyDone", data => "");
                    $proto->send_message("Close", portal => "", statement => "");
                    # NOTE(review): Sync is sent with the same portal/statement
                    # arguments as Close; confirm whether they are needed.
                    $proto->send_message("Sync", portal => "", statement => "");
                });
                # Forward each item from the source as copy data.
                $src->each(sub {
                    $Database::Async::Engine::PostgreSQL::log->tracef("Sending %s", $_);
                    $proto->send_copy_data($_);
                });
            }
            $query->ready_to_stream->done unless $query->ready_to_stream->is_ready;
        }),

        copy_out_response => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("copy out starts %s", $msg);
        }),

        copy_data => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Have copy data %s", $msg);
            # Declared before the guard so it is still in scope for the loop.
            my $query = $self->active_query;
            unless ($query) {
                $Database::Async::Engine::PostgreSQL::log->warnf("No active query for copy data");
                return;
            }
            for my $row ($msg->rows) {
                $query->row([ map { $self->decode_text($_) } @$row ]);
            }
        }),

        copy_done => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            $Database::Async::Engine::PostgreSQL::log->tracef("Copy done - %s", $msg);
        }),

        notification_response => $self->$curry::weak(sub {
            my ($self, $msg) = @_;
            # Reads the fields straight out of the message hash.
            my ($chan, $data) = @{$msg}{qw(channel data)};
            $Database::Async::Engine::PostgreSQL::log->tracef("Notification on channel %s containing %s", $chan, $data);
            $self->db->notification($self, map { $self->decode_text($_) } $chan, $data);
        }),

        # Fallback for any message type without a dedicated handler.
        sub {
            $Database::Async::Engine::PostgreSQL::log->errorf("Unknown message %s (type %s)", $_, $_->type);
        },
    );

    $pg
}