Code Snippets & Tips
Articles
Union two pandas DataFrame
This code snippet shows you how to union two pandas DataFrames in Python using the concat method in the pandas namespace. If the schemas are different, pandas will automatically merge them. Output:
```
  category  value  user
0        A      0   2.0
1        B      1   3.0
2        C      2   2.0
3        D      3   1.0
4        E      4   1.0
0        A      0   NaN
1        B      1   NaN
2        C      2   NaN
3        D      3   NaN
4        E      4   NaN
```
For the second DataFrame, column user doesn't exist, so pandas uses NaN to mark it.
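A minimal sketch of the approach with pd.concat; the column names and values below are illustrative, not the original snippet's data:
```python
import pandas as pd

# First DataFrame has an extra 'user' column; the second does not.
df1 = pd.DataFrame({'category': list('ABCDE'), 'value': range(5),
                    'user': [2.0, 3.0, 2.0, 1.0, 1.0]})
df2 = pd.DataFrame({'category': list('ABCDE'), 'value': range(5)})

# concat unions the rows; columns missing from one DataFrame are filled with NaN.
unioned = pd.concat([df1, df2])
print(unioned)
```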
Pandas DataFrame groupBy and then COUNT DISTINCT
This code snippet shows you how to group a pandas DataFrame by a certain column and then do a distinct count of unique values in another column. It is similar to the COUNT DISTINCT aggregation function in SQL. It also sorts the values by the group-by column. Example output:
```
    category  value  user  group-count
0          A      0     5            7
80         A     80     4            7
70         A     70     7            7
60         A     60    10            7
50         A     50     9            7
..       ...    ...   ...          ...
29         J     29     9            7
19         J     19     9            7
9          J      9     9            7
89         J     89     8            7
99         J     99     7            7

[100 rows x 4 columns]
```
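A sketch of one way to produce a result like the output above, using illustrative random data; the original snippet's exact values will differ:
```python
import numpy as np
import pandas as pd

# Illustrative data: 100 rows across 10 categories with a random 'user' column.
df = pd.DataFrame({
    'category': list('ABCDEFGHIJ') * 10,
    'value': range(100),
    'user': np.random.randint(1, 11, 100),
})

# Distinct count of 'user' per 'category', broadcast back to every row.
df['group-count'] = df.groupby('category')['user'].transform('nunique')
print(df.sort_values('category'))

# Or collapse to one row per category, like SQL's COUNT(DISTINCT user).
print(df.groupby('category')['user'].nunique())
```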
Snowflake - Flatten nested JSON array
Snowflake provides a number of JSON related functions to convert string (varchar) to JSON object and extract JSON values from the object, flatten nested arrays, etc. The code snippet shows an example of flattening the following JSON string using lateral flatten:
```json
{
  "a": "a",
  "b": [
    { "c": "c1", "d": [1, 2, 3, 4, 5, 6, 7] },
    { "c": "c2", "d": [10, 20, 30, 40, 50, 60, 70] }
  ]
}
```
Deploy Azure Automation Runbooks via GitHub Actions
This code snippet shows how to deploy Azure Automation PowerShell runbooks using a GitHub Actions workflow. Automation runbooks can be used to run scripts regularly, including PowerShell workflows, Python scripts, etc. If you have multiple scripts to deploy, you can create a PowerShell script file to deploy them. The following assumptions are made:
- The PowerShell runbook script (myrunbook.ps1) is located in the specified folder (scripts/runbooks).
- The runbook is of PowerShell type.
- GitHub Actions secrets are set up to use OIDC to authenticate with Azure.
- The resource will be deployed into the automation account named myautomationaccount under resource group 'rg'.
Flatten Pandas DataFrame after Aggregate
The code snippet Pandas DataFrame Group by one Column and Aggregate using MAX, MIN, MEAN and MEDIAN shows how to do aggregations in a pandas DataFrame. This code snippet shows you how to flatten the DataFrame (MultiIndex columns) after the aggregations. Sample output:
```
  category  value_max  value_min  value_mean  value_median
0        A         90          0          45            45
1        B         91          1          46            46
2        C         92          2          47            47
3        D         93          3          48            48
4        E         94          4          49            49
5        F         95          5          50            50
6        G         96          6          51            51
7        H         97          7          52            52
8        I         98          8          53            53
9        J         99          9          54            54
```
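A minimal sketch of flattening the MultiIndex columns produced by an aggregation; the data below is illustrative but reproduces the sample output:
```python
import pandas as pd

df = pd.DataFrame({'category': list('ABCDEFGHIJ') * 10, 'value': range(100)})

# Aggregating with a dict produces a MultiIndex on the columns, e.g. ('value', 'max').
agg = df.groupby('category').agg({'value': ['max', 'min', 'mean', 'median']})

# Flatten the column MultiIndex into single names like 'value_max'.
agg.columns = ['_'.join(col) for col in agg.columns.to_flat_index()]
agg = agg.reset_index()
print(agg)
```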
Pandas DataFrame Group by one Column and Aggregate using MAX, MIN, MEAN and MEDIAN
This code snippet provides one example of grouping a pandas DataFrame by one column and then aggregating multiple columns using different functions, including max, min, mean and median. We pass in a dictionary in which each key is a column name to aggregate and the value is a list of aggregation functions supported by pandas DataFrame. The result DataFrame will have multiple column levels, as the following output shows. Sample output:
```
         value
           max min mean median
category
A           90   0   45     45
B           91   1   46     46
C           92   2   47     47
D           93   3   48     48
E           94   4   49     49
F           95   5   50     50
G           96   6   51     51
H           97   7   52     52
I           98   8   53     53
J           99   9   54     54
```
Get First Top N Rows in a Pandas DataFrame
Method pandas.DataFrame.head can be used to retrieve the top N records from a DataFrame object. It has one optional parameter for the number of rows to return; the default value is 5 if not specified.
Syntax
```
DataFrame.head(n=5)
```
Sample output:
```
          A          B
0  0.000000   0.000000
1  1.010101  10.101010
2  2.020202  20.202020
3  3.030303  30.303030
4  4.040404  40.404040
5  5.050505  50.505051
6  6.060606  60.606061
7  7.070707  70.707071
8  8.080808  80.808081
9  9.090909  90.909091
```
Check Column Data Types in a Pandas DataFrame
This code snippet provides examples of checking column data types in a DataFrame using dtype and dtypes. Sample output:
```
>>> print(df.dtypes)
a     int64
b    object
dtype: object
>>> print(df.a.dtype)
int64
```
Rename Columns in Pandas DataFrame
This code snippet provides a simple approach to renaming column names in a pandas DataFrame: it directly sets the DataFrame.columns property.
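A short sketch of the approach; the column names here are illustrative:
```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})

# Assign a full list of new names via the columns property.
df.columns = ['col_a', 'col_b']

# Alternatively, rename selected columns without touching the rest.
df = df.rename(columns={'col_a': 'id'})
print(df)
```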
Python - Get a List of Files in a Directory
This code snippet provides an example of listing files in a directory of the file system using Python. Module os is used. There are multiple approaches to implement this:
- os.listdir returns both files and directories in a directory. We can check whether a returned item is a file using the os.path.isfile method.
- os.walk generates two lists for each directory it traverses - files and directories - and can be used to retrieve all the subfolders recursively.
Sample output: ['F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209020059.log', 'F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209020429.log', 'F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209020710.log', 'F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209020921.log', 'F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209021147.log', 'F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209021357.log', 'F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209021637.log', 'F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209022007.log', 'F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209030218.log', 'F:\\Projects\\kontext-logs\\RawLogs-0\\c716eb-202209030818.log'] Reference: os — Miscellaneous operating system interfaces
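A minimal sketch of both approaches; the folder path is hypothetical (it mirrors the sample output above):
```python
import os

folder = r'F:\Projects\kontext-logs\RawLogs-0'  # hypothetical path

# Approach 1: os.listdir + os.path.isfile to keep files only.
files = [os.path.join(folder, name) for name in os.listdir(folder)
         if os.path.isfile(os.path.join(folder, name))]
print(files)

# Approach 2: os.walk traverses sub-folders recursively.
for root, dirs, names in os.walk(folder):
    for name in names:
        print(os.path.join(root, name))
```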
Python Convert Relative to Absolute File Path
This code snippet shows you how to convert a relative path to an absolute path using the os module in Python, and vice versa. We utilize the os.path.abspath and os.path.relpath functions. The latter accepts a second parameter as the base path, which is used to calculate the relative path. Output:
```
C:\Users\test.csv
..\test.csv
```
*The current directory when running this code is C:\Users\userid.
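A short sketch under the same assumption as the note above (current directory C:\Users\userid); the paths are illustrative:
```python
import os

# Relative to absolute (resolved against the current working directory).
print(os.path.abspath('..\\test.csv'))

# Absolute to relative, calculated against an explicit base path.
print(os.path.relpath('C:\\Users\\test.csv', start='C:\\Users\\userid'))
```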
Use sort() and orderBy() with PySpark DataFrame
In a Spark DataFrame, two APIs are provided to sort the rows based on the provided column or columns: sort and orderBy. orderBy is just an alias for the sort API.
Syntax
```
DataFrame.sort(*cols, **kwargs)
```
For *cols, we can use it to specify a column name, a Column object (pyspark.sql.Column), or a list of column names or Column objects. For **kwargs, we can use it to specify additional arguments; for PySpark, we can specify a parameter named ascending, which defaults to True and can also be a list of boolean values, one for each column used to sort the records. The code snippet provides examples of sorting a DataFrame. Sample outputs:
```
+---+----+
| id|col1|
+---+----+
|  2|   E|
|  4|   E|
|  6|   E|
|  8|   E|
|  1|   O|
|  3|   O|
|  5|   O|
|  7|   O|
|  9|   O|
+---+----+

+---+----+
| id|col1|
+---+----+
|  2|   E|
|  4|   E|
|  6|   E|
|  8|   E|
|  1|   O|
|  3|   O|
|  5|   O|
|  7|   O|
|  9|   O|
+---+----+

+---+----+
| id|col1|
+---+----+
|  8|   E|
|  6|   E|
|  4|   E|
|  2|   E|
|  9|   O|
|  7|   O|
|  5|   O|
|  3|   O|
|  1|   O|
+---+----+
```
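A minimal sketch of the two APIs, assuming an existing SparkSession named spark and illustrative data similar to the output above:
```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Illustrative data: id 1-9 with an odd/even flag column.
df = spark.createDataFrame([(i, 'E' if i % 2 == 0 else 'O') for i in range(1, 10)],
                           ['id', 'col1'])

df.sort('col1', 'id').show()                                 # ascending on both columns
df.orderBy(['col1', 'id'], ascending=[True, False]).show()   # col1 asc, id desc
df.sort(F.col('col1').asc(), F.col('id').desc()).show()      # Column objects
```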
Time Travel with Delta Table in PySpark
Delta Lake provides time travel functionality to retrieve data at a certain point in time or at a certain version. This can be done easily using the following two options when reading a Delta table as a DataFrame:
- versionAsOf - an integer value to specify a version.
- timestampAsOf - a timestamp or date string.
This code snippet shows you how to use them with the Spark DataFrameReader APIs. It includes three examples:
- query data as of version 1;
- query data as of 30 days ago (using a value computed via Spark SQL);
- query data as of a certain timestamp ('2022-09-01 12:00:00.999999UTC 10:00').
You may encounter issues if the timestamp is earlier than the earliest commit:
```
pyspark.sql.utils.AnalysisException: The provided timestamp (2022-08-03 00:00:00.0) is before the earliest version available to this table (2022-08-27 10:53:18.213). Please use a timestamp after 2022-08-27 10:53:18.
```
Similarly, if the provided timestamp is later than the last commit, you may encounter another issue like the following:
```
pyspark.sql.utils.AnalysisException: The provided timestamp: 2022-09-07 00:00:00.0 is after the latest commit timestamp of 2022-08-27 11:30:47.185. If you wish to query this version of the table, please either provide the version with "VERSION AS OF 1" or use the exact timestamp of the last commit: "TIMESTAMP AS OF '2022-08-27 11:30:47'".
```
References
Delta Lake with PySpark Walkthrough
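A minimal sketch of both read options, assuming a SparkSession named spark configured with the delta-spark package and a hypothetical table path:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
table_path = '/data/delta/test_table'  # hypothetical Delta table location

# Read the table as it was at version 1.
df_v1 = spark.read.format('delta').option('versionAsOf', 1).load(table_path)

# Read the table as it was at a specific timestamp.
df_ts = (spark.read.format('delta')
         .option('timestampAsOf', '2022-09-01 12:00:00')
         .load(table_path))

df_v1.show()
df_ts.show()
```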
Use expr() Function in PySpark DataFrame
Spark SQL function expr() can be used to evaluate a SQL expression and return it as a column (pyspark.sql.column.Column). Any operators or functions that can be used in Spark SQL can also be used with DataFrame operations. This code snippet provides an example of using the expr() function directly with a DataFrame. It also includes a snippet that derives the column without using this function.
*The code snippet assumes a SparkSession object already exists as 'spark'.
Output:
```
+---+-----+-----+
| id|id_v1|id_v2|
+---+-----+-----+
|  1|   11|   11|
|  2|   12|   12|
|  3|   13|   13|
|  4|   14|   14|
|  5|   15|   15|
|  6|   16|   16|
|  7|   17|   17|
|  8|   18|   18|
|  9|   19|   19|
+---+-----+-----+
```
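A short sketch consistent with the output above, assuming the existing SparkSession 'spark'; the column names are illustrative:
```python
from pyspark.sql import functions as F

df = spark.range(1, 10)

df = (df.withColumn('id_v1', F.expr('id + 10'))   # SQL expression evaluated as a column
        .withColumn('id_v2', df['id'] + 10))      # same result without expr()
df.show()
```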
PySpark DataFrame - Add or Subtract Milliseconds from Timestamp Column
This code snippet shows you how to add or subtract milliseconds (or microseconds) and seconds from a timestamp column in a Spark DataFrame. It first creates a DataFrame in memory and then adds and subtracts milliseconds/seconds from the timestamp column ts using Spark SQL internals. Output:
```
+---+--------------------------+--------------------------+--------------------------+--------------------------+
|id |ts                        |ts1                       |ts2                       |ts3                       |
+---+--------------------------+--------------------------+--------------------------+--------------------------+
|1  |2022-09-01 12:05:37.227916|2022-09-01 12:05:37.226916|2022-09-01 12:05:37.228916|2022-09-01 12:05:38.227916|
|2  |2022-09-01 12:05:37.227916|2022-09-01 12:05:37.226916|2022-09-01 12:05:37.228916|2022-09-01 12:05:38.227916|
|3  |2022-09-01 12:05:37.227916|2022-09-01 12:05:37.226916|2022-09-01 12:05:37.228916|2022-09-01 12:05:38.227916|
|4  |2022-09-01 12:05:37.227916|2022-09-01 12:05:37.226916|2022-09-01 12:05:37.228916|2022-09-01 12:05:38.227916|
+---+--------------------------+--------------------------+--------------------------+--------------------------+
```
*Note - the code assumes a SparkSession object already exists via the variable name spark.
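A minimal sketch of one way to do this with interval expressions, assuming the existing SparkSession 'spark'; the exact intervals in the original snippet may differ:
```python
from pyspark.sql import functions as F

# Illustrative DataFrame with a current timestamp column.
df = spark.range(1, 5).withColumn('ts', F.current_timestamp())

df = (df.withColumn('ts1', F.expr('ts - interval 1 millisecond'))
        .withColumn('ts2', F.expr('ts + interval 1 millisecond'))
        .withColumn('ts3', F.expr('ts + interval 1 second')))
df.show(truncate=False)
```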
ASP.NET Core Find out Page Referer
The referer is usually sent as part of the HTTP request headers. For instance, the Referer field in an HTTP request header can look like the following:
```
Referer: https://kontext.tech/
```
This code snippet provides an example of retrieving referer information via the HttpRequest.GetTypedHeaders method in ASP.NET Core Razor pages. It also shows you how to get the relative path and query string only. The same code can also be used in web API controllers. Note: the referer can be NULL if the user directly types the page URL in the browser.
Use when() and otherwise() with PySpark DataFrame
In Spark SQL, the CASE WHEN clause can be used to evaluate a list of conditions and return one of multiple possible results for each row. The same can be implemented directly using the pyspark.sql.functions.when and pyspark.sql.Column.otherwise functions. If otherwise is not used together with when, None will be returned for unmatched conditions. Output:
```
+---+------+
| id|id_new|
+---+------+
|  1|     1|
|  2|   200|
|  3|  3000|
|  4|   400|
|  5|     5|
|  6|   600|
|  7|     7|
|  8|   800|
|  9|  9000|
+---+------+
```
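A sketch consistent with the output above (even ids multiplied by 100, remaining multiples of 3 by 1000), assuming the existing SparkSession 'spark'; the conditions are inferred, not the original code:
```python
from pyspark.sql import functions as F

df = spark.range(1, 10)

# Chain multiple when() conditions; the first matching condition wins.
df = df.withColumn(
    'id_new',
    F.when(df.id % 2 == 0, df.id * 100)
     .when(df.id % 3 == 0, df.id * 1000)
     .otherwise(df.id))
df.show()
```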
PySpark - Save DataFrame into Hive Table using insertInto
This code snippet provides one example of inserting data into a Hive table using the PySpark DataFrameWriter.insertInto API.
```
DataFrameWriter.insertInto(tableName: str, overwrite: Optional[bool] = None)
```
It takes two parameters: tableName - the table to insert data into; overwrite - whether to overwrite existing data (by default, it won't). This function uses position-based resolution for columns instead of column names.
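A minimal sketch; the table name test_db.test_table is hypothetical and must already exist in the Hive metastore:
```python
from pyspark.sql import SparkSession

# Hive support is required for insertInto to resolve Hive tables.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

df = spark.createDataFrame([(1, 'A'), (2, 'B')], ['id', 'attr'])

# Columns are matched by position, not by name, so the DataFrame column
# order must match the table definition.
df.write.insertInto('test_db.test_table', overwrite=False)
```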
HDFS - List Folder Recursively
This code snippet provides one example of listing all the folders and files recursively under one HDFS path. The key is to use the -R option of the ls subcommand, i.e. hdfs dfs -ls -R followed by the path.
Find Number of Rows of Hive Table via Scala
To find the number of rows/records in a Hive table, we can use the Spark SQL count aggregation function: Hive SQL - Aggregate Functions Overview with Examples. This code snippet provides an example of Scala code that implements the same. spark-shell is used directly for simplicity. The code snippet can also run in Jupyter Notebooks or Zeppelin with a Spark kernel. Alternatively, it can be compiled into a jar file and then submitted as a job via spark-submit.
Subscribe to Multiple Topics in Kafka Consumer
When fetching records from Kafka bootstrap servers, we can configure one consumer to subscribe to more than one topic. This can be done in the constructor by passing a list of topics:
```python
topics = ['kontext-events', 'other-events']
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    *topics,
    client_id='local-test',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest'
)
```
Alternatively, we can use the subscribe() function to subscribe to multiple topics. We can also use regular expressions to match topics via patterns:
```
subscribe(topics=(), pattern=None, listener=None)
```
Kafka Consumer - Fetch Records from Beginning
This code snippet reads data from a topic's beginning offset by utilizing the seek_to_beginning() API. This sets each partition subscribed by the consumer to fetch records from the earliest available offset. Example output:
```
polling...
Kontext message 1
Kontext message 2
Kontext message 3
Kontext message 4
Kontext message 5
New kontext events~!!
New events!
polling...
polling...
```
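A minimal sketch using kafka-python, assuming a broker on localhost:9092 and a topic 'kontext-events' with a single partition 0 (both assumptions, not from the original snippet):
```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=5000)

# Explicitly assign the partition, then rewind it to the earliest offset.
partition = TopicPartition('kontext-events', 0)
consumer.assign([partition])
consumer.seek_to_beginning(partition)

print('polling...')
for message in consumer:
    print(message.value.decode('utf-8'))
```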
Convert Timestamp to Milliseconds since Epoch in Python
This code snippet provides different approaches to converting a Python timestamp object to an integer that denotes the total milliseconds since the Epoch (midnight 1 Jan 1970 UTC, UNIX time). Output:
```
1661169251815.902
1661133251815.902
```
*Your result will differ depending on when you run the code. The underlying moment is the same no matter whether we use utcnow() or now(), as they both represent the same time. However, if your input is a timestamp string, you need to be careful about time zone information when converting it to a timestamp, i.e. specify the right time zone accordingly.
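One possible approach as a sketch; this is not necessarily the original snippet's exact code:
```python
from datetime import datetime, timezone

# Timezone-aware current time; timestamp() returns seconds since the Epoch as a float.
now_utc = datetime.now(timezone.utc)
print(now_utc.timestamp() * 1000)

# For a naive local datetime, timestamp() assumes the local time zone.
local_now = datetime.now()
print(local_now.timestamp() * 1000)
```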
Python - Read Messages in Kafka Topic
This code snippet provides an example of reading messages from Kafka topics. In common practice, one Kafka consumer is used to read from one partition of a topic; if there are multiple partitions for the same topic, each consumer can run on a different server or container. Those consumers for different partitions are also configured within one consumer group. In this example, all partitions are assigned to the same consumer. This code snippet utilizes the Python package kafka-python. It can be installed via the following command if you have pip installed:
```
pip install kafka-python
```
Or we can also use the built-in pip module in Python 3:
```
python -m pip install kafka-python
```
References
Apache Kafka Python Producer and Consumer Clients Introduction
Install and Run Kafka 3.2.0 On WSL
Install and Run Kafka 2.6.0 On Windows 10
PySpark DataFrame - inner, left and right Joins
This code snippet shows you how to perform inner, left and right joins with the DataFrame.join API.
```
def join(self, other, on=None, how=None)
```
Supported join types
The default join type is inner. The supported values for parameter how are: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti. To learn about these different join types, refer to the article Spark SQL Joins with Examples.
Join via multiple columns
If there is more than one column to join on, we can specify the on parameter as a list of column names:
```
df1.join(df2, on=['id','other_column'], how='left')
```
Output from the code snippet:
```
+---+----+
| id|attr|
+---+----+
|  1|   A|
|  2|   B|
+---+----+

+---+--------+
| id|attr_int|
+---+--------+
|  1|     100|
|  2|     200|
|  3|     300|
+---+--------+

+---+----+--------+
| id|attr|attr_int|
+---+----+--------+
|  1|   A|     100|
|  2|   B|     200|
+---+----+--------+

+---+----+--------+
| id|attr|attr_int|
+---+----+--------+
|  1|   A|     100|
|  2|   B|     200|
+---+----+--------+

+---+----+--------+
| id|attr|attr_int|
+---+----+--------+
|  1|   A|     100|
|  2|   B|     200|
|  3|null|     300|
+---+----+--------+
```
Start Spark History Server UI
This code snippet provides the simple CLI command to start the Spark History Server service.
About Spark History Server
Spark History Server can be used to look up historical Spark jobs that completed successfully or failed. By default, Spark execution logs are saved into local temporary folders. You can add configuration items into spark-defaults.conf to save logs to HDFS. For example, the following configurations ensure the logs are stored in my local Hadoop environment.
```
spark.eventLog.enabled true
spark.eventLog.dir hdfs://localhost:9000/shared/spark-logs
spark.history.fs.logDirectory hdfs://localhost:9000/shared/spark-logs
```
In the code snippet, SPARK_HOME is the environment variable that points to the location where your Spark is installed. If this variable is not defined, you can directly specify the full path to the shell script (sbin/start-history-server.sh).
History Server URL
By default, the URL is http://localhost:18080/ in a local environment. You can replace localhost with the address of the server where the history server is started; usually it is located on an edge server. By clicking the link of each App, you will be able to find the job details for each Spark application.
PySpark - Read Parquet Files in S3
This code snippet provides an example of reading parquet files located in S3 buckets on AWS (Amazon Web Services). The bucket used is from the New York City taxi trip record data. The S3 object location is: s3a://ursa-labs-taxi-data/2009/01/data.parquet. To run the script, we need to set up the package dependency on the Hadoop AWS package, for example org.apache.hadoop:hadoop-aws:3.3.0. This can be easily done by passing a configuration argument using spark-submit:
```
spark-submit --conf spark.jars.packages=org.apache.hadoop:hadoop-aws:3.3.0
```
This can also be done via SparkConf:
```
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.0')
```
Use temporary AWS credentials
In this code snippet, the AWS AnonymousAWSCredentialsProvider is used. If the bucket is not public, we can also use TemporaryAWSCredentialsProvider (supply your own key, secret and session token values):
```
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider')
conf.set('spark.hadoop.fs.s3a.access.key', access_key)
conf.set('spark.hadoop.fs.s3a.secret.key', secret_key)
conf.set('spark.hadoop.fs.s3a.session.token', session_token)
```
If you have used the AWS CLI or SAML tools to cache local credentials (~/.aws/credentials), then you don't need to specify the access keys, assuming the credential has access to the S3 bucket you are reading data from.
Start Hive Beeline CLI
This code snippet provides an example of starting the Hive Beeline CLI in Linux. Beeline is the successor of Hive CLI. In the shell scripts, the environment variable $HIVE_HOME is the home folder of the Hive installation in the system. In a cluster environment, it usually refers to the Hive client installation on an edge server. Output:
```
$HIVE_HOME/bin/beeline -u jdbc:hive2://
Connecting to jdbc:hive2://
Hive Session ID = 65a40cd9-02ce-4965-93b6-cff9db461b70
Connected to: Apache Hive (version 3.1.3)
Driver: Hive JDBC (version 3.1.3)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.3 by Apache Hive
0: jdbc:hive2://>
```
Hive - Retrieve Current User
This code snippet provides an example of retrieving the current user via the current_user() function in HQL (Hive QL) code. Output:
```
0: jdbc:hive2://> select current_user();
OK
+----------+
|   _c0    |
+----------+
| kontext  |
+----------+
```
Hive SQL - Merge Statement on ACID Tables
Hive supports the standard ANSI SQL MERGE statement from version 2.2. However, it can only be applied to tables that support ACID transactions. To learn more about ACID support in Hive, refer to the article: Hive ACID Inserts, Updates and Deletes with ORC.
Sample table
This code snippet merges into a sample table named test_db.crud_table, which has two records before the merge. The staging table was created using the following statements:
```
create table crud_table_stg (id int, value string, op string);
insert into crud_table_stg values (1,'AA','U'),(2,'B','D'),(3,'C', 'I');
```
It has one additional column named op to indicate the delta changes: U - updates; D - deletes; I - inserts (i.e. new records).
Syntax
```
MERGE INTO <target table> AS T
USING <source table or expression> AS S
ON <boolean expression>
WHEN MATCHED [AND <boolean expression>] THEN UPDATE SET <set clause list>
WHEN MATCHED [AND <boolean expression>] THEN DELETE
WHEN NOT MATCHED [AND <boolean expression>] THEN INSERT VALUES <value list>
```
Output
After the merge, record 1 is updated, record 2 is deleted and record 3 is inserted into the table.
Iterate through PySpark DataFrame Rows via foreach
DataFrame.foreach can be used to iterate/loop through each row (pyspark.sql.types.Row) in a Spark DataFrame object and apply a function to all the rows. This method is a shorthand for DataFrame.rdd.foreach. Note: please be cautious when using this method, especially if your DataFrame is big. Output:
```
+-----+--------+
| col1|    col2|
+-----+--------+
|Hello| Kontext|
|Hello|Big Data|
+-----+--------+

col1=Hello, col2=Kontext
col1=Hello, col2=Big Data
```
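A short sketch consistent with the output above, assuming an existing SparkSession named spark:
```python
df = spark.createDataFrame([('Hello', 'Kontext'), ('Hello', 'Big Data')],
                           ['col1', 'col2'])
df.show()

def print_row(row):
    # The function runs on the executors; in local mode the output appears in the console.
    print(f"col1={row.col1}, col2={row.col2}")

df.foreach(print_row)
```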
Concatenate Columns in Spark DataFrame
This code snippet provides one example of concatenating columns using a separator in a Spark DataFrame. The function concat_ws is used directly. For the Spark SQL version, refer to Spark SQL - Concatenate w/o Separator (concat_ws and concat).
Syntax of concat_ws
```
pyspark.sql.functions.concat_ws(sep: str, *cols: ColumnOrName)
```
Output:
```
+-----+--------+--------------+
| col1|    col2|     col1_col2|
+-----+--------+--------------+
|Hello| Kontext| Hello,Kontext|
|Hello|Big Data|Hello,Big Data|
+-----+--------+--------------+
```
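A minimal sketch, assuming an existing SparkSession named spark and the illustrative data shown in the output:
```python
from pyspark.sql import functions as F

df = spark.createDataFrame([('Hello', 'Kontext'), ('Hello', 'Big Data')],
                           ['col1', 'col2'])

# Join col1 and col2 with a comma separator.
df.withColumn('col1_col2', F.concat_ws(',', 'col1', 'col2')).show()
```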
Remove Special Characters from Column in PySpark DataFrame
The Spark SQL function regexp_replace can be used to remove special characters from a string column in a Spark DataFrame. Depending on the definition of special characters, the regular expressions can vary. For instance, [^0-9a-zA-Z_\-]+ can be used to match characters that are not alphanumeric, hyphen (-) or underscore (_); the regular expression [@\+\#\$\%\^\!]+ can match those listed special characters. This code snippet replaces special characters with an empty string. Output:
```
+---+--------------------------+
|id |str                       |
+---+--------------------------+
|1  |ABCDEDF!@#$%%^123456qwerty|
|2  |ABCDE!!!                  |
+---+--------------------------+

+---+-------------------+
| id|       replaced_str|
+---+-------------------+
|  1|ABCDEDF123456qwerty|
|  2|              ABCDE|
+---+-------------------+
```
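A short sketch of the replacement, assuming an existing SparkSession named spark; the regular expression keeps alphanumerics, hyphens and underscores:
```python
from pyspark.sql import functions as F

df = spark.createDataFrame([(1, 'ABCDEDF!@#$%%^123456qwerty'), (2, 'ABCDE!!!')],
                           ['id', 'str'])

# Remove every character that is not alphanumeric, hyphen or underscore.
df.withColumn('replaced_str',
              F.regexp_replace('str', '[^0-9a-zA-Z_-]+', '')).show()
```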
PySpark DataFrame - Calculate Distinct Count of Column(s)
This code snippet provides an example of calculating the distinct count of values in a PySpark DataFrame using the countDistinct PySpark SQL function. Output:
```
+---+-----+
| ID|Value|
+---+-----+
|101|   56|
|101|   67|
|102|   70|
|103|   93|
|104|   70|
+---+-----+

+-----------------+------------------+
|DistinctCountOfID|DistinctCountOfRow|
+-----------------+------------------+
|                4|                 5|
+-----------------+------------------+
```
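A minimal sketch reproducing the output above, assuming an existing SparkSession named spark:
```python
from pyspark.sql import functions as F

df = spark.createDataFrame([(101, 56), (101, 67), (102, 70), (103, 93), (104, 70)],
                           ['ID', 'Value'])
df.show()

# Distinct count of one column, and of the (ID, Value) combination.
df.select(F.countDistinct('ID').alias('DistinctCountOfID'),
          F.countDistinct('ID', 'Value').alias('DistinctCountOfRow')).show()
```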
PySpark DataFrame - Calculate sum and avg with groupBy
This code snippet provides an example of calculating aggregated values after grouping data in a PySpark DataFrame. To group data, DataFrame.groupby or DataFrame.groupBy can be used; then the GroupedData.agg method can be used to aggregate data for each group. Built-in aggregation functions like sum, avg, max, min and others can be used; customized aggregation functions can also be used. Output:
```
+----------+--------+
|TotalScore|AvgScore|
+----------+--------+
|       392|    78.4|
+----------+--------+
```
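A sketch of the groupBy/agg pattern, assuming an existing SparkSession named spark; the data is illustrative and chosen to match the totals above, not the original snippet's:
```python
from pyspark.sql import functions as F

# Illustrative data: one subject, five student scores.
df = spark.createDataFrame([('Math', 56), ('Math', 67), ('Math', 70),
                            ('Math', 93), ('Math', 106)], ['Subject', 'Score'])

(df.groupBy('Subject')
   .agg(F.sum('Score').alias('TotalScore'), F.avg('Score').alias('AvgScore'))
   .select('TotalScore', 'AvgScore')
   .show())
```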
PySpark DataFrame - percent_rank() Function
In Spark SQL, we can use PERCENT_RANK (Spark SQL - PERCENT_RANK Window Function). This code snippet implements percentile ranking (relative ranking) directly using the PySpark DataFrame percent_rank API instead of Spark SQL. Output:
```
+-------+-----+------------------+
|Student|Score|      percent_rank|
+-------+-----+------------------+
|    101|   56|               0.0|
|    109|   66|0.1111111111111111|
|    103|   70|0.2222222222222222|
|    110|   73|0.3333333333333333|
|    107|   75|0.4444444444444444|
|    102|   78|0.5555555555555556|
|    108|   81|0.6666666666666666|
|    104|   93|0.7777777777777778|
|    105|   95|0.8888888888888888|
|    106|   95|0.8888888888888888|
+-------+-----+------------------+
```
PySpark DataFrame - rank() and dense_rank() Functions
In PySpark, the rank and dense_rank functions can be used to rank the rows within a window partition. In Spark SQL, we can use RANK (Spark SQL - RANK Window Function) and DENSE_RANK (Spark SQL - DENSE_RANK Window Function). This code snippet implements ranking directly using the PySpark DataFrame APIs instead of Spark SQL. It creates a window that partitions the data by the TXN_DT attribute and sorts the records in each partition via the AMT column in descending order. The frame boundary of the window is defined as unbounded preceding and current row. Output:
```
+----+------+-------------------+----+----------+
|ACCT|   AMT|             TXN_DT|rank|dense_rank|
+----+------+-------------------+----+----------+
| 101|102.01|2021-01-01 00:00:00|   1|         1|
| 102|  93.0|2021-01-01 00:00:00|   2|         2|
| 101| 10.01|2021-01-01 00:00:00|   3|         3|
| 103| 913.1|2021-01-02 00:00:00|   1|         1|
| 101|900.56|2021-01-03 00:00:00|   1|         1|
| 102|900.56|2021-01-03 00:00:00|   1|         1|
| 103|  80.0|2021-01-03 00:00:00|   3|         2|
+----+------+-------------------+----+----------+
```
As printed out, the difference between dense_rank and rank is that the former does not generate any gaps when the ranked values are the same for multiple rows.
PySpark DataFrame - Add Row Number via row_number() Function
In Spark SQL, row_number can be used to generate a series of sequential numbers starting from 1 for each record in the specified window. Examples can be found on this page: Spark SQL - ROW_NUMBER Window Functions. This code snippet provides the same approach to implement row_number directly using the PySpark DataFrame APIs instead of Spark SQL. It creates a window that partitions the data by the ACCT attribute and sorts the records in each partition via the TXN_DT column in descending order. The frame boundary of the window is defined as unbounded preceding and current row. Output:
```
+----+------+-------------------+
|ACCT|   AMT|             TXN_DT|
+----+------+-------------------+
| 101| 10.01|2021-01-01 00:00:00|
| 101|102.01|2021-01-01 00:00:00|
| 102|  93.0|2021-01-01 00:00:00|
| 103| 913.1|2021-01-02 00:00:00|
| 101|900.56|2021-01-03 00:00:00|
+----+------+-------------------+

+----+------+-------------------+------+
|ACCT|   AMT|             TXN_DT|rownum|
+----+------+-------------------+------+
| 101|900.56|2021-01-03 00:00:00|     1|
| 101| 10.01|2021-01-01 00:00:00|     2|
| 101|102.01|2021-01-01 00:00:00|     3|
| 102|  93.0|2021-01-01 00:00:00|     1|
| 103| 913.1|2021-01-02 00:00:00|     1|
+----+------+-------------------+------+
```
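A minimal sketch of the window definition, assuming an existing SparkSession named spark and the illustrative data shown above:
```python
from datetime import datetime
from pyspark.sql import Window, functions as F

df = spark.createDataFrame(
    [(101, 10.01, datetime(2021, 1, 1)), (101, 102.01, datetime(2021, 1, 1)),
     (102, 93.0, datetime(2021, 1, 1)), (103, 913.1, datetime(2021, 1, 2)),
     (101, 900.56, datetime(2021, 1, 3))],
    ['ACCT', 'AMT', 'TXN_DT'])

# Number the rows per account, newest transaction first.
win = Window.partitionBy('ACCT').orderBy(F.col('TXN_DT').desc())
df.withColumn('rownum', F.row_number().over(win)).show()
```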
PySpark DataFrame Fill Null Values with fillna or na.fill Functions
In PySpark, DataFrame.fillna, DataFrame.na.fill and DataFrameNaFunctions.fill are aliases of each other. We can use them to fill null values with a constant value, for example, replacing all null integer values with 0, etc. Output:
```
+--------------+-------+--------+
|       str_col|int_col|bool_col|
+--------------+-------+--------+
|Hello Kontext!|    100|    true|
|Hello Context!|      0|    null|
+--------------+-------+--------+

+--------------+-------+--------+
|       str_col|int_col|bool_col|
+--------------+-------+--------+
|Hello Kontext!|    100|    true|
|Hello Context!|   null|   false|
+--------------+-------+--------+

+--------------+-------+--------+
|       str_col|int_col|bool_col|
+--------------+-------+--------+
|Hello Kontext!|    100|    true|
|Hello Context!|      0|   false|
+--------------+-------+--------+
```
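A short sketch of the three equivalent calls, assuming an existing SparkSession named spark; the column names mirror the output above:
```python
df = spark.createDataFrame(
    [('Hello Kontext!', 100, True), ('Hello Context!', None, None)],
    ['str_col', 'int_col', 'bool_col'])

df.fillna(0, subset=['int_col']).show()               # fill only the integer column
df.na.fill(False, subset=['bool_col']).show()         # fill only the boolean column
df.fillna({'int_col': 0, 'bool_col': False}).show()   # fill both with one dict
```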
PySpark User Defined Functions (UDF)
User defined functions (UDFs) in PySpark can be used to extend the built-in function library to provide extra functionality, for example, creating a function to extract values from XML, etc. This code snippet shows you how to implement a UDF in PySpark. It shows two slightly different approaches - one uses the udf decorator and the other does not. Output:
```
+--------------+-------+--------+--------+
|       str_col|int_col|str_len1|str_len2|
+--------------+-------+--------+--------+
|Hello Kontext!|    100|      14|      14|
|Hello Context!|    100|      14|      14|
+--------------+-------+--------+--------+
```
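A minimal sketch of both UDF styles, assuming an existing SparkSession named spark; the string-length logic is illustrative:
```python
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

df = spark.createDataFrame([('Hello Kontext!', 100), ('Hello Context!', 100)],
                           ['str_col', 'int_col'])

# Approach 1: decorator style.
@F.udf(returnType=IntegerType())
def str_len1(s):
    return len(s) if s is not None else None

# Approach 2: wrap a plain function (or lambda) with udf().
str_len2 = F.udf(lambda s: len(s) if s is not None else None, IntegerType())

df.withColumn('str_len1', str_len1('str_col')) \
  .withColumn('str_len2', str_len2('str_col')).show()
```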
PySpark DataFrame - Convert JSON Column to Row using json_tuple
The PySpark SQL function json_tuple can be used to convert DataFrame JSON string columns to tuples (new columns in the DataFrame). The syntax of this function looks like the following:
```
pyspark.sql.functions.json_tuple(col, *fields)
```
The first parameter is the JSON string column name in the DataFrame and the rest are the field names to extract. If you need to extract complex JSON documents like JSON arrays, you can follow this article - PySpark: Convert JSON String Column to Array of Object (StructType) in DataFrame.
Output
```
StructType([StructField('id', LongType(), True), StructField('c0', StringType(), True), StructField('c1', StringType(), True), StructField('c2', StringType(), True)])

+---+---+------+----------+
| id| c0|    c1|        c2|
+---+---+------+----------+
|  1|  1|10.201|2021-01-01|
|  2|  2|20.201|2022-01-01|
+---+---+------+----------+
```
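A minimal sketch, assuming an existing SparkSession named spark; the JSON field names are illustrative (the extracted columns default to c0, c1, c2 as in the output above):
```python
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, '{"a": "1", "b": "10.201", "c": "2021-01-01"}'),
     (2, '{"a": "2", "b": "20.201", "c": "2022-01-01"}')],
    ['id', 'json_col'])

# Extract fields a, b and c from the JSON string column into new columns.
df.select('id', F.json_tuple('json_col', 'a', 'b', 'c')).show()
```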
PySpark DataFrame - Extract JSON Value using get_json_object Function
The PySpark SQL function get_json_object can be used to extract JSON values from a JSON string column in a Spark DataFrame. This is equivalent to using Spark SQL directly: Spark SQL - Extract Value from JSON String. The syntax of this function looks like the following:
```
pyspark.sql.functions.get_json_object(col, path)
```
The first parameter is the JSON string column name in the DataFrame and the second is the JSON path. This code snippet shows you how to extract JSON values using JSON paths. If you need to extract complex JSON documents like JSON arrays, you can follow this article - PySpark: Convert JSON String Column to Array of Object (StructType) in DataFrame.
Output
```
StructType([StructField('id', LongType(), True), StructField('json_col', StringType(), True), StructField('ATTR_INT_0', StringType(), True), StructField('ATTR_DATE_1', StringType(), True)])

+---+--------------------+----------+-----------+
| id|            json_col|ATTR_INT_0|ATTR_DATE_1|
+---+--------------------+----------+-----------+
|  1|[{"Attr_INT":1, "...|         1| 2022-01-01|
+---+--------------------+----------+-----------+
```
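A sketch of extracting values with JSON paths, assuming an existing SparkSession named spark; the JSON attribute names are inferred from the output above and are illustrative:
```python
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, '[{"Attr_INT": 1, "ATTR_DATE": "2022-01-01"}]')], ['id', 'json_col'])

# JSON paths: take the first array element, then a named attribute.
df.withColumn('ATTR_INT_0', F.get_json_object('json_col', '$[0].Attr_INT')) \
  .withColumn('ATTR_DATE_1', F.get_json_object('json_col', '$[0].ATTR_DATE')) \
  .show()
```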
Replace Values via regexp_replace Function in PySpark DataFrame
The PySpark SQL API provides the regexp_replace built-in function to replace string values that match the specified regular expression. It takes three parameters: the input column of the DataFrame, the regular expression and the replacement for matches.
```
pyspark.sql.functions.regexp_replace(str, pattern, replacement)
```
Output
The following is the output from this code snippet:
```
+--------------+-------+----------------+
|       str_col|int_col|str_col_replaced|
+--------------+-------+----------------+
|Hello Kontext!|    100|  Hello kontext!|
|Hello Context!|    100|  Hello kontext!|
+--------------+-------+----------------+
```
All uppercase 'K' or 'C' characters are replaced with lowercase 'k'.
PySpark DataFrame - union, unionAll and unionByName
PySpark DataFrame provides three methods to union data together: union, unionAll and unionByName. The first two are like the Spark SQL UNION ALL clause, which doesn't remove duplicates; unionAll is an alias for union, and we can use the distinct method to deduplicate. The third method uses column names to resolve columns instead of positions. unionByName can also be used to merge two DataFrames with different schemas.
Syntax of unionByName
```
DataFrame.unionByName(other, allowMissingColumns=False)
```
If allowMissingColumns is specified as True, the missing columns in both DataFrames will be added with the default value null. This parameter is only available from Spark 3.1.0; for lower versions, the following error may appear:
```
df1.unionByName(df4, allowMissingColumns=True).show(truncate=False)
TypeError: unionByName() got an unexpected keyword argument 'allowMissingColumns'
```
Outputs
The following are the outputs from the code snippet.
```
+---+---+---+
|c1 |c2 |c3 |
+---+---+---+
|1  |A  |100|
|2  |B  |200|
+---+---+---+

+---+---+---+
|c1 |c2 |c3 |
+---+---+---+
|1  |A  |100|
|1  |A  |100|
|2  |B  |200|
+---+---+---+

+---+---+---+
|c1 |c2 |c3 |
+---+---+---+
|1  |A  |100|
|2  |B  |200|
+---+---+---+

+---+---+----+----+
|c1 |c2 |c3  |c4  |
+---+---+----+----+
|1  |A  |100 |null|
|3  |C  |null|ABC |
+---+---+----+----+
```
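A short sketch of the three methods, assuming an existing SparkSession named spark and Spark 3.1+; the data is illustrative:
```python
df1 = spark.createDataFrame([(1, 'A', 100)], ['c1', 'c2', 'c3'])
df2 = spark.createDataFrame([(1, 'A', 100), (2, 'B', 200)], ['c1', 'c2', 'c3'])
df4 = spark.createDataFrame([(3, 'C', 'ABC')], ['c1', 'c2', 'c4'])

df1.union(df2).show()                # keeps duplicates, resolves columns by position
df1.unionAll(df2).distinct().show()  # deduplicate after the union
df1.unionByName(df4, allowMissingColumns=True).show()  # by name, missing columns become null
```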
PySpark DataFrame - drop and dropDuplicates
PySpark DataFrame APIs provide two drop-related methods: drop and dropDuplicates (or drop_duplicates). The former is used to drop specified column(s) from a DataFrame while the latter is used to drop duplicated rows. This code snippet utilizes these two functions. Outputs:
```
+----+------+
|ACCT|AMT   |
+----+------+
|101 |10.01 |
|101 |10.01 |
|101 |102.01|
+----+------+

+----+----------+------+
|ACCT|TXN_DT    |AMT   |
+----+----------+------+
|101 |2021-01-01|102.01|
|101 |2021-01-01|10.01 |
+----+----------+------+

+----+----------+------+
|ACCT|TXN_DT    |AMT   |
+----+----------+------+
|101 |2021-01-01|102.01|
|101 |2021-01-01|10.01 |
+----+----------+------+
```
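A minimal sketch of both methods, assuming an existing SparkSession named spark; the data mirrors the outputs above:
```python
df = spark.createDataFrame(
    [(101, '2021-01-01', 10.01), (101, '2021-01-01', 10.01),
     (101, '2021-01-01', 102.01)], ['ACCT', 'TXN_DT', 'AMT'])

df.drop('TXN_DT').show(truncate=False)        # drop a column
df.dropDuplicates().show(truncate=False)      # drop fully duplicated rows
df.drop_duplicates(['ACCT', 'TXN_DT', 'AMT']).show(truncate=False)  # same, via the alias
```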
Spark SQL - window Function
Spark SQL has a built-in function window to bucketize rows into one or more time windows, given a timestamp column. The syntax of the function looks like the following:
```
window(timeColumn: ColumnOrName, windowDuration: str, slideDuration: Optional[str] = None, startTime: Optional[str] = None)
```
This function is available from Spark 2.0.0. slideDuration must be less than or equal to windowDuration.
*These SQL statements can also be used directly in PySpark DataFrame APIs via the spark.sql function.
This code snippet prints out the following outputs:
Query 1:
```
2022-08-01 12:01:00	{"start":2022-08-01 12:00:00,"end":2022-08-01 12:30:00}
2022-08-01 12:15:00	{"start":2022-08-01 12:00:00,"end":2022-08-01 12:30:00}
2022-08-01 12:31:01	{"start":2022-08-01 12:30:00,"end":2022-08-01 13:00:00}
```
The first two rows are in the same window [00:00, 00:30).
Query 2:
```
2022-08-01 12:01:00	{"start":2022-08-01 12:00:00,"end":2022-08-01 12:30:00}
2022-08-01 12:01:00	{"start":2022-08-01 11:45:00,"end":2022-08-01 12:15:00}
2022-08-01 12:15:00	{"start":2022-08-01 12:15:00,"end":2022-08-01 12:45:00}
2022-08-01 12:15:00	{"start":2022-08-01 12:00:00,"end":2022-08-01 12:30:00}
2022-08-01 12:31:01	{"start":2022-08-01 12:30:00,"end":2022-08-01 13:00:00}
2022-08-01 12:31:01	{"start":2022-08-01 12:15:00,"end":2022-08-01 12:45:00}
```
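A sketch of a sliding-window query run via spark.sql, assuming an existing SparkSession named spark; the exact source SQL of the original snippet is assumed, with timestamps matching the outputs above:
```python
spark.sql("""
SELECT ts, window(ts, '30 minutes', '15 minutes') AS time_window
FROM VALUES (timestamp'2022-08-01 12:01:00'),
            (timestamp'2022-08-01 12:15:00'),
            (timestamp'2022-08-01 12:31:01') AS t(ts)
""").show(truncate=False)
```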
Spark SQL - session_window Function
Spark SQL has a built-in function session_window to create a window column based on a timestamp column and a gap duration. The syntax of the function looks like the following:
```
session_window(timeColumn: ColumnOrName, gapDuration: Union[pyspark.sql.column.Column, str])
```
This function is available from Spark 3.2.0.
*These SQL statements can also be used directly in PySpark DataFrame APIs via the spark.sql function.
This code snippet prints out the following output:
```
2022-08-01 12:01:00	{"start":2022-08-01 12:01:00,"end":2022-08-01 12:31:00}
2022-08-01 12:15:00	{"start":2022-08-01 12:15:00,"end":2022-08-01 12:45:00}
2022-08-01 12:31:01	{"start":2022-08-01 12:31:01,"end":2022-08-01 13:01:01}
```
Vanilla JavaScript - Wildcard (*) Selector
In vanilla JavaScript, we can use the wildcard (*) selector to match multiple elements (nodes) in the HTML DOM. We can also match elements whose attributes start with, end with or contain certain values. For example:
- [name^="password"] will match all elements with name starting with password.
- [name$="password"] will match all elements with name ending with password.
- [name*="password"] will match all elements with name containing password.
PySpark DataFrame - Select Columns using select Function
In PySpark, we can use the select function to select a subset of columns or all columns from a DataFrame.
Syntax
```
DataFrame.select(*cols)
```
This function returns a new DataFrame object based on the projection expression list. This code snippet prints out the following output:
```
+---+----------------+-------+---+
| id|customer_profile|   name|age|
+---+----------------+-------+---+
|  1|    {Kontext, 3}|Kontext|  3|
|  2|      {Tech, 10}|   Tech| 10|
+---+----------------+-------+---+
```
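A sketch consistent with the output above, assuming an existing SparkSession named spark and a nested struct column named customer_profile (an assumption inferred from the output):
```python
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(id=1, customer_profile=Row(name='Kontext', age=3)),
    Row(id=2, customer_profile=Row(name='Tech', age=10)),
])

# Select the id, the whole struct, and the expanded struct fields.
df.select('id', 'customer_profile', 'customer_profile.*').show()
```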
Clean out All Style Attributes using HTML Agility Pack .NET
HTML Agility Pack (HAP) is one of the most commonly used .NET packages to parse HTML. It creates a document object model in memory, which can be used to manipulate the nodes (including both elements and attributes). The package can be added to your project from NuGet via the following CLI:
```
dotnet add package HtmlAgilityPack --version 1.11.43
```
This code snippet provides one example of using HAP to remove all style attributes. The script can be run as a C# script. For more details, refer to Html Agility Pack.
HTML Parsing using HTML Agility Pack .NET
HTML Agility Pack (HAP) is one of the most commonly used .NET packages to parse HTML. It creates a document object model in memory, which can be used to manipulate the nodes (including both elements and attributes). The package can be added to your project from NuGet via the following CLI:
```
dotnet add package HtmlAgilityPack --version 1.11.43
```
This code snippet provides one example of using HAP to remove all script elements. The script can be run as a C# script. For more details, refer to Html Agility Pack.
PySpark DataFrame - Expand or Explode Nested StructType
Hive SQL - Union data with UNION ALL and UNION DISTINCT
Hive SQL - Data Sampling using TABLESAMPLE
PySpark DataFrame - Filter Records using where and filter Function
PySpark DataFrame - Add Column using withColumn
PySpark DataFrame - explode Array and Map Columns
Python Format with Dictionary Object
Access the Index in Python 'for' Loops
Spark SQL - Left and Right Padding (lpad and rpad) Functions
Spark SQL - Check if String Contains a String
Spark SQL - isnull and isnotnull Functions
Spark SQL - Concatenate w/o Separator (concat_ws and concat)
Spark SQL - Create Map from Arrays via map_from_arrays Function
List Tables in Hive Database
Pandas DataFrame - Get Row and Column Count
Spark SQL - Get Next Monday, Tuesday, Wednesday, Thursday, etc.
Spark SQL - Make Date, Timestamp and Intervals
Spark SQL - Get Current Timezone
Spark SQL - Date and Timestamp Truncate Functions
Spark SQL - Extract Day, Month, Year and other Part from Date or Timestamp
Spark SQL - Add Day, Month and Year to Date
Pandas DataFrame - Iterate over Rows
Spark SQL - Return JSON Array Length (json_array_length)
Spark SQL - Return JSON Object Keys (json_object_keys)
Spark SQL - Conversion between UTC and Timestamp with Time Zone
Spark SQL - Date/Timestamp Conversation from/to UNIX Date/Timestamp
Spark SQL - Convert Date/Timestamp to String via date_format Function
Spark SQL - Convert Delimited String to Map using str_to_map Function
Spark SQL - element_at Function
Spark SQL - Convert String to Int (int and cast function)
Spark SQL - flatten Function
C# - Get Current and Last Month Start and End Date
.NET - Update EF Core Tools
C#: Get Index value in foreach Loop of IEnumerable
Create Partitioned Hive Table
Azure CLI - Get VM IP Address List
Spark SQL - Average (AVG) Calculation
PostgreSQL: Create and List User
Spark SQL - Group By
BigQuery - Convert Bytes to BASE64 or HEX String
BigQuery MD5, SHA1, SHA256, SHA512 Hash Functions
Hive - Rename Table
EF Core Transactions Example
Read Environment Variables in .NET Azure Functions
C#: Base64 URL Encoder and Decoder
Excel: Convert DateTime Column to Text
gzip Compress a directory (folder)
Teradata Role Access on Databases
Spark - Check if Array Column Contains Specific Value
BigQuery - First/Last Day of Month
BigQuery: Create Partitioned Table as SELECT
Spark "ROW_ID"
BigQuery SQL - WITH Clause
BigQuery SQL - SELECT * EXCEPT Clause
BigQuery SQL - Retrieve DISTINCT Values
BigQuery SQL - UNION ALL
BigQuery SQL - COALESCE and IFNULL Functions
Spark DataFrame: Show Full Column Content without Truncation
Deduplicate Spark DataFrame via dropDuplicates() and distinct()
Pandas Save DataFrame to SQLite
Teradata SQL - First Day of a Month
Git - Repoint Remote Repository URL
Convert Pandas DataFrame to Spark DataFrame
Create Pandas DataFrame from MySQL
Teradata: Grant Select Access on One or Several Columns Only
Teradata SQL: Rights on a Database
Teradata SQL: Rights Held by Database
Teradata - Add User to a Role
Spark SQL - Calculate Covariance
Spark SQL - Standard Deviation Calculation
Spark SQL - FIRST_VALUE or LAST_VALUE
Spark SQL - Convert Object to JSON String
Spark SQL - Extract Value from JSON String
Check Ports in Use on Windows
MySQL / MariaDB - Create Database User with Password
Check MySQL Database Version
Spark SQL - Construct Virtual Table in Memory using Values
Teradata Cast BIGINT to String
Teradata NULLIFZERO and ZEROIFNULL Function
Find out Teradata AMP (vprocs) Count
Scala: Read JSON file as Spark DataFrame
C# Regex - Remove Heading Tags
Teradata SQL - Round Numbers to Hundreds or Thousands
Teradata SQL - Get Rid of Spaces from String
Teradata SQL LIKE: Contains, Starts With, Ends With Functions
This page shows you how to use LIKE in Teradata SQL to check whether a string contains, starts with or ends with certain characters.
Trim Leading or/and Trailing Zeros in Teradata
This page shows how to trim or remove leading and/or trailing zeros using Teradata SQL.
Truncate table in Teradata
Teradata SQL - Use OREPLACE to Replace or Remove Characters
Pandas DataFrame Plot - Scatter and Hexbin Chart
Pandas DataFrame Plot - Area Chart
Pandas DataFrame Plot - Pie Chart
Convert String to Date using Python
Pandas DataFrame Plot - Line Chart
Pandas DataFrame Plot - Bar Chart
Teradata SUBSTRING / SUBSTR and REGEXP_SUBSTR Functions
Teradata SQL - COALESCE and NULLIF Functions
Teradata SQL - TRYCAST / TRY_CAST Function
Teradata SQL - LEAD and LAG OLAP Functions
Teradata SQL - DATEADD Function Alternative for ADD / Subtract Days or Months
Ingest Data into Hadoop HDFS through Jupyter Notebook
Connect to SQL Server via JayDeBeApi in Python
The Python JayDeBeApi module allows you to connect from Python to databases using Java JDBC drivers.
Read and Write XML files in PySpark
Create and Read Pickle Files in Python
Pass Environment Variables to Executors in PySpark
Check installed packages in Python
Different programming languages have different package management tools.
How to Kill Running Jobs in Hadoop
The following code snippet shows how to list and kill Hadoop jobs, including MapReduce and YARN jobs.
Read and parse JSON in SQL / Teradata
JSON is commonly used in modern applications for data storage and transfers. Pretty much all programming languages provide APIs to parse JSON.
Calculate Difference/Duration between Two Timestamps in PowerShell
This code snippet shows how to calculate time differences.
Read and parse JSON in Python
Extract Values from JSON String Column in SQL Server
JSON is commonly used in modern applications for data storage and transfers. Pretty much all programming languages provide APIs to parse JSON.
Write and read parquet files in Scala / Spark
Parquet is a columnar storage format published by Apache. It's commonly used in the Hadoop ecosystem. There are many programming language APIs that have been implemented to support writing and reading parquet files.
Convert string to date in T-SQL / SQL Server
This code snippet shows how to convert string to date.
List Hadoop running jobs
Hadoop provides a number of CLIs. The hadoop job command can be used to retrieve the running job list. You can also use the YARN resource manager UI to view the jobs.
Check HDFS folder size in Shell / Hadoop
Hadoop provides a number of CLIs that can be used to perform many tasks/activities. This code snippet shows you how to check file/folder size in HDFS.
Read JSON file as Spark DataFrame in Python / Spark
Spark has easy fluent APIs that can be used to read data from a JSON file as a DataFrame object.
Select top N records in SQL / Teradata
In different databases, the syntax for selecting top N records is slightly different. It may also differ from ISO standards.
List all environment variables in PowerShell
Environment variables are commonly used in many frameworks or programs.
Write and read parquet files in Python / Spark
Parquet is a columnar storage format published by Apache. It's commonly used in the Hadoop ecosystem. There are many programming language APIs that have been implemented to support writing and reading parquet files.
Calculate time difference in SQL / Teradata
This code snippet shows how to calculate time differences.
Convert string to date in Scala / Spark
This code snippet shows how to convert string to date.
Read JSON file as Spark DataFrame in Scala / Spark
Spark has easy fluent APIs that can be used to read data from a JSON file as a DataFrame object.
Convert List to Spark Data Frame in Scala / Spark
In Spark, the SparkContext.parallelize function can be used to convert a list of objects to an RDD, and the RDD can then be converted to a DataFrame object through SparkSession.
Convert string to date in Python / Spark
This code snippet shows how to convert string to date.
Convert varchar to date in SQL / Teradata
This code snippet shows how to convert string to date in Teradata.
Convert List to Spark Data Frame in Python / Spark
In Spark, the SparkContext.parallelize function can be used to convert a list of objects to an RDD, and the RDD can then be converted to a DataFrame object through SparkSession.
Select from dual in T-SQL / SQL Server
In Oracle databases, you can select from the dual table if you only want to return a one-row result set. In many other databases, the query engine supports selecting directly from constant values without specifying a table name.
Select from dual in SQL / Hive
In Oracle databases, you can select from the dual table if you only want to return a one-row result set. In many other databases, the query engine supports selecting directly from constant values without specifying a table name.
Calculate time difference in T-SQL / SQL Server
Select top N records in SQL / Hive
In different databases, the syntax for selecting top N records is slightly different. It may also differ from ISO standards.
Read and parse JSON in C# / .NET Framework
JSON is commonly used in modern applications for data storage and transfers. Pretty much all programming languages provide APIs to parse JSON.