使用SingleStore作为时间序列数

中科白癜风医院微博 https://auto.qingdaonews.com/content/2018-06/19/content_20138493.htm

译者:杨晓娟

摘要

SingleStore是一个非常通用的数据库系统。它基于关系型技术,支持多模型功能,如键值、JSON、全文搜索、地理空间和时间序列。

本文将使用Kaggle的历史SP股票数据来探索SingleStore对时间序列数据的支持。我们还将构建一个快速仪表板,使用Streamlit可视化烛台图。

本文中使用的SQL脚本、Python代码和笔记本文件可在GitHub上获得,支持DBC、HTML和iPython格式。

介绍

自关系数据库技术出现以来,许多管理数据的新需求应运而生。马丁·福勒(MartinFowler)等知名人士提出了混合持久化(PolyglotPersistence)作为管理各种数据和数据处理需求的一种解决方案,如图1所示。

然而,混合持久化是有代价的,并招致了非议,比如:

在一篇经常被引用的混合持久化帖子中,马丁·福勒为一家虚构的零售商绘制了一个Web应用程序,该应用程序使用Riak、Neo4j、MongoDB、Cassandra和一个RDBMS来处理不同的数据集。不难想象,他的零售商的DevOps工程师会一个接一个地辞职。

—斯蒂芬·皮门特尔(StephenPimentel)

此外:

我过去曾看到,如果你尝试采用其中的六种[技术],你至少需要18名员工来操作存储端——就是说,六种存储技术。那样是不可扩展的,而且成本太高。

—大卫·麦克罗里(DaveMcCrory)

近年来,也有一些使用微服务来实现混合持久化架构的建议。但是,SingleStore可以通过在单个多模型数据库系统中支持不同的数据类型和处理需求来提供更简单的解决方案。这带来了许多好处,例如更低的TCO(总拥有成本)、开发人员学习多种产品的负担更少、没有集成的麻烦等等。我们将在一系列文章中更详细地讨论SingleStore的多模型功能,现在则从时间序列数据开始。

首先,我们需要在SingleStore网站上创建一个免费托管服务帐户,并在Databricks网站上创建一个免费社区版(CE)帐户。在撰写本文时,SingleStore的托管服务帐户附带美元的积分,这对于本文中描述的案例研究来说绰绰有余。对于DatabricksCE,我们不要注册试用版而是注册免费帐户。在之前的文章中,我们指出Spark非常适合使用SingleStore进行ETL,所以这也是此处使用Spark的原因。

如果你没有Kaggle帐户,请创建一个并下载all_stocks_5yr.csv文件。Kaggle网站声明该文件大小为29.58MB。数据集由以下字段组成:

date:从年2月8日到年2月7日的五年每日期间。没有缺失值。

open:开盘价。11个缺失值。

high:最高价。8个缺失值。

low:最低价。8个缺失值。

close:收盘价。没有缺失值。

volume:成交量。没有缺失值。

name:交易代码。个唯一值。没有缺失值。

在开始阶段,我们会用到date、close和name信息。

配置DatabricksCE

文章给出了有关如何配置DatabricksCE以及和SingleStore一起使用的详细说明,我们可以在这个用例中使用它们。

上传CSV文件

要使用CSV文件,我们需要将其上传到DatabricksCE环境。文章提供了有关如何上传CSV文件的详细说明,我们可以在这个用例中使用它们。

创建数据库表

在我们的SingleStore托管服务帐户中,使用SQL编辑器新建一个timeseries_db数据库。如下所示:

复制

SQL:CREATEDATABASEIFNOTEXISTStimeseries_db;1.2.

再创建一个表,如下所示:

复制

SQL:USEtimeseries_db;CREATEROWSTORETABLEIFNOTEXISTStick(tsDATETIMESERIESTIMESTAMP,symbolVARCHAR(5),priceNUMERIC(18,4),KEY(ts));1.2.3.4.5.6.7.8.

每行有一个叫作ts的时间值属性。我们使用DATETIME而不是DATETIME(6),因为在本例中我们不使用小数秒。SERIESTIMESTAMP将表列指定为默认时间戳。在ts上创建一个KEY,因为这能让我们高效地筛选值的范围。

填写笔记本

现在新建一个DatabricksCEPython笔记本,名为DataLoaderforTimeSeries。把新笔记本附加到Spark集群上。

在一个新的代码单元中,添加以下代码:

复制

Python:frompyspark.sql.typesimport*tick_schema=StructType([StructField("ts",TimestampType(),True),StructField("open",DoubleType(),True),StructField("high",DoubleType(),True),StructField("low",DoubleType(),True),StructField("price",DoubleType(),True),StructField("volume",IntegerType(),True),StructField("symbol",StringType(),True)])1.2.3.4.5.6.7.8.9.10.11.12.

此模式确保我们有正确的列类型。

在下一个代码单元格中新建一个Dataframe,如下所示:

复制

Python:tick_df=spark.read.csv("/FileStore/all_stocks_5yr.csv",header=True,schema=tick_schema)1.2.3.4.

这会读取CSV文件并创建一个名为tick_df的Dataframe。我们还告诉Spark有一个标题行,并要求它使用前面定义的模式。

在下一个代码单元中,我们获取行数:

复制

Python:tick_df.count()1.2.

执行此操作,得到数值。

根据先前的初步分析决定,我们删除掉一些列,如下所示:

复制

Python:tick_df=tick_df.drop("open","high","low","volume")1.2.

并对数据进行排序:

复制

Python:tick_df=tick_df.sort("ts","symbol")1.2.

在下一个代码单元中,我们查看一下Dataframe的结构:

复制

Python:tick_df.show(10)1.2.

输出如下所示:

复制

PlainText:+-------------------+-------+------+

ts

price

symbol

+-------------------+-------+------+

-02-:00:00

45.08

A

-02-:00:00

14.75

AAL

-02-:00:00

78.9

AAP

-02-:00:00

67.

AAPL

-02-:00:00

36.25

ABBV

-02-:00:00

46.89

ABC

-02-:00:00

34.41

ABT

-02-:00:00

73.31

ACN

-02-:00:00

39.12

ADBE

-02-:00:00

45.7

ADI

+-------------------+-------+------+onlyshowingtop10rows1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.

现在准备将Dataframe写到SingleStore。在下一个代码单元中,可以添加以下内容:

复制

Python:%run./Setup1.2.

在Setup笔记本中,需要确保已为SingleStore托管服务集群添加了服务器地址和密码。

在下一代码单元中,我们将为SingleStoreSpark连接器设置一些参数,如下所示:

复制

Python:spark.conf.set("spark.datasource.singlestore.ddlEndpoint",cluster)spark.conf.set("spark.datasource.singlestore.user","admin")spark.conf.set("spark.datasource.singlestore.password",password)spark.conf.set("spark.datasource.singlestore.disablePushdown","false")1.2.3.4.5.

最后,准备使用SparkConnector将Dataframe写入SingleStore

复制

Python:(tick_df.write.format("singlestore").option("loadDataCompression","LZ4").mode("ignore").save("timeseries_db.tick"))1.2.3.4.5.6.

这会将Dataframe写入timeseries_db数据库中的tick表。可以从SingleStore检查该表是否已成功填充。

示例查询

现在我们已经构建了系统,可以运行一些查询了。SingleStore支持一系列处理时间序列数据的有用函数。我们来看一些例子。

平均值

以下查询说明了如何计算表中全部时间序列值的简单平均值:

复制

SQL:SELECTsymbol,AVG(price)FROMtickGROUPBYsymbolORDERBYsymbol;1.2.3.4.5.

输出应该是:

复制

PlainText:+--------+---------------+

symbol

AVG(price)

+--------+---------------+

A

49.

AAL

38.

AAP

.

AAPL

.

ABBV

60.

......1.2.3.4.5.6.7.8.9.10.

时间分段

时间分段可以按固定时间间隔对不同时间序列的数据进行聚合和分组。SingleStore支持几种函数:

FIRST:与最小时间戳关联的值。此文档包含其他详细信息和示例。

LAST:与最大时间戳关联的值。此文档包含其他详细信息和示例。

TIME_BUCKET:将时间标准化为最近的存储段的开始时间。此文档包含其他详细信息和示例。

例如,可以使用TIME_BUCKET查询以五天为时间间隔进行分组的平均时间序列值,如下所示:

复制

SQL:SELECTsymbol,TIME_BUCKET("5d",ts),AVG(price)FROMtickWHEREsymbol="AAPL"GROUPBY1,2ORDERBY1,2;1.2.3.4.5.6.

输出应该是:

复制

PlainText:+--------+-----------------------+--------------+

symbol

TIME_BUCKET("5d",ts)

AVG(price)

+--------+-----------------------+--------------+

AAPL

-02-:00:00.0

67.

AAPL

-02-:00:00.0

66.

AAPL

-02-:00:00.0

64.

AAPL

-02-:00:00.0

63.

AAPL

-02-:00:00.0

61.

.........1.2.3.4.5.6.7.8.9.10.

还可以结合这些函数来创建烛台图,显示股票随时间的最高价、最低价、开盘价和收盘价,以五天为一个窗口单位,如下所示:

复制

SQL:SELECTTIME_BUCKET("5d")ASts,symbol,MIN(price)ASlow,MAX(price)AShigh,FIRST(price)ASopen,LAST(price)AScloseFROMtickWHEREsymbol="AAPL"GROUPBY2,1ORDERBY2,1;1.2.3.4.5.6.7.8.9.10.11.

输出应该是:

复制

PlainText:+------------+--------+----------+----------+----------+----------+

ts

symbol

low

high

open

close

+------------+--------+----------+----------+----------+----------+

-02-08

AAPL

66.

68.

67.

66.

-02-13

AAPL

65.

66.

66.

65.

-02-18

AAPL

63.

65.

65.

64.

-02-23

AAPL

63.

64.

63.

63.

-02-28

AAPL

60.

63.

63.

60.

..................1.2.3.4.5.6.7.8.9.10.

平滑

可以使用AVG对窗口进行聚合来平滑时间序列数据。下面是一个示例,查看价格和过去三个分时价格的移动均线:

复制

SQL:SELECTsymbol,ts,price,AVG(price)OVER(ORDERBYtsROWSBETWEEN3PRECEDINGANDCURRENTROW)ASsmoothed_priceFROMtickWHEREsymbol="AAPL";1.2.3.4.5.

输出应该是:

复制

PlainText:+--------+-----------------------+----------+----------------+

symbol

ts

price

smoothed_price

+--------+-----------------------+----------+----------------+

AAPL

-02-:00:00.0

67.

67.0000

AAPL

-02-:00:00.0

68.

68.

AAPL

-02-:00:00.0

66.

67.

AAPL

-02-:00:00.0

66.

67.49300

AAPL

-02-:00:00.0

66.

67.19380

............1.2.3.4.5.6.7.8.9.10.

截至

查找截至某个时间点的当前表行也是常见的时间序列需求。这可以用ORDERBY和LIMIT轻松实现。下面是一个例子:

复制

SQL:SELECT*FROMtickWHEREts="-10-:00:00"ANDsymbol="AAPL"ORDERBYtsDESCLIMIT1;1.2.3.4.5.6.7.

输出应该是:

复制

PlainText:+-----------------------+--------+----------+

ts

symbol

price

+-----------------------+--------+----------+

-02-:00:00.0

AAPL

.

+-----------------------+--------+----------+1.2.3.4.5.6.

插值

时间序列数据可能存在缺值。我们可以插入缺失的点。SingleStore文档提供了一个示例存储过程,可在处理tick数据时用于此目的。

加分:Streamlit可视化

之前提到过烛台图,如果能以图形而不是表格的形式看到这些图表就太好了,而使用Streamlit可以轻松做到这一点。文章展示了我们可以轻松地将Streamlit连接到SingleStore。

安装所需软件

我们需要安装以下软件包:

复制

PlainText:streamlitpandasplotlyPymysql1.2.3.4.5.

这些可以在GitHub上的requirements.txt文件中找到。运行文件如下:

复制

Shell:pipinstall-rrequirements.txt1.2.

示例应用程序

以下是streamlit_app.py的完整代码清单:

复制

Python:#streamlit_app.pyimportstreamlitasstimportpandasaspdimportplotly.graph_objectsasgoimportpymysql#Initializeconnection.definit_connection():returnpymysql.connect(**st.secrets["singlestore"])conn=init_connection()symbol=st.sidebar.text_input("Symbol",value="AAPL",max_chars=None,key=None,type="default")num_days=st.sidebar.slider("Numberofdays",2,30,5)#Performquery.data=pd.read_sql("""SELECTTIME_BUCKET(%s)ASday,symbol,MIN(price)ASlow,MAX(price)AShigh,FIRST(price)ASopen,LAST(price)AScloseFROMtickWHEREsymbol=%sGROUPBY2,1ORDERBY2,1;""",conn,params=(str(num_days)+"d",symbol.upper()))st.subheader(symbol.upper())fig=go.Figure(data=[go.Candlestick(x=data["day"],open=data["open"],high=data["high"],low=data["low"],close=data["close"],name=symbol,)])fig.update_xaxes(type="category")fig.update_layout(height=)st.plotly_chart(fig,use_container_width=True)st.write(data)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.

创建机密文件

本地Streamlit应用程序会从应用程序的根目录读取机密文件.streamlit/secrets.toml。需要按如下方式创建这个文件:

复制

PlainText:#.streamlit/secrets.toml[singlestore]host="TODO"port=database="timeseries_db"user="admin"password="TODO"1.2.3.4.5.6.7.8.9.

主机和密码的应替换为在创建集群时从SingleStore托管服务获取的相应值。

运行代码

可按如下方式运行Streamlit应用程序:

复制

Shell:streamlitrunstreamlit_app.py1.2.

在Web浏览器中的输出应如图2所示。

在网页上,可以在文本框中输入一个新的股票代码,并使用滑块来更改TIME_BUCKET的天数。随意尝试代码以满足您的需求。

总结

本文展示了SingleStore是处理时间序列数据的有效解决方案。利用SQL和内置函数的强大功能,我们可以实现很多目标。通过添加的FIRST、LAST和TIME_BUCKET,SingleStore扩展了对时间序列的支持。




转载请注明:http://www.aierlanlan.com/rzdk/2884.html