@@ -37,10 +37,10 @@ const (
37
37
dumpContainerName = "retriever_logical_dump"
38
38
dumpContainerDir = "/tmp/dump"
39
39
40
- // Defines dump connection types.
41
- connectionTypeLocal = "local"
42
- //connectionTypeRemote = "remote"
43
- //connectionTypeRDS = "rds"
40
+ // Defines dump source types.
41
+ sourceTypeLocal = "local"
42
+ sourceTypeRemote = "remote"
43
+ sourceTypeRDS = "rds"
44
44
45
45
// reservePort defines reserve port in case of a local dump.
46
46
reservePort = 9999
@@ -55,6 +55,8 @@ type DumpJob struct {
55
55
name string
56
56
dockerClient * client.Client
57
57
globalCfg * dblabCfg.Global
58
+ config dumpJobConfig
59
+ dumper dumper
58
60
DumpOptions
59
61
}
60
62
@@ -63,14 +65,37 @@ type DumpOptions struct {
63
65
DumpFile string `yaml:"dumpLocation"`
64
66
DockerImage string `yaml:"dockerImage"`
65
67
Connection Connection `yaml:"connection"`
68
+ Source Source `yaml:"source"`
66
69
Partial Partial `yaml:"partial"`
67
70
ParallelJobs int `yaml:"parallelJobs"`
68
71
Restore * DirectRestore `yaml:"restore,omitempty"`
69
72
}
70
73
74
+ // Source describes source of data to dump.
75
+ type Source struct {
76
+ Type string `yaml:"type"`
77
+ Connection Connection `yaml:"connection"`
78
+ RDS * RDSConfig `yaml:"rds"`
79
+ }
80
+
81
+ type dumpJobConfig struct {
82
+ db Connection
83
+ }
84
+
85
+ // dumper describes the interface to prepare environment for a logical dump.
86
+ type dumper interface {
87
+ // GetEnvVariables returns dumper environment variables.
88
+ GetCmdEnvVariables () []string
89
+
90
+ // GetMounts returns dumper volume configurations for mounting.
91
+ GetMounts () []mount.Mount
92
+
93
+ // SetConnectionOptions sets connection options for dumping.
94
+ SetConnectionOptions (context.Context , * Connection ) error
95
+ }
96
+
71
97
// Connection provides connection options.
72
98
type Connection struct {
73
- Type string `yaml:"type"`
74
99
Host string `yaml:"host"`
75
100
Port int `yaml:"port"`
76
101
DBName string `yaml:"dbname"`
@@ -95,12 +120,16 @@ func NewDumpJob(cfg config.JobConfig, docker *client.Client, global *dblabCfg.Gl
95
120
return nil , errors .Wrap (err , "failed to unmarshal configuration options" )
96
121
}
97
122
98
- dumpJob .setDefaults ()
99
-
100
123
if err := dumpJob .validate (); err != nil {
101
124
return nil , errors .Wrap (err , "invalid logical dump job" )
102
125
}
103
126
127
+ dumpJob .setDefaults ()
128
+
129
+ if err := dumpJob .setupDumper (); err != nil {
130
+ return nil , errors .Wrap (err , "failed to set up a dump helper" )
131
+ }
132
+
104
133
return dumpJob , nil
105
134
}
106
135
@@ -113,22 +142,51 @@ Either set 'numberOfJobs' equals to 1 or disable the restore section`)
113
142
return nil
114
143
}
115
144
116
- // Name returns a name of the job.
117
- func (d * DumpJob ) Name () string {
118
- return d .name
119
- }
120
-
121
145
func (d * DumpJob ) setDefaults () {
122
146
// TODO: Default yaml values in tags.
123
- if d .DumpOptions .Connection .Port == 0 {
124
- d .DumpOptions .Connection .Port = defaultPort
147
+ if d .DumpOptions .Source . Connection .Port == 0 {
148
+ d .DumpOptions .Source . Connection .Port = defaultPort
125
149
}
126
150
127
- if d .DumpOptions .Connection .Username == "" {
128
- d .DumpOptions .Connection .Username = defaultUsername
151
+ if d .DumpOptions .Source .Connection .Username == "" {
152
+ d .DumpOptions .Source .Connection .Username = defaultUsername
153
+ }
154
+
155
+ if d .DumpOptions .ParallelJobs == 0 {
156
+ d .DumpOptions .ParallelJobs = defaultParallelJobs
129
157
}
130
158
}
131
159
160
+ // setupDumper sets up a tool to perform physical restoring.
161
+ func (d * DumpJob ) setupDumper () error {
162
+ switch d .Source .Type {
163
+ case sourceTypeLocal , sourceTypeRemote , "" :
164
+ d .dumper = newDefaultDumper ()
165
+ return nil
166
+
167
+ case sourceTypeRDS :
168
+ if d .Source .RDS == nil {
169
+ return errors .New ("the RDS configuration section must not be empty when using the RDS source type" )
170
+ }
171
+
172
+ dumper , err := newRDSDumper (d .Source .RDS )
173
+ if err != nil {
174
+ return errors .Wrap (err , "failed to create an RDS dumper" )
175
+ }
176
+
177
+ d .dumper = dumper
178
+
179
+ return nil
180
+ }
181
+
182
+ return errors .Errorf ("unknown source type given: %v" , d .Source .Type )
183
+ }
184
+
185
+ // Name returns a name of the job.
186
+ func (d * DumpJob ) Name () string {
187
+ return d .name
188
+ }
189
+
132
190
// Run starts the job.
133
191
func (d * DumpJob ) Run (ctx context.Context ) error {
134
192
log .Msg (fmt .Sprintf ("Run job: %s. Options: %v" , d .Name (), d .DumpOptions ))
@@ -160,6 +218,8 @@ func (d *DumpJob) Run(ctx context.Context) error {
160
218
dumpContainerName ,
161
219
)
162
220
if err != nil {
221
+ log .Err (err )
222
+
163
223
return errors .Wrap (err , "failed to create container" )
164
224
}
165
225
@@ -185,6 +245,10 @@ func (d *DumpJob) Run(ctx context.Context) error {
185
245
return errors .Wrap (err , "failed to readiness check" )
186
246
}
187
247
248
+ if err := d .setupConnectionOptions (ctx ); err != nil {
249
+ return errors .Wrap (err , "failed to setup connection options" )
250
+ }
251
+
188
252
dumpCommand := d .buildLogicalDumpCommand ()
189
253
log .Msg ("Running dump command" , dumpCommand )
190
254
@@ -233,6 +297,17 @@ func (d *DumpJob) Run(ctx context.Context) error {
233
297
return nil
234
298
}
235
299
300
+ // setupConnectionOptions prepares connection options to perform a logical dump.
301
+ func (d * DumpJob ) setupConnectionOptions (ctx context.Context ) error {
302
+ d .config .db = d .DumpOptions .Source .Connection
303
+
304
+ if err := d .dumper .SetConnectionOptions (ctx , & d .config .db ); err != nil {
305
+ return errors .Wrap (err , "failed to set connection options" )
306
+ }
307
+
308
+ return nil
309
+ }
310
+
236
311
func (d * DumpJob ) performDumpCommand (ctx context.Context , cmdOutput io.Writer , commandID string ) error {
237
312
execAttach , err := d .dockerClient .ContainerExecAttach (ctx , commandID , types.ExecStartCheck {})
238
313
if err != nil {
@@ -275,9 +350,12 @@ func (d *DumpJob) getDumpContainerPath() string {
275
350
}
276
351
277
352
func (d * DumpJob ) getEnvironmentVariables () []string {
278
- envs := []string {"PGDATA=" + pgDataContainerDir }
353
+ envs := []string {
354
+ "PGDATA=" + pgDataContainerDir ,
355
+ "POSTGRES_HOST_AUTH_METHOD=trust" ,
356
+ }
279
357
280
- if d .DumpOptions .Connection .Type == connectionTypeLocal && d .DumpOptions .Connection .Port == defaultPort {
358
+ if d .DumpOptions .Source .Type == sourceTypeLocal && d .DumpOptions . Source .Connection .Port == defaultPort {
281
359
envs = append (envs , "PGPORT=" + strconv .Itoa (reservePort ))
282
360
}
283
361
@@ -294,33 +372,36 @@ func (d *DumpJob) getMountVolumes() []mount.Mount {
294
372
},
295
373
}
296
374
297
- if d .Connection . Type != connectionTypeLocal && d . DumpOptions .DumpFile != "" {
375
+ if d .DumpOptions .DumpFile != "" {
298
376
mounts = append (mounts , mount.Mount {
299
377
Type : mount .TypeBind ,
300
378
Source : filepath .Dir (d .DumpOptions .DumpFile ),
301
379
Target : dumpContainerDir ,
302
380
})
303
381
}
304
382
383
+ // Add dump specific mounts.
384
+ mounts = append (mounts , d .dumper .GetMounts ()... )
385
+
305
386
return mounts
306
387
}
307
388
308
389
func (d * DumpJob ) getContainerNetworkMode () container.NetworkMode {
309
390
networkMode := networkModeDefault
310
391
311
- if d .Connection .Type == connectionTypeLocal {
392
+ if d .Source .Type == sourceTypeLocal {
312
393
networkMode = networkModeHost
313
394
}
314
395
315
396
return networkMode
316
397
}
317
398
318
399
func (d * DumpJob ) getExecEnvironmentVariables () []string {
319
- execEnvs := [] string {}
400
+ execEnvs := d . dumper . GetCmdEnvVariables ()
320
401
321
- pgPassword := d .DumpOptions . Connection .Password
402
+ pgPassword := d .config . db .Password
322
403
323
- if os .Getenv ("PGPASSWORD" ) != "" {
404
+ if pgPassword == "" && os .Getenv ("PGPASSWORD" ) != "" {
324
405
pgPassword = os .Getenv ("PGPASSWORD" )
325
406
}
326
407
@@ -335,10 +416,10 @@ func (d *DumpJob) buildLogicalDumpCommand() []string {
335
416
dumpCmd := []string {"pg_dump" , "-C" }
336
417
337
418
optionalArgs := map [string ]string {
338
- "-h" : d .DumpOptions . Connection .Host ,
339
- "-p" : strconv .Itoa (d .DumpOptions . Connection .Port ),
340
- "-U" : d .DumpOptions . Connection .Username ,
341
- "-d" : d .DumpOptions . Connection .DBName ,
419
+ "-h" : d .config . db .Host ,
420
+ "-p" : strconv .Itoa (d .config . db .Port ),
421
+ "-U" : d .config . db .Username ,
422
+ "-d" : d .config . db .DBName ,
342
423
"-j" : strconv .Itoa (d .DumpOptions .ParallelJobs ),
343
424
}
344
425
dumpCmd = append (dumpCmd , prepareCmdOptions (optionalArgs )... )
@@ -351,7 +432,7 @@ func (d *DumpJob) buildLogicalDumpCommand() []string {
351
432
dumpCmd = append (dumpCmd , d .buildLogicalRestoreCommand ()... )
352
433
cmd := strings .Join (dumpCmd , " " )
353
434
354
- log .Msg (cmd )
435
+ log .Dbg (cmd )
355
436
356
437
return []string {"sh" , "-c" , cmd }
357
438
}
0 commit comments