Adds support for exporting to OpenSearch #288

Open · wants to merge 2 commits into master
28 changes: 28 additions & 0 deletions README.md
@@ -139,6 +139,34 @@ That's it! Except, if your Elasticsearch database is not located at `http://localhost:9200`
username = 'root'
secret = 'password'

### OpenSearch

news-please supports exporting to OpenSearch. OpenSearch is largely identical to Elasticsearch, with minor differences in the client implementation, so the OpenSearch export offers the same features as the Elasticsearch export.

Example pipeline settings:
[Scrapy]

ITEM_PIPELINES = {
'newsplease.pipeline.pipelines.ArticleMasterExtractor':100,
'newsplease.pipeline.pipelines.OpensearchStorage':350
}

If your OpenSearch database is not located at `http://localhost:9200`, uses a different username/password, or requires CA-certificate authentication, you will also need to change the following settings.

[Opensearch]

host = localhost
port = 9200
# whether requests should be sent via HTTPS
use_ssl = True

...

# Credentials used for authentication
username = 'root'
secret = 'password'
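
Once the exporter is configured, you can check the stored articles directly with the opensearch-py client. The following is a minimal sketch, assuming the default index name `news-please` and the credentials shown above:

    from opensearchpy import OpenSearch

    client = OpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],
        http_auth=("root", "password"),
        use_ssl=True,
    )

    # Count the exported articles and fetch the most recently downloaded one
    result = client.search(
        index="news-please",
        body={
            "query": {"match_all": {}},
            "sort": [{"date_download": {"order": "desc"}}],
            "size": 1,
        },
    )
    print(result["hits"]["total"]["value"], "articles stored")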


### PostgreSQL
news-please allows for storing of articles to a PostgreSQL database, including the versioning feature. To export to PostgreSQL, open the corresponding config file (`config_lib.cfg` for library mode and `config.cfg` for CLI mode) and add the PostgresqlStorage module to the pipeline and adjust the database credentials:

54 changes: 52 additions & 2 deletions newsplease/__main__.py
@@ -12,6 +12,7 @@
import psycopg2
import pymysql
from elasticsearch import Elasticsearch
from opensearchpy import OpenSearch
from scrapy.utils.log import configure_logging

from newsplease.pipeline.pipelines import RedisStorageClient
@@ -55,6 +56,7 @@ class NewsPleaseLauncher(object):
mysql = None
postgresql = None
elasticsearch = None
opensearch = None
redis = None
number_of_active_crawlers = 0
config_directory_default_path = "~/news-please-repo/config/"
@@ -63,13 +65,14 @@ class NewsPleaseLauncher(object):

__single_crawler = False

def __init__(self, cfg_directory_path, is_resume, is_reset_elasticsearch,
def __init__(self, cfg_directory_path, is_resume, is_reset_elasticsearch, is_reset_opensearch,
is_reset_json, is_reset_mysql, is_reset_postgresql, is_reset_redis, is_no_confirm, library_mode=False):
"""
The constructor of the main class, thus the real entry point to the tool.
:param cfg_file_path:
:param is_resume:
:param is_reset_elasticsearch:
:param is_reset_opensearch:
:param is_reset_json:
:param is_reset_mysql:
:param is_reset_postgresql:
@@ -113,6 +116,7 @@ def __init__(self, cfg_directory_path, is_resume, is_reset_elasticsearch,
self.mysql = self.cfg.section("MySQL")
self.postgresql = self.cfg.section("Postgresql")
self.elasticsearch = self.cfg.section("Elasticsearch")
self.opensearch = self.cfg.section("Opensearch")
self.redis = self.cfg.section("Redis")

# perform reset if given as parameter
@@ -124,10 +128,13 @@ def __init__(self, cfg_directory_path, is_resume, is_reset_elasticsearch,
self.reset_files()
if is_reset_elasticsearch:
self.reset_elasticsearch()
if is_reset_opensearch:
self.reset_opensearch()
if is_reset_redis:
self.reset_redis()

# close the process
if is_reset_elasticsearch or is_reset_json or is_reset_mysql or is_reset_postgresql or is_reset_redis:
if is_reset_elasticsearch or is_reset_opensearch or is_reset_json or is_reset_mysql or is_reset_postgresql or is_reset_redis:
sys.exit(0)

self.json_file_path = self.cfg_directory_path + self.cfg.section('Files')['url_input_file_name']
@@ -497,6 +504,45 @@ def reset_elasticsearch(self):
self.log.error("Failed to connect to Elasticsearch. "
"Please check if the database is running and the config is correct: %s" % error)

def reset_opensearch(self):
"""
Resets the OpenSearch database.
"""

print("""
Cleanup OpenSearch database:
This will delete all indices and reset the whole OpenSearch database.
""")

confirm = self.no_confirm

if not confirm:
confirm = 'yes' in builtins.input(
"""
Do you really want to do this? Write 'yes' to confirm: {yes}"""
.format(yes='yes' if confirm else ''))

if not confirm:
print("Did not type yes. Thus aborting.")
return

try:
# initialize DB connection
conn = OpenSearch(
hosts=[{"host": self.opensearch["host"], "port": self.opensearch["port"]}],
http_compress=True,
http_auth=(str(self.opensearch["username"]), str(self.opensearch["secret"])),
use_ssl=self.opensearch["use_ssl"]
)

print("Resetting OpensSearch database...")
conn.indices.delete(index=self.opensearch["index_current"], ignore=[400, 404])
conn.indices.delete(index=self.opensearch["index_archive"], ignore=[400, 404])
except ConnectionError as error:
self.log.error("Failed to connect to OpenSearch. "
"Please check if the database is running and the config is correct: %s" % error)


def reset_files(self):
"""
Resets the local data directory.
@@ -709,6 +755,7 @@ def stop(self):
cfg_file_path=plac.Annotation('path to the config file', 'option', 'c'),
resume=plac.Annotation('resume crawling from last process', 'flag'),
reset_elasticsearch=plac.Annotation('reset Elasticsearch indexes', 'flag'),
reset_opensearch=plac.Annotation('reset Opensearch indexes', 'flag'),
reset_json=plac.Annotation('reset JSON files', 'flag'),
reset_mysql=plac.Annotation('reset MySQL database', 'flag'),
reset_postgresql=plac.Annotation('reset Postgresql database', 'flag'),
Expand All @@ -720,6 +767,7 @@ def cli(
cfg_file_path,
resume,
reset_elasticsearch,
reset_opensearch,
reset_json,
reset_mysql,
reset_postgresql,
Expand All @@ -731,6 +779,7 @@ def cli(

if reset_all:
reset_elasticsearch = True
reset_opensearch = True
reset_json = True
reset_mysql = True
reset_postgresql = True
Expand All @@ -743,6 +792,7 @@ def cli(
cfg_file_path,
resume,
reset_elasticsearch,
reset_opensearch,
reset_json,
reset_mysql,
reset_postgresql,
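In library mode, the new `is_reset_opensearch` flag can also be passed directly to the launcher constructor shown in this diff. A minimal sketch, assuming the module path `newsplease.__main__` and the default config directory:

    from newsplease.__main__ import NewsPleaseLauncher

    # Reset only the OpenSearch indices without interactive confirmation;
    # per the constructor above, the process exits after handling a reset flag.
    NewsPleaseLauncher(
        cfg_directory_path="~/news-please-repo/config/",
        is_resume=False,
        is_reset_elasticsearch=False,
        is_reset_opensearch=True,
        is_reset_json=False,
        is_reset_mysql=False,
        is_reset_postgresql=False,
        is_reset_redis=False,
        is_no_confirm=True,
    )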
40 changes: 40 additions & 0 deletions newsplease/config/config.cfg
@@ -264,6 +264,46 @@ mapping = {"properties": {
}}


[Opensearch]

# OpenSearch connection required for saving detailed meta-information
host = 'localhost'
port = 9200
use_ssl = True
index_current = 'news-please'
index_archive = 'news-please-archive'

# OpenSearch supports user authentication via CA certificates, but this is not implemented yet
# use_ca_certificates = False
# ca_cert_path = /path/to/cacert.pem
# client_cert_path = /path/to/client_cert.pem
# client_key_path = /path/to/client_key.pem
username = 'root'
secret = 'password'

# Properties of the document type used for storage.
mapping = {"properties": {
"url": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"source_domain": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"title_page": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"title_rss": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"localpath": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"filename": {"type": "keyword"},
"ancestor": {"type": "keyword"},
"descendant": {"type": "keyword"},
"version": {"type": "long"},
"date_download": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
"date_modify": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
"date_publish": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
"title": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"description": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"text": {"type": "text"},
"authors": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"image_url": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"language": {"type": "keyword"}
}}


[Redis]

# Redis required for saving meta-information
40 changes: 40 additions & 0 deletions newsplease/config/config_lib.cfg
@@ -251,6 +251,46 @@ mapping = {
}


[Opensearch]

# OpenSearch connection required for saving detailed meta-information
host = 'localhost'
port = 9200
use_ssl = True
index_current = 'news-please'
index_archive = 'news-please-archive'

# OpenSearch supports user authentication via CA certificates, but this is not implemented yet
# use_ca_certificates = False
# ca_cert_path = /path/to/cacert.pem
# client_cert_path = /path/to/client_cert.pem
# client_key_path = /path/to/client_key.pem
username = 'root'
secret = 'password'

# Properties of the document type used for storage.
mapping = {"properties": {
"url": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"source_domain": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"title_page": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"title_rss": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"localpath": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"filename": {"type": "keyword"},
"ancestor": {"type": "keyword"},
"descendant": {"type": "keyword"},
"version": {"type": "long"},
"date_download": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
"date_modify": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
"date_publish": {"type": "date", "format":"yyyy-MM-dd HH:mm:ss"},
"title": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"description": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"text": {"type": "text"},
"authors": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"image_url": {"type": "text","fields":{"keyword":{"type":"keyword"}}},
"language": {"type": "keyword"}
}}


[Redis]

# Redis required for saving meta-information
89 changes: 89 additions & 0 deletions newsplease/pipeline/pipelines.py
@@ -22,6 +22,7 @@
import psycopg2
from dateutil import parser as dateparser
from elasticsearch import Elasticsearch
from opensearchpy import OpenSearch
from scrapy.exceptions import DropItem

from redis import StrictRedis
@@ -664,6 +665,94 @@ def process_item(self, item, spider):
self.log.error("Lost connection to Elasticsearch, this module will be deactivated: %s" % error)
return item

class OpensearchStorage(ExtractedInformationStorage):
"""
Handles remote storage of the metadata in OpenSearch
"""

log = None
cfg = None
conn = None
index_current = None
index_archive = None
mapping = None
running = False

def __init__(self):
self.log = logging.getLogger('opensearch.trace')
self.log.addHandler(logging.NullHandler())
self.cfg = CrawlerConfig.get_instance()
self.database = self.cfg.section("Opensearch")

self.conn = OpenSearch(
hosts=[{"host": self.database["host"], "port": self.database["port"]}],
http_compress = True,
http_auth=(str(self.database["username"]), str(self.database["secret"])),
use_ssl = self.database["use_ssl"]
)
self.index_current = self.database["index_current"]
self.index_archive = self.database["index_archive"]
self.mapping = self.database["mapping"]

# check connection to Database and set the configuration

try:
# check if server is available
self.conn.ping()

# raise logging level due to indices.exists() habit of logging a warning if an index doesn't exist.
os_log = logging.getLogger('opensearch')
os_level = os_log.getEffectiveLevel()
os_log.setLevel('ERROR')

# check if the necessary indices exist and create them if needed
if not self.conn.indices.exists(index=self.index_current):
self.conn.indices.create(index=self.index_current, ignore=[400, 404])
self.conn.indices.put_mapping(index=self.index_current, body=self.mapping)
if not self.conn.indices.exists(index=self.index_archive):
self.conn.indices.create(index=self.index_archive, ignore=[400, 404])
self.conn.indices.put_mapping(index=self.index_archive, body=self.mapping)
self.running = True

# restore previous logging level
os_log.setLevel(os_level)

except ConnectionError as error:
self.running = False
self.log.error("Failed to connect to Opensearch, this module will be deactivated. "
"Please check if the database is running and the config is correct: %s" % error)

def process_item(self, item, spider):

if self.running:
try:
version = 1
ancestor = None

# search for previous version
request = self.conn.search(index=self.index_current, body={'query': {'match': {'url.keyword': item['url']}}})
if request['hits']['total']['value'] > 0:
# save old version into index_archive
old_version = request['hits']['hits'][0]
old_version['_source']['descendant'] = True
self.conn.index(index=self.index_archive, body=old_version['_source'])
version += 1
ancestor = old_version['_id']

# save new version into old id of index_current
self.log.info("Saving to Opensearch: %s" % item['url'])
extracted_info = ExtractedInformationStorage.extract_relevant_info(item)
extracted_info['ancestor'] = ancestor
extracted_info['version'] = version
self.conn.index(index=self.index_current, id=ancestor,
body=extracted_info)


except ConnectionError as error:
self.running = False
self.log.error("Lost connection to Opensearch, this module will be deactivated: %s" % error)
return item


class DateFilter(object):
"""
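The `OpensearchStorage` pipeline mirrors the Elasticsearch versioning scheme: when a URL is crawled again, the previous document is copied into the archive index and the new document is written under the old `_id` with an incremented `version` and an `ancestor` reference. A minimal sketch of reading a page's version history back, assuming the default index names and credentials:

    from opensearchpy import OpenSearch

    client = OpenSearch(
        hosts=[{"host": "localhost", "port": 9200}],
        http_auth=("root", "password"),
        use_ssl=True,
    )

    def version_history(url):
        """Return all stored versions of an article, oldest first."""
        query = {
            "query": {"match": {"url.keyword": url}},
            "sort": [{"version": {"order": "asc"}}],
        }
        # Superseded versions live in the archive index, the newest in the current one
        archived = client.search(index="news-please-archive", body=query)["hits"]["hits"]
        current = client.search(index="news-please", body=query)["hits"]["hits"]
        return [hit["_source"] for hit in archived + current]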
1 change: 1 addition & 0 deletions requirements.txt
@@ -5,6 +5,7 @@ PyMySQL>=0.7.9
psycopg2-binary>=2.8.4
hjson>=1.5.8
elasticsearch>=2.4
opensearch-py>=2.7
beautifulsoup4>=4.3.2
readability-lxml>=0.6.2
langdetect>=1.0.7
1 change: 1 addition & 0 deletions setup.py
@@ -38,6 +38,7 @@
"psycopg2-binary>=2.8.4",
"hjson>=1.5.8",
"elasticsearch>=2.4",
"opensearch-py>=2.7",
"beautifulsoup4>=4.3.2",
"readability-lxml>=0.6.2",
"langdetect>=1.0.7",