It’s now a week since I returned from rstudio::conf 2019 in Austin, Texas, and in this blog I’m going to focus on using the sparklyr package (pronounced “spark-lee-r”), which enables R to connect to an Apache Spark instance for general-purpose cluster computing. sparklyr has its own inbuilt functions and also allows dbplyr to do all of the amazing things I described in my first blog post: https://surgicalinformatics.org/quick-take-aways-from-rstudioconf-training-day-02/. The code contained in this blog should work in your own local RStudio without any preconfigured cluster, should you wish to experiment with sparklyr’s capabilities.
Establishing a connection
The following example code will set up a local connection so you can experiment with some of the functionality of the dbplyr package. This is really useful if you are waiting for data or for access to a database, as it lets you have pre-prepared scripts in progress without the remote database connection.
library(tidyverse)
library(dbplyr)
library(sparklyr)
library(nycflights13)

sc <- spark_connect(master = "local")
The connection is typically stored as “sc” which you can also see in the Environment. This is the object that is referenced each time data is accessed in the spark cluster.
To check that the new connection to a Spark instance has been established, go to the Connections tab in your RStudio interface (this is typically located alongside your “Environment” and “History” tabs). Click on the Spark UI button to view the user interface for the current Spark session in a browser (this will be helpful later if you want to view an event log for the activity of your session).
Another way to check whether the cluster is open is spark_connection_is_open(sc), which should return TRUE if the connection is open.
Adding and manipulating data via the connection
Now that you have a connection established, some data can be added to the Spark cluster:
spark_flights <- sdf_copy_to(sc, flights, "my_flights")
spark_flights becomes an object in the local environment but is really just a reference to the data in the spark cluster. Click on the Connections tab and you should see that “my_flights” is now a data frame stored in the cluster. The Spark UI page which opened in your browser will also now show some of the changes you have made. Click the Storage tab in the UI and you should see the data frame.
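One quick way to convince yourself that spark_flights is only a pointer to the cluster, not a local copy, is to inspect it directly (a minimal sketch, assuming the connection and copy above have run):

```r
# spark_flights is a tbl_spark: a lazy reference to "my_flights" in the cluster,
# not a local data frame
class(spark_flights)

# the table can also be listed straight from the connection object
src_tbls(sc)
```

If the copy succeeded, "my_flights" should appear in the output of src_tbls(sc).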
When manipulating the data, the reference to the data frame within the local environment can be treated as if the data were stored locally. One key difference is that the creation of new data frames is delayed until the last possible minute. The following example groups flights from the nycflights13 data frame flights and calculates the average delay by destination. Notice that the real computation happens only once the “average_delay” data frame is printed; the first command simply creates a reference in the local environment that stores your intended action. Also notice sparklyr’s “lazy” approach: the total number of rows is not returned and is replaced by “… with more rows”. If the full number of rows is desired, the collect() function can be used:
spark_flights %>%
  group_by(dest) %>%
  summarise(avg_delay = mean(arr_delay, na.rm = TRUE)) -> average_delay

average_delay
# Source: spark<?> [?? x 2]
#    dest  avg_delay
#    <chr>     <dbl>
#  1 IAH        4.24
#  2 PBI        8.56
#  3 BOS        2.91
#  4 CLT        7.36
#  5 SNA       -7.87
#  6 XNA        7.47
#  7 SYR        8.90
#  8 JAX       11.8
#  9 CHS       10.6
# 10 MEM       10.6
# ... with more rows

average_delay %>%
  collect()
# A tibble: 105 x 2
#    dest  avg_delay
#    <chr>     <dbl>
#  1 IAH        4.24
#  2 PBI        8.56
#  3 BOS        2.91
#  4 CLT        7.36
#  5 SNA       -7.87
#  6 XNA        7.47
#  7 SYR        8.90
#  8 JAX       11.8
#  9 CHS       10.6
# 10 MEM       10.6
# ... with 95 more rows
Caching data
Have a look at the Spark UI and check out the SQL tab. Click on one of the queries (highlighted in blue) to get a breakdown of the components of each task. Notice the difference for the query in which collect() was used: it takes a lot longer to execute than the “lazy” query which sparklyr uses by default. This is really useful if you want to leave the “heavy lifting” of data transformation right until the end. But if you then want to use an intermediate data frame for several further transformations (this could be sorting destinations by average delay, only looking at destinations where the average departure was early, etc.), it might be useful to cache the data within the cluster so that the data is transformed only once. The downside to this approach may be additional memory requirements. The following code, using compute(), will cache the intermediate data frame:
cached_flights <- spark_flights %>%
  group_by(dest) %>%
  summarise(avg_delay = mean(arr_delay, na.rm = TRUE)) %>%
  compute("sub_flights")

# If you have already created the "average_delay" reference then you
# could also run:
# cached_flights <- compute(average_delay, "sub_flights")
Now you should be able to see the “sub_flights” data frame in the Connections tab, in the Storage tab of the Spark UI, and the generated SQL code in the SQL tab of the UI. The cached_flights reference should also appear in the Environment tab in RStudio.
Some extra functions
As well as working through dplyr and dbplyr, sparklyr also comes with its own functions for data analysis and transformation, which may be useful particularly when setting up pipelines you plan to execute later. A couple of useful examples are the ft_binarizer and ft_bucketizer commands, which I demonstrate below: first flagging destinations which are on average delayed by more than 10 minutes, and then grouping flights by distance:
cached_flights %>%
  ft_binarizer(
    input_col = "avg_delay",
    output_col = "delayed",
    threshold = 10) %>%
  select(avg_delay, delayed) %>%
  head(100)
# Source: spark<?> [?? x 2]
#    avg_delay delayed
#        <dbl>   <dbl>
#  1      4.24       0
#  2      8.56       0
#  3      2.91       0
#  4      7.36       0
#  5     -7.87       0
#  6      7.47       0
#  7      8.90       0
#  8     11.8        1
#  9     10.6        1
# 10     10.6        1
# ... with more rows
spark_flights %>%
  filter(!is.na(arr_delay)) %>%
  ft_bucketizer(
    input_col = "distance",
    output_col = "distance_grouping",
    splits = c(0, 500, 1000, 1500, 2000, 2500, 6000)) %>%
  select(distance, distance_grouping) %>%
  head(100)
# Source: spark<?> [?? x 2]
#    distance distance_grouping
#       <dbl>             <dbl>
#  1     1400                 2
#  2     1416                 2
#  3     1089                 2
#  4     1576                 3
#  5      762                 1
#  6      719                 1
#  7     1065                 2
#  8      229                 0
#  9      944                 1
# 10      733                 1
# ... with more rows
These functions can be combined with others such as sdf_partition, sdf_pivot and sdf_register to prepare a data set for predictive modelling. sparklyr has its own inbuilt functions for logistic regression (ml_logistic_regression), predictive modelling (sdf_predict) and even some dedicated natural language processing techniques (ft_tokenizer, ft_stop_words_remover).
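As a rough sketch of how those pieces might fit together, the following pipeline splits the flights data, fits a logistic regression and scores the held-out set. The column choices, partition weights and model formula here are my own illustrative assumptions, not code from a production workflow:

```r
# flag delayed arrivals, then split into training and test sets in the cluster
partitions <- spark_flights %>%
  filter(!is.na(arr_delay)) %>%
  ft_binarizer(
    input_col = "arr_delay",
    output_col = "delayed",
    threshold = 10) %>%
  sdf_partition(training = 0.7, test = 0.3, seed = 1234)

# fit a logistic regression predicting delay from flight distance
model <- partitions$training %>%
  ml_logistic_regression(delayed ~ distance)

# score the held-out test set; predictions stay in the cluster until collected
predictions <- sdf_predict(partitions$test, model)
```

As with the earlier examples, nothing is pulled back to R until you collect(), so the whole pipeline runs inside Spark.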
To finish the session close down the connection using:
spark_disconnect(sc)
The connection should now be terminated, meaning the Spark UI will no longer be accessible and the Connections tab has changed. Should you wish to work with any data frames or aggregated results after the disconnect, make sure to use collect() to create a new local object before disconnecting.
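For example (assuming the average_delay reference created earlier in this post), you could materialise a local copy just before closing the connection:

```r
# pull the aggregated results into a local tibble while the cluster is alive
local_delays <- average_delay %>% collect()

spark_disconnect(sc)

# local_delays is still usable here; average_delay no longer is,
# because it only referenced data inside the now-closed cluster
```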