首页 \ 问答 \ 从生产者/消费者队列中删除取消的任务(Remove cancelled Task from producer/consumer queue)

从生产者/消费者队列中删除取消的任务(Remove cancelled Task from producer/consumer queue)

我想使用异步生产者/消费者队列(AsyncEx lib)通过总线一次发送一条消息。 现在我只是通过异步阻塞来实现这一点。 它工作正常,但我无法控制队列:(

所以我想出了以下解决方案,问题是没有从队列中删除已取消的任务。 如果我将队列限制为10(因为每个消息需要1s发送,最大队列时间应为10s左右)并且队列包含8个等待任务和2个已取消任务,则下一个排队任务将抛出InvalidOperationException,尽管无论如何都不会发送两个被取消的任务。

也许有更好的方法来做到这一点:D

    class Program
{
    static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue =
        new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>();

    static void Main()
    {
        StartAsync().Wait();
    }

    static async Task StartAsync()
    {
        var sendingTask = StartSendingAsync();
        var tasks = new List<Task>();

        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)))
        {
            for (var i = 0; i < 10; i++)
            {
                tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token));
            }

            try
            {
                await Task.WhenAll(tasks);
                Console.WriteLine("All messages sent.");
            }
            catch (TaskCanceledException)
            {
                Console.WriteLine("At least one task was canceled.");
            }                
        }

        s_Queue.CompleteAdding();
        await sendingTask;
        s_Queue.Dispose();
        Console.WriteLine("Queue completed.");

        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    static async Task EnqueueMessageAsync(string message, CancellationToken token)
    {

        var tcs = new TaskCompletionSource();
        using (token.Register(() => tcs.TrySetCanceled()))
        {
            await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs));
            Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
            await tcs.Task;
        }
    }

    static async Task SendMessageAsync(string message)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
        Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
    }

    static async Task StartSendingAsync()
    {
        while (await s_Queue.OutputAvailableAsync())
        {
            var t = await s_Queue.DequeueAsync();
            if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted) continue;

            await SendMessageAsync(t.Item1);
            t.Item2.TrySetResult();
        }
    }
}

编辑1:

正如svik指出的那样,只有在队列已经完成时才会抛出InvalidOperationException。 所以这个解决方案甚至没有解决我的等待任务的非托管“队列”的初始问题。 如果有超过10个调用/ 10s,我得到一个完整的队列和一个额外的非托管“队列”等待任务,就像我的异步阻塞方法(AsyncMonitor)。 我想我必须提出一些其他的解决方案......

编辑2:

我有N个不同的消息生成器(我不知道有多少消息因为它不是我的代码)而且只有一个消费者通过总线发送消息并检查它们是否被正确发送(不是真正的字符串消息)。

以下代码模拟了代码应该中断的情况(队列大小为10):

  1. 排队10条消息(超时为5秒)
  2. 等待5秒(发送消息0-4,取消消息5-9)
  3. 排队11条新消息(没有超时)
  4. 消息10-19应该排队,因为队列只包含已取消的消息
  5. 消息20应抛出异常(例如QueueOverflowException),因为队列已满,生成器代码将处理或不处理

制片人:

using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
    for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); }
    await Task.Delay(TimeSpan.FromSeconds(5));
    for (var i = 10; i < 21; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken))); }

    try
    {
        await Task.WhenAll(tasks);
        Console.WriteLine("All messages sent.");
    }
    catch (TaskCanceledException)
    {
        Console.WriteLine("At least one task was canceled.");
        Console.WriteLine("Press any key to complete queue...");
        Console.ReadKey();
    }
}

我的目标是,我想完全控制应该发送的所有消息,但在我之前发布的代码中并非如此,因为我只能控制队列中的消息而不是消息等待入队(可能有10000条异步等待入队的消息,我不知道=>生产者代码无论如何也无法正常工作,因为它需要永远发送所有等待的消息......)

我希望这能让我更清楚我想达到的目标;)


I want to use an async producer/consumer queue (AsyncEx lib) to send messages one at a time over a bus. Right now I achieve this simply by async blocking. It's working fine, but I have no control over the queue :(

So I came up with following solution, problem is that a canceled task is not removed from the queue. If I limit the queue to say 10 (because each message takes 1s to send and max queue time shall be 10s or so) and the queue contains already 8 waiting tasks and 2 canceled tasks, than the next queued task would throw an InvalidOperationException although the two canceled task wouldn't be sent anyway.

Maybe there is a better way to do this :D

    class Program
{
    static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue =
        new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>();

    static void Main()
    {
        StartAsync().Wait();
    }

    static async Task StartAsync()
    {
        var sendingTask = StartSendingAsync();
        var tasks = new List<Task>();

        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)))
        {
            for (var i = 0; i < 10; i++)
            {
                tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token));
            }

            try
            {
                await Task.WhenAll(tasks);
                Console.WriteLine("All messages sent.");
            }
            catch (TaskCanceledException)
            {
                Console.WriteLine("At least one task was canceled.");
            }                
        }

        s_Queue.CompleteAdding();
        await sendingTask;
        s_Queue.Dispose();
        Console.WriteLine("Queue completed.");

        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    static async Task EnqueueMessageAsync(string message, CancellationToken token)
    {

        var tcs = new TaskCompletionSource();
        using (token.Register(() => tcs.TrySetCanceled()))
        {
            await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs));
            Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
            await tcs.Task;
        }
    }

    static async Task SendMessageAsync(string message)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
        Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
    }

    static async Task StartSendingAsync()
    {
        while (await s_Queue.OutputAvailableAsync())
        {
            var t = await s_Queue.DequeueAsync();
            if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted) continue;

            await SendMessageAsync(t.Item1);
            t.Item2.TrySetResult();
        }
    }
}

Edit 1:

As svik pointed out the InvalidOperationException is only thrown if the queue is already completed. So this solution doesn't even solve my initial problem of an unmanaged "queue" of waiting tasks. If there are e.g. more than 10 calls/10s I got a full queue and an additional unmanaged "queue" of waiting tasks like with my async blocking approach (AsyncMonitor). I guess I have to come up with some other solution then...

Edit 2:

I have N different producers of messages (I don't know how many there are because it's not my code) and only one consumer that sends the messages over a bus and checks if they were sent correctly (not really string messages).

The following code simulates a situation where the code should break (queue size is 10):

  1. Enqueue 10 messages (with an timeout of 5sec)
  2. Wait 5sec (message 0-4 were sent and message 5-9 were cancelled)
  3. Enqueue 11 new messages (w/o timeout)
  4. Message 10 - 19 should be enqueued because the queue only contains cancelled messages
  5. Message 20 should throw an exception (e.g. QueueOverflowException) because the queue is full, this would be handled or not by the producer code

Producers:

using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
    for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); }
    await Task.Delay(TimeSpan.FromSeconds(5));
    for (var i = 10; i < 21; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken))); }

    try
    {
        await Task.WhenAll(tasks);
        Console.WriteLine("All messages sent.");
    }
    catch (TaskCanceledException)
    {
        Console.WriteLine("At least one task was canceled.");
        Console.WriteLine("Press any key to complete queue...");
        Console.ReadKey();
    }
}

The goal is, I want to have full control over all messages that should be send, but this is not the case in the code I've posted before, because I only have control over the messages in the queue but not the messages that are waiting to be enqueued (there could be 10000 messages asynchronously waiting to be enqueued and I wouldn't know => producer code wouldn't work as expected anyway because it would take forever to send all the messages that are waiting...)

I hope this makes it clearer what I want to achieve ;)


原文:https://stackoverflow.com/questions/29988340
更新时间:2021-11-18 15:11

最满意答案

事实证明,这是一个非常简单和愚蠢的错误。

看看这行代码。 你能看到错误吗? 几乎每一寸代码(我的调试版本)都花了我几个小时的搜索和大量的println语句,我终于看到了它。 在纠正问题之后,一切都按预期工作(有一些小的添加和代码清理,如果没有路径存在就会捕获......但这绝不是问题)。

if (testNode.getX() == findingX) {
    if (testNode.getY() == findingX) {

有一次我终于看到了这个,我觉得“多么愚蠢!!!”。 你现在看到了吗?


So as it turns out, it was a pretty simple and stupid mistake.

Look at this line of code. Can you see the error? It took me hours of searching and tons and tons of println statements at just about every inch of code (my version of debugging) and I finally saw it. After correcting the issue, everything works as expected (with some minor additions and code cleanup, as well as a catch if no path exists...but this was never the problem).

if (testNode.getX() == findingX) {
    if (testNode.getY() == findingX) {

Once I finally saw this I thought "how stupid!!!". Do you see it now?

相关问答

更多
  • 有用的链接 。 解决这个问题的正确版本是6.12.2.633。 它是DVD根(.iso)中“调试器”文件夹中WDK 7.1的一部分。 或者,抓住WDK 7.1网页安装程序并安装“只”调试工具的Windows(虽然这似乎涉及一百多兆字节的无关材料,不符合安装程序中的任何复选框)。 dlanod评论: 我还发现了msdn-archives ,它可以让你下载独立版本的新版本。 而这个链接确实有效。 (比WDK安装程序好得多) Useful link. The correct version that fixes ...
  • 据我所知,地图中的“ - ”字段是图形节点。 每个' - '节点最多有8个边到相邻的' - '字段。 8如果允许对角移动,否则只有4个相邻的' - '节点有效。 ' - '节点和'|'之间没有边缘 节点。 这足以实现一个简单的深度优先搜索/广度优先搜索,其中您保留一个未经检查的节点队列(深度优先的LIFO,广度优先的FIFO)和访问节点的列表(以避免循环) 。 两种算法都是相对低效的,但可以很容易地进行改进。 我不确定'+'节点的含义是什么。 从一个'+'移动到下一个'+'模式是一个自由行动? 如果是这样, ...
  • 我怀疑你的问题是一个常见的Python问题。 这一行: def alle_parents(knoten, parents = []): 加载模块时创建一个空数组,而不是每次调用该函数时都创建一个空数组。 对alle_parents()的未来调用将重用相同的数组(可能已经增大了)而不是新的空数组! 解决的一个好方法是这样做: def alle_parents(knoten, parents = None): parents = parents or [] I suspect your issue ...
  • 使用A *路径作为静态障碍物的一般准则,并对动态(较小)障碍物执行局部障碍物避免 。 雷诺兹还有一个解决瓶颈问题的算法。 他称之为排队 。 Use the A* path as a general guideline around static obstacles and perform local Obstacle Avoidance for dynamic (smaller) obstacles. Reynolds also has an algorithm for the bottleneck-pro ...
  • 通过使用加权A *,您总是可以权衡时间优化。 加权A * [或A * epsilon],预计会找到比A *更快的路径,但路径不会是最优的[但是,它会给你一个最优性的界限,作为你的epsilon /权重的一个参数] 。 You can always have a trade off of time-optimilaity by using a weighted A*. Weighted A* [or A* epsilon], is expected to find a path faster then A*, ...
  • 事实证明,这是一个非常简单和愚蠢的错误。 看看这行代码。 你能看到错误吗? 几乎每一寸代码(我的调试版本)都花了我几个小时的搜索和大量的println语句,我终于看到了它。 在纠正问题之后,一切都按预期工作(有一些小的添加和代码清理,如果没有路径存在就会捕获......但这绝不是问题)。 if (testNode.getX() == findingX) { if (testNode.getY() == findingX) { 有一次我终于看到了这个,我觉得“多么愚蠢!!!”。 你现在看到了吗? S ...
  • 你的意思是他们会在网格上对角线移动吗? 您所要做的就是打开节点扩展代码以扩展(+ 1,+ 1),( - 1,-1),(+ 1,-1)和(-1,+ 1),转动它变成正交而不是基数。 如果你的A *算法是正确的,那么他们会倾向于单个对角线移动,比如说东边然后向北移动。 如果你的意思是自由地跨越更大的区域,那就稍微不同了,而且更难做到。 Do you mean they'd move diagonally on the grid? All you'd have to do is open up the node ...
  • 我认为最简单的策略是让每个鬼都追逐最接近它的球员。 可以使用曼哈顿距离(在寻路问题中存在链接)或欧几里得距离或通过球员的路径长度来计算接近度。 最后一个选项意味着你将不得不计算两个玩家的路径。 尝试所有这些选项,并根据自己的口味选择一种。 另外,在一个侧面说明。 所有回答寻路问题的人都没有提到Dijkstra的算法 ,它甚至比BFS慢:)但是只允许搜索一次所有最短路径。 也就是说,如果你实现了A *或BFS并且有n个重影,你至少会做n个寻路查询。 使用Dijkstra,你只能从玩家开始一次。 但这一切都依赖 ...
  • 从开放列表中取出的节点必须是F cost最小的节点。 这使得使用链接列表成为一个糟糕的选择 - 您必须完全搜索它以提取节点,或者您必须完全搜索它以插入节点。 但是,糟糕的选择与否,这是不正确的,因为你们两个都没做。 此外,如果邻居已经在打开列表中,您必须比较G分数并在新路径更好的情况下重新使用它。 使用关闭集合的链接列表也是一个糟糕的选择,您只需要“添加”和“包含”,并且包含对于链接列表是可怕的。 但这并不影响正确性。 The node that you take off the open list mus ...
  • 如有疑问,请尝试浏览测试用例,看看出了什么问题。 假设我们在第二张图像中, x = 13, y = 12 if (!(x == pVal.x && y == pVal.y) && map.grid[x, y].passable ) (13,12)不是我们的目标点,并且可以通过,所以我们通过了这个测试,然后进入下一行...... float dot = 1; var heading = (grid[x, y].position - t.position).normalized; heading.y = 0; ...

相关文章

更多

最新问答

更多
  • 您如何使用git diff文件,并将其应用于同一存储库的副本的本地分支?(How do you take a git diff file, and apply it to a local branch that is a copy of the same repository?)
  • 将长浮点值剪切为2个小数点并复制到字符数组(Cut Long Float Value to 2 decimal points and copy to Character Array)
  • OctoberCMS侧边栏不呈现(OctoberCMS Sidebar not rendering)
  • 页面加载后对象是否有资格进行垃圾回收?(Are objects eligible for garbage collection after the page loads?)
  • codeigniter中的语言不能按预期工作(language in codeigniter doesn' t work as expected)
  • 在计算机拍照在哪里进入
  • 使用cin.get()从c ++中的输入流中丢弃不需要的字符(Using cin.get() to discard unwanted characters from the input stream in c++)
  • No for循环将在for循环中运行。(No for loop will run inside for loop. Testing for primes)
  • 单页应用程序:页面重新加载(Single Page Application: page reload)
  • 在循环中选择具有相似模式的列名称(Selecting Column Name With Similar Pattern in a Loop)
  • System.StackOverflow错误(System.StackOverflow error)
  • KnockoutJS未在嵌套模板上应用beforeRemove和afterAdd(KnockoutJS not applying beforeRemove and afterAdd on nested templates)
  • 散列包括方法和/或嵌套属性(Hash include methods and/or nested attributes)
  • android - 如何避免使用Samsung RFS文件系统延迟/冻结?(android - how to avoid lag/freezes with Samsung RFS filesystem?)
  • TensorFlow:基于索引列表创建新张量(TensorFlow: Create a new tensor based on list of indices)
  • 企业安全培训的各项内容
  • 错误:RPC失败;(error: RPC failed; curl transfer closed with outstanding read data remaining)
  • C#类名中允许哪些字符?(What characters are allowed in C# class name?)
  • NumPy:将int64值存储在np.array中并使用dtype float64并将其转换回整数是否安全?(NumPy: Is it safe to store an int64 value in an np.array with dtype float64 and later convert it back to integer?)
  • 注销后如何隐藏导航portlet?(How to hide navigation portlet after logout?)
  • 将多个行和可变行移动到列(moving multiple and variable rows to columns)
  • 提交表单时忽略基础href,而不使用Javascript(ignore base href when submitting form, without using Javascript)
  • 对setOnInfoWindowClickListener的意图(Intent on setOnInfoWindowClickListener)
  • Angular $资源不会改变方法(Angular $resource doesn't change method)
  • 在Angular 5中不是一个函数(is not a function in Angular 5)
  • 如何配置Composite C1以将.m和桌面作为同一站点提供服务(How to configure Composite C1 to serve .m and desktop as the same site)
  • 不适用:悬停在悬停时:在元素之前[复制](Don't apply :hover when hovering on :before element [duplicate])
  • 常见的python rpc和cli接口(Common python rpc and cli interface)
  • Mysql DB单个字段匹配多个其他字段(Mysql DB single field matching to multiple other fields)
  • 产品页面上的Magento Up出售对齐问题(Magento Up sell alignment issue on the products page)