feat(ext/sse): added KeepAlive feature #78
Reviewer's Guide
Implements a keep-alive mechanism in the SSE server that periodically pings clients and automatically disconnects those that exceed a configurable idle timeout, introduces a new Option for setting that timeout, and updates the Join API and tests to support these changes.
Sequence diagram for SSE Server KeepAlive health check and client timeout
sequenceDiagram
participant Server
participant Client
participant PingEvent
loop Every clientTimeout/2 interval
Server->>Client: Check lastSeen
alt lastSeen < now - clientTimeout
Server->>Client: Disconnect (cancel)
Server->>Server: Remove client from clients/conns
else lastSeen < now - clientTimeout/2
Server->>Client: Send PingEvent
end
end
Sequence diagram for updated Join API with new client detection
sequenceDiagram
participant Server
participant Client
participant RW as http.ResponseWriter
Server->>RW: NewStreamer(rw)
Server->>Client: Check if client exists
alt Client does not exist
Server->>Client: Create new Client
Server->>Client: Set isNewClient = true
else Client exists
Server->>Client: Update connID
Server->>Client: Set isNewClient = false
end
Server->>Client: Update lastSeen
Server->>RW: Set headers, Flush
Server-->>Client: Return (client, connID, isNewClient, error)
Class diagram for updated SSE Server and Client with KeepAlive
classDiagram
class Server {
+map[string]*Client clients
+map[string]int conns
+context.Context ctx
+context.CancelCauseFunc cancel
+time.Duration clientTimeout
+New(opts ...Option)
+startHealthCheck()
+Join(clientID string, rw http.ResponseWriter) (*Client, int, bool, error)
}
class Client {
+string ID
+int connID
+Streamer sm
+context.Context ctx
+context.CancelCauseFunc cancel
+time.Time lastSeen
+Send(evt Event) error
}
class Option {
<<type>>
+func(*Server)
}
Server "1" -- "*" Client : manages
Option <|.. WithClientTimeout
class WithClientTimeout {
+WithClientTimeout(d time.Duration) Option
}
File-Level Changes
Hey @cnlangzi - I've reviewed your changes and found some issues that need to be addressed.
Blocking issues:
- The keepAlive logic may incorrectly identify active clients as dead.
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `ext/sse/server.go:46` </location>
<code_context>
+ return s
+}
+
+func (s *Server) keepAlive() {
+ if s.keepalive <= 0 {
+ return
</code_context>
<issue_to_address>
The keepAlive logic may incorrectly identify active clients as dead.
The conditional appears inverted; a client should be marked dead only if its lastSeen is earlier than the current time minus the timeout. Please update the logic accordingly.
</issue_to_address>
Hey @cnlangzi - I've reviewed your changes - here's some feedback:
- Consider renaming the WithTimeout option to something like WithKeepAlive or WithKeepAliveTimeout so its purpose is clearer and doesn’t clash with other timeout configurations.
- The keep-alive loop and its tests use real time.Sleep, which can slow down and destabilize test runs; consider injecting a clock or timer interface to make the logic more testable and deterministic.
- startKeepAlive ignores errors from c.Send on ping (using nolint); it would be safer to handle or log these errors so ping failures don’t go completely unnoticed.
## Individual Comments
### Comment 1
<location> `ext/sse/server_test.go:353` </location>
<code_context>
})
+
+ t.Run("timeout", func(t *testing.T) {
+ srv := New(WithTimeout(3 * time.Second))
+ rw := httptest.NewRecorder()
+
+ _, _, _, err := srv.Join("keepalive", rw)
+ require.NoError(t, err)
+
+ sm := &streamerMock{
</code_context>
<issue_to_address>
Missing test for WithTimeout(0) and negative values.
Add tests for WithTimeout(0) and negative values to confirm the keep-alive routine does not start and clients are not timed out unexpectedly.
</issue_to_address>
  require.ErrorIs(t, err, ErrNotStreamer)

- c1, id, err = srv.Join("join", rw)
+ c1, id, ok, err = srv.Join("join", rw)
  require.NotNil(t, c1)
  require.NoError(t, err)
@sourcery-ai review
Sure! I'm generating a new review now.
Hey @cnlangzi, I've posted a new review for you!
Hey @cnlangzi - I've reviewed your changes - here's some feedback:
- Replace time.Sleep-based tests with a controllable clock or shorter intervals to avoid fragile or slow CI runs.
- Ensure that concurrent pings sent by startHealthCheck via c.Send won’t race on the Streamer/ResponseWriter, e.g. by serializing writes or flushes.
- Rename the WithClientTimeout option (or update docs) so its name matches the description (e.g. WithTimeout) for consistency.
## Individual Comments
### Comment 1
<location> `ext/sse/server_test.go:374` </location>
<code_context>
+ require.Equal(t, body, ": ping\n\n\n")
+ time.Sleep(2 * time.Second)
+
+ err = context.Cause(c.Context())
+
+ require.ErrorIs(t, err, ErrClientTimeout)
+
+ })
</code_context>
<issue_to_address>
Suggestion: Add a test for client activity resetting the timeout.
Please add a test where the client receives an event just before the timeout to confirm lastSeen is updated and the client remains connected, validating the keepalive logic.
</issue_to_address>
err = context.Cause(c.Context())

require.ErrorIs(t, err, ErrClientTimeout)
Tasks to complete before merging PR:
- Run make unit-tests to check for any regressions
- Run make lint to check for any issues
Summary by Sourcery
Implement a configurable client timeout and KeepAlive support for the SSE Server: add the WithClientTimeout option, add a health check loop that sends PingEvent and automatically disconnects stale clients, update the Join signature, and extend the tests accordingly.