blob: 247cd155a070bb556c192335e5a18ed2f422d2b7 [file] [log] [blame]
import os
import argparse
import io
import codecs
import json
import re
import pathlib
from pathlib import Path
from os import listdir
from os.path import isdir, exists, join
from re import split
sdks = {}
clientClassPrefix = {}
# Eventstreaming operations are only available on the async clients
asyncOnlyOperations = {}
def generateDocsMap(apiDefinitionsPath, apiDefinitionsRelativeFilePath):
rootPath = pathlib.Path(r'./services')
for serviceModelPaths in rootPath.rglob('service-2.json'):
tokenizePath = str(Path(serviceModelPaths).parent).split("/")
getServiceName = tokenizePath[len(tokenizePath)-1]
if (getServiceName == "codegen-resources"):
getServiceName = str(serviceModelPaths).split("services/")[1].split("/src/main/resources")[0]
with codecs.open(serviceModelPaths, 'rb', 'utf-8') as apiDefinition:
apiContent = json.loads(apiDefinition.read())
if "uid" in apiContent["metadata"].keys():
sdks[apiContent["metadata"]["uid"]] = getServiceName
clientClassPrefix[apiContent["metadata"]["uid"]] = getClientClassNameFromMetadata(apiContent["metadata"])
asyncOnlyOps = generateAsyncOnlyOps(apiContent)
if len(asyncOnlyOps.keys()) > 0:
asyncOnlyOperations[apiContent["metadata"]["uid"]] = asyncOnlyOps
return sdks
def generateAsyncOnlyOps(apiContent):
asyncOnlyOps = {}
for op in apiContent['operations'].keys():
eventStreamInOut = hasEventStreamInputOutput(apiContent, op)
if len(eventStreamInOut.keys()) > 0:
asyncOnlyOps[op] = eventStreamInOut
return asyncOnlyOps
def hasEventStreamInputOutput(serviceModel, operationName):
inOut = {}
opModel = serviceModel['operations'][operationName]
if 'input' in opModel.keys():
inputShapeName = opModel['input']['shape']
if hasEventStreamMember(serviceModel, inputShapeName):
inOut['input'] = True
if 'output' in opModel.keys():
outputShapeName = opModel['output']['shape']
if hasEventStreamMember(serviceModel, outputShapeName):
inOut['output'] = True
return inOut
def hasEventStreamMember(serviceModel, shapeName):
shapeModel = serviceModel['shapes'][shapeName]
if 'members' in shapeModel.keys():
for name,memberModel in shapeModel['members'].items():
if isEventStream(serviceModel,memberModel['shape']):
return True
return False
def isEventStream(serviceModel, shapeName):
shapeModel = serviceModel['shapes'][shapeName]
return 'eventstream' in shapeModel and shapeModel['eventstream']
def splitOnWordBoundaries(toSplit) :
result = toSplit
result = re.sub(r'[^A-Za-z0-9]+', " ", result)
result = re.sub(r'([^a-z]{2,})v([0-9]+)', r'\g<1> v\g<2>' , result)
result = re.sub(r'([^A-Z]{2,})V([0-9]+)', r'\g<1> V\g<2>', result)
result = re.sub(r'(?<=[a-z])(?=[A-Z]([a-zA-Z]|[0-9]))', r' ', result)
result = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\g<1> \g<2>', result)
result = re.sub(r'([0-9])([a-zA-Z])', r'\g<1> \g<2>', result)
result = re.sub(r' +', ' ', result)
return result.split(" ");
def capitalize(str):
if(str is None or len(str) == 0):
return str
strFirstCaps = str[0].title() + str[1:]
return strFirstCaps
def lowerCase(str):
if(str is None or len(str) == 0):
return str
return str.lower()
def pascalCase(str):
splits = splitOnWordBoundaries(str)
modifiedStr = ""
for i in range(0, len(splits)) :
modifiedStr += capitalize(lowerCase(splits[i]))
return modifiedStr
def getClientClassNameFromMetadata(metadataNode):
toSanitize = ""
if "serviceId" in metadataNode.keys():
toSanitize = metadataNode["serviceId"]
clientName = pascalCase(toSanitize)
clientName = removeLeading(clientName, "Amazon")
clientName = removeLeading(clientName, "Aws")
clientName = removeTrailing(clientName, "Service" )
return clientName
def removeLeading(str, toRemove) :
if(str is None) :
return str
if(str.startswith(toRemove)):
return str.replace(toRemove, "")
return str
def removeTrailing(str, toRemove) :
if(str is None) :
return str
if(str.endswith(toRemove)):
return str.replace(toRemove, "")
return str
def insertDocsMapToRedirect(apiDefinitionsBasePath, apiDefinitionsRelativeFilePath, templateFilePath, outputFilePath):
generateDocsMap(apiDefinitionsBasePath, apiDefinitionsRelativeFilePath)
output = ""
with codecs.open(templateFilePath, 'rb', 'utf-8') as redirect_template:
current_template = redirect_template.read();
output = current_template.replace("${UID_SERVICE_MAPPING}", json.dumps(sdks, ensure_ascii=False))
output = output.replace("${UID_CLIENT_CLASS_MAPPING}", json.dumps(clientClassPrefix, ensure_ascii=False))
output = output.replace("${SERVICE_NAME_TO_ASYNC_ONLY_OPERATION_MAPPING}", json.dumps(asyncOnlyOperations, ensure_ascii=False))
with open(outputFilePath, 'w') as redirect_output:
redirect_output.write(output)
def Main():
parser = argparse.ArgumentParser(description="Generates a Cross-link redirect file.")
parser.add_argument("--apiDefinitionsBasePath", action="store")
parser.add_argument("--apiDefinitionsRelativeFilePath", action="store")
parser.add_argument("--templateFilePath", action="store")
parser.add_argument("--outputFilePath", action="store")
args = vars( parser.parse_args() )
argMap = {}
argMap[ "apiDefinitionsBasePath" ] = args[ "apiDefinitionsBasePath" ] or "./../services/"
argMap[ "apiDefinitionsRelativeFilePath" ] = args[ "apiDefinitionsRelativeFilePath" ] or "/src/main/resources/codegen-resources/service-2.json"
argMap[ "templateFilePath" ] = args[ "templateFilePath" ] or "./scripts/doc_crosslinks/crosslink_redirect.html"
argMap[ "outputFilePath" ] = args[ "outputFilePath" ] or "./crosslink_redirect.html"
insertDocsMapToRedirect(argMap["apiDefinitionsBasePath"], argMap["apiDefinitionsRelativeFilePath"], argMap["templateFilePath"], argMap["outputFilePath"])
print("Generated Cross link at " + argMap["outputFilePath"])
Main()