POST /customer/doc/1/_update?pretty
{
"doc": { "name": "Jane Doe" }
}
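For reference, the same partial update can be composed from plain Python with only the standard library. This is a minimal sketch, not part of the Spark workflow below; the helper name `build_partial_update` is made up for illustration, and actually sending the request (e.g. with `urllib.request` against a cluster such as `http://localhost:9200`) is left out:

```python
import json

def build_partial_update(index, doc_type, doc_id, fields):
    """Return (path, body) for an Elasticsearch partial update.

    Mirrors: POST /<index>/<type>/<id>/_update with a {"doc": ...} body.
    """
    path = "/%s/%s/%s/_update" % (index, doc_type, doc_id)
    body = json.dumps({"doc": fields})
    return path, body

path, body = build_partial_update("customer", "doc", "1", {"name": "Jane Doe"})
print(path)  # /customer/doc/1/_update
print(body)  # {"doc": {"name": "Jane Doe"}}
```

Only the fields listed under "doc" are merged into the existing document; everything else is left untouched, which is exactly the behavior we want to replicate from Spark.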
To update an Elasticsearch document through the Spark connector, the dataframe must contain the document _id field so that Elasticsearch updates the correct document. Therefore, when loading documents from Elasticsearch we must make sure that the _id is retrieved for every document. This is controlled by adding the Spark option "es.read.metadata": "true" when reading from Elasticsearch.
>>> json_movies = spark.read.json("/path/to/movies.json")
>>> json_movies.select("title", "num_votes", "genres", "plots") \
...     .write.format("org.elasticsearch.spark.sql") \
...     .mode("overwrite") \
...     .save("movies/data")
>>> json_movies.count()
250
>>> es_conf = {
...     "es.read.metadata": "true",                        (1)
...     "es.read.field.as.array.include": "genres, plots"
... }
>>> movies = spark.read.format("org.elasticsearch.spark.sql") \
...     .options(**es_conf).load("movies/data")
>>> movies.count()  (2)
250
>>> movies.select("title", "_metadata", "num_votes").head()  (3)
Row(title=u'Throne of Blood', _metadata={u'_score': None, u'_type': u'data', u'_id': u'AWB5p5DxB6YFESvhmC1k', u'_index': u'movies'}, num_votes=30191)

(1) Include the document metadata, a dictionary that contains the document _id.
(2) Verify that all 250 documents have been written correctly.
(3) Print a sample document.

In this example we did not provide a custom _id for the documents, so Elasticsearch created and assigned a unique id to each one. Notice that the _id field is included in the _metadata object. Next, we are going to search for all movies that have received at least 500,000 votes and mark them as 'hot'. Before starting, we want to make sure that no movies are currently marked as hot.
>>> movies.filter("hot == true").count()
# Error: pyspark.sql.utils.AnalysisException: u"cannot resolve '`hot`' given input columns: [title, plots, _metadata, num_votes, genres];"
>>> movies.filter("num_votes >= 500000").count()
78

The error above shows that no document contains the field hot. However, 78 movies have received at least 500,000 votes. From those documents we will only work with the fields _metadata['_id'] and num_votes.
>>> movies.select("title", "_metadata._id", "num_votes").show(4)
+--------------------+--------------------+---------+
|               title|                 _id|num_votes|
+--------------------+--------------------+---------+
| Catch Me If You Can|AWBzpUimB6YFESvhmCoR|   498525|
|Spring, Summer, F...|AWBzpUimB6YFESvhmCoZ|    61194|
|Lagaan: Once Upon...|AWBzpUimB6YFESvhmCoa|    67322|
|      Reservoir Dogs|AWBzpUqPB6YFESvhmCof|   643090|
+--------------------+--------------------+---------+

We are going to create a new dataframe containing the hot movies. It should contain only their id and a column named hot = True.
>>> from pyspark.sql.functions import expr, lit
>>> hot = movies.filter(movies["num_votes"] > 500000) \
...     .select(expr("_metadata._id as id")) \
...     .withColumn("hot", lit(True))
>>> hot.count()
78
>>> hot.head()
Row(id=u'AWCjCgsKDHrqgAB6UlJ5', hot=True)
>>> es_conf = {
...     "es.mapping.id": "id",          (1)
...     "es.mapping.exclude": "id",     (2)
...     "es.write.operation": "update"  (3)
... }
>>> hot.write.format("org.elasticsearch.spark.sql") \
...     .options(**es_conf) \
...     .mode("append") \
...     .save("movies/data")
(1) Tell the connector that the document _id will be found in the id field.
(2) Exclude the id field from being saved to the movies index. If we omit this option, a new field named id would be added to each document.
(3) Use the 'update' operation so that existing documents are updated. This can also be set to 'upsert' if documents not found in the index should be created.
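Under the hood, the connector translates each row into an action of the Elasticsearch bulk API: an action line naming the _id, followed by a partial document. The sketch below is a plain-Python illustration of that payload shape, not connector code; the helper `to_bulk_update` is hypothetical:

```python
import json

def to_bulk_update(rows, id_field="id", upsert=False):
    """Turn rows (dicts) into Elasticsearch _bulk update lines.

    id_field supplies the document _id (cf. es.mapping.id) and is
    stripped from the payload (cf. es.mapping.exclude); with
    upsert=True, missing documents are created (cf. the 'upsert'
    write operation).
    """
    lines = []
    for row in rows:
        doc = dict(row)
        doc_id = doc.pop(id_field)          # take the _id out of the row
        lines.append(json.dumps({"update": {"_id": doc_id}}))
        source = {"doc": doc}               # only the remaining fields
        if upsert:
            source["doc_as_upsert"] = True  # create the doc if absent
        lines.append(json.dumps(source))
    return "\n".join(lines) + "\n"

print(to_bulk_update([{"id": "AWCjCgsKDHrqgAB6UlJ5", "hot": True}]))
# {"update": {"_id": "AWCjCgsKDHrqgAB6UlJ5"}}
# {"doc": {"hot": true}}
```

This makes the three options above concrete: the id field ends up in the action metadata rather than the stored document, and each update touches only the hot field of the existing movie.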