New Snowflake features released in Q1’17

We recently celebrated an important milestone in reaching 500+ customers since Snowflake became generally available in June 2015. As companies of all sizes increasingly adopt Snowflake, we wanted to look back and provide an overview of the major new Snowflake features we released during Q1 of this year, and highlight the value these features provide for our customers.

Expanding global reach and simplifying on-boarding experience

Giving our customers freedom of choice, along with a simple, secure, and guided “Getting Started” experience, was a major focus of the last quarter.

  • We added a new region outside of the US; customers now have the option to analyze and store their data in Snowflake accounts deployed in EU-Frankfurt. Choosing the appropriate region is integrated into our self-service portal when new customers sign up.
  • In addition, we added our high-value product editions, Enterprise and Enterprise for Sensitive Data (ESD), to our self-service offerings across all available regions. For example, with Enterprise, customers can quickly implement auto-scale mode for multi-cluster warehouses to support varying, high concurrency workloads. And customers requiring HIPAA compliance can choose ESD.
  • Exploring other avenues for enabling enterprises to get started quickly with Snowflake, we partnered with the AWS Marketplace team to include our on-demand Snowflake offerings, including the EU-Frankfurt option, in their newly-launched SaaS subscriptions.

Improving out-of-the-box performance & SQL coverage

We are committed to building the fastest cloud DW for your concurrent workloads with the SQL you love.

  • One key performance improvement introduced this quarter was the reduction of compilation times for JSON data. Internal TPC-DS tests demonstrate a 30-60% reduction for most of the TPC-DS queries (single stream on a single 100 TB JSON table). In parallel, we worked on improving query compile time in general, providing up to a 50% performance improvement for short queries.
  • Another new key capability is the support for bulk data inserts on a table concurrently with other DML operations (e.g. DELETE, UPDATE, MERGE). By introducing more fine-grained locking at the micro-partition level, we are able to allow concurrent DML statements on the same table.
  • To improve our data clustering feature (currently in preview), we added support for specifying expressions on table columns in clustering keys. This enables more fine-grained control over the data in the columns used for clustering (see the sketch after this list).
  • Also, we reduced the startup time for virtual warehouses (up to XL in size) to a few seconds, ensuring almost instantaneous provisioning for most virtual warehouses.
  • We extended our SQL by adding support for the ANSI SQL TABLESAMPLE clause. This is useful when a user wants to limit a query operation performed on a table to only a random subset of rows from the table.
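To make the clustering and TABLESAMPLE items above concrete, here is a minimal sketch that issues both statements from Scala over the Snowflake JDBC driver. The account URL, credentials, the events table, and the event_ts column are placeholders for illustration only, not objects from this post:

// Requires the Snowflake JDBC driver (net.snowflake:snowflake-jdbc) on the classpath
import java.sql.DriverManager
import java.util.Properties

// Hypothetical connection details -- replace with your own account, user, and warehouse
val props = new Properties()
props.put("user", "<your-user>")
props.put("password", "<your-password>")
props.put("warehouse", "<your-warehouse>")
props.put("db", "<your-database>")
props.put("schema", "public")

val conn = DriverManager.getConnection(
  "jdbc:snowflake://<your-account>.snowflakecomputing.com/", props)
val stmt = conn.createStatement()

// Return an approximately 10% random sample of rows from a (hypothetical) events table
val sample = stmt.executeQuery("SELECT * FROM events TABLESAMPLE (10)")
while (sample.next()) {
  println(sample.getString(1))
}

// Cluster the same table on an expression over one of its columns (hypothetical column name)
stmt.execute("ALTER TABLE events CLUSTER BY (TO_DATE(event_ts))")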

Staying Ahead with Enterprise-ready Security

From day one, security has always been core to Snowflake’s design.

  • We expanded Snowflake’s federated authentication and single sign-on capability by integrating with many of the most popular SAML 2.0-compliant identity providers. In addition to Okta, Snowflake now supports ADFS/AD, Azure AD, Centrify, and OneLogin, to name just a few.
  • To advance Snowflake’s built-in auditing, we introduced new Information Schema table functions (LOGIN_HISTORY and LOGIN_HISTORY_BY_USER) that users can query to retrieve the short-term history of all successful and failed login requests in the previous 7 days. If required, users can maintain a long-term history by copying the output from these functions into regular SQL tables.
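As a quick illustration of the new auditing functions, the following sketch queries the short-term login history and copies it into a regular table for long-term retention. It assumes the JDBC connection and stmt handle from the sketch in the previous section, and the login_audit table name is a placeholder:

// Retrieve all login attempts from the previous 7 days
// (reuses the JDBC stmt from the earlier sketch)
val logins = stmt.executeQuery(
  """SELECT event_timestamp, user_name, is_success, error_message
    |FROM TABLE(information_schema.login_history(
    |  dateadd('days', -7, current_timestamp()), current_timestamp()))""".stripMargin)
while (logins.next()) {
  println(s"${logins.getString("USER_NAME")} -> ${logins.getString("IS_SUCCESS")}")
}

// Optionally copy the output into a regular table to keep a long-term history
stmt.execute(
  """CREATE TABLE IF NOT EXISTS login_audit AS
    |SELECT * FROM TABLE(information_schema.login_history(
    |  dateadd('days', -7, current_timestamp()), current_timestamp()))""".stripMargin)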

Improving our ecosystem

Enabling developers and builders to create applications with their favorite tools and languages remains a high priority for us.

  • With respect to enterprise-class ETL, we successfully collaborated with Talend in building a native Snowflake connector based on Talend’s new and modern connector SDK. The connector, currently in preview, has already been deployed by a number of joint customers with great initial feedback on performance and ease-of-use.
  • To tighten the integration of our Snowflake service with platforms suited for machine learning and advanced data transformations, we released a new version of our Snowflake Connector for Spark, drastically improving performance by pushing more query operations, including JOINs and various aggregation functions, down to Snowflake. Our internal 10 TB TPC-DS performance benchmark tests demonstrate that running TPC-DS queries using this new v2 Spark connector is up to 70% faster compared to executing SQL in Spark with Parquet or CSV (see this Blog post for details).
  • We continue to improve our drivers for our developer community. Listening to feedback from our large Python developer community, we worked on a new version of Snowflake’s native Python client driver, resulting in up to 40% performance improvements when fetching result sets from Snowflake. And, after we open-sourced our JDBC driver last quarter, we have now made the entire source code available on our official GitHub repository.
  • And, last but not least, to enhance our parallel data loading via the COPY command, ETL developers can now dynamically add file metadata information, such as the actual file name and row number, which might not be part of the initial payload (a brief sketch follows below).
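To illustrate the last item, the following sketch shows the general shape of a COPY statement that captures file metadata alongside the payload columns. The stage, target table, and column list are placeholders, and the statement is again issued over the JDBC stmt handle from the earlier sketch:

// Load staged CSV files, capturing each record's source file name and row number
// (stage, table, and columns are placeholders)
stmt.execute(
  """COPY INTO weblog_events
    |FROM (
    |  SELECT METADATA$FILENAME, METADATA$FILE_ROW_NUMBER, t.$1, t.$2
    |  FROM @weblog_stage t
    |)
    |FILE_FORMAT = (TYPE = CSV)""".stripMargin)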

Increasing transparency and usability

These features are designed to strike the right balance between offering a service that is easy to operate and exposing actionable insights into the running service.

  • One major addition to our service is Query Profile, now generally available and fully integrated into Snowflake’s web interface. Query Profile is a graphical tool you can use to detect performance bottlenecks and areas for improving query performance.
  • Various UI enhancements were implemented: Snowflake’s History page now supports additional filtering by the actual SQL text and query identifier. We also added UI support for creating a Parquet file format in preparation for loading Parquet data into variant-type table columns in Snowflake.
  • A new Information Schema table function (TABLE_STORAGE_METRICS) exposes information about the data storage for individual tables. In particular, a user can now better understand how tables are impacted by Continuous Data Protection, particularly Time Travel and Fail-safe retention periods, as well as which tables contain cloned data (see the sketch after this list).
  • We also recently introduced smarter virtual warehouse billing through Warehouse Billing Continuation (see this Blog post for details). If a warehouse is suspended and resumed within 60 minutes of the last charge, we do not charge again for the servers in the warehouse. WBC eliminates additional credit charges, and we hope it will reduce the need for our customers to strictly monitor and control when warehouses are suspended and resized.
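As a rough sketch of the storage inspection described above, the query below reads per-table storage metrics over the same JDBC stmt handle as in the earlier sketches. Note that current versions of Snowflake expose this information as the INFORMATION_SCHEMA.TABLE_STORAGE_METRICS view, and the column names shown follow the Snowflake documentation:

// Inspect per-table storage, including Time Travel and Fail-safe consumption
val storage = stmt.executeQuery(
  """SELECT table_name, active_bytes, time_travel_bytes, failsafe_bytes
    |FROM information_schema.table_storage_metrics
    |ORDER BY failsafe_bytes DESC""".stripMargin)
while (storage.next()) {
  println(s"${storage.getString("TABLE_NAME")}: failsafe = ${storage.getLong("FAILSAFE_BYTES")} bytes")
}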

Scaling and investing in service robustness

These service enhancements aren’t customer visible, but are crucial for scaling to meet the demands of our rapidly growing base of customers.

  • As part of rolling out the new EU (Frankfurt) region, we increased automation of our internal deployment procedures to (a) further improve engineering efficiency while (b) laying the foundation for rapidly adding new regions based on customer feedback.
  • We further streamlined and strengthened our various internal testing and pre-release activities, allowing us to ship new features to our customers on a weekly basis – all in a fully transparent fashion with no downtime or impact to users.

Conclusion and Acknowledgements

This summary list of features delivered in Q1 highlights the high velocity and broad range of features the Snowflake Engineering Team has successfully delivered in a short period of time. We are committed to putting our customers first and maintaining this steady pace of shipping enterprise-ready features each quarter. Stay tuned for another feature-rich Q2.

For more information, please feel free to reach out to us at info@snowflake.net. We would love to help you on your journey to the cloud. And keep an eye on this blog or follow us on Twitter (@snowflakedb) to keep up with all the news and happenings here at Snowflake Computing.

Snowflake and Spark, Part 2: Pushing Spark Query Processing to Snowflake

Welcome to the second post in our ongoing Blog series describing Snowflake’s integration with Spark. In Part 1, we discussed the value of using Spark and Snowflake together to power an integrated data processing platform, with a particular focus on ETL scenarios.

In this post, we change perspective and focus on performing some of the more resource-intensive processing in Snowflake instead of Spark, which results in significant performance improvements. As part of this, we walk you through the details of Snowflake’s ability to push query processing down from Spark into Snowflake. We also touch on how this pushdown can help you transition from a traditional ETL process to a more flexible and powerful ELT model.

Query pushdown is supported with v2.1 (and later) of the Snowflake Connector for Spark. As you explore the capabilities of the connector, make sure you are using the latest version, available from Spark Packages or Maven Central (source code in Github).

Overview of Querying in Spark with Snowflake

Before we get into the specifics of query pushdown, let’s review the basic query flow between Spark and Snowflake. Processing queries with the Snowflake Connector for Spark involves the same steps as data loading (as discussed in Part 1 of this Blog series), but in a slightly different order:

  1. The Spark driver sends the SQL query to Snowflake using a Snowflake JDBC connection.
  2. Snowflake uses a virtual warehouse to process the query and copies the query result into AWS S3.
  3. The connector retrieves the data from S3 and populates it into DataFrames in Spark.
Figure 1: Query flow from Spark to Snowflake

As with data loading (discussed in Part 1), note how the Snowflake worker nodes (i.e. servers in the virtual warehouse) perform all the heavy processing for the data egress, and the slave nodes in the Spark cluster perform the data ingress. This allows you to size your Snowflake virtual warehouse and Spark clusters to balance compute capacity and IO bandwidth against S3 for optimal performance. Assuming unbounded ingress and egress capacity on S3, this approach gives you virtually unlimited capacity for transferring data back and forth between Spark and Snowflake by simply scaling both clusters to the levels that your workload requires.
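For reference, a minimal read from Snowflake into a Spark DataFrame looks like the following sketch. The connection and staging options mirror those covered in Part 1 of this series, and all values in angle brackets are placeholders:

// Hypothetical connection and staging options -- see Part 1 of this series for the full list
val sfOptions = Map(
  "sfURL"        -> "<your-account>.snowflakecomputing.com",
  "sfUser"       -> "<your-user>",
  "sfPassword"   -> "<your-password>",
  "sfDatabase"   -> "<your-database>",
  "sfSchema"     -> "public",
  "sfWarehouse"  -> "<your-warehouse>",
  "awsAccessKey" -> "<your-aws-access-key-id>",
  "awsSecretKey" -> "<your-aws-secret-access-key>",
  "tempdir"      -> "s3n://<your-staging-bucket>/<prefix>"
)

// Read the result of a Snowflake query into a Spark DataFrame;
// the result set travels through S3 as described in steps 1-3 above
val df = spark.read
  .format("net.snowflake.spark.snowflake")
  .options(sfOptions)
  .option("query", "SELECT city, zip_code FROM zip_codes WHERE zip_code < 98000")
  .load()

df.show()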

Highly Optimized Performance through Query Pushdown

In earlier versions of the Spark connector, Spark’s PrunedFilteredScan interface allowed simple projection and filter operations (e.g. .select(...) and .filter(...) in Scala) to be translated and pushed to Snowflake, instead of being processed in Spark. These Snowflake optimizations were helpful in many situations; however, other operations, such as joins, aggregations, and even scalar SQL functions, could only be performed in Spark. This approach is typically not ideal for more capable Spark data sources, such as Snowflake, which can perform these functions more efficiently.

Starting with v2.1, the connector introduces advanced optimization capabilities for better performance. At the heart of the performance optimizations is the ability to push down queries to Snowflake. With Snowflake as the data source for Spark, v2.1 of the connector can push large and complex Spark logical plans (in their entirety or in parts) to be processed in Snowflake, thus enabling Snowflake to do more of the work and leverage its performance efficiencies. This capability establishes a tight integration between the two systems and combines the powerful query-processing of Snowflake with the computational capabilities of Apache Spark and its ecosystem.

Enabling Query Pushdown

Enable the query pushdown feature for the connector using the following static method call:

SnowflakeConnectorUtils.enablePushdownSession(spark)
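In context, this is a one-line addition right after creating (or obtaining) the SparkSession. The sketch below assumes the SnowflakeConnectorUtils class lives in the connector’s net.snowflake.spark.snowflake package; check the import path against your connector version:

import org.apache.spark.sql.SparkSession
import net.snowflake.spark.snowflake.SnowflakeConnectorUtils

val spark = SparkSession.builder()
  .appName("snowflake-pushdown-example")
  .getOrCreate()

// Enable pushdown of Spark logical plans to Snowflake for this session
SnowflakeConnectorUtils.enablePushdownSession(spark)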

Why Pushdown?

Users of both Snowflake and Spark may find that a large amount of the data they would like to use resides in Snowflake. A federated setup exists when two or more interconnected systems can process all or parts of a particular data task flow, leading to the common question of where different parts of the computation should occur. A common concern with federated setups is performance for processing large data sets. For the best performance, you typically want to avoid reading lots of data or transferring large intermediate results between the interconnected systems. Ideally, most of the processing happens close to where the data is stored to leverage any capabilities of the participating stores to dynamically eliminate data that is not needed.

Spark already supports a good set of functionality for relational data processing, as well as connectivity with a variety of data sources, including the columnar Parquet format. Snowflake, however, can achieve much better query performance via efficient pruning of data, enabled through our micro-partition metadata tracking and clustering optimizations (see the Snowflake documentation for more details). This metadata allows Snowflake to scan data more efficiently when given query predicates by using aggregate information on micro-partitions, such as min and max values, since data that is determined not to contain relevant values can be skipped entirely. Additionally, metadata such as the cardinality of column values (the number of distinct values) allows Snowflake to better optimize operations such as join ordering.

Given that filter, projection, join, and aggregation operations on data all have the potential to significantly reduce the result set of a given query, the data pruning used by Snowflake can and should be leveraged. This also has the benefit of reducing data that has to be transferred to Spark via S3 and the network, which in turn improves response times.

To support pushing more work to Snowflake, the Snowflake connector integrates deeply with the query plan generation process in Spark.

Query Plan Generation

To understand how query pushdown works, let’s take a look at the typical process flow of a Spark DataFrame query. Spark contains its own optimizer, Catalyst, that performs a set of source-agnostic optimizations on the logical plan of a DataFrame (predicate pushdowns, constant folding, etc.). DataFrames are executed lazily. This means Spark can evaluate and optimize relational operators applied to a DataFrame and only execute the DataFrame when an action is invoked. Consider the following expansion on our zip code example from Part 1 of the series:

val dfZipCodes = spark.read.format(...).option(...,...).load()
val dfFilteredZips = dfZipCodes.filter("zip_code < 98000")
val dfCities = dfFilteredZips.select("city")

The same example can also be expressed as:

dfZipCodes.createOrReplaceTempView("temp_zip_codes")
val dfSQLCities = spark.sql("SELECT city from temp_zip_codes WHERE zip_code < 98000")

In either case, Spark delays planning and executing the code until an action such as collect(), show(), or count() occurs.
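One way to see this laziness at work, and to check where processing will happen, is to inspect the plan and then trigger an action. The sketch below builds on the dfSQLCities DataFrame defined above; the exact plan output you see depends on your Spark and connector versions:

// Nothing has executed yet; explain() prints the logical and physical plans
// (with pushdown enabled, the physical plan delegates work to the Snowflake source)
dfSQLCities.explain(true)

// Invoking an action finally triggers planning and execution
dfSQLCities.show()
val numCities = dfSQLCities.count()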

When an action is required, Spark’s optimizer, Catalyst, first produces an optimized logical plan. The process then moves to the physical planning stage. This is where Spark determines whether to push down a query to Snowflake, as shown in the following diagram:

Figure 2: Location of Snowflake alternative physical plan in a Catalyst query plan
(based on an image originally published in this Databricks blog post)

Structure of a Snowflake Plan

So, how does the connector allow query pushdown to happen? With query pushdown enabled, Catalyst inserts a Snowflake plan as one possible physical plan for Spark to choose based on cost, as illustrated in the diagram above.

Input: After passing through Catalyst, a DataFrame is represented as a logical plan tree, with nodes representing data sources and operators. For example, consider the following code:

// sfOptions holds the Snowflake connection options (see the sketch in the query flow overview above)
val dfZips = spark.read.format("net.snowflake.spark.snowflake").options(sfOptions).option("dbtable", "zip_codes").load()
val dfMayors = spark.read.format("net.snowflake.spark.snowflake").options(sfOptions).option("dbtable", "city_mayors").load()
val dfResult = dfZips.filter("zip_code > 98000").join(dfMayors.select($"first", $"last", $"city", $"city_id"), dfZips("city_id") === dfMayors("city_id"), "inner")

DataFrame dfResult may be internally represented by Spark in a data structure similar to the following:

Figure 3: Data structure representation of join on two tables with filtering and projection

The tree represents a join of two Snowflake tables, after applying a filter on the left-side relation (the zip_codes table) and a projection on the right-side relation (the city_mayors table).

Translation: The connector traverses the above data structure and procedurally generates a Snowflake plan to execute it. In previous iterations of our connector, Spark performed the join on zip_codes and city_mayors. With the new feature enabled, however, the connector is able to verify that zip_codes and city_mayors are joinable relations within Snowflake and thus recognize that the join can be performed completely in Snowflake.

This same process can also be applied to SORT, GROUP BY, and LIMIT operations, and more.
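Purely for illustration, and not the literal SQL text the connector generates, the join above could collapse into a single Snowflake query along these lines, with the filter, projection, and join all evaluated inside Snowflake:

// Illustrative only: roughly the kind of single query that pushdown lets Snowflake
// execute, instead of Spark joining two separately retrieved tables
val illustrativePushdownSql =
  """SELECT z.*, m.first, m.last, m.city, m.city_id
    |FROM zip_codes z
    |JOIN city_mayors m ON z.city_id = m.city_id
    |WHERE z.zip_code > 98000""".stripMargin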

Performance Results

Pushing queries down to Snowflake can greatly improve end-to-end performance. To illustrate this, we ran a suite of TPC-DS queries that mirror three different workloads in Cloudera’s Impala benchmarks:

  • Workload A (Interactive Queries)
  • Workload B (Reporting)
  • Workload C (Analytic Queries)

We compared the end-to-end performance between Snowflake and Spark using 10TB scale, with queries executed on a 3X-Large virtual warehouse (for Snowflake) and an equivalent 64-node C3.2XLarge EC2 cluster (for Spark). For each workload, we tested 3 different modes:

  • Spark-Snowflake Integration with Full Query Pushdown: Spark using the Snowflake connector with the new pushdown feature enabled.
  • Spark on S3 with Parquet Source (Snappy): Spark reading from S3 directly with data files formatted as Parquet and compressed with Snappy.
  • Spark on S3 with CSV Source (gzip): Spark reading from S3 directly with data files formatted as CSV and compressed with gzip.

The following 3 charts show the performance comparison (in seconds) for the TPC-DS queries in each workload. Note that the numbers for Spark-Snowflake with Pushdown represent the full round-trip times for Spark to Snowflake and back to Spark (via S3), as described in Figure 1:

  1. Spark planning + query translation.
  2. Snowflake query processing + unload to S3.
  3. Spark read from S3.

The scale for the charts is logarithmic to make reading easier.

Figure 4: Performance comparison between queries in Workload A with pushdown vs no pushdown
Figure 5: Performance comparison between queries in Workload B with pushdown vs no pushdown
Figure 6: Performance comparison between queries in Workload C with pushdown vs no pushdown

As demonstrated, fully pushing query processing to Snowflake provides the most consistent and overall best performance, with Snowflake on average doing better than even native Spark-with-Parquet.

Note that the columnar format of Parquet is sometimes leveraged by Spark for efficient pruning of unneeded data, but Snowflake answers many of the more complex queries significantly faster than Spark-with-Parquet, e.g. queries 59 and 79 in Workload C (Analytics).

ETL vs ELT

With traditional ETL, most data transformation (filtering, sorting, etc.) typically takes place before loading to limit the data size and ensure optimal querying performance. Snowflake, with its low storage costs and powerful SQL capabilities, combined with the significant performance improvements provided by query pushdown, enables transitioning to a more modern and effective ELT model, in which you load all your data into Snowflake and then perform any data transformations directly in Snowflake. And all of this is accomplished without any changes to your familiar Spark programming experience using Scala, Python, or SparkSQL.

Summary and Next Steps

This second post in our Blog series about Spark and Snowflake showed how you can use the Snowflake Connector for Spark to realize significant performance improvements by pushing data processing from Spark into Snowflake. This makes Snowflake the data repository of choice for your ELT scenarios, even if you have existing code in Spark for your data ingress pipeline.

So what’s next? We are continuously looking for ways to improve the experience of working with both Spark and Snowflake. Currently, we are exploring removing the requirement for user-managed S3 buckets for data transfer between Spark and Snowflake, and using Snowflake internal stages instead. Keep an eye on this blog to learn more about our progress on this front.

In the meantime, we encourage you to try Snowflake integrated with Spark in your data processing solutions today.

Also, are you interested in helping design and build the next-generation Spark-Snowflake integration? If so, we invite you to take a look at the open Engineering positions on our careers page.

And, as always, you can follow us on Twitter (@snowflakedb) to keep up with all the latest news and happenings here at Snowflake Computing.

Snowflake and Spark, Part 1: Why Spark?

This is the first post in an ongoing series describing Snowflake’s integration with Spark. In this post, we introduce the Snowflake Connector for Spark (package available from Maven Central or Spark Packages, source code in Github) and make the case for using it to bring Spark and Snowflake together to power your data-driven solutions.

What is Spark?

Apache Spark is a distributed data processing system with support for functional, declarative, and imperative programming styles. Its popularity and power lie in its many programming paradigms, supported APIs (Scala, R, and Python), machine-learning libraries, and tight integration with the Hadoop ecosystem. Spark also provides connectivity to a wide variety of data sources, including SQL-based relational database systems and NoSQL systems. As a result, Spark has become the tool of choice for data engineering tasks.

With the introduction of the Snowflake Connector for Spark in June 2016, Snowflake enabled connectivity to and from Spark. The connector provides the Spark ecosystem with access to Snowflake as a fully-managed and governed repository for all data types, including JSON, Avro, CSV, XML, machine-born data, etc. The connector also enables powerful use cases that integrate Spark and Snowflake, including:

  • Complex ETL: Using Spark, you can easily build complex, functionally rich and highly scalable data ingestion pipelines for Snowflake. With a large set of readily-available connectors to diverse data sources, Spark facilitates data extraction, which is typically the first part of any complex ETL pipeline. Spark also helps with computationally-involved tasks for data transformation such as sessionization, data cleansing, data consolidation, and data unification, which usually happen at later stages in the ETL pipeline. Using the Snowflake Connector for Spark, the data produced by these complex ETL pipelines can now easily be stored in Snowflake for broad, self-service access across the organization using standard SQL and SQL tools.
  • Machine Learning: Spark provides a rich ecosystem for machine learning and predictive analytics functionality, e.g. the popular machine learning library, MLlib. With the integration between Spark and Snowflake, Snowflake provides you with an elastic, scalable repository for all the data underlying your algorithm training and testing. With machine learning, processing capacity needs can fluctuate heavily. Snowflake can easily expand its compute capacity to allow your machine learning in Spark to process vast amounts of data.

Enabling Spark in AWS EMR with Snowflake

With the Snowflake Connector for Spark, you can use Spark clusters, e.g. in AWS EMR or Databricks, and connect them easily with Snowflake. For example, you can create an EMR cluster with Spark pre-installed by selecting Spark as an application. Before we dive into the details of using Snowflake with Spark, the following code samples illustrate how to create and connect to a Spark cluster in AWS EMR and start a spark-shell using the connector:

STEP 1: Create a Spark cluster in AWS EMR 5.4.0 with Spark 2.1 using the AWS CLI. For example, in US-West-2:

aws emr create-cluster \
   --applications Name=Ganglia Name=Spark \
   --ec2-attributes '{"KeyName":"<your-key-name>","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"<your-subnet-id>"}' \
   --service-role EMR_DefaultRole \
   --enable-debugging \
   --release-label emr-5.4.0 \
   --log-uri 's3n://<your-log-bucket>/elasticmapreduce/' \
   --name '<your-cluster-name>' \
   --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"m3.xlarge","Name":"Master Instance Group"},{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"m3.xlarge","Name":"Core Instance Group"}]' \
   --configurations '[{"Classification":"spark","Properties":{"maximizeResourceAllocation":"true"},"Configurations":[]}]' \
   --scale-down-behavior TERMINATE_AT_INSTANCE_HOUR \
   --region us-west-2

STEP 2: Connect to the cluster using ssh:

ssh -i ~/<your-key-name>.pem hadoop@<master-public-dns>.compute.amazonaws.com

STEP 3: Start spark-shell with the Snowflake connector packages. Alternatively, you can also pre-load the packages when creating the cluster (e.g. via the spark.jars.packages configuration setting). For example:

spark-shell --packages net.snowflake:snowflake-jdbc:3.0.14,net.snowflake:spark-snowflake_2.11:2.1.3

Where:

  • spark-snowflake_2.11 specifies the connector artifact ID (for Scala 2.11).
  • 2.1.3 specifies the connector version. Note that this version is for Spark 2.1. For Spark 2.0, use 2.1.3-spark_2.0 instead.

Also, note that, if you are not running from an EMR cluster, you need to add the package for AWS support to the packages list. For instance, when you run spark-shell from a local installation, your packages list will look like this:

spark-shell --packages net.snowflake:snowflake-jdbc:3.0.14,net.snowflake:spark-snowflake_2.11:2.1.3,org.apache.hadoop:hadoop-aws:2.8.0

STEP 4: In spark-shell, you then need to define which Snowflake database and virtual warehouse to use. The connector also needs access to a staging area in AWS S3, which must be configured as well. You can do that with the following Scala commands in spark-shell:

// Configuration of the staging area for the connector in AWS S3
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-aws-access-key-id>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-aws-secret-access-key>")

// Snowflake instance parameters
val defaultOptions = Map(
  "sfURL" -> "<your-account>.snowflakecomputing.com",
  "sfAccount" -> "<your-account>",
  "sfUser" -> "<your-user>",
  "sfPassword" -> "<your-password>",
  "sfDatabase" -> "<your-database>",
  "sfSchema" -> "public",
  "sfWarehouse" -> "<your-warehouse>",
  "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
  "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
  "tempdir" -> "s3n://<your-staging-bucket>/<prefix>"
)

With these steps, you now have a Spark cluster and a spark-shell running in AWS EMR that you can use to showcase some of the most common use cases, such as complex ETL and machine learning.

Overview of Loading Data from Spark into Snowflake

Before we get into our discussion of ETL using Spark and Snowflake, let’s first take a look at what happens behind the scenes when the Snowflake Spark connector persists Spark data frames in Snowflake.

For data loading operations, the Spark connector for Snowflake performs three separate steps, similar to the spark-redshift connector:

  1. The first step persists the contents of the data frame in the staging area in AWS S3. This step currently uses the S3 bucket provided during the configuration of the Spark connector (as described in the Snowflake documentation). Future versions of the connector will not require an S3 bucket.
  2. The second step then connects to Snowflake from Spark using JDBC. On that JDBC connection, it issues a COPY command to load the data.
  3. As the third and final step, the COPY command retrieves the data from the staging area in S3 and uses the current virtual warehouse to load it into tables in the Snowflake database.

Note that, in this flow, the slave nodes in the Spark cluster and the compute nodes in Snowflake (i.e. the virtual warehouse) handle the data transfer. This allows for much greater scale than a more conventional approach where the data flows through the JDBC connection itself. Here you can scale the Spark cluster or the Snowflake virtual warehouse independently to increase data transfer bandwidth between Spark and Snowflake, whereas a JDBC-based transfer would always be limited to the bandwidth of a single connection.

Complex ETL in Spark with Snowflake

Today, data engineers need to continuously extract and transform data from various sources before it can be made available for consumption by business users through querying and analytics. Given the volume of data that today’s organizations face, a significant amount of compute and storage capacity needs to be made available to perform these tasks in a timely and scalable fashion. Spark has become the tool of choice for many data engineers to implement the computation steps along their data processing pipelines. This is due to the high efficiency of Spark with its in-memory execution capabilities, the availability of libraries from the Spark ecosystem, and the ease of development with languages such as Scala and Python.

The following example illustrates how Spark can be used to implement a simple data ingestion pipeline that performs several transformations on the new data before storing it in Snowflake. The example uses a web log scenario. Assume that new data is read from a web server log file, in this case using the Apache web log format. Log lines are made available as a list in Scala. The sample parses the IP addresses from the log lines and transforms them into ZIP codes using REST calls to the FreeGeoIP web service. The resulting list of ZIP codes is then stored in Snowflake.

// *************************************
// Geo mapping of Apache web log lines 
// *************************************
import scala.util.parsing.json.JSON._
import scala.io.Source.{fromInputStream}
import java.net._

// Example data: Apache web server log lines
var weblog: List[String] = List(
 """8.8.8.8 - - [07/Mar/2017:00:05:49 -0800] "GET /foo/index.html HTTP/1.1" 401 12846""",
 """8.8.8.8 - - [07/Mar/2017:00:06:51 -0800] "GET /bar/index.html HTTP/1.1" 200 4523""")
// Function to parse a web log line and return the zip code for its IP address
def ParseLogLine(strLine: String): Integer = {
  var Array(ip, d1, d2, dt, tz, request, url, proto, ret, size) = strLine.split(" ")

  // REST call to the FreeGeoIP web service for the IP address
  var url2 = "http://freegeoip.net/json/".concat(s"$ip")
  var result2 = scala.io.Source.fromURL(url2).mkString

  // Parse the JSON response and extract the zip code
  var json: Option[Any] = parseFull(result2)
  var map: Map[String, Any] = json.get.asInstanceOf[Map[String, Any]]
  var zip: String = map("zip_code").asInstanceOf[String]

  var intZip: Integer = zip.trim.mkString.toInt

  return intZip
}

// Parse the web log lines into a list of zip codes
var zips: List[Integer] = weblog.map(line => ParseLogLine(line))
// Let's take a look at the zip codes we found in the log
zips.foreach(println(_))

// Get a data frame for the zip codes list
var df = zips.toDF()

// Push the list/df content to Snowflake as a new table
import org.apache.spark.sql.SaveMode
df.write.format("net.snowflake.spark.snowflake").options(defaultOptions).option("dbtable", "zip_codes").mode(SaveMode.Overwrite).save()

Now we have the zip codes in Snowflake and can start using them in Snowflake queries and BI tools that connect to Snowflake.

Machine Learning in Spark with Snowflake Connectivity

Part of Spark’s appeal is how easy it is to use machine learning capabilities over the data that has been made available to Spark. For example, MLlib, a popular library for machine learning, comes as part of the standard Spark configuration. With these machine learning capabilities in hand, organizations can easily gain new insights and business value from the data that they acquire.

Expanding on our previous web log example, you may wonder what zip codes or broader geographical areas the requests in the web server logs are coming from. The following Scala code illustrates how to retrieve the result of a Snowflake query into Spark and apply machine learning functions to it:

// *************************************
// Retrieve zip codes stored in Snowflake into Spark
// *************************************

// Function to retrieve a Snowflake query result into a Spark data frame
def snowflakedf(sql: String) = {
  spark.read
    .format("net.snowflake.spark.snowflake")
    .options(defaultOptions)
    .option("query", sql)
    .load()
}

// Snowflake SQL query to retrieve all ZIP codes
val df2 = snowflakedf("SELECT * FROM zip_codes")

// *************************************
// Machine learning over zip codes stored in Snowflake 
// *************************************
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Convert DF with SQL result into Vectors for the MLlib API
var vectors = df2.rdd.map(r => Vectors.dense(r.getDecimal(0).doubleValue()))
vectors.cache

var numClusters = 2
var numIterations = 20
var clusters = KMeans.train(vectors, numClusters, numIterations)
clusters.clusterCenters

You can now use snowflakedf(...) to define Spark data frames that are populated with data from the Snowflake query.
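For instance, the following sketch pulls another result set from Snowflake with the same helper and then exercises the trained model; the LIMIT clause and the sample zip code are illustrative only:

// Pull another result set from Snowflake using the helper defined above
val dfSample = snowflakedf("SELECT * FROM zip_codes LIMIT 100")
dfSample.show()

// Evaluate the clustering (within-set sum of squared errors) and assign a new,
// illustrative zip code to one of the learned clusters
val wssse = clusters.computeCost(vectors)
val clusterId = clusters.predict(Vectors.dense(94043.0))
println(s"WSSSE = $wssse; zip code 94043 falls into cluster $clusterId")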

Summary and Next Steps

As we’ve illustrated in this post, Spark is a powerful tool for data wrangling. Its rich ecosystem provides compelling capabilities for complex ETL and machine learning. With the deep integration into Spark provided by the connector, Snowflake can now serve as the fully-managed and governed database for all your Spark data, including traditional relational data, JSON, Avro, CSV, XML, machine-born data, etc. This makes Snowflake your repository of choice in any Spark-powered solution.

So what’s next? We encourage you to try Snowflake and its integration with Spark in your data processing solutions today.

Also, are you interested in helping design and build the next-generation Spark-Snowflake integration? If so, we invite you to take a look at the open Engineering positions on our careers page.

In part 2 of this series, we’ll take a look behind the scenes to better understand how the Spark connector processes queries that retrieve data from Snowflake into Spark and how this can be used to enable high-performance ELT solutions using Spark and Snowflake. In the meantime, keep an eye on this blog or follow us on Twitter (@snowflakedb) to keep up with all the news and happenings here at Snowflake Computing.