基于云存儲實現用Windows Azure Storage增強應用程序的引擎
概述:您可以在云中運行后臺進程。Kevin Hoffman 和 Nate Dudek 使用購物車示例演示如何構建應用程序引擎,以及如何使用Azure Storage實現異步消息傳送和處理。
開發人員容易依賴其有形的、可感知的基礎結構,就像其是“安全毛毯”。他們知道如何使用、如何操作,如果出現問題,他們知道問題出在哪兒。而這通常會成為開發人員采用更新的技術(例如云計算)的障礙。
心存懷疑的開發人員提出的最大疑問之一是他們如何在云中繼續運行后臺進程,即他們的引擎如何繼續運行。本文旨在通過向您演示如何構建應用程序引擎以及使用 Windows Azure Storage 實現異步消息傳送和處理來為您揭開云中缺乏后臺處理的神秘面紗。
為了證明開發人員可以拋開其有形的基礎結構這條“安全毛毯”并將其應用程序引擎置于云中,我們將介紹如何實現電子商務應用程序的一個小型子集 Hollywood Hackers,您可以從中購買到 Hollywood 用于完全忽略物理法則和過時的常識的所有神奇技術。
我們將介紹的兩個主要方案如下:
將異步文本消息 (“toasts”) 發送給使用該應用程序的用戶,以通知他們發生的重要事件(如已提交他們的購物車)或在員工之間發送消息。此方案使用 Windows Azure Queue、Windows Azure Table 和 Windows Azure 工作者角色。
此方案使用 Windows Azure Queue 和 Windows Azure 工作者角色將購物車提交給執行引擎。
使用隊列存儲進行內部應用程序消息傳送
在介紹具體的方案之前,我們需要先介紹一些有關 Windows Azure Queue 的基礎知識。云中的隊列與傳統的 .NET 應用程序中的隊列的運行方式不太一樣。在處理 AppDomain 中的數據時,您知道該數據只有一份,它完整地位于單一托管進程中。
而在云中,您的一部分數據可能在加利福尼亞,而另一部分可能在紐約,并且您可能會安排一個工作者角色在德克薩斯州對該數據進行處理,而另一工作者角色在北達科他州進行數據處理。
很多開發人員在適應這種分布式計算和分布式數據時面臨著一些不熟悉的問題,例如對可能出現的故障進行編碼、針對數據提交形成多次重試的概念甚至冪等性的理念。
只要您不將 Windows Azure Queue 視為進程內的常規 CLR 隊列,其工作方式其實非常簡單。首先,應用程序將向隊列獲取一些數量的消息(需要記住,一次不會超過 20 條)并提供一個超時。此超時控制對其他隊列處理客戶端隱藏這些消息的時間。當應用程序成功完成需要對隊列消息進行的所有處理后,將刪除該消息。
如果應用程序引發異常或處理隊列消息失敗,則在超時期限過后,其他客戶端可以再次看到該消息。因此,當一個工作者角色處理失敗后,其他工作者角色可以繼續進行處理。向隊列提交消息非常簡單:應用程序直接或借助客戶端庫形成適當的 HTTP POST 消息,然后提交字符串或字節數組。隊列專門用于進行內部應用程序消息傳送和非永久存儲,因此這些消息占用的空間需要相當小。
如上所述,您可能安排多個工作者角色都嘗試處理同一消息。雖然隱藏當前正在處理的消息的不可見超時很有幫助,但不能確保萬無一失。要完全避免沖突,您應該將您的引擎處理設計為冪等。換句話說,同一隊列消息應該可以由一個或多個工作者角色處理多次,而不會使應用程序處于不一致的狀態。
理想情況下,您希望工作者角色可以檢測出是否已完成對給定消息的處理。在編寫工作者角色來處理隊列消息時,請牢記您的代碼可能會嘗試處理已處理過的消息,盡管這個可能性微乎其微。
圖 1 中的代碼段顯示了如何使用隨 Windows Azure SDK 一起提供的 StorageClient 程序集創建消息并將其提交給 Windows Azure Queue。StorageClient 庫實際上只是 Windows Azure Storage HTTP 接口周圍的包裝。
圖 1 創建消息并將其提交給 Windows Azure Queue
string accountName;
string accountSharedKey;
string queueBaseUri;
string StorageCredentialsAccountAndKey credentials;
if (RoleEnvironment.IsAvailable)
{
// We are running in a cloud - INCLUDING LOCAL!
accountName =
RoleEnvironment.GetConfigurationSettingValue("AccountName");
accountSharedKey =
RoleEnvironment.GetConfigurationSettingValue("AccountSharedKey");
queueBaseUri = RoleEnvironment.GetConfigurationSettingValue
("QueueStorageEndpoint");
}
else
{
accountName = ConfigurationManager.AppSettings["AccountName"];
accountSharedKey =
ConfigurationManager.AppSettings["AccountSharedKey"];
queueBaseUri =
ConfigurationManager.AppSettings["QueueStorageEndpoint"];
}
credentials =
new StorageCredentialsAccountAndKey(accountName, accountSharedKey);
CloudQueueClient client =
new CloudQueueClient(queueBaseUri, credentials);
CloudQueue queue = client.GetQueueReference(queueName);
CloudQueueMessage m = new CloudQueueMessage(
/* string or byte[] representing message to enqueue */);
Queue.AddMessage(m);
對于本文中的其他示例,我們使用了可以簡化此過程的一些包裝類(位于 Hollywood Hackers 的 CodePlex 站點中,網址為:hollywoodhackers.codeplex.com/SourceControl/ListDownloadableCommits.aspx)。
異步消息傳送 (Toast)
當今,交互式網站不僅是一種時尚,更是一種需求。用戶已經習慣了完全交互式網站,以致于當他們遇到一個靜態的非交互式頁面時會認為什么地方出問題了。考慮到這一點,我們希望可以在我們的用戶使用這樣的站點時向他們發送通知。
為此,我們將利用 Windows Azure Queue 和 Windows Azure Table 存儲機制構建一個消息傳遞框架。客戶端將使用與 jQuery Gritter 插件結合的 jQuery 在用戶的瀏覽器中將通知顯示為一個 toast,類似于當您收到新的 Outlook 電子郵件、即時消息或警報聲時在 Windows 系統托盤上方淡出的消息。
當需要向某個用戶發送通知時,該用戶將被插入到隊列中。因為工作者角色負責處理隊列中的每個項目,所以該角色將動態確定如何處理每個項目。在本例中,引擎只需要執行一個操作,但在復雜的 CRM 網站或支持站點中,要執行的操作可能不計其數。
工作者角色在隊列中遇到用戶通知時,會將該通知存儲在表存儲中并將其從隊列中刪除。這樣,消息可以保留很長時間并等待用戶登錄進行處理。隊列存儲中的消息的最大保存期限比較短,不會超過幾天。當用戶訪問網站時,我們的 jQuery 腳本將異步獲取表中的所有消息,并通過在控制器上調用可返回 JavaScript Object Notation (JSON) 的方法在瀏覽器中以常見的形式顯示這些消息。
盡管隊列只處理字符串或字節數組,但我們可以通過將任何類型的結構化數據序列化為二進制文件來將其存儲在隊列中,然后在我們需要使用時再將其轉換回來。這成為將強類型化的對象傳遞到隊列中的出色技術。我們會將此技術構建到我們的隊列消息的基類中。然后,我們的系統消息類可以包含我們的數據,而且可以將整個對象提交到隊列中并根據需要進行利用(請參見圖 2)。
圖 2 在隊列中存儲結構化數據
namespace HollywoodHackers.Storage.Queue
{
[Serializable]
public class QueueMessageBase
{
public byte[] ToBinary()
{
BinaryFormatter bf = new BinaryFormatter();
MemoryStream ms = new MemoryStream();
ms.Position = 0;
bf.Serialize(ms, this);
byte[] output = ms.GetBuffer();
ms.Close();
return output;
}
public static T FromMessage
{
byte[] buffer = m.AsBytes();
MemoryStream ms = new MemoryStream(buffer);
ms.Position = 0;
BinaryFormatter bf = new BinaryFormatter();
return (T)bf.Deserialize(ms);
}
}
[Serializable]
public class ToastQueueMessage : QueueMessageBase
{
public ToastQueueMessage()
: base()
{
}
public string TargetUserName { get; set; }
public string MessageText { get; set; }
public string Title { get; set; }
public DateTime CreatedOn { get; set; }
}
請記住,要使用 BinaryFormatter 類,需要以完全信任模式(可以通過服務配置文件啟用此模式)運行 Windows Azure 工作者角色。
現在,我們需要一個簡單的包裝來與我們的隊列交互。從本質上說,我們需要能夠將消息插入隊列,獲取任何掛起的消息并清除該隊列(請參見圖 3)。
圖 3 用于與隊列交互的包裝
namespace HollywoodHackers.Storage.Queue
{
public class StdQueue
StorageBase where T : QueueMessageBase, new()
{
protected CloudQueue queue;
protected CloudQueueClient client;
public StdQueue(string queueName)
{
client = new CloudQueueClient
(StorageBase.QueueBaseUri, StorageBase.Credentials);
queue = client.GetQueueReference(queueName);
queue.CreateIfNotExist();
}
public void AddMessage(T message)
{
CloudQueueMessage msg =
new CloudQueueMessage(message.ToBinary());
queue.AddMessage(msg);
}
public void DeleteMessage(CloudQueueMessage msg)
{
queue.DeleteMessage(msg);
}
public CloudQueueMessage GetMessage()
{
return queue.GetMessage(TimeSpan.FromSeconds(60));
}
}
public class ToastQueue : StdQueue
{
public ToastQueue()
: base("toasts")
{
}
}
}
我們還需要為表存儲設置一個包裝,以便在用戶登錄到站點之前可以存儲用戶通知。可以使用 PartitionKey(行集合的標識符)和 RowKey(可唯一標識特定分區中的每個單獨行)組織表數據。選擇 PartitionKey 和 RowKey 使用的數據是在使用表存儲時所做的最重要的設計決策之一。
這些特點允許跨存儲節點進行負載平衡,并在應用程序中提供內置的可伸縮性選項。不考慮數據的數據中心關聯性,使用同一分區鍵的表存儲中的行將保留在相同的物理數據存儲中。因為針對每個用戶存儲對應的消息,所以分區鍵將是 UserName,而 RowKey 則成為標識每行的 GUID(請參見圖 4)。
圖 4 表存儲的包裝
namespace HollywoodHackers.Storage.Repositories
{
public class UserTextNotificationRepository : StorageBase
{
public const string EntitySetName =
"UserTextNotifications";
CloudTableClient tableClient;
UserTextNotificationContext notificationContext;
public UserTextNotificationRepository()
: base()
{
tableClient = new CloudTableClient
(StorageBase.TableBaseUri, StorageBase.Credentials);
notificationContext = new UserTextNotificationContext
(StorageBase.TableBaseUri,StorageBase.Credentials);
tableClient.CreateTableIfNotExist(EntitySetName);
}
public UserTextNotification[]
GetNotificationsForUser(string userName)
{
var q = from notification in
notificationContext.UserNotifications
where notification.TargetUserName ==
userName select notification;
return q.ToArray();
}
public void AddNotification
(UserTextNotification notification)
{
notification.RowKey = Guid.NewGuid().ToString();
notificationContext.AddObject
(EntitySetName, notification);
notificationContext.SaveChanges();
}
}
}
因為我們的存儲機制已經確定,所以我們需要一個工作者角色作為引擎;以便在我們的電子商務站點的后臺處理消息。為此,我們定義了一個從 Microsoft.ServiceHosting.ServiceRuntime.RoleEntryPoint 類繼承的類,并將其與云服務項目中的工作者角色關聯(請參見圖 5)。
圖 5 作為引擎的工作者角色
public class WorkerRole : RoleEntryPoint
{
ShoppingCartQueue cartQueue;
ToastQueue toastQueue;
UserTextNotificationRepository toastRepository;
public override void Run()
{
// This is a sample worker implementation.
//Replace with your logic.
Trace.WriteLine("WorkerRole1 entry point called",
"Information");
toastRepository = new UserTextNotificationRepository();
InitQueue();
while (true)
{
Thread.Sleep(10000);
Trace.WriteLine("Working", "Information");
ProcessNewTextNotifications();
ProcessShoppingCarts();
}
}
private void InitQueue()
{
cartQueue = new ShoppingCartQueue();
toastQueue = new ToastQueue();
}
private void ProcessNewTextNotifications()
{
CloudQueueMessage cqm = toastQueue.GetMessage();
while (cqm != null)
{
ToastQueueMessage message =
QueueMessageBase.FromMessage
toastRepository.AddNotification(new
UserTextNotification()
{
MessageText = message.MessageText,
MessageDate = DateTime.Now,
TargetUserName = message.TargetUserName,
Title = message.Title
});
toastQueue.DeleteMessage(cqm);
cqm = toastQueue.GetMessage();
}
}
private void ProcessShoppingCarts()
{
// We will add this later in the article!
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
DiagnosticMonitor.Start("DiagnosticsConnectionString");
// For information on handling configuration changes
// see the MSDN topic at
//http://go.microsoft.com/fwlink/?LinkId=166357.
RoleEnvironment.Changing += RoleEnvironmentChanging;
return base.OnStart();
}
private void RoleEnvironmentChanging(object sender, RoleEnvironmentChangingEventArgs e)
{
// If a configuration setting is changing
if (e.Changes.Any(change => change is RoleEnvironmentConfigurationSettingChange))
{
// Set e.Cancel to true to restart this role instance
e.Cancel = true;
}
}
}
讓我們看一下工作者角色代碼。在初始化和設置所需的隊列和表存儲之后,此代碼將進入一個循環。每 10 秒鐘,它就會處理一次隊列中的消息。每次我們通過處理循環時都將獲取隊列中的消息,直到最終返回 null,這表示隊列為空。
您從隊列中看到的消息永遠不會超過 20 個,如果不信,您可以反復嘗試來驗證一下。對隊列進行的任何處理都有時間限制,必須在該時間范圍內對每個隊列消息執行有意義的操作,否則隊列消息將被視為超時,并在隊列中顯示備份,以便可以由其他工作者來處理此消息。每個消息都會作為用戶通知添加到表存儲中。關于工作者角色需要記住的重要一點是:一旦入口點方法完成,工作者角色也就結束了。這就是您需要在一個循環中保持邏輯運行的原因。
從客戶端的角度來說,我們需要能夠以 JSON 形式返回消息,以便 jQuery 可以異步輪詢并顯示新的用戶通知。為此,我們會將一些代碼添加到消息控制器中,以便可以訪問這些通知(請參見圖 6)。
圖 6 以 JSON 形式返回消息
public JsonResult GetMessages()
{
if (User.Identity.IsAuthenticated)
{
UserTextNotification[] userToasts =
toastRepository.GetNotifications(User.Identity.Name);
object[] data =
(from UserTextNotification toast in userToasts
select new { title = toast.Title ?? "Notification",
text = toast.MessageText }).ToArray();
return Json(data, JsonRequestBehavior.AllowGet);
}
else
return Json(null);
}
在 Visual Studio 2010 beta 2 下的 ASP.NET MVC 2(我們用于撰寫本文的環境)中,如果沒有 JsonRequestBehavior.AllowGet 選項,您無法將 JSON 數據返回到 jQuery 或其他客戶端。在 ASP.NET MVC 1 中,不需要此選項。現在,我們可以編寫 JavaScript,它每 15 秒將調用一次 GetMessages 方法并將以 toast 形式消息顯示通知(請參見圖 7)。
圖 7 以 toast 形式消息顯示的通知
$(document).ready(function() {
setInterval(function() {
$.ajax({
contentType: "application/json; charset=utf-8",
dataType: "json",
url: "/SystemMessage/GetMessages",
success: function(data) {
for (msg in data) {
$.gritter.add({
title: data[msg].title,
text: data[msg].text,
sticky: false
});
}
}
})
}, 15000)
});
提交和處理購物車
在我們的示例應用程序中,我們希望使用隊列存儲執行的另一個關鍵方案是提交購物車。Hollywood Hackers 有一個第三方履行系統(Hollywood Hackers 無法在其空間有限的倉庫中保留所有小工具),所以引擎需要對購物車進行一些處理。一旦引擎完成其處理,它會向用戶通知隊列提交一個消息,告知用戶已經對購物車進行了處理(或者出現了問題)。如果處理購物車時用戶處于在線狀態,該用戶將收到系統彈出的一個 toast 消息。如果用戶不在線,則會在其下次登錄到該站點時收到該彈出消息,如圖 8 所示。
圖 8 示例用戶通知
查看原圖(大圖)
我們首先需要的是一些包裝類,使我們可以與購物車隊列交互。這些包裝非常簡單,如果要查看它們的源代碼,可以在 CodePlex 站點上查看。
與標準 CRUD(創建、讀取、更新、刪除)存儲庫不同的是,隊列中的讀取操作不是單純的讀取操作。請記住,只要獲取隊列中的消息,必須在有限的時間內處理該消息,操作失敗或刪除消息都會顯示處理完成。這種模式不能順利地轉換到存儲庫模式,所以我們已經不再借助包裝類執行此操作。
現在,我們已經擁有了要與購物車隊列交互的代碼,我們可以將一些代碼放在購物車控制器中,以便將購物車內容提交到隊列中(請參見圖 9)。
圖 9 向隊列提交購物車
public ActionResult Submit()
{
ShoppingCartMessage cart = new ShoppingCartMessage();
cart.UserName = User.Identity.Name;
cart.Discounts = 12.50f;
cart.CartID = Guid.NewGuid().ToString();
List
items.Add(new ShoppingCartItem()
{ Quantity = 12, SKU = "10000101010",
UnitPrice = 15.75f });
items.Add(new ShoppingCartItem()
{ Quantity = 27, SKU = "12390123j213",
UnitPrice = 99.92f });
cart.CartItems = items.ToArray();
cartQueue.AddMessage(cart);
return View();
}
在實際情況下,您可能會從進程外狀態(例如會話存儲、緩存或窗體發布)獲得購物車。為了簡化本文代碼,我們僅僅構建了購物車的內容。
最后,購物車內容已處于隊列中,我們可以修改工作者角色,以便它可以定期檢查隊列中掛起的購物車。它每次會從隊列中選擇一個購物車,用整整一分鐘對該購物車進行處理,然后向用戶通知隊列提交一個消息,告知用戶已經對該購物車進行了處理(請參見圖 10)。
圖 10 檢查隊列中掛起的購物車
private void ProcessShoppingCarts()
{
CloudQueueMessage cqm = cartQueue.GetMessage();
while (cqm != null)
{
ShoppingCartMessage cart =
QueueMessageBase.FromMessage
toastRepository.AddNotification(new UserTextNotification()
{
MessageText = String.Format
("Your shopping cart containing {0} items has been processed.",
cart.CartItems.Length),
MessageDate = DateTime.Now,
TargetUserName = cart.UserName
});
cartQueue.DeleteMessage(cqm);
cqm = cartQueue.GetMessage();
}
}
經過對用戶通知表中的隊列消息的存取操作,位于主頁面中的 jQuery Gritter 代碼然后會在下一個 15 秒的輪詢周期中檢測是否存在新消息,然后向用戶顯示購物車 toast 通知。
總結和后續操作
本文的目的是使開發人員可以拋開其有形的數據中心這條“安全毛毯”,認識到他們可以使用 Windows Azure 執行很多操作,而不僅僅是創建簡單的“Hello World”網站。借助 Windows Azure Queues 和 Windows Azure 表存儲的強大功能,以及利用這些強大功能在應用程序和其工作者角色之間進行異步消息傳送,您可以真正使用 Windows Azure 增強應用程序的引擎了。
為使文章簡明易懂,我們將很多代碼都保留為原樣,沒有進行重構。作為熟悉新 Windows Azure Muscle 的練習,請嘗試重構本文中的一些代碼,以加深對隊列的熟練使用,甚至創建一個獨立的程序集,其中包含為任何 ASP.NET MVC 網站進行異步消息傳送和通知所需的所有代碼。
主要是親自動手實踐,創建一些站點并看看您都可以執行哪些操作。本文中的代碼位于 Hollywood Hackers 的 CodePlex 站點中。
評論
查看更多