1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义
1.31.1.工程结构
1.31.2.定义pom.xml文件
1.31.3.log4j2.properties
1.31.4.logback.xml
1.31.5.cache.properties
1.31.6.project-config.properties
1.31.7.IssueAcceptSimpleProducer.java
1.31.8.Consumer.java
1.31.9.DefaultTopicSelector.java
1.31.10.SimpleTopicSelector.java
1.31.11.TopicSelector.java
1.31.12.KeyValueDeserializationSchema.java
1.31.13.KeyValueSerializationSchema.java
1.31.14.SimpleKeyValueDeserializationSchema.java
1.31.15.SimpleKeyValueSerializationSchema.java
1.31.16.RocketMQConfig.java
1.31.17.RocketMQSink.java
1.31.18.RocketMQSource.java
1.31.19.RocketMQUtils.java
1.31.20.RunningChecker.java
1.31.21.DateUtils.java
1.31.22.PropertiesUtils.java
1.31.23.RedisUtil.java
1.31.24.IssueConstants.java
1.31.25.IssueAcceptRedisSink.java
1.31.26.IssueAcceptFlinkHandlerByCustomRedisSink.java
1.32.Flink其它案例
1.32.1.使用DataGen生成数据
1.32.2.使用value state进行存储临时数据
1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义
1.31.1.工程结构
1.31.2.定义pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- xxxxxx实时处理 -->
<modelVersion>4.0.0</modelVersion>
<groupId>xxx.xxx.xxxx</groupId>
<artifactId>indicators-real-time-handler</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--maven properties -->
<maven.test.skip>true</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip>
<!-- compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!--<rocketmq.version>4.7.1</rocketmq.version>-->
<rocketmq.version>4.5.1</rocketmq.version>
<flink.version>1.11.1</flink.version>
<flink-connector-redis.version>1.0</flink-connector-redis.version>
<commons-lang.version>2.5</commons-lang.version>
<scala.binary.version>2.12</scala.binary.version>
<junit.version>4.12</junit.version>
<redis.version>3.3.0</redis.version>
<slf4j.version>1.7.25</slf4j.version>
<fastjson.version>1.2.73</fastjson.version>
<joda-time.version>2.9.4</joda-time.version>
<!--<hadoop.version>2.8.3</hadoop.version>-->
<!-- 用于连接中间件团队的redis用 -->
<tmc-version>0.6.2</tmc-version>
<fileName>issue-handler</fileName>
<mainClass>com.xxx.issue.flink.handler.IssueHandleFlinkHandlerByCustomRedisSink</mainClass>
</properties>
<dependencies>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<!--<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
</exclusions>
</dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<!--<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>jackson-databind</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
</exclusions>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!--
1.compile : 默认的scope,运行期有效,需要打入包中。
2.provided : 编译器有效,运行期不需要提供,不会打入包中。
3.runtime : 编译不需要,在运行期有效,需要导入包中。(接口与实现分离)
4.test : 测试需要,不会打入包中
5.system : 非本地仓库引入、存在系统的某个路径下的jar。(一般不使用)
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<!--<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${redis.version}</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>${flink-connector-redis.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-namesrv</artifactId>
<version>${rocketmq.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-broker</artifactId>
<version>${rocketmq.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>${rocketmq.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- RocksDB state backend: the Scala suffix and version must match the other Flink artifacts
     (flink-streaming-java / flink-clients), otherwise mixed Flink versions end up on the classpath. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>
<!--test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda-time.version}</version>
</dependency>
<!-- 使用scala编程的时候使用下面的依赖 start-->
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>-->
<!-- 使用scala编程的时候使用下面的依赖 end-->
<!-- kafka connector scala 2.12 -->
<!--
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
-->
<!--
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>1.5.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<version>1.5.5</version>
<scope>test</scope>
</dependency>
-->
</dependencies>
<distributionManagement>
<repository>
<id>releases</id>
<layout>default</layout>
<url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>snapshots</name>
<url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
<repositories>
<repository>
<id>releases</id>
<layout>default</layout>
<url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
</repository>
<repository>
<id>snapshots</id>
<name>snapshots</name>
<url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>warn</checksumPolicy>
</snapshots>
</repository>
<repository>
<id>xxxxx</id>
<name>xxxxxx</name>
<url>http://xxx.xxx.xxx/nexus/content/repositories/xxxx/</url>
</repository>
<repository>
<id>public</id>
<name>public</name>
<url>http://xxx.xxx.xxx/nexus/content/groups/public/</url>
</repository>
<!-- 新加 -->
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<build>
<finalName>${fileName}</finalName>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<encoding>${project.build.sourceEncoding}</encoding>
<compilerVersion>${maven.compiler.source}</compilerVersion>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<skipTests>${maven.test.skip}</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.12</version>
<configuration>
<excludes>
<exclude>README.md</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<configuration>
<aggregate>true</aggregate>
<reportOutputDirectory>javadocs</reportOutputDirectory>
<locale>en</locale>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<!--<groupId>org.apache.maven.plugins</groupId>-->
<artifactId>maven-assembly-plugin</artifactId>
<!--<version>2.6</version>-->
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!--<archive>
<manifest>
<mainClass>${mainClass}</mainClass>
</manifest>
</archive>-->
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
- 250
- 251
- 252
- 253
- 254
- 255
- 256
- 257
- 258
- 259
- 260
- 261
- 262
- 263
- 264
- 265
- 266
- 267
- 268
- 269
- 270
- 271
- 272
- 273
- 274
- 275
- 276
- 277
- 278
- 279
- 280
- 281
- 282
- 283
- 284
- 285
- 286
- 287
- 288
- 289
- 290
- 291
- 292
- 293
- 294
- 295
- 296
- 297
- 298
- 299
- 300
- 301
- 302
- 303
- 304
- 305
- 306
- 307
- 308
- 309
- 310
- 311
- 312
- 313
- 314
- 315
- 316
- 317
- 318
- 319
- 320
- 321
- 322
- 323
- 324
- 325
- 326
- 327
- 328
- 329
- 330
- 331
- 332
- 333
- 334
- 335
- 336
- 337
- 338
- 339
- 340
- 341
- 342
- 343
- 344
- 345
- 346
- 347
- 348
- 349
- 350
- 351
- 352
- 353
- 354
- 355
- 356
- 357
- 358
- 359
- 360
- 361
- 362
- 363
- 364
- 365
- 366
- 367
- 368
- 369
- 370
- 371
- 372
- 373
- 374
- 375
- 376
- 377
- 378
- 379
- 380
- 381
- 382
- 383
- 384
- 385
- 386
- 387
- 388
- 389
- 390
- 391
- 392
- 393
- 394
- 395
- 396
- 397
- 398
- 399
- 400
- 401
- 402
- 403
- 404
1.31.3.log4j2.properties
rootLogger.level = ERROR
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
- 1
- 2
- 3
- 4
- 5
- 6
- 7
1.31.4.logback.xml
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- Daily rollover -->
<fileNamePattern>log/generator.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- Keep 7 days' worth of history -->
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="ERROR">
<appender-ref ref="FILE" />
<appender-ref ref="STDOUT" />
</root>
</configuration>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
1.31.5.cache.properties
#\u5E94\u7528appkey
app.key=appKeyTest
# not use memoryCache
#\u7F13\u5B58\u76D1\u63A7\u5F00\u5173 true/false
monitor.enabled=false
# 测试环境
monitor.bootstrap.servers=xxx.xxx.xxx.xxx:9094
synchronize.enabled= false
local.cache.name = none
#localCache config start
# 测试环境redis
remote.cache.servers=redis://xxx.xxx.xxx.xxx:6390,redis://xxx.xxx.xxx.xxx:6391,redis://xxx.xxx.xxx.xxx:6392,redis://xxx.xxx.xxx.xxx:6393,redis://xxx.xxx.xxx.xxx:6394,redis://xxx.xxx.xxx.xxx:6395
remote.cache.mode=cluster
#remote.cache.mode=standalone
remote.cache.password=123456
synchronize.strategy=kafka
synchronize.address=
monitor.address=
monitor.strategy=kafka
namespace=doraemon
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
1.31.6.project-config.properties
# 本地的rocketmq的name server地址
#rocketmq.name.server.addr=localhost:9876
# 开发环境rocketmq
rocketmq.name.server.addr=xxx.xxx.xxx.xxx:9876
# 测试环境
# rocketmq.name.server.addr=xxx.xxx.xxx.xxx:9876
rocketmq.topics=issue_sync_message_1##issue_sync_message_2
####################################flink相关配置 start###########################################
# 每隔120秒(120000 ms)产生一次checkpoint
flink.checkpoint.interval=120000
# 确保检查点之间有至少1000 ms的间隔
flink.checkpoint.minPauseBetweenCheckpoints=1000
# 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
flink.checkpoint.checkpointTimeout=60000
# 同一时间只允许进行一个检查点
flink.checkpoint.maxConcurrentCheckpoints=1
# rocketmq的读并发
flink.rockeqmq.source.parallelism=1
# redis下沉的并发
flink.redis.sink.parallelism=1
# 尝试重启次数
flink.fixedDelayRestart.times=3
# 每次尝试重启时之间的时间间隔
flink.fixedDelayRestart.interval=5
####################################flink相关配置 end ###########################################
####################################redis相关配置 start###########################################
# 默认保存10天
redis.default.expiration.time=864000
####################################redis相关配置 end ###########################################
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
1.31.7.IssueAcceptSimpleProducer.java
package com.xxxxx.issue.producer;
import com.alibaba.fastjson.JSON;
import com.xxxxx.caterpillar.sdk.util.ObjectConvertUtil;
import com.xxxxx.doraemon.service.issue.domain.IssueSyncMessageBody;
import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
import com.xxxxx.issue.utils.PropertiesUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
 * Manual test producer that publishes synthetic "issue accept" messages to the
 * {@code issue_sync_message_1} topic, one message every three seconds.
 *
 * <p>Commands to start a local RocketMQ before running this class:
 * <pre>
 * .\bin\mqnamesrv.cmd
 * .\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
 * </pre>
 *
 * @author tuzuoquan
 * @version 1.0
 * @date 2020/9/14 15:29
 **/
public class IssueAcceptSimpleProducer {
    private static final Logger LOG = LoggerFactory.getLogger(IssueAcceptSimpleProducer.class);

    /** Topic every generated message is sent to. */
    private static final String TOPIC = "issue_sync_message_1";

    /**
     * Tag and operation type of the generated messages ("accepted" count).
     *
     * <p>Other combinations noted by the original author:
     * occurred events = issue_accept_operat(61) + issue_add_operat(2) - issue_delete_operat(0),
     * scoped by org_code (region) and createDate (today);
     * completed count = issue_inspect_pass_operat(30) + issue_complete_operat(31).
     */
    private static final String TAG = "issue_accept_operat";
    private static final Integer OPERAT_TYPE = 61;

    /** Modulus applied to the message id to derive each level of the org code, outermost first. */
    private static final long[] ORG_LEVEL_MODULI = {32L, 22L, 30L, 15L, 25L, 5L, 20L};

    /** Pause between two messages, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 3000L;

    /**
     * Starts the producer and sends messages with ids 1000..100000000.
     *
     * <p>Returns immediately when the producer cannot be started, and stops early when the
     * thread is interrupted. The producer is always shut down once it was started.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("p003");
        producer.setNamesrvAddr("localhost:9876");
        try {
            producer.start();
        } catch (MQClientException e) {
            // Without a started producer every send would fail, so there is no point in looping.
            LOG.error("Failed to start RocketMQ producer, no message will be sent", e);
            return;
        }

        try {
            for (long i = 1000L; i <= 100000000L; i++) {
                IssueSyncMessageBodyVO issueSyncMessageBodyVO = buildMessageBody(i);
                Message msg = new Message(TOPIC, TAG, "id_" + i,
                        ObjectConvertUtil.objectToByte(issueSyncMessageBodyVO));
                try {
                    producer.send(msg);
                    LOG.info("send :{} content: {}", i, JSON.toJSONString(issueSyncMessageBodyVO));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    // Best effort: a single failed send must not stop the generator.
                    LOG.error("Failed to send message id_{}", i, e);
                }

                try {
                    Thread.sleep(SEND_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Restore the flag and stop so the producer gets shut down promptly.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Shut down once the producer instance is no longer in use.
            producer.shutdown();
        }
    }

    /** Builds the synthetic message payload whose fields are all derived from the id {@code i}. */
    private static IssueSyncMessageBodyVO buildMessageBody(long i) {
        long flag = i % 10;

        IssueSyncMessageBody body = new IssueSyncMessageBody();
        body.setIssueId(i);
        body.setSerialNumber("" + i);
        body.setCreateDate(new Date());
        body.setHandleUserName("user" + flag);
        body.setHandleMobile("1981715866" + flag);
        body.setTenantId(i % 10 + "");
        body.setHandleOrgCode(buildHandleOrgCode(i));

        IssueSyncMessageBodyVO issueSyncMessageBodyVO = new IssueSyncMessageBodyVO();
        issueSyncMessageBodyVO.setBody(body);
        issueSyncMessageBodyVO.setOperatType(OPERAT_TYPE);
        return issueSyncMessageBodyVO;
    }

    /**
     * Builds a dot-separated org code with {@code (i % 7) + 1} levels, each level being
     * {@code i} modulo the matching entry of {@link #ORG_LEVEL_MODULI}. Only called with positive ids.
     */
    private static String buildHandleOrgCode(long i) {
        int depth = (int) (i % 7) + 1;
        StringBuilder code = new StringBuilder();
        for (int level = 0; level < depth; level++) {
            if (level > 0) {
                code.append('.');
            }
            code.append(i % ORG_LEVEL_MODULI[level]);
        }
        return code.toString();
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
1.31.8.Consumer.java
package com.xxxxx.issue.consumer;
import com.xxxxx.caterpillar.sdk.util.ObjectConvertUtil;
import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Manual test consumer: subscribes to both issue-sync topics on a local name server
 * and prints the body of every received message to standard output.
 */
public class Consumer {

    /**
     * Configures, registers the listener on, and starts a push consumer.
     *
     * @param args unused
     * @throws InterruptedException declared for callers; see the RocketMQ client API
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Consumer group name and name server address.
        final DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("abcdefsssss");
        pushConsumer.setNamesrvAddr("localhost:9876");

        // Subscribe to every tag of both topics.
        pushConsumer.subscribe("issue_sync_message_1", "*");
        pushConsumer.subscribe("issue_sync_message_2", "*");

        // Callback executed on arrival of messages fetched from the brokers.
        final MessageListenerConcurrently listener = (msgs, context) -> {
            for (MessageExt received : msgs) {
                printBody(received);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.registerMessageListener(listener);

        pushConsumer.start();
        System.out.printf("Consumer Started.%n");
    }

    /** Converts the message payload back to its value object and prints the wrapped body. */
    private static void printBody(MessageExt received) {
        IssueSyncMessageBodyVO payload =
                (IssueSyncMessageBodyVO) ObjectConvertUtil.byteToObject(received.getBody());
        System.out.println(payload.getBody());
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
1.31.9.DefaultTopicSelector.java
package org.apache.rocketmq.flink.common.selector;
/**
 * {@link TopicSelector} that ignores the element and always answers with the
 * topic and tag supplied at construction time.
 *
 * @param <T> element type; never inspected
 */
public class DefaultTopicSelector<T> implements TopicSelector<T> {

    private final String topicName;
    private final String tagName;

    /**
     * Creates a selector with a fixed topic and an empty tag.
     *
     * @param topicName topic returned for every element
     */
    public DefaultTopicSelector(final String topicName) {
        this(topicName, "");
    }

    /**
     * Creates a selector with a fixed topic and a fixed tag.
     *
     * @param topicName topic returned for every element
     * @param tagName tag returned for every element
     */
    public DefaultTopicSelector(final String topicName, final String tagName) {
        this.tagName = tagName;
        this.topicName = topicName;
    }

    /** Returns the fixed tag; {@code tuple} is not used. */
    @Override
    public String getTag(T tuple) {
        return this.tagName;
    }

    /** Returns the fixed topic; {@code tuple} is not used. */
    @Override
    public String getTopic(T tuple) {
        return this.topicName;
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
1.31.10.SimpleTopicSelector.java
package org.apache.rocketmq.flink.common.selector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * {@link TopicSelector} for {@link Map} elements: topic and tag are read from two
 * configurable map entries, with a configured default used when an entry is absent or null.
 */
public class SimpleTopicSelector implements TopicSelector<Map> {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleTopicSelector.class);

    private final String topicFieldName;
    private final String defaultTopicName;
    private final String tagFieldName;
    private final String defaultTagName;

    /**
     * SimpleTopicSelector Constructor.
     *
     * @param topicFieldName map key whose value is used as the topic
     * @param defaultTopicName topic used when that key is missing or maps to null
     * @param tagFieldName map key whose value is used as the tag
     * @param defaultTagName tag used when that key is missing or maps to null
     */
    public SimpleTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) {
        this.topicFieldName = topicFieldName;
        this.defaultTopicName = defaultTopicName;
        this.tagFieldName = tagFieldName;
        this.defaultTagName = defaultTagName;
    }

    /** Returns the string form of the topic entry, or the default topic. */
    @Override
    public String getTopic(Map tuple) {
        return select(tuple, topicFieldName, defaultTopicName,
                "Field {} Not Found. Returning default topic {}");
    }

    /** Returns the string form of the tag entry, or the default tag. */
    @Override
    public String getTag(Map tuple) {
        return select(tuple, tagFieldName, defaultTagName,
                "Field {} Not Found. Returning default tag {}");
    }

    /**
     * Looks up {@code fieldName} in {@code tuple}. A missing key logs a warning and yields
     * {@code fallback}; a key mapped to null yields {@code fallback} without logging.
     */
    private static String select(Map tuple, String fieldName, String fallback, String missingKeyMessage) {
        if (!tuple.containsKey(fieldName)) {
            LOG.warn(missingKeyMessage, fieldName, fallback);
            return fallback;
        }
        Object found = tuple.get(fieldName);
        if (found == null) {
            return fallback;
        }
        return found.toString();
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
1.31.11.TopicSelector.java
package org.apache.rocketmq.flink.common.selector;
import java.io.Serializable;
/**
 * Chooses a topic and a tag for an element; serializable so implementations can be
 * shipped together with the code that uses them.
 *
 * @param <T> type of the element the topic and tag are derived from
 */
public interface TopicSelector<T> extends Serializable {
/**
 * @param tuple element to select a topic for
 * @return topic name for {@code tuple}
 */
String getTopic(T tuple);
/**
 * @param tuple element to select a tag for
 * @return tag for {@code tuple}
 */
String getTag(T tuple);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
1.31.12.KeyValueDeserializationSchema.java
package org.apache.rocketmq.flink.common.serialization;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import java.io.Serializable;
/**
 * Turns a raw key/value byte pair into an element of type {@code T} and, through
 * {@link ResultTypeQueryable}, reports the type of the produced elements.
 *
 * @param <T> type of the deserialized element
 */
public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {
/**
 * @param key raw key bytes (NOTE(review): the simple map-based implementation accepts null here - confirm for other implementations)
 * @param value raw value bytes (same nullability note as {@code key})
 * @return element built from the key and value
 */
T deserializeKeyAndValue(byte[] key, byte[] value);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
1.31.13.KeyValueSerializationSchema.java
package org.apache.rocketmq.flink.common.serialization;
import java.io.Serializable;
/**
 * Splits an element of type {@code T} into a serialized key and a serialized value.
 *
 * @param <T> type of the element to serialize
 */
public interface KeyValueSerializationSchema<T> extends Serializable {
/**
 * @param tuple element to extract the key from
 * @return key bytes (NOTE(review): the simple map-based implementation may return null - callers should confirm they handle that)
 */
byte[] serializeKey(T tuple);
/**
 * @param tuple element to extract the value from
 * @return value bytes (same nullability note as {@link #serializeKey})
 */
byte[] serializeValue(T tuple);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
1.31.14.SimpleKeyValueDeserializationSchema.java
package org.apache.rocketmq.flink.common.serialization;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * {@link KeyValueDeserializationSchema} producing a two-entry {@link Map}: the key and
 * value bytes are decoded as UTF-8 strings and stored under configurable field names.
 */
public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map> {
    public static final String DEFAULT_KEY_FIELD = "key";
    public static final String DEFAULT_VALUE_FIELD = "value";

    public String keyField;
    public String valueField;

    /** Uses {@link #DEFAULT_KEY_FIELD} and {@link #DEFAULT_VALUE_FIELD} as field names. */
    public SimpleKeyValueDeserializationSchema() {
        this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
    }

    /**
     * SimpleKeyValueDeserializationSchema Constructor.
     *
     * @param keyField map entry name the decoded key is stored under; null skips the key
     * @param valueField map entry name the decoded value is stored under; null skips the value
     */
    public SimpleKeyValueDeserializationSchema(String keyField, String valueField) {
        this.keyField = keyField;
        this.valueField = valueField;
    }

    /**
     * Builds the result map. Null {@code key}/{@code value} bytes are stored as a null
     * string under their field name; a null field name omits that entry entirely.
     */
    @Override
    public Map deserializeKeyAndValue(byte[] key, byte[] value) {
        Map<String, String> decoded = new HashMap<>(2);
        putDecoded(decoded, keyField, key);
        putDecoded(decoded, valueField, value);
        return decoded;
    }

    /** Reports {@link Map} as the produced type. */
    @Override
    public TypeInformation<Map> getProducedType() {
        return TypeInformation.of(Map.class);
    }

    /** Stores the UTF-8 decoding of {@code raw} (or null) under {@code field}, unless {@code field} is null. */
    private static void putDecoded(Map<String, String> target, String field, byte[] raw) {
        if (field == null) {
            return;
        }
        String text = null;
        if (raw != null) {
            text = new String(raw, StandardCharsets.UTF_8);
        }
        target.put(field, text);
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
1.31.15.SimpleKeyValueSerializationSchema.java
package org.apache.rocketmq.flink.common.serialization;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
 * {@link KeyValueSerializationSchema} for {@link Map} elements: the key and value are
 * read from two configurable map entries and encoded as the UTF-8 bytes of their string form.
 */
public class SimpleKeyValueSerializationSchema implements KeyValueSerializationSchema<Map> {
    public static final String DEFAULT_KEY_FIELD = "key";
    public static final String DEFAULT_VALUE_FIELD = "value";

    public String keyField;
    public String valueField;

    /** Uses {@link #DEFAULT_KEY_FIELD} and {@link #DEFAULT_VALUE_FIELD} as field names. */
    public SimpleKeyValueSerializationSchema() {
        this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
    }

    /**
     * SimpleKeyValueSerializationSchema Constructor.
     *
     * @param keyField map entry name the key is read from
     * @param valueField map entry name the value is read from
     */
    public SimpleKeyValueSerializationSchema(String keyField, String valueField) {
        this.keyField = keyField;
        this.valueField = valueField;
    }

    /** Returns the encoded key entry, or null when the map, the field name or the entry is null. */
    @Override
    public byte[] serializeKey(Map tuple) {
        return encodeEntry(tuple, keyField);
    }

    /** Returns the encoded value entry, or null when the map, the field name or the entry is null. */
    @Override
    public byte[] serializeValue(Map tuple) {
        return encodeEntry(tuple, valueField);
    }

    /** Encodes {@code tuple.get(field).toString()} as UTF-8; null whenever any part is missing. */
    private static byte[] encodeEntry(Map tuple, String field) {
        if (tuple == null || field == null) {
            return null;
        }
        Object entry = tuple.get(field);
        if (entry == null) {
            return null;
        }
        return entry.toString().getBytes(StandardCharsets.UTF_8);
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
1.31.16.RocketMQConfig.java
package org.apache.rocketmq.flink;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.Properties;
import java.util.UUID;
import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
/**
* RocketMQConfig for Consumer/Producer.
*/
public class RocketMQConfig {
// Server Config
public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required
public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds
public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
// Producer related config
public static final String PRODUCER_GROUP = "producer.group";
public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3;
public static final String PRODUCER_TIMEOUT = "producer.timeout";
public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
public static final String ACCESS_KEY = "access.key";
public static final String SECRET_KEY = "secret.key";
// Consumer related config
public static final String CONSUMER_GROUP = "consumer.group"; // Required
public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
public static final String CONSUMER_TAG = "consumer.tag";
public static final String DEFAULT_CONSUMER_TAG = "*";
public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
public static final String CONSUMER_OFFSET_LATEST = "latest";
public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp";
public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
public static final String CONSUMER_PULL_POOL_SIZE = "consumer.pull.thread.pool.size";
public static final int DEFAULT_CONSUMER_PULL_POOL_SIZE = 20;
public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32;
public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found";
public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10;
public static final String MSG_DELAY_LEVEL = "msg.delay.level";
public static final int MSG_DELAY_LEVEL00 = 0; // no delay
public static final int MSG_DELAY_LEVEL01 = 1; // 1s
public static final int MSG_DELAY_LEVEL02 = 2; // 5s
public static final int MSG_DELAY_LEVEL03 = 3; // 10s
public static final int MSG_DELAY_LEVEL04 = 4; // 30s
public static final int MSG_DELAY_LEVEL05 = 5; // 1min
public static final int MSG_DELAY_LEVEL06 = 6; // 2min
public static final int MSG_DELAY_LEVEL07 = 7; // 3min
public static final int MSG_DELAY_LEVEL08 = 8; // 4min
public static final int MSG_DELAY_LEVEL09 = 9; // 5min
public static final int MSG_DELAY_LEVEL10 = 10; // 6min
public static final int MSG_DELAY_LEVEL11 = 11; // 7min
public static final int MSG_DELAY_LEVEL12 = 12; // 8min
public static final int MSG_DELAY_LEVEL13 = 13; // 9min
public static final int MSG_DELAY_LEVEL14 = 14; // 10min
public static final int MSG_DELAY_LEVEL15 = 15; // 20min
public static final int MSG_DELAY_LEVEL16 = 16; // 30min
public static final int MSG_DELAY_LEVEL17 = 17; // 1h
public static final int MSG_DELAY_LEVEL18 = 18; // 2h
/**
 * Applies producer settings from {@code props} to {@code producer}.
 *
 * <p>Common client settings are applied first through {@link #buildCommonConfigs}. The producer
 * group is read from {@link #PRODUCER_GROUP}; when that property is missing or empty a random
 * UUID is used instead. The same retry count is applied to synchronous and asynchronous sends.
 *
 * @param props Properties holding the producer configuration
 * @param producer DefaultMQProducer to configure
 */
public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
    buildCommonConfigs(props, producer);

    String group = props.getProperty(PRODUCER_GROUP);
    if (StringUtils.isEmpty(group)) {
        group = UUID.randomUUID().toString();
    }
    // Use the resolved value directly: re-reading the property with a default would hand back
    // the empty string whenever the key is present but empty, discarding the UUID fallback.
    producer.setProducerGroup(group);

    int retryTimes = getInteger(props, PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES);
    producer.setRetryTimesWhenSendFailed(retryTimes);
    producer.setRetryTimesWhenSendAsyncFailed(retryTimes);
    producer.setSendMsgTimeout(getInteger(props, PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
}
/**
 * Applies consumer settings from {@code props} to {@code consumer}.
 *
 * <p>Common client settings are applied first, the message model is fixed to clustering, and
 * the offset persist interval is read from {@link #CONSUMER_OFFSET_PERSIST_INTERVAL}.
 *
 * @param props Properties holding the consumer configuration
 * @param consumer DefaultMQPullConsumer to configure
 */
public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) {
    buildCommonConfigs(props, consumer);
    consumer.setMessageModel(MessageModel.CLUSTERING);

    final int persistInterval = getInteger(
            props, CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL);
    consumer.setPersistConsumerOffsetInterval(persistInterval);
}
/**
 * Applies the settings shared by producers and consumers.
 *
 * <p>The name server address ({@link #NAME_SERVER_ADDR}) is mandatory and validated as
 * non-empty; the name server poll interval and broker heartbeat interval fall back to their
 * defaults when not configured.
 *
 * @param props Properties holding the client configuration
 * @param client ClientConfig to configure
 */
public static void buildCommonConfigs(Properties props, ClientConfig client) {
    final String nameServerAddress = props.getProperty(NAME_SERVER_ADDR);
    Validate.notEmpty(nameServerAddress);
    client.setNamesrvAddr(nameServerAddress);

    final int pollInterval =
            getInteger(props, NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL);
    client.setPollNameServerInterval(pollInterval);

    final int heartbeatInterval =
            getInteger(props, BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL);
    client.setHeartbeatBrokerInterval(heartbeatInterval);
}
/**
 * Builds the ACL hook carrying the client's credentials.
 *
 * @param props Properties that may contain {@link #ACCESS_KEY} and {@link #SECRET_KEY}
 * @return a hook built from both keys, or {@code null} when either key is missing or empty
 */
public static AclClientRPCHook buildAclRPCHook(Properties props) {
    final String accessKey = props.getProperty(ACCESS_KEY);
    final String secretKey = props.getProperty(SECRET_KEY);
    if (StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey)) {
        return null;
    }
    return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
1.31.17.RocketMQSink.java
package org.apache.rocketmq.flink;
import org.apache.commons.lang.Validate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.flink.common.selector.TopicSelector;
import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
/**
* The RocketMQSink provides at-least-once reliability guarantees when
* checkpoints are enabled and batchFlushOnCheckpoint(true) is set.
* Otherwise, the sink's reliability guarantees depend on the RocketMQ producer's retry policy.
*/
public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RocketMQSink.class);
private transient DefaultMQProducer producer;
private boolean async; // false by default
private Properties props;
private TopicSelector<IN> topicSelector;
private KeyValueSerializationSchema<IN> serializationSchema;
private boolean batchFlushOnCheckpoint; // false by default
private int batchSize = 1000;
private List<Message> batchList;
private int messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
this.serializationSchema = schema;
this.topicSelector = topicSelector;
this.props = props;
if (this.props != null) {
this.messageDeliveryDelayLevel = RocketMQUtils.getInteger(this.props, RocketMQConfig.MSG_DELAY_LEVEL,
RocketMQConfig.MSG_DELAY_LEVEL00);
if (this.messageDeliveryDelayLevel < RocketMQConfig.MSG_DELAY_LEVEL00) {
this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
} else if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL18) {
this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL18;
}
}
}
/**
 * Validates the configuration, then creates, configures and starts the producer.
 *
 * <p>If flushing on checkpoint was requested but checkpointing is disabled, the option is
 * switched off with a warning.
 */
@Override
public void open(Configuration parameters) throws Exception {
    Validate.notEmpty(props, "Producer properties can not be empty");
    Validate.notNull(topicSelector, "TopicSelector can not be null");
    Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null");

    producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props));
    // Instance name is the subtask index plus a random UUID.
    final String instanceName = getRuntimeContext().getIndexOfThisSubtask() + "_" + UUID.randomUUID();
    producer.setInstanceName(instanceName);
    RocketMQConfig.buildProducerConfigs(props, producer);

    batchList = new LinkedList<>();
    if (batchFlushOnCheckpoint) {
        final boolean checkpointingEnabled =
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
        if (!checkpointingEnabled) {
            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            batchFlushOnCheckpoint = false;
        }
    }

    try {
        producer.start();
    } catch (MQClientException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Sends one element.
 *
 * <p>With batch-flush-on-checkpoint the message is only buffered and flushed once the buffer
 * reaches {@code batchSize}. Otherwise it is sent immediately: synchronously (any status other
 * than {@code SEND_OK} is rethrown as a failure) or asynchronously (failures are only logged).
 */
@Override
public void invoke(IN input, Context context) throws Exception {
    final Message message = prepareMessage(input);

    if (batchFlushOnCheckpoint) {
        batchList.add(message);
        if (batchList.size() >= batchSize) {
            flushSync();
        }
        return;
    }

    if (!async) {
        try {
            final SendResult syncResult = producer.send(message);
            LOG.debug("Sync send message result: {}", syncResult);
            if (syncResult.getSendStatus() != SendStatus.SEND_OK) {
                throw new RemotingException(syncResult.toString());
            }
        } catch (Exception e) {
            LOG.error("Sync send message failure!", e);
            throw e;
        }
        return;
    }

    // Asynchronous path: best effort, failures are logged and not propagated.
    try {
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                LOG.debug("Async send message success! result: {}", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                if (throwable != null) {
                    LOG.error("Async send message failure!", throwable);
                }
            }
        });
    } catch (Exception e) {
        LOG.error("Async send message failure!", e);
    }
}
/**
 * Converts an element into a RocketMQ message.
 *
 * <p>A null tag or key becomes the empty string; a null topic or body is rejected. The
 * configured delay level is applied only when it is above {@code MSG_DELAY_LEVEL00}.
 */
private Message prepareMessage(IN input) {
    final String topic = topicSelector.getTopic(input);
    final String selectedTag = topicSelector.getTag(input);
    final String tag = selectedTag == null ? "" : selectedTag;

    final byte[] keyBytes = serializationSchema.serializeKey(input);
    final String key = keyBytes == null ? "" : new String(keyBytes, StandardCharsets.UTF_8);
    final byte[] body = serializationSchema.serializeValue(input);

    Validate.notNull(topic, "the message topic is null");
    Validate.notNull(body, "the message body is null");

    final Message message = new Message(topic, tag, key, body);
    if (messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL00) {
        message.setDelayTimeLevel(messageDeliveryDelayLevel);
    }
    return message;
}
/**
* Chooses between asynchronous and synchronous sending for messages that are not batched.
*
* @param async value stored in the {@code async} field, which {@code invoke} checks per message
* @return this sink, for call chaining
*/
public RocketMQSink<IN> withAsync(boolean async) {
this.async = async;
return this;
}
/**
* Enables or disables buffering of messages until a checkpoint (or a full buffer) flushes them.
* {@code open()} switches this off again when checkpointing is not enabled.
*
* @param batchFlushOnCheckpoint value stored in the {@code batchFlushOnCheckpoint} field
* @return this sink, for call chaining
*/
public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) {
this.batchFlushOnCheckpoint = batchFlushOnCheckpoint;
return this;
}
/**
* Sets the buffer size at which {@code invoke} flushes while batching is enabled
* (the field defaults to 1000). The value is stored as given, without validation.
*
* @param batchSize number of buffered messages that triggers a flush
* @return this sink, for call chaining
*/
public RocketMQSink<IN> withBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
/**
 * Flushes any buffered messages (best effort, failures are only logged) and shuts the
 * producer down. Does nothing when the producer was never created.
 */
@Override
public void close() throws Exception {
    if (producer == null) {
        return;
    }
    try {
        flushSync();
    } catch (Exception e) {
        LOG.error("FlushSync failure!", e);
    }
    // make sure producer can be shutdown, thus current producerGroup will be unregistered
    producer.shutdown();
}
/**
 * Sends all buffered messages in one batch call and empties the buffer on success.
 *
 * <p>Does nothing unless batch-flush-on-checkpoint is enabled and the buffer exists. The send
 * result is checked the same way as the synchronous path in {@code invoke}: any status other
 * than {@code SEND_OK} is raised as a failure, and the buffer is left untouched so the
 * messages are sent again by the next flush instead of being dropped silently.
 *
 * @throws Exception if the batch send fails or does not report {@code SEND_OK}
 */
private void flushSync() throws Exception {
    // batchList is still null if open() failed before creating it.
    if (!batchFlushOnCheckpoint || batchList == null) {
        return;
    }
    synchronized (batchList) {
        if (batchList.isEmpty()) {
            return;
        }
        SendResult result = producer.send(batchList);
        LOG.debug("Batch send message result: {}", result);
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            throw new RemotingException(result.toString());
        }
        batchList.clear();
    }
}
/**
* Flushes buffered messages when a checkpoint snapshot is taken. {@code flushSync()} only
* sends anything while {@code batchFlushOnCheckpoint} is set.
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
flushSync();
}
/**
* Intentionally empty: this sink does not register or restore any state here.
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Nothing to do
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
1.31.18.RocketMQSource.java
import com.alibaba.fastjson.JSON;
import com.xxxxx.caterpillar.sdk.util.ObjectConvertUtil;
import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.Validate;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.rocketmq.client.consumer.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.rocketmq.flink.RocketMQConfig.*;
import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
import static org.apache.rocketmq.flink.RocketMQUtils.getLong;
/**
* The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly-once reliability guarantees when
* checkpoints are enabled. Otherwise, the source doesn't provide any reliability guarantees.
*/
public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);
private transient MQPullConsumerScheduleService pullConsumerScheduleService;
private DefaultMQPullConsumer consumer;
private KeyValueDeserializationSchema<OUT> schema;
private RunningChecker runningChecker;
private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
private Map<MessageQueue, Long> offsetTable;
private Map<MessageQueue, Long> restoredOffsets;
/** Data for pending but uncommitted offsets. */
private LinkedMap pendingOffsetsToCommit;
private Properties props;
private String topic;
private String group;
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
private transient volatile boolean restored;
private transient boolean enableCheckpoint;
/**
* Creates a source. Both arguments are only stored here; they are validated in {@code open()}.
*
* @param schema deserializer that turns a message key and body into an output element
* @param props consumer properties (topic, group, name server address, ...)
*/
public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
this.schema = schema;
this.props = props;
}
/**
 * Validates the configuration, creates the offset bookkeeping maps if they do not exist yet,
 * and builds (but does not start) the pull consumer for the configured group.
 */
@Override
public void open(Configuration parameters) throws Exception {
    LOG.debug("source open....");
    Validate.notEmpty(props, "Consumer properties can not be empty");
    Validate.notNull(schema, "KeyValueDeserializationSchema can not be null");

    topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
    group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
    Validate.notEmpty(topic, "Consumer topic can not be empty");
    Validate.notEmpty(group, "Consumer group can not be empty");

    final StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
    enableCheckpoint = runtimeContext.isCheckpointingEnabled();

    // The maps may already exist, e.g. restoredOffsets is filled by initializeState().
    if (offsetTable == null) {
        offsetTable = new ConcurrentHashMap<>();
    }
    if (restoredOffsets == null) {
        restoredOffsets = new ConcurrentHashMap<>();
    }
    if (pendingOffsetsToCommit == null) {
        pendingOffsetsToCommit = new LinkedMap();
    }

    runningChecker = new RunningChecker();

    //Wait for lite pull consumer
    //pullConsumerScheduleService = new MQPullConsumerScheduleService(group, RocketMQConfig.buildAclRPCHook(props));
    pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
    consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
    final String instanceName = runtimeContext.getIndexOfThisSubtask() + "_" + UUID.randomUUID();
    consumer.setInstanceName(instanceName);
    RocketMQConfig.buildConsumerConfigs(props, consumer);
}
/**
 * Registers the pull callback for the configured topic, starts the pull schedule service and
 * then blocks until the running flag is cleared.
 *
 * <p>For every message found, the body is converted to an {@code IssueSyncMessageBodyVO},
 * re-serialized as UTF-8 JSON and handed to the deserialization schema together with the key.
 * Records are emitted under the checkpoint lock with the message's born timestamp, and the next
 * begin offset is stored under the same lock after each pull.
 *
 * @param context Flink source context used to emit records
 * @throws Exception if the pull service cannot be started or waiting is interrupted
 */
@Override
public void run(SourceContext context) throws Exception {
    LOG.debug("source run....");
    // The lock that guarantees that record emission and state updates are atomic,
    // from the view of taking a checkpoint.
    final Object lock = context.getCheckpointLock();
    int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
            RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
    String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
    int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
            RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
    int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE,
            RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);

    pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
    pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
        @Override
        public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) {
            try {
                long offset = getMessageQueueOffset(mq);
                if (offset < 0) {
                    return;
                }
                PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);
                boolean found = false;
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> messages = pullResult.getMsgFoundList();
                        for (MessageExt msg : messages) {
                            byte[] key = msg.getKeys() != null
                                    ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
                            // NOTE(review): byteToObject presumably performs Java-native
                            // deserialization of the message body; confirm that only trusted
                            // producers can write to this topic.
                            IssueSyncMessageBodyVO issueSyncMessageBodyVO =
                                    (IssueSyncMessageBodyVO) ObjectConvertUtil.byteToObject(msg.getBody());
                            // Encode explicitly as UTF-8, matching the key above, instead of
                            // relying on the platform default charset.
                            byte[] value = JSON.toJSONString(issueSyncMessageBodyVO)
                                    .getBytes(StandardCharsets.UTF_8);
                            OUT data = schema.deserializeKeyAndValue(key, value);
                            // output and state update are atomic
                            synchronized (lock) {
                                context.collectWithTimestamp(data, msg.getBornTimestamp());
                            }
                        }
                        found = true;
                        break;
                    case NO_MATCHED_MSG:
                        LOG.debug("No matched message after offset {} for queue {}", offset, mq);
                        break;
                    case NO_NEW_MSG:
                        break;
                    case OFFSET_ILLEGAL:
                        LOG.warn("Offset {} is illegal for queue {}", offset, mq);
                        break;
                    default:
                        break;
                }
                synchronized (lock) {
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                }
                if (found) {
                    pullTaskContext.setPullNextDelayTimeMillis(0); // no delay when messages were found
                } else {
                    pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    });

    try {
        pullConsumerScheduleService.start();
    } catch (MQClientException e) {
        throw new RuntimeException(e);
    }
    runningChecker.setRunning(true);
    awaitTermination();
}
/**
 * Blocks the calling thread, polling the running flag every 50 ms, until the flag is cleared.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
private void awaitTermination() throws InterruptedException {
    for (;;) {
        if (!runningChecker.isRunning()) {
            return;
        }
        Thread.sleep(50);
    }
}
/**
 * Resolves the offset to pull from for {@code mq} and records it in {@code offsetTable}.
 *
 * <p>Lookup order: the in-memory offset table; the offsets restored from a checkpoint (only
 * when this source was restored); the offset returned by the consumer's
 * {@code fetchConsumeOffset}. If that last value is negative, the starting point is chosen by
 * {@code CONSUMER_OFFSET_RESET_TO} (earliest, latest, or timestamp; default latest).
 *
 * @param mq queue to resolve the offset for
 * @return the offset stored for the queue
 * @throws MQClientException if the consumer cannot be queried
 * @throws IllegalArgumentException if {@code CONSUMER_OFFSET_RESET_TO} has an unknown value
 */
private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
    Long offset = offsetTable.get(mq);
    // restoredOffsets(unionOffsetStates) is the restored global union state;
    // should only snapshot mqs that actually belong to us
    if (restored && offset == null) {
        offset = restoredOffsets.get(mq);
    }
    if (offset == null) {
        offset = consumer.fetchConsumeOffset(mq, false);
        if (offset < 0) {
            String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
            switch (initialOffset) {
                case CONSUMER_OFFSET_EARLIEST:
                    offset = consumer.minOffset(mq);
                    break;
                case CONSUMER_OFFSET_LATEST:
                    offset = consumer.maxOffset(mq);
                    break;
                case CONSUMER_OFFSET_TIMESTAMP:
                    offset = consumer.searchOffset(mq, getLong(props,
                            RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
                    break;
                default:
                    throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
            }
        }
    }
    offsetTable.put(mq, offset);
    // Return the resolved value rather than reading the table back: cancel() clears the table
    // and, if it runs between the put and a get, the read would unbox null and throw.
    return offset;
}
/**
 * Records the next offset for {@code mq}. Without checkpointing the offset is also pushed to
 * the consumer immediately; with checkpointing that happens in
 * {@code notifyCheckpointComplete} instead.
 */
private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
    offsetTable.put(mq, offset);
    if (enableCheckpoint) {
        return;
    }
    consumer.updateConsumeOffset(mq, offset);
}
/**
 * Stops the source: clears the running flag, shuts the pull service down and empties the
 * offset bookkeeping maps.
 *
 * <p>Every field is null-checked because {@code close()} delegates here and may be called
 * when {@code open()} failed part-way or never ran, in which case the fields were not created.
 */
@Override
public void cancel() {
    LOG.debug("cancel ...");
    if (runningChecker != null) {
        runningChecker.setRunning(false);
    }
    if (pullConsumerScheduleService != null) {
        pullConsumerScheduleService.shutdown();
    }
    if (offsetTable != null) {
        offsetTable.clear();
    }
    if (restoredOffsets != null) {
        restoredOffsets.clear();
    }
    if (pendingOffsetsToCommit != null) {
        pendingOffsetsToCommit.clear();
    }
}
/**
* Releases the source's resources by delegating to {@code cancel()}; {@code super.close()}
* runs in a finally block, so it executes even if {@code cancel()} throws.
*/
@Override
public void close() throws Exception {
LOG.debug("close ...");
// pretty much the same logic as cancelling
try {
cancel();
} finally {
super.close();
}
}
/**
 * Writes the current queue offsets into the union list state and remembers them under the
 * checkpoint id so they can be committed once the checkpoint completes.
 *
 * <p>Queues that are no longer assigned to this consumer are dropped from the offset table
 * first. Nothing is written when the source is not running.
 */
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // called when a snapshot for a checkpoint is requested
    if (!runningChecker.isRunning()) {
        LOG.debug("snapshotState() called on closed source; returning null.");
        return;
    }

    final long checkpointId = context.getCheckpointId();
    LOG.debug("Snapshotting state {} ...", checkpointId);

    unionOffsetStates.clear();

    // remove the unassigned queues in order to avoid read the wrong offset when the source restart
    final Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
    final Iterator<Map.Entry<MessageQueue, Long>> tracked = offsetTable.entrySet().iterator();
    while (tracked.hasNext()) {
        if (!assignedQueues.contains(tracked.next().getKey())) {
            tracked.remove();
        }
    }

    final HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
    for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
        final MessageQueue queue = entry.getKey();
        final Long queueOffset = entry.getValue();
        unionOffsetStates.add(Tuple2.of(queue, queueOffset));
        currentOffsets.put(queue, queueOffset);
    }
    pendingOffsetsToCommit.put(checkpointId, currentOffsets);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                offsetTable, checkpointId, context.getCheckpointTimestamp());
    }
}
/**
 * Obtains the union list state holding (queue, offset) pairs and, when the job is being
 * restored, loads it into {@code restoredOffsets}, keeping the largest offset seen per queue.
 */
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Called on every initialization, both on a fresh start and when recovering from an
    // earlier checkpoint, so state registration and state recovery both live here.
    LOG.debug("initialize State ...");

    final TypeInformation<Tuple2<MessageQueue, Long>> offsetType =
            TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
            });
    this.unionOffsetStates = context.getOperatorStateStore()
            .getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME, offsetType));

    this.restored = context.isRestored();
    if (!restored) {
        LOG.info("No restore state for the consumer.");
        return;
    }

    if (restoredOffsets == null) {
        restoredOffsets = new ConcurrentHashMap<>();
    }
    for (Tuple2<MessageQueue, Long> restoredEntry : unionOffsetStates.get()) {
        // insert when absent, otherwise keep the larger of the two offsets
        restoredOffsets.merge(restoredEntry.f0, restoredEntry.f1, Long::max);
    }
    LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
}
/**
* @return the type information reported by the deserialization schema
*/
@Override
public TypeInformation<OUT> getProducedType() {
return schema.getProducedType();
}
/**
 * Commits the offsets that were snapshotted for {@code checkpointId} to the consumer and
 * discards the pending entries of all older checkpoints.
 *
 * <p>Does nothing when the source is not running or the checkpoint id is unknown.
 */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // callback when checkpoint complete
    if (!runningChecker.isRunning()) {
        LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
        return;
    }

    final int position = pendingOffsetsToCommit.indexOf(checkpointId);
    if (position == -1) {
        LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
        return;
    }

    @SuppressWarnings("unchecked")
    final Map<MessageQueue, Long> confirmed =
            (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(position);

    // remove older checkpoints in map: they now occupy indexes 0 .. position-1
    int remainingOlder = position;
    while (remainingOlder > 0) {
        pendingOffsetsToCommit.remove(0);
        remainingOlder--;
    }

    if (confirmed == null || confirmed.isEmpty()) {
        LOG.debug("Checkpoint state was empty.");
        return;
    }

    for (Map.Entry<MessageQueue, Long> queueOffset : confirmed.entrySet()) {
        consumer.updateConsumeOffset(queueOffset.getKey(), queueOffset.getValue());
    }
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
- 250
- 251
- 252
- 253
- 254
- 255
- 256
- 257
- 258
- 259
- 260
- 261
- 262
- 263
- 264
- 265
- 266
- 267
- 268
- 269
- 270
- 271
- 272
- 273
- 274
- 275
- 276
- 277
- 278
- 279
- 280
- 281
- 282
- 283
- 284
- 285
- 286
- 287
- 288
- 289
- 290
- 291
- 292
- 293
- 294
- 295
- 296
- 297
- 298
- 299
- 300
- 301
- 302
- 303
- 304
- 305
- 306
- 307
- 308
- 309
- 310
- 311
- 312
- 313
- 314
- 315
- 316
- 317
- 318
- 319
- 320
- 321
- 322
- 323
- 324
- 325
- 326
- 327
- 328
- 329
- 330
- 331
- 332
- 333
- 334
- 335
- 336
- 337
- 338
- 339
- 340
- 341
- 342
- 343
- 344
- 345
- 346
- 347
- 348
- 349
- 350
- 351
- 352
- 353
- 354
- 355
- 356
- 357
- 358
- 359
- 360
- 361
- 362
- 363
- 364
- 365
- 366
- 367
- 368
- 369
- 370
- 371
- 372
- 373
- 374
- 375
1.31.19.RocketMQUtils.java
package org.apache.rocketmq.flink;
import java.util.Properties;
/**
 * Typed accessors for {@link Properties} values.
 *
 * <p>Values are trimmed before parsing: a value loaded from a {@code .properties} file keeps
 * any trailing whitespace, which would otherwise make numeric parsing fail and turn
 * {@code "true "} into {@code false}.
 */
public final class RocketMQUtils {

    /**
     * Reads an int property.
     *
     * @param props properties to read from
     * @param key property name
     * @param defaultValue value returned when the property is not set
     * @return the parsed value, or {@code defaultValue} when the property is absent
     * @throws NumberFormatException if the property is set but is not a valid int
     */
    public static int getInteger(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    /**
     * Reads a long property.
     *
     * @param props properties to read from
     * @param key property name
     * @param defaultValue value returned when the property is not set
     * @return the parsed value, or {@code defaultValue} when the property is absent
     * @throws NumberFormatException if the property is set but is not a valid long
     */
    public static long getLong(Properties props, String key, long defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Long.parseLong(raw.trim());
    }

    /**
     * Reads a boolean property.
     *
     * @param props properties to read from
     * @param key property name
     * @param defaultValue value returned when the property is not set
     * @return {@code true} only when the trimmed value equals "true" ignoring case;
     *         {@code defaultValue} when the property is absent
     */
    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
1.31.20.RunningChecker.java
package org.apache.rocketmq.flink;
import java.io.Serializable;
/**
 * Serializable holder for a single volatile "running" flag, which starts out {@code false}.
 */
public class RunningChecker implements Serializable {

    private volatile boolean isRunning;

    /**
     * @return the current value of the running flag
     */
    public boolean isRunning() {
        return this.isRunning;
    }

    /**
     * @param running new value of the running flag
     */
    public void setRunning(boolean running) {
        this.isRunning = running;
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
1.31.21.DateUtils.java
package com.xxxxx.issue.utils;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* Created by Administrator on 2017/3/7.
*
* @author Administrator
*/
public final class DateUtils {
private static final Logger logger = LoggerFactory.getLogger(DateUtils.class);
/**
* 时间格式符:yyyy-MM-dd HH:mm:ss
*/
public static final String PATTERN_DATE_TIME = "yyyy-MM-dd HH:mm:ss";
/**
* 精简时间格式符:yyyyMMddHHmmss
*/
public static final String PATTERN_DATE_TIME_SIMPLE = "yyyyMMddHHmmss";
/**
* 毫秒格式符:yyyyMMddhhmmssSSS
*/
public static final String PATTERN_DATE_TIME_ALL = "yyyyMMddhhmmssSSS";
/**
* 时间格式符:yyyy-MM-dd
*/
public static final String PATTERN_DATE = "yyyy-MM-dd";
/**
* 精简时间格式符:yyyyMMdd
*/
public static final String PATTERN_DATE_SIMPLE = "yyyyMMdd";
/**
* 精简时间格式符:yyMM
*/
public static final String PATTERN_DATE_SIMPLE_YYMM = "yyMM";
/**
* 时间格式符:HH:mm:ss
*/
public static final String PATTERN_TIME = "HH:mm:ss";
/**
* 精简时间格式符:HHmmss
*/
public static final String PATTERN_TIME_SIMPLE = "HHmmss";
/**
* 时间格式符:yyyy
*/
public static final String PATTERN_YYYY = "yyyy";
/**
* 时间格式符:MM
*/
public static final String PATTERN_MM = "MM";
/**
* 时间格式符:dd
*/
public static final String PATTERN_DAY = "dd";
/**
* 从开始到现在的最大时间戳
**/
public static final Long TIME_MAX = 999999999999L;
/**
* 时间格式符:E
*/
public static final String PATTERN_WEEK = "E";
private static final String DATE_STR = "yyyy-MM-dd";
private static final String TIME_STR = " 00:00:00";
private static final String DATE_TIME_STR = "yyyy-MM-dd HH:mm:ss";
/**
 * @return the current system time in milliseconds, boxed as a {@code Long}.
 */
public static Long getTimeMillisLong() {
    final long nowMillis = System.currentTimeMillis();
    return Long.valueOf(nowMillis);
}
/**
 * @return the current system time in whole seconds (milliseconds divided by 1000), as a {@code Long}.
 */
public static Long getTimeSecondLong() {
    final long nowMillis = System.currentTimeMillis();
    return nowMillis / 1000L;
}
/**
 * @return the current system time in milliseconds, as a decimal string.
 */
public static String getTimeMillisString() {
    return String.valueOf(System.currentTimeMillis());
}
/**
* @return the current date and time formatted with PATTERN_DATE_TIME (yyyy-MM-dd HH:mm:ss).
*/
public static String getNowDateTime() {
return formateDateTime(PATTERN_DATE_TIME);
}
/**
* @return the current date and time formatted with PATTERN_DATE_TIME_SIMPLE (yyyyMMddHHmmss).
*/
public static String getNowDateTimeSimple() {
return formateDateTime(PATTERN_DATE_TIME_SIMPLE);
}
/**
* @return the current date and time formatted with PATTERN_DATE_TIME_ALL (yyyyMMddhhmmssSSS),
* i.e. including milliseconds.
* NOTE(review): the pattern uses "hh" where the other patterns use "HH"; in SimpleDateFormat
* and Joda-Time "hh" is the 12-hour clock field - confirm whether "HH" was intended.
*/
public static String getNowDateTimeAll() {
return formateDateTime(PATTERN_DATE_TIME_ALL);
}
/**
* @return the current date formatted with PATTERN_DATE (yyyy-MM-dd).
*/
public static String getNowDate() {
return formateDateTime(PATTERN_DATE);
}
/**
* @return the current date formatted with PATTERN_DATE_SIMPLE (yyyyMMdd).
*/
public static String getNowDateSimple() {
return formateDateTime(PATTERN_DATE_SIMPLE);
}
/**
* @return the current year and month formatted with PATTERN_DATE_SIMPLE_YYMM (yyMM).
*/
public static String getNowDateSimpleYymm() {
return formateDateTime(PATTERN_DATE_SIMPLE_YYMM);
}
/**
* @return the current time of day formatted with PATTERN_TIME (HH:mm:ss).
*/
public static String getNowTime() {
return formateDateTime(PATTERN_TIME);
}
/**
* @return the current time of day formatted with PATTERN_TIME_SIMPLE (HHmmss).
*/
public static String getNowTimeSimple() {
return formateDateTime(PATTERN_TIME_SIMPLE);
}
/**
* @return the current year formatted with PATTERN_YYYY (yyyy).
*/
public static String getNowYear() {
return formateDateTime(PATTERN_YYYY);
}
/**
* @return the current month formatted with PATTERN_MM (MM).
*/
public static String getNowMonth() {
return formateDateTime(PATTERN_MM);
}
/**
* @return the current day of month formatted with PATTERN_DAY (dd).
*/
public static String getNowDay() {
return formateDateTime(PATTERN_DAY);
}
/**
* @return the current day of the week formatted with PATTERN_WEEK (E).
* The original note gives the Chinese weekday name for Monday as an example; the actual
* text presumably depends on the locale used by formateDateTime - verify there.
*/
public static String getNowWeek() {
return formateDateTime(PATTERN_WEEK);
}
/**
 * Formats the current time with a caller-supplied pattern.
 *
 * @param pattern custom format pattern
 * @return the current time formatted with {@code pattern}, or the empty string when
 *         {@code pattern} is blank
 */
public static String getNowDateTime(String pattern) {
    if (StringUtils.isBlank(pattern)) {
        return "";
    }
    return formateDateTime(pattern);
}
/**
 * @return the first day of the current month, formatted with PATTERN_DATE (yyyy-MM-dd).
 */
public static String getStartDayOfMonth() {
    final DateTime now = new DateTime();
    final DateTime firstDay = now.dayOfMonth().withMinimumValue();
    final DateTime firstInstant = firstDay.withTimeAtStartOfDay();
    return firstInstant.toString(PATTERN_DATE);
}
/**
 * @return the last day of the current month, formatted with PATTERN_DATE (yyyy-MM-dd).
 */
public static String getEndDayOfMonth() {
    final DateTime now = new DateTime();
    final DateTime lastDay = now.dayOfMonth().withMaximumValue();
    final DateTime lastInstant = lastDay.millisOfDay().withMaximumValue();
    return lastInstant.toString(PATTERN_DATE);
}
/**
 * Adds a number of months to the current time and returns the result as epoch milliseconds.
 *
 * <p>The current time is first formatted with {@code PATTERN_DATE_TIME} and parsed back, so
 * any precision the pattern does not carry is dropped before the months are added.
 *
 * @param month number of months to add (negative values subtract)
 * @return the shifted instant in epoch milliseconds, as a decimal string
 * @throws Exception if the formatted current time cannot be parsed back
 */
public static String addMonth(int month) throws Exception {
    SimpleDateFormat formatter = new SimpleDateFormat(PATTERN_DATE_TIME);
    // Round-trip through text to truncate "now" to the pattern's precision.
    Date truncatedNow = formatter.parse(formatter.format(new Date()));
    Calendar shifted = Calendar.getInstance();
    shifted.setTime(truncatedNow);
    shifted.add(Calendar.MONTH, month);
    return String.valueOf(shifted.getTime().getTime());
}
/**
 * Adds or subtracts a number of days from the given date.
 *
 * <p>Fix: the previous implementation ignored {@code date} and always started from the
 * current time. The supplied date is now used; a {@code null} date still falls back to
 * the current time so existing callers that passed {@code null} keep working.
 *
 * @param date the starting date; {@code null} means "now"
 * @param num  number of days to add; pass a negative value to subtract
 * @return the shifted date
 */
public static Date addOrMinusDay(Date date, int num) {
    Calendar calendar = Calendar.getInstance();
    calendar.setTime(date != null ? date : new Date());
    calendar.add(Calendar.DAY_OF_MONTH, num);
    return calendar.getTime();
}
/**
 * Parses a date string into epoch milliseconds.
 *
 * @param date    the date text; must match {@code pattern}
 * @param pattern the pattern describing {@code date}
 * @return the parsed instant in milliseconds, or {@code 0} when either argument is blank
 */
public static Long getStringToLong(String date, String pattern) {
    boolean missingInput = StringUtils.isBlank(date) || StringUtils.isBlank(pattern);
    if (missingInput) {
        return 0L;
    }
    return DateTimeFormat.forPattern(pattern).parseDateTime(date).getMillis();
}
/**
 * Parses a date string into whole epoch seconds.
 *
 * @param date    the date text; must match {@code pattern}
 * @param pattern the pattern describing {@code date}
 * @return the parsed instant in seconds as an {@code int}, or {@code 0} when either
 *         argument is blank
 */
public static int getStringToIntSeconds(String date, String pattern) {
    boolean missingInput = StringUtils.isBlank(date) || StringUtils.isBlank(pattern);
    if (missingInput) {
        return 0;
    }
    long millis = DateTimeFormat.forPattern(pattern).parseDateTime(date).getMillis();
    // Goes through text on purpose: Integer.parseInt rejects values outside the int range.
    return Integer.parseInt(String.valueOf(millis / 1000));
}
/**
 * Formats an epoch-millisecond value, supplied as text, with the given pattern.
 *
 * @param millis  epoch milliseconds as a decimal string
 * @param pattern the output pattern
 * @return the formatted date-time
 */
public static String getLongToString(String millis, String pattern) {
    long epochMillis = Long.parseLong(millis);
    return new DateTime(epochMillis).toString(pattern);
}
/**
 * Formats an epoch-millisecond value with the given pattern.
 *
 * @param millis  epoch milliseconds
 * @param pattern the output pattern
 * @return the formatted date-time
 */
public static String getLongToString(long millis, String pattern) {
    return new DateTime(millis).toString(pattern);
}
/**
 * Breaks a duration in seconds into days, hours, minutes and seconds.
 *
 * @param seconds the duration in seconds
 * @return the duration rendered as days/hours/minutes/seconds with Chinese unit labels
 */
public static String getRuntimeBySecond(int seconds) {
    int secondPart = seconds % 60;
    int minutePart = seconds / 60 % 60;
    int hourPart = seconds / (60 * 60) % 24;
    int dayPart = seconds / (24 * 60 * 60);
    return dayPart + "天" + hourPart + "小时" + minutePart + "分钟" + secondPart + "秒";
}
/**
 * Breaks a duration in milliseconds into days, hours, minutes and seconds.
 *
 * @param millis the duration in milliseconds
 * @return the duration rendered as days/hours/minutes/seconds with Chinese unit labels
 */
public static String getRuntimeByMillis(long millis) {
    long secondPart = millis / 1000 % 60;
    long minutePart = millis / (60 * 1000) % 60;
    long hourPart = millis / (60 * 60 * 1000) % 24;
    long dayPart = millis / (24 * 60 * 60 * 1000);
    return dayPart + "天" + hourPart + "小时" + minutePart + "分钟" + secondPart + "秒";
}
/**
 * Formats the current instant with the given pattern.
 *
 * @param pattern the output pattern
 * @return the current date-time rendered with {@code pattern}
 */
private static String formateDateTime(String pattern) {
    return new DateTime().toString(pattern);
}
/**
 * Converts a {@code yyyy-MM-dd} date string into a {@link Calendar}.
 *
 * @param time the date text in {@code yyyy-MM-dd} form
 * @return a calendar positioned at the parsed date
 * @throws ParseException if {@code time} does not match the expected format
 */
public static Calendar changecal(String time) throws ParseException {
    Date parsed = new SimpleDateFormat("yyyy-MM-dd").parse(time);
    Calendar result = Calendar.getInstance();
    result.setTime(parsed);
    return result;
}
/**
 * Formats a {@link Calendar} as a {@code yyyy-MM-dd} string.
 *
 * @param cal the calendar to format
 * @return the calendar's date as text
 */
public static String changestr(Calendar cal) {
    return new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime());
}
/**
 * Returns the day-of-year indexes of the first and last day of the calendar's month.
 *
 * <p>Fix: the previous implementation derived the last day from "first day of next month
 * minus one". For December that rolled into the following year, where the first day has
 * index 1, so the reported last day was 0. The last day is now computed from the actual
 * length of the month.
 *
 * <p>Side effect: {@code cal} is moved to the first day of its month (the old code left it
 * on the first day of the following month).
 *
 * @param cal the calendar identifying the month; it is modified
 * @return a two-element list: index 0 holds the first day-of-year of the month,
 *         index 1 the last
 */
public static List<Integer> flmonthdate(Calendar cal) {
    cal.set(Calendar.DAY_OF_MONTH, 1);
    int firstDayInThisMonth = cal.get(Calendar.DAY_OF_YEAR);
    // Stay inside the current year: first day + month length - 1.
    int lastDayInThisMonth = firstDayInThisMonth + cal.getActualMaximum(Calendar.DAY_OF_MONTH) - 1;
    List<Integer> list = new ArrayList<Integer>(2);
    list.add(firstDayInThisMonth);
    list.add(lastDayInThisMonth);
    return list;
}
/**
 * Returns the day-of-year indexes of the calendar's current day and of the last day of
 * its month.
 *
 * <p>Fix: the previous implementation derived the last day from "first day of next month
 * minus one", which produced 0 for any date in December because the next month belongs to
 * the following year. The last day is now computed from the actual length of the month.
 * An unused local was removed, and {@code cal} is no longer moved to the following month.
 *
 * @param cal the calendar identifying the day
 * @return a two-element list: index 0 holds the day-of-year of {@code cal},
 *         index 1 the day-of-year of the last day of that month
 */
public static List<Integer> twoDay(Calendar cal) {
    int firstDay = cal.get(Calendar.DAY_OF_YEAR);
    // Days left in the month after the current day.
    int remainingDays = cal.getActualMaximum(Calendar.DAY_OF_MONTH) - cal.get(Calendar.DAY_OF_MONTH);
    List<Integer> list = new ArrayList<Integer>(2);
    list.add(firstDay);
    list.add(firstDay + remainingDays);
    return list;
}
/**
 * Converts a day-of-year index in the current year into epoch milliseconds.
 *
 * <p>The day is formatted as {@code yyyy-MM-dd} and re-parsed through
 * {@code getStringToLong} with {@code PATTERN_DATE}.
 *
 * @param i the day-of-year index
 * @return the resulting epoch milliseconds as a decimal string
 */
public static String changeday(int i) {
    SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
    Calendar target = Calendar.getInstance();
    // Position the calendar on the requested day of the current year.
    target.set(Calendar.DAY_OF_YEAR, i);
    String dayText = dayFormat.format(target.getTime());
    return Long.toString(DateUtils.getStringToLong(dayText, DateUtils.PATTERN_DATE));
}
/**
 * Returns the number of days in the month of the given date.
 *
 * @param date the date text, parsed by {@code changecal} ({@code yyyy-MM-dd})
 * @return the largest day-of-month value for that month
 * @throws ParseException if {@code date} cannot be parsed
 */
public static int getMaxDay(String date) throws ParseException {
    return changecal(date).getActualMaximum(Calendar.DATE);
}
/**
 * Counts the days of the given date's month that are not Sundays.
 *
 * @param date the date text, parsed by {@code changecal} ({@code yyyy-MM-dd})
 * @return the number of non-Sunday days between the bounds reported by {@code flmonthdate}
 * @throws ParseException if {@code date} cannot be parsed
 */
public static int getDayNoSunday(String date) throws ParseException {
    Calendar cal = changecal(date);
    List<Integer> bounds = flmonthdate(cal);
    int count = 0;
    for (int dayOfYear = bounds.get(0); dayOfYear <= bounds.get(1); dayOfYear++) {
        cal.set(Calendar.DAY_OF_YEAR, dayOfYear);
        boolean isSunday = cal.get(Calendar.DAY_OF_WEEK) == Calendar.SUNDAY;
        if (!isSunday) {
            count++;
        }
    }
    return count;
}
/**
 * Counts the non-Sunday days from the given date to the end of its month.
 *
 * @param date the date text, parsed by {@code changecal} ({@code yyyy-MM-dd})
 * @return the number of non-Sunday days between the bounds reported by {@code twoDay}
 * @throws ParseException if {@code date} cannot be parsed
 */
public static int getDayNoSundayBewToDay(String date) throws ParseException {
    Calendar cal = changecal(date);
    List<Integer> bounds = twoDay(cal);
    int count = 0;
    for (int dayOfYear = bounds.get(0); dayOfYear <= bounds.get(1); dayOfYear++) {
        cal.set(Calendar.DAY_OF_YEAR, dayOfYear);
        boolean isSunday = cal.get(Calendar.DAY_OF_WEEK) == Calendar.SUNDAY;
        if (!isSunday) {
            count++;
        }
    }
    return count;
}
/**
 * Counts the days from the given date to the end of its month that are neither
 * Saturdays nor Sundays.
 *
 * @param date the date text, parsed by {@code changecal} ({@code yyyy-MM-dd})
 * @return the number of weekdays between the bounds reported by {@code twoDay}
 * @throws ParseException if {@code date} cannot be parsed
 */
public static int getDayNoWeekendBewToDay(String date) throws ParseException {
    Calendar cal = changecal(date);
    List<Integer> bounds = twoDay(cal);
    int count = 0;
    for (int dayOfYear = bounds.get(0); dayOfYear <= bounds.get(1); dayOfYear++) {
        cal.set(Calendar.DAY_OF_YEAR, dayOfYear);
        int weekDay = cal.get(Calendar.DAY_OF_WEEK);
        boolean isWeekend = weekDay == Calendar.SUNDAY || weekDay == Calendar.SATURDAY;
        if (!isWeekend) {
            count++;
        }
    }
    return count;
}
/**
 * Counts all days from the given date to the end of its month, inclusive.
 *
 * <p>NOTE(review): despite the name, no weekend filtering happens here; every day in the
 * range is counted.
 *
 * @param date the date text, parsed by {@code changecal} ({@code yyyy-MM-dd})
 * @return the number of days between the bounds reported by {@code twoDay}
 * @throws ParseException if {@code date} cannot be parsed
 */
public static int getDayNoWeekend(String date) throws ParseException {
    Calendar cal = changecal(date);
    List<Integer> bounds = twoDay(cal);
    int count = 0;
    int dayOfYear = bounds.get(0);
    while (dayOfYear <= bounds.get(1)) {
        count++;
        dayOfYear++;
    }
    return count;
}
/**
 * Converts a day-of-year index in the current year into a date string.
 *
 * @param i the day-of-year index
 * @return the date as formatted by {@code changestr} ({@code yyyy-MM-dd})
 */
public static String getday(int i) {
    Calendar target = Calendar.getInstance();
    // Position the calendar on the requested day of the current year.
    target.set(Calendar.DAY_OF_YEAR, i);
    return changestr(target);
}
/**
 * Converts a date-time string into epoch milliseconds.
 *
 * <p>Fix: the pattern previously used {@code hh} (12-hour clock, 1-12), so an input hour
 * of 12 was read as 12 AM and noon collapsed onto midnight. The pattern now uses
 * {@code HH} (24-hour clock, 0-23).
 *
 * @param time the date-time text in {@code yyyy-MM-dd HH:mm:ss} form
 * @return the parsed instant in epoch milliseconds
 * @throws ParseException if {@code time} does not match the expected format
 */
public static long dateChangeMillisecond(String time) throws ParseException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    return sdf.parse(time).getTime();
}
/**
 * Converts a compact date-time string into epoch milliseconds.
 *
 * <p>Fix: the pattern previously used {@code hh} (12-hour clock, 1-12), so an input hour
 * of 12 was read as 12 AM and noon collapsed onto midnight. The pattern now uses
 * {@code HH} (24-hour clock, 0-23).
 *
 * @param time the date-time text in {@code yyyyMMddHHmmss} form
 * @return the parsed instant in epoch milliseconds
 * @throws ParseException if {@code time} does not match the expected format
 */
public static long dateChangeMillisecond1(String time) throws ParseException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
    return sdf.parse(time).getTime();
}
/**
 * Returns the start of the current day in epoch seconds.
 *
 * <p>Today's date is rendered as {@code yyyyMMdd}, suffixed with {@code 000000}, and parsed
 * by {@code dateChangeMillisecond1}.
 *
 * @return the epoch seconds of today's 00:00:00
 * @throws ParseException if the generated text cannot be parsed
 */
public static long getTodayStartLongSecond() throws ParseException {
    String todayMidnight = new SimpleDateFormat("yyyyMMdd").format(new Date()) + "000000";
    long startMillis = dateChangeMillisecond1(todayMidnight);
    return startMillis / 1000;
}
/**
 * Returns the end of the current day in epoch seconds, computed as the value of
 * {@code getTodayStartLongSecond()} plus 24 hours.
 *
 * @return the start-of-day epoch seconds plus 86400
 * @throws ParseException if the start of day cannot be computed
 */
public static long getTodayEndLongSecond() throws ParseException {
    long startOfDay = getTodayStartLongSecond();
    return startOfDay + 24 * 60 * 60;
}
/**
 * Formats a timestamp that may be expressed in either seconds or milliseconds.
 *
 * <p>Values greater than {@code TIME_MAX} are used as milliseconds as-is; any other value
 * is multiplied by 1000 first (presumably because it is in seconds - confirm against the
 * definition of {@code TIME_MAX}).
 *
 * @param times   the timestamp
 * @param dateFor the output pattern
 * @return the formatted timestamp
 */
public static String getDateStrLongtoString(Long times, String dateFor) {
    SimpleDateFormat formatter = new SimpleDateFormat(dateFor);
    if (times > TIME_MAX) {
        return formatter.format(new Date(times));
    }
    return formatter.format(new Date(times * 1000));
}
/**
 * Returns the earliest or the latest instant of a given month.
 *
 * @param year  the target year
 * @param month the target month, 1-based (1 = January)
 * @param flag  {@code true} for the first day at 00:00:00.000,
 *              {@code false} for the last day at 23:59:59.999
 * @return the requested boundary of the month
 */
public static Date getStartOrEndDateInTargetMonth(int year, int month, boolean flag) {
    // Build the first instant of the target month on a cleared Gregorian calendar.
    GregorianCalendar monthStart = new GregorianCalendar();
    monthStart.clear();
    monthStart.set(Calendar.YEAR, year);
    monthStart.set(Calendar.MONTH, month - 1);

    Calendar boundary = Calendar.getInstance();
    boundary.setTime(monthStart.getTime());

    int day = flag ? 1 : boundary.getActualMaximum(Calendar.DAY_OF_MONTH);
    int hour = flag ? 0 : 23;
    int minuteAndSecond = flag ? 0 : 59;
    int millisecond = flag ? 0 : 999;

    boundary.set(Calendar.DAY_OF_MONTH, day);
    boundary.set(Calendar.HOUR_OF_DAY, hour);
    boundary.set(Calendar.MINUTE, minuteAndSecond);
    boundary.set(Calendar.SECOND, minuteAndSecond);
    boundary.set(Calendar.MILLISECOND, millisecond);
    return boundary.getTime();
}
/**
 * Formats a date as text.
 *
 * @author tangy
 * @param date   the date to format
 * @param format the {@link SimpleDateFormat} pattern to apply
 * @return the formatted date
 */
public static String dateFormat(Date date, String format) {
    SimpleDateFormat formatter = new SimpleDateFormat(format);
    return formatter.format(date);
}
// public static void main(String[] args) {
// LOG.info(DateUtils.dateFormat(new Date(),DateUtils.PATTERN_DATE_SIMPLE));
// }
/**
 * Parses a date string with the given pattern.
 *
 * @param datestr the date text
 * @param format  the {@link SimpleDateFormat} pattern describing {@code datestr}
 * @return the parsed date
 * @throws ParseException if {@code datestr} does not match {@code format}
 */
public static Date parseDate(String datestr, String format) throws ParseException {
    SimpleDateFormat parser = new SimpleDateFormat(format);
    return parser.parse(datestr);
}
/**
 * Returns the start of a date-query range for a range type.
 *
 * <p>Offsets applied to the current time before truncating to the start of that day:
 * 10 = none (today), 20 = minus 1 day, 30 = minus 6 days, 35 = minus 13 days,
 * 40 = minus 1 month, 50 = minus 3 months, 60 = minus 6 months, 70 = minus 12 months.
 *
 * @author tangy
 * @param dateType the range type; {@code null} returns the current time unchanged
 * @return the start-of-day date after applying the offset; {@code null} for an
 *         unrecognised type or when the intermediate text cannot be parsed
 */
public static Date getStartDateByType(Integer dateType) {
    if (dateType == null) {
        return new Date();
    }
    Calendar rightNow = Calendar.getInstance();
    rightNow.setTime(new Date());
    switch (dateType) {
        case 10:
            break;
        case 20:
            rightNow.add(Calendar.DAY_OF_YEAR, -1);
            break;
        case 30:
            rightNow.add(Calendar.DAY_OF_YEAR, -6);
            break;
        case 35:
            rightNow.add(Calendar.DAY_OF_YEAR, -13);
            break;
        case 40:
            rightNow.add(Calendar.MONTH, -1);
            break;
        case 50:
            rightNow.add(Calendar.MONTH, -3);
            break;
        case 60:
            rightNow.add(Calendar.MONTH, -6);
            break;
        case 70:
            rightNow.add(Calendar.MONTH, -12);
            break;
        default:
            // Unknown range types produce no start date.
            return null;
    }
    try {
        return parseDate(getMorningToString(rightNow.getTime()), DATE_TIME_STR);
    } catch (ParseException e) {
        e.printStackTrace();
        return null;
    }
}
/**
 * Returns the start-of-day text for the given date.
 *
 * <p>The date is formatted with {@code DATE_TIME_STR}, its first 10 characters are kept and
 * {@code TIME_STR} is appended (documented as the 00:00:00 time part; the constant is
 * defined outside this excerpt).
 *
 * @author tangy
 * @param date the date
 * @return the date part followed by {@code TIME_STR}
 */
public static String getMorningToString(Date date) {
    String datePart = dateFormat(date, DATE_TIME_STR).substring(0, 10);
    return datePart + TIME_STR;
}
/**
 * Returns the end-of-day text (23:59:59) for the given date.
 *
 * <p>The date is formatted with {@code DATE_TIME_STR}, its first 10 characters are kept and
 * {@code " 23:59:59"} is appended.
 *
 * @author tangy
 * @param date the date
 * @return the date part followed by {@code " 23:59:59"}
 */
public static String getNightToString(Date date) {
    String datePart = dateFormat(date, DATE_TIME_STR).substring(0, 10);
    return datePart + " 23:59:59";
}
/**
 * Returns the first day of the previous month.
 *
 * <p>The time-of-day fields are left as they were when the calendar was created.
 *
 * @author tangy
 * @return a date on the first day of last month
 */
public static Date getLastMonthFirstDay() {
    Calendar lastMonth = Calendar.getInstance();
    lastMonth.add(Calendar.MONTH, -1);
    lastMonth.set(Calendar.DAY_OF_MONTH, 1);
    Date firstDay = lastMonth.getTime();
    return firstDay;
}
/**
 * Returns the last day of the previous month.
 *
 * <p>The time-of-day fields are left as they were when the calendar was created.
 *
 * @author tangy
 * @return a date on the last day of last month
 */
public static Date getLastMonthEndDay() {
    Calendar cursor = Calendar.getInstance();
    // Step to the first of this month, then back one day.
    cursor.set(Calendar.DAY_OF_MONTH, 1);
    cursor.add(Calendar.DATE, -1);
    Date lastDay = cursor.getTime();
    return lastDay;
}
/**
 * Returns the start-of-day text for the first day of the current month.
 *
 * <p>The date is formatted with {@code DATE_TIME_STR}, its first 10 characters are kept and
 * {@code TIME_STR} is appended.
 *
 * @author zhanghl
 * @return the first day of the current month followed by {@code TIME_STR}
 */
public static String getCurrentMonthFirstDay() {
    Calendar monthStart = Calendar.getInstance();
    monthStart.set(Calendar.DAY_OF_MONTH, 1);
    String datePart = dateFormat(monthStart.getTime(), DATE_TIME_STR).substring(0, 10);
    return datePart + TIME_STR;
}
/**
 * Returns the start-of-day text for the first day of the month containing {@code date}.
 *
 * @author tangy
 * @param date a date inside the month of interest
 * @return the first day of that month formatted with {@code DATE_STR}, followed by
 *         {@code TIME_STR}
 */
public static String getMonthFirstDay(Date date) {
    Calendar monthStart = Calendar.getInstance();
    monthStart.setTime(date);
    monthStart.set(Calendar.DAY_OF_MONTH, 1);
    String datePart = dateFormat(monthStart.getTime(), DATE_STR);
    return datePart + TIME_STR;
}
/**
 * Returns the end-of-day text (23:59:59) for the last day of the month containing
 * {@code date}.
 *
 * @author tangy
 * @param date a date inside the month of interest
 * @return the last day of that month formatted with {@code DATE_STR}, followed by
 *         {@code " 23:59:59"}
 */
public static String getMonthEndDay(Date date) {
    Calendar monthEnd = Calendar.getInstance();
    monthEnd.setTime(date);
    int lastDay = monthEnd.getActualMaximum(Calendar.DATE);
    monthEnd.set(Calendar.DATE, lastDay);
    String datePart = dateFormat(monthEnd.getTime(), DATE_STR);
    return datePart + " 23:59:59";
}
/**
 * Normalises a date string by parsing it and formatting it again with the same pattern.
 *
 * @author tangy
 * @param dateStr    the date text
 * @param dateFormat the pattern, e.g. {@code yyyy-MM-dd HH:mm:ss}
 * @return the re-formatted text, or {@code null} when either argument is {@code null} or
 *         the text cannot be processed (the failure message is logged at info level)
 */
public static String getDateFormat(String dateStr, String dateFormat) {
    if (dateStr == null || dateFormat == null) {
        return null;
    }
    try {
        return dateFormat(parseDate(dateStr, dateFormat), dateFormat);
    } catch (Exception e) {
        logger.info(e.getMessage());
        return null;
    }
}
/**
 * Shifts a date by a number of days.
 *
 * @author tangy
 * @param date the starting date
 * @param day  number of days to add; pass a negative value to subtract
 * @return the shifted date
 */
public static Date addSubtractDate(Date date, int day) {
    Calendar shifted = Calendar.getInstance();
    shifted.setTime(date);
    shifted.add(Calendar.DAY_OF_YEAR, day);
    Date result = shifted.getTime();
    return result;
}
/**
 * Returns the time two hours from now, truncated to the minute.
 *
 * <p>The shifted time is formatted as {@code yyyy-MM-dd HH:mm} and parsed back, which drops
 * seconds and milliseconds.
 *
 * @return now plus two hours at minute precision, or the unshifted current time if the
 *         intermediate text cannot be parsed
 */
public static Date timeAddTwoHour() {
    Date now = new Date();
    try {
        // 7200000 ms = 2 hours.
        Date twoHoursLater = new Date(now.getTime() + 7200000);
        String minuteText = dateFormat(twoHoursLater, "yyyy-MM-dd HH:mm");
        return parseDate(minuteText, "yyyy-MM-dd HH:mm");
    } catch (ParseException e) {
        e.printStackTrace();
        return now;
    }
}
/**
 * Formats an epoch-millisecond timestamp with {@code DATE_TIME_STR}.
 *
 * @author zhangqi
 * @param time the timestamp in epoch milliseconds
 * @return the formatted timestamp
 */
public static String getDateToString(long time) {
    SimpleDateFormat formatter = new SimpleDateFormat(DATE_TIME_STR);
    return formatter.format(new Date(time));
}
/**
 * Parses a {@code DATE_TIME_STR}-formatted string into epoch milliseconds.
 *
 * @author zhangqi
 * @param time the date-time text
 * @return the parsed instant in epoch milliseconds; when the text cannot be parsed the
 *         stack trace is printed and the current time is returned instead
 */
public static long getStringToDate(String time) {
    SimpleDateFormat parser = new SimpleDateFormat(DATE_TIME_STR);
    // Captured before parsing so a failure falls back to "now".
    Date fallback = new Date();
    try {
        return parser.parse(time).getTime();
    } catch (ParseException e) {
        e.printStackTrace();
        return fallback.getTime();
    }
}
/**
 * Returns the current time as epoch milliseconds, truncated to the precision of
 * {@code DATE_TIME_STR}.
 *
 * <p>The current time is formatted with {@code DATE_TIME_STR} and parsed back; if parsing
 * fails the stack trace is printed and an untruncated current time is returned.
 *
 * @author chenyx
 * @return the current timestamp in epoch milliseconds
 */
public static Long getNowDateToDate() {
    SimpleDateFormat formatter = new SimpleDateFormat(DATE_TIME_STR);
    String nowText = formatter.format(new Date());
    Date fallback = new Date();
    try {
        return formatter.parse(nowText).getTime();
    } catch (ParseException e) {
        e.printStackTrace();
        return fallback.getTime();
    }
}
/**
 * Returns the current time as epoch milliseconds, truncated to the precision of
 * {@code DATE_STR} (documented as year-month-day).
 *
 * <p>The current time is formatted with {@code DATE_STR} and parsed back; if parsing fails
 * the stack trace is printed and an untruncated current time is returned.
 *
 * @author chenyx
 * @return the timestamp of the current date in epoch milliseconds
 */
public static Long getYearMonthDayTimeStamp() {
    SimpleDateFormat formatter = new SimpleDateFormat(DATE_STR);
    String todayText = formatter.format(new Date());
    Date fallback = new Date();
    try {
        return formatter.parse(todayText).getTime();
    } catch (ParseException e) {
        e.printStackTrace();
        return fallback.getTime();
    }
}
/**
 * Returns the number of whole seconds from {@code time} to {@code bigTime}.
 *
 * @author zhangq
 * @param time    the earlier instant
 * @param bigTime the later instant
 * @return {@code (bigTime - time)} in seconds, narrowed to {@code int}; negative when
 *         {@code bigTime} precedes {@code time}
 */
public static int countTimes(Date time, Date bigTime) {
    long startMillis = time.getTime();
    long elapsedMillis = bigTime.getTime() - startMillis;
    return (int) (elapsedMillis / 1000);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
- 204
- 205
- 206
- 207
- 208
- 209
- 210
- 211
- 212
- 213
- 214
- 215
- 216
- 217
- 218
- 219
- 220
- 221
- 222
- 223
- 224
- 225
- 226
- 227
- 228
- 229
- 230
- 231
- 232
- 233
- 234
- 235
- 236
- 237
- 238
- 239
- 240
- 241
- 242
- 243
- 244
- 245
- 246
- 247
- 248
- 249
- 250
- 251
- 252
- 253
- 254
- 255
- 256
- 257
- 258
- 259
- 260
- 261
- 262
- 263
- 264
- 265
- 266
- 267
- 268
- 269
- 270
- 271
- 272
- 273
- 274
- 275
- 276
- 277
- 278
- 279
- 280
- 281
- 282
- 283
- 284
- 285
- 286
- 287
- 288
- 289
- 290
- 291
- 292
- 293
- 294
- 295
- 296
- 297
- 298
- 299
- 300
- 301
- 302
- 303
- 304
- 305
- 306
- 307
- 308
- 309
- 310
- 311
- 312
- 313
- 314
- 315
- 316
- 317
- 318
- 319
- 320
- 321
- 322
- 323
- 324
- 325
- 326
- 327
- 328
- 329
- 330
- 331
- 332
- 333
- 334
- 335
- 336
- 337
- 338
- 339
- 340
- 341
- 342
- 343
- 344
- 345
- 346
- 347
- 348
- 349
- 350
- 351
- 352
- 353
- 354
- 355
- 356
- 357
- 358
- 359
- 360
- 361
- 362
- 363
- 364
- 365
- 366
- 367
- 368
- 369
- 370
- 371
- 372
- 373
- 374
- 375
- 376
- 377
- 378
- 379
- 380
- 381
- 382
- 383
- 384
- 385
- 386
- 387
- 388
- 389
- 390
- 391
- 392
- 393
- 394
- 395
- 396
- 397
- 398
- 399
- 400
- 401
- 402
- 403
- 404
- 405
- 406
- 407
- 408
- 409
- 410
- 411
- 412
- 413
- 414
- 415
- 416
- 417
- 418
- 419
- 420
- 421
- 422
- 423
- 424
- 425
- 426
- 427
- 428
- 429
- 430
- 431
- 432
- 433
- 434
- 435
- 436
- 437
- 438
- 439
- 440
- 441
- 442
- 443
- 444
- 445
- 446
- 447
- 448
- 449
- 450
- 451
- 452
- 453
- 454
- 455
- 456
- 457
- 458
- 459
- 460
- 461
- 462
- 463
- 464
- 465
- 466
- 467
- 468
- 469
- 470
- 471
- 472
- 473
- 474
- 475
- 476
- 477
- 478
- 479
- 480
- 481
- 482
- 483
- 484
- 485
- 486
- 487
- 488
- 489
- 490
- 491
- 492
- 493
- 494
- 495
- 496
- 497
- 498
- 499
- 500
- 501
- 502
- 503
- 504
- 505
- 506
- 507
- 508
- 509
- 510
- 511
- 512
- 513
- 514
- 515
- 516
- 517
- 518
- 519
- 520
- 521
- 522
- 523
- 524
- 525
- 526
- 527
- 528
- 529
- 530
- 531
- 532
- 533
- 534
- 535
- 536
- 537
- 538
- 539
- 540
- 541
- 542
- 543
- 544
- 545
- 546
- 547
- 548
- 549
- 550
- 551
- 552
- 553
- 554
- 555
- 556
- 557
- 558
- 559
- 560
- 561
- 562
- 563
- 564
- 565
- 566
- 567
- 568
- 569
- 570
- 571
- 572
- 573
- 574
- 575
- 576
- 577
- 578
- 579
- 580
- 581
- 582
- 583
- 584
- 585
- 586
- 587
- 588
- 589
- 590
- 591
- 592
- 593
- 594
- 595
- 596
- 597
- 598
- 599
- 600
- 601
- 602
- 603
- 604
- 605
- 606
- 607
- 608
- 609
- 610
- 611
- 612
- 613
- 614
- 615
- 616
- 617
- 618
- 619
- 620
- 621
- 622
- 623
- 624
- 625
- 626
- 627
- 628
- 629
- 630
- 631
- 632
- 633
- 634
- 635
- 636
- 637
- 638
- 639
- 640
- 641
- 642
- 643
- 644
- 645
- 646
- 647
- 648
- 649
- 650
- 651
- 652
- 653
- 654
- 655
- 656
- 657
- 658
- 659
- 660
- 661
- 662
- 663
- 664
- 665
- 666
- 667
- 668
- 669
- 670
- 671
- 672
- 673
- 674
- 675
- 676
- 677
- 678
- 679
- 680
- 681
- 682
- 683
- 684
- 685
- 686
- 687
- 688
- 689
- 690
- 691
- 692
- 693
- 694
- 695
- 696
- 697
- 698
- 699
- 700
- 701
- 702
- 703
- 704
- 705
- 706
- 707
- 708
- 709
- 710
- 711
- 712
- 713
- 714
- 715
- 716
- 717
- 718
- 719
- 720
- 721
- 722
- 723
- 724
- 725
- 726
- 727
- 728
- 729
- 730
- 731
- 732
- 733
- 734
- 735
- 736
- 737
- 738
- 739
- 740
- 741
- 742
- 743
- 744
- 745
- 746
- 747
- 748
- 749
- 750
- 751
- 752
- 753
- 754
- 755
- 756
- 757
- 758
- 759
- 760
- 761
- 762
- 763
- 764
- 765
- 766
- 767
- 768
- 769
- 770
- 771
- 772
- 773
- 774
- 775
- 776
- 777
- 778
- 779
- 780
- 781
- 782
- 783
- 784
- 785
- 786
- 787
- 788
- 789
- 790
- 791
- 792
- 793
- 794
- 795
- 796
- 797
- 798
- 799
- 800
- 801
- 802
- 803
- 804
- 805
- 806
- 807
- 808
- 809
- 810
- 811
- 812
- 813
- 814
- 815
- 816
- 817
- 818
- 819
- 820
- 821
- 822
- 823
- 824
- 825
- 826
- 827
- 828
- 829
- 830
- 831
- 832
- 833
- 834
- 835
- 836
- 837
- 838
- 839
- 840
- 841
- 842
- 843
- 844
- 845
- 846
- 847
- 848
- 849
- 850
- 851
- 852
- 853
- 854
- 855
- 856
- 857
- 858
- 859
- 860
- 861
- 862
- 863
- 864
- 865
- 866
- 867
- 868
- 869
- 870
- 871
- 872
- 873
- 874
- 875
- 876
- 877
- 878
- 879
- 880
- 881
- 882
- 883
- 884
- 885
- 886
- 887
- 888
- 889
- 890
- 891
- 892
- 893
- 894
- 895
- 896
- 897
- 898
- 899
- 900
- 901
- 902
- 903
- 904
- 905
- 906
- 907
- 908
- 909
- 910
- 911
- 912
- 913
- 914
- 915
- 916
- 917
- 918
- 919
- 920
- 921
- 922
- 923
- 924
- 925
- 926
- 927
- 928
- 929
- 930
- 931
- 932
- 933
- 934
- 935
- 936
- 937
- 938
- 939
- 940
- 941
- 942
- 943
- 944
- 945
- 946
- 947
- 948
- 949
- 950
- 951
- 952
- 953
1.31.22.PropertiesUtils.java
package com.xxxxx.issue.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.util.Properties;
/**
 * Lazily created singleton exposing the settings stored in
 * {@code project-config.properties} on the classpath.
 *
 * <p>Changes from the previous version: the logger is bound to this class instead of
 * {@code RedisUtil}; the properties stream is always closed; a missing file or key is
 * reported by name instead of surfacing as a bare {@code NullPointerException}; and
 * {@link #getInstance()} is synchronised so concurrent first calls create one instance.
 * Any load failure is still reported as an {@link ExceptionInInitializerError}.
 *
 * @author tuzuoquan
 * @version 1.0
 * @date 2020/9/23 9:23
 **/
public final class PropertiesUtils {
    private static final Logger LOG = LoggerFactory.getLogger(PropertiesUtils.class);
    /** Classpath location of the configuration file. */
    private static final String CONFIG_FILE = "project-config.properties";
    private static PropertiesUtils instance = null;

    /** Interval between checkpoints. */
    private final Integer flinkCheckpointsInterval;
    /** Minimum pause between two checkpoints. */
    private final Integer flinkMinPauseBetweenCheckpoints;
    /** Checkpoint timeout; a checkpoint that takes longer is discarded. */
    private final Integer flinkCheckpointTimeout;
    /** Maximum number of checkpoints in progress at the same time. */
    private final Integer flinkMaxConcurrentCheckpoints;
    /** Number of restart attempts. */
    private final Integer flinkFixedDelayRestartTimes;
    /** Delay between two restart attempts. */
    private final Integer flinkFixedDelayRestartInterval;
    private final String rocketmqNameServer;
    private final String rocketMqTopics;
    /** Parallelism of the RocketMQ source. */
    private final Integer rockeqMqSourceParallelism;
    /** Parallelism of the Redis sink. */
    private final Integer redisSinkParallelism;
    private final Integer redisDefaultExpirationTime;

    /**
     * Loads every setting from the configuration file.
     *
     * @throws ExceptionInInitializerError if the file is missing, unreadable, lacks a
     *         required key, or holds a non-numeric value for a numeric key
     */
    private PropertiesUtils() {
        // try-with-resources closes the stream even when loading or parsing fails.
        try (InputStream in = PropertiesUtils.class.getClassLoader().getResourceAsStream(CONFIG_FILE)) {
            if (in == null) {
                throw new IllegalStateException("Configuration file not found on classpath: " + CONFIG_FILE);
            }
            Properties prop = new Properties();
            prop.load(in);
            rocketmqNameServer = required(prop, "rocketmq.name.server.addr");
            rocketMqTopics = required(prop, "rocketmq.topics");
            flinkCheckpointsInterval = requiredInt(prop, "flink.checkpoint.interval");
            flinkMinPauseBetweenCheckpoints = requiredInt(prop, "flink.checkpoint.minPauseBetweenCheckpoints");
            flinkCheckpointTimeout = requiredInt(prop, "flink.checkpoint.checkpointTimeout");
            flinkMaxConcurrentCheckpoints = requiredInt(prop, "flink.checkpoint.maxConcurrentCheckpoints");
            flinkFixedDelayRestartTimes = requiredInt(prop, "flink.fixedDelayRestart.times");
            flinkFixedDelayRestartInterval = requiredInt(prop, "flink.fixedDelayRestart.interval");
            rockeqMqSourceParallelism = requiredInt(prop, "flink.rockeqmq.source.parallelism");
            redisSinkParallelism = requiredInt(prop, "flink.redis.sink.parallelism");
            redisDefaultExpirationTime = requiredInt(prop, "redis.default.expiration.time");
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Returns the trimmed value of a mandatory key, failing with the key name if absent. */
    private static String required(Properties prop, String key) {
        String value = prop.getProperty(key);
        if (value == null) {
            throw new IllegalStateException("Missing required property '" + key + "' in " + CONFIG_FILE);
        }
        return value.trim();
    }

    /** Returns the value of a mandatory key parsed as an integer. */
    private static Integer requiredInt(Properties prop, String key) {
        return Integer.parseInt(required(prop, key));
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the singleton
     */
    public static synchronized PropertiesUtils getInstance() {
        if (instance == null) {
            instance = new PropertiesUtils();
        }
        return instance;
    }

    public Integer getFlinkCheckpointsInterval() {
        return flinkCheckpointsInterval;
    }

    public Integer getFlinkMinPauseBetweenCheckpoints() {
        return flinkMinPauseBetweenCheckpoints;
    }

    public Integer getFlinkCheckpointTimeout() {
        return flinkCheckpointTimeout;
    }

    public Integer getFlinkMaxConcurrentCheckpoints() {
        return flinkMaxConcurrentCheckpoints;
    }

    public String getRocketmqNameServer() {
        return rocketmqNameServer;
    }

    public String getRocketMqTopics() {
        return rocketMqTopics;
    }

    public Integer getRockeqMqSourceParallelism() {
        return rockeqMqSourceParallelism;
    }

    public Integer getRedisSinkParallelism() {
        return redisSinkParallelism;
    }

    public Integer getFlinkFixedDelayRestartTimes() {
        return flinkFixedDelayRestartTimes;
    }

    public Integer getFlinkFixedDelayRestartInterval() {
        return flinkFixedDelayRestartInterval;
    }

    public Integer getRedisDefaultExpirationTime() {
        return redisDefaultExpirationTime;
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
1.31.23.RedisUtil.java
package com.xxxxx.issue.utils;
import com.xxxxx.tmc.cache.service.impl.TqCacheServiceImpl;
import com.xxxxx.tmc.commons.constant.CacheLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Holder for a single shared {@code TqCacheServiceImpl} created when the class is loaded.
 *
 * @author tuzuoquan
 * @version 1.0
 * @date 2020/10/28 16:06
 **/
public final class RedisUtil {
    private static Logger LOG = LoggerFactory.getLogger(RedisUtil.class);
    /** Shared cache service, built eagerly with an empty-string constructor argument. */
    private static TqCacheServiceImpl cacheService = new TqCacheServiceImpl("");

    /**
     * Returns the shared cache service.
     *
     * @return the instance created at class-load time
     */
    public static TqCacheServiceImpl getCacheServiceInstance() {
        return cacheService;
    }

    /**
     * Manual smoke test: reads one key at the remote cache level and prints the result
     * between two separator log lines.
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature; nothing here throws it
     */
    public static void main(String[] args) throws InterruptedException {
        TqCacheServiceImpl cache = RedisUtil.getCacheServiceInstance();
        LOG.info("=================================");
        String key = "issue:2:20210113:000000:0:4:1.1.10.1.";
        System.out.println(cache.getWithCacheLevel("default", key, CacheLevel.REMOTE));
        LOG.info("=================================");
        // The service is intentionally not destroyed here (cache.destroy() was left disabled).
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
1.31.24.IssueConstants.java
package com.xxxxx.issue.constant;
/**
 * String constants used to build issue indicator keys and to subscribe to RocketMQ.
 *
 * @author tuzuoquan
 * @version 1.0
 * @ClassName IssueConstants
 * @date 2020/9/21 8:55
 **/
public final class IssueConstants {
/** Prefix of every generated code/key. **/
public static final String ISSUE_CODE_PREFIX = "issue:";
/** Separator placed between the segments of a code/key. **/
public static final String ISSUE_CODE_COLON = ":";
/** Separator for topic lists - presumably splits the configured topics string; verify against the caller. **/
public static final String TOPIC_SPLITTER = "##";
/** Handling dimensions. **/
/** Department together with its subordinate departments. **/
public static final String DIMENSION_DEPARTMENT_ALL = "0";
/** Only what the department handled itself. **/
public static final String DIMENSION_DEPARTMENT_ONLY = "1";
/** A single person. **/
public static final String DIMENSION_DEPARTMENT_USER = "2";
/** 1. Number of accepted issues: consumer group and tag. **/
public static final String ROCKETMQ_ACCEPT_CONSUMER_GROUP_1 = "group_1";
public static final String ROCKETMQ_ACCEPT_TAG = "issue_accept_operat";
/** Indicator type. **/
public static final String ISSUE_ACCEPT_TYPE = "1";
/** 2. Number of issues that occurred: consumer group and tag expression. **/
public static final String ROCKETMQ_HAPPEN_CONSUMER_GROUP_2 = "group_2";
public static final String ROCKETMQ_HAPPEN_TAG = "issue_accept_operat || issue_add_operat || issue_delete_operat";
/** Indicator type. **/
public static final String ISSUE_HAPPEN_TYPE = "2";
/** 3. Completed issues: consumer group and tag expression. **/
public static final String ROCKETMQ_PASS_COMPLETE_CONSUMER_GROUP_3 = "group_3";
public static final String ROCKETMQ_PASS_COMPLETE_TAG = "issue_inspect_pass_operat || issue_complete_operat";
/** Indicator type. **/
public static final String ISSUE_PASS_COMPLETE_TYPE = "3";
/** 4. Number of sign-for operations: consumer group and tag. **/
public static final String ROCKETMQ_SIGNFOR_CONSUMER_GROUP_4 = "group_4";
public static final String ROCKETMQ_SIGNFOR_TAG = "issue_signfor_operat";
/** Indicator type. **/
public static final String ISSUE_SIGNFOR_TYPE = "4";
/** 5. Number of handling operations: consumer group and tag expression. **/
public static final String ROCKETMQ_HANDLE_CONSUMER_GROUP_5 = "group_5";
// NOTE(review): the two literals join with a trailing and a leading space, so the
// resulting expression contains a double space before the fourth "||" - confirm it is harmless.
public static final String ROCKETMQ_HANDLE_TAG = "issue_comment_operat || issue_assignReply_operat || issue_complete_operat "
+ " || issue_report_operat || issue_assign_operat";
/** Indicator type. **/
public static final String ISSUE_HANDLE_TYPE = "5";
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
1.31.25.IssueAcceptRedisSink.java
package com.xxxxx.issue.redissink;
import com.xxxxx.issue.constant.IssueConstants;
import com.xxxxx.issue.utils.PropertiesUtils;
import com.xxxxx.tmc.cache.service.impl.TqCacheServiceImpl;
import com.xxxxx.tmc.commons.constant.CacheLevel;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @author tuzuoquan
* @version 1.0
* @ClassName IssuePassRedisSink
* @description TODO
* @date 2020/9/22 14:49
**/
public class IssueAcceptRedisSink extends RichSinkFunction<Tuple4<String, Integer,String, String>> {
private static final Logger LOG = LoggerFactory.getLogger(IssueAcceptRedisSink.class);
TqCacheServiceImpl cacheService;
/**
 * Creates the sink without creating a cache service; {@code cacheService} is assigned
 * in {@code open(Configuration)}.
 */
public IssueAcceptRedisSink() {
// Intentionally empty. Earlier variants obtained the cache service here:
//this.cacheService = RedisUtil.getCacheServiceInstance();
// this.cacheService = new TqCacheServiceImpl("");
}
@Override
public void invoke(Tuple4<String, Integer,String, String> input) {
try {
generateKeyAndData(input.f0,input.f1,input.f2,input.f3);
} catch (Exception e) {
LOG.error("处理受理数出错,错误信息是: ", e.getMessage());
throw e;
}
}
@Override
public void open(Configuration parameters) throws Exception {
try {
LOG.info("=======================cacheService init=============================");
this.cacheService = new TqCacheServiceImpl("");
LOG.info("=======================cacheService end=============================");
} catch (Exception var3) {
LOG.error("Redis has not been properly initialized: ", var3);
throw var3;
}
}
@Override
public void close() throws IOException {
try {
LOG.info("=======================cacheService close start =============================");
this.cacheService.destroy();
this.cacheService = null;
LOG.info("=======================cacheService close end =============================");
} catch (Exception e) {
LOG.error("Redis cacheService has been destroy");
}
}
private synchronized void generateKeyAndData(String handlerOrgCode,Integer operatType,String dayTime,String tenantId) {
if (StringUtils.isNotBlank(handlerOrgCode)) {
String[] codeSegment = handlerOrgCode.split("\\.");
int length = codeSegment.length;
//部门仅自己维度
String theirOwnCode = new StringBuilder(IssueConstants.ISSUE_CODE_PREFIX) //表示事件
.append(IssueConstants.ISSUE_ACCEPT_TYPE) //受理数
.append(IssueConstants.ISSUE_CODE_COLON)
.append(dayTime) //日期:类似20200918
.append(IssueConstants.ISSUE_CODE_COLON)
.append(tenantId) // 租户id 0-9
.append(IssueConstants.ISSUE_CODE_COLON)
.append(IssueConstants.DIMENSION_DEPARTMENT_ONLY) //仅自己维度
.append(IssueConstants.ISSUE_CODE_COLON)
.append(length) //层级
.append(IssueConstants.ISSUE_CODE_COLON)
.append(handlerOrgCode) //自己的这个code
.toString();
Long theirOwnNum = 1L;
// redis 是否存在 这个code
if (null != this.cacheService.getWithCacheLevel("default",theirOwnCode,CacheLevel.REMOTE)) {
theirOwnNum = (Long) this.cacheService.getWithCacheLevel("default",theirOwnCode,CacheLevel.REMOTE) + 1;
LOG.info("theirOwnNum=" + theirOwnNum);
}
//this.cacheService.set(theirOwnCode,PropertiesUtils.getInstance().getRedisDefaultExpirationTime(),theirOwnNum);
this.cacheService.setWithCacheLevel("default", theirOwnCode,
PropertiesUtils.getInstance().getRedisDefaultExpirationTime(), theirOwnNum,
CacheLevel.REMOTE);
for (int level = 1; level <= length; level++) {
//issue:type:date:dimension:level:code
String codePrefix = new StringBuilder(IssueConstants.ISSUE_CODE_PREFIX) //表示事件
.append(IssueConstants.ISSUE_ACCEPT_TYPE) //受理数 指标类型
.append(IssueConstants.ISSUE_CODE_COLON)
.append(dayTime) //日期:类似20200918
.append(IssueConstants.ISSUE_CODE_COLON)
.append(tenantId) // 租户id 0-9
.append(IssueConstants.ISSUE_CODE_COLON)
.append(IssueConstants.DIMENSION_DEPARTMENT_ALL) //部门
.append(IssueConstants.ISSUE_CODE_COLON)
.append(level) //层级
.append(IssueConstants.ISSUE_CODE_COLON)
.toString();
StringBuilder codeSuffix = new StringBuilder();
for (int j = 0; j < level; j++) {
if (StringUtils.isBlank(codeSuffix.toString())) {
codeSuffix.append(codeSegment[j]);
continue;
}
codeSuffix.append(".").append(codeSegment[j]);
}
codeSuffix.append(".");
String code = codePrefix + codeSuffix.toString();
LOG.info(code);
Long num = 1L;
if (null != this.cacheService.getWithCacheLevel("default",code,CacheLevel.REMOTE)) {
num = (Long) this.cacheService.getWithCacheLevel("default",code,CacheLevel.REMOTE) + 1;
}
//this.cacheService.set(code,PropertiesUtils.getInstance().getRedisDefaultExpirationTime(),num);
this.cacheService.setWithCacheLevel("default", code,
PropertiesUtils.getInstance().getRedisDefaultExpirationTime(), num,
CacheLevel.REMOTE);
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
1.31.26.IssueAcceptFlinkHandlerByCustomRedisSink.java
package com.xxxxx.issue.flink.handler;
import com.alibaba.fastjson.JSON;
import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
import com.xxxxx.issue.constant.IssueConstants;
import com.xxxxx.issue.redissink.IssueAcceptRedisSink;
import com.xxxxx.issue.utils.DateUtils;
import com.xxxxx.issue.utils.PropertiesUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.flink.RocketMQConfig;
import org.apache.rocketmq.flink.RocketMQSource;
import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
 * Real-time job for today's "accepted count" indicator (type = 1).
 *
 * <p>Meaning: number of issues accepted by the pre-acceptance centre.<br>
 * Scope: own department; own level and subordinates (including peer functional departments).<br>
 * Time range: today.<br>
 * Source of the indicator: TAG {@code issue_accept_operat}, operatType 61.
 *
 * <p>Key rule: {@code issue:type:date:tenant_id:dimension:level:code}
 * <ul>
 *   <li>issue: marks an issue indicator</li>
 *   <li>type: indicator type, 1 = accepted, 2 = occurred, 3 = completed</li>
 *   <li>date: day, e.g. 20200918</li>
 *   <li>tenant_id: id of the tenant</li>
 *   <li>dimension: 0 = department, 1 = own only</li>
 *   <li>level: level of the organisation code</li>
 *   <li>code: the organisation code</li>
 * </ul>
 *
 * <p>For an organisation code with 6 segments: the department dimension (0) yields 6 keys and
 * the own-only dimension (1) yields 1 key.
 *
 * <p>Per-user keys use dimension 2 ({@code issue:type:date:dimension:userid}); the accepted count
 * currently has no per-user statistics.
 *
 * @author jun
 * @version 1.1
 * @ClassName IssueAcceptFlinkHandlerByCustomRedisSink
 * @date 2020/9/14 16:37
 **/
public class IssueAcceptFlinkHandlerByCustomRedisSink {
    private static final Logger LOG = LoggerFactory.getLogger(IssueAcceptFlinkHandlerByCustomRedisSink.class);

    /** Operation type that marks an "accept" message. */
    private static final Integer OPERATTYPE_1 = 61;

    /**
     * Builds and runs the job: RocketMQ topics -> filter/extract -> {@link IssueAcceptRedisSink}.
     *
     * @param args optional; {@code args[0]} is used as the job name, otherwise the class name
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        PropertiesUtils instance = PropertiesUtils.getInstance();

        // Restart strategy: fixed delay.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                instance.getFlinkFixedDelayRestartTimes(),
                Time.of(instance.getFlinkFixedDelayRestartInterval(), TimeUnit.MINUTES)));
        // Interval between checkpoints.
        env.enableCheckpointing(instance.getFlinkCheckpointsInterval());
        // Exactly-once mode (this is the default).
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Minimum pause between two checkpoints.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(instance.getFlinkMinPauseBetweenCheckpoints());
        // A checkpoint that does not finish within this timeout is discarded.
        env.getCheckpointConfig().setCheckpointTimeout(instance.getFlinkCheckpointTimeout());
        // Maximum number of checkpoints in flight at the same time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(instance.getFlinkMaxConcurrentCheckpoints());
        // Keep checkpoint data after the job is cancelled so it can be restored later.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, instance.getRocketmqNameServer());
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, IssueConstants.ROCKETMQ_ACCEPT_CONSUMER_GROUP_1);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, IssueConstants.ROCKETMQ_ACCEPT_TAG);

        String topic = instance.getRocketMqTopics();
        if (StringUtils.isBlank(topic)) {
            return;
        }
        String[] topics = topic.split(IssueConstants.TOPIC_SPLITTER);

        // Union of the sources of all configured topics.
        DataStream<Map> finalDataStreamSource = null;
        for (String topicItem : topics) {
            if (StringUtils.isBlank(topicItem)) {
                // e.g. two consecutive splitters in the configuration
                continue;
            }
            // Give every source its own Properties copy. Reusing one mutable object would make
            // all sources see the last topic if the source keeps the reference it is given.
            Properties topicProps = new Properties();
            topicProps.putAll(consumerProps);
            topicProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, topicItem.trim());
            DataStream<Map> dataStreamSource = env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "issueInfo"), topicProps));
            finalDataStreamSource = (null == finalDataStreamSource) ? dataStreamSource : finalDataStreamSource.union(dataStreamSource);
        }
        if (null == finalDataStreamSource) {
            return;
        }

        SingleOutputStreamOperator<Tuple4<String,Integer,String,String>> mainDataStream = finalDataStreamSource
                .process(new ProcessFunction<Map, Tuple4<String,Integer,String,String>>() {
                    /**
                     * Parses one message and emits (handlerOrgCode, operatType, dayTime, tenantId)
                     * when its operation type is the "accept" type. Bad messages are logged and skipped.
                     */
                    @Override
                    public void processElement(Map in, Context ctx, Collector<Tuple4<String,Integer,String,String>> out) throws Exception {
                        Object rawIssueInfo = in.get("issueInfo");
                        if (null == rawIssueInfo) {
                            // Previously this dereference threw an NPE outside the try block
                            // and failed the whole job on a single malformed message.
                            LOG.warn("Skipping message without issueInfo: {}", in);
                            return;
                        }
                        String issueInfo = rawIssueInfo.toString();
                        LOG.info(issueInfo);
                        try {
                            IssueSyncMessageBodyVO issueMsgVO = JSON.parseObject(issueInfo,IssueSyncMessageBodyVO.class);
                            // Current day (processing time).
                            String dayTime = DateUtils.dateFormat(new Date(),DateUtils.PATTERN_DATE_SIMPLE);
                            String handlerOrgCode = null;
                            Integer operatType = null;
                            String tenantId = null;
                            if (null != issueMsgVO && null != issueMsgVO.getBody()) {
                                // Organisation code of the handling party.
                                handlerOrgCode = issueMsgVO.getBody().getHandleOrgCode();
                                operatType = issueMsgVO.getOperatType();
                                tenantId = issueMsgVO.getBody().getTenantId();
                            }
                            if (null != operatType && 0 == operatType.compareTo(IssueAcceptFlinkHandlerByCustomRedisSink.OPERATTYPE_1)) {
                                out.collect(new Tuple4<>(handlerOrgCode,operatType,dayTime,tenantId));
                            }
                        } catch (Exception e) {
                            // Fill both placeholders and pass the exception last so the
                            // stack trace is logged as well.
                            LOG.error("消费事件信息出错,错误的事件是:{},错误信息:{}",issueInfo, e.getMessage(), e);
                        }
                    }
                }).name("issueAccept-mq-source")
                .uid("issueAccept")
                .setParallelism(instance.getRockeqMqSourceParallelism());

        // Apply the Redis sink parallelism to the sink itself. It used to be set on
        // mainDataStream, which only overrode the process operator's parallelism set above
        // and left the sink at the environment default.
        mainDataStream.addSink(new IssueAcceptRedisSink())
                .name("IssueAcceptRedisSink")
                .uid("IssueAcceptRedisSink")
                .setParallelism(instance.getRedisSinkParallelism());

        try {
            String jobName = (args.length == 0)
                    ? IssueAcceptFlinkHandlerByCustomRedisSink.class.getSimpleName()
                    : args[0];
            env.execute(jobName);
        } catch (Exception e) {
            // Use the logger so the failure reaches the configured log files.
            LOG.error("Flink job execution failed", e);
        }
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
1.32.Flink其它案例
1.32.1.使用DataGen生成数据
package com.toto.demo.test;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
/**
 * Demo: generates random rows with Flink's DataGen source and prints them.
 */
public class DataGeneratorSourceDemo {

    /** Second constructor argument of DataGeneratorSource: rows emitted per second. */
    private static final long ROWS_PER_SECOND = 10;

    /** Third constructor argument of DataGeneratorSource: total number of rows to emit. */
    private static final Long TOTAL_ROWS = 20L;

    public static void main(String[] args) {
        runRandomRowJob();
    }

    /**
     * Builds a three-column row generator (id, state, score), feeds it into a
     * single-parallelism source, prints the rows with parallelism 2 and runs the job.
     */
    private static void runRandomRowJob() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // One generator per column, in the same order as the column names below.
        DataGenerator<?>[] columnGenerators = new DataGenerator[3];
        columnGenerators[0] = RandomGenerator.intGenerator(0, 100);
        columnGenerators[1] = RandomGenerator.booleanGenerator();
        columnGenerators[2] = RandomGenerator.intGenerator(0, 100);
        String[] columnNames = new String[] { "id", "state", "score" };

        // First argument: the per-column generators; second argument: the column names.
        RowDataGenerator rowGenerator = new RowDataGenerator(columnGenerators, columnNames);
        DataGeneratorSource<RowData> generatorSource =
                new DataGeneratorSource<>(rowGenerator, ROWS_PER_SECOND, TOTAL_ROWS);

        DataStreamSource<RowData> rows =
                environment.addSource(generatorSource, TypeInformation.of(RowData.class));
        rows.setParallelism(1);
        rows.print().setParallelism(2);

        try {
            environment.execute();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}
输出结果:
1> +I(73,true,43)
2> +I(35,true,99)
1> +I(44,false,19)
2> +I(47,true,8)
1> +I(93,false,37)
2> +I(38,false,79)
1> +I(40,false,16)
2> +I(70,false,57)
1> +I(78,false,50)
2> +I(57,false,71)
1> +I(58,false,56)
2> +I(78,true,68)
1> +I(78,true,67)
2> +I(51,true,3)
1> +I(22,false,89)
2> +I(83,false,0)
1> +I(42,false,32)
2> +I(74,false,18)
1> +I(99,true,73)
2> +I(84,false,89)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
1.32.2.使用value state进行存储临时数据
package com.toto.demo.test;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Demo: keeps a running sum per key in a Flink {@link ValueState}.
 */
public class StateDemo {
    private static final List<Integer> data = new ArrayList<>(Arrays.asList(1, 2, 3));

    public static void main(String[] args) {
        state();
    }

    /**
     * Keys the sample numbers by parity, accumulates a per-key sum in value state,
     * prints every intermediate sum, then prints the execution plan and runs the job.
     */
    public static void state() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        environment.fromCollection(data, TypeInformation.of(Integer.class))
                .keyBy(v -> v % 2)
                .process(new RunningSumFunction())
                .print()
                .setParallelism(2);

        try {
            System.out.println(environment.getExecutionPlan());
            environment.execute();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /** Adds each element to the sum stored for its key and emits the new sum. */
    private static class RunningSumFunction extends KeyedProcessFunction<Integer, Integer, Integer> {
        private ValueState<Integer> sumState;

        /** Registers the "sum" value state for this operator. */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Integer.class));
        }

        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
            // No state yet for this key means the sum starts at 0.
            Integer stored = sumState.value();
            int previous = (stored == null) ? 0 : stored;
            System.out.println("oldSum:\t" + previous);
            System.out.println("value:\t" + value);

            int updated = previous + value;
            sumState.update(updated);
            out.collect(updated);
        }
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54