JSON 是目前非常流行的一种存储文件的格式,但是在实际的应用中,也有很多的文件格式是 XML 格式的。那么我们该如何来处理 XML 格式的文件并把它们导入到 Elasticsearch 中呢?在今天的文章中,我们将以一个例子来说明。我们将使用 XML filter 来导入 XML 格式的数据。
如何在 ElasticSearch 中导入我的自定义 XML 文件,幸运的是 Logstash 可以为你提供帮助。 让我们创建一个示例 XML 文件,该文件要导入到 Elasticsearch 中。 复制下面的文本并将其另存为 “test1.xml”,你也可以使用自己的XML。
test1.xml
<xmldata>
<head1>
<key1>Value1</key1>
<key2>Value2</key2>
<id>0001</id>
<date>Aug 13 2011 00:03:44</date>
</head1>
<head2>
<key3>Value3</key3>
</head2>
</xmldata>
创建 Logstash 配置文件
Logstash 可以使用输入文件下的多行选项来读取 XML 文件。 下面显示示例配置文件;
input {
file {
path => "/Users/liuxg/elastic/logstash-7.9.0/test1.xml"
start_position => beginning
sincedb_path => "/dev/null"
codec => multiline {
pattern => "^<\?xmldata .*\>"
negate => true
what => "previous"
auto_flush_interval => 1
在你的使用中,你需要根据自己的实际情况进行修改上面的 test1.xml 的路径。
配置的过滤器部分将读取 XML。 这个例子将过滤掉特定的值: id,date, key1 及 key3。 另外,date 值将转换为 Elasticsearch 和 Kibana 正确使用的值。
filter {
xml {
store_xml => false
source => "message"
xpath => [
"/xmldata/head1/id/text()", "id",
"/xmldata/head1/date/text()", "date",
"/xmldata/head1/key1/text()", "key1",
"/xmldata/head2/key3/text()", "key3"
mutate {
add_field => { "timestamp" => "%{[date][0]}" }
date {
match => [ "timestamp" , "MMM dd yyyy HH:mm:ss" ]
locale => en
mutate {
remove_field => ["message", "timestamp"]
在输出部分下,我们配置 Elasticsearch 和 stout,以便我们可以直接在控制台中查看输出。 在此示例中,从 XML 文件使用 Elasticsearch document_id。 可选,您可以设置 document_type 部分。
output {
stdout { codec => rubydebug }
elasticsearch {
index => "logstash-xml"
hosts => ["localhost:9200"]
document_id => "%{[id]}"
运行 Logstash
将三个部分(input/filter/output)放在一起时,便具有 LogStash 的完整配置文件。 将此文件另存为 logstash-xml.conf:
input {
file {
path => "/Users/liuxg/elastic/logstash-7.9.0/test1.xml"
start_position => beginning
sincedb_path => "/dev/null"
codec => multiline {
pattern => "^<\?xmldata .*\>"
negate => true
what => "previous"
auto_flush_interval => 1
filter {
xml {
store_xml => false
source => "message"
xpath => [
"/xmldata/head1/id/text()", "id",
"/xmldata/head1/date/text()", "date",
"/xmldata/head1/key1/text()", "key1",
"/xmldata/head2/key3/text()", "key3"
mutate {
add_field => { "timestamp" => "%{[date][0]}" }
date {
match => [ "timestamp" , "MMM dd yyyy HH:mm:ss" ]
locale => en
mutate {
remove_field => ["message", "timestamp"]
output {
stdout { codec => rubydebug }
elasticsearch {
index => "logstash-xml"
hosts => ["localhost:9200"]
document_id => "%{[id]}"
你可以使用以下命令测试配置:
sudo ./bin/logstash -f logstash-xml.conf --configtest
直接运行配置文件:
sudo ./bin/logstash -f logstash-xml.conf
我们可以在 Logstash 的 console 里看到如下的信息:
我们可以在 Elasticsearch 中看到被导入的数据:
GET logstash-xml/_search
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
"max_score" : 1.0,
"hits" : [
"_index" : "logstash-xml",
"_type" : "_doc",
"_id" : "0001",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2011-08-12T16:03:44.000Z",
"key3" : [
"Value3"
"path" : "/Users/liuxg/elastic/logstash-7.9.0/test1.xml",
"id" : [
"0001"
"@version" : "1",
"host" : "liuxg",
"tags" : [
"multiline"
"key1" : [
"Value1"
"date" : [
"Aug 13 2011 00:03:44"
我们可以看到 _id 值为 “0001”,也就是在我们的 XML 文件中所定义的值。