Commit 3fa33178 authored by Binghong Xing

Merge branch 'BinghongXing' into 'main'

Update 8 files

See merge request !3
parents 127b66dc 761079fb
Showing 312 additions and 0 deletions
# Fission function: harvests VicRoads Bluetooth link data and forwards it
# to the 'traffic' queue via the Fission router.
import json
import requests
from flask import current_app


def main():
    traffic_url = "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"
    headers_url = {
        "Cache-Control": "no-cache",
        "Ocp-Apim-Subscription-Key": "257b936748224c4b84e8bad2af33ffbb"
    }
    data = requests.get(traffic_url, headers=headers_url).json()
    current_app.logger.info('Harvested traffic observations')
    requests.post(url='http://router.fission/enqueue/traffic',
                  headers={'Content-Type': 'application/json'},
                  data=json.dumps(data))
    return 'OK'
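One possible hardening, sketched below: the subscription key above is committed in plaintext, while the Elasticsearch function later in this commit reads its credentials from files Fission mounts under /configs/default/shared-data. The same pattern could apply here (the VICROADS_API_KEY entry name is hypothetical):

def config(k):
    # Read a value from the Fission-mounted config map, as the
    # Elasticsearch function in this commit does.
    with open(f'/configs/default/shared-data/{k}', 'r') as f:
        return f.read().strip()

headers_url = {
    "Cache-Control": "no-cache",
    "Ocp-Apim-Subscription-Key": config('VICROADS_API_KEY'),  # hypothetical entry name
}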
# KafkaTopic: errors (single partition)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: errors
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 1
  replicas: 1
  config:
    retention.ms: 86400000    # keep messages for 24 hours
    segment.bytes: 1073741824
    max.message.bytes: 7000000
    # max.request.size is a producer setting, not a topic config; it is
    # applied on the producer side instead (see the enqueue function).
# KafkaTopic: road (raw road-event payloads consumed by the road processor below)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: road
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 86400000
    segment.bytes: 1073741824
    max.message.bytes: 7000000
    # max.request.size is a producer setting, not a topic config; it is
    # applied on the producer side instead (see the enqueue function).
# KafkaTopic: robservations (flattened road observations produced by the road processor)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: robservations
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 86400000
    segment.bytes: 1073741824
    max.message.bytes: 7000000
    # max.request.size is a producer setting, not a topic config; it is
    # applied on the producer side instead (see the enqueue function).
# KafkaTopic: tobservations (presumably the traffic counterpart of robservations)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: tobservations
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 86400000
    segment.bytes: 1073741824
    max.message.bytes: 7000000
    # max.request.size is a producer setting, not a topic config; it is
    # applied on the producer side instead (see the enqueue function).
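After these four manifests are applied, the topics can be sanity-checked from inside the cluster. A sketch using kafka-python (which the processing functions below already depend on), assuming access to the same bootstrap service used throughout this commit:

from kafka.admin import KafkaAdminClient

# List the topics on the broker and confirm the four defined above exist.
admin = KafkaAdminClient(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc:9092')
existing = set(admin.list_topics())
for topic in ('errors', 'road', 'robservations', 'tobservations'):
    print(topic, 'present' if topic in existing else 'MISSING')
admin.close()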
# Fission function: consumes raw VicRoads link records from the 'traffic'
# topic and flattens them into observation documents.
import json
import datetime
from flask import current_app
from kafka import KafkaConsumer


def main():
    consumer = KafkaConsumer(
        'traffic',
        bootstrap_servers=['my-cluster-kafka-bootstrap.kafka.svc:9092'],
        group_id='my-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        consumer_timeout_ms=10000,  # return once the topic is drained instead of blocking forever
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    observations = []
    for message in consumer:
        kafka_message = message.value
        current_app.logger.info(f'Processing a traffic observation: {kafka_message}')
        for obs in kafka_message:
            # Normalise the interval_start timestamp to "YYYY-MM-DD HH:MM:SS".
            interval_start = obs["latest_stats"].get("interval_start")
            if interval_start:
                parsed_timestamp = datetime.datetime.fromisoformat(interval_start.replace('Z', '+00:00'))
                formatted_timestamp = parsed_timestamp.strftime("%Y-%m-%d %H:%M:%S")
            else:
                formatted_timestamp = None
            observation = {
                "id": obs.get("id"),
                "name": obs.get("name"),
                "organization": {"id": obs["organization"].get("id")},
                "origin": {"id": obs["origin"].get("id")},
                "destination": {"id": obs["destination"].get("id")},
                "latest_stats": {
                    "interval_start": formatted_timestamp,
                    "travel_time": obs["latest_stats"].get("travel_time"),
                    "delay": obs["latest_stats"].get("delay"),
                    "speed": obs["latest_stats"].get("speed"),
                    "excess_delay": obs["latest_stats"].get("excess_delay"),
                    "congestion": obs["latest_stats"].get("congestion"),
                    "score": obs["latest_stats"].get("score"),
                    "flow_restriction_score": obs["latest_stats"].get("flow_restriction_score"),
                    "average_density": obs["latest_stats"].get("average_density"),
                    "density": obs["latest_stats"].get("density"),
                    "enough_data": obs["latest_stats"].get("enough_data"),
                    "ignored": obs["latest_stats"].get("ignored"),
                    "closed": obs["latest_stats"].get("closed"),
                    "estimated_percent": obs["latest_stats"].get("estimated_percent")
                }
            }
            observations.append(observation)
    consumer.close()
    return {
        "status": 200,
        "body": json.dumps(observations)
    }
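For reference, the element shape the loop above expects can be read off its field accesses. An illustrative record (field names come from the code; values are invented, not real VicRoads data):

# Illustrative input element for the traffic processor's inner loop.
example_obs = {
    "id": 1234,
    "name": "Example Rd to Sample St",
    "organization": {"id": 1},
    "origin": {"id": 10},
    "destination": {"id": 20},
    "latest_stats": {
        "interval_start": "2024-05-01T10:15:00",
        "travel_time": 120, "delay": 15, "speed": 48.5,
        "excess_delay": 5, "congestion": 0.2, "score": 3,
        "flow_restriction_score": 0.1, "average_density": 12.0,
        "density": 11.5, "enough_data": True, "ignored": False,
        "closed": False, "estimated_percent": 90
    }
}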
#!/bin/sh
pip3 install -r ${SRC_PKG}/requirements.txt -t ${SRC_PKG} && cp -r ${SRC_PKG} ${DEPLOY_PKG}
# Fission function: generic enqueue endpoint. The target topic comes from
# the route parameter Fission passes as the X-Fission-Params-Topic header.
import json
import asyncio
from flask import current_app, request
from aiokafka import AIOKafkaProducer


async def publish(queue, payload):
    producer = AIOKafkaProducer(
        bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc:9092',
        max_request_size=7500000)
    await producer.start()
    try:
        await producer.send_and_wait(queue, payload)
    finally:
        await producer.stop()


def main():
    topic = request.headers.get('X-Fission-Params-Topic')
    asyncio.run(
        publish(
            topic,
            json.dumps(request.get_json()).encode('utf-8')
        )
    )
    current_app.logger.info(f'Enqueued to topic {topic}')
    return 'OK'
aiokafka==0.8.0
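Callers reach this function through the Fission router, as the harvester above already does. A minimal sketch of a caller (the /enqueue/{topic} route naming is inferred from the URL and header used in this commit):

import json
import requests

# Hypothetical caller: the path segment after /enqueue/ becomes the
# X-Fission-Params-Topic header inside the function.
payload = [{"id": 1, "name": "example-link"}]
resp = requests.post(
    'http://router.fission/enqueue/traffic',
    headers={'Content-Type': 'application/json'},
    data=json.dumps(payload),
)
print(resp.status_code, resp.text)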
# Strimzi Kafka cluster: single broker and single ZooKeeper node, with
# message size limits raised to carry the large harvested payloads.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.6.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.6"
      message.max.bytes: 10000000
      replica.fetch.max.bytes: 10000000
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 50Gi
          deleteClaim: true
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 50Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}
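A minimal reachability check for this single-broker cluster, sketched with kafka-python; assumes it runs from a pod inside the cluster:

from kafka import KafkaConsumer

# Connect to the plain listener on port 9092 and list the served topics.
consumer = KafkaConsumer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc:9092')
print(sorted(consumer.topics()))
consumer.close()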
#!/bin/sh
pip3 install -r ${SRC_PKG}/requirements.txt -t ${SRC_PKG} && cp -r ${SRC_PKG} ${DEPLOY_PKG}
# Fission function: indexes processed observations into Elasticsearch.
from flask import current_app, request
from elasticsearch8 import Elasticsearch


def config(k):
    # Credentials are mounted as files by Fission, not set as env vars.
    with open(f'/configs/default/shared-data/{k}', 'r') as f:
        return f.read()


def main():
    es_username = config('ES_USERNAME')
    es_password = config('ES_PASSWORD')
    # Connect to Elasticsearch with the mounted credentials.
    client = Elasticsearch(
        'https://elasticsearch-master.elastic.svc.cluster.local:9200',
        verify_certs=False,
        ssl_show_warn=False,
        basic_auth=(es_username, es_password)
    )
    request_data = request.get_json(force=True)
    current_app.logger.info(f'Observations to add: {request_data}')
    client.index(
        index='robservations',
        body={"request_data": request_data}
    )
    current_app.logger.info('Indexed observation successfully')
    return 'OK'
elasticsearch8==8.11.0
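To read back what the function above indexed, a search sketch against the same index, reusing the mounted credential files and the same connection assumptions (in-cluster DNS name, unverified certs):

from elasticsearch8 import Elasticsearch

def config(k):
    # Same credential files the indexing function reads.
    with open(f'/configs/default/shared-data/{k}', 'r') as f:
        return f.read()

client = Elasticsearch(
    'https://elasticsearch-master.elastic.svc.cluster.local:9200',
    verify_certs=False,
    ssl_show_warn=False,
    basic_auth=(config('ES_USERNAME'), config('ES_PASSWORD'))
)
res = client.search(index='robservations', query={"match_all": {}}, size=5)
for hit in res['hits']['hits']:
    print(hit['_source'])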
# Fission function: consumes raw road-event features from the 'road' topic,
# flattens them, and republishes them to 'robservations'.
import json
import datetime
from flask import current_app
from kafka import KafkaConsumer, KafkaProducer


def process_message(obs):
    # Normalise the lastUpdated timestamp to "YYYY-MM-DD HH:MM:SS".
    last_updated = obs["properties"].get("lastUpdated")
    if last_updated:
        parsed_timestamp = datetime.datetime.fromisoformat(last_updated.replace('Z', '+00:00'))
        formatted_timestamp = parsed_timestamp.strftime("%Y-%m-%d %H:%M:%S")
    else:
        formatted_timestamp = None
    observation = {
        "id": obs["properties"].get("id"),
        "sourceName": obs["properties"].get("source", {}).get("sourceName"),
        "sourceId": obs["properties"].get("source", {}).get("sourceId"),
        "status": obs["properties"].get("status"),
        "closedRoadName": obs["properties"].get("closedRoadName"),
        "eventType": obs["properties"].get("eventType"),
        "eventDueTo": obs["properties"].get("eventDueTo"),
        "ImpactDirection": obs["properties"].get("impact", {}).get("direction"),
        "ImpactType": obs["properties"].get("impact", {}).get("impactType"),
        "numberLanesImpacted": obs["properties"].get("impact", {}).get("numberLanesImpacted"),
        "speedLimitOnSite": obs["properties"].get("impact", {}).get("speedLimitOnSite"),
        "durationstart": obs["properties"].get("duration", {}).get("start"),
        "durationend": obs["properties"].get("duration", {}).get("end"),
        "lastUpdated": formatted_timestamp
    }
    return observation


def main():
    consumer = KafkaConsumer(
        'road',
        bootstrap_servers=['my-cluster-kafka-bootstrap.kafka.svc:9092'],
        group_id='my-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        consumer_timeout_ms=10000,  # return once the topic is drained instead of blocking forever
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    producer = KafkaProducer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc:9092')
    for message in consumer:
        kafka_message = message.value
        current_app.logger.info('Processing a road observation')
        for obs in kafka_message.get('features', []):
            observation = process_message(obs)
            producer.send('robservations', value=json.dumps(observation).encode('utf-8'))
    current_app.logger.info('Processed all road observations')
    producer.flush()
    consumer.close()
    producer.close()
    return {
        "status": 200,
        "body": "Processed observations sent to the 'robservations' topic."
    }
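To sanity-check process_message in isolation, an illustrative feature of the shape the loop expects (field names taken from the accesses above, values invented):

sample_feature = {
    "properties": {
        "id": "EV-123",
        "source": {"sourceName": "VicRoads", "sourceId": "123"},
        "status": "Active",
        "closedRoadName": "Example Road",
        "eventType": "Roadworks",
        "eventDueTo": "Maintenance",
        "impact": {"direction": "Northbound", "impactType": "Lane closure",
                   "numberLanesImpacted": 1, "speedLimitOnSite": 40},
        "duration": {"start": "2024-05-01T08:00:00", "end": "2024-05-01T17:00:00"},
        "lastUpdated": "2024-05-01T09:30:00"
    }
}
print(process_message(sample_feature))  # flattened record as sent to 'robservations'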
#!/bin/sh
pip3 install -r ${SRC_PKG}/requirements.txt -t ${SRC_PKG} && cp -r ${SRC_PKG} ${DEPLOY_PKG}