怎么在ASP.NET Core 3.x 中实现并发限制?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

Queue策略
添加Nuget
Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
public void ConfigureServices(IServiceCollection services)
    {
      services.AddQueuePolicy(options =>
      {
        //较大并发请求数
        options.MaxConcurrentRequests = 2;
        //请求队列长度限制
        options.RequestQueueLimit = 1;
      });
      services.AddControllers();
    }
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
      //添加并发限制中间件
      app.UseConcurrencyLimiter();
      app.Run(async context =>
      {
        Task.Delay(100).Wait(); // 100ms sync-over-async
        await context.Response.WriteAsync("Hello World!");
      });
      if (env.IsDevelopment())
      {
        app.UseDeveloperExceptionPage();
      }
      app.UseHttpsRedirection();
      app.UseRouting();
      app.UseAuthorization();
      app.UseEndpoints(endpoints =>
      {
        endpoints.MapControllers();
      });
    }通过上面简单的配置,我们就可以将他引入到我们的代码中,从而做并发量限制,以及队列的长度;那么问题来了,他是怎么实现的呢?
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Actionconfigure) { services.Configure(configure); services.AddSingleton (); return services; } 
QueuePolicy采用的是SemaphoreSlim信号量设计,SemaphoreSlim、Semaphore(信号量)支持并发多线程进入被保护代码,对象在初始化时会指定 较大任务数量,当线程请求访问资源,信号量递减,而当他们释放时,信号量计数又递增。
////// 构造方法(初始化Queue策略) /// /// public QueuePolicy(IOptionsoptions) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0) { throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number."); } //使用SemaphoreSlim来限制任务较大个数 _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests); } 
ConcurrencyLimiterMiddleware中间件
////// Invokes the logic of the middleware. /// /// The. /// A public async Task Invoke(HttpContext context) { var waitInQueueTask = _queuePolicy.TryEnterAsync(); // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets. bool result; if (waitInQueueTask.IsCompleted) { ConcurrencyLimiterEventSource.Log.QueueSkipped(); result = waitInQueueTask.Result; } else { using (ConcurrencyLimiterEventSource.Log.QueueTimer()) { result = await waitInQueueTask; } } if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); } }that completes when the request leaves. 
每次当我们请求的时候首先会调用_queuePolicy.TryEnterAsync(),进入该方法后先开启一个私有lock锁,再接着判断总请求量是否≥(请求队列限制的大小+较大并发请求数),如果当前数量超出了,那么我直接抛出,送你个503状态;
 if (result)
 {
     try
     {
       await _next(context);
     }
     finally
    {
      _queuePolicy.OnExit();
    }
    }
    else
    {
      ConcurrencyLimiterEventSource.Log.RequestRejected();
      ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
      context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
      await _onRejected(context);
    }问题来了,我这边如果说还没到你设置的大小呢,我这个请求没有给你服务器造不成压力,那么你给我处理一下吧.
await _serverSemaphore.WaitAsync();异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止
 lock (_totalRequestsLock)
  {
    if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
    {
       return false;
    }
      TotalRequests++;
    }
    //异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止
    await _serverSemaphore.WaitAsync();
    return true;
  }返回成功后那么中间件这边再进行处理,_queuePolicy.OnExit();通过该调用进行调用_serverSemaphore.Release();释放信号灯,再对总请求数递减
Stack策略
再来看看另一种方法,栈策略,他是怎么做的呢?一起来看看.再附加上如何使用的代码.
   public void ConfigureServices(IServiceCollection services)
    {
      services.AddStackPolicy(options =>
      {
        //较大并发请求数
        options.MaxConcurrentRequests = 2;
        //请求队列长度限制
        options.RequestQueueLimit = 1;
      });
      services.AddControllers();
    }通过上面的配置,我们便可以对我们的应用程序执行出相应的策略.下面再来看看他是怎么实现的呢
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Actionconfigure) { services.Configure(configure); services.AddSingleton (); return services; } 
可以看到这次是通过StackPolicy类做的策略.来一起来看看主要的方法
////// 构造方法(初始化参数) /// /// public StackPolicy(IOptionsoptions) { //栈分配 _buffer = new List (); //队列大小 _maxQueueCapacity = options.Value.RequestQueueLimit; //较大并发请求数 _maxConcurrentRequests = options.Value.MaxConcurrentRequests; //剩余可用空间 _freeServerSpots = options.Value.MaxConcurrentRequests; } 
当我们通过中间件请求调用,_queuePolicy.TryEnterAsync()时,首先会判断我们是否还有访问请求次数,如果_freeServerSpots>0,那么则直接给我们返回true,让中间件直接去执行下一步,如果当前队列=我们设置的队列大小的话,那我们需要取消先前请求;每次取消都是先取消之前的保留后面的请求;
public ValueTaskTryEnterAsync() { lock (_bufferLock) { if (_freeServerSpots > 0) { _freeServerSpots--; return _trueTask; } // 如果队列满了,取消先前的请求 if (_queueLength == _maxQueueCapacity) { _hasReachedCapacity = true; _buffer[_head].Complete(false); _queueLength--; } var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this); _cachedResettableTCS = null; if (_hasReachedCapacity || _queueLength < _buffer.Count) { _buffer[_head] = tcs; } else { _buffer.Add(tcs); } _queueLength++; // increment _head for next time _head++; if (_head == _maxQueueCapacity) { _head = 0; } return tcs.GetValueTask(); } } 
当我们请求后调用_queuePolicy.OnExit();出栈,再将请求长度递减
  public void OnExit()
    {
      lock (_bufferLock)
      {
        if (_queueLength == 0)
        {
          _freeServerSpots++;
          if (_freeServerSpots > _maxConcurrentRequests)
          {
            _freeServerSpots--;
            throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
          }
          return;
        }
        // step backwards and launch a new task
        if (_head == 0)
        {
          _head = _maxQueueCapacity - 1;
        }
        else
        {
          _head--;
        }
        //退出,出栈
        _buffer[_head].Complete(true);
        _queueLength--;
      }
    }关于怎么在ASP.NET Core 3.x 中实现并发限制问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注创新互联行业资讯频道了解更多相关知识。
文章名称:怎么在ASP.NETCore3.x中实现并发限制-创新互联
网页地址:http://www.jxjierui.cn/article/hdeoi.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 