Skip to content

Commit

Permalink
fix: Bugs related to revisioner and definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
scruwys authored and Scott Cruwys committed Jan 24, 2021
1 parent a091264 commit bfbc8ec
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 60 deletions.
2 changes: 2 additions & 0 deletions app/definitions/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ def to_doc(self):
'datastore_engine': self.schema.datastore.engine,
'schema': self.schema.name,
'name': self.name,
'exact_name': self.search_label,
'description': self.short_desc,
'tags': self.tags,
}
Expand Down Expand Up @@ -551,6 +552,7 @@ def to_doc(self):
'schema': self.table.schema.name,
'table': self.table.name,
'name': self.name,
'exact_name': self.search_label,
'description': self.short_desc,
'tags': self.table.tags,
}
Expand Down
16 changes: 9 additions & 7 deletions app/omnisearch/backends/elastic_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ElasticBackend(base.BaseSearchBackend):

ALLOWED_FACET_MAP = {
'datastores': 'datastore_id',
'datastore_engines': 'datastore_engine',
'engines': 'datastore_engine',
'schemas': 'schema.keyword',
'tags': 'tags.keyword',
}
Expand Down Expand Up @@ -91,14 +91,16 @@ def execute(self, query, types=None, datastores=None, start=0, size=100, **facet
t = time.time()
s = Search(index=index, using=self.client)
s = s.query(
'multi_match',
type='phrase_prefix',
'simple_query_string',
fields=[
'schema',
'table',
'description',
'name^1.1',
'exact_name^100',
'name.raw^30',
'name^10',
'table^5',
'schema^3',
'description^3',
'text^1.1',
'tags',
],
query=query,
).filter(
Expand Down
47 changes: 30 additions & 17 deletions app/omnisearch/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,24 +128,34 @@
'now'
]

# Tokenizer definition of type `simple_pattern_split` whose split pattern is a
# single underscore, i.e. the separator used in snake_case identifiers.
SNAKE_CASE_SPLIT_TOKENIZER = dict(
    type='simple_pattern_split',
    pattern='_',
)

# Tokenizer definition of type `simple_pattern_split` for database object
# names. The pattern is an alternation of three separators: an escaped dot,
# a space, and an underscore.
DBO_NAME_SPLIT_TOKENIZER = {
    'type': 'simple_pattern_split',
    'pattern': r'\.| |_',
}

TABLE_INDEX_SETTINGS = {
'settings': {
'analysis': {
'analyzer': {
'slug_case_split': {
'tokenizer': 'slug_case_split',
'dbo_name_split': {
'tokenizer': 'dbo_name_split',
},
'custom_english_stop': {
'type': 'stop',
'stopwords': ENGLISH_STOPWORDS,
}
},
'tokenizer': {
'slug_case_split': {
'type': 'simple_pattern_split',
'pattern': '_',
}
'dbo_name_split': DBO_NAME_SPLIT_TOKENIZER,
}
}
},
Expand All @@ -165,7 +175,10 @@
},
'name': {
'type': 'text',
'analyzer': 'slug_case_split',
'analyzer': 'dbo_name_split',
},
'exact_name': {
'type': 'text',
},
'description': {
'type': 'text',
Expand All @@ -179,7 +192,7 @@
},
'text': {
'type': 'text',
'analyzer': 'slug_case_split',
'analyzer': 'dbo_name_split',
}
}
},
Expand All @@ -203,19 +216,16 @@
'settings': {
'analysis': {
'analyzer': {
'slug_case_split': {
'tokenizer': 'slug_case_split',
'dbo_name_split': {
'tokenizer': 'dbo_name_split',
},
'custom_english_stop': {
'type': 'stop',
'stopwords': ENGLISH_STOPWORDS
}
},
'tokenizer': {
'slug_case_split': {
'type': 'simple_pattern_split',
'pattern': '_',
}
'dbo_name_split': DBO_NAME_SPLIT_TOKENIZER,
}
}
},
Expand All @@ -235,11 +245,14 @@
},
'table': {
'type': 'text',
'analyzer': 'slug_case_split',
'analyzer': 'dbo_name_split',
},
'name': {
'type': 'text',
'analyzer': 'slug_case_split',
'analyzer': 'dbo_name_split',
},
'exact_name': {
'type': 'text',
},
'description': {
'type': 'text',
Expand All @@ -253,7 +266,7 @@
},
'text': {
'type': 'text',
'analyzer': 'slug_case_split',
'analyzer': 'dbo_name_split',
}
}
},
Expand Down
13 changes: 10 additions & 3 deletions app/revisioner/actions/created.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ def apply(self, batch_size=2000):

for page_num in paginator.page_range:
page = paginator.get_page(page_num)
data = [
self.get_attributes(revision) for revision in page.object_list
]
data = []

for revision in page.object_list:
attributes = self.get_attributes(revision)
if attributes is not None:
data.append(attributes)

if len(data):
self.bulk_insert(data)
Expand Down Expand Up @@ -135,6 +138,8 @@ def get_attributes(self, revision):
"""Get the instance attributes from the Revision.
"""
table_id = revision.parent_instance_id
if table_id is None:
return None
defaults = {
'workspace_id': self.workspace_id,
'table_id': table_id,
Expand All @@ -158,6 +163,8 @@ def get_attributes(self, revision):
"""
metadata = revision.metadata.copy()
table_id = revision.parent_instance_id
if table_id is None:
return None
defaults = {
'workspace_id': self.workspace_id,
'table_id': table_id,
Expand Down
53 changes: 28 additions & 25 deletions app/revisioner/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from app.inspector import service as inspector


def make(collector, *args, **kwargs): # noqa: C901
def make(collector, logger=None, *args, **kwargs): # noqa: C901
"""We take an initial pass at making the definition based on the OID, if supported. The collector
marks the raw database object as "processed" if we are able to match it to a Django model instance.
"""
Expand All @@ -14,6 +14,9 @@ def make(collector, *args, **kwargs): # noqa: C901
for number, row in enumerate(inspector.tables_and_views(collector.datastore)):
schema_name = row.pop('table_schema')

if logger and schema_name != last_schema_name:
logger.info('Starting to process schema: %s', schema_name)

if schema_name not in definition:
schema_instance = collector.schemas.find_by_oid(row['schema_object_id'])

Expand Down Expand Up @@ -74,42 +77,42 @@ def make(collector, *args, **kwargs): # noqa: C901
'indexes': indexes,
})

schema = definition[schema_name]['schema']
if number > 0 and last_schema_name != schema_name:
schema = definition[last_schema_name]['schema']

if not schema['instance']:
schema['instance'] = collector.schemas.search_unassigned(name=schema['name'])
if not schema['instance']:
schema['instance'] = collector.schemas.search_unassigned(name=schema['name'])

if schema['instance']:
collector.schemas.mark_as_processed(schema['instance'].pk)
if schema['instance']:
collector.schemas.mark_as_processed(schema['instance'].pk)

for table in definition[schema_name]['tables']:
if not table['instance'] and schema['instance']:
table['instance'] = collector.tables.search_unassigned(name=table['name'], schema_id=schema['instance'].pk)
for table in definition[last_schema_name]['tables']:
if not table['instance'] and schema['instance']:
table['instance'] = collector.tables.search_unassigned(name=table['name'], schema_id=schema['instance'].pk)

if not table['instance']:
continue
if not table['instance']:
continue

collector.tables.mark_as_processed(table['instance'].pk)
collector.tables.mark_as_processed(table['instance'].pk)

for column in table['columns']:
if column['instance']:
continue
for column in table['columns']:
if column['instance']:
continue

column['instance'] = collector.columns.search_unassigned(name=column['name'], table_id=table['instance'].pk)
column['instance'] = collector.columns.search_unassigned(name=column['name'], table_id=table['instance'].pk)

if column['instance']:
collector.columns.mark_as_processed(column['instance'].pk)
if column['instance']:
collector.columns.mark_as_processed(column['instance'].pk)

for index in table['indexes']:
if index['instance']:
continue
for index in table['indexes']:
if index['instance']:
continue

index['instance'] = collector.indexes.search_unassigned(name=index['name'], table_id=table['instance'].pk)
index['instance'] = collector.indexes.search_unassigned(name=index['name'], table_id=table['instance'].pk)

if index['instance']:
collector.indexes.mark_as_processed(index['instance'].pk)
if index['instance']:
collector.indexes.mark_as_processed(index['instance'].pk)

if number > 0 and last_schema_name != schema_name:
yield (last_schema_name, definition.pop(last_schema_name))

last_schema_name = schema_name
Expand Down
4 changes: 2 additions & 2 deletions app/revisioner/tasks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ def start_revisioner_run(self, run_id, *arg, **kwargs):
collector = DefinitionCollector(self._run.datastore)
run_tasks = []

for schema, schema_definition in definition.make(collector):
self.log.info(f'Uploading schema: {schema}')
for schema, schema_definition in definition.make(collector, logger=self.log):
storage_path = f'revisioner/{self._run.datastore_id}/run_id={self._run.id}/{schema}.json.gz'
blob.put_object(storage_path, schema_definition)
run_tasks.append(
RunTask(run=self._run, storage_path=storage_path, status=RunTask.PENDING),
)
self.log.info(f'Finished processing: {schema}')

RunTask.objects.bulk_create(run_tasks, ignore_conflicts=True)

Expand Down
8 changes: 6 additions & 2 deletions app/revisioner/tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,17 @@ def queue_runs(self, datastore_slug=None, *args, **kwargs):

@app.task(bind=True)
@logging.task_logger(__name__)
def detect_run_timeout(self, minutes=60, *args, **kwargs):
"""Garbage collection. Clears out runs if they haven't finished running after 60 minutes.
def detect_run_timeout(self, minutes=60 * 2, *args, **kwargs):
"""Garbage collection. Clears out runs if they haven't finished running after 120 minutes.
"""
date_from = timezone.now() - timedelta(minutes=minutes)
runs = Run.objects.filter(created_at__lte=date_from, finished_at=None)

for run in runs:
self.log.info(
f'(run: {run.id}) Marking run as timed out'
)

run.mark_as_finished()

RevisionerError.objects.create(
Expand Down
2 changes: 0 additions & 2 deletions docker-development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ services:
- metamapper
webserver:
<< : *metamapper_defaults
build:
context: ./
command: webserver
ports:
- 5050:5050
Expand Down
2 changes: 1 addition & 1 deletion www/src/app/Common/CodeMirror/CodeMirrorEditor.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class CodeMirrorEditor extends React.Component {
readOnly: this.props.readOnly,
defaultValue: this.props.defaultValue,
onChange: this.props.onChange,
className: this.props.textAreaClassName
className: this.props.textAreaClassName,
})

return React.createElement('div', null, editor);
Expand Down
2 changes: 2 additions & 0 deletions www/src/app/Datastores/Readme/ReadmeEditor.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const ReadmeEditor = (props) => {
value={props.value}
onChange={props.onChange}
placeholder="read"
lineWrapping={true}
lineNumbers={true}
/>
</form>
)
Expand Down
2 changes: 1 addition & 1 deletion www/src/pages/Omnisearch/OmnisearchResults.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class OmnisearchResults extends Component {
<FacetCheckboxWidget
title="Schema"
form={form}
name="schema"
name="schemas"
options={this.getSchemas()}
/>
<FacetCheckboxWidget
Expand Down

0 comments on commit bfbc8ec

Please sign in to comment.