Skip to content

Commit f75aecb

Browse files
committed
fix: various issues related to the automated snapshot management and sync instance (#152)
1 parent aa065ba commit f75aecb

File tree

4 files changed

+60
-52
lines changed

4 files changed

+60
-52
lines changed

pkg/retrieval/engine/postgres/initialize/physical/physical.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,23 @@ func (r *RestoreJob) Name() string {
128128
func (r *RestoreJob) Run(ctx context.Context) (err error) {
129129
log.Msg(fmt.Sprintf("Run job: %s. Options: %v", r.Name(), r.CopyOptions))
130130

131+
defer func() {
132+
if err != nil && r.CopyOptions.SyncInstance {
133+
if syncErr := r.runSyncInstance(ctx); syncErr != nil {
134+
log.Err("Failed to run sync instance", syncErr)
135+
}
136+
}
137+
}()
138+
131139
isEmpty, err := tools.IsEmptyDirectory(r.globalCfg.DataDir)
132140
if err != nil {
133141
return errors.Wrap(err, "failed to explore the data directory")
134142
}
135143

136144
if !isEmpty {
137-
return errors.New("the data directory is not empty. Clean the data directory before proceeding")
145+
log.Msg("Data directory is not empty. Skipping physical restore.")
146+
147+
return nil
138148
}
139149

140150
if err := tools.PullImage(ctx, r.dockerClient, r.CopyOptions.DockerImage); err != nil {
@@ -213,7 +223,7 @@ func (r *RestoreJob) Run(ctx context.Context) (err error) {
213223

214224
log.Msg("Running refresh command")
215225

216-
attachResponse, err := r.dockerClient.ContainerExecAttach(ctx, startCommand.ID, types.ExecStartCheck{Tty: true})
226+
attachResponse, err := r.dockerClient.ContainerExecAttach(ctx, startCommand.ID, types.ExecStartCheck{})
217227
if err != nil {
218228
return errors.Wrap(err, "failed to attach to the exec command")
219229
}
@@ -226,12 +236,6 @@ func (r *RestoreJob) Run(ctx context.Context) (err error) {
226236

227237
log.Msg("Refresh command has been finished")
228238

229-
if r.CopyOptions.SyncInstance {
230-
if err := r.runSyncInstance(ctx); err != nil {
231-
log.Err("Failed to run sync instance", err)
232-
}
233-
}
234-
235239
return nil
236240
}
237241

@@ -263,7 +267,6 @@ func startingPostgresConfig(pgDataDir string) types.ExecConfig {
263267
return types.ExecConfig{
264268
AttachStdout: true,
265269
AttachStderr: true,
266-
Tty: true,
267270
Cmd: []string{"postgres", "-D", pgDataDir},
268271
User: defaults.Username,
269272
}

pkg/retrieval/engine/postgres/initialize/snapshot/physical.go

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -111,21 +111,21 @@ func NewPhysicalInitialJob(cfg config.JobConfig, docker *client.Client, cloneMan
111111
}
112112

113113
func (p *PhysicalInitial) setupScheduler() error {
114-
if p.options.Scheduler == nil {
114+
if p.options.Scheduler == nil ||
115+
p.options.Scheduler.Snapshot.Timetable == "" && p.options.Scheduler.Retention.Timetable == "" {
115116
return nil
116117
}
117118

118119
specParser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
119120

120-
if _, err := specParser.Parse(p.options.Scheduler.Snapshot.Timetable); err != nil {
121+
if _, err := specParser.Parse(p.options.Scheduler.Snapshot.Timetable); p.options.Scheduler.Snapshot.Timetable != "" && err != nil {
121122
return errors.Wrapf(err, "failed to parse schedule timetable %q", p.options.Scheduler.Snapshot.Timetable)
122123
}
123124

124-
if _, err := specParser.Parse(p.options.Scheduler.Retention.Timetable); err != nil {
125+
if _, err := specParser.Parse(p.options.Scheduler.Retention.Timetable); p.options.Scheduler.Retention.Timetable != "" && err != nil {
125126
return errors.Wrapf(err, "failed to parse retention timetable %q", p.options.Scheduler.Retention.Timetable)
126127
}
127128

128-
p.scheduleOnce = sync.Once{}
129129
p.scheduler = cron.New()
130130

131131
return nil
@@ -141,7 +141,7 @@ func (p *PhysicalInitial) Name() string {
141141
}
142142

143143
// Run starts the job.
144-
func (p *PhysicalInitial) Run(ctx context.Context) error {
144+
func (p *PhysicalInitial) Run(ctx context.Context) (err error) {
145145
p.scheduleOnce.Do(p.startScheduler(ctx))
146146

147147
select {
@@ -184,36 +184,42 @@ func (p *PhysicalInitial) Run(ctx context.Context) error {
184184
}()
185185
}
186186

187-
// Promotion.
188-
if p.options.Promote {
189-
// Prepare pre-snapshot.
190-
snapshotName, err := p.cloneManager.CreateSnapshot("", preDataStateAt+pre)
191-
if err != nil {
192-
return errors.Wrap(err, "failed to create a snapshot")
187+
defer func() {
188+
if _, ok := err.(*skipSnapshotErr); ok {
189+
log.Msg(err.Error())
190+
err = nil
193191
}
192+
}()
194193

195-
defer func() {
194+
// Prepare pre-snapshot.
195+
snapshotName, err := p.cloneManager.CreateSnapshot("", preDataStateAt+pre)
196+
if err != nil {
197+
return errors.Wrap(err, "failed to create snapshot")
198+
}
199+
200+
defer func() {
201+
if err != nil {
196202
if errDestroy := p.cloneManager.DestroySnapshot(snapshotName); errDestroy != nil {
197203
log.Err(fmt.Sprintf("Failed to destroy the %q snapshot: %v", snapshotName, err))
198204
}
199-
}()
200-
201-
if err := p.cloneManager.CreateClone(cloneName, snapshotName); err != nil {
202-
return errors.Wrapf(err, "failed to create a pre clone %s", cloneName)
203205
}
206+
}()
204207

205-
defer func() {
208+
if err := p.cloneManager.CreateClone(cloneName, snapshotName); err != nil {
209+
return errors.Wrapf(err, "failed to create \"pre\" clone %s", cloneName)
210+
}
211+
212+
defer func() {
213+
if err != nil {
206214
if errDestroy := p.cloneManager.DestroyClone(cloneName); errDestroy != nil {
207215
log.Err(fmt.Sprintf("Failed to destroy clone %q: %v", cloneName, err))
208216
}
209-
}()
217+
}
218+
}()
210219

220+
// Promotion.
221+
if p.options.Promote {
211222
if err := p.promoteInstance(ctx, path.Join(p.globalCfg.MountDir, cloneName)); err != nil {
212-
if _, ok := err.(*skipSnapshotErr); ok {
213-
log.Msg(err.Error())
214-
return nil
215-
}
216-
217223
return err
218224
}
219225
}
@@ -239,20 +245,25 @@ func (p *PhysicalInitial) Run(ctx context.Context) error {
239245
}
240246

241247
func (p *PhysicalInitial) startScheduler(ctx context.Context) func() {
242-
if p.scheduler == nil {
248+
if p.scheduler == nil || p.options.Scheduler == nil ||
249+
p.options.Scheduler.Snapshot.Timetable == "" && p.options.Scheduler.Retention.Timetable == "" {
243250
return func() {}
244251
}
245252

246253
return func() {
247-
if _, err := p.scheduler.AddFunc(p.options.Scheduler.Snapshot.Timetable, p.runAutoSnapshot(ctx)); err != nil {
248-
log.Err(errors.Wrap(err, "failed to schedule a new snapshot job"))
249-
return
254+
if p.options.Scheduler.Snapshot.Timetable != "" {
255+
if _, err := p.scheduler.AddFunc(p.options.Scheduler.Snapshot.Timetable, p.runAutoSnapshot(ctx)); err != nil {
256+
log.Err(errors.Wrap(err, "failed to schedule a new snapshot job"))
257+
return
258+
}
250259
}
251260

252-
if _, err := p.scheduler.AddFunc(p.options.Scheduler.Retention.Timetable,
253-
p.runAutoCleanup(p.options.Scheduler.Retention.Limit)); err != nil {
254-
log.Err(errors.Wrap(err, "failed to schedule a new cleanup job"))
255-
return
261+
if p.options.Scheduler.Retention.Timetable != "" {
262+
if _, err := p.scheduler.AddFunc(p.options.Scheduler.Retention.Timetable,
263+
p.runAutoCleanup(p.options.Scheduler.Retention.Limit)); err != nil {
264+
log.Err(errors.Wrap(err, "failed to schedule a new cleanup job"))
265+
return
266+
}
256267
}
257268

258269
p.scheduler.Start()

pkg/services/provision/runners/runners.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,9 @@ func (r *LocalRunner) Run(command string, options ...bool) (string, error) {
101101
logCommand := Hidden
102102
if logsEnabled {
103103
logCommand = command
104+
log.Dbg(fmt.Sprintf(`Run(Local): "%s"`, logCommand))
104105
}
105106

106-
log.Dbg(fmt.Sprintf(`Run(Local): "%s"`, logCommand))
107-
108107
var out bytes.Buffer
109108
var stderr bytes.Buffer
110109

@@ -144,13 +143,10 @@ func (r *LocalRunner) Run(command string, options ...bool) (string, error) {
144143

145144
outFormatted := strings.Trim(out.String(), " \n")
146145

147-
logOut := Hidden
148146
if logsEnabled {
149-
logOut = outFormatted
147+
log.Dbg(fmt.Sprintf(`Run(Local): output "%s"`, outFormatted))
150148
}
151149

152-
log.Dbg(fmt.Sprintf(`Run(Local): output "%s"`, logOut))
153-
154150
if stderrStr := stderr.String(); len(stderrStr) > 0 {
155151
log.Dbg("Run(Local): stderr", stderr.String())
156152
}

pkg/services/provision/thinclones/zfs/zfs.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ const (
2424
dataStateAtLabel = "dblab:datastateat"
2525
isRoughStateAtLabel = "dblab:isroughdsa"
2626
dataStateAtFormat = "20060102150405"
27-
stdErrCleanupTag = "cleanup_zfs_snapshot"
2827
)
2928

3029
// ListEntry defines entry of ZFS list command.
@@ -235,7 +234,6 @@ func RollbackSnapshot(r runners.Runner, pool string, snapshot string) error {
235234

236235
// DestroySnapshot destroys the snapshot.
237236
func DestroySnapshot(r runners.Runner, snapshotName string) error {
238-
// TODO(akartasov): Implement the function.
239237
cmd := fmt.Sprintf("zfs destroy -R %s", snapshotName)
240238

241239
if _, err := r.Run(cmd); err != nil {
@@ -248,11 +246,11 @@ func DestroySnapshot(r runners.Runner, snapshotName string) error {
248246
// CleanupSnapshots destroys old ZFS snapshots considering retention limit.
249247
func CleanupSnapshots(r runners.Runner, pool string, retentionLimit int) ([]string, error) {
250248
cleanupCmd := fmt.Sprintf(
251-
"zfs list -t snapshot -r %s -H -o name -s %s -s creation | grep -v clone | head -n -%d "+
252-
"| xargs -n1 --no-run-if-empty zfs destroy -R 2>&1 | logger --stderr --tag \"%s\"",
253-
pool, dataStateAtLabel, retentionLimit, stdErrCleanupTag)
249+
"zfs list -t snapshot -H -o name -s %s -s creation -r %s | grep -v clone | head -n -%d "+
250+
"| xargs -n1 --no-run-if-empty zfs destroy -R ",
251+
dataStateAtLabel, pool, retentionLimit)
254252

255-
out, err := r.Run(cleanupCmd, true)
253+
out, err := r.Run(cleanupCmd)
256254
if err != nil {
257255
return nil, errors.Wrap(err, "failed to clean up snapshots")
258256
}

0 commit comments

Comments
 (0)