"""Django admin configuration for the biz app.

Registers admin classes for Entity, EmailTemplate, Prospect and Activity, the
bulk actions available on the prospect change list, and the custom admin views
used to import prospects (CSV upload / app users) and to wipe the biz data.
"""
import csv
import io
import logging
import time

from django.contrib import admin
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.core.mail import send_mail
from django.http import HttpResponseRedirect
from django.shortcuts import render, redirect
from django.urls import path, reverse
from django.utils.html import format_html
from django.utils.safestring import mark_safe

from .models import Entity, Prospect, Activity, Status, ActivityType, EmailTemplate, DeclinationReason
from .forms import FileImportForm, EmailTemplateSelectionForm
from .filters import ProspectStatusFilter, StaffUserFilter, ProspectProfileFilter, ProspectDeclineReasonFilter

from tournaments.models import CustomUser
from tournaments.models.enums import UserOrigin
from sync.admin import SyncedObjectAdmin

User = get_user_model()
logger = logging.getLogger(__name__)

# Ordered (keyword, status, declination reason) table used by the CSV import.
# The first keyword found as a substring of the CSV status cell wins, so the
# order below is significant and mirrors the original if/elif chain.
_CSV_STATUS_KEYWORDS = (
    ('CONTACTED', Status.CONTACTED, None),
    ('RESPONDED', Status.RESPONDED, None),
    ('SHOULD_TEST', Status.SHOULD_TEST, None),
    ('CUSTOMER', Status.CUSTOMER, None),
    ('TESTING', Status.TESTING, None),
    ('LOST', Status.LOST, None),
    ('DECLINED_TOO_EXPENSIVE', Status.DECLINED, DeclinationReason.TOO_EXPENSIVE),
    ('USE_OTHER_PRODUCT', Status.DECLINED, DeclinationReason.USE_OTHER_PRODUCT),
    ('USE_ANDROID', Status.DECLINED, DeclinationReason.USE_ANDROID),
    ('NOK', Status.DECLINED, DeclinationReason.UNKNOWN),
    ('DECLINED_UNRELATED', Status.DECLINED_UNRELATED, None),
)


def _status_from_csv_text(status_text):
    """Map a CSV status cell to a ``(status, declination_reason)`` pair.

    Returns ``(None, None)`` when the text is empty or matches no keyword.
    """
    if status_text:
        for keyword, status, reason in _CSV_STATUS_KEYWORDS:
            if keyword in status_text:
                return status, reason
    return None, None


def _normalize_phone(raw_phone):
    """Strip a CSV phone cell and restore a missing leading zero.

    Returns ``None`` for a blank cell. Numbers that already start with ``0``
    are left untouched, and so are ``+``-prefixed numbers, because prefixing
    those with ``0`` would corrupt them.
    """
    phone = raw_phone.strip() or None
    if phone and not phone.startswith(('0', '+')):
        phone = '0' + phone
    return phone


class ProspectInline(admin.StackedInline):
    """Inline for the Prospect/Entity many-to-many rows (used on the Entity admin page)."""

    model = Prospect.entities.through
    extra = 1
    verbose_name = "Prospect"
    verbose_name_plural = "Prospects"
    autocomplete_fields = ['prospect']


@admin.register(Entity)
class EntityAdmin(SyncedObjectAdmin):
    """Admin for Entity, with its linked prospects editable inline."""

    list_display = ('name', 'address', 'zip_code', 'city')
    search_fields = ('name', 'address', 'zip_code', 'city')
    # filter_horizontal = ('prospects',)
    inlines = [ProspectInline]


@admin.register(EmailTemplate)
class EmailTemplateAdmin(SyncedObjectAdmin):
    """Admin for the email templates selectable in the "Send email" action."""

    list_display = ('name', 'subject', 'body')
    search_fields = ('name', 'subject')


def contacted_by_sms(modeladmin, request, queryset):
    """Admin action: record an SMS activity with status CONTACTED for each selected prospect."""
    create_default_activity_for_prospect(modeladmin, request, queryset, ActivityType.SMS, Status.CONTACTED, None)


contacted_by_sms.short_description = "Contacted by SMS"


def mark_as_customer(modeladmin, request, queryset):
    """Admin action: record a CUSTOMER activity for each selected prospect."""
    create_default_activity_for_prospect(modeladmin, request, queryset, None, Status.CUSTOMER, None)


mark_as_customer.short_description = "Mark as customer"


def mark_as_should_test(modeladmin, request, queryset):
    """Admin action: record a SHOULD_TEST activity for each selected prospect."""
    create_default_activity_for_prospect(modeladmin, request, queryset, None, Status.SHOULD_TEST, None)


mark_as_should_test.short_description = "Mark as should test"


def mark_as_testing(modeladmin, request, queryset):
    """Admin action: record a TESTING activity for each selected prospect."""
    create_default_activity_for_prospect(modeladmin, request, queryset, None, Status.TESTING, None)


mark_as_testing.short_description = "Mark as testing"


def declined_too_expensive(modeladmin, request, queryset):
    """Admin action: record a DECLINED activity with reason TOO_EXPENSIVE for each selected prospect."""
    create_default_activity_for_prospect(
        modeladmin, request, queryset, None, Status.DECLINED, DeclinationReason.TOO_EXPENSIVE
    )


declined_too_expensive.short_description = "Declined too expensive"


def declined_use_something_else(modeladmin, request, queryset):
    """Admin action: record a DECLINED activity with reason USE_OTHER_PRODUCT for each selected prospect."""
    create_default_activity_for_prospect(
        modeladmin, request, queryset, None, Status.DECLINED, DeclinationReason.USE_OTHER_PRODUCT
    )


declined_use_something_else.short_description = "Declined use something else"


def create_default_activity_for_prospect(modeladmin, request, queryset, type, status, reason):
    """Create one Activity per prospect in ``queryset`` and report how many were created.

    Args:
        modeladmin: The ModelAdmin running the action; used to post the result message.
        request: Current request; ``request.user`` is stored as the activity's related user.
        queryset: Prospects selected in the change list.
        type: ActivityType for the new activities, or None. (The name shadows the
            builtin but is kept so keyword callers keep working.)
        status: Status stored on each new activity.
        reason: DeclinationReason stored on each new activity, or None.
    """
    marked_count = 0
    for prospect in queryset:
        activity = Activity.objects.create(
            type=type,
            status=status,
            declination_reason=reason,
            related_user=request.user,
        )
        activity.prospects.add(prospect)
        marked_count += 1

    # Count what was actually processed instead of re-querying after the writes.
    modeladmin.message_user(
        request,
        f'{marked_count} prospects were marked as {status}.'
    )


def create_activity_for_prospect(modeladmin, request, queryset):
    """Admin action: open the "add activity" form pre-filled with the single selected prospect.

    Posts an error message and returns None (back to the change list) unless
    exactly one prospect is selected.
    """
    # Only allow single selection
    if queryset.count() != 1:
        messages.error(request, "Please select exactly one prospect.")
        return

    prospect = queryset.first()

    # Build the URL with pre-populated fields; ActivityAdmin.get_form reads ?prospect=
    url = reverse('admin:biz_activity_add')
    url += f'?prospect={prospect.id}'
    return redirect(url)


create_activity_for_prospect.short_description = "Create activity"


@admin.register(Prospect)
class ProspectAdmin(SyncedObjectAdmin):
    """Admin for Prospect: list filters, bulk actions, imports and bulk emailing."""

    readonly_fields = ['related_activities', 'entity_names', 'current_status', 'id']
    fieldsets = [
        (None, {
            'fields': [
                'related_activities', 'id', 'entity_names', 'first_name', 'last_name', 'email',
                'phone', 'official_user', 'name_unsure', 'entities', 'related_user',
            ]
        }),
    ]
    list_display = ('entity_names', 'first_name', 'last_name', 'email', 'last_update', 'current_status')
    list_filter = (
        ProspectStatusFilter, ProspectDeclineReasonFilter, 'creation_date',
        StaffUserFilter, 'source', ProspectProfileFilter,
    )
    search_fields = ('first_name', 'last_name', 'email', 'entities__name')
    date_hierarchy = 'creation_date'
    change_list_template = "admin/biz/prospect/change_list.html"
    ordering = ['-last_update']
    filter_horizontal = ['entities']
    actions = [
        'send_email', create_activity_for_prospect, contacted_by_sms, mark_as_should_test,
        mark_as_testing, mark_as_customer, declined_too_expensive, declined_use_something_else,
    ]
    raw_id_fields = ['official_user']

    def related_activities(self, obj):
        """Return the HTML descriptions of the prospect's activities, or "No events".

        The joined text is marked safe as-is: ``activity.html_desc()`` is trusted
        to return markup, exactly as the previous ``format_html`` call trusted it.
        """
        activities = obj.activities.all()
        if not activities:
            return "No events"

        # NOTE(review): the previous code computed
        # f"/kingdom/biz/activity/{activity.id}/change/" for every activity but never
        # used it, and joined the descriptions with a bare line break. A link wrapper
        # and an HTML separator may have been intended here - confirm and restore.
        activity_links = [f'{activity.html_desc()}' for activity in activities]

        # mark_safe instead of format_html(): format_html() without arguments runs
        # str.format() on the text, which raises as soon as a description contains
        # "{" or "}", and that call form is deprecated in recent Django versions.
        return mark_safe('\n'.join(activity_links))

    related_activities.short_description = "Related Activities"

    def get_urls(self):
        """Prepend the custom import/cleanup views to the default admin URLs."""
        urls = super().get_urls()
        custom_urls = [
            path('import_file/', self.admin_site.admin_view(self.import_file), name='import_file'),
            path('import_app_users/', self.admin_site.admin_view(self.import_app_users), name='import_app_users'),
            path('cleanup/', self.admin_site.admin_view(self.cleanup), name='cleanup'),
        ]
        return custom_urls + urls

    def cleanup(self, request):
        """Delete every Entity, Prospect and Activity, then return to the prospect list."""
        # NOTE(review): this wipes three tables on any request method, with no
        # confirmation step and no permission check beyond admin_view. Consider
        # requiring POST and delete permission.
        Entity.objects.all().delete()
        Prospect.objects.all().delete()
        Activity.objects.all().delete()

        messages.success(request, 'cleanup biz objects')
        return redirect('admin:biz_prospect_changelist')

    def import_app_users(self, request):
        """Create or reuse a Prospect for every CustomUser whose origin is APP.

        Each prospect is matched on email, linked to the entity named by
        ``user.latest_event_club_name()`` when there is one, and gets a CUSTOMER
        activity when the user has at least one purchase.
        """
        users = CustomUser.objects.filter(origin=UserOrigin.APP)

        created_count = 0
        for user in users:
            # exists() answers "any purchase?" without counting every row.
            is_customer = user.purchases.exists()
            entity_name = user.latest_event_club_name()

            prospect, prospect_created = Prospect.objects.get_or_create(
                email=user.email,
                defaults={
                    'first_name': user.first_name,
                    'last_name': user.last_name,
                    'phone': user.phone,
                    'name_unsure': False,
                    'official_user': user,
                    'source': 'App',
                }
            )

            if entity_name:
                entity, entity_created = Entity.objects.get_or_create(
                    name=entity_name,
                    defaults={'name': entity_name}
                )
                prospect.entities.add(entity)

            # NOTE(review): a new CUSTOMER activity is created on every run, so
            # re-importing duplicates it for existing customers - confirm intended.
            if is_customer:
                activity = Activity.objects.create(
                    status=Status.CUSTOMER,
                )
                activity.prospects.add(prospect)

            if prospect_created:
                created_count += 1

        messages.success(request, f'Imported {created_count} app users into prospects')
        return redirect('admin:biz_prospect_changelist')

    def import_file(self, request):
        """
        Handle file import - displays form and processes file upload
        """
        if request.method == 'POST':
            form = FileImportForm(request.POST, request.FILES)
            if form.is_valid():
                # Call the import_csv method with the uploaded file
                try:
                    result = self.import_csv(form.cleaned_data['file'], form.cleaned_data['source'])
                    messages.success(request, f'File imported successfully: {result}')
                    return redirect('admin:biz_prospect_changelist')
                except Exception as e:
                    # Top-level boundary: show the error to the admin user, but keep
                    # the traceback in the logs instead of losing it.
                    logger.exception('Prospect CSV import failed')
                    messages.error(request, f'Error importing file: {str(e)}')
            else:
                messages.error(request, 'Please correct the errors below.')
        else:
            form = FileImportForm()

        context = {
            'form': form,
            'title': 'Import File',
            'app_label': self.model._meta.app_label,
            'opts': self.model._meta,
            'has_change_permission': self.has_change_permission(request),
        }
        return render(request, 'admin/biz/prospect/import_file.html', context)

    def import_csv(self, file, source):
        """
        Process the uploaded CSV file
        CSV format: entity_name,last_name,first_name,email,phone,attachment_text,status,related_user

        Rows with fewer than 8 columns are skipped. Prospects are matched on email;
        an existing prospect whose name differs from the row is flagged ``name_unsure``.
        An SMS-type Activity is created when the row has attachment text or a status.

        Args:
            file: Uploaded file object; its bytes are decoded as UTF-8.
            source: Value stored in ``Prospect.source`` for newly created prospects.

        Returns:
            A human-readable summary of what was created/updated.

        Raises:
            Exception: Wrapping any error raised while processing, chained to the cause.
        """
        try:
            # utf-8-sig also accepts plain UTF-8 but drops a leading BOM, which would
            # otherwise end up glued to the first entity name.
            file_content = file.read().decode('utf-8-sig')
            csv_reader = csv.reader(io.StringIO(file_content))

            created_prospects = 0
            updated_prospects = 0
            created_entities = 0
            created_events = 0

            for row in csv_reader:
                if len(row) < 8:
                    continue  # Skip rows that don't have enough columns

                entity_name = row[0].strip()
                last_name = row[1].strip()
                first_name = row[2].strip()
                email = row[3].strip()
                phone = _normalize_phone(row[4])
                attachment_text = row[5].strip() or None
                status_text = row[6].strip() or None
                related_user_name = row[7].strip() or None

                # Create or get Entity
                entity = None
                if entity_name:
                    entity, entity_created = Entity.objects.get_or_create(
                        name=entity_name,
                        defaults={'name': entity_name}
                    )
                    if entity_created:
                        created_entities += 1

                # Get related user if provided
                related_user = None
                if related_user_name:
                    try:
                        related_user = User.objects.get(username=related_user_name)
                    except User.DoesNotExist:
                        # Try to find by first name if username doesn't exist
                        related_user = User.objects.filter(first_name__icontains=related_user_name).first()

                # Create or update Prospect
                # NOTE(review): rows with a blank email all match the same
                # email='' prospect - confirm whether such rows should be skipped.
                prospect, prospect_created = Prospect.objects.get_or_create(
                    email=email,
                    defaults={
                        'first_name': first_name,
                        'last_name': last_name,
                        'phone': phone,
                        'name_unsure': False,
                        'related_user': related_user,
                        'source': source,
                    }
                )

                if prospect_created:
                    created_prospects += 1
                else:
                    # Check if names are different and mark as name_unsure
                    if prospect.first_name != first_name or prospect.last_name != last_name:
                        prospect.name_unsure = True
                    # Update related_user if provided
                    if related_user:
                        prospect.related_user = related_user
                    prospect.save()
                    updated_prospects += 1

                # Associate entity with prospect
                if entity:
                    prospect.entities.add(entity)

                # Create Event if attachment_text or status is provided
                if attachment_text or status_text:
                    # Map status text to Status enum
                    status_value, declination_reason = _status_from_csv_text(status_text)

                    activity = Activity.objects.create(
                        type=ActivityType.SMS,
                        attachment_text=attachment_text,
                        status=status_value,
                        declination_reason=declination_reason,
                        description=f"Imported from CSV - Status: {status_text}" if status_text else "Imported from CSV"
                    )
                    activity.prospects.add(prospect)
                    created_events += 1

            result = f"Successfully imported: {created_prospects} new prospects, {updated_prospects} updated prospects, {created_entities} new entities, {created_events} new events"
            return result

        except Exception as e:
            # Same exception type and message as before, now chained to the cause.
            raise Exception(f"Error processing CSV file: {str(e)}") from e

    def send_email(self, request, queryset):
        """Admin action: choose an email template, then send it to the selected prospects.

        Renders the template-selection page until the form is submitted with
        ``apply``; on a valid submission the emails are sent and the user is
        redirected back to the current page with a message giving the real
        number of emails sent and, if any, the addresses that failed.
        """
        logger.info('send_email to prospects form initiated...')

        if 'apply' in request.POST:
            form = EmailTemplateSelectionForm(request.POST)
            if form.is_valid():
                email_template = form.cleaned_data['email_template']

                sent_count, failed_emails = self.process_selected_items_with_template(
                    request, queryset, email_template
                )

                self.message_user(
                    request,
                    f"Email sent to {sent_count} prospects using the '{email_template.name}' template.",
                    messages.SUCCESS,
                )
                if failed_emails:
                    failed_list = ', '.join(str(address) for address in failed_emails)
                    self.message_user(
                        request,
                        f"Email could not be sent to {len(failed_emails)} prospects: {failed_list}",
                        messages.ERROR,
                    )
                return HttpResponseRedirect(request.get_full_path())
        else:
            form = EmailTemplateSelectionForm()

        return render(request, 'admin/biz/select_email_template.html', {
            'prospects': queryset,
            'form': form,
            'title': 'Send Email to Prospects'
        })

    send_email.short_description = "Send email"

    def process_selected_items_with_template(self, request, queryset, email_template):
        """Send ``email_template`` to every prospect in ``queryset``, one email each.

        ``{{name}}`` in the template body is replaced by the prospect's first name
        (empty string when it is missing). The sender is ``request.user.email``.
        A failure for one prospect is logged and does not stop the others.

        Returns:
            A ``(sent_count, failed_emails)`` tuple: the number of emails sent and
            the list of prospect emails for which sending raised.
        """
        sent_count = 0
        error_emails = []

        logger.info('Sending email to %s users...', queryset.count())

        for prospect in queryset:
            # first_name may be unset; str.replace() would raise TypeError on None.
            mail_body = email_template.body.replace('{{name}}', prospect.first_name or '')

            try:
                send_mail(
                    email_template.subject,
                    mail_body,
                    request.user.email,
                    [prospect.email],
                    fail_silently=False,
                )
                sent_count += 1
            except Exception:
                # Best effort per recipient: record and log, then carry on.
                logger.exception('Failed to send email to %s', prospect.email)
                error_emails.append(prospect.email)

            # One-second pause between messages (presumably to throttle sending -
            # TODO confirm against the mail provider's limits).
            time.sleep(1)

        return sent_count, error_emails


@admin.register(Activity)
class ActivityAdmin(SyncedObjectAdmin):
    """Admin for Activity; the add form can be pre-filled from a ``?prospect=<id>`` query parameter."""

    list_display = ('prospect_names', 'creation_date', 'status', 'type', 'description', 'attachment_text', )
    list_filter = ('status', 'type')
    search_fields = ('description',)
    filter_horizontal = ('prospects',)
    date_hierarchy = 'creation_date'

    def get_form(self, request, obj=None, **kwargs):
        """Return the model form, pre-selecting the prospect given in the URL, if any.

        When ``?prospect=<id>`` resolves to a Prospect, that prospect becomes the
        initial value of ``prospects`` and the current user the initial
        ``related_user``. An unknown or malformed id is ignored.
        """
        form = super().get_form(request, obj, **kwargs)

        # Pre-populate fields from URL parameters
        if 'prospect' in request.GET:
            try:
                prospect_id = request.GET['prospect']
                prospect = Prospect.objects.get(id=prospect_id)
                form.base_fields['prospects'].initial = [prospect]
                form.base_fields['related_user'].initial = request.user

                # You can set other fields based on the prospect
                # form.base_fields['title'].initial = f"Event for {prospect.}"
                # form.base_fields['status'].initial = 'pending'

            except (Prospect.DoesNotExist, ValueError):
                pass

        return form

    def get_event_display(self, obj):
        """Return the activity's string representation (column label "Activity")."""
        return str(obj)

    get_event_display.short_description = 'Activity'


# @admin.register(EmailCampaign)
# class EmailCampaignAdmin(admin.ModelAdmin):
#     list_display = ('subject', 'event', 'sent_at')
#     list_filter = ('sent_at',)
#     search_fields = ('subject', 'content')
#     date_hierarchy = 'sent_at'
#     readonly_fields = ('sent_at',)

# @admin.register(EmailTracker)
# class EmailTrackerAdmin(admin.ModelAdmin):
#     list_display = (
#         'campaign',
#         'prospect',
#         'tracking_id',
#         'sent_status',
#         'opened_status',
#         'clicked_status'
#     )
#     list_filter = ('sent', 'opened', 'clicked')
#     search_fields = (
#         'prospect__name',
#         'prospect__email',
#         'campaign__subject'
#     )
#     readonly_fields = (
#         'tracking_id', 'sent', 'sent_at',
#         'opened', 'opened_at',
#         'clicked', 'clicked_at'
#     )
#     date_hierarchy = 'sent_at'

#     def sent_status(self, obj):
#         return self._get_status_html(obj.sent, obj.sent_at)
#     sent_status.short_description = 'Sent'
#     sent_status.allow_tags = True

#     def opened_status(self, obj):
#         return self._get_status_html(obj.opened, obj.opened_at)
#     opened_status.short_description = 'Opened'
#     opened_status.allow_tags = True

#     def clicked_status(self, obj):
#         return self._get_status_html(obj.clicked, obj.clicked_at)
#     clicked_status.short_description = 'Clicked'
#     clicked_status.allow_tags = True

#     def _get_status_html(self, status, date):
#         if status:
#             return format_html(
#                 ' {}',
#                 date.strftime('%Y-%m-%d %H:%M') if date else ''
#             )
#         return format_html('')