Spark SQL - Convert JSON String to Map


Function from_json

Spark SQL function from_json(jsonStr, schema[, options]) parses the given JSON string according to the specified schema and returns a struct value. The options parameter controls how the JSON is parsed; it accepts the same options as the JSON data source in the Spark DataFrame reader APIs.
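
Options are passed as a map of string key-value pairs. As a minimal sketch (assuming the three-argument SQL form and the allowUnquotedFieldNames option listed in the appendix below), a JSON string with unquoted field names can be parsed like this:

SELECT from_json('{Attr_INT:1}', 'Attr_INT INT', map('allowUnquotedFieldNames', 'true'));

If the option takes effect, the statement should return {"Attr_INT":1} instead of a null struct.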

Single object

The following code snippet converts a JSON string to a struct value in Spark SQL:

spark-sql> SELECT from_json('{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"}', 'Attr_INT INT, ATTR_DOUBLE DOUBLE, ATTR_DATE DATE');
from_json({"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"})
{"Attr_INT":1,"ATTR_DOUBLE":10.201,"ATTR_DATE":2021-01-01}

Function schema_of_json

Use function schema_of_json to infer the schema of a JSON string. It is very useful for saving time when working out the schema of a JSON column in your Spark DataFrame.

Example:

spark-sql> SELECT schema_of_json('[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}]');
schema_of_json([{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}])
array<struct<ATTR_DATE:string,ATTR_DOUBLE:double,Attr_INT:bigint>>

In the above example, the schema of the JSON array string '[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}]' is inferred as 'array<struct<ATTR_DATE:string,ATTR_DOUBLE:double,Attr_INT:bigint>>'.
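
Recent Spark versions also accept the output of schema_of_json directly as the schema argument of from_json, so the two functions can be combined without copying the inferred schema string by hand. A sketch using the same sample JSON:

SELECT from_json('[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"}]', schema_of_json('[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"}]'));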

JSON array

The following code snippet parses a JSON array string using Spark SQL functions:

SELECT from_json('[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}]','array<struct<ATTR_DATE:string,ATTR_DOUBLE:double,Attr_INT:bigint>>');

The output looks like the following:

spark-sql> SELECT from_json('[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}]','array<struct<ATTR_DATE:string,ATTR_DOUBLE:double,Attr_INT:bigint>>');
from_json([{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}])
[{"ATTR_DATE":"2021-01-01","ATTR_DOUBLE":10.201,"Attr_INT":1},{"ATTR_DATE":"2021-02-01","ATTR_DOUBLE":10.201,"Attr_INT":1}]

Once the JSON array string column is converted to an array of structs, we can reference the values directly:

SELECT r.json.Attr_INT, r.json.ATTR_DATE, r.json.ATTR_DOUBLE FROM (SELECT from_json('[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}]','array<struct<ATTR_DATE:string,ATTR_DOUBLE:double,Attr_INT:bigint>>') AS json) r;

Result:

Attr_INT        ATTR_DATE       ATTR_DOUBLE
[1,1]   ["2021-01-01","2021-02-01"]     [10.201,10.201]

Explode JSON array

In the above example, each column is an array type. We can explode the array of structs first to flatten the result.

SELECT explode(r.json) FROM (SELECT from_json('[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}]','array<struct<ATTR_DATE:string,ATTR_DOUBLE:double,Attr_INT:bigint>>') AS json) r;

The above SQL generates one row per JSON object (struct):

col
{"ATTR_DATE":"2021-01-01","ATTR_DOUBLE":10.201,"Attr_INT":1}
{"ATTR_DATE":"2021-02-01","ATTR_DOUBLE":10.201,"Attr_INT":1}

We can then directly reference the JSON attribute:

SELECT r1.col.Attr_INT, r1.col.ATTR_DATE, r1.col.ATTR_DOUBLE FROM (SELECT explode(r.json) AS col FROM (SELECT from_json('[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}]','array<struct<ATTR_DATE:string,ATTR_DOUBLE:double,Attr_INT:bigint>>') AS json) r) AS r1;

The output is now flattened:

Attr_INT        ATTR_DATE       ATTR_DOUBLE
1       2021-01-01      10.201
1       2021-02-01      10.201
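
The nested subqueries can also be avoided with LATERAL VIEW, another common way to explode an array column in Spark SQL. A sketch using the same literal JSON string:

SELECT t.col.Attr_INT, t.col.ATTR_DATE, t.col.ATTR_DOUBLE FROM (SELECT from_json('[{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-01-01"},{"Attr_INT":1, "ATTR_DOUBLE":10.201, "ATTR_DATE": "2021-02-01"}]','array<struct<ATTR_DATE:string,ATTR_DOUBLE:double,Attr_INT:bigint>>') AS json) r LATERAL VIEW explode(r.json) t AS col;

The result should match the flattened output above.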

Appendix - JSON options

The following are all the options that can be specified (extracted from the Spark Scala API documentation); a short usage sketch follows the list:

  • primitivesAsString (default false): infers all primitive values as a string type
  • prefersDecimal (default false): infers all floating-point values as a decimal type. If the values do not fit in decimal, then it infers them as doubles.
  • allowComments (default false): ignores Java/C++ style comment in JSON records
  • allowUnquotedFieldNames (default false): allows unquoted JSON field names
  • allowSingleQuotes (default true): allows single quotes in addition to double quotes
  • allowNumericLeadingZeros (default false): allows leading zeros in numbers (e.g. 00012)
  • allowBackslashEscapingAnyCharacter (default false): allows accepting quoting of all characters using the backslash quoting mechanism
  • allowUnquotedControlChars (default false): allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not.
  • mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing.
    • PERMISSIVE : when it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets malformed fields to null. To keep corrupt records, a user can set a string type field named columnNameOfCorruptRecord in a user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. When inferring a schema, it implicitly adds a columnNameOfCorruptRecord field in an output schema.
    • DROPMALFORMED : ignores the whole corrupted records.
    • FAILFAST : throws an exception when it meets corrupted records.
  • columnNameOfCorruptRecord (default is the value specified in spark.sql.columnNameOfCorruptRecord): allows renaming the new field having malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord.
  • dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at Datetime Patterns. This applies to date type.
  • timestampFormat (default yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]): sets the string that indicates a timestamp format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp type.
  • multiLine (default false): parse one record, which may span multiple lines, per file
  • encoding (by default it is not set): allows to forcibly set one of standard basic or extended encoding for the JSON files. For example UTF-16BE, UTF-32LE. If the encoding is not specified and multiLine is set to true, it will be detected automatically.
  • lineSep (default covers \r, \r\n and \n): defines the line separator that should be used for parsing.
  • samplingRatio (default is 1.0): defines fraction of input JSON objects used for schema inferring.
  • dropFieldIfAllNull (default false): whether to ignore column of all null values or empty array/struct during schema inference.
  • locale (default is en-US): sets a locale as language tag in IETF BCP 47 format. For instance, this is used while parsing dates and timestamps.
  • pathGlobFilter: an optional glob pattern to only include files with paths matching the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter. It does not change the behavior of partition discovery.
  • recursiveFileLookup: recursively scan a directory for files. Using this option disables partition discovery
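
For example, the dateFormat option from the list above can be supplied through the options map of from_json (a minimal sketch using the three-argument form shown earlier):

SELECT from_json('{"ATTR_DATE": "01/02/2021"}', 'ATTR_DATE DATE', map('dateFormat', 'MM/dd/yyyy'));

With this option the date string should be parsed as 2021-01-02 instead of producing a null field.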