查看: 222|回复: 0

哎呀,我老大写Bug啦——记一次MessageQueue的优化

[复制链接]

该用户从未签到

发表于 2019-11-4 16:58:14 | 显示全部楼层 |阅读模式
  MessageQueue,顾名思义消息队列,在系统开辟中也是用的比较多的一个中间件吧。我们这里紧张用它来做日志管理和订单管理的,记得老老大(恩,是的,就是老老大,因为他已经跳槽了)还在的时候,当时也是为了赶项目进度,他也参与开辟了,那时候我才刚刚入职,他负责写后端这块,我来了就把他手上的任务接过来了,(接着接着……就辞职了)。

之后我们的开辟仍旧井井有条的开辟着,直到今年的一月份吧,才上线开始运行,然后就出现了常规状态,上线之后就开始爆炸,
                                                                                    

这个页面打不开呀,那个内容没东西呀,第三方登录问题呀,支付问题呀,临时再改需求呀……(该来的都来了),加班、debug、测试、再debug……,然后颠末几天的修复,终于完成烈自己电脑一样稳定的运行,组员们都美滋滋的,今晚加个鸡腿才行。
                                                                                    

都说祸不单行,古人是不会骗我们的,Bug怎么会修得完呢?灵活,要是Bug能修得完还要我们来干啥,好景不长,果然,过了一周之后,组员突然群里叫喳喳,

what is it ?



来了,今天的主角登场了,我也要开始加班了。
RabbitMQ

  这个是今天要说的东西,基础概念什么的不是今天要说的重点,重点是:


RabbitMQ内存暴涨!使得整个服务器濒临瘫痪,长途登录服务器都差点挤不进去的状态,别看截图如今才1.3G,吃个午饭返来,就2.3G了,可骇不可骇?咋回事?
老板喊你返来加班啦

  先不管了,线上优先管理,手动先Reset回收资源以释放空间,这个只是临时的办法,然后检查一下rabbitMQ的设置有没有问题,路径在
C:\Users\administrator\AppData\Roaming\RabbitMQ

完全是默认的设置,完全ojbk啊,那到底咋回事?继承检查,想想不如从项目开始吧,然后查看项目中的代码,都是从来自【MessageLib】的组件调用


好了,叫我老老大要这个组件的代码,他把git的地址就发给我,我把项目down下来,

这个封装的组件内容不多,紧张的文件一目了然,其实就是用到这个两个组件来举行的二次封装来调用

紧张的代码是在【MessageQueue.cs】文件里,展示一下当时的代码情况:
  1. using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using MessageLib.ClassBean;using EasyNetQ;using System.Threading;namespace MessageLib{    public static class MessageQueue    {        public static IBus bus = MQBusBuilder.CreateMessageBus();        //消息队列        private static Queue NoticQueue = new Queue(5000);        //日志队列        private static Queue LogQueue = new Queue(5000);        //队列数目发布数量        private static int max_count_to_pulish = 1000;        ///         /// 可供外部使用的消息入列操作        ///         public static void push(Item item)        {            if (item.type == ItemType.notic)            {                NoticQueue.Enqueue(item);            }            if (item.type == ItemType.log)            {                LogQueue.Enqueue(item);            }        }        ///         /// 监听后需要调用的发布接口        ///         private static void Pulish(object source, System.Timers.ElapsedEventArgs e)        {            if (NoticQueue.Count > 0 || LogQueue.Count > 0)            {                if (bus == null || !bus.IsConnected)                {                    bus = MQBusBuilder.CreateMessageBus();                }                if (bus.IsConnected)                {                    Send(ItemType.notic);                    Send(ItemType.log);                }            }        }        ///         /// 步伐自运行并开始监听        ///         public static void Run()        {            System.Timers.Timer timer = new System.Timers.Timer();            timer.Interval = 1000;            timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;                timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);                timer.Enabled = true;//是否执行System.Timers.Timer.Elapsed事件;            }        ///         /// 启动线程异步调用        ///         ///         private static void Send(string channelType)        {            Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));            thread.IsBackground = true;            thread.Start(channelType);        }        ///         /// 调用发布日志及提醒两个接口        ///         ///         private static void PublishAction(object channel)        {            PublisLog();            PublisNotic();        }        ///         /// 日志消息发送至RabbitMQ指定exchange、Queue        ///         private static void PublisLog()        {            string channelName = ItemType.log;            try            {                var routingKey = channelName;                var mQQueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));                var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}",channelName), "direct");                var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);                while (LogQueue.Count > 0)                {                    Item item = LogQueue.Dequeue();                    if (item != null)                    {                        var properties = new MessageProperties();                        var Message = new Message(Newtonsoft.Json.JsonConvert.SerializeObject(item));                        Message.Properties.AppId = item.appid;                        bus.Advanced.Publish(exchange, routingKey, false, Message);                    }                }            }            catch (Exception ex)            {                throw ex;            }        }        ///         /// 提醒消息发送至RabbitMQ指定exchange、Queue        ///         private static void PublisNotic()        {            string channelName = ItemType.notic;            var routingKey = channelName;            var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));            var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");            var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);            while(NoticQueue.Count > 0)            {                Item item = NoticQueue.Dequeue();                if (item != null)                {                    var properties = new MessageProperties();                    var Message = new Message(Newtonsoft.Json.JsonConvert.SerializeObject(item));                    Message.Properties.AppId = item.appid;                    bus.Advanced.Publish(exchange, routingKey, false, Message);                }            }        }    }}
复制代码
View Code然后我就发现了这一段代码!
  1.         ///         /// 步伐自运行并开始监听        ///         public static void Run()        {            System.Timers.Timer timer = new System.Timers.Timer();            timer.Interval = 1000;            timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;                timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);                timer.Enabled = true;//是否执行System.Timers.Timer.Elapsed事件;            }
复制代码
  1.         ///         /// 启动线程异步调用        ///         ///         private static void Send(string channelType)        {            Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));            thread.IsBackground = true;            thread.Start(channelType);        }
复制代码

  老老大写Bug了,当Run()起来之后,队列中【NoticQueue】有内容,就开始推送消息,发送消息Send(),每来一次推送new一个线程并设置为后台线程,然后发送消息。好了,明白了,这里的线程很混乱,因为线程操作不当,new了N多个频道,并且没有主动回收,这也难怪内存暴涨呢。并且要是Run()调用多次,后果更加不堪设想。
加班改起来

  开始动手吧,业务紧张推送有普通消息、错误消息和关照消息,那么将队列与线程组装一起,新增一个类QueueTask.cs:
  1. using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading;using System.Threading.Tasks;using MessageLib.Core;using MessageLib.Core.ClassBean;using EasyNetQ;using EasyNetQ.Topology;using System.Linq.Expressions;namespace MessageLib.Core{    public class QueueTask    {        private Queue QueueData = new Queue(5000);        //队列数目发布数量        private int max_count_to_pulish = 1000;        public  bool isRunning = false;        private string itemType = ItemType.info;        private string MessageRouter = ItemType.info;        public QueueTask(string itemType,string MessageRouter)        {            this.itemType = itemType;            this.MessageRouter = MessageRouter;        }        ///         /// 可供外部使用的消息入列操作        ///         public void Push(Item item, IBus IBus)        {            QueueData.Enqueue(item);            if (!isRunning)                Run(IBus);        }        public void Run(IBus IBus)        {            if (!isRunning)            {                Timer timerNotic = new Timer(PulishMsg, IBus, 1000, 1000);                isRunning = true;            }        }        private void PulishMsg(object state)        {            IBus IBus = state as IBus;            if (QueueData.Count > 0)            {                PublisMsg(itemType, IBus);            }        }        private void PublisMsg(object channel, IBus BusInstance)        {            try            {                string channelName = channel as string;                if (QueueData.Count > 0)                {                    var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));                    var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);                    var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);                    while (QueueData.Count > 0)                    {                        Item item = QueueData.Dequeue();                        if (item != null)                        {                            var properties = new MessageProperties();                            var Message = new Message(Newtonsoft.Json.JsonConvert.SerializeObject(item));                            Message.Properties.AppId = item.appid;                            BusInstance.Advanced.Publish(exchange, mqqueue.Name, false, Message);                        }                    }                }            }            catch (Exception ex)            {                Console.WriteLine("PublisMsg error:" + ex.Message);            }        }         public void Read(IBus BusInstance,Action dealAction) where T : Item        {            try            {                string channelName = itemType;                var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));                var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);                var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);                var Consume = BusInstance.Advanced.Consume(mqqueue, registration => Task.Run(() =>                {                    registration.Add((message, info) =>                     {                        Item data = Newtonsoft.Json.JsonConvert.DeserializeObject(message.Body);                        dealAction(data);                    });                }));            }            catch (Exception ex)            {                Console.WriteLine("Read error:" + ex.Message);            }        }    }}
复制代码

然后,在MessageQueue.cs修改为单例模式:
  1.     public static class MessageQueue    {        /*Install-Package EasyNetQ-dotnet-core -Version 2.0.2-radicalgeek-netc0001 -Pre*/        private static IBus bus = null;        public static bool isRunning = false;        //消息队列        private static QueueTask NoticQueue = null;        //日志队列        private static QueueTask LogQueue = null;        //自定义        private static QueueTask InfoQueue = null;        #region 同步锁        private static readonly object obj = new object();        #endregion        public static void Init(string Connection, string routeKey)        {            if (NoticQueue == null)                NoticQueue = new QueueTask(ItemType.notic, ItemType.notic);            if (LogQueue == null)                LogQueue = new QueueTask(ItemType.error, ItemType.error);            if (InfoQueue == null)                InfoQueue = new QueueTask(ItemType.info, routeKey);            if (string.IsNullOrEmpty(MQBusBuilder.Connnection))                MQBusBuilder.Connnection = Connection;        }        public static IBus BusInstance        {            get            {                if (bus == null)                {                    lock (obj)                    {                        if (bus == null|| !bus.IsConnected)                        {                            bus = MQBusBuilder.CreateMessageBus();                        }                    }                }                return bus;            }        }        ///         /// 可供外部使用的消息入列操作        ///         public static void PushAndRun(Item item)        {            if (string.IsNullOrWhiteSpace(MQBusBuilder.Connnection) || BusInstance == null)                return;            if (item.type == ItemType.notic)            {                NoticQueue.Push(item, BusInstance);            }            if (item.type == ItemType.error)            {                LogQueue.Push(item, BusInstance);            }            if (item.type == ItemType.info)            {                InfoQueue.Push(item, BusInstance);            }        }        public static void Read(string itemType, Action dealAction)        {            if (itemType == ItemType.notic)            {                NoticQueue.Read(BusInstance, dealAction);            }            if (itemType == ItemType.error)            {                LogQueue.Read(BusInstance, dealAction);            }            if (itemType == ItemType.info)            {                InfoQueue.Read(BusInstance, dealAction);            }        }    }
复制代码
View Code每次推送消息的时候,每个QueueTask就自己维护自己的线程和队列了,当调用推送之后,就开始运作起来。恩,应该没问题了。然后就发布nuget,再更新项目,然后发布。观察一段时间,恩,完美。



事件二

  事情过后,B端开始搞起来了,然后涉及到订单系统,跟老大(不是老老大,老老大那时候已经跑了)商量之后确定使用消息队列来做订单的事件的拓展,然后就直接美滋滋的调用好之前写的了,没想到啊,这次是线程暴涨!因为订单是从B端推送过来的,B端肯定没事,订单后台订阅消息之后,读取过程中出现的线程增多,然后看看之前写的Read()方法,感觉没啥问题啊,每运行完一次,就多了一个线程,这个神奇了啊,那么源代码撸起来。
  1. [url=https://github.com/EasyNetQ/EasyNetQ]https://github.com/EasyNetQ/EasyNetQ[/url]
复制代码

翻来覆去,看到这个Consume方法,继承的是IDisposable接口,得勒,知道咋回事了。

Consume.Dispose(); 多个消费者的情况下,用完请记得主动释放啊。
这回真的可以浪了。

总结

  遇到问题,冷静下来,耐得了寥寂才行。线上的问题优先管理,然后再慢慢Debug,管理不了,看源码,再管理不了,降级处理惩罚,接待共同探讨。同时也感谢一下技能群里的兄弟给的一些建议,并帮助查找资料,还好EasyNetQ是开源了,不然也打算说先不消了,毕竟一开始没什么用户量,所以没必要整那么麻烦,加班加点的弄这个问题。不外最终都完美的管理了,心里还是挺美滋滋的,步伐猿随之而来的成就感。
  别看我们在工位上默不作声,我们大概在拯救世界呢!老板,该加工资啦!
                                                                                             

增补

2018-12-25  鉴于大伙私信我想看看原来的bug修复后的情况,毕竟是公司代码不得当完全开源,我单独把例子源码做过修改的发布出来,思绪都差不多的,对比一下文章中原来的有问题的代码就可以了吧。因为都已经修复掉了,修改后的在这里。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?用户注册

x

相关技术服务需求,请联系管理员和客服QQ:2753533861或QQ:619920289
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

帖子推荐:
客服咨询

QQ:2753533861

服务时间 9:00-22:00

快速回复 返回顶部 返回列表