Skip to content

Commit 8b46c8a

Browse files
VictorSpirinfunny-falcon
authored andcommitted
[PBCKP-270] Fixed some functions for WAL stream
1 parent 2904405 commit 8b46c8a

File tree

4 files changed

+10
-36
lines changed

4 files changed

+10
-36
lines changed

src/compatibility/receivelog.c

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -65,32 +65,6 @@ static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_ti
6565
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
6666
uint32 *timeline);
6767

68-
static bool
69-
mark_file_as_archived(StreamCtl *stream, const char *fname)
70-
{
71-
Walfile *f;
72-
static char tmppath[MAXPGPATH];
73-
74-
snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
75-
fname);
76-
77-
f = stream->walmethod->open_for_write(tmppath);
78-
if (f == NULL)
79-
{
80-
elog(ERROR, "could not create archive status file \"%s\": %s",
81-
tmppath, stream->walmethod->getlasterror());
82-
return false;
83-
}
84-
85-
if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
86-
{
87-
elog(ERROR, "could not close archive status file \"%s\": %s",
88-
tmppath, stream->walmethod->getlasterror());
89-
return false;
90-
}
91-
92-
return true;
93-
}
9468

9569
/*
9670
* Open a new WAL file in the specified directory.
@@ -139,7 +113,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
139113
if (size == WalSegSz)
140114
{
141115
/* Already padded file. Open it for use */
142-
f = stream->walmethod->open_for_write(current_walfile_name);
116+
f = stream->walmethod->open_for_write(current_walfile_name, false);
143117
if (f == NULL)
144118
{
145119
elog(ERROR, "could not open existing write-ahead log file \"%s\": %s",
@@ -178,7 +152,7 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
178152

179153
/* No file existed, so create one */
180154

181-
f = stream->walmethod->open_for_write(current_walfile_name);
155+
f = stream->walmethod->open_for_write(current_walfile_name, false);
182156
if (f == NULL)
183157
{
184158
elog(ERROR, "could not open write-ahead log file \"%s\": %s",
@@ -296,7 +270,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
296270
return false;
297271
}
298272

299-
f = stream->walmethod->open_for_write(histfname);
273+
f = stream->walmethod->open_for_write(histfname, true);
300274
if (f == NULL)
301275
{
302276
pg_log_error("could not create timeline history file \"%s\": %s",

src/compatibility/walmethods.c

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ typedef struct DirectoryMethodFile
6565
off_t currpos;
6666
char *pathname;
6767
char *fullpath;
68-
//char *temp_suffix;/* todo: remove temp_suffix - S3 not support rename, pioOpenRewrite ust temp files fot local file operations */
6968
#ifdef HAVE_LIBZ
7069
gzFile gzfp;
7170
#endif
@@ -96,7 +95,7 @@ dir_get_file_name(const char *pathname)
9695
}
9796

9897
static Walfile
99-
dir_open_for_write(const char *pathname)
98+
dir_open_for_write(const char *pathname, bool use_temp)
10099
{
101100
FOBJ_FUNC_ARP();
102101
char tmppath[MAXPGPATH];
@@ -122,7 +121,7 @@ dir_open_for_write(const char *pathname)
122121
* does not do any system calls to fsync() to make changes permanent on
123122
* disk.
124123
*/
125-
fd = $i(pioOpenRewrite, dir_data->drive, tmppath, O_WRONLY | O_CREAT | PG_BINARY, .err = &err);
124+
fd = $i(pioOpenRewrite, dir_data->drive, tmppath, .err = &err, .use_temp = use_temp);
126125
if ($haserr(err))
127126
{
128127
dir_data->lasterrno = getErrno(err);
@@ -189,7 +188,7 @@ dir_open_for_write(const char *pathname)
189188
if (dir_data->compression > 0)
190189
f->gzfp = gzfp;
191190
#endif
192-
f->fd = fd;
191+
f->fd = $iref(fd);
193192
f->currpos = 0;
194193
f->pathname = pg_strdup(pathname);
195194
f->fullpath = pg_strdup(tmppath);
@@ -256,7 +255,6 @@ dir_close(Walfile f, WalCloseMethod method)
256255
int r = 0;
257256
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
258257
char tmppath[MAXPGPATH];
259-
char tmppath2[MAXPGPATH];
260258
err_i err = $noerr();
261259

262260
Assert(f != NULL);
@@ -270,7 +268,7 @@ dir_close(Walfile f, WalCloseMethod method)
270268
}
271269
else
272270
#endif
273-
err = $i(pioClose, df->fd, dir_data->sync);
271+
err = $i(pioClose, df->fd);
274272
if ($haserr(err))
275273
{
276274
dir_data->lasterrno = getErrno(err);
@@ -295,6 +293,7 @@ dir_close(Walfile f, WalCloseMethod method)
295293

296294
pg_free(df->pathname);
297295
pg_free(df->fullpath);
296+
$idel(&df->fd);
298297
pg_free(df);
299298

300299
return r;

src/compatibility/walmethods.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ struct WalWriteMethod
3838
* automatically renamed in close(). If pad_to_size is specified, the file
3939
* will be padded with NUL up to that size, if supported by the Walmethod.
4040
*/
41-
Walfile (*open_for_write) (const char *pathname);
41+
Walfile (*open_for_write) (const char *pathname, bool use_temp);
4242

4343
/*
4444
* Close an open Walfile, using one or more methods for handling automatic

src/stream.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ CreateReplicationSlot_compat(PGconn *conn, const char *slot_name, const char *pl
198198
static void *
199199
StreamLog(void *arg)
200200
{
201+
FOBJ_FUNC_ARP();
201202
StreamThreadArg *stream_arg = (StreamThreadArg *) arg;
202203

203204
/*

0 commit comments

Comments
 (0)