#!/usr/bin/env python
# encoding: utf-8
# Copyright (C) Alibaba Cloud Computing
# All rights reserved.
import logging
import time
from multiprocessing import Pool
from elasticsearch import Elasticsearch
from .. import LogClient, LogException
from ..version import ES_MIGRATION_USER_AGENT
from .collection_task import CollectionTaskStatus, run_collection_task
from .collection_task_config import CollectionTaskConfig
from .index_logstore_mappings import IndexLogstoreMappings
from .mapping_index_converter import MappingIndexConverter
from .util import split_and_strip
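
# Results of finished collection tasks. Worker results are appended here via
# the Pool callback below and aggregated in MigrationManager.logging_summary_info.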
results = []
def log_result(result):
results.append(result)
class MigrationManager(object):
"""
    MigrationManager migrates data from Elasticsearch to aliyun log service.
:type hosts: string
:param hosts: a comma-separated list of source ES nodes. e.g. "localhost:9200,other_host:9200"
:type indexes: string
:param indexes: a comma-separated list of source index names. e.g. "index1,index2"
:type query: string
:param query: used to filter docs, so that you can specify the docs you want to migrate. e.g. '{"query": {"match": {"title": "python"}}}'
:type scroll: string
:param scroll: specify how long a consistent view of the index should be maintained for scrolled search. e.g. "5m"
:type endpoint: string
:param endpoint: specify the endpoint of your log services. e.g. "cn-beijing.log.aliyuncs.com"
:type project_name: string
:param project_name: specify the project_name of your log services. e.g. "your_project"
:type access_key_id: string
:param access_key_id: specify the access_key_id of your account.
:type access_key: string
:param access_key: specify the access_key of your account.
:type logstore_index_mappings: string
    :param logstore_index_mappings: specify the mappings between log service logstores and ES indexes. e.g. '{"logstore1": "my_index*", "logstore2": "index1,index2", "logstore3": "index3"}'
:type pool_size: int
:param pool_size: specify the size of process pool. e.g. 10
:type time_reference: string
    :param time_reference: specify which field of the ES doc to use as the log's time field. e.g. "field1"
:type source: string
:param source: specify the value of log's source field. e.g. "your_source"
:type topic: string
:param topic: specify the value of log's topic field. e.g. "your_topic"
:type wait_time_in_secs: int
    :param wait_time_in_secs: specify the waiting time in seconds between initializing aliyun log and executing the data migration task. e.g. 60
:type auto_creation: bool
    :param auto_creation: specify whether to let the tool create logstores and indexes automatically for you. e.g. True
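
    Illustrative usage (all values below are placeholders)::

        migration_manager = MigrationManager(hosts="localhost:9200",
                                             indexes="my_index*",
                                             endpoint="cn-beijing.log.aliyuncs.com",
                                             project_name="your_project",
                                             access_key_id="your_access_key_id",
                                             access_key="your_access_key")
        summary = migration_manager.migrate()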
"""
def __init__(self, hosts=None, indexes=None, query=None, scroll="5m", endpoint=None, project_name=None,
access_key_id=None, access_key=None, logstore_index_mappings=None, pool_size=10, time_reference=None,
source=None, topic=None, wait_time_in_secs=60, auto_creation=True):
self.hosts = hosts
self.indexes = indexes
self.query = query
self.scroll = scroll
self.endpoint = endpoint
self.project_name = project_name
self.access_key_id = access_key_id
self.access_key = access_key
self.logstore_index_mappings = logstore_index_mappings
self.pool_size = pool_size
self.time_reference = time_reference
self.source = source
self.topic = topic
self.wait_time_in_secs = wait_time_in_secs
self.auto_creation = auto_creation
def migrate(self):
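        """Migrate data from the source ES cluster to aliyun log service.

        Resolves the source indexes, optionally creates the target logstores and
        index configs, then runs one collection task per ES shard in a process pool.

        :return: a printable summary of all collection tasks.
        """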
es = Elasticsearch(split_and_strip(self.hosts, ","))
log_client = LogClient(self.endpoint, self.access_key_id, self.access_key)
log_client.set_user_agent(ES_MIGRATION_USER_AGENT)
index_lst = self.get_index_lst(es, self.indexes)
index_logstore_mappings = IndexLogstoreMappings(index_lst, self.logstore_index_mappings)
if self.auto_creation:
self.init_aliyun_log(es, log_client, self.project_name, index_logstore_mappings, self.wait_time_in_secs)
shard_cnt = self.get_shard_count(es, self.indexes, self.query)
p = Pool(min(shard_cnt, self.pool_size))
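        # One collection task per shard: slice_id/slice_max assign each task one
        # slice of an ES sliced scroll so the shards are read in parallel.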
for i in range(shard_cnt):
config = CollectionTaskConfig(task_id=i,
slice_id=i,
slice_max=shard_cnt,
hosts=self.hosts,
indexes=self.indexes,
query=self.query,
scroll=self.scroll,
endpoint=self.endpoint,
project=self.project_name,
access_key_id=self.access_key_id,
access_key=self.access_key,
index_logstore_mappings=index_logstore_mappings,
time_reference=self.time_reference,
source=self.source,
topic=self.topic)
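            # log_result runs in the parent process and appends each task's
            # result to the module-level results list.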
p.apply_async(func=run_collection_task, args=(config,), callback=log_result)
p.close()
p.join()
return self.logging_summary_info(shard_cnt)
@classmethod
def logging_summary_info(cls, shard_cnt):
total_started_task_cnt = shard_cnt
success_task_cnt = 0
fail_task_cnt = 0
doc_cnt = 0
summary_info = ""
logging.info("========Tasks Info========")
summary_info += "========Tasks Info========" + "\n"
for res in results:
logging.info(res)
summary_info += str(res) + "\n"
doc_cnt += res.count
if res.status == CollectionTaskStatus.SUCCESS:
success_task_cnt += 1
else:
fail_task_cnt += 1
logging.info("========Summary========")
summary_info += "========Summary========" + "\n"
total_started_task_cnt_info = "Total started task count: %d" % total_started_task_cnt
logging.info(total_started_task_cnt_info)
summary_info += total_started_task_cnt_info + "\n"
success_task_cnt_info = "Successful task count: %d" % success_task_cnt
logging.info(success_task_cnt_info)
summary_info += success_task_cnt_info + "\n"
fail_task_cnt_info = "Failed task count: %d" % fail_task_cnt
logging.info(fail_task_cnt_info)
summary_info += fail_task_cnt_info + "\n"
        doc_cnt_info = "Total collected document count: %d" % doc_cnt
logging.info(doc_cnt_info)
summary_info += doc_cnt_info + "\n"
return summary_info
@classmethod
def get_shard_count(cls, es, indexes, query=None):
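        # The count API response includes shard statistics; "_shards.total" is
        # the number of shards the query spans, which sizes the process pool.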
resp = es.count(index=indexes, body=query)
return resp["_shards"]["total"]
@classmethod
def get_index_lst(cls, es, indexes):
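        # Expand index patterns (e.g. "my_index*") into concrete index names.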
resp = es.indices.stats(index=indexes)
return resp["indices"].keys()
@classmethod
def init_aliyun_log(cls, es, log_client, project_name, index_logstore_mappings, wait_time_in_secs):
logging.info("Start to init aliyun log")
cls._create_logstores(log_client, project_name, index_logstore_mappings)
cls._create_index_configs(es, log_client, project_name, index_logstore_mappings)
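        # The wait below gives newly created logstores and index configs time
        # to take effect on the server side before any data is written.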
        logging.info("Initialized aliyun log successfully")
        logging.info("Enter waiting time, wait_time_in_secs=%d", wait_time_in_secs)
        time.sleep(wait_time_in_secs)
        logging.info("Exit waiting time")
@classmethod
def _create_logstores(cls, log_client, project_name, index_logstore_mappings):
logstores = index_logstore_mappings.get_all_logstores()
for logstore in logstores:
try:
log_client.create_logstore(project_name=project_name, logstore_name=logstore)
except LogException as e:
if e.get_error_code() == "LogStoreAlreadyExist":
                    logging.info("The logstore %s already exists, skip the creation step.", logstore)
continue
else:
raise
@classmethod
def _create_index_configs(cls, es, log_client, project_name, index_logstore_mappings):
logstores = index_logstore_mappings.get_all_logstores()
for logstore in logstores:
indexes = index_logstore_mappings.get_indexes(logstore)
first_index = True
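            # Create the logstore's index config from the first mapping; merge
            # every subsequent mapping into it via update_index.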
for index in indexes:
resp = es.indices.get(index=index)
for mapping in resp[index]["mappings"].values():
index_config = MappingIndexConverter.to_index_config(mapping)
if first_index:
try:
log_client.create_index(project_name, logstore, index_config)
first_index = False
except LogException as e:
if e.get_error_code() == "IndexAlreadyExist":
continue
else:
raise
else:
log_client.update_index(project_name, logstore, index_config)