mirror of https://github.com/ByConity/ByConity
Removed very old protocol compatibility features [#CLICKHOUSE-2].
This commit is contained in:
parent
eb88391655
commit
16d6c4f9e7
|
@ -351,8 +351,8 @@ void Connection::sendQuery(
|
||||||
block_in.reset();
|
block_in.reset();
|
||||||
block_out.reset();
|
block_out.reset();
|
||||||
|
|
||||||
/// If server version is new enough, send empty block which meand end of data.
|
/// Send empty block which means end of data.
|
||||||
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES && !with_pending_data)
|
if (!with_pending_data)
|
||||||
{
|
{
|
||||||
sendData(Block());
|
sendData(Block());
|
||||||
out->next();
|
out->next();
|
||||||
|
@ -384,9 +384,7 @@ void Connection::sendData(const Block & block, const String & name)
|
||||||
}
|
}
|
||||||
|
|
||||||
writeVarUInt(Protocol::Client::Data, *out);
|
writeVarUInt(Protocol::Client::Data, *out);
|
||||||
|
writeStringBinary(name, *out);
|
||||||
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
|
|
||||||
writeStringBinary(name, *out);
|
|
||||||
|
|
||||||
size_t prev_bytes = out->count();
|
size_t prev_bytes = out->count();
|
||||||
|
|
||||||
|
@ -405,9 +403,7 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String
|
||||||
/// NOTE 'Throttler' is not used in this method (could use, but it's not important right now).
|
/// NOTE 'Throttler' is not used in this method (could use, but it's not important right now).
|
||||||
|
|
||||||
writeVarUInt(Protocol::Client::Data, *out);
|
writeVarUInt(Protocol::Client::Data, *out);
|
||||||
|
writeStringBinary(name, *out);
|
||||||
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
|
|
||||||
writeStringBinary(name, *out);
|
|
||||||
|
|
||||||
if (0 == size)
|
if (0 == size)
|
||||||
copyData(input, *out);
|
copyData(input, *out);
|
||||||
|
@ -419,13 +415,6 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String
|
||||||
|
|
||||||
void Connection::sendExternalTablesData(ExternalTablesData & data)
|
void Connection::sendExternalTablesData(ExternalTablesData & data)
|
||||||
{
|
{
|
||||||
/// If working with older server, don't send any info.
|
|
||||||
if (server_revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
|
|
||||||
{
|
|
||||||
out->next();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (data.empty())
|
if (data.empty())
|
||||||
{
|
{
|
||||||
/// Send empty block, which means end of data transfer.
|
/// Send empty block, which means end of data transfer.
|
||||||
|
@ -552,9 +541,7 @@ Block Connection::receiveData()
|
||||||
initBlockInput();
|
initBlockInput();
|
||||||
|
|
||||||
String external_table_name;
|
String external_table_name;
|
||||||
|
readStringBinary(external_table_name, *in);
|
||||||
if (server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
|
|
||||||
readStringBinary(external_table_name, *in);
|
|
||||||
|
|
||||||
size_t prev_bytes = in->count();
|
size_t prev_bytes = in->count();
|
||||||
|
|
||||||
|
|
|
@ -61,9 +61,6 @@
|
||||||
/// Name suffix for the column containing the array offsets.
|
/// Name suffix for the column containing the array offsets.
|
||||||
#define ARRAY_SIZES_COLUMN_NAME_SUFFIX ".size"
|
#define ARRAY_SIZES_COLUMN_NAME_SUFFIX ".size"
|
||||||
|
|
||||||
#define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES 50264
|
|
||||||
#define DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS 51554
|
|
||||||
#define DBMS_MIN_REVISION_WITH_BLOCK_INFO 51903
|
|
||||||
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
|
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
|
||||||
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
|
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
|
||||||
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
|
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
|
||||||
|
|
|
@ -17,9 +17,7 @@ void Progress::read(ReadBuffer & in, UInt64 server_revision)
|
||||||
|
|
||||||
readVarUInt(new_rows, in);
|
readVarUInt(new_rows, in);
|
||||||
readVarUInt(new_bytes, in);
|
readVarUInt(new_bytes, in);
|
||||||
|
readVarUInt(new_total_rows, in);
|
||||||
if (server_revision >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS)
|
|
||||||
readVarUInt(new_total_rows, in);
|
|
||||||
|
|
||||||
rows = new_rows;
|
rows = new_rows;
|
||||||
bytes = new_bytes;
|
bytes = new_bytes;
|
||||||
|
@ -31,9 +29,7 @@ void Progress::write(WriteBuffer & out, UInt64 client_revision) const
|
||||||
{
|
{
|
||||||
writeVarUInt(rows.load(), out);
|
writeVarUInt(rows.load(), out);
|
||||||
writeVarUInt(bytes.load(), out);
|
writeVarUInt(bytes.load(), out);
|
||||||
|
writeVarUInt(total_rows.load(), out);
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS)
|
|
||||||
writeVarUInt(total_rows.load(), out);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -106,7 +106,7 @@ Block NativeBlockInputStream::readImpl()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Additional information about the block.
|
/// Additional information about the block.
|
||||||
if (server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO)
|
if (server_revision > 0)
|
||||||
res.info.read(istr);
|
res.info.read(istr);
|
||||||
|
|
||||||
/// Dimensions
|
/// Dimensions
|
||||||
|
|
|
@ -60,8 +60,7 @@ struct IndexForNativeFormat
|
||||||
class NativeBlockInputStream : public IProfilingBlockInputStream
|
class NativeBlockInputStream : public IProfilingBlockInputStream
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
/** If a non-zero server_revision is specified, additional block information may be expected and read,
|
/** If a non-zero server_revision is specified, additional block information may be expected and read.
|
||||||
* depending on what is supported for the specified revision.
|
|
||||||
*
|
*
|
||||||
* `index` is not required parameter. If set, only parts of columns specified in the index will be read.
|
* `index` is not required parameter. If set, only parts of columns specified in the index will be read.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -116,7 +116,7 @@ void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr
|
||||||
void NativeBlockOutputStream::write(const Block & block)
|
void NativeBlockOutputStream::write(const Block & block)
|
||||||
{
|
{
|
||||||
/// Additional information about the block.
|
/// Additional information about the block.
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO)
|
if (client_revision > 0)
|
||||||
block.info.write(ostr);
|
block.info.write(ostr);
|
||||||
|
|
||||||
/// Dimensions
|
/// Dimensions
|
||||||
|
|
|
@ -20,8 +20,7 @@ class CompressedWriteBuffer;
|
||||||
class NativeBlockOutputStream : public IBlockOutputStream
|
class NativeBlockOutputStream : public IBlockOutputStream
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
/** If non-zero client_revision is specified, additional block information can be written,
|
/** If non-zero client_revision is specified, additional block information can be written.
|
||||||
* depending on what is supported for the specified revision.
|
|
||||||
*/
|
*/
|
||||||
NativeBlockOutputStream(
|
NativeBlockOutputStream(
|
||||||
WriteBuffer & ostr_, UInt64 client_revision_ = 0,
|
WriteBuffer & ostr_, UInt64 client_revision_ = 0,
|
||||||
|
|
|
@ -144,8 +144,7 @@ void TCPHandler::runImpl()
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
/// Get blocks of temporary tables
|
/// Get blocks of temporary tables
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
|
readData(global_settings);
|
||||||
readData(global_settings);
|
|
||||||
|
|
||||||
/// Reset the input stream, as we received an empty block while receiving external table data.
|
/// Reset the input stream, as we received an empty block while receiving external table data.
|
||||||
/// So, the stream has been marked as cancelled and we can't read from it anymore.
|
/// So, the stream has been marked as cancelled and we can't read from it anymore.
|
||||||
|
@ -416,8 +415,7 @@ void TCPHandler::sendTotals()
|
||||||
initBlockOutput();
|
initBlockOutput();
|
||||||
|
|
||||||
writeVarUInt(Protocol::Server::Totals, *out);
|
writeVarUInt(Protocol::Server::Totals, *out);
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
|
writeStringBinary("", *out);
|
||||||
writeStringBinary("", *out);
|
|
||||||
|
|
||||||
state.block_out->write(totals);
|
state.block_out->write(totals);
|
||||||
state.maybe_compressed_out->next();
|
state.maybe_compressed_out->next();
|
||||||
|
@ -438,8 +436,7 @@ void TCPHandler::sendExtremes()
|
||||||
initBlockOutput();
|
initBlockOutput();
|
||||||
|
|
||||||
writeVarUInt(Protocol::Server::Extremes, *out);
|
writeVarUInt(Protocol::Server::Extremes, *out);
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
|
writeStringBinary("", *out);
|
||||||
writeStringBinary("", *out);
|
|
||||||
|
|
||||||
state.block_out->write(extremes);
|
state.block_out->write(extremes);
|
||||||
state.maybe_compressed_out->next();
|
state.maybe_compressed_out->next();
|
||||||
|
@ -612,8 +609,7 @@ bool TCPHandler::receiveData()
|
||||||
|
|
||||||
/// The name of the temporary table for writing data, default to empty string
|
/// The name of the temporary table for writing data, default to empty string
|
||||||
String external_table_name;
|
String external_table_name;
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
|
readStringBinary(external_table_name, *in);
|
||||||
readStringBinary(external_table_name, *in);
|
|
||||||
|
|
||||||
/// Read one block from the network and write it down
|
/// Read one block from the network and write it down
|
||||||
Block block = state.block_in->read();
|
Block block = state.block_in->read();
|
||||||
|
@ -717,8 +713,7 @@ void TCPHandler::sendData(Block & block)
|
||||||
initBlockOutput();
|
initBlockOutput();
|
||||||
|
|
||||||
writeVarUInt(Protocol::Server::Data, *out);
|
writeVarUInt(Protocol::Server::Data, *out);
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES)
|
writeStringBinary("", *out);
|
||||||
writeStringBinary("", *out);
|
|
||||||
|
|
||||||
state.block_out->write(block);
|
state.block_out->write(block);
|
||||||
state.maybe_compressed_out->next();
|
state.maybe_compressed_out->next();
|
||||||
|
|
Loading…
Reference in New Issue