【易运维】GaussDB(DWS)审计日志转储实践

GaussDB(DWS)存在丰富的审计日志记录信息,通常客户现场存在审计需求时,往往需要记录长历史全部数据库访问信息;默认情况下,数据库为保护自身存储空间,audit_space_limit默认控制保留1GB日志空间,远远无法满足客户对审计的分析、挖掘需求;故促使我们需要实践审计日志转储。

 

1 实施设计

  • 打开相关审计日志开关;

开启审计audit_enabled = on; (默认打开)

除了默认开启的审计内容,其它可参照打开,

打开用户访问越权:audit_user_violation 0 -> 1;

数据库对象操作:audit_system_object 12295 -> 524287;

DMLselect操作:audit_dml_state 0 -> 1;

Select操作:audit_dml_state_select 0 -> 1;

函数操作:audit_function_exec 0 -> 1;

COPY操作: audit_copy_exec 0 -> 1;

  • 创建转储数据库用户表

固化系统级schema,用于存储审计日志,如pg_audit;

转储用户表命名: pg_audit.audit_dtl;

同时设计成列存合并文件表(colversion=2.0);

  • 设计转储审计表分区定义

保留最近7天每天一个分区,用户可以快速查询、挖掘近期审计;

此外,历史每月一个分区;

分区命名格式:<表名>_<YYYYMMDD> <表名>_<YYYYMM>

  • 每隔2小时,转储10分钟前审计日志,并删除转储走的审计日志;

设置2小时,目的是防止超2小时,审计日志超过默认系统定义的1GB,导致审计日志遗漏;

只转储10分钟前的日志,避免10分钟内审计日志还在异步变动的,导致转储过程中遗漏;

 

2 实践代码

2.1 审计转储用户表

用户表:

Create table if not exists pg_audit.audit_dtl
(
Audit_date date,
Audit_time timestamp,
Type varchar(100),
Result varchar(100),
Usename varchar(100),
Database varchar(100),
Client_conninfo varchar(500),
Object_name varchar(100),
Detail_info text,
Node_name varchar(100),
Thread_id varchar(500),
Pid bigint,
Query_id bigint,
Local_port varchar(100),
Remote_port varchar(100)
)
With ( orientation=column, compression=Middle, colversion=2.0)
Distribute by hash(pid)
Partition by range ( audit_date )
(
Partition audit_dtl_202103 values less than (‘2021-03-18’::date),
Partition audit_dtl_20210318 values less than (‘2021-03-19’::date),
Partition audit_dtl_20210319 values less than (‘2021-03-20’::date),
Partition audit_dtl_20210320 values less than (‘2021-03-21’::date),
Partition audit_dtl_20210321 values less than (‘2021-03-22’::date),
Partition audit_dtl_20210322 values less than (‘2021-03-23’::date),
Partition audit_dtl_20210323 values less than (‘2021-03-24’::date),
Partition audit_dtl_20210324 values less than (‘2021-03-25’::date)
)
Enable row movement;

转储控制表:

Create table if not exists pg_audit.audit_dtl_ctl
(
Ctl_typ varchar(20),
Ctl_val varchar(500)
) With ( orientation=column, compression=Middle, colversion=2.0)
Distribute by hash(Ctl_typ);
插入初始值:
Insert into pg_audit.audit_dtl_ctl values( ‘last_partition’,’1900-01-01’);
Insert into pg_audit.audit_dtl_ctl values( ‘last_status’,’1900-01-01 00:00:01’);

删除已转储审计日志:(系统只提供了单CN审计日志清理,封装一层所有CN日志清理函数)

Create or replace function public.pgxc_delete_audit(starttime timestamp with time zone,endtime timestamp with time zone)
Returns Boolean
Language plpgsql
Not fenced not shippable
As $$
Declare 
  row_name record;
  query_str text;
  query_str_nodes text;
  Begin
query_str_nodes := ‘select node_name from pgxc_node where node_type=’’c’’’;
For row_name in execute (query_str_nodes) loop
  query_str := ‘execute direct on (‘||row_name.node_name||’) ‘’select pg_delete_audit(‘’’’’||start_time||’’’’’,’’’’’||end_time||’’’’’)’’’;
  execute query_str;
end loop;
return true;
  end; $$

2.2 Gsql代码

\timing on 

/* 程序参数提取 */
Select 7 as partition_interval /* 保留最近7天每日分区 */,
10 as tm_interval /* 每次提取10分钟前审计日志 */,
Case when ctl_val::date=current_date then 0 
when extract(month from current_date- partition_interval+1) != extract(month from current_date- partition_interval) then 2
else 1
end as is_create_prt /* 是否处理 */,
to_char(current_date- partition_interval+1,’YYYYMMDD’) as p1,
to_char(current_date- partition_interval+1,’YYYYMM’) as p2,
to_char(current_date +1,’YYYYMMDD’) as p3,
to_char(current_date +2,’YYYY-MM-DD’) as p4,
‘\copy’ as cp,
‘’’’ as quete
From pg_audit.audit_dtl_ctl
Where ctl_typ=’last_partition’;

\if ${ERROR}
	\goto WITHERROR
\endif

\if ${ is_create_prt} == 1
	\goto merge_part
\elif ${ is_create_prt} == 2
	\goto create_month
\else
	\goto insert_audit
\endif

/* 跨月分区合并 */
\label create_month
Alter table pg_audit.audit_dtl rename partition audit_dtl_${p1} to audit_dtl_${p2};
\goto create_cur

/* 合并最旧一日分区到月份中 */
\label merge_part
Alter table pg_audit.audit_dtl rename partition audit_dtl_${p2} to audit_dtl_${p2}_bak;
\if ${ERROR}
	\goto WITHERROR
\endif
Alter table pg_audit.audit_dtl merge partition audit_dtl_${p2}_bak, audit_dtl_${p1}  into partition audit_dtl_${p2};
\if ${ERROR}
	\goto WITHERROR
\endif

/* 新增当日分区 */
\label create_cur
Alter table pg_audit.audit_dtl addpartition audit_dtl_${p3} values less than (‘${p4}’::date);
\if ${ERROR}
	\goto WITHERROR
\endif

Update pg_audit.audit_dtl_ctl
Set ctl_val=to_char(current_date,’YYYY-MM-DD’)
Where ctl_typ=’last_partition’;
\if ${ERROR}
	\goto WITHERROR
\endif

/* 审计日志提取、转储,采用copy性能更佳 */
\label insert_audit
Select ctl_val as lst_tm,to_char(current_timestamp-interval ‘${tm_inerval}’ minute,’YYYY-MM-DD HH24:MI:SS’ as new_tm from pg_audit.audit_dtl_ctl where ctl_typ=’last_status’;

/* 元命令,清理临时数据 */
\! rm -f /tmp/.tmp_audit_dtl

/* 输出审计日志 */
\o /tmp/.tmp_audit_dtl.sql
\qecho ${cp} (select time::date,time,type,result,username,database,client_conninfo,object_name,detail_info,node_name,thread_id,case when thread_id=${quote}null${quote} then null else split_part(thread_id, ${quote}@${quote},1) end as pid, case when thread_id=${quote}null${quote} then null else split_part(thread_id, ${quote}@${quote},2) end as query_id,local_port,remote_port from pgxc_query_audit(${quote}${lst_tm}${quote}::timestamp, ${quote}${new_tm}${quote}::timestamp) ) to ${quote}/tmp/.tmp_audit_dtl${quote} with (delimiter ${quote}<@|@>${quote},format ${quote}csv${quote},encoding ${quote}UTF8${quote});
\o
\i /tmp/.tmp_audit_dtl.sql
\if ${ERROR}
	\goto WITHERROR
\endif

\copy pg_audit.audit_dtl from ‘/tmp/.tmp_audti_dtl’ with (delimiter ‘<@|@>’,format ‘csv’,encoding ‘UTF8’);
\if ${ERROR}
	\goto WITHERROR
\endif

Update pg_audit.audit_dtl_ctl
Set ctl_val=’${new_tm}’
Where ctl_typ=’last_status’;
\if ${ERROR}
	\goto WITHERROR
\endif

/* 删除转储的原审计日志 */
Select public.pgxc_delete_audit(‘${lst_tm}’::timestamp,’${new_tm}’::timestamp);
\if ${ERROR}
	\goto WITHERROR
\endif

\goto FINISH

\label WITHERROR
\! Rm -f /tmp/.tmp_audit_dtl
\q 1

\label FINISH
\! Rm -f /tmp/.tmp_audit_dtl
\q 0

 

2.3 循环执行

添加到crontab中每隔2小时执行


 

(完)