POST /customer/doc/1/_update?pretty
{
"doc": { "name": "Jane Doe" }
}
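For reference, the same partial update can be issued from the command line with curl (a sketch assuming a cluster listening on localhost:9200):

curl -XPOST "localhost:9200/customer/doc/1/_update?pretty" -H 'Content-Type: application/json' -d '
{
  "doc": { "name": "Jane Doe" }
}'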
To update an Elasticsearch document through the Spark connector, the dataframe must contain the document's _id field so that Elasticsearch updates the correct document. Therefore, when loading documents from Elasticsearch we must make sure the _id is retrieved for every document. This is controlled by adding the Spark option
"es.read.metadata": "true"
when reading from Elasticsearch.
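As a minimal sketch (the index/type name myindex/mytype below is a placeholder), the option is passed like any other connector option:

>>> df = spark.read.format("org.elasticsearch.spark.sql") \
...     .option("es.read.metadata", "true") \
...     .load("myindex/mytype")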
Setting the scene
To give an example, we have extracted a small JSON file from the IMDB movie database and we are going to ingest it into Elasticsearch. To keep the example simple, we include only a few specific fields (title, num_votes, genres and plots) from the JSON file.

>>> json_movies = spark.read.json("/path/to/movies.json")
>>> json_movies.select("title", "num_votes", "genres", "plots") \
...     .write.format("org.elasticsearch.spark.sql") \
...     .mode("overwrite") \
...     .save("movies/data")
>>> json_movies.count()
250
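Note that by default the connector talks to an Elasticsearch node on localhost; if the cluster runs elsewhere, point the connector at it with es.nodes and es.port (the host name below is a placeholder):

>>> json_movies.select("title", "num_votes", "genres", "plots") \
...     .write.format("org.elasticsearch.spark.sql") \
...     .option("es.nodes", "es-node.example.com") \
...     .option("es.port", "9200") \
...     .mode("overwrite") \
...     .save("movies/data")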
Interacting with Elasticsearch
Now that we have ingested all documents into Elasticsearch, we are going to query Elasticsearch and retrieve the documents into a Spark dataframe.

>>> es_conf = {
...     "es.read.metadata": "true",                         (1)
...     "es.read.field.as.array.include": "genres, plots"
... }
>>> movies = spark.read.format("org.elasticsearch.spark.sql") \
...     .options(**es_conf).load("movies/data")
>>> movies.count()                                          (2)
250
>>> movies.select("title", "_metadata", "num_votes").head() (3)
Row(title=u'Throne of Blood', _metadata={u'_score': None, u'_type': u'data', u'_id': u'AWB5p5DxB6YFESvhmC1k', u'_index': u'movies'}, num_votes=30191)

(1) Also include the document metadata, a dictionary that contains the document _id.
(2) Verify that all 250 documents have been written correctly.
(3) Print a sample document.

In this example we decided not to create a custom _id field for each document, so Elasticsearch created and assigned a unique id to each one. Notice that the _id field is included in the _metadata object. Next, we are going to search for all movies that have received at least 500,000 votes and mark them as 'hot', since they are highly voted. So, before starting, we want to make sure that currently there are no movies marked as hot.
movies.filter("hot == true").count() # Error: pyspark.sql.utils.AnalysisException: u"cannot resolve '`hot`' given input columns: [title, plots, _metadata, num_votes, genres];
>>> movies.filter("num_votes >= 500000").count() 78The error above signifies that there is no document containing the field hot. However, there are 78 movies that have been voted at least 500,000 times. From those document we will only work with fields _metadata[‘_id’] and num_votes.
>>> movies.select("title", "_metadata._id", "num_votes").show(4) +--------------------+--------------------+---------+ | title| _id|num_votes| +--------------------+--------------------+---------+ | Catch Me If You Can|AWBzpUimB6YFESvhmCoR| 498525| |Spring, Summer, F...|AWBzpUimB6YFESvhmCoZ| 61194| |Lagaan: Once Upon...|AWBzpUimB6YFESvhmCoa| 67322| | Reservoir Dogs|AWBzpUqPB6YFESvhmCof| 643090| +--------------------+--------------------+---------+We are going to create a new dataframe containing the hot movies. The dataframe should contain only their id and a column named hot = True.
>> from pyspark.sql.functions import expr, lit >>> hot = movies.filter(movies["num_votes"] > 500000) \ ... .select(expr("_metadata._id as id")) \ ... .withColumn("hot", lit(True)) >>> hot.count() 78 >>> hot.head() Row(id=u'AWCjCgsKDHrqgAB6UlJ5', hot=True)
Updating the index
Finally, the last step is to update the existing movies index with the information from this dataframe:

>>> es_conf = {
...     "es.mapping.id": "id",              (1)
...     "es.mapping.exclude": "id",         (2)
...     "es.write.operation": "update"      (3)
... }
>>> hot.write.format("org.elasticsearch.spark.sql") \
...     .options(**es_conf) \
...     .mode("append") \
...     .save("movies/data")
Pay attention to the important bits of this update:
(1) Inform the connector that the document _id will be found in the id field.
(2) Exclude the id field from being saved in the movies index. If we omit this option, each document will end up with a new field named id.
(3) Use the 'update' operation so that existing documents are updated. This can also be 'upsert', in which case documents not found in the index are created.
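As a variation on the configuration above (a sketch reusing the same hot dataframe), switching the write operation to upsert creates any documents whose ids are not yet in the index instead of failing:

>>> es_conf = {
...     "es.mapping.id": "id",
...     "es.mapping.exclude": "id",
...     "es.write.operation": "upsert"
... }
>>> hot.write.format("org.elasticsearch.spark.sql") \
...     .options(**es_conf) \
...     .mode("append") \
...     .save("movies/data")

Re-reading the index should confirm the update: the filter that failed earlier now resolves, and the count should match the 78 movies we marked.

>>> movies = spark.read.format("org.elasticsearch.spark.sql") \
...     .options(**es_conf_read).load("movies/data")  # same read options as before
>>> movies.filter("hot == true").count()
78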