我们要实现的目标是:MySQL 数据表发生变化时,实时触发 SQL Server 中对应表的增删改操作。首先,在原来的 MySQL 表上增加触发器,包括以下三个:
插入触发器:
delimiter ||
-- Change-capture trigger: copies every row inserted into
-- sugarcrm642ce.accounts into sugarcrm642cebackup.accountsbackup and tags it
-- with BehaviorType 'I' (insert). ExecutingState and ModificationTime are not
-- set here, so they take the column defaults of the backup table.
-- The trigger name is schema-qualified because MySQL requires a trigger to
-- live in the same schema as its table, whatever the session's default schema.
DROP TRIGGER IF EXISTS sugarcrm642ce.t_afterinsert_on_accounts ||
CREATE TRIGGER sugarcrm642ce.t_afterinsert_on_accounts
AFTER INSERT ON sugarcrm642ce.accounts
FOR EACH ROW
BEGIN
    -- INSERT ... SET keeps each target column next to its source value, so a
    -- column/value mismatch (e.g. shipping city fed from billing city) is
    -- visible at a glance.
    INSERT INTO sugarcrm642cebackup.accountsbackup
    SET `id`                          = NEW.id,
        `name`                        = NEW.name,
        `date_entered`                = NEW.date_entered,
        `date_modified`               = NEW.date_modified,
        `modified_user_id`            = NEW.modified_user_id,
        `created_by`                  = NEW.created_by,
        `description`                 = NEW.description,
        `deleted`                     = NEW.deleted,
        `assigned_user_id`            = NEW.assigned_user_id,
        `account_type`                = NEW.account_type,
        `industry`                    = NEW.industry,
        `annual_revenue`              = NEW.annual_revenue,
        `phone_fax`                   = NEW.phone_fax,
        `billing_address_street`      = NEW.billing_address_street,
        `billing_address_city`        = NEW.billing_address_city,
        `billing_address_state`       = NEW.billing_address_state,
        `billing_address_postalcode`  = NEW.billing_address_postalcode,
        `billing_address_country`     = NEW.billing_address_country,
        `rating`                      = NEW.rating,
        `phone_office`                = NEW.phone_office,
        `phone_alternate`             = NEW.phone_alternate,
        `website`                     = NEW.website,
        `ownership`                   = NEW.ownership,
        `employees`                   = NEW.employees,
        `ticker_symbol`               = NEW.ticker_symbol,
        `shipping_address_street`     = NEW.shipping_address_street,
        `shipping_address_city`       = NEW.shipping_address_city,
        `shipping_address_state`      = NEW.shipping_address_state,
        `shipping_address_postalcode` = NEW.shipping_address_postalcode,
        `shipping_address_country`    = NEW.shipping_address_country,
        `parent_id`                   = NEW.parent_id,
        `sic_code`                    = NEW.sic_code,
        `campaign_id`                 = NEW.campaign_id,
        `BehaviorType`                = 'I';
END||
-- Restore the default statement delimiter for whatever runs next.
delimiter ;
更新触发器
delimiter ||
-- Change-capture trigger: after every update of a row in
-- sugarcrm642ce.accounts, logs the row's new values into
-- sugarcrm642cebackup.accountsbackup tagged with BehaviorType 'U' (update).
-- ExecutingState and ModificationTime are not set here, so they take the
-- column defaults of the backup table.
-- The trigger name is schema-qualified because MySQL requires a trigger to
-- live in the same schema as its table, whatever the session's default schema.
DROP TRIGGER IF EXISTS sugarcrm642ce.t_afterupdate_on_accounts ||
CREATE TRIGGER sugarcrm642ce.t_afterupdate_on_accounts
AFTER UPDATE ON sugarcrm642ce.accounts
FOR EACH ROW
BEGIN
    -- INSERT ... SET keeps each target column next to its source value, so a
    -- column/value mismatch (e.g. shipping city fed from billing city) is
    -- visible at a glance.
    INSERT INTO sugarcrm642cebackup.accountsbackup
    SET `id`                          = NEW.id,
        `name`                        = NEW.name,
        `date_entered`                = NEW.date_entered,
        `date_modified`               = NEW.date_modified,
        `modified_user_id`            = NEW.modified_user_id,
        `created_by`                  = NEW.created_by,
        `description`                 = NEW.description,
        `deleted`                     = NEW.deleted,
        `assigned_user_id`            = NEW.assigned_user_id,
        `account_type`                = NEW.account_type,
        `industry`                    = NEW.industry,
        `annual_revenue`              = NEW.annual_revenue,
        `phone_fax`                   = NEW.phone_fax,
        `billing_address_street`      = NEW.billing_address_street,
        `billing_address_city`        = NEW.billing_address_city,
        `billing_address_state`       = NEW.billing_address_state,
        `billing_address_postalcode`  = NEW.billing_address_postalcode,
        `billing_address_country`     = NEW.billing_address_country,
        `rating`                      = NEW.rating,
        `phone_office`                = NEW.phone_office,
        `phone_alternate`             = NEW.phone_alternate,
        `website`                     = NEW.website,
        `ownership`                   = NEW.ownership,
        `employees`                   = NEW.employees,
        `ticker_symbol`               = NEW.ticker_symbol,
        `shipping_address_street`     = NEW.shipping_address_street,
        `shipping_address_city`       = NEW.shipping_address_city,
        `shipping_address_state`      = NEW.shipping_address_state,
        `shipping_address_postalcode` = NEW.shipping_address_postalcode,
        `shipping_address_country`    = NEW.shipping_address_country,
        `parent_id`                   = NEW.parent_id,
        `sic_code`                    = NEW.sic_code,
        `campaign_id`                 = NEW.campaign_id,
        `BehaviorType`                = 'U';
END||
-- Restore the default statement delimiter for whatever runs next.
delimiter ;
删除触发器
delimiter ||
-- Change-capture trigger: after every delete of a row in
-- sugarcrm642ce.accounts, logs the removed row's last values (OLD.*) into
-- sugarcrm642cebackup.accountsbackup tagged with BehaviorType 'D' (delete).
-- ExecutingState and ModificationTime are not set here, so they take the
-- column defaults of the backup table.
-- The trigger name is schema-qualified because MySQL requires a trigger to
-- live in the same schema as its table, whatever the session's default schema.
DROP TRIGGER IF EXISTS sugarcrm642ce.t_afterdelete_on_accounts ||
CREATE TRIGGER sugarcrm642ce.t_afterdelete_on_accounts
AFTER DELETE ON sugarcrm642ce.accounts
FOR EACH ROW
BEGIN
    -- INSERT ... SET keeps each target column next to its source value, so a
    -- column/value mismatch (e.g. shipping city fed from billing city) is
    -- visible at a glance.
    INSERT INTO sugarcrm642cebackup.accountsbackup
    SET `id`                          = OLD.id,
        `name`                        = OLD.name,
        `date_entered`                = OLD.date_entered,
        `date_modified`               = OLD.date_modified,
        `modified_user_id`            = OLD.modified_user_id,
        `created_by`                  = OLD.created_by,
        `description`                 = OLD.description,
        `deleted`                     = OLD.deleted,
        `assigned_user_id`            = OLD.assigned_user_id,
        `account_type`                = OLD.account_type,
        `industry`                    = OLD.industry,
        `annual_revenue`              = OLD.annual_revenue,
        `phone_fax`                   = OLD.phone_fax,
        `billing_address_street`      = OLD.billing_address_street,
        `billing_address_city`        = OLD.billing_address_city,
        `billing_address_state`       = OLD.billing_address_state,
        `billing_address_postalcode`  = OLD.billing_address_postalcode,
        `billing_address_country`     = OLD.billing_address_country,
        `rating`                      = OLD.rating,
        `phone_office`                = OLD.phone_office,
        `phone_alternate`             = OLD.phone_alternate,
        `website`                     = OLD.website,
        `ownership`                   = OLD.ownership,
        `employees`                   = OLD.employees,
        `ticker_symbol`               = OLD.ticker_symbol,
        `shipping_address_street`     = OLD.shipping_address_street,
        `shipping_address_city`       = OLD.shipping_address_city,
        `shipping_address_state`      = OLD.shipping_address_state,
        `shipping_address_postalcode` = OLD.shipping_address_postalcode,
        `shipping_address_country`    = OLD.shipping_address_country,
        `parent_id`                   = OLD.parent_id,
        `sic_code`                    = OLD.sic_code,
        `campaign_id`                 = OLD.campaign_id,
        `BehaviorType`                = 'D';
END||
-- Restore the default statement delimiter for whatever runs next.
delimiter ;
上面的触发器必须写入 BehaviorType 字段,用来区分操作类型:I 代表插入,U 代表更新,D 代表删除。接着在 MySQL 中建立备份表,专门记录这三种操作。
-- Change log fed by the insert/update/delete triggers on the accounts table.
-- One row is written per change, so the same account `id` legitimately
-- appears many times: `id` must NOT be a primary or unique key (a unique key
-- would make the second logged change for an account fail with a duplicate
-- key error). A surrogate auto-increment key identifies each log row instead,
-- and `id` gets a plain, non-unique index for lookups.
CREATE TABLE `accountsbackup` (
`id` char(36) NOT NULL,
`name` varchar(150) DEFAULT NULL,
`date_entered` datetime DEFAULT NULL,
`date_modified` datetime DEFAULT NULL,
`modified_user_id` char(36) DEFAULT NULL,
`created_by` char(36) DEFAULT NULL,
`description` text,
`deleted` tinyint(1) DEFAULT NULL,
`assigned_user_id` char(36) DEFAULT NULL,
`account_type` varchar(50) DEFAULT NULL,
`industry` varchar(50) DEFAULT NULL,
`annual_revenue` varchar(100) DEFAULT NULL,
`phone_fax` varchar(100) DEFAULT NULL,
`billing_address_street` varchar(150) DEFAULT NULL,
`billing_address_city` varchar(100) DEFAULT NULL,
`billing_address_state` varchar(100) DEFAULT NULL,
`billing_address_postalcode` varchar(20) DEFAULT NULL,
`billing_address_country` varchar(255) DEFAULT NULL,
`rating` varchar(100) DEFAULT NULL,
`phone_office` varchar(100) DEFAULT NULL,
`phone_alternate` varchar(100) DEFAULT NULL,
`website` varchar(255) DEFAULT NULL,
`ownership` varchar(100) DEFAULT NULL,
`employees` varchar(10) DEFAULT NULL,
`ticker_symbol` varchar(10) DEFAULT NULL,
`shipping_address_street` varchar(150) DEFAULT NULL,
`shipping_address_city` varchar(100) DEFAULT NULL,
`shipping_address_state` varchar(100) DEFAULT NULL,
`shipping_address_postalcode` varchar(20) DEFAULT NULL,
`shipping_address_country` varchar(255) DEFAULT NULL,
`parent_id` char(36) DEFAULT NULL,
`sic_code` varchar(10) DEFAULT NULL,
`campaign_id` char(36) DEFAULT NULL,
-- Kind of change that produced this row: 'I' insert, 'U' update, 'D' delete.
`BehaviorType` varchar(45) DEFAULT NULL,
-- NULL until the consumer of this log has processed the row.
`ExecutingState` varchar(45) DEFAULT NULL,
-- Set by the consumer when it marks the row as processed.
`ModificationTime` varchar(45) DEFAULT NULL,
-- Surrogate key, appended last so existing column positions do not shift;
-- it also records the order in which changes were logged.
`backup_seq` bigint unsigned NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`backup_seq`),
KEY `accountsbackup_id_idx` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
注意:该备份表的 id 不要设置为主键或唯一索引,因为同一条记录的多次变更会产生多行 id 相同的备份记录。接下来在 SQL Server 中建立对应的表。
USE [SugarCRMDB]
GO
/****** Object: Table [dbo].[account] Script Date: 2015/6/24 13:49:20 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
SET ANSI_PADDING ON
GO
-- SQL Server copy of the accounts data. The column names, order, lengths and
-- nullability follow the MySQL backup table, minus its bookkeeping columns
-- (BehaviorType, ExecutingState, ModificationTime).
CREATE TABLE [dbo].[account] (
    [id]                          char(36)     NOT NULL,
    [name]                        varchar(150) NULL,
    [date_entered]                datetime     NULL,
    [date_modified]               datetime     NULL,
    [modified_user_id]            char(36)     NULL,
    [created_by]                  char(36)     NULL,
    [description]                 text         NULL,
    [deleted]                     smallint     NULL,
    [assigned_user_id]            char(36)     NULL,
    [account_type]                varchar(50)  NULL,
    [industry]                    varchar(50)  NULL,
    [annual_revenue]              varchar(100) NULL,
    [phone_fax]                   varchar(100) NULL,
    [billing_address_street]      varchar(150) NULL,
    [billing_address_city]        varchar(100) NULL,
    [billing_address_state]       varchar(100) NULL,
    [billing_address_postalcode]  varchar(20)  NULL,
    [billing_address_country]     varchar(255) NULL,
    [rating]                      varchar(100) NULL,
    [phone_office]                varchar(100) NULL,
    [phone_alternate]             varchar(100) NULL,
    [website]                     varchar(255) NULL,
    [ownership]                   varchar(100) NULL,
    [employees]                   varchar(10)  NULL,
    [ticker_symbol]               varchar(10)  NULL,
    [shipping_address_street]     varchar(150) NULL,
    [shipping_address_city]       varchar(100) NULL,
    [shipping_address_state]      varchar(100) NULL,
    [shipping_address_postalcode] varchar(20)  NULL,
    [shipping_address_country]    varchar(255) NULL,
    [parent_id]                   char(36)     NULL,
    [sic_code]                    varchar(10)  NULL,
    [campaign_id]                 char(36)     NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO
SET ANSI_PADDING OFF
GO
最后建立 SQL Server 存储过程。这里尤其要注意:对 MySQL 的 char 类型字段要使用 rtrim,因为 SQL Server 的 char 类型是定长填充的,如果带有多余空格,插入时会报错;同时要给 rtrim 之后的字段定义别名,因为下面的语句是根据别名(字段名)来引用这些列的。
USE [SugarCRMDB]
GO
/****** Object: StoredProcedure [dbo].[trigger_account] Script Date: 2015/6/24 14:38:38 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- Applies the pending rows of the MySQL change log
-- sugarcrm642cebackup.accountsbackup (read through linked server MySql) to
-- the local table account, then marks those log rows as processed.
--
-- Pending rows are those whose ExecutingState is NULL. They are applied by
-- BehaviorType in the order I (insert), U (update), D (delete), so that an
-- account created, changed and removed between two runs ends up removed.
-- Processed rows get ExecutingState = 'D' (presumably "done" -- confirm) and
-- ModificationTime = the current SQL Server time.
--
-- Every rtrim()'d char column is aliased back to its own name inside the
-- pass-through queries: without the alias the column arrives named
-- "rtrim(id)", and a later reference to id would silently bind to account.id.
--
-- NOTE(review): each step reads its pending rows and marks them with two
-- separate remote statements using the same filter. A log row written by
-- MySQL between the two is marked as processed without being applied.
-- Marking by key would close that gap.
ALTER PROCEDURE [dbo].[trigger_account]
AS
BEGIN
    SET NOCOUNT ON;

    ---------------------------------------------------------------------
    -- Step 1: rows logged with BehaviorType 'I' are inserted into account.
    ---------------------------------------------------------------------
    INSERT INTO account (
        id, name, date_entered, date_modified, modified_user_id, created_by,
        [description], deleted, assigned_user_id, account_type, industry,
        annual_revenue, phone_fax, billing_address_street, billing_address_city,
        billing_address_state, billing_address_postalcode, billing_address_country,
        rating, phone_office, phone_alternate, website, [ownership], employees,
        ticker_symbol, shipping_address_street, shipping_address_city,
        shipping_address_state, shipping_address_postalcode,
        shipping_address_country, parent_id, sic_code, campaign_id)
    SELECT
        src.id, src.name, src.date_entered, src.date_modified, src.modified_user_id, src.created_by,
        src.[description], src.deleted, src.assigned_user_id, src.account_type, src.industry,
        src.annual_revenue, src.phone_fax, src.billing_address_street, src.billing_address_city,
        src.billing_address_state, src.billing_address_postalcode, src.billing_address_country,
        src.rating, src.phone_office, src.phone_alternate, src.website, src.[ownership], src.employees,
        src.ticker_symbol, src.shipping_address_street, src.shipping_address_city,
        src.shipping_address_state, src.shipping_address_postalcode,
        src.shipping_address_country, src.parent_id, src.sic_code, src.campaign_id
    FROM OPENQUERY(MySql, 'select rtrim(id) id,name,date_entered,date_modified,rtrim(modified_user_id) modified_user_id,rtrim(created_by) created_by,description,deleted,rtrim(assigned_user_id) assigned_user_id,account_type,industry,annual_revenue,phone_fax,billing_address_street,billing_address_city,billing_address_state,billing_address_postalcode,billing_address_country,rating,phone_office,phone_alternate,website,ownership,employees,ticker_symbol,shipping_address_street,shipping_address_city,shipping_address_state,shipping_address_postalcode,shipping_address_country,rtrim(parent_id) parent_id,sic_code,rtrim(campaign_id) campaign_id from sugarcrm642cebackup.accountsbackup where ISNULL(ExecutingState) and BehaviorType=''I''') AS src;

    -- Mark the 'I' rows as processed.
    UPDATE OPENQUERY(MySql, 'select * from sugarcrm642cebackup.accountsbackup where ISNULL(ExecutingState) and BehaviorType=''I''')
    SET ExecutingState = 'D', ModificationTime = GETDATE();

    ---------------------------------------------------------------------
    -- Step 2: rows logged with BehaviorType 'U' update the matching account
    -- row, or are inserted when no account row with that id exists yet.
    ---------------------------------------------------------------------
    IF OBJECT_ID('tempdb..#temp1') IS NOT NULL
    BEGIN
        DROP TABLE #temp1;
    END

    SELECT *
    INTO #temp1
    FROM OPENQUERY(MySql, 'select rtrim(id) id, name,date_entered,date_modified,rtrim(modified_user_id) modified_user_id,rtrim(created_by) created_by,description,deleted,rtrim(assigned_user_id) assigned_user_id,account_type,industry,annual_revenue,phone_fax,billing_address_street,billing_address_city,billing_address_state,billing_address_postalcode,billing_address_country,rating,phone_office,phone_alternate,website,ownership,employees,ticker_symbol,shipping_address_street,shipping_address_city,shipping_address_state,shipping_address_postalcode,shipping_address_country,rtrim(parent_id) parent_id,sic_code,rtrim(campaign_id) campaign_id from sugarcrm642cebackup.accountsbackup where ISNULL(ExecutingState) and BehaviorType=''U''');

    IF EXISTS (SELECT 1 FROM #temp1)
    BEGIN
        -- The same account may have been updated several times since the
        -- last run. Keep only the newest logged version per id (by
        -- date_modified; ties are broken arbitrarily) so the UPDATE below is
        -- deterministic and the INSERT cannot create duplicate accounts.
        ;WITH ranked AS (
            SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date_modified DESC) AS rn
            FROM #temp1
        )
        DELETE FROM ranked
        WHERE rn > 1;

        -- Accounts that already exist locally: overwrite every data column.
        UPDATE a
        SET a.name                        = t.name,
            a.date_entered                = t.date_entered,
            a.date_modified               = t.date_modified,
            a.modified_user_id            = t.modified_user_id,
            a.created_by                  = t.created_by,
            a.[description]               = t.[description],
            a.deleted                     = t.deleted,
            a.assigned_user_id            = t.assigned_user_id,
            a.account_type                = t.account_type,
            a.industry                    = t.industry,
            a.annual_revenue              = t.annual_revenue,
            a.phone_fax                   = t.phone_fax,
            a.billing_address_street      = t.billing_address_street,
            a.billing_address_city        = t.billing_address_city,
            a.billing_address_state       = t.billing_address_state,
            a.billing_address_postalcode  = t.billing_address_postalcode,
            a.billing_address_country     = t.billing_address_country,
            a.rating                      = t.rating,
            a.phone_office                = t.phone_office,
            a.phone_alternate             = t.phone_alternate,
            a.website                     = t.website,
            a.[ownership]                 = t.[ownership],
            a.employees                   = t.employees,
            a.ticker_symbol               = t.ticker_symbol,
            a.shipping_address_street     = t.shipping_address_street,
            a.shipping_address_city       = t.shipping_address_city,
            a.shipping_address_state      = t.shipping_address_state,
            a.shipping_address_postalcode = t.shipping_address_postalcode,
            a.shipping_address_country    = t.shipping_address_country,
            a.parent_id                   = t.parent_id,
            a.sic_code                    = t.sic_code,
            a.campaign_id                 = t.campaign_id
        FROM account AS a
        INNER JOIN #temp1 AS t
            ON t.id = a.id;

        -- Accounts that do not exist locally yet: insert them.
        INSERT INTO account (
            id, name, date_entered, date_modified, modified_user_id, created_by,
            [description], deleted, assigned_user_id, account_type, industry,
            annual_revenue, phone_fax, billing_address_street, billing_address_city,
            billing_address_state, billing_address_postalcode, billing_address_country,
            rating, phone_office, phone_alternate, website, [ownership], employees,
            ticker_symbol, shipping_address_street, shipping_address_city,
            shipping_address_state, shipping_address_postalcode,
            shipping_address_country, parent_id, sic_code, campaign_id)
        SELECT
            t.id, t.name, t.date_entered, t.date_modified, t.modified_user_id, t.created_by,
            t.[description], t.deleted, t.assigned_user_id, t.account_type, t.industry,
            t.annual_revenue, t.phone_fax, t.billing_address_street, t.billing_address_city,
            t.billing_address_state, t.billing_address_postalcode, t.billing_address_country,
            t.rating, t.phone_office, t.phone_alternate, t.website, t.[ownership], t.employees,
            t.ticker_symbol, t.shipping_address_street, t.shipping_address_city,
            t.shipping_address_state, t.shipping_address_postalcode,
            t.shipping_address_country, t.parent_id, t.sic_code, t.campaign_id
        FROM #temp1 AS t
        WHERE NOT EXISTS (SELECT 1 FROM account AS a WHERE a.id = t.id);

        -- Mark the 'U' rows as processed.
        UPDATE OPENQUERY(MySql, 'select * from sugarcrm642cebackup.accountsbackup where ISNULL(ExecutingState) and BehaviorType=''U''')
        SET ExecutingState = 'D', ModificationTime = GETDATE();
    END

    ---------------------------------------------------------------------
    -- Step 3: rows logged with BehaviorType 'D' delete the matching account
    -- row. Runs last so a delete is never undone by an older update.
    ---------------------------------------------------------------------
    IF OBJECT_ID('tempdb..#temp') IS NOT NULL
    BEGIN
        DROP TABLE #temp;
    END

    -- The alias on rtrim(id) is essential: it gives #temp a real id column.
    SELECT *
    INTO #temp
    FROM OPENQUERY(MySql, 'select rtrim(id) id,name from sugarcrm642cebackup.accountsbackup where ISNULL(ExecutingState) and BehaviorType=''D''');

    IF EXISTS (SELECT 1 FROM #temp)
    BEGIN
        DELETE a
        FROM account AS a
        WHERE EXISTS (SELECT 1 FROM #temp AS d WHERE d.id = a.id);

        -- Mark the 'D' rows as processed.
        UPDATE OPENQUERY(MySql, 'select * from sugarcrm642cebackup.accountsbackup where ISNULL(ExecutingState) and BehaviorType=''D''')
        SET ExecutingState = 'D', ModificationTime = GETDATE();
    END
END
--delete from sql_tem
--delete openquery(MySql, 'SELECT * FROM sugarcrmtablebackup')
最后开启 SQL Server 代理,并建立计划任务(作业)定时执行上面的存储过程,用来轮询 MySQL 备份表中的变化。理想情况下应该每秒检查一次,但 SQL Server 代理这边最小只能设置为每 10 秒执行一次,所以只能按 10 秒同步一次。如果要确认是否执行成功,可以右击该计划任务查看历史记录。
sqlserver数据实时同步到mysql
1.安装mysqlconnector
2.配置mysqlconnector
ODBC数据管理器->系统DSN->添加->mysql ODBC 5.3 ANSI driver->填入data source name如jt,mysql的ip、用户名、密码即可
3.新建链接服务器
-- Registers the MySQL instance as linked server 'jt' through the ODBC
-- provider and maps all local logins to one MySQL account.
exec sp_addlinkedserver
@server='jt', -- must match the ODBC data source name
@srvproduct='mysql', -- free-form product label
@provider='MSDASQL', -- OLE DB provider for ODBC; always this value
@datasrc=NULL,
@location=NULL,
@provstr='DRIVER={MySQL ODBC 5.3 ANSI Driver};SERVER=192.168.5.188;DATABASE=suzhou;UID=root;PORT=3306;',
@catalog = NULL
-- NOTE(review): 'root' / 'password' look like placeholders -- replace them
-- with a dedicated, least-privileged MySQL account before real use.
exec sp_addlinkedsrvlogin
@rmtsrvname='jt',
@useself='false',
@rmtuser='root',
@rmtpassword='password';
GO
-- Connectivity check. It sits in its own batch because a batch that
-- references linked server 'jt' cannot be compiled before the server has
-- been registered. It also needs table sz to exist on the MySQL side already.
select * from openquery(jt,'SELECT * FROM sz ; ')
GO
USE [master]
GO
-- Allow remote procedure calls / pass-through EXEC against jt.
EXEC master.dbo.sp_serveroption @server=N'jt', @optname=N'rpc out', @optvalue=N'TRUE'
GO
-- Do not promote local transactions to distributed ones for calls to jt.
EXEC master.dbo.sp_serveroption @server=N'jt', @optname=N'remote proc transaction promotion', @optvalue=N'false'
GO
---4. Create the database and the table on both SQL Server and MySQL
-- SQL Server side. The database is created in its own batch and then
-- selected explicitly; otherwise sz would be created in whatever database is
-- current instead of in suzhou, where the triggers expect it.
create database suzhou;
go
use suzhou
go
create table sz(
id int not null identity(1,1) primary key,
orderno char(20) not null,
ordertime datetime not null default getdate(),
remark varchar(200)
)
go
-- MySQL side: target table that receives the rows replicated from SQL Server.
-- id has no auto-increment here; its value is supplied by the sender.
CREATE TABLE sz (
    id        INT(11)     NOT NULL,
    orderno   CHAR(20)    NOT NULL,
    ordertime DATETIME(6) NOT NULL,
    remark    VARCHAR(200),
    PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
---5. Create the loopback linked server
-- 'loopback' is a linked server that points back at this same instance
-- (@@SERVERNAME).
EXEC sp_addlinkedserver
    @server     = N'loopback',
    @srvproduct = N' ',
    @provider   = N'SQLNCLI',
    @datasrc    = @@SERVERNAME
GO
-- Linked-server options: enable remote procedure calls, and stop SQL Server
-- from promoting a local transaction to a distributed one because of a
-- remote procedure call (this is the key setting).
USE [master]
GO
EXEC master.dbo.sp_serveroption
    @server   = N'loopback',
    @optname  = N'rpc out',
    @optvalue = N'TRUE'
GO
EXEC master.dbo.sp_serveroption
    @server   = N'loopback',
    @optname  = N'remote proc transaction promotion',
    @optvalue = N'false'
GO
----6. Triggers and stored procedures
----6.1 insert
-- Redefine the trigger
use suzhou
go
-- Forwards every row inserted into sz to loopback.suzhou.dbo.sp_insert, one
-- call per row. A trigger fires once per statement, so the inserted
-- pseudo-table may hold zero, one or many rows; the cursor covers all three
-- cases (no call at all for zero rows).
alter trigger tr_insert_sz on suzhou.dbo.sz
for insert
as
begin
    set nocount on;

    declare @id int, @orderno char(20), @ordertime datetime, @remark varchar(200);

    declare inserted_rows cursor local fast_forward for
        select id, orderno, ordertime, remark
        from inserted;

    open inserted_rows;
    fetch next from inserted_rows into @id, @orderno, @ordertime, @remark;
    while @@fetch_status = 0
    begin
        -- Called through the loopback linked server rather than directly.
        exec loopback.suzhou.dbo.sp_insert @id, @orderno, @ordertime, @remark;
        fetch next from inserted_rows into @id, @orderno, @ordertime, @remark;
    end
    close inserted_rows;
    deallocate inserted_rows;
end
go
-- Stored procedure
USE suzhou
GO
-- Inserts one row into the sz table behind linked server jt.
--   @id, @orderno, @ordertime, @remark: values for the columns of the same name.
CREATE PROCEDURE sp_insert
    @id        INT,
    @orderno   CHAR(20),
    @ordertime DATETIME,
    @remark    VARCHAR(200)
AS
BEGIN
    SET NOCOUNT ON;

    INSERT INTO OPENQUERY(jt, 'select * from sz') (id, orderno, ordertime, remark)
    VALUES (@id, @orderno, @ordertime, @remark);
END
GO
----6.2 update
-- Redefine the trigger
use suzhou
go
-- Forwards every row updated in sz to loopback.suzhou.dbo.sp_update, one call
-- per row. A trigger fires once per statement, so the inserted pseudo-table
-- may hold zero, one or many rows; the cursor covers all three cases.
-- NOTE(review): only remark is propagated, and the target row is located by
-- the NEW orderno. A statement that changes orderno or ordertime is not
-- mirrored by this trigger.
create trigger tr_update_sz on suzhou.dbo.sz
for update
as
begin
    set nocount on;

    declare @orderno char(20), @remark varchar(200);

    declare updated_rows cursor local fast_forward for
        select orderno, remark
        from inserted;

    open updated_rows;
    fetch next from updated_rows into @orderno, @remark;
    while @@fetch_status = 0
    begin
        -- Called through the loopback linked server rather than directly.
        exec loopback.suzhou.dbo.sp_update @orderno, @remark;
        fetch next from updated_rows into @orderno, @remark;
    end
    close updated_rows;
    deallocate updated_rows;
end
go
-- Stored procedure
USE suzhou
GO
-- Sets remark on the rows of the sz table behind linked server jt whose
-- orderno equals @orderno.
--   @orderno: order number used to locate the row(s).
--   @remark:  new remark value.
CREATE PROCEDURE sp_update
    @orderno CHAR(20),
    @remark  VARCHAR(200)
AS
BEGIN
    SET NOCOUNT ON;

    UPDATE OPENQUERY(jt, 'select * from sz')
    SET remark = @remark
    WHERE orderno = @orderno;
END
GO
--- Update test: change one row locally so the update trigger fires
USE suzhou
GO
UPDATE sz
SET remark = 'ocpyang'
WHERE orderno = 'a001'
GO
----6.3 delete
-- Redefine the trigger
use suzhou
go
-- Forwards every row deleted from sz to loopback.suzhou.dbo.sp_delete, one
-- call per row. A trigger fires once per statement, so the deleted
-- pseudo-table may hold zero, one or many rows; the cursor covers all three
-- cases (no call at all for zero rows).
create trigger tr_delete_sz on suzhou.dbo.sz
for delete
as
begin
    set nocount on;

    declare @orderno char(20);

    declare deleted_rows cursor local fast_forward for
        select orderno
        from deleted;

    open deleted_rows;
    fetch next from deleted_rows into @orderno;
    while @@fetch_status = 0
    begin
        -- Called through the loopback linked server rather than directly.
        exec loopback.suzhou.dbo.sp_delete @orderno;
        fetch next from deleted_rows into @orderno;
    end
    close deleted_rows;
    deallocate deleted_rows;
end
go
-- Stored procedure
USE suzhou
GO
-- Deletes the rows of the sz table behind linked server jt whose orderno
-- equals @orderno.
--   @orderno: order number of the row(s) to remove.
CREATE PROCEDURE sp_delete
    @orderno CHAR(20)
AS
BEGIN
    SET NOCOUNT ON;

    DELETE FROM OPENQUERY(jt, 'select * from sz')
    WHERE orderno = @orderno;
END
GO
--- Delete test: remove one row locally so the delete trigger fires
USE suzhou
GO
DELETE FROM sz
WHERE orderno = 'a001'
GO