歡迎光臨
每天分享高質量文章

自動化運維之日誌統一管理

來源:FreeBuf

ID:freebuf

一、日誌收集及告警專案背景

近來安全測試專案較少,想著把安全裝置、nginx日誌收集起來並告警, 話不多說,直接說重點,搭建背景:

1. 日誌源:安全裝置日誌(Imperva WAF、綠盟WAF、paloalto防火牆)、nginx日誌等;

2. 日誌分析開源軟體:ELK,告警外掛:Sentinl 或elastalert,告警方式:釘釘和郵件;

3. 安全裝置日誌->logstash->es,nginx日誌由於其他部門已有一份(flume->kafka)我們透過kafka->logstash->es再輸出一份,其中logstash的正則過濾規則需要配置正確,不然比較消耗效能,建議寫之前使用grokdebug先測試好再放入配置檔案;

4. 搭建系統:centos 7 , JDK 1.8, Python 2.7

5. ELK統一版本為5.5.2

由於es和kibana的安裝都比較簡單,就不在下文中說明安裝及配置方法了。相關軟體的下載連結如下:

Es:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.2.tar.gz

Kibana: https://artifacts.elastic.co/downloads/kibana/kibana-5.5.2-linux-x86_64.tar.gz

Logstash: https://artifacts.elastic.co/downloads/logstash/logstash-5.5.2.tar.gz

測試連結Grokdebug:http://grokdebug.herokuapp.com

二、安全裝置日誌收集

2.1 Imperva WAF配置

策略->操作集->新增日誌告警規則

我這邊沒有用裝置自身的一些日誌規則,而是根據手冊自定義了一些需要的日誌欄位,可在自定義策略的訊息填入如下欄位:

StartTime=$!{Alert.createTime}AlarmID=$!{Alert.dn} EventID=$!{Event.dn}    AggregationInfo=$!{Alert.aggregationInfo.occurrences}Alert_level=$!{Alert.severity}     RuleName=$!{Alert.alertMetadata.alertName} Category=$!{Alert.alertType}     Alert_description=$!{Alert.description}EventType=$!{Event.eventType}    PolicyName=$!{Rule.parent.displayName}SrcIP=$!{Event.sourceInfo.sourceIp}    SrcPort=$!{Event.sourceInfo.sourcePort}Proto=$!{Event.sourceInfo.ipProtocol}    DstIP=$!{Event.destInfo.serverIp}DstPort=$!{Event.destInfo.serverPort}    WebMethod=$!{Event.struct.httpRequest.url.method}Domain=$!{Alert.serverGroupName}    URL=$!{Event.struct.httpRequest.url.path}ResponseCode=$!{Event.struct.httpResponse.responseCode}Alert_key=$!{Event.struct.httpRequest.url.queryString}Action=$!{Alert.immediateAction}    ResponseTime=$!{Event.struct.responseTime}ResponseSize=$!{Event.struct.responseSize}    Headers_value=$!{Event.struct.httpRequest.essay-headers.value}Parameters_value=$!{Event.struct.httpRequest.parameters.value}

引數說明:告警開始時間、告警ID、事件ID、事情數量、告警級別…. HTTP傳回碼、觸發告警字串、響應動作、響應時間、響應大小、http包頭的值,中間省略的部分請自行檢視手冊。其實Imperva WAF的總日誌欄位數不少於一兩百個,單從這一點可以看出確實好於國產WAF太多。

針對Imperva WAF的logstash配置如下:

input{       syslog{       type => "syslog"       port => 514        }    }    filter {    grok  {    match =>["message","%{GREEDYDATA:StartTime} AlarmID=%{NUMBER:AlarmID}    EventID=%{NUMBER:EventID}AggregationInfo=%{NUMBER:AggregationInfo}    Alert_level=%{DATA:Alert_level}RuleName=%{GREEDYDATA:RuleName}    Category=%{GREEDYDATA:Category}Alert_description=%{GREEDYDATA:Alert_description}    EventType=%{DATA:EventType}PolicyName=%{GREEDYDATA:PolicyName} SrcIP=%{IPV4:SrcIP}    SrcPort=%{NUMBER:SrcPort}Proto=%{DATA:Proto} DstIP=%{IPV4:DstIP}    DstPort=%{NUMBER:DstPort}WebMethod=%{GREEDYDATA:WebMethod}    Domain=%{DATA:Domain}URL=%{GREEDYDATA:URL}    ResponseCode=%{GREEDYDATA:ResponseCode}Alert_key=%{GREEDYDATA:Alert_key}    Action=%{DATA:Action}ResponseTime=%{GREEDYDATA:ResponseTime}    ResponseSize=%{GREEDYDATA:ResponseSize}Headers_value=%{GREEDYDATA:Headers_value}Parameters_value=%{GREEDYDATA:Parameters_value}"]      # 實際複製貼上可能會有點格式問題,註意引數之間空一個空格即可      remove_field => ["message"]    }    geoip {       source => "SrcIP"   #IP歸屬地解析外掛      }    }    output{       elasticsearch{         hosts => "es單機或叢集地址:9200"            index => "impervasyslog"       }      }

說明:其中geoip為elk自帶外掛,可以解析ip歸屬地,比如ip歸屬的國家及城市,在儀錶盤配置“地圖炮”裝X、檢視攻擊源地理位置的時候有點用,

2.2 綠盟WAF配置

日誌報表->日誌管理配置->Syslog配置&日誌發生引數

針對綠盟WAF的logstash配置如下:

input和output參照imperva waf,貼出最要的grok部分,如下:

grok  {         match => ["message",%{DATA:syslog_flag}  site_id:%{NUMBER:site_id}     protect_id:%{NUMBER:protect_id}  dst_ip:%{IPV4:dst_ip}  dst_port:%{NUMBER:dst_port}     src_ip:%{IPV4:src_ip}src_port:%{NUMBER:src_port} method:%{DATA:method}    domain:%{DATA:domain}  uri:%{DATA:uri}  alertlevel:%{DATA:alert_level}    event_type:%{DATA:Attack_types} stat_time:%{GREEDYDATA:stat_time}     policy_id:%{NUMBER:policy_id}rule_id:%{NUMBER:rule_id}  action:%{DATA:action}     block:%{DATA:block} block_info:%{DATA:block_info}  http:%{GREEDYDATA:URL}

2.3 paloalto防火牆配置

(6.1版本,其他版本可能會有點差異)

新建syslog伺服器->日誌轉發,具體看截圖

 

針對PA的logstash配置如下:

input和output參照imperva waf,貼出最要的grok部分,如下:

grok  {         match =>["message","%{DATA:PA_Name},%{GREEDYDATA:Time},%{NUMBER:Eventid},%{DATA:Category},%{DATA:Subcategory},%{NUMBER:NULL},%{GREEDYDATA:Generate_Time},%{IPV4:SourceIP},%{IPV4:DestinationIP},%{IPV4:NAT_SourceIP},%{IPV4:NAT_DestinationIP},%{DATA:RuleName},%{DATA:SourceUser},%{DATA:DestinationUser},%{DATA:Application},%{DATA:VMsys},%{DATA:SourceZone},%{DATA:DestinationZone},%{DATA:IN_interface},%{DATA:OUT_interface},%{DATA:Syslog},%{DATA:GREEDYDATA:TimeTwo},%{NUMBER:SessionID},%{NUMBER:Repeat},%{NUMBER:SourcePort},%{NUMBER:DestinationPort},%{NUMBER:NAT-SourcePort},%{NUMBER:NAT-DestinationPort},%{DATA:Flag},%{DATA:Proto},%{DATA:Action},%{DATA:NUll2},%{DATA:ThreatName},%{DATA:Category2},%{DATA:Priority},%{DATA:Direction},%{NUMBER:Serialnum},%{GREEDYDATA:NULL3}"]

貼兩張最終的效果圖:

三、Nginx日誌收集

由於nginx日誌已經被其他大資料部門收集過一遍了,為避免重覆讀取,我們從其他部門的kafka拉取過來即可,這裡說一下nginx收集的方式,flume->kafka 示例配置方式如下:

Flume配置如下

配置掃描日誌檔案

log_analysis_test.conf配置檔案

a1.sources=s1 #可以理解為輸入端,定義名稱為s1

a1.channels=c1 #傳輸頻道,類似佇列,定義為c1,設定為記憶體樣式

a1.sinks=k1 #可以理解為輸出端,定義為sk1

#source配置

a1.sources.s1.type=exec

a1.sources.s1.command=tail -F/data/log/nginx/crf_crm.access.log

a1.sources.s1.channels=c1

#channel配置

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink 設定Kafka接收器

a1.sinks.k1.channel=c1

a1.sinks.k1.topic=crm_nginx_log_topic #設定Kafka的Topic

a1.sinks.k1.brokerList=x.x.x.x:9092 #設定Kafka的broker地址和埠號

a1.sinks.k1.requiredAcks=1

a1.sinks.k1.batchSize=20

kafka配置

kafka下載

wgethttp://mirror.bit.edu.cn/apache/kafka/0.8.2.2/kafka_2.9.1-0.8.2.2.tgz

配置zookeeper,根據機器狀況更改jvm 記憶體設定

vimbin/zookeeper-server-start.sh

配置kafka

 vimbin/kafka-server-start.sh   根據機器狀況更改jvm 記憶體設定

啟動zookeeper

nohupbin/zookeeper-server-start.sh config/zookeeper.properties &

啟動kafka

nohup bin/kafka-server-start.shconfig/server.properties &

建立應用伺服器的topic

kafka-topics.sh --create –zookeeper x.x.x.x:2181--replication-factor 2 --partitions 5 --topic crm_nginx_log_topic

分割槽及副本以自己的情況而定

檢視Topic是否建立成功

./bin/kafka-topics.sh --list --zookeeperx.x.x.x:2181

啟動kafka生產者

bin/kafka-console-producer.sh --broker-listx.x.x.x:9092 --topic pay

啟動kafka消費者

kafka-console-consumer.sh --zookeeperx.x.x.x:2181--topic crm_nginx_log_topic

logstash 配置檔案

input{       kafka {           codec => "plain"           group_id => "logstash1"           auto_offset_reset => "smallest"           reset_beginning => true           topic_id => "crm_nginx_log_topic"           zk_connect => "x.x.x.x:2181" # zookeeper的地址       }    grok {省略}    }    output{       elasticsearch{         hosts => "es單機或叢集地址:9200"             index => "impervasyslog"       }

四、安全告警

安全告警可以是Sentinl或 elastalert,Sentinl是kibana外掛,可以整合到kibana內圖形化展示,但是寫規則時需要對JS較熟悉,elastalert 是es的外掛,不支援整合到kibana介面進行圖形化展示。下麵分別對這2個外掛進行安裝及配置說明。

4.1 Sentinl

安裝如下:

kibana-plugin install https://github.comsirensolutionssentinlreleasesdownloadtag-

5.4sentinl-v5.4.1.zip

安裝後,以IP請求頻次告警設定為例:

每2分鐘去es查詢一次

針對需要監控的IP欄位

Input的body內容即es查詢語法

編寫過濾條件

{      "script": {        "script":"payload.json='超過閥值40';var match=false;var threshold=40;varfirst=payload.aggregations.code.buckets;for(var i=0;i= threshold){match=true;payload.json += '【ip】:' + first[i].key + '  【count】:' +first[i].doc_count + '\\n';}};match;"      }    }

如果想對狀態碼做監控,參考如下:

Action裡面新增釘釘群機器人的webhook

釘釘報警如下

釘釘的介面檔案連結:

https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.karFPe&treeId;=257&articleId;=105735&docType;=1

郵件告警設定如下:

4.2 elastalert

前置條件:

JDK 1.8

python 2.7

easy_install -U setuptools (最新setuptools-39.2.0)

yum install gcc

yum install python-devel

yum install libffi-devel

安裝

git clonehttps://github.com/Yelp/elastalert.git    cd elastalert    python setup.py install              pip install -r requirements.txt      cp config.yaml.example config.yaml

下載https://github.com/xuyaoqiang/elastalert-dingtalk-plugin

elastalert-dingtalk-plugin中的elastalert_modules複製到elastalert下

建立索引

    elastalert-create-index

輸入 es的IP 及 es的埠,其餘根據自己的環境寫,一般預設即可

配置config.yaml,以下為關鍵配置資訊:

rules_folder: example_rules #指定rule的目錄

run_every:

minutes: 1 #每一分鐘去探測一次

buffer_time:

minutes: 15 #快取15分鐘

es_host: x.x.x.x

es_port: 9200

writeback_index: elastalert_status #建立的告警索引

alert_time_limit:

days: 2 #失敗重試的時間限制

下麵先以釘釘告警為例:

在example_rules裡面新建釘釘告警配置檔案,內容如下

es_host: x.x.x.x

es_port: 9200

name: xxx安全告警

type: cardinality

index: nsfocuswaf_syslog

cardinality_field: src_ip

max_cardinality: 30

timeframe:

minutes: 5 #單IP 5分鐘內訪問超過30次就會告警

aggregation_key: src_ip

summary_table_fields:

  • src_ip

  • dst_ip

  • dst_port

  • Attack_types

  • stat_time

alert:

-“elastalert_modules.dingtalk_alert.DingTalkAlerter”

dingtalk_webhook: “your webhook”

dingtalk_msgtype: text

群機器人的配置比較簡單,自己搜尋一下即可

註意如果告警匹配了N條,卻只發出1條告警,修改elastalert.py程式碼,在856行後面增加如下程式碼:

修改dingtalk_alert.py的程式碼,增加如下內容:

src_ip = matches[0]['src_ip']   #src_ip為grok過濾後自定義的欄位,以下相同    dst_ip = matches[0]['dst_ip']    dst_port = matches[0]['dst_port']    attack_types = matches[0]['Attack_types']    start_time =  matches[0]['stat_time']    修改payload初的程式碼,如下:    payload = {    "msgtype": self.dingtalk_msgtype,           "text": {               "content": r"發現源IP地址:{}在5分鐘內,針對伺服器IP:{}的{}埠發動了3次攻擊,攻擊型別是:{},攻擊時間:{},請及時處理,謝謝!".format(src_ip,dst_ip,dst_port,attack_types,start_time)                },

如果沒有過濾除自定義的欄位,只有message欄位的話,可以新增如下程式碼:

message = matches[0]['message']    src_ip_pattern =r"src_ip:(\d+\.\d+\.\d+\.\d+)"    dst_ip_pattern =r"dst_ip:(\d+\.\d+\.\d+\.\d+)"    dst_port_pattern =r"dst_port:(\d+)"    time_pattern =r"stat_time:(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})"    type_pattern =r"event_type:(\w+)"     src_ip= re.findall(src_ip_pattern, message)[0]     dst_ip= re.findall(dst_ip_pattern, message)[0]     dst_port= re.findall(dst_port_pattern, message)[0]     start_time= re.findall(time_pattern, message)[0]     attack_types= re.findall(type_pattern, message)[0]

最終效果圖如下:

郵件的rule配置檔案如下:

es_host: x.x.x.x    es_port: 9200    name: 資訊保安告警    type: cardinality    index: nsfocuswaf_syslog    cardinality_field: src_ip    max_cardinality: 5    timeframe:     minutes: 5    alert:    - "email"    smtp_host: mail.crfchina.com    smtp_port: 25    smtp_auth_file:/opt/elastalert/smtp_auth_file.yaml    email_reply_to: xxxxx@qq.com #回覆給誰    from_addr: xxxxxx@qq.com   #發件人    email:    - xxxxx@qq.com    #收件人    alert_subject: "資訊保安告警"    alert_text_type: alert_text_only    alert_text: |    XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX      您好,         網站:{}正被人惡意攻擊,請及時處理!!      當前匹配XX規則數:{}      發生時間:{}      攻擊者的IP:{}      攻擊型別:{}      請求的URL:{}    alert_text_args:      - domain      -num_hits      -stat_time      -src_ip      -Attack_types      -URL

郵件告警最終效果如下:

五、總結

相關行程執行命令如下:

後臺啟動es

    nohup su - elasticsearch -c'/opt/elasticsearch-5.5.2/bin/elasticsearch -d' &

後臺啟動logstash

    nohup ./bin/logstash -f x.x.x.conf &

後臺啟動kibana

    nohup ./kibana &

啟動flume

flume-ng agent --conf conf --conf-fileconf/log_analysis_test.conf --name a1 –    Dflume.root.logger=INFO,console

啟動kafka

    nohup bin/kafka-server-start.shconfig/server.properties &

後臺啟動elastalert

nohup python -m elastalert.elastalert--config ./config.yaml --verbose    --rule ./example_rules/DD_rule.yaml  &

常見的告警策略除了來自安全裝置的正則之外,大量的IP請求、錯誤狀態碼、nginx的request請求中包含的特徵碼也都是常見的告警規則。

參考連結:

http://www.freebuf.com/articles/others-articles/161905.html

http://www.freebuf.com/articles/web/160254.html

    贊(0)

    分享創造快樂