ElasticKit:PHP Elasticsearch查询构造器

作者:袖梨 2026-06-06

PHP 操作 Elasticsearch,查询一复杂就是五层嵌套的关联数组。改一个 filtermust,要重写整段结构;加一个动态条件,要拼 $params['body']['query']['bool']['filter'][] 这种路径。

ElasticKit:PHP Elasticsearch 查询构建器

对比

手写数组:

$params = [
    'body' => [
        'query' => [
            'bool' => [
                'must' => [
                    ['match' => ['title' => 'elasticsearch']]
                ],
                'filter' => [
                    ['range' => ['price' => ['gte' => 10, 'lte' => 100]]],
                    ['term' => ['status' => 'published']]
                ]
            ]
        ],
        'highlight' => [
            'fields' => ['title' => new stdClass()]
        ],
        'sort' => [['price' => 'asc']],
        'size' => 20
    ]
];if ($categoryId) {
    $params['body']['query']['bool']['filter'][] = ['term' => ['category_id' => $categoryId]];
}

ElasticKit:

ProductIndex::query()
    ->bool([
        'must'   => fn ($q) => $q->match('title', 'elasticsearch'),
        'filter' => fn ($q) => $q
            ->range('price', [10, 100])
            ->term('status', 'published')
            ->when($categoryId, fn ($q) => $q->term('category_id', $categoryId)),
    ])
    ->highlight('title')
    ->sort('price', 'asc')
    ->size(20)
    ->get();

为什么要做 ElasticKit

用 PHP 管理 Elasticsearch,DSL 手写数组、Index 管理、scroll 遍历、零停机重建、批量写入等都需要自行组织。现有方案要么过重,要么过于轻量,要么自建概念,需要额外学习成本。ElasticKit 希望在中间做一些平衡。

ElasticKit 的目标:

  • 全链路——Index 管理、查询、CRUD、批量写入、零停机重建一站搞定
  • API 与 ES 保持一致——方法名即 JSON key,无额外概念
  • 不封闭——覆盖不到的场景传原生数组,不强制走 DSL
  • 不绑框架——无框架依赖,DSL 层可独立使用

DSL 层

查询构建

方法名即 ES JSON key,链式调用:

// 简单查询
$query->match('title', 'elasticsearch');
$query->term('status', 'published');
$query->range('price', ['gte' => 10, 'lte' => 100]);// bool 组合
$query->bool([
    'must'   => fn ($q) => $q->match('title', 'elasticsearch'),
    'filter' => fn ($q) => $q->range('price', [10, 100])->term('status', 'published'),
]);// 条件查询
$query->match('title', 'elasticsearch')
    ->when($categoryId, fn ($q) => $q->term('category_id', $categoryId));

toArray() 返回数组,toJson() 返回 JSON 字符串——调试时随时查看生成的 DSL:

$query = new Query();
$query->match('title', 'elasticsearch');$query->toArray();
// ['query' => ['match' => ['title' => 'elasticsearch']]]$query->toJson();
// {"query":{"match":{"title":"elasticsearch"}}}

DSL 对象在 toArray() 时才序列化,构建过程无额外性能开销。

聚合

链式调用添加平级聚合,闭包处理嵌套:

// 按分类聚合,每个分类取平均价格和 Top 3 商品
$query->aggs('by_category', function ($a) {
    $a->terms(['field' => 'category_id', 'size' => 20]);
    $a->aggs('avg_price', ['avg' => ['field' => 'price']]);
    $a->aggs('top_products', ['top_hits' => ['size' => 3, 'sort' => ['price' => 'desc']]]);
})->aggs('price_stats', ['stats' => ['field' => 'price']]);// Index 层快捷方法——直接返回标量
ProductIndex::query()->avg('price');   // 29.99
ProductIndex::query()->max('price');   // 199.99

多态参数

每个 DSL 方法接受字符串、数组、闭包、对象四种输入:

// 字符串
$query->match('title', 'elasticsearch');// 数组
$query->range('price', ['gte' => 10, 'lte' => 100]);// 闭包——精细控制参数
$query->match('title', function (Match_ $m) {
    $m->query('elasticsearch')->fuzziness('AUTO');
});// 对象
$query->term((new Term())->field('status')->value('published'));

所有输入最终都包装成内部对象,toArray() 时统一序列化。DSL 覆盖不到的场景,直接传原生数组:

Query::make([
    'query' => fn ($q) => $q->match('title', 'test'),
    'post_filter' => fn ($q) => $q->term('status', 'published'),
    'size' => 20,
])->sort('price', 'asc');

Index 层

一切从一个 Index 子类开始:

use ElasticKitIndexIndex;class ProductIndex extends Index
{
    protected $name = 'products';
    protected $mappings = [
        'properties' => [
            'title'  => ['type' => 'text'],
            'price'  => ['type' => 'float'],
            'status' => ['type' => 'keyword'],
        ],
    ];
}// 注册 Client,全局一次
Index::setClient(
    ElasticElasticsearchClientBuilder::create()->setHosts(['http://localhost:9200'])->build()
);

name、mappings、settings 在子类中声明,查询、CRUD、重建通过 ProductIndex:: 静态调用。

搜索

$results = ProductIndex::query()
    ->match('title', 'elasticsearch')
    ->sort('price', 'asc')
    ->size(20)
    ->get();$results->total();        // 命中数
$results->docs();         // _source 数组
$results->aggregations(); // 聚合结果ProductIndex::query()->first();
ProductIndex::query()->term('status', 'published')->count();

分页

// 手动分页
$results = ProductIndex::query()->paginate($page, $perPage);// 自动解析请求
Index::setPageResolver(fn () => [$_GET['page'] ?? 1, $_GET['per_page'] ?? 20]);
$results = ProductIndex::query()->paginate();// 可扩展对接框架分页器(以 Laravel 为例)
Index::setPaginatorResolver(fn ($results, $page, $perPage) =>
    new LengthAwarePaginator($results->docs(), $results->total(), $perPage, $page)
);
$results->toPaginator();

大数据遍历

原生 scroll 需手动管理 scrollId、循环、清理:

$results = $client->search(['scroll' => '5m', ...]);
while (count($results['hits']['hits']) > 0) {
    // 处理...
    $results = $client->scroll(['scroll_id' => $results['_scroll_id'], 'scroll' => '5m']);
}
$client->clearScroll(['scroll_id' => $results['_scroll_id']]);

Cursor 封装为生成器,用完自动清理:

foreach (ProductIndex::query()->cursor() as $results) {
    foreach ($results->docs() as $doc) {
        // 处理
    }
}

文档 CRUD

$doc = ProductIndex::doc(1);
$doc->create(['title' => 'New Product', 'price' => 29.99]);
$doc->source();
$doc->update(['price' => 39.99]);
$doc->retryOnConflict(3)->update(['price' => 39.99]);
$doc->delete();

批量操作

$bulk = new Bulk(new ProductIndex());
$bulk->batchSize(500);
$bulk->index(1, ['title' => 'Product A']);
$bulk->index(2, ['title' => 'Product B']);
$bulk->delete(3);
$bulk->execute();

零停机重建

索引结构变更(加字段、改分词器)需要重建索引。数据源在 Index 子类中用生成器定义:

class ProductIndex extends Index
{
    public function source(array $context = []): iterable
    {
        foreach (Product::all() as $product) {
            yield $product->id => $product->toArray();
        }
    }
}

一条命令完成重建:

$rebuild = new Rebuild(new ProductIndex());
$result = $rebuild->batchSize(500)->run();
// 创建新索引(带时间戳后缀) → 批量导入 → 切别名指向新索引 → 确认后清理旧索引

切别名后发现问题:

$rebuild->rollback($result['oldIndex']);  // 别名切回旧索引
$rebuild->clean($result['oldIndex']);     // 确认无误后清理旧索引

数据源是生成器,百万级数据不会撑爆内存。run() 支持传 context 过滤数据:$rebuild->run(['after' => '2025-01-01'])

source() 也适用于增量同步场景——通过 context 传入指定 ID,支持单文档或多文档的增量更新。

事件系统

11 个事件钩子,三层命名(search.query.beforebulk.execute.afterrebuild.run.failed),支持通配符:

Index::listen('search.query.after', function (Event $e) {
    Log::info("{$e->name} on {$e->index}", ['duration' => $e->duration]);
});Index::listen('rebuild.run.after', function (Event $e) {
    Mail::to($admin)->send("索引 {$e->index} 重建完成");
});Index::listen('rebuild.import.failed', function (Event $e) {
    Sms::to($admin)->send("索引重建导入失败:{$e->index}");
});

覆盖范围

类别数量示例
查询类型10 大类 50+ 方法match、term、range、bool、nested、geo、function_score…
聚合47 种terms、date_histogram、composite、significant_terms…
搜索参数26 个sort、highlight、rescore、collapse、suggest…

PHPStan Level 8 静态分析 + 360+ 单元测试。完整列表见 DSL 文档,Index 层功能见 Index 文档。

安装

# ES 8.x(PHP 8.1+)
composer require ykan/elastickit:^8@beta# ES 7.x(PHP 7.4+)
composer require ykan/elastickit:^7@beta

核心 API 已稳定。Beta 阶段主要在完善文档和收集反馈。


MIT 协议。GitHub: github.com/ykan821/Ela…

相关文章

精彩推荐