Skip to content

Commit 5b95c2d

Browse files
committed
Merge branch '224-parallel-job-configs' into 'master'
fix: incompatible options for parallel jobs in logical retrieval steps (#224) Closes #224 See merge request postgres-ai/database-lab!238
2 parents 9a4fe6b + 800b044 commit 5b95c2d

File tree

4 files changed

+71
-18
lines changed

4 files changed

+71
-18
lines changed

pkg/retrieval/engine/postgres/logical/dump.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ const (
4848
// Container network modes.
4949
networkModeDefault = container.NetworkMode("default")
5050
networkModeHost = container.NetworkMode("host")
51+
52+
// PostgreSQL pg_dump formats.
53+
customFormat = "custom"
54+
directoryFormat = "directory"
5155
)
5256

5357
// DumpJob declares a job for logical dumping.
@@ -397,12 +401,6 @@ func (d *DumpJob) getExecEnvironmentVariables() []string {
397401
}
398402

399403
func (d *DumpJob) buildLogicalDumpCommand() []string {
400-
format := "custom"
401-
402-
if d.DumpOptions.ParallelJobs > defaultParallelJobs {
403-
format = "directory"
404-
}
405-
406404
optionalArgs := map[string]string{
407405
"--host": d.config.db.Host,
408406
"--port": strconv.Itoa(d.config.db.Port),
@@ -411,14 +409,15 @@ func (d *DumpJob) buildLogicalDumpCommand() []string {
411409
"--jobs": strconv.Itoa(d.DumpOptions.ParallelJobs),
412410
}
413411

414-
dumpCmd := append([]string{"pg_dump", "--create", "--format", format}, prepareCmdOptions(optionalArgs)...)
412+
dumpCmd := append([]string{"pg_dump", "--create"}, prepareCmdOptions(optionalArgs)...)
415413

416414
for _, table := range d.Partial.Tables {
417415
dumpCmd = append(dumpCmd, "--table", table)
418416
}
419417

420418
// Define if restore directly or export to dump location.
421419
if d.DumpOptions.Restore != nil {
420+
dumpCmd = append(dumpCmd, "--format", customFormat)
422421
dumpCmd = append(dumpCmd, d.buildLogicalRestoreCommand()...)
423422
cmd := strings.Join(dumpCmd, " ")
424423

@@ -427,7 +426,7 @@ func (d *DumpJob) buildLogicalDumpCommand() []string {
427426
return []string{"sh", "-c", cmd}
428427
}
429428

430-
dumpCmd = append(dumpCmd, "--file", d.DumpOptions.DumpLocation)
429+
dumpCmd = append(dumpCmd, "--format", directoryFormat, "--file", d.DumpOptions.DumpLocation)
431430

432431
return dumpCmd
433432
}

pkg/retrieval/engine/postgres/logical/restore.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ const (
3737

3838
// const defines restore options.
3939
restoreContainerPrefix = "dblab_lr_"
40-
defaultParallelJobs = 1
40+
41+
// defaultParallelJobs declares a default number of parallel jobs for logical dump and restore.
42+
defaultParallelJobs = 1
4143
)
4244

4345
// RestoreJob defines a logical restore job.

pkg/retrieval/retrieval.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Retrieval struct {
4040
scheduler *cron.Cron
4141
retrieveMutex sync.Mutex
4242
ctxCancel context.CancelFunc
43+
jobSpecs map[string]config.JobSpec
4344
}
4445

4546
// New creates a new data retrieval.
@@ -50,6 +51,7 @@ func New(cfg *dblabCfg.Config, docker *client.Client, pm *pool.Manager, runner r
5051
docker: docker,
5152
poolManager: pm,
5253
runner: runner,
54+
jobSpecs: make(map[string]config.JobSpec, len(cfg.Retrieval.Jobs)),
5355
}
5456
}
5557

@@ -147,6 +149,7 @@ func (r *Retrieval) parseJobs(fsm pool.FSManager) error {
147149
}
148150

149151
jobSpec.Name = jobName
152+
r.jobSpecs[jobName] = jobSpec
150153

151154
jobCfg := config.JobConfig{
152155
Spec: jobSpec,
@@ -172,16 +175,10 @@ func (r *Retrieval) addJob(job components.JobRunner) {
172175
}
173176

174177
func (r *Retrieval) validate() error {
175-
jobsList := make(map[string]struct{}, len(r.jobs))
178+
_, hasLogicalRestore := r.jobSpecs[logical.RestoreJobType]
179+
_, hasPhysicalRestore := r.jobSpecs[physical.RestoreJobType]
176180

177-
for _, job := range r.jobs {
178-
jobsList[job.Name()] = struct{}{}
179-
}
180-
181-
_, hasLogical := jobsList[logical.RestoreJobType]
182-
_, hasPhysical := jobsList[physical.RestoreJobType]
183-
184-
if hasLogical && hasPhysical {
181+
if hasLogicalRestore && hasPhysicalRestore {
185182
return errors.New("must not contain physical and logical restore jobs simultaneously")
186183
}
187184

pkg/retrieval/retrieval_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package retrieval
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
8+
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/config"
9+
)
10+
11+
func TestParallelJobSpecs(t *testing.T) {
12+
testCases := []map[string]config.JobSpec{
13+
{
14+
"logicalRestore": {},
15+
},
16+
{
17+
"physicalRestore": {},
18+
},
19+
{
20+
"logicalDump": {},
21+
},
22+
{
23+
"logicalDump": {},
24+
"logicalRestore": {},
25+
},
26+
}
27+
28+
for _, tc := range testCases {
29+
r := Retrieval{
30+
jobSpecs: tc,
31+
}
32+
33+
err := r.validate()
34+
assert.Nil(t, err)
35+
}
36+
37+
}
38+
39+
func TestInvalidParallelJobSpecs(t *testing.T) {
40+
testCases := []map[string]config.JobSpec{
41+
{
42+
"physicalRestore": {},
43+
"logicalRestore": {},
44+
},
45+
}
46+
47+
for _, tc := range testCases {
48+
r := Retrieval{
49+
jobSpecs: tc,
50+
}
51+
52+
err := r.validate()
53+
assert.Error(t, err)
54+
}
55+
}

0 commit comments

Comments
 (0)