Skip to content

Commit 2904405

Browse files
VictorSpirinfunny-falcon
authored andcommitted
[PBCKP-270] Added pio file functions for compatibility/walmethods.c
1 parent 4418ec9 commit 2904405

File tree

6 files changed

+124
-199
lines changed

6 files changed

+124
-199
lines changed

src/compatibility/receivelog.c

Lines changed: 28 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ mark_file_as_archived(StreamCtl *stream, const char *fname)
7474
snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
7575
fname);
7676

77-
f = stream->walmethod->open_for_write(tmppath, NULL, 0);
77+
f = stream->walmethod->open_for_write(tmppath);
7878
if (f == NULL)
7979
{
8080
elog(ERROR, "could not create archive status file \"%s\": %s",
@@ -113,8 +113,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
113113
XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
114114

115115
/* Note that this considers the compression used if necessary */
116-
fn = stream->walmethod->get_file_name(current_walfile_name,
117-
stream->partial_suffix);
116+
fn = stream->walmethod->get_file_name(current_walfile_name);
118117

119118
/*
120119
* When streaming to files, if an existing file exists we verify that it's
@@ -140,7 +139,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
140139
if (size == WalSegSz)
141140
{
142141
/* Already padded file. Open it for use */
143-
f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
142+
f = stream->walmethod->open_for_write(current_walfile_name);
144143
if (f == NULL)
145144
{
146145
elog(ERROR, "could not open existing write-ahead log file \"%s\": %s",
@@ -153,7 +152,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
153152
if (stream->walmethod->sync(f) != 0)
154153
{
155154
elog(ERROR, "could not fsync existing write-ahead log file \"%s\": %s",
156-
fn, stream->walmethod->getlasterror());//FATAL
155+
fn, stream->walmethod->getlasterror());
157156
stream->walmethod->close(f, CLOSE_UNLINK);
158157
exit(1);
159158
}
@@ -179,8 +178,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
179178

180179
/* No file existed, so create one */
181180

182-
f = stream->walmethod->open_for_write(current_walfile_name,
183-
stream->partial_suffix, WalSegSz);
181+
f = stream->walmethod->open_for_write(current_walfile_name);
184182
if (f == NULL)
185183
{
186184
elog(ERROR, "could not open write-ahead log file \"%s\": %s",
@@ -218,20 +216,32 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
218216

219217
return false;
220218
}
221-
222-
if (stream->partial_suffix)
219+
/*
220+
* Pad file to WalSegSz size by zero bytes
221+
*/
222+
if (currpos < WalSegSz)
223223
{
224-
if (currpos == WalSegSz)
225-
r = stream->walmethod->close(walfile, CLOSE_NORMAL);
226-
else
224+
char *tempbuf = pgut_malloc0(XLOG_BLCKSZ);
225+
int needWrite = WalSegSz - currpos;
226+
int cnt;
227+
while (needWrite > 0)
227228
{
228-
elog(INFO, "not renaming \"%s%s\", segment is not complete",
229-
current_walfile_name, stream->partial_suffix);
230-
r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
229+
230+
cnt = needWrite > XLOG_BLCKSZ ? XLOG_BLCKSZ : needWrite;
231+
if (stream->walmethod->write(walfile, tempbuf, cnt) != cnt)
232+
{
233+
elog(ERROR, "failed to append file \"%s\": %s",
234+
current_walfile_name, stream->walmethod->getlasterror());
235+
stream->walmethod->close(walfile, CLOSE_NORMAL);
236+
walfile = NULL;
237+
pgut_free(tempbuf);
238+
return false;
239+
}
240+
needWrite -= cnt;
231241
}
242+
pgut_free(tempbuf);
232243
}
233-
else
234-
r = stream->walmethod->close(walfile, CLOSE_NORMAL);
244+
r = stream->walmethod->close(walfile, CLOSE_NORMAL);
235245

236246
walfile = NULL;
237247

@@ -242,19 +252,6 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
242252
return false;
243253
}
244254

245-
/*
246-
* Mark file as archived if requested by the caller - pg_basebackup needs
247-
* to do so as files can otherwise get archived again after promotion of a
248-
* new node. This is in line with walreceiver.c always doing a
249-
* XLogArchiveForceDone() after a complete segment.
250-
*/
251-
if (currpos == WalSegSz && stream->mark_done)
252-
{
253-
/* writes error message if failed */
254-
if (!mark_file_as_archived(stream, current_walfile_name))
255-
return false;
256-
}
257-
258255
lastFlushPosition = pos;
259256
return true;
260257
}
@@ -299,7 +296,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
299296
return false;
300297
}
301298

302-
f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
299+
f = stream->walmethod->open_for_write(histfname);
303300
if (f == NULL)
304301
{
305302
pg_log_error("could not create timeline history file \"%s\": %s",
@@ -327,14 +324,6 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
327324
return false;
328325
}
329326

330-
/* Maintain archive_status, check close_walfile() for details. */
331-
if (stream->mark_done)
332-
{
333-
/* writes error message if failed */
334-
if (!mark_file_as_archived(stream, histfname))
335-
return false;
336-
}
337-
338327
return true;
339328
}
340329

@@ -490,10 +479,6 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
490479
}
491480
else
492481
{
493-
if (stream->synchronous)
494-
reportFlushPosition = true;
495-
else
496-
reportFlushPosition = false;
497482
slotcmd[0] = 0;
498483
}
499484

@@ -777,29 +762,6 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
777762

778763
now = feGetCurrentTimestamp();
779764

780-
/*
781-
* If synchronous option is true, issue sync command as soon as there
782-
* are WAL data which has not been flushed yet.
783-
*/
784-
if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
785-
{
786-
if (stream->walmethod->sync(walfile) != 0)
787-
{
788-
pg_log_fatal("could not fsync file \"%s\": %s",
789-
current_walfile_name, stream->walmethod->getlasterror());
790-
exit(1);
791-
}
792-
lastFlushPosition = blockpos;
793-
794-
/*
795-
* Send feedback so that the server sees the latest WAL locations
796-
* immediately.
797-
*/
798-
if (!sendFeedback(conn, blockpos, now, false))
799-
goto error;
800-
last_status = now;
801-
}
802-
803765
/*
804766
* Potentially send a status message to the primary
805767
*/

src/compatibility/receivelog.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,13 @@ typedef struct StreamCtl
3333
char *sysidentifier; /* Validate this system identifier and
3434
* timeline */
3535
int standby_message_timeout; /* Send status messages this often */
36-
bool synchronous; /* Flush immediately WAL data on write */
37-
bool mark_done; /* Mark segment as done in generated archive */
38-
bool do_sync; /* Flush to disk to ensure consistent state of
39-
* data */
4036

4137
stream_stop_callback stream_stop; /* Stop streaming when returns true */
4238

4339
pgsocket stop_socket; /* if valid, watch for input on this socket
4440
* and check stream_stop() when there is any */
4541

4642
WalWriteMethod *walmethod; /* How to write the WAL */
47-
char *partial_suffix; /* Suffix appended to partially received files */
4843
char *replication_slot; /* Replication slot to use, or NULL */
4944
} StreamCtl;
5045

src/compatibility/streamutil.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*-------------------------------------------------------------------------
1313
*/
1414

15+
#include "pg_probackup.h"
1516
#include "postgres_fe.h"
1617

1718
#include <sys/time.h>

0 commit comments

Comments
 (0)