Skip to content

feat(site): use websocket connection for devcontainer updates #18808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
487ee95
feat(site): use websocket connection for devcontainer updates
DanielleMaywood Jul 3, 2025
cc42018
fix: some issues
DanielleMaywood Jul 7, 2025
fa46517
Merge branch 'main' into danielle/container-push
DanielleMaywood Jul 7, 2025
5aef560
Merge branch 'main' into danielle/container-push
DanielleMaywood Jul 8, 2025
975ef8b
chore: fix disconnect bug and add agentcontainers test
DanielleMaywood Jul 8, 2025
8fdeca3
Merge branch 'main' into danielle/container-push
DanielleMaywood Jul 9, 2025
6da941f
test: add coderd/ test
DanielleMaywood Jul 9, 2025
ff5725e
chore: appease formatter
DanielleMaywood Jul 9, 2025
178507c
chore: feedback
DanielleMaywood Jul 9, 2025
367b87d
chore: fix nil exception
DanielleMaywood Jul 9, 2025
34b17c4
chore: make gen
DanielleMaywood Jul 9, 2025
8f12460
fix: docs
DanielleMaywood Jul 9, 2025
81022fa
Merge branch 'main' into danielle/container-push
DanielleMaywood Jul 9, 2025
1768f7b
fix: only send when there are updates
DanielleMaywood Jul 9, 2025
8240663
chore: lint and format
DanielleMaywood Jul 9, 2025
6d97960
Merge branch 'main' into danielle/container-push
DanielleMaywood Jul 10, 2025
88a611d
chore: test `useAgentContainers`
DanielleMaywood Jul 10, 2025
001ccda
chore: check container ids match in `Equals` function
DanielleMaywood Jul 10, 2025
3e50965
chore: add logger to WatchContainers
DanielleMaywood Jul 10, 2025
6ce5c19
chore: reposition close of update channel
DanielleMaywood Jul 10, 2025
cd0c2d5
chore: rename `knownDevcontainers`
DanielleMaywood Jul 10, 2025
04a92a4
chore: use `WebsocketNetConn`
DanielleMaywood Jul 10, 2025
096a85e
chore: steal CloseRead
DanielleMaywood Jul 10, 2025
971f9d6
chore: check agents match
DanielleMaywood Jul 10, 2025
f24401f
test: parsing error and socket error
DanielleMaywood Jul 10, 2025
64d9252
chore: lint and format
DanielleMaywood Jul 10, 2025
40c3fd9
chore: give comment some love
DanielleMaywood Jul 14, 2025
1cda455
chore: re-use json encoder instead of recreating every time
DanielleMaywood Jul 14, 2025
2ded15f
fix: push initial dev container state in websocket
DanielleMaywood Jul 14, 2025
a87f388
fix: do not invalidateQuery + fix bad types
DanielleMaywood Jul 14, 2025
2de01f5
chore: appease linter
DanielleMaywood Jul 14, 2025
00fdae6
chore: broadcast updates in more places, add staleTime: Infinity
DanielleMaywood Jul 14, 2025
a4a4bb2
chore: appease linter
DanielleMaywood Jul 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions agent/agentcontainers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package agentcontainers

import (
"context"
"encoding/json"
"errors"
"fmt"
"maps"
"net/http"
"os"
"path"
Expand All @@ -30,6 +32,7 @@ import (
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/provisioner"
"github.com/coder/quartz"
"github.com/coder/websocket"
)

const (
Expand Down Expand Up @@ -74,6 +77,7 @@ type API struct {

mu sync.RWMutex // Protects the following fields.
initDone chan struct{} // Closed by Init.
updateChans []chan struct{}
closed bool
containers codersdk.WorkspaceAgentListContainersResponse // Output from the last list operation.
containersErr error // Error from the last list operation.
Expand Down Expand Up @@ -535,6 +539,7 @@ func (api *API) Routes() http.Handler {
r.Use(ensureInitDoneMW)

r.Get("/", api.handleList)
r.Get("/watch", api.watchContainers)
// TODO(mafredri): Simplify this route as the previous /devcontainers
// /-route was dropped. We can drop the /devcontainers prefix here too.
r.Route("/devcontainers/{devcontainer}", func(r chi.Router) {
Expand All @@ -544,6 +549,88 @@ func (api *API) Routes() http.Handler {
return r
}

func (api *API) broadcastUpdatesLocked() {
// Broadcast state changes to WebSocket listeners.
for _, ch := range api.updateChans {
select {
case ch <- struct{}{}:
default:
}
}
}

func (api *API) watchContainers(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()

conn, err := websocket.Accept(rw, r, nil)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Failed to upgrade connection to websocket.",
Detail: err.Error(),
})
return
}

// Here we close the websocket for reading, so that the websocket library will handle pings and
// close frames.
_ = conn.CloseRead(context.Background())

ctx, wsNetConn := codersdk.WebsocketNetConn(ctx, conn, websocket.MessageText)
defer wsNetConn.Close()

go httpapi.Heartbeat(ctx, conn)

updateCh := make(chan struct{}, 1)

api.mu.Lock()
api.updateChans = append(api.updateChans, updateCh)
api.mu.Unlock()

defer func() {
api.mu.Lock()
api.updateChans = slices.DeleteFunc(api.updateChans, func(ch chan struct{}) bool {
return ch == updateCh
})
close(updateCh)
api.mu.Unlock()
}()

encoder := json.NewEncoder(wsNetConn)

ct, err := api.getContainers()
if err != nil {
api.logger.Error(ctx, "unable to get containers", slog.Error(err))
return
}

if err := encoder.Encode(ct); err != nil {
api.logger.Error(ctx, "encode container list", slog.Error(err))
return
}

for {
select {
case <-api.ctx.Done():
return

case <-ctx.Done():
return

case <-updateCh:
ct, err := api.getContainers()
if err != nil {
api.logger.Error(ctx, "unable to get containers", slog.Error(err))
continue
}

if err := encoder.Encode(ct); err != nil {
api.logger.Error(ctx, "encode container list", slog.Error(err))
return
}
}
}
}

// handleList handles the HTTP request to list containers.
func (api *API) handleList(rw http.ResponseWriter, r *http.Request) {
ct, err := api.getContainers()
Expand Down Expand Up @@ -583,8 +670,26 @@ func (api *API) updateContainers(ctx context.Context) error {
api.mu.Lock()
defer api.mu.Unlock()

var previouslyKnownDevcontainers map[string]codersdk.WorkspaceAgentDevcontainer
if len(api.updateChans) > 0 {
previouslyKnownDevcontainers = maps.Clone(api.knownDevcontainers)
}

api.processUpdatedContainersLocked(ctx, updated)

if len(api.updateChans) > 0 {
statesAreEqual := maps.EqualFunc(
previouslyKnownDevcontainers,
api.knownDevcontainers,
func(dc1, dc2 codersdk.WorkspaceAgentDevcontainer) bool {
return dc1.Equals(dc2)
})

if !statesAreEqual {
api.broadcastUpdatesLocked()
}
}

api.logger.Debug(ctx, "containers updated successfully", slog.F("container_count", len(api.containers.Containers)), slog.F("warning_count", len(api.containers.Warnings)), slog.F("devcontainer_count", len(api.knownDevcontainers)))

return nil
Expand Down Expand Up @@ -955,6 +1060,8 @@ func (api *API) handleDevcontainerRecreate(w http.ResponseWriter, r *http.Reques
dc.Container = nil
dc.Error = ""
api.knownDevcontainers[dc.WorkspaceFolder] = dc
api.broadcastUpdatesLocked()

go func() {
_ = api.CreateDevcontainer(dc.WorkspaceFolder, dc.ConfigPath, WithRemoveExistingContainer())
}()
Expand Down Expand Up @@ -1070,6 +1177,7 @@ func (api *API) CreateDevcontainer(workspaceFolder, configPath string, opts ...D
dc.Error = ""
api.recreateSuccessTimes[dc.WorkspaceFolder] = api.clock.Now("agentcontainers", "recreate", "successTimes")
api.knownDevcontainers[dc.WorkspaceFolder] = dc
api.broadcastUpdatesLocked()
api.mu.Unlock()

// Ensure an immediate refresh to accurately reflect the
Expand Down
173 changes: 173 additions & 0 deletions agent/agentcontainers/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/coder/coder/v2/pty"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
"github.com/coder/websocket"
)

// fakeContainerCLI implements the agentcontainers.ContainerCLI interface for
Expand Down Expand Up @@ -441,6 +442,178 @@ func TestAPI(t *testing.T) {
logbuf.Reset()
})

t.Run("Watch", func(t *testing.T) {
t.Parallel()

fakeContainer1 := fakeContainer(t, func(c *codersdk.WorkspaceAgentContainer) {
c.ID = "container1"
c.FriendlyName = "devcontainer1"
c.Image = "busybox:latest"
c.Labels = map[string]string{
agentcontainers.DevcontainerLocalFolderLabel: "/home/coder/project1",
agentcontainers.DevcontainerConfigFileLabel: "/home/coder/project1/.devcontainer/devcontainer.json",
}
})

fakeContainer2 := fakeContainer(t, func(c *codersdk.WorkspaceAgentContainer) {
c.ID = "container2"
c.FriendlyName = "devcontainer2"
c.Image = "ubuntu:latest"
c.Labels = map[string]string{
agentcontainers.DevcontainerLocalFolderLabel: "/home/coder/project2",
agentcontainers.DevcontainerConfigFileLabel: "/home/coder/project2/.devcontainer/devcontainer.json",
}
})

stages := []struct {
containers []codersdk.WorkspaceAgentContainer
expected codersdk.WorkspaceAgentListContainersResponse
}{
{
containers: []codersdk.WorkspaceAgentContainer{fakeContainer1},
expected: codersdk.WorkspaceAgentListContainersResponse{
Containers: []codersdk.WorkspaceAgentContainer{fakeContainer1},
Devcontainers: []codersdk.WorkspaceAgentDevcontainer{
{
Name: "project1",
WorkspaceFolder: fakeContainer1.Labels[agentcontainers.DevcontainerLocalFolderLabel],
ConfigPath: fakeContainer1.Labels[agentcontainers.DevcontainerConfigFileLabel],
Status: "running",
Container: &fakeContainer1,
},
},
},
},
{
containers: []codersdk.WorkspaceAgentContainer{fakeContainer1, fakeContainer2},
expected: codersdk.WorkspaceAgentListContainersResponse{
Containers: []codersdk.WorkspaceAgentContainer{fakeContainer1, fakeContainer2},
Devcontainers: []codersdk.WorkspaceAgentDevcontainer{
{
Name: "project1",
WorkspaceFolder: fakeContainer1.Labels[agentcontainers.DevcontainerLocalFolderLabel],
ConfigPath: fakeContainer1.Labels[agentcontainers.DevcontainerConfigFileLabel],
Status: "running",
Container: &fakeContainer1,
},
{
Name: "project2",
WorkspaceFolder: fakeContainer2.Labels[agentcontainers.DevcontainerLocalFolderLabel],
ConfigPath: fakeContainer2.Labels[agentcontainers.DevcontainerConfigFileLabel],
Status: "running",
Container: &fakeContainer2,
},
},
},
},
{
containers: []codersdk.WorkspaceAgentContainer{fakeContainer2},
expected: codersdk.WorkspaceAgentListContainersResponse{
Containers: []codersdk.WorkspaceAgentContainer{fakeContainer2},
Devcontainers: []codersdk.WorkspaceAgentDevcontainer{
{
Name: "",
WorkspaceFolder: fakeContainer1.Labels[agentcontainers.DevcontainerLocalFolderLabel],
ConfigPath: fakeContainer1.Labels[agentcontainers.DevcontainerConfigFileLabel],
Status: "stopped",
Container: nil,
},
{
Name: "project2",
WorkspaceFolder: fakeContainer2.Labels[agentcontainers.DevcontainerLocalFolderLabel],
ConfigPath: fakeContainer2.Labels[agentcontainers.DevcontainerConfigFileLabel],
Status: "running",
Container: &fakeContainer2,
},
},
},
},
}

var (
ctx = testutil.Context(t, testutil.WaitShort)
mClock = quartz.NewMock(t)
updaterTickerTrap = mClock.Trap().TickerFunc("updaterLoop")
mCtrl = gomock.NewController(t)
mLister = acmock.NewMockContainerCLI(mCtrl)
logger = slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
)

// Set up initial state for immediate send on connection
mLister.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{Containers: stages[0].containers}, nil)
mLister.EXPECT().DetectArchitecture(gomock.Any(), gomock.Any()).Return("<none>", nil).AnyTimes()

api := agentcontainers.NewAPI(logger,
agentcontainers.WithClock(mClock),
agentcontainers.WithContainerCLI(mLister),
agentcontainers.WithWatcher(watcher.NewNoop()),
)
api.Start()
defer api.Close()

srv := httptest.NewServer(api.Routes())
defer srv.Close()

updaterTickerTrap.MustWait(ctx).MustRelease(ctx)
defer updaterTickerTrap.Close()

client, res, err := websocket.Dial(ctx, srv.URL+"/watch", nil)
require.NoError(t, err)
if res != nil && res.Body != nil {
defer res.Body.Close()
}

// Read initial state sent immediately on connection
mt, msg, err := client.Read(ctx)
require.NoError(t, err)
require.Equal(t, websocket.MessageText, mt)

var got codersdk.WorkspaceAgentListContainersResponse
err = json.Unmarshal(msg, &got)
require.NoError(t, err)

require.Equal(t, stages[0].expected.Containers, got.Containers)
require.Len(t, got.Devcontainers, len(stages[0].expected.Devcontainers))
for j, expectedDev := range stages[0].expected.Devcontainers {
gotDev := got.Devcontainers[j]
require.Equal(t, expectedDev.Name, gotDev.Name)
require.Equal(t, expectedDev.WorkspaceFolder, gotDev.WorkspaceFolder)
require.Equal(t, expectedDev.ConfigPath, gotDev.ConfigPath)
require.Equal(t, expectedDev.Status, gotDev.Status)
require.Equal(t, expectedDev.Container, gotDev.Container)
}

// Process remaining stages through updater loop
for i, stage := range stages[1:] {
mLister.EXPECT().List(gomock.Any()).Return(codersdk.WorkspaceAgentListContainersResponse{Containers: stage.containers}, nil)

// Given: We allow the update loop to progress
_, aw := mClock.AdvanceNext()
aw.MustWait(ctx)

// When: We attempt to read a message from the socket.
mt, msg, err := client.Read(ctx)
require.NoError(t, err)
require.Equal(t, websocket.MessageText, mt)

// Then: We expect the receieved message matches the expected response.
var got codersdk.WorkspaceAgentListContainersResponse
err = json.Unmarshal(msg, &got)
require.NoError(t, err)

require.Equal(t, stages[i+1].expected.Containers, got.Containers)
require.Len(t, got.Devcontainers, len(stages[i+1].expected.Devcontainers))
for j, expectedDev := range stages[i+1].expected.Devcontainers {
gotDev := got.Devcontainers[j]
require.Equal(t, expectedDev.Name, gotDev.Name)
require.Equal(t, expectedDev.WorkspaceFolder, gotDev.WorkspaceFolder)
require.Equal(t, expectedDev.ConfigPath, gotDev.ConfigPath)
require.Equal(t, expectedDev.Status, gotDev.Status)
require.Equal(t, expectedDev.Container, gotDev.Container)
}
}
})

// List tests the API.getContainers method using a mock
// implementation. It specifically tests caching behavior.
t.Run("List", func(t *testing.T) {
Expand Down
35 changes: 35 additions & 0 deletions coderd/apidoc/docs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading