【数据湖探索】FLINK 自定义jar作业配置checkppoint功能

一、Flink 自定义作业配置checkppoint功能

在Flink 自定义作业 Jar包代码中加入如下代码:  
如下代码含义是以EXACTLY_ONCE模式,每隔40s 保存checkpoint 到obs的${bucket}桶中的jobs/checkpoint/my_jar路径。    
其中最主要是的保存checkpoint路径,一般是将checkpoint存入obs桶中,路径格式如下:    
路径格式:obs://${dataUserAk}:${dataUserSk}@${endpoint}/${bucket}/xxx/xxx/xxx  
示例:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3  

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(40000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend("obs://${userAk}:${userSk}@obs.cn-north-7.ulanqab.huawei.com:443/${bucket}/jobs/checkpoint/my_jar"), false);
        rocksDbBackend.setOptions(new OptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions currentOptions) {
                return currentOptions
                        .setMaxLogFileSize(64 * 1024 * 1024)
                        .setKeepLogFileNum(3);
            }

            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
                return currentOptions;
            }
        });
        env.setStateBackend(rocksDbBackend);


在DLI Flink Jar作业中配置【从checkpoint恢复】功能

控制台操作:

在自定义Jar作业【编辑页面】,先勾选【异常自动重启】,再勾选【从checkpoint恢复】,再填写【Checkpoint路径】。  
Checkpoint路径与用户在Jar包中设置的checkpoint路径相对应,格式如下:  
【Checkpoint路径】格式为:${bucket}/xxx/xxx/xxx  
示例:      
若Jar包中代码配置为:obs://xxxxxxx:xxxxxxxxxx@obs.cn-north-7.ulanqab.huawei.com:443/mybucket/jobs/checkpoint/jar-3   
则【Checkpoint路径】填写为: mybucket/jobs/checkpoint/jar-3

注意点:  
1、每个Flink Jar作业配置的Checkpoint路径不要一致,不然无法从准确的checkpoint路径恢复。  
2、checkpoint路径中的obs桶需要给DLI授权,DLI服务才能访问此桶下的文件  


二、查看作业是否从checkpoint恢复

(完)