一、Flink 自定义作业配置checkppoint功能
在Flink 自定义作业 Jar包代码中加入如下代码:
如下代码含义是以EXACTLY_ONCE模式,每隔40s 保存checkpoint 到obs的${bucket}桶中的jobs/checkpoint/my_jar路径。
其中最主要是的保存checkpoint路径,一般是将checkpoint存入obs桶中,路径格式如下:
路径格式:obs://${dataUserAk}:${dataUserSk}@${endpoint}/${bucket}/xxx/xxx/xxx
示例:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(40000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${userAk}:${userSk}@obs.cn-north-7.ulanqab.huawei.com:443/${bucket}/jobs/checkpoint/my_jar"), false); rocksDbBackend.setOptions(new OptionsFactory() { @Override public DBOptions createDBOptions(DBOptions currentOptions) { return currentOptions .setMaxLogFileSize(64 * 1024 * 1024) .setKeepLogFileNum(3); } @Override public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { return currentOptions; } }); env.setStateBackend(rocksDbBackend);
在DLI Flink Jar作业中配置【从checkpoint恢复】功能
控制台操作:
在自定义Jar作业【编辑页面】,先勾选【异常自动重启】,再勾选【从checkpoint恢复】,再填写【Checkpoint路径】。
Checkpoint路径与用户在Jar包中设置的checkpoint路径相对应,格式如下:
【Checkpoint路径】格式为:${bucket}/xxx/xxx/xxx
示例:
若Jar包中代码配置为:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3
则【Checkpoint路径】填写为: mybucket/jobs/checkpoint/jar-3
注意点:
1、每个Flink Jar作业配置的Checkpoint路径不要一致,不然无法从准确的checkpoint路径恢复。
2、checkpoint路径中的obs桶需要给DLI授权,DLI服务才能访问此桶下的文件
二、查看作业是否从checkpoint恢复