日期和时间运算符
- 您可以对日期、时间和间隔数据类型执行算术运算。有关更多信息,请参阅
Amazon Managed Service for Apache Flink
SQL 参考中的
日期、时间戳和间隔运算符
。
SQL 函数
- 其中包括以下各项。有关更多信息,请参阅
Amazon Managed Service for Apache Flink SQL 参考
中的
日期和时间函数
。
CURRENT_TIMESTAMP
- 返回执行查询的时间戳 (UTC)。
LOCALTIME
- 返回 Kinesis Data Analytics 运行环境所定义执行查询的当前时间 (UTC)。
LOCALTIMESTAMP
- 返回 Kinesis Data Analytics 运行环境定义的当前时间戳 (UTC)。
SQL 扩展
- 其中包括以下各项。有关更多信息,请参阅
Amazon Managed Service for Apache Flink SQL 参考
中的
日期和时间函数
以及
日期时间转换函数
。
TIMESTAMP_TO_CHAR
- 将时间戳转换为字符串。
上面大多数 SQL 函数都采用一种格式来转换列。格式是灵活多变的。例如,您可以指定采用格式
yyyy-MM-dd hh:mm:ss
将输入字符串
2009-09-16 03:15:24
转换为时间戳。有关更多信息,请参阅
Amazon Managed Service for Apache Flink SQL 参考
中的
字符到时间戳 (Sys)
。
在本示例中,您将以下记录写入到 Amazon Kinesis 数据流中。
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"}
{"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"}
{"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"}
然后,您在控制台上创建一个 Kinesis Data Analytics 应用程序,并将 Kinesis 流作为流式传输源。发现过程读取流式传输源中的示例记录,并推断出具有两个列 (EVENT_TIME 和 TICKER) 的如下应用程序内部架构。
创建一个 Amazon Kinesis Data Stream 并填充事件时间和股票代码记录,如下所示:
登录 AWS Management Console 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com
-
在导航窗格中,选择 数据流。
选择 创建 Kinesis 流,然后创建带有一个分片的流。
运行以下 Python 代码以便用示例数据填充流。此简单代码会不断将具有随机股票代号和当前时间戳的记录写入流中。
import datetime
import json
import random
import boto3
STREAM_NAME = "ExampleInputStream"
def get_data():
return {
"EVENT_TIME": datetime.datetime.now().isoformat(),
"TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
"PRICE": round(random.random() * 100, 2),
def generate(stream_name, kinesis_client):
while True:
data = get_data()
print(data)
kinesis_client.put_record(
StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
if __name__ == "__main__":
generate(STREAM_NAME, boto3.client("kinesis"))
按如下方式创建应用程序:
在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com
-
选择 创建应用程序,键入应用程序名称,然后选择 创建应用程序。
在应用程序详细信息页面上,选择 连接流数据,以连接到源。
在 连接到源 页面上,执行以下操作:
选择在上一部分中创建的流。
选择以创建 IAM 角色。
选择 发现架构。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。
选择 编辑架构。将 EVENT_TIME 列的 列类型 更改为 TIMESTAMP。
选择 保存架构并更新流示例。在控制台保存架构后,选择 退出。
选择 保存并继续。
在应用程序详细信息页面上,选择 转到 SQL编辑器。要启动应用程序,请在显示的对话框中选择 是,启动应用程序。
在 SQL 编辑器中编写应用程序代码并确认结果,如下所示:
复制下面的应用程序代码并将其粘贴到编辑器中。
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
TICKER VARCHAR(4),
event_time TIMESTAMP,
five_minutes_before TIMESTAMP,
event_unix_timestamp BIGINT,
event_timestamp_as_char VARCHAR(50),
event_second INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
TICKER,
EVENT_TIME,
EVENT_TIME - INTERVAL '5' MINUTE,
UNIX_TIMESTAMP(EVENT_TIME),
TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
EXTRACT(SECOND FROM EVENT_TIME)
FROM "SOURCE_SQL_STREAM_001"