您可以为此使用Watcher商业产品并定义以下手表:
PUT _watcher/watch/transaction_alert{ "trigger": { "schedule": { "interval": "1m" } }, "input": { "search": { "request": { "indices": "transactions", "types": "transaction", "body": { "query": { "match_all": {} }, "size": 0, "aggs": { "groupBy": { "terms": { "field": "CustomerName" }, "aggs": { "points_sum": { "stats": { "field": "TransactionAmount" } } } } } } } } }, "condition": { "script": { "inline": "return ctx.payload.aggregations.groupBy.buckets.findAll{ cust -> cust.points_sum.avg >= 200}" } }, "actions": { "send_email": { "email": { "to": "<username>@<domainname>", "subject": "Customer Notification - Transaction > 200", "body": "The attached customers have a transaction average above 0" "attachments" : {"data.yml" : { "data" : { "format" : "yaml" }} } } } }}
更新
总结一下:
- 看守是一种商业产品
- Elastalert不支持(至今),需要一些努力才能使其运行
还有另一种使用Logstash来实现此目的的更简单,更便宜的方法。即使
elasticsearch输入插件不支持聚合,也可以使用
http_poller输入插件定期发送聚合查询到Elasticsearch。然后使用过滤器可以检查是否达到所需的阈值,最后使用
配置基本上是这样的(请注意,您需要对上面的聚合查询进行URL编码,并使用
source=...参数将其发送给ES
)。另请注意,我已经修改了您的查询,以根据
points_sum.avg(desc)对存储桶进行排序
input { http_poller { urls => { test1 => 'http://localhost:9200/your-index/_search?source=%7B%22query%22%3A%7B%22match_all%22%3A%7B%7D%7D%2C%22aggs%22%3A%7B%22groupBy%22%3A%7B%22terms%22%3A%7B%22field%22%3A%22CustomerName%22%2C%22order%22%3A%7B%22points_sum.avg%22%3A%22desc%22%7D%7D%2C%22aggs%22%3A%7B%22points_sum%22%3A%7B%22stats%22%3A%7B%22field%22%3A%22TransactionAmount%22%7D%7D%7D%7D%7D%2C%22size%22%3A0%7D' } # checking every 10 seconds interval => 10 prec => "json" }}filter { split { field => "[aggregations][groupBy][buckets]" }}output { if [aggregations][groupBy][buckets][points_sum][avg] > 200 { email { to => "<username>@<domainname>" subject => "Customer Notification - Transaction > 200", body => "The customer %{[aggregations][groupBy][buckets][key]} has a transaction average above 0" } }}
同意,这是一个非常简单的实现,但是它应该可以运行,并且您可以基于它来使它变得更智能,借助Logstash和您的想象力,极限就是天空;-)
更新2
另一个node.js工具调用elasticwatch也可以用来做到这一点。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)