@@ -39,8 +39,8 @@ const (
39
39
// RestoreJobType defines the physical job type.
40
40
RestoreJobType = "physical-restore"
41
41
42
- restoreContainerName = "retriever_physical_restore "
43
- restoreContainerPath = "/var/lib/postgresql/dblabdata"
42
+ restoreContainerPrefix = "dblab_phr_ "
43
+ restoreContainerPath = "/var/lib/postgresql/dblabdata"
44
44
45
45
readyLogLine = "database system is ready to accept"
46
46
@@ -59,11 +59,12 @@ type RestoreJob struct {
59
59
60
60
// CopyOptions describes options for physical copying.
61
61
type CopyOptions struct {
62
- Tool string `yaml:"tool"`
63
- DockerImage string `yaml:"dockerImage"`
64
- Envs map [string ]string `yaml:"envs"`
65
- WALG walgOptions `yaml:"walg"`
66
- CustomTool customOptions `yaml:"customTool"`
62
+ Tool string `yaml:"tool"`
63
+ DockerImage string `yaml:"dockerImage"`
64
+ Envs map [string ]string `yaml:"envs"`
65
+ WALG walgOptions `yaml:"walg"`
66
+ CustomTool customOptions `yaml:"customTool"`
67
+ SyncInstance bool `yaml:"syncInstance"`
67
68
}
68
69
69
70
// restorer describes the interface of tools for physical restore.
@@ -75,7 +76,7 @@ type restorer interface {
75
76
GetMounts () []mount.Mount
76
77
77
78
// GetRestoreCommand returns a command to restore data.
78
- GetRestoreCommand () [] string
79
+ GetRestoreCommand () string
79
80
80
81
// GetRecoveryConfig returns a recovery config to restore data.
81
82
GetRecoveryConfig () []byte
@@ -117,6 +118,10 @@ func (r *RestoreJob) getRestorer(tool string) (restorer, error) {
117
118
return nil , errors .Errorf ("unknown restore tool given: %v" , tool )
118
119
}
119
120
121
+ func (r * RestoreJob ) restoreContainerName () string {
122
+ return restoreContainerPrefix + r .globalCfg .InstanceID
123
+ }
124
+
120
125
// Name returns a name of the job.
121
126
func (r * RestoreJob ) Name () string {
122
127
return r .name
@@ -132,49 +137,26 @@ func (r *RestoreJob) Run(ctx context.Context) error {
132
137
}
133
138
134
139
if ! isEmpty {
135
- return errors .New ("the data directory is not empty. Clean the data directory before continue " )
140
+ return errors .New ("the data directory is not empty. Clean the data directory before proceeding " )
136
141
}
137
142
138
- cont , err := r .dockerClient .ContainerCreate (ctx ,
139
- & container.Config {
140
- Env : r .getEnvironmentVariables (),
141
- Image : r .DockerImage ,
142
- },
143
- & container.HostConfig {
144
- Mounts : r .getMountVolumes (),
145
- },
146
- & network.NetworkingConfig {},
147
- restoreContainerName ,
148
- )
149
-
143
+ contID , err := r .startReplica (ctx , r .restoreContainerName ())
150
144
if err != nil {
151
- return errors .Wrap (err , "failed to create container" )
145
+ return errors .Wrapf (err , "failed to create container: %s" , r . restoreContainerName () )
152
146
}
153
147
154
- defer func () {
155
- if err := r .dockerClient .ContainerRemove (ctx , cont .ID , types.ContainerRemoveOptions {
156
- Force : true ,
157
- }); err != nil {
158
- log .Err ("Failed to remove container: " , err )
159
-
160
- return
161
- }
162
-
163
- log .Msg (fmt .Sprintf ("Stop container: %s. ID: %v" , restoreContainerName , cont .ID ))
164
- }()
148
+ defer tools .RemoveContainer (ctx , r .dockerClient , contID , tools .StopTimeout )
165
149
166
- defer tools . RemoveContainer ( ctx , r . dockerClient , cont . ID , tools . StopTimeout )
150
+ log . Msg ( fmt . Sprintf ( "Running container: %s. ID: %v" , r . restoreContainerName (), contID ) )
167
151
168
- log .Msg (fmt .Sprintf ("Running container: %s. ID: %v" , restoreContainerName , cont .ID ))
169
-
170
- if err = r .dockerClient .ContainerStart (ctx , cont .ID , types.ContainerStartOptions {}); err != nil {
171
- return errors .Wrap (err , "failed to start container" )
152
+ if err = r .dockerClient .ContainerStart (ctx , contID , types.ContainerStartOptions {}); err != nil {
153
+ return errors .Wrapf (err , "failed to start container: %v" , contID )
172
154
}
173
155
174
156
log .Msg ("Running restore command" )
175
157
176
- if err := tools .ExecCommand (ctx , r .dockerClient , cont . ID , types.ExecConfig {
177
- Cmd : r .restorer .GetRestoreCommand (),
158
+ if err := tools .ExecCommand (ctx , r .dockerClient , contID , types.ExecConfig {
159
+ Cmd : [] string { "bash" , "-c" , r .restorer .GetRestoreCommand ()} ,
178
160
}); err != nil {
179
161
return errors .Wrap (err , "failed to restore data" )
180
162
}
@@ -209,20 +191,14 @@ func (r *RestoreJob) Run(ctx context.Context) error {
209
191
}
210
192
211
193
// Set permissions.
212
- if err := tools .ExecCommand (ctx , r .dockerClient , cont . ID , types.ExecConfig {
194
+ if err := tools .ExecCommand (ctx , r .dockerClient , contID , types.ExecConfig {
213
195
Cmd : []string {"chown" , "-R" , "postgres" , restoreContainerPath },
214
196
}); err != nil {
215
197
return errors .Wrap (err , "failed to set permissions" )
216
198
}
217
199
218
200
// Start PostgreSQL instance.
219
- startCommand , err := r .dockerClient .ContainerExecCreate (ctx , cont .ID , types.ExecConfig {
220
- AttachStdout : true ,
221
- AttachStderr : true ,
222
- Tty : true ,
223
- Cmd : []string {"postgres" , "-D" , restoreContainerPath },
224
- User : defaults .Username ,
225
- })
201
+ startCommand , err := r .dockerClient .ContainerExecCreate (ctx , contID , startingPostgresConfig ())
226
202
227
203
if err != nil {
228
204
return errors .Wrap (err , "failed to create an exec command" )
@@ -241,11 +217,51 @@ func (r *RestoreJob) Run(ctx context.Context) error {
241
217
return errors .Wrap (err , "failed to refresh data" )
242
218
}
243
219
244
- log .Msg ("Running restore command" )
220
+ log .Msg ("Refresh command has been finished" )
221
+
222
+ if r .CopyOptions .SyncInstance {
223
+ if err := r .runSyncInstance (ctx ); err != nil {
224
+ log .Err ("Failed to run sync instance" , err )
225
+ }
226
+ }
245
227
246
228
return nil
247
229
}
248
230
231
+ func (r * RestoreJob ) startReplica (ctx context.Context , containerName string ) (string , error ) {
232
+ syncInstance , err := r .dockerClient .ContainerCreate (ctx ,
233
+ & container.Config {
234
+ Env : r .getEnvironmentVariables (),
235
+ Image : r .DockerImage ,
236
+ },
237
+ & container.HostConfig {
238
+ Mounts : r .getMountVolumes (),
239
+ },
240
+ & network.NetworkingConfig {},
241
+ containerName ,
242
+ )
243
+
244
+ if err != nil {
245
+ return "" , errors .Wrap (err , "failed to start sync container" )
246
+ }
247
+
248
+ if err = r .dockerClient .ContainerStart (ctx , syncInstance .ID , types.ContainerStartOptions {}); err != nil {
249
+ return "" , errors .Wrap (err , "failed to start sync container" )
250
+ }
251
+
252
+ return syncInstance .ID , nil
253
+ }
254
+
255
+ func startingPostgresConfig () types.ExecConfig {
256
+ return types.ExecConfig {
257
+ AttachStdout : true ,
258
+ AttachStderr : true ,
259
+ Tty : true ,
260
+ Cmd : []string {"postgres" , "-D" , restoreContainerPath },
261
+ User : defaults .Username ,
262
+ }
263
+ }
264
+
249
265
func isDatabaseReady (input io.Reader ) error {
250
266
scanner := bufio .NewScanner (input )
251
267
@@ -272,7 +288,51 @@ func isDatabaseReady(input io.Reader) error {
272
288
return err
273
289
}
274
290
275
- return errors .New ("not found" )
291
+ return errors .New ("database instance is not running" )
292
+ }
293
+
294
+ func (r * RestoreJob ) syncInstanceName () string {
295
+ return tools .SyncInstanceContainerPrefix + r .globalCfg .InstanceID
296
+ }
297
+
298
+ func (r * RestoreJob ) runSyncInstance (ctx context.Context ) error {
299
+ syncContainer , err := r .dockerClient .ContainerInspect (ctx , r .syncInstanceName ())
300
+ if err != nil && ! client .IsErrNotFound (err ) {
301
+ return errors .Wrap (err , "failed to inspect sync container" )
302
+ }
303
+
304
+ if syncContainer .ContainerJSONBase != nil {
305
+ if syncContainer .State .Running {
306
+ log .Msg ("Sync instance is already running" )
307
+ return nil
308
+ }
309
+
310
+ log .Msg ("Removing non-running sync instance" )
311
+
312
+ tools .RemoveContainer (ctx , r .dockerClient , syncContainer .ID , tools .StopTimeout )
313
+ }
314
+
315
+ log .Msg ("Starting sync instance: " , r .syncInstanceName ())
316
+
317
+ syncInstanceID , err := r .startReplica (ctx , r .syncInstanceName ())
318
+ if err != nil {
319
+ return err
320
+ }
321
+
322
+ startSyncCommand , err := r .dockerClient .ContainerExecCreate (ctx , syncInstanceID , startingPostgresConfig ())
323
+ if err != nil {
324
+ return errors .Wrap (err , "failed to create exec command" )
325
+ }
326
+
327
+ if err = r .dockerClient .ContainerExecStart (ctx , startSyncCommand .ID , types.ExecStartCheck {Tty : true }); err != nil {
328
+ return errors .Wrap (err , "failed to attach to exec command" )
329
+ }
330
+
331
+ if err := tools .InspectCommandResponse (ctx , r .dockerClient , startSyncCommand .ID , startSyncCommand .ID ); err != nil {
332
+ return errors .Wrap (err , "failed to perform exec command" )
333
+ }
334
+
335
+ return nil
276
336
}
277
337
278
338
func (r * RestoreJob ) getEnvironmentVariables () []string {
0 commit comments