Isolation Forest Implementation using skLearn, PyOD, and spark-iForest

IsolationForest is an algorithm for detecting anomalies in data: it isolates points through recursive random partitioning, and anomalous points end up isolated in fewer splits than normal points.
from sklearn.ensemble import IsolationForest
import pandas as pd
import category_encoders as ce
from collections import Counter
from pyod.models.iforest import IForest
df = pd.read_csv('files_1/data.csv')
# In scikit-learn's IsolationForest, decision_function returns scores roughly in the range [-0.5, 0.5]; the closer to -0.5, the more anomalous the point, and negative scores are the ones that predict labels as outliers.
Ref: StackOverflow
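As a quick, self-contained check of that behaviour (a minimal sketch on synthetic one-column data, not the df loaded above):
import numpy as np
X = np.array([[0.0], [0.1], [0.2], [0.1], [10.0]])  # 10.0 is an obvious outlier
clf = IsolationForest(random_state=42, contamination='auto').fit(X)
clf.decision_function(X)  # scores roughly in [-0.5, 0.5]; lower means more anomalous
clf.predict(X)            # -1 for anomalies, 1 for inliers; with contamination='auto', negative scores map to -1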
# Note: The OneHotEncoder in 'category_encoders' does not drop one of the columns, even though two of the three columns are sufficient to identify a row.
d = pd.DataFrame({'a': ['a', 'b', 'a', 'b', 'c']})
ce_one_hot = ce.OneHotEncoder(cols = ['a'])
ce_one_hot.fit_transform(d)
Output: three one-hot columns, one per category of 'a'; none of them is dropped.
# So we will use BinaryEncoder instead. The choice of encoder is a crucial decision in this exercise: whether a point ends up labelled as anomalous also depends on how the categorical data is encoded.
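For contrast, a quick sketch of BinaryEncoder on the same toy frame d from above (the exact column names and count depend on the category_encoders version):
ce_binary = ce.BinaryEncoder(cols=['a'])
ce_binary.fit_transform(d)  # encodes the ordinal of each category as bits, so roughly log2(n_categories) columns instead of one column per category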
enc = ce.binary.BinaryEncoder(cols = ['a', 'c', 'd'])
df = enc.fit_transform(df)
model = IsolationForest(random_state = 42, contamination = 'auto', behaviour = "new") # scikit-learn; the 'behaviour' parameter only exists in older versions (it was removed in v0.24)
sklearn_anomalies = model.fit_predict(df) # -1 = anomaly, 1 = normal
Counter(sklearn_anomalies)
# Counter({-1: 20163, 1: 108861})
# Note: See the source code for pyod.models.iforest (Ref: iforest).
# PyOD's IForest is just a wrapper around scikit-learn's IsolationForest.
# See this import in the 'iforest' module: "from sklearn.ensemble import IsolationForest"
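One way to confirm this from a Python session (the detector_ attribute name is taken from PyOD's iforest source; treat it as an assumption if your PyOD version differs):
model_check = IForest()
model_check.fit(df)
type(model_check.detector_)  # the fitted estimator underneath is sklearn.ensemble.IsolationForest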
model_pyod = IForest(behaviour="new", max_samples=100)
pyod_anomalies = model_pyod.fit_predict(df)
Counter(pyod_anomalies)
# Counter({1: 12761, 0: 116263})
# Map PyOD's labels (0 = inlier, 1 = outlier) to scikit-learn's convention (1 = inlier, -1 = outlier)
pyod_anomalies_modified = [1 if x == 0 else -1 for x in pyod_anomalies]
Counter(pyod_anomalies_modified)
# Counter({-1: 12761, 1: 116263})
anomalies_df = pd.DataFrame({
'sklearn': sklearn_anomalies,
'pyodm': pyod_anomalies_modified,
'pyod': pyod_anomalies
})
anomalies_df.corr()
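Besides the correlation, a small follow-up sketch (not part of the original run) cross-tabulates the two label sets to see on how many points the models agree:
pd.crosstab(anomalies_df['sklearn'], anomalies_df['pyodm'])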
Spark-iForest
Ref: Spark-iForest
The issue with this package is that it depends on an older version of Hadoop and Spark.
We have "spark-3.0.0-preview2-bin-hadoop3.2.tgz", whereas the package requires pyspark==2.4.0.
Traceback (most recent call last):
File "/home/admin/ashish/script.py", line 20, in
iforest = IForest(contamination=0.3, maxDepth=2)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 111, in wrapper
File "/home/admin/anaconda3/lib/python3.7/site-packages/pyspark_iforest/ml/iforest.py", line 245, in __init__
self._java_obj = self._new_java_obj("org.apache.spark.ml.iforest.IForest", self.uid)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 69, in _new_java_obj
File "/usr/local/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1554, in __call__
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 98, in deco
File "/usr/local/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.ml.iforest.IForest.
: java.lang.NoClassDefFoundError: org/apache/spark/ml/util/MLWritable$class
at org.apache.spark.ml.iforest.IForest.<init>(IForest.scala:335)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.ml.util.MLWritable$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 12 more
The fix for the above issue is to use the versions of Spark and Hadoop that the package was built against.
Software Setup:
Spark Version: v2.2.0
Sklearn Version: v0.19.1
Ref: README
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
import tempfile
spark = SparkSession \
.builder.master("local[*]") \
.appName("IForestExample") \
.getOrCreate()
data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([7.0, 9.0]),),
(Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
# NOTE: features need to be dense vectors for the model input
df = spark.createDataFrame(data, ["features"])
from pyspark_iforest.ml.iforest import *
# Init an IForest Object
iforest = IForest(contamination=0.3, maxDepth=2)
# Fit on a given data frame
model = iforest.fit(df)
# Check whether the model has a summary; a newly trained model has the summary info
model.hasSummary
# Show model summary
summary = model.summary
# Show the number of anomalies
summary.numAnomalies
# Predict for a new data frame based on the fitted model
transformed = model.transform(df)
# Collect spark data frame into local df
rows = transformed.collect()
temp_path = tempfile.mkdtemp()
iforest_path = temp_path + "/iforest"
# Save the iforest estimator into the path
iforest.save(iforest_path)
# Load iforest estimator from a path
loaded_iforest = IForest.load(iforest_path)
model_path = temp_path + "/iforest_model"
# Save the fitted model into the model path
model.save(model_path)
# Load a fitted model from a model path
loaded_model = IForestModel.load(model_path)
# The loaded model has no summary info
loaded_model.hasSummary
# Use the loaded model to predict a new data frame
loaded_model.transform(df).show()
To run the above code on a YARN-based Spark cluster, we made the following changes (a sketch putting the pieces together follows the snippet):
... from pyspark import SparkConf
... from pyspark.sql.session import SparkSession
... from pyspark.ml.linalg import Vectors
... from pyspark.ml.feature import VectorAssembler
... spark = SparkSession.builder.master("yarn").appName("IForestExample").getOrCreate()
... df_3 = spark.createDataFrame(encoded_pandas_df)
... df_4 = VectorAssembler(inputCols = list(encoded_pandas_df.columns), outputCol="features").transform(df_3)
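Putting those pieces together, a sketch of the remaining steps (the parameter values are illustrative, and encoded_pandas_df is the binary-encoded pandas DataFrame from earlier):
from pyspark_iforest.ml.iforest import IForest
iforest = IForest(contamination=0.1, maxDepth=10)
model = iforest.fit(df_4)        # uses the 'features' column built by VectorAssembler above
scored = model.transform(df_4)   # adds 'anomalyScore' and 'prediction' columns
scored.show(5, truncate=False)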
The following output is seen in the PySpark logs:
Row(a=7, b=10, c=0, d=13, features=DenseVector([7.0, 10.0, 0.0, 13.0]), anomalyScore=0.48188775467190864, prediction=0.0)
...
Row(a=12, b=194, c=0, d=15, features=DenseVector([12.0, 194.0, 0.0, 15.0]), anomalyScore=0.47085503202503476, prediction=0.0)
Row(a=13, b=34, c=0, d=15, features=DenseVector([13.0, 34.0, 0.0, 15.0]), anomalyScore=0.47085503202503476, prediction=0.0)
Note: To explain why a point is anomalous, we can train separate models on subsets of the dimensions: one dimension at a time, then two at a time, and so on, and then collect the anomalies reported by each of these smaller models.
A point that is flagged as anomalous by the full model (trained on all the features) should also show up as anomalous in at least one of these lower-dimensional models, because the anomalous point was found to lie in isolation.
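A minimal sketch of that idea, assuming encoded_df is the binary-encoded pandas DataFrame from the scikit-learn section (what was called df there) and sklearn_anomalies are its labels; the helper name and parameters are illustrative:
from itertools import combinations
def flagging_subsets(data, point_index, max_subset_size=2):
    # Train an IsolationForest per column subset and collect the subsets that also flag the point.
    flagged_by = []
    for size in range(1, max_subset_size + 1):
        for cols in combinations(data.columns, size):
            labels = IsolationForest(random_state=42, contamination='auto').fit_predict(data[list(cols)])
            if labels[point_index] == -1:
                flagged_by.append(cols)
    return flagged_by
# Example: explain the first point the full model flagged as anomalous.
anomalous_idx = list(sklearn_anomalies).index(-1)
flagging_subsets(encoded_df, anomalous_idx)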