You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
padelclub_backend/biz/admin.py

471 lines
19 KiB

import csv
import io
import logging
import time

from django.contrib import admin
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.core.mail import send_mail
from django.http import HttpResponseRedirect
from django.shortcuts import render, redirect
from django.urls import path, reverse
from django.utils.html import format_html, format_html_join
from django.utils.safestring import mark_safe

from sync.admin import SyncedObjectAdmin
from tournaments.models import CustomUser
from tournaments.models.enums import UserOrigin

from .models import Entity, Prospect, Activity, Status, ActivityType, EmailTemplate, DeclinationReason
from .forms import FileImportForm, EmailTemplateSelectionForm
from .filters import ProspectStatusFilter, StaffUserFilter, ProspectProfileFilter, ProspectDeclineReasonFilter
# Active user model, resolved through Django rather than imported directly;
# used by the CSV import to look up the staff user named in a row.
User = get_user_model()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ProspectInline(admin.StackedInline):
    """Stacked inline editing the Prospect <-> Entity link rows from the Entity page."""

    # The inline edits the auto-generated through table of Prospect.entities.
    model = Prospect.entities.through
    # Prospects are picked through the autocomplete widget.
    autocomplete_fields = ['prospect']
    # One blank extra row is offered by default.
    extra = 1
    verbose_name = "Prospect"
    verbose_name_plural = "Prospects"
@admin.register(Entity)
class EntityAdmin(SyncedObjectAdmin):
    """Admin configuration for Entity objects."""

    # The same four columns are both displayed and searchable.
    search_fields = ('name', 'address', 'zip_code', 'city')
    list_display = ('name', 'address', 'zip_code', 'city')
    # Linked prospects are edited inline instead of through a horizontal filter:
    # filter_horizontal = ('prospects',)
    inlines = [ProspectInline]
@admin.register(EmailTemplate)
class EmailTemplateAdmin(SyncedObjectAdmin):
    """Admin configuration for the e-mail templates used by the prospect mailing action."""

    # Templates can be searched by name or subject; the body is shown but not searched.
    search_fields = ('name', 'subject')
    list_display = ('name', 'subject', 'body')
def contacted_by_sms(modeladmin, request, queryset):
    """Admin action: record an SMS activity with status CONTACTED for each selected prospect."""
    create_default_activity_for_prospect(
        modeladmin,
        request,
        queryset,
        type=ActivityType.SMS,
        status=Status.CONTACTED,
        reason=None,
    )


contacted_by_sms.short_description = "Contacted by SMS"
def mark_as_customer(modeladmin, request, queryset):
    """Admin action: record a CUSTOMER activity (no activity type) for each selected prospect."""
    create_default_activity_for_prospect(
        modeladmin,
        request,
        queryset,
        type=None,
        status=Status.CUSTOMER,
        reason=None,
    )


mark_as_customer.short_description = "Mark as customer"
def mark_as_should_test(modeladmin, request, queryset):
    """Admin action: record a SHOULD_TEST activity for each selected prospect.

    The action previously recorded Status.TESTING, which did not match its
    label and made it indistinguishable from a real "testing" state.
    """
    create_default_activity_for_prospect(modeladmin, request, queryset, None, Status.SHOULD_TEST, None)


mark_as_should_test.short_description = "Mark as should test"
def mark_as_testing(modeladmin, request, queryset):
    """Admin action: record a TESTING activity for each selected prospect.

    The action previously recorded Status.CUSTOMER (copy-paste from
    ``mark_as_customer``), silently promoting testers to customers.
    """
    create_default_activity_for_prospect(modeladmin, request, queryset, None, Status.TESTING, None)


mark_as_testing.short_description = "Mark as testing"
def declined_too_expensive(modeladmin, request, queryset):
    """Admin action: record a DECLINED activity with reason TOO_EXPENSIVE for each selected prospect."""
    create_default_activity_for_prospect(
        modeladmin,
        request,
        queryset,
        type=None,
        status=Status.DECLINED,
        reason=DeclinationReason.TOO_EXPENSIVE,
    )


declined_too_expensive.short_description = "Declined too expensive"
def declined_use_something_else(modeladmin, request, queryset):
    """Admin action: record a DECLINED activity with reason USE_OTHER_PRODUCT for each selected prospect."""
    create_default_activity_for_prospect(
        modeladmin,
        request,
        queryset,
        type=None,
        status=Status.DECLINED,
        reason=DeclinationReason.USE_OTHER_PRODUCT,
    )


declined_use_something_else.short_description = "Declined use something else"
def create_default_activity_for_prospect(modeladmin, request, queryset, type, status, reason):
    """Create one Activity per selected prospect and report the result to the admin user.

    Args:
        modeladmin: the ModelAdmin the action was triggered from (used for messaging).
        request: current request; ``request.user`` is stored as the activity's related user.
        queryset: the selected prospects.
        type: activity type to record, or None.
        status: status to record on each activity.
        reason: declination reason to record, or None.
    """
    activity_fields = {
        'type': type,
        'status': status,
        'declination_reason': reason,
        'related_user': request.user,
    }
    # Each prospect gets its own Activity row rather than sharing one.
    for selected in queryset:
        Activity.objects.create(**activity_fields).prospects.add(selected)

    summary = f'{queryset.count()} prospects were marked as {status}.'
    modeladmin.message_user(request, summary)
def create_activity_for_prospect(modeladmin, request, queryset):
    """Admin action: open the "add activity" form pre-filled with the single selected prospect.

    Returns a redirect to the Activity add view carrying ``?prospect=<id>``;
    when the selection is not exactly one prospect, an error message is queued
    and None is returned so the change list is shown again.
    """
    if queryset.count() == 1:
        selected = queryset.first()
        # The prospect id travels in the query string of the add view's URL.
        add_url = reverse('admin:biz_activity_add')
        return redirect(add_url + f'?prospect={selected.id}')

    messages.error(request, "Please select exactly one prospect.")
    return


create_activity_for_prospect.short_description = "Create activity"
@admin.register(Prospect)
class ProspectAdmin(SyncedObjectAdmin):
    """Admin for prospects: listing/filtering, status actions, imports and bulk e-mail."""

    readonly_fields = ['related_activities', 'entity_names', 'current_status', 'id']
    fieldsets = [
        (None, {
            'fields': [
                'related_activities', 'id', 'entity_names', 'first_name', 'last_name',
                'email', 'phone', 'official_user', 'name_unsure', 'entities',
            ]
        }),
    ]
    list_display = ('entity_names', 'first_name', 'last_name', 'email', 'last_update', 'current_status')
    list_filter = (
        ProspectStatusFilter, ProspectDeclineReasonFilter, 'creation_date',
        StaffUserFilter, 'source', ProspectProfileFilter,
    )
    search_fields = ('first_name', 'last_name', 'email', 'entities__name')
    date_hierarchy = 'creation_date'
    change_list_template = "admin/biz/prospect/change_list.html"
    ordering = ['-last_update']
    filter_horizontal = ['entities']
    actions = [
        'send_email', create_activity_for_prospect, contacted_by_sms, mark_as_should_test,
        mark_as_testing, mark_as_customer, declined_too_expensive, declined_use_something_else,
    ]
    raw_id_fields = ['official_user']

    # CSV status keywords, checked in order with a substring test; the first
    # match wins. Each entry is (keyword, status, declination reason).
    _CSV_STATUS_RULES = (
        ('CONTACTED', Status.CONTACTED, None),
        ('RESPONDED', Status.RESPONDED, None),
        ('SHOULD_TEST', Status.SHOULD_TEST, None),
        ('CUSTOMER', Status.CUSTOMER, None),
        ('TESTING', Status.TESTING, None),
        ('LOST', Status.LOST, None),
        ('DECLINED_TOO_EXPENSIVE', Status.DECLINED, DeclinationReason.TOO_EXPENSIVE),
        ('USE_OTHER_PRODUCT', Status.DECLINED, DeclinationReason.USE_OTHER_PRODUCT),
        ('USE_ANDROID', Status.DECLINED, DeclinationReason.USE_ANDROID),
        ('NOK', Status.DECLINED, DeclinationReason.UNKNOWN),
        ('DECLINED_UNRELATED', Status.DECLINED_UNRELATED, None),
    )

    def related_activities(self, obj):
        """Render the prospect's activities as links to their admin change pages."""
        activities = obj.activities.all()
        if not activities:
            return "No events"
        # format_html_join escapes the URL and never runs str.format() on the
        # content, so braces in a description cannot crash rendering.
        # html_desc() is marked safe because the previous implementation also
        # emitted it unescaped.
        return format_html_join(
            mark_safe('<br>'),
            '<a href="{}">{}</a>',
            (
                (
                    reverse('admin:biz_activity_change', args=[activity.id]),
                    mark_safe(activity.html_desc()),
                )
                for activity in activities
            ),
        )

    related_activities.short_description = "Related Activities"

    def get_urls(self):
        """Prepend the custom import/cleanup views to the default admin URLs."""
        custom_urls = [
            path('import_file/', self.admin_site.admin_view(self.import_file), name='import_file'),
            path('import_app_users/', self.admin_site.admin_view(self.import_app_users), name='import_app_users'),
            path('cleanup/', self.admin_site.admin_view(self.cleanup), name='cleanup'),
        ]
        # Custom routes go first so they are matched before the default patterns.
        return custom_urls + super().get_urls()

    def cleanup(self, request):
        """Delete every Entity, Prospect and Activity, then return to the prospect list.

        NOTE(review): this wipes all biz data on a plain GET with no
        confirmation step; consider requiring POST and a confirmation page.
        """
        Entity.objects.all().delete()
        Prospect.objects.all().delete()
        Activity.objects.all().delete()
        messages.success(request, 'cleanup biz objects')
        return redirect('admin:biz_prospect_changelist')

    def import_app_users(self, request):
        """Create or link a Prospect for every user whose origin is the app.

        Users with at least one purchase get a CUSTOMER activity. The activity
        is only created when the prospect does not already have one, so running
        the import again does not pile up duplicates.
        """
        created_count = 0
        for user in CustomUser.objects.filter(origin=UserOrigin.APP):
            prospect, prospect_created = Prospect.objects.get_or_create(
                email=user.email,
                defaults={
                    'first_name': user.first_name,
                    'last_name': user.last_name,
                    'phone': user.phone,
                    'name_unsure': False,
                    'official_user': user,
                    'source': 'App',
                },
            )
            if prospect_created:
                created_count += 1

            entity_name = user.latest_event_club_name()
            if entity_name:
                entity, _ = Entity.objects.get_or_create(name=entity_name)
                prospect.entities.add(entity)

            is_customer = user.purchases.exists()
            if is_customer and not prospect.activities.filter(status=Status.CUSTOMER).exists():
                activity = Activity.objects.create(status=Status.CUSTOMER)
                activity.prospects.add(prospect)

        messages.success(request, f'Imported {created_count} app users into prospects')
        return redirect('admin:biz_prospect_changelist')

    def import_file(self, request):
        """Display the file import form and, on a valid POST, run the CSV import."""
        if request.method == 'POST':
            form = FileImportForm(request.POST, request.FILES)
            if form.is_valid():
                # Top-level boundary: any import failure is shown to the admin
                # user as a message rather than raising a server error.
                try:
                    result = self.import_csv(form.cleaned_data['file'], form.cleaned_data['source'])
                except Exception as e:
                    logger.exception('Prospect file import failed')
                    messages.error(request, f'Error importing file: {e}')
                else:
                    messages.success(request, f'File imported successfully: {result}')
                    return redirect('admin:biz_prospect_changelist')
            else:
                messages.error(request, 'Please correct the errors below.')
        else:
            form = FileImportForm()

        context = {
            'form': form,
            'title': 'Import File',
            'app_label': self.model._meta.app_label,
            'opts': self.model._meta,
            'has_change_permission': self.has_change_permission(request),
        }
        return render(request, 'admin/biz/prospect/import_file.html', context)

    @staticmethod
    def _normalize_phone(raw):
        """Return the stripped phone number with its leading zero restored, or None if blank."""
        phone = raw.strip()
        if not phone:
            return None
        # Restore a dropped leading zero, but leave international "+..." numbers alone.
        if not phone.startswith(('0', '+')):
            phone = '0' + phone
        return phone

    @classmethod
    def _status_from_text(cls, status_text):
        """Map a CSV status cell to (status, declination_reason); (None, None) if unknown."""
        if status_text:
            for keyword, status, reason in cls._CSV_STATUS_RULES:
                if keyword in status_text:
                    return status, reason
        return None, None

    @staticmethod
    def _find_related_user(name):
        """Look a user up by exact username, falling back to a first-name match; None if absent."""
        if not name:
            return None
        try:
            return User.objects.get(username=name)
        except User.DoesNotExist:
            return User.objects.filter(first_name__icontains=name).first()

    def import_csv(self, file, source):
        """Import prospects, entities and activities from an uploaded CSV file.

        CSV format: entity_name,last_name,first_name,email,phone,attachment_text,status,related_user
        Rows with fewer than 8 columns are skipped.

        Args:
            file: uploaded file object whose ``read()`` returns UTF-8 bytes.
            source: value stored in the ``source`` field of newly created prospects.

        Returns:
            A human-readable summary string with the created/updated counters.

        Raises:
            Exception: wraps any error raised while processing, chained to the original.
        """
        try:
            # 'utf-8-sig' reads plain UTF-8 too, and strips the BOM that
            # spreadsheet exports add, which would otherwise corrupt the first cell.
            file_content = file.read().decode('utf-8-sig')
            created_prospects = 0
            updated_prospects = 0
            created_entities = 0
            created_events = 0

            for row in csv.reader(io.StringIO(file_content)):
                if len(row) < 8:
                    continue  # Skip rows that don't have enough columns

                entity_name = row[0].strip()
                last_name = row[1].strip()
                first_name = row[2].strip()
                email = row[3].strip()
                phone = self._normalize_phone(row[4])
                attachment_text = row[5].strip() or None
                status_text = row[6].strip() or None
                related_user = self._find_related_user(row[7].strip() or None)

                entity = None
                if entity_name:
                    entity, entity_created = Entity.objects.get_or_create(name=entity_name)
                    if entity_created:
                        created_entities += 1

                # NOTE(review): rows with an empty email all resolve to the same
                # prospect here; confirm whether such rows can occur.
                prospect, prospect_created = Prospect.objects.get_or_create(
                    email=email,
                    defaults={
                        'first_name': first_name,
                        'last_name': last_name,
                        'phone': phone,
                        'name_unsure': False,
                        'related_user': related_user,
                        'source': source,
                    },
                )
                if prospect_created:
                    created_prospects += 1
                else:
                    # Same email under a different name: flag it for manual review.
                    if prospect.first_name != first_name or prospect.last_name != last_name:
                        prospect.name_unsure = True
                    if related_user:
                        prospect.related_user = related_user
                    prospect.save()
                    updated_prospects += 1

                if entity:
                    prospect.entities.add(entity)

                # An activity is recorded only when the row carries a note or a status.
                if attachment_text or status_text:
                    status_value, declination_reason = self._status_from_text(status_text)
                    activity = Activity.objects.create(
                        type=ActivityType.SMS,
                        attachment_text=attachment_text,
                        status=status_value,
                        declination_reason=declination_reason,
                        description=f"Imported from CSV - Status: {status_text}" if status_text else "Imported from CSV",
                    )
                    activity.prospects.add(prospect)
                    created_events += 1

            return (
                f"Successfully imported: {created_prospects} new prospects, "
                f"{updated_prospects} updated prospects, {created_entities} new entities, "
                f"{created_events} new events"
            )
        except Exception as e:
            # Keep the original traceback attached for the log.
            raise Exception(f"Error processing CSV file: {e}") from e

    def send_email(self, request, queryset):
        """Admin action: choose an e-mail template, then send it to the selected prospects."""
        logger.info('send_email to prospects form initiated...')
        if 'apply' in request.POST:
            form = EmailTemplateSelectionForm(request.POST)
            if form.is_valid():
                email_template = form.cleaned_data['email_template']
                sent_count, failed = self.process_selected_items_with_template(
                    request, queryset, email_template
                )
                # Report what was actually delivered, not the selection size.
                self.message_user(
                    request,
                    f"Email sent to {sent_count} prospects using the '{email_template.name}' template.",
                    messages.SUCCESS if sent_count else messages.WARNING,
                )
                if failed:
                    self.message_user(
                        request,
                        f"Email could not be sent to: {', '.join(failed)}",
                        messages.ERROR,
                    )
                return HttpResponseRedirect(request.get_full_path())
        else:
            form = EmailTemplateSelectionForm()

        return render(request, 'admin/biz/select_email_template.html', {
            'prospects': queryset,
            'form': form,
            'title': 'Send Email to Prospects'
        })

    send_email.short_description = "Send email"

    def process_selected_items_with_template(self, request, queryset, email_template):
        """Send ``email_template`` to every prospect in ``queryset``.

        ``{{name}}`` in the template body is replaced by the prospect's first
        name. The sender address is ``request.user.email``.

        Returns:
            A ``(sent_count, error_emails)`` tuple, where ``error_emails`` lists
            the recipients that could not be reached.
        """
        sent_count = 0
        error_emails = []
        logger.info('Sending email to %s users...', queryset.count())

        for prospect in queryset:
            if not prospect.email:
                # No address to send to; report it instead of calling send_mail.
                error_emails.append(str(prospect))
                continue

            # first_name may be empty/None; str.replace() requires a string.
            mail_body = email_template.body.replace('{{name}}', prospect.first_name or '')
            try:
                send_mail(
                    email_template.subject,
                    mail_body,
                    request.user.email,
                    [prospect.email],
                    fail_silently=False,
                )
            except Exception:
                # Best-effort bulk send: log the failure and continue with the rest.
                logger.exception('Failed to send email to %s', prospect.email)
                error_emails.append(prospect.email)
            else:
                sent_count += 1
            # One-second pause between messages; presumably throttling for the mail server.
            time.sleep(1)

        return sent_count, error_emails
@admin.register(Activity)
class ActivityAdmin(SyncedObjectAdmin):
    """Admin configuration for activities recorded against prospects."""

    list_display = (
        'prospect_names',
        'creation_date',
        'status',
        'type',
        'description',
        'attachment_text',
    )
    list_filter = ('status', 'type')
    search_fields = ('description',)
    filter_horizontal = ('prospects',)
    date_hierarchy = 'creation_date'

    def get_form(self, request, obj=None, **kwargs):
        """Return the model form, pre-filled from a ``?prospect=<id>`` query parameter if present.

        When the parameter names an existing prospect, that prospect becomes the
        initial value of ``prospects`` and the current user the initial value of
        ``related_user``.
        """
        form_class = super().get_form(request, obj, **kwargs)
        if 'prospect' not in request.GET:
            return form_class

        try:
            selected = Prospect.objects.get(id=request.GET['prospect'])
            form_class.base_fields['prospects'].initial = [selected]
            form_class.base_fields['related_user'].initial = request.user
        except (Prospect.DoesNotExist, ValueError):
            # Unknown or malformed id: show the form without pre-filled values.
            pass
        return form_class

    def get_event_display(self, obj):
        """Return the activity's string representation."""
        return str(obj)

    get_event_display.short_description = 'Activity'
# @admin.register(EmailCampaign)
# class EmailCampaignAdmin(admin.ModelAdmin):
# list_display = ('subject', 'event', 'sent_at')
# list_filter = ('sent_at',)
# search_fields = ('subject', 'content')
# date_hierarchy = 'sent_at'
# readonly_fields = ('sent_at',)
# @admin.register(EmailTracker)
# class EmailTrackerAdmin(admin.ModelAdmin):
# list_display = (
# 'campaign',
# 'prospect',
# 'tracking_id',
# 'sent_status',
# 'opened_status',
# 'clicked_status'
# )
# list_filter = ('sent', 'opened', 'clicked')
# search_fields = (
# 'prospect__name',
# 'prospect__email',
# 'campaign__subject'
# )
# readonly_fields = (
# 'tracking_id', 'sent', 'sent_at',
# 'opened', 'opened_at',
# 'clicked', 'clicked_at'
# )
# date_hierarchy = 'sent_at'
# def sent_status(self, obj):
# return self._get_status_html(obj.sent, obj.sent_at)
# sent_status.short_description = 'Sent'
# sent_status.allow_tags = True
# def opened_status(self, obj):
# return self._get_status_html(obj.opened, obj.opened_at)
# opened_status.short_description = 'Opened'
# opened_status.allow_tags = True
# def clicked_status(self, obj):
# return self._get_status_html(obj.clicked, obj.clicked_at)
# clicked_status.short_description = 'Clicked'
# clicked_status.allow_tags = True
# def _get_status_html(self, status, date):
# if status:
# return format_html(
# '<span style="color: green;">✓</span> {}',
# date.strftime('%Y-%m-%d %H:%M') if date else ''
# )
# return format_html('<span style="color: red;">✗</span>')