A complete real-time data enrichment pipeline using Confluent Cloud FlinkSQL that enriches product click events with the latest searchId from previous search events by the same user. Uses temporal joins to maintain user state and implements Schema Registry AVRO integration for type safety.
This pipeline demonstrates modern stream processing with Confluent Cloud's managed FlinkSQL service. It processes click stream events where:
- Search events contain a `searchId` and a search query
- Product click events initially have `searchId = null`
- FlinkSQL enriches product clicks with the latest `searchId` from the same user's previous search events
- Temporal joins maintain user state to track the most recent search per user
- Real-time enrichment happens as events flow through the pipeline
- AVRO schemas ensure data consistency and enable schema evolution
- End-to-end automation with cleanup and monitoring
```text
┌─────────────┐     ┌──────────────────┐     ┌──────────────┐
│  Producer   │────▶│   Input Topic    │────▶│   FlinkSQL   │
│   (AVRO)    │     │  (AVRO Schema)   │     │  Enrichment  │
└─────────────┘     └──────────────────┘     └──────────────┘
       │                                            │
       ▼                                            ▼
┌───────────────────┐                       ┌──────────────┐
│  Schema Registry  │                       │ Output Topic │
│   (Type Safety)   │                       │  (Enriched)  │
└───────────────────┘                       └──────────────┘
                                                    │
                                                    ▼
                                            ┌──────────────┐
                                            │   Consumer   │
                                            │    (AVRO)    │
                                            └──────────────┘
```
- Schema Registry Integration: Full AVRO serialization/deserialization
- Native Confluent Cloud: Leverages the managed FlinkSQL service
- Real-time Processing: Event-time based enrichment
- Auto Cleanup: Automated resource management
- Easy Setup: Single-command execution with `make all-in-one`
- Monitoring: Built-in success/failure indicators
```bash
git clone <repository-url>
cp .env.template .env
# Edit .env with your Confluent Cloud credentials
make all-in-one
```
That's it! The pipeline will:
- ✅ Create a virtual environment and install dependencies
- ✅ Set up configuration and validate credentials
- ✅ Create Kafka topics with AVRO schemas
- ✅ Deploy the FlinkSQL enrichment job
- ✅ Start the consumer for enriched events
- ✅ Generate test click stream events
- ✅ Display real-time enrichment results
- ✅ Clean up all resources
Edit `.env` with your Confluent Cloud details:
```bash
# Kafka Cluster
BOOTSTRAP_SERVERS=pkc-xxxxx.region.aws.confluent.cloud:9092
SASL_USERNAME=your-kafka-api-key
SASL_PASSWORD=your-kafka-api-secret
KAFKA_CLUSTER_ID=lkc-xxxxx

# Confluent Cloud Environment
CONFLUENT_ENV_ID=env-xxxxx
CONFLUENT_CLOUD_ENVIRONMENT_ID=env-xxxxx

# FlinkSQL Service
FLINK_REST_ENDPOINT=https://flink.region.aws.confluent.cloud
FLINK_ORG_ID=your-org-id
FLINK_API_KEY=your-flink-api-key
FLINK_API_SECRET=your-flink-api-secret
FLINK_COMPUTE_POOL_ID=lfcp-xxxxx

# Schema Registry (Required for AVRO)
SCHEMA_REGISTRY_API_KEY=your-sr-api-key
SCHEMA_REGISTRY_API_SECRET=your-sr-api-secret
SCHEMA_REGISTRY_ENDPOINT=https://psrc-xxxxx.region.gcp.confluent.cloud
```
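The setup step validates these credentials before anything runs. As a rough illustration (assuming `python-dotenv`, which is not confirmed by this README; the real validation lives in `setup.py`), the check might look like:

```python
# Hypothetical credential sanity check -- the real validation lives in setup.py.
import os
from dotenv import load_dotenv  # python-dotenv; assumed dependency

REQUIRED = [
    "BOOTSTRAP_SERVERS", "SASL_USERNAME", "SASL_PASSWORD",
    "CONFLUENT_ENV_ID", "FLINK_REST_ENDPOINT", "FLINK_API_KEY",
    "FLINK_API_SECRET", "FLINK_COMPUTE_POOL_ID",
    "SCHEMA_REGISTRY_API_KEY", "SCHEMA_REGISTRY_API_SECRET",
    "SCHEMA_REGISTRY_ENDPOINT",
]

load_dotenv()  # read .env from the working directory
missing = [name for name in REQUIRED if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing required settings: {', '.join(missing)}")
```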
The pipeline uses a 4-step temporal join approach (steps 1-3 are sketched below, followed by the step 4 query):

- Input Table Setup: Adds watermarks to the input topic for temporal processing
- State Table Creation: Creates an upsert table to maintain the latest `searchId` per user
- State Population: Inserts search events into the state table as they arrive
- Temporal Enrichment: Joins product clicks with user state using `FOR SYSTEM_TIME AS OF`
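Steps 1-3 are deployed by `flink_sql.py` and their exact DDL is not shown in this README, but a minimal sketch might look like the following. Table and column names are assumed to match the step 4 query below; on Confluent Cloud, `CREATE TABLE` provisions a backing Kafka topic automatically, while open-source Flink would need a `WITH (...)` connector clause.

```sql
-- Step 1 (sketch): event-time watermark on the input table
ALTER TABLE input_topic
  MODIFY WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND;

-- Step 2 (sketch): upsert table keyed by userId, holding each user's latest search
CREATE TABLE user_search_state (
  userId    STRING,
  searchId  STRING,
  eventTime TIMESTAMP(3),
  WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND,
  PRIMARY KEY (userId) NOT ENFORCED
);

-- Step 3 (sketch): continuously upsert search events into the state table
INSERT INTO user_search_state
SELECT userId, searchId, eventTime
FROM input_topic
WHERE eventType = 'search';
```

The primary key plus an event-time watermark is what makes `user_search_state` a versioned table that step 4 can query with `FOR SYSTEM_TIME AS OF`: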
```sql
-- Step 4: Temporal Join Query
INSERT INTO output_topic
SELECT
  c.userId,
  c.eventType,
  COALESCE(c.searchId, s.searchId, CONCAT('enriched-', c.userId)) AS searchId,
  -- ... other fields
FROM input_topic c
LEFT JOIN user_search_state FOR SYSTEM_TIME AS OF c.eventTime s
ON c.userId = s.userId;
```
This ensures real-time enrichment: each product click picks up the actual `searchId` from that user's most recent search event.
Input events share this structure (`|` marks the possible values per event type):

```jsonc
{
  "eventTime": "2025-08-12T15:08:07.863593+00:00",
  "userId": "user1",
  "clickId": "unique-click-id",
  "eventType": "search" | "product_click",
  "searchId": "search-uuid" | null,
  "productId": "product_1" | null,
  "query": "search query" | null,
  "referrer": "google.com" | "search_results",
  "metadata": {
    "device": "desktop",
    "browser": "chrome"
  }
}
```
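On the producer side, Schema Registry AVRO serialization with `confluent-kafka` typically follows the pattern below. This is an illustrative sketch, not the repo's `producer.py`: the topic name, schema file, and event values are assumptions.

```python
# Illustrative AVRO producer -- the real implementation is producer.py.
import os
import uuid
from datetime import datetime, timezone

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

TOPIC = "input_topic"  # assumed; the pipeline generates its own topic names

schema_registry = SchemaRegistryClient({
    "url": os.environ["SCHEMA_REGISTRY_ENDPOINT"],
    "basic.auth.user.info": f"{os.environ['SCHEMA_REGISTRY_API_KEY']}"
                            f":{os.environ['SCHEMA_REGISTRY_API_SECRET']}",
})
# Schema file name is assumed; any AVRO schema string works here.
serialize = AvroSerializer(schema_registry, open("click_event.avsc").read())

producer = Producer({
    "bootstrap.servers": os.environ["BOOTSTRAP_SERVERS"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": os.environ["SASL_USERNAME"],
    "sasl.password": os.environ["SASL_PASSWORD"],
})

event = {
    "eventTime": datetime.now(timezone.utc).isoformat(),
    "userId": "user1",
    "clickId": str(uuid.uuid4()),
    "eventType": "search",
    "searchId": str(uuid.uuid4()),
    "productId": None,
    "query": "running shoes",
    "referrer": "google.com",
    "metadata": {"device": "desktop", "browser": "chrome"},
}
producer.produce(
    TOPIC,
    key=event["userId"],
    value=serialize(event, SerializationContext(TOPIC, MessageField.VALUE)),
)
producer.flush()
```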
The SQL enrichment implements a temporal join approach:

- State Table: Maintains the latest `searchId` per user using upsert mode
- Temporal Join: Uses `FOR SYSTEM_TIME AS OF` to join product clicks with the user's latest search state
- Real-time Processing: Events are processed as they arrive with proper watermarks
- Fallback Logic: When no previous search exists, applies the `enriched-{userId}` pattern

The pipeline maintains user state and enriches each product click with the actual `searchId` from that user's most recent search event.
An enriched product click looks like this:

```jsonc
{
  "eventTime": "2025-08-12T15:08:09.863593+00:00",
  "userId": "user1",
  "clickId": "unique-click-id",
  "eventType": "product_click",
  "searchId": "85894f91-6de5-4674-85ef-76748c1bc095", // ✅ enriched with the actual searchId!
  "productId": "product_1",
  "query": null,
  "referrer": "search_results",
  "metadata": {
    "device": "desktop",
    "browser": "chrome"
  }
}
```
Run individual components for debugging:
```bash
# Setup environment
python setup.py

# Create topics and schemas
python topics.py

# Deploy FlinkSQL job
python flink_sql.py

# Generate test events
python producer.py

# Consume enriched results
python consumer.py

# Clean up resources
python cleanup.py
```
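The consumer must decode AVRO via Schema Registry rather than treating payloads as UTF-8 text (the source of the decode error listed under troubleshooting below). A rough sketch, with the group id and topic name assumed, not taken from `consumer.py`:

```python
# Illustrative AVRO consumer -- the real implementation is consumer.py.
import os

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

TOPIC = "output_topic"  # assumed enriched-events topic name

schema_registry = SchemaRegistryClient({
    "url": os.environ["SCHEMA_REGISTRY_ENDPOINT"],
    "basic.auth.user.info": f"{os.environ['SCHEMA_REGISTRY_API_KEY']}"
                            f":{os.environ['SCHEMA_REGISTRY_API_SECRET']}",
})
deserialize = AvroDeserializer(schema_registry)  # writer schema fetched by ID

consumer = Consumer({
    "bootstrap.servers": os.environ["BOOTSTRAP_SERVERS"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": os.environ["SASL_USERNAME"],
    "sasl.password": os.environ["SASL_PASSWORD"],
    "group.id": "enriched-clicks-reader",  # assumed group id
    "auto.offset.reset": "earliest",
})
consumer.subscribe([TOPIC])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # Decoded into a dict matching the AVRO schema, not raw bytes
    event = deserialize(msg.value(), SerializationContext(TOPIC, MessageField.VALUE))
    print(f"{event['userId']} -> {event['searchId']}")
```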
When running `make all-in-one`, you should see:
```text
✅ Message delivered to input_pipeline_xxx [1] at offset 20
✅ Message delivered to input_pipeline_xxx [2] at offset 21

Enriched Message:
  User: user1
  Event Type: product_click
  Product ID: product_1
  SearchId: 85894f91-6de5-4674-85ef-76748c1bc095   <- real searchId from the user's previous search!
  ClickId: 7a86a720-6ab5-49fd-ae58-1780290ddf58
  Time: 2025-08-12T15:08:09.863593+00:00
✅ ENRICHED: Product click successfully linked to user's latest search

Enrichment Rate: 6/14 messages (43%) - real searchId enrichment
Fallback Rate:   8/14 messages (57%) - using enriched-{userId} pattern
```
Enrichment Logic:

- When a user performs a search, their `searchId` is stored in the state table
- Subsequent product clicks by the same user are enriched with that actual `searchId`
- If no previous search exists, the fallback `enriched-{userId}` pattern applies
- Temporal joins ensure real-time enrichment as events flow through
The pipeline automatically cleans up resources, but you can manually run:
```bash
python cleanup.py
```
This removes:
- All Flink SQL statements
- Created Kafka topics
- Schema Registry subjects
- Consumer groups
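For the Kafka and Schema Registry pieces, the teardown might resemble this sketch (topic and subject names are assumptions; deleting Flink statements goes through the Confluent Cloud REST API and is omitted here):

```python
# Illustrative teardown of topics and schema subjects -- see cleanup.py for the real logic.
import os

from confluent_kafka.admin import AdminClient
from confluent_kafka.schema_registry import SchemaRegistryClient

admin = AdminClient({
    "bootstrap.servers": os.environ["BOOTSTRAP_SERVERS"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": os.environ["SASL_USERNAME"],
    "sasl.password": os.environ["SASL_PASSWORD"],
})

# Delete the pipeline's topics (names assumed)
for topic, future in admin.delete_topics(["input_topic", "output_topic"]).items():
    future.result()  # raises if the deletion failed

# Drop the matching Schema Registry subjects
sr = SchemaRegistryClient({
    "url": os.environ["SCHEMA_REGISTRY_ENDPOINT"],
    "basic.auth.user.info": f"{os.environ['SCHEMA_REGISTRY_API_KEY']}"
                            f":{os.environ['SCHEMA_REGISTRY_API_SECRET']}",
})
for subject in ("input_topic-value", "output_topic-value"):
    sr.delete_subject(subject)
```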
| Issue | Solution |
|---|---|
| ❌ Failed to setup consumer | Check Schema Registry credentials in `.env` |
| ❌ Consumer error: UTF-8 decode | Verify the AVRO deserializer is working |
| ❌ Topic creation failed | Validate Kafka API keys and cluster ID |
| ❌ FlinkSQL deployment failed | Check Flink compute pool and API credentials |
Enable verbose logging:
```bash
export DEBUG=1
make all-in-one
```
This pipeline pattern is ideal for:
- E-commerce Analytics: Link product clicks to search queries
- User Journey Tracking: Connect user actions across sessions
- Real-time Personalization: Enrich events with user context
- Marketing Attribution: Track campaign effectiveness
- Fraud Detection: Correlate suspicious activities
The optimal implementation approach depends on your scale requirements:
```sql
-- Alternative: Use tumbling windows instead of temporal joins
SELECT userId,
       LAST_VALUE(searchId) AS latestSearchId
FROM input_topic
WHERE eventType = 'search'
GROUP BY userId, TUMBLE(eventTime, INTERVAL '1' HOUR);
```
- State Storage: Windowed state with configurable retention
- Pros: Better memory management, handles user churn
- Cons: Less precise timing, potential searchId delays
- Use When: Moderate scale with acceptable latency trade-offs
```sql
-- Batch: Periodic user state snapshots (every 15 minutes)
CREATE TABLE user_state_snapshots AS
SELECT userId,
       LAST_VALUE(searchId) AS searchId,
       TUMBLE_END(eventTime, INTERVAL '15' MINUTE) AS window_end
FROM search_events
GROUP BY userId, TUMBLE(eventTime, INTERVAL '15' MINUTE);

-- Stream: Enrich with the latest snapshot + recent searches
SELECT c.*,
       COALESCE(recent.searchId, snapshot.searchId, 'enriched-' || c.userId) AS searchId
FROM product_clicks c
LEFT JOIN recent_searches recent ON c.userId = recent.userId
LEFT JOIN user_state_snapshots snapshot ON c.userId = snapshot.userId;
```
- State Storage: Hybrid batch snapshots + streaming updates
- Pros: Handles massive scale, cost-effective, fault-tolerant
- Cons: Complex architecture, potential staleness
- Use When: Massive scale, cost optimization priority
- State TTL: Configure state cleanup for inactive users (see the sketch after this list)
- Partitioning: Use consistent `userId` hashing for parallel processing
- Compression: Enable state compression for memory efficiency
- Monitoring: Track state size, processing latency, and enrichment rates
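State TTL, for instance, can be set per session in Flink SQL. The exact key is distribution-specific and not taken from this repo: `table.exec.state.ttl` is the open-source Flink option, and Confluent Cloud exposes a similar `sql.state-ttl` setting.

```sql
-- Expire per-user state after 1 hour of inactivity
-- (open-source Flink SQL; on Confluent Cloud use SET 'sql.state-ttl' instead)
SET 'table.exec.state.ttl' = '1 h';
```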
- Confluent Cloud: Managed Kafka + FlinkSQL
- Schema Registry: AVRO schema management
- Python: Producer/Consumer applications
- FlinkSQL: Stream processing and enrichment
- Docker: Local development (optional)
This project is open source and available under the MIT License.
FlinkSQL Click Stream - Real-time event enrichment made simple