Monday, April 4, 2011

Change Data Capture and Track changes based on User

    One of tasks that a DBA has to do in the most of the companies is to answer the common question of "WHO CHANGED a TRSANACTION???". SQL Server 2008 R2 has new feature named CHANGE DATA CAPTURE (CDC).

This feature is only for track of changes that happening to a table, but it is NOT reporting the user who made the changes. So, in our company we thought about a workaround that will use all the features of CDC and will give us the user who made the changes.

 To accomplish this goal we are adding two fields in each table or any table that you would like to enable CDC on them. Those fields are "LastUpdatedBy" and "LastUpdatedDate". LastUpdatedBy will be user name and the other will track the date and time when a row has been changed. It could be an Insert, Update, or Delete transaction.

    In the first step, we will enable the CDC feature by using "sys.sp_cdc_enable_db". In the next step, I am using a cursor to enable CDC on all my tables. You can enable all tables and disable any CDC that does not need them by sp_cdc_disable_table. While I am enabling CDC on each table, I will add default to those columns as well.

    After you enable CDC on any table, you can find them under system tables. The naming on each table is as follow: cdc.[Schema Name]_[Table Name]_CT. So If you have table that it is under [dbo] schema and table name is [EmployeeInformation] you will see something like "cdc.dbo_EmployeeInformation_CT" under system tables. CDC will create same table structure for each table and will add 5 columns in front of the other columns.

      


  
 

 

EXEC

sys.sp_cdc_enable_db

Go

 

Create Default LastUpdatedBy_Def As SUser_Name()

Create Default LastUpdatedDate_Def As GetDate()

 

Go

 

Declare @tblObjectId Int, @tblName Varchar(128), @SchemaId Int, @SchemaName Varchar(128)

Declare @strSQL Varchar(2000)

 

Declare mySysCursor Cursor For

Select

SysTbls.OBJECT_ID As tblObjectId


,(SysTbls.name) As tblName


,SysTbls.schema_id As SchemaId


,(SysSchem.name) As SchemaName

From


sys.tables As SysTbls


Inner Join Sys.schemas As SysSchem On SysTbls.schema_id = SysSchem.schema_id

Where

SysTbls.type = 'U'


And SysTbls.SCHEMA_ID <> 12


And SysTbls.name Not In ('sysdiagrams',
'systranschemas')

Order By

SysTbls.schema_id


,SysTbls.OBJECT_ID


 

Open mySysCursor

Fetch Next From mySysCursor

Into @tblObjectId, @tblName, @SchemaId, @SchemaName

 


 

While
@@FETCH_STATUS = 0

Begin


 

    Set @strSQL = 'Alter Table ' + @SchemaName + '.' + @tblName

    Set @strSQL = @strSQL + ' Add LastUpdatedBy Int Null'

    Exec (@strSQL)

 

    Set @strSQL = 'Alter Table ' + @SchemaName + '.' + @tblName

    Set @strSQL = @strSQL + ' Add LastUpdatedDate DateTime Null'

    Exec (@strSQL)

 

    Set @strSQL = 'Update ' + @SchemaName + '.' + @tblName

    Set @strSQL = @strSQL + ' Set '

    Set @strSQL = @strSQL +
' , LastUpdatedBy = (Case When IsNull(LastUpdatedBy, '-') = '-' Then '-' Else LastUpdatedBy End)'

    Set @strSQL = @strSQL +
' , LastUpdatedDate = (Case When IsNull(LastUpdatedDate, ''01/01/1900'') = ''01/01/1900'' Then ''01/01/1900'' Else LastUpdatedDate End)'

    Set @strSQL = @strSQL +
' Where LastUpdatedBy Is Null Or LastUpdatedDate Is Null '

    Exec (@strSQL)

 
 

    Set @strSQL = 'Alter Table ' + @SchemaName + '.' + @tblName

    Set @strSQL = @strSQL + ' Alter Column LastUpdatedBy Int Not Null'

    Exec (@strSQL)

 

    Set @strSQL = 'Alter Table ' + @SchemaName + '.' + @tblName

    Set @strSQL = @strSQL + ' Alter Column LastUpdatedDate DateTime Not Null'

    Exec (@strSQL)

 
 

    Set @strSQL = 'EXEC sp_bindefault ''LastUpdatedBy_Def'', ''' + @SchemaName + '.' + @tblName + '.LastUpdatedBy'''

    Exec (@strSQL)

 
 

    Set @strSQL = 'EXEC sp_bindefault ''LastUpdatedDate_Def'', ''' + @SchemaName + '.' + @tblName + '.LastUpdatedDate'''

    Exec (@strSQL)

 

    Set @strSQL = 'EXECUTE sys.sp_cdc_enable_table '

    Set @strSQL = @strSQL + ' @source_schema = N''' + @SchemaName + ''''

    Set @strSQL = @strSQL + ' ,@source_name = N'''
+ @tblName +
''''

    Set @strSQL = @strSQL +
' ,@role_name = N''cdc_Admin''';

    Exec (@strSQL)

 

    Fetch Next From mySysCursor

    Into @tblObjectId, @tblName, @SchemaId, @SchemaName


 

End

 


 

Close mySysCursor

DeAllocate mySysCursor

  

 
 

  
 

GO

    In the next step, I will create and Audit table that will hold all the CDC tables extracted information in a more user friendly way. I will extract any changes from CDC tables and store them in my Audit table. Advantage of storing CDC information in an Audit table is that, I can store this information in a different server or database and free my CDC space from current database.

 
 

 CREATE TABLE [dbo].[Audit](

    [AuditID] [int] IDENTITY(1,1) NOT NULL,

    [AuditDate] [datetime] NOT NULL,

    [AuditType] [char](1) NOT NULL,

    [SchemaName] [varchar](50) NOT NULL,

    [TableName] [varchar](128) NOT NULL,

    [PrimaryKeyFields] [varchar](512) NOT NULL,

    [PrimaryKeyValues] [varchar](max) NOT NULL,

    [FieldName] [varchar](128) NOT NULL,

    [OldValue] [varchar](max) NULL,

    [NewValue] [varchar](max) NULL,

    [UserName] [varchar](255) NOT NULL

CONSTRAINT [PK_Audit] PRIMARY KEY CLUSTERED ([AuditID] ASC ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,
ALLOW_PAGE_LOCKS = ON)ON [PRIMARY] )

ON [PRIMARY]

 
 

GO

    In the last step, I will collect all the information from CDC tables and populate those in my Audit table. CDC for each Insert and Delete is creating a single row, but for Update is creating two rows. The assign code for Delete is "1" and Insert is "2". In Update case as I mention there are two rows, one of them keeps Old Values (assign code is "3") and the other row is New Values with assign code of "4".

Every time this procedure is running it will catch all the changes that happened during the last 24 hours and store them in my Audit table. So, my job will not get millions of records each time is running and so not long time running job.

  
 

 
 

 

Declare @SchemaId Int, @SchemaName Varchar(128), @TableId Int, @TableName Varchar(128), @ColumnId Int, @ColumnName Varchar(128)

Declare @strSQL Varchar(4000), @CDCTableLoopNo SmallInt, @CountCDCTables SmallInt

Declare @CDCTableName Varchar(128), @CDCSchemaName Varchar(128), @CDCCaptureInstance Varchar(128)

Declare @PrimaryKeyFields Varchar(Max)

 
  

Set @CDCTableLoopNo = 0

 
  

Declare @CDCTables Table (CDCObjectId Int, TableObjectId Int, Capture_Instance Varchar(128), Role_Name Varchar(128), Index_Name Varchar(128), TableName Varchar(128), SchemaId Int, SchemaName Varchar(128), RN SmallInt)

Insert Into @CDCTables

Select

    CDC_CT.Object_Id As CDCObjectId

    ,CDC_CT.Source_Object_Id As TableObjectId

    ,QuoteName(CDC_CT.Capture_Instance +'_CT') As Capture_Instance

    ,CDC_CT.Role_Name

    ,CDC_CT.Index_Name

    ,QuoteName(SysTbl.name) As TableName

    ,SysSchem.schema_id

    ,QuoteName(SysSchem.name) As SchemaName

    ,ROW_NUMBER() Over (Order By CDC_CT.Object_Id) As RN

From

    CDC.change_tables CDC_CT

    Inner Join Sys.tables As SysTbl On CDC_CT.Source_Object_Id = SysTbl.object_id

    Inner Join Sys.schemas As SysSchem On SysTbl.schema_id = SysSchem.schema_id

    
  

Set @CountCDCTables = @@ROWCOUNT

 
  

Declare @IndexColumns Table (TableObjectId Int, TableName Varchar(128), SchemaId Int, SchemaName Varchar(128), IndexId Int, ColumnId Int, ColumnName Varchar(128))

Insert Into @IndexColumns

Select

    sysTbls.object_id As TableObjectId

    ,QuoteName(sysTbls.name) As TableName

    ,SysSchem.schema_id As SchemaId

    ,QuoteName(SysSchem.name) As SchemaName

    ,SysIndex.index_id

    ,SysIndexCols.column_id

    ,QuoteName(SysCols.name) As ColumnName

From

    sys.tables As sysTbls

    Inner Join sys.schemas As SysSchem On sysTbls.schema_id = SysSchem.schema_id

    Inner Join sys.indexes As SysIndex On sysTbls.object_id = SysIndex.object_id

    Inner Join sys.index_columns As SysIndexCols On SysIndex.object_id = SysIndexCols.object_id And SysIndex.index_id = SysIndexCols.index_id

    Inner Join sys.columns As SysCols On SysIndexCols.column_id = SysCols.column_id And SysIndexCols.object_id = SysCols.object_id

Where

    SysIndex.type = 1

    And SysSchem.schema_id <> 12

Order By

    sysTbls.object_id

    
  

 
  

While @CDCTableLoopNo < @CountCDCTables

Begin

 
  

    Select @CDCTableName = '', @CDCSchemaName = '', @CDCCaptureInstance = '', @PrimaryKeyFields = ''


    Select @CDCTableName = TableName, @CDCSchemaName = SchemaName, @CDCCaptureInstance = Capture_Instance From @CDCTables Where RN = @CDCTableLoopNo + 1

    
  

    Select

     @PrimaryKeyFields = STUFF((Select ',' + SubQry.ColumnName From @IndexColumns As SubQry Where SubQry.TableObjectId = MainQry.TableObjectId And SubQry.SchemaId = MainQry.SchemaId For XML Path('')), 1, 1, '')

    From

        @IndexColumns As MainQry

    Where

        MainQry.SchemaName = @CDCSchemaName

        And MainQry.TableName = @CDCTableName

    Group By

        MainQry.SchemaId

        ,MainQry.TableObjectId

        ,MainQry.TableName

        
  

    If @PrimaryKeyFields = ''

    Begin

        Set @PrimaryKeyFields = '''-'''

    End

    
  

    Declare mySysCursor Cursor For

    Select

        SysSchem.schema_id As SchemaId

        ,QuoteName(SysSchem.name) As SchemaName

        ,SysTbls.object_id As TableId

        ,QuoteName(SysTbls.name) As TableName

        ,SysCols.column_id As ColumnId

        ,QuoteName(SysCols.name) As ColumnName

    From

        sys.schemas As SysSchem

        Inner Join sys.tables As SysTbls On SysSchem.schema_id = SysTbls.schema_id

        Inner Join Sys.columns As SysCols On SysTbls.object_id = SysCols.object_id

    Where

        QuoteName(SysTbls.name) = @CDCTableName

        And QuoteName(SysSchem.name) = @CDCSchemaName

        And SysCols.name Not In ('LastUpdatedBy', 'LastUpdatedDate')

    
  

    Open mySysCursor;     

Fetch Next From mySysCursor

    Into @SchemaId, @SchemaName, @TableId, @TableName, @ColumnId, @ColumnName

 
  

    While @@FETCH_STATUS = 0

    Begin

        Set @strSQL = '

        Insert Into Audit (AuditDate, AuditType, SchemaName, TableName, PrimaryKeyFields, PrimaryKeyValues, FieldName, OldValue, NewValue, UserName)

        Select

            Convert(DateTime, TimeMapping.tran_end_time, 101) As AuditDate

            ,(Case MAX(CDC_Tbl.__$operation) When 1 Then ' +'''D''' + ' When 2 Then ' + '''I''' + ' When 3 Then ' + '''U''' + ' When 4 Then ' + '''U''' + ' End) As AuditType

            ,('''+ @SchemaName + ''') As SchemaName

            ,(''' + @TableName + ''') As TableName

        '

            If @PrimaryKeyFields = '''-'''

            Begin

                Set @strSQL = @strSQL + '    ,''-'' As PrimaryKeyFields'

                Set @strSQL = @strSQL + '    ,''-'' As PrimaryKeyValues'

            End

            Else

            Begin

                Set @strSQL = @strSQL + '    ,''' + @PrimaryKeyFields + ''' As PrimaryKeyFields'

                Set @strSQL = @strSQL + ',(Select Stuff((Select Distinct '','' + Convert(Varchar(Max), SubQry.' + @PrimaryKeyFields + ') From cdc.' + @CDCCaptureInstance + ' As SubQry Where SubQry.__$start_lsn = CDC_Tbl.__$start_lsn And SubQry.__$seqval = DC_Tbl.__$seqval For XML Path('''')), 1, 1, '''')) As PrimaryKeyValues '

            End

        Set @strSQL = @strSQL +

        '            

            ,('''+ @ColumnName + ''') As FieldName

            ,Max(Case CDC_Tbl.__$operation When 2 Then '''' When 3 Then Convert(Varchar(Max), '+ @ColumnName + ') When 4 Then '''' When 1 Then Convert(Varchar(Max), ' + @ColumnName + ') End) As OldValue

            ,Max(Case CDC_Tbl.__$operation When 2 Then Convert(Varchar(Max), '+ @ColumnName +') When 3 Then '''' When 4 Then Convert(Varchar(Max), '+ @ColumnName + ') When 1 Then '''' End) As NewValue             

            ,Max(Case CDC_Tbl.__$operation When 2 Then LastUpdatedBy When 3 Then ''-'' When 4 Then LastUpdatedBy When 1 Then LastUpdatedBy End) As UserName

        From

            cdc.'+ @CDCCaptureInstance + ' As CDC_Tbl

            Left Outer Join cdc.lsn_time_mapping As TimeMapping On CDC_Tbl.__$start_lsn = TimeMapping.start_lsn

        Where

            TimeMapping.tran_end_time Between DATEADD(Day, -1, GetDate()) And DATEADD(Day, 0, GetDate())

        Group By

            Convert(DateTime, TimeMapping.tran_end_time, 101)

            ,CDC_Tbl.__$start_lsn

            ,CDC_Tbl.__$seqval

            '

            If @PrimaryKeyFields <> '''-'''

            Begin

             Set @strSQL = @strSQL + ',' + @PrimaryKeyFields + ''

            End

            Set @strSQL = @strSQL +
'            

        Having

            Max(Case CDC_Tbl.__$operation When 2 Then '''' When 3 Then Convert(Varchar(Max), '+ @ColumnName + ') When 4 Then '''' When 1 Then Convert(Varchar(Max), '+ @ColumnName +') End)

            <> Max(Case CDC_Tbl.__$operation When 2 Then Convert(Varchar(Max), '+ @ColumnName +') When 3 Then '''' When 4 Then Convert(Varchar(Max), '+ @ColumnName +') When 1 Then '''' End)

        Order By

            __$start_lsn

        '

        Exec (@strSQL)

         

        Fetch Next From mySysCursor

        Into @SchemaId, @SchemaName, @TableId, @TableName, @ColumnId, @ColumnName     
 

    End    

 
  

    Close mySysCursor

    DeAllocate mySysCursor

    
  

    Set @CDCTableLoopNo = @CDCTableLoopNo + 1     

End


 
  

No comments:

Post a Comment