Skip to content

Commit 8492561

Browse files
committed
Merge branch '145-auto-sync-instance' into 'master'
Run sync instance in order to keep data from physical backup up-to-date: * integrate as an option to existing stages * prepare stage configuration(docker images, credentials, volumes) * manage sync container during snapshot taking * manage promote container Closes #145 See merge request postgres-ai/database-lab!136
2 parents 039cde8 + 99b743a commit 8492561

File tree

9 files changed

+193
-83
lines changed

9 files changed

+193
-83
lines changed

cmd/database-lab/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/docker/docker/client"
1818
"github.com/jessevdk/go-flags"
1919
"github.com/pkg/errors"
20+
"github.com/rs/xid"
2021

2122
"gitlab.com/postgres-ai/database-lab/pkg/config"
2223
"gitlab.com/postgres-ai/database-lab/pkg/log"
@@ -72,6 +73,8 @@ func main() {
7273
cfg.Global.MountDir = cfg.Provision.ModeLocal.MountDir
7374
}
7475

76+
cfg.Global.InstanceID = xid.New().String()
77+
7578
ctx, cancel := context.WithCancel(context.Background())
7679
defer cancel()
7780

configs/config.example.physical_generic.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ retrieval:
158158
options:
159159
tool: customTool
160160
dockerImage: "postgresai/sync-instance:12"
161+
syncInstance: true
161162

162163
# Set environment variables here. See https://www.postgresql.org/docs/current/libpq-envars.html
163164
envs:

configs/config.example.physical_walg.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ retrieval:
158158
options:
159159
tool: walg
160160
dockerImage: "postgresai/sync-instance:12"
161+
syncInstance: true
161162
envs:
162163
WALG_GS_PREFIX: "gs://{BUCKET}/{SCOPE}"
163164
walg:

pkg/config/config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ type Config struct {
3434

3535
// Global contains global Database Lab configurations.
3636
type Global struct {
37-
Engine string `yaml:"engine"`
38-
DataDir string `yaml:"dataDir"`
39-
MountDir string // TODO (akartasov): Use MountDir for the ModeLocalConfig of a Provision service.
37+
InstanceID string
38+
Engine string `yaml:"engine"`
39+
DataDir string `yaml:"dataDir"`
40+
MountDir string // TODO (akartasov): Use MountDir for the ModeLocalConfig of a Provision service.
4041
}
4142

4243
// LoadConfig instances a new Config by configuration filename.

pkg/retrieval/engine/postgres/initialize/physical/custom.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
package physical
66

77
import (
8-
"strings"
8+
"bytes"
9+
"fmt"
910

1011
"github.com/docker/docker/api/types/mount"
1112
)
@@ -19,7 +20,8 @@ type custom struct {
1920
}
2021

2122
type customOptions struct {
22-
Command string `yaml:"command"`
23+
RestoreCommand string `yaml:"command"`
24+
RefreshCommand string `yaml:"refresh_command"`
2325
}
2426

2527
func newCustomTool(options customOptions) *custom {
@@ -39,11 +41,16 @@ func (c *custom) GetMounts() []mount.Mount {
3941
}
4042

4143
// GetRestoreCommand returns a custom command to restore data.
42-
func (c *custom) GetRestoreCommand() []string {
43-
return strings.Split(c.options.Command, " ")
44+
func (c *custom) GetRestoreCommand() string {
45+
return c.options.RestoreCommand
4446
}
4547

4648
// GetRecoveryConfig returns a recovery config to restore data.
4749
func (c *custom) GetRecoveryConfig() []byte {
48-
return []byte{}
50+
buffer := bytes.Buffer{}
51+
52+
buffer.WriteString("standby_mode = 'on'\n")
53+
buffer.WriteString(fmt.Sprintf("restore_command = '%s'\n", c.options.RefreshCommand))
54+
55+
return buffer.Bytes()
4956
}

pkg/retrieval/engine/postgres/initialize/physical/physical.go

Lines changed: 110 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ const (
3939
// RestoreJobType defines the physical job type.
4040
RestoreJobType = "physical-restore"
4141

42-
restoreContainerName = "retriever_physical_restore"
43-
restoreContainerPath = "/var/lib/postgresql/dblabdata"
42+
restoreContainerPrefix = "dblab_phr_"
43+
restoreContainerPath = "/var/lib/postgresql/dblabdata"
4444

4545
readyLogLine = "database system is ready to accept"
4646

@@ -59,11 +59,12 @@ type RestoreJob struct {
5959

6060
// CopyOptions describes options for physical copying.
6161
type CopyOptions struct {
62-
Tool string `yaml:"tool"`
63-
DockerImage string `yaml:"dockerImage"`
64-
Envs map[string]string `yaml:"envs"`
65-
WALG walgOptions `yaml:"walg"`
66-
CustomTool customOptions `yaml:"customTool"`
62+
Tool string `yaml:"tool"`
63+
DockerImage string `yaml:"dockerImage"`
64+
Envs map[string]string `yaml:"envs"`
65+
WALG walgOptions `yaml:"walg"`
66+
CustomTool customOptions `yaml:"customTool"`
67+
SyncInstance bool `yaml:"syncInstance"`
6768
}
6869

6970
// restorer describes the interface of tools for physical restore.
@@ -75,7 +76,7 @@ type restorer interface {
7576
GetMounts() []mount.Mount
7677

7778
// GetRestoreCommand returns a command to restore data.
78-
GetRestoreCommand() []string
79+
GetRestoreCommand() string
7980

8081
// GetRecoveryConfig returns a recovery config to restore data.
8182
GetRecoveryConfig() []byte
@@ -117,6 +118,10 @@ func (r *RestoreJob) getRestorer(tool string) (restorer, error) {
117118
return nil, errors.Errorf("unknown restore tool given: %v", tool)
118119
}
119120

121+
func (r *RestoreJob) restoreContainerName() string {
122+
return restoreContainerPrefix + r.globalCfg.InstanceID
123+
}
124+
120125
// Name returns a name of the job.
121126
func (r *RestoreJob) Name() string {
122127
return r.name
@@ -132,49 +137,26 @@ func (r *RestoreJob) Run(ctx context.Context) error {
132137
}
133138

134139
if !isEmpty {
135-
return errors.New("the data directory is not empty. Clean the data directory before continue")
140+
return errors.New("the data directory is not empty. Clean the data directory before proceeding")
136141
}
137142

138-
cont, err := r.dockerClient.ContainerCreate(ctx,
139-
&container.Config{
140-
Env: r.getEnvironmentVariables(),
141-
Image: r.DockerImage,
142-
},
143-
&container.HostConfig{
144-
Mounts: r.getMountVolumes(),
145-
},
146-
&network.NetworkingConfig{},
147-
restoreContainerName,
148-
)
149-
143+
contID, err := r.startReplica(ctx, r.restoreContainerName())
150144
if err != nil {
151-
return errors.Wrap(err, "failed to create container")
145+
return errors.Wrapf(err, "failed to create container: %s", r.restoreContainerName())
152146
}
153147

154-
defer func() {
155-
if err := r.dockerClient.ContainerRemove(ctx, cont.ID, types.ContainerRemoveOptions{
156-
Force: true,
157-
}); err != nil {
158-
log.Err("Failed to remove container: ", err)
159-
160-
return
161-
}
162-
163-
log.Msg(fmt.Sprintf("Stop container: %s. ID: %v", restoreContainerName, cont.ID))
164-
}()
148+
defer tools.RemoveContainer(ctx, r.dockerClient, contID, tools.StopTimeout)
165149

166-
defer tools.RemoveContainer(ctx, r.dockerClient, cont.ID, tools.StopTimeout)
150+
log.Msg(fmt.Sprintf("Running container: %s. ID: %v", r.restoreContainerName(), contID))
167151

168-
log.Msg(fmt.Sprintf("Running container: %s. ID: %v", restoreContainerName, cont.ID))
169-
170-
if err = r.dockerClient.ContainerStart(ctx, cont.ID, types.ContainerStartOptions{}); err != nil {
171-
return errors.Wrap(err, "failed to start container")
152+
if err = r.dockerClient.ContainerStart(ctx, contID, types.ContainerStartOptions{}); err != nil {
153+
return errors.Wrapf(err, "failed to start container: %v", contID)
172154
}
173155

174156
log.Msg("Running restore command")
175157

176-
if err := tools.ExecCommand(ctx, r.dockerClient, cont.ID, types.ExecConfig{
177-
Cmd: r.restorer.GetRestoreCommand(),
158+
if err := tools.ExecCommand(ctx, r.dockerClient, contID, types.ExecConfig{
159+
Cmd: []string{"bash", "-c", r.restorer.GetRestoreCommand()},
178160
}); err != nil {
179161
return errors.Wrap(err, "failed to restore data")
180162
}
@@ -209,20 +191,14 @@ func (r *RestoreJob) Run(ctx context.Context) error {
209191
}
210192

211193
// Set permissions.
212-
if err := tools.ExecCommand(ctx, r.dockerClient, cont.ID, types.ExecConfig{
194+
if err := tools.ExecCommand(ctx, r.dockerClient, contID, types.ExecConfig{
213195
Cmd: []string{"chown", "-R", "postgres", restoreContainerPath},
214196
}); err != nil {
215197
return errors.Wrap(err, "failed to set permissions")
216198
}
217199

218200
// Start PostgreSQL instance.
219-
startCommand, err := r.dockerClient.ContainerExecCreate(ctx, cont.ID, types.ExecConfig{
220-
AttachStdout: true,
221-
AttachStderr: true,
222-
Tty: true,
223-
Cmd: []string{"postgres", "-D", restoreContainerPath},
224-
User: defaults.Username,
225-
})
201+
startCommand, err := r.dockerClient.ContainerExecCreate(ctx, contID, startingPostgresConfig())
226202

227203
if err != nil {
228204
return errors.Wrap(err, "failed to create an exec command")
@@ -241,11 +217,51 @@ func (r *RestoreJob) Run(ctx context.Context) error {
241217
return errors.Wrap(err, "failed to refresh data")
242218
}
243219

244-
log.Msg("Running restore command")
220+
log.Msg("Refresh command has been finished")
221+
222+
if r.CopyOptions.SyncInstance {
223+
if err := r.runSyncInstance(ctx); err != nil {
224+
log.Err("Failed to run sync instance", err)
225+
}
226+
}
245227

246228
return nil
247229
}
248230

231+
func (r *RestoreJob) startReplica(ctx context.Context, containerName string) (string, error) {
232+
syncInstance, err := r.dockerClient.ContainerCreate(ctx,
233+
&container.Config{
234+
Env: r.getEnvironmentVariables(),
235+
Image: r.DockerImage,
236+
},
237+
&container.HostConfig{
238+
Mounts: r.getMountVolumes(),
239+
},
240+
&network.NetworkingConfig{},
241+
containerName,
242+
)
243+
244+
if err != nil {
245+
return "", errors.Wrap(err, "failed to start sync container")
246+
}
247+
248+
if err = r.dockerClient.ContainerStart(ctx, syncInstance.ID, types.ContainerStartOptions{}); err != nil {
249+
return "", errors.Wrap(err, "failed to start sync container")
250+
}
251+
252+
return syncInstance.ID, nil
253+
}
254+
255+
func startingPostgresConfig() types.ExecConfig {
256+
return types.ExecConfig{
257+
AttachStdout: true,
258+
AttachStderr: true,
259+
Tty: true,
260+
Cmd: []string{"postgres", "-D", restoreContainerPath},
261+
User: defaults.Username,
262+
}
263+
}
264+
249265
func isDatabaseReady(input io.Reader) error {
250266
scanner := bufio.NewScanner(input)
251267

@@ -272,7 +288,51 @@ func isDatabaseReady(input io.Reader) error {
272288
return err
273289
}
274290

275-
return errors.New("not found")
291+
return errors.New("database instance is not running")
292+
}
293+
294+
func (r *RestoreJob) syncInstanceName() string {
295+
return tools.SyncInstanceContainerPrefix + r.globalCfg.InstanceID
296+
}
297+
298+
func (r *RestoreJob) runSyncInstance(ctx context.Context) error {
299+
syncContainer, err := r.dockerClient.ContainerInspect(ctx, r.syncInstanceName())
300+
if err != nil && !client.IsErrNotFound(err) {
301+
return errors.Wrap(err, "failed to inspect sync container")
302+
}
303+
304+
if syncContainer.ContainerJSONBase != nil {
305+
if syncContainer.State.Running {
306+
log.Msg("Sync instance is already running")
307+
return nil
308+
}
309+
310+
log.Msg("Removing non-running sync instance")
311+
312+
tools.RemoveContainer(ctx, r.dockerClient, syncContainer.ID, tools.StopTimeout)
313+
}
314+
315+
log.Msg("Starting sync instance: ", r.syncInstanceName())
316+
317+
syncInstanceID, err := r.startReplica(ctx, r.syncInstanceName())
318+
if err != nil {
319+
return err
320+
}
321+
322+
startSyncCommand, err := r.dockerClient.ContainerExecCreate(ctx, syncInstanceID, startingPostgresConfig())
323+
if err != nil {
324+
return errors.Wrap(err, "failed to create exec command")
325+
}
326+
327+
if err = r.dockerClient.ContainerExecStart(ctx, startSyncCommand.ID, types.ExecStartCheck{Tty: true}); err != nil {
328+
return errors.Wrap(err, "failed to attach to exec command")
329+
}
330+
331+
if err := tools.InspectCommandResponse(ctx, r.dockerClient, startSyncCommand.ID, startSyncCommand.ID); err != nil {
332+
return errors.Wrap(err, "failed to perform exec command")
333+
}
334+
335+
return nil
276336
}
277337

278338
func (r *RestoreJob) getEnvironmentVariables() []string {

pkg/retrieval/engine/postgres/initialize/physical/wal_g.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package physical
77
import (
88
"bytes"
99
"fmt"
10-
"time"
1110

1211
"github.com/docker/docker/api/types/mount"
1312
)
@@ -64,16 +63,15 @@ func (w *walg) GetMounts() []mount.Mount {
6463
}
6564

6665
// GetRestoreCommand returns a command to restore data.
67-
func (w *walg) GetRestoreCommand() []string {
68-
return []string{"wal-g", "backup-fetch", restoreContainerPath, w.options.BackupName}
66+
func (w *walg) GetRestoreCommand() string {
67+
return fmt.Sprintf("wal-g backup-fetch %s %s", restoreContainerPath, w.options.BackupName)
6968
}
7069

7170
// GetRecoveryConfig returns a recovery config to restore data.
7271
func (w *walg) GetRecoveryConfig() []byte {
7372
buffer := bytes.Buffer{}
7473

7574
buffer.WriteString("standby_mode = 'on'\n")
76-
buffer.WriteString(fmt.Sprintf("recovery_target_time = '%s'\n", time.Now().Format("2006-02-01 15:04:05")))
7775
buffer.WriteString("restore_command = 'wal-g wal-fetch %f %p'\n")
7876

7977
return buffer.Bytes()

0 commit comments

Comments
 (0)