Unit Of Work: 每 http 请求一次事务

事务管理很困难,但不一定总会让人头疼。 在本文中,我将展示如何为 WEBAPI RESTful 端点的每次 http 请求(或一次服务器往返)建立事务。

通常,API 构造业务逻辑端点是:

  • api/users/register
  • api/orders/process
  • api/cart/checkout

这将业务逻辑的很大一部分隐藏在外观(或界面)后面,并在领域内以托管代码进行处理。

理想情况下,这些逻辑部分封装在一个执行单元中,因此,如果出现问题,可以撤消对数据库的所有修改,然后重试。 这称为数据库事务

例如,映像一个业务逻辑 API 管道:

  • 将临时记录添加到数据库
  • 检查外部 API,看看我们是否可以继续
  • 从数据库中删除临时记录
  • 执行业务逻辑,在数据库中生成 3 条新记录
  • 从数据库中获取新存储项
  • 序列化为 JSON 用于 HTTP 响应

以下为上述工作流程的视图效果:

业务逻辑 API 管道
业务逻辑 API 管道

假设这些步骤任何一个都可能失败。 数据库可能处于脱机状态,或者外部端点可能返回 500 错误,可能无权从数据库中删除记录,JSON 序列化可能失败,等等。

最重要的是,你可能不想要数据库中任何不一致的数据,但这很难调试和解决,从而导致混乱和污染。

一种(简单的)解决方案是将所有内容封装到一个数据库事务中,在万一发生故障时选择回滚。

为此,我们可以考虑一下 UOW(Unit Of Work,工作单元),它将跨越 1 个业务逻辑(创建用户,签出购物车,处理订单)。

工作单元将业务逻辑封装在事务中,并在失败(异常)的情况下提交(保存到数据库)事务或回滚(放弃更改)。

api-工作单元
api-工作单元

为了将其应用于 ASP.NET Core 项目,我们可以将其转换为ActionFilter。如果看 .NET Core 项目的过滤器和中间件管道,我们可以看到相似之处。

Filter and Middleware pipeline
Filter and Middleware pipeline

当请求进入时,它由管道处理,执行中间件,最后到执行你的 WEBAPI 控制器。路由中间件根据路由(/api/users/234 → UsersController.GetUserById(234))确定需要实例化哪个Controller。

过滤器有几种,其中大多数是自我解释的。每种过滤器都有其用途,通常可以根据 IActionFilter 接口编写 ActionFilters。 或者,你也可以编写自己的 ActionFilterAttribute,以便可以将其用作控制器类或控制器方法上的特性(Attribute)。

对于工作单元,我将编写一个 UnitOfWorkFilter,它将直接永久地放入管道中。这样就可以在每个 REST 调用上执行它。

using System; 
using System.Data; 
using System.Threading.Tasks; 
using Microsoft.AspNetCore.Mvc.Filters; 
///
namespace Pzy
{ 
   public class UnitOfWorkFilter : IAsyncActionFilter 
   { 
       private readonly IDbTransaction transaction; 
       public UnitOfWorkFilter(IDbTransaction transaction) 
       { 
           this.transaction = transaction; 
       } 
       public async Task OnActionExecutionAsync(ActionExecutingContext context, ActionExecutionDelegate next) 
       { 
           var connection = transaction.Connection; 
           if (connection.State != ConnectionState.Open) 
               throw new NotSupportedException("The provided connection was not open!"); 
           var executedContext = await next.Invoke(); 
           if (executedContext.Exception == null) 
           { 
               transaction.Commit(); 
           } 
           else 
           { 
               transaction.Rollback(); 
           } 
       } 
   } 
}

IDbTransaction 被注入,我们将在下面的部分中讨论注入,现仅假设它在创建类时提供。

action(REST 调用)执行后,我们将从事务中获取连接,检查该连接是否处于打开状态,然后调用管道中的下一项。 这可以是另一个action过滤器,也可以是控制器方法或修改响应(JSON,XML,CSV)的结果过滤器。

请注意,下一个调用将执行其后继者,并且该调用者将在管道中调用其后继管道,直到请求链完成。 每当完成时,程序都将在第 20 行的UnitOfWorkFilter中捕获它,并检查是否发生异常。 如果发生异常,则回滚事务(第23行),否则将变更提交到数据库。

现在介绍如何将其注册到管道中。 这是通过 ASP.NET Core 的内置依赖注入机制完成的。

DI 的麻烦事是弄清楚你的注册服务需要支持的生命周期。ASP.NET Core 的 DI 库提供以下生命周期:

  • Singleton
  • Scoped
  • Transient

如果需要有关 ASP.NET Core 的 DI 更多详细信息,请查看官方 msdn 页面。但是,本质上...

Singleton

(ASP.NET Core 中)单例服务在应用程序启动时被创建一次(并且只创建一次)。这使得它非常适合缓存服务或计算引擎。例如,后者是为相同提供的输入生成相同输出的服务。这也可以被认为是"纯函数"或纯函数的集合。

你可能会问,为什么不使用静态类代替单例服务?

尽管对于应用程序域(如单例服务),静态类实例化一次是事实,但 DI 库提供了一种为特定类型的服务注册实现的方法。这允许您在运行测试时注册虚假或模拟实现,并且需要单例服务的特定输出。

Scoped

每次请求进入时,都会实例化作用域服务,每个请求都会获取其自己的服务实例。请求终止后,将释放服务。

这是 UnitOfWork 的的理想之选,因为它需要每个请求是唯一的,并且需要跨越请求的生存期。事务也一样,也需要是每个请求的资源。其他类型的作用域服务可能是,应用程序上下文中的经过身份验证的用户或特定于请求的其他服务。

Transient

每次请求瞬时服务时,将实例化它。请记住,放入此服务的任何状态仅对本服务的使用者可用,而对任何其他消费服务不可用。

数据库访问类(如存储库)可以这样使用,只要它们不包含共享状态。

依赖注入不仅仅是一个注册表,它还可以管理已注册服务的生命周期,创建实例并将实例化服务注入其使用者。

编写辅助函数来注册同类服务被认为是一种好习惯,也可以使代码更简洁。

应用程序的引导程序是通过 ASP.NET Core 中的 Startup 类完成的,在这里我们将注册我们的服务。 将所有同类的引导程序代码封装到辅助函数中也被认为是一种很好的做法。 这也使代码更整洁。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading.Tasks; 
using Microsoft.AspNetCore.Builder; 
using Microsoft.AspNetCore.Hosting; 
using Microsoft.AspNetCore.HttpsPolicy; 
using Microsoft.AspNetCore.Mvc; 
using Microsoft.Extensions.Configuration; 
using Microsoft.Extensions.DependencyInjection; 
using Microsoft.Extensions.Logging; 
using Microsoft.Extensions.Options; 
using  Pzy.Models; 
///
namespace Pzy
{ 
   public class Startup 
   { 
       public Startup(IConfiguration configuration) 
       { 
           Configuration = configuration; 
       } 
       public IConfiguration Configuration { get; } 
       // This method gets called by the runtime. Use this method to add services to the container. 
       public void ConfigureServices(IServiceCollection services) 
       { 
           services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); 
           var connectionString = Configuration.GetConnectionString("myDb"); 
           services.UseAllOfType<IService>(new[] { typeof(Startup).Assembly }, ServiceLifetime.Scoped); 
           services.UseSqlServer(connectionString); 
           services.UseOneTransactionPerHttpCall(); 
       } 
       // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. 
       public void Configure(IApplicationBuilder app, IHostingEnvironment env) 
       { 
           if (env.IsDevelopment()) 
           { 
               app.UseDeveloperExceptionPage(); 
           } 
           else 
           { 
               app.UseHsts(); 
           } 
           app.UseHttpsRedirection(); 
           app.UseMvc(); 
       } 
   } 
}

考虑第 28 行到第 31 行,我们从配置中获取连接字符串,供以后使用。首先,我们注册所有实现特定类型(IService)的服务,这将是我们的业务逻辑,存储库,服务等,要具有标记接口(如 IService),我们允许从应用程序程序集收集所有服务,并在一行代码中注册它们。

第 30 行和第 31 行表示按请求(http调用)设置 SqlServer 和工作单元(+事务)的同类帮助程序功能。

以下是辅助函数的实现。

using System; 
using System.Data; 
using System.Data.SqlClient; 
using System.Linq; 
using System.Reflection; 
using Microsoft.AspNetCore.Builder; 
using Microsoft.Extensions.Configuration; 
using Microsoft.Extensions.DependencyInjection; 
///
namespace Pzy
{ 
   public static class ServiceCollectionExtensions 
   { 
       public static void UseAllOfType<T>(this IServiceCollection serviceCollection, Assembly[] assemblies, ServiceLifetime lifetime = ServiceLifetime.Scoped) 
       { 
           var typesFromAssemblies = assemblies.SelectMany(a => a.DefinedTypes.Where(x => x.IsClass && x.GetInterfaces().Contains(typeof(T)))); 
           foreach (var type in typesFromAssemblies) 
               serviceCollection.Add(new ServiceDescriptor(type, type, lifetime)); 
       } 
       public static void UseSqlServer(this IServiceCollection serviceCollection, string connectionString) 
       { 
           serviceCollection.AddScoped<IDbConnection>((serviceProvider) => 
           { 
               return new SqlConnection(connectionString); 
           }); 
       } 
       public static void UseOneTransactionPerHttpCall(this IServiceCollection serviceCollection, IsolationLevel level = IsolationLevel.ReadUncommitted) 
       { 
           serviceCollection.AddScoped<IDbTransaction>((serviceProvider) => 
           { 
               var connection = serviceProvider 
                   .GetService<IDbConnection>(); 
               connection.Open(); 
               return connection.BeginTransaction(level); 
           }); 
           serviceCollection.AddScoped(typeof(UnitOfWorkFilter), typeof(UnitOfWorkFilter)); 
           serviceCollection 
               .AddMvc(setup => 
               { 
                   setup.Filters.AddService<UnitOfWorkFilter>(1); 
               }); 
       } 
   } 
}

你现在可以注册服务(IService),并通过构造函数将 IDbTransaction 直接注入服务中。 

using System; 
using System.Collections.Generic; 
using System.Data; 
using System.Linq; 
using System.Threading.Tasks; 
using Dapper; 
///
namespace  Pzy.Models 
{ 
   public class MyItemsRepository : IService 
   { 
       private readonly IDbConnection connection; 
       private readonly IDbTransaction transaction; 
       public MyItemsRepository(IDbTransaction transaction) 
       { 
           this.transaction = transaction; 
           this.connection = transaction.Connection; 
       } 
       public Task<IEnumerable<MyItem>> GetAll() 
       { 
           return Query($"SELECT * FROM MyTable"); 
       } 
       public async Task<MyItem> GetById(Guid id) 
       { 
           return (await Query($"SELECT * FROM MyTable WHERE Id = @id", new { id })).First(); 
       } 
       public Task<IEnumerable<MyItem>> GetBetweenCreatedDateRange(DateTimeOffset start, DateTimeOffset end) 
       { 
           return Query($"SELECT * FROM MyTable WHERE @start <= CreatedTimeStamp AND CreatedTimeStamp <= @end", new { start, end }); 
       } 
       public Task<IEnumerable<MyItem>> Search(string searchTerm) 
       { 
           return Query($"SELECT * FROM MyTable WHERE Description LIKE '%' + @searchTerm + '%'", new { searchTerm }); 
       } 
       public async Task<MyItem> AddOrUpdate(MyItem item) 
       { 
           var sqlStatement = "UPDATE MyTable SET Description = @Description WHERE Id = @Id;"; 
           var isAdd = IsAdd(item); 
           if (isAdd) 
           { 
               item.Id = Guid.NewGuid(); 
               item.CreatedTimeStamp = DateTimeOffset.Now; 
               sqlStatement = "INSERT INTO MyTable(Id, CreatedTimeStamp, Description) VALUES (@Id, @CreatedTimeStamp, @Description);"; 
           } 
           var changes = await connection.ExecuteAsync(sqlStatement, item, transaction); 
           if (changes == -1) 
           { 
               var message = isAdd ? "create new item" : "save item"; 
               throw new InvalidOperationException($"Could {message} with id {item.Id}"); 
           } 
           return item; 
       } 
       public async Task Delete(Guid id) 
       { 
           var changes = await connection.ExecuteAsync("DELETE FROM MyTable WHERE Id = @id;", new { id }, transaction); 
           if (changes == -1) 
           { 
               throw new InvalidOperationException($"Could delete item with id {id}"); 
           } 
       } 
       private bool IsAdd(MyItem item) 
       { 
           return item.Id == Guid.Empty; 
       } 
       private Task<IEnumerable<MyItem>> Query(string query, object param = null) 
       { 
           return connection.QueryAsync<MyItem>(query, param, transaction); 
       } 
   } 
}

我创建了一个带有示例实现和示例测试的仓库,所有代码都可以在这里找到,希望它对你有用。

本文编译自:ASP.NET Core: One transaction per server roundtrip

《Unit Of Work: 每 http 请求一次事务》的相关评论

发表评论

必填项已用 * 标记,邮箱地址不会被公开。