Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ Configure filters to exclude data you don't want, ie.::
CITIES_LIGHT_INCLUDE_COUNTRIES = ['FR']
CITIES_LIGHT_INCLUDE_CITY_TYPES = ['PPL', 'PPLA', 'PPLA2', 'PPLA3', 'PPLA4', 'PPLC', 'PPLF', 'PPLG', 'PPLL', 'PPLR', 'PPLS', 'STLMT',]

To tune import performance, you can set ``CITIES_LIGHT_BULK_BATCH_SIZE`` (default: 500).
This controls the batch size for bulk inserts of Country, Region, and SubRegion during import.
Set to 0 to disable batching and use per-row saves::

CITIES_LIGHT_BULK_BATCH_SIZE = 500 # or 0 to disable

Now, run migrations, it will only create tables for models that are not
disabled::

Expand Down
46 changes: 31 additions & 15 deletions src/cities_light/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
import os

from urllib.request import urlopen
from urllib.request import Request, urlopen
from urllib.parse import urlparse

from .exceptions import SourceFileDoesNotExist
Expand All @@ -30,10 +30,9 @@ def download(self, source: str, destination: str, force: bool = False):
return False
# If the files are different, download/copy happens
logger.info("Downloading %s into %s", source, destination)
source_stream = urlopen(source)
# wb: open as write and binary mode
with open(destination, "wb") as local_file:
local_file.write(source_stream.read())
with urlopen(source) as source_stream:
with open(destination, "wb") as local_file:
local_file.write(source_stream.read())

return True

Expand All @@ -56,20 +55,37 @@ def source_matches_destination(source: str, destination: str):
@staticmethod
def needs_downloading(source: str, destination: str, force: bool):
"""Return True if source should be downloaded to destination."""
src_file = urlopen(source)
src_size = int(src_file.headers["content-length"])
# Getting last modified timestamp
src_last_modified = time.strptime(
src_file.headers["last-modified"],
"%a, %d %b %Y %H:%M:%S %Z", # taking time with second
)
parsed = urlparse(source)

if parsed.scheme == "file":
source_path = os.path.abspath(os.path.join(parsed.netloc, parsed.path))
if not os.path.exists(source_path):
raise SourceFileDoesNotExist(source_path)
src_size = os.path.getsize(source_path)
src_mtime = os.path.getmtime(source_path)
src_last_modified = time.gmtime(src_mtime)
else:
# Use HEAD for http/https to avoid fetching the body
if parsed.scheme in ("http", "https"):
req = Request(source, method="HEAD")
src_file = urlopen(req)
else:
src_file = urlopen(source)

try:
src_size = int(src_file.headers["content-length"])
src_last_modified = time.strptime(
src_file.headers["last-modified"],
"%a, %d %b %Y %H:%M:%S %Z",
)
except (KeyError, TypeError, ValueError):
return True
finally:
src_file.close()

if os.path.exists(destination) and not force:
local_time = time.gmtime(os.path.getmtime(destination))
local_size = os.path.getsize(destination)
# Compare the local copy's timestamp and size against the remote
# file's: if the local file is at least as new and the sizes match,
# return False because there is no need to download again.
if local_time >= src_last_modified and local_size == src_size:
return False
return True
156 changes: 153 additions & 3 deletions src/cities_light/management/commands/cities_light.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import progressbar

from ...settings import (
BULK_BATCH_SIZE,
COUNTRY_SOURCES,
REGION_SOURCES,
SUBREGION_SOURCES,
Expand All @@ -43,6 +44,7 @@
)
from ...exceptions import InvalidItems
from ...geonames import Geonames
from ...abstract_models import to_ascii
from ...loading import get_cities_models
from ...validators import timezone_validator

Expand Down Expand Up @@ -181,6 +183,9 @@ def progress_finish(self):
def handle(self, *args, **options):
# initialize lazy identity maps
self._clear_identity_maps()
self._country_insert_buffer = []
self._region_insert_buffer = []
self._subregion_insert_buffer = []

if not os.path.exists(DATA_DIR):
self.logger.info("Creating %s", DATA_DIR)
Expand Down Expand Up @@ -210,6 +215,11 @@ def handle(self, *args, **options):
# free some memory
self._clear_identity_maps()

# Flush insert buffers before switching to next source type
self._flush_country_buffer()
self._flush_region_buffer()
self._flush_subregion_buffer()

destination_file_name = url.split("/")[-1]

force = options.get("force_all", False)
Expand Down Expand Up @@ -311,6 +321,129 @@ def _clear_identity_maps(self):
lambda: collections.defaultdict(dict)
)

def _flush_country_buffer(self):
"""Flush country insert buffer with bulk_create."""
buffer = getattr(self, "_country_insert_buffer", [])
if not buffer:
return
self._country_insert_buffer = []
for country in buffer:
name_ascii = to_ascii(country.name).strip()
country.name_ascii = name_ascii or ""
seen = set()
to_insert = []
for country in buffer:
if country.geoname_id in seen:
continue
seen.add(country.geoname_id)
to_insert.append(country)
existing = set(
Country.objects.filter(
geoname_id__in=[x.geoname_id for x in to_insert]
).values_list("geoname_id", flat=True)
)
to_insert = [c for c in to_insert if c.geoname_id not in existing]
if to_insert:
with transaction.atomic():
Country.objects.bulk_create(to_insert)
for c in Country.objects.filter(
geoname_id__in=[x.geoname_id for x in buffer]
).values_list("code2", "pk"):
self._country_codes[c[0]] = c[1]

def _flush_region_buffer(self):
"""Flush region insert buffer with bulk_create."""
buffer = getattr(self, "_region_insert_buffer", [])
if not buffer:
return
self._region_insert_buffer = []
country_ids = {r.country_id for r in buffer}
countries = {
c["pk"]: c["name"]
for c in Country.objects.filter(pk__in=country_ids).values("pk", "name")
}
for region in buffer:
name_ascii = to_ascii(region.name).strip()
region.name_ascii = name_ascii or ""
region.display_name = "%s, %s" % (
region.name,
countries.get(region.country_id, ""),
)
seen = set()
to_insert = []
for region in buffer:
if region.geoname_id in seen:
continue
seen.add(region.geoname_id)
to_insert.append(region)
existing = set(
Region.objects.filter(
geoname_id__in=[x.geoname_id for x in to_insert]
).values_list("geoname_id", flat=True)
)
to_insert = [r for r in to_insert if r.geoname_id not in existing]
if to_insert:
with transaction.atomic():
Region.objects.bulk_create(to_insert)
for r in Region.objects.filter(
geoname_id__in=[x.geoname_id for x in buffer]
).values_list("country__code2", "geoname_code", "pk"):
if r[0] in self._country_codes:
self._region_codes[self._country_codes[r[0]]][r[1]] = r[2]

def _flush_subregion_buffer(self):
"""Flush subregion insert buffer with bulk_create."""
buffer = getattr(self, "_subregion_insert_buffer", [])
if not buffer:
return
self._subregion_insert_buffer = []
country_ids = {s.country_id for s in buffer if s.country_id}
countries = (
{
c["pk"]: c["name"]
for c in Country.objects.filter(pk__in=country_ids).values("pk", "name")
}
if country_ids
else {}
)
for subregion in buffer:
name_ascii = to_ascii(subregion.name).strip()
subregion.name_ascii = name_ascii or ""
subregion.display_name = "%s, %s" % (
subregion.name,
countries.get(subregion.country_id, ""),
)
seen = set()
to_insert = []
for subregion in buffer:
if subregion.geoname_id in seen:
continue
seen.add(subregion.geoname_id)
to_insert.append(subregion)
existing = set(
SubRegion.objects.filter(
geoname_id__in=[x.geoname_id for x in to_insert]
).values_list("geoname_id", flat=True)
)
to_insert = [s for s in to_insert if s.geoname_id not in existing]
if to_insert:
with transaction.atomic():
SubRegion.objects.bulk_create(to_insert)
for s in SubRegion.objects.filter(
geoname_id__in=[x.geoname_id for x in buffer]
).values_list("country__code2", "region__geoname_code", "geoname_code", "pk"):
country_id = self._country_codes.get(s[0])
if country_id is not None:
if s[1] not in self._region_codes[country_id]:
region_pk = (
Region.objects.filter(country_id=country_id, geoname_code=s[1])
.values_list("pk", flat=True)
.first()
)
if region_pk:
self._region_codes[country_id][s[1]] = region_pk
self._subregion_codes[country_id][s[1]][s[2]] = s[3]

def _get_country_id(self, country_code2):
"""
Simple lazy identity map for code2->country
Expand Down Expand Up @@ -388,7 +521,12 @@ def country_import(self, items):

country_items_post_import.send(sender=self, instance=country, items=items)

self.save(country, force_insert=force_insert, force_update=force_update)
if force_insert and BULK_BATCH_SIZE > 0:
self._country_insert_buffer.append(country)
if len(self._country_insert_buffer) >= BULK_BATCH_SIZE:
self._flush_country_buffer()
else:
self.save(country, force_insert=force_insert, force_update=force_update)

def region_import(self, items):
try:
Expand Down Expand Up @@ -437,7 +575,12 @@ def region_import(self, items):
region_items_post_import.send(sender=self, instance=region, items=items)

if save:
self.save(region, force_insert=force_insert, force_update=force_update)
if force_insert and BULK_BATCH_SIZE > 0:
self._region_insert_buffer.append(region)
if len(self._region_insert_buffer) >= BULK_BATCH_SIZE:
self._flush_region_buffer()
else:
self.save(region, force_insert=force_insert, force_update=force_update)

def subregion_import(self, items):

Expand Down Expand Up @@ -499,7 +642,14 @@ def subregion_import(self, items):
subregion_items_post_import.send(sender=self, instance=subregion, items=items)

if save:
self.save(subregion, force_insert=force_insert, force_update=force_update)
if force_insert and BULK_BATCH_SIZE > 0:
self._subregion_insert_buffer.append(subregion)
if len(self._subregion_insert_buffer) >= BULK_BATCH_SIZE:
self._flush_subregion_buffer()
else:
self.save(
subregion, force_insert=force_insert, force_update=force_update
)

def city_import(self, items):
try:
Expand Down
10 changes: 10 additions & 0 deletions src/cities_light/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@
for more information please visit #273


.. py:data:: BULK_BATCH_SIZE

Batch size for bulk_create during import. Set to 0 to disable batching
and fall back to per-row saves. Default is 500. Applies only to newly
inserted Country, Region, and SubRegion rows; City imports and all
updates always use per-row saves.

.. py:data:: CITIES_LIGHT_APP_NAME

Modify it only if you want to define your custom cities models, that
Expand All @@ -111,6 +117,7 @@
from django.conf import settings

__all__ = [
"BULK_BATCH_SIZE",
"FIXTURES_BASE_URL",
"COUNTRY_SOURCES",
"REGION_SOURCES",
Expand Down Expand Up @@ -220,6 +227,9 @@
"file://{0}".format(os.path.join(DATA_DIR, "fixtures/")),
)

# Batch size for bulk_create during import. Set to 0 to disable batching.
BULK_BATCH_SIZE = getattr(settings, "CITIES_LIGHT_BULK_BATCH_SIZE", 500)


class ICountry:
"""
Expand Down
Loading