Filter a Kafka stream by date using KSQL

Confluent Cloud is a fully managed data streaming service that lets us access, store, and manage data as continuous, real-time streams. It provides ksqlDB, a streaming SQL engine for Kafka that offers an easy-to-use yet powerful interactive SQL interface for stream processing. In this article, we will see how to filter a Kafka stream by date using KSQL.

When we create a KSQL stream or table, it implicitly gets the following pseudo columns. They are present by default, so we do not need to specify them in the stream/table definition.

Pseudo column name    Description
ROWOFFSET             The offset of the source record.
ROWPARTITION          The partition of the source record.
ROWTIME               Row timestamp of the source record.
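
As a quick illustration, the pseudo columns can be selected like any other column. This is only a sketch: SOME_STREAM and COLUMN_NAME are the same placeholders used in the rest of this article, and it assumes a ksqlDB version recent enough to expose ROWPARTITION and ROWOFFSET (older releases may only offer ROWTIME).

```sql
-- Sketch: read the pseudo columns next to a regular column.
-- SOME_STREAM and COLUMN_NAME are placeholder names, not real objects.
SELECT ROWTIME, ROWPARTITION, ROWOFFSET, COLUMN_NAME
FROM SOME_STREAM
EMIT CHANGES
LIMIT 5;  -- stop after five rows instead of running continuously
```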

Query ROWTIME of a particular KSQL stream or table record

The following query will show the ROWTIME of particular row(s) filtered by COLUMN_NAME.

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') FROM SOME_STREAM
WHERE COLUMN_NAME = 'some_value' 
EMIT CHANGES;

Query to filter KSQL stream or table based on a date

The following query will filter the KSQL stream or table based on the specified date.

SELECT * FROM SOME_STREAM
WHERE TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd') = '2023-01-15'  -- example value; replace with your date in yyyy-MM-dd format
EMIT CHANGES;
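
An alternative worth considering is to compare ROWTIME against a date range instead of formatting every row as a string. The sketch below assumes a ksqlDB version that accepts ISO-8601 date strings in ROWTIME comparisons. The dates are example values, SOME_STREAM is a placeholder, and the strings are assumed to be interpreted as UTC when no zone is given.

```sql
-- Sketch: keep only records whose timestamp falls on 2023-01-15.
-- The lower bound is inclusive and the upper bound is exclusive.
SELECT * FROM SOME_STREAM
WHERE ROWTIME >= '2023-01-15T00:00:00'
  AND ROWTIME <  '2023-01-16T00:00:00'
EMIT CHANGES;
```

If the day boundary should follow a local time zone rather than UTC, TIMESTAMPTOSTRING also accepts an optional third argument with a zone name, for example TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd', 'Europe/London').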

You can format ROWTIME with any pattern that suits your filter. To include records that were already in the topic before the query started, run SET 'auto.offset.reset' = 'earliest'; before the query.
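
Put together, a session that reads historical data might look like the following. This is a minimal sketch for the ksqlDB CLI. The date and SOME_STREAM are example placeholders, and other clients such as a web editor may expose the same property through their own settings instead of a SET statement.

```sql
-- Sketch: start reading from the beginning of the topic for this session.
SET 'auto.offset.reset' = 'earliest';

-- Then filter on the formatted row timestamp (example date).
SELECT * FROM SOME_STREAM
WHERE TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd') = '2023-01-15'
EMIT CHANGES;
```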

Pro tips:
1. If you want to automate Confluent ksql deployments, you can refer to this post.

Kunal Rathi

With over 13 years of experience in data engineering and analytics, I've assisted countless clients in gaining valuable insights from their data. As a dedicated supporter of Data, Cloud and DevOps, I'm excited to connect with individuals who share my passion for this field. If my work resonates with you, we can talk and collaborate.
