Monday, March 26, 2012

More conversation_endpoints

So I took the time to build a reproduction of the conversation_endpoint problem that was discussed in another thread. I build two databases, with a send and receive queue. This is essentially the way the code works here at my site. I have a script near the bottom that sends messages every 5 minutes for 2 hours. If there is any logic that removes conversation_endpoints 30 min then the Message record table will show them.

Please let me know what I am doing wrong, so I can change my production code to help eliminate the large buildup in the sys.conversation_endpoints.

Thanks!

use master

go

if exists ( select * from sys.databases where name = 'SBSource' )

drop database SBSource

go

if exists ( select * from sys.databases where name = 'SBTarget' )

drop database SBTarget

go

-- Setup environment for test

create database SBSource

GO

ALTER DATABASE SBSource SET ENABLE_BROKER

ALTER DATABASE SBSource SET TRUSTWORTHY ON

GO

create database SBTarget

GO

ALTER DATABASE SBTarget SET ENABLE_BROKER

ALTER DATABASE SBTarget SET TRUSTWORTHY ON

GO

use SBSource

go

CREATE MESSAGE TYPE [msgTest] AUTHORIZATION [dbo];

CREATE CONTRACT [Test] ( [msgTest] SENT BY ANY );

CREATE QUEUE dbo.[SourceQueue] WITH STATUS = ON , RETENTION = OFF;

CREATE SERVICE [SBSourceTest] authorization [dbo]

ON QUEUE [dbo].[SourceQueue]

( [Test] );

CREATE ROUTE [ToTarget] AUTHORIZATION [dbo] WITH SERVICE_NAME = N'SBTargetTest' , ADDRESS = N'LOCAL';

GO

create procedure dbo.ProcessEndDialogMessages

as

begin

set nocount on;

declare @.conversation_handle uniqueidentifier,

@.message_type sysname,

@.message_body xml;

begin transaction;

WAITFOR (

RECEIVE @.conversation_handle = [conversation_handle],

@.message_type = [message_type_name],

@.message_body = [message_body]

FROM dbo.[SourceQueue]), TIMEOUT 1000;

while @.conversation_handle IS NOT NULL

begin

end conversation @.conversation_handle;

commit;

begin transaction;

set @.conversation_handle = null;

WAITFOR (

RECEIVE @.conversation_handle = [conversation_handle],

@.message_type = [message_type_name],

@.message_body = [message_body]

FROM dbo.[SourceQueue]), TIMEOUT 1000;

end

commit;

end

go

Alter QUEUE dbo.[SourceQueue] WITH STATUS = ON, Activation ( STatus = on, procedure_name = dbo.ProcessEndDialogMessages, MAX_QUEUE_READERS = 2, EXECUTE AS 'dbo' )

go

use SBTarget

go

CREATE MESSAGE TYPE [msgTest] AUTHORIZATION [dbo];

CREATE CONTRACT [Test] ( [msgTest] SENT BY ANY );

CREATE QUEUE dbo.[TargetQueue] WITH STATUS = ON , RETENTION = OFF;

CREATE SERVICE [SBTargetTest] authorization [dbo]

ON QUEUE [dbo].[TargetQueue]

( [Test] );

CREATE ROUTE [ToSource] AUTHORIZATION [dbo] WITH SERVICE_NAME = N'SBSourceTest' , ADDRESS = N'LOCAL';

GO

create table dbo.MessageRecord ( Conversation_handle uniqueidentifier, Inserted datetime )

go

create procedure dbo.ProcessTargetQueue

as

begin

set nocount on;

declare @.conversation_handle uniqueidentifier,

@.message_type sysname,

@.message_body xml;

begin transaction;

WAITFOR (

RECEIVE @.conversation_handle = [conversation_handle],

@.message_type = [message_type_name],

@.message_body = [message_body]

FROM dbo.[TargetQueue]), TIMEOUT 1000;

while @.conversation_handle IS NOT NULL

begin

insert into dbo.MessageRecord ( Conversation_handle, Inserted ) values ( @.conversation_handle, getdate() );

end conversation @.conversation_handle;

commit;

begin transaction;

set @.conversation_handle = null;

WAITFOR (

RECEIVE @.conversation_handle = [conversation_handle],

@.message_type = [message_type_name],

@.message_body = [message_body]

FROM dbo.[TargetQueue]), TIMEOUT 1000;

end

commit;

end

GO

Alter QUEUE dbo.[TargetQueue] WITH STATUS = ON, Activation ( STatus = on, procedure_name = dbo.ProcessTargetQueue, MAX_QUEUE_READERS = 2, EXECUTE AS 'dbo' )

go

use sbsource

go

-- start sending messages, check count in conv_endpoints along the way

set xact_abort on

set nocount on

declare @.EndAt datetime,

@.msg xml,

@.ch uniqueidentifier;

set @.EndAt = DATEADD( hh, 2, getdate() )

set @.msg = '<message>dfsafa</message>';

while getdate() < @.EndAt

begin

set @.ch = null;

begin transaction;

begin dialog conversation @.ch

from service [SBSourceTest]

to service 'SBTargetTest'

on contract [Test]

with

encryption=off;

send on conversation @.ch message type [msgTest] (@.msg);

note the abscence of an end conversation, so no fire and forget!

commit;

waitfor delay '00:05:00'

end

GO

-- check on data after complete.

select state, count(*)

from SBSource.sys.conversation_endpoints

group by state;

select state, count(*)

from SBTarget.sys.conversation_endpoints c

inner join sbtarget.dbo.MessageRecord m on c.conversation_handle = m.conversation_handle

group by state;

select m.*, c.*

from SBTarget.sys.conversation_endpoints c

inner join sbtarget.dbo.MessageRecord m on c.conversation_handle = m.conversation_handle

select *

from SBTarget.dbo.targetqueue

Please use the SQLServer feedback site to report this issue: https://connect.microsoft.com/SQLServer/Feedback

Thanks,
~ Remus

No comments:

Post a Comment