-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhistory_indexer.py
137 lines (109 loc) · 4.38 KB
/
history_indexer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from elasticsearch_dsl import Index, Document, Integer, Text, analyzer, Keyword, Double
from elasticsearch_dsl.connections import connections
from elasticsearch import Elasticsearch, helpers
from evaluation import get_relevance_label_df
from datetime import datetime
from tqdm import tqdm
import pandas as pd
import numpy as np
import logging
import json
import os
class QA(Document):
    """Elasticsearch document mapping for one scraped FAQ question/answer pair."""
    # Normalized topic label; Keyword so it can be filtered/aggregated exactly.
    topic = Keyword()
    # Original (live) page URL the pair was scraped from.
    orgSourceUrl = Text()
    sourceUrl = Text()  # wayBackUrl — archived snapshot URL (see ingest: filled from pair['wayBackUrl'])
    sourceName = Text()
    # Dates kept as Keyword (opaque strings), not ES date types — presumably to
    # avoid format parsing; TODO confirm no range queries are needed on them.
    dateScraped = Keyword()
    date = Keyword()
    month = Keyword()
    # Free-text fields share the snowball (stemming) analyzer for retrieval.
    question = Text(analyzer='snowball')
    answer = Text(analyzer='snowball')
    # Concatenation of question + answer, indexed as one searchable field.
    question_answer = Text(analyzer='snowball')
def ingest_history_data(data, es, index):
    """Bulk-index a collection of FAQ QA-pair dicts into an Elasticsearch index.

    Args:
        data: iterable of dicts; recognized keys are 'normTopic', 'wayBackUrl',
            'sourceUrl', 'sourceName', 'dateScraped', 'date' (YYYYMMDD int/str),
            'month', 'question', 'answer'. Missing keys are simply skipped.
        es: Elasticsearch client/connection used for the bulk call.
        index: name of the target index.

    Errors are logged (with traceback) rather than raised, keeping a single
    bad batch from aborting the whole indexing run.
    """
    try:
        docs = []
        for pair in tqdm(data):
            # Build one QA document per pair, copying only the keys present.
            doc = QA()
            if 'normTopic' in pair:
                doc.topic = pair['normTopic']
            # BUG FIX: guard on the key actually read ('wayBackUrl'), not
            # 'sourceUrl' — the old code raised KeyError for pairs that had
            # a sourceUrl but no wayBackUrl, and dropped wayBackUrl when
            # sourceUrl happened to be absent.
            if 'wayBackUrl' in pair:
                doc.sourceUrl = pair['wayBackUrl']
            if 'sourceUrl' in pair:
                doc.orgSourceUrl = pair['sourceUrl']
            if 'sourceName' in pair:
                doc.sourceName = pair['sourceName']
            if 'dateScraped' in pair:
                doc.dateScraped = pair['dateScraped']
            if 'date' in pair:
                # Incoming date is YYYYMMDD; store it as DD/MM/YYYY.
                date = str(pair['date'])
                year = date[:4]
                month = date[4:6]
                day = date[-2:]
                doc.date = day + "/" + month + "/" + year
            if 'month' in pair:
                doc.month = pair['month']
            if 'question' in pair:
                doc.question = pair['question']
            if 'answer' in pair:
                doc.answer = pair['answer']
            if 'question' in pair and 'answer' in pair:
                # Combined field so a single query can match across both texts.
                doc.question_answer = pair['question'] + " " + pair['answer']
            docs.append(doc.to_dict(include_meta=False))
        # Single bulk request for the whole batch.
        # NOTE(review): doc_type is deprecated/removed in ES 7+ clients —
        # confirm against the elasticsearch-py version actually deployed.
        response = helpers.bulk(es, actions=docs, index=index, doc_type='doc')
    except Exception:
        logging.error('exception occured', exc_info=True)
def get_history_qa_pairs(filename):
    """Load the FAQ TSV and build one cumulative snapshot per month.

    Returns a list, ordered by ascending month, of dicts of the form
    {'month': m, 'data': DataFrame of every row whose month <= m}.
    """
    frame = pd.read_csv(filename, sep='\t', header=0)
    snapshots = []
    # Iterate months in sorted order so each snapshot grows monotonically.
    for month in np.sort(frame.month.unique()):
        cumulative = frame.loc[frame['month'] <= month]
        print(month + "\t" + str(len(cumulative)))
        snapshots.append({'month': month, 'data': cumulative})
    return snapshots
if __name__ == "__main__":
    # Build one Elasticsearch index per monthly cumulative snapshot of the
    # CovidFAQ history data. Any failure is logged with its traceback.
    try:
        # Connect to a local ES cluster with basic auth.
        es = connections.create_connection(hosts=['localhost'], http_auth=('elastic', 'elastic'))
        # Months to materialize as timeline indices.
        months = ['2020-03', '2020-04', '2020-05', '2020-06', '2020-07', '2020-08', '2020-09', '2020-10', '2020-11', '2020-12', '2021-01', '2021-02', '2021-03', '2021-04']
        # Load the cumulative per-month snapshots from the history TSV.
        filename = './data/CovidFAQ/historical_faqs_for_indexing.tsv'
        snapshots = get_history_qa_pairs(filename)
        for m in months:
            # BUG FIX: reset before the search. Previously, a month with no
            # matching snapshot either raised NameError (first iteration) or
            # silently re-indexed the previous month's data.
            faq_qa_pair_df = None
            for s in snapshots:
                if m == s['month']:
                    faq_qa_pair_df = s['data']
                    break
            if faq_qa_pair_df is None:
                logging.warning('no snapshot found for month %s; skipping', m)
                continue
            index_name = "covidfaq_" + m
            # Convert the DataFrame to a list of row dicts for bulk ingestion.
            faq_qa_pairs = list(faq_qa_pair_df.T.to_dict().values())
            print("{} records: ".format(index_name), len(faq_qa_pairs))
            # Recreate the index from scratch on every run.
            index = Index(index_name)
            index.settings(
                number_of_shards=1,
                number_of_replicas=0
            )
            # Delete the index, ignore if it doesn't exist.
            index.delete(ignore=404)
            index.create()
            # Register the QA document mapping with the index.
            index.document(QA)
            # Ingest data to Elasticsearch.
            ingest_history_data(faq_qa_pairs, es, index_name)
            print("Finished indexing {} records to {} index".format(len(faq_qa_pairs), index_name))
    except Exception:
        logging.error('exception occured', exc_info=True)