数据输入和输出

无论我们写什么样的程序,目的都是一样的:以某种方式组织数据服务我们的目的。 但是数据不仅仅由随机位和字节组成。我们建立数据元素之间的关系以便于表示实体,或者现实世界中存在的 事物 。 如果我们知道一个名字和电子邮件地址属于同一个人,那么它们将会更有意义。

尽管在现实世界中,不是所有的类型相同的实体看起来都是一样的。 一个人可能有一个家庭电话号码,而另一个人只有一个手机号码,再一个人可能两者兼有。 一个人可能有三个电子邮件地址,而另一个人却一个都没有。一位西班牙人可能有两个姓,而讲英语的人可能只有一个姓。

面向对象编程语言如此流行的原因之一是对象帮我们表示和处理现实世界具有潜在的复杂的数据结构的实体,到目前为止,一切都很完美!

但是当我们需要存储这些实体时问题来了,传统上,我们以行和列的形式存储数据到关系型数据库中,相当于使用电子表格。 正因为我们使用了这种不灵活的存储媒介导致所有我们使用对象的灵活性都丢失了。

但是否我们可以将我们的对象按对象的方式来存储?这样我们就能更加专注于 使用 数据,而不是在电子表格的局限性下对我们的应用建模。 我们可以重新利用对象的灵活性。

一个 对象 是基于特定语言的内存的数据结构。为了通过网络发送或者存储它,我们需要将它表示成某种标准的格式。 JSON 是一种以人可读的文本表示对象的方法。 它已经变成 NoSQL 世界交换数据的事实标准。当一个对象被序列化成为 JSON,它被称为一个 JSON 文档

Elastcisearch 是分布式的 文档 存储。它能存储和检索复杂的数据结构—​序列化成为JSON文档—​以 实时 的方式。 换句话说,一旦一个文档被存储在 Elasticsearch 中,它就是可以被集群中的任意节点检索到。

当然,我们不仅要存储数据,我们一定还需要查询它,成批且快速的查询它们。 尽管现存的 NoSQL 解决方案允许我们以文档的形式存储对象,但是他们仍旧需要我们思考如何查询我们的数据,以及确定哪些字段需要被索引以加快数据检索。

在 Elasticsearch 中, 每个字段的所有数据 都是 默认被索引的 。 即每个字段都有为了快速检索设置的专用倒排索引。而且,不像其他多数的数据库,它能在 相同的查询中 使用所有这些倒排索引,并以惊人的速度返回结果。

在本章中,我们展示了用来创建,检索,更新和删除文档的 API。就目前而言,我们不关心文档中的数据或者怎样查询它们。 所有我们关心的就是在 Elasticsearch 中怎样安全的存储文档,以及如何将文档再次返回。

什么是文档?

在大多数应用中,多数实体或对象可以被序列化为包含键值对的 JSON 对象。 一个 可以是一个字段或字段的名称,一个 可以是一个字符串,一个数字,一个布尔值, 另一个对象,一些数组值,或一些其它特殊类型诸如表示日期的字符串,或代表一个地理位置的对象:

{
    "name":         "John Smith",
    "age":          42,
    "confirmed":    true,
    "join_date":    "2014-06-01",
    "home": {
        "lat":      51.5,
        "lon":      0.1
    },
    "accounts": [
        {
            "type": "facebook",
            "id":   "johnsmith"
        },
        {
            "type": "twitter",
            "id":   "johnsmith"
        }
    ]
}

通常情况下,我们使用的术语 对象文档 是可以互相替换的。不过,有一个区别: 一个对象仅仅是类似于 hash 、 hashmap 、字典或者关联数组的 JSON 对象,对象中也可以嵌套其他的对象。 对象可能包含了另外一些对象。在 Elasticsearch 中,术语 文档 有着特定的含义。它是指最顶层或者根对象, 这个根对象被序列化成 JSON 并存储到 Elasticsearch 中,指定了唯一 ID。

Warning
字段的名字可以是任何合法的字符串,但不可以包含时间段。

文档元数据

一个文档不仅仅包含它的数据 ,也包含 元数据 —— 有关 文档的信息。 三个必须的元数据元素如下:

_index

文档在哪存放

_type

文档表示的对象类别

_id

文档唯一标识

_index

一个 索引 应该是因共同的特性被分组到一起的文档集合。 例如,你可能存储所有的产品在索引 products 中,而存储所有销售的交易到索引 sales 中。 虽然也允许存储不相关的数据到一个索引中,但这通常看作是一个反模式的做法。

Tip

实际上,在 Elasticsearch 中,我们的数据是被存储和索引在 分片 中,而一个索引仅仅是逻辑上的命名空间, 这个命名空间由一个或者多个分片组合在一起。 然而,这是一个内部细节,我们的应用程序根本不应该关心分片,对于应用程序而言,只需知道文档位于一个 索引 内。 Elasticsearch 会处理所有的细节。

我们将在 索引管理 介绍如何自行创建和管理索引,但现在我们将让 Elasticsearch 帮我们创建索引。 所有需要我们做的就是选择一个索引名,这个名字必须小写,不能以下划线开头,不能包含逗号。我们用 website 作为索引名举例。

_type

数据可能在索引中只是松散的组合在一起,但是通常明确定义一些数据中的子分区是很有用的。 例如,所有的产品都放在一个索引中,但是你有许多不同的产品类别,比如 "electronics" 、 "kitchen" 和 "lawn-care"。

这些文档共享一种相同的(或非常相似)的模式:他们有一个标题、描述、产品代码和价格。他们只是正好属于“产品”下的一些子类。

Elasticsearch 公开了一个称为 types (类型)的特性,它允许您在索引中对数据进行逻辑分区。不同 types 的文档可能有不同的字段,但最好能够非常相似。 我们将在 类型和映射 中更多的讨论关于 types 的一些应用和限制。

一个 _type 命名可以是大写或者小写,但是不能以下划线或者句号开头,不应该包含逗号, 并且长度限制为256个字符. 我们使用 blog 作为类型名举例。

_id

ID 是一个字符串,当它和 _index 以及 _type 组合就可以唯一确定 Elasticsearch 中的一个文档。 当你创建一个新的文档,要么提供自己的 _id ,要么让 Elasticsearch 帮你生成。

其他元数据

还有一些其他的元数据元素,他们在 类型和映射 进行了介绍。通过前面已经列出的元数据元素, 我们已经能存储文档到 Elasticsearch 中并通过 ID 检索它—​换句话说,使用 Elasticsearch 作为文档的存储介质。

索引文档

通过使用 index API ,文档可以被 索引 —— 存储和使文档可被搜索。 但是首先,我们要确定文档的位置。正如我们刚刚讨论的,一个文档的 _index_type_id 唯一标识一个文档。 我们可以提供自定义的 _id 值,或者让 index API 自动生成。

使用自定义的 ID

如果你的文档有一个自然的标识符 (例如,一个 user_account 字段或其他标识文档的值),你应该使用如下方式的 index API 并提供你自己 _id

PUT /{index}/{type}/{id}
{
  "field": "value",
  ...
}

举个例子,如果我们的索引称为 website ,类型称为 blog ,并且选择 123 作为 ID ,那么索引请求应该是下面这样:

PUT /website/blog/123
{
  "title": "My first blog entry",
  "text":  "Just trying this out...",
  "date":  "2014/01/01"
}

Elasticsearch 响应体如下所示:

{
   "_index":    "website",
   "_type":     "blog",
   "_id":       "123",
   "_version":  1,
   "created":   true
}

该响应表明文档已经成功创建,该索引包括 _index_type_id 元数据, 以及一个新元素: _version

在 Elasticsearch 中每个文档都有一个版本号。当每次对文档进行修改时(包括删除), _version 的值会递增。 在 处理冲突 中,我们讨论了怎样使用 _version 号码确保你的应用程序中的一部分修改不会覆盖另一部分所做的修改。

Autogenerating IDs

如果你的数据没有自然的 ID, Elasticsearch 可以帮我们自动生成 ID 。 请求的结构调整为: 不再使用 PUT 谓词(“使用这个 URL 存储这个文档”), 而是使用 POST 谓词(“存储文档在这个 URL 命名空间下”)。

现在该 URL 只需包含 _index_type :

POST /website/blog/
{
  "title": "My second blog entry",
  "text":  "Still trying this out...",
  "date":  "2014/01/01"
}

除了 _id 是 Elasticsearch 自动生成的,响应的其他部分和前面的类似:

{
   "_index":    "website",
   "_type":     "blog",
   "_id":       "AVFgSgVHUP18jI2wRx0w",
   "_version":  1,
   "created":   true
}

自动生成的 ID 是 URL-safe、 基于 Base64 编码且长度为20个字符的 GUID 字符串。 这些 GUID 字符串由可修改的 FlakeID 模式生成,这种模式允许多个节点并行生成唯一 ID ,且互相之间的冲突概率几乎为零。

取回一个文档

为了从 Elasticsearch 中检索出文档,我们仍然使用相同的 _index , _type , 和 _id ,但是 HTTP 谓词更改为 GET :

GET /website/blog/123?pretty

响应体包括目前已经熟悉了的元数据元素,再加上 _source 字段,这个字段包含我们索引数据时发送给 Elasticsearch 的原始 JSON 文档:

{
  "_index" :   "website",
  "_type" :    "blog",
  "_id" :      "123",
  "_version" : 1,
  "found" :    true,
  "_source" :  {
      "title": "My first blog entry",
      "text":  "Just trying this out...",
      "date":  "2014/01/01"
  }
}
Note

在请求的查询串参数中加上 pretty 参数,正如前面的例子中看到的,这将会调用 Elasticsearch 的 pretty-print 功能,该功能 使得 JSON 响应体更加可读。但是, _source 字段不能被格式化打印出来。相反,我们得到的 _source 字段中的 JSON 串,刚好是和我们传给它的一样。

GET 请求的响应体包括 {"found": true} ,这证实了文档已经被找到。 如果我们请求一个不存在的文档,我们仍旧会得到一个 JSON 响应体,但是 found 将会是 false 。 此外, HTTP 响应码将会是 404 Not Found ,而不是 200 OK

我们可以通过传递 -i 参数给 curl 命令,该参数能够显示响应的头部:

curl -i -XGET http://localhost:9200/website/blog/124?pretty

显示响应头部的响应体现在类似这样:

HTTP/1.1 404 Not Found
Content-Type: application/json; charset=UTF-8
Content-Length: 83

{
  "_index" : "website",
  "_type" :  "blog",
  "_id" :    "124",
  "found" :  false
}

返回文档的一部分

默认情况下, GET 请求会返回整个文档,这个文档正如存储在 _source 字段中的一样。但是也许你只对其中的 title 字段感兴趣。单个字段能用 _source 参数请求得到,多个字段也能使用逗号分隔的列表来指定。

GET /website/blog/123?_source=title,text

_source 字段现在包含的只是我们请求的那些字段,并且已经将 date 字段过滤掉了。

{
  "_index" :   "website",
  "_type" :    "blog",
  "_id" :      "123",
  "_version" : 1,
  "found" :   true,
  "_source" : {
      "title": "My first blog entry" ,
      "text":  "Just trying this out..."
  }
}

或者,如果你只想得到 _source 字段,不需要任何元数据,你能使用 _source 端点:

GET /website/blog/123/_source

那么返回的的内容如下所示:

{
   "title": "My first blog entry",
   "text":  "Just trying this out...",
   "date":  "2014/01/01"
}

检查文档是否存在

如果只想检查一个文档是否存在--根本不想关心内容—​那么用 HEAD 方法来代替 GET 方法。 HEAD 请求没有返回体,只返回一个 HTTP 请求报头:

curl -i -XHEAD http://localhost:9200/website/blog/123

如果文档存在, Elasticsearch 将返回一个 200 ok 的状态码:

HTTP/1.1 200 OK
Content-Type: text/plain; charset=UTF-8
Content-Length: 0

若文档不存在, Elasticsearch 将返回一个 404 Not Found 的状态码:

curl -i -XHEAD http://localhost:9200/website/blog/124
HTTP/1.1 404 Not Found
Content-Type: text/plain; charset=UTF-8
Content-Length: 0

当然,一个文档仅仅是在检查的时候不存在,并不意味着一毫秒之后它也不存在:也许同时正好另一个进程就创建了该文档。

更新整个文档

在 Elasticsearch 中文档是 不可改变 的,不能修改它们。相反,如果想要更新现有的文档,需要 重建索引 或者进行替换, 我们可以使用相同的 index API 进行实现,在 索引文档 中已经进行了讨论。

PUT /website/blog/123
{
  "title": "My first blog entry",
  "text":  "I am starting to get the hang of this...",
  "date":  "2014/01/02"
}

在响应体中,我们能看到 Elasticsearch 已经增加了 _version 字段值:

{
  "_index" :   "website",
  "_type" :    "blog",
  "_id" :      "123",
  "_version" : 2,
  "created":   false (1)
}
  1. created 标志设置成 false ,是因为相同的索引、类型和 ID 的文档已经存在。

在内部,Elasticsearch 已将旧文档标记为已删除,并增加一个全新的文档。 尽管你不能再对旧版本的文档进行访问,但它并不会立即消失。当继续索引更多的数据,Elasticsearch 会在后台清理这些已删除文档。

在本章的后面部分,我们会介绍 update API, 这个 API 可以用于 partial updates to a document 。 虽然它似乎对文档直接进行了修改,但实际上 Elasticsearch 按前述完全相同方式执行以下过程:

  1. 从旧文档构建 JSON

  2. 更改该 JSON

  3. 删除旧文档

  4. 索引一个新文档

唯一的区别在于, update API 仅仅通过一个客户端请求来实现这些步骤,而不需要单独的 getindex 请求。

创建新文档

当我们索引一个文档,怎么确认我们正在创建一个完全新的文档,而不是覆盖现有的呢?

请记住, _index_type_id 的组合可以唯一标识一个文档。所以,确保创建一个新文档的最简单办法是,使用索引请求的 POST 形式让 Elasticsearch 自动生成唯一 _id :

POST /website/blog/
{ ... }

然而,如果已经有自己的 _id ,那么我们必须告诉 Elasticsearch ,只有在相同的 _index_type_id 不存在时才接受我们的索引请求。这里有两种方式,他们做的实际是相同的事情。使用哪种,取决于哪种使用起来更方便。

第一种方法使用 op_type 查询-字符串参数:

PUT /website/blog/123?op_type=create
{ ... }

第二种方法是在 URL 末端使用 /_create :

PUT /website/blog/123/_create
{ ... }

如果创建新文档的请求成功执行,Elasticsearch 会返回元数据和一个 201 Created 的 HTTP 响应码。

另一方面,如果具有相同的 _index_type_id 的文档已经存在,Elasticsearch 将会返回 409 Conflict 响应码,以及如下的错误信息:

{
   "error": {
      "root_cause": [
         {
            "type": "document_already_exists_exception",
            "reason": "[blog][123]: document already exists",
            "shard": "0",
            "index": "website"
         }
      ],
      "type": "document_already_exists_exception",
      "reason": "[blog][123]: document already exists",
      "shard": "0",
      "index": "website"
   },
   "status": 409
}

删除文档

删除文档的语法和我们所知道的规则相同,只是使用 DELETE 方法:

DELETE /website/blog/123

如果找到该文档,Elasticsearch 将要返回一个 200 ok 的 HTTP 响应码,和一个类似以下结构的响应体。注意,字段 _version 值已经增加:

{
  "found" :    true,
  "_index" :   "website",
  "_type" :    "blog",
  "_id" :      "123",
  "_version" : 3
}

如果文档没有找到,我们将得到 404 Not Found 的响应码和类似这样的响应体:

{
  "found" :    false,
  "_index" :   "website",
  "_type" :    "blog",
  "_id" :      "123",
  "_version" : 4
}

即使文档不存在( Foundfalse ), _version 值仍然会增加。这是 Elasticsearch 内部记录本的一部分,用来确保这些改变在跨多节点时以正确的顺序执行。

Note
正如已经在更新整个文档中提到的,删除文档不会立即将文档从磁盘中删除,只是将文档标记为已删除状态。随着你不断的索引更多的数据,Elasticsearch 将会在后台清理标记为已删除的文档。

处理冲突

当我们使用 index API 更新文档 ,可以一次性读取原始文档,做我们的修改,然后重新索引 整个文档 。 最近的索引请求将获胜:无论最后哪一个文档被索引,都将被唯一存储在 Elasticsearch 中。如果其他人同时更改这个文档,他们的更改将丢失。

很多时候这是没有问题的。也许我们的主数据存储是一个关系型数据库,我们只是将数据复制到 Elasticsearch 中并使其可被搜索。 也许两个人同时更改相同的文档的几率很小。或者对于我们的业务来说偶尔丢失更改并不是很严重的问题。

但有时丢失了一个变更就是 非常严重的 。试想我们使用 Elasticsearch 存储我们网上商城商品库存的数量, 每次我们卖一个商品的时候,我们在 Elasticsearch 中将库存数量减少。

有一天,管理层决定做一次促销。突然地,我们一秒要卖好几个商品。 假设有两个 web 程序并行运行,每一个都同时处理所有商品的销售,如图 Consequence of no concurrency control 所示。

Consequence of no concurrency control
Figure 7. Consequence of no concurrency control

web_1stock_count 所做的更改已经丢失,因为 web_2 不知道它的 stock_count 的拷贝已经过期。 结果我们会认为有超过商品的实际数量的库存,因为卖给顾客的库存商品并不存在,我们将让他们非常失望。

变更越频繁,读数据和更新数据的间隙越长,也就越可能丢失变更。

在数据库领域中,有两种方法通常被用来确保并发更新时变更不会丢失:

悲观并发控制

这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改。

乐观并发控制

Elasticsearch 中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的操作。 然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何解决冲突。 例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。

乐观并发控制

Elasticsearch 是分布式的。当文档创建、更新或删除时, 新版本的文档必须复制到集群中的其他节点。Elasticsearch 也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许 顺序是乱的 。 Elasticsearch 需要一种方法确保文档的旧版本不会覆盖新的版本。

当我们之前讨论 indexGETdelete 请求时,我们指出每个文档都有一个 _version (版本)号,当文档被修改时版本号递增。 Elasticsearch 使用这个 _version 号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。

我们可以利用 _version 号来确保 应用中相互冲突的变更不会导致数据丢失。我们通过指定想要修改文档的 version 号来达到这个目的。 如果该版本不是当前版本号,我们的请求将会失败。

让我们创建一个新的博客文章:

PUT /website/blog/1/_create
{
  "title": "My first blog entry",
  "text":  "Just trying this out..."
}

响应体告诉我们,这个新创建的文档 _version 版本号是 1 。现在假设我们想编辑这个文档:我们加载其数据到 web 表单中, 做一些修改,然后保存新的版本。

首先我们检索文档:

GET /website/blog/1

响应体包含相同的 _version 版本号 1

{
  "_index" :   "website",
  "_type" :    "blog",
  "_id" :      "1",
  "_version" : 1,
  "found" :    true,
  "_source" :  {
      "title": "My first blog entry",
      "text":  "Just trying this out..."
  }
}

现在,当我们尝试通过重建文档的索引来保存修改,我们指定 version 为我们的修改会被应用的版本:

PUT /website/blog/1?version=1 (1)
{
  "title": "My first blog entry",
  "text":  "Starting to get the hang of this..."
}
  1. 我们想这个在我们索引中的文档只有现在的 _version1 时,本次更新才能成功。

此请求成功,并且响应体告诉我们 _version 已经递增到 2

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "1",
  "_version": 2
  "created":  false
}

然而,如果我们重新运行相同的索引请求,仍然指定 version=1 , Elasticsearch 返回 409 Conflict HTTP 响应码,和一个如下所示的响应体:

{
   "error": {
      "root_cause": [
         {
            "type": "version_conflict_engine_exception",
            "reason": "[blog][1]: version conflict, current [2], provided [1]",
            "index": "website",
            "shard": "3"
         }
      ],
      "type": "version_conflict_engine_exception",
      "reason": "[blog][1]: version conflict, current [2], provided [1]",
      "index": "website",
      "shard": "3"
   },
   "status": 409
}

这告诉我们在 Elasticsearch 中这个文档的当前 _version 号是 2 ,但我们指定的更新版本号为 1

我们现在怎么做取决于我们的应用需求。我们可以告诉用户说其他人已经修改了文档,并且在再次保存之前检查这些修改内容。 或者,在之前的商品 stock_count 场景,我们可以获取到最新的文档并尝试重新应用这些修改。

所有文档的更新或删除 API,都可以接受 version 参数,这允许你在代码中使用乐观的并发控制,这是一种明智的做法。

通过外部系统使用版本控制

一个常见的设置是使用其它数据库作为主要的数据存储,使用 Elasticsearch 做数据检索, 这意味着主数据库的所有更改发生时都需要被复制到 Elasticsearch ,如果多个进程负责这一数据同步,你可能遇到类似于之前描述的并发问题。

如果你的主数据库已经有了版本号 — 或一个能作为版本号的字段值比如 timestamp — 那么你就可以在 Elasticsearch 中通过增加 version_type=external 到查询字符串的方式重用这些相同的版本号, 版本号必须是大于零的整数, 且小于 9.2E+18 — 一个 Java 中 long 类型的正值。

外部版本号的处理方式和我们之前讨论的内部版本号的处理方式有些不同, Elasticsearch 不是检查当前 version 和请求中指定的版本号是否相同, 而是检查当前 _version 是否 _小于 指定的版本号。 如果请求成功,外部的版本号作为文档的新 _version 进行存储。

外部版本号不仅在索引和删除请求是可以指定,而且在 创建 新文档时也可以指定。

例如,要创建一个新的具有外部版本号 5 的博客文章,我们可以按以下方法进行:

PUT /website/blog/2?version=5&version_type=external
{
  "title": "My first external blog entry",
  "text":  "Starting to get the hang of this..."
}

在响应中,我们能看到当前的 _version 版本号是 5

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 5,
  "created":  true
}

现在我们更新这个文档,指定一个新的 version 号是 10

PUT /website/blog/2?version=10&version_type=external
{
  "title": "My first external blog entry",
  "text":  "This is a piece of cake..."
}

请求成功并将当前 _version 设为 10

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 10,
  "created":  false
}

如果你要重新运行此请求时,它将会失败,并返回像我们之前看到的同样的冲突错误, 因为指定的外部版本号不大于 Elasticsearch 的当前版本号。

文档的部分更新

更新整个文档 , 我们已经介绍过 更新一个文档的方法是检索并修改它,然后重新索引整个文档,这的确如此。然而,使用 update API 我们还可以部分更新文档,例如在某个请求时对计数器进行累加。

我们也介绍过文档是不可变的:他们不能被修改,只能被替换。 update API 必须遵循同样的规则。 从外部来看,我们在一个文档的某个位置进行部分更新。然而在内部, update API 简单使用与之前描述相同的 检索-修改-重建索引 的处理过程。 区别在于这个过程发生在分片内部,这样就避免了多次请求的网络开销。通过减少检索和重建索引步骤之间的时间,我们也减少了其他进程的变更带来冲突的可能性。

update 请求最简单的一种形式是接收文档的一部分作为 doc 的参数, 它只是与现有的文档进行合并。对象被合并到一起,覆盖现有的字段,增加新的字段。 例如,我们增加字段 tagsviews 到我们的博客文章,如下所示:

POST /website/blog/1/_update
{
   "doc" : {
      "tags" : [ "testing" ],
      "views": 0
   }
}

如果请求成功,我们看到类似于 index 请求的响应:

{
   "_index" :   "website",
   "_id" :      "1",
   "_type" :    "blog",
   "_version" : 3
}

检索文档显示了更新后的 _source 字段:

{
   "_index":    "website",
   "_type":     "blog",
   "_id":       "1",
   "_version":  3,
   "found":     true,
   "_source": {
      "title":  "My first blog entry",
      "text":   "Starting to get the hang of this...",
      "tags": [ "testing" ], (1)
      "views":  0 (1)
   }
}
  1. 新的字段已被添加到 _source 中。

使用脚本部分更新文档

脚本可以在 update API中用来改变 _source 的字段内容, 它在更新脚本中称为 ctx._source 。 例如,我们可以使用脚本来增加博客文章中 views 的数量:

POST /website/blog/1/_update
{
   "script" : "ctx._source.views+=1"
}
用 Groovy 脚本编程

对于那些 API 不能满足需求的情况,Elasticsearch 允许你使用脚本编写自定义的逻辑。 许多API都支持脚本的使用,包括搜索、排序、聚合和文档更新。 脚本可以作为请求的一部分被传递,从特殊的 .scripts 索引中检索,或者从磁盘加载脚本。

默认的脚本语言 是 Groovy,一种快速表达的脚本语言,在语法上与 JavaScript 类似。 它在 Elasticsearch V1.3.0 版本首次引入并运行在 沙盒 中,然而 Groovy 脚本引擎存在漏洞, 允许攻击者通过构建 Groovy 脚本,在 Elasticsearch Java VM 运行时脱离沙盒并执行 shell 命令。

因此,在版本 v1.3.8 、 1.4.3 和 V1.5.0 及更高的版本中,它已经被默认禁用。 此外,您可以通过设置集群中的所有节点的 config/elasticsearch.yml 文件来禁用动态 Groovy 脚本:

script.groovy.sandbox.enabled: false

这将关闭 Groovy 沙盒,从而防止动态 Groovy 脚本作为请求的一部分被接受, 或者从特殊的 .scripts 索引中被检索。当然,你仍然可以使用存储在每个节点的 config/scripts/ 目录下的 Groovy 脚本。

如果你的架构和安全性不需要担心漏洞攻击,例如你的 Elasticsearch 终端仅暴露和提供给可信赖的应用, 当它是你的应用需要的特性时,你可以选择重新启用动态脚本。

你可以在 scripting reference documentation 获取更多关于脚本的资料。

我们也可以通过使用脚本给 tags 数组添加一个新的标签。在这个例子中,我们指定新的标签作为参数,而不是硬编码到脚本内部。 这使得 Elasticsearch 可以重用这个脚本,而不是每次我们想添加标签时都要对新脚本重新编译:

POST /website/blog/1/_update
{
   "script" : "ctx._source.tags+=new_tag",
   "params" : {
      "new_tag" : "search"
   }
}

获取文档并显示最后两次请求的效果:

{
   "_index":    "website",
   "_type":     "blog",
   "_id":       "1",
   "_version":  5,
   "found":     true,
   "_source": {
      "title":  "My first blog entry",
      "text":   "Starting to get the hang of this...",
      "tags":  ["testing", "search"], (1)
      "views":  1 (2)
   }
}
  1. search 标签已追加到 tags 数组中。

  2. views 字段已递增。

我们甚至可以选择通过设置 ctx.opdelete 来删除基于其内容的文档:

POST /website/blog/1/_update
{
   "script" : "ctx.op = ctx._source.views == count ? 'delete' : 'none'",
    "params" : {
        "count": 1
    }
}

更新的文档可能尚不存在

假设我们需要在 Elasticsearch 中存储一个页面访问量计数器。 每当有用户浏览网页,我们对该页面的计数器进行累加。但是,如果它是一个新网页,我们不能确定计数器已经存在。 如果我们尝试更新一个不存在的文档,那么更新操作将会失败。

在这样的情况下,我们可以使用 upsert 参数,指定如果文档不存在就应该先创建它:

POST /website/pageviews/1/_update
{
   "script" : "ctx._source.views+=1",
   "upsert": {
       "views": 1
   }
}

我们第一次运行这个请求时, upsert 值作为新文档被索引,初始化 views 字段为 1 。 在后续的运行中,由于文档已经存在, script 更新操作将替代 upsert 进行应用,对 views 计数器进行累加。

更新和冲突

在本节的介绍中,我们说明 检索重建索引 步骤的间隔越小,变更冲突的机会越小。 但是它并不能完全消除冲突的可能性。 还是有可能在 update 设法重新索引之前,来自另一进程的请求修改了文档。

为了避免数据丢失, update API 在 检索 步骤时检索得到文档当前的 version 号,并传递版本号到 _重建索引 步骤的 index 请求。 如果另一个进程修改了处于检索和重新索引步骤之间的文档,那么 _version 号将不匹配,更新请求将会失败。

对于部分更新的很多使用场景,文档已经被改变也没有关系。 例如,如果两个进程都对页面访问量计数器进行递增操作,它们发生的先后顺序其实不太重要; 如果冲突发生了,我们唯一需要做的就是尝试再次更新。

这可以通过设置参数 retry_on_conflict 来自动完成, 这个参数规定了失败之前 update 应该重试的次数,它的默认值为 0

POST /website/pageviews/1/_update?retry_on_conflict=5 (1)
{
   "script" : "ctx._source.views+=1",
   "upsert": {
       "views": 0
   }
}
  1. 失败之前重试该更新5次。

在增量操作无关顺序的场景,例如递增计数器等这个方法十分有效,但是在其他情况下变更的顺序 非常重要的。 类似 index APIupdate API 默认采用 最终写入生效 的方案,但它也接受一个 version 参数来允许你使用 optimistic concurrency control 指定想要更新文档的版本。

取回多个文档

Elasticsearch 的速度已经很快了,但甚至能更快。 将多个请求合并成一个,避免单独处理每个请求花费的网络延时和开销。 如果你需要从 Elasticsearch 检索很多文档,那么使用 multi-get 或者 mget API 来将这些检索请求放在一个请求中,将比逐个文档请求更快地检索到全部文档。

mget API 要求有一个 docs 数组作为参数,每个元素包含需要检索文档的元数据, 包括 _index_type_id 。如果你想检索一个或者多个特定的字段,那么你可以通过 _source 参数来指定这些字段的名字:

GET /_mget
{
   "docs" : [
      {
         "_index" : "website",
         "_type" :  "blog",
         "_id" :    2
      },
      {
         "_index" : "website",
         "_type" :  "pageviews",
         "_id" :    1,
         "_source": "views"
      }
   ]
}

该响应体也包含一个 docs 数组, 对于每一个在请求中指定的文档,这个数组中都包含有一个对应的响应,且顺序与请求中的顺序相同。 其中的每一个响应都和使用单个 get request 请求所得到的响应体相同:

{
   "docs" : [
      {
         "_index" :   "website",
         "_id" :      "2",
         "_type" :    "blog",
         "found" :    true,
         "_source" : {
            "text" :  "This is a piece of cake...",
            "title" : "My first external blog entry"
         },
         "_version" : 10
      },
      {
         "_index" :   "website",
         "_id" :      "1",
         "_type" :    "pageviews",
         "found" :    true,
         "_version" : 2,
         "_source" : {
            "views" : 2
         }
      }
   ]
}

如果想检索的数据都在相同的 _index 中(甚至相同的 _type 中),则可以在 URL 中指定默认的 /_index 或者默认的 /_index/_type

你仍然可以通过单独请求覆盖这些值:

GET /website/blog/_mget
{
   "docs" : [
      { "_id" : 2 },
      { "_type" : "pageviews", "_id" :   1 }
   ]
}

事实上,如果所有文档的 _index_type 都是相同的,你可以只传一个 ids 数组,而不是整个 docs 数组:

GET /website/blog/_mget
{
   "ids" : [ "2", "1" ]
}

注意,我们请求的第二个文档是不存在的。我们指定类型为 blog ,但是文档 ID 1 的类型是 pageviews ,这个不存在的情况将在响应体中被报告:

{
  "docs" : [
    {
      "_index" :   "website",
      "_type" :    "blog",
      "_id" :      "2",
      "_version" : 10,
      "found" :    true,
      "_source" : {
        "title":   "My first external blog entry",
        "text":    "This is a piece of cake..."
      }
    },
    {
      "_index" :   "website",
      "_type" :    "blog",
      "_id" :      "1",
      "found" :    false  (1)
    }
  ]
}
  1. 未找到该文档。

事实上第二个文档未能找到并不妨碍第一个文档被检索到。每个文档都是单独检索和报告的。

Note

即使有某个文档没有找到,上述请求的 HTTP 状态码仍然是 200 。事实上,即使请求 没有 找到任何文档,它的状态码依然是 200 --因为 mget 请求本身已经成功执行。 为了确定某个文档查找是成功或者失败,你需要检查 found 标记。

代价较小的批量操作

mget 可以使我们一次取回多个文档同样的方式, bulk API 允许在单个步骤中进行多次 createindexupdatedelete 请求。 如果你需要索引一个数据流比如日志事件,它可以排队和索引数百或数千批次。

bulk 与其他的请求体格式稍有不同,如下所示:

{ action: { metadata }}\n
{ request body        }\n
{ action: { metadata }}\n
{ request body        }\n
...

这种格式类似一个有效的单行 JSON 文档 ,它通过换行符(\n)连接到一起。注意两个要点:

  • 每行一定要以换行符(\n)结尾, 包括最后一行 。这些换行符被用作一个标记,可以有效分隔行。

  • 这些行不能包含未转义的换行符,因为他们将会对解析造成干扰。这意味着这个 JSON 能使用 pretty 参数打印。

Tip
为什么是有趣的格式? 中, 我们解释为什么 bulk API 使用这种格式。

action/metadata 行指定 哪一个文档什么操作

action 必须是以下选项之一:

create

如果文档不存在,那么就创建它。详情请见 创建新文档

index

创建一个新文档或者替换一个现有的文档。详情请见 索引文档更新整个文档

update

部分更新一个文档。详情请见 文档的部分更新

delete

删除一个文档。详情请见 删除文档

metadata 应该指定被索引、创建、更新或者删除的文档的 _index_type_id

例如,一个 delete 请求看起来是这样的:

{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}

request body 行由文档的 _source 本身组成—​文档包含的字段和值。它是 indexcreate 操作所必需的,这是有道理的:你必须提供文档以索引。

它也是 update 操作所必需的,并且应该包含你传递给 update API 的相同请求体: docupsertscript 等等。 删除操作不需要 request body 行。

{ "create":  { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title":    "My first blog post" }

如果不指定 _id ,将会自动生成一个 ID :

{ "index": { "_index": "website", "_type": "blog" }}
{ "title":    "My second blog post" }

为了把所有的操作组合在一起,一个完整的 bulk 请求 有以下形式:

POST /_bulk
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }} (1)
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title":    "My first blog post" }
{ "index":  { "_index": "website", "_type": "blog" }}
{ "title":    "My second blog post" }
{ "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
{ "doc" : {"title" : "My updated blog post"} } (2)
  1. 请注意 delete 动作不能有请求体,它后面跟着的是另外一个操作。

  2. 谨记最后一个换行符不要落下。

这个 Elasticsearch 响应包含 items 数组,这个数组的内容是以请求的顺序列出来的每个请求的结果。

{
   "took": 4,
   "errors": false, (1)
   "items": [
      {  "delete": {
            "_index":   "website",
            "_type":    "blog",
            "_id":      "123",
            "_version": 2,
            "status":   200,
            "found":    true
      }},
      {  "create": {
            "_index":   "website",
            "_type":    "blog",
            "_id":      "123",
            "_version": 3,
            "status":   201
      }},
      {  "create": {
            "_index":   "website",
            "_type":    "blog",
            "_id":      "EiwfApScQiiy7TIKFxRCTw",
            "_version": 1,
            "status":   201
      }},
      {  "update": {
            "_index":   "website",
            "_type":    "blog",
            "_id":      "123",
            "_version": 4,
            "status":   200
      }}
   ]
}
  1. 所有的子请求都成功完成。

每个子请求都是独立执行,因此某个子请求的失败不会对其他子请求的成功与否造成影响。 如果其中任何子请求失败,最顶层的 error 标志被设置为 true ,并且在相应的请求报告出错误明细:

POST /_bulk
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title":    "Cannot create - it already exists" }
{ "index":  { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title":    "But we can update it" }

在响应中,我们看到 create 文档 123 失败,因为它已经存在。但是随后的 index 请求,也是对文档 123 操作,就成功了:

{
   "took": 3,
   "errors": true, (1)
   "items": [
      {  "create": {
            "_index":   "website",
            "_type":    "blog",
            "_id":      "123",
            "status":   409, (2)
            "error":    "DocumentAlreadyExistsException (3)
                        [[website][4] [blog][123]:
                        document already exists]"
      }},
      {  "index": {
            "_index":   "website",
            "_type":    "blog",
            "_id":      "123",
            "_version": 5,
            "status":   200 (4)
      }}
   ]
}
  1. 一个或者多个请求失败。

  2. 这个请求的HTTP状态码报告为 409 CONFLICT

  3. 解释为什么请求失败的错误信息。

  4. 第二个请求成功,返回 HTTP 状态码 200 OK

这也意味着 bulk 请求不是原子的: 不能用它来实现事务控制。每个请求是单独处理的,因此一个请求的成功或失败不会影响其他的请求。

不要重复指定Index和Type

也许你正在批量索引日志数据到相同的 indextype 中。 但为每一个文档指定相同的元数据是一种浪费。相反,可以像 mget API 一样,在 bulk 请求的 URL 中接收默认的 /_index 或者 /_index/_type

POST /website/_bulk
{ "index": { "_type": "log" }}
{ "event": "User logged in" }

你仍然可以覆盖元数据行中的 _index_type , 但是它将使用 URL 中的这些元数据值作为默认值:

POST /website/log/_bulk
{ "index": {}}
{ "event": "User logged in" }
{ "index": { "_type": "blog" }}
{ "title": "Overriding the default type" }

多大是太大了?

整个批量请求都需要由接收到请求的节点加载到内存中,因此该请求越大,其他请求所能获得的内存就越少。 批量请求的大小有一个最佳值,大于这个值,性能将不再提升,甚至会下降。 但是最佳值不是一个固定的值。它完全取决于硬件、文档的大小和复杂度、索引和搜索的负载的整体情况。

幸运的是,很容易找到这个 最佳点 :通过批量索引典型文档,并不断增加批量大小进行尝试。 当性能开始下降,那么你的批量大小就太大了。一个好的办法是开始时将 1,000 到 5,000 个文档作为一个批次, 如果你的文档非常大,那么就减少批量的文档个数。

密切关注你的批量请求的物理大小往往非常有用,一千个 1KB 的文档是完全不同于一千个 1MB 文档所占的物理大小。 一个好的批量大小在开始处理后所占用的物理大小约为 5-15 MB。


书籍推荐