Oct 30, 2019 No Comments 在Centos7上部署ELK实时日志分析平台 #### 什么是日志? 日志就是程序产生的,遵循一定格式(通常包含时间戳)的文本数据 通常日志由服务器生成,输出到不同的文件中,一般会有系统日志、 应用日志、安全日志。这些日志分散地存储在不同的机器上。 通常当系统发生故障时,工程师需要登录到各个服务器上,使用 grep / sed / awk 等 Linux 脚本工具去日志里查找故障原因。在没有日志系统的情况下,首先需要定位处理请求的服务器,如果这台服务器部署了多个实例,则需要去每个应用实例的日志目录下去找日志文件。每个应用实例还会设置日志滚动策略(如:每天生成一个文件),还有日志压缩归档策略等。 这样一系列流程下来,对于我们排查故障以及及时找到故障原因,造成了比较大的麻烦。因此,如果我们能把这些日志集中管理,并提供集中检索功能,不仅可以提高诊断的效率,同时对系统情况有个全面的理解,避免事后救火的被动。 我认为,日志数据在以下几方面具有非常重要的作用: 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案 服务诊断:通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态 数据分析:可以做进一步的数据分析,比如根据请求中的课程 id ,找出 TOP10 用户感兴趣课程。 针对这些问题,为了提供分布式的实时日志搜集和分析的监控系统,我们采用了业界通用的日志数据管理解决方案 - 它主要包括 Elasticsearch 、 Logstash 和 Kibana 三个系统。通常,业界把这套方案简称为ELK,取三个系统的首字母,但是我们实践之后将其进一步优化为EFK,F代表Filebeat,用以解决Logstash导致的问题。 #### 一、ELK是什么鬼? ELK实际上是三个工具的集合,Elasticsearch + Logstash + Kibana,这三个工具组合形成了一套实用、易用的监控架构,很多公司利用它来搭建可视化的海量日志分析平台。 1. ElasticSearch ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。 1. Logstash Logstash是一个用于管理日志和事件的工具,你可以用它去收集日志、转换日志、解析日志并将他们作为数据提供给其它模块调用,例如搜索、存储等。 1. Kibana Kibana是一个优秀的前端日志展示框架,它可以非常详细的将日志转化为各种图表,为用户提供强大的数据可视化支持。 #### 二、ELK有何优势? 1. 强大的搜索功能,elasticsearch可以以分布式搜索的方式快速检索,而且支持DSL的语法来进行搜索,简单的说,就是通过类似配置的语言,快速筛选数据。 2. 完美的展示功能,可以展示非常详细的图表信息,而且可以定制展示内容,将数据可视化发挥的淋漓尽致。 3. 分布式功能,能够解决大型集群运维工作很多问题,包括监控、预警、日志收集解析等。 #### 三、ELK一般用来做啥? ELK组件在海量日志系统的运维中,可用于解决: * 分布式日志数据集中式查询和管理 * 系统监控,包含系统硬件和应用各个组件的监控 * 故障排查 * 安全信息和事件管理 * 报表功能 ELK组件在大数据运维系统中,主要可解决的问题如下: * 日志查询,问题排查,上线检查 * 服务器监控,应用监控,错误报警,Bug管理 * 性能分析,用户行为分析,安全漏洞分析,时间管理  如上图:Logstash收集AppServer产生的Log,并存放到ElasticSearch集群中,而Kibana则从ES集群中查询数据生成图表,再返回给Browser。 #### ELK环境部署: (0)基础环境介绍 系统: Centos7 防火墙: 关闭 Sellinux: 关闭 机器环境: 两台 elk-node1: #master机器 elk-node2: #slave机器 注明: master-slave模式: master收集到日志后,会把一部分数据碎片到salve上(随机的一部分数据);同时,master和slave又都会各自做副本,并把副本放到对方机器上,这样就保证了数据不会丢失。 如果master宕机了,那么客户端在日志采集配置中将elasticsearch主机指向改为slave,就可以保证ELK日志的正常采集和web展示。 由于elk-node1和elk-node2两台是虚拟机,没有外网ip,所以访问需要通过宿主机进行代理转发实现。 有以下两种转发设置:(任选其一) 通过访问宿主机的19200,19201端口分别转发到elk-node1,elk-node2的9200端口 通过访问宿主机的15601端口转发到elk-node1的5601端口 宿主机:内网ip为192.168.1.7) (为了不让线上的真实ip暴露,这里任意给了一个ip做记录) a)通过宿主机的haproxy服务进行代理转发,如下是宿主机上的代理配置: [root@kvm-server conf]# pwd /usr/local/haproxy/conf [root@kvm-server conf]# cat haproxy.cfg .......... .......... listen node1-9200 mode tcp option tcplog balance roundrobin server weight 1 check inter 1s rise 2 fall 2 listen node2-9200 mode tcp option tcplog balance roundrobin server weight 1 check inter 1s rise 2 fall 2 listen node1-5601 mode tcp option tcplog balance roundrobin server weight 1 check inter 1s rise 2 fall 2 重启haproxy服务 [root@kvm-server conf]# /etc/init.d/haproxy restart 设置宿主机防火墙 [root@kvm-server conf]# cat /etc/sysconfig/iptables ......... -A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT -A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT -A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT [root@kvm-server conf]# /etc/init.d/iptables restart b)通过宿主机的NAT端口转发实现 [root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19200 -j DNAT --to-destination [root@kvm-server conf]# iptables -t nat -A POSTROUTING -d -p tcp -m tcp --sport 9200 -j SNAT --to-source [root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19200 -j ACCEPT [root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 19201 -j DNAT --to-destination [root@kvm-server conf]# iptables -t nat -A POSTROUTING -d -p tcp -m tcp --sport 9200 -j SNAT --to-source [root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 19201 -j ACCEPT [root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp --dport 15601 -j DNAT --to-destination [root@kvm-server conf]# iptables -t nat -A POSTROUTING -d -p tcp -m tcp --sport 5601 -j SNAT --to-source [root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state --state NEW -m tcp --dport 15601 -j ACCEPT [root@kvm-server conf]# service iptables save [root@kvm-server conf]# service iptables restart 提醒一点: nat端口转发设置成功后,/etc/sysconfig/iptables文件里要注释掉下面两行!不然nat转发会有问题!一般如上面在nat转发规则设置好并save和restart防火墙之后就会自动在/etc/sysconfig/iptables文件里删除掉下面两行内容了。 [root@kvm-server conf]# vim /etc/sysconfig/iptables .......... #-A INPUT -j REJECT --reject-with icmp-host-prohibited #-A FORWARD -j REJECT --reject-with icmp-host-prohibited [root@linux-node1 ~]# service iptables restart (1). Elasticsearch安装配置 基础环境安装(elk-node1和elk-node2同时操作) 1)下载并安装GPG Key [root@elk-node1 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch 2)添加yum仓库 [root@elk-node1 ~]# vim /etc/yum.repos.d/elasticsearch.repo [elasticsearch-2.x] name=Elasticsearch repository for 2.x packages baseurl=http://packages.elastic.co/elasticsearch/2.x/centos gpgcheck=1 gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch enabled=1 3)安装elasticsearch [root@elk-node1 ~]# yum install -y elasticsearch 4)安装相关测试软件 #提前先下载安装epel源:epel-release-latest-7.noarch.rpm,否则yum会报错:No Package..... [root@elk-node1 ~]# wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm [root@elk-node1 ~]# rpm -ivh epel-release-latest-7.noarch.rpm #安装Redis [root@elk-node1 ~]# yum install -y redis #安装Nginx [root@elk-node1 ~]# yum install -y nginx #安装java [root@elk-node1 ~]# yum install -y java 安装完java后,检测 [root@elk-node1 ~]# java -version openjdk version "1.8.0_102" OpenJDK Runtime Environment (build 1.8.0_102-b14) OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode) 配置部署(下面先进行elk-node1的配置) 1)配置修改配置文件 [root@elk-node1 ~]# mkdir -p /data/es-data [root@elk-node1 ~]# vim /etc/elasticsearch/elasticsearch.yml 【将里面内容情况,配置下面内容】 cluster.name: huanqiu # 组名(同一个组,组名必须一致) node.name: elk-node1 # 节点名称,建议和主机名一致 path.data: /data/es-data # 数据存放的路径 path.logs: /var/log/elasticsearch/ # 日志存放的路径 bootstrap.mlockall: true # 锁住内存,不被使用到交换分区去 network.host: # 网络设置 http.port: 9200 # 端口 2)启动并查看 [root@elk-node1 ~]# chown -R elasticsearch.elasticsearch /data/ [root@elk-node1 ~]# systemctl start elasticsearch [root@elk-node1 ~]# systemctl status elasticsearch CGroup: /system.slice/elasticsearch.service └─3005 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSI... 注意:上面可以看出elasticsearch设置的内存最小256m,最大1g [root@linux-node1 src]# netstat -antlp |egrep "9200|9300" tcp6 0 0 :::9200 :::* LISTEN 3005/java tcp6 0 0 :::9300 :::* LISTEN 3005/java 然后通过web访问(访问的浏览器最好用google浏览器)  3)通过命令的方式查看数据(在112.110.115.10宿主机或其他外网服务器上查看,如下) [root@kvm-server src]# curl -i -XGET '' -d '{"query":{"match_all":{}}}' HTTP/1.1 200 OK Content-Type: application/json; charset=UTF-8 Content-Length: 95 { "count" : 0, "_shards" : { "total" : 0, "successful" : 0, "failed" : 0 } } 这样感觉用命令来查看,特别的不爽。 4)接下来安装插件,使用插件进行查看 (下面两个插件要在elk-node1和elk-node2上都要安装) 4.1)安装head插件 a)插件安装方法一 [root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head b)插件安装方法二 首先下载head插件,下载到/usr/loca/src目录下 下载地址:https://github.com/mobz/elasticsearch-head [root@elk-node1 src]# unzip elasticsearch-head-master.zip [root@elk-node1 src]# ls elasticsearch-head-master elasticsearch-head-master.zip 在/usr/share/elasticsearch/plugins目录下创建head目录 然后将上面下载的elasticsearch-head-master.zip解压后的文件都移到/usr/share/elasticsearch/plugins/head下,接着重启elasticsearch服务即可! [root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/ [root@elk-node1 plugins]# mkdir head [root@elk-node1 plugins]# ls head [root@elk-node1 plugins]# cd head [root@elk-node1 head]# cp -r /usr/local/src/elasticsearch-head-master/* ./ [root@elk-node1 head]# pwd /usr/share/elasticsearch/plugins/head [root@elk-node1 head]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins [root@elk-node1 head]# ll total 40 -rw-r--r--. 1 elasticsearch elasticsearch 104 Sep 28 01:57 elasticsearch-head.sublime-project -rw-r--r--. 1 elasticsearch elasticsearch 2171 Sep 28 01:57 Gruntfile.js -rw-r--r--. 1 elasticsearch elasticsearch 3482 Sep 28 01:57 grunt_fileSets.js -rw-r--r--. 1 elasticsearch elasticsearch 1085 Sep 28 01:57 index.html -rw-r--r--. 1 elasticsearch elasticsearch 559 Sep 28 01:57 LICENCE -rw-r--r--. 1 elasticsearch elasticsearch 795 Sep 28 01:57 package.json -rw-r--r--. 1 elasticsearch elasticsearch 100 Sep 28 01:57 plugin-descriptor.properties -rw-r--r--. 1 elasticsearch elasticsearch 5211 Sep 28 01:57 README.textile drwxr-xr-x. 5 elasticsearch elasticsearch 4096 Sep 28 01:57 _site drwxr-xr-x. 4 elasticsearch elasticsearch 29 Sep 28 01:57 src drwxr-xr-x. 4 elasticsearch elasticsearch 66 Sep 28 01:57 test [root@elk-node1 _site]# systemctl restart elasticsearch 插件访问(最好提前将elk-node2节点的配置和插件都安装后,再来进行访问和数据插入测试)  先插入数据实例,测试下 如下:打开”复合查询“,在POST选项下,任意输入如/index-demo/test,然后在下面输入数据(注意内容之间换行的逗号不要漏掉); 数据输入好之后(如下输入wangshibo;hello world内容),下面点击”验证JSON“->”提交请求“,提交成功后,观察右栏里出现的信息:有index,type,version等信息,failed:0(成功消息) 再查看测试实例,如下: "复合查询"下,选择GET选项,在/index-demo/test/后面输入上面POST结果中的id号,不输入内容,即{}括号里为空! 然后点击”验证JSON“->"提交请求",观察右栏内就有了上面插入的数据了(即wangshibo,hello world) 打开"基本查询",查看下数据,如下,即可查询到上面插入的数据:  打开“数据浏览”,也能查看到插入的数据:  如下:一定要提前在elk-node2节点上也完成配置(配置内容在下面提到),否则上面插入数据后,集群状态会呈现×××yellow状态,elk-node2完成配置加入到集群里后就会恢复到正常的绿色状态。  4.2)安装kopf监控插件 a)监控插件安装方法一 [root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf b)监控插件安装方法二 首先下载监控插件kopf,下载到/usr/loca/src目录下 下载地址:https://github.com/lmenezes/elasticsearch-kopf [root@elk-node1 src]# unzip elasticsearch-kopf-master.zip [root@elk-node1 src]# ls elasticsearch-kopf-master elasticsearch-kopf-master.zip 在/usr/share/elasticsearch/plugins目录下创建kopf目录 然后将上面下载的elasticsearch-kopf-master.zip解压后的文件都移到/usr/share/elasticsearch/plugins/kopf下,接着重启elasticsearch服务即可! [root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/ [root@elk-node1 plugins]# mkdir kopf [root@elk-node1 plugins]# cd kopf [root@elk-node1 kopf]# cp -r /usr/local/src/elasticsearch-kopf-master/* ./ [root@elk-node1 kopf]# pwd /usr/share/elasticsearch/plugins/kopf [root@elk-node1 kopf]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins [root@elk-node1 kopf]# ll total 40 -rw-r--r--. 1 elasticsearch elasticsearch 237 Sep 28 16:28 CHANGELOG.md drwxr-xr-x. 2 elasticsearch elasticsearch 22 Sep 28 16:28 dataset drwxr-xr-x. 2 elasticsearch elasticsearch 73 Sep 28 16:28 docker -rw-r--r--. 1 elasticsearch elasticsearch 4315 Sep 28 16:28 Gruntfile.js drwxr-xr-x. 2 elasticsearch elasticsearch 4096 Sep 28 16:28 imgs -rw-r--r--. 1 elasticsearch elasticsearch 1083 Sep 28 16:28 LICENSE -rw-r--r--. 1 elasticsearch elasticsearch 1276 Sep 28 16:28 package.json -rw-r--r--. 1 elasticsearch elasticsearch 102 Sep 28 16:28 plugin-descriptor.properties -rw-r--r--. 1 elasticsearch elasticsearch 3165 Sep 28 16:28 README.md drwxr-xr-x. 6 elasticsearch elasticsearch 4096 Sep 28 16:28 _site drwxr-xr-x. 4 elasticsearch elasticsearch 27 Sep 28 16:28 src drwxr-xr-x. 4 elasticsearch elasticsearch 4096 Sep 28 16:28 tests [root@elk-node1 _site]# systemctl restart elasticsearch 访问插件:(如下,同样要提前安装好elk-node2节点上的插件,否则访问时会出现集群节点为×××的yellow告警状态)!/cluster  下面进行节点elk-node2的配置 (如上的两个插件也在elk-node2上同样安装) 注释:其实两个的安装配置基本上是一样的。 [root@elk-node2 src]# mkdir -p /data/es-data [root@elk-node2 ~]# cat /etc/elasticsearch/elasticsearch.yml cluster.name: huanqiu node.name: elk-node2 path.data: /data/es-data path.logs: /var/log/elasticsearch/ bootstrap.mlockall: true network.host: http.port: 9200 discovery.zen.ping.multicast.enabled: false discovery.zen.ping.unicast.hosts: ["", ""] # 修改权限配置 [root@elk-node2 src]# chown -R elasticsearch.elasticsearch /data/ # 启动服务 [root@elk-node2 src]# systemctl start elasticsearch [root@elk-node2 src]# systemctl status elasticsearch ● elasticsearch.service - Elasticsearch Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; enabled; vendor preset: disabled) Active: active (running) since Wed 2016-09-28 16:49:41 CST; 1 weeks 3 days ago Docs: http://www.elastic.co Process: 17798 ExecStartPre=/usr/share/elasticsearch/bin/elasticsearch-systemd-pre-exec (code=exited, status=0/SUCCESS) Main PID: 17800 (java) CGroup: /system.slice/elasticsearch.service └─17800 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFra... Oct 09 13:42:22 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:22,295][WARN ][transport ] [elk-node2] Transport res...943817] Oct 09 13:42:23 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:23,111][WARN ][transport ] [elk-node2] Transport res...943846] ...... # 查看端口 [root@elk-node2 src]# netstat -antlp|egrep "9200|9300" tcp6 0 0 :::9200 :::* LISTEN 2928/java tcp6 0 0 :::9300 :::* LISTEN 2928/java tcp6 0 0 TIME_WAIT - tcp6 0 0 ::1:41892 ::1:9300 TIME_WAIT - 通过命令的方式查看elk-node2数据(在112.110.115.10宿主机或其他外网服务器上查看,如下) [root@kvm-server ~]# curl -i -XGET '' -d '{"query":{"match_all":{}}}' HTTP/1.1 200 OK Content-Type: application/json; charset=UTF-8 Content-Length: 95 { "count" : 1, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 } 然后通过web访问elk-node2 访问两个插件:!/cluster 这里几图和elk-node1基本相同,这里不再操作。 (2)Logstash安装配置(这个在客户机上是要安装的。elk-node1和elk-node2都安装) 基础环境安装(客户端安装logstash,收集到的数据写入到elasticsearch里,就可以登陆logstash界面查看到了) 1)下载并安装GPG Key [root@elk-node1 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch 2)添加yum仓库 [root@hadoop-node1 ~]# vim /etc/yum.repos.d/logstash.repo [logstash-2.1] name=Logstash repository for 2.1.x packages baseurl=http://packages.elastic.co/logstash/2.1/centos gpgcheck=1 gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch enabled=1 3)安装logstash [root@elk-node1 ~]# yum install -y logstash 4)logstash启动 [root@elk-node1 ~]# systemctl start elasticsearch [root@elk-node1 ~]# systemctl status elasticsearch ● elasticsearch.service - Elasticsearch Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: disabled) Active: active (running) since Mon 2016-11-07 18:33:28 CST; 3 days ago Docs: http://www.elastic.co Main PID: 8275 (java) CGroup: /system.slice/elasticsearch.service └─8275 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFrac... 数据的测试 1)基本的输入输出 [root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }' Settings: Default filter workers: 1 Logstash startup completed hello #输入这个 2016-11-11T06:41:07.690Z elk-node1 hello #输出这个 wangshibo #输入这个 2016-11-11T06:41:10.608Z elk-node1 wangshibo #输出这个 2)使用rubydebug详细输出 [root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }' Settings: Default filter workers: 1 Logstash startup completed hello #输入这个 { #输出下面信息 "message" => "hello", "@version" => "1", "@timestamp" => "2016-11-11T06:44:06.711Z", "host" => "elk-node1" } wangshibo #输入这个 { #输出下面信息 "message" => "wangshibo", "@version" => "1", "@timestamp" => "2016-11-11T06:44:11.270Z", "host" => "elk-node1" } 3) 把内容写到elasticsearch中 [root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => [""]} }' Settings: Default filter workers: 1 Logstash startup completed #输入下面的测试数据 123456 wangshibo huanqiu hahaha 使用rubydebug和写到elasticsearch中的区别: 其实就在于后面标准输出的区别,前者使用codec;后者使用elasticsearch 写到elasticsearch中在logstash中查看,如下图: 注意: master收集到日志后,会把一部分数据碎片到salve上(随机的一部分数据),master和slave又都会各自做副本,并把副本放到对方机器上,这样就保证了数据不会丢失。如下,master收集到的数据放到了自己的第1,3分片上,其他的放到了slave的第0,2,4分片上。   4)即写到elasticsearch中又写在文件中一份 [root@elk-node1 ~]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => [""]} stdout{ codec => rubydebug}}' Settings: Default filter workers: 1 Logstash startup completed huanqiupc { "message" => "huanqiupc", "@version" => "1", "@timestamp" => "2016-11-11T07:27:42.012Z", "host" => "elk-node1" } wangshiboqun { "message" => "wangshiboqun", "@version" => "1", "@timestamp" => "2016-11-11T07:27:55.396Z", "host" => "elk-node1" } 以上文本可以长期保留、操作简单、压缩比大。下面登陆elasticsearch界面中查看;  logstash的配置和文件的编写 1)logstash的配置 简单的配置方式: [root@elk-node1 ~]# vim /etc/logstash/conf.d/01-logstash.conf input { stdin { } } output { elasticsearch { hosts => [""]} stdout { codec => rubydebug } } 它的执行: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/01-logstash.conf Settings: Default filter workers: 1 Logstash startup completed beijing #输入内容 { #输出下面信息 "message" => "beijing", "@version" => "1", "@timestamp" => "2016-11-11T07:41:48.401Z", "host" => "elk-node1" }  2)收集系统日志 [root@elk-node1 ~]# vim file.conf input { file { path => "/var/log/messages" type => "system" start_position => "beginning" } } output { elasticsearch { hosts => [""] index => "system-%{+YYYY.MM.dd}" } } 执行上面日志信息的收集,如下,这个命令会一直在执行中,表示日志在监控收集中;如果中断,就表示日志不在收集!所以需要放在后台执行~ [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf & 登陆elasticsearch界面,查看本机系统日志的信息:   3)收集java日志,其中包含上面讲到的日志收集 [root@elk-node1 ~]# vim file.conf input { file { path => "/var/log/messages" type => "system" start_position => "beginning" } } input { file { path => "/var/log/elasticsearch/huanqiu.log" type => "es-error" start_position => "beginning" } } output { if [type] == "system"{ elasticsearch { hosts => [""] index => "system-%{+YYYY.MM.dd}" } } if [type] == "es-error"{ elasticsearch { hosts => [""] index => "es-error-%{+YYYY.MM.dd}" } } } 注意: 如果你的日志中有type字段,那你就不能在conf文件中使用type 执行如下命令收集: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf & 登陆elasticsearch界面,查看数据:  有个问题: 每个报错都给收集成一行了,不是按照一个报错,一个事件模块收集的。 下面将行换成事件的方式展示: [root@elk-node1 ~]# vim multiline.conf input { stdin { codec => multiline { pattern => "^\[" negate => true what => "previous" } } } output { stdout { codec => "rubydebug" } } 执行命令: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f multiline.conf Settings: Default filter workers: 1 Logstash startup completed 123 456 [123 { "@timestamp" => "2016-11-11T09:28:56.824Z", "message" => "123\n456", "@version" => "1", "tags" => [ [0] "multiline" ], "host" => "elk-node1" } 123] [456] { "@timestamp" => "2016-11-11T09:29:09.043Z", "message" => "[123\n123]", "@version" => "1", "tags" => [ [0] "multiline" ], "host" => "elk-node1" } 在没有遇到[的时候,系统不会收集,只有遇见[的时候,才算是一个事件,才收集起来。 (3)Kibana安装配置 1)kibana的安装: [root@elk-node1 ~]# cd /usr/local/src [root@elk-node1 src]# wget https://download.elastic.co/kibana/kibana/kibana-4.3.1-linux-x64.tar.gz [root@elk-node1 src]# tar zxf kibana-4.3.1-linux-x64.tar.gz [root@elk-node1 src]# mv kibana-4.3.1-linux-x64 /usr/local/ [root@elk-node1 src]# ln -s /usr/local/kibana-4.3.1-linux-x64/ /usr/local/kibana 2)修改配置文件: [root@elk-node1 config]# pwd /usr/local/kibana/config [root@elk-node1 config]# cp kibana.yml kibana.yml.bak [root@elk-node1 config]# vim kibana.yml server.port: 5601 server.host: "" elasticsearch.url: "" kibana.index: ".kibana" 因为他一直运行在前台,要么选择开一个窗口,要么选择使用screen。 安装并使用screen启动kibana: [root@elk-node1 ~]# yum -y install screen [root@elk-node1 ~]# screen #这样就另开启了一个终端窗口 [root@elk-node1 ~]# /usr/local/kibana/bin/kibana log [18:23:19.867] [info][status][plugin:kibana] Status changed from uninitialized to green - Ready log [18:23:19.911] [info][status][plugin:elasticsearch] Status changed from uninitialized to yellow - Waiting for Elasticsearch log [18:23:19.941] [info][status][plugin:kbn_vislib_vis_types] Status changed from uninitialized to green - Ready log [18:23:19.953] [info][status][plugin:markdown_vis] Status changed from uninitialized to green - Ready log [18:23:19.963] [info][status][plugin:metric_vis] Status changed from uninitialized to green - Ready log [18:23:19.995] [info][status][plugin:spyModes] Status changed from uninitialized to green - Ready log [18:23:20.004] [info][status][plugin:statusPage] Status changed from uninitialized to green - Ready log [18:23:20.010] [info][status][plugin:table_vis] Status changed from uninitialized to green - Ready 然后按ctrl+a+d组合键,这样在上面另启的screen屏里启动的kibana服务就一直运行在前台了 [root@elk-node1 ~]# screen -ls There is a screen on: 15041.pts-0.elk-node1 (Detached) 1 Socket in /var/run/screen/S-root. (3)访问kibana:[]( 如下,如果是添加上面设置的java日志收集信息,则在下面填写es-error_;如果是添加上面设置的系统日志信息system_,以此类型(可以从logstash界面看到日志收集项) 然后点击上面的Discover,在Discover中查看: 查看日志登陆,需要点击“Discover”-->"message",点击它后面的“add” 注意: 需要右边查看日志内容时带什么属性,就在左边点击相应属性后面的“add” 如下图,添加了message和path的属性:  这样,右边显示的日志内容的属性就带了message和path  点击右边日志内容属性后面隐藏的<<,就可将内容向前缩进 添加新的日志采集项,点击Settings->+Add New,比如添加system系统日志。注意后面的*不要忘了。   删除kibana里的日志采集项,如下,点击删除图标即可。  如果打开kibana查看日志,发现没有日志内容,出现“No results found”,如下图所示,这说明要查看的日志在当前时间没有日志信息输出,可以点击右上角的时间钟来调试日志信息的查看。   4)收集nginx的访问日志 修改nginx的配置文件,分别在nginx.conf的http和server配置区域添加下面内容: http 标签中 log_format json '{"@timestamp":"$time_iso8601",' '"@version":"1",' '"client":"$remote_addr",' '"url":"$uri",' '"status":"$status",' '"domain":"$host",' '"host":"$server_addr",' '"size":$body_bytes_sent,' '"responsetime":$request_time,' '"referer": "$http_referer",' '"ua": "$http_user_agent"' '}'; ##### server标签中 access_log /var/log/nginx/access_json.log json; 截图如下:  启动nginx服务: [root@elk-node1 ~]# systemctl start nginx [root@elk-node1 ~]# systemctl status nginx ● nginx.service - The nginx HTTP and reverse proxy server Loaded: loaded (/usr/lib/systemd/system/nginx.service; disabled; vendor preset: disabled) Active: active (running) since Fri 2016-11-11 19:06:55 CST; 3s ago Process: 15119 ExecStart=/usr/sbin/nginx (code=exited, status=0/SUCCESS) Process: 15116 ExecStartPre=/usr/sbin/nginx -t (code=exited, status=0/SUCCESS) Process: 15114 ExecStartPre=/usr/bin/rm -f /run/nginx.pid (code=exited, status=0/SUCCESS) Main PID: 15122 (nginx) CGroup: /system.slice/nginx.service ├─15122 nginx: master process /usr/sbin/nginx ├─15123 nginx: worker process └─15124 nginx: worker process Nov 11 19:06:54 elk-node1 systemd[1]: Starting The nginx HTTP and reverse proxy server... Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: the configuration file /etc/nginx/nginx.conf syntax is ok Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: configuration file /etc/nginx/nginx.conf test is successful Nov 11 19:06:55 elk-node1 systemd[1]: Started The nginx HTTP and reverse proxy server. 编写收集文件,这次使用json的方式收集: [root@elk-node1 ~]# vim json.conf input { file { path => "/var/log/nginx/access_json.log" codec => "json" } } output { stdout { codec => "rubydebug" } } 启动日志收集程序: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf #或加个&放在后台执行 访问nginx页面(在elk-node1的宿主机上执行访问页面的命令:curl [就会出现以下内容](就会出现以下内容): [root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf Settings: Default filter workers: 1 Logstash startup completed { "@timestamp" => "2016-11-11T11:10:53.000Z", "@version" => "1", "client" => "", "url" => "/index.html", "status" => "200", "domain" => "", "host" => "", "size" => 3700, "responsetime" => 0.0, "referer" => "-", "ua" => "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/ zlib/1.2.3 libidn/1.18 libssh2/1.4.2", "path" => "/var/log/nginx/access_json.log" } 注意: 上面的json.conf配置只是将nginx日志输出,还没有输入到elasticsearch里,所以这个时候在elasticsearch界面里是采集不到nginx日志的。 需要配置一下,将nginx日志输入到elasticsearch中,将其汇总到总文件file.conf里,如下也将nginx-log日志输入到elasticserach里:(后续就可以只用这个汇总文件,把要追加的日志汇总到这个总文件里即可) [root@elk-node1 ~]# cat file.conf input { file { path => "/var/log/messages" type => "system" start_position => "beginning" } file { path => "/var/log/elasticsearch/huanqiu.log" type => "es-error" start_position => "beginning" codec => multiline { pattern => "^\[" negate => true what => "previous" } } file { path => "/var/log/nginx/access_json.log" codec => json start_position => "beginning" type => "nginx-log" } } output { if [type] == "system"{ elasticsearch { hosts => [""] index => "system-%{+YYYY.MM.dd}" } } if [type] == "es-error"{ elasticsearch { hosts => [""] index => "es-error-%{+YYYY.MM.dd}" } } if [type] == "nginx-log"{ elasticsearch { hosts => [""] index => "nignx-log-%{+YYYY.MM.dd}" } } } 可以加上--configtest参数,测试下配置文件是否有语法错误或配置不当的地方,这个很重要!! [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf --configtest Configuration OK 然后接着执行logstash命令(由于上面已经将这个执行命令放到了后台,所以这里其实不用执行,也可以先kill之前的,再放后台执行),然后可以再访问nginx界面测试下 [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf & 登陆elasticsearch界面查看: 将nginx日志整合到kibana界面里,如下:  5)收集系统日志 编写收集文件并执行。 [root@elk-node1 ~]# cat syslog.conf input { syslog { type => "system-syslog" host => "" port => "514" } } output { stdout { codec => "rubydebug" } } 对上面的采集文件进行执行: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf 重新开启一个窗口,查看服务是否启动: [root@elk-node1 ~]# netstat -ntlp|grep 514 tcp6 0 0 :::* LISTEN 17842/java [root@elk-node1 ~]# vim /etc/rsyslog.conf #*.* @@remote-host:514 【在此行下面添加如下内容】 *.* @@ [root@elk-node1 ~]# systemctl restart rsyslog 回到原来的窗口(即上面采集文件的执行终端),就会出现数据: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf Settings: Default filter workers: 1 Logstash startup completed { "message" => "Stopping System Logging Service...\n", "@version" => "1", "@timestamp" => "2016-11-13T10:35:30.000Z", "type" => "system-syslog", "host" => "", "priority" => 30, "timestamp" => "Nov 13 18:35:30", "logsource" => "elk-node1", "program" => "systemd", "severity" => 6, "facility" => 3, "facility_label" => "system", "severity_label" => "Informational" } 再次添加到总文件file.conf中: [root@elk-node1 ~]# cat file.conf input { file { path => "/var/log/messages" type => "system" start_position => "beginning" } file { path => "/var/log/elasticsearch/huanqiu.log" type => "es-error" start_position => "beginning" codec => multiline { pattern => "^\[" negate => true what => "previous" } } file { path => "/var/log/nginx/access_json.log" codec => json start_position => "beginning" type => "nginx-log" } syslog { type => "system-syslog" host => "" port => "514" } } output { if [type] == "system"{ elasticsearch { hosts => [""] index => "system-%{+YYYY.MM.dd}" } } if [type] == "es-error"{ elasticsearch { hosts => [""] index => "es-error-%{+YYYY.MM.dd}" } } if [type] == "nginx-log"{ elasticsearch { hosts => [""] index => "nignx-log-%{+YYYY.MM.dd}" } } if [type] == "system-syslog"{ elasticsearch { hosts => [""] index => "system-syslog-%{+YYYY.MM.dd}" } } } 执行总文件(先测试下总文件配置是否有误,然后先kill之前在后台启动的file.conf文件,再次执行): [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf --configtest Configuration OK [root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf & 测试: 向日志中添加数据,看elasticsearch和kibana的变化: [root@elk-node1 ~]# logger "hehehehehehe1" [root@elk-node1 ~]# logger "hehehehehehe2" [root@elk-node1 ~]# logger "hehehehehehe3" [root@elk-node1 ~]# logger "hehehehehehe4" [root@elk-node1 ~]# logger "hehehehehehe5"  添加到kibana界面中:   6)TCP日志的收集 编写日志收集文件,并执行:(有需要的话,可以将下面收集文件的配置汇总到上面的总文件file.conf里,进而输入到elasticsearch界面里和kibana里查看) [root@elk-node1 ~]# cat tcp.conf input { tcp { host => "" port => "6666" } } output { stdout { codec => "rubydebug" } } [root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf 开启另外一个窗口,测试一(安装nc命令:yum install -y nc): [root@elk-node1 ~]# nc 6666 回到原来的窗口(即上面采集文件的执行终端),就会出现数据: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf Settings: Default filter workers: 1 Logstash startup completed { "message" => "", "@version" => "1", "@timestamp" => "2016-11-13T11:01:15.280Z", "host" => "", "port" => 49743 } 测试二: [root@elk-node1 ~]# echo "hehe" | nc 6666 [root@elk-node1 ~]# echo "hehe" > /dev/tcp/ 回到之前的执行端口,在去查看,就会显示出来: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf Settings: Default filter workers: 1 Logstash startup completed....... { "message" => "hehe", "@version" => "1", "@timestamp" => "2016-11-13T11:39:58.263Z", "host" => "", "port" => 53432 } { "message" => "hehe", "@version" => "1", "@timestamp" => "2016-11-13T11:40:13.458Z", "host" => "", "port" => 53457 } 7)使用filter 编写文件: [root@elk-node1 ~]# cat grok.conf input { stdin{} } filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } } output { stdout{ codec => "rubydebug" } } 执行检测: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f grok.conf Settings: Default filter workers: 1 Logstash startup completed GET /index.html 15824 0.043 #输入这个,下面就会自动形成字典的形式 { "message" => " GET /index.html 15824 0.043", "@version" => "1", "@timestamp" => "2016-11-13T11:45:47.882Z", "host" => "elk-node1", "client" => "", "method" => "GET", "request" => "/index.html", "bytes" => "15824", "duration" => "0.043" } 其实上面使用的那些变量在程序中都有定义: [root@elk-node1 ~]# cd /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns/ [root@elk-node1 patterns]# ls aws bro firewalls haproxy junos mcollective mongodb postgresql redis bacula exim grok-patterns java linux-syslog mcollective-patterns nagios rails ruby [root@elk-node1 patterns]# cat grok-patterns filter { # drop sleep events grok { match => { "message" =>"SELECT SLEEP" } add_tag => [ "sleep_drop" ] tag_on_failure => [] # prevent default _grokparsefailure tag on real records } if "sleep_drop" in [tags] { drop {} } grok { match => [ "message", "(?m)^# User@Host: %{USER:user} [^ [^ ]+\] @ (?:(?\S*) )? (?: (?: \s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?(?\w+)\s+.*)\n#\s*" ] } date { match => [ "timestamp", "UNIX" ] remove_field => [ "timestamp" ] } } 8)mysql慢查询 收集文件: [root@elk-node1 ~]# cat mysql-slow.conf input { file { path => "/root/slow.log" type => "mysql-slowlog" codec => multiline { pattern => "^# User@Host" negate => true what => "previous" } } } filter { # drop sleep events grok { match => { "message" =>"SELECT SLEEP" } add_tag => [ "sleep_drop" ] tag_on_failure => [] # prevent default _grokparsefailure tag on real records } if "sleep_drop" in [tags] { drop {} } grok { match => [ "message", "(?m)^# User@Host: %{USER:user} [^ [^ ]+\] @ (?:(?\S*) )? (?: (?: \s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?(?\w+)\s+.*)\n#\s*" ] } date { match => [ "timestamp", "UNIX" ] remove_field => [ "timestamp" ] } } output { stdout { codec =>"rubydebug" } } 执行检测: 上面需要的/root/slow.log是自己上传的,然后自己插入数据保存后,会显示: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f mysql-slow.conf Settings: Default filter workers: 1 Logstash startup completed { "@timestamp" => "2016-11-14T06:53:54.100Z", "message" => "# Time: 161114 11:05:18", "@version" => "1", "path" => "/root/slow.log", "host" => "elk-node1", "type" => "mysql-slowlog", "tags" => [ [0] "_grokparsefailure" ] } { "@timestamp" => "2016-11-14T06:53:54.105Z", "message" => "# User@Host: test[test] @ []\n# Query_time: 1.725889 Lock_time: 0.000430 Rows_sent: 0 Rows_examined: 0\nuse test_zh_o2o_db;\nSET timestamp=1479092718;\nSELECT trigger_name, event_manipulation, event_object_table, action_statement, action_timing, DEFINER FROM information_schema.triggers WHERE BINARY event_object_schema='test_zh_o2o_db' AND BINARY event_object_table='customer';\n# Time: 161114 12:10:30", "@version" => "1", "tags" => [ [0] "multiline", [1] "_grokparsefailure" ], "path" => "/root/slow.log", "host" => "elk-node1", "type" => "mysql-slowlog" } 接下来描述会遇见到的一个问题: 一旦我们的elasticsearch出现问题,就不能进行日志采集处理了! 这种情况下该怎么办呢? 解决方案; 可以在client和elasticsearch之间添加一个中间件作为缓存,先将采集到的日志内容写到中间件上,然后再从中间件输入到elasticsearch中。 这就完美的解决了上述的问题了。 (4)ELK中使用redis作为中间件,缓存日志采集内容 1)redis的配置和启动 [root@elk-node1 ~]# vim /etc/redis.conf #修改下面两行内容 daemonize yes bind [root@elk-node1 ~]# systemctl start redis [root@elk-node1 ~]# lsof -i:6379 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME redis-ser 19474 redis 4u IPv4 1344465 0t0 TCP elk-node1:6379 (LISTEN) [root@elk-node1 ~]# redis-cli -h> info # Server redis_version:2.8.19 ....... 2)编写从Client端收集数据的文件 [root@elk-node1 ~]# vim redis-out.conf input { stdin {} } output { redis { host => "" port => "6379" db => "6" data_type => "list" key => "demo" } } 3)执行收集数据的文件,并输入数据hello redis [root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf Settings: Default filter workers: 1 Logstash startup completed #下面输入数据hello redis hello redis 4)在redis中查看数据 [root@elk-node1 ~]# redis-cli -h> info # Server ....... ....... # Keyspace db6:keys=1,expires=0,avg_ttl=0 #在最下面一行,显示是db6> select 6 OK[6]> keys * 1) "demo"[6]> LINDEX demo -1 "{\"message\":\"hello redis\",\"@version\":\"1\",\"@timestamp\":\"2016-11-14T08:04:25.981Z\",\"host\":\"elk-node1\"}" 5)继续随便写点数据 [root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf Settings: Default filter workers: 1 Logstash startup completed hello redis 123456 asdf ert wang shi bo guohuihui as we r g asdfjkdfsak 5423wer 34rt3 6y 7uj u io9 sdjfhsdk890 huanqiu huanqiuchain hqsb asda 6)在redis中查看 在redis中查看长度: [root@elk-node1 ~]# redis-cli -h> info # Server redis_version:2.8.19 ....... ....... # Keyspace db6:keys=1,expires=0,avg_ttl=0 #显示是db6> select 6 OK[6]> keys * 1) "demo"[6]> LLEN demo (integer) 24 7)将redis中的内容写到ES中 [root@elk-node1 ~]# vim redis-in.conf input { redis { host => "" port => "6379" db => "6" data_type => "list" key => "demo" } } output { elasticsearch { hosts => [""] index => "redis-in-%{+YYYY.MM.dd}" } } 执行: [root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf --configtest Configuration OK [root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf & 在redis中查看,发现数据已被读出:[6]> LLEN demo (integer) 0 登陆elasticsearch界面查看:  8)接着,将收集到的所有日志写入到redis中。这了重新定义一个添加redis缓存后的总文件shipper.conf。(可以将之前执行的总文件file.conf停掉) [root@elk-node1 ~]# vim shipper.conf input { file { path => "/var/log/messages" type => "system" start_position => "beginning" } file { path => "/var/log/elasticsearch/huanqiu.log" type => "es-error" start_position => "beginning" codec => multiline { pattern => "^\[" negate => true what => "previous" } } file { path => "/var/log/nginx/access_json.log" codec => json start_position => "beginning" type => "nginx-log" } syslog { type => "system-syslog" host => "" port => "514" } } output { if [type] == "system"{ redis { host => "" port => "6379" db => "6" data_type => "list" key => "system" } } if [type] == "es-error"{ redis { host => "" port => "6379" db => "6" data_type => "list" key => "demo" } } if [type] == "nginx-log"{ redis { host => "" port => "6379" db => "6" data_type => "list" key => "nginx-log" } } if [type] == "system-syslog"{ redis { host => "" port => "6379" db => "6" data_type => "list" key => "system-syslog" } } } 执行上面的文件(提前将上面之前启动的file.conf文件的执行给结束掉!) [root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf --configtest Configuration OK [root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf Settings: Default filter workers: 1 Logstash startup completed 在redis中查看: [root@elk-node1 ~]# redis-cli -h> info # Server redis_version:2.8.19 ....... ....... # Keyspace db6:keys=1,expires=0,avg_ttl=0 #显示是db6> select 6 OK[6]> keys * 1) "demo" 2) "system"[6]> keys * 1) "nginx-log" 2) "demo" 3) "system" 另开一个窗口,添加点日志: [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423" [root@elk-node1 ~]# logger "12325423" 又会增加日志:[6]> keys * 1) "system-syslog" 2) "nginx-log" 3) "demo" 4) "system" 其实可以在任意的一台ES中将数据从redis读取到ES中。 下面咱们在elk-node2节点,将数据从redis读取到ES中: 编写文件: [root@elk-node2 ~]# cat file.conf input { redis { type => "system" host => "" port => "6379" db => "6" data_type => "list" key => "system" } redis { type => "es-error" host => "" port => "6379" db => "6" data_type => "list" key => "es-error" } redis { type => "nginx-log" host => "" port => "6379" db => "6" data_type => "list" key => "nginx-log" } redis { type => "system-syslog" host => "" port => "6379" db => "6" data_type => "list" key => "system-syslog" } } output { if [type] == "system"{ elasticsearch { hosts => [""] index => "system-%{+YYYY.MM.dd}" } } if [type] == "es-error"{ elasticsearch { hosts => [""] index => "es-error-%{+YYYY.MM.dd}" } } if [type] == "nginx-log"{ elasticsearch { hosts => [""] index => "nignx-log-%{+YYYY.MM.dd}" } } if [type] == "system-syslog"{ elasticsearch { hosts => [""] index => "system-syslog-%{+YYYY.MM.dd}" } } } 执行: [root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf --configtest Configuration OK [root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf & 去redis中检查,发现数据已经被读出到elasticsearch中了。[6]> keys * (empty list or set) 同时登陆logstash和kibana看,发现可以正常收集到日志了。 可以执行这个 去查看nginx日志 [root@elk-node1 ~]# ab -n10000 -c1 也可以启动多个redis写到ES中,具体根据自己的实际情况而定。 以上即是本人部署ELK环境的整个操作记录,在此留笔,希望能帮助到一些朋友。 ©著作权归作者所有:来自51CTO博客作者IT张sir的原创作品,如需转载,请注明出处,否则将追究法律责任 最后更新于 2019-10-30 10:56:49 并被添加「」标签,已有 3702 位童鞋阅读过。 本站使用「署名 4.0 国际」创作共享协议,可自由转载、引用,但需署名作者且注明文章出处