Filter a Kafka stream by date using KSQL

Confluent Platform is a full-scale data streaming platform that enables us to access, store, and manage data as continuous, real-time streams, and Confluent Cloud is its fully managed cloud counterpart. Confluent Cloud provides ksqlDB, a streaming SQL engine for Apache Kafka that offers an easy-to-use yet powerful interactive SQL interface for stream processing. In this article, we will see how to filter a Kafka stream by date using KSQL.

When we create a KSQL stream or table, we implicitly get the following pseudo columns. They are present by default, so we do not need to declare them in the stream or table definition.

| Pseudo column | Description |
|---|---|
| ROWOFFSET | The offset of the source record. |
| ROWPARTITION | The partition of the source record. |
| ROWTIME | The timestamp of the source record, in milliseconds since the Unix epoch. |
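
Pseudo columns can be selected by name like ordinary columns. The sketch below reuses the article's placeholder stream name SOME_STREAM and assumes a ksqlDB version that exposes ROWPARTITION and ROWOFFSET (older versions only provide ROWTIME). It prints the pseudo columns of a handful of records so you can see what ROWTIME looks like before you filter on it.

```sql
-- Show the pseudo columns of the first 5 records that arrive.
-- Assumes a ksqlDB version that supports ROWPARTITION and ROWOFFSET.
SELECT ROWPARTITION, ROWOFFSET, ROWTIME
FROM SOME_STREAM
EMIT CHANGES
LIMIT 5;
```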

Query the ROWTIME of a particular KSQL stream or table record

The following query shows the ROWTIME, formatted as a readable date-time string, of the rows whose COLUMN_NAME matches a given value.

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') FROM SOME_STREAM
WHERE COLUMN_NAME = 'some_value' 
EMIT CHANGES;
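
Because ROWTIME is stored as a BIGINT of epoch milliseconds, it can help to view the raw value next to the formatted one. TIMESTAMPTOSTRING also accepts an optional third argument for the time zone. This sketch keeps the article's placeholders SOME_STREAM, COLUMN_NAME, and 'some_value', and assumes that you want the result in UTC. The aliases RAW_ROWTIME and ROWTIME_UTC are illustrative names.

```sql
-- Raw epoch-millisecond ROWTIME next to its UTC date-time string
SELECT ROWTIME AS RAW_ROWTIME,
       TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS', 'UTC') AS ROWTIME_UTC
FROM SOME_STREAM
WHERE COLUMN_NAME = 'some_value'
EMIT CHANGES;
```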

Query to filter a KSQL stream or table by date

The following query returns only the records whose ROWTIME falls on the specified date. It formats ROWTIME as a yyyy-MM-dd string and compares the result with a date literal.

SELECT * FROM SOME_STREAM
-- Replace the literal 'yyyy-MM-dd' with the date to match, in the same format (e.g. '2023-01-15')
WHERE TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd') = 'yyyy-MM-dd'
EMIT CHANGES;
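
Formatting every record's ROWTIME as a string works, but you can also write the same one-day filter as a numeric range on ROWTIME. This sketch assumes that your ksqlDB version provides STRINGTOTIMESTAMP, which parses a string into epoch milliseconds. It uses 2023-01-15 as a hypothetical example date and interprets that date in UTC.

```sql
-- Records whose ROWTIME falls on 2023-01-15 (UTC); the date is a hypothetical example.
-- Lower bound inclusive, upper bound exclusive (midnight of the next day).
SELECT * FROM SOME_STREAM
WHERE ROWTIME >= STRINGTOTIMESTAMP('2023-01-15 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC')
  AND ROWTIME <  STRINGTOTIMESTAMP('2023-01-16 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC')
EMIT CHANGES;
```

This version compares ROWTIME with numbers instead of strings. Because the time zone is passed explicitly, it is also unambiguous about where the day starts and ends.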

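You can format ROWTIME with whatever pattern suits your filter. To include records that were written before the query started, set the auto.offset.reset property to earliest. Otherwise, a push query returns only new records.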
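
The following example applies both points in a single ksqlDB CLI session that reads historical data and filters by month instead of by day. The month '2023-01' is a hypothetical example value, and the SET statement affects only the queries that you start afterward in the same CLI session.

```sql
-- Start subsequent queries in this CLI session from the earliest available offset
SET 'auto.offset.reset' = 'earliest';

-- Keep every record from January 2023 (example value), with ROWTIME read as UTC
SELECT * FROM SOME_STREAM
WHERE TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM', 'UTC') = '2023-01'
EMIT CHANGES;
```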

Kunal Rathi

Has been working in the data engineering and analytics space for over a decade, helping customers transform their data into insights. Cloud and DevOps enthusiast.