Catalyzing Insights: Unraveling NYC’s 311 Service Requests with Apache Spark and Elasticsearch

Introduction: Apache Spark processes large datasets in parallel across a cluster, which makes it a common choice for big data tasks. In this guide, we’ll load the 311 Service Requests dataset from New York City’s open data initiative, enrich it with a derived column, and then store the transformed data in Elasticsearch for quick querying and analysis.

Prerequisites:

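  1. Apache Spark and PySpark installed.
  2. Elasticsearch set up and running.
  3. The Elasticsearch Spark connector (elasticsearch-hadoop) available on Spark’s classpath; step 4 writes through it.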

Step-by-step guide:

1. Setting up the Environment:

First, install the required libraries:

pip install pyspark
pip install elasticsearch
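
Step 4 writes through the `org.elasticsearch.spark.sql` data source, which ships in the Elasticsearch Spark connector JAR rather than in either pip package. One way to pull it in is Spark's `spark.jars.packages` setting. The sketch below assumes Spark 3.x built against Scala 2.12; the helper names and the version string are illustrative only, and the version should match your Elasticsearch cluster.

```python
def connector_coordinate(es_version, spark_series="30", scala_version="2.12"):
    """Build the Maven coordinate of the Elasticsearch Spark connector.

    The defaults assume Spark 3.x with Scala 2.12; adjust them for your build.
    """
    return (
        f"org.elasticsearch:elasticsearch-spark-{spark_series}_{scala_version}"
        f":{es_version}"
    )


def build_session(es_version):
    """Create a SparkSession that downloads the connector at startup."""
    # Imported lazily so the helper above can be used without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder.appName("NYC311toElasticsearch")
        .config("spark.jars.packages", connector_coordinate(es_version))
        .getOrCreate()
    )


# The version here is an assumption; use the one that matches your cluster.
print(connector_coordinate("8.11.0"))
```

If a session is created this way before the rest of the guide runs, later `getOrCreate()` calls reuse it, so the connector stays available for the write step.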

2. Reading the CSV file:

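We’ll download the dataset from NYC Open Data and load it into a DataFrame. Spark’s CSV reader expects a path on a file system it understands (local disk, HDFS, S3 and so on), not an HTTP URL, so we fetch the file first.

from urllib.request import urlretrieve

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NYC311toElasticsearch").getOrCreate()

# Download the CSV to the working directory; $limit caps the number of rows returned.
url = "https://data.cityofnewyork.us/resource/fhrw-4uyv.csv?$limit=500000"
local_path, _ = urlretrieve(url, "nyc_311.csv")

# Read the local copy; inferSchema makes an extra pass over the data to detect column types.
df = spark.read.csv(local_path, header=True, inferSchema=True)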
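
The `$limit` parameter in the URL above caps the number of rows the Socrata API returns, and it can be combined with `$offset` to fetch the data in pages. The following is a minimal sketch of building such URLs; `build_soda_url` and `BASE` are hypothetical names introduced here, not part of any library.

```python
from urllib.parse import urlencode

# Base address of the NYC Open Data resource endpoint used in this guide.
BASE = "https://data.cityofnewyork.us/resource"


def build_soda_url(dataset_id, limit, offset=0):
    """Return a CSV download URL for one page of a dataset."""
    params = {"$limit": limit}
    if offset:
        params["$offset"] = offset
    # safe="$" keeps the dollar sign readable instead of encoding it as %24.
    return f"{BASE}/{dataset_id}.csv?{urlencode(params, safe='$')}"


print(build_soda_url("fhrw-4uyv", limit=500000))
print(build_soda_url("fhrw-4uyv", limit=50000, offset=100000))
```

Fetching smaller pages this way can help when a single large download times out, at the cost of more requests.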

3. Performing Transformations:

Let’s find out which complaint types are most common. We’ll then add a column indicating if a complaint type is one of the top 10 most common.

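from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

# Rank complaint types by how often they occur (1 = most common) and keep the top 10.
windowSpec = Window.orderBy(col("count").desc())
top_complaints = df.groupBy("complaint_type").count().withColumn("rank", rank().over(windowSpec)).filter(col("rank") <= 10)

# Left join keeps every request; rows outside the top 10 get a null rank.
df = df.join(top_complaints, ["complaint_type"], "left_outer").withColumn("is_top_complaint", col("rank").isNotNull())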
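
To make the logic concrete, here is a plain-Python sketch of the same idea on a handful of made-up rows. It reproduces only the `is_top_complaint` flag (not the extra `count` and `rank` columns the Spark join adds), and `flag_top_complaints` is a hypothetical helper written for this illustration.

```python
from collections import Counter


def flag_top_complaints(rows, n=10):
    """Add an is_top_complaint flag to each row, mirroring the Spark step above."""
    counts = Counter(row["complaint_type"] for row in rows)
    # Same semantics as rank(): a type's rank is 1 plus the number of types
    # with a strictly higher count, so tied types share a rank.
    ranks = {
        ctype: 1 + sum(1 for other in counts.values() if other > count)
        for ctype, count in counts.items()
    }
    return [
        {**row, "is_top_complaint": ranks[row["complaint_type"]] <= n}
        for row in rows
    ]


# Made-up sample rows, not taken from the real dataset.
sample = [
    {"unique_key": 1, "complaint_type": "Noise"},
    {"unique_key": 2, "complaint_type": "Noise"},
    {"unique_key": 3, "complaint_type": "Heating"},
    {"unique_key": 4, "complaint_type": "Rodent"},
    {"unique_key": 5, "complaint_type": "Noise"},
    {"unique_key": 6, "complaint_type": "Heating"},
]

for row in flag_top_complaints(sample, n=2):
    print(row)
```

Because tied complaint types share a rank, a filter of `rank <= 10` can keep more than ten types when counts are equal at the cutoff.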

4. Writing to Elasticsearch:

Ensure Elasticsearch is correctly set up and the desired index is defined. We’ll write our DataFrame to an Elasticsearch index named “nyc_311_data”.

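from elasticsearch import Elasticsearch

# Optional sanity check with the Python client; Spark itself writes through the connector.
es = Elasticsearch("http://localhost:9200")
if not es.ping():
    raise RuntimeError("Elasticsearch is not reachable at localhost:9200")

# Configuration for the Elasticsearch Spark connector
es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "nyc_311_data",  # index name; mapping types such as "/docs" are deprecated since Elasticsearch 7
    "es.mapping.id": "unique_key"  # 'unique_key' column in the dataset as the document ID
}

# Write DataFrame to Elasticsearch; "overwrite" replaces any existing data in the index
df.write.format("org.elasticsearch.spark.sql").options(**es_write_conf).mode("overwrite").save()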
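
Once the write finishes, a quick aggregation is a simple way to check the result. The sketch below builds a terms aggregation over the flagged documents and parses the response. It assumes Elasticsearch's default dynamic mapping, which gives string columns a `.keyword` sub-field; the function names are hypothetical, and `es` is expected to be an `Elasticsearch` client like the one created above.

```python
def top_complaints_query(size=10):
    """Build a search body that counts flagged documents per complaint type."""
    return {
        "size": 0,  # return only aggregation results, no individual hits
        "query": {"term": {"is_top_complaint": True}},
        "aggs": {
            "by_complaint_type": {
                # Assumes the default text + keyword mapping for string fields.
                "terms": {"field": "complaint_type.keyword", "size": size}
            }
        },
    }


def fetch_top_complaints(es, index="nyc_311_data", size=10):
    """Run the aggregation and return (complaint_type, doc_count) pairs."""
    response = es.search(index=index, body=top_complaints_query(size))
    buckets = response["aggregations"]["by_complaint_type"]["buckets"]
    return [(bucket["key"], bucket["doc_count"]) for bucket in buckets]
```

Calling `fetch_top_complaints(es)` against the populated index should return the most frequent complaint types with their document counts. Some releases of the Python client emit a deprecation warning for the `body` argument; passing the keys as keyword arguments is the alternative there.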

Conclusion: Using Apache Spark, we transformed a real-world NYC dataset and indexed it in Elasticsearch. Spark handles the heavy processing, and Elasticsearch makes the result quick to search and aggregate. The same pattern scales to larger extracts of NYC’s 311 Service Requests and prepares them for near-real-time analytics.

Note: In real-world scenarios, it’s essential to ensure that you have the right permissions to download, process, and use the data.

GitHub: https://github.com/naveedanjum/new-york-311-data-anlysis-pyspark