Ophelian Spark ML Functions
NullDebug Class
The null_clean method cleans and debugs null values in a Spark DataFrame, dropping columns whose proportion of nulls exceeds a given offset.
Parameters
- self : DataFrame object
- partition_by : str, name of the partition column
- offset : float, maximum allowed proportion of null values per column
Returns
- A new Spark DataFrame without the columns whose share of nulls exceeded the offset
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import NullDebug
# Create a sample DataFrame
data = {'partition': ['A', 'B', 'C', 'A', 'B', 'C'], 'values': [1, None, 3, None, 5, 6]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Clean null values
clean_df = NullDebug.null_clean(sdf, 'partition', 0.5)
clean_df.show()
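For reference, the same column filtering can be expressed in plain PySpark. Continuing the example above, this is a minimal sketch of the behavior described here, not the library's implementation; it ignores partition_by and compares each column's overall null share against the offset.
from pyspark.sql import functions as F
# Sketch only: drop columns whose share of nulls exceeds the 0.5 offset
total = sdf.count()
null_counts = sdf.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in sdf.columns]
).first()
keep = [c for c in sdf.columns if null_counts[c] / total <= 0.5]
manual_clean_df = sdf.select(*keep)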
CorrMat Class
The correlation_matrix method computes the correlation matrix for a given DataFrame.
Parameters
- self : DataFrame object
- pivot_col : str, column name for pivoting
- method : str, method for correlation (default is "pearson")
- offset : float, threshold for filtering correlations
- rep : int, number of partitions for the computation
Returns
- A DataFrame with correlation values
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import CorrMat
# Create a sample DataFrame
data = {'pivot': [1, 2, 3, 4], 'value1': [1.0, 2.0, 3.0, 4.0], 'value2': [2.0, 4.0, 6.0, 8.0]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Compute correlation matrix
corr_df = CorrMat.correlation_matrix(sdf, 'pivot')
corr_df.show()
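A comparable Pearson correlation matrix can be computed with stock PySpark via pyspark.ml. The sketch below is an assumption about the underlying computation, not the method's actual code.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
# Sketch only: assemble the numeric columns into a vector and correlate them
assembler = VectorAssembler(inputCols=['value1', 'value2'], outputCol='features')
vec_df = assembler.transform(sdf).select('features')
pearson_matrix = Correlation.corr(vec_df, 'features', 'pearson').head()[0]
print(pearson_matrix.toArray())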
Shape Class
The shape method returns the shape of the DataFrame.
Parameters
- self : DataFrame object
Returns
- Tuple representing the shape of the DataFrame
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Shape
# Create a sample DataFrame
data = {'col1': [1, 2, 3], 'col2': [4, 5, 6]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Get shape of DataFrame
shape = Shape.shape(sdf)
print(shape)
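The same tuple can be produced directly with two standard DataFrame calls, which is presumably what the method wraps:
# Sketch only: row count and column count by hand
manual_shape = (sdf.count(), len(sdf.columns))
print(manual_shape)  # (3, 2)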
Rolling Class
The rolling_down method performs a rolling operation on a column.
Parameters
- self : DataFrame object
- op_col : str, column name to apply the operation on
- nat_order : str, column name for natural ordering
- min_p : int, minimum periods (default is 2)
- window : int, window size (default is 2)
- method : str, method of rolling operation (default is "sum")
Returns
- A DataFrame with the rolling operation applied
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Rolling
# Create a sample DataFrame
data = {'date': pd.date_range(start='1/1/2020', periods=5), 'value': [1, 2, 3, 4, 5]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Apply rolling operation
rolling_df = Rolling.rolling_down(sdf, 'value', 'date')
rolling_df.show()
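In plain PySpark, a rolling sum with window size 2 looks like the sketch below. It is an assumption about the behavior, and it does not enforce min_p (the first row is aggregated over a single value).
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Sketch only: rolling sum over the current row and the one before it
w = Window.orderBy('date').rowsBetween(-1, 0)
manual_rolling = sdf.withColumn('value_rolling_sum', F.sum('value').over(w))
manual_rolling.show()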
DynamicSampling Class
The sample_n method samples n rows from the DataFrame.
Parameters
- self : DataFrame object
- n : int, number of samples
Returns
- A DataFrame with n sampled rows
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import DynamicSampling
# Create a sample DataFrame
data = {'col1': [1, 2, 3, 4, 5], 'col2': [6, 7, 8, 9, 10]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Sample 2 rows
sampled_df = DynamicSampling.sample_n(sdf, 2)
sampled_df.show()
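One common way to take an exact-size random sample in PySpark is to shuffle and limit; this sketch assumes sample_n behaves similarly.
from pyspark.sql import functions as F
# Sketch only: shuffle the rows, then keep exactly 2 of them
manual_sample = sdf.orderBy(F.rand(seed=42)).limit(2)
manual_sample.show()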
Selects Class
The select_regex method selects columns based on a regular expression.
Parameters
- self : DataFrame object
- reg_expr : str, regular expression pattern
Returns
- A DataFrame with selected columns
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Selects
# Create a sample DataFrame
data = {'col1': [1, 2], 'col2': [3, 4], 'col3': [5, 6]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Select columns with regex
selected_df = Selects.select_regex(sdf, 'col[1-2]')
selected_df.show()
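The equivalent column filter is a few lines with Python's re module; this sketch assumes the pattern is matched against full column names.
import re
# Sketch only: keep columns whose names match the pattern
pattern = re.compile('col[1-2]')
manual_selected = sdf.select(*[c for c in sdf.columns if pattern.fullmatch(c)])
manual_selected.show()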
MapItem Class
The map_item method maps column values based on a given expression.
Parameters
- self : DataFrame object
- map_expr : dict, mapping expression
- origin_col : str, original column name
Returns
- A DataFrame with mapped values
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import MapItem
# Create a sample DataFrame
data = {'col1': [1, 2, 3], 'col2': ['A', 'B', 'C']}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Map column values
mapping_expr = {1: 'one', 2: 'two', 3: 'three'}
mapped_df = MapItem.map_item(sdf, mapping_expr, 'col1')
mapped_df.show()
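A plain-PySpark version builds a MapType literal from the dict and indexes it with the source column. Continuing the example above, this is a sketch of the idea, not the method's internals.
from itertools import chain
from pyspark.sql import functions as F
# Sketch only: turn the Python dict into a map column and look each value up
map_col = F.create_map(*[F.lit(x) for x in chain(*mapping_expr.items())])
manual_mapped = sdf.withColumn('col1_mapped', map_col[F.col('col1')])
manual_mapped.show()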
Reshape Class
The narrow_format method reshapes the DataFrame from wide to narrow format.
Parameters
- self : DataFrame object
- fix_cols : str, fixed column name
- new_cols : list, new column names
Returns
- A reshaped DataFrame
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Reshape
# Create a sample DataFrame
data = {'id': [1, 2], 'val1': [3, 4], 'val2': [5, 6]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Reshape to narrow format
narrow_df = Reshape.narrow_format(sdf, 'id', ['variable', 'value'])
narrow_df.show()
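Wide-to-narrow reshaping in Spark SQL is usually done with stack(); the sketch below assumes narrow_format melts all non-fixed columns this way.
# Sketch only: melt val1/val2 into (variable, value) pairs, keeping id fixed
manual_narrow = sdf.selectExpr(
    "id", "stack(2, 'val1', val1, 'val2', val2) as (variable, value)"
)
manual_narrow.show()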
PctChange Class
The pct_change method calculates the percentage change between rows.
Parameters
- self : DataFrame object
- periods : int, number of periods to shift (default is 1)
- order_by : str, column name for ordering
- pct_cols : list, columns for percentage change
- partition_by : str, column name for partitioning
- desc : bool, descending order (default is False)
Returns
- A DataFrame with percentage changes
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import PctChange
# Create a sample DataFrame
data = {'date': pd.date_range(start='1/1/2020', periods=5), 'value': [1, 2, 3, 4, 5]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Calculate percentage change
pct_df = PctChange.pct_change(sdf, order_by='date', pct_cols=['value'])
pct_df.show()
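The usual window-function formulation of a one-period percentage change is shown below as a sketch; partition_by and desc are omitted.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Sketch only: (x_t / x_{t-1}) - 1, ordered by date
w = Window.orderBy('date')
manual_pct = sdf.withColumn('value_pct', F.col('value') / F.lag('value', 1).over(w) - 1)
manual_pct.show()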
CrossTabular Class
The cross_pct method calculates the cross-tabulation percentage.
Parameters
- self : DataFrame object
- group_by : str, column name for grouping
- pivot_col : str, column name for pivoting
- agg_dict : dict, aggregation dictionary
- operand : str, operand for calculation (default is "sum")
- cols : str, columns to include (default is "all")
Returns
- A DataFrame with cross-tabulation percentages
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import CrossTabular
# Create a sample DataFrame
data = {'group': ['A', 'B', 'A', 'B'], 'pivot': [1, 2, 1, 2], 'value': [10, 20, 30, 40]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Calculate cross-tabulation percentage
cross_pct_df = CrossTabular.cross_pct(sdf, 'group', 'pivot', {'value': 'sum'})
cross_pct_df.show()
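A rough plain-PySpark equivalent pivots, aggregates, and divides each cell by its row total; this sketch assumes that is what "cross-tabulation percentage" means here.
from pyspark.sql import functions as F
# Sketch only: pivot/aggregate, then express each cell as a share of the row total
agg = sdf.groupBy('group').pivot('pivot').agg(F.sum('value'))
pivot_cols = [c for c in agg.columns if c != 'group']
row_total = sum(F.coalesce(F.col(c), F.lit(0)) for c in pivot_cols)
manual_cross_pct = agg.select('group', *[(F.col(c) / row_total).alias(c) for c in pivot_cols])
manual_cross_pct.show()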
Joins Class
The join_small_right method performs a broadcast join with a small right-hand DataFrame.
Parameters
- self : DataFrame object
- small_df : DataFrame to join
- on : str, column name for joining
- how : str, join type
Returns
- A joined DataFrame
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import Joins
# Create sample DataFrames
data1 = {'col1': [1, 2], 'col2': [3, 4]}
data2 = {'col1': [1, 2], 'col3': [5, 6]}
df1 = pd.DataFrame(data1)
df2 = pd.DataFrame(data2)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf1 = spark.createDataFrame(df1)
sdf2 = spark.createDataFrame(df2)
# Perform broadcast join
joined_df = Joins.join_small_right(sdf1, sdf2, 'col1', 'inner')
joined_df.show()
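Spark's built-in broadcast hint does the same thing; join_small_right presumably wraps a call like this:
from pyspark.sql.functions import broadcast
# Sketch only: hint Spark to broadcast the small right-hand table
manual_joined = sdf1.join(broadcast(sdf2), on='col1', how='inner')
manual_joined.show()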
DaskSpark Class
The spark_to_dask method converts a Spark DataFrame to a Dask DataFrame.
Parameters
- self : DataFrame object
- option : str, file format option (default is "csv")
- mode : str, write mode (default is "overwrite")
- checkpoint_path : str, checkpoint path
Returns
- A Dask DataFrame
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import DaskSpark
# Create a sample DataFrame
data = {'col1': [1, 2], 'col2': [3, 4]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Convert Spark DataFrame to Dask DataFrame
dask_df = DaskSpark.spark_to_dask(sdf)
print(dask_df.compute())
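Without the helper, the same handoff is a write from Spark followed by a read in Dask. The sketch below matches the documented csv/overwrite defaults; the checkpoint path is a made-up example.
import dask.dataframe as dd
# Sketch only: persist from Spark, then read the part files back with Dask
sdf.write.mode('overwrite').option('header', True).csv('/tmp/spark_to_dask')
manual_dask_df = dd.read_csv('/tmp/spark_to_dask/part-*.csv')
print(manual_dask_df.compute())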
SortinoRatioCalculator Class
The sortino_ratio method calculates the Sortino ratio.
Parameters
- self : DataFrame object
Returns
- The Sortino ratio
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import SortinoRatioCalculator
# Create a sample DataFrame
data = {'date': pd.date_range(start='1/1/2020', periods=5), 'close': [100, 102, 104, 103, 105]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Calculate Sortino ratio
sortino_calculator = SortinoRatioCalculator(sdf)
sortino_ratio = sortino_calculator.sortino_ratio()
print(sortino_ratio)
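The conventional Sortino ratio is (mean return - target) / downside deviation, where the downside deviation penalizes only returns below the target. A NumPy reference computation, assuming simple returns derived from the close prices and a target rate of 0:
import numpy as np
# Sketch only: Sortino ratio from simple returns, target rate = 0
prices = np.array([100, 102, 104, 103, 105], dtype=float)
returns = np.diff(prices) / prices[:-1]
target = 0.0
downside = np.minimum(returns - target, 0.0)    # below-target part only
downside_dev = np.sqrt(np.mean(downside ** 2))  # downside deviation
print((returns.mean() - target) / downside_dev)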
SharpeRatioCalculator Class
The calculate method calculates the Sharpe ratio.
Parameters
- self : DataFrame object
- returns_col : str, column name for returns
- risk_free_rate : float, risk-free rate
Returns
- The Sharpe ratio
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import SharpeRatioCalculator
# Create a sample DataFrame
data = {'returns': [0.1, 0.2, 0.15, 0.3, 0.25]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Calculate Sharpe ratio
sharpe_calculator = SharpeRatioCalculator(sdf, 'returns', 0.02)
sharpe_ratio = sharpe_calculator.calculate()
print(sharpe_ratio)
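The Sharpe ratio here is the standard (mean return - risk-free rate) / standard deviation of returns. A NumPy reference for the example's inputs (the ddof value is a convention choice):
import numpy as np
# Sketch only: Sharpe ratio for the sample returns with a 0.02 risk-free rate
returns = np.array([0.1, 0.2, 0.15, 0.3, 0.25])
risk_free_rate = 0.02
print((returns.mean() - risk_free_rate) / returns.std(ddof=1))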
EfficientFrontierRatioCalculator Class
The calculate_efficient_frontier_ratio method calculates the efficient frontier ratio.
Parameters
- self : DataFrame object
Returns
- The efficient frontier ratio
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import EfficientFrontierRatioCalculator
# Create a sample DataFrame
data = {'return': [0.1, 0.2, 0.15, 0.3, 0.25]}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Calculate efficient frontier ratio
eff_frontier_calculator = EfficientFrontierRatioCalculator(sdf)
eff_frontier_ratio = eff_frontier_calculator.calculate_efficient_frontier_ratio()
print(eff_frontier_ratio)
RiskParityCalculator Class
The calculate_risk_parity_weights method calculates the risk parity weights.
Parameters
- self : DataFrame object
Returns
- The risk parity weights
Example
import pandas as pd
from pyspark.sql import SparkSession
from ophelian.ophelian_spark.functions import RiskParityCalculator
# Create a sample DataFrame
data = {'returns': [0.1, 0.2, 0.15, 0.3, 0.25], 'asset': ['A', 'B', 'C', 'D', 'E']}
df = pd.DataFrame(data)
spark = SparkSession.builder.appName("example").getOrCreate()
sdf = spark.createDataFrame(df)
# Calculate risk parity weights
risk_parity_calculator = RiskParityCalculator(sdf)
risk_parity_weights = risk_parity_calculator.calculate_risk_parity_weights()
print(risk_parity_weights)
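As a point of reference, naive risk parity assigns each asset a weight proportional to the inverse of its volatility. Whether RiskParityCalculator uses this or a full optimization is not stated, so treat the sketch below as illustrative only.
import numpy as np
# Sketch only: inverse-volatility ("naive") risk parity weights
asset_returns = {
    'A': [0.01, 0.02, -0.01],
    'B': [0.03, -0.02, 0.01],
}
inv_vol = {a: 1.0 / np.std(r, ddof=1) for a, r in asset_returns.items()}
total = sum(inv_vol.values())
print({a: v / total for a, v in inv_vol.items()})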