Skip to content

Commit 27ce72b

Browse files
committed
Merge branch 'v2.0' into port2.0
2 parents 5346420 + 533b162 commit 27ce72b

File tree

3 files changed

+109
-104
lines changed

3 files changed

+109
-104
lines changed

pkg/retrieval/engine/postgres/physical/physical.go

Lines changed: 65 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -224,28 +224,7 @@ func (r *RestoreJob) Run(ctx context.Context) (err error) {
224224
return errors.Wrap(err, "failed to adjust by init parameters")
225225
}
226226

227-
// Start PostgreSQL instance.
228-
startCommand, err := r.dockerClient.ContainerExecCreate(ctx, contID,
229-
pgCommandConfig("postgres", dataDir, pgVersion))
230-
231-
if err != nil {
232-
return errors.Wrap(err, "failed to create an exec command")
233-
}
234-
235-
log.Msg("Running refresh command")
236-
237-
attachResponse, err := r.dockerClient.ContainerExecAttach(ctx, startCommand.ID, types.ExecStartCheck{})
238-
if err != nil {
239-
return errors.Wrap(err, "failed to attach to the exec command")
240-
}
241-
242-
defer attachResponse.Close()
243-
244-
if err := isDatabaseReady(attachResponse.Reader); err != nil {
245-
return errors.Wrap(err, "failed to refresh data")
246-
}
247-
248-
log.Msg("Refresh command has been finished")
227+
log.Msg("Configuration has been finished")
249228

250229
return nil
251230
}
@@ -283,53 +262,6 @@ func (r *RestoreJob) startContainer(ctx context.Context, containerName, containe
283262
return syncInstance.ID, nil
284263
}
285264

286-
func pgCommandConfig(cmd, pgDataDir, pgVersion string) types.ExecConfig {
287-
command := fmt.Sprintf("/usr/lib/postgresql/%s/bin/%s", pgVersion, cmd)
288-
289-
return types.ExecConfig{
290-
AttachStdout: true,
291-
AttachStderr: true,
292-
Cmd: []string{command, "-D", pgDataDir},
293-
User: defaults.Username,
294-
Env: os.Environ(),
295-
}
296-
}
297-
298-
func isDatabaseReady(input io.Reader) error {
299-
scanner := bufio.NewScanner(input)
300-
301-
timer := time.NewTimer(time.Minute)
302-
defer timer.Stop()
303-
304-
LOOP:
305-
for {
306-
select {
307-
case <-timer.C:
308-
return errors.New("timeout exceeded")
309-
default:
310-
if !scanner.Scan() {
311-
break LOOP
312-
}
313-
314-
timer.Reset(time.Minute)
315-
}
316-
317-
text := scanner.Text()
318-
319-
if strings.Contains(text, readyLogLine) {
320-
return nil
321-
}
322-
323-
fmt.Println(text)
324-
}
325-
326-
if err := scanner.Err(); err != nil {
327-
return err
328-
}
329-
330-
return errors.New("database instance is not running")
331-
}
332-
333265
func (r *RestoreJob) syncInstanceName() string {
334266
return cont.SyncInstanceContainerPrefix + r.globalCfg.InstanceID
335267
}
@@ -358,34 +290,61 @@ func (r *RestoreJob) runSyncInstance(ctx context.Context) error {
358290
return err
359291
}
360292

361-
// Set permissions.
362-
if err := tools.ExecCommand(ctx, r.dockerClient, syncInstanceID, types.ExecConfig{
363-
Cmd: []string{"chown", "-R", "postgres", r.globalCfg.DataDir()},
364-
}); err != nil {
365-
return errors.Wrap(err, "failed to set permissions")
293+
log.Msg("Starting PostgreSQL")
294+
295+
if err := tools.RunPostgres(ctx, r.dockerClient, syncInstanceID, r.globalCfg.DataDir()); err != nil {
296+
return errors.Wrap(err, "failed to start PostgreSQL instance")
366297
}
367298

368-
pgVersion, err := tools.DetectPGVersion(r.globalCfg.DataDir())
299+
logs, err := r.dockerClient.ContainerLogs(ctx, syncInstanceID, types.ContainerLogsOptions{
300+
ShowStdout: true,
301+
Follow: true,
302+
})
369303
if err != nil {
370-
return err
304+
return errors.Wrap(err, "failed to get container logs")
371305
}
372306

373-
startSyncCommand, err := r.dockerClient.ContainerExecCreate(ctx, syncInstanceID,
374-
pgCommandConfig("postgres", r.globalCfg.DataDir(), pgVersion))
375-
if err != nil {
376-
return errors.Wrap(err, "failed to create exec command")
307+
defer func() { _ = logs.Close() }()
308+
309+
if err := isDatabaseReady(logs); err != nil {
310+
return errors.Wrap(err, "failed to run PostgreSQL")
377311
}
378312

379-
if err = r.dockerClient.ContainerExecStart(ctx, startSyncCommand.ID, types.ExecStartCheck{
380-
Detach: true, Tty: true}); err != nil {
381-
return errors.Wrap(err, "failed to attach to exec command")
313+
return nil
314+
}
315+
316+
func isDatabaseReady(input io.Reader) error {
317+
scanner := bufio.NewScanner(input)
318+
319+
timer := time.NewTimer(time.Minute)
320+
defer timer.Stop()
321+
322+
LOOP:
323+
for {
324+
select {
325+
case <-timer.C:
326+
return errors.New("timeout exceeded")
327+
default:
328+
if !scanner.Scan() {
329+
break LOOP
330+
}
331+
332+
timer.Reset(time.Minute)
333+
}
334+
335+
text := scanner.Text()
336+
337+
if strings.Contains(text, readyLogLine) {
338+
log.Msg(text)
339+
return nil
340+
}
382341
}
383342

384-
if err := tools.InspectCommandResponse(ctx, r.dockerClient, syncInstanceID, startSyncCommand.ID); err != nil {
385-
return errors.Wrap(err, "failed to perform exec command")
343+
if err := scanner.Err(); err != nil {
344+
return err
386345
}
387346

388-
return nil
347+
return errors.New("database instance is not running")
389348
}
390349

391350
func (r *RestoreJob) getEnvironmentVariables(password string) []string {
@@ -467,12 +426,11 @@ func (r *RestoreJob) adjustRecoveryConfiguration(pgVersion, pgDataDir string) er
467426
recoveryFilename = "recovery.conf"
468427
}
469428

470-
return r.appendConfigFile(path.Join(pgDataDir, recoveryFilename), r.restorer.GetRecoveryConfig())
429+
return appendConfigFile(path.Join(pgDataDir, recoveryFilename), r.restorer.GetRecoveryConfig())
471430
}
472431

473432
func (r *RestoreJob) applyInitParams(ctx context.Context, contID, pgVersion, dataDir string) error {
474-
initConfCmd, err := r.dockerClient.ContainerExecCreate(ctx, contID,
475-
pgCommandConfig("pg_controldata", dataDir, pgVersion))
433+
initConfCmd, err := r.dockerClient.ContainerExecCreate(ctx, contID, pgControlDataConfig(dataDir, pgVersion))
476434

477435
if err != nil {
478436
return errors.Wrap(err, "failed to create an exec command")
@@ -487,15 +445,27 @@ func (r *RestoreJob) applyInitParams(ctx context.Context, contID, pgVersion, dat
487445

488446
defer attachResponse.Close()
489447

490-
initParams, err := r.extractInitParams(ctx, attachResponse.Reader)
448+
initParams, err := extractInitParams(ctx, attachResponse.Reader)
491449
if err != nil {
492450
return err
493451
}
494452

495-
return r.appendInitConfigs(initParams, dataDir)
453+
return appendInitConfigs(initParams, dataDir)
454+
}
455+
456+
func pgControlDataConfig(pgDataDir, pgVersion string) types.ExecConfig {
457+
command := fmt.Sprintf("/usr/lib/postgresql/%s/bin/pg_controldata", pgVersion)
458+
459+
return types.ExecConfig{
460+
AttachStdout: true,
461+
AttachStderr: true,
462+
Cmd: []string{command, "-D", pgDataDir},
463+
User: defaults.Username,
464+
Env: os.Environ(),
465+
}
496466
}
497467

498-
func (r *RestoreJob) extractInitParams(ctx context.Context, read io.Reader) (map[string]string, error) {
468+
func extractInitParams(ctx context.Context, read io.Reader) (map[string]string, error) {
499469
extractedConfigs := make(map[string]string)
500470
scanner := bufio.NewScanner(read)
501471

@@ -528,7 +498,7 @@ func (r *RestoreJob) extractInitParams(ctx context.Context, read io.Reader) (map
528498
return extractedConfigs, nil
529499
}
530500

531-
func (r *RestoreJob) appendInitConfigs(initConfiguration map[string]string, pgDataDir string) error {
501+
func appendInitConfigs(initConfiguration map[string]string, pgDataDir string) error {
532502
if len(initConfiguration) == 0 {
533503
return nil
534504
}
@@ -539,10 +509,10 @@ func (r *RestoreJob) appendInitConfigs(initConfiguration map[string]string, pgDa
539509
buffer.WriteString(fmt.Sprintf("%s = '%s'\n", key, value))
540510
}
541511

542-
return r.appendConfigFile(path.Join(pgDataDir, "postgresql.conf"), buffer.Bytes())
512+
return appendConfigFile(path.Join(pgDataDir, "postgresql.conf"), buffer.Bytes())
543513
}
544514

545-
func (r *RestoreJob) appendConfigFile(file string, data []byte) error {
515+
func appendConfigFile(file string, data []byte) error {
546516
configFile, err := os.OpenFile(file, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
547517
if err != nil {
548518
return err
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package physical
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestInitParamsExtraction(t *testing.T) {
13+
controlDataOutput := bytes.NewBufferString(`
14+
wal_level setting: logical
15+
wal_log_hints setting: on
16+
max_connections setting: 500
17+
max_worker_processes setting: 8
18+
max_prepared_xacts setting: 0
19+
max_locks_per_xact setting: 128
20+
track_commit_timestamp setting: off
21+
`)
22+
23+
expectedSettings := map[string]string{
24+
"max_connections": "500",
25+
"max_locks_per_transaction": "128",
26+
"max_prepared_transactions": "0",
27+
"max_worker_processes": "8",
28+
"track_commit_timestamp": "off",
29+
}
30+
31+
settings, err := extractInitParams(context.Background(), controlDataOutput)
32+
33+
require.Nil(t, err)
34+
assert.EqualValues(t, settings, expectedSettings)
35+
}

pkg/retrieval/engine/postgres/tools/tools.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/shirou/gopsutil/host"
3232

3333
"gitlab.com/postgres-ai/database-lab/pkg/log"
34-
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools/defaults"
3534
)
3635

3736
const (
@@ -83,15 +82,14 @@ func DetectPGVersion(dataDir string) (string, error) {
8382
return string(bytes.TrimSpace(version)), nil
8483
}
8584

86-
// StartingPostgresConfig provides configuration to start Postgres.
87-
func StartingPostgresConfig(pgDataDir, pgVersion string) types.ExecConfig {
88-
command := fmt.Sprintf("/usr/lib/postgresql/%s/bin/postgres", pgVersion)
85+
// PGRunConfig provides configuration to start Postgres.
86+
func PGRunConfig(pgDataDir, pgVersion string) types.ExecConfig {
87+
command := fmt.Sprintf("sudo -Eu postgres /usr/lib/postgresql/%s/bin/postgres -D %s >& /proc/1/fd/1", pgVersion, pgDataDir)
8988

9089
return types.ExecConfig{
9190
AttachStdout: true,
9291
AttachStderr: true,
93-
Cmd: []string{command, "-D", pgDataDir},
94-
User: defaults.Username,
92+
Cmd: []string{"bash", "-c", command},
9593
Env: os.Environ(),
9694
}
9795
}
@@ -166,16 +164,18 @@ func RunPostgres(ctx context.Context, dockerClient *client.Client, containerID,
166164
return errors.Wrap(err, "failed to detect PostgreSQL version")
167165
}
168166

169-
startSyncCommand, err := dockerClient.ContainerExecCreate(ctx, containerID, StartingPostgresConfig(dataDir, pgVersion))
167+
startSyncCommand, err := dockerClient.ContainerExecCreate(ctx, containerID, PGRunConfig(dataDir, pgVersion))
170168
if err != nil {
171169
return errors.Wrap(err, "failed to create exec command")
172170
}
173171

174-
if err = dockerClient.ContainerExecStart(ctx, startSyncCommand.ID, types.ExecStartCheck{
175-
Detach: true, Tty: true}); err != nil {
172+
attach, err := dockerClient.ContainerExecAttach(ctx, startSyncCommand.ID, types.ExecStartCheck{})
173+
if err != nil {
176174
return errors.Wrap(err, "failed to attach to exec command")
177175
}
178176

177+
defer attach.Close()
178+
179179
if err := InspectCommandResponse(ctx, dockerClient, containerID, startSyncCommand.ID); err != nil {
180180
return errors.Wrap(err, "failed to perform exec command")
181181
}

0 commit comments

Comments
 (0)