One of the tasks a DBA has to handle in most companies is answering the common question, "WHO CHANGED a TRANSACTION???". SQL Server 2008 R2 has a feature named Change Data Capture (CDC).
This feature only tracks the changes made to a table; it does NOT report the user who made them. So, in our company we came up with a workaround that uses all the features of CDC and also gives us the user who made the changes.
To accomplish this, we add two fields to every table on which we want to enable CDC: "LastUpdatedBy" and "LastUpdatedDate". LastUpdatedBy holds the user name, and LastUpdatedDate tracks the date and time when a row was changed. The change could be an Insert, Update, or Delete transaction.
In the first step, we enable the CDC feature at the database level with "sys.sp_cdc_enable_db". In the next step, I use a cursor to enable CDC on all my tables. You can enable every table and then turn CDC off for any table that does not need it with sys.sp_cdc_disable_table. While enabling CDC on each table, I also bind a default to each of the two new columns.
After you enable CDC on a table, you can find its change table under system tables. Each change table is named as follows: cdc.[Schema Name]_[Table Name]_CT. So if you have a table named [EmployeeInformation] under the [dbo] schema, you will see something like "cdc.dbo_EmployeeInformation_CT" under system tables. CDC creates the change table with the same column structure as the source table and adds 5 metadata columns in front of the other columns.
-- Enable CDC at the database level
EXEC sys.sp_cdc_enable_db
Go
-- Defaults that stamp a row with the current login and the current time.
-- Each Create Default has to be in its own batch.
Create Default LastUpdatedBy_Def As SUser_Name()
Go
Create Default LastUpdatedDate_Def As GetDate()
Go
Declare @tblObjectId Int, @tblName Varchar(128), @SchemaId Int, @SchemaName Varchar(128)
Declare @strSQL Varchar(2000)
Declare mySysCursor Cursor For
Select
    SysTbls.object_id As tblObjectId
    ,SysTbls.name As tblName
    ,SysTbls.schema_id As SchemaId
    ,SysSchem.name As SchemaName
From
    sys.tables As SysTbls
    Inner Join sys.schemas As SysSchem On SysTbls.schema_id = SysSchem.schema_id
Where
    SysTbls.type = 'U'
    -- schema_id 12 is excluded as in the original script (presumably the cdc schema in that database)
    And SysTbls.schema_id <> 12
    And SysTbls.name Not In ('sysdiagrams', 'systranschemas')
Order By
    SysTbls.schema_id
    ,SysTbls.object_id
Open mySysCursor
Fetch Next From mySysCursor
Into @tblObjectId, @tblName, @SchemaId, @SchemaName
While @@FETCH_STATUS = 0
Begin
    -- Add the two audit columns as nullable first.
    -- LastUpdatedBy is a Varchar because it holds SUser_Name() and the '-' placeholder.
    Set @strSQL = 'Alter Table ' + @SchemaName + '.' + @tblName
    Set @strSQL = @strSQL + ' Add LastUpdatedBy Varchar(128) Null'
    Exec (@strSQL)
    Set @strSQL = 'Alter Table ' + @SchemaName + '.' + @tblName
    Set @strSQL = @strSQL + ' Add LastUpdatedDate DateTime Null'
    Exec (@strSQL)
    -- Back-fill existing rows with placeholder values
    Set @strSQL = 'Update ' + @SchemaName + '.' + @tblName
    Set @strSQL = @strSQL + ' Set '
    Set @strSQL = @strSQL + ' LastUpdatedBy = (Case When IsNull(LastUpdatedBy, ''-'') = ''-'' Then ''-'' Else LastUpdatedBy End)'
    Set @strSQL = @strSQL + ' , LastUpdatedDate = (Case When IsNull(LastUpdatedDate, ''01/01/1900'') = ''01/01/1900'' Then ''01/01/1900'' Else LastUpdatedDate End)'
    Set @strSQL = @strSQL + ' Where LastUpdatedBy Is Null Or LastUpdatedDate Is Null '
    Exec (@strSQL)
    -- Now that every row has a value, make both columns Not Null
    Set @strSQL = 'Alter Table ' + @SchemaName + '.' + @tblName
    Set @strSQL = @strSQL + ' Alter Column LastUpdatedBy Varchar(128) Not Null'
    Exec (@strSQL)
    Set @strSQL = 'Alter Table ' + @SchemaName + '.' + @tblName
    Set @strSQL = @strSQL + ' Alter Column LastUpdatedDate DateTime Not Null'
    Exec (@strSQL)
    -- Bind the defaults to the new columns
    Set @strSQL = 'EXEC sp_bindefault ''LastUpdatedBy_Def'', ''' + @SchemaName + '.' + @tblName + '.LastUpdatedBy'''
    Exec (@strSQL)
    Set @strSQL = 'EXEC sp_bindefault ''LastUpdatedDate_Def'', ''' + @SchemaName + '.' + @tblName + '.LastUpdatedDate'''
    Exec (@strSQL)
    -- Enable CDC on the table
    Set @strSQL = 'EXECUTE sys.sp_cdc_enable_table '
    Set @strSQL = @strSQL + ' @source_schema = N''' + @SchemaName + ''''
    Set @strSQL = @strSQL + ' ,@source_name = N''' + @tblName + ''''
    Set @strSQL = @strSQL + ' ,@role_name = N''cdc_Admin'''
    Exec (@strSQL)
    Fetch Next From mySysCursor
    Into @tblObjectId, @tblName, @SchemaId, @SchemaName
End
Close mySysCursor
DeAllocate mySysCursor
GO
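To confirm which tables the cursor actually enabled, and to switch CDC back off for a table that does not need it, something like the following sketch can be used. It assumes the example table dbo.EmployeeInformation mentioned above and the default capture instance name (schema name and table name joined by an underscore); adjust both for your own tables.

```sql
-- List the tables that are currently tracked by CDC
Select
    SysSchem.name As SchemaName
    ,SysTbls.name As TableName
From
    sys.tables As SysTbls
    Inner Join sys.schemas As SysSchem On SysTbls.schema_id = SysSchem.schema_id
Where
    SysTbls.is_tracked_by_cdc = 1
Order By
    SysSchem.name
    ,SysTbls.name

-- Turn CDC off for one table.
-- Assumption: the table is dbo.EmployeeInformation and it uses the default
-- capture instance name dbo_EmployeeInformation.
EXEC sys.sp_cdc_disable_table
    @source_schema = N'dbo'
    ,@source_name = N'EmployeeInformation'
    ,@capture_instance = N'dbo_EmployeeInformation'
```

Disabling a table this way drops its change table, so it is worth doing only after any changes you still need have been copied elsewhere.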
In the next step, I create an Audit table that holds the information extracted from all the CDC tables in a more user-friendly form. I extract any changes from the CDC tables and store them in my Audit table. The advantage of storing CDC information in an Audit table is that I can keep it on a different server or in a different database and free up the space CDC uses in the current database.
CREATE TABLE [dbo].[Audit](
    [AuditID] [int] IDENTITY(1,1) NOT NULL,
    [AuditDate] [datetime] NOT NULL,
    [AuditType] [char](1) NOT NULL,
    [SchemaName] [varchar](50) NOT NULL,
    [TableName] [varchar](128) NOT NULL,
    [PrimaryKeyFields] [varchar](512) NOT NULL,
    [PrimaryKeyValues] [varchar](max) NOT NULL,
    [FieldName] [varchar](128) NOT NULL,
    [OldValue] [varchar](max) NULL,
    [NewValue] [varchar](max) NULL,
    [UserName] [varchar](255) NOT NULL,
    CONSTRAINT [PK_Audit] PRIMARY KEY CLUSTERED ([AuditID] ASC) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
In the last step, I collect all the information from the CDC tables and load it into my Audit table. CDC creates a single row for each Insert and Delete, but two rows for each Update. The operation code for Delete is "1" and for Insert is "2". For an Update, as mentioned, there are two rows: one holds the old values (operation code "3") and the other holds the new values (operation code "4").
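As a quick illustration of those operation codes, the sketch below reads one change table directly and decodes __$operation into a readable label. It assumes the example change table cdc.dbo_EmployeeInformation_CT and the two audit columns added earlier; the label text is only illustrative.

```sql
-- Decode the CDC operation code for each captured row.
-- Assumption: cdc.dbo_EmployeeInformation_CT is the example change table,
-- and the source table already has LastUpdatedBy and LastUpdatedDate.
Select
    CDC_Tbl.__$start_lsn
    ,CDC_Tbl.__$seqval
    ,Case CDC_Tbl.__$operation
        When 1 Then 'Delete'
        When 2 Then 'Insert'
        When 3 Then 'Update (old values)'
        When 4 Then 'Update (new values)'
     End As OperationName
    ,CDC_Tbl.LastUpdatedBy
    ,CDC_Tbl.LastUpdatedDate
From
    cdc.dbo_EmployeeInformation_CT As CDC_Tbl
Order By
    CDC_Tbl.__$start_lsn
    ,CDC_Tbl.__$seqval
    ,CDC_Tbl.__$operation
```

The two rows of a single Update share the same __$start_lsn and __$seqval, which is what lets them be paired up as old and new values.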
Every time this procedure runs, it picks up all the changes made during the last 24 hours and stores them in my Audit table. That way the job does not have to process millions of records on each run, so it does not become a long-running job.
Declare @SchemaId Int, @SchemaName Varchar(128), @TableId Int, @TableName Varchar(128), @ColumnId Int, @ColumnName Varchar(128)
Declare @strSQL Varchar(4000), @CDCTableLoopNo SmallInt, @CountCDCTables SmallInt
Declare @CDCTableName Varchar(128), @CDCSchemaName Varchar(128), @CDCCaptureInstance Varchar(128)
Declare @PrimaryKeyFields Varchar(Max)

Set @CDCTableLoopNo = 0

Declare @CDCTables Table (CDCObjectId Int, TableObjectId Int, Capture_Instance Varchar(128), Role_Name Varchar(128), Index_Name Varchar(128), TableName Varchar(128), SchemaId Int, SchemaName Varchar(128), RN SmallInt)
Insert Into @CDCTables
Select
    CDC_CT.Object_Id As CDCObjectId
    ,CDC_CT.Source_Object_Id As TableObjectId
    ,QuoteName(CDC_CT.Capture_Instance + '_CT') As Capture_Instance
    ,CDC_CT.Role_Name
    ,CDC_CT.Index_Name
    ,QuoteName(SysTbl.name) As TableName
    ,SysSchem.schema_id
    ,QuoteName(SysSchem.name) As SchemaName
    ,ROW_NUMBER() Over (Order By CDC_CT.Object_Id) As RN
From
    CDC.change_tables CDC_CT
    Inner Join Sys.tables As SysTbl On CDC_CT.Source_Object_Id = SysTbl.object_id
    Inner Join Sys.schemas As SysSchem On SysTbl.schema_id = SysSchem.schema_id

Set @CountCDCTables = @@ROWCOUNT

Declare @IndexColumns Table (TableObjectId Int, TableName Varchar(128), SchemaId Int, SchemaName Varchar(128), IndexId Int, ColumnId Int, ColumnName Varchar(128))
Insert Into @IndexColumns
Select
    sysTbls.object_id As TableObjectId
    ,QuoteName(sysTbls.name) As TableName
    ,SysSchem.schema_id As SchemaId
    ,QuoteName(SysSchem.name) As SchemaName
    ,SysIndex.index_id
    ,SysIndexCols.column_id
    ,QuoteName(SysCols.name) As ColumnName
From
    sys.tables As sysTbls
    Inner Join sys.schemas As SysSchem On sysTbls.schema_id = SysSchem.schema_id
    Inner Join sys.indexes As SysIndex On sysTbls.object_id = SysIndex.object_id
    Inner Join sys.index_columns As SysIndexCols On SysIndex.object_id = SysIndexCols.object_id And SysIndex.index_id = SysIndexCols.index_id
    Inner Join sys.columns As SysCols On SysIndexCols.column_id = SysCols.column_id And SysIndexCols.object_id = SysCols.object_id
Where
    SysIndex.type = 1
    And SysSchem.schema_id <> 12
Order By
    sysTbls.object_id

While @CDCTableLoopNo < @CountCDCTables
Begin

    Select @CDCTableName = '', @CDCSchemaName = '', @CDCCaptureInstance = '', @PrimaryKeyFields = ''

    -- Pick up the next CDC table (RN starts at 1, the loop counter starts at 0)
    Select
        @CDCTableName = TableName
        ,@CDCSchemaName = SchemaName
        ,@CDCCaptureInstance = Capture_Instance
    From
        @CDCTables
    Where
        RN = @CDCTableLoopNo + 1

    -- Build a comma-separated list of the clustered index columns for this table
    Select
        @PrimaryKeyFields = STUFF((Select ',' + SubQry.ColumnName From @IndexColumns As SubQry Where SubQry.TableObjectId = MainQry.TableObjectId And SubQry.SchemaId = MainQry.SchemaId For XML Path('')), 1, 1, '')
    From
        @IndexColumns As MainQry
    Where
        MainQry.SchemaName = @CDCSchemaName
        And MainQry.TableName = @CDCTableName
    Group By
        MainQry.SchemaId
        ,MainQry.TableObjectId
        ,MainQry.TableName

    If @PrimaryKeyFields = ''
    Begin
        Set @PrimaryKeyFields = '''-'''
    End

    Declare mySysCursor Cursor For
    Select
        SysSchem.schema_id As SchemaId
        ,QuoteName(SysSchem.name) As SchemaName
        ,SysTbls.object_id As TableId
        ,QuoteName(SysTbls.name) As TableName
        ,SysCols.column_id As ColumnId
        ,QuoteName(SysCols.name) As ColumnName
    From
        sys.schemas As SysSchem
        Inner Join sys.tables As SysTbls On SysSchem.schema_id = SysTbls.schema_id
        Inner Join Sys.columns As SysCols On SysTbls.object_id = SysCols.object_id
    Where
        QuoteName(SysTbls.name) = @CDCTableName
        And QuoteName(SysSchem.name) = @CDCSchemaName
        And SysCols.name Not In ('LastUpdatedBy', 'LastUpdatedDate')

    Open mySysCursor;
    Fetch Next From mySysCursor
    Into @SchemaId, @SchemaName, @TableId, @TableName, @ColumnId, @ColumnName

    While @@FETCH_STATUS = 0
    Begin
        Set @strSQL = '
            Insert Into Audit (AuditDate, AuditType, SchemaName, TableName, PrimaryKeyFields, PrimaryKeyValues, FieldName, OldValue, NewValue, UserName)
            Select
                Convert(DateTime, TimeMapping.tran_end_time, 101) As AuditDate
                ,(Case MAX(CDC_Tbl.__$operation) When 1 Then ' + '''D''' + ' When 2 Then ' + '''I''' + ' When 3 Then ' + '''U''' + ' When 4 Then ' + '''U''' + ' End) As AuditType
                ,(''' + @SchemaName + ''') As SchemaName
                ,(''' + @TableName + ''') As TableName
            '
        If @PrimaryKeyFields = '''-'''
        Begin
            Set @strSQL = @strSQL + ' ,''-'' As PrimaryKeyFields'
            Set @strSQL = @strSQL + ' ,''-'' As PrimaryKeyValues'
        End
        Else
        Begin
            -- Note: the Convert below works as written only for a single-column key
            Set @strSQL = @strSQL + ' ,''' + @PrimaryKeyFields + ''' As PrimaryKeyFields'
            Set @strSQL = @strSQL + ',(Select Stuff((Select Distinct '','' + Convert(Varchar(Max), SubQry.' + @PrimaryKeyFields + ') From cdc.' + @CDCCaptureInstance + ' As SubQry Where SubQry.__$start_lsn = CDC_Tbl.__$start_lsn And SubQry.__$seqval = CDC_Tbl.__$seqval For XML Path('''')), 1, 1, '''')) As PrimaryKeyValues '
        End
        Set @strSQL = @strSQL +
            '
                ,(''' + @ColumnName + ''') As FieldName
                ,Max(Case CDC_Tbl.__$operation When 2 Then '''' When 3 Then Convert(Varchar(Max), ' + @ColumnName + ') When 4 Then '''' When 1 Then Convert(Varchar(Max), ' + @ColumnName + ') End) As OldValue
                ,Max(Case CDC_Tbl.__$operation When 2 Then Convert(Varchar(Max), ' + @ColumnName + ') When 3 Then '''' When 4 Then Convert(Varchar(Max), ' + @ColumnName + ') When 1 Then '''' End) As NewValue
                ,Max(Case CDC_Tbl.__$operation When 2 Then LastUpdatedBy When 3 Then ''-'' When 4 Then LastUpdatedBy When 1 Then LastUpdatedBy End) As UserName
            From
                cdc.' + @CDCCaptureInstance + ' As CDC_Tbl
                Left Outer Join cdc.lsn_time_mapping As TimeMapping On CDC_Tbl.__$start_lsn = TimeMapping.start_lsn
            Where
                TimeMapping.tran_end_time Between DATEADD(Day, -1, GetDate()) And DATEADD(Day, 0, GetDate())
            Group By
                Convert(DateTime, TimeMapping.tran_end_time, 101)
                ,CDC_Tbl.__$start_lsn
                ,CDC_Tbl.__$seqval
            '
        If @PrimaryKeyFields <> '''-'''
        Begin
            Set @strSQL = @strSQL + ',' + @PrimaryKeyFields + ''
        End
        -- Keep only the columns whose old and new values differ
        Set @strSQL = @strSQL + '
            Having
                Max(Case CDC_Tbl.__$operation When 2 Then '''' When 3 Then Convert(Varchar(Max), ' + @ColumnName + ') When 4 Then '''' When 1 Then Convert(Varchar(Max), ' + @ColumnName + ') End)
                <> Max(Case CDC_Tbl.__$operation When 2 Then Convert(Varchar(Max), ' + @ColumnName + ') When 3 Then '''' When 4 Then Convert(Varchar(Max), ' + @ColumnName + ') When 1 Then '''' End)
            Order By
                __$start_lsn
            '
        Exec (@strSQL)

        Fetch Next From mySysCursor
        Into @SchemaId, @SchemaName, @TableId, @TableName, @ColumnId, @ColumnName
    End

    Close mySysCursor
    DeAllocate mySysCursor

    Set @CDCTableLoopNo = @CDCTableLoopNo + 1
End

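Once the Audit table has been loaded, the original "who changed it?" question becomes a plain query. The sketch below is only an example: the key value '42' is hypothetical, and the bracketed schema and table names reflect the fact that the script above stores them through QuoteName.

```sql
-- Who changed a given row, and what did they change?
-- Assumptions: the example table dbo.EmployeeInformation, and a
-- hypothetical single-column key value of 42.
Select
    AuditDate
    ,AuditType      -- I = Insert, U = Update, D = Delete
    ,FieldName
    ,OldValue
    ,NewValue
    ,UserName
From
    dbo.Audit
Where
    SchemaName = '[dbo]'
    And TableName = '[EmployeeInformation]'
    And PrimaryKeyValues = '42'
Order By
    AuditDate
    ,AuditID
```

Because each changed column is stored as its own row, a single Update that touches three columns shows up here as three rows with the same AuditDate and UserName.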