BlockingQueue.cs 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. using System.Collections.Concurrent;
  2. using System.Threading;
  3. namespace SKMC.Api.Common.Types
  4. {
  /// <summary>
  /// Blocking FIFO queue: <see cref="Add"/> never blocks, while <see cref="Take"/>
  /// blocks the calling thread until an item is available or <see cref="Stop"/>
  /// has been called.
  /// </summary>
  /// <typeparam name="T">Type of the queued items (value or reference type).</typeparam>
  public class BlockingQueue<T>
  {
      private readonly ConcurrentQueue<T> queue;

      // Monitor used for the "queue is empty -> wait" handshake. Enqueueing and
      // waiting both happen under this lock, so a wake-up can never be lost
      // between a consumer's emptiness check and its wait.
      private readonly object gate;

      // Set by Stop(); only read or written while holding 'gate'.
      private bool stopped;

      /// <summary>
      /// Creates an empty queue.
      /// </summary>
      public BlockingQueue()
      {
          queue = new ConcurrentQueue<T>();
          gate = new object();
      }

      /// <summary>
      /// Number of items currently held in the queue.
      /// </summary>
      public int Count => queue.Count;

      /// <summary>
      /// Appends an item to the tail of the queue and wakes one waiting consumer.
      /// </summary>
      /// <param name="item">The item to enqueue.</param>
      public void Add(T item)
      {
          lock (gate)
          {
              queue.Enqueue(item);
              // Exactly one item was added, so waking a single consumer is enough.
              Monitor.Pulse(gate);
          }
      }

      /// <summary>
      /// Removes and returns the item at the head of the queue, blocking while the
      /// queue is empty.
      /// </summary>
      /// <returns>
      /// The dequeued item, or <c>default(T)</c> when the queue is empty and
      /// <see cref="Stop"/> has been called.
      /// </returns>
      public T Take()
      {
          lock (gate)
          {
              while (true)
              {
                  T item;

                  // Rely on TryDequeue's result rather than comparing the item with
                  // null: a null test is always true for value types and would hide
                  // a null that was legitimately enqueued.
                  if (queue.TryDequeue(out item))
                  {
                      return item;
                  }

                  // Items queued before Stop() are still delivered; only an empty,
                  // stopped queue releases the caller without an item.
                  if (stopped)
                  {
                      return default(T);
                  }

                  // Releases 'gate' while waiting and re-acquires it before returning.
                  Monitor.Wait(gate);
              }
          }
      }

      /// <summary>
      /// Releases every thread blocked in <see cref="Take"/>. Once the queue has
      /// drained, subsequent <see cref="Take"/> calls return <c>default(T)</c>
      /// immediately instead of blocking.
      /// </summary>
      public void Stop()
      {
          lock (gate)
          {
              stopped = true;
              Monitor.PulseAll(gate);
          }
      }
  }
  45. }