http2: share same hpack decoder for one tuple connect #744 #772
Conversation
Force-pushed from 06b05e5 to c9c6805
Pull Request Overview
This PR refactors HTTP/2 connection handling by sharing a single HPACK decoder per connection tuple and improves connection lifecycle management. Key changes include:
- Modifying GetConn to return a pointer to ConnInfo instead of a tuple string.
- Introducing a destroy-connection channel and associated worker cleanup in the event processor.
- Updating the HTTP/2 request and response implementations to reuse an HPACK decoder instance (a sketch of this pattern follows below).
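The decoder-sharing idea is worth making concrete: HPACK's dynamic table is connection-level state, so a decoder recreated per frame loses the indexed entries that earlier frames inserted. Below is a minimal sketch of the pattern using golang.org/x/net/http2/hpack; the connParser type and its method names are illustrative, not the PR's actual code.

```go
package sketch

import (
	"fmt"

	"golang.org/x/net/http2/hpack"
)

// connParser holds per-connection HTTP/2 parsing state (illustrative name).
type connParser struct {
	hdec *hpack.Decoder // one decoder for the lifetime of the connection
}

func newConnParser() *connParser {
	p := &connParser{}
	// 4096 bytes is HPACK's default dynamic-table size (RFC 7541).
	p.hdec = hpack.NewDecoder(4096, func(f hpack.HeaderField) {
		fmt.Printf("%s: %s\n", f.Name, f.Value)
	})
	return p
}

// onHeaderBlock feeds each header-block fragment into the shared decoder,
// so the dynamic table built by earlier frames stays intact.
func (p *connParser) onHeaderBlock(fragment []byte) error {
	_, err := p.hdec.Write(fragment)
	return err
}
```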
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| user/module/probe_openssl.go | Updates the GetConn return type and adds a call to WriteDestroyConn for valid socket IDs. |
| pkg/event_processor/processor.go | Adds destroyConn channel, destroyWorkers, and clearAllWorkers methods for managing worker lifecycles. |
| pkg/event_processor/iworker.go | Enhances the eventWorker interface with GetSock and Done; updates Run logic to handle ticker restarts and UUID parsing. |
| pkg/event_processor/http2_response.go | Reuses an HPACK decoder instance instead of creating a new one per call, with updated logging messages. |
| pkg/event_processor/http2_request.go | Applies the same HPACK decoder reuse as in responses and adds ClientPreface handling. |
```go
		}
		ew.tickerCount++
	case e := <-ew.incoming:
		if restartFlag {
```
When restartFlag is true, a new ticker is created without stopping the previous ticker, which could lead to resource leaks. Consider calling ticker.Stop() on the existing ticker before assigning a new one.
```diff
 		if restartFlag {
+			ew.ticker.Stop()
```
@Copilot ew.ticker.Stop() has already been executed in drainAndClose(ew) -> ew.Close() before this point, so when restartFlag is true, ew.ticker is already in the stopped state and there is no need to call ticker.Stop() again.
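For readers following along, here is a compilable sketch of the branch being discussed, with simplified stand-in types; it assumes, per the comment above, that the previous ticker was already stopped in drainAndClose -> ew.Close() before restartFlag became true.

```go
package sketch

import "time"

// eventWorker is a simplified stand-in for ecapture's worker type.
type eventWorker struct {
	ticker      *time.Ticker
	tickerCount int
	incoming    chan struct{}
}

func (ew *eventWorker) run(restartFlag bool) {
	for {
		select {
		case <-ew.ticker.C:
			ew.tickerCount++
		case <-ew.incoming:
			if restartFlag {
				// The old ticker was stopped in drainAndClose -> Close(),
				// so replacing it here does not leak a running ticker.
				ew.ticker = time.NewTicker(400 * time.Millisecond)
				restartFlag = false
			}
			// ...parse and dispatch the event...
		}
	}
}
```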
Thank you for the PR. However, I think quite a few parts of the functionality need improvement.
Also, please make sure to fix the CI errors.
```diff
@@ -73,21 +79,43 @@ func NewEventWorker(uuid string, processor *EventProcessor) IWorker {
 	return eWorker
 }
 
+func getSock(uuid string) uint64 {
```
eCapture's event types may not be sockets; iWorker should be an abstract object. The socket-conversion logic should not be placed inside iWorker.
How do you suggest fixing this? Should the socket information be parsed back out of the uuid?
iWorker is a data-processing class that does not deal with concrete packet types. The incoming data may be a raw captured string, or an HTTP or TCP packet. So I suggest putting business-specific content like sockets into code such as IParser.
> iWorker is a data-processing class that does not deal with concrete packet types. The incoming data may be a raw captured string, or an HTTP or TCP packet. So I suggest putting business-specific content like sockets into code such as IParser.

The current change shares one eventWorker per uuid (confirmed by tuple + dataType) to solve the HPACK-decoder sharing problem. When a destroy occurs, destroyWorkers is notified and deletes the matching eventWorker by sock. If sock were moved into IParser, how would the add/delete management of eventWorkers be implemented? (A sketch of the uuid-to-sock mapping follows below.)
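To make the discussion concrete, here is a hedged sketch of what a getSock helper could look like if the sock stays encoded in the uuid; the segment layout, and the sock's position within it, are assumptions for illustration, not ecapture's documented format.

```go
package sketch

import (
	"strconv"
	"strings"
)

// uuidSegmentCount is the expected number of "_"-separated uuid segments
// (assumed; see the later review note about avoiding the magic number 7).
const uuidSegmentCount = 7

// getSock extracts the sock identifier from a worker uuid shaped roughly
// like "pid_tid_comm_fd_sock_dataType_...". The sock's position is assumed.
func getSock(uuid string) uint64 {
	parts := strings.SplitN(uuid, "_", uuidSegmentCount)
	if len(parts) != uuidSegmentCount {
		return 0 // malformed uuid: nothing to match on
	}
	sock, err := strconv.ParseUint(parts[4], 10, 64)
	if err != nil {
		return 0
	}
	return sock
}
```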
```diff
@@ -33,6 +33,8 @@ type EventProcessor struct {
 	incoming chan event.IEventStruct
 	// send to output
 	outComing chan string
+	// destroyConn sock
+	destroyConn chan uint64
```
Same problem as with iWorker mentioned earlier; the abstraction needs improvement. Socket-specific fields should not be defined in the processor.
CI error: https://github.com/gojue/ecapture/actions/runs/14855593137/job/41708068834?pr=772#step:10:32-125

```text
=== RUN   TestEventProcessor_Serve
==================
WARNING: DATA RACE
Write at 0x00c000124268 by goroutine 12:
  github.com/gojue/ecapture/pkg/event_processor.(*EventProcessor).clearAllWorkers()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor.go:156 +0x32d
  github.com/gojue/ecapture/pkg/event_processor.(*EventProcessor).Serve()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor.go:84 +0x313
  github.com/gojue/ecapture/pkg/event_processor.TestEventProcessor_Serve.func1()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor_test.go:46 +0x37

Previous read at 0x00c000124268 by goroutine 11:
  github.com/gojue/ecapture/pkg/event_processor.(*EventProcessor).Close()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor.go:190 +0x130
  github.com/gojue/ecapture/pkg/event_processor.TestEventProcessor_Serve()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor_test.go:81 +0xd79
  testing.tRunner()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1792 +0x225
  testing.(*T).Run.gowrap1()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1851 +0x44

Goroutine 12 (running) created at:
  github.com/gojue/ecapture/pkg/event_processor.TestEventProcessor_Serve()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor_test.go:44 +0x55c
  testing.tRunner()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1792 +0x225
  testing.(*T).Run.gowrap1()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1851 +0x44

Goroutine 11 (running) created at:
  testing.(*T).Run()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1851 +0x8f2
  testing.runTests.func1()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:2279 +0x85
  testing.tRunner()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1792 +0x225
  testing.runTests()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:2277 +0x96c
  testing.(*M).Run()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:2142 +0xeea
  main.main()
      _testmain.go:53 +0x164
==================
    processor_test.go:103: close error: EventProcessor.Close(): workerQueue is not empty:13
    testing.go:1490: race detected during execution of test
--- FAIL: TestEventProcessor_Serve (10.02s)
=== RUN   Test_Truncated_EventProcessor_Serve
==================
WARNING: DATA RACE
Write at 0x00c000090448 by goroutine 35:
  github.com/gojue/ecapture/pkg/event_processor.(*EventProcessor).clearAllWorkers()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor.go:156 +0x32d
  github.com/gojue/ecapture/pkg/event_processor.(*EventProcessor).Serve()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor.go:84 +0x313
  github.com/gojue/ecapture/pkg/event_processor.Test_Truncated_EventProcessor_Serve.func1()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor_test.go:129 +0x37

Previous read at 0x00c000090448 by goroutine 34:
  github.com/gojue/ecapture/pkg/event_processor.(*EventProcessor).Close()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor.go:190 +0x130
  github.com/gojue/ecapture/pkg/event_processor.Test_Truncated_EventProcessor_Serve()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor_test.go:165 +0xcb9
  testing.tRunner()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1792 +0x225
  testing.(*T).Run.gowrap1()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1851 +0x44

Goroutine 35 (running) created at:
  github.com/gojue/ecapture/pkg/event_processor.Test_Truncated_EventProcessor_Serve()
      /home/runner/work/ecapture/ecapture/pkg/event_processor/processor_test.go:127 +0x55c
  testing.tRunner()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1792 +0x225
  testing.(*T).Run.gowrap1()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1851 +0x44

Goroutine 34 (running) created at:
  testing.(*T).Run()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1851 +0x8f2
  testing.runTests.func1()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:2279 +0x85
  testing.tRunner()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:1792 +0x225
  testing.runTests()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:2277 +0x96c
  testing.(*M).Run()
      /home/runner/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.24.1.linux-amd64/src/testing/testing.go:2142 +0xeea
  main.main()
      _testmain.go:53 +0x164
==================
    processor_test.go:177: Events truncated, size: 1000 bytes
    processor_test.go:182: close error: EventProcessor.Close(): workerQueue is not empty:2
    testing.go:1490: race detected during execution of test
--- FAIL: Test_Truncated_EventProcessor_Serve (10.01s)
FAIL
FAIL    github.com/gojue/ecapture/pkg/event_processor  20.039s
```
Signed-off-by: chilli <chilli19890121@gmail.com>
Force-pushed from c9c6805 to 35f82e7
To decouple eventWorker from specific fields, I think we could add a bool field like isStatefulLongLived to mark an eventWorker that needs to maintain a long-lived, stateful event parser. The specific behavior of such eventWorkers could be defined at the init stage. This way the eventWorker logic of other modules stays unchanged, while eventWorkers with special requirements can still be customized. If this idea sounds helpful, I can try making the change and open a new PR (a rough sketch of the idea follows below).
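A rough sketch of that proposal, with entirely hypothetical names — isStatefulLongLived is not an existing ecapture field, and the tick intervals are placeholders:

```go
package sketch

import "time"

// eventWorker is a simplified stand-in; only the flag below is the proposal.
type eventWorker struct {
	uuid                string
	isStatefulLongLived bool // marks a worker holding a long-lived stateful parser
	ticker              *time.Ticker
}

func newEventWorker(uuid string, statefulLongLived bool) *eventWorker {
	ew := &eventWorker{uuid: uuid, isStatefulLongLived: statefulLongLived}
	if ew.isStatefulLongLived {
		// Customized init for stateful workers, e.g. attaching a shared HPACK
		// decoder or a longer idle timeout; values here are assumptions.
		ew.ticker = time.NewTicker(30 * time.Second)
	} else {
		// Default behavior for every other module's workers stays unchanged.
		ew.ticker = time.NewTicker(400 * time.Millisecond)
	}
	return ew
}
```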
@cfc4n Does introducing more than one kind of eventWorker logic fit the design of this project?
Introduce
Pull Request Overview
This PR enhances connection tracking and HTTP/2 header decoding by:
- Introducing a Sock identifier to link SSL events and manage connection lifecycles.
- Adding a destroyConn channel and related logic to tear down workers when connections close.
- Sharing a single HPACK decoder instance (hdec) per HTTP/2 connection instead of recreating one on each frame display.
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| user/module/probe_openssl.go | Updated GetConn to return *ConnInfo and propagate Sock. |
| user/event/event_openssl.go | Added Sock to SSLDataEvent and extended GetUUID formatting. |
| pkg/event_processor/processor.go | Added destroyConn channel, destroyWorkers, and cleanup logic. |
| pkg/event_processor/iworker.go | Propagates Sock in workers and implements GetSock/Done. |
| pkg/event_processor/http2_request.go | Introduced per-connection hdec and reused it in Display. |
| pkg/event_processor/http2_response.go | Same per-connection hdec sharing for responses. |
```diff
@@ -105,6 +111,20 @@ func (ep *EventProcessor) dispatch(e event.IEventStruct) error {
 	// return this.incoming
 	//}
 
+func (ep *EventProcessor) destroyWorkers(sock uint64) {
```
Accessing ep.workerQueue without acquiring ep.Lock() can lead to concurrent map read/write panics. Wrap the iteration over workerQueue in ep.Lock() / defer ep.Unlock() to ensure thread safety.
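A minimal sketch of the locked iteration this comment asks for; the types are simplified stand-ins, and the teardown semantics of Done() are assumed rather than taken from the PR:

```go
package sketch

import "sync"

// IWorker is reduced to the two methods relevant here.
type IWorker interface {
	GetSock() uint64
	Done()
}

type EventProcessor struct {
	sync.Mutex
	workerQueue map[string]IWorker
}

// destroyWorkers removes every worker bound to a closed connection's sock,
// holding the processor lock to avoid concurrent map access.
func (ep *EventProcessor) destroyWorkers(sock uint64) {
	ep.Lock()
	defer ep.Unlock()
	for uuid, worker := range ep.workerQueue {
		if worker.GetSock() == sock {
			worker.Done() // assumed to signal the worker goroutine to exit
			delete(ep.workerQueue, uuid)
		}
	}
}
```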
```diff
@@ -132,6 +152,12 @@ func (ep *EventProcessor) delWorkerByUUID(worker IWorker) {
 	delete(ep.workerQueue, worker.GetUUID())
 }
 
+func (ep *EventProcessor) clearAllWorkers() {
+	ep.Lock()
+	defer ep.Unlock()
```
clearAllWorkers only resets the workerQueue map but does not signal existing workers to stop, potentially leaking goroutines. Consider calling Done() on each worker before clearing the map to properly shut them down.
```diff
 	defer ep.Unlock()
+	for _, worker := range ep.workerQueue {
+		worker.Done()
+	}
```
```go
	payloadLen := len(h2r.reader.String())
	if payloadLen >= ClientPrefaceLen {
		data := h2r.reader.String()[0:ClientPrefaceLen]
```
[nitpick] Calling reader.String() allocates a new string; use reader.Bytes() or reader.Len() on the underlying buffer to avoid unnecessary allocations.
```diff
-	payloadLen := len(h2r.reader.String())
-	if payloadLen >= ClientPrefaceLen {
-		data := h2r.reader.String()[0:ClientPrefaceLen]
+	payloadLen := h2r.reader.Len()
+	if payloadLen >= ClientPrefaceLen {
+		data := string(h2r.reader.Bytes()[0:ClientPrefaceLen])
```
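As a side note, the preface constant itself is exported by golang.org/x/net/http2, so a check along these lines avoids both a magic length and the extra allocation; hasClientPreface is a hypothetical helper for illustration, not the PR's code, and it assumes the buffered reader is a bytes.Buffer:

```go
package sketch

import (
	"bytes"

	"golang.org/x/net/http2"
)

// hasClientPreface reports whether the buffered bytes begin with the HTTP/2
// client connection preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" (24 bytes).
func hasClientPreface(reader *bytes.Buffer) bool {
	prefaceLen := len(http2.ClientPreface)
	if reader.Len() < prefaceLen {
		return false
	}
	// Bytes() avoids the copy that String() would allocate.
	return bytes.Equal(reader.Bytes()[:prefaceLen], []byte(http2.ClientPreface))
}
```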
```go
	uuidFileCount := 7
	parts := strings.SplitN(uuid, "_", uuidFileCount)
	if len(parts) != uuidFileCount {
```
[nitpick] The magic number 7 in SplitN(uuid, "_", 7) is brittle. Define a constant for the expected number of UUID segments to clarify its purpose and make future changes easier.
```diff
-	uuidFileCount := 7
-	parts := strings.SplitN(uuid, "_", uuidFileCount)
-	if len(parts) != uuidFileCount {
+	parts := strings.SplitN(uuid, "_", UUIDSegmentCount)
+	if len(parts) != UUIDSegmentCount {
```
I will create a new PR according to #785.
#744