Skip to content

Commit

Permalink
modify fetched message method without delete queue message when get q…
Browse files Browse the repository at this point in the history
…ueue message.
  • Loading branch information
yang-xiaodong committed Dec 17, 2017
1 parent 6d5c771 commit 3f4f349
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
14 changes: 10 additions & 4 deletions src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ namespace DotNetCore.CAP.MySql
public class MySqlFetchedMessage : IFetchedMessage
{
private readonly MySqlOptions _options;
private readonly string _processId;

public MySqlFetchedMessage(int messageId, MessageType type, MySqlOptions options)
public MySqlFetchedMessage(int messageId, MessageType type, string processId, MySqlOptions options)
{
MessageId = messageId;
MessageType = type;

_processId = processId;
_options = options;
}

Expand All @@ -22,15 +24,19 @@ public MySqlFetchedMessage(int messageId, MessageType type, MySqlOptions options

public void RemoveFromQueue()
{
// ignored
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"DELETE FROM `{_options.TableNamePrefix}.queue` WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}

public void Requeue()
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"insert into `{_options.TableNamePrefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);"
, new {MessageId, MessageType });
connection.Execute($"UPDATE `{_options.TableNamePrefix}.queue` SET `ProcessId`=NULL WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)

public Task<IFetchedMessage> FetchNextMessageAsync()
{
var processId = ObjectId.GenerateNewStringId();
var sql = $@"
UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1;
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;
DELETE FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId";
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;";

return FetchNextMessageCoreAsync(sql, new { ProcessId = Guid.NewGuid().ToString() });
return FetchNextMessageCoreAsync(sql, processId);
}

public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
Expand Down Expand Up @@ -139,18 +139,18 @@ public bool ChangeReceivedState(int messageId, string state)
}
}

private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, string processId)
{
FetchedMessage fetchedMessage;
using (var connection = new MySqlConnection(Options.ConnectionString))
{
fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, args);
fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, new { ProcessId = processId });
}

if (fetchedMessage == null)
return null;

return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, Options);
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, processId, Options);
}

public void Dispose()
Expand Down

0 comments on commit 3f4f349

Please sign in to comment.