
Commit

Merge branch 'main' into main
ayush-shah authored Nov 26, 2024
2 parents acb5aba + 2e9efe5 commit 32a445b
Showing 98 changed files with 4,370 additions and 1,741 deletions.
@@ -25,7 +25,7 @@
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.type.basic import Timestamp
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.importer import import_test_case_class
from metadata.utils import importer

if TYPE_CHECKING:
from pandas import DataFrame
@@ -59,7 +59,8 @@ def __init__(
"""
self._test_case = test_case
self.runner = runner
self.validator_cls: Type[BaseTestValidator] = import_test_case_class(
# TODO this will be removed on https://github.com/open-metadata/OpenMetadata/pull/18716
self.validator_cls: Type[BaseTestValidator] = importer.import_test_case_class(
entity_type,
self._get_source_type(),
self.test_case.testDefinition.fullyQualifiedName, # type: ignore
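A common reason for this kind of indirection (an assumption here, pending the removal noted in the TODO) is that going through the module keeps monkeypatching effective: a name pulled in with `from ... import` becomes a separate binding in this module, so patching `metadata.utils.importer` would not be visible through it. A test-side sketch of the difference, inside a pytest test using the `monkeypatch` fixture (`DummyValidator` is a hypothetical stand-in):

import metadata.utils.importer as importer

def test_uses_patched_importer(monkeypatch):
    class DummyValidator:  # hypothetical stand-in for a real validator class
        pass

    # Visible to code that calls importer.import_test_case_class(...) at call
    # time; NOT visible to modules that bound the function earlier via
    # `from metadata.utils.importer import import_test_case_class`.
    monkeypatch.setattr(
        importer, "import_test_case_class", lambda *a, **kw: DummyValidator
    )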
@@ -22,13 +22,20 @@
from metadata.data_quality.validations.runtime_param_setter.table_diff_params_setter import (
TableDiffParamsSetter,
)
from metadata.data_quality.validations.table.sqlalchemy.tableDiff import (
TableDiffValidator,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.sampler.sqlalchemy.sampler import SQASampler

# We want to use the explicit class name here, but the packages might not exist
try:
from metadata.data_quality.validations.table.sqlalchemy.tableDiff import (
TableDiffValidator,
)
except ImportError:

class TableDiffValidator:
pass


def removesuffix(s: str, suffix: str) -> str:
"""A custom implementation of removesuffix for python versions < 3.9
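The try/except guard above keeps this module importable when the optional table-diff dependencies are not installed, while the placeholder class keeps the `TableDiffValidator` name resolvable for annotations and isinstance checks. A minimal sketch of the pattern with generic names (one refinement over a bare `pass` body is failing loudly on use):

try:
    from optional_heavy_package import RealValidator
except ImportError:

    class RealValidator:  # fallback stub: keeps the name importable
        def __init__(self, *args, **kwargs):
            raise ImportError(
                "optional_heavy_package is required to use RealValidator"
            )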
14 changes: 9 additions & 5 deletions ingestion/src/metadata/ingestion/api/models.py
@@ -14,20 +14,24 @@
from typing import Generic, Optional, TypeVar

from pydantic import BaseModel, Field
from typing_extensions import Annotated

from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)

# Entities are instances of BaseModel
Entity = BaseModel

T = TypeVar("T")


class Either(BaseModel, Generic[T]):
"""Any execution should return us Either an Entity of an error for us to handle"""

left: Optional[StackTraceError] = Field(
None, description="Error encountered during execution"
)
right: Optional[T] = Field(None, description="Correct instance of an Entity")
left: Annotated[
Optional[StackTraceError],
Field(description="Error encountered during execution", default=None),
]
right: Annotated[
Optional[T], Field(description="Correct instance of an Entity", default=None)
]
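In pydantic v2, declaring the default inside `Field(...)` within `Annotated` is equivalent to the positional `Field(None, ...)` form, but keeps the whole declaration inside the type annotation. For illustration, a minimal sketch of producing Either values (`build_entity` is a hypothetical helper; the StackTraceError fields follow their use elsewhere in the codebase):

def process(record) -> Either:
    try:
        return Either(right=build_entity(record))  # success: an Entity
    except Exception as exc:
        return Either(
            left=StackTraceError(name="record-processing", error=str(exc))
        )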
@@ -164,7 +164,7 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals
logger.debug(
"Requesting [dbt_catalog], [dbt_manifest] and [dbt_run_results] data"
)
params_data = {"order_by": "-finished_at", "limit": "1", "status": "10"}
params_data = {"order_by": "-finished_at", "limit": "1"}
if project_id:
params_data["project_id"] = project_id

@@ -12,6 +12,7 @@
Elasticsearch source to extract metadata
"""
import shutil
import traceback
from pathlib import Path
from typing import Any, Iterable, Optional

@@ -21,6 +22,7 @@
CreateSearchIndexRequest,
)
from metadata.generated.schema.entity.data.searchIndex import (
IndexType,
SearchIndex,
SearchIndexSampleData,
)
@@ -103,6 +105,7 @@ def yield_search_index(
fields=parse_es_index_mapping(
search_index_details.get(index_name, {}).get("mappings")
),
indexType=IndexType.Index,
)
yield Either(right=search_index_request)
self.register_record(search_index_request=search_index_request)
@@ -143,6 +146,56 @@ def yield_search_index_sample_data(
)
)

def get_search_index_template_list(self) -> Iterable[dict]:
"""
Get a list of all search index templates
"""
yield from self.client.indices.get_index_template().get("index_templates", [])

def get_search_index_template_name(
self, search_index_template_details: dict
) -> Optional[str]:
"""
Get Search Index Template Name
"""
return search_index_template_details and search_index_template_details["name"]

def yield_search_index_template(
self, search_index_template_details: Any
) -> Iterable[Either[CreateSearchIndexRequest]]:
"""
Method to Get Search Index Template Entity
"""
try:
if self.source_config.includeIndexTemplate:
index_name = self.get_search_index_template_name(
search_index_template_details
)
index_template = search_index_template_details["index_template"]
if index_name:
search_index_template_request = CreateSearchIndexRequest(
name=EntityName(index_name),
displayName=index_name,
searchIndexSettings=index_template.get("template", {}).get(
"settings", {}
),
service=FullyQualifiedEntityName(
self.context.get().search_service
),
fields=parse_es_index_mapping(
index_template.get("template", {}).get("mappings")
),
indexType=IndexType.IndexTemplate,
description=index_template.get("_meta", {}).get("description"),
)
yield Either(right=search_index_template_request)
self.register_record(
search_index_request=search_index_template_request
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Could not include index templates due to {exc}")

def close(self):
try:
if Path(self.service_connection.sslConfig.certificates.stagingDir).exists():
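For reference, `yield_search_index_template` navigates the response of Elasticsearch's `GET _index_template` API; one element of its `index_templates` list looks roughly like this (abbreviated; exact fields vary by Elasticsearch version):

{
    "name": "logs-template",
    "index_template": {
        "index_patterns": ["logs-*"],
        "template": {
            "settings": {"number_of_shards": "1"},
            "mappings": {"properties": {"message": {"type": "text"}}},
        },
        "_meta": {"description": "Template for log indices"},
    },
}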
45 changes: 44 additions & 1 deletion ingestion/src/metadata/ingestion/source/search/search_service.py
@@ -83,7 +83,7 @@ class SearchServiceTopology(ServiceTopology):
cache_entities=True,
),
],
children=["search_index"],
children=["search_index", "search_index_template"],
post_process=["mark_search_indexes_as_deleted"],
)
search_index: Annotated[
@@ -107,6 +107,21 @@
],
)

search_index_template: Annotated[
TopologyNode, Field(description="Search Index Template Processing Node")
] = TopologyNode(
producer="get_search_index_template",
stages=[
NodeStage(
type_=SearchIndex,
context="search_index_template",
processor="yield_search_index_template",
consumer=["search_service"],
use_cache=True,
)
],
)


class SearchServiceSource(TopologyRunnerMixin, Source, ABC):
"""
@@ -178,6 +193,34 @@ def get_search_index(self) -> Any:
continue
yield index_details

def yield_search_index_template(
self, search_index_template_details: Any
) -> Iterable[Either[CreateSearchIndexRequest]]:
"""Method to Get Search Index Templates"""

def get_search_index_template_list(self) -> Optional[List[Any]]:
"""Get list of all search index templates"""

def get_search_index_template_name(self, search_index_template_details: Any) -> str:
"""Get Search Index Template Name"""

def get_search_index_template(self) -> Any:
if self.source_config.includeIndexTemplate:
for index_template_details in self.get_search_index_template_list():
if search_index_template_name := self.get_search_index_template_name(
index_template_details
):
if filter_by_search_index(
self.source_config.searchIndexFilterPattern,
search_index_template_name,
):
self.status.filter(
search_index_template_name,
"Search Index Template Filtered Out",
)
continue
yield index_template_details

def yield_create_request_search_service(
self, config: WorkflowSource
) -> Iterable[Either[CreateSearchServiceRequest]]:
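The new `search_index_template` node follows the topology convention of naming its producer and processor as strings that are resolved on the source. Roughly, the runner does something like the following (a hedged sketch of the pattern, not the exact TopologyRunnerMixin internals; `sink` is hypothetical):

node = SearchServiceTopology().search_index_template
producer = getattr(source, node.producer)             # get_search_index_template
for raw_template in producer():
    for stage in node.stages:
        processor = getattr(source, stage.processor)  # yield_search_index_template
        for either in processor(raw_template):
            sink.write(either)                        # hypothetical sink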
@@ -48,7 +48,6 @@ public class AbstractNativeApplication implements NativeApplication {
protected CollectionDAO collectionDAO;
private App app;
protected SearchRepository searchRepository;
protected boolean isJobInterrupted = false;

// Default service that contains external apps' Ingestion Pipelines
private static final String SERVICE_NAME = "OpenMetadata";
@@ -299,6 +298,11 @@ protected void pushAppStatusUpdates(
@Override
public void interrupt() throws UnableToInterruptJobException {
LOG.info("Interrupting the job for app: {}", this.app.getName());
isJobInterrupted = true;
stop();
}

protected void stop() {
LOG.info("Default stop behavior for app: {}", this.app.getName());
// Default implementation: no-op or generic cleanup logic
}
}
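The `interrupt()` -> `stop()` indirection is a template-method hook: the base class owns the Quartz interrupt callback, while each app overrides `stop()` with its own cleanup. The same shape in Python, for illustration only:

class NativeApp:
    def interrupt(self):            # invoked by the scheduler
        print("interrupting job")
        self.stop()

    def stop(self):                 # default hook; subclasses override
        print("default stop: no-op cleanup")

class SearchIndexAppSketch(NativeApp):
    def stop(self):
        print("flush queues, shut down executors")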
@@ -161,12 +161,13 @@ public class SearchIndexApp extends AbstractNativeApplication {

@Getter private EventPublisherJob jobData;
private final Object jobDataLock = new Object();
private volatile boolean stopped = false;
private ExecutorService producerExecutor;
private final ExecutorService jobExecutor = Executors.newCachedThreadPool();
private BlockingQueue<Runnable> producerQueue = new LinkedBlockingQueue<>(100);
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
private JobExecutionContext jobExecutionContext;
private volatile boolean stopped = false;

public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
super(collectionDAO, searchRepository);
@@ -190,6 +191,7 @@ public void init(App app) {
@Override
public void startApp(JobExecutionContext jobExecutionContext) {
try {
this.jobExecutionContext = jobExecutionContext;
initializeJob(jobExecutionContext);
String runType =
(String) jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
@@ -533,11 +535,17 @@ private void reCreateIndexes(String entityType) throws SearchIndexException {
}

@SuppressWarnings("unused")
public void stopJob() {
@Override
public void stop() {
LOG.info("Stopping reindexing job.");
stopped = true;
jobData.setStatus(EventPublisherJob.Status.STOP_IN_PROGRESS);
sendUpdates(jobExecutionContext);
shutdownExecutor(jobExecutor, "JobExecutor", 60, TimeUnit.SECONDS);
shutdownExecutor(producerExecutor, "ProducerExecutor", 60, TimeUnit.SECONDS);
LOG.info("Stopped reindexing job.");
jobData.setStatus(EventPublisherJob.Status.STOPPED);
sendUpdates(jobExecutionContext);
}

private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
@@ -596,7 +604,9 @@ private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
}
LOG.error("Unexpected error during processing task for entity {}", entityType, e);
} finally {
sendUpdates(jobExecutionContext);
if (!stopped) {
sendUpdates(jobExecutionContext);
}
}
}

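The stop sequence above — publish STOP_IN_PROGRESS, flip the volatile flag, drain both executors with a bounded wait, then publish STOPPED and suppress further per-task updates — is a standard cooperative-shutdown shape. The same idea in Python, for illustration only:

import threading
from concurrent.futures import ThreadPoolExecutor

class ReindexerSketch:
    def __init__(self):
        self._stopped = threading.Event()   # plays the role of the volatile flag
        self._pool = ThreadPoolExecutor(max_workers=4)

    def _process(self, batch):
        if self._stopped.is_set():          # workers check the flag cooperatively
            return
        ...  # index the batch

    def stop(self):
        self._stopped.set()                 # STOP_IN_PROGRESS
        self._pool.shutdown(wait=True)      # drain in-flight tasks
        # STOPPED: safe to publish the final status here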
@@ -263,30 +263,52 @@ public void triggerOnDemandApplication(App application) {
}

public void stopApplicationRun(App application) {
if (application.getFullyQualifiedName() == null) {
throw new IllegalArgumentException("Application's fullyQualifiedName is null.");
}
try {
// Interrupt any scheduled job
JobDetail jobDetailScheduled =
scheduler.getJobDetail(new JobKey(application.getName(), APPS_JOB_GROUP));
if (jobDetailScheduled != null) {
LOG.debug("Stopping Scheduled Execution for App : {}", application.getName());
scheduler.interrupt(jobDetailScheduled.getKey());
}

// Interrupt any on-demand job
JobDetail jobDetailOnDemand =
scheduler.getJobDetail(
new JobKey(
String.format("%s-%s", application.getName(), ON_DEMAND_JOB), APPS_JOB_GROUP));

if (jobDetailOnDemand != null) {
LOG.debug("Stopping On Demand Execution for App : {}", application.getName());
scheduler.interrupt(jobDetailOnDemand.getKey());
boolean isJobRunning = false;
// Check if the job is already running
List<JobExecutionContext> currentJobs = scheduler.getCurrentlyExecutingJobs();
for (JobExecutionContext context : currentJobs) {
if ((jobDetailScheduled != null
&& context.getJobDetail().getKey().equals(jobDetailScheduled.getKey()))
|| (jobDetailOnDemand != null
&& context.getJobDetail().getKey().equals(jobDetailOnDemand.getKey()))) {
isJobRunning = true;
}
}
} catch (Exception ex) {
LOG.error("Failed to stop job execution.", ex);
if (!isJobRunning) {
throw new UnhandledServerException("There is no job running for the application.");
}
JobKey scheduledJobKey = new JobKey(application.getName(), APPS_JOB_GROUP);
if (jobDetailScheduled != null) {
LOG.debug("Stopping Scheduled Execution for App: {}", application.getName());
scheduler.interrupt(scheduledJobKey);
try {
scheduler.deleteJob(scheduledJobKey);
} catch (SchedulerException ex) {
LOG.error("Failed to delete scheduled job: {}", scheduledJobKey, ex);
}
} else {
JobKey onDemandJobKey =
new JobKey(
String.format("%s-%s", application.getName(), ON_DEMAND_JOB), APPS_JOB_GROUP);
if (jobDetailOnDemand != null) {
LOG.debug("Stopping On Demand Execution for App: {}", application.getName());
scheduler.interrupt(onDemandJobKey);
try {
scheduler.deleteJob(onDemandJobKey);
} catch (SchedulerException ex) {
LOG.error("Failed to delete on-demand job: {}", onDemandJobKey, ex);
}
}
}
} catch (SchedulerException ex) {
LOG.error("Failed to stop job execution for app: {}", application.getName(), ex);
}
}
}