Memory keeps on increasing while updating existing rule through REST API #3667

@ankit-4129

Environment:

  • eKuiper version: 2.1.1 (built with make build_core)
  • Hardware configuration (e.g. lscpu):
Architecture:           x86_64
  CPU op-mode(s):       32-bit, 64-bit
  Address sizes:        39 bits physical, 48 bits virtual
  Byte Order:           Little Endian
CPU(s):                 8
  On-line CPU(s) list:  0-7
Vendor ID:              GenuineIntel
  Model name:           Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz
    CPU family:         6
    Model:              94
    Thread(s) per core: 2
    Core(s) per socket: 4
    Socket(s):          1
    Stepping:           3
    CPU max MHz:        4000.0000
    CPU min MHz:        800.0000
  • OS (e.g. cat /etc/os-release):
NAME="Oracle Linux Server"
VERSION="9.0"
ID="ol"
ID_LIKE="fedora"
VARIANT="Server"
VARIANT_ID="server"
VERSION_ID="9.0"
PLATFORM_ID="platform:el9"
PRETTY_NAME="Oracle Linux Server 9.0"
  • Others:

What happened and what you expected to happen:
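kuiperd memory usage increases each time an existing rule is updated through the REST API and never decreases afterwards. It was expected to stay stable, or at least to be released once the updates stop.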

How to reproduce it (as minimally and precisely as possible):

  1. Create a bash script, updateRule.sh, and a rule payload, rtest.json, to update the rule repeatedly:

updateRule.sh

#!/bin/bash

while true; do
        curl -X PUT -H 'Content-Type: application/json' http://localhost:59720/rules/RULE_1 -d @rtest.json
        sleep 1
done

rtest.json

{
  "id": "RULE_1",
  "sql": "Select avg(value) AS value, * EXCEPT (value) from STREAM_1 left join STREAM_1 GROUP BY TumblingWindow(ms, 300000)",
  "actions": [
    {
      "memory": {
        "topic": "k-s-m"
      }
    }
  ]
}

  2. Run the script for 5-10 minutes and observe that kuiperd memory keeps increasing.
  3. Stop updateRule.sh and observe that kuiperd memory never decreases.
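
To put numbers on steps 2 and 3, the resident memory of kuiperd can be sampled at a fixed interval while updateRule.sh runs and again after it stops. The following is only a sketch, not part of the original report: the helper names rss_kb and sample_rss are made up for illustration, it assumes a Linux host where /proc/<pid>/status exposes VmRSS, and it assumes the process can be found with pgrep under the name kuiperd.

```bash
#!/bin/bash
# Sketch: sample the resident set size (RSS) of a process over time.
# Assumptions: Linux /proc filesystem; helper names are hypothetical.

rss_kb() {
    # Print the VmRSS value (in kB) for the PID given as $1.
    awk '/^VmRSS:/ { print $2 }' "/proc/$1/status"
}

sample_rss() {
    # usage: sample_rss <pid> <count> <interval_seconds>
    # Prints one "<unix_timestamp> <rss_kb>" line per sample.
    i=0
    while [ "$i" -lt "$2" ]; do
        printf '%s %s\n' "$(date +%s)" "$(rss_kb "$1")"
        sleep "$3"
        i=$((i + 1))
    done
}

# Example (assumes the process is named kuiperd): 60 samples, 10 s apart.
# sample_rss "$(pgrep -o kuiperd)" 60 10
```

Running the sampler once during the update loop and once after stopping it gives two series that can be compared directly. A series that keeps climbing in the first run and stays flat at the high value in the second would match the behaviour described above.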

Anything else we need to know?:
kuiperd logs captured during one rule update:

time="2025-04-10T10:17:10Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000000000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendNil:false SendError:false Qos:0 CheckpointInterval:300000000000 RestartStrategy:0xc00246aa50 Cron: Duration: CronDatetimeRange:[] PlanOptimizeStrategy:<nil> NotifySub:false DisableBufferFullDiscard:false EnableSaveStateBeforeStop:false}" file="planner/planner.go:54"
time="2025-04-10T10:17:10Z" level=info msg="get conf for mqtt with conf key : map[datasource:device/SENSOR/someSensor delimiter: format:json key: password:* qos:0 retainSize:0 schemaId: server:tcp://mqtt-broker:1883 strictValidation:false timestamp: timestampFormat: username:rule-engine-user]" file="conf/source.go:88"
time="2025-04-10T10:17:10Z" level=info msg="provision source STREAM_1 with props map[datasource:device/SENSOR/someSensor delimiter: format:json key: password:OXiilq1XIwcYa100 qos:0 retainSize:0 schemaId: server:tcp://mqtt-broker:1883 strictValidation:false timestamp: timestampFormat: username:rule-engine-user]" file="node/source_node.go:58" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="get conf for mqtt with conf key : map[datasource:device/SENSOR/someSensor delimiter: format:json key: password:* qos:0 retainSize:0 schemaId: server:tcp://mqtt-broker:1883 strictValidation:false timestamp: timestampFormat: username:rule-engine-user]" file="conf/source.go:88"
time="2025-04-10T10:17:10Z" level=info msg="provision source STREAM_1 with props map[datasource:device/SENSOR/someSensor delimiter: format:json key: password:OXiilq1XIwcYa100 qos:0 retainSize:0 schemaId: server:tcp://mqtt-broker:1883 strictValidation:false timestamp: timestampFormat: username:rule-engine-user]" file="node/source_node.go:58" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="provision sink memory_0 with props map[topic:k-s-m]" file="planner/planner_sink.go:94" rule=RULE_1                                                                                                                                            
time="2025-04-10T10:17:10Z" level=info msg="create message sink node memory_0" file="node/sink_node.go:222" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="create sink node memory_0 with isRetry false, resendInterval 0, bufferLength 1024" file="node/sink_node.go:65" rule=RULE_1                                                                                                                                 
time="2025-04-10T10:17:10Z" level=info msg="Rule RULE_1 is update." file="processor/rule.go:90"
time="2025-04-10T10:17:10Z" level=info msg="stopping rule RULE_1" Rule=RULE_1 file="rule/state.go:385"
time="2025-04-10T10:17:10Z" level=info msg="node memory_0 is closing" file="node/node.go:236" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="distribute done" file="node/concurrent.go:95" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="node 4_decoder is closing" file="node/node.go:236" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="unary operator 6_join instance 0 cancelling...." file="node/operations.go:126" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="unary operator 6_join instance 0 done, cancelling future items" file="node/operations.go:92" rule=RULE_1                                                                                                                                                   
time="2025-04-10T10:17:10Z" level=info msg="node 6_join is closing" file="node/node.go:236" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="merge done" file="node/concurrent.go:78" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="Cancelling window...." file="node/window_op.go:447" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="node 5_window is closing" file="node/node.go:236" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="merge done" file="node/concurrent.go:78" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="unary operator 7_project instance 0 cancelling...." file="node/operations.go:126" rule=RULE_1                                                                                                                                                              
time="2025-04-10T10:17:10Z" level=info msg="unary operator 7_project instance 0 done, cancelling future items" file="node/operations.go:92" rule=RULE_1                                                                                                                                                
time="2025-04-10T10:17:10Z" level=info msg="node 7_project is closing" file="node/node.go:236" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="distribute done" file="node/concurrent.go:95" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="node 2_decoder is closing" file="node/node.go:236" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="merge done" file="node/concurrent.go:78" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="Closing mqtt source connector to topic device/SENSOR/someSensor." file="mqtt/source.go:131" rule=RULE_1                                                                                                      
time="2025-04-10T10:17:10Z" level=info msg="distribute done" file="node/concurrent.go:95" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="node memory_0_0_transform is closing" file="node/node.go:236" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="Closing mqtt source connector to topic device/SENSOR/someSensor." file="mqtt/source.go:131" rule=RULE_1                                                                                                      
time="2025-04-10T10:17:10Z" level=info msg="detachConnection remove conn:RULE_1-STREAM_1-device/SENSOR/someSensor-mqtt-source,ref:RULE_1_STREAM_1_0" file="connection/pool.go:322"
time="2025-04-10T10:17:10Z" level=info msg="node STREAM_1 is closing" file="node/node.go:236" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="detachConnection remove conn:RULE_1-STREAM_1-device/SENSOR/someSensor-mqtt-source,ref:RULE_1_STREAM_1_0" file="connection/pool.go:322"
time="2025-04-10T10:17:10Z" level=info msg="node STREAM_1 is closing" file="node/node.go:236" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="rule RULE_1 transit to state stopped" Rule=RULE_1 file="rule/state.go:155"                                                                                                                                                         
time="2025-04-10T10:17:10Z" level=info msg="start to run rule RULE_1" Rule=RULE_1 file="rule/state.go:274"
time="2025-04-10T10:17:10Z" level=info msg="rule RULE_1 transit to state running" Rule=RULE_1 file="rule/state.go:155"                                                                                                                                                         
time="2025-04-10T10:17:10Z" level=info msg="Opening stream" file="topo/topo.go:273" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="memory_0 started" file="node/node.go:217" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="2_decoder started" file="node/node.go:217" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="4_decoder started" file="node/node.go:217" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="5_window started" file="node/node.go:217" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="Start with window state triggerTime: 1744280230465, msgCount: 0" file="node/window_op.go:162" rule=RULE_1                                                                                                                                                  
time="2025-04-10T10:17:10Z" level=info msg="6_join started" file="node/node.go:217" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="7_project started" file="node/node.go:217" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="memory_0_0_transform started" file="node/node.go:217" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="align window timer to 2025-04-10 10:22:10 +0000 UTC(1744280530000)" file="node/window_op.go:225" rule=RULE_1                                                                                                                                               
time="2025-04-10T10:17:10Z" level=info msg="STREAM_1 started" file="node/node.go:217" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="STREAM_1 started" file="node/node.go:217" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="Connecting to mqtt server" file="mqtt/source.go:87" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="FetchConnection return new conn RULE_1-STREAM_1-device/SENSOR/someSensor-mqtt-source" file="connection/pool.go:111"                                                                  
time="2025-04-10T10:17:10Z" level=info msg="Connecting to mqtt server" file="mqtt/source.go:87" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="FetchConnection return existed conn RULE_1-STREAM_1-device/SENSOR/someSensor-mqtt-source" file="connection/pool.go:98"                                                               
time="2025-04-10T10:17:10Z" level=info msg="trigger status change handler" file="mqtt/conn.go:112" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="new mqtt client created" file="mqtt/conn.go:100" rule=RULE_1
time="2025-04-10T10:17:10Z" level=info msg="The connection to mqtt broker is established" file="mqtt/conn.go:123" rule=RULE_1
