C#实现一个万物皆可排序的队列

需求

产品中需要向不同的客户推送数据,原来的实现是每条数据产生后就立即向客户推送数据,走的的是HTTP协议。因为每条数据都比较小,而数据生成的频次也比较高,这就会频繁的建立HTTP连接,而且每次HTTP传输中携带的业务数据都很小,对网络的实际利用率不高。希望能够提高网络的利用率,并降低系统的负载。

分析

一个很自然的想法就是将多条数据一起发送,这里有几个关键点:

1、多条数据的聚合逻辑: 是攒够几条发送,还是按照时间周期发送。如果是攒够几条发送,在数据比较稀疏或者产生频率不那么稳定的时候,攒够需要的数据条数可能比较困难,这时候还得需要一个过期时间,因为客户可能接受不了太多的延迟。既然不管怎样都需要使用时间进行控制,我这里索性就选择按照时间周期发送了。思路是:自上次发送时间起,经过了某个时长之后,就发送客户在这段时间内产生的所有数据。

2、数据到期判断方法:既然选择了按照时间周期发送,那么就必须有办法判断是否到了发送时间。一个很简单的想法就是轮询,把所有客户轮询一遍,看看谁的数据到期了,就发送谁的。这个算法的时间复杂度是O(N),如果客户比较多,就会消耗过多的时间在这上边。还有一个办法:如果客户按照时间排序好了,那么只需要取时间最早的客户的数据时间判断就好了,满足就发送,一直向后找,直到获取的客户数据时间不符合条件,则退出处理,然后等一会再进行判断处理。这就需要有一个支持排序的数据结构,写入数据时自动排序,这种数据结构的时间复杂度一般可以做到O(log(n))。对于这个数据结构的读写操作原理上就是队列的操作方式,只不过是个可排序的队列。

3、区分客户:不同客户的数据接收地址不同,向具体某个客户发送数据时,应该能比较方便地聚合他的数据,最好是直接就能拿到需要发送的数据。可以使用字典数据结构来满足这个需求,取某个客户数据的时间复杂度可以降低到O(1)。

4、数据的安全性问题:如果程序在数据发送成功之前退出了,未发送的数据怎么办?是还能继续发送,还是就丢掉不管了。如果要在程序重启后恢复未发送成功的数据,则必须将数据同步到别的地方,比如持久化到磁盘。因为我这里的数据安全性要求不高,丢失一些数据也是允许的,所以要发送的数据收到之后放到内存就行了。

实现

上文提到可排序的数据结构,可以使用SortedList<TKey,TValue>,键是时间,值是这个时间产生了数据的客户标识列表。不过它的读写操作不是线程安全的,需要自己做同步,这里简单点就使用lock了。

对于不同客户的数据,为了方便获取,使用Dictionary<TKey,TValue>来满足,键是客户的标识,值是累积的未发送客户数据。这个数据读写也不是线程安全的,可以和SortedList的读写放到同一个lock中。

下边是它们的定义:

SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
readonly object _lock = new object();

插入数据的时候,需要先写入SortedList,然后再写入Dictionary。代码逻辑比较简单,请看:

    public void Publish(TKey key, TValue value)
    {
        DateTime now = DateTime.Now;
        lock (_lock)
        {
            if (_queue.TryGetValue(now, out List<TKey>? keys))
            {
                if (!keys!.Contains(key))
                {
                    keys.Add(key);
                }
            }
            else
            {
                _queue.Add(now, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }

对于消费数据,这里采用拉数据的模式。最开始写的方法逻辑是:读取一条数据,处理它,然后从队列中删除。但是这个逻辑需要对队列进行读写,所以必须加锁。一般处理数据比较耗时,比如这里要通过HTTP发送数据,加锁的话就可能导致写数据到队列时阻塞的时间比较长。所以这里实现的是把可以发送的数据全部提取出来,然后就释放锁,数据的处理放到锁的外部实现,这样队列的读写性能就比较好了。

    public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        DateTime now = DateTime.Now;

        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var first = _queue.First();
                var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds;
                if (diffMillseconds < _valueDequeueMillseconds)
                {
                    break;
                }

                var keys = first.Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }

这段代码比较长一些,我梳理下逻辑:取队列的第一条数据,判断时间是否达到发送周期,未达到则直接退出,方法返回空列表。如果达到发送周期,则取出第一条数据中存储的客户标识,然后根据这些标识获取对应的客户未发送的数据,将这些数据按照客户维度添加到返回列表中,将这些客户及其数据从队列中移除,返回有数据的列表。这里还增加了一个拉取数据的条数限制,方便根据业务实际情况进行控制。

再来看一下怎么使用这个队列,这里模拟多个生产者加一个消费者,其实可以任意多个生产者和消费者:

TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);

List<Task> publishTasks = new List<Task>();

for (int i = 0; i < 4; i++)
{
    var j = i;
    publishTasks.Add(Task.Factory.StartNew(() =>
    {
        int k = 0;
        while (true)
        {
            queue.Publish($"key_{k}", $"value_{j}_{k}");
            Thread.Sleep(15);
            k++;
        }
    }, TaskCreationOptions.LongRunning));
}

Task.Factory.StartNew(() =>
{
    while (true)
    {
        var list = queue.Pull(100);
        if (list.Count <= 0)
        {
            Thread.Sleep(100);
            continue;
        }

        foreach (var item in list)
        {
            Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}");
        }
    }

}, TaskCreationOptions.LongRunning);

Task.WaitAll(publishTasks.ToArray());

以上就是针对这个特定需求实现的一个按照时间进行排序的队列。

万物皆可排序的队列 我们很容易想到,既然可以按照时间排序,那么按照别的数据类型排序也是可以的。这个数据结构可以应用的场景很多,比如按照权重排序的队列、按照优先级排序的队列、按照年龄排序的队列、按照银行存款排序的队列,等等。这就是一个万物皆可排序的队列。

我这里把主要代码贴出来(完整代码和示例请看文末):

public class SortedQueue<TSortKey, TKey, TValue>
where TSortKey : notnull, IComparable
where TKey : notnull
where TValue : notnull
{
    Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();

    SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();

    readonly object _lock = new object();

    /// <summary>
    /// Create a new instance of SortedQueue
    /// </summary>
    public SortedQueue(int maxNumberOfMessageConsumedOnce)
    {
    }

    /// <summary>
    /// Publish a message to queue
    /// </summary>
    /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
    /// <param name="key">The message key.</param>
    /// <param name="value">The message value.</param>
    public void Publish(TSortKey sortKey, TKey key, TValue value)
    {
        lock (_lock)
        {
            if (_queue.TryGetValue(sortKey, out List<TKey>? keys))
            {
                keys.Add(key);
            }
            else
            {
                _queue.Add(sortKey, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }


    /// <summary>
    /// Pull a batch of messages.
    /// </summary>
    /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
    /// <returns></returns>
    public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var keys = _queue.First().Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }
}

代码逻辑还是比较简单的,就不罗嗦了,如有问题欢迎留言交流。

再说数据安全

因为在这个实现中所有待处理的数据都在内存中,丢失数据会带来一定的风险,因为我这个程序前边还有一个队列,即使程序崩溃了,也只损失没处理的一小部分数据,业务上可以接受,所以这样做没有问题。如果你对这个程序感兴趣,需要慎重考虑你的应用场景。

来看看数据丢失可能发生的两种情况:

一是数据还在队列中时程序重启了:对于这种情况,前文提到将数据同步到其它地方,比如写入Redis、写入数据库、写入磁盘等等。不过因为网络IO、磁盘IO较慢,这往往会带来吞吐量的大幅下降,想要保证一定的吞吐量,还得引入一些分片机制,又因为分布式的不可靠,可能还得增加一些容错容灾机制,比较复杂,可以参考Kafka。

二是数据处理的时候失败了:对于这种情况,可以让程序重试;但是如果异常导致程序崩溃了,数据已经从内存或者其它存储中移除了,数据还是会发生丢失。这时候可以采用一个ACK机制,处理成功后向队列发送一个ACK,携带已经处理的数据标识,队列根据标识删除数据。否则消费者还能消费到这些数据。

这些问题并不一定要完全解决,还是得看业务场景,有可能你把数据持久化到Redis就够了,或者你也不用引入ACK机制,记录下处理到哪一条了就行了。

本文链接:https://www.dzdvip.com/34418.html 版权声明:本文内容均来源于互联网。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 395045033@qq.com,一经查实,本站将立刻删除。
(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022年7月4日 22:22
下一篇 2022年7月5日 21:39

相关推荐

  • 网站建设制作流程有哪些?

    拥有一个属于自己的网站,似乎是一件挺不错的事情,你可以做个公司类型的网站宣传业务,也可以做个个人类型的网站展示自我。 那么对于完全没有建站经验的人来说,如何做网站呢? 自己做网站有两种方式。 第一种建站方式:整套网站都由自己搭建 第一步:注册域名 注册域名比较简单,一般是到阿里云或腾讯云上注册,新用户会有所优惠。 第二步:选择服务器 首先要根据自己的网站规模,即预期达到的日流量,内容存储量等因素,去选择服务器,简单举例说明,如果做一个个人的博客网站,预期达到每天三四百人的日访问量,选择一台最低配置的服务器就足够啦。 另外购买服务器要选择国内、中国香港和国外三个区域,购买中国香港和国外的服务器不需要网站域名备案。 第三步:网站域名备案 如果你刚好选择了国内的服务器就需要给网站域名备案了,不备案的网站是没有办法上线的,而备案的流程并不复杂,但是由于各地的政策不同,备案审核的时间在3——15天之间,需要提交准备备案,避免错过网站上线时机。 第四步:选择网站程序和网页模板 有人以为做网站,需要自己开发程序和设计网页模板,其实是完全没有必要的,你可以找到市面上许多种能满足你建站需求的、开源且免费的建站程序,同时基于你选择的建站程序本身,你也能找到适合的网页模板,如果实在找不到合适的网页模板,再去考虑设计网页,但是自己设计网页对新手来说几乎是不现实的事情,这个时候,你可以尝试学习仿站,也可以找有经验的网页设计师帮助你设计网页或者仿站。 最后,满足了以上步骤,就可以绑定域名,开通网站了,开通网站以后,记得要及时的在网站上设置栏目和发布内容哟! 第二种建站方式:基于有丰富成品网站模板的自助建站平台搭建网站 第一步:注册域名 第二步:网站域名备案 第三步:选择模板 最后,绑定域名,开通网站,当然,也一定要及时的在网站上设置栏目和发布内容哟! 另外,这一种建站方式我们没有提到服务器,因为自助建站平台本身就有服务器,供所有用户共享使用。缺点是没有独立ip,配置无法调整。 说到最后,如果你问我,希望新手选择哪种建站方式,我当然推荐第一种,从SEO优化,从个性化网页,从功能体验等等方面都优于第二种建站方式。但是第二种建站方式,为何也是主流的建站方式呢?只因为它操作起来特别简单,用户考虑的问题不多,再加上有丰富的企业网站模板库,反而很适合自助建站平台去推销给那些急需建站的企业用户。

    2021年9月12日
    18
  • 教你把被骗的钱追回来(被骗的钱怎么才能追回来)

    经常在网上看到有人提问:我被诈骗了,钱怎么追回来。今天我要分享的话题是“如果被诈骗了,怎样把钱追回来?” 现在我们的微信都实名认证的,如果说对方需要收到钱的时候呢,他就需要经过实名认证,绑定银行卡等信息,这样他收到的钱才能提现到银行卡上,所以说如果说我们在微信上被别人骗了,这个时候还被对方给拉黑了,首先我们要把这张截图给保存下来。 保存完之后,我们也要打开他的头像,然后要记住他的微信号码,每一个人的他都是有专用的微信号的。然后我们返回之后,在微信主页右上方的(搜索),我们输入(腾讯110),第一个就是了! 我们点开点开之后,大家可以看到里面有一个网络举报,然后我们再点开。 点击(我要举报),这里面有举报的很多方式,比如说违法违规了,网络诈骗了,网络低俗,暴力赌博,传销,违禁商品,还有青少年身心健康保护。我们被别人骗钱了,一般都是网络诈骗。 这里边儿也有非常多的种类,比如说我们是兼职刷单被诈骗了,还是交友诈骗呢?根据你被骗的种类,你选择一个。 选择进去之后,这里边儿是需要填写我们(本人的姓名,手机号,还有对方的违规账号的id。这边还有补充信息,就是对方的二维码,然后这个获取信息来源也非常的简单,比如说选择是什么平台的)。!!证据截图的上传完成之后呢,我们点击提交就可以啦 上传完信息之后,我们只需要等待腾讯110为我们调查取证就可以了,为了方便腾讯110调查取证,那么我们需要把我们的微信到账时间设置成24小时 需要找到(我——支付——右上角有三个点儿),我们点开,这里面有一个转账到账时间,我们平时默认的是实时到账,就相当于说我们转过去,对方会立马收到,我们需要把我们的到账时间改成24小时到账就可以了 怎么样,大家都学会没有?觉得有用,记得点赞,收藏转发出去,让更多的人了解到! 首先,我要说的是,被诈骗了,把钱追回来的概率比较小。因为据公安部门统计,电信诈骗的破案率一直都在11%左右,去年,也就是2021年,开展了打击治理电信网络新型违法犯罪专项整治行动,破案率大幅上升,超过30%。但追赃挽损率仍是较低,10%左右。 为什么?第一个原因:是因为诈骗分子难抓。诈骗分子大部分常年身在国外,行踪飘忽不定,而且使用假身份、假IP、假地址、定位非常难,给公安抓捕带来很大困难。有的诈骗犯罪集团甚至还有国外武装势力庇护,抓捕更是难上加难。 第二个原因:是追缴赃款难。诈骗分子一旦诈骗得手…

    2022年7月16日
    15
  • 支付宝和微信支付的最大对手(数字人民币是什么意思)

    “什么叫作数字人民币?数字人民币指的就是以数字的形式来发行的法定货币。通俗的讲就是把现金电子数字化,通过电子支付的形式来表现。数字人民币是中国法定数字人民币面对个人客户开展试点的官方服务平台,该平台提供了数字人民币个人钱包的开通以及管理,兑换和流通服务。 数字人民币,改变超乎想象,或将颠覆全球货币格局。 随着移动互联网时代到来,数字经济迎来飞速发展。2021年,中国数字经济规模达到7.1万亿美元,位居世界第二。数据资产不断增长,那传统货币能够支持其市场化吗? 如果将数据资产兑换为传统货币,那必将占用大量货币,导致经济通缩。所以,数字货币的推出是势在必行,它可以作为数据资产的“锚”。 我国央行在2014年开始研、发数字人民币,2019年开启了试点。如今,数字人民币的功能不断优化,使用人数和交易金额不断上涨,全面普及的时代即将到来。 01 数字人民币是支付宝和微信支付的最大对手? 提到数字人民币,还没有使用过的人群可能不太明白是什么。其实,数字人民币可以理解为“线上的纸币”,它也有法定货币的地位。 数字人民币作为一种货币,自然是有交易功能的。所以,数字人民币可以用来支付各种消费,同时和现金一样,不依赖于互联网,可以实现“双离线”支付。总结起来,数字人民币有几个特点,一是安全,因为是法定货币;二是不计利息,无论是消费,还是转账都是不收利息的;三是不需要网络,不需要扫码,碰一碰就可以完成交易。 看到这几个功能,很多人觉得这不是对支付宝和微信支付形成了降维打击吗?必定会冲击它们的市场。实际上,两者完全不在一个维度,数字人民币是钱,而支付宝和微信支付是钱包,是交易工具。另外,数字人民币和支付宝、微信支付已经相互打通。 支付宝和微信支付完全不用担心,它们已经是中国移动支付领域使用率最高的两个工具,消费者已经形成了习惯,很难改变。而且,这两家有完整的金融生态系统,更是留住用户的关键。 实际上,国家队要和它们抢市场,之前银联就推出了“云闪付”。加快推进数字人民币建设,国家有着更大的战略目标。 02 数字人民币,改变将超乎想象 数字人民币瞄准的不是国内,而是全球。 过去几十年来,全球都是以美元为核心的货币交易体系,还建立了以美元为主的环球银行金融电信协会(SWIFT),覆盖全球200多个国家,连接1.15万家金融机构。 这几年,人民币加快国际化进程,在2021年底,人民币国际支付份额达…

    2022年10月13日
    33
  • 广点通账户怎么搭建?广点通操作技巧

    Sem竞价从07年开始,如今已经发展10年的历程了, 随着信息流广告兴起,越来越多的公司开始注重新渠道的拓展,广点通、 今日头条、广点通等一起账户陆续开下来了,那么公司谁负责新渠道的账户?账户很有可能就落到了SEM出身的竞价员手上,公司广点通的账户怎么操作?账户搭建思路跟sem竞价有什么区别?如果你也曾苦恼过,请跟小编一起来看看广点通账户的操作技巧吧。 一、广点通跟信息流广告有什么区别? 信息流又叫feed流,是主动型推送广告。信息流的广告有很多种, 比如:今日头条、一点咨询、广点通(腾讯信息客户端、天天快报)、新浪扶翼(新浪新闻客户端)、搜狐汇算(搜狐新闻客户端)、网易新闻客户端, 这些都是以资讯为主的信息流广告。 微博粉丝通/粉丝头条、广点通、陌陌,这些是以社交媒体为主的信息流广告。所以信息流与广点通是一种包含与被包含的关系。 二、广点通跟SEM竞价有什么关系? 百度、360等搜索引擎营销是被动式的营销,守株待兔式的、被动的等待潜在客户去搜索某一个关键字,才会有可能展示我 们的一条广告,这还取决于你的匹配、你的出价,所以我们把搜索引擎营销,即SEM,说是被动式的营销;而社交新媒体型,比 如 百度信息流、广点通、今日头条等,是一种变被动为主动式的营销方式,通过大数据调取,主动推送这一条广告,去刺激访客 需求,促成转化。广点通跟SEM竞价有什么关系?我们可以通过广点通多重定向和数据整合来实现主动精准投放,为被动搜索型 的SEM引流,所以SEM和DSP是互相配合的“战友”关系,或用个更时髦的词,就是CP关系,是1 1>2的关系。 三、广点通账户怎么搭建? 我们做任何推广前, 都需要对自己的产品做人物画像和需求分析, 越了解受众越能做好营销。我之前写过一片文章《广点 通推广 你的产品适合做广点通吗?》, 是关于人物画像和需求分析的,大家不清楚的可以回顾下。人物画像9要素:性别、职业、学 历、兴趣、收入、年龄、地区、时段、设备,举个例子:我们以ui培训为例,假如是大学生刚毕业,可能最迫切的就是工作、就业, 你推送的广告就可以展示“保障就业“,假如是上班族想提升,看中的就是升职加薪,你推送的广告就可以展示“学员高薪就业“, 对应的文案素材、对应的页面、对应的客服解与答、成单。是不是我们基本的账户结构就出来了?回顾下我们做竞价账户的思路,是 不是雷同?所以我们做账户的思…

    2021年6月21日
    18
  • SEM推广投放三步曲

    推广分析”中,我们对行业本身的客观属性、行业覆盖的目标客户特征、行业内部推广投放的特征进行了综合的分析。那么接下来,我们就根据前期的分析思路,真刀实枪的来策划一个漂亮的推广方案了。

    2021年5月15日
    33
  • 给正在创业的人思考分享两点

    我认为如今的创业大自然环境有相应的虚假宣传。我近期遇到许多不可靠的创业者,说难听的,有的连为人处事的基础道理都不明白的人还要创业。

    2021年5月15日
    15