·  阅读

JSON 是目前非常流行的一种存储文件的格式,但是在实际的应用中,也有很多的文件格式是  XML 格式的。那么我们该如何来处理 XML 格式的文件并把它们导入到 Elasticsearch 中呢?在今天的文章中,我们将以一个例子来说明。我们将使用 XML filter 来导入 XML 格式的数据。

如何在 ElasticSearch 中导入我的自定义 XML 文件,幸运的是 Logstash 可以为你提供帮助。 让我们创建一个示例 XML 文件,该文件要导入到 Elasticsearch 中。 复制下面的文本并将其另存为 “test1.xml”,你也可以使用自己的XML。

test1.xml

<xmldata>
 <head1>
  <key1>Value1</key1>
  <key2>Value2</key2>
  <id>0001</id>
  <date>Aug 13 2011 00:03:44</date>
 </head1>
 <head2>
  <key3>Value3</key3>
 </head2>
</xmldata>

创建 Logstash 配置文件

Logstash 可以使用输入文件下的多行选项来读取 XML 文件。 下面显示示例配置文件;

input {
  file {
    path => "/Users/liuxg/elastic/logstash-7.9.0/test1.xml"
    start_position => beginning
    sincedb_path => "/dev/null"
    codec => multiline {
       pattern => "^<\?xmldata .*\>"
       negate => true
       what => "previous"
       auto_flush_interval => 1

在你的使用中,你需要根据自己的实际情况进行修改上面的 test1.xml 的路径。

配置的过滤器部分将读取 XML。 这个例子将过滤掉特定的值: id,date, key1 及 key3。 另外,date 值将转换为 Elasticsearch 和 Kibana 正确使用的值。

filter {
  xml {
    store_xml => false
    source => "message"
    xpath => [
      "/xmldata/head1/id/text()", "id",
      "/xmldata/head1/date/text()", "date",
      "/xmldata/head1/key1/text()", "key1",
      "/xmldata/head2/key3/text()", "key3"
  mutate {
    add_field => { "timestamp" => "%{[date][0]}" }
  date {
     match => [ "timestamp" , "MMM dd yyyy HH:mm:ss" ]
     locale => en
  mutate {
      remove_field => ["message", "timestamp"]

在输出部分下,我们配置 Elasticsearch 和 stout,以便我们可以直接在控制台中查看输出。 在此示例中,从 XML 文件使用 Elasticsearch document_id。 可选,您可以设置 document_type 部分。

output {
  stdout { codec => rubydebug }
  elasticsearch {
    index => "logstash-xml"
    hosts => ["localhost:9200"]
    document_id => "%{[id]}"

运行 Logstash

将三个部分(input/filter/output)放在一起时,便具有 LogStash 的完整配置文件。 将此文件另存为 logstash-xml.conf:

input {
  file {
    path => "/Users/liuxg/elastic/logstash-7.9.0/test1.xml"
    start_position => beginning
    sincedb_path => "/dev/null"
    codec => multiline {
       pattern => "^<\?xmldata .*\>"
       negate => true
       what => "previous"
       auto_flush_interval => 1
filter {
  xml {
    store_xml => false
    source => "message"
    xpath => [
      "/xmldata/head1/id/text()", "id",
      "/xmldata/head1/date/text()", "date",
      "/xmldata/head1/key1/text()", "key1",
      "/xmldata/head2/key3/text()", "key3"
  mutate {
    add_field => { "timestamp" => "%{[date][0]}" }
  date {
     match => [ "timestamp" , "MMM dd yyyy HH:mm:ss" ]
     locale => en
  mutate {
      remove_field => ["message", "timestamp"]
output {
  stdout { codec => rubydebug }
  elasticsearch {
    index => "logstash-xml"
    hosts => ["localhost:9200"]
    document_id => "%{[id]}"

你可以使用以下命令测试配置:

sudo ./bin/logstash -f logstash-xml.conf --configtest

直接运行配置文件:

sudo ./bin/logstash -f logstash-xml.conf

我们可以在 Logstash 的 console 里看到如下的信息:

我们可以在 Elasticsearch 中看到被导入的数据:

GET logstash-xml/_search
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    "max_score" : 1.0,
    "hits" : [
        "_index" : "logstash-xml",
        "_type" : "_doc",
        "_id" : "0001",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2011-08-12T16:03:44.000Z",
          "key3" : [
            "Value3"
          "path" : "/Users/liuxg/elastic/logstash-7.9.0/test1.xml",
          "id" : [
            "0001"
          "@version" : "1",
          "host" : "liuxg",
          "tags" : [
            "multiline"
          "key1" : [
            "Value1"
          "date" : [
            "Aug 13 2011 00:03:44"

我们可以看到 _id 值为 “0001”,也就是在我们的 XML 文件中所定义的值。

分类:
后端
标签: