当前位置: > 其它学习 > Elasticsearch >

ELK整合:ElasticSearch定期删除过期数据

时间:2019-12-27 13:57来源:linux.it.net.cn 作者:IT
ELK 由三部分组成elasticsearch、logstash、kibana,elasticsearch是一个近似实时的搜索平台,它让你以前所未有的速度处理大数据成为可能。
 
Logstash:日志收集工具,可以从本地磁盘,网络服务(自己监听端口,接受用户日志),消息队列中收集各种各样的日志,然后进行过滤分析,并将日志输出到Elasticsearch中。
 
Elasticsearch:日志分布式存储/搜索工具,原生支持集群功能,可以将指定时间的日志生成一个索引,加快日志查询和访问。
 
Kibana:可视化日志Web展示工具,对Elasticsearch中存储的日志进行展示,还可以生成炫丽的仪表盘。
 
1:logstash安装部署文档
1.上传logstash安装包,解压logstash.tar.gz:
 
执行命令:tar -zxvf logstash.tar.gz
 
2.修改jdk版本为1.8,进入logstash/bin目录,修改logstash.lib.sh:
 
查看logstash.lib.sh中:
 
  setup_java() { if [ -z "$JAVACMD" ] ; then if [ -n "$JAVA_HOME" ] ;
 
then JAVACMD="$JAVA_HOME/bin/java" else JAVACMD="java"
 
  定义了一个setup_java的函数,setup_java被setup函数调用,最终被bin/logstash启动脚本调用,因此,我们只需要在logstash或logstash.lib.sh的行首位置添加两个环境变量:(为自己jdk地址)
 
   export JAVA_CMD="/opt/springboot/jdk1.8/jdk1.8.0_171/bin"
 
   export JAVA_HOME="/opt/springboot/jdk1.8/jdk1.8.0_171"
 
3.测试是否安装成功
 
 在logstash的bin目录下执行:
 
 ./logstash  -e 'input { stdin { } } output { stdout {} }'
 
运行成功后,输入数据,会打印出相关数据,及表示安装成功。
 
2:ElasticSearch安装部署文档
Elasticsearch是一个开源的高扩展的分布式全文检索引擎,它可以近乎实时的存储、检索数据;本身扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。
   Elasticsearch也使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单。
 
ElasticSearch和solr对比:
 
 
部署jdk:上传jdk-8u171-linux-x64.tar.gz,解压。
 
部署es:上传elasticsearch-6.3.1.tar.gz,解压。
 
修改启动脚本:
 
我们需要在启动elasticsearch时,就需要指定jdk版本。
即修改elasticsearch的启动脚本(elasticsearch_HOME/bin/elasticsearch)
 
[root@master01 elasticsearch-6.0.0]# vim bin/elasticsearch
 
# 添加以下代码
 
export JAVA_HOME=/opt/springboot/jdk1.8/jdk1.8.0_171
 
export PATH=$JAVA_HOME/bin:$PATH
 
运行:切换到普通用户: su   es;
 
      进入es的安装bin目录下,运行elasticSearch命令。
 
遇到的问题:
报错:max size virtual memory for user [elastic] is too low, increase to [unlimited]
 
修改:
 
/etc/security/limits.conf
 
* hard memlock unlimited
 
* soft memlock unlimited
 
* hard nofile 65536
 
* soft nofile 65536
 
*  - as unlimited
 
/etc/sysctl.conf
 
fs.file-max = 2097152
 
vm.max_map_count = 262144
 
vm.swappiness = 1
 
elasticsearch.yml
 
node.name: elasticsearch
 
http.cors.enabled: true
 
http.cors.allow-origin: "*"
 
transport.host: 192.168.1.24
 
transport.tcp.port: 9300
 
http.port: 9200
 
network.host: 0.0.0.0
 
bootstrap.memory_lock: false
 
bootstrap.system_call_filter: false
 
 
 
3:springboot上传日志到elasticsearch
配置logstash相关配置文件
在bin目录下新建jdbc.conf,配置输入输出位置信息。
 
springboot整合logstash,添加依赖,配置文件,conf文件中配置监听的端口号,写到elasticsearch中。
 
input {
 
     tcp {
 
         port => 9250
 
         mode => "server"
 
         tags => ["tags"]
 
         codec => json_lines
 
         }
 
}
 
output{
 
      elasticsearch {
 
      hosts => ["192.168.1.24:9200"]
 
      index => "spboot-%{+YYYY.MM.dd}"
 
      }
 
      stdout{ codec => rubydebug }
 
}
 
启动logstash,指定配置文件,执行命令logstash -f ./jdbc.conf
 
后台执行命令:nohup ./logstash -f ./jdbc.conf > ./logstash.log 2>&1 &
 
上传到es中的文件名格式:spboot-2019.02.26,es清除数据的时候会根据格式删除当天的索引。
 
springboot整合logstash,上传日志到es:
springboot添加相关依赖
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.1</version>
</dependency>
 
在springboot项目中的src/main/resources目录下,创建logback-spring.xml
文件,配置相关信息。
 
指定logstash的安装地址及打印相关日志
 
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <include resource="org/springframework/boot/logging/logback/base.xml" />
    <contextName>logback</contextName>
    <!-- 记录文件到特定目录 -->
    <!-- <property name="log.path" value="E:\\test\\logback.log" /> -->
    <property name="log.path" value="./logback.log" />
    <appender name="stash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>192.168.1.24:9250</destination>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    </appender>
    <!--输出到控制台-->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
         <level>ERROR</level>
        </filter>-->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <!--输出到文件-->
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>logback.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="info">
        <appender-ref ref="stash"/>
        <!--<appender-ref ref="console" />
        <appender-ref ref="file" />-->
    </root>
    <!-- logback为 java 中的包
    <logger name="com.dudu.controller"/>
    logback.LogbackDemo:类的全路径
    <logger name="com.dudu.controller.LearnController" level="WARN" additivity="false">
     <appender-ref ref="console"/>
    </logger> -->
</configuration>
 
 
优化ElasticSearch定期删除过期数据
在/opt/springboot/elasticsearch/delete-es目录下创建es-del.sh文件
 
  文件内容:
 
 
#!/bin/bash
# @Author: richard
# @Date:   2017-08-11 17:27:49
# @Last Modified by:   richard
# @Last Modified time: 2017-08-11 18:04:58
#保留近 N 天
KEEP_DAYS=7 
# 删除前 N的所有天到 前N+10天==>每天执行
function get_todelete_days()
{
    # declare -A DAY_ARR
    # DAY_ARR=""
    for i in $(seq 1 10);
    do
        THIS_DAY=$(date -d "$(($KEEP_DAYS+$i)) day ago" +%Y.%m.%d)
 
        DAY_ARR=( "${DAY_ARR[@]}" $THIS_DAY)
    done
    echo ${DAY_ARR[*]} 
}
# 返回数组的写法
TO_DELETE_DAYS=(`get_todelete_days`)
for day in "${TO_DELETE_DAYS[@]}"
do
    echo "$day will be delete"  
    curl -XDELETE 'http://127.0.0.1:9200/*-'${day}
done
 
 
在目录下启动定时任务执行此文件,
 
输入:crontab -e
 
输入内容:
 
30 23 * * 7 /opt/springboot/elasticsearch/delete-es/es-del.sh
 
然后按Esc输入  :wq   即可。
 
每周日晚上23:30执行一次。
 
定时任务执行log文件地址:/var/spool/mail/root 可查看执行错误信息。
 



(责任编辑:IT)
------分隔线----------------------------