Skip to content

Commit fa17b17

Browse files
committed
Support ANALYZE for VOPS FDW
1 parent 6e9542c commit fa17b17

File tree

3 files changed

+166
-22
lines changed

3 files changed

+166
-22
lines changed

deparse.c

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ typedef struct deparse_expr_cxt
8585
*/
8686
static bool foreign_expr_walker(Node *node,
8787
foreign_glob_cxt *glob_cxt);
88-
static char *deparse_type_name(Oid type_oid, int32 typemod);
8988

9089
/*
9190
* Functions to construct string representation of a node tree.
@@ -107,7 +106,6 @@ static void deparseReturningList(StringInfo buf, PlannerInfo *root,
107106
List **retrieved_attrs);
108107
static void deparseColumnRef(StringInfo buf, int varno, int varattno,
109108
PlannerInfo *root, bool qualify_col);
110-
static void deparseRelation(StringInfo buf, Relation rel);
111109
static void deparseExpr(Expr *expr, deparse_expr_cxt *context);
112110
static void deparseVar(Var *node, deparse_expr_cxt *context);
113111
static void deparseConst(Const *node, deparse_expr_cxt *context, int showtype);
@@ -388,7 +386,7 @@ foreign_expr_walker(Node *node,
388386
* type names that are not in pg_catalog. We assume here that built-in types
389387
* are all in pg_catalog and need not be qualified; otherwise, qualify.
390388
*/
391-
static char *
389+
char *
392390
deparse_type_name(Oid type_oid, int32 typemod)
393391
{
394392
if (type_oid < FirstBootstrapObjectId)
@@ -1381,7 +1379,7 @@ deparseColumnRef(StringInfo buf, int varno, int varattno, PlannerInfo *root,
13811379
* Use value of table_name FDW option (if any) instead of relation's name.
13821380
* Similarly, schema_name FDW option overrides schema name.
13831381
*/
1384-
static void
1382+
void
13851383
deparseRelation(StringInfo buf, Relation rel)
13861384
{
13871385
ForeignTable *table;

vops_fdw.c

Lines changed: 162 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
127127
RelOptInfo *output_rel);
128128
static bool postgresIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
129129
RangeTblEntry *rte);
130+
static bool postgresAnalyzeForeignTable(Relation relation,
131+
AcquireSampleRowsFunc *func,
132+
BlockNumber *totalpages);
130133
/*
131134
* Helper functions
132135
*/
@@ -141,6 +144,10 @@ static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel);
141144
static void add_foreign_grouping_paths(PlannerInfo *root,
142145
RelOptInfo *input_rel,
143146
RelOptInfo *grouped_rel);
147+
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
148+
HeapTuple *rows, int targrows,
149+
double *totalrows,
150+
double *totaldeadrows);
144151

145152

146153
/*
@@ -162,6 +169,9 @@ vops_fdw_handler(PG_FUNCTION_ARGS)
162169
routine->EndForeignScan = postgresEndForeignScan;
163170
routine->IsForeignScanParallelSafe = postgresIsForeignScanParallelSafe;
164171

172+
/* Support functions for ANALYZE */
173+
routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
174+
165175
/* Support functions for EXPLAIN */
166176
routine->ExplainForeignScan = postgresExplainForeignScan;
167177

@@ -213,6 +223,32 @@ vops_fdw_validator(PG_FUNCTION_ARGS)
213223
PG_RETURN_VOID();
214224
}
215225

226+
227+
static Relation open_vops_relation(ForeignTable* table)
228+
{
229+
ListCell *lc;
230+
char *nspname = NULL;
231+
char *relname = NULL;
232+
RangeVar *rv;
233+
234+
235+
foreach(lc, table->options)
236+
{
237+
DefElem *def = (DefElem *) lfirst(lc);
238+
239+
if (strcmp(def->defname, "schema_name") == 0)
240+
nspname = defGetString(def);
241+
else if (strcmp(def->defname, "table_name") == 0)
242+
relname = defGetString(def);
243+
}
244+
Assert(relname != NULL);
245+
if (nspname == NULL) {
246+
nspname = get_namespace_name(get_rel_namespace(table->relid));
247+
}
248+
rv = makeRangeVar(nspname, relname, -1);
249+
return heap_openrv_extended(rv, RowExclusiveLock, false);
250+
}
251+
216252
/*
217253
* postgresGetForeignRelSize
218254
* Estimate # of rows and width of the result of the scan
@@ -225,13 +261,12 @@ postgresGetForeignRelSize(PlannerInfo *root,
225261
RelOptInfo *baserel,
226262
Oid foreigntableid)
227263
{
228-
PgFdwRelationInfo *fpinfo;
229264
ListCell *lc;
265+
PgFdwRelationInfo *fpinfo;
230266
RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
231267
char *nspname = NULL;
232268
char *relname = NULL;
233269
char *refname = NULL;
234-
RangeVar *rv;
235270
Relation fdw_rel;
236271
Relation vops_rel;
237272
TupleDesc fdw_tupdesc;
@@ -251,29 +286,16 @@ postgresGetForeignRelSize(PlannerInfo *root,
251286
/* Look up foreign-table catalog info. */
252287
fpinfo->table = GetForeignTable(foreigntableid);
253288
fpinfo->server = GetForeignServer(fpinfo->table->serverid);
254-
255-
foreach(lc, fpinfo->table->options)
256-
{
257-
DefElem *def = (DefElem *) lfirst(lc);
258-
259-
if (strcmp(def->defname, "schema_name") == 0)
260-
nspname = defGetString(def);
261-
else if (strcmp(def->defname, "table_name") == 0)
262-
relname = defGetString(def);
263-
}
264-
Assert(relname != NULL);
265-
if (nspname == NULL) {
266-
nspname = get_namespace_name(get_rel_namespace(foreigntableid));
267-
}
289+
290+
Assert(foreigntableid == fpinfo->table->relid);
268291

269292
/*
270293
* Build mappnig with VOPS table
271294
*/
272295
fpinfo->tile_attrs = NULL;
273296
fpinfo->vops_attrs = NULL;
274297

275-
rv = makeRangeVar(nspname, relname, -1);
276-
vops_rel = heap_openrv_extended(rv, RowExclusiveLock, false);
298+
vops_rel = open_vops_relation(fpinfo->table);
277299
fdw_rel = heap_open(rte->relid, NoLock);
278300

279301
estimate_rel_size(vops_rel, baserel->attr_widths,
@@ -1335,3 +1357,125 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
13351357
/* Add generated path into grouped_rel by add_path(). */
13361358
add_path(grouped_rel, (Path *) grouppath);
13371359
}
1360+
1361+
/*
1362+
* postgresAnalyzeForeignTable
1363+
* Test whether analyzing this foreign table is supported
1364+
*/
1365+
static bool
1366+
postgresAnalyzeForeignTable(Relation relation,
1367+
AcquireSampleRowsFunc *func,
1368+
BlockNumber *totalpages)
1369+
{
1370+
ForeignTable *table;
1371+
Relation vops_rel;
1372+
double tuples;
1373+
double allvisfrac;
1374+
1375+
/* Return the row-analysis function pointer */
1376+
*func = postgresAcquireSampleRowsFunc;
1377+
1378+
table = GetForeignTable(RelationGetRelid(relation));
1379+
vops_rel = open_vops_relation(table);
1380+
estimate_rel_size(vops_rel, NULL, totalpages, &tuples, &allvisfrac);
1381+
heap_close(vops_rel, RowExclusiveLock);
1382+
1383+
return true;
1384+
}
1385+
1386+
/*
1387+
* Acquire a random sample of rows from VOPS table
1388+
*/
1389+
static int
1390+
postgresAcquireSampleRowsFunc(Relation relation, int elevel,
1391+
HeapTuple *rows, int targrows,
1392+
double *totalrows,
1393+
double *totaldeadrows)
1394+
{
1395+
TupleDesc tupdesc = RelationGetDescr(relation);
1396+
StringInfoData sql;
1397+
StringInfoData record;
1398+
double samplerows;
1399+
Portal portal;
1400+
int i;
1401+
bool first = true;
1402+
char*colname;
1403+
1404+
SPI_connect();
1405+
1406+
initStringInfo(&sql);
1407+
initStringInfo(&record);
1408+
appendStringInfoString(&sql, "SELECT ");
1409+
1410+
for (i = 0; i < tupdesc->natts; i++)
1411+
{
1412+
/* Ignore dropped columns. */
1413+
if (tupdesc->attrs[i]->attisdropped)
1414+
continue;
1415+
if (!first) {
1416+
appendStringInfoString(&record, ", ");
1417+
appendStringInfoString(&sql, ", ");
1418+
}
1419+
first = false;
1420+
1421+
/* Use attribute name or column_name option. */
1422+
colname = NameStr(tupdesc->attrs[i]->attname);
1423+
appendStringInfoString(&sql, "r.");
1424+
appendStringInfoString(&sql, quote_identifier(colname));
1425+
1426+
appendStringInfo(&record, "%s %s", quote_identifier(colname), deparse_type_name(tupdesc->attrs[i]->atttypid, tupdesc->attrs[i]->atttypmod));
1427+
}
1428+
appendStringInfoString(&sql, " FROM ");
1429+
deparseRelation(&sql, relation);
1430+
appendStringInfo(&sql, " t,unnest(t) r(%s)", record.data);
1431+
1432+
portal = SPI_cursor_open_with_args(NULL, sql.data, 0, NULL, NULL, NULL, true, 0);
1433+
1434+
/* First targrows rows are always included into the sample */
1435+
SPI_cursor_fetch(portal, true, targrows);
1436+
for (i = 0; i < SPI_processed; i++)
1437+
{
1438+
rows[i] = SPI_copytuple(SPI_tuptable->vals[i]);
1439+
}
1440+
samplerows = i;
1441+
1442+
if (i == targrows)
1443+
{
1444+
ReservoirStateData rstate; /* state for reservoir sampling */
1445+
double rowstoskip = -1; /* -1 means not set yet */
1446+
1447+
reservoir_init_selection_state(&rstate, targrows);
1448+
1449+
while (true)
1450+
{
1451+
SPI_freetuptable(SPI_tuptable);
1452+
SPI_cursor_fetch(portal, true, 1);
1453+
if (!SPI_processed) {
1454+
break;
1455+
}
1456+
samplerows += 1;
1457+
if (rowstoskip < 0) {
1458+
rowstoskip = reservoir_get_next_S(&rstate, samplerows, targrows);
1459+
}
1460+
if (rowstoskip <= 0)
1461+
{
1462+
/* Choose a random reservoir element to replace. */
1463+
int pos = (int) (targrows * sampler_random_fract(rstate.randstate));
1464+
Assert(pos >= 0 && pos < targrows);
1465+
SPI_freetuple(rows[pos]);
1466+
rows[pos] = SPI_copytuple(SPI_tuptable->vals[0]);
1467+
}
1468+
rowstoskip -= 1;
1469+
}
1470+
}
1471+
SPI_cursor_close(portal);
1472+
SPI_finish();
1473+
1474+
/* We assume that we have no dead tuple. */
1475+
*totaldeadrows = 0.0;
1476+
1477+
/* We've retrieved all living tuples from foreign server. */
1478+
*totalrows = samplerows;
1479+
1480+
return i;
1481+
}

vops_fdw.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,5 +146,7 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root,
146146
RelOptInfo *foreignrel, List *tlist,
147147
List *remote_conds, List *pathkeys,
148148
List **retrieved_attrs, List **params_list);
149+
extern char *deparse_type_name(Oid type_oid, int32 typemod);
150+
extern void deparseRelation(StringInfo buf, Relation rel);
149151

150152
#endif /* POSTGRES_FDW_H */

0 commit comments

Comments
 (0)