Description
Environment:
- eKuiper version (e.g. 1.3.0): 2.2.1
- Hardware configuration (e.g. lscpu):
- OS (e.g. cat /etc/os-release): Linux (WSL) or the official Docker image
- Others:
What happened and what you expected to happen:
I have a plugin (sink) that times out after a number of collected messages. This is the error:
time="2025-08-07 17:57:08" level=error msg="ack error: send time out" file="runtime/sink.go:99" rule=value_plus_one
I looked through various issues and saw that the problem was resolved by increasing the timeout on the eKuiper side, but in this case the timeout seems to come from the SDK itself, and I found no configuration parameter to increase it.
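For what it's worth, the error text matches mangos' ErrSendTimeout ("send time out"), which a socket returns when a send deadline is set and no peer accepts the message in time. This is only my guess at the mechanism behind runtime/sink.go:99; the socket type, address, and 1s deadline below are invented for illustration, not taken from the SDK:
package main

import (
	"fmt"
	"time"

	"go.nanomsg.org/mangos/v3"
	"go.nanomsg.org/mangos/v3/protocol/push"
	_ "go.nanomsg.org/mangos/v3/transport/ipc" // register the ipc transport
)

func main() {
	sock, err := push.NewSocket()
	if err != nil {
		panic(err)
	}
	defer sock.Close()
	// With a send deadline set, Send fails once no peer drains the
	// message within the deadline.
	if err := sock.SetOption(mangos.OptionSendDeadline, time.Second); err != nil {
		panic(err)
	}
	if err := sock.Listen("ipc:///tmp/demo.ipc"); err != nil {
		panic(err)
	}
	// Nothing is connected, so this fails after ~1s with
	// mangos.ErrSendTimeout, whose text is exactly "send time out".
	if err := sock.Send([]byte("ack")); err != nil {
		fmt.Println("ack error:", err) // ack error: send time out
	}
}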
How to reproduce it (as minimally and precisely as possible):
This is the source plugin that sends a message every 10ms:
package plugin

import (
	"time"

	"github.com/lf-edge/ekuiper/sdk/go/api"
)

type MyPluginEdgeSource struct {
}

func (s *MyPluginEdgeSource) Configure(_ string, props map[string]interface{}) error {
	return nil
}

func (s *MyPluginEdgeSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error) {
	logger := ctx.GetLogger()
	logger.Infof("[SOURCE] Open")
	// Emit a constant payload every 10ms until the rule is stopped.
	t := time.NewTicker(10 * time.Millisecond)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			next := map[string]any{
				"test": 99,
			}
			logger.Infof("[SOURCE] Send out data %v", next)
			consumer <- api.NewDefaultSourceTuple(next, nil)
		case <-ctx.Done():
			return
		}
	}
}

func (s *MyPluginEdgeSource) Close(ctx api.StreamContext) error {
	ctx.GetLogger().Infof("[SOURCE] Closing")
	return nil
}
This is the sink that throws the error after 537 messages:
package plugin

import (
	"github.com/lf-edge/ekuiper/sdk/go/api"
)

type MyPluginEdgeEventSink struct {
	count int
}

func (m *MyPluginEdgeEventSink) Configure(props map[string]interface{}) error {
	return nil
}

func (m *MyPluginEdgeEventSink) Open(ctx api.StreamContext) error {
	ctx.GetLogger().Infof("[SINK] Open")
	return nil
}

func (m *MyPluginEdgeEventSink) Collect(ctx api.StreamContext, item interface{}) error {
	// Count and log every item; no real work is done here.
	m.count++
	ctx.GetLogger().Infof("[SINK] Collect %s (#%d)", item, m.count)
	return nil
}

func (m *MyPluginEdgeEventSink) Close(ctx api.StreamContext) error {
	ctx.GetLogger().Infof("[SINK] Closing")
	return nil
}
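Both plugins are wired up through the standard portable-plugin entry point. This is a sketch of my main.go, modeled on the mirror example in the repository; the module path and the myplugin-edge names are specific to my setup:
package main

import (
	"os"

	"github.com/lf-edge/ekuiper/sdk/go/api"
	sdk "github.com/lf-edge/ekuiper/sdk/go/runtime"

	"myplugin/internal/plugin" // hypothetical module path for the code above
)

func main() {
	// Register the source and sink under the plugin name used in the
	// stream's TYPE and the rule's action.
	sdk.Start(os.Args, &sdk.PluginConfig{
		Name: "myplugin-edge",
		Sources: map[string]sdk.NewSourceFunc{
			"myplugin-edge": func() api.Source { return &plugin.MyPluginEdgeSource{} },
		},
		Sinks: map[string]sdk.NewSinkFunc{
			"myplugin-edge": func() api.Sink { return &plugin.MyPluginEdgeEventSink{} },
		},
	})
}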
Source configuration:
CREATE STREAM rule_random_device_test_c () WITH (DATASOURCE="*", format="JSON", TYPE="myplugin-edge", STRICT_VALIDATION="true", SHARED="false")
Rule configuration:
SELECT test+1 as testplusone FROM rule_random_device_test_c;
With the sink plugin as the only action for the rule.
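For completeness, the rule as I submit it over the REST API (a sketch; I'm assuming nothing beyond defaults in the action config, and the action key matches the sink name registered above):
{
  "id": "value_plus_one",
  "sql": "SELECT test+1 as testplusone FROM rule_random_device_test_c;",
  "actions": [
    {
      "myplugin-edge": {}
    }
  ]
}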
Anything else we need to know?:
The code above is a simplified version of the portable examples from the repository. The error is systematic: it occurs after 537 messages are received, and the source keeps sending data afterwards. I get the same error in my real plugin, which is much more complex, and it always happens after roughly the same number of messages (I haven't counted precisely, since timing isn't as strict in production). The error appears both on a local eKuiper installation and inside a Docker container. I'm pasting the logs below.
time="2025-08-07 17:57:07" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:07" level=info msg="[SINK] Collect {\"metadata\":{},\"testplusone\":100} (#536)" file="internal/myplugin_edge_event_sink.go:22" rule=value_plus_one
time="2025-08-07 17:57:07" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:07" level=info msg="[SINK] Collect {\"metadata\":{},\"testplusone\":100} (#537)" file="internal/myplugin_edge_event_sink.go:22" rule=value_plus_one
time="2025-08-07 17:57:07" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:07" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:07" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
[...]
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:08" level=error msg="ack error: send time out" file="runtime/sink.go:99" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SINK] Closing" file="internal/myplugin_edge_event_sink.go:27" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="closed sink data channel" file="runtime/sink.go:121" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_one
time="2025-08-07 17:57:08" level=info msg="[SOURCE] Send out data map[test:99]" file="internal/myplugin_edge_source.go:30" rule=value_plus_on