You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
PadelClub/PadelClub/Data/Gen/generator.py

593 lines
24 KiB

import json
import re
import os
from pathlib import Path
from typing import Dict, List, Any
import argparse
import sys
import logging
from datetime import datetime
import inflect
class SwiftModelGenerator:
def __init__(self, json_data: Dict[str, Any]):
self.data = json_data
self.pluralizer = inflect.engine()
def generate_model(self, model_data: Dict[str, Any]) -> str:
model_name = model_data["name"]
is_sync = model_data.get("synchronizable", False)
is_observable = model_data.get("observable", False)
properties = model_data["properties"]
# Get protocol specific configurations
resource = self.make_resource_name(model_name)
resource_name = model_data.get("resource_name", resource)
token_exempted = model_data.get("tokenExemptedMethods", [])
copy_server_response = model_data.get("copy_server_response", "false")
lines = ["// Generated by SwiftModelGenerator", "// Do not modify this file manually", ""]
# Import statement
lines.append("import Foundation")
lines.append("import LeStorage")
if is_observable:
lines.append("import SwiftUI")
lines.append("")
# Class declaration
if is_observable:
lines.append("@Observable")
protocol = "SyncedStorable" if is_sync else "Storable"
base_class = "SyncedModelObject" if is_sync else "BaseModelObject"
lines.append(f"class Base{model_name}: {base_class}, {protocol} {{")
lines.append("")
# Add SyncedStorable protocol requirements
lines.extend(self._generate_protocol_requirements(resource_name, token_exempted, copy_server_response))
lines.append("")
# Properties
for prop in properties:
swift_type = prop["type"]
if prop.get("optional", False):
swift_type += "?"
default_value = prop.get("defaultValue", "nil" if prop.get("optional", False) else self._get_default_value(swift_type))
lines.append(f" var {prop['name']}: {swift_type} = {default_value}")
lines.append("")
# Add constructor
lines.extend(self._generate_constructor(model_name, properties))
lines.append("")
# CodingKeys
lines.extend(self._generate_coding_keys(model_name, properties, is_observable))
lines.append("")
# Encryption methods
encrypted_props = [p for p in properties if "encryption" in p]
if encrypted_props:
lines.extend(self._generate_encryption_methods(properties))
lines.append("")
# Codable implementation
lines.extend(self._generate_decoder(model_name, properties, is_observable, is_sync))
lines.append("")
lines.extend(self._generate_encoder(properties, is_observable, is_sync))
lines.append("")
# Foreign Key convenience
foreign_key_methods = self._generate_foreign_key_methods(properties)
if foreign_key_methods:
lines.extend(foreign_key_methods)
# lines.append("")
# Copy method
lines.extend(self._generate_copy_method(model_name, properties))
lines.append("")
# Add relationships function
lines.extend(self._generate_relationships(model_name, properties))
lines.append("")
lines.append("}")
return "\n".join(lines)
def _generate_constructor(self, model_name: str, properties: List[Dict[str, Any]]) -> List[str]:
"""Generate a constructor with all properties as parameters with default values."""
lines = [" init("]
# Generate parameter list
params = []
for prop in properties:
name = prop['name']
type_name = prop['type']
is_optional = prop.get("optional", False)
# Always include a default value
default_value = prop.get("defaultValue")
if default_value is None:
if is_optional:
default_value = "nil"
else:
default_value = self._get_default_value(type_name)
# Format the parameter with its type and default value
param = f" {name}: {type_name}"
if is_optional:
param += "?"
param += f" = {default_value}"
params.append(param)
# Join parameters with commas
lines.extend([f"{param}," for param in params[:-1]])
lines.append(f"{params[-1]}") # Last parameter without comma
# Constructor body
lines.extend([
" ) {",
" super.init()",
])
# Property assignments
for prop in properties:
name = prop['name']
lines.append(f" self.{name} = {name}")
lines.append(" }")
lines.extend([
" required public override init() {",
" super.init()",
" }",
])
return lines
def _generate_foreign_key_methods(self, properties: List[Dict[str, Any]]) -> List[str]:
lines = []
for prop in properties:
if "foreignKey" in prop:
foreign_key = prop["foreignKey"]
prop_name = prop["name"]
method_name = f"{prop_name}Value"
is_optional = prop.get("optional", False)
lines.extend([f" func {method_name}() -> {foreign_key.rstrip('*')}? {{"])
if is_optional:
lines.extend([
f" guard let {prop_name} = self.{prop_name} else {{ return nil }}"
])
if foreign_key.endswith("*"):
foreign_key = foreign_key[:-1] # Remove the asterisk
lines.extend([
f" return self.store?.findById({prop_name})"
])
else:
lines.extend([
f" return Store.main.findById({prop_name})"
])
lines.extend([" }", ""]) # Close the method and add a blank line
return lines
def _generate_coding_keys(self, model_name: str, properties: List[Dict[str, Any]], is_observable: bool) -> List[str]:
lines = [" enum CodingKeys: String, CodingKey {"]
if model_name == 'Tournament':
lines.append(" case isCanceled = \"isCanceled\"")
lines.append(" case payment = \"payment\"")
for prop in properties:
name = prop['name']
# Add underscore prefix to case name if observable
case_name = f"_{name}" if is_observable else name
# Use custom codingKey if provided, otherwise use the property name
coding_key_value = prop.get("codingKey", name)
lines.append(f" case {case_name} = \"{coding_key_value}\"")
lines.append(" }")
return lines
def _generate_encryption_methods(self, properties: List[Dict[str, Any]]) -> List[str]:
lines = []
for prop in properties:
if "encryption" in prop:
name = prop['name']
enc_type = prop['encryption']
if enc_type == "tournament_payment":
lines.extend([
f" private static func _decode{name.capitalize()}(container: KeyedDecodingContainer<CodingKeys>) throws -> TournamentPayment? {{",
f" var data = try container.decodeIfPresent(Data.self, forKey: ._{name})",
" if data == nil {",
" data = try container.decodeIfPresent(Data.self, forKey: .payment)",
" }",
" ",
" if let data {",
" do {",
" let decoded: String = try data.decryptData(pass: CryptoKey.pass.rawValue)",
" let sequence = decoded.compactMap { NumberFormatter.standard.number(from: String($0))?.intValue }",
" return TournamentPayment(rawValue: sequence[18])",
" } catch {",
" Logger.error(error)",
" }",
" }",
" return nil",
" }",
"",
f" private func _encode{name.capitalize()}(container: inout KeyedEncodingContainer<CodingKeys>) throws {{",
f" guard let {name} else {{",
f" try container.encodeNil(forKey: ._{name})",
" return",
" }",
" ",
" let max: Int = TournamentPayment.allCases.count",
" var sequence = (1...18).map { _ in Int.random(in: (0..<max)) }",
f" sequence.append({name}.rawValue)",
" sequence.append(contentsOf: (1...13).map { _ in Int.random(in: (0..<max ))} )",
"",
" let stringCombo: [String] = sequence.map { $0.formatted() }",
" let joined: String = stringCombo.joined(separator: \"\")",
" if let data = joined.data(using: .utf8) {",
" let encryped: Data = try data.encrypt(pass: CryptoKey.pass.rawValue)",
f" try container.encodeIfPresent(encryped, forKey: ._{name})",
" }",
" }"
])
elif enc_type == "tournament_iscanceled":
lines.extend([
f" private static func _decode{name.capitalize()}(container: KeyedDecodingContainer<CodingKeys>) throws -> Bool {{",
f" var data = try container.decodeIfPresent(Data.self, forKey: ._{name})",
" if data == nil {",
" data = try container.decodeIfPresent(Data.self, forKey: .isCanceled)",
" }",
" if let data {",
" do {",
" let decoded: String = try data.decryptData(pass: CryptoKey.pass.rawValue)",
" let sequence = decoded.compactMap { NumberFormatter.standard.number(from: String($0))?.intValue }",
" return Bool.decodeInt(sequence[18])",
" } catch {",
" Logger.error(error)",
" }",
" }",
" return false",
" }",
"",
f" private func _encode{name.capitalize()}(container: inout KeyedEncodingContainer<CodingKeys>) throws {{",
" let max: Int = 9",
" var sequence = (1...18).map { _ in Int.random(in: (0...max)) }",
f" sequence.append(self.{name}.encodedValue)",
" sequence.append(contentsOf: (1...13).map { _ in Int.random(in: (0...max ))} )",
" ",
" let stringCombo: [String] = sequence.map { $0.formatted() }",
" let joined: String = stringCombo.joined(separator: \"\")",
" if let data = joined.data(using: .utf8) {",
" let encryped: Data = try data.encrypt(pass: CryptoKey.pass.rawValue)",
f" try container.encode(encryped, forKey: ._{name})",
" }",
" }"
])
return lines
def _generate_decoder(self, model_name: str, properties: List[Dict[str, Any]], is_observable: bool, is_sync: bool = False) -> List[str]:
lines = [" required init(from decoder: Decoder) throws {",
" let container = try decoder.container(keyedBy: CodingKeys.self)"]
for prop in properties:
name = prop['name']
type_name = prop['type']
is_optional = prop.get("optional", False)
default_value = prop.get("defaultValue", "nil" if is_optional else self._get_default_value(type_name))
option = prop.get("option")
# Use the correct case reference based on observable flag
case_ref = f"_{name}" if is_observable else name
if "encryption" in prop:
enc_type = prop['encryption']
if enc_type == "standard":
if is_optional:
lines.append(f" self.{name} = try container.decodeEncryptedIfPresent(key: .{case_ref})")
else:
lines.append(f" self.{name} = try container.decodeEncrypted(key: .{case_ref})")
elif enc_type in ["tournament_payment", "tournament_iscanceled"]:
lines.append(f" self.{name} = try Self._decode{name.capitalize()}(container: container)")
else:
# Handle Date with fractional option
if type_name == "Date" and option == "fractional":
if is_optional:
lines.append(f" if let dateString = try container.decodeIfPresent(String.self, forKey: .{case_ref}) {{")
lines.append(f" self.{name} = Date.iso8601FractionalFormatter.date(from: dateString)")
lines.append(" } else {")
lines.append(f" self.{name} = {default_value}")
lines.append(" }")
else:
lines.append(f" let dateString = try container.decode(String.self, forKey: .{case_ref})")
lines.append(f" self.{name} = Date.iso8601FractionalFormatter.date(from: dateString) ?? {default_value}")
else:
lines.append(f" self.{name} = try container.decodeIfPresent({type_name}.self, forKey: .{case_ref}) ?? {default_value}")
lines.append(" try super.init(from: decoder)")
lines.append(" }")
return lines
def _generate_encoder(self, properties: List[Dict[str, Any]], is_observable: bool, is_sync: bool = False) -> List[str]:
lines = [" override func encode(to encoder: Encoder) throws {",
" var container = encoder.container(keyedBy: CodingKeys.self)"]
for prop in properties:
name = prop['name']
type_name = prop['type']
is_optional = prop.get('optional', False)
option = prop.get("option")
# Use the correct case reference based on observable flag
case_ref = f"_{name}" if is_observable else name
if "encryption" in prop:
enc_type = prop['encryption']
if enc_type == "standard":
if is_optional:
lines.append(f" try container.encodeAndEncryptIfPresent(self.{name}?.data(using: .utf8), forKey: .{case_ref})")
else:
lines.append(f" try container.encodeAndEncryptIfPresent(self.{name}.data(using: .utf8), forKey: .{case_ref})")
elif enc_type in ["tournament_payment", "tournament_iscanceled"]:
lines.append(f" try _encode{name.capitalize()}(container: &container)")
else:
if type_name == "Date" and option == "fractional":
if is_optional:
lines.append(f" if let date = self.{name} {{")
lines.append(f" try container.encode(Date.iso8601FractionalFormatter.string(from: date), forKey: .{case_ref})")
lines.append(" } else {")
lines.append(f" try container.encodeNil(forKey: .{case_ref})")
lines.append(" }")
else:
lines.append(f" try container.encode(Date.iso8601FractionalFormatter.string(from: self.{name}), forKey: .{case_ref})")
else:
lines.append(f" try container.encode(self.{name}, forKey: .{case_ref})")
lines.append(" try super.encode(to: encoder)")
lines.append(" }")
return lines
def _generate_copy_method(self, model_name: str, properties: List[Dict[str, Any]]) -> List[str]:
model_variable = model_name.lower()
lines = [f" func copy(from other: any Storable) {{"]
lines.append(f" guard let {model_variable} = other as? Base{model_name} else {{ return }}")
for prop in properties:
name = prop['name']
lines.append(f" self.{name} = {model_variable}.{name}")
lines.append(" }")
return lines
def _generate_protocol_requirements(self, resource_name: str, token_exempted: List[str], copy_server_response: str) -> List[str]:
"""Generate the static functions required by SyncedStorable protocol."""
# Convert HTTP methods to proper format
formatted_methods = [f".{method.lower()}" for method in token_exempted]
methods_str = ", ".join(formatted_methods) if formatted_methods else ""
return [
f" static func resourceName() -> String {{ return \"{resource_name}\" }}",
f" static func tokenExemptedMethods() -> [HTTPMethod] {{ return [{methods_str}] }}",
f" static var copyServerResponse: Bool = {copy_server_response}",
]
def _generate_relationships(self, model_name, properties: List[Dict[str, Any]]) -> List[str]:
# Find all properties with foreign keys
foreign_key_props = [p for p in properties if "foreignKey" in p]
if not foreign_key_props:
# If no foreign keys, return empty array
return [
" static func relationships() -> [Relationship] {",
" return []",
" }"
]
lines = [
" static func relationships() -> [Relationship] {",
" return ["
]
# Generate relationship entries
for prop in foreign_key_props:
name = prop["name"]
foreign_key = prop["foreignKey"].rstrip('*') # Remove asterisk if present
lines.append(f" Relationship(type: {foreign_key}.self, keyPath: \\Base{model_name}.{name}),")
# Close the array and function
lines.extend([
" ]",
" }"
])
return lines
def _get_default_value(self, type_name: str) -> str:
"""Get default value for non-optional types"""
if "String" in type_name:
return "\"\""
elif "Int" in type_name:
return "0"
elif "Double" in type_name:
return "0.0"
elif "Bool" in type_name:
return "false"
elif "Date" in type_name:
return "Date()"
else:
return "nil" # For any other type
def get_output_filename(self, model_name: str) -> str:
"""Generate the output filename for a model."""
return f"Base{model_name}.swift"
def make_resource_name(self, text):
p = inflect.engine()
# Split camelCase into words
words = re.findall('[A-Z][^A-Z]*', text)
if not words:
words = [text]
words = [word.lower() for word in words]
words[-1] = p.plural(words[-1])
return '-'.join(words)
def process_directory(input_dir: str, output_dir: str, logger: logging.Logger, dry_run: bool = False) -> int:
"""Process all JSON files in the input directory and generate Swift models."""
try:
input_path = validate_directory(input_dir)
if not dry_run:
output_path = validate_directory(output_dir, create=True)
json_files = list(input_path.glob("*.json"))
if not json_files:
logger.warning(f"No JSON files found in '{input_dir}'")
return 0
logger.info(f"Found {len(json_files)} JSON files to process")
successful_files = 0
for json_file in json_files:
try:
with open(json_file, 'r') as f:
json_data = json.load(f)
generator = SwiftModelGenerator(json_data)
# Generate each model in the JSON file
for model in json_data["models"]:
model_name = model["name"]
swift_code = generator.generate_model(model)
if dry_run:
logger.info(f"Would generate Base{model_name}.swift")
continue
# Write to output file with Base prefix
output_file = output_path / generator.get_output_filename(model_name)
with open(output_file, 'w') as f:
f.write(swift_code)
logger.info(f"Generated Base{model_name}.swift")
successful_files += 1
except json.JSONDecodeError as e:
logger.error(f"Error parsing {json_file.name}: {e}")
except KeyError as e:
logger.error(f"Missing required key in {json_file.name}: {e}")
except Exception as e:
logger.error(f"Error processing {json_file.name}: {e}")
return successful_files
except Exception as e:
logger.error(f"Fatal error: {e}")
return 0
def setup_logging(verbose: bool) -> logging.Logger:
"""Configure logging based on verbosity level."""
logger = logging.getLogger('SwiftModelGenerator')
handler = logging.StreamHandler()
if verbose:
logger.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))
else:
logger.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
return logger
def validate_directory(path: str, create: bool = False) -> Path:
"""Validate and optionally create a directory."""
dir_path = Path(path)
if dir_path.exists():
if not dir_path.is_dir():
raise ValueError(f"'{path}' exists but is not a directory")
elif create:
dir_path.mkdir(parents=True)
else:
raise ValueError(f"Directory '{path}' does not exist")
return dir_path
def main():
parser = argparse.ArgumentParser(
description="Generate Swift model classes from JSON definitions",
epilog="Example: %(prog)s -i ./model_definitions -o ../MyProject/Models"
)
parser.add_argument(
"-i", "--input-dir",
required=True,
help="Directory containing JSON model definitions"
)
parser.add_argument(
"-o", "--output-dir",
required=True,
help="Directory where Swift files will be generated"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Enable verbose output"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be generated without actually creating files"
)
parser.add_argument(
"--version",
action="version",
version="%(prog)s 1.0.0"
)
args = parser.parse_args()
# Setup logging
logger = setup_logging(args.verbose)
# Process the directories
start_time = datetime.now()
logger.info("Starting model generation...")
successful_files = process_directory(
args.input_dir,
args.output_dir,
logger,
args.dry_run
)
# Report results
duration = datetime.now() - start_time
logger.info(f"\nGeneration completed in {duration.total_seconds():.2f} seconds")
logger.info(f"Successfully processed {successful_files} files")
# Return appropriate exit code
sys.exit(0 if successful_files > 0 else 1)
if __name__ == "__main__":
main()