Isolation Forest Implementation using skLearn, PyOD, and spark-iForest



Isolation Forest is an algorithm for detecting anomalies in data. It isolates observations by repeatedly picking a random feature and a random split value; anomalous points are easier to separate from the rest of the data, so they end up with shorter average path lengths in the random trees.
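To illustrate the idea (this is a toy sketch of a single isolation tree, not the library's implementation), the function below counts how many random axis-aligned splits are needed before a point ends up alone in its partition:

import random

def isolation_path_length(point, data, depth=0, max_depth=10):
    # Stop once the point is alone in its partition (isolated) or the depth limit is hit.
    if len(data) <= 1 or depth >= max_depth:
        return depth
    dim = random.randrange(len(point))
    lo, hi = min(row[dim] for row in data), max(row[dim] for row in data)
    if lo == hi:
        # No split is possible along this dimension in this toy version.
        return depth
    split = random.uniform(lo, hi)
    # Keep only the part of the data that lies on the same side of the split as `point`.
    side = [row for row in data if (row[dim] < split) == (point[dim] < split)]
    return isolation_path_length(point, side, depth + 1, max_depth)

Averaged over many such random trees, an outlier (a point far from a tight cluster) needs fewer splits to isolate than an inlier, and this is what the anomaly score is based on.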

from sklearn.ensemble import IsolationForest
import pandas as pd
import category_encoders as ce
from collections import Counter
from pyod.models.iforest import IForest

df = pd.read_csv('files_1/data.csv') 

# In scikit-learn's IsolationForest, decision_function returns values in the range [-0.5, 0.5], where -0.5 is the most anomalous.
# Ref: StackOverflow
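As a quick check of this (a minimal sketch on synthetic data; with contamination='auto', predict flags exactly the rows whose decision_function value falls below 0):

import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.RandomState(0)
X = np.vstack([rng.randn(200, 2),            # a tight cluster of "normal" points
               [[6.0, 6.0], [-7.0, 5.0]]])   # two obvious outliers

clf = IsolationForest(random_state=42, contamination='auto').fit(X)
scores = clf.decision_function(X)   # roughly in [-0.5, 0.5]; lower means more anomalous
labels = clf.predict(X)             # -1 for anomalies, 1 for inliers
print(scores.min(), scores.max())
print((labels == -1).sum(), "points flagged as anomalies")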

# Note: the OneHotEncoder in 'category_encoders' does not drop one of the columns, even though (for a feature with three categories) two of the three columns would be sufficient to identify a row.

d = pd.DataFrame({'a': ['a', 'b', 'a', 'b', 'c']})
ce_one_hot = ce.OneHotEncoder(cols = ['a'])
ce_one_hot.fit_transform(d) 

Output: three indicator columns, one per category of 'a'; no column is dropped.
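In contrast, a binary encoder writes each category's index in binary, so the number of output columns grows only logarithmically with the number of categories. A small sketch of this on the same toy frame (assuming category_encoders' BinaryEncoder):

d = pd.DataFrame({'a': ['a', 'b', 'a', 'b', 'c']})
ce_binary = ce.BinaryEncoder(cols = ['a'])
ce_binary.fit_transform(d)
# Three categories fit in two binary columns instead of three indicator columns.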
So, we will use "BinaryEncoder". The choice of encoder is a crucial decision in this exercise: whether a point ends up being flagged as anomalous also depends on how the data is encoded.

enc = ce.binary.BinaryEncoder(cols = ['a', 'c', 'd'])
df = enc.fit_transform(df)

# skLearn
model = IsolationForest(random_state = 42, contamination = 'auto', behaviour = "new").fit(df)
sklearn_anomalies = model.fit_predict(df)
Counter(sklearn_anomalies)
# Counter({-1: 20163, 1: 108861})

# Note: Source code for pyod.models.iforest (Ref: iforest)
# PyOD uses skLearn's implementation of IsolationForest under the hood.
# See this import in the 'iforest' code: "from sklearn.ensemble import IsolationForest"

model_pyod = IForest(behaviour="new", max_samples=100)
pyod_anomalies = model_pyod.fit_predict(df)
Counter(pyod_anomalies)
# Counter({1: 12761, 0: 116263})

# PyOD labels anomalies as 1 and inliers as 0; remap to skLearn's convention (-1 / 1) for comparison.
pyod_anomalies_modified = [1 if x == 0 else -1 for x in pyod_anomalies]
Counter(pyod_anomalies_modified)
# Counter({-1: 12761, 1: 116263})

anomalies_df = pd.DataFrame({
    'sklearn': sklearn_anomalies,
    'pyodm': pyod_anomalies_modified,
    'pyod': pyod_anomalies
})
anomalies_df.corr()
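Beyond the correlation matrix, a cross-tabulation makes the disagreement between the two libraries explicit (a small sketch, assuming the anomalies_df built above):

# Rows: skLearn labels, columns: remapped PyOD labels; off-diagonal cells are disagreements.
pd.crosstab(anomalies_df['sklearn'], anomalies_df['pyodm'])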

Spark-iForest

Ref: Spark-iForest

The issue with this package is that it depends on an older version of Hadoop and Spark. We have "spark-3.0.0-preview2-bin-hadoop3.2.tgz". Required is: pyspark==2.4.0

Traceback (most recent call last):
  File "/home/admin/ashish/script.py", line 20, in <module>
    iforest = IForest(contamination=0.3, maxDepth=2)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 111, in wrapper
  File "/home/admin/anaconda3/lib/python3.7/site-packages/pyspark_iforest/ml/iforest.py", line 245, in __init__
    self._java_obj = self._new_java_obj("org.apache.spark.ml.iforest.IForest", self.uid)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 69, in _new_java_obj
  File "/usr/local/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1554, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 98, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.ml.iforest.IForest.
: java.lang.NoClassDefFoundError: org/apache/spark/ml/util/MLWritable$class
    at org.apache.spark.ml.iforest.IForest.<init>(IForest.scala:335)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.ml.util.MLWritable$class
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 12 more

The fix for the above issue is to use the versions of Hadoop and Spark that the package was built against: the NoClassDefFoundError above indicates the spark-iforest JAR was compiled against the Spark 2.x / Scala 2.11 API, which is not binary-compatible with Spark 3.0.
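Before re-running, it can help to confirm which PySpark version is actually on the Python path (a minimal check; the pyspark_iforest wheel and the spark-iforest JAR must both match that Spark version):

import pyspark
print(pyspark.__version__)   # should report 2.4.x for this build of spark-iforest

# Alternatively, from an active session:
# print(spark.version)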
Software Setup:
Spark Version: v2.2.0
Sklearn Version: v0.19.1
Ref: Read me

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
import tempfile

spark = SparkSession \
    .builder.master("local[*]") \
    .appName("IForestExample") \
    .getOrCreate()

data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([7.0, 9.0]),),
        (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]

# NOTE: features need to be dense vectors for the model input
df = spark.createDataFrame(data, ["features"])

from pyspark_iforest.ml.iforest import *

# Init an IForest object
iforest = IForest(contamination=0.3, maxDepth=2)

# Fit on a given data frame
model = iforest.fit(df)

# Check whether the model has a summary; a newly trained model has the summary info
model.hasSummary

# Show model summary
summary = model.summary

# Show the number of anomalies
summary.numAnomalies

# Predict for a new data frame based on the fitted model
transformed = model.transform(df)

# Collect the Spark data frame into a local list of rows
rows = transformed.collect()

temp_path = tempfile.mkdtemp()
iforest_path = temp_path + "/iforest"

# Save the iforest estimator into the path
iforest.save(iforest_path)

# Load the iforest estimator from a path
loaded_iforest = IForest.load(iforest_path)

model_path = temp_path + "/iforest_model"

# Save the fitted model into the model path
model.save(model_path)

# Load a fitted model from a model path
loaded_model = IForestModel.load(model_path)

# The loaded model has no summary info
loaded_model.hasSummary

# Use the loaded model to predict a new data frame
loaded_model.transform(df).show()

To run the above code on a YARN-based Spark cluster, we made the following changes:

... from pyspark import SparkConf
... from pyspark.sql.session import SparkSession
... from pyspark.ml.linalg import Vectors
... from pyspark.ml.feature import VectorAssembler
... spark = SparkSession.builder.master("yarn").appName("IForestExample").getOrCreate()
... df_3 = spark.createDataFrame(encoded_pandas_df)
... df_4 = VectorAssembler(inputCols = list(encoded_pandas_df.columns), outputCol="features").transform(df_3)

The following output is seen in the PySpark logs:

Row(a=7, b=10, c=0, d=13, features=DenseVector([7.0, 10.0, 0.0, 13.0]), anomalyScore=0.48188775467190864, prediction=0.0)
...
Row(a=12, b=194, c=0, d=15, features=DenseVector([12.0, 194.0, 0.0, 15.0]), anomalyScore=0.47085503202503476, prediction=0.0)
Row(a=13, b=34, c=0, d=15, features=DenseVector([13.0, 34.0, 0.0, 15.0]), anomalyScore=0.47085503202503476, prediction=0.0)

Note: To explain why a point is anomalous, we can train separate models on subsets of the dimensions: one dimension at a time, then two at a time, and so on, and then generate the anomalies from each individual model. A point that is anomalous in the output of the full model trained on all the features should also be anomalous in at least one of these subsets, because the anomalous point has been found to lie in isolation along some subset of the dimensions. A sketch of this idea is shown below.
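A minimal sketch of that idea using scikit-learn (assuming a pandas DataFrame X of already-encoded numeric features; the helper name anomalous_subsets and the max_subset_size parameter are purely illustrative):

from itertools import combinations

from sklearn.ensemble import IsolationForest

def anomalous_subsets(X, max_subset_size=2, random_state=42):
    # For every feature subset up to the given size, fit an Isolation Forest
    # and record which rows it flags as anomalous (label -1).
    flagged = {}
    for k in range(1, max_subset_size + 1):
        for cols in combinations(X.columns, k):
            labels = IsolationForest(random_state=random_state,
                                     contamination='auto').fit_predict(X[list(cols)])
            flagged[cols] = set(X.index[labels == -1])
    return flagged

# Example usage: list the feature subsets under which row 0 is flagged as anomalous.
# subsets = anomalous_subsets(X)
# print([cols for cols, rows in subsets.items() if 0 in rows])

The subsets that flag a given point give a rough indication of which dimensions make it an outlier.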
