
Commit ecb7fb3

Pykx module (#43)
* add pykx module
* remove azure serverless experiment
* update all IDs to be generic
1 parent 80751ec commit ecb7fb3


10 files changed: +616 -0 lines changed


Diff for: examples/test_pykx/.terraform.lock.hcl

+39
Generated file; contents not rendered.

Diff for: examples/test_pykx/init.tf

+21
@@ -0,0 +1,21 @@
terraform {
  backend "s3" {
    bucket = "blueprints-pykx-rp"
    key    = "test-pykx.tfstate"
    region = "us-east-1"
  }
  required_providers {
    databricks = {
      source  = "databricks/databricks"
      version = "~>1.23.0"
    }
    aws = {
      source  = "hashicorp/aws"
      version = "~>4.54.0"
    }
  }
}

provider "aws" {
  region = "us-east-1"
}

Diff for: examples/test_pykx/main.tf

+13
@@ -0,0 +1,13 @@

// Deploy the PyKX module (notebooks and Databricks job) into the spoke workspace
module "pykx" {
  source                        = "../../modules/pykx/"
  aws_spoke_databricks_username = var.aws_spoke_databricks_username
  aws_spoke_databricks_password = var.aws_spoke_databricks_password
  aws_spoke_ws_url              = var.aws_spoke_ws_url
  aws_region                    = var.aws_region
}

output "module_workspace_url" {
  value = module.pykx.job_url
}

Diff for: examples/test_pykx/vars.tf

+21
@@ -0,0 +1,21 @@

variable "aws_spoke_ws_url" {}

variable "aws_spoke_databricks_username" {}
variable "aws_spoke_databricks_password" {}

variable "aws_region" {}

locals {
  prefix = "fs-lakehouse"
}

locals {
  tags = { "org" = "fsi" }
}

locals {
  ext_s3_bucket = "${local.prefix}-ext-bucket"
}

Diff for: modules/pykx/README.md

+67
@@ -0,0 +1,67 @@
# Databricks PyKX Time Series Processing Automation with Terraform

This repository contains Terraform configurations that automate reading KDB data, converting it to Parquet format, and processing it in Databricks with Pandas UDFs. The implementation uses PyKX for efficient KDB interaction and Databricks for scalable data processing.

## Overview

The Terraform scripts in this repository automate the creation and configuration of Databricks notebooks and jobs for handling time-series data. The process involves:

1. **Reading Data from KDB**: The first notebook (`read_kdb_save_parquet.py`) reads data from a KDB database.
2. **Converting to Parquet**: The data is then converted to Parquet format for efficient processing.
3. **Time Series Merge and Processing**: The second notebook (`ts_compute_delta_lake.py`) merges and processes the time series data using Pandas UDFs (see the sketch after this list).
4. **Databricks Job Configuration**: A Databricks workflow is configured to execute these notebooks in sequence, using a shared Databricks jobs cluster.
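The `ts_compute_delta_lake.py` notebook is not rendered in this diff, so the following is only a rough sketch of what step 3 could look like: a per-symbol as-of merge run as a cogrouped Pandas UDF. The table names, columns, and output schema are illustrative assumptions, not taken from the module.

```python
# Hypothetical sketch only -- ts_compute_delta_lake.py is not shown in this diff.
# Table and column names (trades, quotes, sym, ts, price, bid, ask) are assumed.
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

trades = spark.table("cap_markets.q_on_dbx.trades")  # assumed input table
quotes = spark.table("cap_markets.q_on_dbx.quotes")  # assumed input table

def asof_join(trades_pdf: pd.DataFrame, quotes_pdf: pd.DataFrame) -> pd.DataFrame:
    # Runs on the workers for each symbol: match every trade with the most
    # recent quote at or before the trade timestamp.
    return pd.merge_asof(
        trades_pdf.sort_values("ts"),
        quotes_pdf.sort_values("ts"),
        on="ts",
        by="sym",
    )

merged = (
    trades.groupBy("sym")
    .cogroup(quotes.groupBy("sym"))
    .applyInPandas(
        asof_join,
        schema="sym string, ts timestamp, price double, bid double, ask double",
    )
)

# Persist the merged series as a Delta table governed by Unity Catalog.
merged.write.mode("overwrite").saveAsTable("cap_markets.q_on_dbx.trades_with_quotes")
```

A cogrouped UDF hands each symbol's rows to a single pandas DataFrame pair, which is what makes the in-memory `merge_asof` usable at Spark scale.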
## Terraform Resources and Modules

### Data Sources

- `databricks_current_user`: Determines the current user's information for path configurations.
- `databricks_spark_version`: Fetches the latest Spark version available in Databricks.
- `databricks_node_type`: Retrieves the smallest node type with a local disk for cost efficiency.

### Notebook Resources

Two Databricks notebooks are created using `databricks_notebook` resources:

1. `read_kdb_save_parquet.py`: Located at `/Users/<your email>/01-Load-PyKX-Delta`.
2. `ts_compute_delta_lake.py`: Located at `/Users/<your email>/02-Merge-Q-Time-Series`.

These notebooks are populated with the respective Python scripts from the local module path.

### Databricks Job

The `databricks_job` resource, `process_time_series_liquid_cluster`, is configured to run the above notebooks sequentially on a Databricks cluster. Key configurations include:

- **Job Cluster**: Utilizes the latest Spark version, `r6i.xlarge` nodes, and enables elastic disk with SPOT instances for cost optimization.
- **Tasks**: The job comprises two tasks, `load_data` and `process_data`, each corresponding to a notebook. The `process_data` task depends on the completion of `load_data`.
- **Libraries**: Each task installs the required Python libraries: `numpy`, `pandas`, `pyarrow`, `pykx`, `pytz`, and `toml`.

### Output

- `job_url`: Outputs the URL of the created Databricks job, providing easy access to monitor and manage the job execution.
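As a side note that is not part of this module: once Terraform has created the job, it can also be triggered and monitored programmatically. The sketch below uses the Databricks Python SDK with a placeholder job ID; adapt both to your workspace.

```python
# Minimal sketch, assuming the databricks-sdk package is installed and
# credentials are available via environment variables or ~/.databrickscfg.
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# The job ID is a placeholder -- take the real one from the Terraform output.
run = w.jobs.run_now(job_id=123456789).result()  # blocks until the run finishes
print(run.state.result_state)
```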
## Terraform Configuration

* Clone the Repository: Clone this repository to your local machine.
* Configure Terraform Variables: Set the required variables in a `terraform.tfvars` file. For security reasons, this file should be located outside the GitHub project.
* Initialize Terraform: Run `terraform init` to initialize the working directory containing the Terraform configuration files.
* Apply Configuration: Execute `terraform apply -var-file="<path_to_your_tfvars_file>/terraform.tfvars"` to create the resources in your Databricks workspace.

Ensure you have Terraform installed and configured with the necessary provider credentials to interact with your Databricks and AWS environments.

### Key Features

* Automated Data Pipeline: Streamlines reading, converting, and processing time-series data from KDB into Databricks.
* Scalable and Cost-Effective: Utilizes Databricks' scalable infrastructure with cost-effective options like SPOT instances.
* Sequential Task Execution: Ensures orderly processing by configuring task dependencies within the Databricks job.
* Library Management: Automates the installation of the Python libraries required for data processing.

### Prerequisites

* Databricks workspace with the necessary permissions.
* Terraform installed and configured.
* Access to a KDB database and appropriate credentials for data extraction.

Diff for: modules/pykx/main.tf

+139
@@ -0,0 +1,139 @@
data "databricks_current_user" "me" {
}

data "databricks_spark_version" "latest" {}
data "databricks_node_type" "smallest" {
  local_disk = true
}

resource "databricks_notebook" "load" {
  provider = databricks.spoke_aws_workspace
  source   = "${path.module}/read_kdb_save_parquet.py"
  path     = "${data.databricks_current_user.me.home}/01-Load-PyKX-Delta"
}

resource "databricks_notebook" "time_series_merge" {
  provider = databricks.spoke_aws_workspace
  source   = "${path.module}/ts_compute_delta_lake.py"
  path     = "${data.databricks_current_user.me.home}/02-Merge-Q-Time-Series"
}


resource "databricks_job" "process_time_series_liquid_cluster" {
  provider = databricks.spoke_aws_workspace
  name     = "Databricks PyKX Time Series Merge into Delta Lake (${data.databricks_current_user.me.alphanumeric})"

  job_cluster {
    job_cluster_key = "shared_cap_markets"
    new_cluster {
      spark_version       = data.databricks_spark_version.latest.id
      node_type_id        = "r6i.xlarge"
      enable_elastic_disk = true
      num_workers         = 1
      aws_attributes {
        availability = "SPOT"
      }
      data_security_mode = "SINGLE_USER"
      custom_tags        = { "clusterSource" = "lakehouse-blueprints" }
    }
  }

  task {
    task_key = "load_data"
    notebook_task {
      notebook_path = databricks_notebook.load.path
    }

    job_cluster_key = "shared_cap_markets"

    library {
      pypi {
        package = "numpy~=1.22"
      }
    }

    library {
      pypi {
        package = "pandas>=1.2"
      }
    }
    library {
      pypi {
        package = "pyarrow>=3.0.0"
      }
    }

    library {
      pypi {
        package = "pykx==2.1.1"
      }
    }

    library {
      pypi {
        package = "pytz>=2022.1"
      }
    }

    library {
      pypi {
        package = "toml~=0.10.2"
      }
    }
  }

  task {
    task_key = "process_data"
    // this task will only run after load_data completes
    depends_on {
      task_key = "load_data"
    }

    notebook_task {
      notebook_path = databricks_notebook.time_series_merge.path
    }

    job_cluster_key = "shared_cap_markets"

    library {
      pypi {
        package = "numpy~=1.22"
      }
    }

    library {
      pypi {
        package = "pandas>=1.2"
      }
    }
    library {
      pypi {
        package = "pyarrow>=3.0.0"
      }
    }

    library {
      pypi {
        package = "pykx==2.1.1"
      }
    }

    library {
      pypi {
        package = "pytz>=2022.1"
      }
    }

    library {
      pypi {
        package = "toml~=0.10.2"
      }
    }
  }

}


output "job_url" {
  value = databricks_job.process_time_series_liquid_cluster.id
}

Diff for: modules/pykx/read_kdb_save_parquet.py

+83
@@ -0,0 +1,83 @@
# Databricks notebook source
# MAGIC %md
# MAGIC
# MAGIC ## Create Delta Lake Objects Governed by Unity Catalog
# MAGIC
# MAGIC This particular workflow shows Parquet file generation directly from a `q` table in KDB. All tables are saved in the cap_markets catalog, which can be inspected for all table definitions, lineage, and access controls.

# COMMAND ----------

# MAGIC %sql create catalog if not exists cap_markets; create schema if not exists q_on_dbx

# COMMAND ----------

# MAGIC %sql use catalog cap_markets; use schema q_on_dbx

# COMMAND ----------

# MAGIC %fs mkdirs /rp_ts

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC ### Create KDB Table and Save to Parquet

# COMMAND ----------

import os

# PyKX reads the q license location and q startup arguments before import
os.environ['QLIC'] = '/dbfs/tmp/license/'
os.environ['QARGS'] = '-s 4'
import pandas as pd
import pykx as kx


# COMMAND ----------

# MAGIC %%q
# MAGIC
# MAGIC tbl: ([] sym: `symbol$(); int_f: `int$(); float_f: `float$(); real_f: `real$(); byte_f: `byte$(); char_f: `char$(); timestamp_f: `timestamp$(); month_f: `month$(); date_f: `date$(); time_f: `time$(); minute_f: `minute$(); second_f: `second$(); time_f: `time$())
# MAGIC
# MAGIC tbl

# COMMAND ----------

# MAGIC %%q
# MAGIC
# MAGIC tbl,: enlist[`a; 1; 1.1; 1e; 0x01; "a"; .z.p; 2023.01m; 2023.01.01; 12:00:00.000; 12:00; 12:00:00; 12:00:00.000]
# MAGIC tbl,: enlist[`b; 2; 2.2; 2e; 0x02; "b"; .z.p; 2023.02m; 2023.02.02; 13:00:00.000; 13:00; 13:00:00; 13:00:00.000]
# MAGIC tbl,: enlist[`c; 3; 3.3; 3e; 0x03; "c"; .z.p; 2023.03m; 2023.03.03; 14:00:00.000; 14:00; 14:00:00; 14:00:00.000]
# MAGIC tbl,: enlist[`d; 4; 4.4; 4e; 0x04; "d"; .z.p; 2023.04m; 2023.04.04; 15:00:00.000; 15:00; 15:00:00; 15:00:00.000]
# MAGIC tbl,: enlist[`e; 5; 5.5; 5e; 0x05; "e"; .z.p; 2023.05m; 2023.05.05; 16:00:00.000; 16:00; 16:00:00; 16:00:00.000]
# MAGIC

# COMMAND ----------

# MAGIC %%q
# MAGIC
# MAGIC tbl

# COMMAND ----------

import pyarrow as pa
import pyarrow.parquet as pq
import pykx as kx

# Fetch data from kdb+
kdb_table = kx.q('tbl')  # Replace 'tbl' with your kdb+ table name

# Convert the PyKX table to a PyArrow table
arrow_table = kdb_table.pa()

# Write the table to the final Parquet file.
# To avoid downcasting errors, use the int96 timestamp configuration as shown below.
pq.write_table(arrow_table, '/dbfs/rp_ts/output.parquet', use_deprecated_int96_timestamps=True)

# COMMAND ----------

display(spark.read.format("parquet").load("dbfs:/rp_ts/output.parquet"))

# COMMAND ----------

spark.read.format("parquet").load("dbfs:/rp_ts/output.parquet").write.saveAsTable("cap_markets.q_on_dbx.sample_dbx_table")
