Skip to content

Commit 79ff961

Browse files
committed
Merge branch '71-rds-dump' into 'master'
feat: support a data import from RDS (#126) Closes #126 See merge request postgres-ai/database-lab!126
2 parents 2c124b7 + bb57bfc commit 79ff961

File tree

8 files changed

+406
-44
lines changed

8 files changed

+406
-44
lines changed

configs/config.sample.yml

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,37 @@ retrieval:
103103
dumpLocation: /tmp/db.dump
104104

105105
# The Docker image containing the tools required to get a dump.
106-
dockerImage: "postgresai/retrieval:12"
106+
dockerImage: "postgres:12-alpine"
107107

108-
# Connection parameters of the database to be dumped.
109-
connection:
108+
# Source of data.
109+
source:
110+
# Source type: local, remote, rds.
110111
type: local
111-
dbname: postgres
112-
host: 127.0.0.1
113-
port: 5432
114-
username: postgres
115-
password: postgres # The environment variable PGPASSWORD can be used instead of this option.
112+
113+
# Connection parameters of the database to be dumped.
114+
connection:
115+
dbname: postgres
116+
host: 127.0.0.1
117+
port: 5432
118+
username: postgres
119+
password: postgres # The environment variable PGPASSWORD can be used instead of this option.
120+
121+
# Optional definition of RDS data source.
122+
rds:
123+
# RDS policy name.
124+
iamPolicyName: rds-dblab-retrieval
125+
126+
# AWS Region.
127+
awsRegion: us-east-2
128+
129+
# AWS username.
130+
username: db_user
131+
132+
# RDS instance Identifier.
133+
dbInstanceIdentifier: database-1
134+
135+
# Path to the SSL root certificate: https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem
136+
sslRootCert: "/tmp/rds-combined-ca-bundle.pem"
116137

117138
# Options for a partial dump.
118139
partial:

go.mod

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/AlekSi/pointer v1.1.0
77
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
88
github.com/Microsoft/go-winio v0.4.14 // indirect
9+
github.com/aws/aws-sdk-go v1.33.8
910
github.com/containerd/containerd v1.3.4 // indirect
1011
github.com/docker/distribution v2.7.1+incompatible // indirect
1112
github.com/docker/docker v1.13.1
@@ -18,18 +19,18 @@ require (
1819
github.com/morikuni/aec v1.0.0 // indirect
1920
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
2021
github.com/opencontainers/image-spec v1.0.1 // indirect
21-
github.com/pkg/errors v0.8.1
22+
github.com/pkg/errors v0.9.1
2223
github.com/rs/xid v1.2.1
2324
github.com/sergi/go-diff v1.1.0
2425
github.com/sirupsen/logrus v1.4.2
25-
github.com/stretchr/testify v1.4.0
26+
github.com/stretchr/testify v1.5.1
2627
github.com/urfave/cli/v2 v2.1.1
2728
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9
28-
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
2929
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 // indirect
3030
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect
3131
google.golang.org/grpc v1.30.0 // indirect
3232
gopkg.in/yaml.v2 v2.2.7
33+
gotest.tools v2.2.0+incompatible // indirect
3334
)
3435

3536
replace github.com/docker/docker v1.13.1 => github.com/docker/engine v17.12.0-ce-rc1.0.20200531234253-77e06fda0c94+incompatible

go.sum

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
77
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
88
github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU=
99
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
10+
github.com/aws/aws-sdk-go v1.33.8 h1:2/sOfb9oPHTRZ0lxinoaTPDcYwNa1H/SpKP4nVRBwmg=
11+
github.com/aws/aws-sdk-go v1.33.8/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
1012
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
1113
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
1214
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
@@ -28,6 +30,7 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
2830
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
2931
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
3032
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
33+
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
3134
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
3235
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
3336
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
@@ -43,6 +46,8 @@ github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
4346
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
4447
github.com/jessevdk/go-flags v1.4.1-0.20181221193153-c0795c8afcf4 h1:xKkUL6QBojwguhKKetf1SocCAKqc6W7S/mGm9xEGllo=
4548
github.com/jessevdk/go-flags v1.4.1-0.20181221193153-c0795c8afcf4/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
49+
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
50+
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
4651
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
4752
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
4853
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
@@ -62,6 +67,8 @@ github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVo
6267
github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
6368
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
6469
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
70+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
71+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
6572
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
6673
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
6774
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -81,6 +88,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
8188
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
8289
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
8390
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
91+
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
92+
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
8493
github.com/urfave/cli/v2 v2.1.1 h1:Qt8FeAtxE/vfdrLmR3rxR6JRE0RoVmbXu8+6kZtYU4k=
8594
github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
8695
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -95,8 +104,8 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r
95104
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
96105
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
97106
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
98-
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCTu3mcIURZfNkVweuRKA=
99-
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
107+
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
108+
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
100109
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
101110
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
102111
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

pkg/retrieval/engine/postgres/initialize/logical/dump.go

Lines changed: 109 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ const (
3737
dumpContainerName = "retriever_logical_dump"
3838
dumpContainerDir = "/tmp/dump"
3939

40-
// Defines dump connection types.
41-
connectionTypeLocal = "local"
42-
//connectionTypeRemote = "remote"
43-
//connectionTypeRDS = "rds"
40+
// Defines dump source types.
41+
sourceTypeLocal = "local"
42+
sourceTypeRemote = "remote"
43+
sourceTypeRDS = "rds"
4444

4545
// reservePort defines reserve port in case of a local dump.
4646
reservePort = 9999
@@ -55,6 +55,8 @@ type DumpJob struct {
5555
name string
5656
dockerClient *client.Client
5757
globalCfg *dblabCfg.Global
58+
config dumpJobConfig
59+
dumper dumper
5860
DumpOptions
5961
}
6062

@@ -63,14 +65,37 @@ type DumpOptions struct {
6365
DumpFile string `yaml:"dumpLocation"`
6466
DockerImage string `yaml:"dockerImage"`
6567
Connection Connection `yaml:"connection"`
68+
Source Source `yaml:"source"`
6669
Partial Partial `yaml:"partial"`
6770
ParallelJobs int `yaml:"parallelJobs"`
6871
Restore *DirectRestore `yaml:"restore,omitempty"`
6972
}
7073

74+
// Source describes source of data to dump.
75+
type Source struct {
76+
Type string `yaml:"type"`
77+
Connection Connection `yaml:"connection"`
78+
RDS *RDSConfig `yaml:"rds"`
79+
}
80+
81+
type dumpJobConfig struct {
82+
db Connection
83+
}
84+
85+
// dumper describes the interface to prepare environment for a logical dump.
86+
type dumper interface {
87+
// GetEnvVariables returns dumper environment variables.
88+
GetCmdEnvVariables() []string
89+
90+
// GetMounts returns dumper volume configurations for mounting.
91+
GetMounts() []mount.Mount
92+
93+
// SetConnectionOptions sets connection options for dumping.
94+
SetConnectionOptions(context.Context, *Connection) error
95+
}
96+
7197
// Connection provides connection options.
7298
type Connection struct {
73-
Type string `yaml:"type"`
7499
Host string `yaml:"host"`
75100
Port int `yaml:"port"`
76101
DBName string `yaml:"dbname"`
@@ -95,12 +120,16 @@ func NewDumpJob(cfg config.JobConfig, docker *client.Client, global *dblabCfg.Gl
95120
return nil, errors.Wrap(err, "failed to unmarshal configuration options")
96121
}
97122

98-
dumpJob.setDefaults()
99-
100123
if err := dumpJob.validate(); err != nil {
101124
return nil, errors.Wrap(err, "invalid logical dump job")
102125
}
103126

127+
dumpJob.setDefaults()
128+
129+
if err := dumpJob.setupDumper(); err != nil {
130+
return nil, errors.Wrap(err, "failed to set up a dump helper")
131+
}
132+
104133
return dumpJob, nil
105134
}
106135

@@ -113,22 +142,51 @@ Either set 'numberOfJobs' equals to 1 or disable the restore section`)
113142
return nil
114143
}
115144

116-
// Name returns a name of the job.
117-
func (d *DumpJob) Name() string {
118-
return d.name
119-
}
120-
121145
func (d *DumpJob) setDefaults() {
122146
// TODO: Default yaml values in tags.
123-
if d.DumpOptions.Connection.Port == 0 {
124-
d.DumpOptions.Connection.Port = defaultPort
147+
if d.DumpOptions.Source.Connection.Port == 0 {
148+
d.DumpOptions.Source.Connection.Port = defaultPort
125149
}
126150

127-
if d.DumpOptions.Connection.Username == "" {
128-
d.DumpOptions.Connection.Username = defaultUsername
151+
if d.DumpOptions.Source.Connection.Username == "" {
152+
d.DumpOptions.Source.Connection.Username = defaultUsername
153+
}
154+
155+
if d.DumpOptions.ParallelJobs == 0 {
156+
d.DumpOptions.ParallelJobs = defaultParallelJobs
129157
}
130158
}
131159

160+
// setupDumper sets up a tool to perform physical restoring.
161+
func (d *DumpJob) setupDumper() error {
162+
switch d.Source.Type {
163+
case sourceTypeLocal, sourceTypeRemote, "":
164+
d.dumper = newDefaultDumper()
165+
return nil
166+
167+
case sourceTypeRDS:
168+
if d.Source.RDS == nil {
169+
return errors.New("the RDS configuration section must not be empty when using the RDS source type")
170+
}
171+
172+
dumper, err := newRDSDumper(d.Source.RDS)
173+
if err != nil {
174+
return errors.Wrap(err, "failed to create an RDS dumper")
175+
}
176+
177+
d.dumper = dumper
178+
179+
return nil
180+
}
181+
182+
return errors.Errorf("unknown source type given: %v", d.Source.Type)
183+
}
184+
185+
// Name returns a name of the job.
186+
func (d *DumpJob) Name() string {
187+
return d.name
188+
}
189+
132190
// Run starts the job.
133191
func (d *DumpJob) Run(ctx context.Context) error {
134192
log.Msg(fmt.Sprintf("Run job: %s. Options: %v", d.Name(), d.DumpOptions))
@@ -160,6 +218,8 @@ func (d *DumpJob) Run(ctx context.Context) error {
160218
dumpContainerName,
161219
)
162220
if err != nil {
221+
log.Err(err)
222+
163223
return errors.Wrap(err, "failed to create container")
164224
}
165225

@@ -185,6 +245,10 @@ func (d *DumpJob) Run(ctx context.Context) error {
185245
return errors.Wrap(err, "failed to readiness check")
186246
}
187247

248+
if err := d.setupConnectionOptions(ctx); err != nil {
249+
return errors.Wrap(err, "failed to setup connection options")
250+
}
251+
188252
dumpCommand := d.buildLogicalDumpCommand()
189253
log.Msg("Running dump command", dumpCommand)
190254

@@ -233,6 +297,17 @@ func (d *DumpJob) Run(ctx context.Context) error {
233297
return nil
234298
}
235299

300+
// setupConnectionOptions prepares connection options to perform a logical dump.
301+
func (d *DumpJob) setupConnectionOptions(ctx context.Context) error {
302+
d.config.db = d.DumpOptions.Source.Connection
303+
304+
if err := d.dumper.SetConnectionOptions(ctx, &d.config.db); err != nil {
305+
return errors.Wrap(err, "failed to set connection options")
306+
}
307+
308+
return nil
309+
}
310+
236311
func (d *DumpJob) performDumpCommand(ctx context.Context, cmdOutput io.Writer, commandID string) error {
237312
execAttach, err := d.dockerClient.ContainerExecAttach(ctx, commandID, types.ExecStartCheck{})
238313
if err != nil {
@@ -275,9 +350,12 @@ func (d *DumpJob) getDumpContainerPath() string {
275350
}
276351

277352
func (d *DumpJob) getEnvironmentVariables() []string {
278-
envs := []string{"PGDATA=" + pgDataContainerDir}
353+
envs := []string{
354+
"PGDATA=" + pgDataContainerDir,
355+
"POSTGRES_HOST_AUTH_METHOD=trust",
356+
}
279357

280-
if d.DumpOptions.Connection.Type == connectionTypeLocal && d.DumpOptions.Connection.Port == defaultPort {
358+
if d.DumpOptions.Source.Type == sourceTypeLocal && d.DumpOptions.Source.Connection.Port == defaultPort {
281359
envs = append(envs, "PGPORT="+strconv.Itoa(reservePort))
282360
}
283361

@@ -294,33 +372,36 @@ func (d *DumpJob) getMountVolumes() []mount.Mount {
294372
},
295373
}
296374

297-
if d.Connection.Type != connectionTypeLocal && d.DumpOptions.DumpFile != "" {
375+
if d.DumpOptions.DumpFile != "" {
298376
mounts = append(mounts, mount.Mount{
299377
Type: mount.TypeBind,
300378
Source: filepath.Dir(d.DumpOptions.DumpFile),
301379
Target: dumpContainerDir,
302380
})
303381
}
304382

383+
// Add dump specific mounts.
384+
mounts = append(mounts, d.dumper.GetMounts()...)
385+
305386
return mounts
306387
}
307388

308389
func (d *DumpJob) getContainerNetworkMode() container.NetworkMode {
309390
networkMode := networkModeDefault
310391

311-
if d.Connection.Type == connectionTypeLocal {
392+
if d.Source.Type == sourceTypeLocal {
312393
networkMode = networkModeHost
313394
}
314395

315396
return networkMode
316397
}
317398

318399
func (d *DumpJob) getExecEnvironmentVariables() []string {
319-
execEnvs := []string{}
400+
execEnvs := d.dumper.GetCmdEnvVariables()
320401

321-
pgPassword := d.DumpOptions.Connection.Password
402+
pgPassword := d.config.db.Password
322403

323-
if os.Getenv("PGPASSWORD") != "" {
404+
if pgPassword == "" && os.Getenv("PGPASSWORD") != "" {
324405
pgPassword = os.Getenv("PGPASSWORD")
325406
}
326407

@@ -335,10 +416,10 @@ func (d *DumpJob) buildLogicalDumpCommand() []string {
335416
dumpCmd := []string{"pg_dump", "-C"}
336417

337418
optionalArgs := map[string]string{
338-
"-h": d.DumpOptions.Connection.Host,
339-
"-p": strconv.Itoa(d.DumpOptions.Connection.Port),
340-
"-U": d.DumpOptions.Connection.Username,
341-
"-d": d.DumpOptions.Connection.DBName,
419+
"-h": d.config.db.Host,
420+
"-p": strconv.Itoa(d.config.db.Port),
421+
"-U": d.config.db.Username,
422+
"-d": d.config.db.DBName,
342423
"-j": strconv.Itoa(d.DumpOptions.ParallelJobs),
343424
}
344425
dumpCmd = append(dumpCmd, prepareCmdOptions(optionalArgs)...)
@@ -351,7 +432,7 @@ func (d *DumpJob) buildLogicalDumpCommand() []string {
351432
dumpCmd = append(dumpCmd, d.buildLogicalRestoreCommand()...)
352433
cmd := strings.Join(dumpCmd, " ")
353434

354-
log.Msg(cmd)
435+
log.Dbg(cmd)
355436

356437
return []string{"sh", "-c", cmd}
357438
}

0 commit comments

Comments
 (0)