| 
       
	ELK 由三部分组成elasticsearch、logstash、kibana,elasticsearch是一个近似实时的搜索平台,它让你以前所未有的速度处理大数据成为可能。 
	Logstash:日志收集工具,可以从本地磁盘,网络服务(自己监听端口,接受用户日志),消息队列中收集各种各样的日志,然后进行过滤分析,并将日志输出到Elasticsearch中。 
	Elasticsearch:日志分布式存储/搜索工具,原生支持集群功能,可以将指定时间的日志生成一个索引,加快日志查询和访问。 
	Kibana:可视化日志Web展示工具,对Elasticsearch中存储的日志进行展示,还可以生成炫丽的仪表盘。 
	1:logstash安装部署文档 
	1.上传logstash安装包,解压logstash.tar.gz: 
	执行命令:tar -zxvf logstash.tar.gz 
	2.修改jdk版本为1.8,进入logstash/bin目录,修改logstash.lib.sh: 
	查看logstash.lib.sh中: 
	  setup_java() { if [ -z "$JAVACMD" ] ; then if [ -n "$JAVA_HOME" ] ; 
	then JAVACMD="$JAVA_HOME/bin/java" else JAVACMD="java" 
	  定义了一个setup_java的函数,setup_java被setup函数调用,最终被bin/logstash启动脚本调用,因此,我们只需要在logstash或logstash.lib.sh的行首位置添加两个环境变量:(为自己jdk地址) 
	   export JAVA_CMD="/opt/springboot/jdk1.8/jdk1.8.0_171/bin" 
	   export JAVA_HOME="/opt/springboot/jdk1.8/jdk1.8.0_171" 
	3.测试是否安装成功 
	 在logstash的bin目录下执行: 
	 ./logstash  -e 'input { stdin { } } output { stdout {} }' 
	运行成功后,输入数据,会打印出相关数据,及表示安装成功。 
	2:ElasticSearch安装部署文档 
	Elasticsearch是一个开源的高扩展的分布式全文检索引擎,它可以近乎实时的存储、检索数据;本身扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。 
	   Elasticsearch也使用Java开发并使用Lucene作为其核心来实现所有索引和搜索的功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单。 
	ElasticSearch和solr对比: 
![]() 
	部署jdk:上传jdk-8u171-linux-x64.tar.gz,解压。 
	部署es:上传elasticsearch-6.3.1.tar.gz,解压。 
	修改启动脚本: 
	我们需要在启动elasticsearch时,就需要指定jdk版本。 
	即修改elasticsearch的启动脚本(elasticsearch_HOME/bin/elasticsearch) 
	[root@master01 elasticsearch-6.0.0]# vim bin/elasticsearch 
	# 添加以下代码 
	export JAVA_HOME=/opt/springboot/jdk1.8/jdk1.8.0_171 
	export PATH=$JAVA_HOME/bin:$PATH 
	运行:切换到普通用户: su   es; 
	      进入es的安装bin目录下,运行elasticSearch命令。 
	遇到的问题: 
	报错:max size virtual memory for user [elastic] is too low, increase to [unlimited] 
	修改: 
	/etc/security/limits.conf 
	* hard memlock unlimited 
	* soft memlock unlimited 
	* hard nofile 65536 
	* soft nofile 65536 
	*  - as unlimited 
	/etc/sysctl.conf 
	fs.file-max = 2097152 
	vm.max_map_count = 262144 
	vm.swappiness = 1 
	elasticsearch.yml 
	node.name: elasticsearch 
	http.cors.enabled: true 
	http.cors.allow-origin: "*" 
	transport.host: 192.168.1.24 
	transport.tcp.port: 9300 
	http.port: 9200 
	network.host: 0.0.0.0 
	bootstrap.memory_lock: false 
	bootstrap.system_call_filter: false 
	3:springboot上传日志到elasticsearch 
	配置logstash相关配置文件 
	在bin目录下新建jdbc.conf,配置输入输出位置信息。 
	springboot整合logstash,添加依赖,配置文件,conf文件中配置监听的端口号,写到elasticsearch中。 
	input { 
	     tcp { 
	         port => 9250 
	         mode => "server" 
	         tags => ["tags"] 
	         codec => json_lines 
	         } 
	} 
	output{ 
	      elasticsearch { 
	      hosts => ["192.168.1.24:9200"] 
	      index => "spboot-%{+YYYY.MM.dd}" 
	      } 
	      stdout{ codec => rubydebug } 
	} 
	启动logstash,指定配置文件,执行命令logstash -f ./jdbc.conf 
	后台执行命令:nohup ./logstash -f ./jdbc.conf > ./logstash.log 2>&1 & 
	上传到es中的文件名格式:spboot-2019.02.26,es清除数据的时候会根据格式删除当天的索引。 
	springboot整合logstash,上传日志到es: 
	springboot添加相关依赖 
	<dependency> 
	    <groupId>net.logstash.logback</groupId> 
	    <artifactId>logstash-logback-encoder</artifactId> 
	    <version>5.1</version> 
	</dependency> 
	在springboot项目中的src/main/resources目录下,创建logback-spring.xml 
	文件,配置相关信息。 
	指定logstash的安装地址及打印相关日志 
	<?xml version="1.0" encoding="UTF-8"?> 
	<configuration scan="true" scanPeriod="60 seconds" debug="false"> 
	    <include resource="org/springframework/boot/logging/logback/base.xml" /> 
	    <contextName>logback</contextName> 
	    <!-- 记录文件到特定目录 --> 
	    <!-- <property name="log.path" value="E:\\test\\logback.log" /> --> 
	    <property name="log.path" value="./logback.log" /> 
	    <appender name="stash" class="net.logstash.logback.appender.LogstashTcpSocketAppender"> 
	        <destination>192.168.1.24:9250</destination> 
	        <encoder class="net.logstash.logback.encoder.LogstashEncoder" /> 
	    </appender> 
	    <!--输出到控制台--> 
	    <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> 
	        <!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> 
	         <level>ERROR</level> 
	        </filter>--> 
	        <encoder> 
	            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern> 
	        </encoder> 
	    </appender> 
	    <!--输出到文件--> 
	    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender"> 
	        <file>${log.path}</file> 
	        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> 
	            <fileNamePattern>logback.%d{yyyy-MM-dd}.log</fileNamePattern> 
	        </rollingPolicy> 
	        <encoder> 
	            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern> 
	        </encoder> 
	    </appender> 
	    <root level="info"> 
	        <appender-ref ref="stash"/> 
	        <!--<appender-ref ref="console" /> 
	        <appender-ref ref="file" />--> 
	    </root> 
	    <!-- logback为 java 中的包 
	    <logger name="com.dudu.controller"/> 
	    logback.LogbackDemo:类的全路径 
	    <logger name="com.dudu.controller.LearnController" level="WARN" additivity="false"> 
	     <appender-ref ref="console"/> 
	    </logger> --> 
	</configuration> 
	优化ElasticSearch定期删除过期数据 
	在/opt/springboot/elasticsearch/delete-es目录下创建es-del.sh文件 
	  文件内容: 
![]() 
	#!/bin/bash 
	# @Author: richard 
	# @Date:   2017-08-11 17:27:49 
	# @Last Modified by:   richard 
	# @Last Modified time: 2017-08-11 18:04:58 
	#保留近 N 天 
	KEEP_DAYS=7  
	# 删除前 N的所有天到 前N+10天==>每天执行 
	function get_todelete_days() 
	{ 
	    # declare -A DAY_ARR 
	    # DAY_ARR="" 
	    for i in $(seq 1 10); 
	    do 
	        THIS_DAY=$(date -d "$(($KEEP_DAYS+$i)) day ago" +%Y.%m.%d) 
	        DAY_ARR=( "${DAY_ARR[@]}" $THIS_DAY) 
	    done 
	    echo ${DAY_ARR[*]}  
	} 
	# 返回数组的写法 
	TO_DELETE_DAYS=(`get_todelete_days`) 
	for day in "${TO_DELETE_DAYS[@]}" 
	do 
	    echo "$day will be delete"   
	    curl -XDELETE 'http://127.0.0.1:9200/*-'${day} 
	done 
	在目录下启动定时任务执行此文件, 
	输入:crontab -e 
	输入内容: 
	30 23 * * 7 /opt/springboot/elasticsearch/delete-es/es-del.sh 
	然后按Esc输入  :wq   即可。 
	每周日晚上23:30执行一次。 
	定时任务执行log文件地址:/var/spool/mail/root 可查看执行错误信息。 
(责任编辑:IT)  | 
    


