6
6
package physical
7
7
8
8
import (
9
+ "bufio"
9
10
"context"
10
11
"fmt"
11
12
"io"
12
13
"os"
14
+ "path"
15
+ "strconv"
16
+ "strings"
17
+ "time"
13
18
14
19
"github.com/docker/docker/api/types"
15
20
"github.com/docker/docker/api/types/container"
@@ -23,7 +28,9 @@ import (
23
28
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/config"
24
29
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/dbmarker"
25
30
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/initialize/tools"
31
+ "gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/initialize/tools/defaults"
26
32
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/options"
33
+ "gitlab.com/postgres-ai/database-lab/pkg/services/provision/databases/postgres/configuration"
27
34
)
28
35
29
36
const (
@@ -32,6 +39,8 @@ const (
32
39
33
40
restoreContainerName = "retriever_physical_restore"
34
41
restoreContainerPath = "/var/lib/postgresql/dblabdata"
42
+
43
+ readyLogLine = "database system is ready to accept"
35
44
)
36
45
37
46
// RestoreJob describes a job for physical restoring.
@@ -63,6 +72,9 @@ type restorer interface {
63
72
64
73
// GetRestoreCommand returns a command to restore data.
65
74
GetRestoreCommand () []string
75
+
76
+ // GetRecoveryConfig returns a recovery config to restore data.
77
+ GetRecoveryConfig () []byte
66
78
}
67
79
68
80
// NewJob creates a new physical restore job.
@@ -147,48 +159,106 @@ func (r *RestoreJob) Run(ctx context.Context) error {
147
159
log .Msg (fmt .Sprintf ("Stop container: %s. ID: %v" , restoreContainerName , cont .ID ))
148
160
}()
149
161
162
+ defer tools .RemoveContainer (ctx , r .dockerClient , cont .ID , tools .StopTimeout )
163
+
164
+ log .Msg (fmt .Sprintf ("Running container: %s. ID: %v" , restoreContainerName , cont .ID ))
165
+
150
166
if err = r .dockerClient .ContainerStart (ctx , cont .ID , types.ContainerStartOptions {}); err != nil {
151
167
return errors .Wrap (err , "failed to start container" )
152
168
}
153
169
154
- log .Msg (fmt .Sprintf ("Running container: %s. ID: %v" , restoreContainerName , cont .ID ))
170
+ log .Msg ("Running restore command" )
171
+
172
+ if err := tools .ExecCommand (ctx , r .dockerClient , cont .ID , types.ExecConfig {
173
+ Cmd : r .restorer .GetRestoreCommand (),
174
+ }); err != nil {
175
+ return errors .Wrap (err , "failed to restore data" )
176
+ }
177
+
178
+ log .Msg ("Restoring job has been finished" )
179
+
180
+ if err := r .markDatabaseData (); err != nil {
181
+ log .Err ("Failed to mark database data: " , err )
182
+ }
155
183
156
- execCommand , err := r .dockerClient .ContainerExecCreate (ctx , cont .ID , types.ExecConfig {
157
- AttachStdin : false ,
184
+ pgVersion , err := tools .DetectPGVersion (r .globalCfg .DataDir )
185
+ if err != nil {
186
+ return errors .Wrap (err , "failed to detect the Postgres version" )
187
+ }
188
+
189
+ if err := configuration .Run (r .globalCfg .DataDir ); err != nil {
190
+ return errors .Wrap (err , "failed to configure" )
191
+ }
192
+
193
+ if err := r .adjustRecoveryConfiguration (pgVersion , r .globalCfg .DataDir ); err != nil {
194
+ return err
195
+ }
196
+
197
+ // Set permissions.
198
+ if err := tools .ExecCommand (ctx , r .dockerClient , cont .ID , types.ExecConfig {
199
+ Cmd : []string {"chown" , "-R" , "postgres" , restoreContainerPath },
200
+ }); err != nil {
201
+ return errors .Wrap (err , "failed to set permissions" )
202
+ }
203
+
204
+ // Start PostgreSQL instance.
205
+ startCommand , err := r .dockerClient .ContainerExecCreate (ctx , cont .ID , types.ExecConfig {
158
206
AttachStdout : true ,
159
207
AttachStderr : true ,
160
208
Tty : true ,
161
- Cmd : r .restorer .GetRestoreCommand (),
209
+ Cmd : []string {"postgres" , "-D" , restoreContainerPath },
210
+ User : defaults .Username ,
162
211
})
163
212
164
213
if err != nil {
165
214
return errors .Wrap (err , "failed to create an exec command" )
166
215
}
167
216
168
- log .Msg ("Running restore command" )
217
+ log .Msg ("Running refresh command" )
169
218
170
- attachResponse , err := r .dockerClient .ContainerExecAttach (ctx , execCommand .ID , types.ExecStartCheck {Tty : true })
219
+ attachResponse , err := r .dockerClient .ContainerExecAttach (ctx , startCommand .ID , types.ExecStartCheck {Tty : true })
171
220
if err != nil {
172
221
return errors .Wrap (err , "failed to attach to the exec command" )
173
222
}
174
223
175
224
defer attachResponse .Close ()
176
225
177
- if err := waitForCommandResponse ( ctx , attachResponse ); err != nil {
178
- return errors .Wrap (err , "failed to exec the command " )
226
+ if err := isDatabaseReady ( attachResponse . Reader ); err != nil {
227
+ return errors .Wrap (err , "failed to refresh data " )
179
228
}
180
229
181
- if err := tools .InspectCommandResponse (ctx , r .dockerClient , cont .ID , execCommand .ID ); err != nil {
182
- return errors .Wrap (err , "failed to exec the restore command" )
183
- }
230
+ log .Msg ("Running restore command" )
184
231
185
- log .Msg ("Restoring job has been finished" )
232
+ return nil
233
+ }
186
234
187
- if err := r .markDatabaseData (); err != nil {
188
- log .Err ("Failed to mark database data: " , err )
235
+ func isDatabaseReady (input io.Reader ) error {
236
+ scanner := bufio .NewScanner (input )
237
+
238
+ timer := time .NewTimer (time .Minute )
239
+ defer timer .Stop ()
240
+
241
+ for scanner .Scan () {
242
+ select {
243
+ case <- timer .C :
244
+ return errors .New ("timeout exceeded" )
245
+ default :
246
+ }
247
+
248
+ text := scanner .Text ()
249
+
250
+ if strings .Contains (text , readyLogLine ) {
251
+ return nil
252
+ }
253
+
254
+ fmt .Println (text )
189
255
}
190
256
191
- return nil
257
+ if err := scanner .Err (); err != nil {
258
+ return err
259
+ }
260
+
261
+ return errors .New ("not found" )
192
262
}
193
263
194
264
func (r * RestoreJob ) getEnvironmentVariables () []string {
@@ -226,22 +296,50 @@ func (r *RestoreJob) markDatabaseData() error {
226
296
return r .dbMarker .SaveConfig (& dbmarker.Config {DataType : dbmarker .PhysicalDataType })
227
297
}
228
298
229
- func waitForCommandResponse (ctx context.Context , attachResponse types.HijackedResponse ) error {
230
- waitCommandCh := make (chan struct {})
299
+ func (r * RestoreJob ) adjustRecoveryConfiguration (pgVersion , pgDataDir string ) error {
300
+ // Remove postmaster.pid.
301
+ if err := os .Remove (path .Join (pgDataDir , "postmaster.pid" )); err != nil && ! errors .Is (err , os .ErrNotExist ) {
302
+ return errors .Wrap (err , "failed to remove postmaster.pid" )
303
+ }
304
+
305
+ // Truncate pg_ident.conf.
306
+ if err := tools .TouchFile (path .Join (pgDataDir , "pg_ident.conf" )); err != nil {
307
+ return errors .Wrap (err , "failed to truncate pg_ident.conf" )
308
+ }
309
+
310
+ // Replication mode.
311
+ var recoveryFilename string
312
+
313
+ if len (r .restorer .GetRecoveryConfig ()) == 0 {
314
+ return nil
315
+ }
316
+
317
+ version , err := strconv .Atoi (pgVersion )
318
+ if err != nil {
319
+ return errors .Wrap (err , "failed to parse PostgreSQL version" )
320
+ }
321
+
322
+ const pgVersion12 = 12
231
323
232
- go func () {
233
- if _ , err := io . Copy ( os . Stdout , attachResponse . Reader ); err != nil {
234
- log . Err ( "failed to get command output:" , err )
324
+ if version >= pgVersion12 {
325
+ if err := tools . TouchFile ( path . Join ( pgDataDir , "standby.signal" ) ); err != nil {
326
+ return err
235
327
}
236
328
237
- waitCommandCh <- struct {}{}
238
- }()
329
+ recoveryFilename = "postgresql.conf"
330
+ } else {
331
+ recoveryFilename = "recovery.conf"
332
+ }
333
+
334
+ recoveryFile , err := os .OpenFile (path .Join (pgDataDir , recoveryFilename ), os .O_APPEND | os .O_CREATE | os .O_WRONLY , 0644 )
335
+ if err != nil {
336
+ return err
337
+ }
239
338
240
- select {
241
- case <- ctx .Done ():
242
- return ctx .Err ()
339
+ defer func () { _ = recoveryFile .Close () }()
243
340
244
- case <- waitCommandCh :
341
+ if _ , err := recoveryFile .Write (r .restorer .GetRecoveryConfig ()); err != nil {
342
+ return err
245
343
}
246
344
247
345
return nil
0 commit comments