-- 写给同事看的：同一统计需求的两种 Flink SQL 实现
-- 方式一：Group By 聚合
-- Kafka source: LOGMINER change records for grid points, JSON payload wrapped in `data`.
CREATE TABLE KAFKA_TXDATTJSMRC (
    `data` ROW(
        `GRID_POINT_ID`   DECIMAL(20),
        `GRID_POINT_NAME` VARCHAR(500),
        `CREATE_TIME`     STRING,
        `AREA`            STRING
    )
) WITH (
    'connector' = 'kafka',
    'topic' = 'EX_ZHENGWU_120_TXJLB_LOGMINER',
    'properties.bootstrap.servers' = '192.168.XX.XX:9591,192.168.XX.XX:9592,192.168.XX.XX:9593',
    'properties.group.id' = '1654704000000',
    -- start consuming from 2022-06-09 00:00:00
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1654704000000',
    'format' = 'json'
);
-- Dameng (DM) JDBC sink. PRIMARY KEY makes the JDBC connector upsert on YWZJ,
-- so each (point, day) row is continuously overwritten with the latest count.
create table `DM_YQZC_TXDATTJSMRC`
(
`YWZJ` varchar(32)        -- business key: MD5(CONCAT(CJDID, CYRQ)), see the INSERT below
,`CJDID` VARCHAR(50)      -- collection point id (from GRID_POINT_ID)
,`CJDMC` VARCHAR(200)     -- collection point name (from GRID_POINT_NAME)
,`CYRQ` VARCHAR(50)       -- sampling date, 'yyyy-MM-dd' as a string
,`SMRC` BIGINT            -- scan count for that point on that day
,`ZLSJ` TIMESTAMP         -- load time: LOCALTIMESTAMP at emit
,primary key(`YWZJ`) not enforced
)
with (
'connector' = 'jdbc',
'url' = 'jdbc:dm://DMDW',  -- DMDW is a service name; direct host form kept below for reference
--'url' = 'jdbc:dm://192.168.XX.XX:9690',
'table-name' = 'YCZC_DM.DM_YQZC_TXDATTJSMRC',
'username' = 'BIGDATA',
'password' = 'XXXXXXXXXX',
'driver' = 'dm.jdbc.driver.DmDriver'
);
-- Daily scan count per collection point, via a global (unbounded) GROUP BY.
-- The sink upserts on YWZJ, so every new event re-emits the updated count row.
INSERT INTO `DM_YQZC_TXDATTJSMRC`
SELECT
    MD5(CONCAT(CJDID, CYRQ)) AS YWZJ,  -- deterministic upsert key: one row per point+day
    CJDID,
    CJDMC,
    CYRQ,
    SMRC,
    LOCALTIMESTAMP AS ZLSJ             -- load time of this update
FROM (
    SELECT
        CAST(CJDID AS STRING) AS CJDID,
        CAST(CJDMC AS STRING) AS CJDMC,
        CAST(CYRQ AS STRING) AS CYRQ,  -- DATE -> 'yyyy-MM-dd' string for the sink
        COUNT(1) AS SMRC
    FROM (
        SELECT
            CAST(`data`.`GRID_POINT_ID` AS STRING) AS CJDID,
            COALESCE(`data`.`GRID_POINT_NAME`, '') AS CJDMC,  -- guard against NULL names
            TO_DATE(`data`.`CREATE_TIME`, 'yyyy-MM-dd') AS CYRQ,
            `data`.`CREATE_TIME` AS CJSJ
        FROM KAFKA_TXDATTJSMRC
        WHERE `data`.`AREA` = 'XX区'
    ) t
    GROUP BY CJDID, CJDMC, CYRQ
    -- NOTE: the original ORDER BY CYRQ DESC, SMRC DESC was removed. Flink streaming
    -- queries only allow ORDER BY on an ascending time attribute, and row order
    -- inside a derived table feeding an INSERT has no effect on the sink anyway.
) tt;
-- 方式二：使用 Over 聚合改写（逐条增量输出，避免全局 Group By 状态）
-- Kafka source (v2): same topic, but the payload fields are flattened via
-- computed columns and a processing-time attribute is added for the OVER window.
CREATE TABLE KAFKA_TXDATTJSMRC (
    `data` ROW(
        `GRID_POINT_ID`   STRING,
        `GRID_POINT_NAME` STRING,
        `CREATE_TIME`     STRING,
        `AREA`            STRING
    ),
    -- flatten the nested payload into top-level columns
    CJDID AS `data`.`GRID_POINT_ID`,
    CJDMC AS `data`.`GRID_POINT_NAME`,
    CYSJ  AS `data`.`CREATE_TIME`,
    AREA  AS `data`.`AREA`,
    -- processing-time attribute required by the OVER aggregation's ORDER BY
    PROCTIME AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'EX_ZHENGWU_120_TXJLB_LOGMINER',
    'properties.bootstrap.servers' = '192.168.XX.XX:9591,192.168.XX.XX:9592,192.168.XX.XX:9593',
    'properties.group.id' = '1655740800000',
    -- start consuming from 2022-06-21 00:00:00
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1655740800000',
    'format' = 'json'
);
-- Dameng (DM) JDBC sink; the NOT ENFORCED primary key makes the connector
-- upsert on YWZJ, keeping one row per collection point per day.
CREATE TABLE `DM_YQZC_TXDATTJSMRC` (
    `YWZJ`  VARCHAR(32),   -- MD5(CONCAT(CJDID, CYRQ)): point+day business key
    `CJDID` VARCHAR(50),   -- collection point id
    `CJDMC` VARCHAR(200),  -- collection point name
    `CYRQ`  VARCHAR(50),   -- sampling date string, 'yyyy-MM-dd'
    `SMRC`  BIGINT,        -- scan count
    `ZLSJ`  TIMESTAMP,     -- load timestamp
    PRIMARY KEY (`YWZJ`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'driver' = 'dm.jdbc.driver.DmDriver',
    'url' = 'jdbc:dm://DMDW',
    'table-name' = 'YCZC_DM.DM_YQZC_TXDATTJSMRC',
    'username' = 'BIGDATA',
    'password' = 'XXXXXXXXXX'
);
-- OVER-aggregation rewrite: emits an updated running count per (point, day)
-- on every incoming event, instead of maintaining a global GROUP BY state.
INSERT INTO `DM_YQZC_TXDATTJSMRC`
SELECT
    MD5(CONCAT(CJDID, CYRQ)) AS YWZJ,  -- upsert key: one row per point+day
    CJDID,
    CJDMC,
    CYRQ,
    COUNT(1) OVER w AS SMRC,           -- running count within the named window
    LOCALTIMESTAMP AS ZLSJ             -- load time of this update
FROM (
    SELECT
        CJDID,  -- already STRING via the computed column; the original CAST was a no-op
        CJDMC,
        DATE_FORMAT(CYSJ, 'yyyy-MM-dd') AS CYRQ,
        PROCTIME
    FROM KAFKA_TXDATTJSMRC
    WHERE AREA = 'XX区'
) t  -- explicit alias added: standard SQL requires one on derived tables
WINDOW w AS (
    PARTITION BY CJDID, CYRQ
    ORDER BY PROCTIME
    -- CYRQ is part of the partition key, so rows never cross days; the 2-day
    -- RANGE presumably just bounds OVER-state retention — TODO confirm intent
    RANGE BETWEEN INTERVAL '2' DAY PRECEDING AND CURRENT ROW
);
-- Flink 官方文档（Over 聚合的 RANGE 区间定义）：
-- https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/over-agg/#range-definitions