Quick take-aways from RStudio::conf Training Day 02 (Part 2 – sparklyr)

It’s now a week since I returned from RStudio::conf 2019 in Austin, Texas, and in this blog I’m going to focus on using the sparklyr package (pronounced “spark-lee-r”), which enables R to connect to an Apache Spark instance for general-purpose cluster computing. sparklyr has its own inbuilt functions and also allows dbplyr to do all of the amazing things I described in my first blog post: https://surgicalinformatics.org/quick-take-aways-from-rstudioconf-training-day-02/. The code contained in this blog should work in your own local RStudio without any preconfigured cluster, should you wish to experiment with sparklyr’s capabilities.

Establishing a connection

The following example code will help set up a local connection so that you can experiment with some of the functionality of sparklyr (and, through it, dbplyr). This is really useful if you are waiting for data or for access to a database: you can have pre-prepared scripts in progress without the remote database connection.
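A minimal sketch of establishing that connection (it assumes sparklyr is installed; spark_install() can download a local copy of Spark if you do not already have one):

library(sparklyr)

# Install a local copy of Spark if required:
# spark_install()

# Connect to a local Spark instance and store the connection as "sc"
sc <- spark_connect(master = "local")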

The connection is typically stored as “sc”, which you can also see in the Environment tab. This is the object that is referenced each time data in the Spark cluster is accessed.

To check that the new connection to a Spark instance has been established, go to the Connections tab in your RStudio interface (typically located alongside your “Environment” and “History” tabs). Click on the Spark UI button to view the user interface for the current Spark session in a browser; this will be helpful later if you want to view an event log of your session’s activity. Another way to check that the cluster is open is to use spark_connection_is_open(sc), which should return “TRUE” if the connection is open.
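Using the connection object created above:

# Returns TRUE while the connection to the cluster is open
spark_connection_is_open(sc)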

Adding and manipulating data via the connection

Now that you have a connection established, some data can be added to the Spark cluster:
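A sketch of copying data into the cluster with copy_to(); it assumes the nycflights13 package is installed, and “my_flights” is the name the data frame will take in the cluster:

library(dplyr)
library(nycflights13)

# spark_flights is a local reference to the remote "my_flights" table
spark_flights <- copy_to(sc, flights, name = "my_flights")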

spark_flights becomes an object in the local environment but is really just a reference to the data in the spark cluster. Click on the Connections tab and you should see that “my_flights” is now a data frame stored in the cluster. The Spark UI page which opened in your browser will also now show some of the changes you have made. Click the Storage tab in the UI and you should see the data frame.

When manipulating the data, the reference to the data frame in the local environment can be treated as if the data were stored locally. One key difference is that the creation of new data frames is delayed until the last possible moment. The following example groups the nycflights13 data frame flights by destination and calculates the average delay. Notice that the real computation happens only once the “average_delay” data frame is printed; the first command simply creates a reference in the local environment that stores your intended action. Also notice sparklyr’s “lazy” approach: the total number of rows is not returned and is replaced by “… with more rows”. If the full number of rows is desired, the collect function can be used:
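A sketch of that lazy aggregation (the exact column choices here are my illustrative assumptions):

# No computation happens here: average_delay just records the intended action
average_delay <- spark_flights %>%
  group_by(dest) %>%
  summarise(delay = mean(dep_delay, na.rm = TRUE))

# Printing triggers the Spark job, but only previews the result
# ("... with more rows" instead of a total row count)
average_delay

# collect() runs the full query and returns a local tibble
average_delay %>% collect()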

Caching data

Have a look at the Spark UI and check out the SQL tab. Click on one of the queries (highlighted in blue) to get a breakdown of the components of each task. Notice the difference for the query in which collect() was used: it takes a lot longer to execute than the “lazy” queries sparklyr runs by default. This laziness is really useful if you want to leave the “heavy lifting” of data transformation right until the end, but if you then want to use an intermediate data frame for several further transformations (for example, sorting destinations by average delay, or only looking at destinations where flights depart early on average) then it might be worth caching the data within the cluster so that it is transformed only once. The downside of this approach may be additional memory requirements. The following code uses compute() to cache the intermediate data frame:
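A sketch reusing the aggregation from above; “sub_flights” is the name the cached table takes in the cluster:

# compute() forces execution and caches the intermediate result in Spark
cached_flights <- spark_flights %>%
  group_by(dest) %>%
  summarise(delay = mean(dep_delay, na.rm = TRUE)) %>%
  compute(name = "sub_flights")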

Now you should be able to see the “sub_flights” data frame in the Connections tab, in the Storage tab of the Spark UI, and as generated SQL in the SQL tab of the UI. The cached_flights reference should also appear in the Environment tab in RStudio.

Some extra functions

As well as working through dplyr and dbplyr, sparklyr also comes with its own functions for data analysis and transformation, which may be particularly useful when setting up pipelines you plan to execute later. Two useful examples are the ft_binarizer and ft_bucketizer commands, which I use below to flag destinations that are, on average, delayed by over 10 minutes and then to group flights by distance:
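A sketch of both functions; the threshold and the distance bands are illustrative choices rather than the exact values from the training day:

# Flag destinations whose average departure delay exceeds 10 minutes
average_delay %>%
  ft_binarizer(input_col = "delay",
               output_col = "over_ten_mins",
               threshold = 10) %>%
  collect()

# Bucket flights into bands by distance flown
spark_flights %>%
  ft_bucketizer(input_col = "distance",
                output_col = "distance_band",
                splits = c(0, 500, 1500, Inf)) %>%
  group_by(distance_band) %>%
  summarise(n = n()) %>%
  collect()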

These functions can be combined with others such as sdf_partition, sdf_pivot and sdf_register to prepare a data set for predictive modelling. sparklyr has its own inbuilt functions for logistic regression (ml_logistic_regression), predictive modelling (sdf_predict) and even some dedicated natural language processing techniques (ft_tokenizer, ft_stop_words_remover).

To finish the session, close down the connection using:
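# Close the connection to the cluster
spark_disconnect(sc)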

The connection should now be terminated, meaning the Spark UI will no longer be accessible and the Connections tab will have changed. Should you wish to work with any data frames or aggregated results after disconnecting, make sure to use collect() and create a new local object before you disconnect.
