Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/Core/PostgreSQLProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ ColumnTypeSpec convertTypeIndexToPostgresColumnTypeSpec(TypeIndex type_index)
case TypeIndex::Int64:
return {ColumnType::INT8, 8};

case TypeIndex::UInt64:
case TypeIndex::Int128:
case TypeIndex::UInt128:
case TypeIndex::Int256:
case TypeIndex::UInt256:
return {ColumnType::NUMERIC, -1};

case TypeIndex::Float32:
return {ColumnType::FLOAT4, 4};
case TypeIndex::Float64:
Expand All @@ -32,16 +39,35 @@ ColumnTypeSpec convertTypeIndexToPostgresColumnTypeSpec(TypeIndex type_index)
return {ColumnType::VARCHAR, -1};

case TypeIndex::Date:
case TypeIndex::Date32:
return {ColumnType::DATE, 4};

case TypeIndex::DateTime:
return {ColumnType::TIMESTAMP, 8};

case TypeIndex::DateTime64:
return {ColumnType::TIMESTAMPTZ, 8};

case TypeIndex::Decimal32:
case TypeIndex::Decimal64:
case TypeIndex::Decimal128:
case TypeIndex::Decimal256:
return {ColumnType::NUMERIC, -1};

case TypeIndex::UUID:
return {ColumnType::UUID, 16};

case TypeIndex::Enum8:
case TypeIndex::Enum16:
return {ColumnType::VARCHAR, -1};

case TypeIndex::Map:
return {ColumnType::JSONB, -1};

case TypeIndex::Array:
case TypeIndex::Tuple:
return {ColumnType::VARCHAR, -1};

default:
return {ColumnType::VARCHAR, -1};
}
Expand Down
267 changes: 267 additions & 0 deletions src/Core/PostgreSQLProtocol.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <functional>
#include <optional>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBuffer.h>
Expand Down Expand Up @@ -46,6 +47,7 @@ enum class FrontMessageType : Int32
PARSE = 'P',
BIND = 'B',
DESCRIBE = 'D',
EXECUTE = 'E',
SYNC = 'S',
FLUSH = 'H',
CLOSE = 'C',
Expand Down Expand Up @@ -122,16 +124,21 @@ enum class MessageType : Int32
//// Column 'typelem' from 'pg_type' table. NB: not all types are compatible with PostgreSQL's ones
enum class ColumnType : Int32
{
BOOL = 16,
CHAR = 18,
INT8 = 20,
INT2 = 21,
INT4 = 23,
TEXT = 25,
FLOAT4 = 700,
FLOAT8 = 701,
VARCHAR = 1043,
DATE = 1082,
TIMESTAMP = 1114,
TIMESTAMPTZ = 1184,
NUMERIC = 1700,
UUID = 2950,
JSONB = 3802,
};

class ColumnTypeSpec
Expand Down Expand Up @@ -793,6 +800,266 @@ class CommandComplete : BackendMessage
}
};

// Extended query protocol front messages

class Parse : public FrontMessage
{
public:
String statement_name;
String query;
std::vector<Int32> param_oids;

void deserialize(ReadBuffer & in) override
{
Int32 sz;
readBinaryBigEndian(sz, in);
Int32 remaining = sz - 4;

readNullTerminated(statement_name, in);
remaining -= static_cast<Int32>(statement_name.size() + 1);

readNullTerminated(query, in);
remaining -= static_cast<Int32>(query.size() + 1);

Int16 num_params;
readBinaryBigEndian(num_params, in);
remaining -= 2;

param_oids.resize(num_params);
for (Int16 i = 0; i < num_params; ++i)
{
readBinaryBigEndian(param_oids[i], in);
remaining -= 4;
}

if (remaining > 0)
in.ignore(remaining);
}

MessageType getMessageType() const override { return MessageType::PARSE; }
};

class Bind : public FrontMessage
{
public:
String portal_name;
String statement_name;
std::vector<Int16> param_format_codes;
std::vector<std::optional<String>> param_values;
std::vector<Int16> result_format_codes;

void deserialize(ReadBuffer & in) override
{
Int32 sz;
readBinaryBigEndian(sz, in);

readNullTerminated(portal_name, in);
readNullTerminated(statement_name, in);

Int16 num_format_codes;
readBinaryBigEndian(num_format_codes, in);
param_format_codes.resize(num_format_codes);
for (Int16 i = 0; i < num_format_codes; ++i)
readBinaryBigEndian(param_format_codes[i], in);

Int16 num_params;
readBinaryBigEndian(num_params, in);
param_values.resize(num_params);
for (Int16 i = 0; i < num_params; ++i)
{
Int32 len;
readBinaryBigEndian(len, in);
if (len == -1)
{
param_values[i] = std::nullopt; // NULL parameter
}
else
{
String val;
val.resize(len);
in.readStrict(val.data(), len);
param_values[i] = std::move(val);
}
}

Int16 num_result_codes;
readBinaryBigEndian(num_result_codes, in);
result_format_codes.resize(num_result_codes);
for (Int16 i = 0; i < num_result_codes; ++i)
readBinaryBigEndian(result_format_codes[i], in);
}

MessageType getMessageType() const override { return MessageType::BIND; }
};

class Describe : public FrontMessage
{
public:
char describe_type = 0; // 'S' = statement, 'P' = portal
String name;

void deserialize(ReadBuffer & in) override
{
Int32 sz;
readBinaryBigEndian(sz, in);
in.readStrict(describe_type);
readNullTerminated(name, in);
}

MessageType getMessageType() const override { return MessageType::DESCRIBE; }
};

class Execute : public FrontMessage
{
public:
String portal_name;
Int32 max_rows = 0;

void deserialize(ReadBuffer & in) override
{
Int32 sz;
readBinaryBigEndian(sz, in);
readNullTerminated(portal_name, in);
readBinaryBigEndian(max_rows, in);
}

MessageType getMessageType() const override { return MessageType::EXECUTE; }
};

class CloseMsg : public FrontMessage
{
public:
char close_type = 0; // 'S' or 'P'
String name;

void deserialize(ReadBuffer & in) override
{
Int32 sz;
readBinaryBigEndian(sz, in);
in.readStrict(close_type);
readNullTerminated(name, in);
}

MessageType getMessageType() const override { return MessageType::CLOSE; }
};

class FlushMsg : public FrontMessage
{
public:
void deserialize(ReadBuffer & in) override
{
Int32 sz;
readBinaryBigEndian(sz, in);
}

MessageType getMessageType() const override { return MessageType::FLUSH; }
};

class SyncMsg : public FrontMessage
{
public:
void deserialize(ReadBuffer & in) override
{
Int32 sz;
readBinaryBigEndian(sz, in);
}

MessageType getMessageType() const override { return MessageType::SYNC; }
};

// Extended query protocol backend messages

class ParseComplete : public BackendMessage
{
public:
void serialize(WriteBuffer & out) const override
{
out.write('1');
writeBinaryBigEndian(size(), out);
}

Int32 size() const override { return 4; }
MessageType getMessageType() const override { return MessageType::PARSE_COMPLETE; }
};

class BindComplete : public BackendMessage
{
public:
void serialize(WriteBuffer & out) const override
{
out.write('2');
writeBinaryBigEndian(size(), out);
}

Int32 size() const override { return 4; }
MessageType getMessageType() const override { return MessageType::BIND_COMPLETE; }
};

class CloseCompleteMsg : public BackendMessage
{
public:
void serialize(WriteBuffer & out) const override
{
out.write('3');
writeBinaryBigEndian(size(), out);
}

Int32 size() const override { return 4; }
MessageType getMessageType() const override { return MessageType::CLOSE_COMPLETE; }
};

class NoDataMsg : public BackendMessage
{
public:
void serialize(WriteBuffer & out) const override
{
out.write('n');
writeBinaryBigEndian(size(), out);
}

Int32 size() const override { return 4; }
MessageType getMessageType() const override { return MessageType::NODATA; }
};

class ParameterDescription : public BackendMessage
{
private:
std::vector<Int32> param_oids;

public:
ParameterDescription() = default;
explicit ParameterDescription(std::vector<Int32> oids) : param_oids(std::move(oids)) {}

void serialize(WriteBuffer & out) const override
{
out.write('t');
writeBinaryBigEndian(size(), out);
writeBinaryBigEndian(static_cast<Int16>(param_oids.size()), out);
for (const auto & oid : param_oids)
writeBinaryBigEndian(oid, out);
}

Int32 size() const override
{
return static_cast<Int32>(4 + 2 + param_oids.size() * 4);
}

MessageType getMessageType() const override { return MessageType::PARAMETER_DESCRIPTION; }
};

class PortalSuspended : public BackendMessage
{
public:
void serialize(WriteBuffer & out) const override
{
out.write('s');
writeBinaryBigEndian(size(), out);
}

Int32 size() const override { return 4; }
MessageType getMessageType() const override { return MessageType::PORTAL_SUSPENDED; }
};

}

namespace PGAuthentication
Expand Down
Loading