SYSLOG-NG와 MySQL 을 이용한 시스로그 응용 하기

syslog-ng 와 데이타베이스개념은 시스로그 메시지가 날라 오면 filter 키워드에 의하여 걸러진 시스로그는 

templete 에 의하여 MySQL   쿼리를 실행하게 되는데 실행시 쿼리를 이용하여 적절하게 메시지를 분석하고 그에 맞는 처리를 합니다.
그리고 난후 log에 의하여 destination이결정 되게 되는데 DB뿐 아니라 실행파일 실행, 텍스트 형태든 어떤 방식이든 로그를 남길수 있습니다.
예를 들면  INSERT ~ SELECT 문 에 의하여  SELECT 문의 조건에 부합 되지 않으면 안 넣게 할수도 있습니다.
뒤에 SELECT 문에 JOIN 을 써서 적절한 데이타를 뽑아 낼수도 있습니다.


주의 할점은 쿼리가 큰 경우에는 쿼리 실행이 늦어 지므로 최적화를 반드시 해야 하고 될수 있는한 쿼리를 단순화 하여
짧게 만들어야 합니다.

 

mysql syslog-ng 만으로 구현한 것입니다.


예전에 장비담당으로 장비 유지 보수할때 관리하던 네트워크 장비들을 DB에 넣고 SNMP PHP의 소켓을 이용하여 장비 점검 시스템을 구축 했던 적이 있었는데 

LINUX 상에서 장비 점검 시스템 구현 할때 사용 했던 것 입니다여러가지 쿼리를 구현하다보니 무리가 있었지만 JOIN 의 경우 간단한 데이타만 참조 하고 쿼리 속도만 빠르게 한다면 무리 없이 사용 할수 있을 것입니다.

자체적으로 시스로그 없이 할수 있는 것이 DB와 리눅스 뿐이었으므로 쿼리는 복잡하지만 비용없이 간단하게 구현 할수 있을 겁니다.

 

 

일반적으로




다음은 syslog-ng.conf 의 내용입니다.

# 옵션 부분 건드릴 필요 없음

options {
    sync (0);
    time_reopen (10); #
재실행시 대기 시간
    log_fifo_size (4096);
    long_hostnames (off);
    use_dns (no);
    use_fqdn (no);
    create_dirs (no);
    keep_hostname (yes);
};

 

#시스템 커널 소스 -- 사용안하므로 주석 처리
#source kernsrc { file ("/proc/kmsg" log_prefix("kernel: ")); };

 

# 네트워크 의 모든 주소로 부터 오는 시스로그 메시지 캡춰
source src {
    unix-stream ("/dev/log");
    internal();
    udp(ip(0.0.0.0) port(514));
};

 

# filter 필터명 { 조건 명령어들 };

# destination 목적지명 {
    pipe("/var/log/syslog.pipe" template("
임시파이프에남길텍스트") template-escape(yes));
    -->
일종의 버퍼인 파이프에 임시로 저장한후 외부 프로그램으로 해당 텍스트를 DB에 넣던지 마음대로 요리 할수 있음
    program("
쉘프로그램");

    --> 특정 프로그램 실행

    file("기록남길 파일 경로명")

    --> 파일로 쌓이게 하기

}

 

OR 연산자를 사용한 두개의 문자가 포함 되었을 때

 

filter invalid_fail { match("invalid") and match("bad");};

# match 는 특정 문자가 포함 되어 잇을때 사용 , 정규식 사용가능
#
문자가 포함 되어 있을때 SQL 명령 실행

destination  invalid_fail  {
 pipe("/var/log/syslog.pipe" template("
INSERT INTO `dev_operation` (IP_ADDRESS,`
지점`,`사업장`,`장비모델`,MEMO,OPER_TIME,TYPE,LEVEL)

SELECT '$HOST',`지점`,`사업장`,`장비타입`,'$MSG','$YEAR-$MONTH-$DAY $HOUR:$MIN:$SEC','INVALID',3 FROM device.nms WHERE `대표IP` ='$HOST';\n
") template-escape(yes));
#};

à 해당 호스트에 해당하는 정보를 장비DB에서 얻은후 받은 시간과 메시지타입을 찍어서 DB에 넣는다.

엔터문자로 구분 하였으나 모두 붙여서 사용해야 됨

 


#
정규식 사용가능 Regular Expression

filter 5972_uplink_removed {match("(73|74|75) removed"); };
destination  5972_uplink_removed {
 pipe("/var/log/syslog.pipe" template("INSERT INTO `dev_operation` (IP_ADDRESS,`
지점`,`사업장`,`장비모델`,OPER_TIME,MEMO,TYPE,LEVEL) SELECT '$HOST',`지점`,`사업장`,`장비타입`,'$YEAR-$MONTH-$DAY $HOUR:$MIN:$SEC','$MSG','UPLINK_REMOVED',1  FROM device.nms WHERE `대표IP` ='$HOST';\n") template-escape(yes));
};

# AND 연산자를 이용하여 다중 논리 사용가능
filter dasan_linkon { match("[0-9]{1,2} link on") and match("operational"); };

# MySQL SUBSTRING_INDEX 를 이용하여  숫자를 추출 하였습니다.
destination  dasan_linkon {
 pipe("/var/log/syslog.pipe" template("UPDATE inet_user AS iu, megapass as n SET iu.ON_USE_TIME='$YEAR-$MONTH-$DAY $HOUR:$MIN:$SEC' WHERE n.WIRECD = iu.WIRECD AND n.INET_TIE = iu.INET_TIE  AND iu.INET_TERM =CONCAT('1',LPAD(TRIM(SUBSTRING_INDEX(SUBSTRING_INDEX('$MSG','port',-1),'link on',1)),3,'0')) AND n.IP_ADDRESS ='$HOST';\n") template-escape(yes));
};

 

문자중 특정 숫자를 필터링
## CPU
부하 --> 90% 이상만 필터링

# 현재값 CPU overload warning : CPU load [91] > threshold low [70]
filter dasan_cpuload {match("CPU") and match("overload") and match("warning");  };

 

MySQL SUBSTRING_INDEX 를 이용하여  숫자를 추출 하였습니다.

추출후 SUBSTRING_INDEX(SUBSTRING_INDEX('$MSG','load [',-1),'] >',1) > 95 LIMIT 1 와 같이 쿼리를 사용하면 95% 이상 넘어서는 시스로그 메시지만 걸러 내어 DB에 넣을수 있다.

 

destination  dasan_cpuload {
 pipe("/var/log/syslog.pipe" template("INSERT  INTO `dev_operation` (IP_ADDRESS,`
지점`,`사업장`,`장비모델`,OPER_TIME,MEMO,TYPE,LEVEL)  SELECT '$HOST',`지점`,`사업장`,`장비타입`,'$YEAR-$MONTH-$DAY $HOUR:$MIN:$SEC','$MSG','CPULOAD',1  FROM device.nms WHERE `대표IP` ='$HOST' AND   SUBSTRING_INDEX(SUBSTRING_INDEX('$MSG','load [',-1),'] >',1) > 95 LIMIT 1;\n") template-escape(yes));
};


 

match 에서 맥주소 형태의 정규식 쓰는 경우


비정상맥 흘리는 공격 감지
#  22:75:77:97:ce:7d PORT[5] blocked

filter dasan_mac_flood {match("[a-zA-Z0-9]{1,2}\:[a-zA-Z0-9]{1,2}\:[a-zA-Z0-9]{1,2}\:[a-zA-Z0-9]{1,2}\:[a-zA-Z0-9]{1,2}\:[a-zA-Z0-9]{1,2} PORT\[[0-9]{1,2}\] blocked") and match("mac-flood-guard");};
destination  dasan_mac_flood {
 pipe("/var/log/syslog.pipe" template("INSERT INTO `dev_operation` (IP_ADDRESS,`
지점`,`사업장`,`장비모델`,OPER_TIME,TYPE,VALUE,MEMO,LEVEL,INPUT_TAG) SELECT '$HOST',`지점`,`사업장`,`장비타입`,'$YEAR-$MONTH-$DAY $HOUR:$MIN:$SEC','MAC_FLOOD',SUBSTRING_INDEX('$MSG',' PORT[',1),'$MSG',2,(SELECT IF(COUNT(*)>0,'N','Y') FROM dev_operation WHERE STATUS IS NULL and TYPE='MAC_FLOOD' AND OPER_TIME<'$YEAR-$MONTH-$DAY $HOUR:$MIN:$SEC' AND OPER_TIME>=DATE_SUB('$YEAR-$MONTH-$DAY $HOUR:$MIN:$SEC', INTERVAL 30 MINUTE) AND IP_ADDRESS ='$HOST' AND SUBSTRING_INDEX('$MSG',' PORT[',1) GROUP BY VALUE) FROM nms WHERE `대표IP`='$HOST';\n") template-escape(yes));
};

 

 SELECT 를 이용하여 DB를 참조 할 필요 없는 경우 VALUES 를 바로 사용하여 직접 넣을수 있다 DB를 참조 하지 않기 때문에 입력 속도가 빠르다.

destination  dasan_mac_flood {
 pipe("/var/log/syslog.pipe" template("INSERT INTO `dev_operation` (IP_ADDRESS,`
지점`,`사업장`,`장비모델`,OPER_TIME,TYPE,VALUE,MEMO,LEVEL) VALUES ('$HOST',`지점`,`사업장`,`장비타입`,'$YEAR-$MONTH-$DAY $HOUR:$MIN:$SEC','MAC_FLOOD',SUBSTRING_INDEX('$MSG',' PORT[',1),'$MSG',2,,'Y')]\n") template-escape(yes));
};

 

 

# 그냥 파일로 쌓이게 하기
destination std {
 file("/checkup/syslog/HOSTS/$HOST/$YEAR/$MONTH/$YEAR$MONTH$DAY_$HOST_syslog.txt"
  owner(root) group(root) perm(0600) dir_perm(0700) create_dirs(yes)
 );
};


#
스크립트 실행 시키기
destination phpRun {
 program("/www/sbin/phpscript.php" template("$YEAR-$MONTH-$DAY $HOUR:$MIN:$SEC $HOST $LEVEL $MSG\n") template_escape(no));
};

log { source(src);  destination(phpRun); };



# filter
의 형태로 걸리면 destination 으로 보내어서 SQL 문을 실행 시킨후 특정 DB에 넣음
log { source(src); filter(dasan_cpuload); destination(dasan_cpuload); };
log { source(src); filter(5972_loopblock); destination(5972_loopblock); };
log { source(src); filter(dasan_linkfail); destination(dasan_linkfail);  };
log { source(src); filter(5972_install); destination(5972_install); };
log { source(src); filter(5972_removed); destination(5972_removed); };
log { source(src); filter(5972_uplink_installed); destination(5972_uplink_installed); };
log { source(src); filter(5972_checksum); destination(5972_checksum); };
log { source(src); filter(dasan_fanfault); destination(dasan_fanfault); };

# destination
이 여러개 될수도 있음
log { source(src);  filter(5972_loop);  destination(5972_loop); destination(5972_loop2); destination(5972_loop_devicemac); };


 

 

 

댓글

이 블로그의 인기 게시물

XCOPY를 이용한 당일날짜의 파일만 카피하는방법

뽀롱 뽀롱 뽀로로 DVD 목록