Skip to content

[PGPRO-6693] Checking the result of shm_mq_send (according to Svace). #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions pg_query_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ static List *GetRemoteBackendQueryStates(PGPROC *leader,
ExplainFormat format);

/* Shared memory variables */
shm_toc *toc = NULL;
shm_toc *toc = NULL;
RemoteUserIdResult *counterpart_userid = NULL;
pg_qs_params *params = NULL;
shm_mq *mq = NULL;
pg_qs_params *params = NULL;
shm_mq *mq = NULL;

/*
* Estimate amount of shared memory needed.
Expand Down Expand Up @@ -208,7 +208,7 @@ _PG_init(void)
|| UserIdPollReason == INVALID_PROCSIGNAL)
{
ereport(WARNING, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("pg_query_state isn't loaded: insufficient custom ProcSignal slots")));
errmsg("pg_query_state isn't loaded: insufficient custom ProcSignal slots")));
return;
}

Expand Down Expand Up @@ -435,7 +435,7 @@ deserialize_stack(char *src, int stack_depth)
{
List *result = NIL;
char *curr_ptr = src;
int i;
int i;

for (i = 0; i < stack_depth; i++)
{
Expand Down Expand Up @@ -599,10 +599,10 @@ pg_query_state(PG_FUNCTION_ARGS)
/* print warnings if exist */
if (msg->warnings & TIMINIG_OFF_WARNING)
ereport(WARNING, (errcode(ERRCODE_WARNING),
errmsg("timing statistics disabled")));
errmsg("timing statistics disabled")));
if (msg->warnings & BUFFERS_OFF_WARNING)
ereport(WARNING, (errcode(ERRCODE_WARNING),
errmsg("buffers statistics disabled")));
errmsg("buffers statistics disabled")));

oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

Expand Down Expand Up @@ -864,6 +864,7 @@ SendBgWorkerPids(void)
int i;
shm_mq_handle *mqh;
LOCKTAG tag;
shm_mq_result result;

LockShmem(&tag, PG_QS_SND_KEY);

Expand Down Expand Up @@ -893,10 +894,15 @@ SendBgWorkerPids(void)
}

#if PG_VERSION_NUM < 150000
shm_mq_send(mqh, msg_len, msg, false);
result = shm_mq_send(mqh, msg_len, msg, false);
#else
shm_mq_send(mqh, msg_len, msg, false, true);
result = shm_mq_send(mqh, msg_len, msg, false, true);
#endif

/* Check for failure. */
if(result == SHM_MQ_DETACHED)
elog(WARNING, "could not send message queue to shared-memory queue: receiver has been detached");

UnlockShmem(&tag);
}

Expand Down Expand Up @@ -953,10 +959,10 @@ GetRemoteBackendWorkers(PGPROC *proc)

signal_error:
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("invalid send signal")));
errmsg("invalid send signal")));
mq_error:
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("error in message queue data transmitting")));
errmsg("error in message queue data transmitting")));

return NIL;
}
Expand All @@ -974,12 +980,12 @@ static shm_mq_result
receive_msg_by_parts(shm_mq_handle *mqh, Size *total, void **datap,
int64 timeout, int *rc, bool nowait)
{
shm_mq_result mq_receive_result;
shm_mq_msg *buff;
int offset;
Size *expected;
Size expected_data;
Size len;
shm_mq_result mq_receive_result;
shm_mq_msg *buff;
int offset;
Size *expected;
Size expected_data;
Size len;

/* Get the expected number of bytes in message */
mq_receive_result = shm_mq_receive(mqh, &len, (void **) &expected, nowait);
Expand Down Expand Up @@ -1107,7 +1113,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
mqh = shm_mq_attach(mq, NULL, NULL);
elog(DEBUG1, "Wait response from leader %d", leader->pid);
mq_receive_result = receive_msg_by_parts(mqh, &len, (void **) &msg,
0, NULL, false);
0, NULL, false);
if (mq_receive_result != SHM_MQ_SUCCESS)
goto mq_error;
if (msg->reqid != reqid)
Expand All @@ -1126,7 +1132,7 @@ GetRemoteBackendQueryStates(PGPROC *leader,
*/
foreach(iter, alive_procs)
{
PGPROC *proc = (PGPROC *) lfirst(iter);
PGPROC *proc = (PGPROC *) lfirst(iter);

/* prepare message queue to transfer data */
elog(DEBUG1, "Wait response from worker %d", proc->pid);
Expand Down Expand Up @@ -1166,15 +1172,15 @@ GetRemoteBackendQueryStates(PGPROC *leader,

signal_error:
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("invalid send signal")));
errmsg("invalid send signal")));
mq_error:
#if PG_VERSION_NUM < 100000
shm_mq_detach(mq);
#else
shm_mq_detach(mqh);
#endif
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("error in message queue data transmitting")));
errmsg("error in message queue data transmitting")));

return NIL;
}
Expand Down
12 changes: 6 additions & 6 deletions signal_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,12 @@ send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data)
void
SendQueryState(void)
{
shm_mq_handle *mqh;
instr_time start_time;
instr_time cur_time;
int64 delay = MAX_SND_TIMEOUT;
int reqid = params->reqid;
LOCKTAG tag;
shm_mq_handle *mqh;
instr_time start_time;
instr_time cur_time;
int64 delay = MAX_SND_TIMEOUT;
int reqid = params->reqid;
LOCKTAG tag;

INSTR_TIME_SET_CURRENT(start_time);

Expand Down