介绍

InfluxDB 是 InfluxData 公司在 2013 年开源的数据库,是为了存储物联网设备、DevOps 运维这类场景下大量带有时间戳数据而设计的。
在这里插入图片描述

在这里插入图片描述

InfluxDB 源码采用 Go 语言编写,在 InfluxDB OSS 的版本中,部署方式上又分为两个版本,单机版和集群版。单机版开源,目前在 github 上有 21k+ star。集群版闭源,走商业路线。

个人认为单机版的 InfluxDB 比较鸡肋。因为一旦选择使 InfluxDB,那么数据量肯定一定达到了某个很高的程度。这时候必须使用集群版。而在数据量不够高的情况下,InfluxDB 并不会比 MongoDB 或者 ElasticSearch 有更明显的优势。

考虑到学习成本、简化上手难度,InfluxDB1.x 采用了一种类似 SQL 的 InfluxQL 语言来操作数据。2019 年 1 月推出了 InfluxDB2.0 alpha 版本。受到 2018 年最流行的脚本语言 JavaScript 影响,推出了全新的查询语言 Flux。并在 2020 年底推出了 InfluxDB 2.0 正式版本,该版本又分为两个系列,云模式的 InfluxDB Cloud 和独立部署的 InfluxDB OSS。

Flux 不是绑定在 InfluxDB 上的查询脚本语言,它是一个独立的项目,图灵完备,便于处理数据,也可以用作 InfluxDB 以外。

由于 InfluxDB 的流行程度不高,而且 2.0 版本也推出不久,所以在国内搜索到的很多 InfluxDB 相关资料都是在讲述 1.x 的内容,参考意义不大,目前最好的学习路径是 InfluxDB 官方文档。本文内容全部基于 InfluxDB OSS 2.0 版本。

这篇介绍文档包括获取、运行InfluxDB的信息。

TICK 与 InfluxDB

TICK 是 InfluxData 平台的组件集的缩写,分别代表四大组件:Telegraf(数据收集器)、InfluxDB(时序数据库)、Chronograf(可视化 UI)和 Kapacitor(监控服务)。

InfluxData 公司的愿景是帮助人们处理时序数据,仅依靠一个时序数据库是不够的,还需要解决因为时序数据自身产生的一系列问题。因此 InfluxData 决定设计并开发 TICK。

在早期 Kapacitor 的脚本语言是 TICKScript,但是并不好用,遭受到社区中很多人的诟病。因此出现了 Flux。Flux 的功能性比 InfluxQL 更强,比 TICKScript 更易用。

随着 Flux 的逐渐发展,InfluxDB 的能力范围也在逐步扩展。

什么是时序数据库?

时序数据库全称时间序列数据库,英文名 Time Series DataBase,缩写 TSDB。

这种数据库专门用作处理时间序列数据。

那什么是时间序列数据呢?就是随着时间变化而源源不断产生的数据。

举个例子,Window系统CPU利用率就是一种时间序列数据,每秒都是产生当前的CPU活动情况。

为什么需要时序数据库?

随着物联网和大数据时代的到来,全球每天产生的数据量大到令人难以想象。这些数据受到业务场景的限制分为不同的种类,每个种类对存储都有不同的要求。单凭传统的 RDBMS 很难完成各种复杂场景下的数据存储。

这时我们就需要根据不同的数据特性和业务场景的要求,选择不同的数据库。

一般选择使用哪个数据库,要从低响应时间(Low Response Time)、高可用性(High Availability)、高并发(High Concurrency)、海量数据(Big Data)和可承担成本(Affordable Cost)五个维度去权衡。

数据库的种类非常繁多,举几个常见的类型来对比一下各自的特点。

关系型数据库主流代表有 MySQL、Oracle 等,优点是具有 ACID 的特性,各方面能力都比较均衡。缺点是查询、插入和修改的性能都很一般。

KV 数据库主流带代表有 Redis、Memcached 等,优点是存储简单、读写性能极高。缺点是存储成本非常高,不适合海量数据存储。

文档型数据库最流行的是 MongoDB,相比 MySQL,数据结构灵活、更擅长存储海量数据,在海量数据的场景下读写性能很强。缺点是占用空间会很大。

搜索引擎数据库最流行的是 ElasticSearch,非常擅长全文检索和复杂查询,性能极强,并且天生集群。缺点是写入性能低、字段类型无法修改、硬件资源消耗严重。

而时序数据库,最初诞生的目的很大程度上是在对标 MongoDB,因为在时序数据库出现之前,存储时序数据这项领域一直被 MongoDB 所占据。

为什么选择 InfluxDB?

虽然时序数据库早在 13 年左右就已经出现,但真正流行的时间非常晚,一直到了 17、18 年才稍微普及,即使到了今天,时序数据库在 DB Engiens 的数据库排名中仍然很落后,最靠前的 InfluxDB 也仅仅排在了 28 位。

选择 InfluxDB 的原因非常简单,因为它目前是最流行的时序数据库。

InfluxDB 之所以能够从众多时序数据库中脱颖而出,除了自身强大以外,活跃的社区、合理的商业模式和营销功不可没。

既然时序数据库很好,为什么排名如此考后?原因是应用场景少,记录运维监控和物联网实时数据是时序数据库的用武之地。而大多数的系统使用 MySQL、MongoDB 和 Redis 这些主流数据库就可以很好地支撑。

除了 InfluxDB 以外,还有几个比较流行的时序数据库,比如基于 PostgreSQL 的 TimeScaleDB,目前排在 97 位。基于 HBase 的 OpenTSDB,排在 122 位。基于 Cassandra 的 KairosDB,目前排在 201 位。

下载

提供了最新稳定版和其他版本的InfluxDB下载地址。

安装

介绍在Ubuntu、Debian、Redhat、Centos和OS X上如何安装InfluxDB。

入门指南

介绍利用InfluxDB怎样读写时序数据。

概念介绍

理解下面的概念,会让你更加充分利用InfluxDB。

关键概念

对InfluxDB核心架构的关键概念作简要说明,对于初学者来说很重要。

专业术语

列出InfluxDB的术语及其定义。

与SQL比较

InfluxDB的设计见解和权衡

简要介绍了在设计InfluxDB的时候对性能做的一些权衡。

schema设计

InfluxDB时间序列数据结构的概述及其如何影响性能。

存储引擎

概述下InfluxDB是如何将数据存储在磁盘上。

使用指南

写入数据

查询数据

采样和数据保留

硬件指南

HTTPS设置

查询语言

本部分介绍InfluxQL,InfluxDB的SQL类查询语言,用于与InfluxDB中的数据进行交互。

InfluxQL教程

本部分的前七个文档提供了InfluxQL的教程式介绍。你可以随时下载文档相关的示例数据。

数据查询语法

涵盖InfluxQL的查询语言基础知识,包括SELECT语句,GROUP BY子句,INTO子句等。参阅数据查询还可以了解查询中的时间语法和正则表达式。

schema查询语法

涵盖schema相关的查询语法。有关InfluxQL的SHOW查询的语法说明和示例。

数据库管理

涵盖InfluxQL用于管理InfluxDB中的数据库和存储策略,具体有创建删除数据库和存储策略,以及删除数据。

函数

涵盖InfluxQL的函数。

连续查询

涵盖Continuous Queries的基本语法,高级语法和常见用例。 此页面还介绍了如何进行SHOWDROP Continuous Queries。

数学计算

涵盖InfluxQL中的数学计算。

认证和授权

介绍如何设置身份验证和如何验证InfluxDB中的请求。此页面还描述了用于管理数据库用户的不同用户类型和InfluxQL。

InfluxQL参考

InfluxQL参考文档

写入协议

InfluxDB的行协议是一种写入数据点到InfluxDB的文本格式。

行协议

行协议的教程样式文档

故障排除

FAQ

本页面讨论了被频繁问及的易混淆的InfluxDB相对于其他数据库系统以意想不到的方式行事的地方。

系统监控

系统监控意味着InfluxDB系统用户可以获得有关系统本身的所有统计和诊断信息。其目的是协助对数据库本身进行故障排除和性能分析。

查询管理

借助InfluxDB的查询管理功能,用户可以识别当前正在运行的查询,并能够终止超载系统的查询。 另外,用户可以通过几个配置设置来阻止和停止执行低效查询。

一、安装

1.下载。官方地址下载太慢了,换成了清华源

wget https://mirrors.tuna.tsinghua.edu.cn/influxdata/yum/el7-x86_64/influxdb2-2.1.1.x86_64.rpm

2.安装influxdb

yum localinstall -y ./influxdb2-2.1.1.x86_64.rpm

如果你的系统可以使用Systemd(比如CentOS 7+, RHEL 7+),也可以这样启动:

sudo systemctl start influxdb
sudo systemctl status influxdb
sudo systemctl enable influxdb

3.安装Telegraf
选择influxdb镜像源

cat <<EOF | sudo tee /etc/yum.repos.d/influxdb.repo
[influxdb]
name = InfluxData Repository - Stable
baseurl = https://repos.influxdata.com/stable/\$basearch/main
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdb.key
EOF
# yum search telegraf
yum install telegraf

二、插件使用

管理组织

组织是一组用户的工作区。所有仪表盘、任务、存储桶、成员等都属于一个组织。

创建组织

# Syntax
influx org create -n <org-name>

# Example
influx org create -n my-org

浏览组织

influx org list
# 以下是输出信息
ID                      Name
2110531d15639d85        my-org
8fc0afcb242421df        zhaoxi
c843a93b4e5f59ac        my-org1

更新组织

# Syntax
influx org update -i <org-id> -n <new-org-name>

# Example
influx org update -i 2110531d15639d85 -n my-new-org

删除组织

# Syntax
influx org delete -i <org-id>

# Example
influx org delete -i c843a93b4e5f59ac

管理桶

桶是存储时间序列数据的命名位置。所有存储桶都有一个保留期,即每个数据点持续的时间。InfluxDB丢弃时间戳早于存储桶保留期的所有点。桶属于一个组织。

创建桶

使用 influx 桶创建命令创建新桶。需要以下各项参数:

bucket name

organization name or ID

retention period (duration to keep data) in one of the following units:

  • nanoseconds (ns)
  • microseconds (us or µs)
  • milliseconds (ms)
  • seconds (s)
  • minutes (m)
  • hours (h)
  • days (d)
  • weeks (w)
# Syntax
influx bucket create -n <bucket-name> -o <org-name> -r <retention-period-duration>

# Example
influx bucket create -n my-bucket -o zhaoxi -r 72h

使用InfluxDB API创建一个bucket

INFLUX_TOKEN=YOUR_API_TOKEN
INFLUX_ORG_ID=YOUR_ORG_ID
curl --request POST \
    "http://192.168.173.129:8086/api/v2/buckets" \
    --header "Authorization: Token ${INFLUX_TOKEN}" \
    --header "Content-type: application/json" \
    --data '{
    "orgID": "'"${INFLUX_ORG_ID}"'",
    "name": "iot-center",
    "retentionRules": [
      {
        "type": "expire",
        "everySeconds": 86400,
        "shardGroupDurationSeconds": 0
      }
    ]
  }'

浏览桶

influx bucket list
# 以下是输出信息
ID                      Name            Retention       Shard group duration    Organization ID         Schema Type
02acf52169630589        _monitoring     168h0m0s        24h0m0s                 8fc0afcb242421df        implicit
7260d1a5db1f1471        _tasks          72h0m0s         24h0m0s                 8fc0afcb242421df        implicit
c1b76e90ad472138        my-bucket       infinite        168h0m0s                8fc0afcb242421df        implicit

更新桶

# Syntax
influx bucket update -i <bucket-id> -n <new-bucket-name>

# Example
influx bucket update -i c1b76e90ad472138 -n my-new-bucket

删除桶

# Syntax
influx bucket delete -n <bucket-name> -o <org-name>

# Example
influx bucket delete -n my-bucket -o zhaoxi

管理成员

创建成员

# Syntax
influx user create -n <username> -o <org-name>

# Example
influx user create -n johndoe -o my-org
# Syntax
influx user create -n <username> -p <password> -o <org-name>

# Example
influx user create -n johndoe -p PaSsWoRd -o my-org

浏览成员

influx user list
# 以下是输出信息
ID                      Name
09ef0c2b3b2e8000        admin
09f089a7f2608000        johndoe

更新成员

# Syntax
influx user update -i <user-id> -n <new-username>

# Example
influx user update -i 09f089a7f2608000 -n janedoe

删除成员

# Syntax
influx user delete -i <user-id>

# Example
influx user delete -i 09f0895351e08000

修改密码

# Syntax
influx user password -n <username> -t <token>

# Example
influx user password -n janedoe -t My5uPErSecR37t0k3n

恢复用户凭据

# Update a password
influxd recovery user update \
  --username example-username \
  --password ExAmPL3-paS5W0rD

列出InfluxDB实例中的现有用户

influxd recovery user list

为恢复目的创建用户

influxd recovery user create \
  --username example-username \
  --password ExAmPL3-paS5W0rD

写数据

CSV数据写入

Example write command

influx write -b example-bucket -f /root/example.csv

example.csv

#datatype measurement,tag,double,dateTime:RFC3339
m,host,used_percent,time
mem,host1,64.23,2020-01-01T00:00:00Z
mem,host2,72.01,2020-01-01T00:00:00Z
mem,host1,62.61,2020-01-01T00:00:10Z
mem,host2,72.98,2020-01-01T00:00:10Z
mem,host1,63.40,2020-01-01T00:00:20Z
mem,host2,73.77,2020-01-01T00:00:20Z

Resulting line protocol

mem,host=host1 used_percent=64.23 1577836800000000000
mem,host=host2 used_percent=72.01 1577836800000000000
mem,host=host1 used_percent=62.61 1577836810000000000
mem,host=host2 used_percent=72.98 1577836810000000000
mem,host=host1 used_percent=63.40 1577836820000000000
mem,host=host2 used_percent=73.77 1577836820000000000

将原始查询结果写回InfluxDB

如果要写入InfluxDB的CSV数据不包含将数据正确转换为行协议所需的注释,请使用–header标志将注释行注入CSV数据。

influx write -b example-bucket \
  -f /root/example1.csv \
  --header "#constant measurement,birds" \
  --header "#datatype dateTime:2006-01-02,long,tag"

example.csv

date,sighted,loc
2020-01-01,12,Boise
2020-06-01,78,Boise
2020-01-01,54,Seattle
2020-06-01,112,Seattle
2020-01-01,9,Detroit
2020-06-01,135,Detroit

Resulting line protocol

birds,loc=Boise sighted=12i 1577836800000000000
birds,loc=Boise sighted=78i 1590969600000000000
birds,loc=Seattle sighted=54i 1577836800000000000
birds,loc=Seattle sighted=112i 1590969600000000000
birds,loc=Detroit sighted=9i 1577836800000000000
birds,loc=Detroit sighted=135i 1590969600000000000

Influx CLI写入数据

要从命令行写入数据,请使用influx write命令。在命令中包括以下内容:

Requirement Include by
Organization Use the -o,--org, or --org-id flags.
Bucket Use the -b, --bucket, or --bucket-id flags.
Precision Use the -p, --precision flag.
API token Set the INFLUX_TOKEN environment variable or use the t, --token flag.
Data Write data using line protocol or annotated CSV. Pass a file with the -f, --file flag.

Write a single line of line protocol

influx write \
  -b bucketName \
  -o orgName \
  -p s \
  'myMeasurement,host=myHost testField="testData" 1556896326'

Write line protocol from a file

influx write \
  -b bucketName \
  -o orgName \
  -p s \
  --format=lp \
  -f /path/to/line-protocol.txt

Write annotated CSV from a file

influx write \
  -b bucketName \
  -o orgName \
  -p s \
  --format=csv \
  -f /path/to/data.csv

InfluxDB API写入数据

使用对InfluxDB API/Write端点的HTTP请求将数据写入InfluxDB。使用POST请求方法,并在请求中包含以下内容:

Requirement Include by
Organization Use the org query parameter in your request URL.
Bucket Use the bucket query parameter in your request URL.
Timestamp precision Use the precision query parameter in your request URL. Default is ns.
API token Use the Authorization: Token YOUR_API_TOKEN header.
Line protocol Pass as plain text in your request body.

示例中的URL取决于InfluxDB 2.4实例的版本和位置。

curl --request POST \
"http://localhost:8086/api/v2/write?org=YOUR_ORG&bucket=YOUR_BUCKET&precision=ns" \
  --header "Authorization: Token YOUR_API_TOKEN" \
  --header "Content-Type: text/plain; charset=utf-8" \
  --header "Accept: application/json" \
  --data-binary '
    airSensors,sensor_id=TLM0201 temperature=73.97038159354763,humidity=35.23103248356096,co=0.48445310567793615 1630424257000000000
    airSensors,sensor_id=TLM0202 temperature=75.30007505999716,humidity=35.651929918691714,co=0.5141876544505826 1630424257000000000
    '

在InfluxDB API中使用gzip压缩

使用InfluxDB API/write端点写入数据时,使用gzip压缩数据,并将内容编码头设置为gzip。压缩减少了网络带宽,但增加了服务器端负载。

echo "airSensors,sensor_id=TLM0201 temperature=73.97038159354763,humidity=35.23103248356096,co=0.48445310567793615 1630525358 
  airSensors,sensor_id=TLM0202 temperature=75.30007505999716,humidity=35.651929918691714,co=0.5141876544505826 1630525358" | gzip > air-sensors.gzip

curl --request POST \
"http://localhost:8086/api/v2/write?org=YOUR_ORG&bucket=YOUR_BUCKET&precision=ns" \
  --header "Authorization: Token YOUR_API_TOKEN" \
  --header "Content-Encoding: gzip" \
  --header "Content-Type: text/plain; charset=utf-8" \
  --header "Accept: application/json" \
  --data-binary @air-sensors.gzip

删除数据

使用influx CLI删除数据

  1. 使用influx delete命令从InfluxDB中删除点。

  2. 使用–bucket标志指定从哪个bucket中删除数据。

  3. 使用–start和–stop标志定义从中删除数据的时间范围。使用RFC3339时间戳。

4)(可选)使用-p,-predicate标志包含一个删除谓词,该谓词标识要删除的点。

删除具有特定标记值的特定测量中的点

influx delete --bucket example-bucket \
  --start '1970-01-01T00:00:00Z' \
  --stop $(date +"%Y-%m-%dT%H:%M:%SZ") \
  --predicate '_measurement="example-measurement" AND exampleTag="exampleTagValue"'
  
influx delete --bucket my-bucket \
  --start 2001-03-01T00:00:00Z \
  --stop 2022-11-14T00:00:00Z \
  --predicate '_measurement="birds" AND loc="Boise"'

删除指定时间范围内的所有点

influx delete --bucket my-bucket \
  --start 2001-03-01T00:00:00Z \
  --stop 2022-11-14T00:00:00Z

使用API删除数据

使用InfluxDB API /api/v2/delete端点从InfluxDB中删除点。

POST http://localhost:8086/api/v2/delete

删除具有特定标记值的特定测量中的点

curl --request POST "http://localhost:8086/api/v2/delete?org=zhaoxi&bucket=my-bucket" \
  --header 'Authorization: Token YOUR_API_TOKEN' \
  --header 'Content-Type: application/json' \
  --data '{
    "start": "2001-03-01T00:00:00Z",
    "stop": "2022-11-14T00:00:00Z",
    "predicate": "_measurement=\"birds\" AND loc=\"Boise\""
  }'

删除指定时间范围内的所有点

curl --request POST "http://localhost:8086/api/v2/delete?org=zhaoxi&bucket=my-bucket" \
  --header 'Authorization: Token YOUR_API_TOKEN' \
  --header 'Content-Type: application/json' \
  --data '{
    "start": "2001-03-01T00:00:00Z",
    "stop": "2022-11-14T00:00:00Z"
  }'

查询数据

开始使用Flux和InfluxDB

Flux是InfluxData的功能性数据脚本语言,旨在查询、分析和处理数据。

这些指南介绍了与Flux相关的重要概念,以及使用Flux从InfluxDB查询时间序列数据。

查询InfluxDB

**1.定义数据源 **
Flux的from()函数定义了一个InfluxDB数据源。它需要一个bucket参数。以下示例使用example-bucket作为bucket名称。

from(bucket:"example-bucket")

2.指定时间范围

在查询时间序列数据时,Flux需要一个时间范围。“Unbounded”查询是资源密集型查询,作为一种保护措施,Flux不会在没有指定范围的情况下查询数据库。

使用管道转发运算符(|>)将数据从数据源管道传输到range(),后者指定查询的时间范围。它接受两个参数:startstop。开始和停止值可以是使用负持续时间的相对值,也可以是使用时间戳的绝对值。

相对时间范围示例

// Relative time range with start only. Stop defaults to now.
from(bucket:"example-bucket")
    |> range(start: -1h)

// Relative time range with start and stop
from(bucket:"example-bucket")
    |> range(start: -1h, stop: -10m)

绝对时间范围示例

from(bucket:"example-bucket")
    |> range(start: 2022-01-01T00:00:00Z, stop: 2022-10-01T12:00:00Z)

3.过滤您的数据

将范围内的数据传递到filter()以根据数据属性或列缩小结果范围。filter()有一个参数fn,它要求谓词函数按列值计算行。

filter()迭代每个输入行,并将行数据构造为流量记录。记录作为r传递到谓词函数中,并使用谓词表达式对其进行求值。

计算为false的行将从输出数据中删除。计算结果为true的行将保留在输出数据中。

// Pattern
(r) => (r.recordProperty comparisonOperator comparisonExpression)

// Example with single filter
(r) => (r._measurement == "cpu")

// Example with multiple filters
(r) => r._measurement == "cpu" and r._field != "usage_system")

对于本例,按cpu测量值、usage_system字段和cpu总标签值进行过滤:

from(bucket: "example-bucket")
    |> range(start: -15m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")

4.生成您查询的数据

yield()输出查询结果。

from(bucket: "example-bucket")
    |> range(start: -15m)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> yield()

用Flux变换数据

从InfluxDB查询数据时,通常需要以某种方式转换数据。常见的例子是聚合数据、采样数据等。

Flux提供了许多执行特定操作、转换和任务的函数。您还可以在Flux查询中创建自定义函数。Flux标准库文档中详细介绍了函数。

Window your data

Flux的window()函数根据时间值对记录进行分区。使用每个参数定义每个窗口的持续时间。

# 以五分钟间隔(5m)显示窗口数据。
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)

Aggregate windowed data

通量聚合函数取每个表中的_值并以某种方式聚合它们。使用mean()函数平均每个表的_value。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)
    |> mean()

Add times to your aggregates

在聚合值时,结果表没有_time列,因为用于聚合的记录都具有不同的时间戳。聚合函数不推断聚合值应使用的时间。因此_ time列被删除。

下一个操作中需要_time列。要添加一个,请使用duplicate()函数将_stop列复制为每个窗口表的_time列。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")

Unwindow aggregate tables

使用带有every:inf参数的window()函数将所有点收集到单个无限窗口中。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> window(every: 5m)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)

Helper functions

这看起来像是为了构建一个聚合数据的查询而编写的大量代码,但是,通过这个过程有助于理解数据在通过每个函数时如何改变“形状”。

Flux提供(并允许您创建)抽象这些步骤的“helper”函数。使用AggregateWidow()可以完成本指南中执行的相同操作。

from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
    |> aggregateWindow(every: 5m, fn: mean)

Flux查询数据

Query fields and tags

使用filter()根据字段、标记或任何其他列值查询数据。filter()执行的操作类似于InfluxQL和其他类似SQL的查询语言中的SELECT语句和WHERE子句。

The filter() function

filter()有一个fn参数,它需要一个谓词函数,一个由一个或多个谓词表达式组成的匿名函数。谓词函数计算每个输入行。计算结果为真的行包含在输出数据中。计算为false的行将从输出数据中排除。

// ...
    |> filter(fn: (r) => r._measurement == "example-measurement" )

fn谓词函数需要一个r参数,该参数表示filter()迭代输入数据时的每一行。行记录中的键值对表示列及其值。使用点符号或括号符号来引用谓词函数中的特定列值。使用逻辑运算符将多个谓词表达式链接在一起。

// Row record
r = {foo: "bar", baz: "quz"}

// Example predicate function
(r) => r.foo == "bar" and r["baz"] == "quz"

// Evaluation results
(r) => true and true

Filter by fields and tags

from()range()filter()的组合表示最基本的Flux查询:

  1. Use from() to define your bucket.
  2. Use range() to limit query results by time.
  3. Use filter() to identify what rows of data to output.
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "example-measurement" and r.tag == "example-tag")
    |> filter(fn: (r) => r._field == "example-field")

Group

使用Flux,可以按查询数据集中的任何列对数据进行分组。“分组”将数据划分到表中,其中每行共享指定列的公共值。本指南介绍了如何在Flux中对数据进行分组,并提供了数据在流程中如何成形的示例。

Group keys

每个表都有一个组键——表中每行具有相同值的列列表。

[_start, _stop, _field, _measurement, host]

Flux中的数据分组本质上是定义输出表的组键。理解修改组键如何形成输出数据是成功地将数据分组和转换为所需输出的关键。

group() Function

Flux的group()函数定义输出表的组键,即根据特定列的值对记录进行分组。

# group() example
dataStream
    |> group(columns: ["cpu", "host"])
# Resulting group key
[cpu, host]

分组操作示例

为了说明分组是如何工作的,定义一个数据集变量,从example-bucket bucket查询系统CPU使用情况。过滤cpu标记,使其仅返回每个编号cpu核的结果。

系统操作用于所有编号CPU核的CPU。它使用正则表达式仅过滤编号的核。

dataSet = from(bucket: "example-bucket")
    |> range(start: -2m)
    |> filter(fn: (r) => r._field == "usage_system" and r.cpu =~ /cpu[0-9*]/)
    |> drop(columns: ["host"])

Sort and limit

使用sort()按特定列对每个表中的记录进行排序,使用limit()将输出表中记录的数量限制为一个固定数n。

以下示例首先按区域、主机和值排序系统正常运行时间。

from(bucket: "example-bucket")
    |> range(start: -12h)
    |> filter(fn: (r) => r._measurement == "system" and r._field == "uptime")
    |> sort(columns: ["region", "host", "_value"])

limit()函数将输出表中的记录数限制为固定数n。以下示例显示了过去一个小时内最多10条记录。

from(bucket:"example-bucket")
    |> range(start:-1h)
    |> limit(n:10)

您可以同时使用sort()和limit()来显示前N个记录。下面的示例返回10个最重要的系统正常运行时间值,先按区域排序,然后按主机排序,再按值排序。

from(bucket: "example-bucket")
    |> range(start: -12h)
    |> filter(fn: (r) => r._measurement == "system" and r._field == "uptime")
    |> sort(columns: ["region", "host", "_value"])
    |> limit(n: 10)

现在,您已经创建了一个Flux查询,用于对数据进行排序和限制。Flux还提供了top()和bottom()函数来同时执行这两个函数。

Window & aggregate

对时间序列数据执行的一个常见操作是将数据分组到时间窗口,或“windowing”数据,然后将窗口化值聚合为新值。本指南介绍了使用Flux对数据进行窗口化和聚合的过程,并演示了数据在该过程中是如何形成的。

在本指南中,定义一个表示基本数据集的变量。以下示例查询主机的内存使用情况。

dataSet = from(bucket: "example-bucket")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    |> drop(columns: ["host"])

Windowing data

使用window()函数根据时间界限对数据进行分组。与window()一起传递的最常见参数是every,它定义了窗口之间的持续时间。其他参数可用,但在本例中,将基本数据集设置为一分钟窗口。

dataSet
    |> window(every: 1m)

每个参数都支持所有有效的持续时间单位,包括日历月(1mo)和年(1y)。

Aggregate data

聚合函数获取表中所有行的值,并使用它们执行聚合操作。结果作为新值输出到单行表中。

由于窗口数据被拆分为单独的表,因此聚合操作分别针对每个表运行,并输出仅包含聚合值的新表。

对于本例,使用mean()函数输出每个窗口的平均值:

dataSet
    |> window(every: 1m)
    |> mean()

Recreate the time column

注意_time列不在聚合输出表中。由于每个表中的记录都聚合在一起,因此它们的时间戳不再适用,列将从组键和表中删除。

还要注意_start和_stop列仍然存在。这些表示时间窗口的下限和上限。

许多Flux函数依赖于_时间列。要在聚合函数之后进一步处理数据,需要重新添加_time。使用duplicate()函数将_start或_stop列复制为新的_time列。

dataSet
    |> window(every: 1m)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")

“Unwindow” aggregate tables

将聚合值保存在单独的表中通常不是您想要的数据格式。使用window()函数将数据“unwindow”到单个无限(inf)窗口中。

dataSet
    |> window(every: 1m)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)

窗口化需要_time列,这就是为什么在聚合后需要重新创建_time列的原因。

Summing up

现在,您已经创建了一个Flux查询,用于windows和聚合数据。本指南中概述的数据转换过程应用于所有聚合操作。

Flux还提供了AggregateWidow()函数,它为您执行所有这些单独的函数。

以下Flux查询将返回相同的结果:

dataSet
    |> aggregateWindow(every: 1m, fn: mean)

Explore your Schema

Flux提供的函数可以让您探索InfluxDB中存储的数据的结构和模式。

List Bucket

使用bucket()列出组织中的bucket。

buckets()

List measurements

使用schema。measurements()列出桶中的measurements。默认情况下,此函数返回最近30天的结果。

import "influxdata/influxdb/schema"

schema.measurements(bucket: "example-bucket")

List field keys

使用schma.fieldKeys:列出存储桶中的字段键。默认情况下,此函数返回最近30天的结果。

import "influxdata/influxdb/schema"

schema.fieldKeys(bucket: "example-bucket")

List fields in a measurement

使用schema.measurementFieldKeys用于列出measurement中的字段键。默认情况下,此函数返回最近30天的结果。

import "influxdata/influxdb/schema"

schema.measurementFieldKeys(
    bucket: "example-bucket",
    measurement: "example-measurement",
)

List tag keys

使用schema.tagKeys()列出桶中的标记键。默认情况下,此函数返回最近30天的结果。

import "influxdata/influxdb/schema"

schema.tagKeys(bucket: "example-bucket")

List tag keys in a measurement

使用schema.measurementTagKeys列出测量中的标记键。默认情况下,此函数返回最近30天的结果。

import "influxdata/influxdb/schema"

schema.measurementTagKeys(
    bucket: "example-bucket",
    measurement: "example-measurement",
)

List tag values

使用schema.tagValues()列出桶中给定标记的标记值。默认情况下,此函数返回最近30天的结果。

import "influxdata/influxdb/schema"

schema.tagValues(bucket: "example-bucket", tag: "example-tag")

List tag values in a measurement

使用schema.measurementTagValues用于列出测量中给定标签的标签值。默认情况下,此函数返回最近30天的结果。

import "influxdata/influxdb/schema"

schema.measurementTagValues(
    bucket: "example-bucket",
    tag: "example-tag",
    measurement: "example-measurement",
)

Transform data with math

Flux是InfluxData的数据脚本和查询语言,支持数据转换中的数学表达式。本文描述了如何使用通量算术运算符“映射”数据,并使用数学运算转换值。

Basic mathematic operations

// Examples executed using the Flux REPL
> 9 + 9
18
> 22 - 14
8
> 6 * 5
30
> 21 / 7
3

100 // Parsed as an integer
100.0 // Parsed as a float

// Example evaluations
> 20 / 8
2

> 20.0 / 8.0
2.5

Custom mathematic functions

multiply = (x, y) => x * y

multiply(x: 10, y: 12)
// Returns 120
percent = (sample, total) => (sample / total) * 100.0

percent(sample: 20.0, total: 80.0)
// Returns 25.0

Transform values in a data stream

要转换输入流中的多个值,函数需要:

  • 处理管道转发数据 。
  • 计算所需的每一个操作数都存在于每一行。
  • 使用map()函数迭代每一行。

下面的示例multiplyByX()函数包括:

  • A tables parameter that represents the input data stream (<-).
  • An x parameter which is the number by which values in the _value column are multiplied.
  • A map() function that iterates over each row in the input stream. It uses the with operator to preserve existing columns in each row. It also multiples the _value column by x.
multiplyByX = (x, tables=<-) => tables
    |> map(fn: (r) => ({r with _value: r._value * x}))

data
    |> multiplyByX(x: 10)

Examples

Convert bytes to gigabytes

To convert active memory from bytes to gigabytes (GB), divide the active field in the mem measurement by 1,073,741,824.

The map() function iterates over each row in the piped-forward data and defines a new _value by dividing the original _value by 1073741824.

# 1048576 MB
# 1073741824 GB
from(bucket: "my-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "active")
    |> map(fn: (r) => ({r with _value: r._value / 1073741824}))

您可以将相同的计算转换为函数:

bytesToGB = (tables=<-) => tables
    |> map(fn: (r) => ({r with _value: r._value / 1073741824}))

data
    |> bytesToGB()

包括部分千兆字节

因为原始度量(字节)是整数,所以操作的输出是整数,不包括部分GBs。要计算部分GBs,请使用float()函数将_value列及其值转换为浮点,并将除法运算中的分母格式化为浮点。

bytesToGB = (tables=<-) => tables
    |> map(fn: (r) => ({r with _value: float(v: r._value) / 1073741824.0}))

计算百分比

要计算百分比,请使用简单除法,然后将结果乘以100。

> 1.0 / 4.0 * 100.0
25.0

Pivot vs join

要在Flux中的数学运算中查询和使用值,操作数值必须存在于一行中。pivot()join()都会这样做,但两者之间有重要区别:

Pivot更具性能

pivot()读取并操作单个数据流。join()需要两个数据流,读取和合并这两个流的开销可能很大,特别是对于较大的数据集。

对多个数据源使用联接

查询来自不同存储桶或数据源的数据时使用join()

Pivot fields into columns for mathematic calculations

data
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> map(fn: (r) => ({r with _value: (r.field1 + r.field2) / r.field3 * 100.0}))

Join multiple data sources for mathematic calculations

import "sql"
import "influxdata/influxdb/secrets"

pgUser = secrets.get(key: "POSTGRES_USER")
pgPass = secrets.get(key: "POSTGRES_PASSWORD")
pgHost = secrets.get(key: "POSTGRES_HOST")

t1 = sql.from(
    driverName: "postgres",
    dataSourceName: "postgresql://${pgUser}:${pgPass}@${pgHost}",
    query: "SELECT id, name, available FROM example_table",
)

t2 = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field")

join(tables: {t1: t1, t2: t2}, on: ["id"])
    |> map(fn: (r) => ({r with _value: r._value_t2 / r.available_t1 * 100.0}))

Calculate percentages

从查询数据计算百分比是时间序列数据的常见用例。要计算通量百分比,操作数必须在每行中。使用map()重新映射行中的值并计算百分比。

  1. Use from(), range() and filter() to query operands.
  2. Use pivot() or join() to align operand values into rows.
  3. Use map() to divide the numerator operand value by the denominator operand value and multiply by 100.
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "m1" and r._field =~ /field[1-2]/ )
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> map(fn: (r) => ({ r with _value: r.field1 / r.field2 * 100.0 }))

Cumulative sum

使用cumulativeSum()函数计算值的运行总数。cumulativeSum对后续记录的值求和,并返回用求和总数更新的每一行。

Calculate the running total of values

data
    |> cumulativeSum()

Use cumulativeSum() with aggregateWindow()

AggregateWidow()将数据分段到时间窗口,将每个窗口中的数据聚合到单个点,然后删除基于时间的分段。它主要用于对数据进行下采样。

AggregateWidow()需要一个聚合函数,该函数为每个时间窗口返回一行。要将cumulativeSum()AggregateWidow一起使用,请在aggregateWidow()中使用sum,然后使用CumulativeSum()计算聚合值的运行总数。

data
    |> aggregateWindow(every: 5m, fn: sum)
    |> cumulativeSum()

Fill

使用fill()将空值替换为:上一个非空值/指定值

data
    |> fill(usePrevious: true)

// OR

data
    |> fill(value: 0.0)

First & Last

使用first()last()返回输入表中的第一条或最后一条记录。

data
    |> first()

// OR

data
    |> last()

Histograms

直方图提供了对数据分布的有价值的见解。本指南介绍如何使用Flux的histogram()函数将数据转换为累积直方图。

histogram() function

histogram()函数通过计算“bins”列表的数据频率来近似数据集的累积分布。bin只是一个数据点落在其中的范围。小于或等于边界的所有数据点都将在容器中计数。在直方图输出中,添加一列(le),该列表示每个bin的上界。Bin计数是累积的。

from(bucket: "my-bucket")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    |> histogram(bins: [0.0, 10.0, 20.0, 30.0, 40.0])

Bin helper functions

Flux提供了两个辅助函数,用于生成直方图箱。每个函数生成一个浮点数组,该数组设计用于histogram()函数的bins参数。

linearBins()

函数的作用是:生成一个线性分隔的浮点列表。

linearBins(start: 0.0, width: 10.0, count: 10)

// Generated list: [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, +Inf]

logarithmicBins()

函数的作用是:生成一个指数分隔的浮点列表。

logarithmicBins(start: 1.0, factor: 2.0, count: 10, infinity: true)

// Generated list: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, +Inf]

Examples

生成具有线性箱的直方图

from(bucket: "my-bucket")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    |> histogram(bins: linearBins(start: 65.5, width: 0.5, count: 20, infinity: false))

生成具有对数仓的直方图

from(bucket: "my-bucket")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    |> histogram(bins: logarithmicBins(start: 0.5, factor: 2.0, count: 10, infinity: false))

按严重程度可视化错误

使用Telegraf Syslog插件从系统中收集错误信息。查询syslog测量中的severity_code字段:

from(bucket: "example-bucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "syslog" and r._field == "severity_code")

Increase

使用increase()跟踪表中多列的增量。当跟踪计数器值随时间变化或定期重置时,此函数特别有用。

data
    |> increase()

Join

join()函数将两个或多个输入流合并为一个输出流,这些输入流的值在一组公共列上相等。Flux允许您连接两个数据流之间的任何公共列,并为跨测量连接和跨测量数学等操作打开了大门。

为了说明连接操作,使用Telegraf捕获并存储在InfluxDB内存使用和进程中的数据。

在本指南中,我们将连接两个数据流,一个表示内存使用情况,另一个表示正在运行的进程总数,然后计算每个正在运行进程的平均内存使用情况。

为了执行连接,必须有两个数据流。为每个数据流分配一个变量。

Memory used variable

定义一个memUsed变量,用于过滤mem测量值和已用字段。这返回使用的内存量(以字节为单位)。

memUsed = from(bucket: "my-bucket")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used")

Total processes variable

定义一个procTotal变量,用于过滤过程测量和总计字段。这将返回正在运行的进程数。

procTotal = from(bucket: "my-bucket")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "processes" and r._field == "total")

Join the two data streams

定义了两个数据流后,使用join()函数将它们连接在一起。join()需要两个参数:

table: 要与键连接的表的映射,这些表将通过这些键进行别名。在下面的示例中,memmemuse的别名,procprocTotal的别名。

on: 一个字符串数组,用于定义要连接表的列。两个表都必须在此列表中指定所有列。

join(
    tables: {mem:memUsed, proc:procTotal},
    on: ["_time", "_stop", "_start", "host"],
)

Real world example

以下函数通过连接httpdwrite measurement中的字段来计算写入InfluxDB集群的批大小,以便比较pointReqwriteReq。结果按群集ID分组,以便可以跨群集进行比较。

batchSize = (cluster_id, start=-1m, interval=10s) => {
    httpd = from(bucket: "telegraf")
        |> range(start: start)
        |> filter(fn: (r) => r._measurement == "influxdb_httpd" and r._field == "writeReq" and r.cluster_id == cluster_id)
        |> aggregateWindow(every: interval, fn: mean)
        |> derivative(nonNegative: true, unit: 60s)

    write = from(bucket: "telegraf")
        |> range(start: start)
        |> filter(fn: (r) => r._measurement == "influxdb_write" and r._field == "pointReq" and r.cluster_id == cluster_id)
        |> aggregateWindow(every: interval, fn: max)
        |> derivative(nonNegative: true, unit: 60s)

    return join(tables: {httpd: httpd, write: write}, on: ["_time", "_stop", "_start", "host"])
        |> map(fn: (r) => ({_time: r._time, _value: r._value_httpd / r._value_write}))
        |> group(columns: cluster_id)
}

batchSize(cluster_id: "enter cluster id here")

Median

使用median()函数返回表示输入数据的0.5分位数(第50百分位数)或中值的值。

选择以下方法之一计算中值:

  • estimate_tdigest
  • exact_mean
  • exact_selector

estimate_tdigest

(默认)一种聚合方法,使用t摘要数据结构计算大型数据源的精确0.5分位数估计。输出表由包含计算中值的单行组成。

Given the following input table:

_time _value
2020-01-01T00:01:00Z 1.0
2020-01-01T00:02:00Z 1.0
2020-01-01T00:03:00Z 2.0
2020-01-01T00:04:00Z 3.0

estimate_tdigest returns:

_value
1.5

exact_mean

取最接近0.5分位数值的两个点的平均值的聚合方法。输出表由包含计算中值的单行组成。

Given the following input table:

_time _value
2020-01-01T00:01:00Z 1.0
2020-01-01T00:02:00Z 1.0
2020-01-01T00:03:00Z 2.0
2020-01-01T00:04:00Z 3.0

exact_mean returns:

_value
1.5

exact_selector

一种选择器方法,返回至少50%的点小于的数据点。输出表由包含计算中值的单行组成。

Given the following input table:

_time _value
2020-01-01T00:01:00Z 1.0
2020-01-01T00:02:00Z 1.0
2020-01-01T00:03:00Z 2.0
2020-01-01T00:04:00Z 3.0

exact_selector returns:

_time _value
2020-01-01T00:02:00Z 1.0

Moving average

MovingAverage()TimedMovingAgerage()返回数据的移动平均值。

data
    |> movingAverage(n: 5)

// OR

data
    |> timedMovingAverage(every: 5m, period: 10m)

movingAverage()

对于表中的每一行,movingAverage()返回当前值和先前值的平均值,其中n是用于计算平均值的值总数。

Given the following input

_time _value
2020-01-01T00:01:00Z 1.0
2020-01-01T00:02:00Z 1.2
2020-01-01T00:03:00Z 1.8
2020-01-01T00:04:00Z 0.9
2020-01-01T00:05:00Z 1.4
2020-01-01T00:06:00Z 2.0

The following would return:

|> movingAverage(n: 3)
_time _value
2020-01-01T00:03:00Z 1.33
2020-01-01T00:04:00Z 1.30
2020-01-01T00:05:00Z 1.36
2020-01-01T00:06:00Z 1.43

timedMovingAverage()

对于表中的每一行,timedMovingAverage()返回当前值和前一期间(持续时间)中所有行值的平均值。它以每个参数定义的频率返回移动平均值。

下图中的每种颜色表示用于计算平均值的时间段和返回表示平均值的点的时间。如果间隔=30m,周期=1h:

Given the following input:

_time _value
2020-01-01T00:00:00Z 1.0
2020-01-01T00:30:00Z 1.2
2020-01-01T01:00:00Z 1.8
2020-01-01T01:30:00Z 0.9
2020-01-01T02:00:00Z 1.4
2020-01-01T02:30:00Z 2.0
2020-01-01T03:00:00Z 1.9

The following would return:

|> timedMovingAverage(every: 30m, period: 1h)
_time _value
2020-01-01T00:30:00Z 1.0
2020-01-01T01:00:00Z 1.1
2020-01-01T01:30:00Z 1.5
2020-01-01T02:00:00Z 1.35
2020-01-01T02:30:00Z 1.15
2020-01-01T03:00:00Z 1.7
2020-01-01T03:00:00Z 2

Percentile & quantile

使用quantile()函数返回表示输入数据的q分位数或百分位数的值。

Percentile versus quantile

百分位数和分位数非常相似,只是用于计算返回值的数量不同。使用0和100之间的数字计算百分位数。使用0.0和1.0之间的数字来计算分位数。例如,0.5分位数与第50个百分位数相同。

选择以下方法之一来计算分位数:

  • estimate_tdigest
  • exact_mean
  • exact_selector

estimate_tdigest

(默认)一种聚合方法,使用t摘要数据结构计算大型数据源的精确0.5分位数估计。输出表由包含计算中值的单行组成。

Given the following input table:

_time _value
2020-01-01T00:01:00Z 1.0
2020-01-01T00:02:00Z 1.0
2020-01-01T00:03:00Z 2.0
2020-01-01T00:04:00Z 3.0

estimate_tdigest returns:

_value
1.5

exact_mean

取最接近0.5分位数值的两个点的平均值的聚合方法。输出表由包含计算中值的单行组成。

Given the following input table:

_time _value
2020-01-01T00:01:00Z 1.0
2020-01-01T00:02:00Z 1.0
2020-01-01T00:03:00Z 2.0
2020-01-01T00:04:00Z 3.0

exact_mean returns:

_value
1.5

exact_selector

一种选择器方法,返回至少50%的点小于的数据点。输出表由包含计算中值的单行组成。

Given the following input table:

_time _value
2020-01-01T00:01:00Z 1.0
2020-01-01T00:02:00Z 1.0
2020-01-01T00:03:00Z 2.0
2020-01-01T00:04:00Z 3.0

exact_selector returns:

_time _value
2020-01-01T00:02:00Z 1.0

查找表示第99百分位的值

使用默认方法“estimate_tdigest”返回表中包含表中第99百分位数据值的所有行。

data
    |> quantile(q: 0.99)

找到最接近分位数的值的平均值

使用exact_mean方法为每个输入表返回一行,其中包含最接近表中数据的数学分位数的两个值的平均值。例如,为了计算0.99分位数:

data
    |> quantile(q: 0.99, method: "exact_mean")

找到具有分位数值的点

使用exact_selector方法为每个输入表返回一行,其中包含表中q*100%值小于的值。例如,为了计算0.99分位数:

data
    |> quantile(q: 0.99, method: "exact_selector")

quantile()AggregateWidow()一起使用

AggregateWidow()将数据分段到时间窗口,将每个窗口中的数据聚合到单个点,然后删除基于时间的分段。它主要用于对数据进行下采样。

要在AggregateWidow()中指定分位数计算方法,请使用完整函数语法:

data
    |> aggregateWindow(
        every: 5m,
        fn: (tables=<-, column) => tables
            |> quantile(q: 0.99, method: "exact_selector"),
    )

Rate

使用derivative()计算后续值或聚合之间的变化率。rate()计算每个时间窗口的平均变化率。如果点之间的时间不同,这些函数将点归一化为一个公共时间间隔,使值易于比较。

  • 后续值之间的变化率
  • 每个时间窗口的平均变化率

后续值之间的变化率

使用derivative()函数计算后续非空值之间每单位时间的变化率。

data
    |> derivative(unit: 1s)

默认情况下,derivative()只返回正导数值,并用null替换负值。计算值作为浮点返回

Given the following input:

_time _value
2020-01-01T00:00:00Z 250
2020-01-01T00:04:00Z 160
2020-01-01T00:12:00Z 150
2020-01-01T00:19:00Z 220
2020-01-01T00:32:00Z 200
2020-01-01T00:51:00Z 290
2020-01-01T01:00:00Z 340

derivative(unit: 1m) returns:

_time _value
2020-01-01T00:04:00Z
2020-01-01T00:12:00Z
2020-01-01T00:19:00Z 10.0
2020-01-01T00:32:00Z
2020-01-01T00:51:00Z 4.74
2020-01-01T01:00:00Z 5.56

结果表示后续值之间每分钟的变化率,负值设置为空。

返回负导数值

若要返回负导数值,请将nonNegative参数设置为false,

Given the following input:

_time _value
2020-01-01T00:00:00Z 250
2020-01-01T00:04:00Z 160
2020-01-01T00:12:00Z 150
2020-01-01T00:19:00Z 220
2020-01-01T00:32:00Z 200
2020-01-01T00:51:00Z 290
2020-01-01T01:00:00Z 340

The following returns:

|> derivative(unit: 1m, nonNegative: false)
_time _value
2020-01-01T00:04:00Z -22.5
2020-01-01T00:12:00Z -1.25
2020-01-01T00:19:00Z 10.0
2020-01-01T00:32:00Z -1.54
2020-01-01T00:51:00Z 4.74
2020-01-01T01:00:00Z 5.56

结果表示后续值之间每分钟的变化率,包括负值。

每个时间窗口的平均变化率

使用聚合。rate()函数计算每个时间窗口的平均变化率。

import "experimental/aggregate"

data
    |> aggregate.rate(
        every: 1m,
        unit: 1s,
        groupColumns: ["tag1", "tag2"],
    )

aggregate.rate()返回由every定义的时间间隔内每单位时间的平均变化率(以浮点形式)。负值被替换为null。

Given the following input:

_time _value
2020-01-01T00:00:00Z 250
2020-01-01T00:04:00Z 160
2020-01-01T00:12:00Z 150
2020-01-01T00:19:00Z 220
2020-01-01T00:32:00Z 200
2020-01-01T00:51:00Z 290
2020-01-01T01:00:00Z 340

The following returns:

|> aggregate.rate(
    every: 20m,
    unit: 1m,
)
_time _value
2020-01-01T00:20:00Z 10.00
2020-01-01T00:40:00Z
2020-01-01T01:00:00Z 4.74
2020-01-01T01:20:00Z 5.56

结果表示每20分钟间隔的平均每分钟变化率,负值设置为空。时间戳表示用于平均值的时间窗口的右边界。

Conditional logic

Flux提供了if、then和else条件表达式,允许强大而灵活的Flux查询。

条件表达式语法

// Pattern
if <condition> then <action> else <alternative-action>

// Example
if color == "green" then "008000" else "ffffff"

条件表达式在以下上下文中最有用:

  • 定义变量时。

  • 当使用一次操作一行的函数时(filter()map()reduce())。

Evaluating conditional expressions

Flux按顺序计算语句,并在条件匹配时停止计算。

例如,给定以下语句:

if r._value > 95.0000001 and r._value <= 100.0 then
    "critical"
else if r._value > 85.0000001 and r._value <= 95.0 then
    "warning"
else if r._value > 70.0000001 and r._value <= 85.0 then
    "high"
else
    "normal"

当r._value为96时,输出为“临界”,不评估剩余条件。

Examples

  • 有条件地设置变量的值
  • 创建条件筛选器
  • 使用map()有条件地转换列值
  • 使用reduce()有条件地递增计数

有条件地设置变量的值

以下示例基于dueDate变量与now()的关系设置过期变量。

dueDate = 2019-05-01
overdue = if dueDate < now() then true else false

创建条件筛选器

以下示例使用示例度量仪表板变量更改查询筛选数据的方式。度量有三个可能的值:

from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(
        fn: (r) => if v.metric == "Memory" then
            r._measurement == "mem" and r._field == "used_percent"
        else if v.metric == "CPU" then
            r._measurement == "cpu" and r._field == "usage_user"
        else if v.metric == "Disk" then
            r._measurement == "disk" and r._field == "used_percent"
        else
            r._measurement != "",
    )

使用map()有条件地转换列值

以下示例使用map()函数有条件地转换列值。它基于_value列将级别列设置为特定字符串。

from(bucket: "my-bucket")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    |> map(
        fn: (r) => ({
            // Retain all existing columns in the mapped row
            r with
            // Set the level column value based on the _value column
            level: if r._value >= 95.0000001 and r._value <= 100.0 then
                "critical"
            else if r._value >= 85.0000001 and r._value <= 95.0 then
                "warning"
            else if r._value >= 70.0000001 and r._value <= 85.0 then
                "high"
            else
                "normal",
        }),
    )

使用reduce()有条件地递增计数

以下示例使用AggregateWidow()reduce()函数计算每五分钟窗口中超过定义阈值的记录数。

threshold = 28.0
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    // Aggregate data into 5 minute windows using a custom reduce() function
    |> aggregateWindow(
        every: 5m,
        // Use a custom function in the fn parameter.
        // The aggregateWindow fn parameter requires 'column' and 'tables' parameters.
        fn: (column, tables=<-) => tables
            |> reduce(
                identity: {above_threshold_count: 0.0},
                fn: (r, accumulator) => ({
                    // Conditionally increment above_threshold_count if
                    // r.value exceeds the threshold
                    above_threshold_count: if r._value >= threshold then
                        accumulator.above_threshold_count + 1.0
                    else
                        accumulator.above_threshold_count + 0.0,
                }),
            ),
    )

Custom functions

Flux的函数语法允许您创建自定义函数。本指南介绍了创建自己函数的基本知识。

  • 函数定义语法
  • 在自定义函数中使用管道转发数据
  • 定义参数默认值
  • 定义具有作用域变量的函数

函数定义语法

// Basic function definition syntax
functionName = (functionParameters) => functionOperations

functionName:用于调用Flux脚本中函数的名称。

functionParameters:传递到函数中并在其操作中使用的参数的逗号分隔列表。可以为每个定义参数默认值。

functionOperations:将输入操作为所需输出的操作和函数。

Basic function examples

// Function definition
square = (n) => n * n

// Function usage
> square(n:3)
9
// Function definition
multiply = (x, y) => x * y

// Function usage
> multiply(x: 2, y: 15)
30

在自定义函数中使用管道转发数据

处理过程中的大多数Flux函数都通过管道转发数据。要处理管道转发数据,函数参数之一必须使用<-pipe-receive表达式捕获输入表。

在下面的示例中,tables参数被分配给<-表达式,该表达式表示向前传递到函数中的所有数据。然后,表被管道转发到函数定义中的其他操作中。

functionName = (tables=<-) => tables |> functionOperations

Pipe-forwardable function example

下面的示例定义了一个multByX函数,该函数将输入表中每行的_value列乘以x参数。它使用map()函数修改每个_value。

// Function definition
multByX = (tables=<-, x) => tables
    |> map(fn: (r) => ({r with _value: r._value * x}))

// Function usage
from(bucket: "example-bucket")
    |> range(start: -1m)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    |> multByX(x: 2.0)

定义参数默认值

使用=赋值运算符为函数定义中的函数参数指定默认值:

functionName = (param1=defaultValue1, param2=defaultValue2) => functionOperation

通过在函数调用中显式定义参数来覆盖默认值。

Example functions with defaults

下面的示例定义了一个排行榜函数,该函数返回按指定列中的值排序的有限数量的记录。它使用sort()函数按降序或升序对记录进行排序。然后使用limit()函数从排序表中返回指定数量的记录。

// Function definition
leaderBoard = (tables=<-, limit=4, columns=["_value"], desc=true) => tables
    |> sort(columns: columns, desc: desc)
    |> limit(n: limit)

// Function usage
// Get the 4 highest scoring players
from(bucket: "my-bucket")
    |> range(start: -1m)
    |> filter(fn: (r) => r._measurement == "player-stats" and r._field == "total-points")
    |> leaderBoard()

// Get the 10 shortest race times
from(bucket: "example-bucket")
    |> range(start: -1m)
    |> filter(fn: (r) => r._measurement == "race-times" and r._field == "elapsed-time")
    |> leaderBoard(limit: 10, desc: false)

定义具有作用域变量的函数

要创建变量作用域为函数的自定义函数,请将函数操作和变量放在块({})中,并使用return语句返回特定变量。

functionName = (functionParameters) => {
    exampleVar = "foo"
    
    return exampleVar
}

具有作用域变量的示例函数

基于值返回警报级别,以下函数使用条件逻辑根据数字输入值返回警报级别:

alertLevel = (v) => {
    level = if float(v: v) >= 90.0 then
        "crit"
    else if float(v: v) >= 80.0 then
        "warn"
    else if float(v: v) >= 65.0 then
        "info"
    else
        "ok"

    return level
}

alertLevel(v: 87.3)
// Returns "warn"

将十六进制颜色代码转换为名称,以下函数将十六进制(HEX)颜色代码转换为等效的HTML颜色名称。函数使用Flux dictionary包创建十六进制代码及其对应名称的字典。

import "dict"

hexName = (hex) => {
    hexNames = dict.fromList(
        pairs: [
            {key: "#00ffff", value: "Aqua"},
            {key: "#000000", value: "Black"},
            {key: "#0000ff", value: "Blue"},
            {key: "#ff00ff", value: "Fuchsia"},
            {key: "#808080", value: "Gray"},
            {key: "#008000", value: "Green"},
            {key: "#00ff00", value: "Lime"},
            {key: "#800000", value: "Maroon"},
            {key: "#000080", value: "Navy"},
            {key: "#808000", value: "Olive"},
            {key: "#800080", value: "Purple"},
            {key: "#ff0000", value: "Red"},
            {key: "#c0c0c0", value: "Silver"},
            {key: "#008080", value: "Teal"},
            {key: "#ffffff", value: "White"},
            {key: "#ffff00", value: "Yellow"},
        ],
    )
    name = dict.get(dict: hexNames, key: hex, default: "No known name")

    return name
}

hexName(hex: "#000000")
// Returns "Black"

hexName(hex: "#8b8b8b")
// Returns "No known name"

Exists

使用exists运算符检查行记录是否包含列或列的值是否为空。

(r) => exists r.column

与行函数(filter()map()reduce())一起使用exists来检查行是否包含列或该列的值是否为空。

筛选空值

from(bucket: "example-bucket")
    |> range(start: -5m)
    |> filter(fn: (r) => exists r._value)

基于存在的映射值

from(bucket: "default")
    |> range(start: -30s)
    |> map(
        fn: (r) => ({r with
            human_readable: if exists r._value then
                "${r._field} is ${string(v: r._value)}."
            else
                "${r._field} has no value.",
        }),
    )

忽略自定义聚合函数中的空值

customSumProduct = (tables=<-) => tables
    |> reduce(
        identity: {sum: 0.0, product: 1.0},
        fn: (r, accumulator) => ({r with
            sum: if exists r._value then
                r._value + accumulator.sum
            else
                accumulator.sum,
            product: if exists r._value then
                r.value * accumulator.product
            else
                accumulator.product,
        }),
    )

检查静态定义的记录是否包含键

当您使用记录文字语法静态定义记录时,Flux知道记录类型和期望的键。

  • 如果密钥存在于静态记录中,则exists返回true。

  • 如果键存在于静态记录中,但具有空值,则exists返回false。

  • 如果静态记录中不存在键,因为记录类型是静态已知的,exists将返回错误。

import "internal/debug"

p = {
    firstName: "John",
    lastName: "Doe",
    age: 42,
    height: debug.null(type: "int"),
}

exists p.firstName
// Returns true

exists p.height
// Returns false

exists p.hairColor
// Returns "error: record is missing label hairColor"

Extract scalar values

使用Flux动态查询函数从Flux查询输出中提取标量值。例如,这允许您使用查询结果动态设置变量。

要从输出中提取标量值:

  • 从输入流提取列或从输入流中提取行。

  • 使用返回的数组或记录引用标量值。

Table extraction

Flux将查询结果格式化为table流。findColumn()findRecord()都提取table流中的第一个表,这些表的组键值与fn谓词函数匹配。

Extract a column

使用findColumn()函数输出提取表中特定列的值数组。

sampleData
    |> findColumn(
        fn: (key) => key._field == "temp" and key.location == "sfo",
        column: "_value",
    )

// Returns [65.1, 66.2, 66.3, 66.8]

Use extracted column values

使用变量存储值数组。在下面的示例中,SFOTemps表示值数组。引用数组中的特定索引(从0开始的整数)以返回该索引处的值。

SFOTemps = sampleData
    |> findColumn(
        fn: (key) => key._field == "temp" and key.location == "sfo",
        column: "_value",
    )

SFOTemps
// Returns [65.1, 66.2, 66.3, 66.8]

SFOTemps[0]
// Returns 65.1

SFOTemps[2]
// Returns 66.3

Extract a row

使用findRecord()函数从提取的表中的一行输出数据。使用idx参数指定要输出的行的索引。该函数为每列输出一个带有键值对的记录。

sampleData
    |> findRecord(
        fn: (key) => key._field == "temp" and key.location == "sfo",
        idx: 0,
    )

// Returns {
//   _time:2019-11-11T12:00:00Z,
//   _field:"temp",
//   location:"sfo",
//   _value: 65.1
// }

Use an extracted row record

使用变量存储提取的行记录。在下面的示例中,tempInfo表示提取的行。使用点或括号表示法引用记录中的键。

tempInfo = sampleData
    |> findRecord(
        fn: (key) => key._field == "temp" and key.location == "sfo",
        idx: 0,
    )

tempInfo
// Returns {
//   _time:2019-11-11T12:00:00Z,
//   _field:"temp",
//   location:"sfo",
//   _value: 65.1
// }

tempInfo._time
// Returns 2019-11-11T12:00:00Z

tempInfo.location
// Returns sfo

Example helper functions

创建自定义辅助函数以从查询输出中提取标量值。

Extract a scalar field value

// Define a helper function to extract field values
getFieldValue = (tables=<-, field) => {
    extract = tables
        |> findColumn(fn: (key) => key._field == field, column: "_value")

    return extract[0]
}

// Use the helper function to define a variable
lastJFKTemp = sampleData
    |> filter(fn: (r) => r.location == "kjfk")
    |> last()
    |> getFieldValue(field: "temp")

lastJFKTemp
// Returns 71.2

Extract scalar row data

// Define a helper function to extract a row as a record
getRow = (tables=<-, field, idx=0) => {
    extract = tables
        |> findRecord(fn: (key) => true, idx: idx)

    return extract
}

// Use the helper function to define a variable
lastReported = sampleData
    |> last()
    |> getRow(field: "temp")

"The last location to report was ${lastReported.location}.
The temperature was ${string(v: lastReported._value)}°F."

// Returns:
// The last location to report was kord.
// The temperature was 38.9°F.

Sample data

以下示例数据集表示从三个位置收集的虚构温度度量。将其格式化为带注释的CSV,并使用CSV导入Flux查询。from()函数。

在查询的开头放置以下内容以使用示例数据:

import "csv"

sampleData = csv.from(csv: "
#datatype,string,long,dateTime:RFC3339,string,string,double
#group,false,true,false,true,true,false
#default,,,,,,
,result,table,_time,location,_field,_value
,,0,2019-11-01T12:00:00Z,sfo,temp,65.1
,,0,2019-11-01T13:00:00Z,sfo,temp,66.2
,,0,2019-11-01T14:00:00Z,sfo,temp,66.3
,,0,2019-11-01T15:00:00Z,sfo,temp,66.8
,,1,2019-11-01T12:00:00Z,kjfk,temp,69.4
,,1,2019-11-01T13:00:00Z,kjfk,temp,69.9
,,1,2019-11-01T14:00:00Z,kjfk,temp,71.0
,,1,2019-11-01T15:00:00Z,kjfk,temp,71.2
,,2,2019-11-01T12:00:00Z,kord,temp,46.4
,,2,2019-11-01T13:00:00Z,kord,temp,46.3
,,2,2019-11-01T14:00:00Z,kord,temp,42.7
,,2,2019-11-01T15:00:00Z,kord,temp,38.9
")

Geo-temporal data

Monitor states

Flux可帮助您监控指标和事件中的状态:

  • 找出状态持续的时间

  • 计算连续状态的数量

找出状态持续的时间

使用stateDuration()计算具有指定状态的连续行的持续时间。对于与指定状态匹配的每个连续点,stateDuration()递增并将持续时间(以指定单位)存储在用户定义的列中。

Include the following information:

  • Column to search: 任何标记键、标记值、字段键、字段值或测量值。
  • Value: 要在指定列中搜索的值(或状态)。
  • State duration column: 用于存储状态持续时间的新列─指定值持续的时间长度。
  • Unit: 用于增加状态持续时间的时间单位(默认为1s、1m、1h)。
data
    |> stateDuration(fn: (r) => r.column_to_search == "value_to_search_for", column: "state_duration", unit: 1s)
  • 对于评估为“true”的第一个点,状态持续时间设置为“0”。对于每个评估为“true”的连续点,状态持续时间增加每个连续点之间的时间间隔(以指定单位)。
  • 如果状态为“false”,则状态持续时间重置为“1”。

使用stateDuration()的示例查询

以下查询搜索过去5分钟内的doors桶,以查找门关闭的秒数。

from(bucket: "doors")
    |> range(start: -5m)
    |> stateDuration(fn: (r) => r._value == "closed", column: "door_closed", unit: 1s)

在本例中,door_closed是状态持续时间列。如果每分钟向doors桶写入数据,则_value关闭的每个连续点的状态持续时间增加60秒。如果_value未关闭,则状态持续时间重置为0。

计算连续状态的数量

使用stateCount()函数并包括以下信息:

  • Column to search: 任何标记键、标记值、字段键、字段值或测量值。
  • Value: 要在指定列中搜索。
  • State count column: 用于存储状态计数的新列─存在指定值的连续记录数。
|> stateCount(
    fn: (r) => r.column_to_search == "value_to_search_for",
    column: "state_count",
)
  • 对于评估为“true”的第一个点,状态计数设置为“1”。对于每个评估为“true”的连续点,状态计数增加1。
  • 如果状态为“false”,则状态计数重置为“1”。

使用stateCount()的示例查询

以下查询搜索过去5分钟内的doors桶,并计算有多少点关闭作为其_value。

from(bucket: "doors")
    |> range(start: -5m)
    |> stateCount(fn: (r) => r._value == "closed", column: "door_closed")

本示例将状态计数存储在door_closed列中。如果每分钟向doors桶写入数据,则_value关闭的每个连续点的状态计数增加1。如果_value未关闭,则状态计数重置为-1。

计算机器状态的示例查询

以下查询每分钟检查一次机器状态(空闲、分配或繁忙)。InfluxDB搜索过去一小时内的服务器存储桶,并统计机器状态为空闲、已分配或繁忙的记录。

from(bucket: "servers")
    |> range(start: -1h)
    |> filter(fn: (r) => r.machine_state == "idle" or r.machine_state == "assigned" or r.machine_state == "busy")
    |> stateCount(fn: (r) => r.machine_state == "busy", column: "_count")
    |> stateCount(fn: (r) => r.machine_state == "assigned", column: "_count")
    |> stateCount(fn: (r) => r.machine_state == "idle", column: "_count")

Operate on timestamps

InfluxDB中存储的每个点都有一个关联的时间戳。使用Flux处理和操作时间戳以满足您的需要。

  • 转换时间戳格式

  • 计算两个时间戳之间的持续时间

  • 检索当前时间

  • 规范化不规则时间戳

  • 同时使用时间戳和持续时间

转换时间戳格式

Unix nanosecond to RFC3339
Use the time() function to convert a Unix nanosecond timestamp to an RFC3339 timestamp.

time(v: 1568808000000000000)
// Returns 2019-09-18T12:00:00.000000000Z

RFC3339 to Unix nanosecond
Use the uint() function to convert an RFC3339 timestamp to a Unix nanosecond timestamp.

uint(v: 2019-09-18T12:00:00.000000000Z)
// Returns 1568808000000000000

计算两个时间戳之间的持续时间

Flux不支持使用时间类型值的数学运算。要计算两个时间戳之间的持续时间:

1.使用uint()函数将每个时间戳转换为Unix纳秒时间戳。

2.从另一个时间戳中减去一个Unix纳秒时间戳。

3.使用duration()函数将结果转换为duration。

time1 = uint(v: 2019-09-17T21:12:05Z)
time2 = uint(v: 2019-09-18T22:16:35Z)

duration(v: time2 - time1)
// Returns 25h4m30s

检索当前时间

Current UTC time
Use the now() function to return the current UTC time in RFC3339 format.

now()

Current system time
Import the system package and use the system.time() function to return the current system time of the host machine in RFC3339 format.

import "system"

system.time()

规范化不规则时间戳

要规范化不规则时间戳,请使用truncateMeColumn()函数将所有_time值截断为指定单位。这在join()pivot()操作中非常有用,其中点应按时间对齐,但时间戳略有不同。

data
    |> truncateTimeColumn(unit: 1m)

同时使用时间戳和持续时间

向时间戳添加持续时间
date.add() 将持续时间添加到指定时间并返回结果时间。

import "date"

date.add(d: 6h, to: 2019-09-16T12:00:00Z)

// Returns 2019-09-16T18:00:00.000000000Z

从时间戳中减去持续时间
date.sub()从指定时间中减去持续时间并返回结果时间。

import "date"

date.sub(d: 6h, from: 2019-09-16T12:00:00Z)

// Returns 2019-09-16T06:00:00.000000000Z

向前或向后移动时间戳

timeShift()函数的作用是:将指定的持续时间添加到时间列(_start、_stop、_time)中的每个值。

Shift forward in time:

from(bucket: "example-bucket")
    |> range(start: -5m)
    |> timeShift(duration: 12h)

Shift backward in time:

from(bucket: "example-bucket")
    |> range(start: -5m)
    |> timeShift(duration: -12h)

Query SQL data sources

Flux sql包提供了用于处理sql数据源的函数。sql.from()允许您查询SQL数据源,如PostgreSQL、MySQL、Snowflake、SQLite、Microsoft SQL Server、Amazon Athena和Google BigQuery,并将结果用于InfluxDB仪表盘、任务和其他操作。

  • 查询SQL数据源
  • 将SQL数据与InfluxDB中的数据连接
  • 使用SQL结果填充仪表板变量
  • 使用store secrets SQL数据库凭据
  • 采样传感器数据

查询SQL数据源

1.在Flux查询中导入sql包

2.使用sql.from()函数用于指定用于从SQL数据源查询数据的驱动程序、数据源名称(DSN)和查询:

import "sql"

sql.from(
    driverName: "mysql",
    dataSourceName: "root:123456@tcp(192.168.173.134:3306)/Forum0915",
    query: "SELECT * FROM category",
)

将SQL数据与InfluxDB中的数据连接

从InfluxDB查询SQL数据源的主要好处之一是能够使用存储在InfluxDB之外的数据丰富查询结果。

使用下面的空气传感器样本数据,以下查询将InfluxDB中存储的空气传感器度量与PostgreSQL中存储的传感器信息连接起来。联接数据允许您根据未存储在InfluxDB中的传感器信息查询和筛选结果。

// Import the "sql" package
import "sql"

// Query data from MySql
sensorInfo = sql.from(
    driverName: "mysql",
    dataSourceName: "root:123456@tcp(192.168.173.134:3306)/test",
    query: "SELECT * FROM sensors",
)

// Query data from InfluxDB
sensorMetrics = from(bucket: "example-bucket")
    |> range(start: -1y)
    |> filter(fn: (r) => r._measurement == "airSensors")

// Join InfluxDB query results with PostgreSQL query results
join(tables: {metric: sensorMetrics, info: sensorInfo}, on: ["sensor_id"])

使用SQL结果填充仪表板变量

使用sql.from()从SQL查询结果创建仪表板变量。以下示例使用下面的空气传感器采样数据创建一个变量,用于选择传感器的位置。

import "sql"

sql.from(
    driverName: "postgres",
    dataSourceName: "postgresql://localhost?sslmode=disable",
    query: "SELECT * FROM sensors",
)
    |> rename(columns: {location: "_value"})
    |> keep(columns: ["_value"])

使用store secrets SQL数据库凭据

如果SQL数据库需要身份验证,请使用InfluxDB secrets存储和填充连接凭据。默认情况下,InfluxDB base64在其内部键值存储BoltDB中编码并存储秘密。为了增加安全性,请将store secrets在Vault中。

# influxdb api
curl --request PATCH http://localhost:8086/api/v2/orgs/<org-id>/secrets \
  --header 'Authorization: Token YOURAUTHTOKEN' \
  --header 'Content-type: application/json' \
  --data '{
  "POSTGRES_HOST": "http://example.com",
  "POSTGRES_USER": "example-username",
  "POSTGRES_PASS": "example-password"
}'
# influx cli
# Syntax
influx secret update -k <secret-key>

# Example
influx secret update -k POSTGRES_PASS

Use secrets in your query

导入influxdata/influxdb/secrets包,并使用字符串插值在Flux查询中使用存储的secrets 填充连接凭据。

import "sql"
import "influxdata/influxdb/secrets"

POSTGRES_HOST = secrets.get(key: "POSTGRES_HOST")
POSTGRES_USER = secrets.get(key: "POSTGRES_USER")
POSTGRES_PASS = secrets.get(key: "POSTGRES_PASS")

sql.from(
    driverName: "postgres",
    dataSourceName: "postgresql://${POSTGRES_USER}:${POSTGRES_PASS}@${POSTGRES_HOST}",
    query: "SELECT * FROM sensors",
)

采样传感器数据

空气传感器采样数据和采样传感器信息模拟了一组传感器,这些传感器测量整个建筑物房间中的温度、湿度和一氧化碳。每个收集到的数据点都存储在InfluxDB中,并带有一个sensor_id标签,该标签标识其来自的特定传感器。样本传感器信息存储在PostgreSQL中。

样本数据包括:

从每个传感器收集并存储在InfluxDB中的空气传感器测量中的模拟数据:

温度 /湿度/有限公司

PostgreSQL中传感器表中存储的每个传感器的信息:

传感器id /地方/型号 /上次检查

Download sample air sensor data

  1. Create a bucket to store the data.

  2. Create an InfluxDB task and use the sample.data() function to download sample air sensor data every 15 minutes. Write the downloaded sample data to your new bucket:

    import "influxdata/influxdb/sample"
    
    option task = {name: "Collect sample air sensor data", every: 15m}
    
    sample.data(set: "airSensor")
        |> to(org: "example-org", bucket: "example-bucket")
    
  3. Query your target bucket after the first task run to ensure the sample data is writing successfully.

    from(bucket: "example-bucket")
        |> range(start: -1m)
        |> filter(fn: (r) => r._measurement == "airSensors")
    

Import the sample sensor information

  1. Download and install PostgreSQL.

  2. Download the sample sensor information CSV.

    Download sample sensor information

  3. Use a PostgreSQL client (psql or a GUI) to create the sensors table:

    CREATE TABLE sensors (
      sensor_id character varying(50),
      location character varying(50),
      model_number character varying(50),
      last_inspected date
    );
    
  4. Import the downloaded CSV sample data. Update the FROM file path to the path of the downloaded CSV sample data.

    COPY sensors(sensor_id,location,model_number,last_inspected)
    FROM '/path/to/sample-sensor-info.csv' DELIMITER ',' CSV HEADER;
    
  5. Query the table to ensure the data was imported correctly:

    SELECT * FROM sensors;
    

Regular expressions

当匹配大型数据集合中的模式时,正则表达式(regex)的功能非常强大。对于Flux,正则表达式主要用于谓词函数中的求值逻辑,例如筛选行、删除和保留列、状态检测等。本指南介绍如何在Flux脚本中使用正则表达式。

转到正则表达式语法

Flux提供了两个用于正则表达式的比较运算符。

=~
#  当左侧的表达式与右侧的正则表达式匹配时,计算结果为true。 
!~
#  当左侧的表达式与右侧的正则表达式不匹配时,计算结果为true。 

当在Flux脚本中使用正则表达式匹配时,请将正则表达式用/括起来。以下是基本的正则表达式比较语法:

Basic regex comparison syntax

expression =~ /regex/
expression !~ /regex/

Examples

Use a regex to filter by tag value

以下示例按cpu标记筛选记录。它只保存cpu为cpu0、cpu1或cpu2的记录。

from(bucket: "example-bucket")
    |> range(start: -15m)
    |> filter(fn: (r) => r._measurement == "cpu" and r.cpu =~ /cpu[0-2]$/)

Use a regex to filter by field key

from(bucket: "example-bucket")
    |> range(start: -15m)
    |> filter(fn: (r) => r._measurement == "mem" and r._field =~ /_percent/)

Drop columns matching a regex

from(bucket: "example-bucket")
    |> range(start: -15m)
    |> filter(fn: (r) => r._measurement == "mem")
    |> drop(fn: (column) => column !~ /_.*/)

Flux version

InfluxDB 2.4包括特定版本的Flux,可能支持也可能不支持文档化的Flux功能。了解您当前使用的Flux版本以及该特定版本支持哪些功能非常重要。

要查询随InfluxDB安装的Flux版本,请使用数组。from()创建表和运行时的特殊流。version()以使用通量版本填充列。

使用influx CLI或InfluxDB API在InfluxDB用户界面中运行以下查询:

import "array"
import "runtime"

array.from(rows: [{version: runtime.version()}])
Logo

宁波官方开源宣传和活动阵地,欢迎各位和我们共建开源生态体系!

更多推荐