Текст википедии используется во множестве экспериментов машинного обучения, но как его получить самостоятельно, например для самой свежей версии или редкого языка?
Достаточно функциональной я нашёл библиотеку mwparserfromhell
Распаршенные данные можно сохранить в текстовых файлах, базе данных или в табличном формате на распределённой файловой системе. Но начать в любом случае нужно с дампа, который находится на dumps.wikimedia.org
Для параллельной обработки любым из способов удобнее будет скачать дамп в виде множества файлов вида enwiki-20200201-pages-articles-multistream1.xml-p10p30302.bz2
Для локальной обработки подойдёт код вида:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import xml.etree.ElementTree as ET from tqdm import tqdm import mwparserfromhell wikidump = "enwiki-20200201-pages-articles-multistream1.xml-p10p30302" doc = {} fields = { "title": "title", "text": "text", } for _, elem in tqdm(ET.iterparse(wikidump, events=("end",))): prefix, has_namespace, postfix = elem.tag.partition('}') tag = postfix if postfix else prefix if tag in fields: doc[fields[tag]] = elem.text if tag == "page": wikicode = mwparserfromhell.parse(doc["text"]) doc["plain_text"] = wikicode.strip_code() doc["wikilinks"] = wikicode.filter_wikilinks() doc["external_links"] = wikicode.filter_external_links() doc["templates"] = wikicode.filter_templates() elem.clear() |
Но mwparserfromhell довольно требовательная к ресурсам библиотека. На обработку всей английской википедии требуется порядка 8 CPU дней. Поэтому воспользуется фреймворком распределённых вычислений Apache Spark.
Для доступности со всех вычислительных нод, следует разместить данные в распределённой файловой системе HDFS. Предварительно распаковав bz2 файлы, чтобы не тратить на это вычисления при каждом чтении.
1 |
hdfs dfs -put enwiki-20200201-pages-articles-multistream*.xml* hdfs://namenode:9000/user/root/wiki/enwiki-20200201/ |
Теперь xml файлы доступны для обработки, но ещё не разделены на отдельные страницы. Сохранить отдельные документы я решил в parquet файлы, которые могут хранить схематизированные таблицы. Для работы с xml я решил воспользоваться библиотекой com.databricks:spark-xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
spark-shell --master spark://sparkmaster:7077 --packages com.databricks:spark-xml_2.12:0.8.0 --executor-memory 12G --executor-cores 8 Spark context available as 'sc' (master = spark://ryzen:7077, app id = app-20200314145829-0001). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.0.0-preview2 /_/ Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242) Type in expressions to have them evaluated. Type :help for more information. import org.apache.spark.sql.SparkSession import com.databricks.spark.xml._ spark.read.option("rowTag", "page").xml("hdfs://namenode:9000/user/root/wiki/enwiki-20200201").select("id", "title", "revision.text._VALUE").withColumnRenamed("_VALUE", "text").write.format("parquet").save("hdfs://namenode:9000/user/root/wiki_parsed.parquet") |
Далее из текстов, содержащих wiki разметку требуется выделить чистый текст. В mwparserfromhell для этого есть метод strip_code().
Т.к. не на всех нодах нужная python библиотека может быть предустановлена, выполним дополнительный шаг prepare c pip install —user mwparserfromhell
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.types import ArrayType, StringType, StructType, StructField from pyspark import SparkConf import subprocess import sys import mwparserfromhell def prepare(x): subprocess.check_call([sys.executable, "-m", "pip", "install", "--user", "mwparserfromhell"]) return x def parse_wiki(s): wikicode = mwparserfromhell.parse(s) return ( wikicode.strip_code(), [str(x) for x in wikicode.filter_wikilinks()], [str(x) for x in wikicode.filter_external_links()], [str(x) for x in wikicode.filter_templates()], ) if __name__ == "__main__": hdfs_base = "hdfs://namenode:9000/user/root/" conf = SparkConf().setAll([ ('spark.executor.memory', '12g'), ('spark.executor.cores', '8'), ('spark.hadoop.dfs.replication', '2'), ]) spark = SparkSession.builder.appName("Wiki Word Count").config(conf=conf).getOrCreate() spark.sparkContext.parallelize(range(100)).mapPartitions(prepare).collect() df = spark.read.load(hdfs_base + "wiki_parsed.parquet") my_udf = udf(lambda z: parse_wiki(z if z else ""), StructType([ StructField("plaintext", StringType()), StructField("wikilinks", ArrayType(StringType())), StructField("external_links", ArrayType(StringType())), StructField("templates", ArrayType(StringType())), ])) parsed_df = df.withColumn("parsed", my_udf(df.text)).select( "id", "title", "parsed.plaintext", "parsed.wikilinks", "parsed.external_links", "parsed.templates" ) parsed_df.write.save(hdfs_base + "wiki_mwparserfromhell.parquet", format="parquet") |
Для его запуска нужно выполнить
1 |
spark-submit --master spark://sparkmaster:7077 wiki_plaintext.py |
После выполнения скрипта, можно посмотреть на результат
1 2 3 4 5 6 7 8 9 |
hdfs_base = "hdfs://namenode:9000/user/root/" df = spark.read.load(hdfs_base + "wiki_mwparserfromhell.parquet") df.limit(1).show() +-------+--------------------+--------------------+--------------------+--------------+--------------------+ | id| title| plaintext| wikilinks|external_links| templates| +-------+--------------------+--------------------+--------------------+--------------+--------------------+ |1363416|Calliope (disambi...|Calliope is the m...|[[[Calliope]], [[...| []|[{{Wiktionary|Cal...| +-------+--------------------+--------------------+--------------------+--------------+--------------------+ |