So I took the time to build a reproduction of the conversation_endpoints problem that was discussed in another thread. I built two databases, each with its own queue: one on the sending side and one on the receiving side. This is essentially the way the code works here at my site. Near the bottom there is a script that sends a message every 5 minutes for 2 hours. If there is any logic that removes conversation endpoints after 30 minutes, then the MessageRecord table will show it.
Please let me know what I am doing wrong, so I can change my production code to help eliminate the large buildup in the sys.conversation_endpoints.
Thanks!
-- Start from a clean slate: remove the test databases if an earlier run left them behind.
USE master;
GO
IF EXISTS (SELECT * FROM sys.databases AS d WHERE d.name = 'SBSource')
    DROP DATABASE SBSource;
GO
IF EXISTS (SELECT * FROM sys.databases AS d WHERE d.name = 'SBTarget')
    DROP DATABASE SBTarget;
GO
-- Set up the environment for the test: two databases, each with
-- Service Broker enabled and the TRUSTWORTHY property switched on.
CREATE DATABASE SBSource;
GO
ALTER DATABASE SBSource SET ENABLE_BROKER;
ALTER DATABASE SBSource SET TRUSTWORTHY ON;
GO

CREATE DATABASE SBTarget;
GO
ALTER DATABASE SBTarget SET ENABLE_BROKER;
ALTER DATABASE SBTarget SET TRUSTWORTHY ON;
GO
-- Initiator side: message type, contract, queue, service and the route to the target service.
USE SBSource;
GO
CREATE MESSAGE TYPE [msgTest]
    AUTHORIZATION [dbo];

CREATE CONTRACT [Test]
    ([msgTest] SENT BY ANY);

CREATE QUEUE [dbo].[SourceQueue]
    WITH STATUS = ON,
         RETENTION = OFF;

CREATE SERVICE [SBSourceTest]
    AUTHORIZATION [dbo]
    ON QUEUE [dbo].[SourceQueue] ([Test]);

CREATE ROUTE [ToTarget]
    AUTHORIZATION [dbo]
    WITH SERVICE_NAME = N'SBTargetTest',
         ADDRESS = N'LOCAL';
GO
-- dbo.ProcessEndDialogMessages
-- Drains dbo.SourceQueue: every message received causes the conversation it
-- arrived on to be ended on this (initiator) side. The procedure returns once
-- the queue has stayed empty for one second.
--
-- Each message is handled in its own transaction, so a failure only rolls
-- back the message currently being processed.
CREATE PROCEDURE dbo.ProcessEndDialogMessages
AS
BEGIN
    SET NOCOUNT ON;

    -- @message_type and @message_body are read from the queue but are not
    -- inspected: any message type ends the conversation.
    DECLARE @conversation_handle uniqueidentifier,
            @message_type        sysname,
            @message_body        xml;

    WHILE 1 = 1
    BEGIN
        -- Reset first: on timeout RECEIVE assigns nothing, and a stale handle
        -- from the previous iteration would otherwise be ended twice.
        SET @conversation_handle = NULL;

        BEGIN TRANSACTION;

        -- TOP (1): exactly one message is dequeued per iteration. Without it,
        -- RECEIVE removes every available message of the conversation group
        -- while the scalar variables can only keep one of them.
        WAITFOR (
            RECEIVE TOP (1)
                @conversation_handle = [conversation_handle],
                @message_type        = [message_type_name],
                @message_body        = [message_body]
            FROM dbo.[SourceQueue]
        ), TIMEOUT 1000;

        -- Nothing arrived within the timeout: close the open transaction and stop.
        IF @conversation_handle IS NULL
        BEGIN
            COMMIT;
            BREAK;
        END;

        END CONVERSATION @conversation_handle;

        COMMIT;
    END;
END
go
-- Turn on internal activation for the source queue: up to two concurrent
-- readers run dbo.ProcessEndDialogMessages as dbo.
ALTER QUEUE [dbo].[SourceQueue]
    WITH STATUS = ON,
         ACTIVATION (
             STATUS            = ON,
             PROCEDURE_NAME    = dbo.ProcessEndDialogMessages,
             MAX_QUEUE_READERS = 2,
             EXECUTE AS 'dbo'
         );
GO
-- Target side: message type, contract, queue, service and the route back to the source service.
USE SBTarget;
GO
CREATE MESSAGE TYPE [msgTest]
    AUTHORIZATION [dbo];

CREATE CONTRACT [Test]
    ([msgTest] SENT BY ANY);

CREATE QUEUE [dbo].[TargetQueue]
    WITH STATUS = ON,
         RETENTION = OFF;

CREATE SERVICE [SBTargetTest]
    AUTHORIZATION [dbo]
    ON QUEUE [dbo].[TargetQueue] ([Test]);

CREATE ROUTE [ToSource]
    AUTHORIZATION [dbo]
    WITH SERVICE_NAME = N'SBSourceTest',
         ADDRESS = N'LOCAL';
GO
-- One row per message handled by the target: the conversation handle it
-- arrived on and the time it was recorded.
CREATE TABLE dbo.MessageRecord
(
    Conversation_handle uniqueidentifier,
    Inserted            datetime
);
GO
-- dbo.ProcessTargetQueue
-- Drains dbo.TargetQueue: for every message received, logs the conversation
-- handle and the current time in dbo.MessageRecord, then ends the
-- conversation on this (target) side. The procedure returns once the queue
-- has stayed empty for one second.
--
-- The insert and END CONVERSATION for a message share one transaction, so a
-- message is either fully handled or rolled back onto the queue.
CREATE PROCEDURE dbo.ProcessTargetQueue
AS
BEGIN
    SET NOCOUNT ON;

    -- @message_type and @message_body are read from the queue but are not
    -- inspected: every message is logged and its conversation ended.
    DECLARE @conversation_handle uniqueidentifier,
            @message_type        sysname,
            @message_body        xml;

    WHILE 1 = 1
    BEGIN
        -- Reset first: on timeout RECEIVE assigns nothing, and a stale handle
        -- from the previous iteration would otherwise be processed again.
        SET @conversation_handle = NULL;

        BEGIN TRANSACTION;

        -- TOP (1): exactly one message is dequeued per iteration. Without it,
        -- RECEIVE removes every available message of the conversation group
        -- while the scalar variables can only keep one of them.
        WAITFOR (
            RECEIVE TOP (1)
                @conversation_handle = [conversation_handle],
                @message_type        = [message_type_name],
                @message_body        = [message_body]
            FROM dbo.[TargetQueue]
        ), TIMEOUT 1000;

        -- Nothing arrived within the timeout: close the open transaction and stop.
        IF @conversation_handle IS NULL
        BEGIN
            COMMIT;
            BREAK;
        END;

        INSERT INTO dbo.MessageRecord (Conversation_handle, Inserted)
        VALUES (@conversation_handle, GETDATE());

        END CONVERSATION @conversation_handle;

        COMMIT;
    END;
END
GO
-- Turn on internal activation for the target queue: up to two concurrent
-- readers run dbo.ProcessTargetQueue as dbo.
ALTER QUEUE [dbo].[TargetQueue]
    WITH STATUS = ON,
         ACTIVATION (
             STATUS            = ON,
             PROCEDURE_NAME    = dbo.ProcessTargetQueue,
             MAX_QUEUE_READERS = 2,
             EXECUTE AS 'dbo'
         );
GO
USE SBSource;
GO
-- Start sending messages; check the count in sys.conversation_endpoints along the way.
-- One new dialog is opened and one message sent every 5 minutes for 2 hours.
SET XACT_ABORT ON;
SET NOCOUNT ON;

DECLARE @EndAt datetime,
        @msg   xml,
        @ch    uniqueidentifier;

SET @EndAt = DATEADD(hour, 2, GETDATE());
SET @msg   = '<message>dfsafa</message>';

WHILE GETDATE() < @EndAt
BEGIN
    -- Fresh handle for every iteration; BEGIN DIALOG fills it in.
    SET @ch = NULL;

    BEGIN TRANSACTION;

    BEGIN DIALOG CONVERSATION @ch
        FROM SERVICE [SBSourceTest]
        TO SERVICE 'SBTargetTest'
        ON CONTRACT [Test]
        WITH ENCRYPTION = OFF;

    SEND ON CONVERSATION @ch
        MESSAGE TYPE [msgTest] (@msg);

    -- Note the absence of an END CONVERSATION here, so this is not fire-and-forget:
    -- the initiator side is only ended later, by the activation procedure on
    -- SourceQueue, once a message comes back on the conversation.
    COMMIT;

    WAITFOR DELAY '00:05:00';
END;
GO
-- Check on the data after the run completes.
-- Object and column names are spelled exactly as created so the queries also
-- work on servers/databases with a case-sensitive collation.

-- Initiator-side endpoints still present, counted per state.
SELECT ce.state,
       COUNT(*) AS endpoint_count
FROM SBSource.sys.conversation_endpoints AS ce
GROUP BY ce.state;

-- Target-side endpoints that still exist for conversations the target logged, counted per state.
SELECT c.state,
       COUNT(*) AS endpoint_count
FROM SBTarget.sys.conversation_endpoints AS c
INNER JOIN SBTarget.dbo.MessageRecord AS m
    ON c.conversation_handle = m.Conversation_handle
GROUP BY c.state;

-- Full detail for those target-side endpoints next to the time each message
-- was recorded (diagnostic output, so every column is returned on purpose).
SELECT m.*,
       c.*
FROM SBTarget.sys.conversation_endpoints AS c
INNER JOIN SBTarget.dbo.MessageRecord AS m
    ON c.conversation_handle = m.Conversation_handle;

-- Anything still sitting unprocessed in the target queue.
SELECT *
FROM SBTarget.dbo.TargetQueue;
Please use the SQLServer feedback site to report this issue: https://connect.microsoft.com/SQLServer/Feedback
Thanks,
~ Remus
No comments:
Post a Comment