updates to platform message sent to Salesforce #629

Open · wants to merge 9 commits into master
20 changes: 12 additions & 8 deletions src/server/api/API_ingest/shelterluv_people.py
@@ -1,8 +1,11 @@
-import requests, os
-from models import ShelterluvPeople
-from config import engine
-from sqlalchemy.orm import sessionmaker
+import os
+import requests
+import structlog
+from sqlalchemy.orm import sessionmaker
+
+from config import engine
+from models import ShelterluvPeople
+
+logger = structlog.get_logger()
 
 try:
@@ -44,17 +47,18 @@ def store_shelterluv_people_all():

     with Session() as session:
         logger.debug("Truncating table shelterluvpeople")
+
         session.execute("TRUNCATE TABLE shelterluvpeople")
+
         logger.debug("Start getting shelterluv contacts from people table")
+
         while has_more:
             r = requests.get("http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset),
-                    headers={"x-api-key": SHELTERLUV_SECRET_TOKEN})
+                             headers={"x-api-key": SHELTERLUV_SECRET_TOKEN})
             if r.status_code != 200:
                 logger.error("HTTP status code: %s Error detail: %s", r.status_code, r.text)
                 raise Exception("Error pulling Shelterluv people")
 
             response = r.json()
             for person in response["people"]:
                 #todo: Does this need more "null checks"?
                 session.add(ShelterluvPeople(firstname=person["Firstname"],
                                              lastname=person["Lastname"],
                                              id=person["ID"] if "ID" in person else None,
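Note for reviewers: the loop above pages through the Shelterluv API in LIMIT-sized steps until has_more goes false. A minimal self-contained sketch of that pattern, under the assumption that the response carries a has_more flag alongside the people array (fetch_all_people and the placeholder token are illustrative, not part of this PR):

import requests

LIMIT = 100
SHELTERLUV_SECRET_TOKEN = "..."  # placeholder; the real value comes from the environment

def fetch_all_people():
    # Page through /people until the API reports no further results.
    people, offset, has_more = [], 0, True
    while has_more:
        r = requests.get(
            "http://shelterluv.com/api/v1/people?limit={}&offset={}".format(LIMIT, offset),
            headers={"x-api-key": SHELTERLUV_SECRET_TOKEN},
        )
        if r.status_code != 200:
            raise Exception("Error pulling Shelterluv people")
        body = r.json()
        people.extend(body["people"])
        has_more = body.get("has_more", False)  # assumed termination flag
        offset += LIMIT
    return people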
69 changes: 30 additions & 39 deletions src/server/api/API_ingest/updated_data.py
@@ -13,56 +13,47 @@ def get_updated_contact_data():
qry = """ -- Collect latest foster/volunteer dates
select json_agg (upd) as "cd"
from (
select
sf.source_id as "Id" , -- long salesforce string
array_agg(sl.source_id) filter (where sl.source_id is not null) as "Person_Id__c", -- short PAWS-local shelterluv id
select
salesforce.source_id as "contactId",
shelterluv.person_ids as "personIds",
case
when
(extract(epoch from now())::bigint - max(foster_out) < 365*86400) -- foster out in last year
or (extract(epoch from now())::bigint - max(foster_return) < 365*86400) -- foster return
then 'Active'
else 'Inactive'
end as "Foster_Activity__c",
max(foster_out) as "Foster_Start_Date__c",
max(foster_return) as "Foster_End_Date__c",
min(vol.first_date) "First_volunteer_date__c",
max(vol.last_date) "Last_volunteer_date__c",
sum(vol.hours) as "Total_volunteer_hours__c",
array_agg(vc.source_id::integer) filter(where vc.source_id is not null) as "Volgistics_Id__c"
when volgistics.last_shift_date > now() - interval '1 year' then 'Active' else 'InActive'
end as "volunteerStatus",
shelterluv.foster_start as "fosterStartDate",
shelterluv.foster_end as "fosterEndDate",
volgistics.first_volunteer_date as "firstVolunteerDate",
volgistics.last_shift_date as "lastShiftDate",
volgistics.total_hours as "totalVolunteerHours",
volgistics.volg_ids as "volgisticIds"
from (
select source_id, matching_id from pdp_contacts sf
where sf.source_type = 'salesforcecontacts'
) sf
left join pdp_contacts sl on sl.matching_id = sf.matching_id and sl.source_type = 'shelterluvpeople'
select * from pdp_contacts pc where source_type = 'salesforcecontacts'
) salesforce
left join (
select
person_id,
max(case when event_type=1 then time else null end) * 1000 adopt,
max(case when event_type=2 then time else null end) * 1000 foster_out,
-- max(case when event_type=3 then time else null end) rto,
max(case when event_type=5 then time else null end) * 1000 foster_return
from sl_animal_events
group by person_id
) sle on sle.person_id::text = sl.source_id
left join pdp_contacts vc on vc.matching_id = sf.matching_id and vc.source_type = 'volgistics'
select matching_id, array_agg(distinct v."number"::int) volg_ids, sum(hours) total_hours,
min(from_date) first_volunteer_date, max(from_date) last_shift_date
from volgistics v
left join volgisticsshifts v2 on v2.volg_id::varchar = v.number
inner join pdp_contacts pc on pc.source_id = v2.volg_id::varchar and pc.source_type = 'volgistics'
group by matching_id
) volgistics on volgistics.matching_id = salesforce.matching_id
left join (
select
volg_id,
sum(hours) as hours,
extract(epoch from min(from_date)) * 1000 as first_date,
extract(epoch from max(from_date)) * 1000 as last_date
from volgisticsshifts
group by volg_id
) vol on vol.volg_id::text = vc.source_id
where sl.matching_id is not null or vc.matching_id is not null
group by sf.source_id
matching_id, array_agg(distinct p.internal_id) as person_ids,
max(case when event_type=1 then to_timestamp(time) else null end) adopt,
min(case when event_type=2 then to_timestamp(time) else null end) foster_start,
max(case when event_type=5 then to_timestamp(time) else null end) foster_end
from shelterluvpeople p
left join sl_animal_events sae on sae.person_id::varchar = p.internal_id
inner join pdp_contacts pc on pc.source_id = p.internal_id
group by matching_id
) shelterluv on shelterluv.matching_id = salesforce.matching_id
where volgistics.matching_id is not null or shelterluv.matching_id is not null
) upd;
"""

with Session() as session:
result = session.execute(qry)
sfdata = result.fetchone()[0]
if sfdata:
logger.debug(sfdata)
logger.debug("Query for Salesforce update returned %d records", len(sfdata))
return sfdata
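Worth noting: because the query wraps everything in json_agg, it always returns a single row with a single column, either a JSON array or NULL, which is why the Python side reads result.fetchone()[0]. A hypothetical record for orientation; the keys mirror the column aliases in the query above, the values are invented:

sfdata = [
    {
        "contactId": "0031N00001AbCdEQAV",  # Salesforce contact id (illustrative)
        "personIds": ["12345"],             # Shelterluv internal ids
        "volunteerStatus": "Active",        # 'Active' iff a shift within the last year
        "fosterStartDate": "2022-01-15T00:00:00+00:00",
        "fosterEndDate": "2022-06-01T00:00:00+00:00",
        "firstVolunteerDate": "2021-03-15",
        "lastShiftDate": "2022-11-02",
        "totalVolunteerHours": 42.5,
        "volgisticIds": [987],
    },
]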
13 changes: 6 additions & 7 deletions src/server/api/internal_api.py
@@ -33,14 +33,9 @@ def user_test2():

@internal_api.route("/api/internal/ingestRawData", methods=["GET"])
def ingest_raw_data():
try:
ingest_sources_from_api.start()
except Exception as e:
logger.error(e)

ingest_sources_from_api.start()
return jsonify({'outcome': 'OK'}), 200


@internal_api.route("/api/internal/get_updated_data", methods=["GET"])
def get_contact_data():
logger.debug("Calling get_updated_contact_data()")
@@ -49,7 +44,11 @@ def get_contact_data():
logger.debug("Returning %d contact records", len(contact_json))
else:
logger.debug("No contact records found")
return jsonify({'outcome': 'OK'}), 200
return jsonify({
'outcome': 'OK',
'data': contact_json,
'length': len(contact_json) if contact_json else 0
}), 200


@internal_api.route("/api/internal/start_flow", methods=["GET"])
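A quick smoke test for the enriched response, assuming a local dev server on port 5000 (host and port are illustrative):

import requests

resp = requests.get("http://localhost:5000/api/internal/get_updated_data")
body = resp.json()
print(body["outcome"], body["length"])  # e.g. "OK" 42
for contact in body["data"] or []:      # "data" is null when no records matched
    print(contact["contactId"])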
11 changes: 5 additions & 6 deletions src/server/config.py
@@ -1,13 +1,12 @@
+import logging
 import os
 import sys
-import sqlalchemy as db
-import models
-from constants import IS_LOCAL, BASE_PATH, RAW_DATA_PATH, OUTPUT_PATH, LOGS_PATH, REPORT_PATH, ZIPPED_FILES
 
-import logging
+import sqlalchemy as db
 import structlog
 from structlog.processors import CallsiteParameter
+
+from constants import IS_LOCAL, BASE_PATH, RAW_DATA_PATH, OUTPUT_PATH, LOGS_PATH, REPORT_PATH, ZIPPED_FILES
 
 # structlog setup for complete app
 
@@ -17,7 +16,7 @@
     structlog.processors.add_log_level,
     structlog.processors.StackInfoRenderer(),
     structlog.dev.set_exc_info,
-    structlog.processors.TimeStamper(fmt=None, utc=True ),
+    structlog.processors.TimeStamper(fmt="iso", utc=True),
     structlog.processors.CallsiteParameterAdder(
         [
             CallsiteParameter.FILENAME,
@@ -67,7 +66,7 @@
         + POSTGRES_DATABASE
 )
 
-engine = db.create_engine(DB)
+engine = db.create_engine(DB, pool_pre_ping=True)
 
 # Run Alembic to create managed tables
 # from alembic.config import Config
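Two small behavior changes here: TimeStamper(fmt="iso") switches log timestamps from raw epoch floats to ISO-8601 strings, and pool_pre_ping=True makes SQLAlchemy test each pooled connection with a lightweight round trip at checkout, transparently replacing connections the database has silently closed after idle periods. A minimal sketch of the engine setup with an illustrative DSN (the real one is assembled from the POSTGRES_* settings above):

import sqlalchemy as db

engine = db.create_engine(
    "postgresql://user:password@localhost:5432/paws",  # illustrative DSN
    pool_pre_ping=True,  # validate pooled connections at checkout
)

with engine.connect() as conn:
    conn.execute("select 1")  # succeeds even if the pooled connection had gone stale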
11 changes: 7 additions & 4 deletions src/server/pub_sub/salesforce_message_publisher.py
@@ -60,7 +60,8 @@ def send_pipeline_update_messages(contacts_list):
     schema_id = stub.GetTopic(pb2.TopicRequest(topic_name=UPDATE_TOPIC), metadata=auth_meta_data).schema_id
     schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id), metadata=auth_meta_data).schema_json
 
-    payloads = []
+
+    batches = 0
     while len(contacts_list) > 0:
         if len(contacts_list) > BATCH_SIZE:
             current_batch = contacts_list[:BATCH_SIZE]
@@ -85,9 +86,11 @@
"schema_id": schema_id,
"payload": buf.getvalue()
}
payloads.append(payload)

stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=payloads), metadata=auth_meta_data)
stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data)
logger.info('Sent %s contacts in message', len(current_batch))
batches = batches + 1


logger.info("%s total pipeline update messages sent", len(payloads))
logger.info('completed sending platform messages, %s messages sent', batches)
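The behavioral change in this file: previously every Avro-encoded payload was accumulated in payloads and published in one PublishRequest after the loop; now each batch is published as soon as it is encoded, so no single gRPC message carries more than BATCH_SIZE events. A schematic of the new flow (encode_batch is a hypothetical stand-in for the Avro encoding elided above; stub, pb2, UPDATE_TOPIC, auth_meta_data and BATCH_SIZE are as defined elsewhere in the file):

batches = 0
while len(contacts_list) > 0:
    # Peel off one batch per iteration and publish it immediately.
    current_batch = contacts_list[:BATCH_SIZE]
    contacts_list = contacts_list[BATCH_SIZE:]
    payload = encode_batch(current_batch)  # hypothetical helper for the Avro step
    stub.Publish(
        pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]),
        metadata=auth_meta_data,
    )
    logger.info('Sent %s contacts in message', len(current_batch))
    batches += 1

logger.info('completed sending platform messages, %s messages sent', batches)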