65
65
#include "funcapi.h"
66
66
#include "libpq/libpq.h"
67
67
#include "libpq/pqformat.h"
68
+ #include "libpq/protocol.h"
68
69
#include "miscadmin.h"
69
70
#include "nodes/replnodes.h"
70
71
#include "pgstat.h"
@@ -735,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
735
736
736
737
switch (mtype )
737
738
{
738
- case 'd' : /* CopyData */
739
+ case PqMsg_CopyData :
739
740
maxmsglen = PQ_LARGE_MESSAGE_LIMIT ;
740
741
break ;
741
- case 'c' : /* CopyDone */
742
- case 'f' : /* CopyFail */
743
- case 'H' : /* Flush */
744
- case 'S' : /* Sync */
742
+ case PqMsg_CopyDone :
743
+ case PqMsg_CopyFail :
744
+ case PqMsg_Flush :
745
+ case PqMsg_Sync :
745
746
maxmsglen = PQ_SMALL_MESSAGE_LIMIT ;
746
747
break ;
747
748
default :
@@ -763,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
763
764
/* Process the message */
764
765
switch (mtype )
765
766
{
766
- case 'd' : /* CopyData */
767
+ case PqMsg_CopyData :
767
768
AppendIncrementalManifestData (ib , buf -> data , buf -> len );
768
769
return true;
769
770
770
- case 'c' : /* CopyDone */
771
+ case PqMsg_CopyDone :
771
772
return false;
772
773
773
- case 'H' : /* Sync */
774
- case 'S' : /* Flush */
774
+ case PqMsg_Sync :
775
+ case PqMsg_Flush :
775
776
/* Ignore these while in CopyOut mode as we do elsewhere. */
776
777
return true;
777
778
778
- case 'f' :
779
+ case PqMsg_CopyFail :
779
780
ereport (ERROR ,
780
781
(errcode (ERRCODE_QUERY_CANCELED ),
781
782
errmsg ("COPY from stdin failed: %s" ,
@@ -1569,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1569
1570
tmpbuf .data , sizeof (int64 ));
1570
1571
1571
1572
/* output previously gathered data in a CopyData packet */
1572
- pq_putmessage_noblock ('d' , ctx -> out -> data , ctx -> out -> len );
1573
+ pq_putmessage_noblock (PqMsg_CopyData , ctx -> out -> data , ctx -> out -> len );
1573
1574
1574
1575
CHECK_FOR_INTERRUPTS ();
1575
1576
@@ -2305,7 +2306,7 @@ ProcessRepliesIfAny(void)
2305
2306
case PqMsg_CopyDone :
2306
2307
if (!streamingDoneSending )
2307
2308
{
2308
- pq_putmessage_noblock ('c' , NULL , 0 );
2309
+ pq_putmessage_noblock (PqMsg_CopyDone , NULL , 0 );
2309
2310
streamingDoneSending = true;
2310
2311
}
2311
2312
@@ -2758,7 +2759,7 @@ ProcessStandbyPSRequestMessage(void)
2758
2759
pq_sendint64 (& output_message , GetCurrentTimestamp ());
2759
2760
2760
2761
/* ... and send it wrapped in CopyData */
2761
- pq_putmessage_noblock ('d' , output_message .data , output_message .len );
2762
+ pq_putmessage_noblock (PqMsg_CopyData , output_message .data , output_message .len );
2762
2763
}
2763
2764
2764
2765
/*
@@ -3306,7 +3307,7 @@ XLogSendPhysical(void)
3306
3307
wal_segment_close (xlogreader );
3307
3308
3308
3309
/* Send CopyDone */
3309
- pq_putmessage_noblock ('c' , NULL , 0 );
3310
+ pq_putmessage_noblock (PqMsg_CopyDone , NULL , 0 );
3310
3311
streamingDoneSending = true;
3311
3312
3312
3313
WalSndCaughtUp = true;
@@ -3434,7 +3435,7 @@ XLogSendPhysical(void)
3434
3435
memcpy (& output_message .data [1 + sizeof (int64 ) + sizeof (int64 )],
3435
3436
tmpbuf .data , sizeof (int64 ));
3436
3437
3437
- pq_putmessage_noblock ('d' , output_message .data , output_message .len );
3438
+ pq_putmessage_noblock (PqMsg_CopyData , output_message .data , output_message .len );
3438
3439
3439
3440
sentPtr = endptr ;
3440
3441
@@ -4140,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
4140
4141
pq_sendbyte (& output_message , requestReply ? 1 : 0 );
4141
4142
4142
4143
/* ... and send it wrapped in CopyData */
4143
- pq_putmessage_noblock ('d' , output_message .data , output_message .len );
4144
+ pq_putmessage_noblock (PqMsg_CopyData , output_message .data , output_message .len );
4144
4145
4145
4146
/* Set local flag */
4146
4147
if (requestReply )
0 commit comments