From 2815152fe11500fd6a8a333f4ad414e7e3400e95 Mon Sep 17 00:00:00 2001 From: chenzhikai <895543892@qq.com> Date: Tue, 28 Feb 2023 21:03:23 +0800 Subject: [PATCH] =?UTF-8?q?dblink=E6=94=AF=E6=8C=81=E9=80=9A=E8=BF=87odbc?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=BC=82=E6=9E=84=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- GNUmakefile.in | 1 + build/script/aarch64_opengauss_list | 4 + build/script/opengauss_release_list_mini | 4 + .../opengauss_release_list_ubuntu_single | 4 + build/script/x86_64_opengauss_list | 4 + contrib/dblink/Makefile | 2 +- contrib/dblink/dblink--1.0.sql | 5 + contrib/dblink/dblink--unpackaged--1.0.sql | 1 + contrib/dblink/dblink.cpp | 1448 +++++++++++------ contrib/dblink/dblink.h | 108 ++ src/gausskernel/Makefile | 3 +- 11 files changed, 1093 insertions(+), 491 deletions(-) diff --git a/GNUmakefile.in b/GNUmakefile.in index 9333c9150..77878d460 100644 --- a/GNUmakefile.in +++ b/GNUmakefile.in @@ -83,6 +83,7 @@ install: $(MAKE) install_pldebugger $(MAKE) -C contrib/postgres_fdw $@ $(MAKE) -C contrib/hstore $@ + $(MAKE) -C contrib/dblink $@ $(MAKE) -C $(root_builddir)/privategauss/kernel/extension/packages $@ +@echo "openGauss installation complete." endif diff --git a/build/script/aarch64_opengauss_list b/build/script/aarch64_opengauss_list index dd4f906d1..d01ef91ac 100644 --- a/build/script/aarch64_opengauss_list +++ b/build/script/aarch64_opengauss_list @@ -83,6 +83,9 @@ ./share/postgresql/extension/mot_fdw.control ./share/postgresql/extension/postgres_fdw--1.0.sql ./share/postgresql/extension/postgres_fdw.control +./share/postgresql/extension/dblink--1.0.sql +./share/postgresql/extension/dblink--unpackaged--1.0.sql +./share/postgresql/extension/dblink.control ./share/postgresql/timezone/GB-Eire ./share/postgresql/timezone/Turkey ./share/postgresql/timezone/Kwajalein @@ -757,6 +760,7 @@ ./lib/postgresql/pg_upgrade_support.so ./lib/postgresql/java/pljava.jar ./lib/postgresql/postgres_fdw.so +./lib/postgresql/dblink.so ./lib/postgresql/pgoutput.so ./lib/libpljava.so ./lib/libpq.a diff --git a/build/script/opengauss_release_list_mini b/build/script/opengauss_release_list_mini index bd4b139f8..435f57b52 100644 --- a/build/script/opengauss_release_list_mini +++ b/build/script/opengauss_release_list_mini @@ -79,6 +79,9 @@ ./share/postgresql/extension/gsredistribute--unpackaged--1.0.sql ./share/postgresql/extension/postgres_fdw--1.0.sql ./share/postgresql/extension/postgres_fdw.control +./share/postgresql/extension/dblink--1.0.sql +./share/postgresql/extension/dblink--unpackaged--1.0.sql +./share/postgresql/extension/dblink.control ./share/postgresql/timezone/GB-Eire ./share/postgresql/timezone/Turkey ./share/postgresql/timezone/Kwajalein @@ -752,6 +755,7 @@ ./lib/postgresql/pg_upgrade_support.so ./lib/postgresql/java/pljava.jar ./lib/postgresql/postgres_fdw.so +./lib/postgresql/dblink.so ./lib/libpljava.so ./lib/libpq.a ./lib/libpq.so diff --git a/build/script/opengauss_release_list_ubuntu_single b/build/script/opengauss_release_list_ubuntu_single index 5c169d1b4..62c2733a6 100644 --- a/build/script/opengauss_release_list_ubuntu_single +++ b/build/script/opengauss_release_list_ubuntu_single @@ -82,6 +82,9 @@ ./share/postgresql/extension/gsredistribute--unpackaged--1.0.sql ./share/postgresql/extension/postgres_fdw--1.0.sql ./share/postgresql/extension/postgres_fdw.control +./share/postgresql/extension/dblink--1.0.sql +./share/postgresql/extension/dblink--unpackaged--1.0.sql +./share/postgresql/extension/dblink.control ./share/postgresql/timezone/GB-Eire ./share/postgresql/timezone/Turkey ./share/postgresql/timezone/Kwajalein @@ -755,6 +758,7 @@ ./lib/postgresql/pg_upgrade_support.so ./lib/postgresql/java/pljava.jar ./lib/postgresql/postgres_fdw.so +./lib/postgresql/dblink.so ./lib/libpljava.so ./lib/libpq.a ./lib/libpq.so diff --git a/build/script/x86_64_opengauss_list b/build/script/x86_64_opengauss_list index 167303711..2c7a57df3 100644 --- a/build/script/x86_64_opengauss_list +++ b/build/script/x86_64_opengauss_list @@ -83,6 +83,9 @@ ./share/postgresql/extension/mot_fdw.control ./share/postgresql/extension/postgres_fdw--1.0.sql ./share/postgresql/extension/postgres_fdw.control +./share/postgresql/extension/dblink--1.0.sql +./share/postgresql/extension/dblink--unpackaged--1.0.sql +./share/postgresql/extension/dblink.control ./share/postgresql/timezone/GB-Eire ./share/postgresql/timezone/Turkey ./share/postgresql/timezone/Kwajalein @@ -757,6 +760,7 @@ ./lib/postgresql/pg_upgrade_support.so ./lib/postgresql/java/pljava.jar ./lib/postgresql/postgres_fdw.so +./lib/postgresql/dblink.so ./lib/postgresql/pgoutput.so ./lib/libpljava.so ./lib/libpq.a diff --git a/contrib/dblink/Makefile b/contrib/dblink/Makefile index b6b31d56e..605dfa501 100644 --- a/contrib/dblink/Makefile +++ b/contrib/dblink/Makefile @@ -3,7 +3,7 @@ MODULE_big = dblink OBJS = dblink.o PG_CPPFLAGS = -I$(libpq_srcdir) -SHLIB_LINK = $(libpq) +SHLIB_LINK = $(libpq) -lodbc SHLIB_PREREQS = submake-libpq EXTENSION = dblink diff --git a/contrib/dblink/dblink--1.0.sql b/contrib/dblink/dblink--1.0.sql index 1fec9e394..28067f2c1 100644 --- a/contrib/dblink/dblink--1.0.sql +++ b/contrib/dblink/dblink--1.0.sql @@ -221,3 +221,8 @@ CREATE FUNCTION dblink_get_notify( RETURNS setof record AS 'MODULE_PATHNAME', 'dblink_get_notify' LANGUAGE C STRICT; + +CREATE FUNCTION dblink_get_drivername() +RETURNS text +AS 'MODULE_PATHNAME', 'dblink_get_drivername' +LANGUAGE C STRICT; \ No newline at end of file diff --git a/contrib/dblink/dblink--unpackaged--1.0.sql b/contrib/dblink/dblink--unpackaged--1.0.sql index 29f5bed0c..0314909fc 100644 --- a/contrib/dblink/dblink--unpackaged--1.0.sql +++ b/contrib/dblink/dblink--unpackaged--1.0.sql @@ -44,3 +44,4 @@ ALTER EXTENSION dblink ADD function dblink_cancel_query(text); ALTER EXTENSION dblink ADD function dblink_error_message(text); ALTER EXTENSION dblink ADD function dblink_get_notify(); ALTER EXTENSION dblink ADD function dblink_get_notify(text); +ALTER EXTENSION dblink ADD function dblink_get_drivername(); \ No newline at end of file diff --git a/contrib/dblink/dblink.cpp b/contrib/dblink/dblink.cpp index e903f4ffc..e894eeaea 100644 --- a/contrib/dblink/dblink.cpp +++ b/contrib/dblink/dblink.cpp @@ -32,9 +32,7 @@ */ #include "postgres.h" #include "knl/knl_variable.h" - #include - #include "libpq/libpq-fe.h" #include "funcapi.h" #include "catalog/indexing.h" @@ -56,27 +54,11 @@ #include "utils/rel_gs.h" #include "access/heapam.h" #include "commands/extension.h" - #include "dblink.h" PG_MODULE_MAGIC; -typedef struct remoteConn { - PGconn* conn; /* Hold the remote connection */ - int openCursorCount; /* The number of open cursors */ - bool newXactForCursor; /* Opened a transaction for a cursor */ -} remoteConn; -typedef struct storeInfo { - FunctionCallInfo fcinfo; - Tuplestorestate* tuplestore; - AttInMetadata* attinmeta; - MemoryContext tmpcontext; - char** cstrs; - /* temp storage for results to avoid leaks on exception */ - PGresult* last_res; - PGresult* cur_res; -} storeInfo; /* * Internal declarations @@ -85,7 +67,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); static void prepTuplestoreResult(FunctionCallInfo fcinfo); static void materializeResult(FunctionCallInfo fcinfo, PGconn* conn, PGresult* res); static void materializeQueryResult( - FunctionCallInfo fcinfo, PGconn* conn, const char* conname, const char* sql, bool fail); + FunctionCallInfo fcinfo, Linker* linker, const char* conname, const char* sql, bool fail); static PGresult* storeQueryResult(storeInfo* sinfo, PGconn* conn, const char* sql); static void storeRow(storeInfo* sinfo, PGresult* res, bool first); static remoteConn* getConnectionByName(const char* name); @@ -103,7 +85,7 @@ static HeapTuple get_tuple_of_interest(Relation rel, int* pkattnums, int pknumat static Relation get_rel_from_relname(text* relname_text, LOCKMODE lockmode, AclMode aclmode); static char* generate_relation_name(Relation rel); static void dblink_connstr_check(const char* connstr); -static void dblink_security_check(PGconn* conn, remoteConn* rconn); +static void dblink_security_check(PGconn* conn); static void dblink_res_error(const char* conname, PGresult* res, const char* dblink_context_msg, bool fail); static char* get_connect_string(const char* servername); static char* escape_param_str(const char* from); @@ -111,30 +93,22 @@ static void validate_pkattnums( Relation rel, int2vector* pkattnums_arg, int32 pknumatts_arg, int** pkattnums, int* pknumatts); static int applyRemoteGucs(PGconn* conn); static void restoreLocalGucs(int nestlevel); - static uint32 dblink_index; +dblink_session_context* get_session_context(); +static void storeRowInit(storeInfo* sinfo, int nfields, bool first); +static void GetDrivername(char* connstr_or_name, LinkInfo* linfo); + +/* odbc */ +static void ODBCstoreRow(storeInfo* sinfo, char** tupdata, SQLLEN* lenOut, SQLSMALLINT nfields, bool isFirst); +static bool UseODBCLinker(char* connstr); + #define PCONN (get_session_context()->pconn) #define REMOTE_CONN_HASH (get_session_context()->remoteConnHash) - -typedef struct dblink_session_context { - remoteConn* pconn; - HTAB* remoteConnHash; -} dblink_session_context; - -/* - * Following is list that holds multiple remote connections. - * Calling convention of each dblink function changes to accept - * connection name as the first parameter. The connection list is - * much like ecpg e.g. a mapping between a name and a PGconn object. - */ - -typedef struct remoteConnHashEnt { - char name[NAMEDATALEN]; - remoteConn* rconn; -} remoteConnHashEnt; - /* initial number of connection hashes */ #define NUMCONN 16 +#define MAX_BUF_LEN 100000 +#define MAX_DRIVERNAME_LEN 50 +#define DBLINK_NOTIFY_COLS 3 /* general utility */ #define xpfree(var_) \ @@ -153,14 +127,6 @@ typedef struct remoteConnHashEnt { var_c = NULL; \ } while (0) -#define DBLINK_RES_INTERNALERROR(p2) \ - do { \ - msg = pstrdup(PQerrorMessage(conn)); \ - if (res) \ - PQclear(res); \ - elog(ERROR, "%s: %s", p2, msg); \ - } while (0) - #define DBLINK_CONN_NOT_AVAIL \ do { \ if (conname) \ @@ -170,52 +136,49 @@ typedef struct remoteConnHashEnt { ereport(ERROR, (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), errmsg("connection not available"))); \ } while (0) -#define DBLINK_GET_CONN \ - do { \ - char* conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \ - rconn = getConnectionByName(conname_or_str); \ - if (rconn) { \ - conn = rconn->conn; \ - conname = conname_or_str; \ - } else { \ - connstr = get_connect_string(conname_or_str); \ - if (connstr == NULL) { \ - connstr = conname_or_str; \ - } \ - dblink_connstr_check(connstr); \ - conn = PQconnectdb(connstr); \ - if (PQstatus(conn) == CONNECTION_BAD) { \ - msg = pstrdup(PQerrorMessage(conn)); \ - PQfinish(conn); \ - ereport(ERROR, \ - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \ - errmsg("could not establish connection"), \ - errdetail_internal("%s", msg))); \ - } \ - dblink_security_check(conn, rconn); \ - PQsetClientEncoding(conn, GetDatabaseEncodingName()); \ - freeconn = true; \ - } \ +#define DBLINK_GET_CONN \ + do { \ + char* conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \ + rconn = getConnectionByName(conname_or_str); \ + if (rconn) { \ + linker = rconn->linker; \ + conname = conname_or_str; \ + } else { \ + if(UseODBCLinker(conname_or_str)){ \ + ODBCLinker* olinker = New(SESS_GET_MEM_CXT_GROUP \ + (MEMORY_CONTEXT_COMMUNICATION)) ODBCLinker(conname_or_str); \ + linker = olinker; \ + } else { \ + connstr = get_connect_string(conname_or_str); \ + if (connstr == NULL) { \ + connstr = conname_or_str; \ + } \ + dblink_connstr_check(connstr); \ + PQLinker* plinker = New(SESS_GET_MEM_CXT_GROUP \ + (MEMORY_CONTEXT_COMMUNICATION)) PQLinker(connstr); \ + linker = plinker; \ + freeconn = true; \ + } \ + } \ } while (0) #define DBLINK_GET_NAMED_CONN \ do { \ conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \ rconn = getConnectionByName(conname); \ - if (rconn) \ - conn = rconn->conn; \ - else \ + if (rconn) { \ + linker = rconn->linker; \ + } else { \ DBLINK_CONN_NOT_AVAIL; \ + } \ } while (0) #define DBLINK_INIT \ do { \ if (!PCONN) { \ PCONN = (remoteConn*)MemoryContextAlloc( \ - SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn)); \ - PCONN->conn = NULL; \ - PCONN->openCursorCount = 0; \ - PCONN->newXactForCursor = FALSE; \ + SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn)); \ + PCONN->linker = NULL; \ } \ } while (0) @@ -244,6 +207,582 @@ dblink_session_context* get_session_context() return (dblink_session_context*)u_sess->attr.attr_common.extension_session_vars_array[dblink_index]; } +PQLinker::PQLinker(char* connstr) +{ + this->conn = NULL; + this->res = NULL; + this->openCursorCount = 0; + this->newXactForCursor = false; + + this->conn = PQconnectdb(connstr); + char* msg; + + if (PQstatus(this->conn) == CONNECTION_BAD) { + msg = pstrdup(PQerrorMessage(this->conn)); + PQfinish(this->conn); + + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail_internal("%s", msg))); + } + dblink_security_check(this->conn); + /* attempt to set client encoding to match server encoding */ + PQsetClientEncoding(conn, GetDatabaseEncodingName()); +} + +void PQLinker::finish() +{ + PQfinish(this->conn); +} + +text* PQLinker::exec(char* conname, const char* sql, bool fail) +{ + text* volatile sql_cmd_status = NULL; + + this->res = PQexec(conn, sql); + if (!res || (PQresultStatus(this->res) != PGRES_COMMAND_OK && PQresultStatus(this->res) != PGRES_TUPLES_OK)) { + dblink_res_error(conname, this->res, "could not execute command", fail); + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = cstring_to_text("ERROR"); + } else if (PQresultStatus(this->res) == PGRES_COMMAND_OK) { + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = cstring_to_text(PQcmdStatus(this->res)); + PQclear(this->res); + } else { + PQclear(this->res); + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("statement returning results not allowed"))); + } + return sql_cmd_status; +} + +char* PQLinker::errorMsg() +{ + char* msg = PQerrorMessage(this->conn); + return msg; +} + +int PQLinker::isBusy() +{ + PQconsumeInput(this->conn); + return PQisBusy(this->conn); +} + +char* PQLinker::cancel(PGcancel* cancel) +{ + char* errbuf = NULL; + int res = 0; + cancel = PQgetCancel(this->conn); + res = PQcancel(cancel, errbuf, 256); + PQfreeCancel(cancel); + + if (res == 1) { + errbuf = "OK"; + } + + return errbuf; +} + +int PQLinker::sendQuery(char* sql){ + int retval = 0; + retval = PQsendQuery(this->conn, sql); + if (retval != 1) { + elog(NOTICE, "could not send query: %s", PQerrorMessage(conn)); + } + return 1; +} + + +char* PQLinker::open(char* conname, char* sql, bool fail) +{ + /* If we are not in a transaction, start one */ + if (PQtransactionStatus(this->conn) == PQTRANS_IDLE) { + this->res = PQexec(this->conn, "START TRANSACTION"); + + if (PQresultStatus(this->res) != PGRES_COMMAND_OK) { + char* errmsg = pstrdup(PQerrorMessage(this->conn)); + if (res) { + PQclear(res); + } + elog(ERROR, "begin error: %s", errmsg); + } + + PQclear(this->res); + this->newXactForCursor = TRUE; + + /* + * Since transaction state was IDLE, we force cursor count to + * initially be 0. This is needed as a previous ABORT might have wiped + * out our transaction without maintaining the cursor count for us. + */ + this->openCursorCount = 0; + } + + /* if we started a transaction, increment cursor count */ + if (this->newXactForCursor) + (this->openCursorCount)++; + + this->res = PQexec(this->conn, sql); + if (!res || PQresultStatus(this->res) != PGRES_COMMAND_OK) { + dblink_res_error(conname, this->res, "could not open cursor", fail); + return "ERROR"; + } + + PQclear(this->res); + return "OK"; +} + +char* PQLinker::close(char* conname, char* sql, bool fail) +{ + + this->res = PQexec(conn, sql); + if (!res || PQresultStatus(this->res) != PGRES_COMMAND_OK) { + dblink_res_error(conname, this->res, "could not close cursor", fail); + return "ERROR"; + } + + PQclear(this->res); + + /* if we started a transaction, decrement cursor count */ + if (this->newXactForCursor) { + (this->openCursorCount)--; + + /* if count is zero, commit the transaction */ + if (this->openCursorCount == 0) { + this->newXactForCursor = FALSE; + + this->res = PQexec(this->conn, "COMMIT"); + + if (PQresultStatus(this->res) != PGRES_COMMAND_OK) { + char* errmsg = pstrdup(PQerrorMessage(this->conn)); + if (this->res) { + PQclear(this->res); + } + elog(ERROR, "commit error: %s", errmsg); + } + PQclear(this->res); + } + } + return "OK"; +} + +void PQLinker::getResult(char* conname, FunctionCallInfo fcinfo, char* sql, bool fail) +{ + /* async result retrieval, do it the old way */ + this->res = PQgetResult(this->conn); + + /* NULL means we're all done with the async results */ + if (this->res) { + if (PQresultStatus(this->res) != PGRES_COMMAND_OK && PQresultStatus(this->res) != PGRES_TUPLES_OK) { + dblink_res_error(conname, this->res, "could not execute query", fail); + /* if fail isn't set, we'll return an empty query result */ + } else { + materializeResult(fcinfo, this->conn, this->res); + } + } +} + +void PQLinker::queryResult(ReturnSetInfo* rsinfo, const char* conname, storeInfo* sinfo, const char* sql, bool fail) +{ + PG_TRY(); + { + /* execute query, collecting any tuples into the tuplestore */ + this->res = storeQueryResult(sinfo, this->conn, sql); + + if (!this->res || (PQresultStatus(this->res) != PGRES_COMMAND_OK && PQresultStatus(this->res) != PGRES_TUPLES_OK)) { + /* + * dblink_res_error will clear the passed PGresult, so we need + * this ugly dance to avoid doing so twice during error exit + */ + PGresult* res1 = this->res; + + this->res = NULL; + dblink_res_error(conname, res1, "could not execute query", fail); + /* if fail isn't set, we'll return an empty query result */ + } else if (PQresultStatus(this->res) == PGRES_COMMAND_OK) { + /* + * storeRow didn't get called, so we need to convert the command + * status string to a tuple manually + */ + TupleDesc tupdesc; + AttInMetadata* attinmeta = NULL; + Tuplestorestate* tupstore = NULL; + HeapTuple tuple; + char* values[1]; + MemoryContext oldcontext; + + /* + * need a tuple descriptor representing one TEXT column to return + * the command status string as our result tuple + */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber)1, "status", TEXTOID, -1, 0); + attinmeta = TupleDescGetAttInMetadata(tupdesc); + + oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); + tupstore = tuplestore_begin_heap(true, false, u_sess->attr.attr_memory.work_mem); + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + values[0] = PQcmdStatus(this->res); + + /* build the tuple and put it into the tuplestore. */ + tuple = BuildTupleFromCStrings(attinmeta, values); + tuplestore_puttuple(tupstore, tuple); + + PQclear(this->res); + this->res = NULL; + } else { + Assert(PQresultStatus(this->res) == PGRES_TUPLES_OK); + /* storeRow should have created a tuplestore */ + Assert(rsinfo->setResult != NULL); + + PQclear(this->res); + this->res = NULL; + } + PQclear(sinfo->last_res); + sinfo->last_res = NULL; + PQclear(sinfo->cur_res); + sinfo->cur_res = NULL; + } + PG_CATCH(); + { + /* be sure to release any libpq result we collected */ + PQclear(this->res); + PQclear(sinfo->last_res); + PQclear(sinfo->cur_res); + /* and clear out any pending data in libpq */ + while ((this->res = PQgetResult(this->conn)) != NULL) { + PQclear(this->res); + } + PG_RE_THROW(); + } + PG_END_TRY(); +} + +void PQLinker::fetch(char* conname, FunctionCallInfo fcinfo, const char* sql, bool fail, char* curname) +{ + /* + * Try to execute the query. Note that since libpq uses malloc, the + * PGresult will be long-lived even though we are still in a short-lived + * memory context. + */ + this->res = PQexec(this->conn, sql); + if (!this->res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + dblink_res_error(conname, this->res, "could not fetch from cursor", fail); + return; + } else if (PQresultStatus(this->res) == PGRES_COMMAND_OK) { + /* cursor does not exist - closed already or bad name */ + PQclear(this->res); + ereport(ERROR, (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); + } + + materializeResult(fcinfo, this->conn, this->res); +} + +void PQLinker::getNotify(ReturnSetInfo* rsinfo) +{ + TupleDesc tupdesc; + Tuplestorestate* tupstore = NULL; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + PGnotify* notify = NULL; + + /* create the tuplestore in per-query memory */ + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false); + TupleDescInitEntry(tupdesc, (AttrNumber)1, "notify_name", TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber)2, "be_pid", INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber)3, "extra", TEXTOID, -1, 0); + tupstore = tuplestore_begin_heap(true, false, u_sess->attr.attr_memory.work_mem); + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + PQconsumeInput(this->conn); + while ((notify = PQnotifies(this->conn)) != NULL) { + Datum values[DBLINK_NOTIFY_COLS] = {0}; + bool nulls[DBLINK_NOTIFY_COLS] = {false}; + + if (notify->relname != NULL) { + values[0] = CStringGetTextDatum(notify->relname); + } else { + values[1] = Int32GetDatum(notify->be_pid); + } + + nulls[0] = true; + + if (notify->extra != NULL) { + values[2] = CStringGetTextDatum(notify->extra); + } else { + nulls[2] = true; + } + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + + PQfreemem(notify); + PQconsumeInput(this->conn); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); +} + +ODBCLinker::ODBCLinker(char* connstr_or_name) +{ + SQLINTEGER error = 0; + error = SQLAllocHandle(SQL_HANDLE_ENV,SQL_NULL_HANDLE, &this->envHandle); + if ((error != SQL_SUCCESS) && (error != SQL_SUCCESS_WITH_INFO)) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Error AllocHandle for Environment"))); + } + + SQLSetEnvAttr(this->envHandle, SQL_ATTR_ODBC_VERSION, (void*)SQL_OV_ODBC3, 0); + + error = SQLAllocHandle(SQL_HANDLE_DBC, this->envHandle, &this->connHandle); + if ((error != SQL_SUCCESS) && (error != SQL_SUCCESS_WITH_INFO)) { + SQLFreeHandle(SQL_HANDLE_ENV, this->envHandle); + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("Error AllocHandle for Connect"))); + } + + LinkInfo linfo; + int len = strlen(connstr_or_name); + GetDrivername(connstr_or_name, &linfo); + /* atuo commit is the default value */ + error = SQLConnect(this->connHandle, linfo.drivername, SQL_NTS, + linfo.username, SQL_NTS, linfo.password, SQL_NTS); + errno_t rc = memset_s(connstr_or_name, len, 0, len); + securec_check(rc, "\0", "\0"); + + if ((error != SQL_SUCCESS) && (error != SQL_SUCCESS_WITH_INFO)) { + SQLFreeHandle(SQL_HANDLE_DBC, this->connHandle); + SQLFreeHandle(SQL_HANDLE_ENV, this->envHandle); + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("Error SQLConnect"))); + } +} + +void ODBCLinker::finish() +{ + if (this->stmt != NULL) { + SQLFreeHandle(SQL_HANDLE_STMT, this->connHandle); + } + + if (this->connHandle != NULL) { + SQLDisconnect(this->connHandle); + SQLFreeHandle(SQL_HANDLE_DBC, this->connHandle); + } + + if (this->envHandle != NULL) { + SQLFreeHandle(SQL_HANDLE_ENV, this->envHandle); + } + + pfree(this); + return; +} + +text* ODBCLinker::exec(char* conname, const char* sql, bool fail) +{ + SQLINTEGER error = 0; /* ERROR CODE */ + SQLHSTMT stmt = SQL_NULL_HSTMT; + + error = SQLAllocHandle(SQL_HANDLE_STMT, this->connHandle, &stmt); + if ((error != SQL_SUCCESS) && (error != SQL_SUCCESS_WITH_INFO)) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Error AllocHandle for Statement"))); + } + + SQLSetStmtAttr(stmt, SQL_ATTR_QUERY_TIMEOUT, (SQLPOINTER)0, 0); + + error = SQLExecDirect(stmt, (SQLCHAR*)sql, SQL_NTS); + if (this->stmt != NULL) { + SQLFreeHandle(SQL_HANDLE_STMT, this->stmt); + } + this->stmt = stmt; + if ((error != SQL_SUCCESS) && (error != SQL_SUCCESS_WITH_INFO)) { + char* msg = this->errorMsg(); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Error exec\n%s", msg))); + } + + return cstring_to_text("OK"); +} + +char* ODBCLinker::errorMsg() +{ + if (this->stmt == NULL) { + return NULL; + } + int msgLen = 100; + char* msg = (char*)palloc(sizeof(char) * msgLen); + SQLGetDiagRec(SQL_HANDLE_STMT, this->stmt, 1, NULL, NULL, (SQLCHAR*)msg, 100 ,NULL); + return msg; +} + +int ODBCLinker::isBusy() +{ + SQLINTEGER ret = 0; + SQLCHAR outstr[1024]; + SQLSMALLINT outstrlen; + ret = SQLGetDiagField(SQL_HANDLE_STMT, this->stmt, 0, SQL_ATTR_ASYNC_ENABLE, outstr, 1024, &outstrlen); + if (outstr[0] == 'y') { + return 1; + } else { + return 0; + } +} + +char* ODBCLinker::cancel(PGcancel* cancel) +{ + SQLFreeStmt(this->stmt, SQL_CLOSE); + return "OK"; +} + +int ODBCLinker::sendQuery(char *sql) +{ + SQLINTEGER error = 0; /* ERROR CODE */ + SQLHSTMT stmt = SQL_NULL_HSTMT; + + /* Specify that the statement is to be executed asynchronously. */ + error = SQLAllocHandle(SQL_HANDLE_STMT, this->connHandle, &stmt); + if ((error != SQL_SUCCESS) && (error != SQL_SUCCESS_WITH_INFO)) { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Error AllocHandle for Statement"))); + } + + SQLSetStmtAttr(stmt, SQL_ATTR_ASYNC_ENABLE, (void*)SQL_ASYNC_ENABLE_ON, 0); + + // When the statement has finished executing, retrieve the results. + error = SQLExecDirect(stmt, (SQLCHAR*)sql, SQL_NTS); + if (this->stmt != NULL) { + SQLFreeHandle(SQL_HANDLE_STMT, this->stmt); + } + this->stmt = stmt; + if ((error != SQL_SUCCESS) && (error != SQL_SUCCESS_WITH_INFO)) { + char* msg = this->errorMsg(); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Error exec\n%s", msg))); + } + + return 1; +} + +char* ODBCLinker::open(char* conname, char* sql, bool fail) +{ + this->exec(conname, sql, fail); + return "OK"; +} + +char* ODBCLinker::close(char* conname, char* sql, bool fail) +{ + this->exec(conname, sql, fail); + return "OK"; +} + +void ODBCLinker::getResult(char* conname, FunctionCallInfo fcinfo, char* sql, bool fail) +{ + prepTuplestoreResult(fcinfo); + ReturnSetInfo* rsinfo = (ReturnSetInfo*)fcinfo->resultinfo; + storeInfo sinfo; + bool isFirst = true; + SQLINTEGER error = 0; + SQLSMALLINT nfields = 0; + + /* prepTuplestoreResult must have been called previously */ + Assert(rsinfo->returnMode == SFRM_Materialize); + + /* initialize storeInfo to empty */ + (void)memset_s(&sinfo, sizeof(sinfo), 0, sizeof(sinfo)); + sinfo.fcinfo = fcinfo; + + error = SQLNumResultCols(this->stmt, &nfields); + if ((error != SQL_SUCCESS) && (error != SQL_SUCCESS_WITH_INFO)) { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Error get colum number"))); + } + + SQLLEN lenOut[nfields]; + char** tupdata = (char**)palloc(sizeof(char*) * nfields); + + for (int i = 0; i < nfields; i++) { + SQLColAttribute(this->stmt, i + 1, SQL_DESC_DISPLAY_SIZE, NULL, 0, NULL, &(lenOut[i])); + tupdata[i] = (char*)palloc(sizeof(char) * (lenOut[i] + 1)); + SQLBindCol(this->stmt, i + 1, SQL_C_CHAR, (SQLPOINTER)tupdata[i], MAX_BUF_LEN, &(lenOut[i])); + } + + while (SQLFetch(this->stmt) != SQL_NO_DATA) { + CHECK_FOR_INTERRUPTS(); + ODBCstoreRow(&sinfo, tupdata, lenOut, nfields, isFirst); + isFirst = false; + } + return; +} + +void ODBCLinker::queryResult(ReturnSetInfo* rsinfo, const char* conname, storeInfo* sinfo, const char* sql, bool fail) +{ + bool isFirst = true; + SQLINTEGER error = 0; + SQLSMALLINT nfields = 0; + + this->exec(NULL, sql, true); + + /* get number of colum */ + error = SQLNumResultCols(this->stmt, &nfields); + if ((error != SQL_SUCCESS) && (error != SQL_SUCCESS_WITH_INFO)) { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Error get colum number"))); + } + + /* init data of tuple */ + SQLLEN lenOut[nfields]; + char** tupdata = (char**)palloc(sizeof(char*) * nfields); + + for (int i = 0; i < nfields; i++) { + SQLColAttribute(stmt, i + 1, SQL_DESC_DISPLAY_SIZE, NULL, 0, NULL, &(lenOut[i])); + tupdata[i] = (char*)palloc(sizeof(char) * (lenOut[i] + 1)); + SQLBindCol(stmt, i + 1, SQL_C_CHAR, (SQLPOINTER)tupdata[i], MAX_BUF_LEN, &(lenOut[i])); + } + + while (SQLFetch(stmt) != SQL_NO_DATA) { + CHECK_FOR_INTERRUPTS(); + ODBCstoreRow(sinfo, tupdata, lenOut, nfields, isFirst); + isFirst = false; + } + return; +} + +void ODBCLinker::fetch(char* conname, FunctionCallInfo fcinfo, const char* sql, bool fail, char* curname) +{ + materializeQueryResult(fcinfo, this, NULL, sql, fail); +} + +void ODBCLinker::getNotify(ReturnSetInfo* rsinfo) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("dblink_get_notify not support by odbc"))); +} + /* * Create a persistent connection to another database */ @@ -252,56 +791,57 @@ Datum dblink_connect(PG_FUNCTION_ARGS) { char* conname_or_str = NULL; char* connstr = NULL; - char* connname = NULL; - char* msg = NULL; - PGconn* conn = NULL; + char* conname = NULL; remoteConn* rconn = NULL; - + DBLINK_INIT; if (PG_NARGS() == 2) { conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1)); - connname = text_to_cstring(PG_GETARG_TEXT_PP(0)); - } else if (PG_NARGS() == 1) + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + } else if (PG_NARGS() == 1) { conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); - - if (connname) + } + + if (conname) { rconn = (remoteConn*)MemoryContextAlloc( SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION), sizeof(remoteConn)); - - /* first check for valid foreign data server */ - connstr = get_connect_string(conname_or_str); - if (connstr == NULL) - connstr = conname_or_str; - - /* check password in connection string if not superuser */ - dblink_connstr_check(connstr); - conn = PQconnectdb(connstr); - - if (PQstatus(conn) == CONNECTION_BAD) { - msg = pstrdup(PQerrorMessage(conn)); - PQfinish(conn); - if (rconn) - pfree(rconn); - - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not establish connection"), - errdetail_internal("%s", msg))); } - /* check password actually used if not superuser */ - dblink_security_check(conn, rconn); + /* + * determine odbc or libpq + * if we have driver name , we choose odbc + * otherwise we choose libpq + */ + + if (UseODBCLinker(conname_or_str)) { + /* connect by odbc */ + ODBCLinker* olinker = New(SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION)) ODBCLinker(conname_or_str); - /* attempt to set client encoding to match server encoding */ - PQsetClientEncoding(conn, GetDatabaseEncodingName()); + if (conname) { + rconn->linker = olinker; + createNewConnection(conname, rconn); + } else { + PCONN->linker = olinker; + } + } else { + /* first check for valid foreign data server */ + connstr = get_connect_string(conname_or_str); + if (connstr == NULL) { + connstr = conname_or_str; + } + PQLinker* plinker = New(SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_COMMUNICATION)) PQLinker(connstr); - if (connname) { - rconn->conn = conn; - createNewConnection(connname, rconn); - } else - PCONN->conn = conn; + /* check password in connection string if not superuser */ + dblink_connstr_check(connstr); + if (conname) { + rconn->linker = plinker; + createNewConnection(conname, rconn); + } else { + PCONN->linker = plinker; + } + } PG_RETURN_TEXT_P(cstring_to_text("OK")); } @@ -313,27 +853,33 @@ Datum dblink_disconnect(PG_FUNCTION_ARGS) { char* conname = NULL; remoteConn* rconn = NULL; - PGconn* conn = NULL; + Linker* linker = NULL; DBLINK_INIT; + /* first determine whether it is an unnamed link */ if (PG_NARGS() == 1) { conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; - } else - conn = PCONN->conn; + if (rconn) { + linker = rconn->linker; + } + } else { + linker = PCONN->linker; + } - if (!conn) + if (linker == NULL) { DBLINK_CONN_NOT_AVAIL; + } + + linker->finish(); - PQfinish(conn); if (rconn) { deleteConnection(conname); pfree(rconn); - } else - PCONN->conn = NULL; + } else { + PCONN->linker = NULL; + } PG_RETURN_TEXT_P(cstring_to_text("OK")); } @@ -345,8 +891,7 @@ PG_FUNCTION_INFO_V1(dblink_open); Datum dblink_open(PG_FUNCTION_ARGS) { char* msg = NULL; - PGresult* res = NULL; - PGconn* conn = NULL; + Linker* linker = NULL; char* curname = NULL; char* sql = NULL; char* conname = NULL; @@ -384,40 +929,17 @@ Datum dblink_open(PG_FUNCTION_ARGS) rconn = getConnectionByName(conname); } - if (!rconn || !rconn->conn) + linker = rconn->linker; + if (linker == NULL) { DBLINK_CONN_NOT_AVAIL; - else - conn = rconn->conn; - - /* If we are not in a transaction, start one */ - if (PQtransactionStatus(conn) == PQTRANS_IDLE) { - res = PQexec(conn, "START TRANSACTION"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - DBLINK_RES_INTERNALERROR("begin error"); - PQclear(res); - rconn->newXactForCursor = TRUE; - - /* - * Since transaction state was IDLE, we force cursor count to - * initially be 0. This is needed as a previous ABORT might have wiped - * out our transaction without maintaining the cursor count for us. - */ - rconn->openCursorCount = 0; } - /* if we started a transaction, increment cursor count */ - if (rconn->newXactForCursor) - (rconn->openCursorCount)++; - + /* Assemble sql */ appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql); - res = PQexec(conn, buf.data); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - dblink_res_error(conname, res, "could not open cursor", fail); - PG_RETURN_TEXT_P(cstring_to_text("ERROR")); - } - PQclear(res); - PG_RETURN_TEXT_P(cstring_to_text("OK")); + msg = linker->open(conname, buf.data, fail); + + PG_RETURN_TEXT_P(cstring_to_text(msg)); } /* @@ -426,8 +948,7 @@ Datum dblink_open(PG_FUNCTION_ARGS) PG_FUNCTION_INFO_V1(dblink_close); Datum dblink_close(PG_FUNCTION_ARGS) { - PGconn* conn = NULL; - PGresult* res = NULL; + Linker* linker = NULL; char* curname = NULL; char* conname = NULL; StringInfoData buf; @@ -462,38 +983,17 @@ Datum dblink_close(PG_FUNCTION_ARGS) rconn = getConnectionByName(conname); } - if (!rconn || !rconn->conn) + linker = rconn->linker; + if (linker == NULL) { DBLINK_CONN_NOT_AVAIL; - else - conn = rconn->conn; + } appendStringInfo(&buf, "CLOSE %s", curname); /* close the cursor */ - res = PQexec(conn, buf.data); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - dblink_res_error(conname, res, "could not close cursor", fail); - PG_RETURN_TEXT_P(cstring_to_text("ERROR")); - } + msg = linker->close(conname, buf.data, fail); - PQclear(res); - - /* if we started a transaction, decrement cursor count */ - if (rconn->newXactForCursor) { - (rconn->openCursorCount)--; - - /* if count is zero, commit the transaction */ - if (rconn->openCursorCount == 0) { - rconn->newXactForCursor = FALSE; - - res = PQexec(conn, "COMMIT"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - DBLINK_RES_INTERNALERROR("commit error"); - PQclear(res); - } - } - - PG_RETURN_TEXT_P(cstring_to_text("OK")); + PG_RETURN_TEXT_P(cstring_to_text(msg)); } /* @@ -502,10 +1002,9 @@ Datum dblink_close(PG_FUNCTION_ARGS) PG_FUNCTION_INFO_V1(dblink_fetch); Datum dblink_fetch(PG_FUNCTION_ARGS) { - PGresult* res = NULL; char* conname = NULL; remoteConn* rconn = NULL; - PGconn* conn = NULL; + Linker* linker = NULL; StringInfoData buf; char* curname = NULL; int howmany = 0; @@ -523,53 +1022,42 @@ Datum dblink_fetch(PG_FUNCTION_ARGS) fail = PG_GETARG_BOOL(3); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; + if (rconn) { + linker = rconn->linker; + } } else if (PG_NARGS() == 3) { /* text,text,int or text,int,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID) { curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); howmany = PG_GETARG_INT32(1); fail = PG_GETARG_BOOL(2); - conn = PCONN->conn; + linker = PCONN->linker; } else { conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); curname = text_to_cstring(PG_GETARG_TEXT_PP(1)); howmany = PG_GETARG_INT32(2); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; + if (rconn) { + linker = rconn->linker; + } } } else if (PG_NARGS() == 2) { /* text,int */ curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); howmany = PG_GETARG_INT32(1); - conn = PCONN->conn; + linker = PCONN->linker; } - if (!conn) + if (linker == NULL) { DBLINK_CONN_NOT_AVAIL; + } initStringInfo(&buf); appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); + + linker->fetch(conname, fcinfo, buf.data, fail, curname); - /* - * Try to execute the query. Note that since libpq uses malloc, the - * PGresult will be long-lived even though we are still in a short-lived - * memory context. - */ - res = PQexec(conn, buf.data); - if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { - dblink_res_error(conname, res, "could not fetch from cursor", fail); - return (Datum)0; - } else if (PQresultStatus(res) == PGRES_COMMAND_OK) { - /* cursor does not exist - closed already or bad name */ - PQclear(res); - ereport(ERROR, (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); - } - - materializeResult(fcinfo, conn, res); return (Datum)0; } @@ -586,7 +1074,7 @@ PG_FUNCTION_INFO_V1(dblink_send_query); Datum dblink_send_query(PG_FUNCTION_ARGS) { char* conname = NULL; - PGconn* conn = NULL; + Linker* linker = NULL; char* sql = NULL; remoteConn* rconn = NULL; int retval; @@ -599,9 +1087,7 @@ Datum dblink_send_query(PG_FUNCTION_ARGS) elog(ERROR, "wrong number of arguments"); /* async query send */ - retval = PQsendQuery(conn, sql); - if (retval != 1) - elog(NOTICE, "could not send query: %s", PQerrorMessage(conn)); + retval = linker->sendQuery(sql); PG_RETURN_INT32(retval); } @@ -614,7 +1100,7 @@ Datum dblink_get_result(PG_FUNCTION_ARGS) static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) { - PGconn* volatile conn = NULL; + Linker* linker = NULL; volatile bool freeconn = false; prepTuplestoreResult(fcinfo); @@ -623,7 +1109,6 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) PG_TRY(); { - char* msg = NULL; char* connstr = NULL; char* sql = NULL; char* conname = NULL; @@ -641,7 +1126,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) } else if (is_two) { /* text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = PCONN->conn; + linker = PCONN->linker; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); } else { @@ -650,7 +1135,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) } } else if (is_one) { /* text */ - conn = PCONN->conn; + linker = PCONN->linker; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); } else if (!is_async) { /* shouldn't happen */ @@ -669,39 +1154,32 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) elog(ERROR, "wrong number of arguments"); } - if (!conn) + if (linker == NULL) { DBLINK_CONN_NOT_AVAIL; + } if (!is_async) { /* synchronous query, use efficient tuple collection method */ - materializeQueryResult(fcinfo, conn, conname, sql, fail); + materializeQueryResult(fcinfo, linker, conname, sql, fail); } else { - /* async result retrieval, do it the old way */ - PGresult* res = PQgetResult(conn); - - /* NULL means we're all done with the async results */ - if (res) { - if (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK) { - dblink_res_error(conname, res, "could not execute query", fail); - /* if fail isn't set, we'll return an empty query result */ - } else { - materializeResult(fcinfo, conn, res); - } - } + linker->getResult(conname, fcinfo, sql, fail); } } PG_CATCH(); { /* if needed, close the connection to the database */ - if (freeconn) - PQfinish(conn); + if (freeconn) { + linker->finish(); + } + PG_RE_THROW(); } PG_END_TRY(); /* if needed, close the connection to the database */ - if (freeconn) - PQfinish(conn); + if (freeconn) { + linker->finish(); + } return (Datum)0; } @@ -873,10 +1351,9 @@ static void materializeResult(FunctionCallInfo fcinfo, PGconn* conn, PGresult* r * inside libpq before it gets transferred to the tuplestore. */ static void materializeQueryResult( - FunctionCallInfo fcinfo, PGconn* conn, const char* conname, const char* sql, bool fail) + FunctionCallInfo fcinfo, Linker* linker, const char* conname, const char* sql, bool fail) { ReturnSetInfo* rsinfo = (ReturnSetInfo*)fcinfo->resultinfo; - PGresult* volatile res = NULL; storeInfo sinfo; /* prepTuplestoreResult must have been called previously */ @@ -886,80 +1363,7 @@ static void materializeQueryResult( (void)memset_s(&sinfo, sizeof(sinfo), 0, sizeof(sinfo)); sinfo.fcinfo = fcinfo; - PG_TRY(); - { - /* execute query, collecting any tuples into the tuplestore */ - res = storeQueryResult(&sinfo, conn, sql); - - if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { - /* - * dblink_res_error will clear the passed PGresult, so we need - * this ugly dance to avoid doing so twice during error exit - */ - PGresult* res1 = res; - - res = NULL; - dblink_res_error(conname, res1, "could not execute query", fail); - /* if fail isn't set, we'll return an empty query result */ - } else if (PQresultStatus(res) == PGRES_COMMAND_OK) { - /* - * storeRow didn't get called, so we need to convert the command - * status string to a tuple manually - */ - TupleDesc tupdesc; - AttInMetadata* attinmeta = NULL; - Tuplestorestate* tupstore = NULL; - HeapTuple tuple; - char* values[1]; - MemoryContext oldcontext; - - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber)1, "status", TEXTOID, -1, 0); - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, u_sess->attr.attr_memory.work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); - - values[0] = PQcmdStatus(res); - - /* build the tuple and put it into the tuplestore. */ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); - - PQclear(res); - res = NULL; - } else { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); - /* storeRow should have created a tuplestore */ - Assert(rsinfo->setResult != NULL); - - PQclear(res); - res = NULL; - } - PQclear(sinfo.last_res); - sinfo.last_res = NULL; - PQclear(sinfo.cur_res); - sinfo.cur_res = NULL; - } - PG_CATCH(); - { - /* be sure to release any libpq result we collected */ - PQclear(res); - PQclear(sinfo.last_res); - PQclear(sinfo.cur_res); - /* and clear out any pending data in libpq */ - while ((res = PQgetResult(conn)) != NULL) - PQclear(res); - PG_RE_THROW(); - } - PG_END_TRY(); + linker->queryResult(rsinfo, conname, &sinfo, sql, fail); } /* @@ -1034,82 +1438,7 @@ static void storeRow(storeInfo* sinfo, PGresult* res, bool first) HeapTuple tuple; int i; MemoryContext oldcontext; - - if (first) { - /* Prepare for new result set */ - ReturnSetInfo* rsinfo = (ReturnSetInfo*)sinfo->fcinfo->resultinfo; - TupleDesc tupdesc; - - /* - * It's possible to get more than one result set if the query string - * contained multiple SQL commands. In that case, we follow PQexec's - * traditional behavior of throwing away all but the last result. - */ - if (sinfo->tuplestore) - tuplestore_end(sinfo->tuplestore); - sinfo->tuplestore = NULL; - - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc)) { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } - - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - - /* check result and tuple descriptor have the same number of columns */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); - - /* Prepare attinmeta for later data conversions */ - sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); - - /* Create a new, empty tuplestore */ - oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); - sinfo->tuplestore = tuplestore_begin_heap(true, false, u_sess->attr.attr_memory.work_mem); - rsinfo->setResult = sinfo->tuplestore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); - - /* Done if empty resultset */ - if (PQntuples(res) == 0) - return; - - /* - * Set up sufficiently-wide string pointers array; this won't change - * in size so it's easy to preallocate. - */ - if (sinfo->cstrs) - pfree(sinfo->cstrs); - sinfo->cstrs = (char**)palloc(nfields * sizeof(char*)); - - /* Create short-lived memory context for data conversions */ - if (!sinfo->tmpcontext) - sinfo->tmpcontext = AllocSetContextCreate(CurrentMemoryContext, - "dblink temporary context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - } - - /* Should have a single-row result if we get here */ - Assert(PQntuples(res) == 1); + storeRowInit(sinfo, nfields, first); /* * Do the following work in a temp context that we reset after each tuple. @@ -1118,14 +1447,21 @@ static void storeRow(storeInfo* sinfo, PGresult* res, bool first) */ oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext); + /* Should have a single-row result if we get here */ + Assert(PQntuples(res) == 1); + + /* Done if empty resultset */ + if (PQntuples(res) == 0) + return; /* * Fill cstrs with null-terminated strings of column values. */ for (i = 0; i < nfields; i++) { - if (PQgetisnull(res, 0, i)) + if (PQgetisnull(res, 0, i)) { sinfo->cstrs[i] = NULL; - else + } else { sinfo->cstrs[i] = PQgetvalue(res, 0, i); + } } /* Convert row to a tuple, and add it to the tuplestore */ @@ -1176,14 +1512,13 @@ PG_FUNCTION_INFO_V1(dblink_is_busy); Datum dblink_is_busy(PG_FUNCTION_ARGS) { char* conname = NULL; - PGconn* conn = NULL; + Linker* linker = NULL; remoteConn* rconn = NULL; DBLINK_INIT; DBLINK_GET_NAMED_CONN; - PQconsumeInput(conn); - PG_RETURN_INT32(PQisBusy(conn)); + PG_RETURN_INT32(linker->isBusy()); } /* @@ -1200,24 +1535,18 @@ Datum dblink_is_busy(PG_FUNCTION_ARGS) PG_FUNCTION_INFO_V1(dblink_cancel_query); Datum dblink_cancel_query(PG_FUNCTION_ARGS) { - int res = 0; char* conname = NULL; - PGconn* conn = NULL; + Linker* linker = NULL; remoteConn* rconn = NULL; PGcancel* cancel = NULL; - char errbuf[256]; + char* errbuf; DBLINK_INIT; DBLINK_GET_NAMED_CONN; - cancel = PQgetCancel(conn); - res = PQcancel(cancel, errbuf, 256); - PQfreeCancel(cancel); + errbuf = linker->cancel(cancel); - if (res == 1) - PG_RETURN_TEXT_P(cstring_to_text("OK")); - else - PG_RETURN_TEXT_P(cstring_to_text(errbuf)); + PG_RETURN_TEXT_P(cstring_to_text(errbuf)); } /* @@ -1235,13 +1564,18 @@ Datum dblink_error_message(PG_FUNCTION_ARGS) { char* msg = NULL; char* conname = NULL; - PGconn* conn = NULL; + Linker* linker = NULL; remoteConn* rconn = NULL; DBLINK_INIT; DBLINK_GET_NAMED_CONN; - msg = PQerrorMessage(conn); + if (linker == NULL) { + DBLINK_CONN_NOT_AVAIL; + } + + msg = linker->errorMsg(); + if (msg == NULL || msg[0] == '\0') PG_RETURN_TEXT_P(cstring_to_text("OK")); else @@ -1255,15 +1589,13 @@ PG_FUNCTION_INFO_V1(dblink_exec); Datum dblink_exec(PG_FUNCTION_ARGS) { text* volatile sql_cmd_status = NULL; - PGconn* volatile conn = NULL; + Linker* linker = NULL; volatile bool freeconn = false; DBLINK_INIT; PG_TRY(); { - char* msg = NULL; - PGresult* res = NULL; char* connstr = NULL; char* sql = NULL; char* conname = NULL; @@ -1278,7 +1610,7 @@ Datum dblink_exec(PG_FUNCTION_ARGS) } else if (PG_NARGS() == 2) { /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = PCONN->conn; + linker = PCONN->linker; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); } else { @@ -1287,50 +1619,37 @@ Datum dblink_exec(PG_FUNCTION_ARGS) } } else if (PG_NARGS() == 1) { /* must be single text argument */ - conn = PCONN->conn; + linker = PCONN->linker; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); - } else + } else { /* shouldn't happen */ elog(ERROR, "wrong number of arguments"); + } - if (!conn) + if (linker == NULL) { DBLINK_CONN_NOT_AVAIL; + } + + sql_cmd_status = linker->exec(conname, sql, fail); - res = PQexec(conn, sql); - if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { - dblink_res_error(conname, res, "could not execute command", fail); - - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = cstring_to_text("ERROR"); - } else if (PQresultStatus(res) == PGRES_COMMAND_OK) { - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = cstring_to_text(PQcmdStatus(res)); - PQclear(res); - } else { - PQclear(res); - ereport(ERROR, - (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), - errmsg("statement returning results not allowed"))); + if (freeconn) { + linker->finish(); } } PG_CATCH(); { /* if needed, close the connection to the database */ - if (freeconn) - PQfinish(conn); + if (freeconn) { + linker->finish(); + } PG_RE_THROW(); } PG_END_TRY(); /* if needed, close the connection to the database */ - if (freeconn) - PQfinish(conn); + if (freeconn) { + linker->finish(); + } PG_RETURN_TEXT_P(sql_cmd_status); } @@ -1716,73 +2035,82 @@ Datum dblink_current_query(PG_FUNCTION_ARGS) * connection per default. * */ -#define DBLINK_NOTIFY_COLS 3 PG_FUNCTION_INFO_V1(dblink_get_notify); Datum dblink_get_notify(PG_FUNCTION_ARGS) { char* conname = NULL; - PGconn* conn = NULL; + Linker* linker = NULL; remoteConn* rconn = NULL; - PGnotify* notify = NULL; ReturnSetInfo* rsinfo = (ReturnSetInfo*)fcinfo->resultinfo; - TupleDesc tupdesc; - Tuplestorestate* tupstore = NULL; - MemoryContext per_query_ctx; - MemoryContext oldcontext; prepTuplestoreResult(fcinfo); DBLINK_INIT; if (PG_NARGS() == 1) DBLINK_GET_NAMED_CONN; - else - conn = PCONN->conn; - - /* create the tuplestore in per-query memory */ - per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; - oldcontext = MemoryContextSwitchTo(per_query_ctx); - - tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS, false); - TupleDescInitEntry(tupdesc, (AttrNumber)1, "notify_name", TEXTOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber)2, "be_pid", INT4OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber)3, "extra", TEXTOID, -1, 0); - - tupstore = tuplestore_begin_heap(true, false, u_sess->attr.attr_memory.work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - - MemoryContextSwitchTo(oldcontext); - - PQconsumeInput(conn); - while ((notify = PQnotifies(conn)) != NULL) { - Datum values[DBLINK_NOTIFY_COLS] = {0}; - bool nulls[DBLINK_NOTIFY_COLS] = {false}; - - if (notify->relname != NULL) - values[0] = CStringGetTextDatum(notify->relname); - else - nulls[0] = true; - - values[1] = Int32GetDatum(notify->be_pid); - - if (notify->extra != NULL) - values[2] = CStringGetTextDatum(notify->extra); - else - nulls[2] = true; - - tuplestore_putvalues(tupstore, tupdesc, values, nulls); - - PQfreemem(notify); - PQconsumeInput(conn); + else { + linker = PCONN->linker; } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); + linker->getNotify(rsinfo); return (Datum)0; } +PG_FUNCTION_INFO_V1(dblink_get_drivername); +Datum dblink_get_drivername(PG_FUNCTION_ARGS) +{ + if (!superuser()) { + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("Only system admin can use this function.")))); + } + + char* odbcini = getenv("ODBCINI"); + StringInfoData res; + initStringInfo(&res); + char* buf = (char*)palloc(sizeof(char) * MAX_DRIVERNAME_LEN); + /* The character of the cursor */ + char c; + /* Buffer string index */ + int i = 0; + bool first = true; + + FILE* file = fopen(odbcini,"r"); + if (!file) { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("can not open file"))); + } + + /* Traversing files,get drivername */ + while ((c = fgetc(file)) != EOF) { + if (c == '[') { + i = 0; + buf[0] = '\0'; + c = fgetc(file); + while (c != ']') { + buf[i] = c; + i++; + if ((c = fgetc(file)) == EOF || i >= MAX_DRIVERNAME_LEN) { + break; + } + } + buf[i] = '\0'; + + if (first) { + appendStringInfo(&res, "%s", buf); + first = false; + } else { + appendStringInfo(&res, ",%s", buf); + } + } + } + fclose(file); + PG_RETURN_TEXT_P(cstring_to_text(res.data)); +} + /************************************************************* * internal functions */ @@ -2277,7 +2605,7 @@ static void createNewConnection(const char* name, remoteConn* rconn) hentry = (remoteConnHashEnt*)hash_search(REMOTE_CONN_HASH, key, HASH_ENTER, &found); if (found) { - PQfinish(rconn->conn); + rconn->linker->finish(); pfree(rconn); ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("duplicate connection name"))); @@ -2304,13 +2632,11 @@ static void deleteConnection(const char* name) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("undefined connection name"))); } -static void dblink_security_check(PGconn* conn, remoteConn* rconn) +static void dblink_security_check(PGconn* conn) { if (!superuser()) { if (!PQconnectionUsedPassword(conn)) { PQfinish(conn); - if (rconn) - pfree(rconn); ereport(ERROR, (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), @@ -2598,3 +2924,147 @@ static void restoreLocalGucs(int nestlevel) if (nestlevel > 0) AtEOXact_GUC(true, nestlevel); } + +/* +* Link by odbc +*/ + +static void GetDrivername(char* connstr_or_name, LinkInfo* linfo) +{ + char* p; + p = strtok(connstr_or_name, " "); + while(p != NULL) { + if(strstr(p,"drivername=")){ + linfo->drivername = (SQLCHAR*)(p + 11); + } else if(strstr(p,"user=")) { + linfo->username = (SQLCHAR*)(p + 5); + } else if(strstr(p,"password=")) { + linfo->password = (SQLCHAR*)(p + 9); + } + p = strtok(NULL," "); + } + + if (linfo->username == NULL || linfo->password == NULL) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Error connect string"))); + } + return; +} + +static bool UseODBCLinker(char* connstr) +{ + if (strstr(connstr, "drivername=") == NULL) { + return false; + } else { + return true; + } +} + +static void ODBCstoreRow(storeInfo* sinfo, char** tupdata, SQLLEN* lenOut, SQLSMALLINT nfields, bool isFirst) +{ + HeapTuple tuple; + int i; + MemoryContext oldcontext; + storeRowInit(sinfo, nfields, isFirst); + + /* + * Do the following work in a temp context that we reset after each tuple. + * This cleans up not only the data we have direct access to, but any + * cruft the I/O functions might leak. + */ + oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext); + + /* + * Fill cstrs with null-terminated strings of column values. + */ + for (i = 0; i < nfields; i++) { + if (lenOut[i] == -1) { + sinfo->cstrs[i] = NULL; + } else { + sinfo->cstrs[i] = tupdata[i]; + } + } + + /* Convert row to a tuple, and add it to the tuplestore */ + tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs); + + tuplestore_puttuple(sinfo->tuplestore, tuple); + + /* Clean up */ + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(sinfo->tmpcontext); +} + +static void storeRowInit(storeInfo* sinfo, int nfields, bool first) +{ + MemoryContext oldcontext; + if (first) { + /* Prepare for new result set */ + ReturnSetInfo* rsinfo = (ReturnSetInfo*)sinfo->fcinfo->resultinfo; + TupleDesc tupdesc; + /* + * It's possible to get more than one result set if the query string + * contained multiple SQL commands. In that case, we follow PQexec's + * traditional behavior of throwing away all but the last result. + */ + if (sinfo->tuplestore) { + tuplestore_end(sinfo->tuplestore); + } + sinfo->tuplestore = NULL; + + /* get a tuple descriptor for our result type */ + switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc)) { + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + /* check result and tuple descriptor have the same number of columns */ + if (nfields != tupdesc->natts) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + + /* Prepare attinmeta for later data conversions */ + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + + /* Create a new, empty tuplestore */ + oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory); + sinfo->tuplestore = tuplestore_begin_heap(true, false, u_sess->attr.attr_memory.work_mem); + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + /* + * Set up sufficiently-wide string pointers array; this won't change + * in size so it's easy to preallocate. + */ + if (sinfo->cstrs) + pfree(sinfo->cstrs); + sinfo->cstrs = (char**)palloc(nfields * sizeof(char*)); + + /* Create short-lived memory context for data conversions */ + if (!sinfo->tmpcontext) + sinfo->tmpcontext = AllocSetContextCreate(CurrentMemoryContext, + "dblink temporary context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + } +} \ No newline at end of file diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h index 5f4703c27..aa7490772 100644 --- a/contrib/dblink/dblink.h +++ b/contrib/dblink/dblink.h @@ -35,6 +35,113 @@ #define DBLINK_H #include "fmgr.h" +#include "sql.h" +#include "sqlext.h" + + +typedef struct ODBCconn { + SQLHENV envHandle; /* Handle ODBC environment */ + SQLHDBC connHandle; /* Handle connection */ + SQLHSTMT stmt; /* Handle sql */ +} ODBCconn; + +typedef struct storeInfo { + FunctionCallInfo fcinfo; + Tuplestorestate* tuplestore; + AttInMetadata* attinmeta; + MemoryContext tmpcontext; + char** cstrs; + /* temp storage for results to avoid leaks on exception */ + PGresult* last_res; + PGresult* cur_res; +} storeInfo; + +typedef struct LinkInfo { + SQLCHAR* username; /* odbc username */ + SQLCHAR* password; /* odbc password */ + SQLCHAR* drivername; /* odbc driver name */ +} LinkInfo; + +class Linker : public BaseObject { +public: + virtual void finish() = 0; + virtual text* exec(char* conname, const char* sql, bool fail) = 0; + virtual char* errorMsg() = 0; + virtual int isBusy() = 0; + virtual char* cancel(PGcancel* cancel) = 0; + virtual int sendQuery(char* sql) = 0; + virtual char* open(char* conname, char* sql, bool fail) = 0; + virtual char* close(char* conname, char* sql, bool fail) = 0; + virtual void getResult(char* conname, FunctionCallInfo fcinfo, char* sql, bool fail) = 0; + virtual void queryResult(ReturnSetInfo* rsinfo, const char* conname, storeInfo* sinfo, const char* sql, bool fail) = 0; + virtual void fetch(char* conname, FunctionCallInfo fcinfo, const char* sql, bool fail, char* curname) = 0; + virtual void getNotify(ReturnSetInfo* rsinfo) = 0; +}; + +typedef struct remoteConn { + Linker* linker; +} remoteConn; + +typedef struct dblink_session_context { + remoteConn* pconn; + HTAB* remoteConnHash; +} dblink_session_context; + +/* + * Following is list that holds multiple remote connections. + * Calling convention of each dblink function changes to accept + * connection name as the first parameter. The connection list is + * much like ecpg e.g. a mapping between a name and a PGconn object. + */ + +typedef struct remoteConnHashEnt { + char name[NAMEDATALEN]; + remoteConn* rconn; +} remoteConnHashEnt; + +class PQLinker : public Linker { +public: + PGconn* conn; + PGresult* res; + int openCursorCount; /* The number of open cursors */ + bool newXactForCursor; /* Opened a transaction for a cursor */ +public: + PQLinker(char* connstr); + void finish(); + text* exec(char* conname, const char* sql, bool fail); + char* errorMsg(); + int isBusy(); + char* cancel(PGcancel* cancel); + int sendQuery(char* sql); + char* open(char* conname, char* sql, bool fail); + char* close(char* conname, char* sql, bool fail); + void getResult(char* conname, FunctionCallInfo fcinfo, char* sql, bool fail); + void queryResult(ReturnSetInfo* rsinfo, const char* conname, storeInfo* sinfo, const char* sql, bool fail); + void fetch(char* conname, FunctionCallInfo fcinfo, const char* sql, bool fail, char* curname); + void getNotify(ReturnSetInfo* rsinfo); +}; + + +class ODBCLinker : public Linker { +public: + SQLHENV envHandle; /* Handle ODBC environment */ + SQLHDBC connHandle; /* Handle connection */ + SQLHSTMT stmt; /* Handle sql */ +public: + ODBCLinker(char* connstr_or_name); + void finish(); + text* exec(char* conname, const char* sql, bool fail); + char* errorMsg(); + int isBusy(); + char* cancel(PGcancel* cancel); + int sendQuery(char* sql); + char* open(char* conname, char* sql, bool fail); + char* close(char* conname, char* sql, bool fail); + void getResult(char* conname, FunctionCallInfo fcinfo, char* sql, bool fail); + void queryResult(ReturnSetInfo* rsinfo, const char* conname, storeInfo* sinfo, const char* sql, bool fail); + void fetch(char* conname, FunctionCallInfo fcinfo, const char* sql, bool fail, char* curname); + void getNotify(ReturnSetInfo* rsinfo); +}; /* * External declarations @@ -58,6 +165,7 @@ extern "C" Datum dblink_build_sql_delete(PG_FUNCTION_ARGS); extern "C" Datum dblink_build_sql_update(PG_FUNCTION_ARGS); extern "C" Datum dblink_current_query(PG_FUNCTION_ARGS); extern "C" Datum dblink_get_notify(PG_FUNCTION_ARGS); +extern "C" Datum dblink_get_drivername(PG_FUNCTION_ARGS); extern "C" void set_extension_index(uint32 index); extern "C" void init_session_vars(void); diff --git a/src/gausskernel/Makefile b/src/gausskernel/Makefile index 5202a277e..6ee39aa1a 100755 --- a/src/gausskernel/Makefile +++ b/src/gausskernel/Makefile @@ -622,6 +622,7 @@ endif cp $(SECUREDYNAMICLIB_HOME)/libsecurec* '$(DESTDIR)$(libdir)/' ifneq (, $(findstring __USE_NUMA, $(CFLAGS))) cp $(ZSTD_LIB_PATH)/libzstd* '$(DESTDIR)$(libdir)/' + cp $(LIBODBC_LIB_PATH)/libodbc* '$(DESTDIR)$(libdir)/' cp $(NUMA_LIB_PATH)/* '$(DESTDIR)$(libdir)/' endif ifeq ($(enable_mot), yes) @@ -685,7 +686,7 @@ ifeq ($(enable_lite_mode), no) endif endif cp '$(ZSTD_LIB_PATH)'/libzstd.so* '$(DESTDIR)$(libdir)/' - + cp '$(LIBODBC_LIB_PATH)'/libodbc* '$(DESTDIR)$(libdir)/' cp -d '$(LIBCURL_LIB_PATH)'/libcurl.so* '$(DESTDIR)$(libdir)/' endif