ElasticSearch 살펴보기 - Aggregation
Aggregation
- 인덱스를 활용한 분산 처리 가능
- 데이터가 양이 많을 수록 CPU/메모리 사용률이 높아짐 -> ES 캐시를 사용하여 최적화 가능
- 특정 필드에서 aggregation 연산을 할 때, 모든 필드 값을 메모리에 로드 -> 리소스 사용율 증가
ES cache 종류 | |
---|---|
node query cache | - 노드내 모든 샤드가 공유 - LRU 캐시 - elasticsearch.yml 에 ’index.queries.cache.enabled: true’ 설정하여 캐시 활성화 |
shard request cache | - 샤드에서 수행된 쿼리 결과를 캐싱 - 샤드의 내용이 변경되면 캐시가 삭제됨 -> 업데이트가 많은 샤드에서는 shard request cache 적용이 성능 저하를 유발 |
field data cache | aggregation 동인 필드 값을 메모리에 캐시 |
Aggregation API 동작 실행 방식
쿼리 수행 결과에 대해 aggregation 결과 수행
"query" : { ... } // 생략시 match._all
"aggs" : { ... }
}
- bucket : 쿼리 결과로 grouping 된 문서들의 모음
분산환경에서 Aggregation 동작 방식
- coordinate node 는 각 shard 결과를 취합해서 size 만큼 최종결과 반환
- 각 shard 에 shard_size 크기만큼 1차 집계 결과 요청
- shard_size 외 결과는 누락되며, 최종 결과의 정확도에 영향
Aggregation Query Request / Response 예제
Request
{
"query" : { ... } // 생략시 match._all
"aggs" : {
"<aggregation_name1>": {
"<aggregation_type>": {
"field": "{필드명}"
}
}
[,"meta": { [<meta_data_body>] } ]?
[,"aggregations": { [<sub_aggregations>] } ]?
"<aggregation_name2>": {
"global": {}, // (bucket 외에) 전체 문서에 대해서 aggregation 수행할 때 설정
"aggs": {
"<sub_aggregation_name1>": {
"<sub_aggregation_type>": {
"field": "{필드명}"
}
}
}
}
}
}
Response
{
"took" : 2, // 소요시간
"timed_out" : false, // timeout 발생 여부
"_shards" : { // 검색에 영향 받은 shard 정보
"total": 5, // - 영향받은 shard 총 개수
"success": 3, // - 검색이 처리된 shard 개수
"skipped": 2, // - 검색이 처리되지 않은(skip) shard 개수
"failed": 0, // - 검색이 처리 실패한 shard 개수
},
"hits": { // 검색 결과
"total": 100, // 검색 쿼리가 일치한 문서 총 개수
"max_score": 100, // 검색 결과에 포함된 문서의 score 최대값
"hits": {} // 검색 결과 문서 목록 (default: 10개만 반환)
}
"aggregations" : {
"<aggregation_name>": {
// 집계 결과
}
}
}
Metric Aggregation
- 특정 필드에 대한 sum, avg, sort, geo_bounds 계산
- (검색 결과 출력 없이) 집계 결과만 출력해야하는 경우 (size=0 으로 쿼리)
- ex.
GET {인덱스명}/_search?size=0
- ex.
- 집계 연산간 각 문서에 대해 적용될 script 작성 가능
- query 작성시 constant_score 를 사용하여, score 값을 계산하지 않도록 해서 성능 최적화
Metric Aggregation 종류 | |
---|---|
single-value | - 결과 값이 단일값 - sum, avg 등 |
multi-value | - 결과 값이 여러개가 될 수 있음 - stats, geo_bounds |
Metric Aggregation 유형별 예제
sum / avg / min / max / value_count (count)
{
"query" : {
"constant_score": {
"filter": {
"match" : {
"{필드명}": "{검색어}".
}
}
}
},
"aggs": {
"{agg_name}" : {
"sum": { // 또는 avg, min, max, value_count
"field": "{필드명}"
"script": { ... } // 실행할 script (optional)
}
}
}
}
stats
- sum, avg, min, max, value_count 를 한번에 연산
{
"query" : {
"constant_score": {
"filter": {
"match" : {
"{필드명}": "{검색어}".
}
}
}
},
"aggs": {
"{agg_name}" : {
"stats": {
"field": "{필드명}"
"script": { ... } // 실행할 script (optional)
}
}
}
}
extended_stats
- sum, avg, min, max, value_count 를 한번에 연산
- 추가 응답
- sum_of_squares : 제곱합 (평균 편차)
- variance : 분산
- std_derivation : 표준 편차
- std_derivation_bound : 표춘편차 범위
- upper : 표준편차 상한
- lower : 표준편차 하한
{
"query" : {
"constant_score": {
"filter": {
"match" : {
"{필드명}": "{검색어}".
}
}
}
},
"aggs": {
"{agg_name}" : {
"extended_stats": {
"field": "{필드명}"
"script": { ... } // 실행할 script (optional)
}
}
}
}
cardinality
- uniq 개수 카운트
- (정확한 값이 아닌) 근사치 반환
- ‘{필드명}’ 에 (분석기를 사용하는) text 타입이 아닌 keyword 타입을 지정해야함 (ex. “field” : xxx.yyy.keyword )
- precision_threshold 값을 지정해 정확도 조정이 가능 (0 ~ 40000 값 설정 가능)
- 정확도를 높일 수록 많은 메모리 필요
- cardinality 는 hash 를 기반으로 계산됨
- murmur3 플러그인을 사용하면 hash 값을 미리 생성해 성능 향상 효과가 있다.
- 분산환경에서는 각 shard 에서 중복제거한 목록을 coordinate 노드로 전송해야하며, 중간 결과를 임시 저장 및 전송하기 위해 트래픽/메모리 소비가 크다
{
"query" : {
"constant_score": {
"filter": {
"match" : {
"{필드명}": "{검색어}".
}
}
}
},
"aggs": {
"{agg_name}" : {
"cardinality": {
"field": "{필드명}"
"script": { ... } // 실행할 script (optional)
}
}
}
}
percentiles
- 백분위 구간 별 count
- 근사값으로 계산
- percents 값을 사용해 집계 구간을 지정 가능
- compression 옵션으로 정확도 조절이 가능
- 정확도를 높일 수록 메모리 사용량 증가
- 참고: TDigest 알고리즘 (노드를 사용한 근사치 계산)
{
"query" : {
"constant_score": {
"filter": {
"match" : {
"{필드명}": "{검색어}".
}
}
}
},
"aggs": {
"{agg_name}" : {
"percentiles": {
"field": "{필드명}"
"percents": [10, 20, 30, ... , 90]
}
}
}
}
percentile_rank
- 백분위 구간/위치 연산
- 근사값으로 계산
{
"query" : {
"constant_score": {
"filter": {
"match" : {
"{필드명}": "{검색어}".
}
}
}
},
"aggs": {
"{agg_name}" : {
"percentile_rank": {
"field": "{필드명}"
"values" : [100, 200] // 검색하고자하는 필드값
}
}
}
}
geo_bounds
- 지형 경계 box 계산
- geo_point 타입의 field 에 대해서만 연산 가능
{
"query" : {
"constant_score": {
"filter": {
"match" : {
"{필드명}": "{검색어}".
}
}
}
},
"aggs": {
"{agg_name}" : {
"geo_bounds": {
"field": "{geo_point 타입의 필드명}"
"percents": [10, 20, 30, ... , 90]
}
}
}
}
geo_centroid
- 지형 중심 계산
- geo_point 타입의 field 에 대해서만 연산 가능
{
"query" : {
"constant_score": {
"filter": {
"match" : {
"{필드명}": "{검색어}".
}
}
}
},
"aggs": {
"{agg_name}" : {
"geo_centroid": {
"field": "{geo_point 타입의 필드명}"
}
}
}
}
Bucket Aggregation
- metric 계산없이 bucket 생성
- bucket 집계 결과로 bucket 을 만들고 계속 중첩된 집계 가능
- 중첩이 많을 수록 메모리 사용률 증가
- ES에 최대 허용 bucket 수 제한 있음 ( search.max_buckets 에 설정 가능)
- bucket
- 집계된 결과 데이터
- 메모리에 저장
Bucket Aggregation 동작 방식
- 검색 처리 노드에서 각 shard 결과를 취합해서 size 만큼 최종결과 반환
- 각 shard 에 shard_size 크기의 집계 결과 요청
- shard_size 외 결과는 누락되며, 최종 결과의 정확도에 영향
Bucket Aggregation 응답 포맷
{
"aggregations": {
"{agg_name}" : {
"doc_count_error_upper_bound": 10, // 집계 오류 상한
"sum_other_doc_count": 1111, // 결과에 누락된 문서 개수 (남은 결과가 더 있는지 여부 판단용)
"buckets" : [
// 집계 결과
]
}
}
}
Bucket Aggregation 유형혈 예제
range
- from 부터 to 미만의 범위에 대한 집계 연산
{
"aggs": {
"{agg_name}" : {
"range": {
"field": "{필드명}"
"ranges": [
{
"key": "{범위 이름}", // optional
"from" : 1,
"to": 100" // 100은 연산범위에 미포함, 100 미만
},
{ "from" : 1000 },
...
]
}
}
}
}
date_range
- 날짜 범위(from ~ to 미만)의 집계 연산
- 날짜 포맷은 ES 지원 형식만 사용 가능 (UTC)
{
"aggs": {
"{agg_name}" : {
"range": {
"field": "{필드명}"
"ranges": [
{
"key": "{범위 이름}", // optional
"from" : 1,
"to": 100" // 100은 연산범위에 미포함, 100 미만
},
{ "from" : 1000 },
...
]
}
}
}
}
histogram
- 일정 간격 단위 집계 연산
{
"aggs": {
"{agg_name}" : {
"histogram": {
"field": "{필드명}"
"interval": 1000 // 1000 간격으로 집계
"min_doc_count": 1
// 최소 1개의 결과가 있을때만 출력 (optional)
}
}
}
}
date_histogram
- year, quarter, month, week, day, hour, minute, second 등 시간 간격으로 집계 연산
- “30m” (30분 간격), “1.5h” (90분 간격) 등 지정 가능
- 날짜 포맷은 ES 지원 형식만 사용 가능 (UTC)
{
"aggs": {
"{agg_name}" : {
"date_histogram": {
"field": "{필드명}"
"interval": "day" // 일 간격으로 집계
"format": "yyyy-MM-dd" // 집계 결과에서 시간 출력 포맷
// ( yyyy-MM-dd’T’HH:mm:ss.SSS지정 가능)
"time_zone": "+09:00" // 날짜를 한국시간(+09:00) 으로 변환하여 연산
"offset": "+1h" // 01 시부터 다음날 01까지 day 간격으로 집계
}
}
}
}
terms
- ‘{필드명}’ 필드에 대해 빈도수가 높은 term 순위의 집계
- bucket 이 동적으로 생성되는 다중 bucket 집계 연산
- ‘{필드명}’ 에는 (형태소분석이 필요없는) keyword 데이터 타입을 명시해야 함
- (정확한 값이 아닌) 근사치를 계산
- size, shard_size 지정으로 정확도 향상 가능
- 값이 클수록 메모리 사용량 및 연산 사간 증가
{
"aggs": {
"{agg_name}" : {
"terms": {
"field": "{필드명, xxx.keyword}". // keyword 타입을 명시해야 함
"size": 5 // 반환할 결과값 크기 (default: 10), 값이 클수록 정확도 향상
"shard_size": 100 // 각 샤드에 최초 수행되는 집계 결과 크기
}
}
}
}
Pipeline Aggregation
- 다른 집계 결과로 생성된 bucket 에 대해 추가적인 집계 연산 수행
- Parent, Sibling 두 종류의 파이프라인 집계 방식 있음
- buckets_path 로 참조할 집계 결과의 경로를 지정
Pipeline Aggregation 유형별 예제
Pipeline Aggregation - Sibling
{
"aggs": {
"{agg_name1}" : {
"{agg_type1}": {
...
},
"aggs": {
"{agg_name2}" : {
"{agg_type2}": {
...
}
}
},
"{pipeline_agg_name}": {
"{pipeline_agg_type}": { // ex. max_bucket (가장 큰 값을 가진 bucket 출력)
"buckets_path": "{agg_name1} > {agg_name2}.{metric name(optional)}"
}
}
}
}
}
- pipeline_agg_type
- min_bucket
- avg_bucket
- sum_bucket
- stats_bucket
- extend_stats_bucket
- percentiles_bucket
- moving_avg_bucket
Pipeline Aggregation - Parent
{
"aggs": {
"{agg_name1}" : {
"{agg_type1}": {
...
},
"aggs": {
"{agg_name2}" : {
"{agg_type2}": {
...
}
},
"{pipeline_agg_name}": {
"{pipeline_agg_type}": { // ex. derivative (차이/변화량 계산하여 집계 결과에 추가)
"buckets_path": "{agg_name2}"
}
}
}
}
}
}
- 집계 결과로 생성된 bucket 을 이용해 계산 후, 기존 집계 결과에 계산 결과를 추가
- ex. bucket 간의 수치 차이(변화량) 등 계산
- (histogram 등) 집계 결과에 누락되는 구간이 없도록 min_doc_count 값을 0으로 설정
- gap : 누락되는 구간
- gap_policy : 누락되는 구간의 처리 방식
- skip : 누락된 bucket 을 skip 하고 pipeline 집계
- insert_zeros: 누락된 값을 0으로 대체 후, pipeline 집계
- pipeline_agg_type
- derivative (변화량)
- cumulative_sum (누적)
- bucket_script
- bucket_selector
- serial_diff (시계열 차분)
Pipeline Aggregation - Parent : bucket_script 사용예
{
...
"{pipeline_bucket_name}" {
"bucket_script": {
"buckets_path": {
"val1": "{agg_name1}",
"val2": "{agg_name2}",
},
"script": "params.val1 / params.val2"
}
}
}
Pipeline Aggregation - Parent : bucket_selector 사용예
{
...
"{pipeline_bucket_name}" {
"bucket_selector": {
"buckets_path": {
"val1": "{agg_name1}",
"val2": "{agg_name2}",
},
"script": "params.val1 > params.val2"
}
}
}
Pipeline Aggregation - Parent : serial_diff 사용예
{
...
"{pipeline_bucket_name}" {
"serial_diff": {
"buckets_path": "{agg_name1}"
"lag": "7"
}
}
}