@@ -74,7 +74,7 @@ mark_file_as_archived(StreamCtl *stream, const char *fname)
74
74
snprintf (tmppath , sizeof (tmppath ), "archive_status/%s.done" ,
75
75
fname );
76
76
77
- f = stream -> walmethod -> open_for_write (tmppath , NULL , 0 );
77
+ f = stream -> walmethod -> open_for_write (tmppath );
78
78
if (f == NULL )
79
79
{
80
80
elog (ERROR , "could not create archive status file \"%s\": %s" ,
@@ -113,8 +113,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
113
113
XLogFileName (current_walfile_name , stream -> timeline , segno , WalSegSz );
114
114
115
115
/* Note that this considers the compression used if necessary */
116
- fn = stream -> walmethod -> get_file_name (current_walfile_name ,
117
- stream -> partial_suffix );
116
+ fn = stream -> walmethod -> get_file_name (current_walfile_name );
118
117
119
118
/*
120
119
* When streaming to files, if an existing file exists we verify that it's
@@ -140,7 +139,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
140
139
if (size == WalSegSz )
141
140
{
142
141
/* Already padded file. Open it for use */
143
- f = stream -> walmethod -> open_for_write (current_walfile_name , stream -> partial_suffix , 0 );
142
+ f = stream -> walmethod -> open_for_write (current_walfile_name );
144
143
if (f == NULL )
145
144
{
146
145
elog (ERROR , "could not open existing write-ahead log file \"%s\": %s" ,
@@ -153,7 +152,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
153
152
if (stream -> walmethod -> sync (f ) != 0 )
154
153
{
155
154
elog (ERROR , "could not fsync existing write-ahead log file \"%s\": %s" ,
156
- fn , stream -> walmethod -> getlasterror ());//FATAL
155
+ fn , stream -> walmethod -> getlasterror ());
157
156
stream -> walmethod -> close (f , CLOSE_UNLINK );
158
157
exit (1 );
159
158
}
@@ -179,8 +178,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
179
178
180
179
/* No file existed, so create one */
181
180
182
- f = stream -> walmethod -> open_for_write (current_walfile_name ,
183
- stream -> partial_suffix , WalSegSz );
181
+ f = stream -> walmethod -> open_for_write (current_walfile_name );
184
182
if (f == NULL )
185
183
{
186
184
elog (ERROR , "could not open write-ahead log file \"%s\": %s" ,
@@ -218,20 +216,32 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
218
216
219
217
return false;
220
218
}
221
-
222
- if (stream -> partial_suffix )
219
+ /*
220
+ * Pad file to WalSegSz size by zero bytes
221
+ */
222
+ if (currpos < WalSegSz )
223
223
{
224
- if (currpos == WalSegSz )
225
- r = stream -> walmethod -> close (walfile , CLOSE_NORMAL );
226
- else
224
+ char * tempbuf = pgut_malloc0 (XLOG_BLCKSZ );
225
+ int needWrite = WalSegSz - currpos ;
226
+ int cnt ;
227
+ while (needWrite > 0 )
227
228
{
228
- elog (INFO , "not renaming \"%s%s\", segment is not complete" ,
229
- current_walfile_name , stream -> partial_suffix );
230
- r = stream -> walmethod -> close (walfile , CLOSE_NO_RENAME );
229
+
230
+ cnt = needWrite > XLOG_BLCKSZ ? XLOG_BLCKSZ : needWrite ;
231
+ if (stream -> walmethod -> write (walfile , tempbuf , cnt ) != cnt )
232
+ {
233
+ elog (ERROR , "failed to append file \"%s\": %s" ,
234
+ current_walfile_name , stream -> walmethod -> getlasterror ());
235
+ stream -> walmethod -> close (walfile , CLOSE_NORMAL );
236
+ walfile = NULL ;
237
+ pgut_free (tempbuf );
238
+ return false;
239
+ }
240
+ needWrite -= cnt ;
231
241
}
242
+ pgut_free (tempbuf );
232
243
}
233
- else
234
- r = stream -> walmethod -> close (walfile , CLOSE_NORMAL );
244
+ r = stream -> walmethod -> close (walfile , CLOSE_NORMAL );
235
245
236
246
walfile = NULL ;
237
247
@@ -242,19 +252,6 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
242
252
return false;
243
253
}
244
254
245
- /*
246
- * Mark file as archived if requested by the caller - pg_basebackup needs
247
- * to do so as files can otherwise get archived again after promotion of a
248
- * new node. This is in line with walreceiver.c always doing a
249
- * XLogArchiveForceDone() after a complete segment.
250
- */
251
- if (currpos == WalSegSz && stream -> mark_done )
252
- {
253
- /* writes error message if failed */
254
- if (!mark_file_as_archived (stream , current_walfile_name ))
255
- return false;
256
- }
257
-
258
255
lastFlushPosition = pos ;
259
256
return true;
260
257
}
@@ -299,7 +296,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
299
296
return false;
300
297
}
301
298
302
- f = stream -> walmethod -> open_for_write (histfname , ".tmp" , 0 );
299
+ f = stream -> walmethod -> open_for_write (histfname );
303
300
if (f == NULL )
304
301
{
305
302
pg_log_error ("could not create timeline history file \"%s\": %s" ,
@@ -327,14 +324,6 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
327
324
return false;
328
325
}
329
326
330
- /* Maintain archive_status, check close_walfile() for details. */
331
- if (stream -> mark_done )
332
- {
333
- /* writes error message if failed */
334
- if (!mark_file_as_archived (stream , histfname ))
335
- return false;
336
- }
337
-
338
327
return true;
339
328
}
340
329
@@ -490,10 +479,6 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
490
479
}
491
480
else
492
481
{
493
- if (stream -> synchronous )
494
- reportFlushPosition = true;
495
- else
496
- reportFlushPosition = false;
497
482
slotcmd [0 ] = 0 ;
498
483
}
499
484
@@ -777,29 +762,6 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
777
762
778
763
now = feGetCurrentTimestamp ();
779
764
780
- /*
781
- * If synchronous option is true, issue sync command as soon as there
782
- * are WAL data which has not been flushed yet.
783
- */
784
- if (stream -> synchronous && lastFlushPosition < blockpos && walfile != NULL )
785
- {
786
- if (stream -> walmethod -> sync (walfile ) != 0 )
787
- {
788
- pg_log_fatal ("could not fsync file \"%s\": %s" ,
789
- current_walfile_name , stream -> walmethod -> getlasterror ());
790
- exit (1 );
791
- }
792
- lastFlushPosition = blockpos ;
793
-
794
- /*
795
- * Send feedback so that the server sees the latest WAL locations
796
- * immediately.
797
- */
798
- if (!sendFeedback (conn , blockpos , now , false))
799
- goto error ;
800
- last_status = now ;
801
- }
802
-
803
765
/*
804
766
* Potentially send a status message to the primary
805
767
*/
0 commit comments