肥宅钓鱼网
当前位置: 首页 钓鱼百科

自动化运维管理平台的设计(自动化运维之日志统一管理)

时间:2023-08-19 作者: 小编 阅读量: 1 栏目名: 钓鱼百科

{Event.struct.httpRequest.parameters.value}参数说明:告警开始时间、告警ID、事件ID、事情数量、告警级别….HTTP返回码、触发告警字符串、响应动作、响应时间、响应大小、http包头的值,中间省略的部分请自行查看手册。其实ImpervaWAF的总日志字段数不少于一两百个,单从这一点可以看出确实好于国产WAF太多。

一、日志收集及告警项目背景

近来安全测试项目较少,想着把安全设备、nginx日志收集起来并告警, 话不多说,直接说重点,搭建背景:

1. 日志源:安全设备日志(Imperva WAF、绿盟WAF、paloalto防火墙)、nginx日志等;2. 日志分析开源软件:ELK,告警插件:Sentinl 或elastalert,告警方式:钉钉和邮件;3. 安全设备日志->logstash->es,nginx日志由于其他部门已有一份(flume->kafka)我们通过kafka->logstash->es再输出一份,其中logstash的正则过滤规则需要配置正确,不然比较消耗性能,建议写之前使用grokdebug先测试好再放入配置文件;4. 搭建系统:centos 7 , JDK 1.8, Python 2.75. ELK统一版本为5.5.2

由于es和kibana的安装都比较简单,就不在下文中说明安装及配置方法了。相关软件的下载链接如下:

Es:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.2.tar.gz

Kibana: https://artifacts.elastic.co/downloads/kibana/kibana-5.5.2-linux-x86_64.tar.gz

Logstash: https://artifacts.elastic.co/downloads/logstash/logstash-5.5.2.tar.gz

测试链接Grokdebug:http://grokdebug.herokuapp.com

二、安全设备日志收集

2.1 Imperva WAF配置

策略->操作集->新增日志告警规则

我这边没有用设备自身的一些日志规则,而是根据手册自定义了一些需要的日志字段,可在自定义策略的消息填入如下字段:

StartTime=$!{Alert.createTime}AlarmID=$!{Alert.dn} EventID=$!{Event.dn} AggregationInfo=$!{Alert.aggregationInfo.occurrences}Alert_level=$!{Alert.severity} RuleName=$!{Alert.alertMetadata.alertName} Category=$!{Alert.alertType} Alert_description=$!{Alert.description}EventType=$!{Event.eventType} PolicyName=$!{Rule.parent.displayName}SrcIP=$!{Event.sourceInfo.sourceIp} SrcPort=$!{Event.sourceInfo.sourcePort}Proto=$!{Event.sourceInfo.ipProtocol} DstIP=$!{Event.destInfo.serverIp}DstPort=$!{Event.destInfo.serverPort} WebMethod=$!{Event.struct.httpRequest.url.method}Domain=$!{Alert.serverGroupName} URL=$!{Event.struct.httpRequest.url.path}ResponseCode=$!{Event.struct.httpResponse.responseCode}Alert_key=$!{Event.struct.httpRequest.url.queryString}Action=$!{Alert.immediateAction} ResponseTime=$!{Event.struct.responseTime}ResponseSize=$!{Event.struct.responseSize} Headers_value=$!{Event.struct.httpRequest.headers.value}Parameters_value=$!{Event.struct.httpRequest.parameters.value}

参数说明:告警开始时间、告警ID、事件ID、事情数量、告警级别…. HTTP返回码、触发告警字符串、响应动作、响应时间、响应大小、http包头的值,中间省略的部分请自行查看手册。其实Imperva WAF的总日志字段数不少于一两百个,单从这一点可以看出确实好于国产WAF太多。

针对Imperva WAF的logstash配置如下:

input{ syslog{ type => "syslog" port => 514 } } filter { grok { match =>["message","%{GREEDYDATA:StartTime} AlarmID=%{NUMBER:AlarmID} EventID=%{NUMBER:EventID}AggregationInfo=%{NUMBER:AggregationInfo} Alert_level=%{DATA:Alert_level}RuleName=%{GREEDYDATA:RuleName} Category=%{GREEDYDATA:Category}Alert_description=%{GREEDYDATA:Alert_description} EventType=%{DATA:EventType}PolicyName=%{GREEDYDATA:PolicyName} SrcIP=%{IPV4:SrcIP} SrcPort=%{NUMBER:SrcPort}Proto=%{DATA:Proto} DstIP=%{IPV4:DstIP} DstPort=%{NUMBER:DstPort}WebMethod=%{GREEDYDATA:WebMethod} Domain=%{DATA:Domain}URL=%{GREEDYDATA:URL} ResponseCode=%{GREEDYDATA:ResponseCode}Alert_key=%{GREEDYDATA:Alert_key} Action=%{DATA:Action}ResponseTime=%{GREEDYDATA:ResponseTime} ResponseSize=%{GREEDYDATA:ResponseSize}Headers_value=%{GREEDYDATA:Headers_value}Parameters_value=%{GREEDYDATA:Parameters_value}"] # 实际复制粘贴可能会有点格式问题,注意参数之间空一个空格即可 remove_field => ["message"] } geoip { source => "SrcIP" #IP归属地解析插件 } } output{ elasticsearch{ hosts => "es单机或集群地址:9200" index => "impervasyslog" } }

说明:其中geoip为elk自带插件,可以解析ip归属地,比如ip归属的国家及城市,在仪表盘配置“地图炮”装X、查看攻击源地理位置的时候有点用,

2.2 绿盟WAF配置

日志报表->日志管理配置->Syslog配置&日志发生参数

针对绿盟WAF的logstash配置如下:

input和output参照imperva waf,贴出最要的grok部分,如下:

grok { match => ["message",%{DATA:syslog_flag} site_id:%{NUMBER:site_id} protect_id:%{NUMBER:protect_id} dst_ip:%{IPV4:dst_ip} dst_port:%{NUMBER:dst_port} src_ip:%{IPV4:src_ip}src_port:%{NUMBER:src_port} method:%{DATA:method} domain:%{DATA:domain} uri:%{DATA:uri} alertlevel:%{DATA:alert_level} event_type:%{DATA:Attack_types} stat_time:%{GREEDYDATA:stat_time} policy_id:%{NUMBER:policy_id}rule_id:%{NUMBER:rule_id} action:%{DATA:action} block:%{DATA:block} block_info:%{DATA:block_info} http:%{GREEDYDATA:URL}

2.3 paloalto防火墙配置

(6.1版本,其他版本可能会有点差异)

新建syslog服务器->日志转发,具体看截图

针对PA的logstash配置如下:

input和output参照imperva waf,贴出最要的grok部分,如下:

grok { match =>["message","%{DATA:PA_Name},%{GREEDYDATA:Time},%{NUMBER:Eventid},%{DATA:Category},%{DATA:Subcategory},%{NUMBER:NULL},%{GREEDYDATA:Generate_Time},%{IPV4:SourceIP},%{IPV4:DestinationIP},%{IPV4:NAT_SourceIP},%{IPV4:NAT_DestinationIP},%{DATA:RuleName},%{DATA:SourceUser},%{DATA:DestinationUser},%{DATA:Application},%{DATA:VMsys},%{DATA:SourceZone},%{DATA:DestinationZone},%{DATA:IN_interface},%{DATA:OUT_interface},%{DATA:Syslog},%{DATA:GREEDYDATA:TimeTwo},%{NUMBER:SessionID},%{NUMBER:Repeat},%{NUMBER:SourcePort},%{NUMBER:DestinationPort},%{NUMBER:NAT-SourcePort},%{NUMBER:NAT-DestinationPort},%{DATA:Flag},%{DATA:Proto},%{DATA:Action},%{DATA:NUll2},%{DATA:ThreatName},%{DATA:Category2},%{DATA:Priority},%{DATA:Direction},%{NUMBER:Serialnum},%{GREEDYDATA:NULL3}"]

贴两张最终的效果图:

三、Nginx日志收集

由于nginx日志已经被其他大数据部门收集过一遍了,为避免重复读取,我们从其他部门的kafka拉取过来即可,这里说一下nginx收集的方式,flume->kafka 示例配置方式如下:

Flume配置如下

配置扫描日志文件

log_analysis_test.conf配置文件

a1.sources=s1 #可以理解为输入端,定义名称为s1a1.channels=c1 #传输频道,类似队列,定义为c1,设置为内存模式a1.sinks=k1 #可以理解为输出端,定义为sk1

#source配置

a1.sources.s1.type=execa1.sources.s1.command=tail -F/data/log/nginx/crf_crm.access.loga1.sources.s1.channels=c1

#channel配置

a1.channels.c1.type=memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink 设置Kafka接收器a1.sinks.k1.channel=c1a1.sinks.k1.topic=crm_nginx_log_topic #设置Kafka的Topica1.sinks.k1.brokerList=x.x.x.x:9092 #设置Kafka的broker地址和端口号a1.sinks.k1.requiredAcks=1a1.sinks.k1.batchSize=20

kafka配置

kafka下载

wgethttp://mirror.bit.edu.cn/apache/kafka/0.8.2.2/kafka_2.9.1-0.8.2.2.tgz

配置zookeeper,根据机器状况更改jvm 内存设置

vimbin/zookeeper-server-start.sh

配置kafka

vimbin/kafka-server-start.sh 根据机器状况更改jvm 内存设置

启动zookeeper

nohupbin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka

nohup bin/kafka-server-start.shconfig/server.properties &

创建应用服务器的topic

kafka-topics.sh --create –zookeeper x.x.x.x:2181--replication-factor 2 --partitions 5 --topic crm_nginx_log_topic

分区及副本以自己的情况而定

查看Topic是否创建成功

./bin/kafka-topics.sh --list --zookeeperx.x.x.x:2181

启动kafka生产者

bin/kafka-console-producer.sh --broker-listx.x.x.x:9092 --topic pay

启动kafka消费者

kafka-console-consumer.sh --zookeeperx.x.x.x:2181--topic crm_nginx_log_topic

logstash 配置文件

input{ kafka { codec => "plain" group_id => "logstash1" auto_offset_reset => "smallest" reset_beginning => true topic_id => "crm_nginx_log_topic" zk_connect => "x.x.x.x:2181" # zookeeper的地址 } grok {省略} } output{ elasticsearch{ hosts => "es单机或集群地址:9200" index => "impervasyslog" }

四、安全告警

安全告警可以是Sentinl或 elastalert,Sentinl是kibana插件,可以集成到kibana内图形化展示,但是写规则时需要对JS较熟悉,elastalert 是es的插件,不支持集成到kibana界面进行图形化展示。下面分别对这2个插件进行安装及配置说明。

4.1 Sentinl

安装如下:

kibana-plugin install https://github.comsirensolutionssentinlreleasesdownloadtag-

5.4sentinl-v5.4.1.zip

安装后,以IP请求频次告警设置为例:

每2分钟去es查询一次

针对需要监控的IP字段

Input的body内容即es查询语法

编写过滤条件

{ "script": { "script":"payload.json='超过阀值40';var match=false;var threshold=40;varfirst=payload.aggregations.code.buckets;for(var i=0;i<first.length;i){varip_count = parseInt(first[i].doc_count);if (ip_count >= threshold){match=true;payload.json= '【ip】:'first[i].key' 【count】:'first[i].doc_count'\\n';}};match;" } }

如果想对状态码做监控,参考如下:

Action里面添加钉钉群机器人的webhook

钉钉报警如下

钉钉的接口文档链接:

https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.karFPe&treeId=257&articleId=105735&docType=1

邮件告警设置如下:

4.2 elastalert

前置条件:

JDK 1.8python 2.7easy_install -U setuptools (最新setuptools-39.2.0)yum install gccyum install python-develyum install libffi-devel

安装

git clonehttps://github.com/Yelp/elastalert.git cd elastalert python setup.py install pip install -r requirements.txt cp config.yaml.example config.yaml

下载https://github.com/xuyaoqiang/elastalert-dingtalk-plugin

elastalert-dingtalk-plugin中的elastalert_modules复制到elastalert下

创建索引

elastalert-create-index

输入 es的IP 及 es的端口,其余根据自己的环境写,一般默认即可

配置config.yaml,以下为关键配置信息:

rules_folder: example_rules #指定rule的目录

run_every:

minutes: 1 #每一分钟去探测一次

buffer_time:

minutes: 15 #缓存15分钟

es_host: x.x.x.x

es_port: 9200

writeback_index: elastalert_status #创建的告警索引

alert_time_limit:

days: 2 #失败重试的时间限制

下面先以钉钉告警为例:

在example_rules里面新建钉钉告警配置文件,内容如下

es_host: x.x.x.x

es_port: 9200

name: xxx安全告警

type: cardinality

index: nsfocuswaf_syslog

cardinality_field: src_ip

max_cardinality: 30

timeframe:

minutes: 5 #单IP 5分钟内访问超过30次就会告警

aggregation_key: src_ip

summary_table_fields:

  • src_ip
  • dst_ip
  • dst_port
  • Attack_types
  • stat_time

alert:

-“elastalert_modules.dingtalk_alert.DingTalkAlerter”

dingtalk_webhook: “your webhook”

dingtalk_msgtype: text

群机器人的配置比较简单,自己搜索一下即可

注意如果告警匹配了N条,却只发出1条告警,修改elastalert.py代码,在856行后面增加如下代码:

修改dingtalk_alert.py的代码,增加如下内容:

src_ip = matches[0]['src_ip'] #src_ip为grok过滤后自定义的字段,以下相同 dst_ip = matches[0]['dst_ip'] dst_port = matches[0]['dst_port'] attack_types = matches[0]['Attack_types'] start_time = matches[0]['stat_time'] 修改payload初的代码,如下: payload = { "msgtype": self.dingtalk_msgtype, "text": { "content": r"发现源IP地址:{}在5分钟内,针对服务器IP:{}的{}端口发动了3次攻击,攻击类型是:{},攻击时间:{},请及时处理,谢谢!".format(src_ip,dst_ip,dst_port,attack_types,start_time) },

如果没有过滤除自定义的字段,只有message字段的话,可以新增如下代码:

message = matches[0]['message'] src_ip_pattern =r"src_ip:(\d \.\d \.\d \.\d )" dst_ip_pattern =r"dst_ip:(\d \.\d \.\d \.\d )" dst_port_pattern =r"dst_port:(\d )" time_pattern =r"stat_time:(\d{4}-\d{2}-\d{2}\s \d{2}:\d{2}:\d{2})" type_pattern =r"event_type:(\w )" src_ip= re.findall(src_ip_pattern, message)[0] dst_ip= re.findall(dst_ip_pattern, message)[0] dst_port= re.findall(dst_port_pattern, message)[0] start_time= re.findall(time_pattern, message)[0] attack_types= re.findall(type_pattern, message)[0]

最终效果图如下:

邮件的rule配置文件如下:

es_host: x.x.x.x es_port: 9200 name: 信息安全告警 type: cardinality index: nsfocuswaf_syslog cardinality_field: src_ip max_cardinality: 5 timeframe: minutes: 5 alert: - "email" smtp_host: mail.crfchina.com smtp_port: 25 smtp_auth_file:/opt/elastalert/smtp_auth_file.yaml email_reply_to: xxxxx@qq.com #回复给谁 from_addr: xxxxxx@qq.com #发件人 email: - xxxxx@qq.com #收件人 alert_subject: "信息安全告警" alert_text_type: alert_text_only alert_text: | XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 您好, 网站:{}正被人恶意攻击,请及时处理!! 当前匹配XX规则数:{} 发生时间:{} 攻击者的IP:{} 攻击类型:{} 请求的URL:{} alert_text_args: - domain -num_hits -stat_time -src_ip -Attack_types -URL

邮件告警最终效果如下:

五、总结

相关进程运行命令如下:

后台启动es

nohup su - elasticsearch -c'/opt/elasticsearch-5.5.2/bin/elasticsearch -d' &

后台启动logstash

nohup ./bin/logstash -f x.x.x.conf &

后台启动kibana

nohup ./kibana &

启动flume

flume-ng agent --conf conf --conf-fileconf/log_analysis_test.conf --name a1 – Dflume.root.logger=INFO,console

启动kafka

nohup bin/kafka-server-start.shconfig/server.properties &

后台启动elastalert

nohup python -m elastalert.elastalert--config ./config.yaml --verbose --rule ./example_rules/DD_rule.yaml &

常见的告警策略除了来自安全设备的正则之外,大量的IP请求、错误状态码、nginx的request请求中包含的特征码也都是常见的告警规则。

,
    推荐阅读
  • 鱼胶的功效与作用禁忌(鱼胶有什么功效与作用禁忌)

    下面内容希望能帮助到你,我们来一起看看吧!鱼胶的功效与作用禁忌鱼胶当中含有高级胶原蛋白、各种维生素以及其他的一些微量元素,它当中的一些功能促进肠胃的消化与吸收,也能增强肌肉的一些韧劲,更好地促进生长发育等现象,同时还有美容养颜,延缓衰老,补充水分的作用。鱼胶并没有一定的禁忌,它是一种老少皆宜的食品,所以,在平时也可适当的来进行服用,要保持乐观向上的心态来面对生活。

  • 苹果需要什么化肥(苹果常用的肥料)

    以下内容希望对你有帮助!苹果需要什么化肥化肥:常用的有氮素化肥,如尿素、硫酸铵、硝酸铵、碳酸氢铵等。磷肥主要是过磷酸钙、钙美磷肥;钾肥常用硫酸钾和氮化钾、硝酸钾、草木灰。除单一元素的化肥之外,还有多种元素复合在一起的化肥,叫做复合肥。按复合元素的成分又分氮磷肥、磷钾肥、氮磷钾3元复合肥。

  • 大连92平米小户型装修(大连装修大连大有恬园二手房整体翻新打造88平北欧风婚房)

    灰色布艺沙发,配了一组色彩跳跃的小圆几地上是黑白几何纹的地毯电视背景同时融入了电视和壁炉元素电视两侧是对称的隔板,一边放了钢琴,一边是收纳柜从餐厅区望向客厅,餐厅区空间做了斜切面原木的小餐桌可以伸缩,平时占地小主卧门口的一小块黑板墙主卧门空间配合餐厅区域做了斜切,不规则的空间做了收纳白色调的房间不做多余的装饰如何能选到设计好的装修公司?

  • 员工不上班私自打卡(员工忘打卡被主管罚款)

    但实际上,现在很多公司依旧会实行打卡制度,还以此来作为考核员工的重要标准。然而,就在上个月,我因为上班的时候太匆忙,忘记了打卡,结果被这个主管发现了,直接就罚了我200元工资。而调查结果数据显示,90.57%的职场人,是按时上下班的“打卡族”,其中39.62%认为打卡让自己身心备受束缚,痛恨机械的打卡制度。但是从公司管理者的角度来说,打卡制度又是完全不可能取消的,因为人的惰性使然,公司必须要有规章制度才能很好的运转。

  • 白水泥和黑水泥的区别(白水泥凝固后怕不怕水)

    黑水泥浆种类繁多,主要成分是硅酸盐,强度高,附着力强,一般用于工程施工。早期石灰与火山灰的混合物与现代的石灰火山灰水泥很相似,用它胶结碎石制成的混凝土,硬化后不但强度较高,而且还能抵抗淡水或含盐水的侵蚀。长期以来,它作为一种重要的胶凝材料,广泛应用于土木建筑、水利、国防等工程。

  • 风水八大局断语(关于风水的4种冲和25种风水择日用事的术语)

    关于风水之冲:1.门冲:即为两门正对或是偏对,这当然包括阳台门正对的居室其它门,2.刃冲:这在居室中较多,如进门处做了一个玄关,其厚度10公分左右,则此玄关的边即为刃冲3.角冲:就是向内的直角墙的角4.路冲:走廊的尽头正对门即为路冲的一种关于风水择日用事术语:1.动土:阳宅建筑时,按所择日时,用锄头在吉方锄下第一锄土的时间称“动土”。

  • 哪些树适合盆栽结不结果都行 哪些树开花不结果

    柠檬最大的好处就是挂果期长,一年有大半年的时间都是观果期,好看不说,还有丝丝缕缕的清香。盆栽石榴是比较适合栽种在阳台的一个果树品种。百香果盆栽这几年比较受欢迎,很多花友都在养,非常好养活,扦插苗第一年就能结果,不过第一年结得少,第二年开始就会疯狂挂果,它是多年生的,一盆苗可以养好几年,枝条剪下来还可以无限扦插!

  • 我真没想重生啊番外篇在哪看(我真没想重生啊)

    万万没想到,社会精英、钻石单身汉的陈汉升居然重生了,一觉醒来变成了高三毕业生。十字路口的陈汉升也在犹豫,宝藏女孩沈幼楚和白月光萧容鱼,应该选择谁?可是复杂的是其中各种感情纠葛和主角陈汉升的骚操作。近期,终极修罗场已经拉开序幕,陈汉升也交待了自己的完整计划。其中将孩子调包的剧情,引起读者的巨大抵制和骂声。沈幼楚已经完全接受了替萧容鱼喂孩子的事实。其实,剧情发展到后面,沈幼楚基本上已经特别出彩的剧情。

  • 螺旋输送机性价比高(螺旋输送机)

    螺旋输送机可分为两种类型:U型螺旋输送机,管式螺旋输送机。适用于多种行业,石料场,砂石场,煤炭场,化工场,钙粉场,石英砂场,水泥场,建材场等。用于对各种粉状,颗粒状和小块状等松散物料的水平输送和垂直提升。螺旋输送机头尾轴承外置,不使用uc轴承使用哈瓦洛大轴承座。头尾轴45#钢淬火调质,有韧性更耐磨,吊轴承采用花键吊轴形式,噪音低。内部采用德国原料高力石墨点黄钢制造,使用寿命长。

  • 火山爆发是怎样形成的(火山爆发的原因介绍)

    火山爆发是怎样形成的火山爆发的原因是,地球内部温度和密度不均匀,在地幔内部形成地幔对流或地幔柱。在外力作用下,这些熔融物质汇聚在一起,并在地球的浅部形成岩浆囊。还有一种火山的成因是由于板块相互作用,比如在板块的俯冲带或碰撞带,由于摩擦形成了局部高温,一些含水矿物的脱水也降低了岩石的熔点,这时也会形成岩浆囊,从而引发火山活动。