Confluent Cloud is a fully managed data streaming service that enables us to access, store, and manage data as continuous, real-time streams. It provides ksqlDB, a streaming SQL engine for Kafka and an easy-to-use yet powerful interactive SQL interface for stream processing. In this article, we will see how to filter a Kafka stream by date using ksqlDB (KSQL).
When we create a KSQL stream or table, it implicitly includes the following pseudo columns. They are present by default in every stream and table, so we do not need to declare them in the stream or table definition.
| Pseudo column name | Description |
|---|---|
| ROWOFFSET | The offset of the source record. |
| ROWPARTITION | The partition of the source record. |
| ROWTIME | The timestamp of the source record. |
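For example, these pseudo columns can be selected like ordinary columns. This is a minimal sketch in which SOME_STREAM and COLUMN_NAME are placeholders for your own stream and column; note that ROWPARTITION and ROWOFFSET are only available in more recent ksqlDB versions.
-- SOME_STREAM and COLUMN_NAME are placeholders for an existing stream and column
SELECT ROWPARTITION, ROWOFFSET, ROWTIME, COLUMN_NAME
FROM SOME_STREAM
EMIT CHANGES;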
Query ROWTIME of a particular KSQL stream or table record
The following query shows the ROWTIME of the row(s) that match a filter on COLUMN_NAME.
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') FROM SOME_STREAM
WHERE COLUMN_NAME = 'some_value'
EMIT CHANGES;
Query to filter KSQL stream or table based on a date
The following query will filter the KSQL stream or table based on the specified date.
SELECT * FROM SOME_STREAM
WHERE TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd') = '2023-01-15' -- replace with your target date in yyyy-MM-dd format
EMIT CHANGES;
You can format the ROWTIME however you need before applying the filter. Set 'auto.offset.reset' to 'earliest' to read the topic from the beginning and include past data.
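As a sketch, assuming the same placeholder stream, you could rewind the consumer and filter on a coarser grain such as a month:
-- Read the topic from the beginning so past records are included
SET 'auto.offset.reset' = 'earliest';
-- Change the format string to filter on a month instead of a single day
-- ('2023-01' is just an illustrative value)
SELECT * FROM SOME_STREAM
WHERE TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM') = '2023-01'
EMIT CHANGES;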
Pro tips:
1. If you want to automate Confluent ksqlDB deployments, you can refer to this post.