Boost Your Analytics with Machine Learning and Advanced Data Preparation

Enterprises can now harness the power of Apache Spark to quickly and easily prepare data and build Machine Learning (ML) models directly against that data in Snowflake. Snowflake and Qubole make it easy to get started by embedding required drivers, securing credentials, simplifying connection setup and optimizing in-database processing. Customers can focus on getting started quickly with their data preparation and ML initiatives instead of worrying about complex integrations and the cost of moving large data sets.

Setting up the Snowflake connection and getting started takes only a few minutes. Customers first create a Snowflake data store in Qubole and enter the details of their Snowflake data warehouse. All drivers and packages are preloaded and kept up to date, eliminating manual bootstrapping of jars into the Spark cluster. No further configuration or tuning is required, and there are no added costs for the integration. Once the connection is saved, customers can browse their Snowflake tables, view metadata and preview the Snowflake data, all from the Qubole interface. They can then use Zeppelin notebooks to start reading and writing data to Snowflake as they explore data preparation and ML use cases.

Below is an example of the object browser view showing the available tables and properties:

Security is also handled seamlessly, so customers can focus on their data without worrying about how to protect their credentials. Qubole provides centralized, secure credential management, which eliminates the need to specify any credentials in plain text. The username and password are entered only when setting up the data store and are otherwise inaccessible.

The solution is also designed for enterprise requirements and allows customers to use federated authentication and SSO via the embedded Snowflake drivers. With SSO enabled, customers can authenticate through an external, SAML 2.0-compliant identity provider (IdP) and achieve a higher level of security and simplicity. These capabilities help customers more easily share notebooks and collaborate on projects with little risk of sensitive information being exposed.

Below is a sample Scala program showing how to read from Snowflake using the data store object without specifying any credentials in plain text:
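(The following is a minimal sketch, assuming the standard Snowflake data source format; the data store lookup itself is Qubole-specific, and the data store and table names below are placeholders, not the documented Qubole API.)

// Sketch only: in a Qubole notebook, the saved data store supplies the connection details and
// credentials at runtime, so none appear here in plain text. This helper stands in for that lookup.
def dataStoreOptions(name: String): Map[String, String] =
  Map.empty // resolved by Qubole from the "snowflake_ds" data store definition

val df = spark.read
  .format("net.snowflake.spark.snowflake")
  .options(dataStoreOptions("snowflake_ds"))
  .option("dbtable", "CUSTOMERS")
  .load()

df.show(10)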

Beyond the simplicity and security of the integration, customers also benefit from a highly optimized Spark integration that uses Snowflake query pushdown to balance the query-processing power of Snowflake with the computational capabilities of Apache Spark. From simple projection and filter operations to advanced operations such as joins, aggregations and even scalar SQL functions, query pushdown runs these operations in Snowflake (where the data resides) to refine and pre-filter the data before it is read into Spark. The traditional performance and cost challenges associated with moving large amounts of data for processing are thereby eliminated, without additional overhead or management.
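
As a rough illustration of pushdown (the table and column names are placeholders, and dataStoreOptions is the stand-in helper from the sketch above), the filter and aggregation below are translated into SQL and executed inside Snowflake, so only the aggregated result is transferred into Spark:

// Read a Snowflake table as a Spark DataFrame; connection details come from the managed data store
val orders = spark.read
  .format("net.snowflake.spark.snowflake")
  .options(dataStoreOptions("snowflake_ds"))
  .option("dbtable", "ORDERS")
  .load()

// Both the filter and the aggregation are pushed down into Snowflake;
// only one row per order date is returned to Spark.
val ordersPerDay = orders
  .filter(orders("O_ORDERDATE") >= "2017-01-01")
  .groupBy("O_ORDERDATE")
  .count()

ordersPerDay.show()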

With Snowflake and Qubole, customers get an optimized platform for advanced data preparation and ML that makes it simple to get started. Customers can complement their existing cloud data warehouse strategy or get more value out of ML initiatives by opening access to more data. To learn more about ML and advanced data preparation with Qubole, visit the Qubole blog.

Try Snowflake for free. Sign up and receive $400 worth of free usage. You can create a sandbox or launch a production implementation from the same Snowflake environment.

Quickstart Guide for Sagemaker + Snowflake (Part One)

Machine Learning (ML) and predictive analytics are quickly becoming irreplaceable tools for small startups and large enterprises. The questions that ML can answer are boundless. For example, you might want to ask, “What job would appeal to someone based on their interests or the interests of jobseekers like them?” Or, “Is this attempt to access the network an indication of an intruder?” Or, “What type of credit card usage indicates fraud?”

Setting up a machine learning environment, in particular for on-premises infrastructure, has its challenges. An infrastructure team must request physical and/or virtual machines and then build and integrate those resources. This approach is both time-consuming and error-prone due to the number of manual steps involved. It may work in a small environment, but the task becomes exponentially more complicated and impractical at scale.

There are many different ML systems to choose from, including TensorFlow, XGBoost, Spark ML and MXNet, to name a few. They all come with their own installation guides, system requirements and dependencies. What’s more, implementation is just the first step. The next challenge is figuring out how to make the output of the machine learning step (e.g., a model) available for consumption. Then, all of the components for building a model in the machine learning tier, and the access to the model in the API tier, need to scale to provide predictions in real time. Last but not least, the team needs to figure out where to store all the data needed to build the model.

Managing this whole process from end-to-end becomes significantly easier when using cloud-based technologies. The ability to provision infrastructure on demand (IaaS) solves the problem of manually requesting virtual machines. It also provides immediate access to compute resources whenever they are needed. But that still leaves the administrative overhead of managing the ML software and the platform to store and manage the data.   

At last year’s AWS developer conference, AWS announced Sagemaker, a “Fully managed end-to-end machine learning service that enables data scientists, developers and machine learning experts to quickly build, train and host machine learning models at scale.”

Sagemaker can access data from many different sources through its underlying kernels (Python, PySpark, Spark and R), including data provided by Snowflake. Storing data in Snowflake also has significant advantages.

Single source of truth

If data is stored in multiple locations, those locations will inevitably get out of sync. Even if data is supposed to be immutable, one location is often modified to fix a problem for one system while the other locations are not. In contrast, if data is stored in one central, enterprise-grade, scalable repository, it serves as a “single source of truth” because there is nothing to keep in sync. Different tools are not required for structured and semi-structured data, and data can be modified transactionally, which immediately reduces the risk of problems.

Shorten the data preparation cycle

According to a study published in Forbes, data preparation accounts for about 80% of the work performed by data scientists. Shortening the data preparation cycle therefore has a major impact on the overall efficiency of data scientists.

Snowflake is uniquely positioned to shorten the data preparation cycle due to its excellent support for both structured and semi-structured data through a single language: SQL. This means that semi-structured data and structured data can be seamlessly parsed, combined, joined and modified with SQL statements in set-based operations, giving data scientists the power of a full SQL engine for rapid data cleansing and preparation.
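
For illustration only (the tables and columns are hypothetical), a single SQL statement can parse a semi-structured JSON column and join it with a structured table; the same statement can be issued from the Snowflake UI, the Python connector or the Spark connector:

// Hypothetical schema: raw_events holds a JSON VARIANT column "payload"; customers is a structured table.
val prepQuery = """
  SELECT c.customer_id,
         e.payload:device.type::string AS device_type,
         COUNT(*)                      AS events
  FROM   raw_events e
  JOIN   customers  c ON c.customer_id = e.payload:customer_id::number
  GROUP BY 1, 2
"""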

Scale as you go

Another problem that ML implementers frequently encounter is what we at Snowflake call “works on my machine” syndrome. Small datasets work fine on a local machine, but at production dataset sizes, reading all the data into a single machine either doesn’t scale or behaves unexpectedly. Even when a job does finish, loading a terabyte-sized dataset can take hours. With Snowflake, there is no infrastructure to provision, and Snowflake’s elasticity lets you scale horizontally as well as vertically, all with the push of a button.
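
For example (the warehouse name is hypothetical, and multi-cluster scaling requires the appropriate Snowflake edition), scaling up or out is a single SQL statement, shown here as query strings:

// Scale vertically: resize the virtual warehouse
val scaleUp = "ALTER WAREHOUSE ml_wh SET WAREHOUSE_SIZE = 'X-LARGE'"
// Scale horizontally: allow additional clusters for concurrent workloads
val scaleOut = "ALTER WAREHOUSE ml_wh SET MAX_CLUSTER_COUNT = 4"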

Connecting Sagemaker and Snowflake

Sagemaker and Snowflake both build on AWS cloud infrastructure-as-a-service (IaaS) offerings, which enables us to build the infrastructure when we need it, where we need it (geographically) and at any scale required.

Since Sagemaker and Snowflake make building these services simple, the remaining question is how to connect the two, and that’s exactly the subject of the following parts of this series. How do you get started? What additional configuration do you need in AWS for security and networking? How do you store credentials?

In part two of this four-part blog, I’ll explain how to build a Sagemaker ML environment in AWS from scratch. In the third post, I will put it all together and show you how to connect a Jupyter Notebook to Snowflake via the Snowflake Python connector.  With the Python connector, you can import data from Snowflake into a Jupyter Notebook. Once connected, you can begin to explore data, run statistical analysis, visualize the data and call the Sagemaker ML interfaces.

However, to perform any analysis at scale, you really don’t want to use a single-server setup like Jupyter running a Python kernel. Jupyter running a PySpark kernel against a Spark cluster on EMR is a much better solution for that use case. So, in part four of this series I’ll connect a Jupyter Notebook to a local Spark instance and to an EMR cluster using the Snowflake Spark connector.

The Snowflake difference

Snowflake is the only data warehouse built for the cloud. Snowflake delivers the performance, concurrency and simplicity needed to store and analyze all data available to an organization in one location. Snowflake’s technology combines the power of data warehousing, the flexibility of big data platforms, the elasticity of the cloud, and live data sharing at a fraction of the cost of traditional solutions. Snowflake: Your data, no limits.

You can review the entire blog series here: Part One > Part Two > Part Three > Part Four.  

Snowflake and Spark, Part 1: Why Spark?

This is the first post in a 2-part series describing Snowflake’s integration with Spark. In this post, we introduce the Snowflake Connector for Spark (package available from Maven Central or Spark Packages, source code on GitHub) and make the case for using it to bring Spark and Snowflake together to power your data-driven solutions.

What is Spark?

Apache Spark is a distributed data processing system with support for functional, declarative and imperative programming styles. Its popularity and power lie in its myriad of programming paradigms, supported APIs (Scala, R, and Python), machine-learning libraries, and tight integration with the Hadoop ecosystem. Spark also provides connectivity to a wide variety of data sources, including SQL-based relational database systems and NoSQL systems. As a result, Spark has become the tool of choice for data engineering tasks.

With the introduction of the Snowflake Connector for Spark in June 2016, Snowflake enabled connectivity to and from Spark. The connector provides the Spark ecosystem with access to Snowflake as a fully-managed and governed repository for all data types, including JSON, Avro, CSV, XML, machine-born data, etc. The connector also enables powerful use cases that integrate Spark and Snowflake, including:

  • Complex ETL: Using Spark, you can easily build complex, functionally rich and highly scalable data ingestion pipelines for Snowflake. With a large set of readily available connectors to diverse data sources, Spark facilitates data extraction, which is typically the first part of any complex ETL pipeline. Spark also helps with computationally involved data transformation tasks such as sessionization, data cleansing, data consolidation and data unification, which usually happen at later stages of the ETL pipeline. Using the Snowflake Connector for Spark, the data produced by these complex ETL pipelines can now easily be stored in Snowflake for broad, self-service access across the organization using standard SQL and SQL tools.
  • Machine Learning: Spark provides a rich ecosystem for machine learning and predictive analytics functionality, e.g. the popular machine learning library, MLlib. With the integration between Spark and Snowflake, Snowflake provides you with an elastic, scalable repository for all the data underlying your algorithm training and testing. With machine learning, processing capacity needs can fluctuate heavily. Snowflake can easily expand its compute capacity to allow your machine learning in Spark to process vast amounts of data.

Enabling Spark in AWS EMR with Snowflake

With the Snowflake Connector for Spark, you can use Spark clusters, e.g. in AWS EMR or Databricks, and connect them easily with Snowflake. For example, you can create an EMR cluster with Spark pre-installed by selecting Spark as an application. Before we dive into the details of using Snowflake with Spark, the following code samples illustrate how to create and connect to a Spark cluster in AWS EMR and start a spark-shell using the connector:

STEP 1: Create a Spark cluster in AWS EMR 5.4.0 with Spark 2.1 using the AWS CLI. For example, in US-West-2:

aws emr create-cluster \
   --applications Name=Ganglia Name=Spark \
   --ec2-attributes '{"KeyName":"","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":""}' \
   --service-role EMR_DefaultRole \
   --enable-debugging \
   --release-label emr-5.4.0 \
   --log-uri 's3n:///elasticmapreduce/' \
   --name '' \
   --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master Instance Group"},{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Name":"Core Instance Group"}]' \
   --configurations '[{"Classification":"spark","Properties":{"maximizeResourceAllocation":"true"},"Configurations":[]}]' \
   --scale-down-behavior TERMINATE_AT_INSTANCE_HOUR \
   --region us-west-2

STEP 2: Connect to the cluster using ssh:

ssh -i ~/.pem hadoop@.compute.amazonaws.com

STEP 3: Start spark-shell with the Snowflake connector packages. Alternatively, you can also pre-load the packages using the packages option when creating the cluster. For example:

spark-shell --packages net.snowflake:snowflake-jdbc:3.0.14,net.snowflake:spark-snowflake_2.11:2.1.3

Where:

  • spark-snowflake_2.11 specifies the connector artifact ID (for Scala 2.11).
  • 2.1.3 specifies the connector version. Note that this version is for Spark 2.1. For Spark 2.0, use 2.1.3-spark_2.0 instead.

Also, note that, if you are not running from an EMR cluster, you need to add the package for AWS support to the packages list. For instance, when you run spark-shell from a local installation, your packages list will look like this:

spark-shell --packages net.snowflake:snowflake-jdbc:3.0.14,net.snowflake:spark-snowflake_2.11:2.1.3,org.apache.hadoop:hadoop-aws:2.8.0

STEP 4: In spark-shell, you then need to define which Snowflake database and virtual warehouse to use. The connector also needs access to a staging area in AWS S3, which must be specified as well. You can do that with the following Scala commands in spark-shell:

// Configuration of the staging area for the connector in AWS S3 
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "")

// Snowflake instance parameters
val defaultOptions = Map(
  "sfURL" -> "",
  "sfAccount" > "",
  "sfUser" -> "",
  "sfPassword" -> "",
  "sfDatabase" -> "",
  "sfSchema" -> "public",
  "sfWarehouse" -> "",
  "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
  "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
  "tempdir" -> "s3n:///"
)
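
To verify the setup, you can read any table through the connector with these options (the dbtable value is left as a placeholder):

// Quick check that the connector, credentials and staging area are configured correctly
val dfCheck = spark.read
  .format("net.snowflake.spark.snowflake")
  .options(defaultOptions)
  .option("dbtable", "")
  .load()

dfCheck.printSchema()
dfCheck.show(10)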

With these steps, you now have a Spark cluster and a spark-shell running in AWS EMR that you can use to showcase some of the most common use cases, such as complex ETL and machine learning.

Overview of Loading Data from Spark into Snowflake

Before we get into our discussion of ETL using Spark and Snowflake, let’s first take a look at what happens behind the scenes when the Snowflake Spark connector persists Spark data frames in Snowflake.

For data loading operations, the Spark connector for Snowflake performs three separate steps, similar to the spark-redshift connector:

  1. The first step persists the contents of the data frame in the staging area in AWS S3. This step currently uses the S3 bucket provided during the configuration of the Spark connector (as described in the Snowflake documentation). Future versions of the connector will not require an S3 bucket.
  2. The second step then connects to Snowflake from Spark using JDBC. On that JDBC connection, it issues a COPY command to load the data.
  3. As the third and final step, the COPY command retrieves the data from the staging area in S3 and uses the current virtual warehouse to load it into tables in the Snowflake database (a sketch of such a statement follows below).
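
The connector generates the exact statement internally; purely as an illustration (bucket, path, credentials and format details are placeholders), the command issued in steps 2 and 3 has roughly this shape:

// Illustration only: the connector builds and issues this statement itself over the JDBC connection.
val copySketch =
  """COPY INTO zip_codes
    |FROM 's3://<bucket>/<temp-prefix>/'
    |CREDENTIALS = (AWS_KEY_ID = '<key>' AWS_SECRET_KEY = '<secret>')
    |FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"')""".stripMargin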

Note that the slave nodes in the Spark cluster and the compute nodes in Snowflake (i.e. the virtual warehouse) are what process the data flow. This approach allows for much greater scale than a more conventional approach where all data flows through the JDBC connection. Here, you can scale the Spark cluster or the Snowflake virtual warehouse independently to increase the data transfer bandwidth between Spark and Snowflake, whereas a single JDBC connection would always cap that bandwidth.

Complex ETL in Spark with Snowflake

Today, data engineers need to continuously extract and transform data from various sources before it can be made available for consumption by business users through querying and analytics. Given the volume of data that today’s organizations face, a significant amount of compute and storage capacity needs to be made available to perform these tasks in a timely and scalable fashion. Spark has become the tool of choice for many data engineers to implement the computation steps along their data processing pipelines. This is due to the high efficiency of Spark with its in-memory execution capabilities, the availability of libraries from the Spark ecosystem, and the ease of development with languages such as Scala and Python.

The following example illustrates how Spark can be used to implement a simple data ingestion pipeline that performs several transformations on the new data before storing it in Snowflake. The example uses a web log scenario. Assume that new data is read from a web server log file, in this case using the Apache web log format. Log lines are made available as a list in Scala. The sample parses the IP addresses from the log lines and transforms them into ZIP codes using REST calls to the FreeGeoIP web service. The resulting list of ZIP codes is then stored in Snowflake.

// *************************************
// Geo mapping of Apache web log lines 
// *************************************
import scala.util.parsing.json.JSON._
import scala.io.Source.{fromInputStream}
import java.net._

// Example data: Apache web server log lines
var weblog: List[String] = List(
 """8.8.8.8 - - [07/Mar/2017:00:05:49 -0800] "GET /foo/index.html HTTP/1.1" 401 12846""",
 """8.8.8.8 - - [07/Mar/2017:00:06:51 -0800] "GET /bar/index.html HTTP/1.1" 200 4523""")
// Function to parse a web log line and return the ZIP code for its IP address
def ParseLogLine (strLine : String) : Integer = {
 var Array(ip, d1, d2, dt, tz, request, url, proto, ret, size) = strLine.split(" ")

 // REST call to URL
 var url2 = "http://freegeoip.net/json/".concat(s"$ip")
 var result2 = scala.io.Source.fromURL(url2).mkString

 // Parse into JSON
 var json: Option[Any] = parseFull(result2)
 var map: Map[String,Any] = json.get.asInstanceOf[Map[String, Any]]
 var zip: String = map("zip_code").asInstanceOf[String]

 var intZip: Integer = zip.trim.mkString.toInt

 return intZip
}
// Parse web log
var zips: List[Integer] = weblog.map(line => ParseLogLine(line))
// Let's take a look at the zip codes we found in the log
zips.foreach(println(_))

// Get a data frame for the zip codes list
var df = zips.toDF()

// Push the list/df content to Snowflake as a new table
import org.apache.spark.sql.SaveMode
df.write.format("net.snowflake.spark.snowflake").options(defaultOptions).option("dbtable", "zip_codes").mode(SaveMode.Overwrite).save()

Now we have the zip codes in Snowflake and can start using them in Snowflake queries and BI tools that connect to Snowflake.

Machine Learning in Spark with Snowflake Connectivity

Part of Spark’s appeal is how easy it is to use machine learning capabilities over the data that has been made available to Spark. For example, MLlib, a popular library for machine learning, comes as part of the standard Spark configuration. With these machine learning capabilities in hand, organizations can easily gain new insights and business value from the data that they acquire.

Expanding on our previous web log example, you may wonder what ZIP codes, or broader geographical areas, the requests in the web server logs are coming from. The following Scala code illustrates how to run a query against Snowflake, retrieve the result into a Spark data frame and apply machine learning functions to it:

// *************************************
// Retrieve zip codes stored in Snowflake into Spark
// *************************************

// Function to retrieve a Snowflake query result into a Spark data frame
def snowflakedf(sql: String) = {
 spark.read
 .format("net.snowflake.spark.snowflake")
 .options(defaultOptions)
 .option("query", sql)
 .load()
}

// Snowflake SQL query to retrieve all ZIP codes
val df2 = snowflakedf("SELECT * FROM zip_codes")

// *************************************
// Machine learning over zip codes stored in Snowflake 
// *************************************
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Convert DF with SQL result into Vectors for the MLlib API
var vectors = df2.rdd.map(r => Vectors.dense(r.getDecimal(0).doubleValue()))
vectors.cache

var numClusters = 2
var numIterations = 20
var clusters = KMeans.train(vectors, numClusters, numIterations)
clusters.clusterCenters
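
As a quick follow-up sketch (not part of the original example), the trained model can then assign each ZIP code to its nearest cluster:

// Predict the cluster for each ZIP code vector
var assignments = clusters.predict(vectors)
assignments.take(10).foreach(println)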

You can now use snowflakedf(...) to define Spark data frames that are populated with data from a Snowflake query.

Summary and Next Steps

As we’ve illustrated in this post, Spark is a powerful tool for data wrangling. Its rich ecosystem provides compelling capabilities for complex ETL and machine learning. With the deep integration into Spark provided by the connector, Snowflake can now serve as the fully-managed and governed database for all your Spark data, including traditional relational data, JSON, Avro, CSV, XML, machine-born data, etc. This makes Snowflake your repository of choice in any Spark-powered solution.

So what’s next? We encourage you to try Snowflake and its integration with Spark in your data processing solutions today.

Also, are you interested in helping design and build the next-generation Spark-Snowflake integration? If so, we invite you to take a look at the open Engineering positions on our careers page.

In part 2 of this series, we’ll take a look behind the scenes to better understand how the Spark connector processes queries that retrieve data from Snowflake into Spark and how this can be used to enable high-performance ELT solutions using Spark and Snowflake. In the meantime, keep an eye on this blog or follow us on Twitter (@snowflakedb) to keep up with all the news and happenings here at Snowflake Computing.