
Data Statistics and Analysis With Java and Python


Java and Python are two of the most popular programming languages in use today. Both are very mature and provide the tools and technology ecosystems to support developing solutions to the challenging problems that arise in the world of data science. Each has its idiosyncrasies, so it's important to understand how they compare when tackling different problems: where each shines and where it lacks the flexibility to handle the task at hand, when one is preferable over the other, and when they work best in tandem, complementing each other.

Python is a dynamically typed language, very straightforward to work with, and certainly the language of choice for complex computations if we don't have to worry about intricate program flows. It provides excellent libraries (Pandas, NumPy, Matplotlib, SciPy, PyTorch, TensorFlow, etc.) to support logical, mathematical, and scientific operations on data structures or arrays.

Java is a very robust language, strongly typed, and therefore has more stringent syntactic rules that make it less prone to programmatic errors. Like Python, it provides plenty of libraries to work with data structures, linear algebra, machine learning, and data processing (ND4J, Mahout, Spark, Deeplearning4j, etc.).

In this article, we’re going to focus on a narrow study: how to do simple data analysis of large amounts of tabular data and compute some statistics using Java and Python. We’ll look at different techniques for performing the analysis on each platform, compare how they scale, and explore the possibilities of applying parallel computing to improve their performance.

Problem Layout

We’re going to do a straightforward analysis of a set of prices for a large list of cities in different states. For simplicity, we assume that this information is stored in a CSV file. We read the file, filter out some states, and group the remaining entries by city-state pairs to compute some basic statistics. We’re interested in finding solutions that perform efficiently and scale well as the size of the input data grows.

A sample of the data is:

city              state  basePrice  actualPrice
La Jose           PA     34.17      33.19
Preachers Slough  WA     27.46      90.17
Doonan Corners    NY     92.00      162.46
Doonan Corners    NY     97.45      159.46
Castle Rock       WA     162.16     943.21
Marble Rock       IA     97.13      391.49
Mineral           CA     99.13      289.37
Blountville       IN     92.50      557.66
Blountsville      IN     122.50     557.66
Coe               IN     187.85     943.98
Cecilia           KY     92.85      273.61

The purpose is to show how we would approach solving these types of problems using Java and Python. As we can see, the example is very simple and limited in scope, but it will be easy to generalize to more challenging problems.

Java’s Approach

We start defining a Java record that encapsulates the data entries:

record InputEntry(String city, String state, double basePrice, double actualPrice) {}

The record is a kind of type declaration introduced as a preview in JDK 14 (and finalized in JDK 16). It’s a concise way to define an immutable class for which the compiler provides the canonical constructor, accessors, and equals, hashCode, and toString implementations.
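To see what that buys us, here is a minimal, self-contained sketch (Point is a throwaway record used only for illustration, not part of our problem):

```java
class RecordDemo {
    // The compiler generates the constructor, the x()/y() accessors,
    // equals/hashCode based on the components, and toString.
    record Point(int x, int y) {}

    public static void main(String[] args) {
        Point p = new Point(3, 4);
        System.out.println(p.x());                     // accessor: prints 3
        System.out.println(p.equals(new Point(3, 4))); // value equality: prints true
        System.out.println(p);                         // prints Point[x=3, y=4]
    }
}
```

Note that equality is value-based: two records with the same components are equal, which is exactly what we need later when records serve as map keys.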

Next, we read the CSV file and accumulate the entries in a list:

List<InputEntry> inputEntries = readRecordEntriesFromCSVFile("recordEntries.csv");

To do the grouping of the input entries by city and state we define:

record CityState(String city, String state) {}

We encapsulate the stats for all the entries that belong to a group with the following class:

record StatsAggregation(StatsAccumulator basePrice, StatsAccumulator actualPrice) {}

StatsAccumulator is part of the Guava library. You can add sets of double values to it, and it calculates basic statistics such as count, mean, variance, and standard deviation. We use a StatsAccumulator to get the statistics for the basePrice and actualPrice of the InputEntry records.

Now we have all the ingredients to solve our problem. Java Streams provide a robust framework to implement data manipulation and analysis. Their declarative programming style and support for selection, filtering, grouping, and aggregation simplify data manipulation and statistical analysis. The framework also provides a robust implementation that can handle large volumes (even infinite streams) and operates very efficiently through parallelism, laziness, and short-circuit operations. All these features make Java Streams an excellent choice to tackle these types of problems. The implementation is very simple:

Map<CityState, StatsAggregation> stats = inputEntries.stream()
    .filter(i -> !(i.state().equals("MN") || i.state().equals("CA")))
    .collect(groupingBy(entry -> new CityState(entry.city(), entry.state()),
        collectingAndThen(Collectors.toList(), list -> {
            StatsAccumulator sac = new StatsAccumulator();
            sac.addAll(list.stream().mapToDouble(InputEntry::basePrice));
            StatsAccumulator sas = new StatsAccumulator();
            sas.addAll(list.stream().mapToDouble(InputEntry::actualPrice));
            return new StatsAggregation(sac, sas);
        })));

In line 2 of the code, we use Stream::filter, which takes a boolean-valued predicate to test the elements in the stream. We implement it as a lambda expression that removes any entries whose state is “MN” or “CA.”

We then proceed to collect the list’s elements and invoke Collectors::groupingBy() (line 3), which takes two parameters:

  • A classification function, where we use our CityState record to do the grouping by city and state (line 3).
  • A collector for the downstream that contains the items that belong to the same city-state pair. We use Collectors::collectingAndThen (line 4), which takes two parameters to do the reduction in two steps:
    • We use Collectors::toList (line 4), which returns a collector that accumulates all of the elements belonging to the same city-state pair into a list.
    • We apply a finishing transformation to this list: a lambda function (lines 5 to 9) that creates two StatsAccumulators and computes the statistics for the basePrice and actualPrice entries of the list, respectively. Finally, it returns a newly created StatsAggregation record that contains these accumulators.

To summarize, we return a Map<CityState, StatsAggregation> where the keys represent the grouped city-state pairs and each value is a StatsAggregation containing the statistics for the basePrice and actualPrice of that key.

As we mentioned before, one of the key advantages of using Java Streams is that they provide a simple mechanism for parallel processing using multithreading. This allows the simultaneous execution of multiple threads utilizing the multicore resources of the CPU. We just add parallel() to the stream as shown:

Map<CityState, StatsAggregation> stats = inputEntries.stream().parallel().

causes the stream framework to subdivide the list of entries into parts and process them in separate threads simultaneously. As the different threads finish their computations, the framework combines their results serially into the resulting Map.

There is an additional optimization: using Collectors::groupingByConcurrent in line 3 instead of Collectors::groupingBy. In this case, the framework uses a concurrent map, which allows elements from the different threads to be inserted directly into the Map instead of having to be combined serially.
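To show the shape of this variant in isolation, here is a minimal, self-contained sketch; it merely counts entries per state rather than computing our full StatsAggregation, so it runs without the rest of the code:

```java
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingByConcurrent;

class ConcurrentGroupDemo {
    public static void main(String[] args) {
        List<String> states = List.of("IN", "WA", "IN", "NY", "WA", "IN");
        // groupingByConcurrent returns a ConcurrentMap, so the threads of the
        // parallel stream insert their results directly into one shared map
        // instead of building per-thread maps that must be merged serially.
        ConcurrentMap<String, Long> counts = states.parallelStream()
            .collect(groupingByConcurrent(s -> s, counting()));
        System.out.println(counts.get("IN")); // prints 3
    }
}
```

The trade-off is that groupingByConcurrent is unordered; that's fine here because our statistics don't depend on encounter order.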

With these three possibilities, we can check how they perform on the previous stats calculations (excluding the time to load the data from the CSV file) as the load doubles from five to twenty million entries:

                         Serial      Parallel    Parallel & groupingByConcurrent
Five million entries     3.045 sec   1.941 sec   1.436 sec
Ten million entries      6.405 sec   2.876 sec   2.785 sec
Twenty million entries   8.507 sec   4.956 sec   4.537 sec

We see that running in parallel improves the performance substantially, almost halving the time as the load increases. Using groupingByConcurrent brings an additional gain of about 10%.

Finally, retrieving the results is trivial; for example, to obtain the stats for Blountsville, IN, we just need to:

StatsAggregation aggreg = stats.get(new CityState("Blountsville", "IN"));
System.out.println("Blountsville : IN");
System.out.println("basePrice.mean: " + aggreg.basePrice().mean());
System.out.println("basePrice.sampleVariance: " + aggreg.basePrice().sampleVariance());
System.out.println("basePrice.sampleStandardDeviation: " + aggreg.basePrice().sampleStandardDeviation());
System.out.println("basePrice.count: " + aggreg.basePrice().count());
System.out.println("basePrice.min: " + aggreg.basePrice().min());
System.out.println("basePrice.max: " + aggreg.basePrice().max());
System.out.println("actualPrice.mean: " + aggreg.actualPrice().mean());
System.out.println("actualPrice.sampleVariance: " + aggreg.actualPrice().sampleVariance());
System.out.println("actualPrice.sampleStandardDeviation: " + aggreg.actualPrice().sampleStandardDeviation());
System.out.println("actualPrice.count: " + aggreg.actualPrice().count());
System.out.println("actualPrice.min: " + aggreg.actualPrice().min());
System.out.println("actualPrice.max: " + aggreg.actualPrice().max());

The results that we obtain:

Blountsville : IN
basePrice.mean: 50.302588996763795
basePrice.sampleVariance: 830.7527439246837
basePrice.sampleStandardDeviation: 28.822781682632293
basePrice.count: 309
basePrice.min: 0.56
basePrice.max: 99.59
actualPrice.mean: 508.8927831715211
actualPrice.sampleVariance: 78883.35878833274
actualPrice.sampleStandardDeviation: 280.86181440048546
actualPrice.count: 309
actualPrice.min: 0.49
actualPrice.max: 999.33

Python’s Approach

In Python, we have several libraries that can handle data statistics and analysis. However, we find that the Pandas library is very well suited to processing large amounts of tabular data and provides very efficient filtering, grouping, and statistical analysis methods. 

Let’s review how we would analyze the previous data using Python:

import pandas as pd

def group_aggregations(df_group_by):
    df_result = df_group_by.agg(
        {'basePrice': ['count', 'min', 'max', 'mean', 'std', 'var'],
         'actualPrice': ['count', 'min', 'max', 'mean', 'std', 'var']}
    )
    return df_result
  
if __name__ == '__main__':
    df = pd.read_csv("recordEntries.csv")
    excluded_states = ['MN', 'CA']
    df_st = df.loc[~ df['state'].isin(excluded_states)]
    group_by = df_st.groupby(['city', 'state'], sort=False)
    aggregated_results = group_aggregations(group_by)

In the main section, we start by invoking pandas.read_csv() (line 11) to load the comma-separated values in the file into a Pandas DataFrame.  

In line 13, we use ~df['state'].isin(excluded_states) to get a Pandas Series of Booleans that is False for the excluded states (MN and CA). Then we use DataFrame.loc[] with this Series to filter those rows out.

Next, we use DataFrame.groupby() in line 14 to group by city and state. The result is processed by group_aggregations() to get the statistics for each group of the basePrice and actualPrice.

We see that the implementation in Python is very straightforward. To print the results for Blountsville, IN:

    print(aggregated_results.loc[('Blountsville', 'IN')]['basePrice'])
    print(aggregated_results.loc[('Blountsville', 'IN')]['actualPrice'])

This gives us the stats:

basePrice:
count    309.000000
min        0.560000
max       99.590000
mean      50.302589
std       28.822782
var      830.752744
Name: (Blountsville, IN), dtype: float64
actualPrice:
count      309.000000
min          0.490000
max        999.330000
mean       508.892783
std        280.861814
var      78883.358788
Name: (Blountsville, IN), dtype: float64

To run the previous code in parallel, we have to keep in mind that Python doesn’t support fine-grained locking the way Java does. We have to contend with the global interpreter lock (GIL), which only allows one thread to execute at a time, no matter how many CPU cores or threads are available. We won’t get into the details here.

To support concurrency, we have to consider that ours is a CPU-intensive process; therefore, the best approach is to use multiprocessing. In this case, we have to modify our implementation:

from multiprocessing import Pool
import pandas as pd
import time
def aggreg_basePrice(df_group):
    ct_st, grp = df_group
    return ct_st, grp.basePrice.agg(['count', 'min', 'max', 'mean', 'std', 'var'])

if __name__ == '__main__':
    df = pd.read_csv("recordEntries.csv")
    start = time.perf_counter()
    excluded_states = ['MN', 'CA']
    filtr = ~df['state'].isin(excluded_states)
    df_st = df.loc[filtr]
    grouped_by_ct_st = df_st.groupby(['city', 'state'], sort=False)
    with Pool() as p:
        list_parallel = p.map(aggreg_basePrice, [(ct_st, grouped) for ct_st, grouped in grouped_by_ct_st])
    finish = time.perf_counter()
    print(f'Time elapsed parallel: {round(finish - start, 2)} sec')

As we did before, we use Pandas groupby() to get the data grouped by city and state (line 14). In the next line, we use the Pool provided by the multiprocessing library to map the grouped data with aggreg_basePrice, which computes the statistics for each group. Pool.map() divides the data and computes the stats in several independent processes running in parallel.

We won’t review the previous code in detail since, as the table below shows, multiprocessing is much slower than running the process serially. Therefore, it’s not worth using this approach for these types of problems.

Another possibility to run the code concurrently is to use Modin. Modin provides a seamless way to parallelize your code and is extremely useful when you have to process large amounts of data. Changing the import statement from import pandas as pd to import modin.pandas as pd runs your code in parallel and takes advantage of the cluster of cores that might be available in your environment to speed up the code execution. For more details on how it works, please read the documentation.

As we did with Java, we provide the following table with the runtimes for the different scenarios that we just covered  (as before, we exclude the time to read the data from the CSV file):

                         Serial     Multiprocessing   Modin
Five million entries     1.94 sec   20.25 sec         6.99 sec
Ten million entries      4.07 sec   25.1 sec          12.88 sec
Twenty million entries   7.62 sec   36.2 sec          25.94 sec

We see that running the code serially in Python is even slightly faster than in Java. However, using multiprocessing degrades the performance substantially. Using Modin improves on multiprocessing, but it is still more advantageous to run the process serially.


We see that with CPU-intensive processes in Pandas, there is no advantage in parallelizing the code. In a sense, this is a reflection of how Pandas was originally architected. However, it’s impressive how fast Pandas runs in serial mode, and it scales very well even for large amounts of data.

It’s important to point out that the speed of the stats calculations in Python depends on how they are performed. To get fast computations, one needs to apply only the statistical functions that are actually needed. For example, calling a generic pandas.DataFrame.describe() to get the stats will run very slowly.

We have seen that Java’s Streams and Python’s Pandas are two excellent choices for doing analysis and statistics on large amounts of data. Both rest on very solid frameworks with plenty of support for achieving great performance and scaling.

Java provides a very strong infrastructure, ideal for working with complex program flows. It’s very performant and allows processes to run efficiently in parallel. This makes it an ideal choice when there is a premium on getting results quickly.

Python is very well suited to math and statistics. It’s very straightforward, reasonably fast, and well-suited to doing complex calculations.
