Notes for colleagues

GROUP BY version

create table KAFKA_TXDATTJSMRC
( 
  `data` row(  
    `GRID_POINT_ID`          DECIMAL(20)
    ,`GRID_POINT_NAME`        VARCHAR(500)
    ,`CREATE_TIME`            STRING
    ,`AREA`                   STRING
  )
)
with (
  'connector' = 'kafka',
  'topic' = 'EX_ZHENGWU_120_TXJLB_LOGMINER',
  'properties.group.id' = '1654704000000',
  'scan.startup.mode' = 'timestamp', 
  'scan.startup.timestamp-millis' = '1654704000000', -- 2022-06-09 00:00:00
  'properties.bootstrap.servers' = '192.168.XX.XX:9591,192.168.XX.XX:9592,192.168.XX.XX:9593',
  'format' = 'json'
);
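With 'format' = 'json' and a single ROW-typed field `data`, each Kafka record is expected to be a JSON object of roughly this shape (the values are made up, and the exact CREATE_TIME format is an assumption based on the 'yyyy-MM-dd' patterns used further down):

{"data": {"GRID_POINT_ID": 1001, "GRID_POINT_NAME": "XX采集点", "CREATE_TIME": "2022-06-09 08:30:00", "AREA": "XX区"}}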

create table `DM_YQZC_TXDATTJSMRC`
(
  `YWZJ` varchar(32)
 ,`CJDID` VARCHAR(50)
 ,`CJDMC` VARCHAR(200)
 ,`CYRQ` VARCHAR(50)
 ,`SMRC` BIGINT
 ,`ZLSJ` TIMESTAMP
 ,primary key(`YWZJ`) not enforced 
)
with (
  'connector' = 'jdbc',
  'url' = 'jdbc:dm://DMDW', 
  --'url' = 'jdbc:dm://192.168.XX.XX:9690',
  'table-name' = 'YCZC_DM.DM_YQZC_TXDATTJSMRC',
  'username' = 'BIGDATA',
  'password' = 'XXXXXXXXXX',
  'driver' = 'dm.jdbc.driver.DmDriver'
);
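The JDBC connector only declares the Flink-side schema; the physical table has to exist in Dameng already. A sketch of what YCZC_DM.DM_YQZC_TXDATTJSMRC presumably looks like on the DM side (the types simply mirror the Flink DDL above, and the comments are inferred from the column mapping in the INSERT below):

CREATE TABLE YCZC_DM.DM_YQZC_TXDATTJSMRC (
  YWZJ  VARCHAR(32) PRIMARY KEY,  -- MD5(CJDID || CYRQ), the upsert key
  CJDID VARCHAR(50),              -- grid point id
  CJDMC VARCHAR(200),             -- grid point name
  CYRQ  VARCHAR(50),              -- sampling date, yyyy-MM-dd
  SMRC  BIGINT,                   -- scan count for that point and day
  ZLSJ  TIMESTAMP                 -- load time
);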

insert into `DM_YQZC_TXDATTJSMRC`  
SELECT
MD5(CONCAT(CJDID,CYRQ)) YWZJ
,CJDID
,CJDMC
,CYRQ
,SMRC
,localtimestamp ZLSJ
FROM (
SELECT 
  cast(CJDID as string) CJDID,cast(CJDMC as string) CJDMC,cast(CYRQ as string) CYRQ,
  COUNT(1) SMRC
FROM (
  SELECT 
  cast(`data`.`GRID_POINT_ID` as string)  as CJDID
  ,coalesce(`data`.`GRID_POINT_NAME`,'') as CJDMC
  ,to_date(`data`.`CREATE_TIME`,'yyyy-MM-dd') as CYRQ
  ,`data`.`CREATE_TIME` as CJSJ
  FROM
  KAFKA_TXDATTJSMRC
  WHERE `data`.`AREA`='XX区'
)t
GROUP BY CJDID,CJDMC,CYRQ
) tt;
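Against the Kafka source this GROUP BY is unbounded, so the per-day count is re-emitted every time a matching record arrives and the JDBC sink upserts it on the YWZJ primary key. The aggregation logic itself can be sanity-checked in the SQL Client without Kafka by feeding a few literal rows (the values below are made up):

SELECT CJDID, CJDMC, CYRQ, COUNT(1) AS SMRC
FROM (VALUES
    ('1001', 'XX采集点', '2022-06-09'),
    ('1001', 'XX采集点', '2022-06-09'),
    ('1002', 'XX采集点', '2022-06-09')
) AS t(CJDID, CJDMC, CYRQ)
GROUP BY CJDID, CJDMC, CYRQ;
-- expected: CJDID 1001 gets SMRC = 2, CJDID 1002 gets SMRC = 1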

Rewritten using OVER aggregation

create table KAFKA_TXDATTJSMRC
( 
  `data` row(  
    `GRID_POINT_ID`           STRING
    ,`GRID_POINT_NAME`        STRING
    ,`CREATE_TIME`            STRING
    ,`AREA`                   STRING
  ),
  CJDID AS `data`.`GRID_POINT_ID`,
  CJDMC AS `data`.`GRID_POINT_NAME`,
  CYSJ AS `data`.`CREATE_TIME`,
  AREA AS `data`.`AREA`,
  PROCTIME AS PROCTIME()
)with (
  'connector' = 'kafka',
  'topic' = 'EX_ZHENGWU_120_TXJLB_LOGMINER',
  'properties.group.id' = '1655740800000',
  'scan.startup.mode' = 'timestamp', 
  'scan.startup.timestamp-millis' = '1655740800000', -- 2022-06-21 00:00:00
  'properties.bootstrap.servers' = '192.168.XX.XX:9591,192.168.XX.XX:9592,192.168.XX.XX:9593',
  'format' = 'json'
);
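The WINDOW clause further down orders on PROCTIME, i.e. processing time. Purely as an aside (not what this job does): if the count should follow event time instead, the same DDL could derive a TIMESTAMP column from CREATE_TIME and declare a watermark on it, and the OVER window would then order on that column. A sketch, where the column name CYSJ_TS and the 'yyyy-MM-dd HH:mm:ss' format of CREATE_TIME are assumptions:

  -- extra computed column + watermark inside the CREATE TABLE above (hypothetical)
  CYSJ_TS AS TO_TIMESTAMP(`data`.`CREATE_TIME`, 'yyyy-MM-dd HH:mm:ss'),
  WATERMARK FOR CYSJ_TS AS CYSJ_TS - INTERVAL '5' SECOND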
create table `DM_YQZC_TXDATTJSMRC`
(
  `YWZJ` varchar(32)
 ,`CJDID` VARCHAR(50)
 ,`CJDMC` VARCHAR(200)
 ,`CYRQ` VARCHAR(50)
 ,`SMRC` BIGINT
 ,`ZLSJ` TIMESTAMP
 ,primary key(`YWZJ`) not enforced 
)
with (
  'connector' = 'jdbc',
  'url' = 'jdbc:dm://DMDW', 
  --'url' = 'jdbc:dm://192.168.XX.XX:9690',
  'table-name' = 'YCZC_DM.DM_YQZC_TXDATTJSMRC',
  'username' = 'BIGDATA',
  'password' = 'XXXXXXXXXX',
  'driver' = 'dm.jdbc.driver.DmDriver'
);


INSERT INTO `DM_YQZC_TXDATTJSMRC`
SELECT
MD5(CONCAT(CJDID,CYRQ)) YWZJ
,CJDID
,CJDMC
,CYRQ
,COUNT(1) OVER w SMRC
,LOCALTIMESTAMP ZLSJ
FROM (
  SELECT 
  CAST(CJDID AS STRING) CJDID
  ,CJDMC
  ,DATE_FORMAT(CYSJ,'yyyy-MM-dd') CYRQ
  ,PROCTIME
  FROM KAFKA_TXDATTJSMRC
  WHERE AREA='XX区'
)
WINDOW w AS (
  PARTITION BY CJDID,CYRQ
  ORDER BY PROCTIME
  RANGE BETWEEN INTERVAL '2' DAY PRECEDING AND CURRENT ROW
);
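Because the day (CYRQ) is already part of the partition key, the INTERVAL '2' DAY range just needs to be wide enough that all records of one sampling day fall inside the window (assuming they are processed within two days of each other); COUNT(1) OVER w then grows with each record, and the upsert on YWZJ keeps the latest value per point and day. Per the range-definitions section of the Flink docs linked below, the alternative is a count-based ROWS window; for contrast only, a sketch (not equivalent to the RANGE version above):

WINDOW w AS (
  PARTITION BY CJDID,CYRQ
  ORDER BY PROCTIME
  ROWS BETWEEN 100 PRECEDING AND CURRENT ROW  -- the current row plus the 100 rows before it
)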

Flink official documentation

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/over-agg/#range-definitions