Transforming your Data¶
DataFrameTransformer class¶
- DataFrameTransformer(df)
DataFrameTransformer methods
- Column operations:
- DataFrameTransformer.drop_col(columns)
- DataFrameTransformer.replace_col(search, changeTo, columns)
- DataFrameTransformer.keep_col(columns)
- DataFrameTransformer.rename_col(columns)
- DataFrameTransformer.move_col(column, ref_col, position)
- Row operations:
- DataFrameTransformer.drop_row(columns)
- DataFrameTransformer.delete_row(func)
- String operations:
- DataFrameTransformer.trim_col(columns)
- DataFrameTransformer.clear_accents(columns)
- DataFrameTransformer.lookup(column, str_to_replace, list_str)
- DataFrameTransformer.remove_special_chars(columns)
- DataFrameTransformer.date_transform(column, current_format, output_format)
- General operation function:
- DataFrameTransformer.set_col(columns, func, data_type)
- Others:
- DataFrameTransformer.count_items(col_id, col_search, new_col_feature, search_string)
- DataFrameTransformer.age_calculate(column)
The DataFrameTransformer class receives a Spark DataFrame as an argument and implements all of the methods listed above.
Note: every transformation modifies this DataFrame in place, overwriting it.
The following code shows how to instantiate the class to transform a dataFrame:
# Importing sql types
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
# Importing optimus
import optimus as op
# Building a simple dataframe:
schema = StructType([
StructField("city", StringType(), True),
StructField("country", StringType(), True),
StructField("population", IntegerType(), True)])
countries = ['Japan', 'USA', 'France', 'Spain']
cities = ['Tokyo', 'New York', ' Paris ', 'Madrid']
population = [37800000,19795791,12341418,6489162]
# Dataframe:
df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
# DataFrameTransformer Instantiation:
transformer = op.DataFrameTransformer(df)
transformer.show()
Output:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
 Paris  | France | 12341418 |
Madrid | Spain | 6489162 |
Transformer.trim_col(columns)¶
This method trims extra spaces from the left and right of the strings in the columns provided by the user.
The columns argument is expected to be a string or a list of column names. If the string "*" is provided, the trimming operation is applied to the whole DataFrame.
Example:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Trimming string blank spaces:
transformer.trim_col("*")
# Printing trimmed dataFrame:
print('Trimmed dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
 Paris  | France | 12341418 |
Madrid | Spain | 6489162 |
Trimmed dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
Transformer.drop_col(columns)¶
This method drops the list of columns provided by the user.
The columns argument is expected to be a string or a list of column names.
Example:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# drop column specified:
transformer.drop_col("country")
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
New dataFrame:
city | population |
Tokyo | 37800000 |
New York | 19795791 |
Paris | 12341418 |
Madrid | 6489162 |
Transformer.keep_col(columns)¶
This method keeps only the columns specified by the user via the columns argument, dropping every other column in the DataFrame.
The columns argument is expected to be a string or a list of column names.
Example:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Keep columns specified by user:
transformer.keep_col(['city', 'population'])
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
New dataFrame:
city | population |
Tokyo | 37800000 |
New York | 19795791 |
Paris | 12341418 |
Madrid | 6489162 |
Transformer.replace_col(search, changeTo, columns)¶
This method searches for the search value in the DataFrame columns specified by the columns argument and replaces it with the changeTo value.
search and changeTo are expected to be of the same data type ('integer', 'string', etc.). The columns argument is expected to be a string or a list of string column names.
If columns = '*' is provided, the search-and-replace action is applied to every column of the DataFrame whose data type matches that of search and changeTo.
Example:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Replace values in columns specified by user:
transformer.replace_col(search='Tokyo', changeTo='Maracaibo', columns='city')
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
New dataFrame:
city | country | population |
Maracaibo | Japan | 37800000 |
New York | USA | 19795791 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
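As a quick sketch of the '*' option (assuming the same countries DataFrame as above), the replacement is applied across every column whose data type matches the search value:
# Replace a string across all string-typed columns:
transformer.replace_col(search='Spain', changeTo='España', columns='*')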
Transformer.delete_row(func)¶
This method deletes the rows that do not satisfy the condition provided by the user; that is, it keeps only the rows for which func evaluates to true.
The delete_row method receives a function func as its input parameter. func is required to be a lambda function, which is a native Python feature.
Example 1:
# Importing sql functions
from pyspark.sql.functions import col
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Keep only rows whose population falls inside the range:
func = lambda pop: (pop > 6500000) & (pop <= 30000000)
transformer.delete_row(func(col('population')))
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
New dataFrame:
city | country | population |
New York | USA | 19795791 |
Paris | France | 12341418 |
Example 2:
# Importing sql functions
from pyspark.sql.functions import col
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Keep only rows where Tokyo is found in the city
# column or France is found in the country column:
func = lambda city, country: (city == 'Tokyo') | (country == 'France')
transformer.delete_row(func(col('city'), col('country')))
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
New dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
Paris | France | 12341418 |
Transformer.set_col(columns, func, data_type)¶
This method can be used to perform math operations or string manipulations on the rows of DataFrame columns.
The method receives, in the columns argument, a list of columns (or a single column) of the DataFrame, a lambda function called func, and a string data_type that describes the data type that func should return.
The columns argument is expected to be a string or a list of column names, and data_type a string indicating one of the following options: 'integer', 'string', 'double', 'float'.
It is a requirement of this method that the data_type provided matches the data type of the columns. On the other hand, if the user passes columns = '*', func is applied if and only if a column's data type matches the data_type argument.
Here are some examples:
Example 1:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
print('Replacing a number if value in cell is greater than 14000000:')
# Replacing a number:
func = lambda cell: (cell * 2) if (cell > 14000000) else cell
transformer.set_col(['population'], func, 'integer')
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
Replacing a number if value in cell is greater than 14000000:
New dataFrame:
city | country | population |
Tokyo | Japan | 75600000 |
New York | USA | 39591582 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
Example 2:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Capital letters:
func = lambda cell: cell.upper()
transformer.set_col(['city'], func, 'string')
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Tokyo | Japan | 37800000 |
New York | USA | 19795791 |
Paris | France | 12341418 |
Madrid | Spain | 6489162 |
New dataFrame:
city | country | population |
TOKYO | Japan | 37800000 |
NEW YORK | USA | 19795791 |
PARIS | France | 12341418 |
MADRID | Spain | 6489162 |
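A minimal sketch of the columns = '*' option (assuming the same DataFrame as above): func is applied only to the columns whose data type matches the data_type argument, so the string columns are left untouched here:
# Double every value in integer-typed columns only:
func = lambda cell: cell * 2
transformer.set_col('*', func, 'integer')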
Transformer.clear_accents(columns)¶
This function removes accents from strings in DataFrame columns; it does not eliminate the base characters, only the accent marks.
The clear_accents method receives column names as its columns argument, which must be a string or a list of column names.
E.g.:
Building a dummy dataFrame:
# Importing sql types
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
# Importing optimus
import optimus as op
# Building a simple dataframe:
schema = StructType([
StructField("city", StringType(), True),
StructField("country", StringType(), True),
StructField("population", IntegerType(), True)])
countries = ['Colombia', 'US@A', 'Brazil', 'Spain']
cities = ['Bogotá', 'New York', ' São Paulo ', '~Madrid']
population = [37800000,19795791,12341418,6489162]
# Dataframe:
df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
df.show()
New DF:
city | country | population |
Bogotá | Colombia | 37800000 |
New York | US@A | 19795791 |
São Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Clear accents:
transformer.clear_accents(columns='*')
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Bogotá | Colombia | 37800000 |
New York | US@A | 19795791 |
São Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
New dataFrame:
city | country | population |
Bogota | Colombia | 37800000 |
New York | US@A | 19795791 |
Sao Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
DataFrameTransformer.remove_special_chars(columns)¶
This method removes special characters (e.g. !"#$%&/()=?) from the specified DataFrame columns.
The remove_special_chars method receives columns as input; columns must be a string or a list of strings.
E.g.:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Remove special characters:
transformer.remove_special_chars(columns=['city', 'country'])
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Bogotá | Colombia | 37800000 |
New York | US@A | 19795791 |
São Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
New dataFrame:
city | country | population |
Bogotá | Colombia | 37800000 |
New York | USA | 19795791 |
São Paulo | Brazil | 12341418 |
Madrid | Spain | 6489162 |
DataFrameTransformer.rename_col(columns)¶
This method changes the names of the columns specified by the columns argument.
columns is a list of tuples, where each tuple has the following form: (oldColumnName, newColumnName).
E.g.:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
names = [('city', 'villes')]
# Changing name of columns:
transformer.rename_col(names)
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Bogotá | Colombia | 37800000 |
New York | US@A | 19795791 |
São Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
New dataFrame:
villes | country | population |
Bogotá | Colombia | 37800000 |
New York | US@A | 19795791 |
São Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
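Since columns is a list of tuples, several columns can be renamed in a single call. A small sketch (the new names here are illustrative):
# Rename two columns at once:
names = [('city', 'villes'), ('country', 'pays')]
transformer.rename_col(names)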
DataFrameTransformer.lookup(column, str_to_replace, list_str)¶
This method searches the DataFrame column for the strings listed in the list_str argument and replaces each occurrence with str_to_replace.
lookup can only be run on StringType columns.
E.g.:
Building a dummy dataFrame:
# Importing sql types
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
# Importing optimus
import optimus as op
# Building a simple dataframe:
schema = StructType([
StructField("city", StringType(), True),
StructField("country", StringType(), True),
StructField("population", IntegerType(), True)])
countries = ['Venezuela', 'Venezuela', 'Brazil', 'Spain']
cities = ['Caracas', 'Ccs', ' São Paulo ', '~Madrid']
population = [37800000,19795791,12341418,6489162]
# Dataframe:
df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
df.show()
New DF:
city | country | population |
Caracas | Venezuela | 37800000 |
Ccs | Venezuela | 19795791 |
São Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Replace the listed city aliases with a single value:
transformer.lookup('city', "Caracas", ['Caracas', 'Ccs'])
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Caracas | Venezuela | 37800000 |
Ccs | Venezuela | 19795791 |
São Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
New dataFrame:
city | country | population |
Caracas | Venezuela | 37800000 |
Caracas | Venezuela | 19795791 |
São Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
DataFrameTransformer.move_col(column, ref_col, position)¶
This function moves a column from one position to another, according to the reference column ref_col and the position argument.
The position argument must be one of the strings 'after' or 'before'. If position = 'after', then column is placed just after the reference column ref_col provided by the user.
E.g.:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Move the city column to just after the country column:
transformer.move_col('city', 'country', position='after')
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | country | population |
Caracas | Venezuela | 37800000 |
Ccs | Venezuela | 19795791 |
São Paulo | Brazil | 12341418 |
~Madrid | Spain | 6489162 |
New dataFrame:
country | city | population |
Venezuela | Caracas | 37800000 |
Venezuela | Ccs | 19795791 |
Brazil | São Paulo | 12341418 |
Spain | ~Madrid | 6489162 |
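Conversely, a brief sketch with position='before' (assuming the same DataFrame as above) places the column just before the reference column:
# Move the population column to just before the country column:
transformer.move_col('population', 'country', position='before')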
DataFrameTransformer.count_items(col_id, col_search, new_col_feature, search_string):¶
This function counts, for each value of col_id, the number of rows in which search_string appears in col_search, and stores the result in a new feature column new_col_feature.
See the example below for more details:
# Importing sql types
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
# Importing optimus
import optimus as op
# Building a simple dataframe:
schema = StructType([
StructField("bill_id", IntegerType(), True),
StructField("foods", StringType(), True)])
id_ = [1, 2, 2, 3, 3, 3, 3, 4, 4]
foods = ['Pizza', 'Pizza', 'Beer', 'Hamburger', 'Beer', 'Beer', 'Beer', 'Pizza', 'Beer']
# Dataframe:
df = op.spark.createDataFrame(list(zip(id_, foods)), schema=schema)
df.show()
New DF:
bill_id | foods |
1 | Pizza |
2 | Pizza |
2 | Beer |
3 | Hamburger |
3 | Beer |
3 | Beer |
3 | Beer |
4 | Pizza |
4 | Beer |
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Transformation:
transformer.count_items(col_id="bill_id",col_search="foods",new_col_feature="beer_count",search_string="Beer")
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
bill_id | foods |
1 | Pizza |
2 | Pizza |
2 | Beer |
3 | Hamburger |
3 | Beer |
3 | Beer |
3 | Beer |
4 | Pizza |
4 | Beer |
New dataFrame:
bill_id | beer_count |
3 | 3 |
4 | 1 |
2 | 1 |
DataFrameTransformer.date_transform(column, current_format, output_format)¶
This method changes the date format in column from current_format to output_format.
The DataFrame column is expected to be of StringType or DateType.
date_transform returns the column name.
E.g.:
date_transform(self, column, current_format, output_format)
# Importing sql types
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
# Importing optimus
import optimus as op
# Building a simple dataframe:
schema = StructType([
StructField("city", StringType(), True),
StructField("dates", StringType(), True),
StructField("population", IntegerType(), True)])
dates = ['1991/02/25', '1998/05/10', '1993/03/15', '1992/07/17']
cities = ['Caracas', 'Ccs', ' São Paulo ', '~Madrid']
population = [37800000,19795791,12341418,6489162]
# Dataframe:
df = op.spark.createDataFrame(list(zip(cities, dates, population)), schema=schema)
df.show()
New DF:
city | dates | population |
Caracas | 1991/02/25 | 37800000 |
Ccs | 1998/05/10 | 19795791 |
São Paulo | 1993/03/15 | 12341418 |
~Madrid | 1992/07/17 | 6489162 |
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Printing of original dataFrame:
print('Original dataFrame:')
transformer.show()
# Transform string date format:
transformer.date_transform(column="dates",
                           current_format="yyyy/MM/dd",
                           output_format="dd-MM-yyyy")
# Printing new dataFrame:
print('New dataFrame:')
transformer.show()
Original dataFrame:
city | dates | population |
Caracas | 1991/02/25 | 37800000 |
Ccs | 1998/05/10 | 19795791 |
São Paulo | 1993/03/15 | 12341418 |
~Madrid | 1992/07/17 | 6489162 |
New dataFrame:
city | dates | population |
Caracas | 25-02-1991 | 37800000 |
Ccs | 10-05-1998 | 19795791 |
São Paulo | 15-03-1993 | 12341418 |
~Madrid | 17-07-1992 | 6489162 |
DataFrameTransformer.to_csv(path_name, header=True, mode="overwrite", sep=",", *args, **kargs)¶
This function writes a Spark DataFrame as a CSV at the specified path.
The method requires path_name to be specified by the user: the name and path of the file to be saved.
The mode argument specifies the behavior of the save operation when data already exists:
- "append": Append contents of this DataFrame to existing data.
- "overwrite" (the default): Overwrite existing data.
- "ignore": Silently ignore this operation if data already exists.
- "error": Throw an exception if data already exists.
With the sep argument you can set a single character as the separator for each field and value. If None is set, it uses the default value (",").
You can also pass any option that Spark allows as **kargs to the function.
E.g.
# Importing sql types
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
# Importing optimus
import optimus as op
# Building a simple dataframe:
schema = StructType([
StructField("bill_id", IntegerType(), True),
StructField("foods", StringType(), True)])
id_ = [1, 2, 2, 3, 3, 3, 3, 4, 4]
foods = ['Pizza', 'Pizza', 'Beer', 'Hamburger', 'Beer', 'Beer', 'Beer', 'Pizza', 'Beer']
# Dataframe:
df = op.spark.createDataFrame(list(zip(id_, foods)), schema=schema)
df.show()
DF:
bill_id | foods |
1 | Pizza |
2 | Pizza |
2 | Beer |
3 | Hamburger |
3 | Beer |
3 | Beer |
3 | Beer |
4 | Pizza |
4 | Beer |
Now let's write this DF as a CSV:
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Write DF as CSV
transformer.to_csv("test.csv")
This will create a folder named "test.csv" in the current path, and inside it will be the CSV part files that Spark writes. But with the read_csv function you can just pass the name "test.csv" and Optimus will understand.
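As a sketch of the optional arguments (assuming they are forwarded to Spark's DataFrameWriter), you could append to existing output with a different separator:
# Append to existing data, using ';' as the field separator:
transformer.to_csv("test.csv", mode="append", sep=";")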
DataFrameTransformer.replace_na(value, columns=None)¶
This method replaces nulls with a specified value.
The columns argument is an optional list of column names to consider. Columns specified in the subset that do not have a matching data type are ignored; for example, if value is a string and the subset contains a non-string column, the non-string column is simply ignored. If columns == "*", all columns are considered.
The value argument is the value to replace nulls with. If value is a dict, then the subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, or string.
Let's download some sample data using our amazing read_url function.
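A minimal sketch, assuming read_url lives on the Utilities class and using a placeholder URL for the sample file:
# Importing optimus
import optimus as op
# Instance of the Utilities class (assumption):
tools = op.Utilities()
# Reading the dataframe from a URL (placeholder URL):
df = tools.read_url(path="https://example.com/impute_missing.csv")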
If we examine this DF we see that there are some missing values.
a | b |
1.0 | NaN |
2.0 | NaN |
NaN | 3.0 |
4.0 | 4.0 |
5.0 | 5.0 |
Remember that we have the impute_missing function, which lets you impute missing values using the mean or the median of the columns in which they are located. But with replace_na you can replace the nulls in one column, or in all columns of the DataFrame, with a specific value. For this example we will replace NA with 0's.
# Instantiation of DataTransformer class:
transformer = op.DataFrameTransformer(df)
# Replace NA with 0's
transformer.replace_na(0.0, columns="*")
# Show DF
transformer.show()
a | b |
1.0 | 0.0 |
2.0 | 0.0 |
0.0 | 3.0 |
4.0 | 4.0 |
5.0 | 5.0 |
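And following the dict form described above, a short sketch replacing nulls with a different value per column (the values here are illustrative):
# Per-column replacement values (the columns argument is ignored when a dict is given):
transformer.replace_na({'a': 0.0, 'b': 99.0})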
And that’s it!