Ophelian Spark ML Functions

NullDebug Class

The null_clean method removes columns from a Spark DataFrame whose proportion of null values exceeds a given offset (threshold).

Parameters

  • self: DataFrame object
  • partition_by: str, name of the partition column
  • offset: float, maximum allowed proportion of nulls per column

Returns

  • A new Spark DataFrame without the columns whose null proportion exceeded the offset

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import NullDebug

# Create a sample DataFrame
data = {'partition': ['A', 'B', 'C', 'A', 'B', 'C'], 'values': [1, None, 3, None, 5, 6]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Clean null values
clean_df = NullDebug.null_clean(sdf, 'partition', 0.5)
clean_df.show()
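
For reference, the same idea can be sketched in plain PySpark. This is not the library's implementation, only a minimal sketch of the null-share threshold; the helper name drop_nullish_columns is ours:

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def drop_nullish_columns(df: DataFrame, offset: float) -> DataFrame:
    # Count nulls per column in a single aggregation pass
    total = df.count()
    null_counts = df.select(
        [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
    ).first()
    # Keep only the columns whose null share is at or below the offset
    return df.select(*[c for c in df.columns if null_counts[c] / total <= offset])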

CorrMat Class

The correlation_matrix method computes the correlation matrix for a given DataFrame.

Parameters

  • self: DataFrame object
  • pivot_col: str, column name for pivoting
  • method: str, method for correlation (default is "pearson")
  • offset: float, threshold for filtering correlations
  • rep: int, number of partitions for the computation

Returns

  • A DataFrame with correlation values

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import CorrMat

# Create a sample DataFrame
data = {'pivot': [1, 2, 3, 4], 'value1': [1.0, 2.0, 3.0, 4.0], 'value2': [2.0, 4.0, 6.0, 8.0]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Compute correlation matrix
corr_df = CorrMat.correlation_matrix(sdf, 'pivot')
corr_df.show()
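
If you only need the raw matrix, Spark ML's built-in Correlation estimator is a useful cross-check. This is standard PySpark API, independent of CorrMat:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Pack the numeric columns into one vector column, then correlate
vec_df = VectorAssembler(inputCols=['value1', 'value2'], outputCol='features').transform(sdf)
corr_matrix = Correlation.corr(vec_df, 'features', 'pearson').head()[0]
print(corr_matrix.toArray())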

Shape Class

The shape method returns the dimensions of the DataFrame as a (rows, columns) tuple.

Parameters

  • self: DataFrame object

Returns

  • Tuple representing the shape of the DataFrame

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Shape

# Create a sample DataFrame
data = {'col1': [1, 2, 3], 'col2': [4, 5, 6]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Get shape of DataFrame
shape = Shape.shape(sdf)
print(shape)
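
In plain PySpark the same information can be computed directly; Shape.shape presumably wraps something close to this:

# (number of rows, number of columns)
print((sdf.count(), len(sdf.columns)))  # (3, 2)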

Rolling Class

The rolling_down method applies a rolling-window aggregation (sum by default) to a column.

Parameters

  • self: DataFrame object
  • op_col: str, column name to apply the operation on
  • nat_order: str, column name for natural ordering
  • min_p: int, minimum periods (default is 2)
  • window: int, window size (default is 2)
  • method: str, method of rolling operation (default is "sum")

Returns

  • A DataFrame with the rolling operation applied

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Rolling

# Create a sample DataFrame
data = {'date': pd.date_range(start='1/1/2020', periods=5), 'value': [1, 2, 3, 4, 5]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Apply rolling operation
rolling_df = Rolling.rolling_down(sdf, 'value', 'date')
rolling_df.show()
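
With the defaults (window=2, method="sum"), the operation likely corresponds to a standard Spark window aggregate. A minimal sketch, with the min_p handling omitted:

from pyspark.sql import Window
import pyspark.sql.functions as F

# Rolling sum over the current row and the one before it, ordered by date
w = Window.orderBy('date').rowsBetween(-1, 0)
sdf.withColumn('value_roll_sum', F.sum('value').over(w)).show()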

DynamicSampling Class

The sample_n method samples n rows from the DataFrame.

Parameters

  • self: DataFrame object
  • n: int, number of samples

Returns

  • A DataFrame with n sampled rows

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import DynamicSampling

# Create a sample DataFrame
data = {'col1': [1, 2, 3, 4, 5], 'col2': [6, 7, 8, 9, 10]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Sample 2 rows
sampled_df = DynamicSampling.sample_n(sdf, 2)
sampled_df.show()
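
For comparison, a common way to draw an exact-size random sample in plain PySpark (sample_n's exact strategy may differ):

import pyspark.sql.functions as F

# Shuffle randomly, then keep the first n rows
sdf.orderBy(F.rand(seed=42)).limit(2).show()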

Selects Class

The select_regex method selects columns based on a regular expression.

Parameters

  • self: DataFrame object
  • reg_expr: str, regular expression pattern

Returns

  • A DataFrame with selected columns

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Selects

# Create a sample DataFrame
data = {'col1': [1, 2], 'col2': [3, 4], 'col3': [5, 6]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Select columns with regex
selected_df = Selects.select_regex(sdf, 'col[1-2]')
selected_df.show()
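
The equivalent selection is easy to express with Python's re module, which is presumably close to what select_regex does under the hood:

import re

# Keep only the columns whose full name matches the pattern
pattern = re.compile('col[1-2]')
sdf.select([c for c in sdf.columns if pattern.fullmatch(c)]).show()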

MapItem Class

The map_item method replaces column values according to a mapping dictionary.

Parameters

  • self: DataFrame object
  • map_expr: dict, mapping expression
  • origin_col: str, original column name

Returns

  • A DataFrame with mapped values

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import MapItem

# Create a sample DataFrame
data = {'col1': [1, 2, 3], 'col2': ['A', 'B', 'C']}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Map column values
mapping_expr = {1: 'one', 2: 'two', 3: 'three'}
mapped_df = MapItem.map_item(sdf, mapping_expr, 'col1')
mapped_df.show()
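
The standard PySpark idiom for this kind of lookup builds a map literal from the dict; map_item likely does something similar:

from itertools import chain
import pyspark.sql.functions as F

# Turn the dict into a MapType column and index it with the original column
mapping_expr = {1: 'one', 2: 'two', 3: 'three'}
map_col = F.create_map([F.lit(x) for x in chain(*mapping_expr.items())])
sdf.withColumn('col1_mapped', map_col[F.col('col1')]).show()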

Reshape Class

The narrow_format method reshapes the DataFrame from wide to narrow format.

Parameters

  • self: DataFrame object
  • fix_cols: str, fixed column name
  • new_cols: list, new column names

Returns

  • A reshaped DataFrame

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Reshape

# Create a sample DataFrame
data = {'id': [1, 2], 'val1': [3, 4], 'val2': [5, 6]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Reshape to narrow format
narrow_df = Reshape.narrow_format(sdf, 'id', ['variable', 'value'])
narrow_df.show()
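
Spark SQL's stack expression is the usual way to melt wide columns into rows; a minimal sketch of the same reshape:

# Each (name, value) pair of the wide columns becomes one narrow row
sdf.selectExpr(
    'id',
    "stack(2, 'val1', val1, 'val2', val2) as (variable, value)"
).show()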

PctChange Class

The pct_change method calculates the percentage change between rows.

Parameters

  • self: DataFrame object
  • periods: int, number of periods to shift (default is 1)
  • order_by: str, column name for ordering
  • pct_cols: list, columns for percentage change
  • partition_by: str, column name for partitioning
  • desc: bool, descending order (default is False)

Returns

  • A DataFrame with percentage changes

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import PctChange

# Create a sample DataFrame
data = {'date': pd.date_range(start='1/1/2020', periods=5), 'value': [1, 2, 3, 4, 5]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Calculate percentage change
pct_df = PctChange.pct_change(sdf, order_by='date', pct_cols=['value'])
pct_df.show()
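
The computation behind pct_change is the classic lag pattern: pct(t) = value(t) / value(t - periods) - 1. A sketch with periods=1 and no partitioning:

from pyspark.sql import Window
import pyspark.sql.functions as F

# Divide each value by its predecessor in date order
w = Window.orderBy('date')
sdf.withColumn('value_pct', F.col('value') / F.lag('value', 1).over(w) - 1).show()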

CrossTabular Class

The cross_pct method calculates percentage shares over a cross-tabulation of the data.

Parameters

  • self: DataFrame object
  • group_by: str, column name for grouping
  • pivot_col: str, column name for pivoting
  • agg_dict: dict, aggregation dictionary
  • operand: str, operand for calculation (default is "sum")
  • cols: str, columns to include (default is "all")

Returns

  • A DataFrame with cross-tabulation percentages

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import CrossTabular

# Create a sample DataFrame
data = {'group': ['A', 'B', 'A', 'B'], 'pivot': [1, 2, 1, 2], 'value': [10, 20, 30, 40]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Calculate cross-tabulation percentage
cross_pct_df = CrossTabular.cross_pct(sdf, 'group', 'pivot', {'value': 'sum'})
cross_pct_df.show()
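
The exact semantics of cross_pct depend on the operand and cols arguments; one plausible reading, sketched in plain PySpark, is a pivoted sum with each cell expressed as a share of its row total:

import pyspark.sql.functions as F

pivoted = sdf.groupBy('group').pivot('pivot').sum('value')
value_cols = [c for c in pivoted.columns if c != 'group']
# Divide every pivoted cell by the row total to get a percentage
row_total = sum(F.coalesce(F.col(c), F.lit(0)) for c in value_cols)
pivoted.select('group', *[(F.col(c) / row_total).alias(c) for c in value_cols]).show()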

Joins Class

The join_small_right method broadcast-joins a small right-hand DataFrame, shipping it to every executor instead of shuffling the larger one.

Parameters

  • self: DataFrame object
  • small_df: DataFrame to join
  • on: str, column name for joining
  • how: str, join type

Returns

  • A joined DataFrame

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Joins

# Create sample DataFrames
data1 = {'col1': [1, 2], 'col2': [3, 4]}
data2 = {'col1': [1, 2], 'col3': [5, 6]}
df1 = pd.DataFrame(data1)
df2 = pd.DataFrame(data2)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf1 = spark.createDataFrame(df1)
sdf2 = spark.createDataFrame(df2)

# Perform broadcast join
joined_df = Joins.join_small_right(sdf1, sdf2, 'col1', 'inner')
joined_df.show()
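
Spark's built-in broadcast hint gives the same effect and is what a method like this typically wraps:

from pyspark.sql.functions import broadcast

# The broadcast hint copies the small DataFrame to every executor
joined = sdf1.join(broadcast(sdf2), on='col1', how='inner')
joined.show()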

DaskSpark Class

The spark_to_dask method converts a Spark DataFrame to a Dask DataFrame.

Parameters

  • self: DataFrame object
  • option: str, file format option (default is "csv")
  • mode: str, write mode (default is "overwrite")
  • checkpoint_path: str, checkpoint path

Returns

  • A Dask DataFrame

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import DaskSpark

# Create a sample DataFrame
data = {'col1': [1, 2], 'col2': [3, 4]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Convert Spark DataFrame to Dask DataFrame
dask_df = DaskSpark.spark_to_dask(sdf)
print(dask_df.compute())
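
The round trip presumably works like this: Spark materializes the data to files at the checkpoint path, and Dask reads those files lazily. A sketch using Parquet for simplicity (the method defaults to CSV; the path below is ours):

import dask.dataframe as dd

# Spark writes partitioned files; Dask reads them back lazily
sdf.write.mode('overwrite').parquet('/tmp/ophelian_checkpoint')
ddf = dd.read_parquet('/tmp/ophelian_checkpoint')
print(ddf.compute())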

SortinoRatioCalculator Class

The sortino_ratio method calculates the Sortino ratio, a risk-adjusted return measure that penalizes only downside volatility.

Parameters

  • self: DataFrame object

Returns

  • The Sortino ratio

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import SortinoRatioCalculator

# Create a sample DataFrame
data = {'date': pd.date_range(start='1/1/2020', periods=5), 'close': [100, 102, 104, 103, 105]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Calculate Sortino ratio
sortino_calculator = SortinoRatioCalculator(sdf)
sortino_ratio = sortino_calculator.sortino_ratio()
print(sortino_ratio)
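
The textbook definition is the mean excess return divided by the downside deviation; the library's exact convention may differ. A sketch over the close prices, assuming a zero target return and zero risk-free rate:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Daily returns from close prices
w = Window.orderBy('date')
returns = sdf.withColumn('ret', F.col('close') / F.lag('close').over(w) - 1).dropna()
stats = returns.agg(
    F.mean('ret').alias('mu'),
    # Downside deviation: root mean square of the negative returns only
    F.sqrt(F.mean(F.pow(F.least(F.col('ret'), F.lit(0.0)), F.lit(2)))).alias('dd')
).first()
print(stats['mu'] / stats['dd'])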

SharpeRatioCalculator Class

The calculate method computes the Sharpe ratio: the mean excess return over the risk-free rate divided by the standard deviation of returns.

Parameters

  • self: DataFrame object
  • returns_col: str, column name for returns
  • risk_free_rate: float, risk-free rate

Returns

  • The Sharpe ratio

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import SharpeRatioCalculator

# Create a sample DataFrame
data = {'returns': [0.1, 0.2, 0.15, 0.3, 0.25]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Calculate Sharpe ratio
sharpe_calculator = SharpeRatioCalculator(sdf, 'returns', 0.02)
sharpe_ratio = sharpe_calculator.calculate()
print(sharpe_ratio)
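
The underlying formula is (mean return - risk-free rate) / standard deviation of returns; a direct sketch:

import pyspark.sql.functions as F

# Aggregate the mean and standard deviation in one pass
stats = sdf.agg(F.mean('returns').alias('mu'), F.stddev('returns').alias('sigma')).first()
print((stats['mu'] - 0.02) / stats['sigma'])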

EfficientFrontierRatioCalculator Class

The calculate_efficient_frontier_ratio method calculates the efficient frontier ratio.

Parameters

  • self: DataFrame object

Returns

  • The efficient frontier ratio

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import EfficientFrontierRatioCalculator

# Create a sample DataFrame
data = {'return': [0.1, 0.2, 0.15, 0.3, 0.25]}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Calculate efficient frontier ratio
eff_frontier_calculator = EfficientFrontierRatioCalculator(sdf)
eff_frontier_ratio = eff_frontier_calculator.calculate_efficient_frontier_ratio()
print(eff_frontier_ratio)

RiskParityCalculator Class

The calculate_risk_parity_weights method calculates risk parity weights, i.e., portfolio weights under which each asset contributes equally to total portfolio risk.

Parameters

  • self: DataFrame object

Returns

  • The risk parity weights

Example

import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import RiskParityCalculator

# Create a sample DataFrame
data = {'returns': [0.1, 0.2, 0.15, 0.3, 0.25], 'asset': ['A', 'B', 'C', 'D', 'E']}
df = pd.DataFrame(data)

spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)

# Calculate risk parity weights
risk_parity_calculator = RiskParityCalculator(sdf)
risk_parity_weights = risk_parity_calculator.calculate_risk_parity_weights()
print(risk_parity_weights)
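
As a rough baseline, risk parity is often approximated with inverse-volatility weights; this ignores cross-asset correlations, and the class may solve the full problem instead. The sketch below assumes a hypothetical returns_df with several return observations per asset (the sample above has only one row per asset):

import pandas as pd
import pyspark.sql.functions as F

# Hypothetical multi-observation return history per asset
returns_df = spark.createDataFrame(pd.DataFrame({
    'asset': ['A', 'A', 'A', 'B', 'B', 'B'],
    'returns': [0.01, -0.02, 0.03, 0.05, -0.04, 0.06]
}))

# Inverse-volatility weights: w_i proportional to 1 / sigma_i, normalized to sum to 1
vol = returns_df.groupBy('asset').agg(F.stddev('returns').alias('sigma'))
inv = vol.withColumn('inv_sigma', 1 / F.col('sigma'))
total = inv.agg(F.sum('inv_sigma')).first()[0]
inv.withColumn('weight', F.col('inv_sigma') / total).show()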