查看: 205|回复: 0

你所不知道的日志异步落库

[复制链接]

该用户从未签到

发表于 2019-11-4 16:58:55 | 显示全部楼层 |阅读模式
互联网设计架构过程中,日志异步落库,俨然已经是高并发环节中不可缺少的一环。为什么说是高并发环节中不可缺少的呢? 原因在于,如果直接用mq进行日志落库的时候,低并发下,生产端生产数据,然后由消费端异步落库,是没有什么问题的,而且性能也都是非常的好,估计tp99应该都在1ms以内。但是一旦并发增长起来,慢慢的你就发现生产端的tp99一直在增长,从1ms,变为2ms,4ms,直至send timeout。尤其在大促的时候,我司的体系就经历过这个情况,当时mq的发送耗时超过200ms,甚至一度有不少timeout产生。
考虑到这种情况在高并发的情况下才出现,以是今天我们就来探索更加可靠的方法来进行异步日志落库,保证所使用的方式不会因为过高的并发而出现接口ops连续下降甚至到不可用的情况。

方案一: 基于log4j的异步appender实现
此种方案,依靠于log4j。在log4j的异步appender中,通过mq进行生产消费入库。相称于在接口和mq之间建立了一个缓冲区,使得接口和mq的依靠分离,从而不让mq的操作影响接口的ops。
此种方案由于使用了异步方式,且由于异步的discard policy计谋,当大量数据过来,缓冲区满了之后,会抛弃部门数据。此种方案适用于能够容忍数据丢失的业务场景,不适用于对数据完整有严格要求的业务场景。
来看看具体的实现方式:
首先,我们需要自定义一个Appender,继承自log4j的AppenderSkeleton类,实现方式如下:
  1. public class AsyncJmqAppender extends AppenderSkeleton {    @Resource(name = "messageProducer")    private MessageProducer messageProducer;    @Override    protected void append(LoggingEvent loggingEvent) {        asyncPushMessage(loggingEvent.getMessage());    }    /**     * 异步调用jmq输出日志     * @param message     */    private void asyncPushMessage(Object message) {        CompletableFuture.runAsync(() -> {            Message messageConverted = (Message) message;            try {                messageProducer.send(messageConverted);            } catch (JMQException e) {                e.printStackTrace();            }        });    }    @Override    public boolean requiresLayout() {        return false;    }    @Override    public void close() {    }}
复制代码
然后在log4j.xml中,为此类进行配置:
  1.                                                         
复制代码
最后就可以按照如下的方式进行正常使用了:
  1. private static Logger logger = LoggerFactory.getLogger("filelog_appender_logger");
复制代码
注意: 此处需要注意log4j的一个性能问题。在log4j的conversionPattern中,匹配符最好不要出现 C% L%通配符,压测实践表明,这两个通配符会导致log4j打日志的效率降低10倍。
方案一很轻便,且剥离了接口直接依靠mq导致的性能问题。但是无法办理数据丢失的问题(但是我们其实可以在本地搞个计谋落盘来不及处理的数据,可以大大的淘汰数据丢失的几率)。但是很多的业务场景,是需要数据不丢失的,以是这就衍生出我们的另一套方案来。

方案二:增量消费log4j日志
此种方式,是开启worker在后台增量消费log4j的日志信息,和接口完全脱离。此种方式相比方案一,可以保证数据的不丢失,且可以做到完全不影响接口的ops。但是此种方式,由于是后台worker在后台启动进行扫描,会导致落库的数据慢一些,比如一分钟之后才落库完毕。以是适用于对落库数据及时性不高的场景。
具体的实现步骤如下:
首先,将需要进行增量消费的日志统一打到一个文件夹,以天为单位,每天生成一个带时间戳日志文件。由于log4j不支持直接带时间戳的日志文件生成,以是这里需要引入log4j.extras组件,然后配置log4j.xml如下:

之后在代码中的申明方式如下:
  1. private static Logger businessLogger = LoggerFactory.getLogger("file_rolling_logger");
复制代码
最后在需要记录日志的地方使用方式如下:
  1. businessLogger.error(JsonUtils.toJSONString(myMessage))
复制代码
这样就可以将日志打印到一个单独的文件中,且按照日期,每天生成一个。
然后,当日志文件生成完毕后,我们就可以开启我们的worker进行增量消费了,这里的增量消费方式,我们选择RandomAccessFile这个类来进行,由于其独特的位点读取方式,可以使得我们非常方便的根据位点的位置来消费增量文件,从而避免了逐行读取这种低效率的实现方式。
注意,为每个日志文件都单独创建了一个位点文件,里面存储了对应的文件的位点读取信息。当worker扫描开始的时候,会首先读取位点文件里面的位点信息,然后找到相应的日志文件,从位点信息位置开始进行消费。这就是整个增量消费worker的焦点。具体代码实现如下(代码太长,做了折叠):
[code]/** * @Description: 增量日志扫描worker * @Detail: 此worker主要用来扫描增量日志,日志本身会在不停的插入中,此worker会不停的扫描此日志来将数据上传到kafka集群 * @date 2018-04-08 10:30 */public class LimitBuyScanWorker {    /**     * 日志和位点文件生存的目录     */    private static final String FILE_DIRECTORY = "D:\\export\\Instances\\order\\server1\\logs\\";    /**     * 每次步进的长度,此处为1000行     */    private static final int SCAN_STEP = 1000;    /**     * 日志文件名前缀     */    private static final String LOG_FILE_PREFIX = "limitbuy.soa.order.";    /**     * 位点文件名后缀     */    private static final String OFT_FILE_APPENDIX = ".offset";    public void logScanner() {        //当前时间        Date currentDate = new Date();        //今日        String currentDay = DateUtil.formatDate("yyyy-MM-dd", currentDate);        //今日日志文件路径        String currentLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay;        logger.error("今日的日志文件路径:" + currentLogFilePath);        //今日位点文件路径        String currentOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay + OFT_FILE_APPENDIX;        //昨日        String yesterDay = DateUtil.formatDate("yyyy-MM-dd", DateUtil.queryPlusDay(currentDate, -1));        //昨日日志文件路径        String yesterdayLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay;        logger.error("昨日的日志文件路径:" + yesterdayLogFilePath);        //昨日位点文件路径        String yesterdayOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay + OFT_FILE_APPENDIX;        //先检测昨日位点和文件体积是否一致,不一致则代表未消费完毕        boolean yesterdayConsumedOK = checkIfConsumeOK(yesterdayLogFilePath, yesterdayOffsetFilePath);        logger.error("昨日的日志文件已被消费完毕:" + yesterdayConsumedOK);        //昨日的文件已扫描完毕        if (yesterdayConsumedOK) {            //扫描并消费今日增量日志            scanAndConsumeLog(currentLogFilePath, currentOffsetFilePath);        }        //昨日的文件未扫描完毕        else {            //扫描并消费昨日增量日志            scanAndConsumeLog(yesterdayLogFilePath, yesterdayOffsetFilePath);        }    }    /**     * 检测日志是否被扫描消费完毕,true:消费完毕;false:未消费完毕     * @Description 此举主要防止log4j在零点大促开始的时候,突然的滚动文件造成的部门增量日志不会被消费的问题     * @param logFilePath     * @param offsetFilePath     */    private boolean checkIfConsumeOK(String logFilePath, String offsetFilePath) {        try {            //打开文件            RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r");            //得到当前位点            long currentOffset = checkOffset(offsetFilePath);            //得到文件总长            long currentFileLength = randomAccessFile.length();            //比对            if (currentOffset >= currentFileLength) {                return true;            }            return false;        } catch (FileNotFoundException e) {            logger.error("com.jd.limitbuy.service.worker.logScanner 出错(FileNotFoundException):", e);            AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());            return false;        } catch (IOException e) {            logger.error("com.jd.limitbuy.service.worker.logScanner 出错(IOException):", e);            AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出错:" + e.getMessage());            return false;        }    }    /**     * 扫描并消费增量日志     * @param logFilePath     * @param offsetFilePath     */    private void scanAndConsumeLog(String logFilePath, String offsetFilePath) {        try {            RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r");            //得到当前位点            long currentOffset = checkOffset(offsetFilePath);            logger.error("开始位点==>" + currentOffset);            //重置位点到当前位点            if (currentOffset

相关技术服务需求,请联系管理员和客服QQ:2753533861或QQ:619920289
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

帖子推荐:
客服咨询

QQ:2753533861

服务时间 9:00-22:00

快速回复 返回顶部 返回列表