Skip to content

Commit 5fe8013

Browse files
author
Andrei Krichinin
committed
Add some log and fix COPY BOTH mode
1 parent e00f486 commit 5fe8013

File tree

3 files changed

+36
-4
lines changed

3 files changed

+36
-4
lines changed

src/dmq.c

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ dmq_sender_main(Datum main_arg)
809809
switch (conns[conn_id].state)
810810
{
811811
case Idle:
812-
Assert(false);
812+
// Assert(false);
813813
break;
814814

815815
/*
@@ -908,10 +908,23 @@ dmq_sender_main(Datum main_arg)
908908
if (!PQisBusy(conns[conn_id].pgconn))
909909
{
910910
int8 mask_pos = conns[conn_id].mask_pos;
911+
PGresult *result = PQgetResult(conns[conn_id].pgconn);
911912

912-
/*
913-
* XXX check here that dmq_receiver_loop not failed?
914-
*/
913+
if (!result) {
914+
mtm_log(ERROR, "[DMQ] PQgetResult returned NULL");
915+
}
916+
917+
mtm_log(DmqStateIntermediate, "PQgetResult status: %d", PQresultStatus(result));
918+
919+
if (PQresultStatus(result) != PGRES_COPY_BOTH) {
920+
mtm_log(DmqStateFinal, "[DMQ] wrong response from dmq receiver '%s': '%s'; '%s'",
921+
conns[conn_id].receiver_name,
922+
PQresStatus(PQresultStatus(result)),
923+
PQerrorMessage(conns[conn_id].pgconn));
924+
conns[conn_id].state = Idle;
925+
dmq_state->sconn_cnt[conns[conn_id].mask_pos] = DMQSCONN_DEAD;
926+
break;
927+
}
915928

916929
conns[conn_id].state = Active;
917930
DeleteWaitEvent(set, event.pos);

src/multimaster.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252

5353
#include "compat.h"
5454

55+
#include "time.h"
56+
5557
typedef enum
5658
{
5759
MTM_STATE_LOCK_ID
@@ -2040,14 +2042,27 @@ gather(nodemask_t participants,
20402042
gather_hook_t msg_ok, Datum msg_ok_arg,
20412043
int *sendconn_cnt, uint64 gen_num)
20422044
{
2045+
time_t start;
20432046
*msg_count = 0;
2047+
2048+
start = time(NULL);
2049+
2050+
//elog(LOG, "----> gather 1");
20442051
while (participants != 0)
20452052
{
20462053
bool ret;
20472054
int8 sender_mask_pos;
20482055
StringInfoData msg;
20492056
int rc;
20502057
bool wait;
2058+
time_t current;
2059+
2060+
current = time(NULL);
2061+
2062+
if (current - start > 5) {
2063+
elog(LOG, "----> gather timeout");
2064+
// return false;
2065+
}
20512066

20522067
ret = dmq_pop_nb(&sender_mask_pos, &msg, participants, &wait);
20532068
if (ret)
@@ -2102,6 +2117,7 @@ gather(nodemask_t participants,
21022117
}
21032118

21042119
}
2120+
//elog(LOG, "----> gather 2");
21052121
return true;
21062122
}
21072123

src/pglogical_proto.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
175175
MtmLastRelId = InvalidOid;
176176
MtmCurrentXid = txn->xid;
177177
DDLInProgress = false;
178+
elog(LOG, "---> pglogical_write_begin: false");
178179

179180
pq_sendbyte(out, 'B'); /* BEGIN */
180181
pq_sendint(out, hooks_data->cfg->my_node_id, 4);
@@ -253,6 +254,7 @@ pglogical_write_message(StringInfo out, LogicalDecodingContext *ctx,
253254

254255
case 'D':
255256
DDLInProgress = true;
257+
elog(LOG, "---> pglogical_write_message: true %d: %s",hooks_data->receiver_node_id, message);
256258
mtm_log(ProtoTraceMessage, "Sent tx DDL message to node %d: %s",
257259
hooks_data->receiver_node_id, message);
258260
break;
@@ -266,6 +268,7 @@ pglogical_write_message(StringInfo out, LogicalDecodingContext *ctx,
266268

267269
case 'E':
268270
DDLInProgress = false;
271+
elog(LOG, "---> pglogical_write_message: false");
269272

270273
/*
271274
* we use End message only as indicator of DDL transaction finish,

0 commit comments

Comments
 (0)